task storge implementation based on kv / LMDB

This commit is contained in:
Dustin J. Mitchell
2020-01-05 18:28:43 -05:00
parent afd11d08a7
commit 2f973d3e62
10 changed files with 884 additions and 210 deletions

View File

@@ -1,5 +1,6 @@
// TODO: remove this eventually when there's an API
#![allow(dead_code)]
#![allow(unused_variables)]
#[macro_use]
extern crate failure;

View File

@@ -47,17 +47,17 @@ impl Replica {
}
/// Get all tasks as an iterator of (&Uuid, &HashMap)
pub fn all_tasks<'a>(&'a self) -> Fallible<impl Iterator<Item = (Uuid, TaskMap)> + 'a> {
pub fn all_tasks<'a>(&'a mut self) -> Fallible<Vec<(Uuid, TaskMap)>> {
self.taskdb.all_tasks()
}
/// Get the UUIDs of all tasks
pub fn all_task_uuids<'a>(&'a self) -> Fallible<impl Iterator<Item = Uuid> + 'a> {
pub fn all_task_uuids<'a>(&'a mut self) -> Fallible<Vec<Uuid>> {
self.taskdb.all_task_uuids()
}
/// Get an existing task by its UUID
pub fn get_task(&self, uuid: &Uuid) -> Fallible<Option<TaskMap>> {
pub fn get_task(&mut self, uuid: &Uuid) -> Fallible<Option<TaskMap>> {
self.taskdb.get_task(&uuid)
}
}
@@ -102,7 +102,7 @@ mod tests {
#[test]
fn get_does_not_exist() {
let rep = Replica::new(DB::new_inmemory().into());
let mut rep = Replica::new(DB::new_inmemory().into());
let uuid = Uuid::new_v4();
assert_eq!(rep.get_task(&uuid).unwrap(), None);
}

View File

@@ -1,14 +1,12 @@
use crate::errors::Error;
use crate::operation::Operation;
use crate::server::{Server, VersionAdd};
use crate::taskstorage::{TaskMap, TaskStorage};
use crate::taskstorage::{TaskMap, TaskStorage, TaskStorageTxn};
use failure::Fallible;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::str;
use uuid::Uuid;
#[derive(Debug)]
pub struct DB {
storage: Box<dyn TaskStorage>,
}
@@ -35,23 +33,25 @@ impl DB {
/// nothing and return an error (but leave the DB in a consistent state).
pub fn apply(&mut self, op: Operation) -> Fallible<()> {
// TODO: differentiate error types here?
if let err @ Err(_) = self.apply_op(&op) {
let mut txn = self.storage.txn()?;
if let err @ Err(_) = DB::apply_op(txn.as_mut(), &op) {
return err;
}
self.storage.add_operation(op)?;
txn.add_operation(op)?;
txn.commit()?;
Ok(())
}
fn apply_op(&mut self, op: &Operation) -> Fallible<()> {
fn apply_op(txn: &mut dyn TaskStorageTxn, op: &Operation) -> Fallible<()> {
match op {
&Operation::Create { uuid } => {
// insert if the task does not already exist
if !self.storage.create_task(uuid, HashMap::new())? {
if !txn.create_task(uuid)? {
return Err(Error::DBError(format!("Task {} already exists", uuid)).into());
}
}
&Operation::Delete { ref uuid } => {
if !self.storage.delete_task(uuid)? {
if !txn.delete_task(uuid)? {
return Err(Error::DBError(format!("Task {} does not exist", uuid)).into());
}
}
@@ -62,13 +62,13 @@ impl DB {
timestamp: _,
} => {
// update if this task exists, otherwise ignore
if let Some(task) = self.storage.get_task(uuid)? {
if let Some(task) = txn.get_task(uuid)? {
let mut task = task.clone();
match value {
Some(ref val) => task.insert(property.to_string(), val.clone()),
None => task.remove(property),
};
self.storage.set_task(uuid.clone(), task)?;
txn.set_task(uuid.clone(), task)?;
} else {
return Err(Error::DBError(format!("Task {} does not exist", uuid)).into());
}
@@ -78,42 +78,43 @@ impl DB {
Ok(())
}
/// Get all tasks. This is not a terribly efficient operation.
pub fn all_tasks<'a>(&'a self) -> Fallible<impl Iterator<Item = (Uuid, TaskMap)> + 'a> {
Ok(self
.all_task_uuids()?
// TODO: don't unwrap result (just option)
.map(move |u| (u, self.get_task(&u).unwrap().unwrap())))
/// Get all tasks.
pub fn all_tasks<'a>(&'a mut self) -> Fallible<Vec<(Uuid, TaskMap)>> {
let mut txn = self.storage.txn()?;
txn.all_tasks()
}
/// Get the UUIDs of all tasks
pub fn all_task_uuids<'a>(&'a self) -> Fallible<impl Iterator<Item = Uuid> + 'a> {
self.storage.get_task_uuids()
pub fn all_task_uuids<'a>(&'a mut self) -> Fallible<Vec<Uuid>> {
let mut txn = self.storage.txn()?;
txn.all_task_uuids()
}
/// Get a single task, by uuid.
pub fn get_task(&self, uuid: &Uuid) -> Fallible<Option<TaskMap>> {
self.storage.get_task(uuid)
pub fn get_task(&mut self, uuid: &Uuid) -> Fallible<Option<TaskMap>> {
let mut txn = self.storage.txn()?;
txn.get_task(uuid)
}
/// Sync to the given server, pulling remote changes and pushing local changes.
pub fn sync(&mut self, username: &str, server: &mut Server) -> Fallible<()> {
let mut txn = self.storage.txn()?;
// retry synchronizing until the server accepts our version (this allows for races between
// replicas trying to sync to the same server)
loop {
// first pull changes and "rebase" on top of them
let new_versions = server.get_versions(username, self.storage.base_version()?);
let new_versions = server.get_versions(username, txn.base_version()?);
for version_blob in new_versions {
let version_str = str::from_utf8(&version_blob).unwrap();
let version: Version = serde_json::from_str(version_str).unwrap();
assert_eq!(version.version, self.storage.base_version()? + 1);
assert_eq!(version.version, txn.base_version()? + 1);
println!("applying version {:?} from server", version.version);
self.apply_version(version)?;
DB::apply_version(txn.as_mut(), version)?;
}
let operations: Vec<Operation> =
self.storage.operations()?.map(|o| o.clone()).collect();
let operations: Vec<Operation> = txn.operations()?.iter().map(|o| o.clone()).collect();
if operations.len() == 0 {
// nothing to sync back to the server..
break;
@@ -121,7 +122,7 @@ impl DB {
// now make a version of our local changes and push those
let new_version = Version {
version: self.storage.base_version()? + 1,
version: txn.base_version()? + 1,
operations: operations,
};
let new_version_str = serde_json::to_string(&new_version).unwrap();
@@ -129,15 +130,16 @@ impl DB {
if let VersionAdd::Ok =
server.add_version(username, new_version.version, new_version_str.into())
{
self.storage.local_operations_synced(new_version.version)?;
txn.local_operations_synced(new_version.version)?;
break;
}
}
txn.commit()?;
Ok(())
}
fn apply_version(&mut self, mut version: Version) -> Fallible<()> {
fn apply_version(txn: &mut dyn TaskStorageTxn, mut version: Version) -> Fallible<()> {
// The situation here is that the server has already applied all server operations, and we
// have already applied all local operations, so states have diverged by several
// operations. We need to figure out what operations to apply locally and on the server in
@@ -163,8 +165,7 @@ impl DB {
// This is slightly complicated by the fact that the transform function can return None,
// indicating no operation is required. If this happens for a local op, we can just omit
// it. If it happens for server op, then we must copy the remaining local ops.
let mut local_operations: Vec<Operation> =
self.storage.operations()?.map(|o| o.clone()).collect();
let mut local_operations: Vec<Operation> = txn.operations()?;
for server_op in version.operations.drain(..) {
let mut new_local_ops = Vec::with_capacity(local_operations.len());
let mut svr_op = Some(server_op);
@@ -180,40 +181,41 @@ impl DB {
}
}
if let Some(o) = svr_op {
if let Err(e) = self.apply_op(&o) {
if let Err(e) = DB::apply_op(txn, &o) {
println!("Invalid operation when syncing: {} (ignored)", e);
}
}
local_operations = new_local_ops;
}
self.storage
.update_version(version.version, local_operations)?;
txn.update_version(version.version, local_operations)?;
Ok(())
}
// functions for supporting tests
pub fn sorted_tasks(&self) -> Vec<(Uuid, Vec<(String, String)>)> {
pub fn sorted_tasks(&mut self) -> Vec<(Uuid, Vec<(String, String)>)> {
let mut res: Vec<(Uuid, Vec<(String, String)>)> = self
.all_tasks()
.unwrap()
.iter()
.map(|(u, t)| {
let mut t = t
.iter()
.map(|(p, v)| (p.clone(), v.clone()))
.collect::<Vec<(String, String)>>();
t.sort();
(u, t)
(u.clone(), t)
})
.collect();
res.sort();
res
}
pub fn operations(&self) -> Vec<Operation> {
self.storage
.operations()
pub fn operations(&mut self) -> Vec<Operation> {
let mut txn = self.storage.txn().unwrap();
txn.operations()
.unwrap()
.iter()
.map(|o| o.clone())
.collect()
}
@@ -223,6 +225,7 @@ impl DB {
mod tests {
use super::*;
use chrono::Utc;
use std::collections::HashMap;
use uuid::Uuid;
#[test]

View File

@@ -1,106 +1,145 @@
use crate::operation::Operation;
use crate::taskstorage::{TaskMap, TaskStorage};
use crate::taskstorage::{TaskMap, TaskStorage, TaskStorageTxn};
use failure::Fallible;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use uuid::Uuid;
#[derive(PartialEq, Debug, Clone)]
pub struct InMemoryStorage {
// The current state, with all pending operations applied
struct Data {
tasks: HashMap<Uuid, TaskMap>,
// The version at which `operations` begins
base_version: u64,
// Operations applied since `base_version`, in order.
//
// INVARIANT: Given a snapshot at `base_version`, applying these operations produces `tasks`.
operations: Vec<Operation>,
}
impl InMemoryStorage {
pub fn new() -> InMemoryStorage {
InMemoryStorage {
tasks: HashMap::new(),
base_version: 0,
operations: vec![],
struct Txn<'t> {
storage: &'t mut InMemoryStorage,
new_data: Option<Data>,
}
impl<'t> Txn<'t> {
fn mut_data_ref(&mut self) -> &mut Data {
if self.new_data.is_none() {
self.new_data = Some(self.storage.data.clone());
}
if let Some(ref mut data) = self.new_data {
data
} else {
unreachable!();
}
}
fn data_ref(&mut self) -> &Data {
if let Some(ref data) = self.new_data {
data
} else {
&self.storage.data
}
}
}
impl TaskStorage for InMemoryStorage {
/// Get an (immutable) task, if it is in the storage
fn get_task(&self, uuid: &Uuid) -> Fallible<Option<TaskMap>> {
match self.tasks.get(uuid) {
impl<'t> TaskStorageTxn for Txn<'t> {
fn get_task(&mut self, uuid: &Uuid) -> Fallible<Option<TaskMap>> {
match self.data_ref().tasks.get(uuid) {
None => Ok(None),
Some(t) => Ok(Some(t.clone())),
}
}
/// Create a task, only if it does not already exist. Returns true if
/// the task was created (did not already exist).
fn create_task(&mut self, uuid: Uuid, task: TaskMap) -> Fallible<bool> {
if let ent @ Entry::Vacant(_) = self.tasks.entry(uuid) {
ent.or_insert(task);
fn create_task(&mut self, uuid: Uuid) -> Fallible<bool> {
if let ent @ Entry::Vacant(_) = self.mut_data_ref().tasks.entry(uuid) {
ent.or_insert(TaskMap::new());
Ok(true)
} else {
Ok(false)
}
}
/// Set a task, overwriting any existing task.
fn set_task(&mut self, uuid: Uuid, task: TaskMap) -> Fallible<()> {
self.tasks.insert(uuid, task);
self.mut_data_ref().tasks.insert(uuid, task);
Ok(())
}
/// Delete a task, if it exists. Returns true if the task was deleted (already existed)
fn delete_task(&mut self, uuid: &Uuid) -> Fallible<bool> {
if let Some(_) = self.tasks.remove(uuid) {
if let Some(_) = self.mut_data_ref().tasks.remove(uuid) {
Ok(true)
} else {
Ok(false)
}
}
fn get_task_uuids<'a>(&'a self) -> Fallible<Box<dyn Iterator<Item = Uuid> + 'a>> {
Ok(Box::new(self.tasks.keys().map(|u| u.clone())))
fn all_tasks<'a>(&mut self) -> Fallible<Vec<(Uuid, TaskMap)>> {
Ok(self
.data_ref()
.tasks
.iter()
.map(|(u, t)| (u.clone(), t.clone()))
.collect())
}
fn all_task_uuids<'a>(&mut self) -> Fallible<Vec<Uuid>> {
Ok(self.data_ref().tasks.keys().map(|u| u.clone()).collect())
}
/// Add an operation to the list of operations in the storage. Note that this merely *stores*
/// the operation; it is up to the TaskDB to apply it.
fn add_operation(&mut self, op: Operation) -> Fallible<()> {
self.operations.push(op);
self.mut_data_ref().operations.push(op);
Ok(())
}
/// Get the current base_version for this storage -- the last version synced from the server.
fn base_version(&self) -> Fallible<u64> {
Ok(self.base_version)
fn base_version(&mut self) -> Fallible<u64> {
Ok(self.data_ref().base_version)
}
/// Get the current set of outstanding operations (operations that have not been sync'd to the
/// server yet)
fn operations<'a>(&'a self) -> Fallible<Box<dyn Iterator<Item = &'a Operation> + 'a>> {
Ok(Box::new(self.operations.iter()))
fn operations(&mut self) -> Fallible<Vec<Operation>> {
Ok(self.data_ref().operations.clone())
}
/// Apply the next version from the server. This replaces the existing base_version and
/// operations. It's up to the caller (TaskDB) to ensure this is done consistently.
fn update_version(&mut self, version: u64, new_operations: Vec<Operation>) -> Fallible<()> {
// ensure that we are applying the versions in order..
assert_eq!(version, self.base_version + 1);
self.base_version = version;
self.operations = new_operations;
assert_eq!(version, self.data_ref().base_version + 1);
self.mut_data_ref().base_version = version;
self.mut_data_ref().operations = new_operations;
Ok(())
}
/// Record the outstanding operations as synced to the server in the given version.
fn local_operations_synced(&mut self, version: u64) -> Fallible<()> {
assert_eq!(version, self.base_version + 1);
self.base_version = version;
self.operations = vec![];
assert_eq!(version, self.data_ref().base_version + 1);
self.mut_data_ref().base_version = version;
self.mut_data_ref().operations = vec![];
Ok(())
}
fn commit(&mut self) -> Fallible<()> {
// copy the new_data back into storage to commit the transaction
if let Some(data) = self.new_data.take() {
self.storage.data = data;
}
Ok(())
}
}
#[derive(PartialEq, Debug, Clone)]
pub struct InMemoryStorage {
data: Data,
}
impl InMemoryStorage {
pub fn new() -> InMemoryStorage {
InMemoryStorage {
data: Data {
tasks: HashMap::new(),
base_version: 0,
operations: vec![],
},
}
}
}
impl TaskStorage for InMemoryStorage {
fn txn<'a>(&'a mut self) -> Fallible<Box<dyn TaskStorageTxn + 'a>> {
Ok(Box::new(Txn {
storage: self,
new_data: None,
}))
}
}

621
src/taskstorage/kv.rs Normal file
View File

@@ -0,0 +1,621 @@
use crate::operation::Operation;
use crate::taskstorage::{TaskMap, TaskStorage, TaskStorageTxn};
use failure::Fallible;
use kv::msgpack::Msgpack;
use kv::{Bucket, Config, Error, Integer, Serde, Store, ValueBuf};
use std::convert::TryInto;
use std::path::Path;
use uuid::Uuid;
/// A representation of a UUID as a key. This is just a newtype wrapping the 128-bit packed form
/// of a UUID.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Key(uuid::Bytes);
impl From<&[u8]> for Key {
fn from(bytes: &[u8]) -> Key {
let key = Key(bytes.try_into().unwrap());
key
}
}
impl From<&Uuid> for Key {
fn from(uuid: &Uuid) -> Key {
let key = Key(uuid.as_bytes().clone());
key
}
}
impl From<Uuid> for Key {
fn from(uuid: Uuid) -> Key {
let key = Key(uuid.as_bytes().clone());
key
}
}
impl From<Key> for Uuid {
fn from(key: Key) -> Uuid {
Uuid::from_bytes(key.0)
}
}
impl AsRef<[u8]> for Key {
fn as_ref(&self) -> &[u8] {
&self.0[..]
}
}
/// KVStorage is an on-disk storage backend which uses LMDB via the `kv` crate.
pub struct KVStorage<'t> {
store: Store,
tasks_bucket: Bucket<'t, Key, ValueBuf<Msgpack<TaskMap>>>,
numbers_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<u64>>>,
operations_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<Operation>>>,
}
const BASE_VERSION: u64 = 1;
const NEXT_OPERATION: u64 = 2;
impl<'t> KVStorage<'t> {
pub fn new(directory: &Path) -> Fallible<KVStorage> {
let mut config = Config::default(directory);
config.bucket("tasks", None);
config.bucket("numbers", None);
config.bucket("operations", None);
let store = Store::new(config)?;
// tasks are stored indexed by uuid
let tasks_bucket = store.bucket::<Key, ValueBuf<Msgpack<TaskMap>>>(Some("tasks"))?;
// this bucket contains various u64s, indexed by constants above
let numbers_bucket = store.int_bucket::<ValueBuf<Msgpack<u64>>>(Some("numbers"))?;
// this bucket contains operations, numbered consecutively
let operations_bucket =
store.int_bucket::<ValueBuf<Msgpack<Operation>>>(Some("operations"))?;
Ok(KVStorage {
store,
tasks_bucket,
numbers_bucket,
operations_bucket,
})
}
}
impl<'t> TaskStorage for KVStorage<'t> {
fn txn<'a>(&'a mut self) -> Fallible<Box<dyn TaskStorageTxn + 'a>> {
Ok(Box::new(Txn {
storage: self,
txn: Some(self.store.write_txn()?),
}))
}
}
struct Txn<'t> {
storage: &'t KVStorage<'t>,
txn: Option<kv::Txn<'t>>,
}
impl<'t> Txn<'t> {
// get the underlying kv Txn
fn kvtxn<'a>(&mut self) -> &mut kv::Txn<'t> {
if let Some(ref mut txn) = self.txn {
txn
} else {
panic!("cannot use transaction after commit");
}
}
// Access to buckets
fn tasks_bucket(&self) -> &'t Bucket<'t, Key, ValueBuf<Msgpack<TaskMap>>> {
&self.storage.tasks_bucket
}
fn numbers_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf<Msgpack<u64>>> {
&self.storage.numbers_bucket
}
fn operations_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf<Msgpack<Operation>>> {
&self.storage.operations_bucket
}
}
impl<'t> TaskStorageTxn for Txn<'t> {
fn get_task(&mut self, uuid: &Uuid) -> Fallible<Option<TaskMap>> {
let bucket = self.tasks_bucket();
let buf = match self.kvtxn().get(bucket, uuid.into()) {
Ok(buf) => buf,
Err(Error::NotFound) => return Ok(None),
Err(e) => return Err(e.into()),
};
let value = buf.inner()?.to_serde();
Ok(Some(value))
}
fn create_task(&mut self, uuid: Uuid) -> Fallible<bool> {
let bucket = self.tasks_bucket();
let kvtxn = self.kvtxn();
match kvtxn.get(bucket, uuid.into()) {
Err(Error::NotFound) => {
kvtxn.set(bucket, uuid.into(), Msgpack::to_value_buf(TaskMap::new())?)?;
Ok(true)
}
Err(e) => Err(e.into()),
Ok(_) => Ok(false),
}
}
fn set_task(&mut self, uuid: Uuid, task: TaskMap) -> Fallible<()> {
let bucket = self.tasks_bucket();
let kvtxn = self.kvtxn();
kvtxn.set(bucket, uuid.into(), Msgpack::to_value_buf(task)?)?;
Ok(())
}
fn delete_task(&mut self, uuid: &Uuid) -> Fallible<bool> {
let bucket = self.tasks_bucket();
let kvtxn = self.kvtxn();
match kvtxn.del(bucket, uuid.into()) {
Err(Error::NotFound) => Ok(false),
Err(e) => Err(e.into()),
Ok(_) => Ok(true),
}
}
fn all_tasks(&mut self) -> Fallible<Vec<(Uuid, TaskMap)>> {
let bucket = self.tasks_bucket();
let kvtxn = self.kvtxn();
let curs = kvtxn.read_cursor(bucket)?;
let all_tasks: Result<Vec<(Uuid, TaskMap)>, Error> = kvtxn
.read_cursor(bucket)?
.iter()
.map(|(k, v)| Ok((k.into(), v.inner()?.to_serde())))
.collect();
Ok(all_tasks?)
}
fn all_task_uuids(&mut self) -> Fallible<Vec<Uuid>> {
let bucket = self.tasks_bucket();
let kvtxn = self.kvtxn();
let curs = kvtxn.read_cursor(bucket)?;
Ok(kvtxn
.read_cursor(bucket)?
.iter()
.map(|(k, _)| k.into())
.collect())
}
fn add_operation(&mut self, op: Operation) -> Fallible<()> {
let numbers_bucket = self.numbers_bucket();
let operations_bucket = self.operations_bucket();
let kvtxn = self.kvtxn();
let next_op = match kvtxn.get(numbers_bucket, NEXT_OPERATION.into()) {
Ok(buf) => buf.inner()?.to_serde(),
Err(Error::NotFound) => 0,
Err(e) => return Err(e.into()),
};
kvtxn.set(
operations_bucket,
next_op.into(),
Msgpack::to_value_buf(op)?,
)?;
kvtxn.set(
numbers_bucket,
NEXT_OPERATION.into(),
Msgpack::to_value_buf(next_op + 1)?,
)?;
Ok(())
}
fn base_version(&mut self) -> Fallible<u64> {
let bucket = self.numbers_bucket();
let base_version = match self.kvtxn().get(bucket, BASE_VERSION.into()) {
Ok(buf) => buf,
Err(Error::NotFound) => return Ok(0),
Err(e) => return Err(e.into()),
}
.inner()?
.to_serde();
Ok(base_version)
}
fn operations(&mut self) -> Fallible<Vec<Operation>> {
let bucket = self.operations_bucket();
let kvtxn = self.kvtxn();
let curs = kvtxn.read_cursor(bucket)?;
let all_ops: Result<Vec<(u64, Operation)>, Error> = kvtxn
.read_cursor(bucket)?
.iter()
.map(|(i, v)| Ok((i.into(), v.inner()?.to_serde())))
.collect();
let mut all_ops = all_ops?;
// sort by key..
all_ops.sort_by(|a, b| a.0.cmp(&b.0));
// and return the values..
Ok(all_ops.iter().map(|(_, v)| v.clone()).collect())
}
fn update_version(&mut self, version: u64, new_operations: Vec<Operation>) -> Fallible<()> {
let numbers_bucket = self.numbers_bucket();
let operations_bucket = self.operations_bucket();
let kvtxn = self.kvtxn();
kvtxn.clear_db(operations_bucket)?;
let mut i = 0u64;
for op in new_operations {
kvtxn.set(operations_bucket, i.into(), Msgpack::to_value_buf(op)?)?;
i += 1;
}
kvtxn.set(
numbers_bucket,
BASE_VERSION.into(),
Msgpack::to_value_buf(version)?,
)?;
kvtxn.set(
numbers_bucket,
NEXT_OPERATION.into(),
Msgpack::to_value_buf(i)?,
)?;
Ok(())
}
fn local_operations_synced(&mut self, version: u64) -> Fallible<()> {
let numbers_bucket = self.numbers_bucket();
let operations_bucket = self.operations_bucket();
let kvtxn = self.kvtxn();
kvtxn.clear_db(operations_bucket)?;
kvtxn.set(
numbers_bucket,
BASE_VERSION.into(),
Msgpack::to_value_buf(version)?,
)?;
kvtxn.set(
numbers_bucket,
NEXT_OPERATION.into(),
Msgpack::to_value_buf(0)?,
)?;
Ok(())
}
fn commit(&mut self) -> Fallible<()> {
if let Some(kvtxn) = self.txn.take() {
kvtxn.commit()?;
} else {
panic!("transaction already committed");
}
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::taskstorage::taskmap_with;
use failure::Fallible;
use tempdir::TempDir;
#[test]
fn test_create() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid = Uuid::new_v4();
{
let mut txn = storage.txn()?;
assert!(txn.create_task(uuid.clone())?);
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let task = txn.get_task(&uuid)?;
assert_eq!(task, Some(taskmap_with(vec![])));
}
Ok(())
}
#[test]
fn test_create_exists() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid = Uuid::new_v4();
{
let mut txn = storage.txn()?;
assert!(txn.create_task(uuid.clone())?);
txn.commit()?;
}
{
let mut txn = storage.txn()?;
assert!(!txn.create_task(uuid.clone())?);
txn.commit()?;
}
Ok(())
}
#[test]
fn test_get_missing() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid = Uuid::new_v4();
{
let mut txn = storage.txn()?;
let task = txn.get_task(&uuid)?;
assert_eq!(task, None);
}
Ok(())
}
#[test]
fn test_set_task() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid = Uuid::new_v4();
{
let mut txn = storage.txn()?;
txn.set_task(
uuid.clone(),
taskmap_with(vec![("k".to_string(), "v".to_string())]),
)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let task = txn.get_task(&uuid)?;
assert_eq!(
task,
Some(taskmap_with(vec![("k".to_string(), "v".to_string())]))
);
}
Ok(())
}
#[test]
fn test_delete_task_missing() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid = Uuid::new_v4();
{
let mut txn = storage.txn()?;
assert!(!txn.delete_task(&uuid)?);
}
Ok(())
}
#[test]
fn test_delete_task_exists() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid = Uuid::new_v4();
{
let mut txn = storage.txn()?;
assert!(txn.create_task(uuid.clone())?);
txn.commit()?;
}
{
let mut txn = storage.txn()?;
assert!(txn.delete_task(&uuid)?);
}
Ok(())
}
#[test]
fn test_all_tasks_empty() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
{
let mut txn = storage.txn()?;
let tasks = txn.all_tasks()?;
assert_eq!(tasks, vec![]);
}
Ok(())
}
#[test]
fn test_all_tasks_and_uuids() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
{
let mut txn = storage.txn()?;
assert!(txn.create_task(uuid1.clone())?);
txn.set_task(
uuid1.clone(),
taskmap_with(vec![("num".to_string(), "1".to_string())]),
)?;
assert!(txn.create_task(uuid2.clone())?);
txn.set_task(
uuid2.clone(),
taskmap_with(vec![("num".to_string(), "2".to_string())]),
)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
let mut tasks = txn.all_tasks()?;
// order is nondeterministic, so sort by uuid
tasks.sort_by(|a, b| a.0.cmp(&b.0));
let mut exp = vec![
(
uuid1.clone(),
taskmap_with(vec![("num".to_string(), "1".to_string())]),
),
(
uuid2.clone(),
taskmap_with(vec![("num".to_string(), "2".to_string())]),
),
];
exp.sort_by(|a, b| a.0.cmp(&b.0));
assert_eq!(tasks, exp);
}
{
let mut txn = storage.txn()?;
let mut uuids = txn.all_task_uuids()?;
uuids.sort();
let mut exp = vec![uuid1.clone(), uuid2.clone()];
exp.sort();
assert_eq!(uuids, exp);
}
Ok(())
}
#[test]
fn test_base_version_default() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
{
let mut txn = storage.txn()?;
assert_eq!(txn.base_version()?, 0);
}
Ok(())
}
#[test]
fn test_operations() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
// create some operations
{
let mut txn = storage.txn()?;
txn.add_operation(Operation::Create { uuid: uuid1 })?;
txn.add_operation(Operation::Create { uuid: uuid2 })?;
txn.commit()?;
}
// read them back
{
let mut txn = storage.txn()?;
let ops = txn.operations()?;
assert_eq!(
ops,
vec![
Operation::Create { uuid: uuid1 },
Operation::Create { uuid: uuid2 },
]
);
}
// report them sync'd to the server
{
let mut txn = storage.txn()?;
txn.local_operations_synced(1)?;
txn.commit()?;
}
// check that the operations are gone and the base version is incremented
{
let mut txn = storage.txn()?;
let ops = txn.operations()?;
assert_eq!(ops, vec![]);
assert_eq!(txn.base_version()?, 1);
}
// create some more operations (to test adding operations after clearing)
{
let mut txn = storage.txn()?;
txn.add_operation(Operation::Delete { uuid: uuid2 })?;
txn.add_operation(Operation::Delete { uuid: uuid1 })?;
txn.commit()?;
}
// read them back
{
let mut txn = storage.txn()?;
let ops = txn.operations()?;
assert_eq!(
ops,
vec![
Operation::Delete { uuid: uuid2 },
Operation::Delete { uuid: uuid1 },
]
);
}
Ok(())
}
#[test]
fn test_update_version() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
let uuid3 = Uuid::new_v4();
let uuid4 = Uuid::new_v4();
// create some operations
{
let mut txn = storage.txn()?;
txn.add_operation(Operation::Create { uuid: uuid1 })?;
txn.add_operation(Operation::Create { uuid: uuid2 })?;
txn.add_operation(Operation::Create { uuid: uuid3 })?;
txn.add_operation(Operation::Delete { uuid: uuid2 })?;
txn.commit()?;
}
// update version from the server..
{
let mut txn = storage.txn()?;
txn.update_version(
1,
vec![
Operation::Create { uuid: uuid2 },
Operation::Delete { uuid: uuid2 },
],
)?;
txn.commit()?;
}
// check that the operations are updated and the base version is incremented
{
let mut txn = storage.txn()?;
let ops = txn.operations()?;
assert_eq!(
ops,
vec![
Operation::Create { uuid: uuid2 },
Operation::Delete { uuid: uuid2 },
]
);
assert_eq!(txn.base_version()?, 1);
}
// create some more operations (to test adding operations after updating)
{
let mut txn = storage.txn()?;
txn.add_operation(Operation::Create { uuid: uuid4 })?;
txn.add_operation(Operation::Delete { uuid: uuid4 })?;
txn.commit()?;
}
// read them back
{
let mut txn = storage.txn()?;
let ops = txn.operations()?;
assert_eq!(
ops,
vec![
Operation::Create { uuid: uuid2 },
Operation::Delete { uuid: uuid2 },
Operation::Create { uuid: uuid4 },
Operation::Delete { uuid: uuid4 },
]
);
}
Ok(())
}
}

View File

@@ -1,95 +0,0 @@
use crate::operation::Operation;
use crate::taskstorage::{TaskMap, TaskStorage};
use kv::{Config, Error, Manager, ValueRef};
use uuid::Uuid;
pub struct KVStorage {
// TODO: make the manager global with lazy-static
manager: Manager,
config: Config,
}
impl KVStorage {
pub fn new(directory: &str) -> KVStorage {
let mut config = Config::default(directory);
config.bucket("base_version", None);
config.bucket("operations", None);
config.bucket("tasks", None);
KVStorage {
manager: Manager::new(),
config,
}
}
}
impl TaskStorage for KVStorage {
/// Get an (immutable) task, if it is in the storage
fn get_task(&self, uuid: &Uuid) -> Option<TaskMap> {
match self.tasks.get(uuid) {
None => None,
Some(t) => Some(t.clone()),
}
}
/// Create a task, only if it does not already exist. Returns true if
/// the task was created (did not already exist).
fn create_task(&mut self, uuid: Uuid, task: TaskMap) -> bool {
if let ent @ Entry::Vacant(_) = self.tasks.entry(uuid) {
ent.or_insert(task);
true
} else {
false
}
}
/// Set a task, overwriting any existing task.
fn set_task(&mut self, uuid: Uuid, task: TaskMap) {
self.tasks.insert(uuid, task);
}
/// Delete a task, if it exists. Returns true if the task was deleted (already existed)
fn delete_task(&mut self, uuid: &Uuid) -> bool {
if let Some(_) = self.tasks.remove(uuid) {
true
} else {
false
}
}
fn get_task_uuids<'a>(&'a self) -> Box<dyn Iterator<Item = Uuid> + 'a> {
Box::new(self.tasks.keys().map(|u| u.clone()))
}
/// Add an operation to the list of operations in the storage. Note that this merely *stores*
/// the operation; it is up to the TaskDB to apply it.
fn add_operation(&mut self, op: Operation) {
self.operations.push(op);
}
/// Get the current base_version for this storage -- the last version synced from the server.
fn base_version(&self) -> u64 {
return self.base_version;
}
/// Get the current set of outstanding operations (operations that have not been sync'd to the
/// server yet)
fn operations<'a>(&'a self) -> Box<dyn Iterator<Item = &'a Operation> + 'a> {
Box::new(self.operations.iter())
}
/// Apply the next version from the server. This replaces the existing base_version and
/// operations. It's up to the caller (TaskDB) to ensure this is done consistently.
fn update_version(&mut self, version: u64, new_operations: Vec<Operation>) {
// ensure that we are applying the versions in order..
assert_eq!(version, self.base_version + 1);
self.base_version = version;
self.operations = new_operations;
}
/// Record the outstanding operations as synced to the server in the given version.
fn local_operations_synced(&mut self, version: u64) {
assert_eq!(version, self.base_version + 1);
self.base_version = version;
self.operations = vec![];
}
}

View File

@@ -1,51 +1,91 @@
use crate::Operation;
use failure::Fallible;
use std::collections::HashMap;
use std::fmt;
use uuid::Uuid;
mod inmemory;
mod kv;
pub use inmemory::InMemoryStorage;
/// An in-memory representation of a task as a simple hashmap
pub type TaskMap = HashMap<String, String>;
/// A trait for objects able to act as backing storage for a TaskDB. This API is optimized to be
/// easy to implement, with all of the semantic meaning of the data located in the TaskDB
/// implementation, which is the sole consumer of this trait.
pub trait TaskStorage: fmt::Debug {
#[cfg(test)]
fn taskmap_with(mut properties: Vec<(String, String)>) -> TaskMap {
let mut rv = TaskMap::new();
for (p, v) in properties.drain(..) {
rv.insert(p, v);
}
rv
}
/// A TaskStorage transaction, in which storage operations are performed.
/// Serializable consistency is maintained, and implementations do not optimize
/// for concurrent access so some may simply apply a mutex to limit access to
/// one transaction at a time. Transactions are aborted if they are dropped.
/// It's safe to drop transactions that did not modify any data.
pub trait TaskStorageTxn {
/// Get an (immutable) task, if it is in the storage
fn get_task(&self, uuid: &Uuid) -> Fallible<Option<TaskMap>>;
fn get_task(&mut self, uuid: &Uuid) -> Fallible<Option<TaskMap>>;
/// Create a task, only if it does not already exist. Returns true if
/// Create an (empty) task, only if it does not already exist. Returns true if
/// the task was created (did not already exist).
fn create_task(&mut self, uuid: Uuid, task: TaskMap) -> Fallible<bool>;
fn create_task(&mut self, uuid: Uuid) -> Fallible<bool>;
/// Set a task, overwriting any existing task.
/// Set a task, overwriting any existing task. If the task does not exist, this implicitly
/// creates it (use `get_task` to check first, if necessary).
fn set_task(&mut self, uuid: Uuid, task: TaskMap) -> Fallible<()>;
/// Delete a task, if it exists. Returns true if the task was deleted (already existed)
fn delete_task(&mut self, uuid: &Uuid) -> Fallible<bool>;
/// Get the uuids and bodies of all tasks in the storage, in undefined order.
fn all_tasks<'a>(&mut self) -> Fallible<Vec<(Uuid, TaskMap)>>;
/// Get the uuids of all tasks in the storage, in undefined order.
fn get_task_uuids<'a>(&'a self) -> Fallible<Box<dyn Iterator<Item = Uuid> + 'a>>;
fn all_task_uuids<'a>(&mut self) -> Fallible<Vec<Uuid>>;
/// Add an operation to the list of operations in the storage. Note that this merely *stores*
/// the operation; it is up to the TaskDB to apply it.
/// the operation; it is up to the DB to apply it.
fn add_operation(&mut self, op: Operation) -> Fallible<()>;
/// Get the current base_version for this storage -- the last version synced from the server.
fn base_version(&self) -> Fallible<u64>;
fn base_version(&mut self) -> Fallible<u64>;
/// Get the current set of outstanding operations (operations that have not been sync'd to the
/// server yet)
fn operations<'a>(&'a self) -> Fallible<Box<dyn Iterator<Item = &Operation> + 'a>>;
fn operations<'a>(&mut self) -> Fallible<Vec<Operation>>;
/// Apply the next version from the server. This replaces the existing base_version and
/// operations. It's up to the caller (TaskDB) to ensure this is done consistently.
/// operations. It's up to the caller (DB) to ensure this is done consistently.
fn update_version(&mut self, version: u64, new_operations: Vec<Operation>) -> Fallible<()>;
/// Record the outstanding operations as synced to the server in the given version.
/// Record the outstanding operations as synced to the server in the given version: set
/// the base_version to the given value, and empty the operations list.
fn local_operations_synced(&mut self, version: u64) -> Fallible<()>;
/// Commit any changes made in the transaction. It is an error to call this more than
/// once.
fn commit(&mut self) -> Fallible<()>;
}
/// A trait for objects able to act as backing storage for a DB. This API is optimized to be
/// easy to implement, with all of the semantic meaning of the data located in the DB
/// implementation, which is the sole consumer of this trait.
///
/// Conceptually, task storage contains the following:
///
/// - tasks: a set of tasks indexed by uuid
/// - base_version: the number of the last version sync'd from the server
/// - operations: all operations performed since base_version
///
/// The `operations` are already reflected in `tasks`, so the following invariant holds:
/// > Applying `operations` to the set of tasks at `base_version` gives a set of tasks identical
/// > to `tasks`.
///
/// It is up to the caller (DB) to maintain this invariant.
pub trait TaskStorage {
/// Begin a transaction
fn txn<'a>(&'a mut self) -> Fallible<Box<dyn TaskStorageTxn + 'a>>;
}