From 7bb6ea6865e101041ae0d154a1e9f2ae970d8bc2 Mon Sep 17 00:00:00 2001 From: "Dustin J. Mitchell" Date: Sun, 3 Oct 2021 02:14:36 +0000 Subject: [PATCH] Request snapshots in AddVersion --- sync-server/src/api/add_version.rs | 42 +++- sync-server/src/api/mod.rs | 3 + sync-server/src/server.rs | 361 +++++++++++++++++++++++------ 3 files changed, 328 insertions(+), 78 deletions(-) diff --git a/sync-server/src/api/add_version.rs b/sync-server/src/api/add_version.rs index c6d7314d0..92d86ef52 100644 --- a/sync-server/src/api/add_version.rs +++ b/sync-server/src/api/add_version.rs @@ -1,8 +1,8 @@ use crate::api::{ client_key_header, failure_to_ise, ServerState, HISTORY_SEGMENT_CONTENT_TYPE, - PARENT_VERSION_ID_HEADER, VERSION_ID_HEADER, + PARENT_VERSION_ID_HEADER, SNAPSHOT_REQUEST_HEADER, VERSION_ID_HEADER, }; -use crate::server::{add_version, AddVersionResult, VersionId, NO_VERSION_ID}; +use crate::server::{add_version, AddVersionResult, SnapshotUrgency, VersionId, NO_VERSION_ID}; use actix_web::{error, post, web, HttpMessage, HttpRequest, HttpResponse, Result}; use futures::StreamExt; @@ -18,6 +18,9 @@ const MAX_SIZE: usize = 100 * 1024 * 1024; /// the version cannot be added due to a conflict, the response is a 409 CONFLICT with the expected /// parent version ID in the `X-Parent-Version-Id` header. /// +/// If included, a snapshot request appears in the `X-Snapshot-Request` header with value +/// `urgency=low` or `urgency=high`. +/// /// Returns other 4xx or 5xx responses on other errors. #[post("/v1/client/add-version/{parent_version_id}")] pub(crate) async fn service( @@ -63,15 +66,30 @@ pub(crate) async fn service( } }; - let result = add_version(txn, client_key, client, parent_version_id, body.to_vec()) - .map_err(failure_to_ise)?; + let (result, snap_urgency) = + add_version(txn, client_key, client, parent_version_id, body.to_vec()) + .map_err(failure_to_ise)?; + Ok(match result { - AddVersionResult::Ok(version_id) => HttpResponse::Ok() - .header(VERSION_ID_HEADER, version_id.to_string()) - .body(""), - AddVersionResult::ExpectedParentVersion(parent_version_id) => HttpResponse::Conflict() - .header(PARENT_VERSION_ID_HEADER, parent_version_id.to_string()) - .body(""), + AddVersionResult::Ok(version_id) => { + let mut rb = HttpResponse::Ok(); + rb.header(VERSION_ID_HEADER, version_id.to_string()); + match snap_urgency { + SnapshotUrgency::None => {} + SnapshotUrgency::Low => { + rb.header(SNAPSHOT_REQUEST_HEADER, "urgency=low"); + } + SnapshotUrgency::High => { + rb.header(SNAPSHOT_REQUEST_HEADER, "urgency=high"); + } + }; + rb.finish() + } + AddVersionResult::ExpectedParentVersion(parent_version_id) => { + let mut rb = HttpResponse::Conflict(); + rb.header(PARENT_VERSION_ID_HEADER, parent_version_id.to_string()); + rb.finish() + } }) } @@ -117,6 +135,10 @@ mod test { let new_version_id = resp.headers().get("X-Version-Id").unwrap(); assert!(new_version_id != &version_id.to_string()); + // Shapshot should be requested, since there is no existing snapshot + let snapshot_request = resp.headers().get("X-Snapshot-Request").unwrap(); + assert_eq!(snapshot_request, "urgency=high"); + assert_eq!(resp.headers().get("X-Parent-Version-Id"), None); } diff --git a/sync-server/src/api/mod.rs b/sync-server/src/api/mod.rs index 1c8dd48d5..b3157438a 100644 --- a/sync-server/src/api/mod.rs +++ b/sync-server/src/api/mod.rs @@ -24,6 +24,9 @@ pub(crate) const CLIENT_KEY_HEADER: &str = "X-Client-Key"; /// The header name for parent version ID pub(crate) const PARENT_VERSION_ID_HEADER: &str = "X-Parent-Version-Id"; +/// The header name for parent version ID +pub(crate) const SNAPSHOT_REQUEST_HEADER: &str = "X-Snapshot-Request"; + /// The type containing a reference to the Storage object in the Actix state. pub(crate) type ServerState = Arc; diff --git a/sync-server/src/server.rs b/sync-server/src/server.rs index 3a4455847..69a65a088 100644 --- a/sync-server/src/server.rs +++ b/sync-server/src/server.rs @@ -13,6 +13,12 @@ pub const NO_VERSION_ID: VersionId = Uuid::nil(); /// than this will be rejected. const SNAPSHOT_SEARCH_LEN: i32 = 5; +/// Maximum number of days between snapshots +const SNAPSHOT_DAYS: i64 = 14; + +/// Maximum number of versions between snapshots +const SNAPSHOT_VERSIONS: u32 = 30; + pub(crate) type HistorySegment = Vec; pub(crate) type ClientKey = Uuid; pub(crate) type VersionId = Uuid; @@ -75,6 +81,43 @@ pub(crate) enum AddVersionResult { ExpectedParentVersion(VersionId), } +/// Urgency of a snapshot for a client; used to create the `X-Snapshot-Request` header. +#[derive(PartialEq, Debug, Clone, Copy, Eq, PartialOrd, Ord)] +pub(crate) enum SnapshotUrgency { + /// Don't need a snapshot right now. + None, + + /// A snapshot would be good, but can wait for other replicas to provide it. + Low, + + /// A snapshot is needed right now. + High, +} + +impl SnapshotUrgency { + /// Calculate the urgency for a snapshot based on its age in days + fn for_days(days: i64) -> Self { + if days >= SNAPSHOT_DAYS * 3 / 2 { + SnapshotUrgency::High + } else if days >= SNAPSHOT_DAYS { + SnapshotUrgency::Low + } else { + SnapshotUrgency::None + } + } + + /// Calculate the urgency for a snapshot based on its age in versions + fn for_versions_since(versions_since: u32) -> Self { + if versions_since >= SNAPSHOT_VERSIONS * 3 / 2 { + SnapshotUrgency::High + } else if versions_since >= SNAPSHOT_VERSIONS { + SnapshotUrgency::Low + } else { + SnapshotUrgency::None + } + } +} + /// Implementation of the AddVersion protocol transaction pub(crate) fn add_version<'a>( mut txn: Box, @@ -82,7 +125,7 @@ pub(crate) fn add_version<'a>( client: Client, parent_version_id: VersionId, history_segment: HistorySegment, -) -> anyhow::Result { +) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> { log::debug!( "add_version(client_key: {}, parent_version_id: {})", client_key, @@ -92,8 +135,9 @@ pub(crate) fn add_version<'a>( // check if this version is acceptable, under the protection of the transaction if client.latest_version_id != NO_VERSION_ID && parent_version_id != client.latest_version_id { log::debug!("add_version request rejected: mismatched latest_version_id"); - return Ok(AddVersionResult::ExpectedParentVersion( - client.latest_version_id, + return Ok(( + AddVersionResult::ExpectedParentVersion(client.latest_version_id), + SnapshotUrgency::None, )); } @@ -108,7 +152,26 @@ pub(crate) fn add_version<'a>( txn.add_version(client_key, version_id, parent_version_id, history_segment)?; txn.commit()?; - Ok(AddVersionResult::Ok(version_id)) + // calculate the urgency + let time_urgency = match client.snapshot { + None => SnapshotUrgency::High, + Some(Snapshot { timestamp, .. }) => { + SnapshotUrgency::for_days((Utc::now() - timestamp).num_days()) + } + }; + + println!("{:?}", client.snapshot); + let version_urgency = match client.snapshot { + None => SnapshotUrgency::High, + Some(Snapshot { versions_since, .. }) => { + SnapshotUrgency::for_versions_since(versions_since) + } + }; + + Ok(( + AddVersionResult::Ok(version_id), + std::cmp::max(time_urgency, version_urgency), + )) } /// Implementation of the AddSnapshot protocol transaction @@ -214,11 +277,44 @@ mod test { use super::*; use crate::storage::{InMemoryStorage, Snapshot, Storage}; use crate::test::init_logging; - use chrono::{TimeZone, Utc}; + use chrono::{Duration, TimeZone, Utc}; use pretty_assertions::assert_eq; #[test] - fn gcv_not_found_initial() -> anyhow::Result<()> { + fn snapshot_urgency_max() { + use SnapshotUrgency::*; + assert_eq!(std::cmp::max(None, None), None); + assert_eq!(std::cmp::max(None, Low), Low); + assert_eq!(std::cmp::max(None, High), High); + assert_eq!(std::cmp::max(Low, None), Low); + assert_eq!(std::cmp::max(Low, Low), Low); + assert_eq!(std::cmp::max(Low, High), High); + assert_eq!(std::cmp::max(High, None), High); + assert_eq!(std::cmp::max(High, Low), High); + assert_eq!(std::cmp::max(High, High), High); + } + + #[test] + fn snapshot_urgency_for_days() { + use SnapshotUrgency::*; + assert_eq!(SnapshotUrgency::for_days(0), None); + assert_eq!(SnapshotUrgency::for_days(SNAPSHOT_DAYS), Low); + assert_eq!(SnapshotUrgency::for_days(SNAPSHOT_DAYS * 2), High); + } + + #[test] + fn snapshot_urgency_for_versions_since() { + use SnapshotUrgency::*; + assert_eq!(SnapshotUrgency::for_versions_since(0), None); + assert_eq!(SnapshotUrgency::for_versions_since(SNAPSHOT_VERSIONS), Low); + assert_eq!( + SnapshotUrgency::for_versions_since(SNAPSHOT_VERSIONS * 2), + High + ); + } + + #[test] + fn get_child_version_not_found_initial() -> anyhow::Result<()> { init_logging(); let storage = InMemoryStorage::new(); @@ -236,7 +332,7 @@ mod test { } #[test] - fn gcv_gone_initial() -> anyhow::Result<()> { + fn get_child_version_gone_initial() -> anyhow::Result<()> { init_logging(); let storage = InMemoryStorage::new(); @@ -264,7 +360,7 @@ mod test { } #[test] - fn gcv_not_found_up_to_date() -> anyhow::Result<()> { + fn get_child_version_not_found_up_to_date() -> anyhow::Result<()> { init_logging(); let storage = InMemoryStorage::new(); @@ -285,7 +381,7 @@ mod test { } #[test] - fn gcv_gone() -> anyhow::Result<()> { + fn get_child_version_gone() -> anyhow::Result<()> { init_logging(); let storage = InMemoryStorage::new(); @@ -306,7 +402,7 @@ mod test { } #[test] - fn gcv_found() -> anyhow::Result<()> { + fn get_child_version_found() -> anyhow::Result<()> { init_logging(); let storage = InMemoryStorage::new(); @@ -336,90 +432,221 @@ mod test { Ok(()) } - #[test] - fn av_conflict() -> anyhow::Result<()> { + /// Utility setup function for add_version tests + fn av_setup( + storage: &InMemoryStorage, + num_versions: u32, + snapshot_version: Option, + snapshot_days_ago: Option, + ) -> anyhow::Result<(Uuid, Vec)> { init_logging(); - - let storage = InMemoryStorage::new(); let mut txn = storage.txn()?; let client_key = Uuid::new_v4(); - let parent_version_id = Uuid::new_v4(); - let history_segment = b"abcd".to_vec(); - let existing_parent_version_id = Uuid::new_v4(); - let client = Client { - latest_version_id: existing_parent_version_id, - snapshot: None, - }; + let mut versions = vec![]; + let mut version_id = Uuid::nil(); + txn.new_client(client_key, Uuid::nil())?; + for vnum in 0..num_versions { + let parent_version_id = version_id; + version_id = Uuid::new_v4(); + versions.push(version_id); + txn.add_version( + client_key, + version_id, + parent_version_id, + vec![0, 0, vnum as u8], + )?; + if Some(vnum) == snapshot_version { + txn.set_snapshot( + client_key, + Snapshot { + version_id, + versions_since: 0, + timestamp: Utc::now() - Duration::days(snapshot_days_ago.unwrap_or(0)), + }, + vec![vnum as u8], + )?; + } + } + + Ok((client_key, versions)) + } + + /// Utility function to check the results of an add_version call + fn av_success_check( + storage: &InMemoryStorage, + client_key: Uuid, + existing_versions: &[Uuid], + result: (AddVersionResult, SnapshotUrgency), + expected_history: Vec, + expected_urgency: SnapshotUrgency, + ) -> anyhow::Result<()> { + if let AddVersionResult::Ok(new_version_id) = result.0 { + // check that it invented a new version ID + for v in existing_versions { + assert_ne!(&new_version_id, v); + } + + // verify that the storage was updated + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + assert_eq!(client.latest_version_id, new_version_id); + + let parent_version_id = existing_versions.last().cloned().unwrap_or_else(Uuid::nil); + let version = txn.get_version(client_key, new_version_id)?.unwrap(); + assert_eq!(version.version_id, new_version_id); + assert_eq!(version.parent_version_id, parent_version_id); + assert_eq!(version.history_segment, expected_history); + } else { + panic!("did not get Ok from add_version: {:?}", result); + } + + assert_eq!(result.1, expected_urgency); + + Ok(()) + } + + #[test] + fn add_version_conflict() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let (client_key, versions) = av_setup(&storage, 3, None, None)?; + + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + + // try to add a child of a version other than the latest assert_eq!( - add_version(txn, client_key, client, parent_version_id, history_segment)?, - AddVersionResult::ExpectedParentVersion(existing_parent_version_id) + add_version(txn, client_key, client, versions[1], vec![3, 6, 9])?.0, + AddVersionResult::ExpectedParentVersion(versions[2]) ); // verify that the storage wasn't updated txn = storage.txn()?; - assert_eq!(txn.get_client(client_key)?, None); assert_eq!( - txn.get_version_by_parent(client_key, parent_version_id)?, - None + txn.get_client(client_key)?.unwrap().latest_version_id, + versions[2] ); + assert_eq!(txn.get_version_by_parent(client_key, versions[2])?, None); Ok(()) } - fn test_av_success(latest_version_id_nil: bool) -> anyhow::Result<()> { - init_logging(); - + #[test] + fn add_version_with_existing_history() -> anyhow::Result<()> { let storage = InMemoryStorage::new(); - let mut txn = storage.txn()?; - let client_key = Uuid::new_v4(); - let parent_version_id = Uuid::new_v4(); - let history_segment = b"abcd".to_vec(); - let latest_version_id = if latest_version_id_nil { - Uuid::nil() - } else { - parent_version_id - }; + let (client_key, versions) = av_setup(&storage, 1, None, None)?; - txn.new_client(client_key, latest_version_id)?; + let mut txn = storage.txn()?; let client = txn.get_client(client_key)?.unwrap(); - let result = add_version( - txn, - client_key, - client, - parent_version_id, - history_segment.clone(), - )?; - if let AddVersionResult::Ok(new_version_id) = result { - // check that it invented a new version ID - assert!(new_version_id != parent_version_id); + let result = add_version(txn, client_key, client, versions[0], vec![3, 6, 9])?; - // verify that the storage was updated - txn = storage.txn()?; - let client = txn.get_client(client_key)?.unwrap(); - assert_eq!(client.latest_version_id, new_version_id); - let version = txn - .get_version_by_parent(client_key, parent_version_id)? - .unwrap(); - assert_eq!(version.version_id, new_version_id); - assert_eq!(version.parent_version_id, parent_version_id); - assert_eq!(version.history_segment, history_segment); - } else { - panic!("did not get Ok from add_version"); - } + av_success_check( + &storage, + client_key, + &versions, + result, + vec![3, 6, 9], + // urgency=high because there are no snapshots yet + SnapshotUrgency::High, + )?; Ok(()) } #[test] - fn av_success_with_existing_history() -> anyhow::Result<()> { - test_av_success(true) + fn add_version_with_no_history() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let (client_key, versions) = av_setup(&storage, 0, None, None)?; + + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + + let parent_version_id = Uuid::nil(); + let result = add_version(txn, client_key, client, parent_version_id, vec![3, 6, 9])?; + + av_success_check( + &storage, + client_key, + &versions, + result, + vec![3, 6, 9], + // urgency=high because there are no snapshots yet + SnapshotUrgency::High, + )?; + + Ok(()) } #[test] - fn av_success_nil_latest_version_id() -> anyhow::Result<()> { - test_av_success(false) + fn add_version_success_recent_snapshot() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + let (client_key, versions) = av_setup(&storage, 1, Some(0), None)?; + + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + + let result = add_version(txn, client_key, client, versions[0], vec![1, 2, 3])?; + + av_success_check( + &storage, + client_key, + &versions, + result, + vec![1, 2, 3], + // no snapshot request since the previous version has a snapshot + SnapshotUrgency::None, + )?; + + Ok(()) + } + + #[test] + fn add_version_success_aged_snapshot() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + // one snapshot, but it was 50 days ago + let (client_key, versions) = av_setup(&storage, 1, Some(0), Some(50))?; + + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + + let result = add_version(txn, client_key, client, versions[0], vec![1, 2, 3])?; + + av_success_check( + &storage, + client_key, + &versions, + result, + vec![1, 2, 3], + // urgency=high due to days since the snapshot + SnapshotUrgency::High, + )?; + + Ok(()) + } + + #[test] + fn add_version_success_snapshot_many_versions_ago() -> anyhow::Result<()> { + let storage = InMemoryStorage::new(); + // one snapshot, but it was 50 versions ago + let (client_key, versions) = av_setup(&storage, 50, Some(0), None)?; + + let mut txn = storage.txn()?; + let client = txn.get_client(client_key)?.unwrap(); + + let result = add_version(txn, client_key, client, versions[49], vec![1, 2, 3])?; + + av_success_check( + &storage, + client_key, + &versions, + result, + vec![1, 2, 3], + // urgency=high due to number of versions since the snapshot + SnapshotUrgency::High, + )?; + + Ok(()) } #[test] @@ -580,8 +807,6 @@ mod test { let client = txn.get_client(client_key)?.unwrap(); add_snapshot(txn, client_key, client, version_ids[0], vec![9, 9, 9])?; - println!("{:?}", version_ids); - // verify the snapshot was not replaced let mut txn = storage.txn()?; let client = txn.get_client(client_key)?.unwrap();