Send snapshots to server

This commit is contained in:
Dustin J. Mitchell
2021-10-10 17:27:29 -04:00
parent 13a96efacb
commit b97f6dc4d5
6 changed files with 206 additions and 104 deletions

View File

@@ -1,6 +1,6 @@
use crate::server::{ use crate::server::{
AddVersionResult, GetVersionResult, HistorySegment, Server, SnapshotUrgency, VersionId, AddVersionResult, GetVersionResult, HistorySegment, Server, Snapshot, SnapshotUrgency,
NIL_VERSION_ID, VersionId, NIL_VERSION_ID,
}; };
use crate::storage::sqlite::StoredUuid; use crate::storage::sqlite::StoredUuid;
use anyhow::Context; use anyhow::Context;
@@ -111,8 +111,6 @@ impl Server for LocalServer {
// TODO: better transaction isolation for add_version (gets and sets should be in the same // TODO: better transaction isolation for add_version (gets and sets should be in the same
// transaction) // transaction)
/// Add a new version. If the given version number is incorrect, this responds with the
/// appropriate version and expects the caller to try again.
fn add_version( fn add_version(
&mut self, &mut self,
parent_version_id: VersionId, parent_version_id: VersionId,
@@ -143,7 +141,6 @@ impl Server for LocalServer {
Ok((AddVersionResult::Ok(version_id), SnapshotUrgency::None)) Ok((AddVersionResult::Ok(version_id), SnapshotUrgency::None))
} }
/// Get a vector of all versions after `since_version`
fn get_child_version( fn get_child_version(
&mut self, &mut self,
parent_version_id: VersionId, parent_version_id: VersionId,
@@ -158,6 +155,11 @@ impl Server for LocalServer {
Ok(GetVersionResult::NoSuchVersion) Ok(GetVersionResult::NoSuchVersion)
} }
} }
fn add_snapshot(&mut self, _version_id: VersionId, _snapshot: Snapshot) -> anyhow::Result<()> {
// the local server never requests a snapshot, so it should never get one
unreachable!()
}
} }
#[cfg(test)] #[cfg(test)]

View File

@@ -1,5 +1,4 @@
use crate::server::HistorySegment; use crate::server::HistorySegment;
use std::convert::TryFrom;
use std::io::Read; use std::io::Read;
use tindercrypt::cryptors::RingCryptor; use tindercrypt::cryptors::RingCryptor;
use uuid::Uuid; use uuid::Uuid;
@@ -18,45 +17,31 @@ impl AsRef<[u8]> for Secret {
} }
} }
/// A cleartext payload containing a history segment. /// A cleartext payload with an attached version_id. The version_id is used to
pub(super) struct HistoryCleartext { /// validate the context of the payload.
pub(super) parent_version_id: Uuid, pub(super) struct Cleartext {
pub(super) history_segment: HistorySegment, pub(super) version_id: Uuid,
pub(super) payload: HistorySegment,
} }
impl HistoryCleartext { impl Cleartext {
/// Seal the payload into its ciphertext /// Seal the payload into its ciphertext
pub(super) fn seal(self, secret: &Secret) -> anyhow::Result<HistoryCiphertext> { pub(super) fn seal(self, secret: &Secret) -> anyhow::Result<Ciphertext> {
let cryptor = RingCryptor::new().with_aad(self.parent_version_id.as_bytes()); let cryptor = RingCryptor::new().with_aad(self.version_id.as_bytes());
let ciphertext = cryptor.seal_with_passphrase(secret.as_ref(), &self.history_segment)?; let ciphertext = cryptor.seal_with_passphrase(secret.as_ref(), &self.payload)?;
Ok(HistoryCiphertext(ciphertext)) Ok(Ciphertext(ciphertext))
} }
} }
/// An ecrypted payload containing a history segment /// An ecrypted payload
pub(super) struct HistoryCiphertext(pub(super) Vec<u8>); pub(super) struct Ciphertext(pub(super) Vec<u8>);
impl HistoryCiphertext { impl Ciphertext {
pub(super) fn open( pub(super) fn from_resp(
self, resp: ureq::Response,
secret: &Secret, content_type: &str,
parent_version_id: Uuid, ) -> Result<Ciphertext, anyhow::Error> {
) -> anyhow::Result<HistoryCleartext> { if resp.header("Content-Type") == Some(content_type) {
let cryptor = RingCryptor::new().with_aad(parent_version_id.as_bytes());
let plaintext = cryptor.open(secret.as_ref(), &self.0)?;
Ok(HistoryCleartext {
parent_version_id,
history_segment: plaintext,
})
}
}
impl TryFrom<ureq::Response> for HistoryCiphertext {
type Error = anyhow::Error;
fn try_from(resp: ureq::Response) -> Result<HistoryCiphertext, anyhow::Error> {
if let Some("application/vnd.taskchampion.history-segment") = resp.header("Content-Type") {
let mut reader = resp.into_reader(); let mut reader = resp.into_reader();
let mut bytes = vec![]; let mut bytes = vec![];
reader.read_to_end(&mut bytes)?; reader.read_to_end(&mut bytes)?;
@@ -67,9 +52,19 @@ impl TryFrom<ureq::Response> for HistoryCiphertext {
)) ))
} }
} }
pub(super) fn open(self, secret: &Secret, version_id: Uuid) -> anyhow::Result<Cleartext> {
let cryptor = RingCryptor::new().with_aad(version_id.as_bytes());
let plaintext = cryptor.open(secret.as_ref(), &self.0)?;
Ok(Cleartext {
version_id,
payload: plaintext,
})
}
} }
impl AsRef<[u8]> for HistoryCiphertext { impl AsRef<[u8]> for Ciphertext {
fn as_ref(&self) -> &[u8] { fn as_ref(&self) -> &[u8] {
self.0.as_ref() self.0.as_ref()
} }
@@ -82,52 +77,50 @@ mod test {
#[test] #[test]
fn round_trip() { fn round_trip() {
let parent_version_id = Uuid::new_v4(); let version_id = Uuid::new_v4();
let history_segment = b"HISTORY REPEATS ITSELF".to_vec(); let payload = b"HISTORY REPEATS ITSELF".to_vec();
let secret = Secret(b"SEKRIT".to_vec()); let secret = Secret(b"SEKRIT".to_vec());
let history_cleartext = HistoryCleartext { let cleartext = Cleartext {
parent_version_id, version_id,
history_segment: history_segment.clone(), payload: payload.clone(),
}; };
let history_ciphertext = history_cleartext.seal(&secret).unwrap(); let ciphertext = cleartext.seal(&secret).unwrap();
let history_cleartext = history_ciphertext.open(&secret, parent_version_id).unwrap(); let cleartext = ciphertext.open(&secret, version_id).unwrap();
assert_eq!(history_cleartext.history_segment, history_segment); assert_eq!(cleartext.payload, payload);
assert_eq!(history_cleartext.parent_version_id, parent_version_id); assert_eq!(cleartext.version_id, version_id);
} }
#[test] #[test]
fn round_trip_bad_key() { fn round_trip_bad_key() {
let parent_version_id = Uuid::new_v4(); let version_id = Uuid::new_v4();
let history_segment = b"HISTORY REPEATS ITSELF".to_vec(); let payload = b"HISTORY REPEATS ITSELF".to_vec();
let secret = Secret(b"SEKRIT".to_vec()); let secret = Secret(b"SEKRIT".to_vec());
let history_cleartext = HistoryCleartext { let cleartext = Cleartext {
parent_version_id, version_id,
history_segment: history_segment.clone(), payload: payload.clone(),
}; };
let history_ciphertext = history_cleartext.seal(&secret).unwrap(); let ciphertext = cleartext.seal(&secret).unwrap();
let secret = Secret(b"BADSEKRIT".to_vec()); let secret = Secret(b"BADSEKRIT".to_vec());
assert!(history_ciphertext.open(&secret, parent_version_id).is_err()); assert!(ciphertext.open(&secret, version_id).is_err());
} }
#[test] #[test]
fn round_trip_bad_pvid() { fn round_trip_bad_version() {
let parent_version_id = Uuid::new_v4(); let version_id = Uuid::new_v4();
let history_segment = b"HISTORY REPEATS ITSELF".to_vec(); let payload = b"HISTORY REPEATS ITSELF".to_vec();
let secret = Secret(b"SEKRIT".to_vec()); let secret = Secret(b"SEKRIT".to_vec());
let history_cleartext = HistoryCleartext { let cleartext = Cleartext {
parent_version_id, version_id,
history_segment: history_segment.clone(), payload: payload.clone(),
}; };
let history_ciphertext = history_cleartext.seal(&secret).unwrap(); let ciphertext = cleartext.seal(&secret).unwrap();
let bad_parent_version_id = Uuid::new_v4(); let bad_version_id = Uuid::new_v4();
assert!(history_ciphertext assert!(ciphertext.open(&secret, bad_version_id).is_err());
.open(&secret, bad_parent_version_id)
.is_err());
} }
} }

View File

@@ -1,12 +1,12 @@
use crate::server::{ use crate::server::{
AddVersionResult, GetVersionResult, HistorySegment, Server, SnapshotUrgency, VersionId, AddVersionResult, GetVersionResult, HistorySegment, Server, Snapshot, SnapshotUrgency,
VersionId,
}; };
use std::convert::TryInto;
use std::time::Duration; use std::time::Duration;
use uuid::Uuid; use uuid::Uuid;
mod crypto; mod crypto;
use crypto::{HistoryCiphertext, HistoryCleartext, Secret}; use crypto::{Ciphertext, Cleartext, Secret};
pub struct RemoteServer { pub struct RemoteServer {
origin: String, origin: String,
@@ -15,6 +15,12 @@ pub struct RemoteServer {
agent: ureq::Agent, agent: ureq::Agent,
} }
/// The content-type for history segments (opaque blobs of bytes)
const HISTORY_SEGMENT_CONTENT_TYPE: &str = "application/vnd.taskchampion.history-segment";
/// The content-type for snapshots (opaque blobs of bytes)
const SNAPSHOT_CONTENT_TYPE: &str = "application/vnd.taskchampion.snapshot";
/// A RemoeServer communicates with a remote server over HTTP (such as with /// A RemoeServer communicates with a remote server over HTTP (such as with
/// taskchampion-sync-server). /// taskchampion-sync-server).
impl RemoteServer { impl RemoteServer {
@@ -67,20 +73,17 @@ impl Server for RemoteServer {
"{}/v1/client/add-version/{}", "{}/v1/client/add-version/{}",
self.origin, parent_version_id self.origin, parent_version_id
); );
let history_cleartext = HistoryCleartext { let cleartext = Cleartext {
parent_version_id, version_id: parent_version_id,
history_segment, payload: history_segment,
}; };
let history_ciphertext = history_cleartext.seal(&self.encryption_secret)?; let ciphertext = cleartext.seal(&self.encryption_secret)?;
match self match self
.agent .agent
.post(&url) .post(&url)
.set( .set("Content-Type", HISTORY_SEGMENT_CONTENT_TYPE)
"Content-Type",
"application/vnd.taskchampion.history-segment",
)
.set("X-Client-Key", &self.client_key.to_string()) .set("X-Client-Key", &self.client_key.to_string())
.send_bytes(history_ciphertext.as_ref()) .send_bytes(ciphertext.as_ref())
{ {
Ok(resp) => { Ok(resp) => {
let version_id = get_uuid_header(&resp, "X-Version-Id")?; let version_id = get_uuid_header(&resp, "X-Version-Id")?;
@@ -117,10 +120,10 @@ impl Server for RemoteServer {
Ok(resp) => { Ok(resp) => {
let parent_version_id = get_uuid_header(&resp, "X-Parent-Version-Id")?; let parent_version_id = get_uuid_header(&resp, "X-Parent-Version-Id")?;
let version_id = get_uuid_header(&resp, "X-Version-Id")?; let version_id = get_uuid_header(&resp, "X-Version-Id")?;
let history_ciphertext: HistoryCiphertext = resp.try_into()?; let ciphertext = Ciphertext::from_resp(resp, HISTORY_SEGMENT_CONTENT_TYPE)?;
let history_segment = history_ciphertext let history_segment = ciphertext
.open(&self.encryption_secret, parent_version_id)? .open(&self.encryption_secret, parent_version_id)?
.history_segment; .payload;
Ok(GetVersionResult::Version { Ok(GetVersionResult::Version {
version_id, version_id,
parent_version_id, parent_version_id,
@@ -133,4 +136,20 @@ impl Server for RemoteServer {
Err(err) => Err(err.into()), Err(err) => Err(err.into()),
} }
} }
fn add_snapshot(&mut self, version_id: VersionId, snapshot: Snapshot) -> anyhow::Result<()> {
let url = format!("{}/v1/client/add-snapshot/{}", self.origin, version_id);
let cleartext = Cleartext {
version_id,
payload: snapshot,
};
let ciphertext = cleartext.seal(&self.encryption_secret)?;
Ok(self
.agent
.post(&url)
.set("Content-Type", SNAPSHOT_CONTENT_TYPE)
.set("X-Client-Key", &self.client_key.to_string())
.send_bytes(ciphertext.as_ref())
.map(|_| ())?)
}
} }

View File

@@ -1,8 +1,9 @@
use crate::server::{ use crate::server::{
AddVersionResult, GetVersionResult, HistorySegment, Server, SnapshotUrgency, VersionId, AddVersionResult, GetVersionResult, HistorySegment, Server, Snapshot, SnapshotUrgency,
NIL_VERSION_ID, VersionId, NIL_VERSION_ID,
}; };
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use uuid::Uuid; use uuid::Uuid;
struct Version { struct Version {
@@ -11,19 +12,44 @@ struct Version {
history_segment: HistorySegment, history_segment: HistorySegment,
} }
pub(crate) struct TestServer { #[derive(Clone)]
/// TestServer implements the Server trait with a test implementation.
pub(crate) struct TestServer(Arc<Mutex<Inner>>);
pub(crate) struct Inner {
latest_version_id: VersionId, latest_version_id: VersionId,
// NOTE: indexed by parent_version_id! // NOTE: indexed by parent_version_id!
versions: HashMap<VersionId, Version>, versions: HashMap<VersionId, Version>,
snapshot_urgency: SnapshotUrgency,
snapshot: Option<(VersionId, Snapshot)>,
} }
impl TestServer { impl TestServer {
/// A test server has no notion of clients, signatures, encryption, etc. /// A test server has no notion of clients, signatures, encryption, etc.
pub fn new() -> TestServer { pub(crate) fn new() -> TestServer {
TestServer { TestServer(Arc::new(Mutex::new(Inner {
latest_version_id: NIL_VERSION_ID, latest_version_id: NIL_VERSION_ID,
versions: HashMap::new(), versions: HashMap::new(),
} snapshot_urgency: SnapshotUrgency::None,
snapshot: None,
})))
}
/// Get a boxed Server implementation referring to this TestServer
pub(crate) fn server(&self) -> Box<dyn Server> {
Box::new(self.clone())
}
pub(crate) fn set_snapshot_urgency(&self, urgency: SnapshotUrgency) {
let mut inner = self.0.lock().unwrap();
inner.snapshot_urgency = urgency;
}
/// Get the latest snapshot added to this server
pub(crate) fn snapshot(&self) -> Option<(VersionId, Snapshot)> {
let inner = self.0.lock().unwrap();
inner.snapshot.as_ref().cloned()
} }
} }
@@ -35,23 +61,24 @@ impl Server for TestServer {
parent_version_id: VersionId, parent_version_id: VersionId,
history_segment: HistorySegment, history_segment: HistorySegment,
) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> { ) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> {
let mut inner = self.0.lock().unwrap();
// no client lookup // no client lookup
// no signature validation // no signature validation
// check the parent_version_id for linearity // check the parent_version_id for linearity
if self.latest_version_id != NIL_VERSION_ID { if inner.latest_version_id != NIL_VERSION_ID && parent_version_id != inner.latest_version_id
if parent_version_id != self.latest_version_id { {
return Ok(( return Ok((
AddVersionResult::ExpectedParentVersion(self.latest_version_id), AddVersionResult::ExpectedParentVersion(inner.latest_version_id),
SnapshotUrgency::None, SnapshotUrgency::None,
)); ));
}
} }
// invent a new ID for this version // invent a new ID for this version
let version_id = Uuid::new_v4(); let version_id = Uuid::new_v4();
self.versions.insert( inner.versions.insert(
parent_version_id, parent_version_id,
Version { Version {
version_id, version_id,
@@ -59,9 +86,12 @@ impl Server for TestServer {
history_segment, history_segment,
}, },
); );
self.latest_version_id = version_id; inner.latest_version_id = version_id;
Ok((AddVersionResult::Ok(version_id), SnapshotUrgency::None)) // reply with the configured urgency and reset it to None
let urgency = inner.snapshot_urgency;
inner.snapshot_urgency = SnapshotUrgency::None;
Ok((AddVersionResult::Ok(version_id), urgency))
} }
/// Get a vector of all versions after `since_version` /// Get a vector of all versions after `since_version`
@@ -69,7 +99,9 @@ impl Server for TestServer {
&mut self, &mut self,
parent_version_id: VersionId, parent_version_id: VersionId,
) -> anyhow::Result<GetVersionResult> { ) -> anyhow::Result<GetVersionResult> {
if let Some(version) = self.versions.get(&parent_version_id) { let inner = self.0.lock().unwrap();
if let Some(version) = inner.versions.get(&parent_version_id) {
Ok(GetVersionResult::Version { Ok(GetVersionResult::Version {
version_id: version.version_id, version_id: version.version_id,
parent_version_id: version.parent_version_id, parent_version_id: version.parent_version_id,
@@ -79,4 +111,12 @@ impl Server for TestServer {
Ok(GetVersionResult::NoSuchVersion) Ok(GetVersionResult::NoSuchVersion)
} }
} }
fn add_snapshot(&mut self, version_id: VersionId, snapshot: Snapshot) -> anyhow::Result<()> {
let mut inner = self.0.lock().unwrap();
// test implementation -- does not perform any validation
inner.snapshot = Some((version_id, snapshot));
Ok(())
}
} }

View File

@@ -10,6 +10,10 @@ pub const NIL_VERSION_ID: VersionId = Uuid::nil();
/// data is pre-encoded, and from the protocol level appears as a sequence of bytes. /// data is pre-encoded, and from the protocol level appears as a sequence of bytes.
pub type HistorySegment = Vec<u8>; pub type HistorySegment = Vec<u8>;
/// A snapshot of the state of the task database. This is encoded by the taskdb implementation
/// and treated as a sequence of bytes by the server implementation.
pub type Snapshot = Vec<u8>;
/// AddVersionResult is the response type from [`crate::server::Server::add_version`]. /// AddVersionResult is the response type from [`crate::server::Server::add_version`].
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum AddVersionResult { pub enum AddVersionResult {
@@ -58,4 +62,7 @@ pub trait Server {
&mut self, &mut self,
parent_version_id: VersionId, parent_version_id: VersionId,
) -> anyhow::Result<GetVersionResult>; ) -> anyhow::Result<GetVersionResult>;
/// Add a snapshot on the server
fn add_snapshot(&mut self, version_id: VersionId, snapshot: Snapshot) -> anyhow::Result<()>;
} }

View File

@@ -1,5 +1,5 @@
use super::ops; use super::{ops, snapshot};
use crate::server::{AddVersionResult, GetVersionResult, Server}; use crate::server::{AddVersionResult, GetVersionResult, Server, SnapshotUrgency};
use crate::storage::{Operation, StorageTxn}; use crate::storage::{Operation, StorageTxn};
use log::{info, trace, warn}; use log::{info, trace, warn};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -57,12 +57,19 @@ pub(super) fn sync(server: &mut Box<dyn Server>, txn: &mut dyn StorageTxn) -> an
let new_version = Version { operations }; let new_version = Version { operations };
let history_segment = serde_json::to_string(&new_version).unwrap().into(); let history_segment = serde_json::to_string(&new_version).unwrap().into();
info!("sending new version to server"); info!("sending new version to server");
let (res, _snapshot_urgency) = server.add_version(base_version_id, history_segment)?; let (res, snapshot_urgency) = server.add_version(base_version_id, history_segment)?;
match res { match res {
AddVersionResult::Ok(new_version_id) => { AddVersionResult::Ok(new_version_id) => {
info!("version {:?} received by server", new_version_id); info!("version {:?} received by server", new_version_id);
txn.set_base_version(new_version_id)?; txn.set_base_version(new_version_id)?;
txn.set_operations(vec![])?; txn.set_operations(vec![])?;
// TODO: configurable urgency levels
if snapshot_urgency != SnapshotUrgency::None {
let snapshot = snapshot::make_snapshot(txn)?;
server.add_snapshot(new_version_id, snapshot)?;
}
break; break;
} }
AddVersionResult::ExpectedParentVersion(parent_version_id) => { AddVersionResult::ExpectedParentVersion(parent_version_id) => {
@@ -150,8 +157,9 @@ mod test {
use super::*; use super::*;
use crate::server::test::TestServer; use crate::server::test::TestServer;
use crate::storage::{InMemoryStorage, Operation}; use crate::storage::{InMemoryStorage, Operation};
use crate::taskdb::TaskDb; use crate::taskdb::{snapshot::SnapshotTasks, TaskDb};
use chrono::Utc; use chrono::Utc;
use pretty_assertions::assert_eq;
use uuid::Uuid; use uuid::Uuid;
fn newdb() -> TaskDb { fn newdb() -> TaskDb {
@@ -160,7 +168,7 @@ mod test {
#[test] #[test]
fn test_sync() -> anyhow::Result<()> { fn test_sync() -> anyhow::Result<()> {
let mut server: Box<dyn Server> = Box::new(TestServer::new()); let mut server: Box<dyn Server> = TestServer::new().server();
let mut db1 = newdb(); let mut db1 = newdb();
sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); sync(&mut server, db1.storage.txn()?.as_mut()).unwrap();
@@ -222,7 +230,7 @@ mod test {
#[test] #[test]
fn test_sync_create_delete() -> anyhow::Result<()> { fn test_sync_create_delete() -> anyhow::Result<()> {
let mut server: Box<dyn Server> = Box::new(TestServer::new()); let mut server: Box<dyn Server> = TestServer::new().server();
let mut db1 = newdb(); let mut db1 = newdb();
sync(&mut server, db1.storage.txn()?.as_mut()).unwrap(); sync(&mut server, db1.storage.txn()?.as_mut()).unwrap();
@@ -274,4 +282,37 @@ mod test {
Ok(()) Ok(())
} }
#[test]
fn test_sync_adds_snapshot() -> anyhow::Result<()> {
let test_server = TestServer::new();
let mut server: Box<dyn Server> = test_server.server();
let mut db1 = newdb();
let uuid = Uuid::new_v4();
db1.apply(Operation::Create { uuid }).unwrap();
db1.apply(Operation::Update {
uuid,
property: "title".into(),
value: Some("my first task".into()),
timestamp: Utc::now(),
})
.unwrap();
test_server.set_snapshot_urgency(SnapshotUrgency::High);
sync(&mut server, db1.storage.txn()?.as_mut()).unwrap();
// assert that a snapshot was added
let base_version = db1.storage.txn()?.base_version()?;
let (v, s) = test_server
.snapshot()
.ok_or_else(|| anyhow::anyhow!("no snapshot"))?;
assert_eq!(v, base_version);
let tasks = SnapshotTasks::decode(&s)?.into_inner();
assert_eq!(tasks[0].0, uuid);
Ok(())
}
} }