refactor sync to use SyncOps
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
use super::snapshot;
|
||||
use crate::server::{AddVersionResult, GetVersionResult, Server, SnapshotUrgency};
|
||||
use crate::storage::{ReplicaOp, StorageTxn};
|
||||
use crate::server::{AddVersionResult, GetVersionResult, Server, SnapshotUrgency, SyncOp};
|
||||
use crate::storage::StorageTxn;
|
||||
use crate::Error;
|
||||
use log::{info, trace, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -8,26 +8,26 @@ use std::str;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct Version {
|
||||
operations: Vec<ReplicaOp>,
|
||||
operations: Vec<SyncOp>,
|
||||
}
|
||||
|
||||
/// Apply an op to the TaskDb's set of tasks (without recording it in the list of operations)
|
||||
pub(super) fn apply_op(txn: &mut dyn StorageTxn, op: &ReplicaOp) -> anyhow::Result<()> {
|
||||
pub(super) fn apply_op(txn: &mut dyn StorageTxn, op: &SyncOp) -> anyhow::Result<()> {
|
||||
// TODO: it'd be nice if this was integrated into apply() somehow, but that clones TaskMaps
|
||||
// unnecessariliy
|
||||
match op {
|
||||
ReplicaOp::Create { uuid } => {
|
||||
SyncOp::Create { uuid } => {
|
||||
// insert if the task does not already exist
|
||||
if !txn.create_task(*uuid)? {
|
||||
return Err(Error::Database(format!("Task {} already exists", uuid)).into());
|
||||
}
|
||||
}
|
||||
ReplicaOp::Delete { ref uuid } => {
|
||||
SyncOp::Delete { ref uuid } => {
|
||||
if !txn.delete_task(*uuid)? {
|
||||
return Err(Error::Database(format!("Task {} does not exist", uuid)).into());
|
||||
}
|
||||
}
|
||||
ReplicaOp::Update {
|
||||
SyncOp::Update {
|
||||
ref uuid,
|
||||
ref property,
|
||||
ref value,
|
||||
@@ -72,6 +72,12 @@ pub(super) fn sync(
|
||||
trace!("beginning sync outer loop");
|
||||
let mut base_version_id = txn.base_version()?;
|
||||
|
||||
let mut local_ops: Vec<SyncOp> = txn
|
||||
.operations()?
|
||||
.drain(..)
|
||||
.map(|op| op.into_sync())
|
||||
.collect();
|
||||
|
||||
// first pull changes and "rebase" on top of them
|
||||
loop {
|
||||
trace!("beginning sync inner loop");
|
||||
@@ -86,7 +92,7 @@ pub(super) fn sync(
|
||||
|
||||
// apply this verison and update base_version in storage
|
||||
info!("applying version {:?} from server", version_id);
|
||||
apply_version(txn, version)?;
|
||||
apply_version(txn, &mut local_ops, version)?;
|
||||
txn.set_base_version(version_id)?;
|
||||
base_version_id = version_id;
|
||||
} else {
|
||||
@@ -96,17 +102,18 @@ pub(super) fn sync(
|
||||
}
|
||||
}
|
||||
|
||||
let operations: Vec<ReplicaOp> = txn.operations()?.to_vec();
|
||||
if operations.is_empty() {
|
||||
if local_ops.is_empty() {
|
||||
info!("no changes to push to server");
|
||||
// nothing to sync back to the server..
|
||||
break;
|
||||
}
|
||||
|
||||
trace!("sending {} operations to the server", operations.len());
|
||||
trace!("sending {} operations to the server", local_ops.len());
|
||||
|
||||
// now make a version of our local changes and push those
|
||||
let new_version = Version { operations };
|
||||
let new_version = Version {
|
||||
operations: local_ops,
|
||||
};
|
||||
let history_segment = serde_json::to_string(&new_version).unwrap().into();
|
||||
info!("sending new version to server");
|
||||
let (res, snapshot_urgency) = server.add_version(base_version_id, history_segment)?;
|
||||
@@ -114,7 +121,6 @@ pub(super) fn sync(
|
||||
AddVersionResult::Ok(new_version_id) => {
|
||||
info!("version {:?} received by server", new_version_id);
|
||||
txn.set_base_version(new_version_id)?;
|
||||
txn.set_operations(vec![])?;
|
||||
|
||||
// make a snapshot if the server indicates it is urgent enough
|
||||
let base_urgency = if avoid_snapshots {
|
||||
@@ -144,11 +150,16 @@ pub(super) fn sync(
|
||||
}
|
||||
}
|
||||
|
||||
txn.set_operations(vec![])?;
|
||||
txn.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn apply_version(txn: &mut dyn StorageTxn, mut version: Version) -> anyhow::Result<()> {
|
||||
fn apply_version(
|
||||
txn: &mut dyn StorageTxn,
|
||||
local_ops: &mut Vec<SyncOp>,
|
||||
mut version: Version,
|
||||
) -> anyhow::Result<()> {
|
||||
// The situation here is that the server has already applied all server operations, and we
|
||||
// have already applied all local operations, so states have diverged by several
|
||||
// operations. We need to figure out what operations to apply locally and on the server in
|
||||
@@ -174,17 +185,16 @@ fn apply_version(txn: &mut dyn StorageTxn, mut version: Version) -> anyhow::Resu
|
||||
// This is slightly complicated by the fact that the transform function can return None,
|
||||
// indicating no operation is required. If this happens for a local op, we can just omit
|
||||
// it. If it happens for server op, then we must copy the remaining local ops.
|
||||
let mut local_operations: Vec<ReplicaOp> = txn.operations()?;
|
||||
for server_op in version.operations.drain(..) {
|
||||
trace!(
|
||||
"rebasing local operations onto server operation {:?}",
|
||||
server_op
|
||||
);
|
||||
let mut new_local_ops = Vec::with_capacity(local_operations.len());
|
||||
let mut new_local_ops = Vec::with_capacity(local_ops.len());
|
||||
let mut svr_op = Some(server_op);
|
||||
for local_op in local_operations.drain(..) {
|
||||
for local_op in local_ops.drain(..) {
|
||||
if let Some(o) = svr_op {
|
||||
let (new_server_op, new_local_op) = ReplicaOp::transform(o, local_op.clone());
|
||||
let (new_server_op, new_local_op) = SyncOp::transform(o, local_op.clone());
|
||||
trace!("local operation {:?} -> {:?}", local_op, new_local_op);
|
||||
svr_op = new_server_op;
|
||||
if let Some(o) = new_local_op {
|
||||
@@ -203,9 +213,8 @@ fn apply_version(txn: &mut dyn StorageTxn, mut version: Version) -> anyhow::Resu
|
||||
warn!("Invalid operation when syncing: {} (ignored)", e);
|
||||
}
|
||||
}
|
||||
local_operations = new_local_ops;
|
||||
*local_ops = new_local_ops;
|
||||
}
|
||||
txn.set_operations(local_operations)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user