diff --git a/TODO.txt b/TODO.txt index 23ad27b6f..0f210f2ce 100644 --- a/TODO.txt +++ b/TODO.txt @@ -1,12 +1,13 @@ -* assign types to properties in Replica +* move rebuild_working_set logic to replica.rs, also include states != + completed / deleted +* [WIP] assign types to properties in Replica - db / operation model is just k/v, but formatted names can be used for structure: - dependencies: `dependency. = ""` - annotations: `annotation. = "annotation"` - tags: `tags. = ""` -* add HTTP API -* add pending-task indexing to Replica * abstract server into trait +* add HTTP API * fix TODO items in replica.rs * implement snapshot requests * implement backups diff --git a/src/bin/task.rs b/src/bin/task.rs index 13a3a749b..8149374d5 100644 --- a/src/bin/task.rs +++ b/src/bin/task.rs @@ -17,6 +17,8 @@ fn main() { ), ) .subcommand(SubCommand::with_name("list").about("lists tasks")) + .subcommand(SubCommand::with_name("pending").about("lists pending tasks")) + .subcommand(SubCommand::with_name("gc").about("run garbage collection")) .get_matches(); let mut replica = Replica::new( @@ -38,10 +40,21 @@ fn main() { .unwrap(); } ("list", _) => { - for task in replica.all_tasks().unwrap() { - println!("{:?}", task); + for (uuid, task) in replica.all_tasks().unwrap() { + println!("{} - {:?}", uuid, task); } } + ("pending", _) => { + let working_set = replica.working_set().unwrap(); + for i in 1..working_set.len() { + if let Some((ref uuid, ref task)) = working_set[i] { + println!("{}: {} - {:?}", i, uuid, task); + } + } + } + ("gc", _) => { + replica.gc().unwrap(); + } ("", None) => { unreachable!(); } diff --git a/src/replica.rs b/src/replica.rs index 59a9dd89d..abfe2ab7e 100644 --- a/src/replica.rs +++ b/src/replica.rs @@ -48,6 +48,23 @@ impl Replica { self.taskdb.all_task_uuids() } + /// Get the "working set" for this replica -- the set of pending tasks, as indexed by small + /// integers + pub fn working_set(&mut self) -> Fallible>> { + let working_set = self.taskdb.working_set()?; + let mut res = Vec::with_capacity(working_set.len()); + for i in 0..working_set.len() { + res.push(match working_set[i] { + Some(u) => match self.taskdb.get_task(&u)? { + Some(task) => Some((u, (&task).into())), + None => None, + }, + None => None, + }) + } + Ok(res) + } + /// Get an existing task by its UUID pub fn get_task(&mut self, uuid: &Uuid) -> Fallible> { Ok(self.taskdb.get_task(&uuid)?.map(|t| (&t).into())) @@ -85,6 +102,13 @@ impl Replica { .get_task(&uuid)? .map(move |_| TaskMut::new(self, uuid.clone()))) } + + /// Perform "garbage collection" on this replica. In particular, this renumbers the working + /// set. + pub fn gc(&mut self) -> Fallible<()> { + self.taskdb.rebuild_working_set()?; + Ok(()) + } } impl From<&TaskMap> for Task { diff --git a/src/taskdb.rs b/src/taskdb.rs index 3e9fbe0c1..2613cc39e 100644 --- a/src/taskdb.rs +++ b/src/taskdb.rs @@ -4,6 +4,7 @@ use crate::server::{Server, VersionAdd}; use crate::taskstorage::{TaskMap, TaskStorage, TaskStorageTxn}; use failure::Fallible; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; use std::str; use uuid::Uuid; @@ -64,6 +65,7 @@ impl DB { // update if this task exists, otherwise ignore if let Some(task) = txn.get_task(uuid)? { let mut task = task.clone(); + // TODO: update working_set if this is changing state to or from pending match value { Some(ref val) => task.insert(property.to_string(), val.clone()), None => task.remove(property), @@ -90,12 +92,61 @@ impl DB { txn.all_task_uuids() } + /// Get the working set + pub fn working_set<'a>(&'a mut self) -> Fallible>> { + let mut txn = self.storage.txn()?; + txn.get_working_set() + } + /// Get a single task, by uuid. pub fn get_task(&mut self, uuid: &Uuid) -> Fallible> { let mut txn = self.storage.txn()?; txn.get_task(uuid) } + /// Rebuild the working set. This renumbers the pending tasks to eliminate gaps, and also + /// finds any tasks whose statuses changed without being noticed. + pub fn rebuild_working_set(&mut self) -> Fallible<()> { + // TODO: this logic belongs in Replica + // TODO: it's every status but Completed and Deleted, I think? + let mut txn = self.storage.txn()?; + + let mut new_ws = vec![]; + let mut seen = HashSet::new(); + let pending = String::from("pending"); + + // The goal here is for existing working-set items to be "compressed' down to index + // 1, so we begin by scanning the current working set and inserting any still-pending + // tasks into the new list + for elt in txn.get_working_set()? { + if let Some(uuid) = elt { + if let Some(task) = txn.get_task(&uuid)? { + if task.get("status") == Some(&pending) { + new_ws.push(uuid.clone()); + seen.insert(uuid); + } + } + } + } + + // Now go hunting for tasks that are pending and are not already in this list + for (uuid, task) in txn.all_tasks()? { + if !seen.contains(&uuid) { + if task.get("status") == Some(&pending) { + new_ws.push(uuid.clone()); + } + } + } + + txn.clear_working_set()?; + for uuid in new_ws.drain(0..new_ws.len()) { + txn.add_to_working_set(uuid)?; + } + + txn.commit()?; + Ok(()) + } + /// Sync to the given server, pulling remote changes and pushing local changes. pub fn sync(&mut self, username: &str, server: &mut Server) -> Fallible<()> { let mut txn = self.storage.txn()?; @@ -366,4 +417,67 @@ mod tests { assert_eq!(db.sorted_tasks(), vec![]); assert_eq!(db.operations(), vec![]); } + + #[test] + fn rebuild_working_set() -> Fallible<()> { + let mut db = DB::new_inmemory(); + let uuids = vec![ + Uuid::new_v4(), // 0: pending, not already in working set + Uuid::new_v4(), // 1: pending, already in working set + Uuid::new_v4(), // 2: not pending, not already in working set + Uuid::new_v4(), // 3: not pending, already in working set + Uuid::new_v4(), // 4: pending, already in working set + ]; + + // add everything to the DB + for uuid in &uuids { + db.apply(Operation::Create { uuid: uuid.clone() })?; + } + for i in &[0usize, 1, 4] { + db.apply(Operation::Update { + uuid: uuids[*i].clone(), + property: String::from("status"), + value: Some("pending".into()), + timestamp: Utc::now(), + })?; + } + + // set the existing working_set as we want it + { + let mut txn = db.storage.txn()?; + txn.clear_working_set()?; + + for i in &[1usize, 3, 4] { + txn.add_to_working_set(uuids[*i])?; + } + + txn.commit()?; + } + + assert_eq!( + db.working_set()?, + vec![ + None, + Some(uuids[1].clone()), + Some(uuids[3].clone()), + Some(uuids[4].clone()) + ] + ); + + db.rebuild_working_set()?; + + // uuids[1] and uuids[4] are already in the working set, so are compressed + // to the top, and then uuids[0] is added. + assert_eq!( + db.working_set()?, + vec![ + None, + Some(uuids[1].clone()), + Some(uuids[4].clone()), + Some(uuids[0].clone()) + ] + ); + + Ok(()) + } }