From 549d3b9f6d7d6c06518735667c2e78e8ef41dec2 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Tue, 24 Nov 2020 18:04:49 -0500 Subject: [PATCH 1/7] refactor taskchampion::server into a module with submodules --- taskchampion/src/lib.rs | 3 --- taskchampion/src/server/mod.rs | 6 ++++++ .../src/{testing/testserver.rs => server/test.rs} | 0 taskchampion/src/{server.rs => server/types.rs} | 8 ++++++-- taskchampion/src/taskdb.rs | 2 +- 5 files changed, 13 insertions(+), 6 deletions(-) create mode 100644 taskchampion/src/server/mod.rs rename taskchampion/src/{testing/testserver.rs => server/test.rs} (100%) rename taskchampion/src/{server.rs => server/types.rs} (69%) diff --git a/taskchampion/src/lib.rs b/taskchampion/src/lib.rs index cfc127a93..98ed5f8c3 100644 --- a/taskchampion/src/lib.rs +++ b/taskchampion/src/lib.rs @@ -37,6 +37,3 @@ pub use task::{Task, TaskMut}; /// Re-exported type from the `uuid` crate, for ease of compatibility for consumers of this crate. pub use uuid::Uuid; - -#[cfg(test)] -pub(crate) mod testing; diff --git a/taskchampion/src/server/mod.rs b/taskchampion/src/server/mod.rs new file mode 100644 index 000000000..e73c7f74a --- /dev/null +++ b/taskchampion/src/server/mod.rs @@ -0,0 +1,6 @@ +#[cfg(test)] +pub(crate) mod test; + +mod types; + +pub use types::{Blob, Server, VersionAdd}; diff --git a/taskchampion/src/testing/testserver.rs b/taskchampion/src/server/test.rs similarity index 100% rename from taskchampion/src/testing/testserver.rs rename to taskchampion/src/server/test.rs diff --git a/taskchampion/src/server.rs b/taskchampion/src/server/types.rs similarity index 69% rename from taskchampion/src/server.rs rename to taskchampion/src/server/types.rs index 233838fa6..cae8029a8 100644 --- a/taskchampion/src/server.rs +++ b/taskchampion/src/server/types.rs @@ -1,9 +1,12 @@ +/// A Blob is a hunk of encoded data that is sent to the server. The server does not interpret +/// this data at all. pub type Blob = Vec; +/// VersionAdd is the response type from [`crate:server::Server::add_version`]. pub enum VersionAdd { - // OK, version added + /// OK, version added Ok, - // Rejected, must be based on the the given version + /// Rejected, must be based on the the given version ExpectedVersion(u64), } @@ -18,3 +21,4 @@ pub trait Server { fn add_snapshot(&mut self, username: &str, version: u64, blob: Blob); } + diff --git a/taskchampion/src/taskdb.rs b/taskchampion/src/taskdb.rs index f705f3d5a..542ca5cc3 100644 --- a/taskchampion/src/taskdb.rs +++ b/taskchampion/src/taskdb.rs @@ -296,8 +296,8 @@ impl TaskDB { #[cfg(test)] mod tests { use super::*; + use crate::server::test::TestServer; use crate::taskstorage::InMemoryStorage; - use crate::testing::testserver::TestServer; use chrono::Utc; use proptest::prelude::*; use std::collections::HashMap; From 75edd2773fb4cd3e8909004466913def938cfd7e Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Tue, 24 Nov 2020 18:12:48 -0500 Subject: [PATCH 2/7] make server operations fallible --- taskchampion/src/server/test.rs | 35 +++++++++++++++++--------------- taskchampion/src/server/types.rs | 10 +++++---- taskchampion/src/taskdb.rs | 4 ++-- 3 files changed, 27 insertions(+), 22 deletions(-) diff --git a/taskchampion/src/server/test.rs b/taskchampion/src/server/test.rs index b81063b1b..e9a85f17d 100644 --- a/taskchampion/src/server/test.rs +++ b/taskchampion/src/server/test.rs @@ -1,4 +1,5 @@ use crate::server::{Blob, Server, VersionAdd}; +use failure::Fallible; use std::collections::HashMap; pub(crate) struct TestServer { @@ -27,21 +28,22 @@ impl TestServer { impl Server for TestServer { /// Get a vector of all versions after `since_version` - fn get_versions(&self, username: &str, since_version: u64) -> Vec { - self.users - .get(username) - .map(|user| user.get_versions(since_version)) - .unwrap_or_else(|| vec![]) + fn get_versions(&self, username: &str, since_version: u64) -> Fallible> { + if let Some(user) = self.users.get(username) { + user.get_versions(since_version) + } else { + Ok(vec![]) + } } /// Add a new version. If the given version number is incorrect, this responds with the /// appropriate version and expects the caller to try again. - fn add_version(&mut self, username: &str, version: u64, blob: Blob) -> VersionAdd { + fn add_version(&mut self, username: &str, version: u64, blob: Blob) -> Fallible { self.get_user_mut(username).add_version(version, blob) } - fn add_snapshot(&mut self, username: &str, version: u64, blob: Blob) { - self.get_user_mut(username).add_snapshot(version, blob); + fn add_snapshot(&mut self, username: &str, version: u64, blob: Blob) -> Fallible<()> { + self.get_user_mut(username).add_snapshot(version, blob) } } @@ -53,29 +55,30 @@ impl User { } } - fn get_versions(&self, since_version: u64) -> Vec { + fn get_versions(&self, since_version: u64) -> Fallible> { let last_version = self.versions.len(); if last_version == since_version as usize { - return vec![]; + return Ok(vec![]); } - self.versions[since_version as usize..last_version] + Ok(self.versions[since_version as usize..last_version] .iter() .map(|r| r.clone()) - .collect::>() + .collect::>()) } - fn add_version(&mut self, version: u64, blob: Blob) -> VersionAdd { + fn add_version(&mut self, version: u64, blob: Blob) -> Fallible { // of by one here: client wants to send version 1 first let expected_version = self.versions.len() as u64 + 1; if version != expected_version { - return VersionAdd::ExpectedVersion(expected_version); + return Ok(VersionAdd::ExpectedVersion(expected_version)); } self.versions.push(blob); - VersionAdd::Ok + Ok(VersionAdd::Ok) } - fn add_snapshot(&mut self, version: u64, blob: Blob) { + fn add_snapshot(&mut self, version: u64, blob: Blob) -> Fallible<()> { self.snapshots.insert(version, blob); + Ok(()) } } diff --git a/taskchampion/src/server/types.rs b/taskchampion/src/server/types.rs index cae8029a8..567997cac 100644 --- a/taskchampion/src/server/types.rs +++ b/taskchampion/src/server/types.rs @@ -1,3 +1,5 @@ +use failure::Fallible; + /// A Blob is a hunk of encoded data that is sent to the server. The server does not interpret /// this data at all. pub type Blob = Vec; @@ -13,12 +15,12 @@ pub enum VersionAdd { /// A value implementing this trait can act as a server against which a replica can sync. pub trait Server { /// Get a vector of all versions after `since_version` - fn get_versions(&self, username: &str, since_version: u64) -> Vec; + fn get_versions(&self, username: &str, since_version: u64) -> Fallible>; /// Add a new version. If the given version number is incorrect, this responds with the /// appropriate version and expects the caller to try again. - fn add_version(&mut self, username: &str, version: u64, blob: Blob) -> VersionAdd; + fn add_version(&mut self, username: &str, version: u64, blob: Blob) -> Fallible; - fn add_snapshot(&mut self, username: &str, version: u64, blob: Blob); + /// TODO: undefined yet + fn add_snapshot(&mut self, username: &str, version: u64, blob: Blob) -> Fallible<()>; } - diff --git a/taskchampion/src/taskdb.rs b/taskchampion/src/taskdb.rs index 542ca5cc3..65e321888 100644 --- a/taskchampion/src/taskdb.rs +++ b/taskchampion/src/taskdb.rs @@ -172,7 +172,7 @@ impl TaskDB { // replicas trying to sync to the same server) loop { // first pull changes and "rebase" on top of them - let new_versions = server.get_versions(username, txn.base_version()?); + let new_versions = server.get_versions(username, txn.base_version()?)?; for version_blob in new_versions { let version_str = str::from_utf8(&version_blob).unwrap(); let version: Version = serde_json::from_str(version_str).unwrap(); @@ -196,7 +196,7 @@ impl TaskDB { let new_version_str = serde_json::to_string(&new_version).unwrap(); println!("sending version {:?} to server", new_version.version); if let VersionAdd::Ok = - server.add_version(username, new_version.version, new_version_str.into()) + server.add_version(username, new_version.version, new_version_str.into())? { txn.set_base_version(new_version.version)?; txn.set_operations(vec![])?; From e92fc0628b613005402799d532a3c3b1748794f8 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Wed, 25 Nov 2020 16:39:05 -0500 Subject: [PATCH 3/7] add signing support --- Cargo.lock | 122 +++++++++++++++++++++++++++++ taskchampion/Cargo.toml | 1 + taskchampion/src/server/mod.rs | 1 + taskchampion/src/server/signing.rs | 87 ++++++++++++++++++++ 4 files changed, 211 insertions(+) create mode 100644 taskchampion/src/server/signing.rs diff --git a/Cargo.lock b/Cargo.lock index 15ecf0a9f..c5ee93f04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -145,6 +145,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bumpalo" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e8c087f005730276d1096a652e92a8bacee2e2472bcc9715a74d2bec38b5820" + [[package]] name = "byteorder" version = "1.3.4" @@ -350,6 +356,15 @@ version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc6f3ad7b9d11a0c00842ff8de1b60ee58661048eb8049ed33c73594f359d7e6" +[[package]] +name = "js-sys" +version = "0.3.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca059e81d9486668f12d455a4ea6daa600bd408134cd17e3d3fb5a32d1f016f8" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "kv" version = "0.10.0" @@ -398,6 +413,15 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "log" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fabed175da42fed1fa0746b0ea71f412aa9d35e76e95e59b192c64b9dc2bf8b" +dependencies = [ + "cfg-if 0.1.10", +] + [[package]] name = "memchr" version = "2.3.4" @@ -445,6 +469,12 @@ version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d3b63360ec3cb337817c2dbd47ab4a0f170d285d8e5a2064600f3def1402397" +[[package]] +name = "once_cell" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13bd41f508810a131401606d54ac32a467c97172d74ba7662562ebba5ad07fa0" + [[package]] name = "pkg-config" version = "0.3.19" @@ -766,6 +796,21 @@ dependencies = [ "winapi", ] +[[package]] +name = "ring" +version = "0.16.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5911690c9b773bab7e657471afc207f3827b249a657241327e3544d79bcabdd" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi", +] + [[package]] name = "rmp" version = "0.8.9" @@ -854,6 +899,12 @@ dependencies = [ "serde", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "strsim" version = "0.8.0" @@ -896,6 +947,7 @@ dependencies = [ "kv", "lmdb-rkv", "proptest", + "ring", "serde", "serde_json", "tempdir", @@ -1025,6 +1077,12 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "uuid" version = "0.8.1" @@ -1062,6 +1120,70 @@ version = "0.10.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" +[[package]] +name = "wasm-bindgen" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ac64ead5ea5f05873d7c12b545865ca2b8d28adfc50a49b84770a3a97265d42" +dependencies = [ + "cfg-if 0.1.10", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f22b422e2a757c35a73774860af8e112bff612ce6cb604224e8e47641a9e4f68" +dependencies = [ + "bumpalo", + "lazy_static", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b13312a745c08c469f0b292dd2fcd6411dba5f7160f593da6ef69b64e407038" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f249f06ef7ee334cc3b8ff031bfc11ec99d00f34d86da7498396dc1e3b1498fe" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d649a3145108d7d3fbcde896a468d1bd636791823c9921135218ad89be08307" + +[[package]] +name = "web-sys" +version = "0.3.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bf6ef87ad7ae8008e15a355ce696bed26012b7caa21605188cfd8214ab51e2d" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/taskchampion/Cargo.toml b/taskchampion/Cargo.toml index 1f6681454..509a7db8b 100644 --- a/taskchampion/Cargo.toml +++ b/taskchampion/Cargo.toml @@ -12,6 +12,7 @@ chrono = { version = "0.4.10", features = ["serde"] } failure = {version = "0.1.5", features = ["derive"] } kv = {version = "0.10.0", features = ["msgpack-value"]} lmdb-rkv = {version = "0.12.3"} +ring = { version = "0.16.17", features = ["std"] } [dev-dependencies] proptest = "0.9.4" diff --git a/taskchampion/src/server/mod.rs b/taskchampion/src/server/mod.rs index e73c7f74a..9a6214425 100644 --- a/taskchampion/src/server/mod.rs +++ b/taskchampion/src/server/mod.rs @@ -1,6 +1,7 @@ #[cfg(test)] pub(crate) mod test; +mod signing; mod types; pub use types::{Blob, Server, VersionAdd}; diff --git a/taskchampion/src/server/signing.rs b/taskchampion/src/server/signing.rs new file mode 100644 index 000000000..e1e49501a --- /dev/null +++ b/taskchampion/src/server/signing.rs @@ -0,0 +1,87 @@ +#![allow(dead_code)] // TODO: temporary until this module is used +//! This is a general wrapper around an asymmetric-key signature system. + +use failure::Fallible; +use ring::{ + rand, + signature::{Ed25519KeyPair, KeyPair, Signature, UnparsedPublicKey, ED25519}, +}; + +type PublicKey = Vec; +type PrivateKey = Vec; + +/// Generate a pair of (public, private) key material (in fact the private key is a keypair) +pub fn new_keypair() -> Fallible<(PublicKey, PrivateKey)> { + let rng = rand::SystemRandom::new(); + let key_pkcs8 = Ed25519KeyPair::generate_pkcs8(&rng)?; + let key_pair = Ed25519KeyPair::from_pkcs8(key_pkcs8.as_ref())?; + let pub_key = key_pair.public_key(); + Ok(( + pub_key.as_ref().to_vec() as PublicKey, + key_pkcs8.as_ref().to_vec() as PrivateKey, + )) +} + +pub struct Signer { + key_pair: Ed25519KeyPair, +} + +impl Signer { + /// Create a new signer, given a pkcs#8 v2 document containing the keypair. + fn new(priv_key: PrivateKey) -> Fallible { + Ok(Self { + key_pair: Ed25519KeyPair::from_pkcs8(&priv_key)?, + }) + } + + pub fn sign>(&self, message: B) -> Fallible { + Ok(self.key_pair.sign(message.as_ref())) + } +} + +pub struct Verifier { + pub_key: PublicKey, +} + +impl Verifier { + fn new(pub_key: PublicKey) -> Fallible { + Ok(Self { pub_key }) + } + + pub fn verify, B2: AsRef<[u8]>>( + &self, + message: B1, + signature: B2, + ) -> Fallible<()> { + let pub_key = UnparsedPublicKey::new(&ED25519, &self.pub_key); + Ok(pub_key.verify(message.as_ref(), signature.as_ref())?) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_verify_ok() -> Fallible<()> { + let (public, private) = new_keypair()?; + let signer = Signer::new(private)?; + let verifier = Verifier::new(public)?; + + let message = b"Hello, world"; + let signature = signer.sign(message)?; + verifier.verify(message, signature) + } + + #[test] + fn test_verify_bad_message() -> Fallible<()> { + let (public, private) = new_keypair()?; + let signer = Signer::new(private)?; + let verifier = Verifier::new(public)?; + + let message = b"Hello, world"; + let signature = signer.sign(message)?; + assert!(verifier.verify(b"Hello, cruel world", signature).is_err()); + Ok(()) + } +} From a81c84d7c75a3ab83c29a5c0044835d3bd84b00b Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Wed, 25 Nov 2020 17:52:47 -0500 Subject: [PATCH 4/7] refactor sync to new model --- taskchampion/src/replica.rs | 4 +- taskchampion/src/server/mod.rs | 2 +- taskchampion/src/server/test.rs | 120 +++++++++++------------ taskchampion/src/server/types.rs | 52 +++++++--- taskchampion/src/taskdb.rs | 98 ++++++++++-------- taskchampion/src/taskstorage/inmemory.rs | 14 +-- taskchampion/src/taskstorage/kv.rs | 36 ++++--- taskchampion/src/taskstorage/mod.rs | 10 +- 8 files changed, 193 insertions(+), 143 deletions(-) diff --git a/taskchampion/src/replica.rs b/taskchampion/src/replica.rs index 0ca2e67f6..f141e2348 100644 --- a/taskchampion/src/replica.rs +++ b/taskchampion/src/replica.rs @@ -145,8 +145,8 @@ impl Replica { } /// Synchronize this replica against the given server. - pub fn sync(&mut self, username: &str, server: &mut dyn Server) -> Fallible<()> { - self.taskdb.sync(username, server) + pub fn sync(&mut self, server: &mut dyn Server) -> Fallible<()> { + self.taskdb.sync(server) } /// Perform "garbage collection" on this replica. In particular, this renumbers the working diff --git a/taskchampion/src/server/mod.rs b/taskchampion/src/server/mod.rs index 9a6214425..06009b127 100644 --- a/taskchampion/src/server/mod.rs +++ b/taskchampion/src/server/mod.rs @@ -4,4 +4,4 @@ pub(crate) mod test; mod signing; mod types; -pub use types::{Blob, Server, VersionAdd}; +pub use types::*; diff --git a/taskchampion/src/server/test.rs b/taskchampion/src/server/test.rs index e9a85f17d..3175c2aa5 100644 --- a/taskchampion/src/server/test.rs +++ b/taskchampion/src/server/test.rs @@ -1,84 +1,78 @@ -use crate::server::{Blob, Server, VersionAdd}; +use crate::server::{ + AddVersionResult, GetVersionResult, HistorySegment, Server, VersionId, NO_VERSION_ID, +}; use failure::Fallible; use std::collections::HashMap; +use uuid::Uuid; -pub(crate) struct TestServer { - users: HashMap, +struct Version { + version_id: VersionId, + parent_version_id: VersionId, + history_segment: HistorySegment, } -struct User { - // versions, indexed at v-1 - versions: Vec, - snapshots: HashMap, +pub(crate) struct TestServer { + latest_version_id: VersionId, + // NOTE: indexed by parent_version_id! + versions: HashMap, } impl TestServer { + /// A test server has no notion of clients, signatures, encryption, etc. pub fn new() -> TestServer { TestServer { - users: HashMap::new(), + latest_version_id: NO_VERSION_ID, + versions: HashMap::new(), } } - - fn get_user_mut(&mut self, username: &str) -> &mut User { - self.users - .entry(username.to_string()) - .or_insert_with(User::new) - } } impl Server for TestServer { - /// Get a vector of all versions after `since_version` - fn get_versions(&self, username: &str, since_version: u64) -> Fallible> { - if let Some(user) = self.users.get(username) { - user.get_versions(since_version) - } else { - Ok(vec![]) - } - } - /// Add a new version. If the given version number is incorrect, this responds with the /// appropriate version and expects the caller to try again. - fn add_version(&mut self, username: &str, version: u64, blob: Blob) -> Fallible { - self.get_user_mut(username).add_version(version, blob) + fn add_version( + &mut self, + parent_version_id: VersionId, + history_segment: HistorySegment, + ) -> Fallible { + // no client lookup + // no signature validation + + // check the parent_version_id for linearity + if self.latest_version_id != NO_VERSION_ID { + if parent_version_id != self.latest_version_id { + return Ok(AddVersionResult::ExpectedParentVersion( + self.latest_version_id, + )); + } + } + + // invent a new ID for this version + let version_id = Uuid::new_v4(); + + self.versions.insert( + parent_version_id, + Version { + version_id, + parent_version_id, + history_segment, + }, + ); + self.latest_version_id = version_id; + + Ok(AddVersionResult::Ok(version_id)) } - fn add_snapshot(&mut self, username: &str, version: u64, blob: Blob) -> Fallible<()> { - self.get_user_mut(username).add_snapshot(version, blob) - } -} - -impl User { - fn new() -> User { - User { - versions: vec![], - snapshots: HashMap::new(), - } - } - - fn get_versions(&self, since_version: u64) -> Fallible> { - let last_version = self.versions.len(); - if last_version == since_version as usize { - return Ok(vec![]); - } - Ok(self.versions[since_version as usize..last_version] - .iter() - .map(|r| r.clone()) - .collect::>()) - } - - fn add_version(&mut self, version: u64, blob: Blob) -> Fallible { - // of by one here: client wants to send version 1 first - let expected_version = self.versions.len() as u64 + 1; - if version != expected_version { - return Ok(VersionAdd::ExpectedVersion(expected_version)); - } - self.versions.push(blob); - - Ok(VersionAdd::Ok) - } - - fn add_snapshot(&mut self, version: u64, blob: Blob) -> Fallible<()> { - self.snapshots.insert(version, blob); - Ok(()) + /// Get a vector of all versions after `since_version` + fn get_child_version(&self, parent_version_id: VersionId) -> Fallible { + if let Some(version) = self.versions.get(&parent_version_id) { + Ok(GetVersionResult::Version { + version_id: version.version_id, + parent_version_id: version.parent_version_id, + history_segment: version.history_segment.clone(), + }) + } else { + Ok(GetVersionResult::NoSuchVersion) + } } } diff --git a/taskchampion/src/server/types.rs b/taskchampion/src/server/types.rs index 567997cac..9d95f588b 100644 --- a/taskchampion/src/server/types.rs +++ b/taskchampion/src/server/types.rs @@ -1,26 +1,46 @@ use failure::Fallible; +use uuid::Uuid; -/// A Blob is a hunk of encoded data that is sent to the server. The server does not interpret -/// this data at all. -pub type Blob = Vec; +/// Versions are referred to with sha2 hashes. +pub type VersionId = Uuid; + +/// The distinguished value for "no version" +pub const NO_VERSION_ID: VersionId = Uuid::nil(); + +/// A segment in the history of this task database, in the form of a sequence of operations. This +/// data is pre-encoded, and from the protocol level appears as a sequence of bytes. +pub type HistorySegment = Vec; /// VersionAdd is the response type from [`crate:server::Server::add_version`]. -pub enum VersionAdd { - /// OK, version added - Ok, - /// Rejected, must be based on the the given version - ExpectedVersion(u64), +pub enum AddVersionResult { + /// OK, version added with the given ID + Ok(VersionId), + /// Rejected; expected a version with the given parent version + ExpectedParentVersion(VersionId), +} + +/// A version as downloaded from the server +pub enum GetVersionResult { + /// No such version exists + NoSuchVersion, + + /// The requested version + Version { + version_id: VersionId, + parent_version_id: VersionId, + history_segment: HistorySegment, + }, } /// A value implementing this trait can act as a server against which a replica can sync. pub trait Server { - /// Get a vector of all versions after `since_version` - fn get_versions(&self, username: &str, since_version: u64) -> Fallible>; + /// Add a new version. + fn add_version( + &mut self, + parent_version_id: VersionId, + history_segment: HistorySegment, + ) -> Fallible; - /// Add a new version. If the given version number is incorrect, this responds with the - /// appropriate version and expects the caller to try again. - fn add_version(&mut self, username: &str, version: u64, blob: Blob) -> Fallible; - - /// TODO: undefined yet - fn add_snapshot(&mut self, username: &str, version: u64, blob: Blob) -> Fallible<()>; + /// Get the version with the given parent VersionId + fn get_child_version(&self, parent_version_id: VersionId) -> Fallible; } diff --git a/taskchampion/src/taskdb.rs b/taskchampion/src/taskdb.rs index 65e321888..86523717d 100644 --- a/taskchampion/src/taskdb.rs +++ b/taskchampion/src/taskdb.rs @@ -1,5 +1,5 @@ use crate::errors::Error; -use crate::server::{Server, VersionAdd}; +use crate::server::{AddVersionResult, GetVersionResult, Server}; use crate::taskstorage::{Operation, TaskMap, TaskStorage, TaskStorageTxn}; use failure::Fallible; use serde::{Deserialize, Serialize}; @@ -13,7 +13,6 @@ pub struct TaskDB { #[derive(Serialize, Deserialize, Debug)] struct Version { - version: u64, operations: Vec, } @@ -165,21 +164,34 @@ impl TaskDB { } /// Sync to the given server, pulling remote changes and pushing local changes. - pub fn sync(&mut self, username: &str, server: &mut dyn Server) -> Fallible<()> { + pub fn sync(&mut self, server: &mut dyn Server) -> Fallible<()> { let mut txn = self.storage.txn()?; // retry synchronizing until the server accepts our version (this allows for races between // replicas trying to sync to the same server) loop { - // first pull changes and "rebase" on top of them - let new_versions = server.get_versions(username, txn.base_version()?)?; - for version_blob in new_versions { - let version_str = str::from_utf8(&version_blob).unwrap(); - let version: Version = serde_json::from_str(version_str).unwrap(); - assert_eq!(version.version, txn.base_version()? + 1); - println!("applying version {:?} from server", version.version); + let mut base_version_id = txn.base_version()?; - TaskDB::apply_version(txn.as_mut(), version)?; + // first pull changes and "rebase" on top of them + loop { + if let GetVersionResult::Version { + version_id, + history_segment, + .. + } = server.get_child_version(base_version_id)? + { + let version_str = str::from_utf8(&history_segment).unwrap(); + let version: Version = serde_json::from_str(version_str).unwrap(); + println!("applying version {:?} from server", version_id); + + // apply this verison and update base_version in storage + TaskDB::apply_version(txn.as_mut(), version)?; + txn.set_base_version(version_id)?; + base_version_id = version_id; + } else { + // at the moment, no more child versions, so we can try adding our own + break; + } } let operations: Vec = txn.operations()?.to_vec(); @@ -189,18 +201,23 @@ impl TaskDB { } // now make a version of our local changes and push those - let new_version = Version { - version: txn.base_version()? + 1, - operations, - }; - let new_version_str = serde_json::to_string(&new_version).unwrap(); - println!("sending version {:?} to server", new_version.version); - if let VersionAdd::Ok = - server.add_version(username, new_version.version, new_version_str.into())? - { - txn.set_base_version(new_version.version)?; - txn.set_operations(vec![])?; - break; + let new_version = Version { operations }; + let history_segment = serde_json::to_string(&new_version).unwrap().into(); + println!("sending new version to server"); + match server.add_version(base_version_id, history_segment)? { + AddVersionResult::Ok(new_version_id) => { + println!("version {:?} received by server", new_version_id); + txn.set_base_version(new_version_id)?; + txn.set_operations(vec![])?; + break; + } + AddVersionResult::ExpectedParentVersion(parent_version_id) => { + println!( + "new version rejected; must be based on {:?}", + parent_version_id + ); + // ..continue the outer loop + } } } @@ -256,7 +273,6 @@ impl TaskDB { } local_operations = new_local_ops; } - txn.set_base_version(version.version)?; txn.set_operations(local_operations)?; Ok(()) } @@ -518,10 +534,10 @@ mod tests { let mut server = TestServer::new(); let mut db1 = newdb(); - db1.sync("me", &mut server).unwrap(); + db1.sync(&mut server).unwrap(); let mut db2 = newdb(); - db2.sync("me", &mut server).unwrap(); + db2.sync(&mut server).unwrap(); // make some changes in parallel to db1 and db2.. let uuid1 = Uuid::new_v4(); @@ -545,9 +561,9 @@ mod tests { .unwrap(); // and synchronize those around - db1.sync("me", &mut server).unwrap(); - db2.sync("me", &mut server).unwrap(); - db1.sync("me", &mut server).unwrap(); + db1.sync(&mut server).unwrap(); + db2.sync(&mut server).unwrap(); + db1.sync(&mut server).unwrap(); assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); // now make updates to the same task on both sides @@ -567,9 +583,9 @@ mod tests { .unwrap(); // and synchronize those around - db1.sync("me", &mut server).unwrap(); - db2.sync("me", &mut server).unwrap(); - db1.sync("me", &mut server).unwrap(); + db1.sync(&mut server).unwrap(); + db2.sync(&mut server).unwrap(); + db1.sync(&mut server).unwrap(); assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); } @@ -578,10 +594,10 @@ mod tests { let mut server = TestServer::new(); let mut db1 = newdb(); - db1.sync("me", &mut server).unwrap(); + db1.sync(&mut server).unwrap(); let mut db2 = newdb(); - db2.sync("me", &mut server).unwrap(); + db2.sync(&mut server).unwrap(); // create and update a task.. let uuid = Uuid::new_v4(); @@ -595,9 +611,9 @@ mod tests { .unwrap(); // and synchronize those around - db1.sync("me", &mut server).unwrap(); - db2.sync("me", &mut server).unwrap(); - db1.sync("me", &mut server).unwrap(); + db1.sync(&mut server).unwrap(); + db2.sync(&mut server).unwrap(); + db1.sync(&mut server).unwrap(); assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); // delete and re-create the task on db1 @@ -620,9 +636,9 @@ mod tests { }) .unwrap(); - db1.sync("me", &mut server).unwrap(); - db2.sync("me", &mut server).unwrap(); - db1.sync("me", &mut server).unwrap(); + db1.sync(&mut server).unwrap(); + db2.sync(&mut server).unwrap(); + db1.sync(&mut server).unwrap(); assert_eq!(db1.sorted_tasks(), db2.sorted_tasks()); } @@ -678,7 +694,7 @@ mod tests { println!(" {:?} (ignored)", e); } }, - Action::Sync => db.sync("me", &mut server).unwrap(), + Action::Sync => db.sync(&mut server).unwrap(), } } diff --git a/taskchampion/src/taskstorage/inmemory.rs b/taskchampion/src/taskstorage/inmemory.rs index 8fb3da3a1..66a871ed8 100644 --- a/taskchampion/src/taskstorage/inmemory.rs +++ b/taskchampion/src/taskstorage/inmemory.rs @@ -1,6 +1,8 @@ #![allow(clippy::new_without_default)] -use crate::taskstorage::{Operation, TaskMap, TaskStorage, TaskStorageTxn}; +use crate::taskstorage::{ + Operation, TaskMap, TaskStorage, TaskStorageTxn, VersionId, DEFAULT_BASE_VERSION, +}; use failure::Fallible; use std::collections::hash_map::Entry; use std::collections::HashMap; @@ -9,7 +11,7 @@ use uuid::Uuid; #[derive(PartialEq, Debug, Clone)] struct Data { tasks: HashMap, - base_version: u64, + base_version: VersionId, operations: Vec, working_set: Vec>, } @@ -79,11 +81,11 @@ impl<'t> TaskStorageTxn for Txn<'t> { Ok(self.data_ref().tasks.keys().copied().collect()) } - fn base_version(&mut self) -> Fallible { - Ok(self.data_ref().base_version) + fn base_version(&mut self) -> Fallible { + Ok(self.data_ref().base_version.clone()) } - fn set_base_version(&mut self, version: u64) -> Fallible<()> { + fn set_base_version(&mut self, version: VersionId) -> Fallible<()> { self.mut_data_ref().base_version = version; Ok(()) } @@ -138,7 +140,7 @@ impl InMemoryStorage { InMemoryStorage { data: Data { tasks: HashMap::new(), - base_version: 0, + base_version: DEFAULT_BASE_VERSION.into(), operations: vec![], working_set: vec![None], }, diff --git a/taskchampion/src/taskstorage/kv.rs b/taskchampion/src/taskstorage/kv.rs index 18e18fb19..52947a5c0 100644 --- a/taskchampion/src/taskstorage/kv.rs +++ b/taskchampion/src/taskstorage/kv.rs @@ -1,4 +1,6 @@ -use crate::taskstorage::{Operation, TaskMap, TaskStorage, TaskStorageTxn}; +use crate::taskstorage::{ + Operation, TaskMap, TaskStorage, TaskStorageTxn, VersionId, DEFAULT_BASE_VERSION, +}; use failure::Fallible; use kv::msgpack::Msgpack; use kv::{Bucket, Config, Error, Integer, Serde, Store, ValueBuf}; @@ -48,6 +50,7 @@ pub struct KVStorage<'t> { store: Store, tasks_bucket: Bucket<'t, Key, ValueBuf>>, numbers_bucket: Bucket<'t, Integer, ValueBuf>>, + uuids_bucket: Bucket<'t, Integer, ValueBuf>>, operations_bucket: Bucket<'t, Integer, ValueBuf>>, working_set_bucket: Bucket<'t, Integer, ValueBuf>>, } @@ -61,6 +64,7 @@ impl<'t> KVStorage<'t> { let mut config = Config::default(directory); config.bucket("tasks", None); config.bucket("numbers", None); + config.bucket("uuids", None); config.bucket("operations", None); config.bucket("working_set", None); let store = Store::new(config)?; @@ -71,6 +75,9 @@ impl<'t> KVStorage<'t> { // this bucket contains various u64s, indexed by constants above let numbers_bucket = store.int_bucket::>>(Some("numbers"))?; + // this bucket contains various Uuids, indexed by constants above + let uuids_bucket = store.int_bucket::>>(Some("uuids"))?; + // this bucket contains operations, numbered consecutively; the NEXT_OPERATION number gives // the index of the next operation to insert let operations_bucket = @@ -85,6 +92,7 @@ impl<'t> KVStorage<'t> { store, tasks_bucket, numbers_bucket, + uuids_bucket, operations_bucket, working_set_bucket, }) @@ -122,6 +130,9 @@ impl<'t> Txn<'t> { fn numbers_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf>> { &self.storage.numbers_bucket } + fn uuids_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf>> { + &self.storage.uuids_bucket + } fn operations_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf>> { &self.storage.operations_bucket } @@ -193,26 +204,26 @@ impl<'t> TaskStorageTxn for Txn<'t> { .collect()) } - fn base_version(&mut self) -> Fallible { - let bucket = self.numbers_bucket(); + fn base_version(&mut self) -> Fallible { + let bucket = self.uuids_bucket(); let base_version = match self.kvtxn().get(bucket, BASE_VERSION.into()) { Ok(buf) => buf, - Err(Error::NotFound) => return Ok(0), + Err(Error::NotFound) => return Ok(DEFAULT_BASE_VERSION.into()), Err(e) => return Err(e.into()), } .inner()? .to_serde(); - Ok(base_version) + Ok(base_version as VersionId) } - fn set_base_version(&mut self, version: u64) -> Fallible<()> { - let numbers_bucket = self.numbers_bucket(); + fn set_base_version(&mut self, version: VersionId) -> Fallible<()> { + let uuids_bucket = self.uuids_bucket(); let kvtxn = self.kvtxn(); kvtxn.set( - numbers_bucket, + uuids_bucket, BASE_VERSION.into(), - Msgpack::to_value_buf(version)?, + Msgpack::to_value_buf(version as Uuid)?, )?; Ok(()) } @@ -528,7 +539,7 @@ mod test { let mut storage = KVStorage::new(&tmp_dir.path())?; { let mut txn = storage.txn()?; - assert_eq!(txn.base_version()?, 0); + assert_eq!(txn.base_version()?, DEFAULT_BASE_VERSION); } Ok(()) } @@ -537,14 +548,15 @@ mod test { fn test_base_version_setting() -> Fallible<()> { let tmp_dir = TempDir::new("test")?; let mut storage = KVStorage::new(&tmp_dir.path())?; + let u = Uuid::new_v4(); { let mut txn = storage.txn()?; - txn.set_base_version(3)?; + txn.set_base_version(u)?; txn.commit()?; } { let mut txn = storage.txn()?; - assert_eq!(txn.base_version()?, 3); + assert_eq!(txn.base_version()?, u); } Ok(()) } diff --git a/taskchampion/src/taskstorage/mod.rs b/taskchampion/src/taskstorage/mod.rs index 8f25781c5..4ddd7df75 100644 --- a/taskchampion/src/taskstorage/mod.rs +++ b/taskchampion/src/taskstorage/mod.rs @@ -23,6 +23,12 @@ fn taskmap_with(mut properties: Vec<(String, String)>) -> TaskMap { rv } +/// The type of VersionIds +pub use crate::server::VersionId; + +/// The default for base_version. +pub(crate) const DEFAULT_BASE_VERSION: Uuid = crate::server::NO_VERSION_ID; + /// A TaskStorage transaction, in which storage operations are performed. /// /// # Concurrency @@ -58,10 +64,10 @@ pub trait TaskStorageTxn { fn all_task_uuids(&mut self) -> Fallible>; /// Get the current base_version for this storage -- the last version synced from the server. - fn base_version(&mut self) -> Fallible; + fn base_version(&mut self) -> Fallible; /// Set the current base_version for this storage. - fn set_base_version(&mut self, version: u64) -> Fallible<()>; + fn set_base_version(&mut self, version: VersionId) -> Fallible<()>; /// Get the current set of outstanding operations (operations that have not been sync'd to the /// server yet) From 8f7e2e279005aa953006a72c3ce2ab88d934a590 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Wed, 25 Nov 2020 18:06:56 -0500 Subject: [PATCH 5/7] add a 'task sync' command using a copy of the test server --- cli/src/cmd/mod.rs | 26 ++--------- cli/src/cmd/shared.rs | 31 ++++++++++++- cli/src/cmd/sync.rs | 39 ++++++++++++++++ taskchampion/src/server/local.rs | 79 ++++++++++++++++++++++++++++++++ taskchampion/src/server/mod.rs | 2 + 5 files changed, 153 insertions(+), 24 deletions(-) create mode 100644 cli/src/cmd/sync.rs create mode 100644 taskchampion/src/server/local.rs diff --git a/cli/src/cmd/mod.rs b/cli/src/cmd/mod.rs index e7d27255b..3e55d0475 100644 --- a/cli/src/cmd/mod.rs +++ b/cli/src/cmd/mod.rs @@ -1,7 +1,5 @@ use clap::{App, ArgMatches}; use failure::{Error, Fallible}; -use std::path::Path; -use taskchampion::{taskstorage, Replica}; #[macro_use] mod macros; @@ -12,6 +10,7 @@ mod gc; mod info; mod list; mod pending; +mod sync; /// Get a list of all subcommands in this crate pub(crate) fn subcommands() -> Vec> { @@ -21,6 +20,7 @@ pub(crate) fn subcommands() -> Vec> { list::cmd(), pending::cmd(), info::cmd(), + sync::cmd(), ] } @@ -54,24 +54,4 @@ pub(crate) trait SubCommandInvocation: std::fmt::Debug { fn as_any(&self) -> &dyn std::any::Any; } -/// A command invocation contains all of the necessary regarding a single invocation of the CLI. -#[derive(Debug)] -pub struct CommandInvocation { - pub(crate) subcommand: Box, -} - -impl CommandInvocation { - pub(crate) fn new(subcommand: Box) -> Self { - Self { subcommand } - } - - pub fn run(self) -> Fallible<()> { - self.subcommand.run(&self) - } - - fn get_replica(&self) -> Replica { - Replica::new(Box::new( - taskstorage::KVStorage::new(Path::new("/tmp/tasks")).unwrap(), - )) - } -} +pub use shared::CommandInvocation; diff --git a/cli/src/cmd/shared.rs b/cli/src/cmd/shared.rs index 4e24bfcc9..411c85f25 100644 --- a/cli/src/cmd/shared.rs +++ b/cli/src/cmd/shared.rs @@ -1,6 +1,7 @@ use clap::Arg; use failure::{format_err, Fallible}; -use taskchampion::{Replica, Task, Uuid}; +use std::path::Path; +use taskchampion::{server, taskstorage, Replica, Task, Uuid}; pub(super) fn task_arg<'a>() -> Arg<'a, 'a> { Arg::with_name("task") @@ -26,3 +27,31 @@ pub(super) fn get_task>(replica: &mut Replica, task_arg: S) -> Fal Err(format_err!("Cannot interpret {:?} as a task", task_arg)) } + +/// A command invocation contains all of the necessary regarding a single invocation of the CLI. +#[derive(Debug)] +pub struct CommandInvocation { + pub(crate) subcommand: Box, +} + +impl CommandInvocation { + pub(crate) fn new(subcommand: Box) -> Self { + Self { subcommand } + } + + pub fn run(self) -> Fallible<()> { + self.subcommand.run(&self) + } + + // -- utilities for command invocations + + pub(super) fn get_replica(&self) -> Replica { + Replica::new(Box::new( + taskstorage::KVStorage::new(Path::new("/tmp/tasks")).unwrap(), + )) + } + + pub(super) fn get_server(&self) -> impl server::Server { + server::LocalServer::new() + } +} diff --git a/cli/src/cmd/sync.rs b/cli/src/cmd/sync.rs new file mode 100644 index 000000000..f968ecf39 --- /dev/null +++ b/cli/src/cmd/sync.rs @@ -0,0 +1,39 @@ +use clap::{App, ArgMatches, SubCommand as ClapSubCommand}; +use failure::Fallible; + +use crate::cmd::{ArgMatchResult, CommandInvocation}; + +#[derive(Debug)] +struct Invocation {} + +define_subcommand! { + fn decorate_app<'a>(&self, app: App<'a, 'a>) -> App<'a, 'a> { + app.subcommand(ClapSubCommand::with_name("sync").about("sync with the server")) + } + + fn arg_match<'a>(&self, matches: &ArgMatches<'a>) -> ArgMatchResult { + match matches.subcommand() { + ("sync", _) => ArgMatchResult::Ok(Box::new(Invocation {})), + _ => ArgMatchResult::None, + } + } +} + +subcommand_invocation! { + fn run(&self, command: &CommandInvocation) -> Fallible<()> { + let mut replica = command.get_replica(); + let mut server = command.get_server(); + replica.sync(&mut server)?; + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn parse_command() { + with_subcommand_invocation!(vec!["task", "sync"], |_inv| {}); + } +} diff --git a/taskchampion/src/server/local.rs b/taskchampion/src/server/local.rs new file mode 100644 index 000000000..30999496a --- /dev/null +++ b/taskchampion/src/server/local.rs @@ -0,0 +1,79 @@ +use crate::server::{ + AddVersionResult, GetVersionResult, HistorySegment, Server, VersionId, NO_VERSION_ID, +}; +use failure::Fallible; +use std::collections::HashMap; +use uuid::Uuid; + +struct Version { + version_id: VersionId, + parent_version_id: VersionId, + history_segment: HistorySegment, +} + +pub struct LocalServer { + latest_version_id: VersionId, + // NOTE: indexed by parent_version_id! + versions: HashMap, +} + +impl LocalServer { + /// A test server has no notion of clients, signatures, encryption, etc. + pub fn new() -> LocalServer { + LocalServer { + latest_version_id: NO_VERSION_ID, + versions: HashMap::new(), + } + } +} + +impl Server for LocalServer { + /// Add a new version. If the given version number is incorrect, this responds with the + /// appropriate version and expects the caller to try again. + fn add_version( + &mut self, + parent_version_id: VersionId, + history_segment: HistorySegment, + ) -> Fallible { + // no client lookup + // no signature validation + + // check the parent_version_id for linearity + if self.latest_version_id != NO_VERSION_ID { + if parent_version_id != self.latest_version_id { + return Ok(AddVersionResult::ExpectedParentVersion( + self.latest_version_id, + )); + } + } + + // invent a new ID for this version + let version_id = Uuid::new_v4(); + + self.versions.insert( + parent_version_id, + Version { + version_id, + parent_version_id, + history_segment, + }, + ); + self.latest_version_id = version_id; + + Ok(AddVersionResult::Ok(version_id)) + } + + /// Get a vector of all versions after `since_version` + fn get_child_version(&self, parent_version_id: VersionId) -> Fallible { + if let Some(version) = self.versions.get(&parent_version_id) { + Ok(GetVersionResult::Version { + version_id: version.version_id, + parent_version_id: version.parent_version_id, + history_segment: version.history_segment.clone(), + }) + } else { + Ok(GetVersionResult::NoSuchVersion) + } + } +} + diff --git a/taskchampion/src/server/mod.rs b/taskchampion/src/server/mod.rs index 06009b127..0331bb63c 100644 --- a/taskchampion/src/server/mod.rs +++ b/taskchampion/src/server/mod.rs @@ -1,7 +1,9 @@ #[cfg(test)] pub(crate) mod test; +mod local; mod signing; mod types; +pub use local::LocalServer; pub use types::*; From 3537db9bbedf84e17406ed283b1ba8a303d298ed Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Wed, 25 Nov 2020 19:13:32 -0500 Subject: [PATCH 6/7] implement a local sync server --- cli/src/cmd/shared.rs | 6 +- cli/src/cmd/sync.rs | 2 +- taskchampion/src/lib.rs | 1 + taskchampion/src/server/local.rs | 211 +++++++++++++++++++++++++---- taskchampion/src/server/test.rs | 2 +- taskchampion/src/server/types.rs | 4 +- taskchampion/src/taskdb.rs | 17 ++- taskchampion/src/taskstorage/kv.rs | 39 +----- taskchampion/src/utils.rs | 39 ++++++ 9 files changed, 249 insertions(+), 72 deletions(-) create mode 100644 taskchampion/src/utils.rs diff --git a/cli/src/cmd/shared.rs b/cli/src/cmd/shared.rs index 411c85f25..25940bb28 100644 --- a/cli/src/cmd/shared.rs +++ b/cli/src/cmd/shared.rs @@ -51,7 +51,9 @@ impl CommandInvocation { )) } - pub(super) fn get_server(&self) -> impl server::Server { - server::LocalServer::new() + pub(super) fn get_server(&self) -> Fallible { + Ok(server::LocalServer::new(Path::new( + "/tmp/task-sync-server", + ))?) } } diff --git a/cli/src/cmd/sync.rs b/cli/src/cmd/sync.rs index f968ecf39..a17920436 100644 --- a/cli/src/cmd/sync.rs +++ b/cli/src/cmd/sync.rs @@ -22,7 +22,7 @@ define_subcommand! { subcommand_invocation! { fn run(&self, command: &CommandInvocation) -> Fallible<()> { let mut replica = command.get_replica(); - let mut server = command.get_server(); + let mut server = command.get_server()?; replica.sync(&mut server)?; Ok(()) } diff --git a/taskchampion/src/lib.rs b/taskchampion/src/lib.rs index 98ed5f8c3..932dbdeef 100644 --- a/taskchampion/src/lib.rs +++ b/taskchampion/src/lib.rs @@ -29,6 +29,7 @@ pub mod server; mod task; mod taskdb; pub mod taskstorage; +mod utils; pub use replica::Replica; pub use task::Priority; diff --git a/taskchampion/src/server/local.rs b/taskchampion/src/server/local.rs index 30999496a..9659625b3 100644 --- a/taskchampion/src/server/local.rs +++ b/taskchampion/src/server/local.rs @@ -1,33 +1,108 @@ use crate::server::{ AddVersionResult, GetVersionResult, HistorySegment, Server, VersionId, NO_VERSION_ID, }; +use crate::utils::Key; use failure::Fallible; -use std::collections::HashMap; +use kv::msgpack::Msgpack; +use kv::{Bucket, Config, Error, Integer, Serde, Store, ValueBuf}; +use serde::{Deserialize, Serialize}; +use std::path::Path; use uuid::Uuid; +#[derive(Serialize, Deserialize, Debug)] struct Version { version_id: VersionId, parent_version_id: VersionId, history_segment: HistorySegment, } -pub struct LocalServer { - latest_version_id: VersionId, +pub struct LocalServer<'t> { + store: Store, // NOTE: indexed by parent_version_id! - versions: HashMap, + versions_bucket: Bucket<'t, Key, ValueBuf>>, + latest_version_bucket: Bucket<'t, Integer, ValueBuf>>, } -impl LocalServer { +impl<'t> LocalServer<'t> { /// A test server has no notion of clients, signatures, encryption, etc. - pub fn new() -> LocalServer { - LocalServer { - latest_version_id: NO_VERSION_ID, - versions: HashMap::new(), + pub fn new(directory: &Path) -> Fallible { + let mut config = Config::default(directory); + config.bucket("versions", None); + config.bucket("numbers", None); + config.bucket("latest_version", None); + config.bucket("operations", None); + config.bucket("working_set", None); + let store = Store::new(config)?; + + // versions are stored indexed by VersionId (uuid) + let versions_bucket = store.bucket::>>(Some("versions"))?; + + // this bucket contains the latest version at key 0 + let latest_version_bucket = + store.int_bucket::>>(Some("latest_version"))?; + + Ok(LocalServer { + store, + versions_bucket, + latest_version_bucket, + }) + } + + fn get_latest_version_id(&mut self) -> Fallible { + let txn = self.store.read_txn()?; + let base_version = match txn.get(&self.latest_version_bucket, 0.into()) { + Ok(buf) => buf, + Err(Error::NotFound) => return Ok(NO_VERSION_ID), + Err(e) => return Err(e.into()), } + .inner()? + .to_serde(); + Ok(base_version as VersionId) + } + + fn set_latest_version_id(&mut self, version_id: VersionId) -> Fallible<()> { + let mut txn = self.store.write_txn()?; + txn.set( + &self.latest_version_bucket, + 0.into(), + Msgpack::to_value_buf(version_id as Uuid)?, + )?; + txn.commit()?; + Ok(()) + } + + fn get_version_by_parent_version_id( + &mut self, + parent_version_id: VersionId, + ) -> Fallible> { + let txn = self.store.read_txn()?; + + let version = match txn.get(&self.versions_bucket, parent_version_id.into()) { + Ok(buf) => buf, + Err(Error::NotFound) => return Ok(None), + Err(e) => return Err(e.into()), + } + .inner()? + .to_serde(); + Ok(Some(version)) + } + + fn add_version_by_parent_version_id(&mut self, version: Version) -> Fallible<()> { + let mut txn = self.store.write_txn()?; + txn.set( + &self.versions_bucket, + version.parent_version_id.into(), + Msgpack::to_value_buf(version)?, + )?; + txn.commit()?; + Ok(()) } } -impl Server for LocalServer { +impl<'t> Server for LocalServer<'t> { + // TODO: better transaction isolation for add_version (gets and sets should be in the same + // transaction) + /// Add a new version. If the given version number is incorrect, this responds with the /// appropriate version and expects the caller to try again. fn add_version( @@ -39,33 +114,27 @@ impl Server for LocalServer { // no signature validation // check the parent_version_id for linearity - if self.latest_version_id != NO_VERSION_ID { - if parent_version_id != self.latest_version_id { - return Ok(AddVersionResult::ExpectedParentVersion( - self.latest_version_id, - )); - } + let latest_version_id = self.get_latest_version_id()?; + if latest_version_id != NO_VERSION_ID && parent_version_id != latest_version_id { + return Ok(AddVersionResult::ExpectedParentVersion(latest_version_id)); } // invent a new ID for this version let version_id = Uuid::new_v4(); - self.versions.insert( + self.add_version_by_parent_version_id(Version { + version_id, parent_version_id, - Version { - version_id, - parent_version_id, - history_segment, - }, - ); - self.latest_version_id = version_id; + history_segment, + })?; + self.set_latest_version_id(version_id)?; Ok(AddVersionResult::Ok(version_id)) } /// Get a vector of all versions after `since_version` - fn get_child_version(&self, parent_version_id: VersionId) -> Fallible { - if let Some(version) = self.versions.get(&parent_version_id) { + fn get_child_version(&mut self, parent_version_id: VersionId) -> Fallible { + if let Some(version) = self.get_version_by_parent_version_id(parent_version_id)? { Ok(GetVersionResult::Version { version_id: version.version_id, parent_version_id: version.parent_version_id, @@ -77,3 +146,93 @@ impl Server for LocalServer { } } +#[cfg(test)] +mod test { + use super::*; + use failure::Fallible; + use tempdir::TempDir; + + #[test] + fn test_empty() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut server = LocalServer::new(&tmp_dir.path())?; + let child_version = server.get_child_version(NO_VERSION_ID)?; + assert_eq!(child_version, GetVersionResult::NoSuchVersion); + Ok(()) + } + + #[test] + fn test_add_zero_base() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut server = LocalServer::new(&tmp_dir.path())?; + let history = b"1234".to_vec(); + match server.add_version(NO_VERSION_ID, history.clone())? { + AddVersionResult::ExpectedParentVersion(_) => { + panic!("should have accepted the version") + } + AddVersionResult::Ok(version_id) => { + let new_version = server.get_child_version(NO_VERSION_ID)?; + assert_eq!( + new_version, + GetVersionResult::Version { + version_id, + parent_version_id: NO_VERSION_ID, + history_segment: history, + } + ); + } + } + + Ok(()) + } + + #[test] + fn test_add_nonzero_base() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut server = LocalServer::new(&tmp_dir.path())?; + let history = b"1234".to_vec(); + let parent_version_id = Uuid::new_v4() as VersionId; + + // This is OK because the server has no latest_version_id yet + match server.add_version(parent_version_id, history.clone())? { + AddVersionResult::ExpectedParentVersion(_) => { + panic!("should have accepted the version") + } + AddVersionResult::Ok(version_id) => { + let new_version = server.get_child_version(parent_version_id)?; + assert_eq!( + new_version, + GetVersionResult::Version { + version_id, + parent_version_id, + history_segment: history, + } + ); + } + } + + Ok(()) + } + + #[test] + fn test_add_nonzero_base_forbidden() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut server = LocalServer::new(&tmp_dir.path())?; + let history = b"1234".to_vec(); + let parent_version_id = Uuid::new_v4() as VersionId; + + // add a version + if let AddVersionResult::ExpectedParentVersion(_) = + server.add_version(parent_version_id, history.clone())? + { + panic!("should have accepted the version") + } + + // then add another, not based on that one + if let AddVersionResult::Ok(_) = server.add_version(parent_version_id, history.clone())? { + panic!("should not have accepted the version") + } + + Ok(()) + } +} diff --git a/taskchampion/src/server/test.rs b/taskchampion/src/server/test.rs index 3175c2aa5..3d57147ca 100644 --- a/taskchampion/src/server/test.rs +++ b/taskchampion/src/server/test.rs @@ -64,7 +64,7 @@ impl Server for TestServer { } /// Get a vector of all versions after `since_version` - fn get_child_version(&self, parent_version_id: VersionId) -> Fallible { + fn get_child_version(&mut self, parent_version_id: VersionId) -> Fallible { if let Some(version) = self.versions.get(&parent_version_id) { Ok(GetVersionResult::Version { version_id: version.version_id, diff --git a/taskchampion/src/server/types.rs b/taskchampion/src/server/types.rs index 9d95f588b..473a17e46 100644 --- a/taskchampion/src/server/types.rs +++ b/taskchampion/src/server/types.rs @@ -12,6 +12,7 @@ pub const NO_VERSION_ID: VersionId = Uuid::nil(); pub type HistorySegment = Vec; /// VersionAdd is the response type from [`crate:server::Server::add_version`]. +#[derive(Debug, PartialEq)] pub enum AddVersionResult { /// OK, version added with the given ID Ok(VersionId), @@ -20,6 +21,7 @@ pub enum AddVersionResult { } /// A version as downloaded from the server +#[derive(Debug, PartialEq)] pub enum GetVersionResult { /// No such version exists NoSuchVersion, @@ -42,5 +44,5 @@ pub trait Server { ) -> Fallible; /// Get the version with the given parent VersionId - fn get_child_version(&self, parent_version_id: VersionId) -> Fallible; + fn get_child_version(&mut self, parent_version_id: VersionId) -> Fallible; } diff --git a/taskchampion/src/taskdb.rs b/taskchampion/src/taskdb.rs index 86523717d..eb69e35ae 100644 --- a/taskchampion/src/taskdb.rs +++ b/taskchampion/src/taskdb.rs @@ -1,7 +1,7 @@ use crate::errors::Error; use crate::server::{AddVersionResult, GetVersionResult, Server}; use crate::taskstorage::{Operation, TaskMap, TaskStorage, TaskStorageTxn}; -use failure::Fallible; +use failure::{format_err, Fallible}; use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::str; @@ -168,7 +168,9 @@ impl TaskDB { let mut txn = self.storage.txn()?; // retry synchronizing until the server accepts our version (this allows for races between - // replicas trying to sync to the same server) + // replicas trying to sync to the same server). If the server insists on the same base + // version twice, then we have diverged. + let mut requested_parent_version_id = None; loop { let mut base_version_id = txn.base_version()?; @@ -189,6 +191,7 @@ impl TaskDB { txn.set_base_version(version_id)?; base_version_id = version_id; } else { + println!("no child versions of {:?}", base_version_id); // at the moment, no more child versions, so we can try adding our own break; } @@ -196,6 +199,7 @@ impl TaskDB { let operations: Vec = txn.operations()?.to_vec(); if operations.is_empty() { + println!("no changes to push to server"); // nothing to sync back to the server.. break; } @@ -216,7 +220,14 @@ impl TaskDB { "new version rejected; must be based on {:?}", parent_version_id ); - // ..continue the outer loop + if let Some(requested) = requested_parent_version_id { + if parent_version_id == requested { + return Err(format_err!( + "Server's task history has diverged from this replica" + )); + } + } + requested_parent_version_id = Some(parent_version_id); } } } diff --git a/taskchampion/src/taskstorage/kv.rs b/taskchampion/src/taskstorage/kv.rs index 52947a5c0..f6ee9a7f0 100644 --- a/taskchampion/src/taskstorage/kv.rs +++ b/taskchampion/src/taskstorage/kv.rs @@ -1,50 +1,13 @@ use crate::taskstorage::{ Operation, TaskMap, TaskStorage, TaskStorageTxn, VersionId, DEFAULT_BASE_VERSION, }; +use crate::utils::Key; use failure::Fallible; use kv::msgpack::Msgpack; use kv::{Bucket, Config, Error, Integer, Serde, Store, ValueBuf}; -use std::convert::TryInto; use std::path::Path; use uuid::Uuid; -/// A representation of a UUID as a key. This is just a newtype wrapping the 128-bit packed form -/// of a UUID. -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] -struct Key(uuid::Bytes); - -impl From<&[u8]> for Key { - fn from(bytes: &[u8]) -> Key { - Key(bytes.try_into().unwrap()) - } -} - -impl From<&Uuid> for Key { - fn from(uuid: &Uuid) -> Key { - let key = Key(*uuid.as_bytes()); - key - } -} - -impl From for Key { - fn from(uuid: Uuid) -> Key { - let key = Key(*uuid.as_bytes()); - key - } -} - -impl From for Uuid { - fn from(key: Key) -> Uuid { - Uuid::from_bytes(key.0) - } -} - -impl AsRef<[u8]> for Key { - fn as_ref(&self) -> &[u8] { - &self.0[..] - } -} - /// KVStorage is an on-disk storage backend which uses LMDB via the `kv` crate. pub struct KVStorage<'t> { store: Store, diff --git a/taskchampion/src/utils.rs b/taskchampion/src/utils.rs new file mode 100644 index 000000000..aafe6f010 --- /dev/null +++ b/taskchampion/src/utils.rs @@ -0,0 +1,39 @@ +use std::convert::TryInto; +use uuid::Uuid; + +/// A representation of a UUID as a key. This is just a newtype wrapping the 128-bit packed form +/// of a UUID. +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +pub(crate) struct Key(uuid::Bytes); + +impl From<&[u8]> for Key { + fn from(bytes: &[u8]) -> Key { + Key(bytes.try_into().unwrap()) + } +} + +impl From<&Uuid> for Key { + fn from(uuid: &Uuid) -> Key { + let key = Key(*uuid.as_bytes()); + key + } +} + +impl From for Key { + fn from(uuid: Uuid) -> Key { + let key = Key(*uuid.as_bytes()); + key + } +} + +impl From for Uuid { + fn from(key: Key) -> Uuid { + Uuid::from_bytes(key.0) + } +} + +impl AsRef<[u8]> for Key { + fn as_ref(&self) -> &[u8] { + &self.0[..] + } +} From 1511a0e38e0177767677f0c4936a9b9b7fdcc446 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Wed, 25 Nov 2020 19:46:23 -0500 Subject: [PATCH 7/7] update docs based on modified sync designs --- docs/src/sync.md | 58 +++++++++++++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/docs/src/sync.md b/docs/src/sync.md index 3f664542e..cd2621cdc 100644 --- a/docs/src/sync.md +++ b/docs/src/sync.md @@ -36,24 +36,32 @@ This process is analogous (vaguely) to rebasing a sequence of Git commits. ### Versions -Occasionally, database states are named with an integer, called a version. -The system as a whole (all replicas) constructs a monotonic sequence of versions and the operations that separate each version from the next. -No gaps are allowed in the version numbering. -Version 0 is implicitly the empty database. +Occasionally, database states are given a name (that takes the form of a UUID). +The system as a whole (all replicas) constructs a branch-free sequence of versions and the operations that separate each version from the next. +The version with the nil UUID is implicitly the empty database. -The server stores the operations to change a state from a version N to a version N+1, and provides that information as needed to replicas. +The server stores the operations to change a state from a "parent" version to a "child" version, and provides that information as needed to replicas. Replicas use this information to update their local task databases, and to generate new versions to send to the server. -Replicas generate a new version to transmit changes made locally to the server. +Replicas generate a new version to transmit local changes to the server. The changes are represented as a sequence of operations with the state resulting from the final operation corresponding to the version. -In order to keep the gap-free monotonic numbering, the server will only accept a proposed version from a replica if its number is one greater that the latest version on the server. +In order to keep the versions in a single sequence, the server will only accept a proposed version from a replica if its parent version matches the latest version on the server. -In the non-conflict case (such as with a single replica), then, a replica's synchronization process involves gathering up the operations it has accumulated since its last synchronization; bundling those operations into version N+1; and sending that version to the server. +In the non-conflict case (such as with a single replica), then, a replica's synchronization process involves gathering up the operations it has accumulated since its last synchronization; bundling those operations into a version; and sending that version to the server. + +### Replica Invariant + +The replica's [storage](./storage.md) contains the current state in `tasks`, the as-yet un-synchronized operations in `operations`, and the last version at which synchronization occurred in `base_version`. + +The replica's un-synchronized operations are already reflected in its local `tasks`, so the following invariant holds: + +> Applying `operations` to the set of tasks at `base_version` gives a set of tasks identical +> to `tasks`. ### Transformation When the latest version on the server contains operations that are not present in the replica, then the states have diverged. -For example (with lower-case letters designating operations): +For example: ```text o -- version N @@ -67,6 +75,8 @@ For example (with lower-case letters designating operations): o -- version N+1 ``` +(diagram notation: `o` designates a state, lower-case letters designate operations, and versions are presented as if they were numbered sequentially) + In this situation, the replica must "rebase" the local operations onto the latest version from the server and try again. This process is performed using operational transformation (OT). The result of this transformation is a sequence of operations based on the latest version, and a sequence of operations the replica can apply to its local task database to reach the same state @@ -96,25 +106,23 @@ Careful selection of the operations and the transformation function ensure this. See the comments in the source code for the details of how this transformation process is implemented. -## Replica Implementation +## Synchronization Process -The replica's [storage](./storage.md) contains the current state in `tasks`, the as-yet un-synchronized operations in `operations`, and the last version at which synchronization occurred in `base_version`. - -To perform a synchronization, the replica first requests any versions greater than `base_version` from the server, and rebases any local operations on top of those new versions, updating `base_version`. +To perform a synchronization, the replica first requests the child version of `base_version` from the server (`get_child_version`). +It applies that version to its local `tasks`, rebases its local `operations` as described above, and updates `base_version`. +The replica repeats this process until the server indicates no additional child versions exist. If there are no un-synchronized local operations, the process is complete. -Otherwise, the replica creates a new version containing those local operations and uploads that to the server. -In most cases, this will succeed, but if another replica has created a new version in the interim, then the new version will conflict with that other replica's new version. + +Otherwise, the replica creates a new version containing its local operations, giving its `base_version` as the parent version, and transmits that to the server (`add_version`). +In most cases, this will succeed, but if another replica has created a new version in the interim, then the new version will conflict with that other replica's new version and the server will respond with the new expected parent version. In this case, the process repeats. +If the server indicates a conflict twice with the same expected base version, that is an indication that the replica has diverged (something serious has gone wrong). -The replica's un-synchronized operations are already reflected in `tasks`, so the following invariant holds: +## Servers -> Applying `operations` to the set of tasks at `base_version` gives a set of tasks identical -> to `tasks`. +A replica depends on periodic synchronization for performant operation. +Without synchronization, its list of pending operations would grow indefinitely, and tasks could never be expired. +So all replicas, even "singleton" replicas which do not replicate task data with any other replica, must synchronize periodically. -## Server Implementation - -The server implementation is simple. -It supports fetching versions keyed by number, and adding a new version. -In adding a new version, the version number must be one greater than the greatest existing version. - -Critically, the server operates on nothing more than numbered, opaque blobs of data. +TaskChampion provides a `LocalServer` for this purpose. +It implements the `get_child_version` and `add_version` operations as described, storing data on-disk locally, all within the `task` binary.