diff --git a/src/replica.rs b/src/replica.rs index 4d4a0e96e..de23f89b0 100644 --- a/src/replica.rs +++ b/src/replica.rs @@ -35,6 +35,21 @@ impl Replica { }) } + /// Return true if this status string is such that the task should be included in + /// the working set. + fn is_working_set_status(status: Option<&String>) -> bool { + if let Some(status) = status { + status == "pending" + } else { + false + } + } + + /// Add the given uuid to the working set, returning its index. + fn add_to_working_set(&mut self, uuid: &Uuid) -> Fallible { + self.taskdb.add_to_working_set(uuid) + } + /// Get all tasks represented as a map keyed by UUID pub fn all_tasks<'a>(&'a mut self) -> Fallible> { Ok(self @@ -72,6 +87,17 @@ impl Replica { Ok(self.taskdb.get_task(&uuid)?.map(|t| (&t).into())) } + /// Get an existing task by its working set index + pub fn get_working_set_task(&mut self, i: u64) -> Fallible> { + let working_set = self.taskdb.working_set()?; + if (i as usize) < working_set.len() { + if let Some(uuid) = working_set[i as usize] { + return Ok(self.taskdb.get_task(&uuid)?.map(|t| (&t).into())); + } + } + return Ok(None); + } + /// Create a new task. The task must not already exist. pub fn new_task( &mut self, @@ -115,9 +141,10 @@ impl Replica { } /// Perform "garbage collection" on this replica. In particular, this renumbers the working - /// set. + /// set to contain only pending tasks. pub fn gc(&mut self) -> Fallible<()> { - self.taskdb.rebuild_working_set()?; + self.taskdb + .rebuild_working_set(|t| Replica::is_working_set_status(t.get("status")))?; Ok(()) } } @@ -177,9 +204,14 @@ impl<'a> TaskMut<'a> { ) } - /// Set the task's status + /// Set the task's status. This also adds the task to the working set if the + /// new status puts it in that set. pub fn status(&mut self, status: Status) -> Fallible<()> { - self.set_string("status", Some(String::from(status.as_ref()))) + let status = String::from(status.as_ref()); + if Replica::is_working_set_status(Some(&status)) { + self.replica.add_to_working_set(&self.uuid)?; + } + self.set_string("status", Some(status)) } /// Set the task's description @@ -326,6 +358,22 @@ mod tests { assert_eq!(t.project, Some("work".into())); } + #[test] + fn set_pending_adds_to_working_set() { + let mut rep = Replica::new(DB::new_inmemory().into()); + let uuid = Uuid::new_v4(); + + rep.new_task(uuid.clone(), Status::Pending, "to-be-pending".into()) + .unwrap(); + + let mut tm = rep.get_task_mut(&uuid).unwrap().unwrap(); + tm.status(Status::Pending).unwrap(); + + let t = rep.get_working_set_task(1).unwrap().unwrap(); + assert_eq!(t.status, Status::Pending); + assert_eq!(t.description, String::from("to-be-pending")); + } + #[test] fn get_does_not_exist() { let mut rep = Replica::new(DB::new_inmemory().into()); diff --git a/src/taskdb.rs b/src/taskdb.rs index 2613cc39e..33306f359 100644 --- a/src/taskdb.rs +++ b/src/taskdb.rs @@ -104,24 +104,27 @@ impl DB { txn.get_task(uuid) } - /// Rebuild the working set. This renumbers the pending tasks to eliminate gaps, and also - /// finds any tasks whose statuses changed without being noticed. - pub fn rebuild_working_set(&mut self) -> Fallible<()> { - // TODO: this logic belongs in Replica - // TODO: it's every status but Completed and Deleted, I think? + /// Rebuild the working set using a function to identify tasks that should be in the set. This + /// renumbers the existing working-set tasks to eliminate gaps, and also adds any tasks that + /// are not already in the working set but should be. The rebuild occurs in a single + /// trasnsaction against the storage backend. + pub fn rebuild_working_set(&mut self, in_working_set: F) -> Fallible<()> + where + F: Fn(&TaskMap) -> bool, + { let mut txn = self.storage.txn()?; let mut new_ws = vec![]; let mut seen = HashSet::new(); - let pending = String::from("pending"); - // The goal here is for existing working-set items to be "compressed' down to index - // 1, so we begin by scanning the current working set and inserting any still-pending - // tasks into the new list + // The goal here is for existing working-set items to be "compressed' down to index 1, so + // we begin by scanning the current working set and inserting any tasks that should still + // be in the set into new_ws, implicitly dropping any tasks that are no longer in the + // working set. for elt in txn.get_working_set()? { if let Some(uuid) = elt { if let Some(task) = txn.get_task(&uuid)? { - if task.get("status") == Some(&pending) { + if in_working_set(&task) { new_ws.push(uuid.clone()); seen.insert(uuid); } @@ -129,24 +132,43 @@ impl DB { } } - // Now go hunting for tasks that are pending and are not already in this list + // Now go hunting for tasks that should be in this list but are not, adding them at the + // end of the list. for (uuid, task) in txn.all_tasks()? { if !seen.contains(&uuid) { - if task.get("status") == Some(&pending) { + if in_working_set(&task) { new_ws.push(uuid.clone()); } } } + // clear and re-write the entire working set, in order txn.clear_working_set()?; for uuid in new_ws.drain(0..new_ws.len()) { - txn.add_to_working_set(uuid)?; + txn.add_to_working_set(&uuid)?; } txn.commit()?; Ok(()) } + /// Add the given uuid to the working set and return its index; if it is already in the working + /// set, its index is returned. This does *not* renumber any existing tasks. + pub fn add_to_working_set(&mut self, uuid: &Uuid) -> Fallible { + let mut txn = self.storage.txn()?; + // search for an existing entry for this task.. + for (i, elt) in txn.get_working_set()?.iter().enumerate() { + if *elt == Some(*uuid) { + // (note that this drops the transaction with no changes made) + return Ok(i as u64); + } + } + // and if not found, add one + let i = txn.add_to_working_set(uuid)?; + txn.commit()?; + Ok(i) + } + /// Sync to the given server, pulling remote changes and pushing local changes. pub fn sync(&mut self, username: &str, server: &mut Server) -> Fallible<()> { let mut txn = self.storage.txn()?; @@ -448,7 +470,7 @@ mod tests { txn.clear_working_set()?; for i in &[1usize, 3, 4] { - txn.add_to_working_set(uuids[*i])?; + txn.add_to_working_set(&uuids[*i])?; } txn.commit()?; @@ -464,7 +486,13 @@ mod tests { ] ); - db.rebuild_working_set()?; + db.rebuild_working_set(|t| { + if let Some(status) = t.get("status") { + status == "pending" + } else { + false + } + })?; // uuids[1] and uuids[4] are already in the working set, so are compressed // to the top, and then uuids[0] is added. diff --git a/src/taskstorage/inmemory.rs b/src/taskstorage/inmemory.rs index e89d31bc7..6b272874c 100644 --- a/src/taskstorage/inmemory.rs +++ b/src/taskstorage/inmemory.rs @@ -109,9 +109,9 @@ impl<'t> TaskStorageTxn for Txn<'t> { Ok(self.data_ref().working_set.clone()) } - fn add_to_working_set(&mut self, uuid: Uuid) -> Fallible { + fn add_to_working_set(&mut self, uuid: &Uuid) -> Fallible { let working_set = &mut self.mut_data_ref().working_set; - working_set.push(Some(uuid)); + working_set.push(Some(uuid.clone())); Ok(working_set.len() as u64) } @@ -194,8 +194,8 @@ mod test { { let mut txn = storage.txn()?; - txn.add_to_working_set(uuid1.clone())?; - txn.add_to_working_set(uuid2.clone())?; + txn.add_to_working_set(&uuid1)?; + txn.add_to_working_set(&uuid2)?; txn.commit()?; } @@ -216,15 +216,15 @@ mod test { { let mut txn = storage.txn()?; - txn.add_to_working_set(uuid1.clone())?; - txn.add_to_working_set(uuid2.clone())?; + txn.add_to_working_set(&uuid1)?; + txn.add_to_working_set(&uuid2)?; txn.commit()?; } { let mut txn = storage.txn()?; txn.remove_from_working_set(1)?; - txn.add_to_working_set(uuid1.clone())?; + txn.add_to_working_set(&uuid1)?; txn.commit()?; } @@ -244,7 +244,7 @@ mod test { { let mut txn = storage.txn()?; - txn.add_to_working_set(uuid1.clone())?; + txn.add_to_working_set(&uuid1)?; txn.commit()?; } @@ -267,16 +267,16 @@ mod test { { let mut txn = storage.txn()?; - txn.add_to_working_set(uuid1.clone())?; - txn.add_to_working_set(uuid2.clone())?; + txn.add_to_working_set(&uuid1)?; + txn.add_to_working_set(&uuid2)?; txn.commit()?; } { let mut txn = storage.txn()?; txn.clear_working_set()?; - txn.add_to_working_set(uuid2.clone())?; - txn.add_to_working_set(uuid1.clone())?; + txn.add_to_working_set(&uuid2)?; + txn.add_to_working_set(&uuid1)?; txn.commit()?; } diff --git a/src/taskstorage/kv.rs b/src/taskstorage/kv.rs index c8a6d85d5..08f2ade01 100644 --- a/src/taskstorage/kv.rs +++ b/src/taskstorage/kv.rs @@ -307,7 +307,7 @@ impl<'t> TaskStorageTxn for Txn<'t> { Ok(res) } - fn add_to_working_set(&mut self, uuid: Uuid) -> Fallible { + fn add_to_working_set(&mut self, uuid: &Uuid) -> Fallible { let working_set_bucket = self.working_set_bucket(); let numbers_bucket = self.numbers_bucket(); let kvtxn = self.kvtxn(); @@ -321,7 +321,7 @@ impl<'t> TaskStorageTxn for Txn<'t> { kvtxn.set( working_set_bucket, next_index.into(), - Msgpack::to_value_buf(uuid)?, + Msgpack::to_value_buf(uuid.clone())?, )?; kvtxn.set( numbers_bucket, @@ -666,8 +666,8 @@ mod test { { let mut txn = storage.txn()?; - txn.add_to_working_set(uuid1.clone())?; - txn.add_to_working_set(uuid2.clone())?; + txn.add_to_working_set(&uuid1)?; + txn.add_to_working_set(&uuid2)?; txn.commit()?; } @@ -689,15 +689,15 @@ mod test { { let mut txn = storage.txn()?; - txn.add_to_working_set(uuid1.clone())?; - txn.add_to_working_set(uuid2.clone())?; + txn.add_to_working_set(&uuid1)?; + txn.add_to_working_set(&uuid2)?; txn.commit()?; } { let mut txn = storage.txn()?; txn.remove_from_working_set(1)?; - txn.add_to_working_set(uuid1.clone())?; + txn.add_to_working_set(&uuid1)?; txn.commit()?; } @@ -718,7 +718,7 @@ mod test { { let mut txn = storage.txn()?; - txn.add_to_working_set(uuid1.clone())?; + txn.add_to_working_set(&uuid1)?; txn.commit()?; } @@ -742,16 +742,16 @@ mod test { { let mut txn = storage.txn()?; - txn.add_to_working_set(uuid1.clone())?; - txn.add_to_working_set(uuid2.clone())?; + txn.add_to_working_set(&uuid1)?; + txn.add_to_working_set(&uuid2)?; txn.commit()?; } { let mut txn = storage.txn()?; txn.clear_working_set()?; - txn.add_to_working_set(uuid2.clone())?; - txn.add_to_working_set(uuid1.clone())?; + txn.add_to_working_set(&uuid2)?; + txn.add_to_working_set(&uuid1)?; txn.commit()?; } diff --git a/src/taskstorage/mod.rs b/src/taskstorage/mod.rs index 92f0216c3..b5681f4de 100644 --- a/src/taskstorage/mod.rs +++ b/src/taskstorage/mod.rs @@ -70,7 +70,7 @@ pub trait TaskStorageTxn { /// Add a task to the working set and return its (one-based) index. This index will be one greater /// than the highest used index. - fn add_to_working_set(&mut self, uuid: Uuid) -> Fallible; + fn add_to_working_set(&mut self, uuid: &Uuid) -> Fallible; /// Remove a task from the working set. Other tasks' indexes are not affected. fn remove_from_working_set(&mut self, index: u64) -> Fallible<()>;