Merge pull request #59 from djmitche/issue8

Refactor synchronization
This commit is contained in:
Dustin J. Mitchell
2020-11-25 20:12:46 -05:00
committed by GitHub
20 changed files with 844 additions and 257 deletions

122
Cargo.lock generated
View File

@@ -145,6 +145,12 @@ dependencies = [
"serde",
]
[[package]]
name = "bumpalo"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e8c087f005730276d1096a652e92a8bacee2e2472bcc9715a74d2bec38b5820"
[[package]]
name = "byteorder"
version = "1.3.4"
@@ -350,6 +356,15 @@ version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc6f3ad7b9d11a0c00842ff8de1b60ee58661048eb8049ed33c73594f359d7e6"
[[package]]
name = "js-sys"
version = "0.3.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca059e81d9486668f12d455a4ea6daa600bd408134cd17e3d3fb5a32d1f016f8"
dependencies = [
"wasm-bindgen",
]
[[package]]
name = "kv"
version = "0.10.0"
@@ -398,6 +413,15 @@ dependencies = [
"pkg-config",
]
[[package]]
name = "log"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fabed175da42fed1fa0746b0ea71f412aa9d35e76e95e59b192c64b9dc2bf8b"
dependencies = [
"cfg-if 0.1.10",
]
[[package]]
name = "memchr"
version = "2.3.4"
@@ -445,6 +469,12 @@ version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d3b63360ec3cb337817c2dbd47ab4a0f170d285d8e5a2064600f3def1402397"
[[package]]
name = "once_cell"
version = "1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13bd41f508810a131401606d54ac32a467c97172d74ba7662562ebba5ad07fa0"
[[package]]
name = "pkg-config"
version = "0.3.19"
@@ -766,6 +796,21 @@ dependencies = [
"winapi",
]
[[package]]
name = "ring"
version = "0.16.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5911690c9b773bab7e657471afc207f3827b249a657241327e3544d79bcabdd"
dependencies = [
"cc",
"libc",
"once_cell",
"spin",
"untrusted",
"web-sys",
"winapi",
]
[[package]]
name = "rmp"
version = "0.8.9"
@@ -854,6 +899,12 @@ dependencies = [
"serde",
]
[[package]]
name = "spin"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "strsim"
version = "0.8.0"
@@ -896,6 +947,7 @@ dependencies = [
"kv",
"lmdb-rkv",
"proptest",
"ring",
"serde",
"serde_json",
"tempdir",
@@ -1025,6 +1077,12 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564"
[[package]]
name = "untrusted"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "uuid"
version = "0.8.1"
@@ -1062,6 +1120,70 @@ version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "wasm-bindgen"
version = "0.2.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ac64ead5ea5f05873d7c12b545865ca2b8d28adfc50a49b84770a3a97265d42"
dependencies = [
"cfg-if 0.1.10",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f22b422e2a757c35a73774860af8e112bff612ce6cb604224e8e47641a9e4f68"
dependencies = [
"bumpalo",
"lazy_static",
"log",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b13312a745c08c469f0b292dd2fcd6411dba5f7160f593da6ef69b64e407038"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f249f06ef7ee334cc3b8ff031bfc11ec99d00f34d86da7498396dc1e3b1498fe"
dependencies = [
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d649a3145108d7d3fbcde896a468d1bd636791823c9921135218ad89be08307"
[[package]]
name = "web-sys"
version = "0.3.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bf6ef87ad7ae8008e15a355ce696bed26012b7caa21605188cfd8214ab51e2d"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "winapi"
version = "0.3.9"

View File

@@ -1,7 +1,5 @@
use clap::{App, ArgMatches};
use failure::{Error, Fallible};
use std::path::Path;
use taskchampion::{taskstorage, Replica};
#[macro_use]
mod macros;
@@ -12,6 +10,7 @@ mod gc;
mod info;
mod list;
mod pending;
mod sync;
/// Get a list of all subcommands in this crate
pub(crate) fn subcommands() -> Vec<Box<dyn SubCommand>> {
@@ -21,6 +20,7 @@ pub(crate) fn subcommands() -> Vec<Box<dyn SubCommand>> {
list::cmd(),
pending::cmd(),
info::cmd(),
sync::cmd(),
]
}
@@ -54,24 +54,4 @@ pub(crate) trait SubCommandInvocation: std::fmt::Debug {
fn as_any(&self) -> &dyn std::any::Any;
}
/// A command invocation contains all of the necessary regarding a single invocation of the CLI.
#[derive(Debug)]
pub struct CommandInvocation {
pub(crate) subcommand: Box<dyn SubCommandInvocation>,
}
impl CommandInvocation {
pub(crate) fn new(subcommand: Box<dyn SubCommandInvocation>) -> Self {
Self { subcommand }
}
pub fn run(self) -> Fallible<()> {
self.subcommand.run(&self)
}
fn get_replica(&self) -> Replica {
Replica::new(Box::new(
taskstorage::KVStorage::new(Path::new("/tmp/tasks")).unwrap(),
))
}
}
pub use shared::CommandInvocation;

View File

@@ -1,6 +1,7 @@
use clap::Arg;
use failure::{format_err, Fallible};
use taskchampion::{Replica, Task, Uuid};
use std::path::Path;
use taskchampion::{server, taskstorage, Replica, Task, Uuid};
pub(super) fn task_arg<'a>() -> Arg<'a, 'a> {
Arg::with_name("task")
@@ -26,3 +27,33 @@ pub(super) fn get_task<S: AsRef<str>>(replica: &mut Replica, task_arg: S) -> Fal
Err(format_err!("Cannot interpret {:?} as a task", task_arg))
}
/// A command invocation contains all of the necessary regarding a single invocation of the CLI.
#[derive(Debug)]
pub struct CommandInvocation {
pub(crate) subcommand: Box<dyn super::SubCommandInvocation>,
}
impl CommandInvocation {
pub(crate) fn new(subcommand: Box<dyn super::SubCommandInvocation>) -> Self {
Self { subcommand }
}
pub fn run(self) -> Fallible<()> {
self.subcommand.run(&self)
}
// -- utilities for command invocations
pub(super) fn get_replica(&self) -> Replica {
Replica::new(Box::new(
taskstorage::KVStorage::new(Path::new("/tmp/tasks")).unwrap(),
))
}
pub(super) fn get_server(&self) -> Fallible<impl server::Server> {
Ok(server::LocalServer::new(Path::new(
"/tmp/task-sync-server",
))?)
}
}

39
cli/src/cmd/sync.rs Normal file
View File

@@ -0,0 +1,39 @@
use clap::{App, ArgMatches, SubCommand as ClapSubCommand};
use failure::Fallible;
use crate::cmd::{ArgMatchResult, CommandInvocation};
#[derive(Debug)]
struct Invocation {}
define_subcommand! {
fn decorate_app<'a>(&self, app: App<'a, 'a>) -> App<'a, 'a> {
app.subcommand(ClapSubCommand::with_name("sync").about("sync with the server"))
}
fn arg_match<'a>(&self, matches: &ArgMatches<'a>) -> ArgMatchResult {
match matches.subcommand() {
("sync", _) => ArgMatchResult::Ok(Box::new(Invocation {})),
_ => ArgMatchResult::None,
}
}
}
subcommand_invocation! {
fn run(&self, command: &CommandInvocation) -> Fallible<()> {
let mut replica = command.get_replica();
let mut server = command.get_server()?;
replica.sync(&mut server)?;
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn parse_command() {
with_subcommand_invocation!(vec!["task", "sync"], |_inv| {});
}
}

View File

@@ -36,24 +36,32 @@ This process is analogous (vaguely) to rebasing a sequence of Git commits.
### Versions
Occasionally, database states are named with an integer, called a version.
The system as a whole (all replicas) constructs a monotonic sequence of versions and the operations that separate each version from the next.
No gaps are allowed in the version numbering.
Version 0 is implicitly the empty database.
Occasionally, database states are given a name (that takes the form of a UUID).
The system as a whole (all replicas) constructs a branch-free sequence of versions and the operations that separate each version from the next.
The version with the nil UUID is implicitly the empty database.
The server stores the operations to change a state from a version N to a version N+1, and provides that information as needed to replicas.
The server stores the operations to change a state from a "parent" version to a "child" version, and provides that information as needed to replicas.
Replicas use this information to update their local task databases, and to generate new versions to send to the server.
Replicas generate a new version to transmit changes made locally to the server.
Replicas generate a new version to transmit local changes to the server.
The changes are represented as a sequence of operations with the state resulting from the final operation corresponding to the version.
In order to keep the gap-free monotonic numbering, the server will only accept a proposed version from a replica if its number is one greater that the latest version on the server.
In order to keep the versions in a single sequence, the server will only accept a proposed version from a replica if its parent version matches the latest version on the server.
In the non-conflict case (such as with a single replica), then, a replica's synchronization process involves gathering up the operations it has accumulated since its last synchronization; bundling those operations into version N+1; and sending that version to the server.
In the non-conflict case (such as with a single replica), then, a replica's synchronization process involves gathering up the operations it has accumulated since its last synchronization; bundling those operations into a version; and sending that version to the server.
### Replica Invariant
The replica's [storage](./storage.md) contains the current state in `tasks`, the as-yet un-synchronized operations in `operations`, and the last version at which synchronization occurred in `base_version`.
The replica's un-synchronized operations are already reflected in its local `tasks`, so the following invariant holds:
> Applying `operations` to the set of tasks at `base_version` gives a set of tasks identical
> to `tasks`.
### Transformation
When the latest version on the server contains operations that are not present in the replica, then the states have diverged.
For example (with lower-case letters designating operations):
For example:
```text
o -- version N
@@ -67,6 +75,8 @@ For example (with lower-case letters designating operations):
o -- version N+1
```
(diagram notation: `o` designates a state, lower-case letters designate operations, and versions are presented as if they were numbered sequentially)
In this situation, the replica must "rebase" the local operations onto the latest version from the server and try again.
This process is performed using operational transformation (OT).
The result of this transformation is a sequence of operations based on the latest version, and a sequence of operations the replica can apply to its local task database to reach the same state
@@ -96,25 +106,23 @@ Careful selection of the operations and the transformation function ensure this.
See the comments in the source code for the details of how this transformation process is implemented.
## Replica Implementation
## Synchronization Process
The replica's [storage](./storage.md) contains the current state in `tasks`, the as-yet un-synchronized operations in `operations`, and the last version at which synchronization occurred in `base_version`.
To perform a synchronization, the replica first requests any versions greater than `base_version` from the server, and rebases any local operations on top of those new versions, updating `base_version`.
To perform a synchronization, the replica first requests the child version of `base_version` from the server (`get_child_version`).
It applies that version to its local `tasks`, rebases its local `operations` as described above, and updates `base_version`.
The replica repeats this process until the server indicates no additional child versions exist.
If there are no un-synchronized local operations, the process is complete.
Otherwise, the replica creates a new version containing those local operations and uploads that to the server.
In most cases, this will succeed, but if another replica has created a new version in the interim, then the new version will conflict with that other replica's new version.
Otherwise, the replica creates a new version containing its local operations, giving its `base_version` as the parent version, and transmits that to the server (`add_version`).
In most cases, this will succeed, but if another replica has created a new version in the interim, then the new version will conflict with that other replica's new version and the server will respond with the new expected parent version.
In this case, the process repeats.
If the server indicates a conflict twice with the same expected base version, that is an indication that the replica has diverged (something serious has gone wrong).
The replica's un-synchronized operations are already reflected in `tasks`, so the following invariant holds:
## Servers
> Applying `operations` to the set of tasks at `base_version` gives a set of tasks identical
> to `tasks`.
A replica depends on periodic synchronization for performant operation.
Without synchronization, its list of pending operations would grow indefinitely, and tasks could never be expired.
So all replicas, even "singleton" replicas which do not replicate task data with any other replica, must synchronize periodically.
## Server Implementation
The server implementation is simple.
It supports fetching versions keyed by number, and adding a new version.
In adding a new version, the version number must be one greater than the greatest existing version.
Critically, the server operates on nothing more than numbered, opaque blobs of data.
TaskChampion provides a `LocalServer` for this purpose.
It implements the `get_child_version` and `add_version` operations as described, storing data on-disk locally, all within the `task` binary.

View File

@@ -12,6 +12,7 @@ chrono = { version = "0.4.10", features = ["serde"] }
failure = {version = "0.1.5", features = ["derive"] }
kv = {version = "0.10.0", features = ["msgpack-value"]}
lmdb-rkv = {version = "0.12.3"}
ring = { version = "0.16.17", features = ["std"] }
[dev-dependencies]
proptest = "0.9.4"

View File

@@ -29,6 +29,7 @@ pub mod server;
mod task;
mod taskdb;
pub mod taskstorage;
mod utils;
pub use replica::Replica;
pub use task::Priority;
@@ -37,6 +38,3 @@ pub use task::{Task, TaskMut};
/// Re-exported type from the `uuid` crate, for ease of compatibility for consumers of this crate.
pub use uuid::Uuid;
#[cfg(test)]
pub(crate) mod testing;

View File

@@ -145,8 +145,8 @@ impl Replica {
}
/// Synchronize this replica against the given server.
pub fn sync(&mut self, username: &str, server: &mut dyn Server) -> Fallible<()> {
self.taskdb.sync(username, server)
pub fn sync(&mut self, server: &mut dyn Server) -> Fallible<()> {
self.taskdb.sync(server)
}
/// Perform "garbage collection" on this replica. In particular, this renumbers the working

View File

@@ -1,20 +0,0 @@
pub type Blob = Vec<u8>;
pub enum VersionAdd {
// OK, version added
Ok,
// Rejected, must be based on the the given version
ExpectedVersion(u64),
}
/// A value implementing this trait can act as a server against which a replica can sync.
pub trait Server {
/// Get a vector of all versions after `since_version`
fn get_versions(&self, username: &str, since_version: u64) -> Vec<Blob>;
/// Add a new version. If the given version number is incorrect, this responds with the
/// appropriate version and expects the caller to try again.
fn add_version(&mut self, username: &str, version: u64, blob: Blob) -> VersionAdd;
fn add_snapshot(&mut self, username: &str, version: u64, blob: Blob);
}

View File

@@ -0,0 +1,238 @@
use crate::server::{
AddVersionResult, GetVersionResult, HistorySegment, Server, VersionId, NO_VERSION_ID,
};
use crate::utils::Key;
use failure::Fallible;
use kv::msgpack::Msgpack;
use kv::{Bucket, Config, Error, Integer, Serde, Store, ValueBuf};
use serde::{Deserialize, Serialize};
use std::path::Path;
use uuid::Uuid;
#[derive(Serialize, Deserialize, Debug)]
struct Version {
version_id: VersionId,
parent_version_id: VersionId,
history_segment: HistorySegment,
}
pub struct LocalServer<'t> {
store: Store,
// NOTE: indexed by parent_version_id!
versions_bucket: Bucket<'t, Key, ValueBuf<Msgpack<Version>>>,
latest_version_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<Uuid>>>,
}
impl<'t> LocalServer<'t> {
/// A test server has no notion of clients, signatures, encryption, etc.
pub fn new(directory: &Path) -> Fallible<LocalServer> {
let mut config = Config::default(directory);
config.bucket("versions", None);
config.bucket("numbers", None);
config.bucket("latest_version", None);
config.bucket("operations", None);
config.bucket("working_set", None);
let store = Store::new(config)?;
// versions are stored indexed by VersionId (uuid)
let versions_bucket = store.bucket::<Key, ValueBuf<Msgpack<Version>>>(Some("versions"))?;
// this bucket contains the latest version at key 0
let latest_version_bucket =
store.int_bucket::<ValueBuf<Msgpack<Uuid>>>(Some("latest_version"))?;
Ok(LocalServer {
store,
versions_bucket,
latest_version_bucket,
})
}
fn get_latest_version_id(&mut self) -> Fallible<VersionId> {
let txn = self.store.read_txn()?;
let base_version = match txn.get(&self.latest_version_bucket, 0.into()) {
Ok(buf) => buf,
Err(Error::NotFound) => return Ok(NO_VERSION_ID),
Err(e) => return Err(e.into()),
}
.inner()?
.to_serde();
Ok(base_version as VersionId)
}
fn set_latest_version_id(&mut self, version_id: VersionId) -> Fallible<()> {
let mut txn = self.store.write_txn()?;
txn.set(
&self.latest_version_bucket,
0.into(),
Msgpack::to_value_buf(version_id as Uuid)?,
)?;
txn.commit()?;
Ok(())
}
fn get_version_by_parent_version_id(
&mut self,
parent_version_id: VersionId,
) -> Fallible<Option<Version>> {
let txn = self.store.read_txn()?;
let version = match txn.get(&self.versions_bucket, parent_version_id.into()) {
Ok(buf) => buf,
Err(Error::NotFound) => return Ok(None),
Err(e) => return Err(e.into()),
}
.inner()?
.to_serde();
Ok(Some(version))
}
fn add_version_by_parent_version_id(&mut self, version: Version) -> Fallible<()> {
let mut txn = self.store.write_txn()?;
txn.set(
&self.versions_bucket,
version.parent_version_id.into(),
Msgpack::to_value_buf(version)?,
)?;
txn.commit()?;
Ok(())
}
}
impl<'t> Server for LocalServer<'t> {
// TODO: better transaction isolation for add_version (gets and sets should be in the same
// transaction)
/// Add a new version. If the given version number is incorrect, this responds with the
/// appropriate version and expects the caller to try again.
fn add_version(
&mut self,
parent_version_id: VersionId,
history_segment: HistorySegment,
) -> Fallible<AddVersionResult> {
// no client lookup
// no signature validation
// check the parent_version_id for linearity
let latest_version_id = self.get_latest_version_id()?;
if latest_version_id != NO_VERSION_ID && parent_version_id != latest_version_id {
return Ok(AddVersionResult::ExpectedParentVersion(latest_version_id));
}
// invent a new ID for this version
let version_id = Uuid::new_v4();
self.add_version_by_parent_version_id(Version {
version_id,
parent_version_id,
history_segment,
})?;
self.set_latest_version_id(version_id)?;
Ok(AddVersionResult::Ok(version_id))
}
/// Get a vector of all versions after `since_version`
fn get_child_version(&mut self, parent_version_id: VersionId) -> Fallible<GetVersionResult> {
if let Some(version) = self.get_version_by_parent_version_id(parent_version_id)? {
Ok(GetVersionResult::Version {
version_id: version.version_id,
parent_version_id: version.parent_version_id,
history_segment: version.history_segment.clone(),
})
} else {
Ok(GetVersionResult::NoSuchVersion)
}
}
}
#[cfg(test)]
mod test {
use super::*;
use failure::Fallible;
use tempdir::TempDir;
#[test]
fn test_empty() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut server = LocalServer::new(&tmp_dir.path())?;
let child_version = server.get_child_version(NO_VERSION_ID)?;
assert_eq!(child_version, GetVersionResult::NoSuchVersion);
Ok(())
}
#[test]
fn test_add_zero_base() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut server = LocalServer::new(&tmp_dir.path())?;
let history = b"1234".to_vec();
match server.add_version(NO_VERSION_ID, history.clone())? {
AddVersionResult::ExpectedParentVersion(_) => {
panic!("should have accepted the version")
}
AddVersionResult::Ok(version_id) => {
let new_version = server.get_child_version(NO_VERSION_ID)?;
assert_eq!(
new_version,
GetVersionResult::Version {
version_id,
parent_version_id: NO_VERSION_ID,
history_segment: history,
}
);
}
}
Ok(())
}
#[test]
fn test_add_nonzero_base() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut server = LocalServer::new(&tmp_dir.path())?;
let history = b"1234".to_vec();
let parent_version_id = Uuid::new_v4() as VersionId;
// This is OK because the server has no latest_version_id yet
match server.add_version(parent_version_id, history.clone())? {
AddVersionResult::ExpectedParentVersion(_) => {
panic!("should have accepted the version")
}
AddVersionResult::Ok(version_id) => {
let new_version = server.get_child_version(parent_version_id)?;
assert_eq!(
new_version,
GetVersionResult::Version {
version_id,
parent_version_id,
history_segment: history,
}
);
}
}
Ok(())
}
#[test]
fn test_add_nonzero_base_forbidden() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut server = LocalServer::new(&tmp_dir.path())?;
let history = b"1234".to_vec();
let parent_version_id = Uuid::new_v4() as VersionId;
// add a version
if let AddVersionResult::ExpectedParentVersion(_) =
server.add_version(parent_version_id, history.clone())?
{
panic!("should have accepted the version")
}
// then add another, not based on that one
if let AddVersionResult::Ok(_) = server.add_version(parent_version_id, history.clone())? {
panic!("should not have accepted the version")
}
Ok(())
}
}

View File

@@ -0,0 +1,9 @@
#[cfg(test)]
pub(crate) mod test;
mod local;
mod signing;
mod types;
pub use local::LocalServer;
pub use types::*;

View File

@@ -0,0 +1,87 @@
#![allow(dead_code)] // TODO: temporary until this module is used
//! This is a general wrapper around an asymmetric-key signature system.
use failure::Fallible;
use ring::{
rand,
signature::{Ed25519KeyPair, KeyPair, Signature, UnparsedPublicKey, ED25519},
};
type PublicKey = Vec<u8>;
type PrivateKey = Vec<u8>;
/// Generate a pair of (public, private) key material (in fact the private key is a keypair)
pub fn new_keypair() -> Fallible<(PublicKey, PrivateKey)> {
let rng = rand::SystemRandom::new();
let key_pkcs8 = Ed25519KeyPair::generate_pkcs8(&rng)?;
let key_pair = Ed25519KeyPair::from_pkcs8(key_pkcs8.as_ref())?;
let pub_key = key_pair.public_key();
Ok((
pub_key.as_ref().to_vec() as PublicKey,
key_pkcs8.as_ref().to_vec() as PrivateKey,
))
}
pub struct Signer {
key_pair: Ed25519KeyPair,
}
impl Signer {
/// Create a new signer, given a pkcs#8 v2 document containing the keypair.
fn new(priv_key: PrivateKey) -> Fallible<Self> {
Ok(Self {
key_pair: Ed25519KeyPair::from_pkcs8(&priv_key)?,
})
}
pub fn sign<B: AsRef<[u8]>>(&self, message: B) -> Fallible<Signature> {
Ok(self.key_pair.sign(message.as_ref()))
}
}
pub struct Verifier {
pub_key: PublicKey,
}
impl Verifier {
fn new(pub_key: PublicKey) -> Fallible<Self> {
Ok(Self { pub_key })
}
pub fn verify<B1: AsRef<[u8]>, B2: AsRef<[u8]>>(
&self,
message: B1,
signature: B2,
) -> Fallible<()> {
let pub_key = UnparsedPublicKey::new(&ED25519, &self.pub_key);
Ok(pub_key.verify(message.as_ref(), signature.as_ref())?)
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_verify_ok() -> Fallible<()> {
let (public, private) = new_keypair()?;
let signer = Signer::new(private)?;
let verifier = Verifier::new(public)?;
let message = b"Hello, world";
let signature = signer.sign(message)?;
verifier.verify(message, signature)
}
#[test]
fn test_verify_bad_message() -> Fallible<()> {
let (public, private) = new_keypair()?;
let signer = Signer::new(private)?;
let verifier = Verifier::new(public)?;
let message = b"Hello, world";
let signature = signer.sign(message)?;
assert!(verifier.verify(b"Hello, cruel world", signature).is_err());
Ok(())
}
}

View File

@@ -0,0 +1,78 @@
use crate::server::{
AddVersionResult, GetVersionResult, HistorySegment, Server, VersionId, NO_VERSION_ID,
};
use failure::Fallible;
use std::collections::HashMap;
use uuid::Uuid;
struct Version {
version_id: VersionId,
parent_version_id: VersionId,
history_segment: HistorySegment,
}
pub(crate) struct TestServer {
latest_version_id: VersionId,
// NOTE: indexed by parent_version_id!
versions: HashMap<VersionId, Version>,
}
impl TestServer {
/// A test server has no notion of clients, signatures, encryption, etc.
pub fn new() -> TestServer {
TestServer {
latest_version_id: NO_VERSION_ID,
versions: HashMap::new(),
}
}
}
impl Server for TestServer {
/// Add a new version. If the given version number is incorrect, this responds with the
/// appropriate version and expects the caller to try again.
fn add_version(
&mut self,
parent_version_id: VersionId,
history_segment: HistorySegment,
) -> Fallible<AddVersionResult> {
// no client lookup
// no signature validation
// check the parent_version_id for linearity
if self.latest_version_id != NO_VERSION_ID {
if parent_version_id != self.latest_version_id {
return Ok(AddVersionResult::ExpectedParentVersion(
self.latest_version_id,
));
}
}
// invent a new ID for this version
let version_id = Uuid::new_v4();
self.versions.insert(
parent_version_id,
Version {
version_id,
parent_version_id,
history_segment,
},
);
self.latest_version_id = version_id;
Ok(AddVersionResult::Ok(version_id))
}
/// Get a vector of all versions after `since_version`
fn get_child_version(&mut self, parent_version_id: VersionId) -> Fallible<GetVersionResult> {
if let Some(version) = self.versions.get(&parent_version_id) {
Ok(GetVersionResult::Version {
version_id: version.version_id,
parent_version_id: version.parent_version_id,
history_segment: version.history_segment.clone(),
})
} else {
Ok(GetVersionResult::NoSuchVersion)
}
}
}

View File

@@ -0,0 +1,48 @@
use failure::Fallible;
use uuid::Uuid;
/// Versions are referred to with sha2 hashes.
pub type VersionId = Uuid;
/// The distinguished value for "no version"
pub const NO_VERSION_ID: VersionId = Uuid::nil();
/// A segment in the history of this task database, in the form of a sequence of operations. This
/// data is pre-encoded, and from the protocol level appears as a sequence of bytes.
pub type HistorySegment = Vec<u8>;
/// VersionAdd is the response type from [`crate:server::Server::add_version`].
#[derive(Debug, PartialEq)]
pub enum AddVersionResult {
/// OK, version added with the given ID
Ok(VersionId),
/// Rejected; expected a version with the given parent version
ExpectedParentVersion(VersionId),
}
/// A version as downloaded from the server
#[derive(Debug, PartialEq)]
pub enum GetVersionResult {
/// No such version exists
NoSuchVersion,
/// The requested version
Version {
version_id: VersionId,
parent_version_id: VersionId,
history_segment: HistorySegment,
},
}
/// A value implementing this trait can act as a server against which a replica can sync.
pub trait Server {
/// Add a new version.
fn add_version(
&mut self,
parent_version_id: VersionId,
history_segment: HistorySegment,
) -> Fallible<AddVersionResult>;
/// Get the version with the given parent VersionId
fn get_child_version(&mut self, parent_version_id: VersionId) -> Fallible<GetVersionResult>;
}

View File

@@ -1,7 +1,7 @@
use crate::errors::Error;
use crate::server::{Server, VersionAdd};
use crate::server::{AddVersionResult, GetVersionResult, Server};
use crate::taskstorage::{Operation, TaskMap, TaskStorage, TaskStorageTxn};
use failure::Fallible;
use failure::{format_err, Fallible};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::str;
@@ -13,7 +13,6 @@ pub struct TaskDB {
#[derive(Serialize, Deserialize, Debug)]
struct Version {
version: u64,
operations: Vec<Operation>,
}
@@ -165,42 +164,71 @@ impl TaskDB {
}
/// Sync to the given server, pulling remote changes and pushing local changes.
pub fn sync(&mut self, username: &str, server: &mut dyn Server) -> Fallible<()> {
pub fn sync(&mut self, server: &mut dyn Server) -> Fallible<()> {
let mut txn = self.storage.txn()?;
// retry synchronizing until the server accepts our version (this allows for races between
// replicas trying to sync to the same server)
// replicas trying to sync to the same server). If the server insists on the same base
// version twice, then we have diverged.
let mut requested_parent_version_id = None;
loop {
// first pull changes and "rebase" on top of them
let new_versions = server.get_versions(username, txn.base_version()?);
for version_blob in new_versions {
let version_str = str::from_utf8(&version_blob).unwrap();
let version: Version = serde_json::from_str(version_str).unwrap();
assert_eq!(version.version, txn.base_version()? + 1);
println!("applying version {:?} from server", version.version);
let mut base_version_id = txn.base_version()?;
TaskDB::apply_version(txn.as_mut(), version)?;
// first pull changes and "rebase" on top of them
loop {
if let GetVersionResult::Version {
version_id,
history_segment,
..
} = server.get_child_version(base_version_id)?
{
let version_str = str::from_utf8(&history_segment).unwrap();
let version: Version = serde_json::from_str(version_str).unwrap();
println!("applying version {:?} from server", version_id);
// apply this verison and update base_version in storage
TaskDB::apply_version(txn.as_mut(), version)?;
txn.set_base_version(version_id)?;
base_version_id = version_id;
} else {
println!("no child versions of {:?}", base_version_id);
// at the moment, no more child versions, so we can try adding our own
break;
}
}
let operations: Vec<Operation> = txn.operations()?.to_vec();
if operations.is_empty() {
println!("no changes to push to server");
// nothing to sync back to the server..
break;
}
// now make a version of our local changes and push those
let new_version = Version {
version: txn.base_version()? + 1,
operations,
};
let new_version_str = serde_json::to_string(&new_version).unwrap();
println!("sending version {:?} to server", new_version.version);
if let VersionAdd::Ok =
server.add_version(username, new_version.version, new_version_str.into())
{
txn.set_base_version(new_version.version)?;
txn.set_operations(vec![])?;
break;
let new_version = Version { operations };
let history_segment = serde_json::to_string(&new_version).unwrap().into();
println!("sending new version to server");
match server.add_version(base_version_id, history_segment)? {
AddVersionResult::Ok(new_version_id) => {
println!("version {:?} received by server", new_version_id);
txn.set_base_version(new_version_id)?;
txn.set_operations(vec![])?;
break;
}
AddVersionResult::ExpectedParentVersion(parent_version_id) => {
println!(
"new version rejected; must be based on {:?}",
parent_version_id
);
if let Some(requested) = requested_parent_version_id {
if parent_version_id == requested {
return Err(format_err!(
"Server's task history has diverged from this replica"
));
}
}
requested_parent_version_id = Some(parent_version_id);
}
}
}
@@ -256,7 +284,6 @@ impl TaskDB {
}
local_operations = new_local_ops;
}
txn.set_base_version(version.version)?;
txn.set_operations(local_operations)?;
Ok(())
}
@@ -296,8 +323,8 @@ impl TaskDB {
#[cfg(test)]
mod tests {
use super::*;
use crate::server::test::TestServer;
use crate::taskstorage::InMemoryStorage;
use crate::testing::testserver::TestServer;
use chrono::Utc;
use proptest::prelude::*;
use std::collections::HashMap;
@@ -518,10 +545,10 @@ mod tests {
let mut server = TestServer::new();
let mut db1 = newdb();
db1.sync("me", &mut server).unwrap();
db1.sync(&mut server).unwrap();
let mut db2 = newdb();
db2.sync("me", &mut server).unwrap();
db2.sync(&mut server).unwrap();
// make some changes in parallel to db1 and db2..
let uuid1 = Uuid::new_v4();
@@ -545,9 +572,9 @@ mod tests {
.unwrap();
// and synchronize those around
db1.sync("me", &mut server).unwrap();
db2.sync("me", &mut server).unwrap();
db1.sync("me", &mut server).unwrap();
db1.sync(&mut server).unwrap();
db2.sync(&mut server).unwrap();
db1.sync(&mut server).unwrap();
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
// now make updates to the same task on both sides
@@ -567,9 +594,9 @@ mod tests {
.unwrap();
// and synchronize those around
db1.sync("me", &mut server).unwrap();
db2.sync("me", &mut server).unwrap();
db1.sync("me", &mut server).unwrap();
db1.sync(&mut server).unwrap();
db2.sync(&mut server).unwrap();
db1.sync(&mut server).unwrap();
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
}
@@ -578,10 +605,10 @@ mod tests {
let mut server = TestServer::new();
let mut db1 = newdb();
db1.sync("me", &mut server).unwrap();
db1.sync(&mut server).unwrap();
let mut db2 = newdb();
db2.sync("me", &mut server).unwrap();
db2.sync(&mut server).unwrap();
// create and update a task..
let uuid = Uuid::new_v4();
@@ -595,9 +622,9 @@ mod tests {
.unwrap();
// and synchronize those around
db1.sync("me", &mut server).unwrap();
db2.sync("me", &mut server).unwrap();
db1.sync("me", &mut server).unwrap();
db1.sync(&mut server).unwrap();
db2.sync(&mut server).unwrap();
db1.sync(&mut server).unwrap();
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
// delete and re-create the task on db1
@@ -620,9 +647,9 @@ mod tests {
})
.unwrap();
db1.sync("me", &mut server).unwrap();
db2.sync("me", &mut server).unwrap();
db1.sync("me", &mut server).unwrap();
db1.sync(&mut server).unwrap();
db2.sync(&mut server).unwrap();
db1.sync(&mut server).unwrap();
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
}
@@ -678,7 +705,7 @@ mod tests {
println!(" {:?} (ignored)", e);
}
},
Action::Sync => db.sync("me", &mut server).unwrap(),
Action::Sync => db.sync(&mut server).unwrap(),
}
}

View File

@@ -1,6 +1,8 @@
#![allow(clippy::new_without_default)]
use crate::taskstorage::{Operation, TaskMap, TaskStorage, TaskStorageTxn};
use crate::taskstorage::{
Operation, TaskMap, TaskStorage, TaskStorageTxn, VersionId, DEFAULT_BASE_VERSION,
};
use failure::Fallible;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
@@ -9,7 +11,7 @@ use uuid::Uuid;
#[derive(PartialEq, Debug, Clone)]
struct Data {
tasks: HashMap<Uuid, TaskMap>,
base_version: u64,
base_version: VersionId,
operations: Vec<Operation>,
working_set: Vec<Option<Uuid>>,
}
@@ -79,11 +81,11 @@ impl<'t> TaskStorageTxn for Txn<'t> {
Ok(self.data_ref().tasks.keys().copied().collect())
}
fn base_version(&mut self) -> Fallible<u64> {
Ok(self.data_ref().base_version)
fn base_version(&mut self) -> Fallible<VersionId> {
Ok(self.data_ref().base_version.clone())
}
fn set_base_version(&mut self, version: u64) -> Fallible<()> {
fn set_base_version(&mut self, version: VersionId) -> Fallible<()> {
self.mut_data_ref().base_version = version;
Ok(())
}
@@ -138,7 +140,7 @@ impl InMemoryStorage {
InMemoryStorage {
data: Data {
tasks: HashMap::new(),
base_version: 0,
base_version: DEFAULT_BASE_VERSION.into(),
operations: vec![],
working_set: vec![None],
},

View File

@@ -1,53 +1,19 @@
use crate::taskstorage::{Operation, TaskMap, TaskStorage, TaskStorageTxn};
use crate::taskstorage::{
Operation, TaskMap, TaskStorage, TaskStorageTxn, VersionId, DEFAULT_BASE_VERSION,
};
use crate::utils::Key;
use failure::Fallible;
use kv::msgpack::Msgpack;
use kv::{Bucket, Config, Error, Integer, Serde, Store, ValueBuf};
use std::convert::TryInto;
use std::path::Path;
use uuid::Uuid;
/// A representation of a UUID as a key. This is just a newtype wrapping the 128-bit packed form
/// of a UUID.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Key(uuid::Bytes);
impl From<&[u8]> for Key {
fn from(bytes: &[u8]) -> Key {
Key(bytes.try_into().unwrap())
}
}
impl From<&Uuid> for Key {
fn from(uuid: &Uuid) -> Key {
let key = Key(*uuid.as_bytes());
key
}
}
impl From<Uuid> for Key {
fn from(uuid: Uuid) -> Key {
let key = Key(*uuid.as_bytes());
key
}
}
impl From<Key> for Uuid {
fn from(key: Key) -> Uuid {
Uuid::from_bytes(key.0)
}
}
impl AsRef<[u8]> for Key {
fn as_ref(&self) -> &[u8] {
&self.0[..]
}
}
/// KVStorage is an on-disk storage backend which uses LMDB via the `kv` crate.
pub struct KVStorage<'t> {
store: Store,
tasks_bucket: Bucket<'t, Key, ValueBuf<Msgpack<TaskMap>>>,
numbers_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<u64>>>,
uuids_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<Uuid>>>,
operations_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<Operation>>>,
working_set_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<Uuid>>>,
}
@@ -61,6 +27,7 @@ impl<'t> KVStorage<'t> {
let mut config = Config::default(directory);
config.bucket("tasks", None);
config.bucket("numbers", None);
config.bucket("uuids", None);
config.bucket("operations", None);
config.bucket("working_set", None);
let store = Store::new(config)?;
@@ -71,6 +38,9 @@ impl<'t> KVStorage<'t> {
// this bucket contains various u64s, indexed by constants above
let numbers_bucket = store.int_bucket::<ValueBuf<Msgpack<u64>>>(Some("numbers"))?;
// this bucket contains various Uuids, indexed by constants above
let uuids_bucket = store.int_bucket::<ValueBuf<Msgpack<Uuid>>>(Some("uuids"))?;
// this bucket contains operations, numbered consecutively; the NEXT_OPERATION number gives
// the index of the next operation to insert
let operations_bucket =
@@ -85,6 +55,7 @@ impl<'t> KVStorage<'t> {
store,
tasks_bucket,
numbers_bucket,
uuids_bucket,
operations_bucket,
working_set_bucket,
})
@@ -122,6 +93,9 @@ impl<'t> Txn<'t> {
fn numbers_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf<Msgpack<u64>>> {
&self.storage.numbers_bucket
}
fn uuids_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf<Msgpack<Uuid>>> {
&self.storage.uuids_bucket
}
fn operations_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf<Msgpack<Operation>>> {
&self.storage.operations_bucket
}
@@ -193,26 +167,26 @@ impl<'t> TaskStorageTxn for Txn<'t> {
.collect())
}
fn base_version(&mut self) -> Fallible<u64> {
let bucket = self.numbers_bucket();
fn base_version(&mut self) -> Fallible<VersionId> {
let bucket = self.uuids_bucket();
let base_version = match self.kvtxn().get(bucket, BASE_VERSION.into()) {
Ok(buf) => buf,
Err(Error::NotFound) => return Ok(0),
Err(Error::NotFound) => return Ok(DEFAULT_BASE_VERSION.into()),
Err(e) => return Err(e.into()),
}
.inner()?
.to_serde();
Ok(base_version)
Ok(base_version as VersionId)
}
fn set_base_version(&mut self, version: u64) -> Fallible<()> {
let numbers_bucket = self.numbers_bucket();
fn set_base_version(&mut self, version: VersionId) -> Fallible<()> {
let uuids_bucket = self.uuids_bucket();
let kvtxn = self.kvtxn();
kvtxn.set(
numbers_bucket,
uuids_bucket,
BASE_VERSION.into(),
Msgpack::to_value_buf(version)?,
Msgpack::to_value_buf(version as Uuid)?,
)?;
Ok(())
}
@@ -528,7 +502,7 @@ mod test {
let mut storage = KVStorage::new(&tmp_dir.path())?;
{
let mut txn = storage.txn()?;
assert_eq!(txn.base_version()?, 0);
assert_eq!(txn.base_version()?, DEFAULT_BASE_VERSION);
}
Ok(())
}
@@ -537,14 +511,15 @@ mod test {
fn test_base_version_setting() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let u = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.set_base_version(3)?;
txn.set_base_version(u)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
assert_eq!(txn.base_version()?, 3);
assert_eq!(txn.base_version()?, u);
}
Ok(())
}

View File

@@ -23,6 +23,12 @@ fn taskmap_with(mut properties: Vec<(String, String)>) -> TaskMap {
rv
}
/// The type of VersionIds
pub use crate::server::VersionId;
/// The default for base_version.
pub(crate) const DEFAULT_BASE_VERSION: Uuid = crate::server::NO_VERSION_ID;
/// A TaskStorage transaction, in which storage operations are performed.
///
/// # Concurrency
@@ -58,10 +64,10 @@ pub trait TaskStorageTxn {
fn all_task_uuids(&mut self) -> Fallible<Vec<Uuid>>;
/// Get the current base_version for this storage -- the last version synced from the server.
fn base_version(&mut self) -> Fallible<u64>;
fn base_version(&mut self) -> Fallible<VersionId>;
/// Set the current base_version for this storage.
fn set_base_version(&mut self, version: u64) -> Fallible<()>;
fn set_base_version(&mut self, version: VersionId) -> Fallible<()>;
/// Get the current set of outstanding operations (operations that have not been sync'd to the
/// server yet)

View File

@@ -1,81 +0,0 @@
use crate::server::{Blob, Server, VersionAdd};
use std::collections::HashMap;
pub(crate) struct TestServer {
users: HashMap<String, User>,
}
struct User {
// versions, indexed at v-1
versions: Vec<Blob>,
snapshots: HashMap<u64, Blob>,
}
impl TestServer {
pub fn new() -> TestServer {
TestServer {
users: HashMap::new(),
}
}
fn get_user_mut(&mut self, username: &str) -> &mut User {
self.users
.entry(username.to_string())
.or_insert_with(User::new)
}
}
impl Server for TestServer {
/// Get a vector of all versions after `since_version`
fn get_versions(&self, username: &str, since_version: u64) -> Vec<Blob> {
self.users
.get(username)
.map(|user| user.get_versions(since_version))
.unwrap_or_else(|| vec![])
}
/// Add a new version. If the given version number is incorrect, this responds with the
/// appropriate version and expects the caller to try again.
fn add_version(&mut self, username: &str, version: u64, blob: Blob) -> VersionAdd {
self.get_user_mut(username).add_version(version, blob)
}
fn add_snapshot(&mut self, username: &str, version: u64, blob: Blob) {
self.get_user_mut(username).add_snapshot(version, blob);
}
}
impl User {
fn new() -> User {
User {
versions: vec![],
snapshots: HashMap::new(),
}
}
fn get_versions(&self, since_version: u64) -> Vec<Blob> {
let last_version = self.versions.len();
if last_version == since_version as usize {
return vec![];
}
self.versions[since_version as usize..last_version]
.iter()
.map(|r| r.clone())
.collect::<Vec<Blob>>()
}
fn add_version(&mut self, version: u64, blob: Blob) -> VersionAdd {
// of by one here: client wants to send version 1 first
let expected_version = self.versions.len() as u64 + 1;
if version != expected_version {
return VersionAdd::ExpectedVersion(expected_version);
}
self.versions.push(blob);
VersionAdd::Ok
}
fn add_snapshot(&mut self, version: u64, blob: Blob) {
self.snapshots.insert(version, blob);
}
}

39
taskchampion/src/utils.rs Normal file
View File

@@ -0,0 +1,39 @@
use std::convert::TryInto;
use uuid::Uuid;
/// A representation of a UUID as a key. This is just a newtype wrapping the 128-bit packed form
/// of a UUID.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct Key(uuid::Bytes);
impl From<&[u8]> for Key {
fn from(bytes: &[u8]) -> Key {
Key(bytes.try_into().unwrap())
}
}
impl From<&Uuid> for Key {
fn from(uuid: &Uuid) -> Key {
let key = Key(*uuid.as_bytes());
key
}
}
impl From<Uuid> for Key {
fn from(uuid: Uuid) -> Key {
let key = Key(*uuid.as_bytes());
key
}
}
impl From<Key> for Uuid {
fn from(key: Key) -> Uuid {
Uuid::from_bytes(key.0)
}
}
impl AsRef<[u8]> for Key {
fn as_ref(&self) -> &[u8] {
&self.0[..]
}
}