From 3537db9bbedf84e17406ed283b1ba8a303d298ed Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Wed, 25 Nov 2020 19:13:32 -0500 Subject: [PATCH] implement a local sync server --- cli/src/cmd/shared.rs | 6 +- cli/src/cmd/sync.rs | 2 +- taskchampion/src/lib.rs | 1 + taskchampion/src/server/local.rs | 211 +++++++++++++++++++++++++---- taskchampion/src/server/test.rs | 2 +- taskchampion/src/server/types.rs | 4 +- taskchampion/src/taskdb.rs | 17 ++- taskchampion/src/taskstorage/kv.rs | 39 +----- taskchampion/src/utils.rs | 39 ++++++ 9 files changed, 249 insertions(+), 72 deletions(-) create mode 100644 taskchampion/src/utils.rs diff --git a/cli/src/cmd/shared.rs b/cli/src/cmd/shared.rs index 411c85f25..25940bb28 100644 --- a/cli/src/cmd/shared.rs +++ b/cli/src/cmd/shared.rs @@ -51,7 +51,9 @@ impl CommandInvocation { )) } - pub(super) fn get_server(&self) -> impl server::Server { - server::LocalServer::new() + pub(super) fn get_server(&self) -> Fallible { + Ok(server::LocalServer::new(Path::new( + "/tmp/task-sync-server", + ))?) } } diff --git a/cli/src/cmd/sync.rs b/cli/src/cmd/sync.rs index f968ecf39..a17920436 100644 --- a/cli/src/cmd/sync.rs +++ b/cli/src/cmd/sync.rs @@ -22,7 +22,7 @@ define_subcommand! { subcommand_invocation! { fn run(&self, command: &CommandInvocation) -> Fallible<()> { let mut replica = command.get_replica(); - let mut server = command.get_server(); + let mut server = command.get_server()?; replica.sync(&mut server)?; Ok(()) } diff --git a/taskchampion/src/lib.rs b/taskchampion/src/lib.rs index 98ed5f8c3..932dbdeef 100644 --- a/taskchampion/src/lib.rs +++ b/taskchampion/src/lib.rs @@ -29,6 +29,7 @@ pub mod server; mod task; mod taskdb; pub mod taskstorage; +mod utils; pub use replica::Replica; pub use task::Priority; diff --git a/taskchampion/src/server/local.rs b/taskchampion/src/server/local.rs index 30999496a..9659625b3 100644 --- a/taskchampion/src/server/local.rs +++ b/taskchampion/src/server/local.rs @@ -1,33 +1,108 @@ use crate::server::{ AddVersionResult, GetVersionResult, HistorySegment, Server, VersionId, NO_VERSION_ID, }; +use crate::utils::Key; use failure::Fallible; -use std::collections::HashMap; +use kv::msgpack::Msgpack; +use kv::{Bucket, Config, Error, Integer, Serde, Store, ValueBuf}; +use serde::{Deserialize, Serialize}; +use std::path::Path; use uuid::Uuid; +#[derive(Serialize, Deserialize, Debug)] struct Version { version_id: VersionId, parent_version_id: VersionId, history_segment: HistorySegment, } -pub struct LocalServer { - latest_version_id: VersionId, +pub struct LocalServer<'t> { + store: Store, // NOTE: indexed by parent_version_id! - versions: HashMap, + versions_bucket: Bucket<'t, Key, ValueBuf>>, + latest_version_bucket: Bucket<'t, Integer, ValueBuf>>, } -impl LocalServer { +impl<'t> LocalServer<'t> { /// A test server has no notion of clients, signatures, encryption, etc. - pub fn new() -> LocalServer { - LocalServer { - latest_version_id: NO_VERSION_ID, - versions: HashMap::new(), + pub fn new(directory: &Path) -> Fallible { + let mut config = Config::default(directory); + config.bucket("versions", None); + config.bucket("numbers", None); + config.bucket("latest_version", None); + config.bucket("operations", None); + config.bucket("working_set", None); + let store = Store::new(config)?; + + // versions are stored indexed by VersionId (uuid) + let versions_bucket = store.bucket::>>(Some("versions"))?; + + // this bucket contains the latest version at key 0 + let latest_version_bucket = + store.int_bucket::>>(Some("latest_version"))?; + + Ok(LocalServer { + store, + versions_bucket, + latest_version_bucket, + }) + } + + fn get_latest_version_id(&mut self) -> Fallible { + let txn = self.store.read_txn()?; + let base_version = match txn.get(&self.latest_version_bucket, 0.into()) { + Ok(buf) => buf, + Err(Error::NotFound) => return Ok(NO_VERSION_ID), + Err(e) => return Err(e.into()), } + .inner()? + .to_serde(); + Ok(base_version as VersionId) + } + + fn set_latest_version_id(&mut self, version_id: VersionId) -> Fallible<()> { + let mut txn = self.store.write_txn()?; + txn.set( + &self.latest_version_bucket, + 0.into(), + Msgpack::to_value_buf(version_id as Uuid)?, + )?; + txn.commit()?; + Ok(()) + } + + fn get_version_by_parent_version_id( + &mut self, + parent_version_id: VersionId, + ) -> Fallible> { + let txn = self.store.read_txn()?; + + let version = match txn.get(&self.versions_bucket, parent_version_id.into()) { + Ok(buf) => buf, + Err(Error::NotFound) => return Ok(None), + Err(e) => return Err(e.into()), + } + .inner()? + .to_serde(); + Ok(Some(version)) + } + + fn add_version_by_parent_version_id(&mut self, version: Version) -> Fallible<()> { + let mut txn = self.store.write_txn()?; + txn.set( + &self.versions_bucket, + version.parent_version_id.into(), + Msgpack::to_value_buf(version)?, + )?; + txn.commit()?; + Ok(()) } } -impl Server for LocalServer { +impl<'t> Server for LocalServer<'t> { + // TODO: better transaction isolation for add_version (gets and sets should be in the same + // transaction) + /// Add a new version. If the given version number is incorrect, this responds with the /// appropriate version and expects the caller to try again. fn add_version( @@ -39,33 +114,27 @@ impl Server for LocalServer { // no signature validation // check the parent_version_id for linearity - if self.latest_version_id != NO_VERSION_ID { - if parent_version_id != self.latest_version_id { - return Ok(AddVersionResult::ExpectedParentVersion( - self.latest_version_id, - )); - } + let latest_version_id = self.get_latest_version_id()?; + if latest_version_id != NO_VERSION_ID && parent_version_id != latest_version_id { + return Ok(AddVersionResult::ExpectedParentVersion(latest_version_id)); } // invent a new ID for this version let version_id = Uuid::new_v4(); - self.versions.insert( + self.add_version_by_parent_version_id(Version { + version_id, parent_version_id, - Version { - version_id, - parent_version_id, - history_segment, - }, - ); - self.latest_version_id = version_id; + history_segment, + })?; + self.set_latest_version_id(version_id)?; Ok(AddVersionResult::Ok(version_id)) } /// Get a vector of all versions after `since_version` - fn get_child_version(&self, parent_version_id: VersionId) -> Fallible { - if let Some(version) = self.versions.get(&parent_version_id) { + fn get_child_version(&mut self, parent_version_id: VersionId) -> Fallible { + if let Some(version) = self.get_version_by_parent_version_id(parent_version_id)? { Ok(GetVersionResult::Version { version_id: version.version_id, parent_version_id: version.parent_version_id, @@ -77,3 +146,93 @@ impl Server for LocalServer { } } +#[cfg(test)] +mod test { + use super::*; + use failure::Fallible; + use tempdir::TempDir; + + #[test] + fn test_empty() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut server = LocalServer::new(&tmp_dir.path())?; + let child_version = server.get_child_version(NO_VERSION_ID)?; + assert_eq!(child_version, GetVersionResult::NoSuchVersion); + Ok(()) + } + + #[test] + fn test_add_zero_base() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut server = LocalServer::new(&tmp_dir.path())?; + let history = b"1234".to_vec(); + match server.add_version(NO_VERSION_ID, history.clone())? { + AddVersionResult::ExpectedParentVersion(_) => { + panic!("should have accepted the version") + } + AddVersionResult::Ok(version_id) => { + let new_version = server.get_child_version(NO_VERSION_ID)?; + assert_eq!( + new_version, + GetVersionResult::Version { + version_id, + parent_version_id: NO_VERSION_ID, + history_segment: history, + } + ); + } + } + + Ok(()) + } + + #[test] + fn test_add_nonzero_base() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut server = LocalServer::new(&tmp_dir.path())?; + let history = b"1234".to_vec(); + let parent_version_id = Uuid::new_v4() as VersionId; + + // This is OK because the server has no latest_version_id yet + match server.add_version(parent_version_id, history.clone())? { + AddVersionResult::ExpectedParentVersion(_) => { + panic!("should have accepted the version") + } + AddVersionResult::Ok(version_id) => { + let new_version = server.get_child_version(parent_version_id)?; + assert_eq!( + new_version, + GetVersionResult::Version { + version_id, + parent_version_id, + history_segment: history, + } + ); + } + } + + Ok(()) + } + + #[test] + fn test_add_nonzero_base_forbidden() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut server = LocalServer::new(&tmp_dir.path())?; + let history = b"1234".to_vec(); + let parent_version_id = Uuid::new_v4() as VersionId; + + // add a version + if let AddVersionResult::ExpectedParentVersion(_) = + server.add_version(parent_version_id, history.clone())? + { + panic!("should have accepted the version") + } + + // then add another, not based on that one + if let AddVersionResult::Ok(_) = server.add_version(parent_version_id, history.clone())? { + panic!("should not have accepted the version") + } + + Ok(()) + } +} diff --git a/taskchampion/src/server/test.rs b/taskchampion/src/server/test.rs index 3175c2aa5..3d57147ca 100644 --- a/taskchampion/src/server/test.rs +++ b/taskchampion/src/server/test.rs @@ -64,7 +64,7 @@ impl Server for TestServer { } /// Get a vector of all versions after `since_version` - fn get_child_version(&self, parent_version_id: VersionId) -> Fallible { + fn get_child_version(&mut self, parent_version_id: VersionId) -> Fallible { if let Some(version) = self.versions.get(&parent_version_id) { Ok(GetVersionResult::Version { version_id: version.version_id, diff --git a/taskchampion/src/server/types.rs b/taskchampion/src/server/types.rs index 9d95f588b..473a17e46 100644 --- a/taskchampion/src/server/types.rs +++ b/taskchampion/src/server/types.rs @@ -12,6 +12,7 @@ pub const NO_VERSION_ID: VersionId = Uuid::nil(); pub type HistorySegment = Vec; /// VersionAdd is the response type from [`crate:server::Server::add_version`]. +#[derive(Debug, PartialEq)] pub enum AddVersionResult { /// OK, version added with the given ID Ok(VersionId), @@ -20,6 +21,7 @@ pub enum AddVersionResult { } /// A version as downloaded from the server +#[derive(Debug, PartialEq)] pub enum GetVersionResult { /// No such version exists NoSuchVersion, @@ -42,5 +44,5 @@ pub trait Server { ) -> Fallible; /// Get the version with the given parent VersionId - fn get_child_version(&self, parent_version_id: VersionId) -> Fallible; + fn get_child_version(&mut self, parent_version_id: VersionId) -> Fallible; } diff --git a/taskchampion/src/taskdb.rs b/taskchampion/src/taskdb.rs index 86523717d..eb69e35ae 100644 --- a/taskchampion/src/taskdb.rs +++ b/taskchampion/src/taskdb.rs @@ -1,7 +1,7 @@ use crate::errors::Error; use crate::server::{AddVersionResult, GetVersionResult, Server}; use crate::taskstorage::{Operation, TaskMap, TaskStorage, TaskStorageTxn}; -use failure::Fallible; +use failure::{format_err, Fallible}; use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::str; @@ -168,7 +168,9 @@ impl TaskDB { let mut txn = self.storage.txn()?; // retry synchronizing until the server accepts our version (this allows for races between - // replicas trying to sync to the same server) + // replicas trying to sync to the same server). If the server insists on the same base + // version twice, then we have diverged. + let mut requested_parent_version_id = None; loop { let mut base_version_id = txn.base_version()?; @@ -189,6 +191,7 @@ impl TaskDB { txn.set_base_version(version_id)?; base_version_id = version_id; } else { + println!("no child versions of {:?}", base_version_id); // at the moment, no more child versions, so we can try adding our own break; } @@ -196,6 +199,7 @@ impl TaskDB { let operations: Vec = txn.operations()?.to_vec(); if operations.is_empty() { + println!("no changes to push to server"); // nothing to sync back to the server.. break; } @@ -216,7 +220,14 @@ impl TaskDB { "new version rejected; must be based on {:?}", parent_version_id ); - // ..continue the outer loop + if let Some(requested) = requested_parent_version_id { + if parent_version_id == requested { + return Err(format_err!( + "Server's task history has diverged from this replica" + )); + } + } + requested_parent_version_id = Some(parent_version_id); } } } diff --git a/taskchampion/src/taskstorage/kv.rs b/taskchampion/src/taskstorage/kv.rs index 52947a5c0..f6ee9a7f0 100644 --- a/taskchampion/src/taskstorage/kv.rs +++ b/taskchampion/src/taskstorage/kv.rs @@ -1,50 +1,13 @@ use crate::taskstorage::{ Operation, TaskMap, TaskStorage, TaskStorageTxn, VersionId, DEFAULT_BASE_VERSION, }; +use crate::utils::Key; use failure::Fallible; use kv::msgpack::Msgpack; use kv::{Bucket, Config, Error, Integer, Serde, Store, ValueBuf}; -use std::convert::TryInto; use std::path::Path; use uuid::Uuid; -/// A representation of a UUID as a key. This is just a newtype wrapping the 128-bit packed form -/// of a UUID. -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] -struct Key(uuid::Bytes); - -impl From<&[u8]> for Key { - fn from(bytes: &[u8]) -> Key { - Key(bytes.try_into().unwrap()) - } -} - -impl From<&Uuid> for Key { - fn from(uuid: &Uuid) -> Key { - let key = Key(*uuid.as_bytes()); - key - } -} - -impl From for Key { - fn from(uuid: Uuid) -> Key { - let key = Key(*uuid.as_bytes()); - key - } -} - -impl From for Uuid { - fn from(key: Key) -> Uuid { - Uuid::from_bytes(key.0) - } -} - -impl AsRef<[u8]> for Key { - fn as_ref(&self) -> &[u8] { - &self.0[..] - } -} - /// KVStorage is an on-disk storage backend which uses LMDB via the `kv` crate. pub struct KVStorage<'t> { store: Store, diff --git a/taskchampion/src/utils.rs b/taskchampion/src/utils.rs new file mode 100644 index 000000000..aafe6f010 --- /dev/null +++ b/taskchampion/src/utils.rs @@ -0,0 +1,39 @@ +use std::convert::TryInto; +use uuid::Uuid; + +/// A representation of a UUID as a key. This is just a newtype wrapping the 128-bit packed form +/// of a UUID. +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +pub(crate) struct Key(uuid::Bytes); + +impl From<&[u8]> for Key { + fn from(bytes: &[u8]) -> Key { + Key(bytes.try_into().unwrap()) + } +} + +impl From<&Uuid> for Key { + fn from(uuid: &Uuid) -> Key { + let key = Key(*uuid.as_bytes()); + key + } +} + +impl From for Key { + fn from(uuid: Uuid) -> Key { + let key = Key(*uuid.as_bytes()); + key + } +} + +impl From for Uuid { + fn from(key: Key) -> Uuid { + Uuid::from_bytes(key.0) + } +} + +impl AsRef<[u8]> for Key { + fn as_ref(&self) -> &[u8] { + &self.0[..] + } +}