simplify the taskstorage methods

This commit is contained in:
Dustin J. Mitchell
2020-01-12 13:48:16 -05:00
parent 2f973d3e62
commit 4172c7012c
4 changed files with 89 additions and 167 deletions

View File

@@ -130,7 +130,8 @@ impl DB {
if let VersionAdd::Ok =
server.add_version(username, new_version.version, new_version_str.into())
{
txn.local_operations_synced(new_version.version)?;
txn.set_base_version(new_version.version)?;
txn.set_operations(vec![])?;
break;
}
}
@@ -187,7 +188,8 @@ impl DB {
}
local_operations = new_local_ops;
}
txn.update_version(version.version, local_operations)?;
txn.set_base_version(version.version)?;
txn.set_operations(local_operations)?;
Ok(())
}

View File

@@ -81,31 +81,26 @@ impl<'t> TaskStorageTxn for Txn<'t> {
Ok(self.data_ref().tasks.keys().map(|u| u.clone()).collect())
}
fn add_operation(&mut self, op: Operation) -> Fallible<()> {
self.mut_data_ref().operations.push(op);
Ok(())
}
fn base_version(&mut self) -> Fallible<u64> {
Ok(self.data_ref().base_version)
}
fn set_base_version(&mut self, version: u64) -> Fallible<()> {
self.mut_data_ref().base_version = version;
Ok(())
}
fn operations(&mut self) -> Fallible<Vec<Operation>> {
Ok(self.data_ref().operations.clone())
}
fn update_version(&mut self, version: u64, new_operations: Vec<Operation>) -> Fallible<()> {
// ensure that we are applying the versions in order..
assert_eq!(version, self.data_ref().base_version + 1);
self.mut_data_ref().base_version = version;
self.mut_data_ref().operations = new_operations;
fn add_operation(&mut self, op: Operation) -> Fallible<()> {
self.mut_data_ref().operations.push(op);
Ok(())
}
fn local_operations_synced(&mut self, version: u64) -> Fallible<()> {
assert_eq!(version, self.data_ref().base_version + 1);
self.mut_data_ref().base_version = version;
self.mut_data_ref().operations = vec![];
fn set_operations(&mut self, ops: Vec<Operation>) -> Fallible<()> {
self.mut_data_ref().operations = ops;
Ok(())
}

View File

@@ -184,6 +184,46 @@ impl<'t> TaskStorageTxn for Txn<'t> {
.collect())
}
fn base_version(&mut self) -> Fallible<u64> {
let bucket = self.numbers_bucket();
let base_version = match self.kvtxn().get(bucket, BASE_VERSION.into()) {
Ok(buf) => buf,
Err(Error::NotFound) => return Ok(0),
Err(e) => return Err(e.into()),
}
.inner()?
.to_serde();
Ok(base_version)
}
fn set_base_version(&mut self, version: u64) -> Fallible<()> {
let numbers_bucket = self.numbers_bucket();
let kvtxn = self.kvtxn();
kvtxn.set(
numbers_bucket,
BASE_VERSION.into(),
Msgpack::to_value_buf(version)?,
)?;
Ok(())
}
fn operations(&mut self) -> Fallible<Vec<Operation>> {
let bucket = self.operations_bucket();
let kvtxn = self.kvtxn();
let curs = kvtxn.read_cursor(bucket)?;
let all_ops: Result<Vec<(u64, Operation)>, Error> = kvtxn
.read_cursor(bucket)?
.iter()
.map(|(i, v)| Ok((i.into(), v.inner()?.to_serde())))
.collect();
let mut all_ops = all_ops?;
// sort by key..
all_ops.sort_by(|a, b| a.0.cmp(&b.0));
// and return the values..
Ok(all_ops.iter().map(|(_, v)| v.clone()).collect())
}
fn add_operation(&mut self, op: Operation) -> Fallible<()> {
let numbers_bucket = self.numbers_bucket();
let operations_bucket = self.operations_bucket();
@@ -208,35 +248,7 @@ impl<'t> TaskStorageTxn for Txn<'t> {
Ok(())
}
fn base_version(&mut self) -> Fallible<u64> {
let bucket = self.numbers_bucket();
let base_version = match self.kvtxn().get(bucket, BASE_VERSION.into()) {
Ok(buf) => buf,
Err(Error::NotFound) => return Ok(0),
Err(e) => return Err(e.into()),
}
.inner()?
.to_serde();
Ok(base_version)
}
fn operations(&mut self) -> Fallible<Vec<Operation>> {
let bucket = self.operations_bucket();
let kvtxn = self.kvtxn();
let curs = kvtxn.read_cursor(bucket)?;
let all_ops: Result<Vec<(u64, Operation)>, Error> = kvtxn
.read_cursor(bucket)?
.iter()
.map(|(i, v)| Ok((i.into(), v.inner()?.to_serde())))
.collect();
let mut all_ops = all_ops?;
// sort by key..
all_ops.sort_by(|a, b| a.0.cmp(&b.0));
// and return the values..
Ok(all_ops.iter().map(|(_, v)| v.clone()).collect())
}
fn update_version(&mut self, version: u64, new_operations: Vec<Operation>) -> Fallible<()> {
fn set_operations(&mut self, ops: Vec<Operation>) -> Fallible<()> {
let numbers_bucket = self.numbers_bucket();
let operations_bucket = self.operations_bucket();
let kvtxn = self.kvtxn();
@@ -244,17 +256,11 @@ impl<'t> TaskStorageTxn for Txn<'t> {
kvtxn.clear_db(operations_bucket)?;
let mut i = 0u64;
for op in new_operations {
for op in ops {
kvtxn.set(operations_bucket, i.into(), Msgpack::to_value_buf(op)?)?;
i += 1;
}
kvtxn.set(
numbers_bucket,
BASE_VERSION.into(),
Msgpack::to_value_buf(version)?,
)?;
kvtxn.set(
numbers_bucket,
NEXT_OPERATION.into(),
@@ -264,28 +270,6 @@ impl<'t> TaskStorageTxn for Txn<'t> {
Ok(())
}
fn local_operations_synced(&mut self, version: u64) -> Fallible<()> {
let numbers_bucket = self.numbers_bucket();
let operations_bucket = self.operations_bucket();
let kvtxn = self.kvtxn();
kvtxn.clear_db(operations_bucket)?;
kvtxn.set(
numbers_bucket,
BASE_VERSION.into(),
Msgpack::to_value_buf(version)?,
)?;
kvtxn.set(
numbers_bucket,
NEXT_OPERATION.into(),
Msgpack::to_value_buf(0)?,
)?;
Ok(())
}
fn commit(&mut self) -> Fallible<()> {
if let Some(kvtxn) = self.txn.take() {
kvtxn.commit()?;
@@ -482,12 +466,29 @@ mod test {
Ok(())
}
#[test]
fn test_base_version_setting() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
{
let mut txn = storage.txn()?;
txn.set_base_version(3)?;
txn.commit()?;
}
{
let mut txn = storage.txn()?;
assert_eq!(txn.base_version()?, 3);
}
Ok(())
}
#[test]
fn test_operations() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
let uuid3 = Uuid::new_v4();
// create some operations
{
@@ -510,26 +511,21 @@ mod test {
);
}
// report them sync'd to the server
// set them to a different bunch
{
let mut txn = storage.txn()?;
txn.local_operations_synced(1)?;
txn.set_operations(vec![
Operation::Delete { uuid: uuid2 },
Operation::Delete { uuid: uuid1 },
])?;
txn.commit()?;
}
// check that the operations are gone and the base version is incremented
{
let mut txn = storage.txn()?;
let ops = txn.operations()?;
assert_eq!(ops, vec![]);
assert_eq!(txn.base_version()?, 1);
}
// create some more operations (to test adding operations after clearing)
{
let mut txn = storage.txn()?;
txn.add_operation(Operation::Delete { uuid: uuid2 })?;
txn.add_operation(Operation::Delete { uuid: uuid1 })?;
txn.add_operation(Operation::Create { uuid: uuid3 })?;
txn.add_operation(Operation::Delete { uuid: uuid3 })?;
txn.commit()?;
}
@@ -542,77 +538,8 @@ mod test {
vec![
Operation::Delete { uuid: uuid2 },
Operation::Delete { uuid: uuid1 },
]
);
}
Ok(())
}
#[test]
fn test_update_version() -> Fallible<()> {
let tmp_dir = TempDir::new("test")?;
let mut storage = KVStorage::new(&tmp_dir.path())?;
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
let uuid3 = Uuid::new_v4();
let uuid4 = Uuid::new_v4();
// create some operations
{
let mut txn = storage.txn()?;
txn.add_operation(Operation::Create { uuid: uuid1 })?;
txn.add_operation(Operation::Create { uuid: uuid2 })?;
txn.add_operation(Operation::Create { uuid: uuid3 })?;
txn.add_operation(Operation::Delete { uuid: uuid2 })?;
txn.commit()?;
}
// update version from the server..
{
let mut txn = storage.txn()?;
txn.update_version(
1,
vec![
Operation::Create { uuid: uuid2 },
Operation::Delete { uuid: uuid2 },
],
)?;
txn.commit()?;
}
// check that the operations are updated and the base version is incremented
{
let mut txn = storage.txn()?;
let ops = txn.operations()?;
assert_eq!(
ops,
vec![
Operation::Create { uuid: uuid2 },
Operation::Delete { uuid: uuid2 },
]
);
assert_eq!(txn.base_version()?, 1);
}
// create some more operations (to test adding operations after updating)
{
let mut txn = storage.txn()?;
txn.add_operation(Operation::Create { uuid: uuid4 })?;
txn.add_operation(Operation::Delete { uuid: uuid4 })?;
txn.commit()?;
}
// read them back
{
let mut txn = storage.txn()?;
let ops = txn.operations()?;
assert_eq!(
ops,
vec![
Operation::Create { uuid: uuid2 },
Operation::Delete { uuid: uuid2 },
Operation::Create { uuid: uuid4 },
Operation::Delete { uuid: uuid4 },
Operation::Create { uuid: uuid3 },
Operation::Delete { uuid: uuid3 },
]
);
}

View File

@@ -46,24 +46,22 @@ pub trait TaskStorageTxn {
/// Get the uuids of all tasks in the storage, in undefined order.
fn all_task_uuids<'a>(&mut self) -> Fallible<Vec<Uuid>>;
/// Add an operation to the list of operations in the storage. Note that this merely *stores*
/// the operation; it is up to the DB to apply it.
fn add_operation(&mut self, op: Operation) -> Fallible<()>;
/// Get the current base_version for this storage -- the last version synced from the server.
fn base_version(&mut self) -> Fallible<u64>;
/// Set the current base_version for this storage.
fn set_base_version(&mut self, version: u64) -> Fallible<()>;
/// Get the current set of outstanding operations (operations that have not been sync'd to the
/// server yet)
fn operations<'a>(&mut self) -> Fallible<Vec<Operation>>;
/// Apply the next version from the server. This replaces the existing base_version and
/// operations. It's up to the caller (DB) to ensure this is done consistently.
fn update_version(&mut self, version: u64, new_operations: Vec<Operation>) -> Fallible<()>;
/// Add an operation to the end of the list of operations in the storage. Note that this
/// merely *stores* the operation; it is up to the DB to apply it.
fn add_operation(&mut self, op: Operation) -> Fallible<()>;
/// Record the outstanding operations as synced to the server in the given version: set
/// the base_version to the given value, and empty the operations list.
fn local_operations_synced(&mut self, version: u64) -> Fallible<()>;
/// Replace the current list of operations with a new list.
fn set_operations(&mut self, ops: Vec<Operation>) -> Fallible<()>;
/// Commit any changes made in the transaction. It is an error to call this more than
/// once.