use log and env_logger, and add some logging around sync

This commit is contained in:
Dustin J. Mitchell
2020-11-29 18:18:28 -05:00
parent c117494ce6
commit 0a1ee470f7
13 changed files with 109 additions and 14 deletions

View File

@@ -2,6 +2,7 @@ use crate::errors::Error;
use crate::server::{AddVersionResult, GetVersionResult, Server};
use crate::taskstorage::{Operation, TaskMap, TaskStorage, TaskStorageTxn};
use failure::{format_err, Fallible};
use log::{info, trace, warn};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::str;
@@ -172,10 +173,12 @@ impl TaskDB {
// version twice, then we have diverged.
let mut requested_parent_version_id = None;
loop {
trace!("beginning sync outer loop");
let mut base_version_id = txn.base_version()?;
// first pull changes and "rebase" on top of them
loop {
trace!("beginning sync inner loop");
if let GetVersionResult::Version {
version_id,
history_segment,
@@ -184,14 +187,14 @@ impl TaskDB {
{
let version_str = str::from_utf8(&history_segment).unwrap();
let version: Version = serde_json::from_str(version_str).unwrap();
println!("applying version {:?} from server", version_id);
// apply this verison and update base_version in storage
info!("applying version {:?} from server", version_id);
TaskDB::apply_version(txn.as_mut(), version)?;
txn.set_base_version(version_id)?;
base_version_id = version_id;
} else {
println!("no child versions of {:?}", base_version_id);
info!("no child versions of {:?}", base_version_id);
// at the moment, no more child versions, so we can try adding our own
break;
}
@@ -199,24 +202,26 @@ impl TaskDB {
let operations: Vec<Operation> = txn.operations()?.to_vec();
if operations.is_empty() {
println!("no changes to push to server");
info!("no changes to push to server");
// nothing to sync back to the server..
break;
}
trace!("sending {} operations to the server", operations.len());
// now make a version of our local changes and push those
let new_version = Version { operations };
let history_segment = serde_json::to_string(&new_version).unwrap().into();
println!("sending new version to server");
info!("sending new version to server");
match server.add_version(base_version_id, history_segment)? {
AddVersionResult::Ok(new_version_id) => {
println!("version {:?} received by server", new_version_id);
info!("version {:?} received by server", new_version_id);
txn.set_base_version(new_version_id)?;
txn.set_operations(vec![])?;
break;
}
AddVersionResult::ExpectedParentVersion(parent_version_id) => {
println!(
info!(
"new version rejected; must be based on {:?}",
parent_version_id
);
@@ -264,22 +269,31 @@ impl TaskDB {
// it. If it happens for server op, then we must copy the remaining local ops.
let mut local_operations: Vec<Operation> = txn.operations()?;
for server_op in version.operations.drain(..) {
trace!(
"rebasing local operations onto server operation {:?}",
server_op
);
let mut new_local_ops = Vec::with_capacity(local_operations.len());
let mut svr_op = Some(server_op);
for local_op in local_operations.drain(..) {
if let Some(o) = svr_op {
let (new_server_op, new_local_op) = Operation::transform(o, local_op);
let (new_server_op, new_local_op) = Operation::transform(o, local_op.clone());
trace!("local operation {:?} -> {:?}", local_op, new_local_op);
svr_op = new_server_op;
if let Some(o) = new_local_op {
new_local_ops.push(o);
}
} else {
trace!(
"local operation {:?} unchanged (server operation consumed)",
local_op
);
new_local_ops.push(local_op);
}
}
if let Some(o) = svr_op {
if let Err(e) = TaskDB::apply_op(txn, &o) {
println!("Invalid operation when syncing: {} (ignored)", e);
warn!("Invalid operation when syncing: {} (ignored)", e);
}
}
local_operations = new_local_ops;