refactor sync to new model

This commit is contained in:
Dustin J. Mitchell
2020-11-25 17:52:47 -05:00
parent e92fc0628b
commit a81c84d7c7
8 changed files with 193 additions and 143 deletions

View File

@@ -1,5 +1,5 @@
use crate::errors::Error;
use crate::server::{Server, VersionAdd};
use crate::server::{AddVersionResult, GetVersionResult, Server};
use crate::taskstorage::{Operation, TaskMap, TaskStorage, TaskStorageTxn};
use failure::Fallible;
use serde::{Deserialize, Serialize};
@@ -13,7 +13,6 @@ pub struct TaskDB {
#[derive(Serialize, Deserialize, Debug)]
struct Version {
version: u64,
operations: Vec<Operation>,
}
@@ -165,21 +164,34 @@ impl TaskDB {
}
/// Sync to the given server, pulling remote changes and pushing local changes.
pub fn sync(&mut self, username: &str, server: &mut dyn Server) -> Fallible<()> {
pub fn sync(&mut self, server: &mut dyn Server) -> Fallible<()> {
let mut txn = self.storage.txn()?;
// retry synchronizing until the server accepts our version (this allows for races between
// replicas trying to sync to the same server)
loop {
// first pull changes and "rebase" on top of them
let new_versions = server.get_versions(username, txn.base_version()?)?;
for version_blob in new_versions {
let version_str = str::from_utf8(&version_blob).unwrap();
let version: Version = serde_json::from_str(version_str).unwrap();
assert_eq!(version.version, txn.base_version()? + 1);
println!("applying version {:?} from server", version.version);
let mut base_version_id = txn.base_version()?;
TaskDB::apply_version(txn.as_mut(), version)?;
// first pull changes and "rebase" on top of them
loop {
if let GetVersionResult::Version {
version_id,
history_segment,
..
} = server.get_child_version(base_version_id)?
{
let version_str = str::from_utf8(&history_segment).unwrap();
let version: Version = serde_json::from_str(version_str).unwrap();
println!("applying version {:?} from server", version_id);
// apply this verison and update base_version in storage
TaskDB::apply_version(txn.as_mut(), version)?;
txn.set_base_version(version_id)?;
base_version_id = version_id;
} else {
// at the moment, no more child versions, so we can try adding our own
break;
}
}
let operations: Vec<Operation> = txn.operations()?.to_vec();
@@ -189,18 +201,23 @@ impl TaskDB {
}
// now make a version of our local changes and push those
let new_version = Version {
version: txn.base_version()? + 1,
operations,
};
let new_version_str = serde_json::to_string(&new_version).unwrap();
println!("sending version {:?} to server", new_version.version);
if let VersionAdd::Ok =
server.add_version(username, new_version.version, new_version_str.into())?
{
txn.set_base_version(new_version.version)?;
txn.set_operations(vec![])?;
break;
let new_version = Version { operations };
let history_segment = serde_json::to_string(&new_version).unwrap().into();
println!("sending new version to server");
match server.add_version(base_version_id, history_segment)? {
AddVersionResult::Ok(new_version_id) => {
println!("version {:?} received by server", new_version_id);
txn.set_base_version(new_version_id)?;
txn.set_operations(vec![])?;
break;
}
AddVersionResult::ExpectedParentVersion(parent_version_id) => {
println!(
"new version rejected; must be based on {:?}",
parent_version_id
);
// ..continue the outer loop
}
}
}
@@ -256,7 +273,6 @@ impl TaskDB {
}
local_operations = new_local_ops;
}
txn.set_base_version(version.version)?;
txn.set_operations(local_operations)?;
Ok(())
}
@@ -518,10 +534,10 @@ mod tests {
let mut server = TestServer::new();
let mut db1 = newdb();
db1.sync("me", &mut server).unwrap();
db1.sync(&mut server).unwrap();
let mut db2 = newdb();
db2.sync("me", &mut server).unwrap();
db2.sync(&mut server).unwrap();
// make some changes in parallel to db1 and db2..
let uuid1 = Uuid::new_v4();
@@ -545,9 +561,9 @@ mod tests {
.unwrap();
// and synchronize those around
db1.sync("me", &mut server).unwrap();
db2.sync("me", &mut server).unwrap();
db1.sync("me", &mut server).unwrap();
db1.sync(&mut server).unwrap();
db2.sync(&mut server).unwrap();
db1.sync(&mut server).unwrap();
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
// now make updates to the same task on both sides
@@ -567,9 +583,9 @@ mod tests {
.unwrap();
// and synchronize those around
db1.sync("me", &mut server).unwrap();
db2.sync("me", &mut server).unwrap();
db1.sync("me", &mut server).unwrap();
db1.sync(&mut server).unwrap();
db2.sync(&mut server).unwrap();
db1.sync(&mut server).unwrap();
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
}
@@ -578,10 +594,10 @@ mod tests {
let mut server = TestServer::new();
let mut db1 = newdb();
db1.sync("me", &mut server).unwrap();
db1.sync(&mut server).unwrap();
let mut db2 = newdb();
db2.sync("me", &mut server).unwrap();
db2.sync(&mut server).unwrap();
// create and update a task..
let uuid = Uuid::new_v4();
@@ -595,9 +611,9 @@ mod tests {
.unwrap();
// and synchronize those around
db1.sync("me", &mut server).unwrap();
db2.sync("me", &mut server).unwrap();
db1.sync("me", &mut server).unwrap();
db1.sync(&mut server).unwrap();
db2.sync(&mut server).unwrap();
db1.sync(&mut server).unwrap();
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
// delete and re-create the task on db1
@@ -620,9 +636,9 @@ mod tests {
})
.unwrap();
db1.sync("me", &mut server).unwrap();
db2.sync("me", &mut server).unwrap();
db1.sync("me", &mut server).unwrap();
db1.sync(&mut server).unwrap();
db2.sync(&mut server).unwrap();
db1.sync(&mut server).unwrap();
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
}
@@ -678,7 +694,7 @@ mod tests {
println!(" {:?} (ignored)", e);
}
},
Action::Sync => db.sync("me", &mut server).unwrap(),
Action::Sync => db.sync(&mut server).unwrap(),
}
}