Merge pull request #231 from taskchampion/working_set_rebuild_bug
Fix a working set rebuild bug
This commit is contained in:
@@ -6,7 +6,7 @@ pub(crate) fn execute<W: WriteColor>(
|
||||
replica: &mut Replica,
|
||||
server: &mut Box<dyn Server>,
|
||||
) -> anyhow::Result<()> {
|
||||
replica.sync(server).unwrap();
|
||||
replica.sync(server)?;
|
||||
writeln!(w, "sync complete.")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ use crate::storage::{Operation, Storage, TaskMap};
|
||||
use crate::task::{Status, Task};
|
||||
use crate::taskdb::TaskDb;
|
||||
use crate::workingset::WorkingSet;
|
||||
use anyhow::Context;
|
||||
use chrono::Utc;
|
||||
use log::trace;
|
||||
use std::collections::HashMap;
|
||||
@@ -123,8 +124,10 @@ impl Replica {
|
||||
/// this occurs, but without renumbering, so any newly-pending tasks should appear in
|
||||
/// the working set.
|
||||
pub fn sync(&mut self, server: &mut Box<dyn Server>) -> anyhow::Result<()> {
|
||||
self.taskdb.sync(server)?;
|
||||
self.taskdb.sync(server).context("Failed to synchronize")?;
|
||||
self.rebuild_working_set(false)
|
||||
.context("Failed to rebuild working set after sync")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Rebuild this replica's working set, based on whether tasks are pending or not. If
|
||||
|
||||
@@ -308,7 +308,7 @@ impl<'t> StorageTxn for Txn<'t> {
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
if index >= next_index {
|
||||
if index < 1 || index >= next_index {
|
||||
anyhow::bail!("Index {} is not in the working set", index);
|
||||
}
|
||||
|
||||
@@ -319,7 +319,11 @@ impl<'t> StorageTxn for Txn<'t> {
|
||||
Msgpack::to_value_buf(uuid)?,
|
||||
)?;
|
||||
} else {
|
||||
kvtxn.del(working_set_bucket, index.into())?;
|
||||
match kvtxn.del(working_set_bucket, index.into()) {
|
||||
Ok(_) => {}
|
||||
Err(Error::NotFound) => {}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -650,6 +654,82 @@ mod test {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn set_working_set_item() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let uuid1 = Uuid::new_v4();
|
||||
let uuid2 = Uuid::new_v4();
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
txn.add_to_working_set(uuid1)?;
|
||||
txn.add_to_working_set(uuid2)?;
|
||||
txn.commit()?;
|
||||
}
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
txn.set_working_set_item(1, Some(uuid2))?;
|
||||
txn.set_working_set_item(2, None)?;
|
||||
txn.commit()?;
|
||||
}
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
let ws = txn.get_working_set()?;
|
||||
assert_eq!(ws, vec![None, Some(uuid2), None]);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn set_working_set_item_nonexistent() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let uuid1 = Uuid::new_v4();
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
txn.add_to_working_set(uuid1)?;
|
||||
txn.commit()?;
|
||||
}
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
txn.set_working_set_item(1, None)?;
|
||||
txn.commit()?;
|
||||
}
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
// set it to None again, to check idempotency
|
||||
txn.set_working_set_item(1, None)?;
|
||||
txn.commit()?;
|
||||
}
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
let ws = txn.get_working_set()?;
|
||||
assert_eq!(ws, vec![None, None]);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn set_working_set_item_zero() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let uuid1 = Uuid::new_v4();
|
||||
|
||||
let mut txn = storage.txn()?;
|
||||
assert!(txn.set_working_set_item(0, Some(uuid1)).is_err());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn clear_working_set() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
|
||||
@@ -117,14 +117,14 @@ impl TaskDb {
|
||||
{
|
||||
let mut txn = self.storage.txn()?;
|
||||
|
||||
let mut new_ws = vec![];
|
||||
let mut new_ws = vec![None]; // index 0 is always None
|
||||
let mut seen = HashSet::new();
|
||||
|
||||
// The goal here is for existing working-set items to be "compressed' down to index 1, so
|
||||
// we begin by scanning the current working set and inserting any tasks that should still
|
||||
// be in the set into new_ws, implicitly dropping any tasks that are no longer in the
|
||||
// working set.
|
||||
for elt in txn.get_working_set()? {
|
||||
for elt in txn.get_working_set()?.drain(1..) {
|
||||
if let Some(uuid) = elt {
|
||||
if let Some(task) = txn.get_task(uuid)? {
|
||||
if in_working_set(&task) {
|
||||
@@ -144,12 +144,12 @@ impl TaskDb {
|
||||
// if renumbering, clear the working set and re-add
|
||||
if renumber {
|
||||
txn.clear_working_set()?;
|
||||
for uuid in new_ws.drain(0..new_ws.len()).flatten() {
|
||||
txn.add_to_working_set(uuid)?;
|
||||
for elt in new_ws.drain(1..new_ws.len()).flatten() {
|
||||
txn.add_to_working_set(elt)?;
|
||||
}
|
||||
} else {
|
||||
// ..otherwise, just clear the None items determined above from the working set
|
||||
for (i, elt) in new_ws.iter().enumerate() {
|
||||
for (i, elt) in new_ws.iter().enumerate().skip(1) {
|
||||
if elt.is_none() {
|
||||
txn.set_working_set_item(i, None)?;
|
||||
}
|
||||
|
||||
@@ -21,6 +21,10 @@ impl WorkingSet {
|
||||
/// Create a new WorkingSet. Typically this is acquired via `replica.working_set()`
|
||||
pub(crate) fn new(by_index: Vec<Option<Uuid>>) -> Self {
|
||||
let mut by_uuid = HashMap::new();
|
||||
|
||||
// working sets are 1-indexed, so element 0 should always be None
|
||||
assert!(by_index.is_empty() || by_index[0].is_none());
|
||||
|
||||
for (index, uuid) in by_index.iter().enumerate() {
|
||||
if let Some(uuid) = uuid {
|
||||
by_uuid.insert(*uuid, index);
|
||||
|
||||
Reference in New Issue
Block a user