From 1789344cd0f6254c0e2c3f78d3f19a5e2fc1accc Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Sun, 19 Dec 2021 21:13:55 +0000 Subject: [PATCH] refactor sync to use SyncOps --- taskchampion/src/storage/op.rs | 140 ++++++++++++-------------------- taskchampion/src/taskdb/sync.rs | 49 ++++++----- 2 files changed, 79 insertions(+), 110 deletions(-) diff --git a/taskchampion/src/storage/op.rs b/taskchampion/src/storage/op.rs index 2e0f45b26..25225a848 100644 --- a/taskchampion/src/storage/op.rs +++ b/taskchampion/src/storage/op.rs @@ -1,3 +1,4 @@ +use crate::server::SyncOp; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -28,97 +29,23 @@ pub enum ReplicaOp { }, } -use ReplicaOp::*; - impl ReplicaOp { - // Transform takes two operations A and B that happened concurrently and produces two - // operations A' and B' such that `apply(apply(S, A), B') = apply(apply(S, B), A')`. This - // function is used to serialize operations in a process similar to a Git "rebase". - // - // * - // / \ - // op1 / \ op2 - // / \ - // * * - // - // this function "completes the diamond: - // - // * * - // \ / - // op2' \ / op1' - // \ / - // * - // - // such that applying op2' after op1 has the same effect as applying op1' after op2. This - // allows two different systems which have already applied op1 and op2, respectively, and thus - // reached different states, to return to the same state by applying op2' and op1', - // respectively. - pub fn transform( - operation1: ReplicaOp, - operation2: ReplicaOp, - ) -> (Option, Option) { - match (&operation1, &operation2) { - // Two creations or deletions of the same uuid reach the same state, so there's no need - // for any further operations to bring the state together. - (&Create { uuid: uuid1 }, &Create { uuid: uuid2 }) if uuid1 == uuid2 => (None, None), - (&Delete { uuid: uuid1 }, &Delete { uuid: uuid2 }) if uuid1 == uuid2 => (None, None), - - // Given a create and a delete of the same task, one of the operations is invalid: the - // create implies the task does not exist, but the delete implies it exists. Somewhat - // arbitrarily, we prefer the Create - (&Create { uuid: uuid1 }, &Delete { uuid: uuid2 }) if uuid1 == uuid2 => { - (Some(operation1), None) - } - (&Delete { uuid: uuid1 }, &Create { uuid: uuid2 }) if uuid1 == uuid2 => { - (None, Some(operation2)) - } - - // And again from an Update and a Create, prefer the Update - (&Update { uuid: uuid1, .. }, &Create { uuid: uuid2 }) if uuid1 == uuid2 => { - (Some(operation1), None) - } - (&Create { uuid: uuid1 }, &Update { uuid: uuid2, .. }) if uuid1 == uuid2 => { - (None, Some(operation2)) - } - - // Given a delete and an update, prefer the delete - (&Update { uuid: uuid1, .. }, &Delete { uuid: uuid2 }) if uuid1 == uuid2 => { - (None, Some(operation2)) - } - (&Delete { uuid: uuid1 }, &Update { uuid: uuid2, .. }) if uuid1 == uuid2 => { - (Some(operation1), None) - } - - // Two updates to the same property of the same task might conflict. - ( - &Update { - uuid: ref uuid1, - property: ref property1, - value: ref value1, - timestamp: ref timestamp1, - }, - &Update { - uuid: ref uuid2, - property: ref property2, - value: ref value2, - timestamp: ref timestamp2, - }, - ) if uuid1 == uuid2 && property1 == property2 => { - // if the value is the same, there's no conflict - if value1 == value2 { - (None, None) - } else if timestamp1 < timestamp2 { - // prefer the later modification - (None, Some(operation2)) - } else { - // prefer the later modification or, if the modifications are the same, - // just choose one of them - (Some(operation1), None) - } - } - - // anything else is not a conflict of any sort, so return the operations unchanged - (_, _) => (Some(operation1), Some(operation2)), + /// Convert this operation into a [`SyncOp`]. + pub fn into_sync(self) -> SyncOp { + match self { + Self::Create { uuid } => SyncOp::Create { uuid }, + Self::Delete { uuid } => SyncOp::Delete { uuid }, + Self::Update { + uuid, + property, + value, + timestamp, + } => SyncOp::Update { + uuid, + property, + value, + timestamp, + }, } } } @@ -200,4 +127,37 @@ mod test { assert_eq!(deser, op); Ok(()) } + + #[test] + fn test_into_sync_create() { + let uuid = Uuid::new_v4(); + assert_eq!(Create { uuid }.into_sync(), SyncOp::Create { uuid }); + } + + #[test] + fn test_into_sync_delete() { + let uuid = Uuid::new_v4(); + assert_eq!(Delete { uuid }.into_sync(), SyncOp::Delete { uuid }); + } + + #[test] + fn test_into_sync_update() { + let uuid = Uuid::new_v4(); + let timestamp = Utc::now(); + assert_eq!( + Update { + uuid, + property: "prop".into(), + value: Some("v".into()), + timestamp, + } + .into_sync(), + SyncOp::Update { + uuid, + property: "prop".into(), + value: Some("v".into()), + timestamp, + } + ); + } } diff --git a/taskchampion/src/taskdb/sync.rs b/taskchampion/src/taskdb/sync.rs index 944deda57..b6c92f093 100644 --- a/taskchampion/src/taskdb/sync.rs +++ b/taskchampion/src/taskdb/sync.rs @@ -1,6 +1,6 @@ use super::snapshot; -use crate::server::{AddVersionResult, GetVersionResult, Server, SnapshotUrgency}; -use crate::storage::{ReplicaOp, StorageTxn}; +use crate::server::{AddVersionResult, GetVersionResult, Server, SnapshotUrgency, SyncOp}; +use crate::storage::StorageTxn; use crate::Error; use log::{info, trace, warn}; use serde::{Deserialize, Serialize}; @@ -8,26 +8,26 @@ use std::str; #[derive(Serialize, Deserialize, Debug)] struct Version { - operations: Vec, + operations: Vec, } /// Apply an op to the TaskDb's set of tasks (without recording it in the list of operations) -pub(super) fn apply_op(txn: &mut dyn StorageTxn, op: &ReplicaOp) -> anyhow::Result<()> { +pub(super) fn apply_op(txn: &mut dyn StorageTxn, op: &SyncOp) -> anyhow::Result<()> { // TODO: it'd be nice if this was integrated into apply() somehow, but that clones TaskMaps // unnecessariliy match op { - ReplicaOp::Create { uuid } => { + SyncOp::Create { uuid } => { // insert if the task does not already exist if !txn.create_task(*uuid)? { return Err(Error::Database(format!("Task {} already exists", uuid)).into()); } } - ReplicaOp::Delete { ref uuid } => { + SyncOp::Delete { ref uuid } => { if !txn.delete_task(*uuid)? { return Err(Error::Database(format!("Task {} does not exist", uuid)).into()); } } - ReplicaOp::Update { + SyncOp::Update { ref uuid, ref property, ref value, @@ -72,6 +72,12 @@ pub(super) fn sync( trace!("beginning sync outer loop"); let mut base_version_id = txn.base_version()?; + let mut local_ops: Vec = txn + .operations()? + .drain(..) + .map(|op| op.into_sync()) + .collect(); + // first pull changes and "rebase" on top of them loop { trace!("beginning sync inner loop"); @@ -86,7 +92,7 @@ pub(super) fn sync( // apply this verison and update base_version in storage info!("applying version {:?} from server", version_id); - apply_version(txn, version)?; + apply_version(txn, &mut local_ops, version)?; txn.set_base_version(version_id)?; base_version_id = version_id; } else { @@ -96,17 +102,18 @@ pub(super) fn sync( } } - let operations: Vec = txn.operations()?.to_vec(); - if operations.is_empty() { + if local_ops.is_empty() { info!("no changes to push to server"); // nothing to sync back to the server.. break; } - trace!("sending {} operations to the server", operations.len()); + trace!("sending {} operations to the server", local_ops.len()); // now make a version of our local changes and push those - let new_version = Version { operations }; + let new_version = Version { + operations: local_ops, + }; let history_segment = serde_json::to_string(&new_version).unwrap().into(); info!("sending new version to server"); let (res, snapshot_urgency) = server.add_version(base_version_id, history_segment)?; @@ -114,7 +121,6 @@ pub(super) fn sync( AddVersionResult::Ok(new_version_id) => { info!("version {:?} received by server", new_version_id); txn.set_base_version(new_version_id)?; - txn.set_operations(vec![])?; // make a snapshot if the server indicates it is urgent enough let base_urgency = if avoid_snapshots { @@ -144,11 +150,16 @@ pub(super) fn sync( } } + txn.set_operations(vec![])?; txn.commit()?; Ok(()) } -fn apply_version(txn: &mut dyn StorageTxn, mut version: Version) -> anyhow::Result<()> { +fn apply_version( + txn: &mut dyn StorageTxn, + local_ops: &mut Vec, + mut version: Version, +) -> anyhow::Result<()> { // The situation here is that the server has already applied all server operations, and we // have already applied all local operations, so states have diverged by several // operations. We need to figure out what operations to apply locally and on the server in @@ -174,17 +185,16 @@ fn apply_version(txn: &mut dyn StorageTxn, mut version: Version) -> anyhow::Resu // This is slightly complicated by the fact that the transform function can return None, // indicating no operation is required. If this happens for a local op, we can just omit // it. If it happens for server op, then we must copy the remaining local ops. - let mut local_operations: Vec = txn.operations()?; for server_op in version.operations.drain(..) { trace!( "rebasing local operations onto server operation {:?}", server_op ); - let mut new_local_ops = Vec::with_capacity(local_operations.len()); + let mut new_local_ops = Vec::with_capacity(local_ops.len()); let mut svr_op = Some(server_op); - for local_op in local_operations.drain(..) { + for local_op in local_ops.drain(..) { if let Some(o) = svr_op { - let (new_server_op, new_local_op) = ReplicaOp::transform(o, local_op.clone()); + let (new_server_op, new_local_op) = SyncOp::transform(o, local_op.clone()); trace!("local operation {:?} -> {:?}", local_op, new_local_op); svr_op = new_server_op; if let Some(o) = new_local_op { @@ -203,9 +213,8 @@ fn apply_version(txn: &mut dyn StorageTxn, mut version: Version) -> anyhow::Resu warn!("Invalid operation when syncing: {} (ignored)", e); } } - local_operations = new_local_ops; + *local_ops = new_local_ops; } - txn.set_operations(local_operations)?; Ok(()) }