From d1da8eee52d51f8752fbc92362f7963417a3b4a1 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Thu, 30 Sep 2021 02:45:59 +0000 Subject: [PATCH] Add add_snapshot API method --- sync-server/src/api/add_snapshot.rs | 191 +++++++++++++++++ sync-server/src/api/mod.rs | 2 + sync-server/src/lib.rs | 7 + sync-server/src/server.rs | 308 +++++++++++++++++++++++++++- 4 files changed, 507 insertions(+), 1 deletion(-) create mode 100644 sync-server/src/api/add_snapshot.rs diff --git a/sync-server/src/api/add_snapshot.rs b/sync-server/src/api/add_snapshot.rs new file mode 100644 index 000000000..49de42cf5 --- /dev/null +++ b/sync-server/src/api/add_snapshot.rs @@ -0,0 +1,191 @@ +use crate::api::{client_key_header, failure_to_ise, ServerState, SNAPSHOT_CONTENT_TYPE}; +use crate::server::{add_snapshot, VersionId, NO_VERSION_ID}; +use actix_web::{error, post, web, HttpMessage, HttpRequest, HttpResponse, Result}; +use futures::StreamExt; + +/// Max snapshot size: 100MB +const MAX_SIZE: usize = 100 * 1024 * 1024; + +/// Add a new snapshot, after checking prerequisites. The snapshot should be transmitted in the +/// request entity body and must have content-type `application/vnd.taskchampion.snapshot`. The +/// content can be encoded in any of the formats supported by actix-web. +/// +/// On success, the response is a 200 OK. Even in a 200 OK, the snapshot may not appear in a +/// subsequent `GetSnapshot` call. +/// +/// Returns other 4xx or 5xx responses on other errors. +#[post("/v1/client/add-snapshot/{version_id}")] +pub(crate) async fn service( + req: HttpRequest, + server_state: web::Data, + web::Path((version_id,)): web::Path<(VersionId,)>, + mut payload: web::Payload, +) -> Result { + // check content-type + if req.content_type() != SNAPSHOT_CONTENT_TYPE { + return Err(error::ErrorBadRequest("Bad content-type")); + } + + let client_key = client_key_header(&req)?; + + // read the body in its entirety + let mut body = web::BytesMut::new(); + while let Some(chunk) = payload.next().await { + let chunk = chunk?; + // limit max size of in-memory payload + if (body.len() + chunk.len()) > MAX_SIZE { + return Err(error::ErrorBadRequest("Snapshot over maximum allowed size")); + } + body.extend_from_slice(&chunk); + } + + if body.is_empty() { + return Err(error::ErrorBadRequest("No snapshot supplied")); + } + + // note that we do not open the transaction until the body has been read + // completely, to avoid blocking other storage access while that data is + // in transit. + let mut txn = server_state.txn().map_err(failure_to_ise)?; + + // get, or create, the client + let client = match txn.get_client(client_key).map_err(failure_to_ise)? { + Some(client) => client, + None => { + txn.new_client(client_key, NO_VERSION_ID) + .map_err(failure_to_ise)?; + txn.get_client(client_key).map_err(failure_to_ise)?.unwrap() + } + }; + + add_snapshot(txn, client_key, client, version_id, body.to_vec()).map_err(failure_to_ise)?; + Ok(HttpResponse::Ok().body("")) +} + +#[cfg(test)] +mod test { + use super::*; + use crate::storage::{InMemoryStorage, Storage}; + use crate::Server; + use actix_web::{http::StatusCode, test, App}; + use pretty_assertions::assert_eq; + use uuid::Uuid; + + #[actix_rt::test] + async fn test_success() -> anyhow::Result<()> { + let client_key = Uuid::new_v4(); + let version_id = Uuid::new_v4(); + let storage: Box = Box::new(InMemoryStorage::new()); + + // set up the storage contents.. + { + let mut txn = storage.txn().unwrap(); + txn.new_client(client_key, version_id).unwrap(); + txn.add_version(client_key, version_id, NO_VERSION_ID, vec![])?; + } + + let server = Server::new(storage); + let mut app = test::init_service(App::new().service(server.service())).await; + + let uri = format!("/v1/client/add-snapshot/{}", version_id); + let req = test::TestRequest::post() + .uri(&uri) + .header("Content-Type", "application/vnd.taskchampion.snapshot") + .header("X-Client-Key", client_key.to_string()) + .set_payload(b"abcd".to_vec()) + .to_request(); + let resp = test::call_service(&mut app, req).await; + assert_eq!(resp.status(), StatusCode::OK); + + // read back that snapshot + let uri = "/v1/client/snapshot"; + let req = test::TestRequest::get() + .uri(uri) + .header("X-Client-Key", client_key.to_string()) + .to_request(); + let mut resp = test::call_service(&mut app, req).await; + assert_eq!(resp.status(), StatusCode::OK); + + use futures::StreamExt; + let (bytes, _) = resp.take_body().into_future().await; + assert_eq!(bytes.unwrap().unwrap().as_ref(), b"abcd"); + + Ok(()) + } + + #[actix_rt::test] + async fn test_not_added_200() { + let client_key = Uuid::new_v4(); + let version_id = Uuid::new_v4(); + let storage: Box = Box::new(InMemoryStorage::new()); + + // set up the storage contents.. + { + let mut txn = storage.txn().unwrap(); + txn.new_client(client_key, NO_VERSION_ID).unwrap(); + } + + let server = Server::new(storage); + let mut app = test::init_service(App::new().service(server.service())).await; + + // add a snapshot for a nonexistent version + let uri = format!("/v1/client/add-snapshot/{}", version_id); + let req = test::TestRequest::post() + .uri(&uri) + .header("Content-Type", "application/vnd.taskchampion.snapshot") + .header("X-Client-Key", client_key.to_string()) + .set_payload(b"abcd".to_vec()) + .to_request(); + let resp = test::call_service(&mut app, req).await; + assert_eq!(resp.status(), StatusCode::OK); + + // read back, seeing no snapshot + let uri = "/v1/client/snapshot"; + let req = test::TestRequest::get() + .uri(uri) + .header("X-Client-Key", client_key.to_string()) + .to_request(); + let resp = test::call_service(&mut app, req).await; + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + #[actix_rt::test] + async fn test_bad_content_type() { + let client_key = Uuid::new_v4(); + let version_id = Uuid::new_v4(); + let storage: Box = Box::new(InMemoryStorage::new()); + let server = Server::new(storage); + let mut app = test::init_service(App::new().service(server.service())).await; + + let uri = format!("/v1/client/add-snapshot/{}", version_id); + let req = test::TestRequest::post() + .uri(&uri) + .header("Content-Type", "not/correct") + .header("X-Client-Key", client_key.to_string()) + .set_payload(b"abcd".to_vec()) + .to_request(); + let resp = test::call_service(&mut app, req).await; + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } + + #[actix_rt::test] + async fn test_empty_body() { + let client_key = Uuid::new_v4(); + let version_id = Uuid::new_v4(); + let storage: Box = Box::new(InMemoryStorage::new()); + let server = Server::new(storage); + let mut app = test::init_service(App::new().service(server.service())).await; + + let uri = format!("/v1/client/add-snapshot/{}", version_id); + let req = test::TestRequest::post() + .uri(&uri) + .header( + "Content-Type", + "application/vnd.taskchampion.history-segment", + ) + .header("X-Client-Key", client_key.to_string()) + .to_request(); + let resp = test::call_service(&mut app, req).await; + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } +} diff --git a/sync-server/src/api/mod.rs b/sync-server/src/api/mod.rs index 87666a35a..1c8dd48d5 100644 --- a/sync-server/src/api/mod.rs +++ b/sync-server/src/api/mod.rs @@ -3,6 +3,7 @@ use crate::storage::Storage; use actix_web::{error, http::StatusCode, web, HttpRequest, Result, Scope}; use std::sync::Arc; +mod add_snapshot; mod add_version; mod get_child_version; mod get_snapshot; @@ -31,6 +32,7 @@ pub(crate) fn api_scope() -> Scope { .service(get_child_version::service) .service(add_version::service) .service(get_snapshot::service) + .service(add_snapshot::service) } /// Convert a failure::Error to an Actix ISE diff --git a/sync-server/src/lib.rs b/sync-server/src/lib.rs index c219f2e1b..c817da29e 100644 --- a/sync-server/src/lib.rs +++ b/sync-server/src/lib.rs @@ -35,3 +35,10 @@ impl Server { .service(api_scope()) } } + +#[cfg(test)] +mod test { + pub(crate) fn init_logging() { + let _ = env_logger::builder().is_test(true).try_init(); + } +} diff --git a/sync-server/src/server.rs b/sync-server/src/server.rs index 0ed3f4e7b..3a4455847 100644 --- a/sync-server/src/server.rs +++ b/sync-server/src/server.rs @@ -1,12 +1,18 @@ //! This module implements the core logic of the server: handling transactions, upholding //! invariants, and so on. This does not implement the HTTP-specific portions; those //! are in [`crate::api`]. See the protocol documentation for details. -use crate::storage::{Client, StorageTxn}; +use crate::storage::{Client, Snapshot, StorageTxn}; +use chrono::Utc; use uuid::Uuid; /// The distinguished value for "no version" pub const NO_VERSION_ID: VersionId = Uuid::nil(); +/// Number of versions to search back from the latest to find the +/// version for a newly-added snapshot. Snapshots for versions older +/// than this will be rejected. +const SNAPSHOT_SEARCH_LEN: i32 = 5; + pub(crate) type HistorySegment = Vec; pub(crate) type ClientKey = Uuid; pub(crate) type VersionId = Uuid; @@ -105,6 +111,90 @@ pub(crate) fn add_version<'a>( Ok(AddVersionResult::Ok(version_id)) } +/// Implementation of the AddSnapshot protocol transaction +pub(crate) fn add_snapshot<'a>( + mut txn: Box, + client_key: ClientKey, + client: Client, + version_id: VersionId, + data: Vec, +) -> anyhow::Result<()> { + log::debug!( + "add_snapshot(client_key: {}, version_id: {})", + client_key, + version_id, + ); + + // NOTE: if the snapshot is rejected, this function logs about it and returns + // Ok(()), as there's no reason to report an errot to the client / user. + + let last_snapshot = client.snapshot.map(|snap| snap.version_id); + if Some(version_id) == last_snapshot { + log::debug!( + "rejecting snapshot for version {}: already exists", + version_id + ); + return Ok(()); + } + + // look for this version in the history of this client, starting at the latest version, and + // only iterating for a limited number of versions. + let mut search_len = SNAPSHOT_SEARCH_LEN; + let mut vid = client.latest_version_id; + + loop { + if vid == version_id && version_id != NO_VERSION_ID { + // the new snapshot is for a recent version, so proceed + break; + } + + if Some(vid) == last_snapshot { + // the new snapshot is older than the last snapshot, so ignore it + log::debug!( + "rejecting snapshot for version {}: newer snapshot already exists or no such version", + version_id + ); + return Ok(()); + } + + search_len -= 1; + if search_len <= 0 || vid == NO_VERSION_ID { + // this should not happen in normal operation, so warn about it + log::warn!( + "rejecting snapshot for version {}: version is too old or no such version", + version_id + ); + return Ok(()); + } + + // get the parent version ID + if let Some(parent) = txn.get_version(client_key, vid)? { + vid = parent.parent_version_id; + } else { + // this version does not exist; "this should not happen" but if it does, + // we don't need a snapshot earlier than the missing version. + log::warn!( + "rejecting snapshot for version {}: newer versions have already been deleted", + version_id + ); + return Ok(()); + } + } + + log::warn!("accepting snapshot for version {}", version_id); + txn.set_snapshot( + client_key, + Snapshot { + version_id, + timestamp: Utc::now(), + versions_since: 0, + }, + data, + )?; + txn.commit()?; + Ok(()) +} + /// Implementation of the GetSnapshot protocol transaction pub(crate) fn get_snapshot<'a>( mut txn: Box, @@ -123,11 +213,14 @@ pub(crate) fn get_snapshot<'a>( mod test { use super::*; use crate::storage::{InMemoryStorage, Snapshot, Storage}; + use crate::test::init_logging; use chrono::{TimeZone, Utc}; use pretty_assertions::assert_eq; #[test] fn gcv_not_found_initial() -> anyhow::Result<()> { + init_logging(); + let storage = InMemoryStorage::new(); let mut txn = storage.txn()?; let client_key = Uuid::new_v4(); @@ -144,6 +237,8 @@ mod test { #[test] fn gcv_gone_initial() -> anyhow::Result<()> { + init_logging(); + let storage = InMemoryStorage::new(); let mut txn = storage.txn()?; let client_key = Uuid::new_v4(); @@ -170,6 +265,8 @@ mod test { #[test] fn gcv_not_found_up_to_date() -> anyhow::Result<()> { + init_logging(); + let storage = InMemoryStorage::new(); let mut txn = storage.txn()?; let client_key = Uuid::new_v4(); @@ -189,6 +286,8 @@ mod test { #[test] fn gcv_gone() -> anyhow::Result<()> { + init_logging(); + let storage = InMemoryStorage::new(); let mut txn = storage.txn()?; let client_key = Uuid::new_v4(); @@ -208,6 +307,8 @@ mod test { #[test] fn gcv_found() -> anyhow::Result<()> { + init_logging(); + let storage = InMemoryStorage::new(); let mut txn = storage.txn()?; let client_key = Uuid::new_v4(); @@ -237,6 +338,8 @@ mod test { #[test] fn av_conflict() -> anyhow::Result<()> { + init_logging(); + let storage = InMemoryStorage::new(); let mut txn = storage.txn()?; let client_key = Uuid::new_v4(); @@ -265,6 +368,8 @@ mod test { } fn test_av_success(latest_version_id_nil: bool) -> anyhow::Result<()> { + init_logging(); + let storage = InMemoryStorage::new(); let mut txn = storage.txn()?; let client_key = Uuid::new_v4(); @@ -317,8 +422,207 @@ mod test { test_av_success(false) } + #[test] + fn add_snapshot_success_latest() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_key = Uuid::new_v4(); + let version_id = Uuid::new_v4(); + + // set up a task DB with one version in it + txn.new_client(client_key, version_id)?; + txn.add_version(client_key, version_id, NO_VERSION_ID, vec![])?; + + // add a snapshot for that version + let client = txn.get_client(client_key)?.unwrap(); + add_snapshot(txn, client_key, client, version_id, vec![1, 2, 3])?; + + // verify the snapshot + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + let snapshot = client.snapshot.unwrap(); + assert_eq!(snapshot.version_id, version_id); + assert_eq!(snapshot.versions_since, 0); + assert_eq!( + txn.get_snapshot_data(client_key, version_id).unwrap(), + Some(vec![1, 2, 3]) + ); + + Ok(()) + } + + #[test] + fn add_snapshot_success_older() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_key = Uuid::new_v4(); + let version_id_1 = Uuid::new_v4(); + let version_id_2 = Uuid::new_v4(); + + // set up a task DB with two versions in it + txn.new_client(client_key, version_id_2)?; + txn.add_version(client_key, version_id_1, NO_VERSION_ID, vec![])?; + txn.add_version(client_key, version_id_2, version_id_1, vec![])?; + + // add a snapshot for version 1 + let client = txn.get_client(client_key)?.unwrap(); + add_snapshot(txn, client_key, client, version_id_1, vec![1, 2, 3])?; + + // verify the snapshot + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + let snapshot = client.snapshot.unwrap(); + assert_eq!(snapshot.version_id, version_id_1); + assert_eq!(snapshot.versions_since, 0); + assert_eq!( + txn.get_snapshot_data(client_key, version_id_1).unwrap(), + Some(vec![1, 2, 3]) + ); + + Ok(()) + } + + #[test] + fn add_snapshot_fails_no_such() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_key = Uuid::new_v4(); + let version_id_1 = Uuid::new_v4(); + let version_id_2 = Uuid::new_v4(); + + // set up a task DB with two versions in it + txn.new_client(client_key, version_id_2)?; + txn.add_version(client_key, version_id_1, NO_VERSION_ID, vec![])?; + txn.add_version(client_key, version_id_2, version_id_1, vec![])?; + + // add a snapshot for unknown version + let client = txn.get_client(client_key)?.unwrap(); + let version_id_unk = Uuid::new_v4(); + add_snapshot(txn, client_key, client, version_id_unk, vec![1, 2, 3])?; + + // verify the snapshot does not exist + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + assert!(client.snapshot.is_none()); + + Ok(()) + } + + #[test] + fn add_snapshot_fails_too_old() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_key = Uuid::new_v4(); + let mut version_id = Uuid::new_v4(); + let mut parent_version_id = Uuid::nil(); + let mut version_ids = vec![]; + + // set up a task DB with 10 versions in it (oldest to newest) + txn.new_client(client_key, Uuid::nil())?; + for _ in 0..10 { + txn.add_version(client_key, version_id, parent_version_id, vec![])?; + version_ids.push(version_id); + parent_version_id = version_id; + version_id = Uuid::new_v4(); + } + + // add a snapshot for the earliest of those + let client = txn.get_client(client_key)?.unwrap(); + add_snapshot(txn, client_key, client, version_ids[0], vec![1, 2, 3])?; + + // verify the snapshot does not exist + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + assert!(client.snapshot.is_none()); + + Ok(()) + } + + #[test] + fn add_snapshot_fails_newer_exists() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_key = Uuid::new_v4(); + let mut version_id = Uuid::new_v4(); + let mut parent_version_id = Uuid::nil(); + let mut version_ids = vec![]; + + // set up a task DB with 5 versions in it (oldest to newest) and a snapshot of the middle + // one + txn.new_client(client_key, Uuid::nil())?; + for _ in 0..5 { + txn.add_version(client_key, version_id, parent_version_id, vec![])?; + version_ids.push(version_id); + parent_version_id = version_id; + version_id = Uuid::new_v4(); + } + txn.set_snapshot( + client_key, + Snapshot { + version_id: version_ids[2], + versions_since: 2, + timestamp: Utc.ymd(2001, 9, 9).and_hms(1, 46, 40), + }, + vec![1, 2, 3], + )?; + + // add a snapshot for the earliest of those + let client = txn.get_client(client_key)?.unwrap(); + add_snapshot(txn, client_key, client, version_ids[0], vec![9, 9, 9])?; + + println!("{:?}", version_ids); + + // verify the snapshot was not replaced + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + let snapshot = client.snapshot.unwrap(); + assert_eq!(snapshot.version_id, version_ids[2]); + assert_eq!(snapshot.versions_since, 2); + assert_eq!( + txn.get_snapshot_data(client_key, version_ids[2]).unwrap(), + Some(vec![1, 2, 3]) + ); + + Ok(()) + } + + #[test] + fn add_snapshot_fails_nil_version() -> anyhow::Result<()> { + init_logging(); + + let storage = InMemoryStorage::new(); + let mut txn = storage.txn()?; + let client_key = Uuid::new_v4(); + + // just set up the client + txn.new_client(client_key, NO_VERSION_ID)?; + + // add a snapshot for the nil version + let client = txn.get_client(client_key)?.unwrap(); + add_snapshot(txn, client_key, client, NO_VERSION_ID, vec![9, 9, 9])?; + + // verify the snapshot does not exist + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + assert!(client.snapshot.is_none()); + + Ok(()) + } + #[test] fn get_snapshot_found() -> anyhow::Result<()> { + init_logging(); + let storage = InMemoryStorage::new(); let mut txn = storage.txn()?; let client_key = Uuid::new_v4(); @@ -347,6 +651,8 @@ mod test { #[test] fn get_snapshot_not_found() -> anyhow::Result<()> { + init_logging(); + let storage = InMemoryStorage::new(); let mut txn = storage.txn()?; let client_key = Uuid::new_v4();