refactor sync server to use pluggable storage

..with a fixed implementation of the replica / server protocol logic.
There isn't much logic yet, and there's a lot of boilerplate to take
care of, so this looks a little lopsided, but I'm confident this is the
right structure for this code's future.
This commit is contained in:
Dustin J. Mitchell
2020-11-26 19:19:51 -05:00
parent 7472749fee
commit fb22b9686f
9 changed files with 319 additions and 259 deletions

View File

@@ -1,11 +1,13 @@
use crate::api::{
ServerState, HISTORY_SEGMENT_CONTENT_TYPE, PARENT_VERSION_ID_HEADER, VERSION_ID_HEADER,
};
use crate::server::{AddVersionResult, ClientId, VersionId};
use actix_web::{
error, http::StatusCode, post, web, HttpMessage, HttpRequest, HttpResponse, Result,
failure_to_ise, ServerState, HISTORY_SEGMENT_CONTENT_TYPE, PARENT_VERSION_ID_HEADER,
VERSION_ID_HEADER,
};
use crate::server::{AddVersionResult, ClientId, HistorySegment, VersionId, NO_VERSION_ID};
use crate::storage::{Client, StorageTxn};
use actix_web::{error, post, web, HttpMessage, HttpRequest, HttpResponse, Result};
use failure::Fallible;
use futures::StreamExt;
use taskchampion::Uuid;
/// Max history segment size: 100MB
const MAX_SIZE: usize = 100 * 1024 * 1024;
@@ -23,7 +25,7 @@ const MAX_SIZE: usize = 100 * 1024 * 1024;
#[post("/client/{client_id}/add-version/{parent_version_id}")]
pub(crate) async fn service(
req: HttpRequest,
data: web::Data<ServerState>,
server_state: web::Data<ServerState>,
web::Path((client_id, parent_version_id)): web::Path<(ClientId, VersionId)>,
mut payload: web::Payload,
) -> Result<HttpResponse> {
@@ -47,9 +49,18 @@ pub(crate) async fn service(
return Err(error::ErrorBadRequest("Empty body"));
}
let result = data
.add_version(client_id, parent_version_id, body.to_vec())
.map_err(|e| error::InternalError::new(e, StatusCode::INTERNAL_SERVER_ERROR))?;
// note that we do not open the transaction until the body has been read
// completely, to avoid blocking other storage access while that data is
// in transit.
let mut txn = server_state.txn().map_err(failure_to_ise)?;
let client = txn
.get_client(client_id)
.map_err(failure_to_ise)?
.ok_or_else(|| error::ErrorNotFound("no such client"))?;
let result = add_version(txn, client_id, client, parent_version_id, body.to_vec())
.map_err(failure_to_ise)?;
Ok(match result {
AddVersionResult::Ok(version_id) => HttpResponse::Ok()
.header(VERSION_ID_HEADER, version_id.to_string())
@@ -60,14 +71,37 @@ pub(crate) async fn service(
})
}
fn add_version<'a>(
mut txn: Box<dyn StorageTxn + 'a>,
client_id: ClientId,
client: Client,
parent_version_id: VersionId,
history_segment: HistorySegment,
) -> Fallible<AddVersionResult> {
// check if this version is acceptable, under the protection of the transaction
if client.latest_version_id != NO_VERSION_ID && parent_version_id != client.latest_version_id {
return Ok(AddVersionResult::ExpectedParentVersion(
client.latest_version_id,
));
}
// invent a version ID
let version_id = Uuid::new_v4();
// update the DB
txn.add_version(client_id, version_id, parent_version_id, history_segment)?;
txn.set_client_latest_version_id(client_id, version_id)?;
txn.commit()?;
Ok(AddVersionResult::Ok(version_id))
}
#[cfg(test)]
mod test {
use super::*;
use crate::api::ServerState;
use crate::app_scope;
use crate::server::SyncServer;
use crate::test::TestServer;
use actix_web::{test, App};
use crate::storage::{InMemoryStorage, Storage};
use actix_web::{http::StatusCode, test, App};
use taskchampion::Uuid;
#[actix_rt::test]
@@ -75,13 +109,15 @@ mod test {
let client_id = Uuid::new_v4();
let version_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let server_box: Box<dyn SyncServer> = Box::new(TestServer {
expected_client_id: client_id,
av_expected_parent_version_id: parent_version_id,
av_expected_history_segment: b"abcd".to_vec(),
av_result: Some(AddVersionResult::Ok(version_id)),
..Default::default()
});
let server_box: Box<dyn Storage> = Box::new(InMemoryStorage::new());
// set up the storage contents..
{
let mut txn = server_box.txn().unwrap();
txn.set_client_latest_version_id(client_id, Uuid::nil())
.unwrap();
}
let server_state = ServerState::new(server_box);
let mut app = test::init_service(App::new().service(app_scope(server_state))).await;
@@ -96,10 +132,12 @@ mod test {
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers().get("X-Version-Id").unwrap(),
&version_id.to_string()
);
// the returned version ID is random, but let's check that it's not
// the passed parent version ID, at least
let new_version_id = resp.headers().get("X-Version-Id").unwrap();
assert!(new_version_id != &version_id.to_string());
assert_eq!(resp.headers().get("X-Parent-Version-Id"), None);
}
@@ -108,13 +146,15 @@ mod test {
let client_id = Uuid::new_v4();
let version_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let server_box: Box<dyn SyncServer> = Box::new(TestServer {
expected_client_id: client_id,
av_expected_parent_version_id: parent_version_id,
av_expected_history_segment: b"abcd".to_vec(),
av_result: Some(AddVersionResult::ExpectedParentVersion(version_id)),
..Default::default()
});
let server_box: Box<dyn Storage> = Box::new(InMemoryStorage::new());
// set up the storage contents..
{
let mut txn = server_box.txn().unwrap();
txn.set_client_latest_version_id(client_id, version_id)
.unwrap();
}
let server_state = ServerState::new(server_box);
let mut app = test::init_service(App::new().service(app_scope(server_state))).await;
@@ -140,9 +180,7 @@ mod test {
async fn test_bad_content_type() {
let client_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let server_box: Box<dyn SyncServer> = Box::new(TestServer {
..Default::default()
});
let server_box: Box<dyn Storage> = Box::new(InMemoryStorage::new());
let server_state = ServerState::new(server_box);
let mut app = test::init_service(App::new().service(app_scope(server_state))).await;
@@ -160,9 +198,7 @@ mod test {
async fn test_empty_body() {
let client_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let server_box: Box<dyn SyncServer> = Box::new(TestServer {
..Default::default()
});
let server_box: Box<dyn Storage> = Box::new(InMemoryStorage::new());
let server_state = ServerState::new(server_box);
let mut app = test::init_service(App::new().service(app_scope(server_state))).await;

View File

@@ -1,8 +1,11 @@
use crate::api::{
ServerState, HISTORY_SEGMENT_CONTENT_TYPE, PARENT_VERSION_ID_HEADER, VERSION_ID_HEADER,
failure_to_ise, ServerState, HISTORY_SEGMENT_CONTENT_TYPE, PARENT_VERSION_ID_HEADER,
VERSION_ID_HEADER,
};
use crate::server::{ClientId, VersionId};
use actix_web::{error, get, http::StatusCode, web, HttpResponse, Result};
use crate::server::{ClientId, GetVersionResult, VersionId};
use crate::storage::StorageTxn;
use actix_web::{error, get, web, HttpResponse, Result};
use failure::Fallible;
/// Get a child version.
///
@@ -14,12 +17,16 @@ use actix_web::{error, get, http::StatusCode, web, HttpResponse, Result};
/// Returns other 4xx or 5xx responses on other errors.
#[get("/client/{client_id}/get-child-version/{parent_version_id}")]
pub(crate) async fn service(
data: web::Data<ServerState>,
server_state: web::Data<ServerState>,
web::Path((client_id, parent_version_id)): web::Path<(ClientId, VersionId)>,
) -> Result<HttpResponse> {
let result = data
.get_child_version(client_id, parent_version_id)
.map_err(|e| error::InternalError::new(e, StatusCode::INTERNAL_SERVER_ERROR))?;
let mut txn = server_state.txn().map_err(failure_to_ise)?;
txn.get_client(client_id)
.map_err(failure_to_ise)?
.ok_or_else(|| error::ErrorNotFound("no such client"))?;
let result = get_child_version(txn, client_id, parent_version_id).map_err(failure_to_ise)?;
if let Some(result) = result {
Ok(HttpResponse::Ok()
.content_type(HISTORY_SEGMENT_CONTENT_TYPE)
@@ -34,14 +41,26 @@ pub(crate) async fn service(
}
}
fn get_child_version<'a>(
mut txn: Box<dyn StorageTxn + 'a>,
client_id: ClientId,
parent_version_id: VersionId,
) -> Fallible<Option<GetVersionResult>> {
Ok(txn
.get_version_by_parent(client_id, parent_version_id)?
.map(|version| GetVersionResult {
version_id: version.version_id,
parent_version_id: version.parent_version_id,
history_segment: version.history_segment,
}))
}
#[cfg(test)]
mod test {
use super::*;
use crate::api::ServerState;
use crate::app_scope;
use crate::server::{GetVersionResult, SyncServer};
use crate::test::TestServer;
use actix_web::{test, App};
use crate::storage::{InMemoryStorage, Storage};
use actix_web::{http::StatusCode, test, App};
use taskchampion::Uuid;
#[actix_rt::test]
@@ -49,16 +68,17 @@ mod test {
let client_id = Uuid::new_v4();
let version_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let server_box: Box<dyn SyncServer> = Box::new(TestServer {
expected_client_id: client_id,
gcv_expected_parent_version_id: parent_version_id,
gcv_result: Some(GetVersionResult {
version_id: version_id,
parent_version_id: parent_version_id,
history_segment: b"abcd".to_vec(),
}),
..Default::default()
});
let server_box: Box<dyn Storage> = Box::new(InMemoryStorage::new());
// set up the storage contents..
{
let mut txn = server_box.txn().unwrap();
txn.set_client_latest_version_id(client_id, Uuid::new_v4())
.unwrap();
txn.add_version(client_id, version_id, parent_version_id, b"abcd".to_vec())
.unwrap();
}
let server_state = ServerState::new(server_box);
let mut app = test::init_service(App::new().service(app_scope(server_state))).await;
@@ -88,15 +108,36 @@ mod test {
}
#[actix_rt::test]
async fn test_not_found() {
async fn test_client_not_found() {
let client_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let server_box: Box<dyn SyncServer> = Box::new(TestServer {
expected_client_id: client_id,
gcv_expected_parent_version_id: parent_version_id,
gcv_result: None,
..Default::default()
});
let server_box: Box<dyn Storage> = Box::new(InMemoryStorage::new());
let server_state = ServerState::new(server_box);
let mut app = test::init_service(App::new().service(app_scope(server_state))).await;
let uri = format!(
"/client/{}/get-child-version/{}",
client_id, parent_version_id
);
let req = test::TestRequest::get().uri(&uri).to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_eq!(resp.headers().get("X-Version-Id"), None);
assert_eq!(resp.headers().get("X-Parent-Version-Id"), None);
}
#[actix_rt::test]
async fn test_version_not_found() {
let client_id = Uuid::new_v4();
let parent_version_id = Uuid::new_v4();
let server_box: Box<dyn Storage> = Box::new(InMemoryStorage::new());
// create the client, but not the version
{
let mut txn = server_box.txn().unwrap();
txn.set_client_latest_version_id(client_id, Uuid::new_v4())
.unwrap();
}
let server_state = ServerState::new(server_box);
let mut app = test::init_service(App::new().service(app_scope(server_state))).await;

View File

@@ -1,5 +1,5 @@
use crate::server::SyncServer;
use actix_web::{web, Scope};
use crate::storage::Storage;
use actix_web::{error, http::StatusCode, web, Scope};
use std::sync::Arc;
mod add_version;
@@ -15,11 +15,16 @@ pub(crate) const VERSION_ID_HEADER: &str = "X-Version-Id";
/// The header names for parent version ID
pub(crate) const PARENT_VERSION_ID_HEADER: &str = "X-Parent-Version-Id";
/// The type containing a reference to the SyncServer object in the Actix state.
pub(crate) type ServerState = Arc<Box<dyn SyncServer>>;
/// The type containing a reference to the Storage object in the Actix state.
pub(crate) type ServerState = Arc<Box<dyn Storage>>;
pub(crate) fn api_scope() -> Scope {
web::scope("")
.service(get_child_version::service)
.service(add_version::service)
}
/// Convert a failure::Error to an Actix ISE
fn failure_to_ise(err: failure::Error) -> impl actix_web::ResponseError {
error::InternalError::new(err, StatusCode::INTERNAL_SERVER_ERROR)
}