make the TaskStorage API fallible everywhere

This commit is contained in:
Dustin J. Mitchell
2020-01-05 15:39:34 -05:00
parent 611b1cd68f
commit afd11d08a7
11 changed files with 241 additions and 75 deletions

View File

@@ -27,7 +27,7 @@ fn main() {
.unwrap();
}
("list", _) => {
for task in replica.all_tasks() {
for task in replica.all_tasks().unwrap() {
println!("{:?}", task);
}
}

View File

@@ -47,17 +47,17 @@ impl Replica {
}
/// Get all tasks as an iterator of (&Uuid, &HashMap)
pub fn all_tasks<'a>(&'a self) -> impl Iterator<Item = (Uuid, TaskMap)> + 'a {
pub fn all_tasks<'a>(&'a self) -> Fallible<impl Iterator<Item = (Uuid, TaskMap)> + 'a> {
self.taskdb.all_tasks()
}
/// Get the UUIDs of all tasks
pub fn all_task_uuids<'a>(&'a self) -> impl Iterator<Item = Uuid> + 'a {
pub fn all_task_uuids<'a>(&'a self) -> Fallible<impl Iterator<Item = Uuid> + 'a> {
self.taskdb.all_task_uuids()
}
/// Get an existing task by its UUID
pub fn get_task(&self, uuid: &Uuid) -> Option<TaskMap> {
pub fn get_task(&self, uuid: &Uuid) -> Fallible<Option<TaskMap>> {
self.taskdb.get_task(&uuid)
}
}
@@ -74,7 +74,7 @@ mod tests {
let uuid = Uuid::new_v4();
rep.create_task(uuid.clone()).unwrap();
assert_eq!(rep.get_task(&uuid), Some(TaskMap::new()));
assert_eq!(rep.get_task(&uuid).unwrap(), Some(TaskMap::new()));
}
#[test]
@@ -84,7 +84,7 @@ mod tests {
rep.create_task(uuid.clone()).unwrap();
rep.delete_task(uuid.clone()).unwrap();
assert_eq!(rep.get_task(&uuid), None);
assert_eq!(rep.get_task(&uuid).unwrap(), None);
}
#[test]
@@ -97,13 +97,13 @@ mod tests {
.unwrap();
let mut task = TaskMap::new();
task.insert("title".into(), "snarsblat".into());
assert_eq!(rep.get_task(&uuid), Some(task));
assert_eq!(rep.get_task(&uuid).unwrap(), Some(task));
}
#[test]
fn get_does_not_exist() {
let rep = Replica::new(DB::new_inmemory().into());
let uuid = Uuid::new_v4();
assert_eq!(rep.get_task(&uuid), None);
assert_eq!(rep.get_task(&uuid).unwrap(), None);
}
}

View File

@@ -34,10 +34,11 @@ impl DB {
/// to modify the DB. In cases where an operation does not make sense, this function will do
/// nothing and return an error (but leave the DB in a consistent state).
pub fn apply(&mut self, op: Operation) -> Fallible<()> {
// TODO: differentiate error types here?
if let err @ Err(_) = self.apply_op(&op) {
return err;
}
self.storage.add_operation(op);
self.storage.add_operation(op)?;
Ok(())
}
@@ -45,12 +46,12 @@ impl DB {
match op {
&Operation::Create { uuid } => {
// insert if the task does not already exist
if !self.storage.create_task(uuid, HashMap::new()) {
if !self.storage.create_task(uuid, HashMap::new())? {
return Err(Error::DBError(format!("Task {} already exists", uuid)).into());
}
}
&Operation::Delete { ref uuid } => {
if !self.storage.delete_task(uuid) {
if !self.storage.delete_task(uuid)? {
return Err(Error::DBError(format!("Task {} does not exist", uuid)).into());
}
}
@@ -61,13 +62,13 @@ impl DB {
timestamp: _,
} => {
// update if this task exists, otherwise ignore
if let Some(task) = self.storage.get_task(uuid) {
if let Some(task) = self.storage.get_task(uuid)? {
let mut task = task.clone();
match value {
Some(ref val) => task.insert(property.to_string(), val.clone()),
None => task.remove(property),
};
self.storage.set_task(uuid.clone(), task);
self.storage.set_task(uuid.clone(), task)?;
} else {
return Err(Error::DBError(format!("Task {} does not exist", uuid)).into());
}
@@ -78,38 +79,41 @@ impl DB {
}
/// Get all tasks. This is not a terribly efficient operation.
pub fn all_tasks<'a>(&'a self) -> impl Iterator<Item = (Uuid, TaskMap)> + 'a {
self.all_task_uuids()
.map(move |u| (u, self.get_task(&u).unwrap()))
pub fn all_tasks<'a>(&'a self) -> Fallible<impl Iterator<Item = (Uuid, TaskMap)> + 'a> {
Ok(self
.all_task_uuids()?
// TODO: don't unwrap result (just option)
.map(move |u| (u, self.get_task(&u).unwrap().unwrap())))
}
/// Get the UUIDs of all tasks
pub fn all_task_uuids<'a>(&'a self) -> impl Iterator<Item = Uuid> + 'a {
pub fn all_task_uuids<'a>(&'a self) -> Fallible<impl Iterator<Item = Uuid> + 'a> {
self.storage.get_task_uuids()
}
/// Get a single task, by uuid.
pub fn get_task(&self, uuid: &Uuid) -> Option<TaskMap> {
pub fn get_task(&self, uuid: &Uuid) -> Fallible<Option<TaskMap>> {
self.storage.get_task(uuid)
}
/// Sync to the given server, pulling remote changes and pushing local changes.
pub fn sync(&mut self, username: &str, server: &mut Server) {
pub fn sync(&mut self, username: &str, server: &mut Server) -> Fallible<()> {
// retry synchronizing until the server accepts our version (this allows for races between
// replicas trying to sync to the same server)
loop {
// first pull changes and "rebase" on top of them
let new_versions = server.get_versions(username, self.storage.base_version());
let new_versions = server.get_versions(username, self.storage.base_version()?);
for version_blob in new_versions {
let version_str = str::from_utf8(&version_blob).unwrap();
let version: Version = serde_json::from_str(version_str).unwrap();
assert_eq!(version.version, self.storage.base_version() + 1);
assert_eq!(version.version, self.storage.base_version()? + 1);
println!("applying version {:?} from server", version.version);
self.apply_version(version);
self.apply_version(version)?;
}
let operations: Vec<Operation> = self.storage.operations().map(|o| o.clone()).collect();
let operations: Vec<Operation> =
self.storage.operations()?.map(|o| o.clone()).collect();
if operations.len() == 0 {
// nothing to sync back to the server..
break;
@@ -117,7 +121,7 @@ impl DB {
// now make a version of our local changes and push those
let new_version = Version {
version: self.storage.base_version() + 1,
version: self.storage.base_version()? + 1,
operations: operations,
};
let new_version_str = serde_json::to_string(&new_version).unwrap();
@@ -125,13 +129,15 @@ impl DB {
if let VersionAdd::Ok =
server.add_version(username, new_version.version, new_version_str.into())
{
self.storage.local_operations_synced(new_version.version);
self.storage.local_operations_synced(new_version.version)?;
break;
}
}
Ok(())
}
fn apply_version(&mut self, mut version: Version) {
fn apply_version(&mut self, mut version: Version) -> Fallible<()> {
// The situation here is that the server has already applied all server operations, and we
// have already applied all local operations, so states have diverged by several
// operations. We need to figure out what operations to apply locally and on the server in
@@ -158,7 +164,7 @@ impl DB {
// indicating no operation is required. If this happens for a local op, we can just omit
// it. If it happens for server op, then we must copy the remaining local ops.
let mut local_operations: Vec<Operation> =
self.storage.operations().map(|o| o.clone()).collect();
self.storage.operations()?.map(|o| o.clone()).collect();
for server_op in version.operations.drain(..) {
let mut new_local_ops = Vec::with_capacity(local_operations.len());
let mut svr_op = Some(server_op);
@@ -181,7 +187,8 @@ impl DB {
local_operations = new_local_ops;
}
self.storage
.update_version(version.version, local_operations);
.update_version(version.version, local_operations)?;
Ok(())
}
// functions for supporting tests
@@ -189,6 +196,7 @@ impl DB {
pub fn sorted_tasks(&self) -> Vec<(Uuid, Vec<(String, String)>)> {
let mut res: Vec<(Uuid, Vec<(String, String)>)> = self
.all_tasks()
.unwrap()
.map(|(u, t)| {
let mut t = t
.iter()
@@ -203,7 +211,11 @@ impl DB {
}
pub fn operations(&self) -> Vec<Operation> {
self.storage.operations().map(|o| o.clone()).collect()
self.storage
.operations()
.unwrap()
.map(|o| o.clone())
.collect()
}
}

View File

@@ -1,5 +1,6 @@
use crate::operation::Operation;
use crate::taskstorage::{TaskMap, TaskStorage};
use failure::Fallible;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use uuid::Uuid;
@@ -30,72 +31,76 @@ impl InMemoryStorage {
impl TaskStorage for InMemoryStorage {
/// Get an (immutable) task, if it is in the storage
fn get_task(&self, uuid: &Uuid) -> Option<TaskMap> {
fn get_task(&self, uuid: &Uuid) -> Fallible<Option<TaskMap>> {
match self.tasks.get(uuid) {
None => None,
Some(t) => Some(t.clone()),
None => Ok(None),
Some(t) => Ok(Some(t.clone())),
}
}
/// Create a task, only if it does not already exist. Returns true if
/// the task was created (did not already exist).
fn create_task(&mut self, uuid: Uuid, task: TaskMap) -> bool {
fn create_task(&mut self, uuid: Uuid, task: TaskMap) -> Fallible<bool> {
if let ent @ Entry::Vacant(_) = self.tasks.entry(uuid) {
ent.or_insert(task);
true
Ok(true)
} else {
false
Ok(false)
}
}
/// Set a task, overwriting any existing task.
fn set_task(&mut self, uuid: Uuid, task: TaskMap) {
fn set_task(&mut self, uuid: Uuid, task: TaskMap) -> Fallible<()> {
self.tasks.insert(uuid, task);
Ok(())
}
/// Delete a task, if it exists. Returns true if the task was deleted (already existed)
fn delete_task(&mut self, uuid: &Uuid) -> bool {
fn delete_task(&mut self, uuid: &Uuid) -> Fallible<bool> {
if let Some(_) = self.tasks.remove(uuid) {
true
Ok(true)
} else {
false
Ok(false)
}
}
fn get_task_uuids<'a>(&'a self) -> Box<dyn Iterator<Item = Uuid> + 'a> {
Box::new(self.tasks.keys().map(|u| u.clone()))
fn get_task_uuids<'a>(&'a self) -> Fallible<Box<dyn Iterator<Item = Uuid> + 'a>> {
Ok(Box::new(self.tasks.keys().map(|u| u.clone())))
}
/// Add an operation to the list of operations in the storage. Note that this merely *stores*
/// the operation; it is up to the TaskDB to apply it.
fn add_operation(&mut self, op: Operation) {
fn add_operation(&mut self, op: Operation) -> Fallible<()> {
self.operations.push(op);
Ok(())
}
/// Get the current base_version for this storage -- the last version synced from the server.
fn base_version(&self) -> u64 {
return self.base_version;
fn base_version(&self) -> Fallible<u64> {
Ok(self.base_version)
}
/// Get the current set of outstanding operations (operations that have not been sync'd to the
/// server yet)
fn operations<'a>(&'a self) -> Box<dyn Iterator<Item = &'a Operation> + 'a> {
Box::new(self.operations.iter())
fn operations<'a>(&'a self) -> Fallible<Box<dyn Iterator<Item = &'a Operation> + 'a>> {
Ok(Box::new(self.operations.iter()))
}
/// Apply the next version from the server. This replaces the existing base_version and
/// operations. It's up to the caller (TaskDB) to ensure this is done consistently.
fn update_version(&mut self, version: u64, new_operations: Vec<Operation>) {
fn update_version(&mut self, version: u64, new_operations: Vec<Operation>) -> Fallible<()> {
// ensure that we are applying the versions in order..
assert_eq!(version, self.base_version + 1);
self.base_version = version;
self.operations = new_operations;
Ok(())
}
/// Record the outstanding operations as synced to the server in the given version.
fn local_operations_synced(&mut self, version: u64) {
fn local_operations_synced(&mut self, version: u64) -> Fallible<()> {
assert_eq!(version, self.base_version + 1);
self.base_version = version;
self.operations = vec![];
Ok(())
}
}

95
src/taskstorage/lmdb.rs Normal file
View File

@@ -0,0 +1,95 @@
use crate::operation::Operation;
use crate::taskstorage::{TaskMap, TaskStorage};
use kv::{Config, Error, Manager, ValueRef};
use uuid::Uuid;
pub struct KVStorage {
// TODO: make the manager global with lazy-static
manager: Manager,
config: Config,
}
impl KVStorage {
pub fn new(directory: &str) -> KVStorage {
let mut config = Config::default(directory);
config.bucket("base_version", None);
config.bucket("operations", None);
config.bucket("tasks", None);
KVStorage {
manager: Manager::new(),
config,
}
}
}
impl TaskStorage for KVStorage {
/// Get an (immutable) task, if it is in the storage
fn get_task(&self, uuid: &Uuid) -> Option<TaskMap> {
match self.tasks.get(uuid) {
None => None,
Some(t) => Some(t.clone()),
}
}
/// Create a task, only if it does not already exist. Returns true if
/// the task was created (did not already exist).
fn create_task(&mut self, uuid: Uuid, task: TaskMap) -> bool {
if let ent @ Entry::Vacant(_) = self.tasks.entry(uuid) {
ent.or_insert(task);
true
} else {
false
}
}
/// Set a task, overwriting any existing task.
fn set_task(&mut self, uuid: Uuid, task: TaskMap) {
self.tasks.insert(uuid, task);
}
/// Delete a task, if it exists. Returns true if the task was deleted (already existed)
fn delete_task(&mut self, uuid: &Uuid) -> bool {
if let Some(_) = self.tasks.remove(uuid) {
true
} else {
false
}
}
fn get_task_uuids<'a>(&'a self) -> Box<dyn Iterator<Item = Uuid> + 'a> {
Box::new(self.tasks.keys().map(|u| u.clone()))
}
/// Add an operation to the list of operations in the storage. Note that this merely *stores*
/// the operation; it is up to the TaskDB to apply it.
fn add_operation(&mut self, op: Operation) {
self.operations.push(op);
}
/// Get the current base_version for this storage -- the last version synced from the server.
fn base_version(&self) -> u64 {
return self.base_version;
}
/// Get the current set of outstanding operations (operations that have not been sync'd to the
/// server yet)
fn operations<'a>(&'a self) -> Box<dyn Iterator<Item = &'a Operation> + 'a> {
Box::new(self.operations.iter())
}
/// Apply the next version from the server. This replaces the existing base_version and
/// operations. It's up to the caller (TaskDB) to ensure this is done consistently.
fn update_version(&mut self, version: u64, new_operations: Vec<Operation>) {
// ensure that we are applying the versions in order..
assert_eq!(version, self.base_version + 1);
self.base_version = version;
self.operations = new_operations;
}
/// Record the outstanding operations as synced to the server in the given version.
fn local_operations_synced(&mut self, version: u64) {
assert_eq!(version, self.base_version + 1);
self.base_version = version;
self.operations = vec![];
}
}

View File

@@ -1,4 +1,5 @@
use crate::Operation;
use failure::Fallible;
use std::collections::HashMap;
use std::fmt;
use uuid::Uuid;
@@ -15,36 +16,36 @@ pub type TaskMap = HashMap<String, String>;
/// implementation, which is the sole consumer of this trait.
pub trait TaskStorage: fmt::Debug {
/// Get an (immutable) task, if it is in the storage
fn get_task(&self, uuid: &Uuid) -> Option<TaskMap>;
fn get_task(&self, uuid: &Uuid) -> Fallible<Option<TaskMap>>;
/// Create a task, only if it does not already exist. Returns true if
/// the task was created (did not already exist).
fn create_task(&mut self, uuid: Uuid, task: TaskMap) -> bool;
fn create_task(&mut self, uuid: Uuid, task: TaskMap) -> Fallible<bool>;
/// Set a task, overwriting any existing task.
fn set_task(&mut self, uuid: Uuid, task: TaskMap);
fn set_task(&mut self, uuid: Uuid, task: TaskMap) -> Fallible<()>;
/// Delete a task, if it exists. Returns true if the task was deleted (already existed)
fn delete_task(&mut self, uuid: &Uuid) -> bool;
fn delete_task(&mut self, uuid: &Uuid) -> Fallible<bool>;
/// Get the uuids of all tasks in the storage, in undefined order.
fn get_task_uuids<'a>(&'a self) -> Box<dyn Iterator<Item = Uuid> + 'a>;
fn get_task_uuids<'a>(&'a self) -> Fallible<Box<dyn Iterator<Item = Uuid> + 'a>>;
/// Add an operation to the list of operations in the storage. Note that this merely *stores*
/// the operation; it is up to the TaskDB to apply it.
fn add_operation(&mut self, op: Operation);
fn add_operation(&mut self, op: Operation) -> Fallible<()>;
/// Get the current base_version for this storage -- the last version synced from the server.
fn base_version(&self) -> u64;
fn base_version(&self) -> Fallible<u64>;
/// Get the current set of outstanding operations (operations that have not been sync'd to the
/// server yet)
fn operations<'a>(&'a self) -> Box<dyn Iterator<Item = &Operation> + 'a>;
fn operations<'a>(&'a self) -> Fallible<Box<dyn Iterator<Item = &Operation> + 'a>>;
/// Apply the next version from the server. This replaces the existing base_version and
/// operations. It's up to the caller (TaskDB) to ensure this is done consistently.
fn update_version(&mut self, version: u64, new_operations: Vec<Operation>);
fn update_version(&mut self, version: u64, new_operations: Vec<Operation>) -> Fallible<()>;
/// Record the outstanding operations as synced to the server in the given version.
fn local_operations_synced(&mut self, version: u64);
fn local_operations_synced(&mut self, version: u64) -> Fallible<()>;
}