From a0a3f36a16335fdb42f9ca53352b557d015fbc4a Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Thu, 7 Oct 2021 20:15:48 -0400 Subject: [PATCH] factor taskdb into multiple modules --- taskchampion/src/{taskdb.rs => taskdb/mod.rs} | 191 +----------------- taskchampion/src/taskdb/ops.rs | 37 ++++ taskchampion/src/taskdb/sync.rs | 145 +++++++++++++ 3 files changed, 192 insertions(+), 181 deletions(-) rename taskchampion/src/{taskdb.rs => taskdb/mod.rs} (71%) create mode 100644 taskchampion/src/taskdb/ops.rs create mode 100644 taskchampion/src/taskdb/sync.rs diff --git a/taskchampion/src/taskdb.rs b/taskchampion/src/taskdb/mod.rs similarity index 71% rename from taskchampion/src/taskdb.rs rename to taskchampion/src/taskdb/mod.rs index d611433f9..1b84f2c9b 100644 --- a/taskchampion/src/taskdb.rs +++ b/taskchampion/src/taskdb/mod.rs @@ -1,12 +1,11 @@ -use crate::errors::Error; -use crate::server::{AddVersionResult, GetVersionResult, Server}; -use crate::storage::{Operation, Storage, StorageTxn, TaskMap}; -use log::{info, trace, warn}; -use serde::{Deserialize, Serialize}; +use crate::server::Server; +use crate::storage::{Operation, Storage, TaskMap}; use std::collections::HashSet; -use std::str; use uuid::Uuid; +mod ops; +mod sync; + /// A TaskDb is the backend for a replica. It manages the storage, operations, synchronization, /// and so on, and all the invariants that come with it. It leaves the meaning of particular task /// properties to the replica and task implementations. @@ -14,11 +13,6 @@ pub struct TaskDb { storage: Box, } -#[derive(Serialize, Deserialize, Debug)] -struct Version { - operations: Vec, -} - impl TaskDb { /// Create a new TaskDb with the given backend storage pub fn new(storage: Box) -> TaskDb { @@ -36,7 +30,7 @@ impl TaskDb { pub fn apply(&mut self, op: Operation) -> anyhow::Result<()> { // TODO: differentiate error types here? let mut txn = self.storage.txn()?; - if let err @ Err(_) = TaskDb::apply_op(txn.as_mut(), &op) { + if let err @ Err(_) = ops::apply_op(txn.as_mut(), &op) { return err; } txn.add_operation(op)?; @@ -44,41 +38,6 @@ impl TaskDb { Ok(()) } - fn apply_op(txn: &mut dyn StorageTxn, op: &Operation) -> anyhow::Result<()> { - match op { - Operation::Create { uuid } => { - // insert if the task does not already exist - if !txn.create_task(*uuid)? { - return Err(Error::Database(format!("Task {} already exists", uuid)).into()); - } - } - Operation::Delete { ref uuid } => { - if !txn.delete_task(*uuid)? { - return Err(Error::Database(format!("Task {} does not exist", uuid)).into()); - } - } - Operation::Update { - ref uuid, - ref property, - ref value, - timestamp: _, - } => { - // update if this task exists, otherwise ignore - if let Some(mut task) = txn.get_task(*uuid)? { - match value { - Some(ref val) => task.insert(property.to_string(), val.clone()), - None => task.remove(property), - }; - txn.set_task(*uuid, task)?; - } else { - return Err(Error::Database(format!("Task {} does not exist", uuid)).into()); - } - } - } - - Ok(()) - } - /// Get all tasks. pub fn all_tasks(&mut self) -> anyhow::Result> { let mut txn = self.storage.txn()?; @@ -188,137 +147,7 @@ impl TaskDb { /// Sync to the given server, pulling remote changes and pushing local changes. pub fn sync(&mut self, server: &mut Box) -> anyhow::Result<()> { let mut txn = self.storage.txn()?; - - // retry synchronizing until the server accepts our version (this allows for races between - // replicas trying to sync to the same server). If the server insists on the same base - // version twice, then we have diverged. - let mut requested_parent_version_id = None; - loop { - trace!("beginning sync outer loop"); - let mut base_version_id = txn.base_version()?; - - // first pull changes and "rebase" on top of them - loop { - trace!("beginning sync inner loop"); - if let GetVersionResult::Version { - version_id, - history_segment, - .. - } = server.get_child_version(base_version_id)? - { - let version_str = str::from_utf8(&history_segment).unwrap(); - let version: Version = serde_json::from_str(version_str).unwrap(); - - // apply this verison and update base_version in storage - info!("applying version {:?} from server", version_id); - TaskDb::apply_version(txn.as_mut(), version)?; - txn.set_base_version(version_id)?; - base_version_id = version_id; - } else { - info!("no child versions of {:?}", base_version_id); - // at the moment, no more child versions, so we can try adding our own - break; - } - } - - let operations: Vec = txn.operations()?.to_vec(); - if operations.is_empty() { - info!("no changes to push to server"); - // nothing to sync back to the server.. - break; - } - - trace!("sending {} operations to the server", operations.len()); - - // now make a version of our local changes and push those - let new_version = Version { operations }; - let history_segment = serde_json::to_string(&new_version).unwrap().into(); - info!("sending new version to server"); - match server.add_version(base_version_id, history_segment)? { - AddVersionResult::Ok(new_version_id) => { - info!("version {:?} received by server", new_version_id); - txn.set_base_version(new_version_id)?; - txn.set_operations(vec![])?; - break; - } - AddVersionResult::ExpectedParentVersion(parent_version_id) => { - info!( - "new version rejected; must be based on {:?}", - parent_version_id - ); - if let Some(requested) = requested_parent_version_id { - if parent_version_id == requested { - anyhow::bail!("Server's task history has diverged from this replica"); - } - } - requested_parent_version_id = Some(parent_version_id); - } - } - } - - txn.commit()?; - Ok(()) - } - - fn apply_version(txn: &mut dyn StorageTxn, mut version: Version) -> anyhow::Result<()> { - // The situation here is that the server has already applied all server operations, and we - // have already applied all local operations, so states have diverged by several - // operations. We need to figure out what operations to apply locally and on the server in - // order to return to the same state. - // - // Operational transforms provide this on an operation-by-operation basis. To break this - // down, we treat each server operation individually, in order. For each such operation, - // we start in this state: - // - // - // base state-* - // / \-server op - // * * - // local / \ / - // ops * * - // / \ / new - // * * local - // local / \ / ops - // state-* * - // new-\ / - // server op *-new local state - // - // This is slightly complicated by the fact that the transform function can return None, - // indicating no operation is required. If this happens for a local op, we can just omit - // it. If it happens for server op, then we must copy the remaining local ops. - let mut local_operations: Vec = txn.operations()?; - for server_op in version.operations.drain(..) { - trace!( - "rebasing local operations onto server operation {:?}", - server_op - ); - let mut new_local_ops = Vec::with_capacity(local_operations.len()); - let mut svr_op = Some(server_op); - for local_op in local_operations.drain(..) { - if let Some(o) = svr_op { - let (new_server_op, new_local_op) = Operation::transform(o, local_op.clone()); - trace!("local operation {:?} -> {:?}", local_op, new_local_op); - svr_op = new_server_op; - if let Some(o) = new_local_op { - new_local_ops.push(o); - } - } else { - trace!( - "local operation {:?} unchanged (server operation consumed)", - local_op - ); - new_local_ops.push(local_op); - } - } - if let Some(o) = svr_op { - if let Err(e) = TaskDb::apply_op(txn, &o) { - warn!("Invalid operation when syncing: {} (ignored)", e); - } - } - local_operations = new_local_ops; - } - txn.set_operations(local_operations)?; - Ok(()) + sync::sync(server, txn.as_mut()) } // functions for supporting tests @@ -675,7 +504,7 @@ mod tests { let uuid = Uuid::new_v4(); db1.apply(Operation::Create { uuid }).unwrap(); db1.apply(Operation::Update { - uuid: uuid, + uuid, property: "title".into(), value: Some("my first task".into()), timestamp: Utc::now(), @@ -692,7 +521,7 @@ mod tests { db1.apply(Operation::Delete { uuid }).unwrap(); db1.apply(Operation::Create { uuid }).unwrap(); db1.apply(Operation::Update { - uuid: uuid, + uuid, property: "title".into(), value: Some("my second task".into()), timestamp: Utc::now(), @@ -701,7 +530,7 @@ mod tests { // and on db2, update a property of the task db2.apply(Operation::Update { - uuid: uuid, + uuid, property: "project".into(), value: Some("personal".into()), timestamp: Utc::now(), diff --git a/taskchampion/src/taskdb/ops.rs b/taskchampion/src/taskdb/ops.rs new file mode 100644 index 000000000..8bfd003e0 --- /dev/null +++ b/taskchampion/src/taskdb/ops.rs @@ -0,0 +1,37 @@ +use crate::errors::Error; +use crate::storage::{Operation, StorageTxn}; + +pub(super) fn apply_op(txn: &mut dyn StorageTxn, op: &Operation) -> anyhow::Result<()> { + match op { + Operation::Create { uuid } => { + // insert if the task does not already exist + if !txn.create_task(*uuid)? { + return Err(Error::Database(format!("Task {} already exists", uuid)).into()); + } + } + Operation::Delete { ref uuid } => { + if !txn.delete_task(*uuid)? { + return Err(Error::Database(format!("Task {} does not exist", uuid)).into()); + } + } + Operation::Update { + ref uuid, + ref property, + ref value, + timestamp: _, + } => { + // update if this task exists, otherwise ignore + if let Some(mut task) = txn.get_task(*uuid)? { + match value { + Some(ref val) => task.insert(property.to_string(), val.clone()), + None => task.remove(property), + }; + txn.set_task(*uuid, task)?; + } else { + return Err(Error::Database(format!("Task {} does not exist", uuid)).into()); + } + } + } + + Ok(()) +} diff --git a/taskchampion/src/taskdb/sync.rs b/taskchampion/src/taskdb/sync.rs new file mode 100644 index 000000000..af076056e --- /dev/null +++ b/taskchampion/src/taskdb/sync.rs @@ -0,0 +1,145 @@ +use super::ops; +use crate::server::{AddVersionResult, GetVersionResult, Server}; +use crate::storage::{Operation, StorageTxn}; +use log::{info, trace, warn}; +use serde::{Deserialize, Serialize}; +use std::str; + +#[derive(Serialize, Deserialize, Debug)] +struct Version { + operations: Vec, +} + +/// Sync to the given server, pulling remote changes and pushing local changes. +pub(super) fn sync(server: &mut Box, txn: &mut dyn StorageTxn) -> anyhow::Result<()> { + // retry synchronizing until the server accepts our version (this allows for races between + // replicas trying to sync to the same server). If the server insists on the same base + // version twice, then we have diverged. + let mut requested_parent_version_id = None; + loop { + trace!("beginning sync outer loop"); + let mut base_version_id = txn.base_version()?; + + // first pull changes and "rebase" on top of them + loop { + trace!("beginning sync inner loop"); + if let GetVersionResult::Version { + version_id, + history_segment, + .. + } = server.get_child_version(base_version_id)? + { + let version_str = str::from_utf8(&history_segment).unwrap(); + let version: Version = serde_json::from_str(version_str).unwrap(); + + // apply this verison and update base_version in storage + info!("applying version {:?} from server", version_id); + apply_version(txn, version)?; + txn.set_base_version(version_id)?; + base_version_id = version_id; + } else { + info!("no child versions of {:?}", base_version_id); + // at the moment, no more child versions, so we can try adding our own + break; + } + } + + let operations: Vec = txn.operations()?.to_vec(); + if operations.is_empty() { + info!("no changes to push to server"); + // nothing to sync back to the server.. + break; + } + + trace!("sending {} operations to the server", operations.len()); + + // now make a version of our local changes and push those + let new_version = Version { operations }; + let history_segment = serde_json::to_string(&new_version).unwrap().into(); + info!("sending new version to server"); + match server.add_version(base_version_id, history_segment)? { + AddVersionResult::Ok(new_version_id) => { + info!("version {:?} received by server", new_version_id); + txn.set_base_version(new_version_id)?; + txn.set_operations(vec![])?; + break; + } + AddVersionResult::ExpectedParentVersion(parent_version_id) => { + info!( + "new version rejected; must be based on {:?}", + parent_version_id + ); + if let Some(requested) = requested_parent_version_id { + if parent_version_id == requested { + anyhow::bail!("Server's task history has diverged from this replica"); + } + } + requested_parent_version_id = Some(parent_version_id); + } + } + } + + txn.commit()?; + Ok(()) +} + +fn apply_version(txn: &mut dyn StorageTxn, mut version: Version) -> anyhow::Result<()> { + // The situation here is that the server has already applied all server operations, and we + // have already applied all local operations, so states have diverged by several + // operations. We need to figure out what operations to apply locally and on the server in + // order to return to the same state. + // + // Operational transforms provide this on an operation-by-operation basis. To break this + // down, we treat each server operation individually, in order. For each such operation, + // we start in this state: + // + // + // base state-* + // / \-server op + // * * + // local / \ / + // ops * * + // / \ / new + // * * local + // local / \ / ops + // state-* * + // new-\ / + // server op *-new local state + // + // This is slightly complicated by the fact that the transform function can return None, + // indicating no operation is required. If this happens for a local op, we can just omit + // it. If it happens for server op, then we must copy the remaining local ops. + let mut local_operations: Vec = txn.operations()?; + for server_op in version.operations.drain(..) { + trace!( + "rebasing local operations onto server operation {:?}", + server_op + ); + let mut new_local_ops = Vec::with_capacity(local_operations.len()); + let mut svr_op = Some(server_op); + for local_op in local_operations.drain(..) { + if let Some(o) = svr_op { + let (new_server_op, new_local_op) = Operation::transform(o, local_op.clone()); + trace!("local operation {:?} -> {:?}", local_op, new_local_op); + svr_op = new_server_op; + if let Some(o) = new_local_op { + new_local_ops.push(o); + } + } else { + trace!( + "local operation {:?} unchanged (server operation consumed)", + local_op + ); + new_local_ops.push(local_op); + } + } + if let Some(o) = svr_op { + if let Err(e) = ops::apply_op(txn, &o) { + warn!("Invalid operation when syncing: {} (ignored)", e); + } + } + local_operations = new_local_ops; + } + txn.set_operations(local_operations)?; + Ok(()) +}