factor storage out to a trait object

This commit is contained in:
Dustin J. Mitchell
2020-01-05 14:58:24 -05:00
parent e228c99b83
commit 611b1cd68f
11 changed files with 177 additions and 108 deletions

View File

@@ -6,7 +6,7 @@
- tags: `tags.<tag> = ""` - tags: `tags.<tag> = ""`
* add HTTP API * add HTTP API
* add pending-task indexing to Replica * add pending-task indexing to Replica
* abstract server, storage, etc. into traits * abstract server into trait
* implement snapshot requests * implement snapshot requests
* implement backups * implement backups
* implement client-side encryption * implement client-side encryption

View File

@@ -1,6 +1,6 @@
extern crate clap; extern crate clap;
use clap::{App, Arg, SubCommand}; use clap::{App, Arg, SubCommand};
use taskwarrior_rust::{Replica, DB}; use taskwarrior_rust::{taskstorage, Replica, DB};
use uuid::Uuid; use uuid::Uuid;
fn main() { fn main() {
@@ -16,7 +16,7 @@ fn main() {
.subcommand(SubCommand::with_name("list").about("lists tasks")) .subcommand(SubCommand::with_name("list").about("lists tasks"))
.get_matches(); .get_matches();
let mut replica = Replica::new(DB::new().into()); let mut replica = Replica::new(DB::new(Box::new(taskstorage::InMemoryStorage::new())).into());
match matches.subcommand() { match matches.subcommand() {
("add", Some(matches)) => { ("add", Some(matches)) => {

View File

@@ -10,7 +10,7 @@ mod replica;
mod server; mod server;
mod task; mod task;
mod taskdb; mod taskdb;
mod taskstorage; pub mod taskstorage;
mod tdb2; mod tdb2;
pub use operation::Operation; pub use operation::Operation;

View File

@@ -146,7 +146,7 @@ mod test {
// check that the two operation sequences have the same effect, enforcing the invariant of // check that the two operation sequences have the same effect, enforcing the invariant of
// the transform function. // the transform function.
let mut db1 = DB::new(); let mut db1 = DB::new_inmemory();
if let Some(ref o) = setup { if let Some(ref o) = setup {
db1.apply(o.clone()).unwrap(); db1.apply(o.clone()).unwrap();
} }
@@ -155,7 +155,7 @@ mod test {
db1.apply(o).unwrap(); db1.apply(o).unwrap();
} }
let mut db2 = DB::new(); let mut db2 = DB::new_inmemory();
if let Some(ref o) = setup { if let Some(ref o) = setup {
db2.apply(o.clone()).unwrap(); db2.apply(o.clone()).unwrap();
} }

View File

@@ -70,7 +70,7 @@ mod tests {
#[test] #[test]
fn create() { fn create() {
let mut rep = Replica::new(DB::new().into()); let mut rep = Replica::new(DB::new_inmemory().into());
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
rep.create_task(uuid.clone()).unwrap(); rep.create_task(uuid.clone()).unwrap();
@@ -79,7 +79,7 @@ mod tests {
#[test] #[test]
fn delete() { fn delete() {
let mut rep = Replica::new(DB::new().into()); let mut rep = Replica::new(DB::new_inmemory().into());
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
rep.create_task(uuid.clone()).unwrap(); rep.create_task(uuid.clone()).unwrap();
@@ -89,7 +89,7 @@ mod tests {
#[test] #[test]
fn update() { fn update() {
let mut rep = Replica::new(DB::new().into()); let mut rep = Replica::new(DB::new_inmemory().into());
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
rep.create_task(uuid.clone()).unwrap(); rep.create_task(uuid.clone()).unwrap();
@@ -102,7 +102,7 @@ mod tests {
#[test] #[test]
fn get_does_not_exist() { fn get_does_not_exist() {
let rep = Replica::new(DB::new().into()); let rep = Replica::new(DB::new_inmemory().into());
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
assert_eq!(rep.get_task(&uuid), None); assert_eq!(rep.get_task(&uuid), None);
} }

View File

@@ -1,16 +1,16 @@
use crate::errors::Error; use crate::errors::Error;
use crate::operation::Operation; use crate::operation::Operation;
use crate::server::{Server, VersionAdd}; use crate::server::{Server, VersionAdd};
use crate::taskstorage::{InMemoryStorage, TaskMap}; use crate::taskstorage::{TaskMap, TaskStorage};
use failure::Fallible; use failure::Fallible;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::str; use std::str;
use uuid::Uuid; use uuid::Uuid;
#[derive(Debug, Clone)] #[derive(Debug)]
pub struct DB { pub struct DB {
storage: InMemoryStorage, storage: Box<dyn TaskStorage>,
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@@ -20,11 +20,14 @@ struct Version {
} }
impl DB { impl DB {
/// Create a new, empty database /// Create a new DB with the given backend storage
pub fn new() -> DB { pub fn new(storage: Box<dyn TaskStorage>) -> DB {
DB { DB { storage }
storage: InMemoryStorage::new(), }
}
#[cfg(test)]
pub fn new_inmemory() -> DB {
DB::new(Box::new(crate::taskstorage::InMemoryStorage::new()))
} }
/// Apply an operation to the DB. Aside from synchronization operations, this is the only way /// Apply an operation to the DB. Aside from synchronization operations, this is the only way
@@ -212,7 +215,7 @@ mod tests {
#[test] #[test]
fn test_apply_create() { fn test_apply_create() {
let mut db = DB::new(); let mut db = DB::new_inmemory();
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let op = Operation::Create { uuid }; let op = Operation::Create { uuid };
db.apply(op.clone()).unwrap(); db.apply(op.clone()).unwrap();
@@ -223,7 +226,7 @@ mod tests {
#[test] #[test]
fn test_apply_create_exists() { fn test_apply_create_exists() {
let mut db = DB::new(); let mut db = DB::new_inmemory();
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let op = Operation::Create { uuid }; let op = Operation::Create { uuid };
db.apply(op.clone()).unwrap(); db.apply(op.clone()).unwrap();
@@ -238,7 +241,7 @@ mod tests {
#[test] #[test]
fn test_apply_create_update() { fn test_apply_create_update() {
let mut db = DB::new(); let mut db = DB::new_inmemory();
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let op1 = Operation::Create { uuid }; let op1 = Operation::Create { uuid };
db.apply(op1.clone()).unwrap(); db.apply(op1.clone()).unwrap();
@@ -259,7 +262,7 @@ mod tests {
#[test] #[test]
fn test_apply_create_update_delete_prop() { fn test_apply_create_update_delete_prop() {
let mut db = DB::new(); let mut db = DB::new_inmemory();
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let op1 = Operation::Create { uuid }; let op1 = Operation::Create { uuid };
db.apply(op1.clone()).unwrap(); db.apply(op1.clone()).unwrap();
@@ -301,7 +304,7 @@ mod tests {
#[test] #[test]
fn test_apply_update_does_not_exist() { fn test_apply_update_does_not_exist() {
let mut db = DB::new(); let mut db = DB::new_inmemory();
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let op = Operation::Update { let op = Operation::Update {
uuid, uuid,
@@ -320,7 +323,7 @@ mod tests {
#[test] #[test]
fn test_apply_create_delete() { fn test_apply_create_delete() {
let mut db = DB::new(); let mut db = DB::new_inmemory();
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let op1 = Operation::Create { uuid }; let op1 = Operation::Create { uuid };
db.apply(op1.clone()).unwrap(); db.apply(op1.clone()).unwrap();
@@ -334,7 +337,7 @@ mod tests {
#[test] #[test]
fn test_apply_delete_not_present() { fn test_apply_delete_not_present() {
let mut db = DB::new(); let mut db = DB::new_inmemory();
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let op1 = Operation::Delete { uuid }; let op1 = Operation::Delete { uuid };

101
src/taskstorage/inmemory.rs Normal file
View File

@@ -0,0 +1,101 @@
use crate::operation::Operation;
use crate::taskstorage::{TaskMap, TaskStorage};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use uuid::Uuid;
#[derive(PartialEq, Debug, Clone)]
pub struct InMemoryStorage {
// The current state, with all pending operations applied
tasks: HashMap<Uuid, TaskMap>,
// The version at which `operations` begins
base_version: u64,
// Operations applied since `base_version`, in order.
//
// INVARIANT: Given a snapshot at `base_version`, applying these operations produces `tasks`.
operations: Vec<Operation>,
}
impl InMemoryStorage {
pub fn new() -> InMemoryStorage {
InMemoryStorage {
tasks: HashMap::new(),
base_version: 0,
operations: vec![],
}
}
}
impl TaskStorage for InMemoryStorage {
/// Get an (immutable) task, if it is in the storage
fn get_task(&self, uuid: &Uuid) -> Option<TaskMap> {
match self.tasks.get(uuid) {
None => None,
Some(t) => Some(t.clone()),
}
}
/// Create a task, only if it does not already exist. Returns true if
/// the task was created (did not already exist).
fn create_task(&mut self, uuid: Uuid, task: TaskMap) -> bool {
if let ent @ Entry::Vacant(_) = self.tasks.entry(uuid) {
ent.or_insert(task);
true
} else {
false
}
}
/// Set a task, overwriting any existing task.
fn set_task(&mut self, uuid: Uuid, task: TaskMap) {
self.tasks.insert(uuid, task);
}
/// Delete a task, if it exists. Returns true if the task was deleted (already existed)
fn delete_task(&mut self, uuid: &Uuid) -> bool {
if let Some(_) = self.tasks.remove(uuid) {
true
} else {
false
}
}
fn get_task_uuids<'a>(&'a self) -> Box<dyn Iterator<Item = Uuid> + 'a> {
Box::new(self.tasks.keys().map(|u| u.clone()))
}
/// Add an operation to the list of operations in the storage. Note that this merely *stores*
/// the operation; it is up to the TaskDB to apply it.
fn add_operation(&mut self, op: Operation) {
self.operations.push(op);
}
/// Get the current base_version for this storage -- the last version synced from the server.
fn base_version(&self) -> u64 {
return self.base_version;
}
/// Get the current set of outstanding operations (operations that have not been sync'd to the
/// server yet)
fn operations<'a>(&'a self) -> Box<dyn Iterator<Item = &'a Operation> + 'a> {
Box::new(self.operations.iter())
}
/// Apply the next version from the server. This replaces the existing base_version and
/// operations. It's up to the caller (TaskDB) to ensure this is done consistently.
fn update_version(&mut self, version: u64, new_operations: Vec<Operation>) {
// ensure that we are applying the versions in order..
assert_eq!(version, self.base_version + 1);
self.base_version = version;
self.operations = new_operations;
}
/// Record the outstanding operations as synced to the server in the given version.
fn local_operations_synced(&mut self, version: u64) {
assert_eq!(version, self.base_version + 1);
self.base_version = version;
self.operations = vec![];
}
}

View File

@@ -1,100 +1,50 @@
use crate::operation::Operation; use crate::Operation;
use std::collections::hash_map::Entry;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt;
use uuid::Uuid; use uuid::Uuid;
mod inmemory;
pub use inmemory::InMemoryStorage;
/// An in-memory representation of a task as a simple hashmap
pub type TaskMap = HashMap<String, String>; pub type TaskMap = HashMap<String, String>;
#[derive(PartialEq, Debug, Clone)] /// A trait for objects able to act as backing storage for a TaskDB. This API is optimized to be
pub struct InMemoryStorage { /// easy to implement, with all of the semantic meaning of the data located in the TaskDB
// The current state, with all pending operations applied /// implementation, which is the sole consumer of this trait.
tasks: HashMap<Uuid, TaskMap>, pub trait TaskStorage: fmt::Debug {
// The version at which `operations` begins
base_version: u64,
// Operations applied since `base_version`, in order.
//
// INVARIANT: Given a snapshot at `base_version`, applying these operations produces `tasks`.
operations: Vec<Operation>,
}
impl InMemoryStorage {
pub fn new() -> InMemoryStorage {
InMemoryStorage {
tasks: HashMap::new(),
base_version: 0,
operations: vec![],
}
}
/// Get an (immutable) task, if it is in the storage /// Get an (immutable) task, if it is in the storage
pub fn get_task(&self, uuid: &Uuid) -> Option<TaskMap> { fn get_task(&self, uuid: &Uuid) -> Option<TaskMap>;
match self.tasks.get(uuid) {
None => None,
Some(t) => Some(t.clone()),
}
}
/// Create a task, only if it does not already exist. Returns true if /// Create a task, only if it does not already exist. Returns true if
/// the task was created (did not already exist). /// the task was created (did not already exist).
pub fn create_task(&mut self, uuid: Uuid, task: TaskMap) -> bool { fn create_task(&mut self, uuid: Uuid, task: TaskMap) -> bool;
if let ent @ Entry::Vacant(_) = self.tasks.entry(uuid) {
ent.or_insert(task);
true
} else {
false
}
}
/// Set a task, overwriting any existing task. /// Set a task, overwriting any existing task.
pub fn set_task(&mut self, uuid: Uuid, task: TaskMap) { fn set_task(&mut self, uuid: Uuid, task: TaskMap);
self.tasks.insert(uuid, task);
}
/// Delete a task, if it exists. Returns true if the task was deleted (already existed) /// Delete a task, if it exists. Returns true if the task was deleted (already existed)
pub fn delete_task(&mut self, uuid: &Uuid) -> bool { fn delete_task(&mut self, uuid: &Uuid) -> bool;
if let Some(_) = self.tasks.remove(uuid) {
true
} else {
false
}
}
pub fn get_task_uuids<'a>(&'a self) -> impl Iterator<Item = Uuid> + 'a { /// Get the uuids of all tasks in the storage, in undefined order.
self.tasks.keys().map(|u| u.clone()) fn get_task_uuids<'a>(&'a self) -> Box<dyn Iterator<Item = Uuid> + 'a>;
}
/// Add an operation to the list of operations in the storage. Note that this merely *stores* /// Add an operation to the list of operations in the storage. Note that this merely *stores*
/// the operation; it is up to the TaskDB to apply it. /// the operation; it is up to the TaskDB to apply it.
pub fn add_operation(&mut self, op: Operation) { fn add_operation(&mut self, op: Operation);
self.operations.push(op);
}
/// Get the current base_version for this storage -- the last version synced from the server. /// Get the current base_version for this storage -- the last version synced from the server.
pub fn base_version(&self) -> u64 { fn base_version(&self) -> u64;
return self.base_version;
}
/// Get the current set of outstanding operations (operations that have not been sync'd to the /// Get the current set of outstanding operations (operations that have not been sync'd to the
/// server yet) /// server yet)
pub fn operations(&self) -> impl Iterator<Item = &Operation> { fn operations<'a>(&'a self) -> Box<dyn Iterator<Item = &Operation> + 'a>;
self.operations.iter()
}
/// Apply the next version from the server. This replaces the existing base_version and /// Apply the next version from the server. This replaces the existing base_version and
/// operations. It's up to the caller (TaskDB) to ensure this is done consistently. /// operations. It's up to the caller (TaskDB) to ensure this is done consistently.
pub(crate) fn update_version(&mut self, version: u64, new_operations: Vec<Operation>) { fn update_version(&mut self, version: u64, new_operations: Vec<Operation>);
// ensure that we are applying the versions in order..
assert_eq!(version, self.base_version + 1);
self.base_version = version;
self.operations = new_operations;
}
/// Record the outstanding operations as synced to the server in the given version. /// Record the outstanding operations as synced to the server in the given version.
pub(crate) fn local_operations_synced(&mut self, version: u64) { fn local_operations_synced(&mut self, version: u64);
assert_eq!(version, self.base_version + 1);
self.base_version = version;
self.operations = vec![];
}
} }

View File

@@ -1,8 +1,12 @@
use chrono::Utc; use chrono::Utc;
use proptest::prelude::*; use proptest::prelude::*;
use taskwarrior_rust::{Operation, DB}; use taskwarrior_rust::{taskstorage, Operation, DB};
use uuid::Uuid; use uuid::Uuid;
fn newdb() -> DB {
DB::new(Box::new(taskstorage::InMemoryStorage::new()))
}
fn uuid_strategy() -> impl Strategy<Value = Uuid> { fn uuid_strategy() -> impl Strategy<Value = Uuid> {
prop_oneof![ prop_oneof![
Just(Uuid::parse_str("83a2f9ef-f455-4195-b92e-a54c161eebfc").unwrap()), Just(Uuid::parse_str("83a2f9ef-f455-4195-b92e-a54c161eebfc").unwrap()),
@@ -37,27 +41,30 @@ proptest! {
fn transform_invariant_holds(o1 in operation_strategy(), o2 in operation_strategy()) { fn transform_invariant_holds(o1 in operation_strategy(), o2 in operation_strategy()) {
let (o1p, o2p) = Operation::transform(o1.clone(), o2.clone()); let (o1p, o2p) = Operation::transform(o1.clone(), o2.clone());
let mut db1 = DB::new(); let mut db1 = newdb();
let mut db2 = newdb();
// Ensure that any expected tasks already exist // Ensure that any expected tasks already exist
if let Operation::Update{ ref uuid, .. } = o1 { if let Operation::Update{ ref uuid, .. } = o1 {
let _ = db1.apply(Operation::Create{uuid: uuid.clone()}); let _ = db1.apply(Operation::Create{uuid: uuid.clone()});
let _ = db2.apply(Operation::Create{uuid: uuid.clone()});
} }
if let Operation::Update{ ref uuid, .. } = o2 { if let Operation::Update{ ref uuid, .. } = o2 {
let _ = db1.apply(Operation::Create{uuid: uuid.clone()}); let _ = db1.apply(Operation::Create{uuid: uuid.clone()});
let _ = db2.apply(Operation::Create{uuid: uuid.clone()});
} }
if let Operation::Delete{ ref uuid } = o1 { if let Operation::Delete{ ref uuid } = o1 {
let _ = db1.apply(Operation::Create{uuid: uuid.clone()}); let _ = db1.apply(Operation::Create{uuid: uuid.clone()});
let _ = db2.apply(Operation::Create{uuid: uuid.clone()});
} }
if let Operation::Delete{ ref uuid } = o2 { if let Operation::Delete{ ref uuid } = o2 {
let _ = db1.apply(Operation::Create{uuid: uuid.clone()}); let _ = db1.apply(Operation::Create{uuid: uuid.clone()});
let _ = db2.apply(Operation::Create{uuid: uuid.clone()});
} }
let mut db2 = db1.clone();
// if applying the initial operations fail, that indicates the operation was invalid // if applying the initial operations fail, that indicates the operation was invalid
// in the base state, so consider the case successful. // in the base state, so consider the case successful.
if let Err(_) = db1.apply(o1) { if let Err(_) = db1.apply(o1) {

View File

@@ -1,15 +1,19 @@
use chrono::Utc; use chrono::Utc;
use taskwarrior_rust::{Operation, Server, DB}; use taskwarrior_rust::{taskstorage, Operation, Server, DB};
use uuid::Uuid; use uuid::Uuid;
fn newdb() -> DB {
DB::new(Box::new(taskstorage::InMemoryStorage::new()))
}
#[test] #[test]
fn test_sync() { fn test_sync() {
let mut server = Server::new(); let mut server = Server::new();
let mut db1 = DB::new(); let mut db1 = newdb();
db1.sync("me", &mut server); db1.sync("me", &mut server);
let mut db2 = DB::new(); let mut db2 = newdb();
db2.sync("me", &mut server); db2.sync("me", &mut server);
// make some changes in parallel to db1 and db2.. // make some changes in parallel to db1 and db2..
@@ -66,10 +70,10 @@ fn test_sync() {
fn test_sync_create_delete() { fn test_sync_create_delete() {
let mut server = Server::new(); let mut server = Server::new();
let mut db1 = DB::new(); let mut db1 = newdb();
db1.sync("me", &mut server); db1.sync("me", &mut server);
let mut db2 = DB::new(); let mut db2 = newdb();
db2.sync("me", &mut server); db2.sync("me", &mut server);
// create and update a task.. // create and update a task..

View File

@@ -1,8 +1,12 @@
use chrono::Utc; use chrono::Utc;
use proptest::prelude::*; use proptest::prelude::*;
use taskwarrior_rust::{Operation, Server, DB}; use taskwarrior_rust::{taskstorage, Operation, Server, DB};
use uuid::Uuid; use uuid::Uuid;
fn newdb() -> DB {
DB::new(Box::new(taskstorage::InMemoryStorage::new()))
}
#[derive(Debug)] #[derive(Debug)]
enum Action { enum Action {
Op(Operation), Op(Operation),
@@ -43,7 +47,7 @@ proptest! {
// another. So, the generated sequences focus on a single task UUID. // another. So, the generated sequences focus on a single task UUID.
fn transform_sequences_of_operations(action_sequence in action_sequence_strategy()) { fn transform_sequences_of_operations(action_sequence in action_sequence_strategy()) {
let mut server = Server::new(); let mut server = Server::new();
let mut dbs = [DB::new(), DB::new(), DB::new()]; let mut dbs = [newdb(), newdb(), newdb()];
for (action, db) in action_sequence { for (action, db) in action_sequence {
println!("{:?} on db {}", action, db); println!("{:?} on db {}", action, db);