support rebuilding the working set

This commit is contained in:
Dustin J. Mitchell
2020-01-19 15:55:15 -05:00
parent 12980da5fd
commit 61b2de132b
4 changed files with 157 additions and 5 deletions

View File

@@ -1,12 +1,13 @@
* assign types to properties in Replica
* move rebuild_working_set logic to replica.rs, also include states !=
completed / deleted
* [WIP] assign types to properties in Replica
- db / operation model is just k/v, but formatted names can be used for
structure:
- dependencies: `dependency.<uuid> = ""`
- annotations: `annotation.<epoch time> = "annotation"`
- tags: `tags.<tag> = ""`
* add HTTP API
* add pending-task indexing to Replica
* abstract server into trait
* add HTTP API
* fix TODO items in replica.rs
* implement snapshot requests
* implement backups

View File

@@ -17,6 +17,8 @@ fn main() {
),
)
.subcommand(SubCommand::with_name("list").about("lists tasks"))
.subcommand(SubCommand::with_name("pending").about("lists pending tasks"))
.subcommand(SubCommand::with_name("gc").about("run garbage collection"))
.get_matches();
let mut replica = Replica::new(
@@ -38,10 +40,21 @@ fn main() {
.unwrap();
}
("list", _) => {
for task in replica.all_tasks().unwrap() {
println!("{:?}", task);
for (uuid, task) in replica.all_tasks().unwrap() {
println!("{} - {:?}", uuid, task);
}
}
("pending", _) => {
let working_set = replica.working_set().unwrap();
for i in 1..working_set.len() {
if let Some((ref uuid, ref task)) = working_set[i] {
println!("{}: {} - {:?}", i, uuid, task);
}
}
}
("gc", _) => {
replica.gc().unwrap();
}
("", None) => {
unreachable!();
}

View File

@@ -48,6 +48,23 @@ impl Replica {
self.taskdb.all_task_uuids()
}
/// Get the "working set" for this replica -- the set of pending tasks, as indexed by small
/// integers
pub fn working_set(&mut self) -> Fallible<Vec<Option<(Uuid, Task)>>> {
let working_set = self.taskdb.working_set()?;
let mut res = Vec::with_capacity(working_set.len());
for i in 0..working_set.len() {
res.push(match working_set[i] {
Some(u) => match self.taskdb.get_task(&u)? {
Some(task) => Some((u, (&task).into())),
None => None,
},
None => None,
})
}
Ok(res)
}
/// Get an existing task by its UUID
pub fn get_task(&mut self, uuid: &Uuid) -> Fallible<Option<Task>> {
Ok(self.taskdb.get_task(&uuid)?.map(|t| (&t).into()))
@@ -85,6 +102,13 @@ impl Replica {
.get_task(&uuid)?
.map(move |_| TaskMut::new(self, uuid.clone())))
}
/// Perform "garbage collection" on this replica. In particular, this renumbers the working
/// set.
pub fn gc(&mut self) -> Fallible<()> {
self.taskdb.rebuild_working_set()?;
Ok(())
}
}
impl From<&TaskMap> for Task {

View File

@@ -4,6 +4,7 @@ use crate::server::{Server, VersionAdd};
use crate::taskstorage::{TaskMap, TaskStorage, TaskStorageTxn};
use failure::Fallible;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::str;
use uuid::Uuid;
@@ -64,6 +65,7 @@ impl DB {
// update if this task exists, otherwise ignore
if let Some(task) = txn.get_task(uuid)? {
let mut task = task.clone();
// TODO: update working_set if this is changing state to or from pending
match value {
Some(ref val) => task.insert(property.to_string(), val.clone()),
None => task.remove(property),
@@ -90,12 +92,61 @@ impl DB {
txn.all_task_uuids()
}
/// Get the working set
pub fn working_set<'a>(&'a mut self) -> Fallible<Vec<Option<Uuid>>> {
let mut txn = self.storage.txn()?;
txn.get_working_set()
}
/// Get a single task, by uuid.
pub fn get_task(&mut self, uuid: &Uuid) -> Fallible<Option<TaskMap>> {
let mut txn = self.storage.txn()?;
txn.get_task(uuid)
}
/// Rebuild the working set. This renumbers the pending tasks to eliminate gaps, and also
/// finds any tasks whose statuses changed without being noticed.
pub fn rebuild_working_set(&mut self) -> Fallible<()> {
// TODO: this logic belongs in Replica
// TODO: it's every status but Completed and Deleted, I think?
let mut txn = self.storage.txn()?;
let mut new_ws = vec![];
let mut seen = HashSet::new();
let pending = String::from("pending");
// The goal here is for existing working-set items to be "compressed' down to index
// 1, so we begin by scanning the current working set and inserting any still-pending
// tasks into the new list
for elt in txn.get_working_set()? {
if let Some(uuid) = elt {
if let Some(task) = txn.get_task(&uuid)? {
if task.get("status") == Some(&pending) {
new_ws.push(uuid.clone());
seen.insert(uuid);
}
}
}
}
// Now go hunting for tasks that are pending and are not already in this list
for (uuid, task) in txn.all_tasks()? {
if !seen.contains(&uuid) {
if task.get("status") == Some(&pending) {
new_ws.push(uuid.clone());
}
}
}
txn.clear_working_set()?;
for uuid in new_ws.drain(0..new_ws.len()) {
txn.add_to_working_set(uuid)?;
}
txn.commit()?;
Ok(())
}
/// Sync to the given server, pulling remote changes and pushing local changes.
pub fn sync(&mut self, username: &str, server: &mut Server) -> Fallible<()> {
let mut txn = self.storage.txn()?;
@@ -366,4 +417,67 @@ mod tests {
assert_eq!(db.sorted_tasks(), vec![]);
assert_eq!(db.operations(), vec![]);
}
#[test]
fn rebuild_working_set() -> Fallible<()> {
let mut db = DB::new_inmemory();
let uuids = vec![
Uuid::new_v4(), // 0: pending, not already in working set
Uuid::new_v4(), // 1: pending, already in working set
Uuid::new_v4(), // 2: not pending, not already in working set
Uuid::new_v4(), // 3: not pending, already in working set
Uuid::new_v4(), // 4: pending, already in working set
];
// add everything to the DB
for uuid in &uuids {
db.apply(Operation::Create { uuid: uuid.clone() })?;
}
for i in &[0usize, 1, 4] {
db.apply(Operation::Update {
uuid: uuids[*i].clone(),
property: String::from("status"),
value: Some("pending".into()),
timestamp: Utc::now(),
})?;
}
// set the existing working_set as we want it
{
let mut txn = db.storage.txn()?;
txn.clear_working_set()?;
for i in &[1usize, 3, 4] {
txn.add_to_working_set(uuids[*i])?;
}
txn.commit()?;
}
assert_eq!(
db.working_set()?,
vec![
None,
Some(uuids[1].clone()),
Some(uuids[3].clone()),
Some(uuids[4].clone())
]
);
db.rebuild_working_set()?;
// uuids[1] and uuids[4] are already in the working set, so are compressed
// to the top, and then uuids[0] is added.
assert_eq!(
db.working_set()?,
vec![
None,
Some(uuids[1].clone()),
Some(uuids[4].clone()),
Some(uuids[0].clone())
]
);
Ok(())
}
}