Control whether to renumber the working set when rebuilding it
This commit is contained in:
@@ -4,7 +4,7 @@ use termcolor::WriteColor;
|
|||||||
|
|
||||||
pub(crate) fn execute<W: WriteColor>(w: &mut W, replica: &mut Replica) -> Fallible<()> {
|
pub(crate) fn execute<W: WriteColor>(w: &mut W, replica: &mut Replica) -> Fallible<()> {
|
||||||
log::debug!("rebuilding working set");
|
log::debug!("rebuilding working set");
|
||||||
replica.rebuild_working_set()?;
|
replica.rebuild_working_set(true)?;
|
||||||
writeln!(w, "garbage collected.")?;
|
writeln!(w, "garbage collected.")?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -184,7 +184,7 @@ mod test {
|
|||||||
let t1 = replica.new_task(Status::Pending, s!("A")).unwrap();
|
let t1 = replica.new_task(Status::Pending, s!("A")).unwrap();
|
||||||
let t2 = replica.new_task(Status::Completed, s!("B")).unwrap();
|
let t2 = replica.new_task(Status::Completed, s!("B")).unwrap();
|
||||||
let _t = replica.new_task(Status::Pending, s!("C")).unwrap();
|
let _t = replica.new_task(Status::Pending, s!("C")).unwrap();
|
||||||
replica.rebuild_working_set().unwrap();
|
replica.rebuild_working_set(true).unwrap();
|
||||||
|
|
||||||
let t1uuid = *t1.get_uuid();
|
let t1uuid = *t1.get_uuid();
|
||||||
|
|
||||||
@@ -210,7 +210,7 @@ mod test {
|
|||||||
let t1 = replica.new_task(Status::Pending, s!("A")).unwrap();
|
let t1 = replica.new_task(Status::Pending, s!("A")).unwrap();
|
||||||
let t2 = replica.new_task(Status::Completed, s!("B")).unwrap();
|
let t2 = replica.new_task(Status::Completed, s!("B")).unwrap();
|
||||||
let _t = replica.new_task(Status::Pending, s!("C")).unwrap();
|
let _t = replica.new_task(Status::Pending, s!("C")).unwrap();
|
||||||
replica.rebuild_working_set().unwrap();
|
replica.rebuild_working_set(true).unwrap();
|
||||||
|
|
||||||
let t1uuid = *t1.get_uuid();
|
let t1uuid = *t1.get_uuid();
|
||||||
let t2uuid = t2.get_uuid().to_string();
|
let t2uuid = t2.get_uuid().to_string();
|
||||||
@@ -238,7 +238,7 @@ mod test {
|
|||||||
replica.new_task(Status::Pending, s!("A")).unwrap();
|
replica.new_task(Status::Pending, s!("A")).unwrap();
|
||||||
replica.new_task(Status::Completed, s!("B")).unwrap();
|
replica.new_task(Status::Completed, s!("B")).unwrap();
|
||||||
replica.new_task(Status::Deleted, s!("C")).unwrap();
|
replica.new_task(Status::Deleted, s!("C")).unwrap();
|
||||||
replica.rebuild_working_set().unwrap();
|
replica.rebuild_working_set(true).unwrap();
|
||||||
|
|
||||||
let filter = Filter { conditions: vec![] };
|
let filter = Filter { conditions: vec![] };
|
||||||
let mut filtered: Vec<_> = filtered_tasks(&mut replica, &filter)
|
let mut filtered: Vec<_> = filtered_tasks(&mut replica, &filter)
|
||||||
@@ -309,7 +309,7 @@ mod test {
|
|||||||
replica.new_task(Status::Pending, s!("A")).unwrap();
|
replica.new_task(Status::Pending, s!("A")).unwrap();
|
||||||
replica.new_task(Status::Completed, s!("B")).unwrap();
|
replica.new_task(Status::Completed, s!("B")).unwrap();
|
||||||
replica.new_task(Status::Deleted, s!("C")).unwrap();
|
replica.new_task(Status::Deleted, s!("C")).unwrap();
|
||||||
replica.rebuild_working_set().unwrap();
|
replica.rebuild_working_set(true).unwrap();
|
||||||
|
|
||||||
let filter = Filter {
|
let filter = Filter {
|
||||||
conditions: vec![Condition::Status(Status::Pending)],
|
conditions: vec![Condition::Status(Status::Pending)],
|
||||||
|
|||||||
@@ -163,7 +163,7 @@ mod test {
|
|||||||
t2.set_status(Status::Completed).unwrap();
|
t2.set_status(Status::Completed).unwrap();
|
||||||
let t2 = t2.into_immut();
|
let t2 = t2.into_immut();
|
||||||
|
|
||||||
replica.rebuild_working_set().unwrap();
|
replica.rebuild_working_set(true).unwrap();
|
||||||
|
|
||||||
[*t1.get_uuid(), *t2.get_uuid(), *t3.get_uuid()]
|
[*t1.get_uuid(), *t2.get_uuid(), *t3.get_uuid()]
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -170,12 +170,14 @@ impl Replica {
|
|||||||
self.taskdb.sync(server)
|
self.taskdb.sync(server)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Perform "garbage collection" on this replica. In particular, this renumbers the working
|
/// Rebuild this replica's working set, based on whether tasks are pending or not. If
|
||||||
/// set to contain only pending tasks.
|
/// `renumber` is true, then existing tasks may be moved to new working-set indices; in any
|
||||||
pub fn rebuild_working_set(&mut self) -> Fallible<()> {
|
/// case, on completion all pending tasks are in the working set and all non- pending tasks are
|
||||||
|
/// not.
|
||||||
|
pub fn rebuild_working_set(&mut self, renumber: bool) -> Fallible<()> {
|
||||||
let pending = String::from(Status::Pending.to_taskmap());
|
let pending = String::from(Status::Pending.to_taskmap());
|
||||||
self.taskdb
|
self.taskdb
|
||||||
.rebuild_working_set(|t| t.get("status") == Some(&pending))?;
|
.rebuild_working_set(|t| t.get("status") == Some(&pending), renumber)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -251,7 +253,7 @@ mod tests {
|
|||||||
assert_eq!(t.get_status(), Status::Deleted);
|
assert_eq!(t.get_status(), Status::Deleted);
|
||||||
assert_eq!(t.get_description(), "gone");
|
assert_eq!(t.get_description(), "gone");
|
||||||
|
|
||||||
rep.rebuild_working_set().unwrap();
|
rep.rebuild_working_set(true).unwrap();
|
||||||
|
|
||||||
assert!(rep.get_working_set_index(t.get_uuid()).unwrap().is_none());
|
assert!(rep.get_working_set_index(t.get_uuid()).unwrap().is_none());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,6 +8,9 @@ use std::collections::HashSet;
|
|||||||
use std::str;
|
use std::str;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
/// A TaskDB is the backend for a replica. It manages the storage, operations, synchronization,
|
||||||
|
/// and so on, and all the invariants that come with it. It leaves the meaning of particular task
|
||||||
|
/// properties to the replica and task implementations.
|
||||||
pub struct TaskDB {
|
pub struct TaskDB {
|
||||||
storage: Box<dyn TaskStorage>,
|
storage: Box<dyn TaskStorage>,
|
||||||
}
|
}
|
||||||
@@ -105,7 +108,7 @@ impl TaskDB {
|
|||||||
/// renumbers the existing working-set tasks to eliminate gaps, and also adds any tasks that
|
/// renumbers the existing working-set tasks to eliminate gaps, and also adds any tasks that
|
||||||
/// are not already in the working set but should be. The rebuild occurs in a single
|
/// are not already in the working set but should be. The rebuild occurs in a single
|
||||||
/// trasnsaction against the storage backend.
|
/// trasnsaction against the storage backend.
|
||||||
pub fn rebuild_working_set<F>(&mut self, in_working_set: F) -> Fallible<()>
|
pub fn rebuild_working_set<F>(&mut self, in_working_set: F, renumber: bool) -> Fallible<()>
|
||||||
where
|
where
|
||||||
F: Fn(&TaskMap) -> bool,
|
F: Fn(&TaskMap) -> bool,
|
||||||
{
|
{
|
||||||
@@ -122,27 +125,44 @@ impl TaskDB {
|
|||||||
if let Some(uuid) = elt {
|
if let Some(uuid) = elt {
|
||||||
if let Some(task) = txn.get_task(&uuid)? {
|
if let Some(task) = txn.get_task(&uuid)? {
|
||||||
if in_working_set(&task) {
|
if in_working_set(&task) {
|
||||||
new_ws.push(uuid);
|
new_ws.push(Some(uuid));
|
||||||
seen.insert(uuid);
|
seen.insert(uuid);
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if we are not renumbering, then insert a blank working-set entry here
|
||||||
|
if !renumber {
|
||||||
|
new_ws.push(None);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// if renumbering, clear the working set and re-add
|
||||||
|
if renumber {
|
||||||
|
txn.clear_working_set()?;
|
||||||
|
for elt in new_ws.drain(0..new_ws.len()) {
|
||||||
|
if let Some(uuid) = elt {
|
||||||
|
txn.add_to_working_set(&uuid)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// ..otherwise, just clear the None items determined above from the working set
|
||||||
|
for (i, elt) in new_ws.iter().enumerate() {
|
||||||
|
if elt.is_none() {
|
||||||
|
txn.set_working_set_item(i, None)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now go hunting for tasks that should be in this list but are not, adding them at the
|
// Now go hunting for tasks that should be in this list but are not, adding them at the
|
||||||
// end of the list.
|
// end of the list, whether renumbering or not
|
||||||
for (uuid, task) in txn.all_tasks()? {
|
for (uuid, task) in txn.all_tasks()? {
|
||||||
if !seen.contains(&uuid) && in_working_set(&task) {
|
if !seen.contains(&uuid) && in_working_set(&task) {
|
||||||
new_ws.push(uuid);
|
txn.add_to_working_set(&uuid)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// clear and re-write the entire working set, in order
|
|
||||||
txn.clear_working_set()?;
|
|
||||||
for uuid in new_ws.drain(0..new_ws.len()) {
|
|
||||||
txn.add_to_working_set(&uuid)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
txn.commit()?;
|
txn.commit()?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -482,15 +502,28 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn rebuild_working_set() -> Fallible<()> {
|
fn rebuild_working_set_renumber() -> Fallible<()> {
|
||||||
|
rebuild_working_set(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn rebuild_working_set_no_renumber() -> Fallible<()> {
|
||||||
|
rebuild_working_set(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn rebuild_working_set(renumber: bool) -> Fallible<()> {
|
||||||
let mut db = TaskDB::new_inmemory();
|
let mut db = TaskDB::new_inmemory();
|
||||||
let uuids = vec![
|
let mut uuids = vec![];
|
||||||
Uuid::new_v4(), // 0: pending, not already in working set
|
uuids.push(Uuid::new_v4());
|
||||||
Uuid::new_v4(), // 1: pending, already in working set
|
println!("uuids[0]: {:?} - pending, not in working set", uuids[0]);
|
||||||
Uuid::new_v4(), // 2: not pending, not already in working set
|
uuids.push(Uuid::new_v4());
|
||||||
Uuid::new_v4(), // 3: not pending, already in working set
|
println!("uuids[1]: {:?} - pending, in working set", uuids[1]);
|
||||||
Uuid::new_v4(), // 4: pending, already in working set
|
uuids.push(Uuid::new_v4());
|
||||||
];
|
println!("uuids[2]: {:?} - not pending, not in working set", uuids[2]);
|
||||||
|
uuids.push(Uuid::new_v4());
|
||||||
|
println!("uuids[3]: {:?} - not pending, in working set", uuids[3]);
|
||||||
|
uuids.push(Uuid::new_v4());
|
||||||
|
println!("uuids[4]: {:?} - pending, in working set", uuids[4]);
|
||||||
|
|
||||||
// add everything to the TaskDB
|
// add everything to the TaskDB
|
||||||
for uuid in &uuids {
|
for uuid in &uuids {
|
||||||
@@ -527,25 +560,39 @@ mod tests {
|
|||||||
]
|
]
|
||||||
);
|
);
|
||||||
|
|
||||||
db.rebuild_working_set(|t| {
|
db.rebuild_working_set(
|
||||||
if let Some(status) = t.get("status") {
|
|t| {
|
||||||
status == "pending"
|
if let Some(status) = t.get("status") {
|
||||||
} else {
|
status == "pending"
|
||||||
false
|
} else {
|
||||||
}
|
false
|
||||||
})?;
|
}
|
||||||
|
},
|
||||||
|
renumber,
|
||||||
|
)?;
|
||||||
|
|
||||||
// uuids[1] and uuids[4] are already in the working set, so are compressed
|
let exp = if renumber {
|
||||||
// to the top, and then uuids[0] is added.
|
// uuids[1] and uuids[4] are already in the working set, so are compressed
|
||||||
assert_eq!(
|
// to the top, and then uuids[0] is added.
|
||||||
db.working_set()?,
|
|
||||||
vec![
|
vec![
|
||||||
None,
|
None,
|
||||||
Some(uuids[1].clone()),
|
Some(uuids[1].clone()),
|
||||||
Some(uuids[4].clone()),
|
Some(uuids[4].clone()),
|
||||||
Some(uuids[0].clone())
|
Some(uuids[0].clone()),
|
||||||
]
|
]
|
||||||
);
|
} else {
|
||||||
|
// uuids[1] and uuids[4] are already in the working set, at indexes 1 and 3,
|
||||||
|
// and then uuids[0] is added.
|
||||||
|
vec![
|
||||||
|
None,
|
||||||
|
Some(uuids[1].clone()),
|
||||||
|
None,
|
||||||
|
Some(uuids[4].clone()),
|
||||||
|
Some(uuids[0].clone()),
|
||||||
|
]
|
||||||
|
};
|
||||||
|
|
||||||
|
assert_eq!(db.working_set()?, exp);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,7 +3,7 @@
|
|||||||
use crate::taskstorage::{
|
use crate::taskstorage::{
|
||||||
Operation, TaskMap, TaskStorage, TaskStorageTxn, VersionId, DEFAULT_BASE_VERSION,
|
Operation, TaskMap, TaskStorage, TaskStorageTxn, VersionId, DEFAULT_BASE_VERSION,
|
||||||
};
|
};
|
||||||
use failure::Fallible;
|
use failure::{bail, Fallible};
|
||||||
use std::collections::hash_map::Entry;
|
use std::collections::hash_map::Entry;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
@@ -114,6 +114,15 @@ impl<'t> TaskStorageTxn for Txn<'t> {
|
|||||||
Ok(working_set.len())
|
Ok(working_set.len())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn set_working_set_item(&mut self, index: usize, uuid: Option<Uuid>) -> Fallible<()> {
|
||||||
|
let working_set = &mut self.mut_data_ref().working_set;
|
||||||
|
if index >= working_set.len() {
|
||||||
|
bail!("Index {} is not in the working set", index);
|
||||||
|
}
|
||||||
|
working_set[index] = uuid;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn clear_working_set(&mut self) -> Fallible<()> {
|
fn clear_working_set(&mut self) -> Fallible<()> {
|
||||||
self.mut_data_ref().working_set = vec![None];
|
self.mut_data_ref().working_set = vec![None];
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ use crate::taskstorage::{
|
|||||||
Operation, TaskMap, TaskStorage, TaskStorageTxn, VersionId, DEFAULT_BASE_VERSION,
|
Operation, TaskMap, TaskStorage, TaskStorageTxn, VersionId, DEFAULT_BASE_VERSION,
|
||||||
};
|
};
|
||||||
use crate::utils::Key;
|
use crate::utils::Key;
|
||||||
use failure::Fallible;
|
use failure::{bail, Fallible};
|
||||||
use kv::msgpack::Msgpack;
|
use kv::msgpack::Msgpack;
|
||||||
use kv::{Bucket, Config, Error, Integer, Serde, Store, ValueBuf};
|
use kv::{Bucket, Config, Error, Integer, Serde, Store, ValueBuf};
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
@@ -299,6 +299,35 @@ impl<'t> TaskStorageTxn for Txn<'t> {
|
|||||||
Ok(next_index as usize)
|
Ok(next_index as usize)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn set_working_set_item(&mut self, index: usize, uuid: Option<Uuid>) -> Fallible<()> {
|
||||||
|
let working_set_bucket = self.working_set_bucket();
|
||||||
|
let numbers_bucket = self.numbers_bucket();
|
||||||
|
let kvtxn = self.kvtxn();
|
||||||
|
let index = index as u64;
|
||||||
|
|
||||||
|
let next_index = match kvtxn.get(numbers_bucket, NEXT_WORKING_SET_INDEX.into()) {
|
||||||
|
Ok(buf) => buf.inner()?.to_serde(),
|
||||||
|
Err(Error::NotFound) => 1,
|
||||||
|
Err(e) => return Err(e.into()),
|
||||||
|
};
|
||||||
|
|
||||||
|
if index >= next_index {
|
||||||
|
bail!("Index {} is not in the working set", index);
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(uuid) = uuid {
|
||||||
|
kvtxn.set(
|
||||||
|
working_set_bucket,
|
||||||
|
index.into(),
|
||||||
|
Msgpack::to_value_buf(uuid)?,
|
||||||
|
)?;
|
||||||
|
} else {
|
||||||
|
kvtxn.del(working_set_bucket, index.into())?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn clear_working_set(&mut self) -> Fallible<()> {
|
fn clear_working_set(&mut self) -> Fallible<()> {
|
||||||
let working_set_bucket = self.working_set_bucket();
|
let working_set_bucket = self.working_set_bucket();
|
||||||
let numbers_bucket = self.numbers_bucket();
|
let numbers_bucket = self.numbers_bucket();
|
||||||
|
|||||||
@@ -88,6 +88,10 @@ pub trait TaskStorageTxn {
|
|||||||
/// than the highest used index.
|
/// than the highest used index.
|
||||||
fn add_to_working_set(&mut self, uuid: &Uuid) -> Fallible<usize>;
|
fn add_to_working_set(&mut self, uuid: &Uuid) -> Fallible<usize>;
|
||||||
|
|
||||||
|
/// Update the working set task at the given index. This cannot add a new item to the
|
||||||
|
/// working set.
|
||||||
|
fn set_working_set_item(&mut self, index: usize, uuid: Option<Uuid>) -> Fallible<()>;
|
||||||
|
|
||||||
/// Clear all tasks from the working set in preparation for a garbage-collection operation.
|
/// Clear all tasks from the working set in preparation for a garbage-collection operation.
|
||||||
/// Note that this is the only way items are removed from the set.
|
/// Note that this is the only way items are removed from the set.
|
||||||
fn clear_working_set(&mut self) -> Fallible<()>;
|
fn clear_working_set(&mut self) -> Fallible<()>;
|
||||||
|
|||||||
Reference in New Issue
Block a user