From 59f4e6abd7ff4dfd7aa75e87826560c6e0569dc0 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Sat, 28 Dec 2019 22:46:10 -0500 Subject: [PATCH] actually support synchronization --- Cargo.lock | 2 ++ Cargo.toml | 3 +- src/lib.rs | 2 ++ src/operation.rs | 3 +- src/server.rs | 87 ++++++++++++++++++++++++++++++++++++++++++++++++ src/taskdb.rs | 65 ++++++++++++++++++++++++++++++++++++ tests/sync.rs | 59 ++++++++++++++++++++++++++++++++ 7 files changed, 219 insertions(+), 2 deletions(-) create mode 100644 src/server.rs create mode 100644 tests/sync.rs diff --git a/Cargo.lock b/Cargo.lock index 828a6238d..7c27f39e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -73,6 +73,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "num-integer 0.1.41 (registry+https://github.com/rust-lang/crates.io-index)", "num-traits 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -163,6 +164,7 @@ dependencies = [ "chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "proptest 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/Cargo.toml b/Cargo.toml index f5e337fa0..a50095725 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,8 +8,9 @@ edition = "2018" [dependencies] uuid = { version = "0.8.1", features = ["serde", "v4"] } +serde = "1.0.104" serde_json = "1.0" -chrono = "0.4.10" +chrono = { version = "0.4.10", features = ["serde"] } failure = {version = "0.1.5", features = ["derive"] } [dev-dependencies] diff --git a/src/lib.rs b/src/lib.rs index 98390d7c2..c2c80cfef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,7 +3,9 @@ mod errors; mod operation; +mod server; mod taskdb; pub use operation::Operation; +pub use server::Server; pub use taskdb::DB; diff --git a/src/operation.rs b/src/operation.rs index 8aafbce5b..29b42a51f 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -1,8 +1,9 @@ use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; use serde_json::Value; use uuid::Uuid; -#[derive(PartialEq, Clone, Debug)] +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub enum Operation { Create { uuid: Uuid, diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 000000000..4a50d77a2 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,87 @@ +use std::collections::HashMap; + +type Blob = Vec; + +struct User { + // versions, indexed at v-1 + versions: Vec, + snapshots: HashMap, +} + +pub struct Server { + users: HashMap, +} + +pub enum VersionAdd { + // OK, version added + Ok, + // Rejected, must be based on the the given version + ExpectedVersion(u64), +} + +impl User { + fn new() -> User { + User { + versions: vec![], + snapshots: HashMap::new(), + } + } + + fn get_versions(&self, since_version: u64) -> Vec { + let last_version = self.versions.len(); + if last_version == since_version as usize { + return vec![]; + } + self.versions[since_version as usize..last_version] + .iter() + .map(|r| r.clone()) + .collect::>() + } + + fn add_version(&mut self, version: u64, blob: Blob) -> VersionAdd { + // of by one here: client wants to send version 1 first + let expected_version = self.versions.len() as u64 + 1; + if version != expected_version { + return VersionAdd::ExpectedVersion(expected_version); + } + self.versions.push(blob); + + VersionAdd::Ok + } + + fn add_snapshot(&mut self, version: u64, blob: Blob) { + self.snapshots.insert(version, blob); + } +} + +impl Server { + pub fn new() -> Server { + Server { + users: HashMap::new(), + } + } + + fn get_user_mut(&mut self, username: &str) -> &mut User { + self.users + .entry(username.to_string()) + .or_insert_with(User::new) + } + + /// Get a vector of all versions after `since_version` + pub fn get_versions(&self, username: &str, since_version: u64) -> Vec { + self.users + .get(username) + .map(|user| user.get_versions(since_version)) + .unwrap_or_else(|| vec![]) + } + + /// Add a new version. If the given version number is incorrect, this responds with the + /// appropriate version and expects the caller to try again. + pub fn add_version(&mut self, username: &str, version: u64, blob: Blob) -> VersionAdd { + self.get_user_mut(username).add_version(version, blob) + } + + pub fn add_snapshot(&mut self, username: &str, version: u64, blob: Blob) { + self.get_user_mut(username).add_snapshot(version, blob); + } +} diff --git a/src/taskdb.rs b/src/taskdb.rs index 3417aa020..8af0bc823 100644 --- a/src/taskdb.rs +++ b/src/taskdb.rs @@ -1,7 +1,10 @@ use crate::operation::Operation; +use crate::server::{Server, VersionAdd}; +use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::hash_map::Entry; use std::collections::HashMap; +use std::str; use uuid::Uuid; #[derive(PartialEq, Debug, Clone)] @@ -18,6 +21,12 @@ pub struct DB { operations: Vec, } +#[derive(Serialize, Deserialize, Debug)] +struct Version { + version: u64, + operations: Vec, +} + impl DB { /// Create a new, empty database pub fn new() -> DB { @@ -60,6 +69,62 @@ impl DB { pub fn tasks(&self) -> &HashMap> { &self.tasks } + + /// Sync to the given server, pulling remote changes and pushing local changes. + pub fn sync(&mut self, username: &str, server: &mut Server) { + loop { + // first pull changes and "rebase" on top of them + let new_versions = server.get_versions(username, self.base_version); + for version_blob in new_versions { + let version_str = str::from_utf8(&version_blob).unwrap(); + let version: Version = serde_json::from_str(version_str).unwrap(); + assert_eq!(version.version, self.base_version + 1); + println!("applying version {:?} from server", version.version); + + self.apply_version(version); + } + + if self.operations.len() == 0 { + break; + } + + // now make a version of our local changes and push those + let new_version = Version { + version: self.base_version + 1, + operations: self.operations.clone(), + }; + let new_version_str = serde_json::to_string(&new_version).unwrap(); + println!("sending version {:?} to server", new_version.version); + if let VersionAdd::Ok = + server.add_version(username, new_version.version, new_version_str.into()) + { + break; + } + } + } + + fn apply_version(&mut self, mut version: Version) { + for server_op in version.operations.drain(..) { + let mut new_local_ops = Vec::with_capacity(self.operations.len()); + let mut svr_op = Some(server_op); + for local_op in self.operations.drain(..) { + if let Some(o) = svr_op { + let (new_server_op, new_local_op) = Operation::transform(o, local_op); + svr_op = new_server_op; + if let Some(o) = new_local_op { + new_local_ops.push(o); + } + } else { + new_local_ops.push(local_op); + } + } + if let Some(o) = svr_op { + self.apply(o); + } + self.operations = new_local_ops; + } + self.base_version = version.version; + } } #[cfg(test)] diff --git a/tests/sync.rs b/tests/sync.rs new file mode 100644 index 000000000..055f88779 --- /dev/null +++ b/tests/sync.rs @@ -0,0 +1,59 @@ +use chrono::Utc; +use ot::{Operation, Server, DB}; +use uuid::Uuid; + +#[test] +fn test_sync() { + let mut server = Server::new(); + + let mut db1 = DB::new(); + db1.sync("me", &mut server); + + let mut db2 = DB::new(); + db2.sync("me", &mut server); + + // make some changes in parallel to db1 and db2.. + let uuid1 = Uuid::new_v4(); + db1.apply(Operation::Create { uuid: uuid1 }); + db1.apply(Operation::Update { + uuid: uuid1, + property: "title".into(), + value: "my first task".into(), + timestamp: Utc::now(), + }); + + let uuid2 = Uuid::new_v4(); + db2.apply(Operation::Create { uuid: uuid2 }); + db2.apply(Operation::Update { + uuid: uuid2, + property: "title".into(), + value: "my second task".into(), + timestamp: Utc::now(), + }); + + // and synchronize those around + db1.sync("me", &mut server); + db2.sync("me", &mut server); + db1.sync("me", &mut server); + assert_eq!(db1.tasks(), db2.tasks()); + + // now make updates to the same task on both sides + db1.apply(Operation::Update { + uuid: uuid2, + property: "priority".into(), + value: "H".into(), + timestamp: Utc::now(), + }); + db2.apply(Operation::Update { + uuid: uuid2, + property: "project".into(), + value: "personal".into(), + timestamp: Utc::now(), + }); + + // and synchronize those around + db1.sync("me", &mut server); + db2.sync("me", &mut server); + db1.sync("me", &mut server); + assert_eq!(db1.tasks(), db2.tasks()); +}