Merge branch 'main' into issue90

This commit is contained in:
Dustin J. Mitchell
2021-12-26 19:04:10 -05:00
committed by GitHub
26 changed files with 1348 additions and 476 deletions

View File

@@ -1,6 +1,5 @@
use crate::errors::Error;
use crate::server::Server;
use crate::storage::{Operation, Storage, TaskMap};
use crate::server::{Server, SyncOp};
use crate::storage::{Storage, TaskMap};
use crate::task::{Status, Task};
use crate::taskdb::TaskDb;
use crate::workingset::WorkingSet;
@@ -29,12 +28,14 @@ use uuid::Uuid;
/// during the garbage-collection process.
pub struct Replica {
taskdb: TaskDb,
added_undo_point: bool,
}
impl Replica {
pub fn new(storage: Box<dyn Storage>) -> Replica {
Replica {
taskdb: TaskDb::new(storage),
added_undo_point: false,
}
}
@@ -51,12 +52,13 @@ impl Replica {
uuid: Uuid,
property: S1,
value: Option<S2>,
) -> anyhow::Result<()>
) -> anyhow::Result<TaskMap>
where
S1: Into<String>,
S2: Into<String>,
{
self.taskdb.apply(Operation::Update {
self.add_undo_point(false)?;
self.taskdb.apply(SyncOp::Update {
uuid,
property: property.into(),
value: value.map(|v| v.into()),
@@ -99,10 +101,11 @@ impl Replica {
/// Create a new task. The task must not already exist.
pub fn new_task(&mut self, status: Status, description: String) -> anyhow::Result<Task> {
self.add_undo_point(false)?;
let uuid = Uuid::new_v4();
self.taskdb.apply(Operation::Create { uuid })?;
let taskmap = self.taskdb.apply(SyncOp::Create { uuid })?;
trace!("task {} created", uuid);
let mut task = Task::new(uuid, TaskMap::new()).into_mut(self);
let mut task = Task::new(uuid, taskmap).into_mut(self);
task.set_description(description)?;
task.set_status(status)?;
Ok(task.into_immut())
@@ -113,12 +116,8 @@ impl Replica {
/// should only occur through expiration.
#[allow(dead_code)]
fn delete_task(&mut self, uuid: Uuid) -> anyhow::Result<()> {
// check that it already exists; this is a convenience check, as the task may already exist
// when this Create operation is finally sync'd with operations from other replicas
if self.taskdb.get_task(uuid)?.is_none() {
return Err(Error::Database(format!("Task {} does not exist", uuid)).into());
}
self.taskdb.apply(Operation::Delete { uuid })?;
self.add_undo_point(false)?;
self.taskdb.apply(SyncOp::Delete { uuid })?;
trace!("task {} deleted", uuid);
Ok(())
}
@@ -146,6 +145,12 @@ impl Replica {
Ok(())
}
/// Undo local operations until the most recent UndoPoint, returning false if there are no
/// local operations to undo.
pub fn undo(&mut self) -> anyhow::Result<bool> {
self.taskdb.undo()
}
/// Rebuild this replica's working set, based on whether tasks are pending or not. If
/// `renumber` is true, then existing tasks may be moved to new working-set indices; in any
/// case, on completion all pending tasks are in the working set and all non- pending tasks are
@@ -156,11 +161,24 @@ impl Replica {
.rebuild_working_set(|t| t.get("status") == Some(&pending), renumber)?;
Ok(())
}
/// Add an UndoPoint, if one has not already been added by this Replica. This occurs
/// automatically when a change is made. The `force` flag allows forcing a new UndoPoint
/// even if one has laready been created by this Replica, and may be useful when a Replica
/// instance is held for a long time and used to apply more than one user-visible change.
pub fn add_undo_point(&mut self, force: bool) -> anyhow::Result<()> {
if force || !self.added_undo_point {
self.taskdb.add_undo_point()?;
self.added_undo_point = true;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::ReplicaOp;
use crate::task::Status;
use pretty_assertions::assert_eq;
use uuid::Uuid;
@@ -193,10 +211,95 @@ mod tests {
assert_eq!(t.get_description(), "past tense");
assert_eq!(t.get_status(), Status::Completed);
// check tha values have changed in storage, too
// check that values have changed in storage, too
let t = rep.get_task(t.get_uuid()).unwrap().unwrap();
assert_eq!(t.get_description(), "past tense");
assert_eq!(t.get_status(), Status::Completed);
// and check for the corresponding operations, cleaning out the timestamps
// and modified properties as these are based on the current time
let now = Utc::now();
let clean_op = |op: ReplicaOp| {
if let ReplicaOp::Update {
uuid,
property,
mut old_value,
mut value,
..
} = op
{
if property == "modified" {
if value.is_some() {
value = Some("just-now".into());
}
if old_value.is_some() {
old_value = Some("just-now".into());
}
}
ReplicaOp::Update {
uuid,
property,
old_value,
value,
timestamp: now,
}
} else {
op
}
};
assert_eq!(
rep.taskdb
.operations()
.drain(..)
.map(clean_op)
.collect::<Vec<_>>(),
vec![
ReplicaOp::UndoPoint,
ReplicaOp::Create { uuid: t.get_uuid() },
ReplicaOp::Update {
uuid: t.get_uuid(),
property: "modified".into(),
old_value: None,
value: Some("just-now".into()),
timestamp: now,
},
ReplicaOp::Update {
uuid: t.get_uuid(),
property: "description".into(),
old_value: None,
value: Some("a task".into()),
timestamp: now,
},
ReplicaOp::Update {
uuid: t.get_uuid(),
property: "status".into(),
old_value: None,
value: Some("P".into()),
timestamp: now,
},
ReplicaOp::Update {
uuid: t.get_uuid(),
property: "modified".into(),
old_value: Some("just-now".into()),
value: Some("just-now".into()),
timestamp: now,
},
ReplicaOp::Update {
uuid: t.get_uuid(),
property: "description".into(),
old_value: Some("a task".into()),
value: Some("past tense".into()),
timestamp: now,
},
ReplicaOp::Update {
uuid: t.get_uuid(),
property: "status".into(),
old_value: Some("P".into()),
value: Some("C".into()),
timestamp: now,
},
]
);
}
#[test]

View File

@@ -14,6 +14,7 @@ pub(crate) mod test;
mod config;
mod crypto;
mod local;
mod op;
mod remote;
mod types;
@@ -21,3 +22,5 @@ pub use config::ServerConfig;
pub use local::LocalServer;
pub use remote::RemoteServer;
pub use types::*;
pub(crate) use op::SyncOp;

View File

@@ -2,9 +2,10 @@ use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// An Operation defines a single change to the task database
/// A SyncOp defines a single change to the task database, that can be synchronized
/// via a server.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum Operation {
pub enum SyncOp {
/// Create a new task.
///
/// On application, if the task already exists, the operation does nothing.
@@ -18,7 +19,7 @@ pub enum Operation {
/// Update an existing task, setting the given property to the given value. If the value is
/// None, then the corresponding property is deleted.
///
/// If the given task does not exist, the operation does nothing.
/// If the given task does not exist, the operation does nothing.
Update {
uuid: Uuid,
property: String,
@@ -27,9 +28,9 @@ pub enum Operation {
},
}
use Operation::*;
use SyncOp::*;
impl Operation {
impl SyncOp {
// Transform takes two operations A and B that happened concurrently and produces two
// operations A' and B' such that `apply(apply(S, A), B') = apply(apply(S, B), A')`. This
// function is used to serialize operations in a process similar to a Git "rebase".
@@ -52,10 +53,7 @@ impl Operation {
// allows two different systems which have already applied op1 and op2, respectively, and thus
// reached different states, to return to the same state by applying op2' and op1',
// respectively.
pub fn transform(
operation1: Operation,
operation2: Operation,
) -> (Option<Operation>, Option<Operation>) {
pub fn transform(operation1: SyncOp, operation2: SyncOp) -> (Option<SyncOp>, Option<SyncOp>) {
match (&operation1, &operation2) {
// Two creations or deletions of the same uuid reach the same state, so there's no need
// for any further operations to bring the state together.
@@ -131,17 +129,86 @@ mod test {
use pretty_assertions::assert_eq;
use proptest::prelude::*;
// note that `tests/operation_transform_invariant.rs` tests the transform function quite
// thoroughly, so this testing is light.
#[test]
fn test_json_create() -> anyhow::Result<()> {
let uuid = Uuid::new_v4();
let op = Create { uuid };
let json = serde_json::to_string(&op)?;
assert_eq!(json, format!(r#"{{"Create":{{"uuid":"{}"}}}}"#, uuid));
let deser: SyncOp = serde_json::from_str(&json)?;
assert_eq!(deser, op);
Ok(())
}
#[test]
fn test_json_delete() -> anyhow::Result<()> {
let uuid = Uuid::new_v4();
let op = Delete { uuid };
let json = serde_json::to_string(&op)?;
assert_eq!(json, format!(r#"{{"Delete":{{"uuid":"{}"}}}}"#, uuid));
let deser: SyncOp = serde_json::from_str(&json)?;
assert_eq!(deser, op);
Ok(())
}
#[test]
fn test_json_update() -> anyhow::Result<()> {
let uuid = Uuid::new_v4();
let timestamp = Utc::now();
let op = Update {
uuid,
property: "abc".into(),
value: Some("false".into()),
timestamp,
};
let json = serde_json::to_string(&op)?;
assert_eq!(
json,
format!(
r#"{{"Update":{{"uuid":"{}","property":"abc","value":"false","timestamp":"{:?}"}}}}"#,
uuid, timestamp,
)
);
let deser: SyncOp = serde_json::from_str(&json)?;
assert_eq!(deser, op);
Ok(())
}
#[test]
fn test_json_update_none() -> anyhow::Result<()> {
let uuid = Uuid::new_v4();
let timestamp = Utc::now();
let op = Update {
uuid,
property: "abc".into(),
value: None,
timestamp,
};
let json = serde_json::to_string(&op)?;
assert_eq!(
json,
format!(
r#"{{"Update":{{"uuid":"{}","property":"abc","value":null,"timestamp":"{:?}"}}}}"#,
uuid, timestamp,
)
);
let deser: SyncOp = serde_json::from_str(&json)?;
assert_eq!(deser, op);
Ok(())
}
fn test_transform(
setup: Option<Operation>,
o1: Operation,
o2: Operation,
exp1p: Option<Operation>,
exp2p: Option<Operation>,
setup: Option<SyncOp>,
o1: SyncOp,
o2: SyncOp,
exp1p: Option<SyncOp>,
exp2p: Option<SyncOp>,
) {
let (o1p, o2p) = Operation::transform(o1.clone(), o2.clone());
let (o1p, o2p) = SyncOp::transform(o1.clone(), o2.clone());
assert_eq!((&o1p, &o2p), (&exp1p, &exp2p));
// check that the two operation sequences have the same effect, enforcing the invariant of
@@ -274,72 +341,6 @@ mod test {
);
}
#[test]
fn test_json_create() -> anyhow::Result<()> {
let uuid = Uuid::new_v4();
let op = Create { uuid };
assert_eq!(
serde_json::to_string(&op)?,
format!(r#"{{"Create":{{"uuid":"{}"}}}}"#, uuid),
);
Ok(())
}
#[test]
fn test_json_delete() -> anyhow::Result<()> {
let uuid = Uuid::new_v4();
let op = Delete { uuid };
assert_eq!(
serde_json::to_string(&op)?,
format!(r#"{{"Delete":{{"uuid":"{}"}}}}"#, uuid),
);
Ok(())
}
#[test]
fn test_json_update() -> anyhow::Result<()> {
let uuid = Uuid::new_v4();
let timestamp = Utc::now();
let op = Update {
uuid,
property: "abc".into(),
value: Some("false".into()),
timestamp,
};
assert_eq!(
serde_json::to_string(&op)?,
format!(
r#"{{"Update":{{"uuid":"{}","property":"abc","value":"false","timestamp":"{:?}"}}}}"#,
uuid, timestamp,
),
);
Ok(())
}
#[test]
fn test_json_update_none() -> anyhow::Result<()> {
let uuid = Uuid::new_v4();
let timestamp = Utc::now();
let op = Update {
uuid,
property: "abc".into(),
value: None,
timestamp,
};
assert_eq!(
serde_json::to_string(&op)?,
format!(
r#"{{"Update":{{"uuid":"{}","property":"abc","value":null,"timestamp":"{:?}"}}}}"#,
uuid, timestamp,
),
);
Ok(())
}
fn uuid_strategy() -> impl Strategy<Value = Uuid> {
prop_oneof![
Just(Uuid::parse_str("83a2f9ef-f455-4195-b92e-a54c161eebfc").unwrap()),
@@ -349,12 +350,12 @@ mod test {
]
}
fn operation_strategy() -> impl Strategy<Value = Operation> {
fn operation_strategy() -> impl Strategy<Value = SyncOp> {
prop_oneof![
uuid_strategy().prop_map(|uuid| Operation::Create { uuid }),
uuid_strategy().prop_map(|uuid| Operation::Delete { uuid }),
uuid_strategy().prop_map(|uuid| Create { uuid }),
uuid_strategy().prop_map(|uuid| Delete { uuid }),
(uuid_strategy(), "(title|project|status)").prop_map(|(uuid, property)| {
Operation::Update {
Update {
uuid,
property,
value: Some("true".into()),
@@ -372,38 +373,38 @@ mod test {
// check that the two operation sequences have the same effect, enforcing the invariant of
// the transform function.
fn transform_invariant_holds(o1 in operation_strategy(), o2 in operation_strategy()) {
let (o1p, o2p) = Operation::transform(o1.clone(), o2.clone());
let (o1p, o2p) = SyncOp::transform(o1.clone(), o2.clone());
let mut db1 = TaskDb::new(Box::new(InMemoryStorage::new()));
let mut db2 = TaskDb::new(Box::new(InMemoryStorage::new()));
// Ensure that any expected tasks already exist
if let Operation::Update{ ref uuid, .. } = o1 {
let _ = db1.apply(Operation::Create{uuid: uuid.clone()});
let _ = db2.apply(Operation::Create{uuid: uuid.clone()});
if let Update{ uuid, .. } = o1 {
let _ = db1.apply(Create{uuid});
let _ = db2.apply(Create{uuid});
}
if let Operation::Update{ ref uuid, .. } = o2 {
let _ = db1.apply(Operation::Create{uuid: uuid.clone()});
let _ = db2.apply(Operation::Create{uuid: uuid.clone()});
if let Update{ uuid, .. } = o2 {
let _ = db1.apply(Create{uuid});
let _ = db2.apply(Create{uuid});
}
if let Operation::Delete{ ref uuid } = o1 {
let _ = db1.apply(Operation::Create{uuid: uuid.clone()});
let _ = db2.apply(Operation::Create{uuid: uuid.clone()});
if let Delete{ uuid } = o1 {
let _ = db1.apply(Create{uuid});
let _ = db2.apply(Create{uuid});
}
if let Operation::Delete{ ref uuid } = o2 {
let _ = db1.apply(Operation::Create{uuid: uuid.clone()});
let _ = db2.apply(Operation::Create{uuid: uuid.clone()});
if let Delete{ uuid } = o2 {
let _ = db1.apply(Create{uuid});
let _ = db2.apply(Create{uuid});
}
// if applying the initial operations fail, that indicates the operation was invalid
// in the base state, so consider the case successful.
if let Err(_) = db1.apply(o1) {
if db1.apply(o1).is_err() {
return Ok(());
}
if let Err(_) = db2.apply(o2) {
if db2.apply(o2).is_err() {
return Ok(());
}

View File

@@ -1,6 +1,6 @@
#![allow(clippy::new_without_default)]
use crate::storage::{Operation, Storage, StorageTxn, TaskMap, VersionId, DEFAULT_BASE_VERSION};
use crate::storage::{ReplicaOp, Storage, StorageTxn, TaskMap, VersionId, DEFAULT_BASE_VERSION};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use uuid::Uuid;
@@ -9,7 +9,7 @@ use uuid::Uuid;
struct Data {
tasks: HashMap<Uuid, TaskMap>,
base_version: VersionId,
operations: Vec<Operation>,
operations: Vec<ReplicaOp>,
working_set: Vec<Option<Uuid>>,
}
@@ -87,16 +87,16 @@ impl<'t> StorageTxn for Txn<'t> {
Ok(())
}
fn operations(&mut self) -> anyhow::Result<Vec<Operation>> {
fn operations(&mut self) -> anyhow::Result<Vec<ReplicaOp>> {
Ok(self.data_ref().operations.clone())
}
fn add_operation(&mut self, op: Operation) -> anyhow::Result<()> {
fn add_operation(&mut self, op: ReplicaOp) -> anyhow::Result<()> {
self.mut_data_ref().operations.push(op);
Ok(())
}
fn set_operations(&mut self, ops: Vec<Operation>) -> anyhow::Result<()> {
fn set_operations(&mut self, ops: Vec<ReplicaOp>) -> anyhow::Result<()> {
self.mut_data_ref().operations = ops;
Ok(())
}

View File

@@ -11,14 +11,14 @@ use uuid::Uuid;
mod config;
mod inmemory;
mod operation;
mod op;
pub(crate) mod sqlite;
pub use config::StorageConfig;
pub use inmemory::InMemoryStorage;
pub use sqlite::SqliteStorage;
pub use operation::Operation;
pub use op::ReplicaOp;
/// An in-memory representation of a task as a simple hashmap
pub type TaskMap = HashMap<String, String>;
@@ -80,14 +80,14 @@ pub trait StorageTxn {
/// Get the current set of outstanding operations (operations that have not been sync'd to the
/// server yet)
fn operations(&mut self) -> Result<Vec<Operation>>;
fn operations(&mut self) -> Result<Vec<ReplicaOp>>;
/// Add an operation to the end of the list of operations in the storage. Note that this
/// merely *stores* the operation; it is up to the TaskDb to apply it.
fn add_operation(&mut self, op: Operation) -> Result<()>;
fn add_operation(&mut self, op: ReplicaOp) -> Result<()>;
/// Replace the current list of operations with a new list.
fn set_operations(&mut self, ops: Vec<Operation>) -> Result<()>;
fn set_operations(&mut self, ops: Vec<ReplicaOp>) -> Result<()>;
/// Get the entire working set, with each task UUID at its appropriate (1-based) index.
/// Element 0 is always None.

View File

@@ -0,0 +1,283 @@
use crate::server::SyncOp;
use crate::storage::TaskMap;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// A ReplicaOp defines a single change to the task database, as stored locally in the replica.
/// This contains additional information not included in SyncOp.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum ReplicaOp {
/// Create a new task.
///
/// On undo, the task is deleted.
Create { uuid: Uuid },
/// Delete an existing task.
///
/// On undo, the task's data is restored from old_task.
Delete { uuid: Uuid, old_task: TaskMap },
/// Update an existing task, setting the given property to the given value. If the value is
/// None, then the corresponding property is deleted.
///
/// On undo, the property is set back to its previous value.
Update {
uuid: Uuid,
property: String,
old_value: Option<String>,
value: Option<String>,
timestamp: DateTime<Utc>,
},
/// Mark a point in the operations history to which the user might like to undo. Users
/// typically want to undo more than one operation at a time (for example, most changes update
/// both the `modified` property and some other task property -- the user would like to "undo"
/// both updates at the same time). Applying an UndoPoint does nothing.
UndoPoint,
}
impl ReplicaOp {
/// Convert this operation into a [`SyncOp`].
pub fn into_sync(self) -> Option<SyncOp> {
match self {
Self::Create { uuid } => Some(SyncOp::Create { uuid }),
Self::Delete { uuid, .. } => Some(SyncOp::Delete { uuid }),
Self::Update {
uuid,
property,
value,
timestamp,
..
} => Some(SyncOp::Update {
uuid,
property,
value,
timestamp,
}),
Self::UndoPoint => None,
}
}
/// Generate a sequence of SyncOp's to reverse the effects of this ReplicaOp.
pub fn reverse_ops(self) -> Vec<SyncOp> {
match self {
Self::Create { uuid } => vec![SyncOp::Delete { uuid }],
Self::Delete { uuid, mut old_task } => {
let mut ops = vec![SyncOp::Create { uuid }];
// We don't have the original update timestamp, but it doesn't
// matter because this SyncOp will just be applied and discarded.
let timestamp = Utc::now();
for (property, value) in old_task.drain() {
ops.push(SyncOp::Update {
uuid,
property,
value: Some(value),
timestamp,
});
}
ops
}
Self::Update {
uuid,
property,
old_value,
timestamp,
..
} => vec![SyncOp::Update {
uuid,
property,
value: old_value,
timestamp,
}],
Self::UndoPoint => vec![],
}
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::storage::taskmap_with;
use chrono::Utc;
use pretty_assertions::assert_eq;
use ReplicaOp::*;
#[test]
fn test_json_create() -> anyhow::Result<()> {
let uuid = Uuid::new_v4();
let op = Create { uuid };
let json = serde_json::to_string(&op)?;
assert_eq!(json, format!(r#"{{"Create":{{"uuid":"{}"}}}}"#, uuid));
let deser: ReplicaOp = serde_json::from_str(&json)?;
assert_eq!(deser, op);
Ok(())
}
#[test]
fn test_json_delete() -> anyhow::Result<()> {
let uuid = Uuid::new_v4();
let old_task = vec![("foo".into(), "bar".into())].drain(..).collect();
let op = Delete { uuid, old_task };
let json = serde_json::to_string(&op)?;
assert_eq!(
json,
format!(
r#"{{"Delete":{{"uuid":"{}","old_task":{{"foo":"bar"}}}}}}"#,
uuid
)
);
let deser: ReplicaOp = serde_json::from_str(&json)?;
assert_eq!(deser, op);
Ok(())
}
#[test]
fn test_json_update() -> anyhow::Result<()> {
let uuid = Uuid::new_v4();
let timestamp = Utc::now();
let op = Update {
uuid,
property: "abc".into(),
old_value: Some("true".into()),
value: Some("false".into()),
timestamp,
};
let json = serde_json::to_string(&op)?;
assert_eq!(
json,
format!(
r#"{{"Update":{{"uuid":"{}","property":"abc","old_value":"true","value":"false","timestamp":"{:?}"}}}}"#,
uuid, timestamp,
)
);
let deser: ReplicaOp = serde_json::from_str(&json)?;
assert_eq!(deser, op);
Ok(())
}
#[test]
fn test_json_update_none() -> anyhow::Result<()> {
let uuid = Uuid::new_v4();
let timestamp = Utc::now();
let op = Update {
uuid,
property: "abc".into(),
old_value: None,
value: None,
timestamp,
};
let json = serde_json::to_string(&op)?;
assert_eq!(
json,
format!(
r#"{{"Update":{{"uuid":"{}","property":"abc","old_value":null,"value":null,"timestamp":"{:?}"}}}}"#,
uuid, timestamp,
)
);
let deser: ReplicaOp = serde_json::from_str(&json)?;
assert_eq!(deser, op);
Ok(())
}
#[test]
fn test_into_sync_create() {
let uuid = Uuid::new_v4();
assert_eq!(Create { uuid }.into_sync(), Some(SyncOp::Create { uuid }));
}
#[test]
fn test_into_sync_delete() {
let uuid = Uuid::new_v4();
assert_eq!(
Delete {
uuid,
old_task: TaskMap::new()
}
.into_sync(),
Some(SyncOp::Delete { uuid })
);
}
#[test]
fn test_into_sync_update() {
let uuid = Uuid::new_v4();
let timestamp = Utc::now();
assert_eq!(
Update {
uuid,
property: "prop".into(),
old_value: Some("foo".into()),
value: Some("v".into()),
timestamp,
}
.into_sync(),
Some(SyncOp::Update {
uuid,
property: "prop".into(),
value: Some("v".into()),
timestamp,
})
);
}
#[test]
fn test_into_sync_undo_point() {
assert_eq!(UndoPoint.into_sync(), None);
}
#[test]
fn test_reverse_create() {
let uuid = Uuid::new_v4();
assert_eq!(Create { uuid }.reverse_ops(), vec![SyncOp::Delete { uuid }]);
}
#[test]
fn test_reverse_delete() {
let uuid = Uuid::new_v4();
let reversed = Delete {
uuid,
old_task: taskmap_with(vec![("prop1".into(), "v1".into())]),
}
.reverse_ops();
assert_eq!(reversed.len(), 2);
assert_eq!(reversed[0], SyncOp::Create { uuid });
assert!(matches!(
&reversed[1],
SyncOp::Update { uuid: u, property: p, value: Some(v), ..}
if u == &uuid && p == "prop1" && v == "v1"
));
}
#[test]
fn test_reverse_update() {
let uuid = Uuid::new_v4();
let timestamp = Utc::now();
assert_eq!(
Update {
uuid,
property: "prop".into(),
old_value: Some("foo".into()),
value: Some("v".into()),
timestamp,
}
.reverse_ops(),
vec![SyncOp::Update {
uuid,
property: "prop".into(),
value: Some("foo".into()),
timestamp,
}]
);
}
#[test]
fn test_reverse_undo_point() {
assert_eq!(UndoPoint.reverse_ops(), vec![]);
}
}

View File

@@ -1,4 +1,4 @@
use crate::storage::{Operation, Storage, StorageTxn, TaskMap, VersionId, DEFAULT_BASE_VERSION};
use crate::storage::{ReplicaOp, Storage, StorageTxn, TaskMap, VersionId, DEFAULT_BASE_VERSION};
use anyhow::Context;
use rusqlite::types::{FromSql, ToSql};
use rusqlite::{params, Connection, OptionalExtension};
@@ -52,17 +52,17 @@ impl ToSql for StoredTaskMap {
}
}
/// Stores [`Operation`] in SQLite
impl FromSql for Operation {
/// Stores [`ReplicaOp`] in SQLite
impl FromSql for ReplicaOp {
fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
let o: Operation = serde_json::from_str(value.as_str()?)
let o: ReplicaOp = serde_json::from_str(value.as_str()?)
.map_err(|_| rusqlite::types::FromSqlError::InvalidType)?;
Ok(o)
}
}
/// Parsers Operation stored as JSON in string column
impl ToSql for Operation {
/// Parses ReplicaOp stored as JSON in string column
impl ToSql for ReplicaOp {
fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
let s = serde_json::to_string(&self)
.map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
@@ -241,12 +241,12 @@ impl<'t> StorageTxn for Txn<'t> {
Ok(())
}
fn operations(&mut self) -> anyhow::Result<Vec<Operation>> {
fn operations(&mut self) -> anyhow::Result<Vec<ReplicaOp>> {
let t = self.get_txn()?;
let mut q = t.prepare("SELECT data FROM operations ORDER BY id ASC")?;
let rows = q.query_map([], |r| {
let data: Operation = r.get("data")?;
let data: ReplicaOp = r.get("data")?;
Ok(data)
})?;
@@ -257,7 +257,7 @@ impl<'t> StorageTxn for Txn<'t> {
Ok(ret)
}
fn add_operation(&mut self, op: Operation) -> anyhow::Result<()> {
fn add_operation(&mut self, op: ReplicaOp) -> anyhow::Result<()> {
let t = self.get_txn()?;
t.execute("INSERT INTO operations (data) VALUES (?)", params![&op])
@@ -265,7 +265,7 @@ impl<'t> StorageTxn for Txn<'t> {
Ok(())
}
fn set_operations(&mut self, ops: Vec<Operation>) -> anyhow::Result<()> {
fn set_operations(&mut self, ops: Vec<ReplicaOp>) -> anyhow::Result<()> {
let t = self.get_txn()?;
t.execute("DELETE FROM operations", [])
.context("Clear all existing operations")?;
@@ -611,8 +611,8 @@ mod test {
// create some operations
{
let mut txn = storage.txn()?;
txn.add_operation(Operation::Create { uuid: uuid1 })?;
txn.add_operation(Operation::Create { uuid: uuid2 })?;
txn.add_operation(ReplicaOp::Create { uuid: uuid1 })?;
txn.add_operation(ReplicaOp::Create { uuid: uuid2 })?;
txn.commit()?;
}
@@ -623,8 +623,8 @@ mod test {
assert_eq!(
ops,
vec![
Operation::Create { uuid: uuid1 },
Operation::Create { uuid: uuid2 },
ReplicaOp::Create { uuid: uuid1 },
ReplicaOp::Create { uuid: uuid2 },
]
);
}
@@ -633,8 +633,14 @@ mod test {
{
let mut txn = storage.txn()?;
txn.set_operations(vec![
Operation::Delete { uuid: uuid2 },
Operation::Delete { uuid: uuid1 },
ReplicaOp::Delete {
uuid: uuid2,
old_task: TaskMap::new(),
},
ReplicaOp::Delete {
uuid: uuid1,
old_task: TaskMap::new(),
},
])?;
txn.commit()?;
}
@@ -642,8 +648,11 @@ mod test {
// create some more operations (to test adding operations after clearing)
{
let mut txn = storage.txn()?;
txn.add_operation(Operation::Create { uuid: uuid3 })?;
txn.add_operation(Operation::Delete { uuid: uuid3 })?;
txn.add_operation(ReplicaOp::Create { uuid: uuid3 })?;
txn.add_operation(ReplicaOp::Delete {
uuid: uuid3,
old_task: TaskMap::new(),
})?;
txn.commit()?;
}
@@ -654,10 +663,19 @@ mod test {
assert_eq!(
ops,
vec![
Operation::Delete { uuid: uuid2 },
Operation::Delete { uuid: uuid1 },
Operation::Create { uuid: uuid3 },
Operation::Delete { uuid: uuid3 },
ReplicaOp::Delete {
uuid: uuid2,
old_task: TaskMap::new()
},
ReplicaOp::Delete {
uuid: uuid1,
old_task: TaskMap::new()
},
ReplicaOp::Create { uuid: uuid3 },
ReplicaOp::Delete {
uuid: uuid3,
old_task: TaskMap::new()
},
]
);
}

View File

@@ -387,12 +387,10 @@ impl<'r> TaskMut<'r> {
fn lastmod(&mut self) -> anyhow::Result<()> {
if !self.updated_modified {
let now = format!("{}", Utc::now().timestamp());
self.replica
.update_task(self.task.uuid, Prop::Modified.as_ref(), Some(now.clone()))?;
trace!("task {}: set property modified={:?}", self.task.uuid, now);
self.task
.taskmap
.insert(String::from(Prop::Modified.as_ref()), now);
self.task.taskmap = self
.replica
.update_task(self.task.uuid, Prop::Modified.as_ref(), Some(now))?;
self.updated_modified = true;
}
Ok(())
@@ -405,16 +403,17 @@ impl<'r> TaskMut<'r> {
) -> anyhow::Result<()> {
let property = property.into();
self.lastmod()?;
self.replica
.update_task(self.task.uuid, &property, value.as_ref())?;
if let Some(v) = value {
if let Some(ref v) = value {
trace!("task {}: set property {}={:?}", self.task.uuid, property, v);
self.task.taskmap.insert(property, v);
} else {
trace!("task {}: remove property {}", self.task.uuid, property);
self.task.taskmap.remove(&property);
}
self.task.taskmap = self
.replica
.update_task(self.task.uuid, &property, value.as_ref())?;
Ok(())
}
@@ -423,18 +422,7 @@ impl<'r> TaskMut<'r> {
property: &str,
value: Option<DateTime<Utc>>,
) -> anyhow::Result<()> {
self.lastmod()?;
if let Some(value) = value {
let ts = format!("{}", value.timestamp());
self.replica
.update_task(self.task.uuid, property, Some(ts.clone()))?;
self.task.taskmap.insert(property.to_string(), ts);
} else {
self.replica
.update_task::<_, &str>(self.task.uuid, property, None)?;
self.task.taskmap.remove(property);
}
Ok(())
self.set_string(property, value.map(|v| v.timestamp().to_string()))
}
/// Used by tests to ensure that updates are properly written

View File

@@ -0,0 +1,399 @@
use crate::errors::Error;
use crate::server::SyncOp;
use crate::storage::{ReplicaOp, StorageTxn, TaskMap};
/// Apply the given SyncOp to the replica, updating both the task data and adding a
/// ReplicaOp to the list of operations. Returns the TaskMap of the task after the
/// operation has been applied (or an empty TaskMap for Delete).
pub(super) fn apply_and_record(txn: &mut dyn StorageTxn, op: SyncOp) -> anyhow::Result<TaskMap> {
match op {
SyncOp::Create { uuid } => {
let created = txn.create_task(uuid)?;
if created {
txn.add_operation(ReplicaOp::Create { uuid })?;
txn.commit()?;
Ok(TaskMap::new())
} else {
// TODO: differentiate error types here?
Err(Error::Database(format!("Task {} already exists", uuid)).into())
}
}
SyncOp::Delete { uuid } => {
let task = txn.get_task(uuid)?;
if let Some(task) = task {
txn.delete_task(uuid)?;
txn.add_operation(ReplicaOp::Delete {
uuid,
old_task: task,
})?;
txn.commit()?;
Ok(TaskMap::new())
} else {
Err(Error::Database(format!("Task {} does not exist", uuid)).into())
}
}
SyncOp::Update {
uuid,
property,
value,
timestamp,
} => {
let task = txn.get_task(uuid)?;
if let Some(mut task) = task {
let old_value = task.get(&property).cloned();
if let Some(ref v) = value {
task.insert(property.clone(), v.clone());
} else {
task.remove(&property);
}
txn.set_task(uuid, task.clone())?;
txn.add_operation(ReplicaOp::Update {
uuid,
property,
old_value,
value,
timestamp,
})?;
txn.commit()?;
Ok(task)
} else {
Err(Error::Database(format!("Task {} does not exist", uuid)).into())
}
}
}
}
/// Apply an op to the TaskDb's set of tasks (without recording it in the list of operations)
pub(super) fn apply_op(txn: &mut dyn StorageTxn, op: &SyncOp) -> anyhow::Result<()> {
// TODO: test
// TODO: it'd be nice if this was integrated into apply() somehow, but that clones TaskMaps
// unnecessariliy
match op {
SyncOp::Create { uuid } => {
// insert if the task does not already exist
if !txn.create_task(*uuid)? {
return Err(Error::Database(format!("Task {} already exists", uuid)).into());
}
}
SyncOp::Delete { ref uuid } => {
if !txn.delete_task(*uuid)? {
return Err(Error::Database(format!("Task {} does not exist", uuid)).into());
}
}
SyncOp::Update {
ref uuid,
ref property,
ref value,
timestamp: _,
} => {
// update if this task exists, otherwise ignore
if let Some(mut task) = txn.get_task(*uuid)? {
match value {
Some(ref val) => task.insert(property.to_string(), val.clone()),
None => task.remove(property),
};
txn.set_task(*uuid, task)?;
} else {
return Err(Error::Database(format!("Task {} does not exist", uuid)).into());
}
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::taskdb::TaskDb;
use chrono::Utc;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use uuid::Uuid;
#[test]
fn test_apply_create() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = SyncOp::Create { uuid };
{
let mut txn = db.storage.txn()?;
let taskmap = apply_and_record(txn.as_mut(), op)?;
assert_eq!(taskmap.len(), 0);
txn.commit()?;
}
assert_eq!(db.sorted_tasks(), vec![(uuid, vec![]),]);
assert_eq!(db.operations(), vec![ReplicaOp::Create { uuid }]);
Ok(())
}
#[test]
fn test_apply_create_exists() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = SyncOp::Create { uuid };
{
let mut txn = db.storage.txn()?;
let taskmap = apply_and_record(txn.as_mut(), op.clone())?;
assert_eq!(taskmap.len(), 0);
assert_eq!(
apply_and_record(txn.as_mut(), op)
.err()
.unwrap()
.to_string(),
format!("Task Database Error: Task {} already exists", uuid)
);
txn.commit()?;
}
// first op was applied
assert_eq!(db.sorted_tasks(), vec![(uuid, vec![])]);
assert_eq!(db.operations(), vec![ReplicaOp::Create { uuid }]);
Ok(())
}
#[test]
fn test_apply_create_update() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let now = Utc::now();
let op1 = SyncOp::Create { uuid };
{
let mut txn = db.storage.txn()?;
let taskmap = apply_and_record(txn.as_mut(), op1)?;
assert_eq!(taskmap.len(), 0);
txn.commit()?;
}
let op2 = SyncOp::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: now,
};
{
let mut txn = db.storage.txn()?;
let mut taskmap = apply_and_record(txn.as_mut(), op2)?;
assert_eq!(
taskmap.drain().collect::<Vec<(_, _)>>(),
vec![("title".into(), "my task".into())]
);
txn.commit()?;
}
assert_eq!(
db.sorted_tasks(),
vec![(uuid, vec![("title".into(), "my task".into())])]
);
assert_eq!(
db.operations(),
vec![
ReplicaOp::Create { uuid },
ReplicaOp::Update {
uuid,
property: "title".into(),
old_value: None,
value: Some("my task".into()),
timestamp: now
}
]
);
Ok(())
}
#[test]
fn test_apply_create_update_delete_prop() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let now = Utc::now();
let op1 = SyncOp::Create { uuid };
{
let mut txn = db.storage.txn()?;
let taskmap = apply_and_record(txn.as_mut(), op1)?;
assert_eq!(taskmap.len(), 0);
txn.commit()?;
}
let op2 = SyncOp::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: now,
};
{
let mut txn = db.storage.txn()?;
let taskmap = apply_and_record(txn.as_mut(), op2)?;
assert_eq!(taskmap.get("title"), Some(&"my task".to_owned()));
txn.commit()?;
}
let op3 = SyncOp::Update {
uuid,
property: String::from("priority"),
value: Some("H".into()),
timestamp: now,
};
{
let mut txn = db.storage.txn()?;
let taskmap = apply_and_record(txn.as_mut(), op3)?;
assert_eq!(taskmap.get("priority"), Some(&"H".to_owned()));
txn.commit()?;
}
let op4 = SyncOp::Update {
uuid,
property: String::from("title"),
value: None,
timestamp: now,
};
{
let mut txn = db.storage.txn()?;
let taskmap = apply_and_record(txn.as_mut(), op4)?;
assert_eq!(taskmap.get("title"), None);
assert_eq!(taskmap.get("priority"), Some(&"H".to_owned()));
txn.commit()?;
}
let mut exp = HashMap::new();
let mut task = HashMap::new();
task.insert(String::from("priority"), String::from("H"));
exp.insert(uuid, task);
assert_eq!(
db.sorted_tasks(),
vec![(uuid, vec![("priority".into(), "H".into())])]
);
assert_eq!(
db.operations(),
vec![
ReplicaOp::Create { uuid },
ReplicaOp::Update {
uuid,
property: "title".into(),
old_value: None,
value: Some("my task".into()),
timestamp: now,
},
ReplicaOp::Update {
uuid,
property: "priority".into(),
old_value: None,
value: Some("H".into()),
timestamp: now,
},
ReplicaOp::Update {
uuid,
property: "title".into(),
old_value: Some("my task".into()),
value: None,
timestamp: now,
}
]
);
Ok(())
}
#[test]
fn test_apply_update_does_not_exist() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = SyncOp::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: Utc::now(),
};
{
let mut txn = db.storage.txn()?;
assert_eq!(
apply_and_record(txn.as_mut(), op)
.err()
.unwrap()
.to_string(),
format!("Task Database Error: Task {} does not exist", uuid)
);
txn.commit()?;
}
Ok(())
}
#[test]
fn test_apply_create_delete() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let now = Utc::now();
let op1 = SyncOp::Create { uuid };
{
let mut txn = db.storage.txn()?;
let taskmap = apply_and_record(txn.as_mut(), op1)?;
assert_eq!(taskmap.len(), 0);
}
let op2 = SyncOp::Update {
uuid,
property: String::from("priority"),
value: Some("H".into()),
timestamp: now,
};
{
let mut txn = db.storage.txn()?;
let taskmap = apply_and_record(txn.as_mut(), op2)?;
assert_eq!(taskmap.get("priority"), Some(&"H".to_owned()));
txn.commit()?;
}
let op3 = SyncOp::Delete { uuid };
{
let mut txn = db.storage.txn()?;
let taskmap = apply_and_record(txn.as_mut(), op3)?;
assert_eq!(taskmap.len(), 0);
txn.commit()?;
}
assert_eq!(db.sorted_tasks(), vec![]);
let mut old_task = TaskMap::new();
old_task.insert("priority".into(), "H".into());
assert_eq!(
db.operations(),
vec![
ReplicaOp::Create { uuid },
ReplicaOp::Update {
uuid,
property: "priority".into(),
old_value: None,
value: Some("H".into()),
timestamp: now,
},
ReplicaOp::Delete { uuid, old_task },
]
);
Ok(())
}
#[test]
fn test_apply_delete_not_present() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = SyncOp::Delete { uuid };
{
let mut txn = db.storage.txn()?;
assert_eq!(
apply_and_record(txn.as_mut(), op)
.err()
.unwrap()
.to_string(),
format!("Task Database Error: Task {} does not exist", uuid)
);
txn.commit()?;
}
Ok(())
}
}

View File

@@ -1,10 +1,11 @@
use crate::server::Server;
use crate::storage::{Operation, Storage, TaskMap};
use crate::server::{Server, SyncOp};
use crate::storage::{ReplicaOp, Storage, TaskMap};
use uuid::Uuid;
mod ops;
mod apply;
mod snapshot;
mod sync;
mod undo;
mod working_set;
/// A TaskDb is the backend for a replica. It manages the storage, operations, synchronization,
@@ -22,21 +23,29 @@ impl TaskDb {
#[cfg(test)]
pub fn new_inmemory() -> TaskDb {
TaskDb::new(Box::new(crate::storage::InMemoryStorage::new()))
#[cfg(test)]
use crate::storage::InMemoryStorage;
TaskDb::new(Box::new(InMemoryStorage::new()))
}
/// Apply an operation to the TaskDb. Aside from synchronization operations, this is the only way
/// to modify the TaskDb. In cases where an operation does not make sense, this function will do
/// nothing and return an error (but leave the TaskDb in a consistent state).
pub fn apply(&mut self, op: Operation) -> anyhow::Result<()> {
// TODO: differentiate error types here?
/// Apply an operation to the TaskDb. This will update the set of tasks and add a ReplicaOp to
/// the set of operations in the TaskDb, and return the TaskMap containing the resulting task's
/// properties (or an empty TaskMap for deletion).
///
/// Aside from synchronization operations, this is the only way to modify the TaskDb. In cases
/// where an operation does not make sense, this function will do nothing and return an error
/// (but leave the TaskDb in a consistent state).
pub fn apply(&mut self, op: SyncOp) -> anyhow::Result<TaskMap> {
let mut txn = self.storage.txn()?;
if let err @ Err(_) = ops::apply_op(txn.as_mut(), &op) {
return err;
}
txn.add_operation(op)?;
txn.commit()?;
Ok(())
apply::apply_and_record(txn.as_mut(), op)
}
/// Add an UndoPoint operation to the list of replica operations.
pub fn add_undo_point(&mut self) -> anyhow::Result<()> {
let mut txn = self.storage.txn()?;
txn.add_operation(ReplicaOp::UndoPoint)?;
txn.commit()
}
/// Get all tasks.
@@ -112,6 +121,13 @@ impl TaskDb {
sync::sync(server, txn.as_mut(), avoid_snapshots)
}
/// Undo local operations until the most recent UndoPoint, returning false if there are no
/// local operations to undo.
pub fn undo(&mut self) -> anyhow::Result<bool> {
let mut txn = self.storage.txn()?;
undo::undo(txn.as_mut())
}
// functions for supporting tests
#[cfg(test)]
@@ -134,7 +150,7 @@ impl TaskDb {
}
#[cfg(test)]
pub(crate) fn operations(&mut self) -> Vec<Operation> {
pub(crate) fn operations(&mut self) -> Vec<ReplicaOp> {
let mut txn = self.storage.txn().unwrap();
txn.operations()
.unwrap()
@@ -148,7 +164,7 @@ impl TaskDb {
mod tests {
use super::*;
use crate::server::test::TestServer;
use crate::storage::InMemoryStorage;
use crate::storage::{InMemoryStorage, ReplicaOp};
use chrono::Utc;
use pretty_assertions::assert_eq;
use proptest::prelude::*;
@@ -157,14 +173,21 @@ mod tests {
#[test]
fn test_apply() {
// this verifies that the operation is both applied and included in the list of
// operations; more detailed tests are in the `ops` module.
// operations; more detailed tests are in the `apply` module.
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = Operation::Create { uuid };
let op = SyncOp::Create { uuid };
db.apply(op.clone()).unwrap();
assert_eq!(db.sorted_tasks(), vec![(uuid, vec![]),]);
assert_eq!(db.operations(), vec![op]);
assert_eq!(db.operations(), vec![ReplicaOp::Create { uuid }]);
}
#[test]
fn test_add_undo_point() {
let mut db = TaskDb::new_inmemory();
db.add_undo_point().unwrap();
assert_eq!(db.operations(), vec![ReplicaOp::UndoPoint]);
}
fn newdb() -> TaskDb {
@@ -173,7 +196,7 @@ mod tests {
#[derive(Debug)]
enum Action {
Op(Operation),
Op(SyncOp),
Sync,
}
@@ -185,14 +208,14 @@ mod tests {
.chunks(2)
.map(|action_on| {
let action = match action_on[0] {
b'C' => Action::Op(Operation::Create { uuid }),
b'U' => Action::Op(Operation::Update {
b'C' => Action::Op(SyncOp::Create { uuid }),
b'U' => Action::Op(SyncOp::Update {
uuid,
property: "title".into(),
value: Some("foo".into()),
timestamp: Utc::now(),
}),
b'D' => Action::Op(Operation::Delete { uuid }),
b'D' => Action::Op(SyncOp::Delete { uuid }),
b'S' => Action::Sync,
_ => unreachable!(),
};

View File

@@ -1,233 +0,0 @@
use crate::errors::Error;
use crate::storage::{Operation, StorageTxn};
pub(super) fn apply_op(txn: &mut dyn StorageTxn, op: &Operation) -> anyhow::Result<()> {
match op {
Operation::Create { uuid } => {
// insert if the task does not already exist
if !txn.create_task(*uuid)? {
return Err(Error::Database(format!("Task {} already exists", uuid)).into());
}
}
Operation::Delete { ref uuid } => {
if !txn.delete_task(*uuid)? {
return Err(Error::Database(format!("Task {} does not exist", uuid)).into());
}
}
Operation::Update {
ref uuid,
ref property,
ref value,
timestamp: _,
} => {
// update if this task exists, otherwise ignore
if let Some(mut task) = txn.get_task(*uuid)? {
match value {
Some(ref val) => task.insert(property.to_string(), val.clone()),
None => task.remove(property),
};
txn.set_task(*uuid, task)?;
} else {
return Err(Error::Database(format!("Task {} does not exist", uuid)).into());
}
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::taskdb::TaskDb;
use chrono::Utc;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use uuid::Uuid;
#[test]
fn test_apply_create() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = Operation::Create { uuid };
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op)?;
txn.commit()?;
}
assert_eq!(db.sorted_tasks(), vec![(uuid, vec![]),]);
Ok(())
}
#[test]
fn test_apply_create_exists() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = Operation::Create { uuid };
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op)?;
assert_eq!(
apply_op(txn.as_mut(), &op).err().unwrap().to_string(),
format!("Task Database Error: Task {} already exists", uuid)
);
txn.commit()?;
}
// first op was applied
assert_eq!(db.sorted_tasks(), vec![(uuid, vec![])]);
Ok(())
}
#[test]
fn test_apply_create_update() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op1 = Operation::Create { uuid };
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op1)?;
txn.commit()?;
}
let op2 = Operation::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: Utc::now(),
};
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op2)?;
txn.commit()?;
}
assert_eq!(
db.sorted_tasks(),
vec![(uuid, vec![("title".into(), "my task".into())])]
);
Ok(())
}
#[test]
fn test_apply_create_update_delete_prop() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op1 = Operation::Create { uuid };
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op1)?;
txn.commit()?;
}
let op2 = Operation::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: Utc::now(),
};
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op2)?;
txn.commit()?;
}
let op3 = Operation::Update {
uuid,
property: String::from("priority"),
value: Some("H".into()),
timestamp: Utc::now(),
};
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op3)?;
txn.commit()?;
}
let op4 = Operation::Update {
uuid,
property: String::from("title"),
value: None,
timestamp: Utc::now(),
};
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op4)?;
txn.commit()?;
}
let mut exp = HashMap::new();
let mut task = HashMap::new();
task.insert(String::from("priority"), String::from("H"));
exp.insert(uuid, task);
assert_eq!(
db.sorted_tasks(),
vec![(uuid, vec![("priority".into(), "H".into())])]
);
Ok(())
}
#[test]
fn test_apply_update_does_not_exist() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = Operation::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: Utc::now(),
};
{
let mut txn = db.storage.txn()?;
assert_eq!(
apply_op(txn.as_mut(), &op).err().unwrap().to_string(),
format!("Task Database Error: Task {} does not exist", uuid)
);
txn.commit()?;
}
Ok(())
}
#[test]
fn test_apply_create_delete() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op1 = Operation::Create { uuid };
let op2 = Operation::Delete { uuid };
{
let mut txn = db.storage.txn()?;
apply_op(txn.as_mut(), &op1)?;
apply_op(txn.as_mut(), &op2)?;
txn.commit()?;
}
assert_eq!(db.sorted_tasks(), vec![]);
Ok(())
}
#[test]
fn test_apply_delete_not_present() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let op = Operation::Delete { uuid };
{
let mut txn = db.storage.txn()?;
assert_eq!(
apply_op(txn.as_mut(), &op).err().unwrap().to_string(),
format!("Task Database Error: Task {} does not exist", uuid)
);
txn.commit()?;
}
Ok(())
}
}

View File

@@ -1,6 +1,6 @@
use super::{ops, snapshot};
use crate::server::{AddVersionResult, GetVersionResult, Server, SnapshotUrgency};
use crate::storage::{Operation, StorageTxn};
use super::{apply, snapshot};
use crate::server::{AddVersionResult, GetVersionResult, Server, SnapshotUrgency, SyncOp};
use crate::storage::StorageTxn;
use crate::Error;
use log::{info, trace, warn};
use serde::{Deserialize, Serialize};
@@ -8,7 +8,7 @@ use std::str;
#[derive(Serialize, Deserialize, Debug)]
struct Version {
operations: Vec<Operation>,
operations: Vec<SyncOp>,
}
/// Sync to the given server, pulling remote changes and pushing local changes.
@@ -34,6 +34,12 @@ pub(super) fn sync(
trace!("beginning sync outer loop");
let mut base_version_id = txn.base_version()?;
let mut local_ops: Vec<SyncOp> = txn
.operations()?
.drain(..)
.filter_map(|op| op.into_sync())
.collect();
// first pull changes and "rebase" on top of them
loop {
trace!("beginning sync inner loop");
@@ -48,7 +54,7 @@ pub(super) fn sync(
// apply this verison and update base_version in storage
info!("applying version {:?} from server", version_id);
apply_version(txn, version)?;
apply_version(txn, &mut local_ops, version)?;
txn.set_base_version(version_id)?;
base_version_id = version_id;
} else {
@@ -58,17 +64,18 @@ pub(super) fn sync(
}
}
let operations: Vec<Operation> = txn.operations()?.to_vec();
if operations.is_empty() {
if local_ops.is_empty() {
info!("no changes to push to server");
// nothing to sync back to the server..
break;
}
trace!("sending {} operations to the server", operations.len());
trace!("sending {} operations to the server", local_ops.len());
// now make a version of our local changes and push those
let new_version = Version { operations };
let new_version = Version {
operations: local_ops,
};
let history_segment = serde_json::to_string(&new_version).unwrap().into();
info!("sending new version to server");
let (res, snapshot_urgency) = server.add_version(base_version_id, history_segment)?;
@@ -76,7 +83,6 @@ pub(super) fn sync(
AddVersionResult::Ok(new_version_id) => {
info!("version {:?} received by server", new_version_id);
txn.set_base_version(new_version_id)?;
txn.set_operations(vec![])?;
// make a snapshot if the server indicates it is urgent enough
let base_urgency = if avoid_snapshots {
@@ -106,11 +112,16 @@ pub(super) fn sync(
}
}
txn.set_operations(vec![])?;
txn.commit()?;
Ok(())
}
fn apply_version(txn: &mut dyn StorageTxn, mut version: Version) -> anyhow::Result<()> {
fn apply_version(
txn: &mut dyn StorageTxn,
local_ops: &mut Vec<SyncOp>,
mut version: Version,
) -> anyhow::Result<()> {
// The situation here is that the server has already applied all server operations, and we
// have already applied all local operations, so states have diverged by several
// operations. We need to figure out what operations to apply locally and on the server in
@@ -136,17 +147,16 @@ fn apply_version(txn: &mut dyn StorageTxn, mut version: Version) -> anyhow::Resu
// This is slightly complicated by the fact that the transform function can return None,
// indicating no operation is required. If this happens for a local op, we can just omit
// it. If it happens for server op, then we must copy the remaining local ops.
let mut local_operations: Vec<Operation> = txn.operations()?;
for server_op in version.operations.drain(..) {
trace!(
"rebasing local operations onto server operation {:?}",
server_op
);
let mut new_local_ops = Vec::with_capacity(local_operations.len());
let mut new_local_ops = Vec::with_capacity(local_ops.len());
let mut svr_op = Some(server_op);
for local_op in local_operations.drain(..) {
for local_op in local_ops.drain(..) {
if let Some(o) = svr_op {
let (new_server_op, new_local_op) = Operation::transform(o, local_op.clone());
let (new_server_op, new_local_op) = SyncOp::transform(o, local_op.clone());
trace!("local operation {:?} -> {:?}", local_op, new_local_op);
svr_op = new_server_op;
if let Some(o) = new_local_op {
@@ -161,21 +171,20 @@ fn apply_version(txn: &mut dyn StorageTxn, mut version: Version) -> anyhow::Resu
}
}
if let Some(o) = svr_op {
if let Err(e) = ops::apply_op(txn, &o) {
if let Err(e) = apply::apply_op(txn, &o) {
warn!("Invalid operation when syncing: {} (ignored)", e);
}
}
local_operations = new_local_ops;
*local_ops = new_local_ops;
}
txn.set_operations(local_operations)?;
Ok(())
}
#[cfg(test)]
mod test {
use super::*;
use crate::server::test::TestServer;
use crate::storage::{InMemoryStorage, Operation};
use crate::server::{test::TestServer, SyncOp};
use crate::storage::InMemoryStorage;
use crate::taskdb::{snapshot::SnapshotTasks, TaskDb};
use chrono::Utc;
use pretty_assertions::assert_eq;
@@ -197,8 +206,8 @@ mod test {
// make some changes in parallel to db1 and db2..
let uuid1 = Uuid::new_v4();
db1.apply(Operation::Create { uuid: uuid1 }).unwrap();
db1.apply(Operation::Update {
db1.apply(SyncOp::Create { uuid: uuid1 }).unwrap();
db1.apply(SyncOp::Update {
uuid: uuid1,
property: "title".into(),
value: Some("my first task".into()),
@@ -207,8 +216,8 @@ mod test {
.unwrap();
let uuid2 = Uuid::new_v4();
db2.apply(Operation::Create { uuid: uuid2 }).unwrap();
db2.apply(Operation::Update {
db2.apply(SyncOp::Create { uuid: uuid2 }).unwrap();
db2.apply(SyncOp::Update {
uuid: uuid2,
property: "title".into(),
value: Some("my second task".into()),
@@ -223,14 +232,14 @@ mod test {
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
// now make updates to the same task on both sides
db1.apply(Operation::Update {
db1.apply(SyncOp::Update {
uuid: uuid2,
property: "priority".into(),
value: Some("H".into()),
timestamp: Utc::now(),
})
.unwrap();
db2.apply(Operation::Update {
db2.apply(SyncOp::Update {
uuid: uuid2,
property: "project".into(),
value: Some("personal".into()),
@@ -259,8 +268,8 @@ mod test {
// create and update a task..
let uuid = Uuid::new_v4();
db1.apply(Operation::Create { uuid }).unwrap();
db1.apply(Operation::Update {
db1.apply(SyncOp::Create { uuid }).unwrap();
db1.apply(SyncOp::Update {
uuid,
property: "title".into(),
value: Some("my first task".into()),
@@ -275,9 +284,9 @@ mod test {
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
// delete and re-create the task on db1
db1.apply(Operation::Delete { uuid }).unwrap();
db1.apply(Operation::Create { uuid }).unwrap();
db1.apply(Operation::Update {
db1.apply(SyncOp::Delete { uuid }).unwrap();
db1.apply(SyncOp::Create { uuid }).unwrap();
db1.apply(SyncOp::Update {
uuid,
property: "title".into(),
value: Some("my second task".into()),
@@ -286,7 +295,7 @@ mod test {
.unwrap();
// and on db2, update a property of the task
db2.apply(Operation::Update {
db2.apply(SyncOp::Update {
uuid,
property: "project".into(),
value: Some("personal".into()),
@@ -310,8 +319,8 @@ mod test {
let mut db1 = newdb();
let uuid = Uuid::new_v4();
db1.apply(Operation::Create { uuid })?;
db1.apply(Operation::Update {
db1.apply(SyncOp::Create { uuid })?;
db1.apply(SyncOp::Update {
uuid,
property: "title".into(),
value: Some("my first task".into()),
@@ -332,7 +341,7 @@ mod test {
assert_eq!(tasks[0].0, uuid);
// update the taskdb and sync again
db1.apply(Operation::Update {
db1.apply(SyncOp::Update {
uuid,
property: "title".into(),
value: Some("my first task, updated".into()),
@@ -362,7 +371,7 @@ mod test {
let mut db1 = newdb();
let uuid = Uuid::new_v4();
db1.apply(Operation::Create { uuid }).unwrap();
db1.apply(SyncOp::Create { uuid }).unwrap();
test_server.set_snapshot_urgency(SnapshotUrgency::Low);
sync(&mut server, db1.storage.txn()?.as_mut(), true).unwrap();

View File

@@ -0,0 +1,117 @@
use super::apply;
use crate::storage::{ReplicaOp, StorageTxn};
use log::{debug, trace};
/// Undo local operations until an UndoPoint.
pub(super) fn undo(txn: &mut dyn StorageTxn) -> anyhow::Result<bool> {
let mut applied = false;
let mut popped = false;
let mut local_ops = txn.operations()?;
while let Some(op) = local_ops.pop() {
popped = true;
if op == ReplicaOp::UndoPoint {
break;
}
debug!("Reversing operation {:?}", op);
let rev_ops = op.reverse_ops();
for op in rev_ops {
trace!("Applying reversed operation {:?}", op);
apply::apply_op(txn, &op)?;
applied = true;
}
}
if popped {
txn.set_operations(local_ops)?;
txn.commit()?;
}
Ok(applied)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::server::SyncOp;
use crate::taskdb::TaskDb;
use chrono::Utc;
use pretty_assertions::assert_eq;
use uuid::Uuid;
#[test]
fn test_apply_create() -> anyhow::Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
let timestamp = Utc::now();
// apply a few ops, capture the DB state, make an undo point, and then apply a few more
// ops.
db.apply(SyncOp::Create { uuid: uuid1 })?;
db.apply(SyncOp::Update {
uuid: uuid1,
property: "prop".into(),
value: Some("v1".into()),
timestamp,
})?;
db.apply(SyncOp::Create { uuid: uuid2 })?;
db.apply(SyncOp::Update {
uuid: uuid2,
property: "prop".into(),
value: Some("v2".into()),
timestamp,
})?;
db.apply(SyncOp::Update {
uuid: uuid2,
property: "prop2".into(),
value: Some("v3".into()),
timestamp,
})?;
let db_state = db.sorted_tasks();
db.add_undo_point()?;
db.apply(SyncOp::Delete { uuid: uuid1 })?;
db.apply(SyncOp::Update {
uuid: uuid2,
property: "prop".into(),
value: None,
timestamp,
})?;
db.apply(SyncOp::Update {
uuid: uuid2,
property: "prop2".into(),
value: Some("new-value".into()),
timestamp,
})?;
assert_eq!(db.operations().len(), 9);
{
let mut txn = db.storage.txn()?;
assert!(undo(txn.as_mut())?);
}
// undo took db back to the snapshot
assert_eq!(db.operations().len(), 5);
assert_eq!(db.sorted_tasks(), db_state);
{
let mut txn = db.storage.txn()?;
assert!(undo(txn.as_mut())?);
}
// empty db
assert_eq!(db.operations().len(), 0);
assert_eq!(db.sorted_tasks(), vec![]);
{
let mut txn = db.storage.txn()?;
// nothing left to undo, so undo() returns false
assert!(!undo(txn.as_mut())?);
}
Ok(())
}
}

View File

@@ -63,7 +63,7 @@ where
#[cfg(test)]
mod test {
use super::*;
use crate::storage::Operation;
use crate::server::SyncOp;
use crate::taskdb::TaskDb;
use chrono::Utc;
use uuid::Uuid;
@@ -94,10 +94,10 @@ mod test {
// add everything to the TaskDb
for uuid in &uuids {
db.apply(Operation::Create { uuid: *uuid })?;
db.apply(SyncOp::Create { uuid: *uuid })?;
}
for i in &[0usize, 1, 4] {
db.apply(Operation::Update {
db.apply(SyncOp::Update {
uuid: uuids[*i].clone(),
property: String::from("status"),
value: Some("pending".into()),