@@ -1,7 +1,6 @@
|
||||
use crate::errors::Error;
|
||||
use crate::server::{AddVersionResult, GetVersionResult, Server};
|
||||
use crate::storage::{Operation, Storage, StorageTxn, TaskMap};
|
||||
use failure::{format_err, Fallible};
|
||||
use log::{info, trace, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashSet;
|
||||
@@ -34,7 +33,7 @@ impl TaskDB {
|
||||
/// Apply an operation to the TaskDB. Aside from synchronization operations, this is the only way
|
||||
/// to modify the TaskDB. In cases where an operation does not make sense, this function will do
|
||||
/// nothing and return an error (but leave the TaskDB in a consistent state).
|
||||
pub fn apply(&mut self, op: Operation) -> Fallible<()> {
|
||||
pub fn apply(&mut self, op: Operation) -> anyhow::Result<()> {
|
||||
// TODO: differentiate error types here?
|
||||
let mut txn = self.storage.txn()?;
|
||||
if let err @ Err(_) = TaskDB::apply_op(txn.as_mut(), &op) {
|
||||
@@ -45,7 +44,7 @@ impl TaskDB {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn apply_op(txn: &mut dyn StorageTxn, op: &Operation) -> Fallible<()> {
|
||||
fn apply_op(txn: &mut dyn StorageTxn, op: &Operation) -> anyhow::Result<()> {
|
||||
match op {
|
||||
Operation::Create { uuid } => {
|
||||
// insert if the task does not already exist
|
||||
@@ -81,25 +80,25 @@ impl TaskDB {
|
||||
}
|
||||
|
||||
/// Get all tasks.
|
||||
pub fn all_tasks(&mut self) -> Fallible<Vec<(Uuid, TaskMap)>> {
|
||||
pub fn all_tasks(&mut self) -> anyhow::Result<Vec<(Uuid, TaskMap)>> {
|
||||
let mut txn = self.storage.txn()?;
|
||||
txn.all_tasks()
|
||||
}
|
||||
|
||||
/// Get the UUIDs of all tasks
|
||||
pub fn all_task_uuids(&mut self) -> Fallible<Vec<Uuid>> {
|
||||
pub fn all_task_uuids(&mut self) -> anyhow::Result<Vec<Uuid>> {
|
||||
let mut txn = self.storage.txn()?;
|
||||
txn.all_task_uuids()
|
||||
}
|
||||
|
||||
/// Get the working set
|
||||
pub fn working_set(&mut self) -> Fallible<Vec<Option<Uuid>>> {
|
||||
pub fn working_set(&mut self) -> anyhow::Result<Vec<Option<Uuid>>> {
|
||||
let mut txn = self.storage.txn()?;
|
||||
txn.get_working_set()
|
||||
}
|
||||
|
||||
/// Get a single task, by uuid.
|
||||
pub fn get_task(&mut self, uuid: Uuid) -> Fallible<Option<TaskMap>> {
|
||||
pub fn get_task(&mut self, uuid: Uuid) -> anyhow::Result<Option<TaskMap>> {
|
||||
let mut txn = self.storage.txn()?;
|
||||
txn.get_task(uuid)
|
||||
}
|
||||
@@ -108,7 +107,7 @@ impl TaskDB {
|
||||
/// renumbers the existing working-set tasks to eliminate gaps, and also adds any tasks that
|
||||
/// are not already in the working set but should be. The rebuild occurs in a single
|
||||
/// trasnsaction against the storage backend.
|
||||
pub fn rebuild_working_set<F>(&mut self, in_working_set: F, renumber: bool) -> Fallible<()>
|
||||
pub fn rebuild_working_set<F>(&mut self, in_working_set: F, renumber: bool) -> anyhow::Result<()>
|
||||
where
|
||||
F: Fn(&TaskMap) -> bool,
|
||||
{
|
||||
@@ -169,7 +168,7 @@ impl TaskDB {
|
||||
|
||||
/// Add the given uuid to the working set and return its index; if it is already in the working
|
||||
/// set, its index is returned. This does *not* renumber any existing tasks.
|
||||
pub fn add_to_working_set(&mut self, uuid: Uuid) -> Fallible<usize> {
|
||||
pub fn add_to_working_set(&mut self, uuid: Uuid) -> anyhow::Result<usize> {
|
||||
let mut txn = self.storage.txn()?;
|
||||
// search for an existing entry for this task..
|
||||
for (i, elt) in txn.get_working_set()?.iter().enumerate() {
|
||||
@@ -185,7 +184,7 @@ impl TaskDB {
|
||||
}
|
||||
|
||||
/// Sync to the given server, pulling remote changes and pushing local changes.
|
||||
pub fn sync(&mut self, server: &mut Box<dyn Server>) -> Fallible<()> {
|
||||
pub fn sync(&mut self, server: &mut Box<dyn Server>) -> anyhow::Result<()> {
|
||||
let mut txn = self.storage.txn()?;
|
||||
|
||||
// retry synchronizing until the server accepts our version (this allows for races between
|
||||
@@ -247,9 +246,9 @@ impl TaskDB {
|
||||
);
|
||||
if let Some(requested) = requested_parent_version_id {
|
||||
if parent_version_id == requested {
|
||||
return Err(format_err!(
|
||||
anyhow::bail!(
|
||||
"Server's task history has diverged from this replica"
|
||||
));
|
||||
);
|
||||
}
|
||||
}
|
||||
requested_parent_version_id = Some(parent_version_id);
|
||||
@@ -261,7 +260,7 @@ impl TaskDB {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn apply_version(txn: &mut dyn StorageTxn, mut version: Version) -> Fallible<()> {
|
||||
fn apply_version(txn: &mut dyn StorageTxn, mut version: Version) -> anyhow::Result<()> {
|
||||
// The situation here is that the server has already applied all server operations, and we
|
||||
// have already applied all local operations, so states have diverged by several
|
||||
// operations. We need to figure out what operations to apply locally and on the server in
|
||||
@@ -502,16 +501,16 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rebuild_working_set_renumber() -> Fallible<()> {
|
||||
fn rebuild_working_set_renumber() -> anyhow::Result<()> {
|
||||
rebuild_working_set(true)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rebuild_working_set_no_renumber() -> Fallible<()> {
|
||||
fn rebuild_working_set_no_renumber() -> anyhow::Result<()> {
|
||||
rebuild_working_set(false)
|
||||
}
|
||||
|
||||
fn rebuild_working_set(renumber: bool) -> Fallible<()> {
|
||||
fn rebuild_working_set(renumber: bool) -> anyhow::Result<()> {
|
||||
let mut db = TaskDB::new_inmemory();
|
||||
let mut uuids = vec![];
|
||||
uuids.push(Uuid::new_v4());
|
||||
|
||||
Reference in New Issue
Block a user