diff --git a/src/taskdb.rs b/src/taskdb.rs index 20a8b74c3..3e9fbe0c1 100644 --- a/src/taskdb.rs +++ b/src/taskdb.rs @@ -130,7 +130,8 @@ impl DB { if let VersionAdd::Ok = server.add_version(username, new_version.version, new_version_str.into()) { - txn.local_operations_synced(new_version.version)?; + txn.set_base_version(new_version.version)?; + txn.set_operations(vec![])?; break; } } @@ -187,7 +188,8 @@ impl DB { } local_operations = new_local_ops; } - txn.update_version(version.version, local_operations)?; + txn.set_base_version(version.version)?; + txn.set_operations(local_operations)?; Ok(()) } diff --git a/src/taskstorage/inmemory.rs b/src/taskstorage/inmemory.rs index e85dcd054..ef654b0f3 100644 --- a/src/taskstorage/inmemory.rs +++ b/src/taskstorage/inmemory.rs @@ -81,31 +81,26 @@ impl<'t> TaskStorageTxn for Txn<'t> { Ok(self.data_ref().tasks.keys().map(|u| u.clone()).collect()) } - fn add_operation(&mut self, op: Operation) -> Fallible<()> { - self.mut_data_ref().operations.push(op); - Ok(()) - } - fn base_version(&mut self) -> Fallible { Ok(self.data_ref().base_version) } + fn set_base_version(&mut self, version: u64) -> Fallible<()> { + self.mut_data_ref().base_version = version; + Ok(()) + } + fn operations(&mut self) -> Fallible> { Ok(self.data_ref().operations.clone()) } - fn update_version(&mut self, version: u64, new_operations: Vec) -> Fallible<()> { - // ensure that we are applying the versions in order.. - assert_eq!(version, self.data_ref().base_version + 1); - self.mut_data_ref().base_version = version; - self.mut_data_ref().operations = new_operations; + fn add_operation(&mut self, op: Operation) -> Fallible<()> { + self.mut_data_ref().operations.push(op); Ok(()) } - fn local_operations_synced(&mut self, version: u64) -> Fallible<()> { - assert_eq!(version, self.data_ref().base_version + 1); - self.mut_data_ref().base_version = version; - self.mut_data_ref().operations = vec![]; + fn set_operations(&mut self, ops: Vec) -> Fallible<()> { + self.mut_data_ref().operations = ops; Ok(()) } diff --git a/src/taskstorage/kv.rs b/src/taskstorage/kv.rs index 4e56f697d..5506b7bd5 100644 --- a/src/taskstorage/kv.rs +++ b/src/taskstorage/kv.rs @@ -184,6 +184,46 @@ impl<'t> TaskStorageTxn for Txn<'t> { .collect()) } + fn base_version(&mut self) -> Fallible { + let bucket = self.numbers_bucket(); + let base_version = match self.kvtxn().get(bucket, BASE_VERSION.into()) { + Ok(buf) => buf, + Err(Error::NotFound) => return Ok(0), + Err(e) => return Err(e.into()), + } + .inner()? + .to_serde(); + Ok(base_version) + } + + fn set_base_version(&mut self, version: u64) -> Fallible<()> { + let numbers_bucket = self.numbers_bucket(); + let kvtxn = self.kvtxn(); + + kvtxn.set( + numbers_bucket, + BASE_VERSION.into(), + Msgpack::to_value_buf(version)?, + )?; + Ok(()) + } + + fn operations(&mut self) -> Fallible> { + let bucket = self.operations_bucket(); + let kvtxn = self.kvtxn(); + let curs = kvtxn.read_cursor(bucket)?; + let all_ops: Result, Error> = kvtxn + .read_cursor(bucket)? + .iter() + .map(|(i, v)| Ok((i.into(), v.inner()?.to_serde()))) + .collect(); + let mut all_ops = all_ops?; + // sort by key.. + all_ops.sort_by(|a, b| a.0.cmp(&b.0)); + // and return the values.. + Ok(all_ops.iter().map(|(_, v)| v.clone()).collect()) + } + fn add_operation(&mut self, op: Operation) -> Fallible<()> { let numbers_bucket = self.numbers_bucket(); let operations_bucket = self.operations_bucket(); @@ -208,35 +248,7 @@ impl<'t> TaskStorageTxn for Txn<'t> { Ok(()) } - fn base_version(&mut self) -> Fallible { - let bucket = self.numbers_bucket(); - let base_version = match self.kvtxn().get(bucket, BASE_VERSION.into()) { - Ok(buf) => buf, - Err(Error::NotFound) => return Ok(0), - Err(e) => return Err(e.into()), - } - .inner()? - .to_serde(); - Ok(base_version) - } - - fn operations(&mut self) -> Fallible> { - let bucket = self.operations_bucket(); - let kvtxn = self.kvtxn(); - let curs = kvtxn.read_cursor(bucket)?; - let all_ops: Result, Error> = kvtxn - .read_cursor(bucket)? - .iter() - .map(|(i, v)| Ok((i.into(), v.inner()?.to_serde()))) - .collect(); - let mut all_ops = all_ops?; - // sort by key.. - all_ops.sort_by(|a, b| a.0.cmp(&b.0)); - // and return the values.. - Ok(all_ops.iter().map(|(_, v)| v.clone()).collect()) - } - - fn update_version(&mut self, version: u64, new_operations: Vec) -> Fallible<()> { + fn set_operations(&mut self, ops: Vec) -> Fallible<()> { let numbers_bucket = self.numbers_bucket(); let operations_bucket = self.operations_bucket(); let kvtxn = self.kvtxn(); @@ -244,17 +256,11 @@ impl<'t> TaskStorageTxn for Txn<'t> { kvtxn.clear_db(operations_bucket)?; let mut i = 0u64; - for op in new_operations { + for op in ops { kvtxn.set(operations_bucket, i.into(), Msgpack::to_value_buf(op)?)?; i += 1; } - kvtxn.set( - numbers_bucket, - BASE_VERSION.into(), - Msgpack::to_value_buf(version)?, - )?; - kvtxn.set( numbers_bucket, NEXT_OPERATION.into(), @@ -264,28 +270,6 @@ impl<'t> TaskStorageTxn for Txn<'t> { Ok(()) } - fn local_operations_synced(&mut self, version: u64) -> Fallible<()> { - let numbers_bucket = self.numbers_bucket(); - let operations_bucket = self.operations_bucket(); - let kvtxn = self.kvtxn(); - - kvtxn.clear_db(operations_bucket)?; - - kvtxn.set( - numbers_bucket, - BASE_VERSION.into(), - Msgpack::to_value_buf(version)?, - )?; - - kvtxn.set( - numbers_bucket, - NEXT_OPERATION.into(), - Msgpack::to_value_buf(0)?, - )?; - - Ok(()) - } - fn commit(&mut self) -> Fallible<()> { if let Some(kvtxn) = self.txn.take() { kvtxn.commit()?; @@ -482,12 +466,29 @@ mod test { Ok(()) } + #[test] + fn test_base_version_setting() -> Fallible<()> { + let tmp_dir = TempDir::new("test")?; + let mut storage = KVStorage::new(&tmp_dir.path())?; + { + let mut txn = storage.txn()?; + txn.set_base_version(3)?; + txn.commit()?; + } + { + let mut txn = storage.txn()?; + assert_eq!(txn.base_version()?, 3); + } + Ok(()) + } + #[test] fn test_operations() -> Fallible<()> { let tmp_dir = TempDir::new("test")?; let mut storage = KVStorage::new(&tmp_dir.path())?; let uuid1 = Uuid::new_v4(); let uuid2 = Uuid::new_v4(); + let uuid3 = Uuid::new_v4(); // create some operations { @@ -510,26 +511,21 @@ mod test { ); } - // report them sync'd to the server + // set them to a different bunch { let mut txn = storage.txn()?; - txn.local_operations_synced(1)?; + txn.set_operations(vec![ + Operation::Delete { uuid: uuid2 }, + Operation::Delete { uuid: uuid1 }, + ])?; txn.commit()?; } - // check that the operations are gone and the base version is incremented - { - let mut txn = storage.txn()?; - let ops = txn.operations()?; - assert_eq!(ops, vec![]); - assert_eq!(txn.base_version()?, 1); - } - // create some more operations (to test adding operations after clearing) { let mut txn = storage.txn()?; - txn.add_operation(Operation::Delete { uuid: uuid2 })?; - txn.add_operation(Operation::Delete { uuid: uuid1 })?; + txn.add_operation(Operation::Create { uuid: uuid3 })?; + txn.add_operation(Operation::Delete { uuid: uuid3 })?; txn.commit()?; } @@ -542,77 +538,8 @@ mod test { vec![ Operation::Delete { uuid: uuid2 }, Operation::Delete { uuid: uuid1 }, - ] - ); - } - Ok(()) - } - - #[test] - fn test_update_version() -> Fallible<()> { - let tmp_dir = TempDir::new("test")?; - let mut storage = KVStorage::new(&tmp_dir.path())?; - let uuid1 = Uuid::new_v4(); - let uuid2 = Uuid::new_v4(); - let uuid3 = Uuid::new_v4(); - let uuid4 = Uuid::new_v4(); - - // create some operations - { - let mut txn = storage.txn()?; - txn.add_operation(Operation::Create { uuid: uuid1 })?; - txn.add_operation(Operation::Create { uuid: uuid2 })?; - txn.add_operation(Operation::Create { uuid: uuid3 })?; - txn.add_operation(Operation::Delete { uuid: uuid2 })?; - txn.commit()?; - } - - // update version from the server.. - { - let mut txn = storage.txn()?; - txn.update_version( - 1, - vec![ - Operation::Create { uuid: uuid2 }, - Operation::Delete { uuid: uuid2 }, - ], - )?; - txn.commit()?; - } - - // check that the operations are updated and the base version is incremented - { - let mut txn = storage.txn()?; - let ops = txn.operations()?; - assert_eq!( - ops, - vec![ - Operation::Create { uuid: uuid2 }, - Operation::Delete { uuid: uuid2 }, - ] - ); - assert_eq!(txn.base_version()?, 1); - } - - // create some more operations (to test adding operations after updating) - { - let mut txn = storage.txn()?; - txn.add_operation(Operation::Create { uuid: uuid4 })?; - txn.add_operation(Operation::Delete { uuid: uuid4 })?; - txn.commit()?; - } - - // read them back - { - let mut txn = storage.txn()?; - let ops = txn.operations()?; - assert_eq!( - ops, - vec![ - Operation::Create { uuid: uuid2 }, - Operation::Delete { uuid: uuid2 }, - Operation::Create { uuid: uuid4 }, - Operation::Delete { uuid: uuid4 }, + Operation::Create { uuid: uuid3 }, + Operation::Delete { uuid: uuid3 }, ] ); } diff --git a/src/taskstorage/mod.rs b/src/taskstorage/mod.rs index d7a175596..60541cfa8 100644 --- a/src/taskstorage/mod.rs +++ b/src/taskstorage/mod.rs @@ -46,24 +46,22 @@ pub trait TaskStorageTxn { /// Get the uuids of all tasks in the storage, in undefined order. fn all_task_uuids<'a>(&mut self) -> Fallible>; - /// Add an operation to the list of operations in the storage. Note that this merely *stores* - /// the operation; it is up to the DB to apply it. - fn add_operation(&mut self, op: Operation) -> Fallible<()>; - /// Get the current base_version for this storage -- the last version synced from the server. fn base_version(&mut self) -> Fallible; + /// Set the current base_version for this storage. + fn set_base_version(&mut self, version: u64) -> Fallible<()>; + /// Get the current set of outstanding operations (operations that have not been sync'd to the /// server yet) fn operations<'a>(&mut self) -> Fallible>; - /// Apply the next version from the server. This replaces the existing base_version and - /// operations. It's up to the caller (DB) to ensure this is done consistently. - fn update_version(&mut self, version: u64, new_operations: Vec) -> Fallible<()>; + /// Add an operation to the end of the list of operations in the storage. Note that this + /// merely *stores* the operation; it is up to the DB to apply it. + fn add_operation(&mut self, op: Operation) -> Fallible<()>; - /// Record the outstanding operations as synced to the server in the given version: set - /// the base_version to the given value, and empty the operations list. - fn local_operations_synced(&mut self, version: u64) -> Fallible<()>; + /// Replace the current list of operations with a new list. + fn set_operations(&mut self, ops: Vec) -> Fallible<()>; /// Commit any changes made in the transaction. It is an error to call this more than /// once.