reorganize into separate crates

- taskchampion -- core implementation of a replica
 - taskchampion-cli -- command-line interface
 - taskchampion-sync-server -- server implementation (not much yet!)
This commit is contained in:
Dustin J. Mitchell
2020-11-23 14:08:42 -05:00
parent 2830043e13
commit 779a331003
36 changed files with 349 additions and 154 deletions

View File

@@ -0,0 +1,10 @@
use failure::Fail;
#[derive(Debug, Fail, Eq, PartialEq, Clone)]
pub enum Error {
#[fail(display = "Task Database Error: {}", _0)]
DBError(String),
#[fail(display = "Replica Error: {}", _0)]
ReplicaError(String),
}

21
taskchampion/src/lib.rs Normal file
View File

@@ -0,0 +1,21 @@
// TODO: remove this eventually when there's an API
#![allow(dead_code)]
#![allow(unused_variables)]
#[macro_use]
extern crate failure;
mod errors;
mod operation;
mod replica;
pub mod server;
mod task;
mod taskdb;
pub mod taskstorage;
pub use operation::Operation;
pub use replica::Replica;
pub use task::Priority;
pub use task::Status;
pub use task::Task;
pub use taskdb::DB;

View File

@@ -0,0 +1,276 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// An Operation defines a single change to the task database
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum Operation {
/// Create a new task.
///
/// On application, if the task already exists, the operation does nothing.
Create { uuid: Uuid },
/// Delete an existing task.
///
/// On application, if the task does not exist, the operation does nothing.
Delete { uuid: Uuid },
/// Update an existing task, setting the given property to the given value. If the value is
/// None, then the corresponding property is deleted.
///
/// If the given task does not exist, the operation does nothing.
Update {
uuid: Uuid,
property: String,
value: Option<String>,
timestamp: DateTime<Utc>,
},
}
use Operation::*;
impl Operation {
// Transform takes two operations A and B that happened concurrently and produces two
// operations A' and B' such that `apply(apply(S, A), B') = apply(apply(S, B), A')`. This
// function is used to serialize operations in a process similar to a Git "rebase".
//
// *
// / \
// op1 / \ op2
// / \
// * *
//
// this function "completes the diamond:
//
// * *
// \ /
// op2' \ / op1'
// \ /
// *
//
// such that applying op2' after op1 has the same effect as applying op1' after op2. This
// allows two different systems which have already applied op1 and op2, respectively, and thus
// reached different states, to return to the same state by applying op2' and op1',
// respectively.
pub fn transform(
operation1: Operation,
operation2: Operation,
) -> (Option<Operation>, Option<Operation>) {
match (&operation1, &operation2) {
// Two creations or deletions of the same uuid reach the same state, so there's no need
// for any further operations to bring the state together.
(&Create { uuid: uuid1 }, &Create { uuid: uuid2 }) if uuid1 == uuid2 => (None, None),
(&Delete { uuid: uuid1 }, &Delete { uuid: uuid2 }) if uuid1 == uuid2 => (None, None),
// Given a create and a delete of the same task, one of the operations is invalid: the
// create implies the task does not exist, but the delete implies it exists. Somewhat
// arbitrarily, we prefer the Create
(&Create { uuid: uuid1 }, &Delete { uuid: uuid2 }) if uuid1 == uuid2 => {
(Some(operation1), None)
}
(&Delete { uuid: uuid1 }, &Create { uuid: uuid2 }) if uuid1 == uuid2 => {
(None, Some(operation2))
}
// And again from an Update and a Create, prefer the Update
(&Update { uuid: uuid1, .. }, &Create { uuid: uuid2 }) if uuid1 == uuid2 => {
(Some(operation1), None)
}
(&Create { uuid: uuid1 }, &Update { uuid: uuid2, .. }) if uuid1 == uuid2 => {
(None, Some(operation2))
}
// Given a delete and an update, prefer the delete
(&Update { uuid: uuid1, .. }, &Delete { uuid: uuid2 }) if uuid1 == uuid2 => {
(None, Some(operation2))
}
(&Delete { uuid: uuid1 }, &Update { uuid: uuid2, .. }) if uuid1 == uuid2 => {
(Some(operation1), None)
}
// Two updates to the same property of the same task might conflict.
(
&Update {
uuid: ref uuid1,
property: ref property1,
value: ref value1,
timestamp: ref timestamp1,
},
&Update {
uuid: ref uuid2,
property: ref property2,
value: ref value2,
timestamp: ref timestamp2,
},
) if uuid1 == uuid2 && property1 == property2 => {
// if the value is the same, there's no conflict
if value1 == value2 {
(None, None)
} else if timestamp1 < timestamp2 {
// prefer the later modification
(None, Some(operation2))
} else if timestamp1 > timestamp2 {
// prefer the later modification
//(Some(operation1), None)
(Some(operation1), None)
} else {
// arbitrarily resolve in favor of the first operation
(Some(operation1), None)
}
}
// anything else is not a conflict of any sort, so return the operations unchanged
(_, _) => (Some(operation1), Some(operation2)),
}
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::taskdb::DB;
use chrono::{Duration, Utc};
// note that `tests/operation_transform_invariant.rs` tests the transform function quite
// thoroughly, so this testing is light.
fn test_transform(
setup: Option<Operation>,
o1: Operation,
o2: Operation,
exp1p: Option<Operation>,
exp2p: Option<Operation>,
) {
let (o1p, o2p) = Operation::transform(o1.clone(), o2.clone());
assert_eq!((&o1p, &o2p), (&exp1p, &exp2p));
// check that the two operation sequences have the same effect, enforcing the invariant of
// the transform function.
let mut db1 = DB::new_inmemory();
if let Some(ref o) = setup {
db1.apply(o.clone()).unwrap();
}
db1.apply(o1).unwrap();
if let Some(o) = o2p {
db1.apply(o).unwrap();
}
let mut db2 = DB::new_inmemory();
if let Some(ref o) = setup {
db2.apply(o.clone()).unwrap();
}
db2.apply(o2).unwrap();
if let Some(o) = o1p {
db2.apply(o).unwrap();
}
assert_eq!(db1.sorted_tasks(), db2.sorted_tasks());
}
#[test]
fn test_unrelated_create() {
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
test_transform(
None,
Create { uuid: uuid1 },
Create { uuid: uuid2 },
Some(Create { uuid: uuid1 }),
Some(Create { uuid: uuid2 }),
);
}
#[test]
fn test_related_updates_different_props() {
let uuid = Uuid::new_v4();
let timestamp = Utc::now();
test_transform(
Some(Create { uuid }),
Update {
uuid,
property: "abc".into(),
value: Some("true".into()),
timestamp,
},
Update {
uuid,
property: "def".into(),
value: Some("false".into()),
timestamp,
},
Some(Update {
uuid,
property: "abc".into(),
value: Some("true".into()),
timestamp,
}),
Some(Update {
uuid,
property: "def".into(),
value: Some("false".into()),
timestamp,
}),
);
}
#[test]
fn test_related_updates_same_prop() {
let uuid = Uuid::new_v4();
let timestamp1 = Utc::now();
let timestamp2 = timestamp1 + Duration::seconds(10);
test_transform(
Some(Create { uuid }),
Update {
uuid,
property: "abc".into(),
value: Some("true".into()),
timestamp: timestamp1,
},
Update {
uuid,
property: "abc".into(),
value: Some("false".into()),
timestamp: timestamp2,
},
None,
Some(Update {
uuid,
property: "abc".into(),
value: Some("false".into()),
timestamp: timestamp2,
}),
);
}
#[test]
fn test_related_updates_same_prop_same_time() {
let uuid = Uuid::new_v4();
let timestamp = Utc::now();
test_transform(
Some(Create { uuid }),
Update {
uuid,
property: "abc".into(),
value: Some("true".into()),
timestamp,
},
Update {
uuid,
property: "abc".into(),
value: Some("false".into()),
timestamp,
},
Some(Update {
uuid,
property: "abc".into(),
value: Some("true".into()),
timestamp,
}),
None,
);
}
}

240
taskchampion/src/replica.rs Normal file
View File

@@ -0,0 +1,240 @@
use crate::errors::Error;
use crate::operation::Operation;
use crate::task::{Status, Task};
use crate::taskdb::DB;
use crate::taskstorage::TaskMap;
use chrono::Utc;
use failure::Fallible;
use std::collections::HashMap;
use uuid::Uuid;
/// A replica represents an instance of a user's task data.
pub struct Replica {
taskdb: Box<DB>,
}
impl Replica {
pub fn new(taskdb: Box<DB>) -> Replica {
return Replica { taskdb };
}
/// Update an existing task. If the value is Some, the property is added or updated. If the
/// value is None, the property is deleted. It is not an error to delete a nonexistent
/// property.
pub(crate) fn update_task<S1, S2>(
&mut self,
uuid: Uuid,
property: S1,
value: Option<S2>,
) -> Fallible<()>
where
S1: Into<String>,
S2: Into<String>,
{
self.taskdb.apply(Operation::Update {
uuid,
property: property.into(),
value: value.map(|v| v.into()),
timestamp: Utc::now(),
})
}
/// Add the given uuid to the working set, returning its index.
pub(crate) fn add_to_working_set(&mut self, uuid: &Uuid) -> Fallible<u64> {
self.taskdb.add_to_working_set(uuid)
}
/// Get all tasks represented as a map keyed by UUID
pub fn all_tasks<'a>(&'a mut self) -> Fallible<HashMap<Uuid, Task>> {
let mut res = HashMap::new();
for (uuid, tm) in self.taskdb.all_tasks()?.drain(..) {
res.insert(uuid.clone(), Task::new(uuid.clone(), tm));
}
Ok(res)
}
/// Get the UUIDs of all tasks
pub fn all_task_uuids<'a>(&'a mut self) -> Fallible<Vec<Uuid>> {
self.taskdb.all_task_uuids()
}
/// Get the "working set" for this replica -- the set of pending tasks, as indexed by small
/// integers
pub fn working_set(&mut self) -> Fallible<Vec<Option<Task>>> {
let working_set = self.taskdb.working_set()?;
let mut res = Vec::with_capacity(working_set.len());
for i in 0..working_set.len() {
res.push(match working_set[i] {
Some(u) => match self.taskdb.get_task(&u)? {
Some(tm) => Some(Task::new(u, tm)),
None => None,
},
None => None,
})
}
Ok(res)
}
/// Get an existing task by its UUID
pub fn get_task(&mut self, uuid: &Uuid) -> Fallible<Option<Task>> {
Ok(self
.taskdb
.get_task(uuid)?
.map(move |tm| Task::new(uuid.clone(), tm)))
}
/// Get an existing task by its working set index
pub fn get_working_set_task(&mut self, i: u64) -> Fallible<Option<Task>> {
let working_set = self.taskdb.working_set()?;
if (i as usize) < working_set.len() {
if let Some(uuid) = working_set[i as usize] {
return Ok(self
.taskdb
.get_task(&uuid)?
.map(move |tm| Task::new(uuid, tm)));
}
}
return Ok(None);
}
/// Create a new task. The task must not already exist.
pub fn new_task(&mut self, uuid: Uuid, status: Status, description: String) -> Fallible<Task> {
// check that it doesn't exist; this is a convenience check, as the task
// may already exist when this Create operation is finally sync'd with
// operations from other replicas
if self.taskdb.get_task(&uuid)?.is_some() {
return Err(Error::DBError(format!("Task {} already exists", uuid)).into());
}
self.taskdb
.apply(Operation::Create { uuid: uuid.clone() })?;
let mut task = Task::new(uuid, TaskMap::new()).into_mut(self);
task.set_description(description)?;
task.set_status(status)?;
Ok(task.into_immut())
}
/// Delete a task. The task must exist. Note that this is different from setting status to
/// Deleted; this is the final purge of the task.
pub fn delete_task(&mut self, uuid: Uuid) -> Fallible<()> {
// check that it already exists; this is a convenience check, as the task may already exist
// when this Create operation is finally sync'd with operations from other replicas
if self.taskdb.get_task(&uuid)?.is_none() {
return Err(Error::DBError(format!("Task {} does not exist", uuid)).into());
}
self.taskdb.apply(Operation::Delete { uuid })
}
/// Perform "garbage collection" on this replica. In particular, this renumbers the working
/// set to contain only pending tasks.
pub fn gc(&mut self) -> Fallible<()> {
let pending = String::from(Status::Pending.to_taskmap());
self.taskdb
.rebuild_working_set(|t| t.get("status") == Some(&pending))?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::task::Status;
use crate::taskdb::DB;
use uuid::Uuid;
#[test]
fn new_task() {
let mut rep = Replica::new(DB::new_inmemory().into());
let uuid = Uuid::new_v4();
let t = rep
.new_task(uuid.clone(), Status::Pending, "a task".into())
.unwrap();
assert_eq!(t.get_description(), String::from("a task"));
assert_eq!(t.get_status(), Status::Pending);
assert!(t.get_modified().is_some());
}
#[test]
fn modify_task() {
let mut rep = Replica::new(DB::new_inmemory().into());
let uuid = Uuid::new_v4();
let t = rep
.new_task(uuid.clone(), Status::Pending, "a task".into())
.unwrap();
let mut t = t.into_mut(&mut rep);
t.set_description(String::from("past tense")).unwrap();
t.set_status(Status::Completed).unwrap();
// check that values have changed on the TaskMut
assert_eq!(t.get_description(), "past tense");
assert_eq!(t.get_status(), Status::Completed);
// check that values have changed after into_immut
let t = t.into_immut();
assert_eq!(t.get_description(), "past tense");
assert_eq!(t.get_status(), Status::Completed);
// check tha values have changed in storage, too
let t = rep.get_task(&uuid).unwrap().unwrap();
assert_eq!(t.get_description(), "past tense");
assert_eq!(t.get_status(), Status::Completed);
}
#[test]
fn delete_task() {
let mut rep = Replica::new(DB::new_inmemory().into());
let uuid = Uuid::new_v4();
rep.new_task(uuid.clone(), Status::Pending, "a task".into())
.unwrap();
rep.delete_task(uuid.clone()).unwrap();
assert_eq!(rep.get_task(&uuid).unwrap(), None);
}
#[test]
fn get_and_modify() {
let mut rep = Replica::new(DB::new_inmemory().into());
let uuid = Uuid::new_v4();
rep.new_task(uuid.clone(), Status::Pending, "another task".into())
.unwrap();
let t = rep.get_task(&uuid).unwrap().unwrap();
assert_eq!(t.get_description(), String::from("another task"));
let mut t = t.into_mut(&mut rep);
t.set_status(Status::Deleted).unwrap();
t.set_description("gone".into()).unwrap();
let t = rep.get_task(&uuid).unwrap().unwrap();
assert_eq!(t.get_status(), Status::Deleted);
assert_eq!(t.get_description(), "gone");
}
#[test]
fn new_pending_adds_to_working_set() {
let mut rep = Replica::new(DB::new_inmemory().into());
let uuid = Uuid::new_v4();
rep.new_task(uuid.clone(), Status::Pending, "to-be-pending".into())
.unwrap();
let t = rep.get_working_set_task(1).unwrap().unwrap();
assert_eq!(t.get_status(), Status::Pending);
assert_eq!(t.get_description(), "to-be-pending");
let ws = rep.working_set().unwrap();
assert_eq!(ws.len(), 2);
assert!(ws[0].is_none());
assert_eq!(ws[1].as_ref().unwrap().get_uuid(), &uuid);
}
#[test]
fn get_does_not_exist() {
let mut rep = Replica::new(DB::new_inmemory().into());
let uuid = Uuid::new_v4();
assert_eq!(rep.get_task(&uuid).unwrap(), None);
}
}

View File

@@ -0,0 +1,20 @@
pub type Blob = Vec<u8>;
pub enum VersionAdd {
// OK, version added
Ok,
// Rejected, must be based on the the given version
ExpectedVersion(u64),
}
/// A value implementing this trait can act as a server against which a replica can sync.
pub trait Server {
/// Get a vector of all versions after `since_version`
fn get_versions(&self, username: &str, since_version: u64) -> Vec<Blob>;
/// Add a new version. If the given version number is incorrect, this responds with the
/// appropriate version and expects the caller to try again.
fn add_version(&mut self, username: &str, version: u64, blob: Blob) -> VersionAdd;
fn add_snapshot(&mut self, username: &str, version: u64, blob: Blob);
}

215
taskchampion/src/task.rs Normal file
View File

@@ -0,0 +1,215 @@
use crate::replica::Replica;
use crate::taskstorage::TaskMap;
use chrono::prelude::*;
use failure::Fallible;
use std::convert::TryFrom;
use uuid::Uuid;
pub type Timestamp = DateTime<Utc>;
#[derive(Debug, PartialEq)]
pub enum Priority {
L,
M,
H,
}
impl TryFrom<&str> for Priority {
type Error = failure::Error;
fn try_from(s: &str) -> Result<Self, Self::Error> {
match s {
"L" => Ok(Priority::L),
"M" => Ok(Priority::M),
"H" => Ok(Priority::H),
_ => Err(format_err!("invalid status {}", s)),
}
}
}
impl AsRef<str> for Priority {
fn as_ref(&self) -> &str {
match self {
Priority::L => "L",
Priority::M => "M",
Priority::H => "H",
}
}
}
#[derive(Debug, PartialEq)]
pub enum Status {
Pending,
Completed,
Deleted,
}
impl Status {
/// Get a Status from the 1-character value in a TaskMap,
/// defaulting to Pending
pub(crate) fn from_taskmap(s: &str) -> Status {
match s {
"P" => Status::Pending,
"C" => Status::Completed,
"D" => Status::Deleted,
_ => Status::Pending,
}
}
/// Get the 1-character value for this status to use in the TaskMap.
pub(crate) fn to_taskmap(&self) -> &str {
match self {
Status::Pending => "P",
Status::Completed => "C",
Status::Deleted => "D",
}
}
}
#[derive(Debug, PartialEq)]
pub struct Annotation {
pub entry: Timestamp,
pub description: String,
}
/// A task, as publicly exposed by this crate.
///
/// Note that Task objects represent a snapshot of the task at a moment in time, and are not
/// protected by the atomicity of the backend storage. Concurrent modifications are safe,
/// but a Task that is cached for more than a few seconds may cause the user to see stale
/// data. Fetch, use, and drop Tasks quickly.
///
/// This struct contains only getters for various values on the task. The `into_mut` method returns
/// a TaskMut which can be used to modify the task.
#[derive(Debug, PartialEq)]
pub struct Task {
uuid: Uuid,
taskmap: TaskMap,
}
/// A mutable task, with setter methods. Calling a setter will update the Replica, as well as the
/// included Task.
pub struct TaskMut<'r> {
task: Task,
replica: &'r mut Replica,
updated_modified: bool,
}
impl Task {
pub(crate) fn new(uuid: Uuid, taskmap: TaskMap) -> Task {
Task { uuid, taskmap }
}
pub fn get_uuid(&self) -> &Uuid {
&self.uuid
}
/// Prepare to mutate this task, requiring a mutable Replica
/// in order to update the data it contains.
pub fn into_mut(self, replica: &mut Replica) -> TaskMut {
TaskMut {
task: self,
replica: replica,
updated_modified: false,
}
}
pub fn get_status(&self) -> Status {
self.taskmap
.get("status")
.map(|s| Status::from_taskmap(s))
.unwrap_or(Status::Pending)
}
pub fn get_description(&self) -> &str {
self.taskmap
.get("description")
.map(|s| s.as_ref())
.unwrap_or("")
}
pub fn get_modified(&self) -> Option<DateTime<Utc>> {
self.get_timestamp("modified")
}
// -- utility functions
pub fn get_timestamp(&self, property: &str) -> Option<DateTime<Utc>> {
if let Some(ts) = self.taskmap.get(property) {
if let Ok(ts) = ts.parse() {
return Some(Utc.timestamp(ts, 0));
}
// if the value does not parse as an integer, default to None
}
None
}
}
impl<'r> TaskMut<'r> {
/// Get the immutable task
pub fn into_immut(self) -> Task {
self.task
}
/// Set the task's status. This also adds the task to the working set if the
/// new status puts it in that set.
pub fn set_status(&mut self, status: Status) -> Fallible<()> {
if status == Status::Pending {
let uuid = self.uuid.clone();
self.replica.add_to_working_set(&uuid)?;
}
self.set_string("status", Some(String::from(status.to_taskmap())))
}
/// Set the task's description
pub fn set_description(&mut self, description: String) -> Fallible<()> {
self.set_string("description", Some(description))
}
/// Set the task's description
pub fn set_modified(&mut self, modified: DateTime<Utc>) -> Fallible<()> {
self.set_timestamp("modified", Some(modified))
}
// -- utility functions
fn lastmod(&mut self) -> Fallible<()> {
if !self.updated_modified {
let now = format!("{}", Utc::now().timestamp());
self.replica
.update_task(self.task.uuid.clone(), "modified", Some(now.clone()))?;
self.task.taskmap.insert(String::from("modified"), now);
self.updated_modified = true;
}
Ok(())
}
fn set_string(&mut self, property: &str, value: Option<String>) -> Fallible<()> {
self.lastmod()?;
self.replica
.update_task(self.task.uuid.clone(), property, value.as_ref())?;
if let Some(v) = value {
self.task.taskmap.insert(property.to_string(), v);
} else {
self.task.taskmap.remove(property);
}
Ok(())
}
fn set_timestamp(&mut self, property: &str, value: Option<DateTime<Utc>>) -> Fallible<()> {
self.lastmod()?;
self.replica.update_task(
self.task.uuid.clone(),
property,
value.map(|v| format!("{}", v.timestamp())),
)
}
}
impl<'r> std::ops::Deref for TaskMut<'r> {
type Target = Task;
fn deref(&self) -> &Self::Target {
&self.task
}
}

511
taskchampion/src/taskdb.rs Normal file
View File

@@ -0,0 +1,511 @@
use crate::errors::Error;
use crate::operation::Operation;
use crate::server::{Server, VersionAdd};
use crate::taskstorage::{TaskMap, TaskStorage, TaskStorageTxn};
use failure::Fallible;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::str;
use uuid::Uuid;
pub struct DB {
storage: Box<dyn TaskStorage>,
}
#[derive(Serialize, Deserialize, Debug)]
struct Version {
version: u64,
operations: Vec<Operation>,
}
impl DB {
/// Create a new DB with the given backend storage
pub fn new(storage: Box<dyn TaskStorage>) -> DB {
DB { storage }
}
#[cfg(test)]
pub fn new_inmemory() -> DB {
DB::new(Box::new(crate::taskstorage::InMemoryStorage::new()))
}
/// Apply an operation to the DB. Aside from synchronization operations, this is the only way
/// to modify the DB. In cases where an operation does not make sense, this function will do
/// nothing and return an error (but leave the DB in a consistent state).
pub fn apply(&mut self, op: Operation) -> Fallible<()> {
// TODO: differentiate error types here?
let mut txn = self.storage.txn()?;
if let err @ Err(_) = DB::apply_op(txn.as_mut(), &op) {
return err;
}
txn.add_operation(op)?;
txn.commit()?;
Ok(())
}
fn apply_op(txn: &mut dyn TaskStorageTxn, op: &Operation) -> Fallible<()> {
match op {
&Operation::Create { uuid } => {
// insert if the task does not already exist
if !txn.create_task(uuid)? {
return Err(Error::DBError(format!("Task {} already exists", uuid)).into());
}
}
&Operation::Delete { ref uuid } => {
if !txn.delete_task(uuid)? {
return Err(Error::DBError(format!("Task {} does not exist", uuid)).into());
}
}
&Operation::Update {
ref uuid,
ref property,
ref value,
timestamp: _,
} => {
// update if this task exists, otherwise ignore
if let Some(task) = txn.get_task(uuid)? {
let mut task = task.clone();
// TODO: update working_set if this is changing state to or from pending
match value {
Some(ref val) => task.insert(property.to_string(), val.clone()),
None => task.remove(property),
};
txn.set_task(uuid.clone(), task)?;
} else {
return Err(Error::DBError(format!("Task {} does not exist", uuid)).into());
}
}
}
Ok(())
}
/// Get all tasks.
pub fn all_tasks<'a>(&'a mut self) -> Fallible<Vec<(Uuid, TaskMap)>> {
let mut txn = self.storage.txn()?;
txn.all_tasks()
}
/// Get the UUIDs of all tasks
pub fn all_task_uuids<'a>(&'a mut self) -> Fallible<Vec<Uuid>> {
let mut txn = self.storage.txn()?;
txn.all_task_uuids()
}
/// Get the working set
pub fn working_set<'a>(&'a mut self) -> Fallible<Vec<Option<Uuid>>> {
let mut txn = self.storage.txn()?;
txn.get_working_set()
}
/// Get a single task, by uuid.
pub fn get_task(&mut self, uuid: &Uuid) -> Fallible<Option<TaskMap>> {
let mut txn = self.storage.txn()?;
txn.get_task(uuid)
}
/// Rebuild the working set using a function to identify tasks that should be in the set. This
/// renumbers the existing working-set tasks to eliminate gaps, and also adds any tasks that
/// are not already in the working set but should be. The rebuild occurs in a single
/// trasnsaction against the storage backend.
pub fn rebuild_working_set<F>(&mut self, in_working_set: F) -> Fallible<()>
where
F: Fn(&TaskMap) -> bool,
{
let mut txn = self.storage.txn()?;
let mut new_ws = vec![];
let mut seen = HashSet::new();
// The goal here is for existing working-set items to be "compressed' down to index 1, so
// we begin by scanning the current working set and inserting any tasks that should still
// be in the set into new_ws, implicitly dropping any tasks that are no longer in the
// working set.
for elt in txn.get_working_set()? {
if let Some(uuid) = elt {
if let Some(task) = txn.get_task(&uuid)? {
if in_working_set(&task) {
new_ws.push(uuid.clone());
seen.insert(uuid);
}
}
}
}
// Now go hunting for tasks that should be in this list but are not, adding them at the
// end of the list.
for (uuid, task) in txn.all_tasks()? {
if !seen.contains(&uuid) {
if in_working_set(&task) {
new_ws.push(uuid.clone());
}
}
}
// clear and re-write the entire working set, in order
txn.clear_working_set()?;
for uuid in new_ws.drain(0..new_ws.len()) {
txn.add_to_working_set(&uuid)?;
}
txn.commit()?;
Ok(())
}
/// Add the given uuid to the working set and return its index; if it is already in the working
/// set, its index is returned. This does *not* renumber any existing tasks.
pub fn add_to_working_set(&mut self, uuid: &Uuid) -> Fallible<u64> {
let mut txn = self.storage.txn()?;
// search for an existing entry for this task..
for (i, elt) in txn.get_working_set()?.iter().enumerate() {
if *elt == Some(*uuid) {
// (note that this drops the transaction with no changes made)
return Ok(i as u64);
}
}
// and if not found, add one
let i = txn.add_to_working_set(uuid)?;
txn.commit()?;
Ok(i)
}
/// Sync to the given server, pulling remote changes and pushing local changes.
pub fn sync(&mut self, username: &str, server: &mut dyn Server) -> Fallible<()> {
let mut txn = self.storage.txn()?;
// retry synchronizing until the server accepts our version (this allows for races between
// replicas trying to sync to the same server)
loop {
// first pull changes and "rebase" on top of them
let new_versions = server.get_versions(username, txn.base_version()?);
for version_blob in new_versions {
let version_str = str::from_utf8(&version_blob).unwrap();
let version: Version = serde_json::from_str(version_str).unwrap();
assert_eq!(version.version, txn.base_version()? + 1);
println!("applying version {:?} from server", version.version);
DB::apply_version(txn.as_mut(), version)?;
}
let operations: Vec<Operation> = txn.operations()?.iter().map(|o| o.clone()).collect();
if operations.len() == 0 {
// nothing to sync back to the server..
break;
}
// now make a version of our local changes and push those
let new_version = Version {
version: txn.base_version()? + 1,
operations: operations,
};
let new_version_str = serde_json::to_string(&new_version).unwrap();
println!("sending version {:?} to server", new_version.version);
if let VersionAdd::Ok =
server.add_version(username, new_version.version, new_version_str.into())
{
txn.set_base_version(new_version.version)?;
txn.set_operations(vec![])?;
break;
}
}
txn.commit()?;
Ok(())
}
fn apply_version(txn: &mut dyn TaskStorageTxn, mut version: Version) -> Fallible<()> {
// The situation here is that the server has already applied all server operations, and we
// have already applied all local operations, so states have diverged by several
// operations. We need to figure out what operations to apply locally and on the server in
// order to return to the same state.
//
// Operational transforms provide this on an operation-by-operation basis. To break this
// down, we treat each server operation individually, in order. For each such operation,
// we start in this state:
//
//
// base state-*
// / \-server op
// * *
// local / \ /
// ops * *
// / \ / new
// * * local
// local / \ / ops
// state-* *
// new-\ /
// server op *-new local state
//
// This is slightly complicated by the fact that the transform function can return None,
// indicating no operation is required. If this happens for a local op, we can just omit
// it. If it happens for server op, then we must copy the remaining local ops.
let mut local_operations: Vec<Operation> = txn.operations()?;
for server_op in version.operations.drain(..) {
let mut new_local_ops = Vec::with_capacity(local_operations.len());
let mut svr_op = Some(server_op);
for local_op in local_operations.drain(..) {
if let Some(o) = svr_op {
let (new_server_op, new_local_op) = Operation::transform(o, local_op);
svr_op = new_server_op;
if let Some(o) = new_local_op {
new_local_ops.push(o);
}
} else {
new_local_ops.push(local_op);
}
}
if let Some(o) = svr_op {
if let Err(e) = DB::apply_op(txn, &o) {
println!("Invalid operation when syncing: {} (ignored)", e);
}
}
local_operations = new_local_ops;
}
txn.set_base_version(version.version)?;
txn.set_operations(local_operations)?;
Ok(())
}
// functions for supporting tests
pub fn sorted_tasks(&mut self) -> Vec<(Uuid, Vec<(String, String)>)> {
let mut res: Vec<(Uuid, Vec<(String, String)>)> = self
.all_tasks()
.unwrap()
.iter()
.map(|(u, t)| {
let mut t = t
.iter()
.map(|(p, v)| (p.clone(), v.clone()))
.collect::<Vec<(String, String)>>();
t.sort();
(u.clone(), t)
})
.collect();
res.sort();
res
}
pub fn operations(&mut self) -> Vec<Operation> {
let mut txn = self.storage.txn().unwrap();
txn.operations()
.unwrap()
.iter()
.map(|o| o.clone())
.collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use std::collections::HashMap;
use uuid::Uuid;
#[test]
fn test_apply_create() {
let mut db = DB::new_inmemory();
let uuid = Uuid::new_v4();
let op = Operation::Create { uuid };
db.apply(op.clone()).unwrap();
assert_eq!(db.sorted_tasks(), vec![(uuid, vec![]),]);
assert_eq!(db.operations(), vec![op]);
}
#[test]
fn test_apply_create_exists() {
let mut db = DB::new_inmemory();
let uuid = Uuid::new_v4();
let op = Operation::Create { uuid };
db.apply(op.clone()).unwrap();
assert_eq!(
db.apply(op.clone()).err().unwrap().to_string(),
format!("Task Database Error: Task {} already exists", uuid)
);
assert_eq!(db.sorted_tasks(), vec![(uuid, vec![])]);
assert_eq!(db.operations(), vec![op]);
}
#[test]
fn test_apply_create_update() {
let mut db = DB::new_inmemory();
let uuid = Uuid::new_v4();
let op1 = Operation::Create { uuid };
db.apply(op1.clone()).unwrap();
let op2 = Operation::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: Utc::now(),
};
db.apply(op2.clone()).unwrap();
assert_eq!(
db.sorted_tasks(),
vec![(uuid, vec![("title".into(), "my task".into())])]
);
assert_eq!(db.operations(), vec![op1, op2]);
}
#[test]
fn test_apply_create_update_delete_prop() {
let mut db = DB::new_inmemory();
let uuid = Uuid::new_v4();
let op1 = Operation::Create { uuid };
db.apply(op1.clone()).unwrap();
let op2 = Operation::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: Utc::now(),
};
db.apply(op2.clone()).unwrap();
let op3 = Operation::Update {
uuid,
property: String::from("priority"),
value: Some("H".into()),
timestamp: Utc::now(),
};
db.apply(op3.clone()).unwrap();
let op4 = Operation::Update {
uuid,
property: String::from("title"),
value: None,
timestamp: Utc::now(),
};
db.apply(op4.clone()).unwrap();
let mut exp = HashMap::new();
let mut task = HashMap::new();
task.insert(String::from("priority"), String::from("H"));
exp.insert(uuid, task);
assert_eq!(
db.sorted_tasks(),
vec![(uuid, vec![("priority".into(), "H".into())])]
);
assert_eq!(db.operations(), vec![op1, op2, op3, op4]);
}
#[test]
fn test_apply_update_does_not_exist() {
let mut db = DB::new_inmemory();
let uuid = Uuid::new_v4();
let op = Operation::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: Utc::now(),
};
assert_eq!(
db.apply(op).err().unwrap().to_string(),
format!("Task Database Error: Task {} does not exist", uuid)
);
assert_eq!(db.sorted_tasks(), vec![]);
assert_eq!(db.operations(), vec![]);
}
#[test]
fn test_apply_create_delete() {
let mut db = DB::new_inmemory();
let uuid = Uuid::new_v4();
let op1 = Operation::Create { uuid };
db.apply(op1.clone()).unwrap();
let op2 = Operation::Delete { uuid };
db.apply(op2.clone()).unwrap();
assert_eq!(db.sorted_tasks(), vec![]);
assert_eq!(db.operations(), vec![op1, op2]);
}
#[test]
fn test_apply_delete_not_present() {
let mut db = DB::new_inmemory();
let uuid = Uuid::new_v4();
let op1 = Operation::Delete { uuid };
assert_eq!(
db.apply(op1).err().unwrap().to_string(),
format!("Task Database Error: Task {} does not exist", uuid)
);
assert_eq!(db.sorted_tasks(), vec![]);
assert_eq!(db.operations(), vec![]);
}
#[test]
fn rebuild_working_set() -> Fallible<()> {
let mut db = DB::new_inmemory();
let uuids = vec![
Uuid::new_v4(), // 0: pending, not already in working set
Uuid::new_v4(), // 1: pending, already in working set
Uuid::new_v4(), // 2: not pending, not already in working set
Uuid::new_v4(), // 3: not pending, already in working set
Uuid::new_v4(), // 4: pending, already in working set
];
// add everything to the DB
for uuid in &uuids {
db.apply(Operation::Create { uuid: uuid.clone() })?;
}
for i in &[0usize, 1, 4] {
db.apply(Operation::Update {
uuid: uuids[*i].clone(),
property: String::from("status"),
value: Some("pending".into()),
timestamp: Utc::now(),
})?;
}
// set the existing working_set as we want it
{
let mut txn = db.storage.txn()?;
txn.clear_working_set()?;
for i in &[1usize, 3, 4] {
txn.add_to_working_set(&uuids[*i])?;
}
txn.commit()?;
}
assert_eq!(
db.working_set()?,
vec![
None,
Some(uuids[1].clone()),
Some(uuids[3].clone()),
Some(uuids[4].clone())
]
);
db.rebuild_working_set(|t| {
if let Some(status) = t.get("status") {
status == "pending"
} else {
false
}
})?;
// uuids[1] and uuids[4] are already in the working set, so are compressed
// to the top, and then uuids[0] is added.
assert_eq!(
db.working_set()?,
vec![
None,
Some(uuids[1].clone()),
Some(uuids[4].clone()),
Some(uuids[0].clone())
]
);
Ok(())
}
}

View File

@@ -0,0 +1,291 @@
use crate::operation::Operation;
use crate::taskstorage::{TaskMap, TaskStorage, TaskStorageTxn};
use failure::Fallible;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use uuid::Uuid;
#[derive(PartialEq, Debug, Clone)]
struct Data {
tasks: HashMap<Uuid, TaskMap>,
base_version: u64,
operations: Vec<Operation>,
working_set: Vec<Option<Uuid>>,
}
struct Txn<'t> {
storage: &'t mut InMemoryStorage,
new_data: Option<Data>,
}
impl<'t> Txn<'t> {
fn mut_data_ref(&mut self) -> &mut Data {
if self.new_data.is_none() {
self.new_data = Some(self.storage.data.clone());
}
if let Some(ref mut data) = self.new_data {
data
} else {
unreachable!();
}
}
fn data_ref(&mut self) -> &Data {
if let Some(ref data) = self.new_data {
data
} else {
&self.storage.data
}
}
}
impl<'t> TaskStorageTxn for Txn<'t> {
fn get_task(&mut self, uuid: &Uuid) -> Fallible<Option<TaskMap>> {
match self.data_ref().tasks.get(uuid) {
None => Ok(None),
Some(t) => Ok(Some(t.clone())),
}
}
fn create_task(&mut self, uuid: Uuid) -> Fallible<bool> {
if let ent @ Entry::Vacant(_) = self.mut_data_ref().tasks.entry(uuid) {
ent.or_insert(TaskMap::new());
Ok(true)
} else {
Ok(false)
}
}
fn set_task(&mut self, uuid: Uuid, task: TaskMap) -> Fallible<()> {
self.mut_data_ref().tasks.insert(uuid, task);
Ok(())
}
fn delete_task(&mut self, uuid: &Uuid) -> Fallible<bool> {
if let Some(_) = self.mut_data_ref().tasks.remove(uuid) {
Ok(true)
} else {
Ok(false)
}
}
fn all_tasks<'a>(&mut self) -> Fallible<Vec<(Uuid, TaskMap)>> {
Ok(self
.data_ref()
.tasks
.iter()
.map(|(u, t)| (u.clone(), t.clone()))
.collect())
}
fn all_task_uuids<'a>(&mut self) -> Fallible<Vec<Uuid>> {
Ok(self.data_ref().tasks.keys().map(|u| u.clone()).collect())
}
fn base_version(&mut self) -> Fallible<u64> {
Ok(self.data_ref().base_version)
}
fn set_base_version(&mut self, version: u64) -> Fallible<()> {
self.mut_data_ref().base_version = version;
Ok(())
}
fn operations(&mut self) -> Fallible<Vec<Operation>> {
Ok(self.data_ref().operations.clone())
}
fn add_operation(&mut self, op: Operation) -> Fallible<()> {
self.mut_data_ref().operations.push(op);
Ok(())
}
fn set_operations(&mut self, ops: Vec<Operation>) -> Fallible<()> {
self.mut_data_ref().operations = ops;
Ok(())
}
fn get_working_set(&mut self) -> Fallible<Vec<Option<Uuid>>> {
Ok(self.data_ref().working_set.clone())
}
fn add_to_working_set(&mut self, uuid: &Uuid) -> Fallible<u64> {
let working_set = &mut self.mut_data_ref().working_set;
working_set.push(Some(uuid.clone()));
Ok(working_set.len() as u64)
}
fn remove_from_working_set(&mut self, index: u64) -> Fallible<()> {
let index = index as usize;
let working_set = &mut self.mut_data_ref().working_set;
if index >= working_set.len() || working_set[index].is_none() {
return Err(format_err!("No task found with index {}", index));
}
working_set[index] = None;
Ok(())
}
fn clear_working_set(&mut self) -> Fallible<()> {
self.mut_data_ref().working_set = vec![None];
Ok(())
}
fn commit(&mut self) -> Fallible<()> {
// copy the new_data back into storage to commit the transaction
if let Some(data) = self.new_data.take() {
self.storage.data = data;
}
Ok(())
}
}
#[derive(PartialEq, Debug, Clone)]
pub struct InMemoryStorage {
data: Data,
}
impl InMemoryStorage {
pub fn new() -> InMemoryStorage {
InMemoryStorage {
data: Data {
tasks: HashMap::new(),
base_version: 0,
operations: vec![],
working_set: vec![None],
},
}
}
}
impl TaskStorage for InMemoryStorage {
fn txn<'a>(&'a mut self) -> Fallible<Box<dyn TaskStorageTxn + 'a>> {
Ok(Box::new(Txn {
storage: self,
new_data: None,
}))
}
}
#[cfg(test)]
mod test {
use super::*;
// (note: this module is heavily used in tests so most of its functionality is well-tested
// elsewhere and not tested here)
#[test]
fn get_working_set_empty() -> Fallible<()> {
let mut storage = InMemoryStorage::new();
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None]);
}
Ok(())
}
#[test]
fn add_to_working_set() -> Fallible<()> {
let mut storage = InMemoryStorage::new();
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.add_to_working_set(&uuid1)?;
txn.add_to_working_set(&uuid2)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None, Some(uuid1), Some(uuid2)]);
}
Ok(())
}
#[test]
fn add_and_remove_from_working_set_holes() -> Fallible<()> {
let mut storage = InMemoryStorage::new();
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.add_to_working_set(&uuid1)?;
txn.add_to_working_set(&uuid2)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
txn.remove_from_working_set(1)?;
txn.add_to_working_set(&uuid1)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None, None, Some(uuid2), Some(uuid1)]);
}
Ok(())
}
#[test]
fn remove_working_set_doesnt_exist() -> Fallible<()> {
let mut storage = InMemoryStorage::new();
let uuid1 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.add_to_working_set(&uuid1)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let res = txn.remove_from_working_set(0);
assert!(res.is_err());
let res = txn.remove_from_working_set(2);
assert!(res.is_err());
}
Ok(())
}
#[test]
fn clear_working_set() -> Fallible<()> {
let mut storage = InMemoryStorage::new();
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.add_to_working_set(&uuid1)?;
txn.add_to_working_set(&uuid2)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
txn.clear_working_set()?;
txn.add_to_working_set(&uuid2)?;
txn.add_to_working_set(&uuid1)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None, Some(uuid2), Some(uuid1)]);
}
Ok(())
}
}

View File

@@ -0,0 +1,766 @@
use crate::operation::Operation;
use crate::taskstorage::{TaskMap, TaskStorage, TaskStorageTxn};
use failure::Fallible;
use kv::msgpack::Msgpack;
use kv::{Bucket, Config, Error, Integer, Serde, Store, ValueBuf};
use std::convert::TryInto;
use std::path::Path;
use uuid::Uuid;
/// A representation of a UUID as a key. This is just a newtype wrapping the 128-bit packed form
/// of a UUID.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Key(uuid::Bytes);
impl From<&[u8]> for Key {
fn from(bytes: &[u8]) -> Key {
let key = Key(bytes.try_into().unwrap());
key
}
}
impl From<&Uuid> for Key {
fn from(uuid: &Uuid) -> Key {
let key = Key(uuid.as_bytes().clone());
key
}
}
impl From<Uuid> for Key {
fn from(uuid: Uuid) -> Key {
let key = Key(uuid.as_bytes().clone());
key
}
}
impl From<Key> for Uuid {
fn from(key: Key) -> Uuid {
Uuid::from_bytes(key.0)
}
}
impl AsRef<[u8]> for Key {
fn as_ref(&self) -> &[u8] {
&self.0[..]
}
}
/// KVStorage is an on-disk storage backend which uses LMDB via the `kv` crate.
pub struct KVStorage<'t> {
store: Store,
tasks_bucket: Bucket<'t, Key, ValueBuf<Msgpack<TaskMap>>>,
numbers_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<u64>>>,
operations_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<Operation>>>,
working_set_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<Uuid>>>,
}
const BASE_VERSION: u64 = 1;
const NEXT_OPERATION: u64 = 2;
const NEXT_WORKING_SET_INDEX: u64 = 3;
impl<'t> KVStorage<'t> {
pub fn new(directory: &Path) -> Fallible<KVStorage> {
let mut config = Config::default(directory);
config.bucket("tasks", None);
config.bucket("numbers", None);
config.bucket("operations", None);
config.bucket("working_set", None);
let store = Store::new(config)?;
// tasks are stored indexed by uuid
let tasks_bucket = store.bucket::<Key, ValueBuf<Msgpack<TaskMap>>>(Some("tasks"))?;
// this bucket contains various u64s, indexed by constants above
let numbers_bucket = store.int_bucket::<ValueBuf<Msgpack<u64>>>(Some("numbers"))?;
// this bucket contains operations, numbered consecutively; the NEXT_OPERATION number gives
// the index of the next operation to insert
let operations_bucket =
store.int_bucket::<ValueBuf<Msgpack<Operation>>>(Some("operations"))?;
// this bucket contains operations, numbered consecutively; the NEXT_WORKING_SET_INDEX
// number gives the index of the next operation to insert
let working_set_bucket =
store.int_bucket::<ValueBuf<Msgpack<Uuid>>>(Some("working_set"))?;
Ok(KVStorage {
store,
tasks_bucket,
numbers_bucket,
operations_bucket,
working_set_bucket,
})
}
}
impl<'t> TaskStorage for KVStorage<'t> {
fn txn<'a>(&'a mut self) -> Fallible<Box<dyn TaskStorageTxn + 'a>> {
Ok(Box::new(Txn {
storage: self,
txn: Some(self.store.write_txn()?),
}))
}
}
struct Txn<'t> {
storage: &'t KVStorage<'t>,
txn: Option<kv::Txn<'t>>,
}
impl<'t> Txn<'t> {
// get the underlying kv Txn
fn kvtxn<'a>(&mut self) -> &mut kv::Txn<'t> {
if let Some(ref mut txn) = self.txn {
txn
} else {
panic!("cannot use transaction after commit");
}
}
// Access to buckets
fn tasks_bucket(&self) -> &'t Bucket<'t, Key, ValueBuf<Msgpack<TaskMap>>> {
&self.storage.tasks_bucket
}
fn numbers_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf<Msgpack<u64>>> {
&self.storage.numbers_bucket
}
fn operations_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf<Msgpack<Operation>>> {
&self.storage.operations_bucket
}
fn working_set_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf<Msgpack<Uuid>>> {
&self.storage.working_set_bucket
}
}
impl<'t> TaskStorageTxn for Txn<'t> {
fn get_task(&mut self, uuid: &Uuid) -> Fallible<Option<TaskMap>> {
let bucket = self.tasks_bucket();
let buf = match self.kvtxn().get(bucket, uuid.into()) {
Ok(buf) => buf,
Err(Error::NotFound) => return Ok(None),
Err(e) => return Err(e.into()),
};
let value = buf.inner()?.to_serde();
Ok(Some(value))
}
fn create_task(&mut self, uuid: Uuid) -> Fallible<bool> {
let bucket = self.tasks_bucket();
let kvtxn = self.kvtxn();
match kvtxn.get(bucket, uuid.into()) {
Err(Error::NotFound) => {
kvtxn.set(bucket, uuid.into(), Msgpack::to_value_buf(TaskMap::new())?)?;
Ok(true)
}
Err(e) => Err(e.into()),
Ok(_) => Ok(false),
}
}
fn set_task(&mut self, uuid: Uuid, task: TaskMap) -> Fallible<()> {
let bucket = self.tasks_bucket();
let kvtxn = self.kvtxn();
kvtxn.set(bucket, uuid.into(), Msgpack::to_value_buf(task)?)?;
Ok(())
}
fn delete_task(&mut self, uuid: &Uuid) -> Fallible<bool> {
let bucket = self.tasks_bucket();
let kvtxn = self.kvtxn();
match kvtxn.del(bucket, uuid.into()) {
Err(Error::NotFound) => Ok(false),
Err(e) => Err(e.into()),
Ok(_) => Ok(true),
}
}
fn all_tasks(&mut self) -> Fallible<Vec<(Uuid, TaskMap)>> {
let bucket = self.tasks_bucket();
let kvtxn = self.kvtxn();
let curs = kvtxn.read_cursor(bucket)?;
let all_tasks: Result<Vec<(Uuid, TaskMap)>, Error> = kvtxn
.read_cursor(bucket)?
.iter()
.map(|(k, v)| Ok((k.into(), v.inner()?.to_serde())))
.collect();
Ok(all_tasks?)
}
fn all_task_uuids(&mut self) -> Fallible<Vec<Uuid>> {
let bucket = self.tasks_bucket();
let kvtxn = self.kvtxn();
let curs = kvtxn.read_cursor(bucket)?;
Ok(kvtxn
.read_cursor(bucket)?
.iter()
.map(|(k, _)| k.into())
.collect())
}
fn base_version(&mut self) -> Fallible<u64> {
let bucket = self.numbers_bucket();
let base_version = match self.kvtxn().get(bucket, BASE_VERSION.into()) {
Ok(buf) => buf,
Err(Error::NotFound) => return Ok(0),
Err(e) => return Err(e.into()),
}
.inner()?
.to_serde();
Ok(base_version)
}
fn set_base_version(&mut self, version: u64) -> Fallible<()> {
let numbers_bucket = self.numbers_bucket();
let kvtxn = self.kvtxn();
kvtxn.set(
numbers_bucket,
BASE_VERSION.into(),
Msgpack::to_value_buf(version)?,
)?;
Ok(())
}
fn operations(&mut self) -> Fallible<Vec<Operation>> {
let bucket = self.operations_bucket();
let kvtxn = self.kvtxn();
let curs = kvtxn.read_cursor(bucket)?;
let all_ops: Result<Vec<(u64, Operation)>, Error> = kvtxn
.read_cursor(bucket)?
.iter()
.map(|(i, v)| Ok((i.into(), v.inner()?.to_serde())))
.collect();
let mut all_ops = all_ops?;
// sort by key..
all_ops.sort_by(|a, b| a.0.cmp(&b.0));
// and return the values..
Ok(all_ops.iter().map(|(_, v)| v.clone()).collect())
}
fn add_operation(&mut self, op: Operation) -> Fallible<()> {
let numbers_bucket = self.numbers_bucket();
let operations_bucket = self.operations_bucket();
let kvtxn = self.kvtxn();
let next_op = match kvtxn.get(numbers_bucket, NEXT_OPERATION.into()) {
Ok(buf) => buf.inner()?.to_serde(),
Err(Error::NotFound) => 0,
Err(e) => return Err(e.into()),
};
kvtxn.set(
operations_bucket,
next_op.into(),
Msgpack::to_value_buf(op)?,
)?;
kvtxn.set(
numbers_bucket,
NEXT_OPERATION.into(),
Msgpack::to_value_buf(next_op + 1)?,
)?;
Ok(())
}
fn set_operations(&mut self, ops: Vec<Operation>) -> Fallible<()> {
let numbers_bucket = self.numbers_bucket();
let operations_bucket = self.operations_bucket();
let kvtxn = self.kvtxn();
kvtxn.clear_db(operations_bucket)?;
let mut i = 0u64;
for op in ops {
kvtxn.set(operations_bucket, i.into(), Msgpack::to_value_buf(op)?)?;
i += 1;
}
kvtxn.set(
numbers_bucket,
NEXT_OPERATION.into(),
Msgpack::to_value_buf(i)?,
)?;
Ok(())
}
fn get_working_set(&mut self) -> Fallible<Vec<Option<Uuid>>> {
let working_set_bucket = self.working_set_bucket();
let numbers_bucket = self.numbers_bucket();
let kvtxn = self.kvtxn();
let next_index = match kvtxn.get(numbers_bucket, NEXT_WORKING_SET_INDEX.into()) {
Ok(buf) => buf.inner()?.to_serde(),
Err(Error::NotFound) => 1,
Err(e) => return Err(e.into()),
};
let mut res = Vec::with_capacity(next_index as usize);
for _ in 0..next_index {
res.push(None)
}
let curs = kvtxn.read_cursor(working_set_bucket)?;
for (i, u) in kvtxn.read_cursor(working_set_bucket)?.iter() {
let i: u64 = i.into();
res[i as usize] = Some(u.inner()?.to_serde());
}
Ok(res)
}
fn add_to_working_set(&mut self, uuid: &Uuid) -> Fallible<u64> {
let working_set_bucket = self.working_set_bucket();
let numbers_bucket = self.numbers_bucket();
let kvtxn = self.kvtxn();
let next_index = match kvtxn.get(numbers_bucket, NEXT_WORKING_SET_INDEX.into()) {
Ok(buf) => buf.inner()?.to_serde(),
Err(Error::NotFound) => 1,
Err(e) => return Err(e.into()),
};
kvtxn.set(
working_set_bucket,
next_index.into(),
Msgpack::to_value_buf(uuid.clone())?,
)?;
kvtxn.set(
numbers_bucket,
NEXT_WORKING_SET_INDEX.into(),
Msgpack::to_value_buf(next_index + 1)?,
)?;
Ok(next_index)
}
fn remove_from_working_set(&mut self, index: u64) -> Fallible<()> {
let working_set_bucket = self.working_set_bucket();
let numbers_bucket = self.numbers_bucket();
let kvtxn = self.kvtxn();
let next_index = match kvtxn.get(numbers_bucket, NEXT_WORKING_SET_INDEX.into()) {
Ok(buf) => buf.inner()?.to_serde(),
Err(Error::NotFound) => 1,
Err(e) => return Err(e.into()),
};
if index == 0 || index >= next_index {
return Err(format_err!("No task found with index {}", index));
}
match kvtxn.del(working_set_bucket, index.into()) {
Err(Error::NotFound) => Err(format_err!("No task found with index {}", index)),
Err(e) => Err(e.into()),
Ok(_) => Ok(()),
}
}
fn clear_working_set(&mut self) -> Fallible<()> {
let working_set_bucket = self.working_set_bucket();
let numbers_bucket = self.numbers_bucket();
let kvtxn = self.kvtxn();
kvtxn.clear_db(working_set_bucket)?;
kvtxn.set(
numbers_bucket,
NEXT_WORKING_SET_INDEX.into(),
Msgpack::to_value_buf(1)?,
)?;
Ok(())
}
fn commit(&mut self) -> Fallible<()> {
if let Some(kvtxn) = self.txn.take() {
kvtxn.commit()?;
} else {
panic!("transaction already committed");
}
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::taskstorage::taskmap_with;
use failure::Fallible;
use tempdir::TempDir;
#[test]
fn test_create() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid = Uuid::new_v4();
{
let mut txn = storage.txn()?;
assert!(txn.create_task(uuid.clone())?);
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let task = txn.get_task(&uuid)?;
assert_eq!(task, Some(taskmap_with(vec![])));
}
Ok(())
}
#[test]
fn test_create_exists() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid = Uuid::new_v4();
{
let mut txn = storage.txn()?;
assert!(txn.create_task(uuid.clone())?);
txn.commit()?;
}
{
let mut txn = storage.txn()?;
assert!(!txn.create_task(uuid.clone())?);
txn.commit()?;
}
Ok(())
}
#[test]
fn test_get_missing() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid = Uuid::new_v4();
{
let mut txn = storage.txn()?;
let task = txn.get_task(&uuid)?;
assert_eq!(task, None);
}
Ok(())
}
#[test]
fn test_set_task() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.set_task(
uuid.clone(),
taskmap_with(vec![("k".to_string(), "v".to_string())]),
)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let task = txn.get_task(&uuid)?;
assert_eq!(
task,
Some(taskmap_with(vec![("k".to_string(), "v".to_string())]))
);
}
Ok(())
}
#[test]
fn test_delete_task_missing() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid = Uuid::new_v4();
{
let mut txn = storage.txn()?;
assert!(!txn.delete_task(&uuid)?);
}
Ok(())
}
#[test]
fn test_delete_task_exists() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid = Uuid::new_v4();
{
let mut txn = storage.txn()?;
assert!(txn.create_task(uuid.clone())?);
txn.commit()?;
}
{
let mut txn = storage.txn()?;
assert!(txn.delete_task(&uuid)?);
}
Ok(())
}
#[test]
fn test_all_tasks_empty() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
{
let mut txn = storage.txn()?;
let tasks = txn.all_tasks()?;
assert_eq!(tasks, vec![]);
}
Ok(())
}
#[test]
fn test_all_tasks_and_uuids() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
assert!(txn.create_task(uuid1.clone())?);
txn.set_task(
uuid1.clone(),
taskmap_with(vec![("num".to_string(), "1".to_string())]),
)?;
assert!(txn.create_task(uuid2.clone())?);
txn.set_task(
uuid2.clone(),
taskmap_with(vec![("num".to_string(), "2".to_string())]),
)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let mut tasks = txn.all_tasks()?;
// order is nondeterministic, so sort by uuid
tasks.sort_by(|a, b| a.0.cmp(&b.0));
let mut exp = vec![
(
uuid1.clone(),
taskmap_with(vec![("num".to_string(), "1".to_string())]),
),
(
uuid2.clone(),
taskmap_with(vec![("num".to_string(), "2".to_string())]),
),
];
exp.sort_by(|a, b| a.0.cmp(&b.0));
assert_eq!(tasks, exp);
}
{
let mut txn = storage.txn()?;
let mut uuids = txn.all_task_uuids()?;
uuids.sort();
let mut exp = vec![uuid1.clone(), uuid2.clone()];
exp.sort();
assert_eq!(uuids, exp);
}
Ok(())
}
#[test]
fn test_base_version_default() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
{
let mut txn = storage.txn()?;
assert_eq!(txn.base_version()?, 0);
}
Ok(())
}
#[test]
fn test_base_version_setting() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
{
let mut txn = storage.txn()?;
txn.set_base_version(3)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
assert_eq!(txn.base_version()?, 3);
}
Ok(())
}
#[test]
fn test_operations() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
let uuid3 = Uuid::new_v4();
// create some operations
{
let mut txn = storage.txn()?;
txn.add_operation(Operation::Create { uuid: uuid1 })?;
txn.add_operation(Operation::Create { uuid: uuid2 })?;
txn.commit()?;
}
// read them back
{
let mut txn = storage.txn()?;
let ops = txn.operations()?;
assert_eq!(
ops,
vec![
Operation::Create { uuid: uuid1 },
Operation::Create { uuid: uuid2 },
]
);
}
// set them to a different bunch
{
let mut txn = storage.txn()?;
txn.set_operations(vec![
Operation::Delete { uuid: uuid2 },
Operation::Delete { uuid: uuid1 },
])?;
txn.commit()?;
}
// create some more operations (to test adding operations after clearing)
{
let mut txn = storage.txn()?;
txn.add_operation(Operation::Create { uuid: uuid3 })?;
txn.add_operation(Operation::Delete { uuid: uuid3 })?;
txn.commit()?;
}
// read them back
{
let mut txn = storage.txn()?;
let ops = txn.operations()?;
assert_eq!(
ops,
vec![
Operation::Delete { uuid: uuid2 },
Operation::Delete { uuid: uuid1 },
Operation::Create { uuid: uuid3 },
Operation::Delete { uuid: uuid3 },
]
);
}
Ok(())
}
#[test]
fn get_working_set_empty() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None]);
}
Ok(())
}
#[test]
fn add_to_working_set() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.add_to_working_set(&uuid1)?;
txn.add_to_working_set(&uuid2)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None, Some(uuid1), Some(uuid2)]);
}
Ok(())
}
#[test]
fn add_and_remove_from_working_set_holes() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.add_to_working_set(&uuid1)?;
txn.add_to_working_set(&uuid2)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
txn.remove_from_working_set(1)?;
txn.add_to_working_set(&uuid1)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None, None, Some(uuid2), Some(uuid1)]);
}
Ok(())
}
#[test]
fn remove_working_set_doesnt_exist() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid1 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.add_to_working_set(&uuid1)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let res = txn.remove_from_working_set(0);
assert!(res.is_err());
let res = txn.remove_from_working_set(2);
assert!(res.is_err());
}
Ok(())
}
#[test]
fn clear_working_set() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.add_to_working_set(&uuid1)?;
txn.add_to_working_set(&uuid2)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
txn.clear_working_set()?;
txn.add_to_working_set(&uuid2)?;
txn.add_to_working_set(&uuid1)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let ws = txn.get_working_set()?;
assert_eq!(ws, vec![None, Some(uuid2), Some(uuid1)]);
}
Ok(())
}
}

View File

@@ -0,0 +1,92 @@
use crate::Operation;
use failure::Fallible;
use std::collections::HashMap;
use uuid::Uuid;
mod inmemory;
mod kv;
pub use self::kv::KVStorage;
pub use inmemory::InMemoryStorage;
/// An in-memory representation of a task as a simple hashmap
pub type TaskMap = HashMap<String, String>;
#[cfg(test)]
fn taskmap_with(mut properties: Vec<(String, String)>) -> TaskMap {
let mut rv = TaskMap::new();
for (p, v) in properties.drain(..) {
rv.insert(p, v);
}
rv
}
/// A TaskStorage transaction, in which storage operations are performed.
/// Serializable consistency is maintained, and implementations do not optimize
/// for concurrent access so some may simply apply a mutex to limit access to
/// one transaction at a time. Transactions are aborted if they are dropped.
/// It's safe to drop transactions that did not modify any data.
pub trait TaskStorageTxn {
/// Get an (immutable) task, if it is in the storage
fn get_task(&mut self, uuid: &Uuid) -> Fallible<Option<TaskMap>>;
/// Create an (empty) task, only if it does not already exist. Returns true if
/// the task was created (did not already exist).
fn create_task(&mut self, uuid: Uuid) -> Fallible<bool>;
/// Set a task, overwriting any existing task. If the task does not exist, this implicitly
/// creates it (use `get_task` to check first, if necessary).
fn set_task(&mut self, uuid: Uuid, task: TaskMap) -> Fallible<()>;
/// Delete a task, if it exists. Returns true if the task was deleted (already existed)
fn delete_task(&mut self, uuid: &Uuid) -> Fallible<bool>;
/// Get the uuids and bodies of all tasks in the storage, in undefined order.
fn all_tasks<'a>(&mut self) -> Fallible<Vec<(Uuid, TaskMap)>>;
/// Get the uuids of all tasks in the storage, in undefined order.
fn all_task_uuids<'a>(&mut self) -> Fallible<Vec<Uuid>>;
/// Get the current base_version for this storage -- the last version synced from the server.
fn base_version(&mut self) -> Fallible<u64>;
/// Set the current base_version for this storage.
fn set_base_version(&mut self, version: u64) -> Fallible<()>;
/// Get the current set of outstanding operations (operations that have not been sync'd to the
/// server yet)
fn operations<'a>(&mut self) -> Fallible<Vec<Operation>>;
/// Add an operation to the end of the list of operations in the storage. Note that this
/// merely *stores* the operation; it is up to the DB to apply it.
fn add_operation(&mut self, op: Operation) -> Fallible<()>;
/// Replace the current list of operations with a new list.
fn set_operations(&mut self, ops: Vec<Operation>) -> Fallible<()>;
/// Get the entire working set, with each task UUID at its appropriate (1-based) index.
/// Element 0 is always None.
fn get_working_set(&mut self) -> Fallible<Vec<Option<Uuid>>>;
/// Add a task to the working set and return its (one-based) index. This index will be one greater
/// than the highest used index.
fn add_to_working_set(&mut self, uuid: &Uuid) -> Fallible<u64>;
/// Remove a task from the working set. Other tasks' indexes are not affected.
fn remove_from_working_set(&mut self, index: u64) -> Fallible<()>;
/// Clear all tasks from the working set in preparation for a garbage-collection operation.
fn clear_working_set(&mut self) -> Fallible<()>;
/// Commit any changes made in the transaction. It is an error to call this more than
/// once.
fn commit(&mut self) -> Fallible<()>;
}
/// A trait for objects able to act as backing storage for a DB. This API is optimized to be
/// easy to implement, with all of the semantic meaning of the data located in the DB
/// implementation, which is the sole consumer of this trait.
pub trait TaskStorage {
/// Begin a transaction
fn txn<'a>(&'a mut self) -> Fallible<Box<dyn TaskStorageTxn + 'a>>;
}