Return SnapshotUrgency from AddVersion

This commit is contained in:
Dustin J. Mitchell
2021-10-07 19:35:06 -04:00
parent b8d892878c
commit bde19d7f07
5 changed files with 60 additions and 20 deletions

View File

@@ -1,5 +1,6 @@
use crate::server::{ use crate::server::{
AddVersionResult, GetVersionResult, HistorySegment, Server, VersionId, NIL_VERSION_ID, AddVersionResult, GetVersionResult, HistorySegment, Server, SnapshotUrgency, VersionId,
NIL_VERSION_ID,
}; };
use crate::storage::sqlite::StoredUuid; use crate::storage::sqlite::StoredUuid;
use anyhow::Context; use anyhow::Context;
@@ -116,14 +117,17 @@ impl Server for LocalServer {
&mut self, &mut self,
parent_version_id: VersionId, parent_version_id: VersionId,
history_segment: HistorySegment, history_segment: HistorySegment,
) -> anyhow::Result<AddVersionResult> { ) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> {
// no client lookup // no client lookup
// no signature validation // no signature validation
// check the parent_version_id for linearity // check the parent_version_id for linearity
let latest_version_id = self.get_latest_version_id()?; let latest_version_id = self.get_latest_version_id()?;
if latest_version_id != NIL_VERSION_ID && parent_version_id != latest_version_id { if latest_version_id != NIL_VERSION_ID && parent_version_id != latest_version_id {
return Ok(AddVersionResult::ExpectedParentVersion(latest_version_id)); return Ok((
AddVersionResult::ExpectedParentVersion(latest_version_id),
SnapshotUrgency::None,
));
} }
// invent a new ID for this version // invent a new ID for this version
@@ -136,7 +140,7 @@ impl Server for LocalServer {
})?; })?;
self.set_latest_version_id(version_id)?; self.set_latest_version_id(version_id)?;
Ok(AddVersionResult::Ok(version_id)) Ok((AddVersionResult::Ok(version_id), SnapshotUrgency::None))
} }
/// Get a vector of all versions after `since_version` /// Get a vector of all versions after `since_version`
@@ -176,7 +180,7 @@ mod test {
let tmp_dir = TempDir::new()?; let tmp_dir = TempDir::new()?;
let mut server = LocalServer::new(&tmp_dir.path())?; let mut server = LocalServer::new(&tmp_dir.path())?;
let history = b"1234".to_vec(); let history = b"1234".to_vec();
match server.add_version(NIL_VERSION_ID, history.clone())? { match server.add_version(NIL_VERSION_ID, history.clone())?.0 {
AddVersionResult::ExpectedParentVersion(_) => { AddVersionResult::ExpectedParentVersion(_) => {
panic!("should have accepted the version") panic!("should have accepted the version")
} }
@@ -204,7 +208,7 @@ mod test {
let parent_version_id = Uuid::new_v4() as VersionId; let parent_version_id = Uuid::new_v4() as VersionId;
// This is OK because the server has no latest_version_id yet // This is OK because the server has no latest_version_id yet
match server.add_version(parent_version_id, history.clone())? { match server.add_version(parent_version_id, history.clone())?.0 {
AddVersionResult::ExpectedParentVersion(_) => { AddVersionResult::ExpectedParentVersion(_) => {
panic!("should have accepted the version") panic!("should have accepted the version")
} }
@@ -232,14 +236,16 @@ mod test {
let parent_version_id = Uuid::new_v4() as VersionId; let parent_version_id = Uuid::new_v4() as VersionId;
// add a version // add a version
if let AddVersionResult::ExpectedParentVersion(_) = if let (AddVersionResult::ExpectedParentVersion(_), SnapshotUrgency::None) =
server.add_version(parent_version_id, history.clone())? server.add_version(parent_version_id, history.clone())?
{ {
panic!("should have accepted the version") panic!("should have accepted the version")
} }
// then add another, not based on that one // then add another, not based on that one
if let AddVersionResult::Ok(_) = server.add_version(parent_version_id, history.clone())? { if let (AddVersionResult::Ok(_), SnapshotUrgency::None) =
server.add_version(parent_version_id, history.clone())?
{
panic!("should not have accepted the version") panic!("should not have accepted the version")
} }

View File

@@ -1,4 +1,6 @@
use crate::server::{AddVersionResult, GetVersionResult, HistorySegment, Server, VersionId}; use crate::server::{
AddVersionResult, GetVersionResult, HistorySegment, Server, SnapshotUrgency, VersionId,
};
use std::convert::TryInto; use std::convert::TryInto;
use std::time::Duration; use std::time::Duration;
use uuid::Uuid; use uuid::Uuid;
@@ -43,12 +45,24 @@ fn get_uuid_header(resp: &ureq::Response, name: &str) -> anyhow::Result<Uuid> {
Ok(value) Ok(value)
} }
/// Read the X-Snapshot-Request header and return a SnapshotUrgency
fn get_snapshot_urgency(resp: &ureq::Response) -> SnapshotUrgency {
match resp.header("X-Snapshot-Request") {
None => SnapshotUrgency::None,
Some(hdr) => match hdr {
"urgency=low" => SnapshotUrgency::Low,
"urgency=high" => SnapshotUrgency::High,
_ => SnapshotUrgency::None,
},
}
}
impl Server for RemoteServer { impl Server for RemoteServer {
fn add_version( fn add_version(
&mut self, &mut self,
parent_version_id: VersionId, parent_version_id: VersionId,
history_segment: HistorySegment, history_segment: HistorySegment,
) -> anyhow::Result<AddVersionResult> { ) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> {
let url = format!( let url = format!(
"{}/v1/client/add-version/{}", "{}/v1/client/add-version/{}",
self.origin, parent_version_id self.origin, parent_version_id
@@ -70,11 +84,17 @@ impl Server for RemoteServer {
{ {
Ok(resp) => { Ok(resp) => {
let version_id = get_uuid_header(&resp, "X-Version-Id")?; let version_id = get_uuid_header(&resp, "X-Version-Id")?;
Ok(AddVersionResult::Ok(version_id)) Ok((
AddVersionResult::Ok(version_id),
get_snapshot_urgency(&resp),
))
} }
Err(ureq::Error::Status(status, resp)) if status == 409 => { Err(ureq::Error::Status(status, resp)) if status == 409 => {
let parent_version_id = get_uuid_header(&resp, "X-Parent-Version-Id")?; let parent_version_id = get_uuid_header(&resp, "X-Parent-Version-Id")?;
Ok(AddVersionResult::ExpectedParentVersion(parent_version_id)) Ok((
AddVersionResult::ExpectedParentVersion(parent_version_id),
SnapshotUrgency::None,
))
} }
Err(err) => Err(err.into()), Err(err) => Err(err.into()),
} }

View File

@@ -1,5 +1,6 @@
use crate::server::{ use crate::server::{
AddVersionResult, GetVersionResult, HistorySegment, Server, VersionId, NIL_VERSION_ID, AddVersionResult, GetVersionResult, HistorySegment, Server, SnapshotUrgency, VersionId,
NIL_VERSION_ID,
}; };
use std::collections::HashMap; use std::collections::HashMap;
use uuid::Uuid; use uuid::Uuid;
@@ -33,15 +34,16 @@ impl Server for TestServer {
&mut self, &mut self,
parent_version_id: VersionId, parent_version_id: VersionId,
history_segment: HistorySegment, history_segment: HistorySegment,
) -> anyhow::Result<AddVersionResult> { ) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)> {
// no client lookup // no client lookup
// no signature validation // no signature validation
// check the parent_version_id for linearity // check the parent_version_id for linearity
if self.latest_version_id != NIL_VERSION_ID { if self.latest_version_id != NIL_VERSION_ID {
if parent_version_id != self.latest_version_id { if parent_version_id != self.latest_version_id {
return Ok(AddVersionResult::ExpectedParentVersion( return Ok((
self.latest_version_id, AddVersionResult::ExpectedParentVersion(self.latest_version_id),
SnapshotUrgency::None,
)); ));
} }
} }
@@ -59,7 +61,7 @@ impl Server for TestServer {
); );
self.latest_version_id = version_id; self.latest_version_id = version_id;
Ok(AddVersionResult::Ok(version_id)) Ok((AddVersionResult::Ok(version_id), SnapshotUrgency::None))
} }
/// Get a vector of all versions after `since_version` /// Get a vector of all versions after `since_version`

View File

@@ -10,7 +10,7 @@ pub const NIL_VERSION_ID: VersionId = Uuid::nil();
/// data is pre-encoded, and from the protocol level appears as a sequence of bytes. /// data is pre-encoded, and from the protocol level appears as a sequence of bytes.
pub type HistorySegment = Vec<u8>; pub type HistorySegment = Vec<u8>;
/// VersionAdd is the response type from [`crate::server::Server::add_version`]. /// AddVersionResult is the response type from [`crate::server::Server::add_version`].
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum AddVersionResult { pub enum AddVersionResult {
/// OK, version added with the given ID /// OK, version added with the given ID
@@ -19,6 +19,17 @@ pub enum AddVersionResult {
ExpectedParentVersion(VersionId), ExpectedParentVersion(VersionId),
} }
/// SnapshotUrgency indicates how much the server would like this replica to send a snapshot.
#[derive(PartialEq, Debug, Clone, Copy, Eq, PartialOrd, Ord)]
pub enum SnapshotUrgency {
/// Don't need a snapshot right now.
None,
/// A snapshot would be good, but can wait for other replicas to provide it.
Low,
/// A snapshot is needed right now.
High,
}
/// A version as downloaded from the server /// A version as downloaded from the server
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum GetVersionResult { pub enum GetVersionResult {
@@ -40,7 +51,7 @@ pub trait Server {
&mut self, &mut self,
parent_version_id: VersionId, parent_version_id: VersionId,
history_segment: HistorySegment, history_segment: HistorySegment,
) -> anyhow::Result<AddVersionResult>; ) -> anyhow::Result<(AddVersionResult, SnapshotUrgency)>;
/// Get the version with the given parent VersionId /// Get the version with the given parent VersionId
fn get_child_version( fn get_child_version(

View File

@@ -57,7 +57,8 @@ pub(super) fn sync(server: &mut Box<dyn Server>, txn: &mut dyn StorageTxn) -> an
let new_version = Version { operations }; let new_version = Version { operations };
let history_segment = serde_json::to_string(&new_version).unwrap().into(); let history_segment = serde_json::to_string(&new_version).unwrap().into();
info!("sending new version to server"); info!("sending new version to server");
match server.add_version(base_version_id, history_segment)? { let (res, _snapshot_urgency) = server.add_version(base_version_id, history_segment)?;
match res {
AddVersionResult::Ok(new_version_id) => { AddVersionResult::Ok(new_version_id) => {
info!("version {:?} received by server", new_version_id); info!("version {:?} received by server", new_version_id);
txn.set_base_version(new_version_id)?; txn.set_base_version(new_version_id)?;