From e228c99b8319bc4ee5d66eb0b36ac45faa1f735c Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Sun, 5 Jan 2020 13:17:07 -0500 Subject: [PATCH] partial refactor to separate taskdb and storage --- src/lib.rs | 1 + src/operation.rs | 2 +- src/replica.rs | 21 ++-- src/taskdb.rs | 142 +++++++++++++++---------- src/taskstorage/mod.rs | 100 +++++++++++++++++ tests/operation_transform_invariant.rs | 2 +- tests/sync.rs | 8 +- tests/sync_action_sequences.rs | 10 +- 8 files changed, 208 insertions(+), 78 deletions(-) create mode 100644 src/taskstorage/mod.rs diff --git a/src/lib.rs b/src/lib.rs index 8ead13f9d..e5374f4f3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,7 @@ mod replica; mod server; mod task; mod taskdb; +mod taskstorage; mod tdb2; pub use operation::Operation; diff --git a/src/operation.rs b/src/operation.rs index 4fe44df21..f21c5b465 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -164,7 +164,7 @@ mod test { db2.apply(o).unwrap(); } - assert_eq!(db1.tasks(), db2.tasks()); + assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); } #[test] diff --git a/src/replica.rs b/src/replica.rs index 419dbf2be..3c98c2db1 100644 --- a/src/replica.rs +++ b/src/replica.rs @@ -1,8 +1,8 @@ use crate::operation::Operation; use crate::taskdb::DB; +use crate::taskstorage::TaskMap; use chrono::Utc; use failure::Fallible; -use std::collections::HashMap; use uuid::Uuid; /// A replica represents an instance of a user's task data. 
@@ -47,13 +47,18 @@ impl Replica { } /// Get all tasks as an iterator of (&Uuid, &HashMap) - pub fn all_tasks(&self) -> impl Iterator)> { - self.taskdb.tasks().iter() + pub fn all_tasks<'a>(&'a self) -> impl Iterator + 'a { + self.taskdb.all_tasks() + } + + /// Get the UUIDs of all tasks + pub fn all_task_uuids<'a>(&'a self) -> impl Iterator + 'a { + self.taskdb.all_task_uuids() } /// Get an existing task by its UUID - pub fn get_task(&self, uuid: &Uuid) -> Option<&HashMap> { - self.taskdb.tasks().get(&uuid) + pub fn get_task(&self, uuid: &Uuid) -> Option { + self.taskdb.get_task(&uuid) } } @@ -69,7 +74,7 @@ mod tests { let uuid = Uuid::new_v4(); rep.create_task(uuid.clone()).unwrap(); - assert_eq!(rep.get_task(&uuid), Some(&HashMap::new())); + assert_eq!(rep.get_task(&uuid), Some(TaskMap::new())); } #[test] @@ -90,9 +95,9 @@ mod tests { rep.create_task(uuid.clone()).unwrap(); rep.update_task(uuid.clone(), "title", Some("snarsblat")) .unwrap(); - let mut task = HashMap::new(); + let mut task = TaskMap::new(); task.insert("title".into(), "snarsblat".into()); - assert_eq!(rep.get_task(&uuid), Some(&task)); + assert_eq!(rep.get_task(&uuid), Some(task)); } #[test] diff --git a/src/taskdb.rs b/src/taskdb.rs index 103b9f4e6..8f77e5fed 100644 --- a/src/taskdb.rs +++ b/src/taskdb.rs @@ -1,27 +1,16 @@ use crate::errors::Error; use crate::operation::Operation; use crate::server::{Server, VersionAdd}; +use crate::taskstorage::{InMemoryStorage, TaskMap}; use failure::Fallible; use serde::{Deserialize, Serialize}; -use std::collections::hash_map::Entry; use std::collections::HashMap; use std::str; use uuid::Uuid; -type TaskMap = HashMap; - -#[derive(PartialEq, Debug, Clone)] +#[derive(Debug, Clone)] pub struct DB { - // The current state, with all pending operations applied - tasks: HashMap, - - // The version at which `operations` begins - base_version: u64, - - // Operations applied since `base_version`, in order. 
- // - // INVARIANT: Given a snapshot at `base_version`, applying these operations produces `tasks`. - operations: Vec, + storage: InMemoryStorage, } #[derive(Serialize, Deserialize, Debug)] @@ -34,9 +23,7 @@ impl DB { /// Create a new, empty database pub fn new() -> DB { DB { - tasks: HashMap::new(), - base_version: 0, - operations: vec![], + storage: InMemoryStorage::new(), } } @@ -47,7 +34,7 @@ impl DB { if let err @ Err(_) = self.apply_op(&op) { return err; } - self.operations.push(op); + self.storage.add_operation(op); Ok(()) } @@ -55,14 +42,12 @@ impl DB { match op { &Operation::Create { uuid } => { // insert if the task does not already exist - if let ent @ Entry::Vacant(_) = self.tasks.entry(uuid) { - ent.or_insert(HashMap::new()); - } else { + if !self.storage.create_task(uuid, HashMap::new()) { return Err(Error::DBError(format!("Task {} already exists", uuid)).into()); } } &Operation::Delete { ref uuid } => { - if let None = self.tasks.remove(uuid) { + if !self.storage.delete_task(uuid) { return Err(Error::DBError(format!("Task {} does not exist", uuid)).into()); } } @@ -73,11 +58,13 @@ impl DB { timestamp: _, } => { // update if this task exists, otherwise ignore - if let Some(task) = self.tasks.get_mut(uuid) { + if let Some(task) = self.storage.get_task(uuid) { + let mut task = task.clone(); match value { Some(ref val) => task.insert(property.to_string(), val.clone()), None => task.remove(property), }; + self.storage.set_task(uuid.clone(), task); } else { return Err(Error::DBError(format!("Task {} does not exist", uuid)).into()); } @@ -87,41 +74,55 @@ impl DB { Ok(()) } - /// Get a read-only reference to the underlying set of tasks. - /// - /// This API is temporary, but provides query access to the DB. - pub fn tasks(&self) -> &HashMap { - &self.tasks + /// Get all tasks. This is not a terribly efficient operation. 
+ pub fn all_tasks<'a>(&'a self) -> impl Iterator + 'a { + self.all_task_uuids() + .map(move |u| (u, self.get_task(&u).unwrap())) + } + + /// Get the UUIDs of all tasks + pub fn all_task_uuids<'a>(&'a self) -> impl Iterator + 'a { + self.storage.get_task_uuids() + } + + /// Get a single task, by uuid. + pub fn get_task(&self, uuid: &Uuid) -> Option { + self.storage.get_task(uuid) } /// Sync to the given server, pulling remote changes and pushing local changes. pub fn sync(&mut self, username: &str, server: &mut Server) { + // retry synchronizing until the server accepts our version (this allows for races between + // replicas trying to sync to the same server) loop { // first pull changes and "rebase" on top of them - let new_versions = server.get_versions(username, self.base_version); + let new_versions = server.get_versions(username, self.storage.base_version()); for version_blob in new_versions { let version_str = str::from_utf8(&version_blob).unwrap(); let version: Version = serde_json::from_str(version_str).unwrap(); - assert_eq!(version.version, self.base_version + 1); + assert_eq!(version.version, self.storage.base_version() + 1); println!("applying version {:?} from server", version.version); self.apply_version(version); } - if self.operations.len() == 0 { + let operations: Vec = self.storage.operations().map(|o| o.clone()).collect(); + if operations.len() == 0 { + // nothing to sync back to the server.. 
break; } // now make a version of our local changes and push those let new_version = Version { - version: self.base_version + 1, - operations: self.operations.clone(), + version: self.storage.base_version() + 1, + operations: operations, }; let new_version_str = serde_json::to_string(&new_version).unwrap(); println!("sending version {:?} to server", new_version.version); if let VersionAdd::Ok = server.add_version(username, new_version.version, new_version_str.into()) { + self.storage.local_operations_synced(new_version.version); break; } } @@ -153,10 +154,12 @@ impl DB { // This is slightly complicated by the fact that the transform function can return None, // indicating no operation is required. If this happens for a local op, we can just omit // it. If it happens for server op, then we must copy the remaining local ops. + let mut local_operations: Vec = + self.storage.operations().map(|o| o.clone()).collect(); for server_op in version.operations.drain(..) { - let mut new_local_ops = Vec::with_capacity(self.operations.len()); + let mut new_local_ops = Vec::with_capacity(local_operations.len()); let mut svr_op = Some(server_op); - for local_op in self.operations.drain(..) { + for local_op in local_operations.drain(..) 
{ if let Some(o) = svr_op { let (new_server_op, new_local_op) = Operation::transform(o, local_op); svr_op = new_server_op; @@ -172,9 +175,32 @@ impl DB { println!("Invalid operation when syncing: {} (ignored)", e); } } - self.operations = new_local_ops; + local_operations = new_local_ops; } - self.base_version = version.version; + self.storage + .update_version(version.version, local_operations); + } + + // functions for supporting tests + + pub fn sorted_tasks(&self) -> Vec<(Uuid, Vec<(String, String)>)> { + let mut res: Vec<(Uuid, Vec<(String, String)>)> = self + .all_tasks() + .map(|(u, t)| { + let mut t = t + .iter() + .map(|(p, v)| (p.clone(), v.clone())) + .collect::>(); + t.sort(); + (u, t) + }) + .collect(); + res.sort(); + res + } + + pub fn operations(&self) -> Vec { + self.storage.operations().map(|o| o.clone()).collect() } } @@ -191,10 +217,8 @@ mod tests { let op = Operation::Create { uuid }; db.apply(op.clone()).unwrap(); - let mut exp = HashMap::new(); - exp.insert(uuid, HashMap::new()); - assert_eq!(db.tasks(), &exp); - assert_eq!(db.operations, vec![op]); + assert_eq!(db.sorted_tasks(), vec![(uuid, vec![]),]); + assert_eq!(db.operations(), vec![op]); } #[test] @@ -208,10 +232,8 @@ mod tests { format!("Task Database Error: Task {} already exists", uuid) ); - let mut exp = HashMap::new(); - exp.insert(uuid, HashMap::new()); - assert_eq!(db.tasks(), &exp); - assert_eq!(db.operations, vec![op]); + assert_eq!(db.sorted_tasks(), vec![(uuid, vec![])]); + assert_eq!(db.operations(), vec![op]); } #[test] @@ -228,12 +250,11 @@ mod tests { }; db.apply(op2.clone()).unwrap(); - let mut exp = HashMap::new(); - let mut task = HashMap::new(); - task.insert(String::from("title"), String::from("my task")); - exp.insert(uuid, task); - assert_eq!(db.tasks(), &exp); - assert_eq!(db.operations, vec![op1, op2]); + assert_eq!( + db.sorted_tasks(), + vec![(uuid, vec![("title".into(), "my task".into())])] + ); + assert_eq!(db.operations(), vec![op1, op2]); } #[test] @@ 
-271,8 +292,11 @@ mod tests { let mut task = HashMap::new(); task.insert(String::from("priority"), String::from("H")); exp.insert(uuid, task); - assert_eq!(db.tasks(), &exp); - assert_eq!(db.operations, vec![op1, op2, op3, op4]); + assert_eq!( + db.sorted_tasks(), + vec![(uuid, vec![("priority".into(), "H".into())])] + ); + assert_eq!(db.operations(), vec![op1, op2, op3, op4]); } #[test] @@ -290,8 +314,8 @@ mod tests { format!("Task Database Error: Task {} does not exist", uuid) ); - assert_eq!(db.tasks(), &HashMap::new()); - assert_eq!(db.operations, vec![]); + assert_eq!(db.sorted_tasks(), vec![]); + assert_eq!(db.operations(), vec![]); } #[test] @@ -304,8 +328,8 @@ mod tests { let op2 = Operation::Delete { uuid }; db.apply(op2.clone()).unwrap(); - assert_eq!(db.tasks(), &HashMap::new()); - assert_eq!(db.operations, vec![op1, op2]); + assert_eq!(db.sorted_tasks(), vec![]); + assert_eq!(db.operations(), vec![op1, op2]); } #[test] @@ -319,7 +343,7 @@ mod tests { format!("Task Database Error: Task {} does not exist", uuid) ); - assert_eq!(db.tasks(), &HashMap::new()); - assert_eq!(db.operations, vec![]); + assert_eq!(db.sorted_tasks(), vec![]); + assert_eq!(db.operations(), vec![]); } } diff --git a/src/taskstorage/mod.rs b/src/taskstorage/mod.rs new file mode 100644 index 000000000..bbc7707a2 --- /dev/null +++ b/src/taskstorage/mod.rs @@ -0,0 +1,100 @@ +use crate::operation::Operation; +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use uuid::Uuid; + +pub type TaskMap = HashMap; + +#[derive(PartialEq, Debug, Clone)] +pub struct InMemoryStorage { + // The current state, with all pending operations applied + tasks: HashMap, + + // The version at which `operations` begins + base_version: u64, + + // Operations applied since `base_version`, in order. + // + // INVARIANT: Given a snapshot at `base_version`, applying these operations produces `tasks`. 
+ operations: Vec, +} + +impl InMemoryStorage { + pub fn new() -> InMemoryStorage { + InMemoryStorage { + tasks: HashMap::new(), + base_version: 0, + operations: vec![], + } + } + + /// Get an (immutable) task, if it is in the storage + pub fn get_task(&self, uuid: &Uuid) -> Option { + match self.tasks.get(uuid) { + None => None, + Some(t) => Some(t.clone()), + } + } + + /// Create a task, only if it does not already exist. Returns true if + /// the task was created (did not already exist). + pub fn create_task(&mut self, uuid: Uuid, task: TaskMap) -> bool { + if let ent @ Entry::Vacant(_) = self.tasks.entry(uuid) { + ent.or_insert(task); + true + } else { + false + } + } + + /// Set a task, overwriting any existing task. + pub fn set_task(&mut self, uuid: Uuid, task: TaskMap) { + self.tasks.insert(uuid, task); + } + + /// Delete a task, if it exists. Returns true if the task was deleted (already existed) + pub fn delete_task(&mut self, uuid: &Uuid) -> bool { + if let Some(_) = self.tasks.remove(uuid) { + true + } else { + false + } + } + + pub fn get_task_uuids<'a>(&'a self) -> impl Iterator + 'a { + self.tasks.keys().map(|u| u.clone()) + } + + /// Add an operation to the list of operations in the storage. Note that this merely *stores* + /// the operation; it is up to the TaskDB to apply it. + pub fn add_operation(&mut self, op: Operation) { + self.operations.push(op); + } + + /// Get the current base_version for this storage -- the last version synced from the server. + pub fn base_version(&self) -> u64 { + return self.base_version; + } + + /// Get the current set of outstanding operations (operations that have not been sync'd to the + /// server yet) + pub fn operations(&self) -> impl Iterator { + self.operations.iter() + } + + /// Apply the next version from the server. This replaces the existing base_version and + /// operations. It's up to the caller (TaskDB) to ensure this is done consistently. 
+ pub(crate) fn update_version(&mut self, version: u64, new_operations: Vec) { + // ensure that we are applying the versions in order.. + assert_eq!(version, self.base_version + 1); + self.base_version = version; + self.operations = new_operations; + } + + /// Record the outstanding operations as synced to the server in the given version. + pub(crate) fn local_operations_synced(&mut self, version: u64) { + assert_eq!(version, self.base_version + 1); + self.base_version = version; + self.operations = vec![]; + } +} diff --git a/tests/operation_transform_invariant.rs b/tests/operation_transform_invariant.rs index 8c7dd39af..f76850891 100644 --- a/tests/operation_transform_invariant.rs +++ b/tests/operation_transform_invariant.rs @@ -73,6 +73,6 @@ proptest! { if let Some(o) = o1p { db2.apply(o).map_err(|e| TestCaseError::Fail(format!("Applying to db2: {}", e).into()))?; } - assert_eq!(db1.tasks(), db2.tasks()); + assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); } } diff --git a/tests/sync.rs b/tests/sync.rs index 76049aed0..aed8bbc47 100644 --- a/tests/sync.rs +++ b/tests/sync.rs @@ -37,7 +37,7 @@ fn test_sync() { db1.sync("me", &mut server); db2.sync("me", &mut server); db1.sync("me", &mut server); - assert_eq!(db1.tasks(), db2.tasks()); + assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); // now make updates to the same task on both sides db1.apply(Operation::Update { @@ -59,7 +59,7 @@ fn test_sync() { db1.sync("me", &mut server); db2.sync("me", &mut server); db1.sync("me", &mut server); - assert_eq!(db1.tasks(), db2.tasks()); + assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); } #[test] @@ -87,7 +87,7 @@ fn test_sync_create_delete() { db1.sync("me", &mut server); db2.sync("me", &mut server); db1.sync("me", &mut server); - assert_eq!(db1.tasks(), db2.tasks()); + assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); // delete and re-create the task on db1 db1.apply(Operation::Delete { uuid }).unwrap(); @@ -112,5 +112,5 @@ fn test_sync_create_delete() { 
db1.sync("me", &mut server); db2.sync("me", &mut server); db1.sync("me", &mut server); - assert_eq!(db1.tasks(), db2.tasks()); + assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); } diff --git a/tests/sync_action_sequences.rs b/tests/sync_action_sequences.rs index ab02342ef..7a92d6e89 100644 --- a/tests/sync_action_sequences.rs +++ b/tests/sync_action_sequences.rs @@ -59,11 +59,11 @@ proptest! { } } - println!("{:?}", dbs[0].tasks()); - println!("{:?}", dbs[1].tasks()); - println!("{:?}", dbs[2].tasks()); + println!("{:?}", dbs[0]); + println!("{:?}", dbs[1]); + println!("{:?}", dbs[2]); - assert_eq!(dbs[0].tasks(), dbs[1].tasks()); - assert_eq!(dbs[1].tasks(), dbs[2].tasks()); + assert_eq!(dbs[0].sorted_tasks(), dbs[1].sorted_tasks()); + assert_eq!(dbs[1].sorted_tasks(), dbs[2].sorted_tasks()); } }