Add support for cloud sync, specifically GCP (#3223)
* Add support for cloud sync, specifically GCP This adds generic support for sync to cloud services, with specific spuport for GCP. Adding others -- so long as they support a compare-and-set operation -- should be comparatively straightforward. The cloud support includes cleanup of unnecessary data, and should keep total space usage roughly proportional to the number of tasks. Co-authored-by: ryneeverett <ryneeverett@gmail.com>
This commit is contained in:
committed by
GitHub
parent
6f1c16fecd
commit
9566c929e2
@@ -9,12 +9,19 @@ repository = "https://github.com/GothenburgBitFactory/taskwarrior"
|
||||
readme = "../README.md"
|
||||
license = "MIT"
|
||||
edition = "2021"
|
||||
rust-version = "1.65"
|
||||
rust-version = "1.70.0"
|
||||
|
||||
[features]
|
||||
default = ["server-sync" ]
|
||||
server-sync = ["crypto", "dep:ureq"]
|
||||
crypto = ["dep:ring"]
|
||||
default = ["server-sync", "server-gcp"]
|
||||
|
||||
# Support for sync to a server
|
||||
server-sync = ["encryption", "dep:ureq"]
|
||||
# Support for sync to GCP
|
||||
server-gcp = ["cloud", "encryption", "dep:google-cloud-storage", "dep:tokio"]
|
||||
# (private) Support for sync protocol encryption
|
||||
encryption = ["dep:ring"]
|
||||
# (private) Generic support for cloud sync
|
||||
cloud = []
|
||||
|
||||
[package.metadata.docs.rs]
|
||||
all-features = true
|
||||
@@ -34,7 +41,11 @@ strum_macros.workspace = true
|
||||
flate2.workspace = true
|
||||
byteorder.workspace = true
|
||||
ring.workspace = true
|
||||
google-cloud-storage.workspace = true
|
||||
tokio.workspace = true
|
||||
|
||||
google-cloud-storage.optional = true
|
||||
tokio.optional = true
|
||||
ureq.optional = true
|
||||
ring.optional = true
|
||||
|
||||
|
||||
@@ -40,5 +40,9 @@ other_error!(io::Error);
|
||||
other_error!(serde_json::Error);
|
||||
other_error!(rusqlite::Error);
|
||||
other_error!(crate::storage::sqlite::SqliteError);
|
||||
#[cfg(feature = "server-gcp")]
|
||||
other_error!(google_cloud_storage::http::Error);
|
||||
#[cfg(feature = "server-gcp")]
|
||||
other_error!(google_cloud_storage::client::google_cloud_auth::error::Error);
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
@@ -40,6 +40,7 @@ Support for some optional functionality is controlled by feature flags.
|
||||
|
||||
Sync server client support:
|
||||
|
||||
* `server-gcp` - sync to Google Cloud Platform
|
||||
* `server-sync` - sync to the taskchampion-sync-server
|
||||
|
||||
# See Also
|
||||
@@ -49,7 +50,7 @@ for more information about the design and usage of the tool.
|
||||
|
||||
# Minimum Supported Rust Version (MSRV)
|
||||
|
||||
This crate supports Rust version 1.65 and higher.
|
||||
This crate supports Rust version 1.70.0 and higher.
|
||||
|
||||
*/
|
||||
|
||||
|
||||
392
taskchampion/taskchampion/src/server/cloud/gcp.rs
Normal file
392
taskchampion/taskchampion/src/server/cloud/gcp.rs
Normal file
@@ -0,0 +1,392 @@
|
||||
use super::service::{ObjectInfo, Service};
|
||||
use crate::errors::Result;
|
||||
use google_cloud_storage::client::{Client, ClientConfig};
|
||||
use google_cloud_storage::http::error::ErrorResponse;
|
||||
use google_cloud_storage::http::Error as GcsError;
|
||||
use google_cloud_storage::http::{self, objects};
|
||||
use tokio::runtime::Runtime;
|
||||
|
||||
/// A [`Service`] implementation based on the Google Cloud Storage service.
|
||||
pub(in crate::server) struct GcpService {
|
||||
client: Client,
|
||||
rt: Runtime,
|
||||
bucket: String,
|
||||
}
|
||||
|
||||
/// Determine whether the given result contains an HTTP error with the given code.
|
||||
fn is_http_error<T>(query: u16, res: &std::result::Result<T, http::Error>) -> bool {
|
||||
match res {
|
||||
// Errors from RPC's.
|
||||
Err(GcsError::Response(ErrorResponse { code, .. })) => *code == query,
|
||||
// Errors from reqwest (downloads, uploads).
|
||||
Err(GcsError::HttpClient(e)) => e.status().map(|s| s.as_u16()) == Some(query),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
impl GcpService {
|
||||
pub(in crate::server) fn new(bucket: String) -> Result<Self> {
|
||||
let rt = Runtime::new()?;
|
||||
let config = rt.block_on(ClientConfig::default().with_auth())?;
|
||||
Ok(Self {
|
||||
client: Client::new(config),
|
||||
rt,
|
||||
bucket,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Service for GcpService {
|
||||
fn put(&mut self, name: &[u8], value: &[u8]) -> Result<()> {
|
||||
let name = String::from_utf8(name.to_vec()).expect("non-UTF8 object name");
|
||||
let upload_type = objects::upload::UploadType::Simple(objects::upload::Media::new(name));
|
||||
self.rt.block_on(self.client.upload_object(
|
||||
&objects::upload::UploadObjectRequest {
|
||||
bucket: self.bucket.clone(),
|
||||
..Default::default()
|
||||
},
|
||||
value.to_vec(),
|
||||
&upload_type,
|
||||
))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get(&mut self, name: &[u8]) -> Result<Option<Vec<u8>>> {
|
||||
let name = String::from_utf8(name.to_vec()).expect("non-UTF8 object name");
|
||||
let download_res = self.rt.block_on(self.client.download_object(
|
||||
&objects::get::GetObjectRequest {
|
||||
bucket: self.bucket.clone(),
|
||||
object: name,
|
||||
..Default::default()
|
||||
},
|
||||
&objects::download::Range::default(),
|
||||
));
|
||||
if is_http_error(404, &download_res) {
|
||||
Ok(None)
|
||||
} else {
|
||||
Ok(Some(download_res?))
|
||||
}
|
||||
}
|
||||
|
||||
fn del(&mut self, name: &[u8]) -> Result<()> {
|
||||
let name = String::from_utf8(name.to_vec()).expect("non-UTF8 object name");
|
||||
let del_res = self.rt.block_on(self.client.delete_object(
|
||||
&objects::delete::DeleteObjectRequest {
|
||||
bucket: self.bucket.clone(),
|
||||
object: name,
|
||||
..Default::default()
|
||||
},
|
||||
));
|
||||
if !is_http_error(404, &del_res) {
|
||||
del_res?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn list<'a>(&'a mut self, prefix: &[u8]) -> Box<dyn Iterator<Item = Result<ObjectInfo>> + 'a> {
|
||||
let prefix = String::from_utf8(prefix.to_vec()).expect("non-UTF8 object prefix");
|
||||
Box::new(ObjectIterator {
|
||||
service: self,
|
||||
prefix,
|
||||
last_response: None,
|
||||
next_index: 0,
|
||||
})
|
||||
}
|
||||
|
||||
fn compare_and_swap(
|
||||
&mut self,
|
||||
name: &[u8],
|
||||
existing_value: Option<Vec<u8>>,
|
||||
new_value: Vec<u8>,
|
||||
) -> Result<bool> {
|
||||
let name = String::from_utf8(name.to_vec()).expect("non-UTF8 object name");
|
||||
let get_res = self
|
||||
.rt
|
||||
.block_on(self.client.get_object(&objects::get::GetObjectRequest {
|
||||
bucket: self.bucket.clone(),
|
||||
object: name.clone(),
|
||||
..Default::default()
|
||||
}));
|
||||
// Determine the object's generation. See https://cloud.google.com/storage/docs/metadata#generation-number
|
||||
let generation = if is_http_error(404, &get_res) {
|
||||
// If a value was expected, that expectation has not been met.
|
||||
if existing_value.is_some() {
|
||||
return Ok(false);
|
||||
}
|
||||
// Generation 0 indicates that the object does not yet exist.
|
||||
0
|
||||
} else {
|
||||
get_res?.generation
|
||||
};
|
||||
|
||||
// If the file existed, then verify its contents.
|
||||
if generation > 0 {
|
||||
let data = self.rt.block_on(self.client.download_object(
|
||||
&objects::get::GetObjectRequest {
|
||||
bucket: self.bucket.clone(),
|
||||
object: name.clone(),
|
||||
// Fetch the same generation.
|
||||
generation: Some(generation),
|
||||
..Default::default()
|
||||
},
|
||||
&objects::download::Range::default(),
|
||||
))?;
|
||||
if Some(data) != existing_value {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
|
||||
// Finally, put the new value with a condition that the generation hasn't changed.
|
||||
let upload_type = objects::upload::UploadType::Simple(objects::upload::Media::new(name));
|
||||
let upload_res = self.rt.block_on(self.client.upload_object(
|
||||
&objects::upload::UploadObjectRequest {
|
||||
bucket: self.bucket.clone(),
|
||||
if_generation_match: Some(generation),
|
||||
..Default::default()
|
||||
},
|
||||
new_value.to_vec(),
|
||||
&upload_type,
|
||||
));
|
||||
if is_http_error(412, &upload_res) {
|
||||
// A 412 indicates the precondition was not satisfied: the given generation
|
||||
// is no longer the latest.
|
||||
Ok(false)
|
||||
} else {
|
||||
upload_res?;
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An Iterator returning names of objects from `list_objects`.
|
||||
///
|
||||
/// This handles response pagination by fetching one page at a time.
|
||||
struct ObjectIterator<'a> {
|
||||
service: &'a mut GcpService,
|
||||
prefix: String,
|
||||
last_response: Option<objects::list::ListObjectsResponse>,
|
||||
next_index: usize,
|
||||
}
|
||||
|
||||
impl<'a> ObjectIterator<'a> {
|
||||
fn fetch_batch(&mut self) -> Result<()> {
|
||||
let mut page_token = None;
|
||||
if let Some(ref resp) = self.last_response {
|
||||
page_token = resp.next_page_token.clone();
|
||||
}
|
||||
self.last_response = Some(self.service.rt.block_on(self.service.client.list_objects(
|
||||
&objects::list::ListObjectsRequest {
|
||||
bucket: self.service.bucket.clone(),
|
||||
prefix: Some(self.prefix.clone()),
|
||||
page_token,
|
||||
#[cfg(test)] // For testing, use a small page size.
|
||||
max_results: Some(6),
|
||||
..Default::default()
|
||||
},
|
||||
))?);
|
||||
self.next_index = 0;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Iterator for ObjectIterator<'a> {
|
||||
type Item = Result<ObjectInfo>;
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
// If the iterator is just starting, fetch the first response.
|
||||
if self.last_response.is_none() {
|
||||
if let Err(e) = self.fetch_batch() {
|
||||
return Some(Err(e));
|
||||
}
|
||||
}
|
||||
if let Some(ref result) = self.last_response {
|
||||
if let Some(ref items) = result.items {
|
||||
if self.next_index < items.len() {
|
||||
// Return a result from the existing response.
|
||||
let obj = &items[self.next_index];
|
||||
self.next_index += 1;
|
||||
// It's unclear when `time_created` would be None, so default to 0 in that case
|
||||
// or when the timestamp is not a valid u64 (before 1970).
|
||||
let creation = obj.time_created.map(|t| t.unix_timestamp()).unwrap_or(0);
|
||||
let creation: u64 = creation.try_into().unwrap_or(0);
|
||||
return Some(Ok(ObjectInfo {
|
||||
name: obj.name.as_bytes().to_vec(),
|
||||
creation,
|
||||
}));
|
||||
} else if result.next_page_token.is_some() {
|
||||
// Fetch the next page and try again.
|
||||
if let Err(e) = self.fetch_batch() {
|
||||
return Some(Err(e));
|
||||
}
|
||||
return self.next();
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Make a service if `GCP_TEST_BUCKET` is set, as well as a function to put a unique prefix on
|
||||
/// an object name, so that tests do not interfere with one another.
|
||||
///
|
||||
/// Set up this bucket with a lifecyle policy to delete objects with age > 1 day. While passing
|
||||
/// tests should correctly clean up after themselves, failing tests may leave objects in the
|
||||
/// bucket.
|
||||
///
|
||||
/// When the environment variable is not set, this returns false and the test does not run.
|
||||
/// Note that the Rust test runner will still show "ok" for the test, as there is no way to
|
||||
/// indicate anything else.
|
||||
fn make_service() -> Option<(GcpService, impl Fn(&str) -> Vec<u8>)> {
|
||||
let Ok(bucket) = std::env::var("GCP_TEST_BUCKET") else {
|
||||
return None;
|
||||
};
|
||||
let prefix = Uuid::new_v4();
|
||||
Some((GcpService::new(bucket).unwrap(), move |n: &_| {
|
||||
format!("{}-{}", prefix.as_simple(), n).into_bytes()
|
||||
}))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn put_and_get() {
|
||||
let Some((mut svc, pfx)) = make_service() else {
|
||||
return;
|
||||
};
|
||||
svc.put(&pfx("testy"), b"foo").unwrap();
|
||||
let got = svc.get(&pfx("testy")).unwrap();
|
||||
assert_eq!(got, Some(b"foo".to_vec()));
|
||||
|
||||
// Clean up.
|
||||
svc.del(&pfx("testy")).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn get_missing() {
|
||||
let Some((mut svc, pfx)) = make_service() else {
|
||||
return;
|
||||
};
|
||||
let got = svc.get(&pfx("testy")).unwrap();
|
||||
assert_eq!(got, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn del() {
|
||||
let Some((mut svc, pfx)) = make_service() else {
|
||||
return;
|
||||
};
|
||||
svc.put(&pfx("testy"), b"data").unwrap();
|
||||
svc.del(&pfx("testy")).unwrap();
|
||||
let got = svc.get(&pfx("testy")).unwrap();
|
||||
assert_eq!(got, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn del_missing() {
|
||||
// Deleting an object that does not exist is not an error.
|
||||
let Some((mut svc, pfx)) = make_service() else {
|
||||
return;
|
||||
};
|
||||
|
||||
assert!(svc.del(&pfx("testy")).is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn list() {
|
||||
let Some((mut svc, pfx)) = make_service() else {
|
||||
return;
|
||||
};
|
||||
let mut names: Vec<_> = (0..20).map(|i| pfx(&format!("pp-{i:02}"))).collect();
|
||||
names.sort();
|
||||
// Create 20 objects that will be listed.
|
||||
for n in &names {
|
||||
svc.put(n, b"data").unwrap();
|
||||
}
|
||||
// And another object that should not be included in the list.
|
||||
svc.put(&pfx("xxx"), b"data").unwrap();
|
||||
|
||||
let got_objects: Vec<_> = svc.list(&pfx("pp-")).collect::<Result<_>>().unwrap();
|
||||
let mut got_names: Vec<_> = got_objects.into_iter().map(|oi| oi.name).collect();
|
||||
got_names.sort();
|
||||
assert_eq!(got_names, names);
|
||||
|
||||
// Clean up.
|
||||
for n in got_names {
|
||||
svc.del(&n).unwrap();
|
||||
}
|
||||
svc.del(&pfx("xxx")).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compare_and_swap_create() {
|
||||
let Some((mut svc, pfx)) = make_service() else {
|
||||
return;
|
||||
};
|
||||
|
||||
assert!(svc
|
||||
.compare_and_swap(&pfx("testy"), None, b"bar".to_vec())
|
||||
.unwrap());
|
||||
let got = svc.get(&pfx("testy")).unwrap();
|
||||
assert_eq!(got, Some(b"bar".to_vec()));
|
||||
|
||||
// Clean up.
|
||||
svc.del(&pfx("testy")).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compare_and_swap_matches() {
|
||||
let Some((mut svc, pfx)) = make_service() else {
|
||||
return;
|
||||
};
|
||||
|
||||
// Create the existing file, with two generations.
|
||||
svc.put(&pfx("testy"), b"foo1").unwrap();
|
||||
svc.put(&pfx("testy"), b"foo2").unwrap();
|
||||
assert!(svc
|
||||
.compare_and_swap(&pfx("testy"), Some(b"foo2".to_vec()), b"bar".to_vec())
|
||||
.unwrap());
|
||||
let got = svc.get(&pfx("testy")).unwrap();
|
||||
assert_eq!(got, Some(b"bar".to_vec()));
|
||||
|
||||
// Clean up.
|
||||
svc.del(&pfx("testy")).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compare_and_swap_expected_no_file() {
|
||||
let Some((mut svc, pfx)) = make_service() else {
|
||||
return;
|
||||
};
|
||||
|
||||
svc.put(&pfx("testy"), b"foo1").unwrap();
|
||||
assert!(!svc
|
||||
.compare_and_swap(&pfx("testy"), None, b"bar".to_vec())
|
||||
.unwrap());
|
||||
let got = svc.get(&pfx("testy")).unwrap();
|
||||
assert_eq!(got, Some(b"foo1".to_vec()));
|
||||
|
||||
// Clean up.
|
||||
svc.del(&pfx("testy")).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compare_and_swap_mismatch() {
|
||||
let Some((mut svc, pfx)) = make_service() else {
|
||||
return;
|
||||
};
|
||||
|
||||
// Create the existing file, with two generations.
|
||||
svc.put(&pfx("testy"), b"foo1").unwrap();
|
||||
svc.put(&pfx("testy"), b"foo2").unwrap();
|
||||
assert!(!svc
|
||||
.compare_and_swap(&pfx("testy"), Some(b"foo1".to_vec()), b"bar".to_vec())
|
||||
.unwrap());
|
||||
let got = svc.get(&pfx("testy")).unwrap();
|
||||
assert_eq!(got, Some(b"foo2".to_vec()));
|
||||
|
||||
// Clean up.
|
||||
svc.del(&pfx("testy")).unwrap();
|
||||
}
|
||||
}
|
||||
16
taskchampion/taskchampion/src/server/cloud/mod.rs
Normal file
16
taskchampion/taskchampion/src/server/cloud/mod.rs
Normal file
@@ -0,0 +1,16 @@
|
||||
/*!
|
||||
* Support for cloud-service-backed sync.
|
||||
*
|
||||
* All of these operate using a similar approach, with specific patterns of object names. The
|
||||
* process of adding a new version requires a compare-and-swap operation that sets a new version
|
||||
* as the "latest" only if the existing "latest" has the expected value. This ensures a continuous
|
||||
* chain of versions, even if multiple replicas attempt to sync at the same time.
|
||||
*/
|
||||
|
||||
mod server;
|
||||
mod service;
|
||||
|
||||
pub(in crate::server) use server::CloudServer;
|
||||
|
||||
#[cfg(feature = "server-gcp")]
|
||||
pub(in crate::server) mod gcp;
|
||||
1183
taskchampion/taskchampion/src/server/cloud/server.rs
Normal file
1183
taskchampion/taskchampion/src/server/cloud/server.rs
Normal file
File diff suppressed because it is too large
Load Diff
38
taskchampion/taskchampion/src/server/cloud/service.rs
Normal file
38
taskchampion/taskchampion/src/server/cloud/service.rs
Normal file
@@ -0,0 +1,38 @@
|
||||
use crate::errors::Result;
|
||||
|
||||
/// Information about an object as returned from `Service::list`
|
||||
pub(in crate::server) struct ObjectInfo {
|
||||
/// Name of the object.
|
||||
pub(in crate::server) name: Vec<u8>,
|
||||
/// Creation time of the object, in seconds since the UNIX epoch.
|
||||
pub(in crate::server) creation: u64,
|
||||
}
|
||||
|
||||
/// An abstraction of a cloud-storage service.
|
||||
///
|
||||
/// The underlying cloud storage is assumed to be a map from object names to object values,
|
||||
/// similar to a HashMap, with the addition of a compare-and-swap operation. Object names
|
||||
/// are always simple strings from the character set `[a-zA-Z0-9-]`, no more than 100 characters
|
||||
/// in length.
|
||||
pub(in crate::server) trait Service {
|
||||
/// Put an object into cloud storage. If the object exists, it is overwritten.
|
||||
fn put(&mut self, name: &[u8], value: &[u8]) -> Result<()>;
|
||||
|
||||
/// Get an object from cloud storage, or None if the object does not exist.
|
||||
fn get(&mut self, name: &[u8]) -> Result<Option<Vec<u8>>>;
|
||||
|
||||
/// Delete an object. Does nothing if the object does not exist.
|
||||
fn del(&mut self, name: &[u8]) -> Result<()>;
|
||||
|
||||
/// Enumerate objects with the given prefix.
|
||||
fn list<'a>(&'a mut self, prefix: &[u8]) -> Box<dyn Iterator<Item = Result<ObjectInfo>> + 'a>;
|
||||
|
||||
/// Compare the existing object's value with `existing_value`, and replace with `new_value`
|
||||
/// only if the values match. Returns true if the replacement occurred.
|
||||
fn compare_and_swap(
|
||||
&mut self,
|
||||
name: &[u8],
|
||||
existing_value: Option<Vec<u8>>,
|
||||
new_value: Vec<u8>,
|
||||
) -> Result<bool>;
|
||||
}
|
||||
@@ -1,5 +1,9 @@
|
||||
use super::types::Server;
|
||||
use crate::errors::Result;
|
||||
#[cfg(feature = "server-gcp")]
|
||||
use crate::server::cloud::gcp::GcpService;
|
||||
#[cfg(feature = "cloud")]
|
||||
use crate::server::cloud::CloudServer;
|
||||
use crate::server::local::LocalServer;
|
||||
#[cfg(feature = "server-sync")]
|
||||
use crate::server::sync::SyncServer;
|
||||
@@ -23,6 +27,17 @@ pub enum ServerConfig {
|
||||
/// Client ID to identify and authenticate this replica to the server
|
||||
client_id: Uuid,
|
||||
|
||||
/// Private encryption secret used to encrypt all data sent to the server. This can
|
||||
/// be any suitably un-guessable string of bytes.
|
||||
encryption_secret: Vec<u8>,
|
||||
},
|
||||
/// A remote taskchampion-sync-server instance
|
||||
#[cfg(feature = "server-gcp")]
|
||||
Gcp {
|
||||
/// Bucket in which to store the task data. This bucket must not be used for any other
|
||||
/// purpose.
|
||||
bucket: String,
|
||||
|
||||
/// Private encryption secret used to encrypt all data sent to the server. This can
|
||||
/// be any suitably un-guessable string of bytes.
|
||||
encryption_secret: Vec<u8>,
|
||||
@@ -40,6 +55,14 @@ impl ServerConfig {
|
||||
client_id,
|
||||
encryption_secret,
|
||||
} => Box::new(SyncServer::new(origin, client_id, encryption_secret)?),
|
||||
#[cfg(feature = "server-gcp")]
|
||||
ServerConfig::Gcp {
|
||||
bucket,
|
||||
encryption_secret,
|
||||
} => Box::new(CloudServer::new(
|
||||
GcpService::new(bucket)?,
|
||||
encryption_secret,
|
||||
)?),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
/// This module implements the encryption specified in the sync-protocol
|
||||
/// document.
|
||||
use crate::errors::{Error, Result};
|
||||
use ring::{aead, digest, pbkdf2, rand, rand::SecureRandom};
|
||||
use ring::{aead, pbkdf2, rand, rand::SecureRandom};
|
||||
use uuid::Uuid;
|
||||
|
||||
const PBKDF2_ITERATIONS: u32 = 100000;
|
||||
@@ -11,24 +11,32 @@ const TASK_APP_ID: u8 = 1;
|
||||
|
||||
/// An Cryptor stores a secret and allows sealing and unsealing. It derives a key from the secret,
|
||||
/// which takes a nontrivial amount of time, so it should be created once and re-used for the given
|
||||
/// client_id.
|
||||
/// context.
|
||||
#[derive(Clone)]
|
||||
pub(super) struct Cryptor {
|
||||
key: aead::LessSafeKey,
|
||||
rng: rand::SystemRandom,
|
||||
}
|
||||
|
||||
impl Cryptor {
|
||||
pub(super) fn new(client_id: Uuid, secret: &Secret) -> Result<Self> {
|
||||
pub(super) fn new(salt: impl AsRef<[u8]>, secret: &Secret) -> Result<Self> {
|
||||
Ok(Cryptor {
|
||||
key: Self::derive_key(client_id, secret)?,
|
||||
key: Self::derive_key(salt, secret)?,
|
||||
rng: rand::SystemRandom::new(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Derive a key as specified for version 1. Note that this may take 10s of ms.
|
||||
fn derive_key(client_id: Uuid, secret: &Secret) -> Result<aead::LessSafeKey> {
|
||||
let salt = digest::digest(&digest::SHA256, client_id.as_bytes());
|
||||
/// Generate a suitable random salt.
|
||||
pub(super) fn gen_salt() -> Result<Vec<u8>> {
|
||||
let rng = rand::SystemRandom::new();
|
||||
let mut salt = [0u8; 16];
|
||||
rng.fill(&mut salt)
|
||||
.map_err(|_| anyhow::anyhow!("error generating random salt"))?;
|
||||
Ok(salt.to_vec())
|
||||
}
|
||||
|
||||
/// Derive a key as specified for version 1. Note that this may take 10s of ms.
|
||||
fn derive_key(salt: impl AsRef<[u8]>, secret: &Secret) -> Result<aead::LessSafeKey> {
|
||||
let mut key_bytes = vec![0u8; aead::CHACHA20_POLY1305.key_len()];
|
||||
pbkdf2::derive(
|
||||
pbkdf2::PBKDF2_HMAC_SHA256,
|
||||
@@ -93,7 +101,7 @@ impl Cryptor {
|
||||
let plaintext = self
|
||||
.key
|
||||
.open_in_place(nonce, aad, payload.as_mut())
|
||||
.map_err(|_| anyhow::anyhow!("error while creating AEAD key"))?;
|
||||
.map_err(|_| anyhow::anyhow!("error while unsealing encrypted value"))?;
|
||||
|
||||
Ok(Unsealed {
|
||||
version_id,
|
||||
@@ -169,46 +177,39 @@ pub(super) struct Unsealed {
|
||||
pub(super) payload: Vec<u8>,
|
||||
}
|
||||
|
||||
impl From<Unsealed> for Vec<u8> {
|
||||
fn from(val: Unsealed) -> Self {
|
||||
val.payload
|
||||
}
|
||||
}
|
||||
|
||||
/// An encrypted payload
|
||||
pub(super) struct Sealed {
|
||||
pub(super) version_id: Uuid,
|
||||
pub(super) payload: Vec<u8>,
|
||||
}
|
||||
|
||||
impl Sealed {
|
||||
#[cfg(feature = "server-sync")]
|
||||
pub(super) fn from_resp(
|
||||
resp: ureq::Response,
|
||||
version_id: Uuid,
|
||||
content_type: &str,
|
||||
) -> Result<Sealed> {
|
||||
use std::io::Read;
|
||||
if resp.header("Content-Type") == Some(content_type) {
|
||||
let mut reader = resp.into_reader();
|
||||
let mut payload = vec![];
|
||||
reader.read_to_end(&mut payload)?;
|
||||
Ok(Self {
|
||||
version_id,
|
||||
payload,
|
||||
})
|
||||
} else {
|
||||
Err(Error::Server(String::from(
|
||||
"Response did not have expected content-type",
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<[u8]> for Sealed {
|
||||
fn as_ref(&self) -> &[u8] {
|
||||
self.payload.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Sealed> for Vec<u8> {
|
||||
fn from(val: Sealed) -> Self {
|
||||
val.payload
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
use ring::digest;
|
||||
|
||||
fn make_salt() -> Vec<u8> {
|
||||
Cryptor::gen_salt().unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn envelope_round_trip() {
|
||||
@@ -252,7 +253,7 @@ mod test {
|
||||
let payload = b"HISTORY REPEATS ITSELF".to_vec();
|
||||
|
||||
let secret = Secret(b"SEKRIT".to_vec());
|
||||
let cryptor = Cryptor::new(Uuid::new_v4(), &secret).unwrap();
|
||||
let cryptor = Cryptor::new(make_salt(), &secret).unwrap();
|
||||
|
||||
let unsealed = Unsealed {
|
||||
version_id,
|
||||
@@ -269,10 +270,10 @@ mod test {
|
||||
fn round_trip_bad_key() {
|
||||
let version_id = Uuid::new_v4();
|
||||
let payload = b"HISTORY REPEATS ITSELF".to_vec();
|
||||
let client_id = Uuid::new_v4();
|
||||
let salt = make_salt();
|
||||
|
||||
let secret = Secret(b"SEKRIT".to_vec());
|
||||
let cryptor = Cryptor::new(client_id, &secret).unwrap();
|
||||
let cryptor = Cryptor::new(&salt, &secret).unwrap();
|
||||
|
||||
let unsealed = Unsealed {
|
||||
version_id,
|
||||
@@ -281,7 +282,7 @@ mod test {
|
||||
let sealed = cryptor.seal(unsealed).unwrap();
|
||||
|
||||
let secret = Secret(b"DIFFERENT_SECRET".to_vec());
|
||||
let cryptor = Cryptor::new(client_id, &secret).unwrap();
|
||||
let cryptor = Cryptor::new(&salt, &secret).unwrap();
|
||||
assert!(cryptor.unseal(sealed).is_err());
|
||||
}
|
||||
|
||||
@@ -289,10 +290,9 @@ mod test {
|
||||
fn round_trip_bad_version() {
|
||||
let version_id = Uuid::new_v4();
|
||||
let payload = b"HISTORY REPEATS ITSELF".to_vec();
|
||||
let client_id = Uuid::new_v4();
|
||||
|
||||
let secret = Secret(b"SEKRIT".to_vec());
|
||||
let cryptor = Cryptor::new(client_id, &secret).unwrap();
|
||||
let cryptor = Cryptor::new(make_salt(), &secret).unwrap();
|
||||
|
||||
let unsealed = Unsealed {
|
||||
version_id,
|
||||
@@ -304,13 +304,12 @@ mod test {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_bad_client_id() {
|
||||
fn round_trip_bad_salt() {
|
||||
let version_id = Uuid::new_v4();
|
||||
let payload = b"HISTORY REPEATS ITSELF".to_vec();
|
||||
let client_id = Uuid::new_v4();
|
||||
|
||||
let secret = Secret(b"SEKRIT".to_vec());
|
||||
let cryptor = Cryptor::new(client_id, &secret).unwrap();
|
||||
let cryptor = Cryptor::new(make_salt(), &secret).unwrap();
|
||||
|
||||
let unsealed = Unsealed {
|
||||
version_id,
|
||||
@@ -318,8 +317,7 @@ mod test {
|
||||
};
|
||||
let sealed = cryptor.seal(unsealed).unwrap();
|
||||
|
||||
let client_id = Uuid::new_v4();
|
||||
let cryptor = Cryptor::new(client_id, &secret).unwrap();
|
||||
let cryptor = Cryptor::new(make_salt(), &secret).unwrap();
|
||||
assert!(cryptor.unseal(sealed).is_err());
|
||||
}
|
||||
|
||||
@@ -331,23 +329,25 @@ mod test {
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
/// The values in generate-test-data.py
|
||||
fn defaults() -> (Uuid, Uuid, Vec<u8>) {
|
||||
(
|
||||
Uuid::parse_str("b0517957-f912-4d49-8330-f612e73030c4").unwrap(),
|
||||
Uuid::parse_str("0666d464-418a-4a08-ad53-6f15c78270cd").unwrap(),
|
||||
b"b4a4e6b7b811eda1dc1a2693ded".to_vec(),
|
||||
)
|
||||
fn defaults() -> (Uuid, Vec<u8>, Vec<u8>) {
|
||||
let version_id = Uuid::parse_str("b0517957-f912-4d49-8330-f612e73030c4").unwrap();
|
||||
let encryption_secret = b"b4a4e6b7b811eda1dc1a2693ded".to_vec();
|
||||
let client_id = Uuid::parse_str("0666d464-418a-4a08-ad53-6f15c78270cd").unwrap();
|
||||
let salt = dbg!(digest::digest(&digest::SHA256, client_id.as_ref()))
|
||||
.as_ref()
|
||||
.to_vec();
|
||||
(version_id, salt, encryption_secret)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn good() {
|
||||
let (version_id, client_id, encryption_secret) = defaults();
|
||||
let (version_id, salt, encryption_secret) = defaults();
|
||||
let sealed = Sealed {
|
||||
version_id,
|
||||
payload: include_bytes!("test-good.data").to_vec(),
|
||||
};
|
||||
|
||||
let cryptor = Cryptor::new(client_id, &Secret(encryption_secret)).unwrap();
|
||||
let cryptor = Cryptor::new(salt, &Secret(encryption_secret)).unwrap();
|
||||
let unsealed = cryptor.unseal(sealed).unwrap();
|
||||
|
||||
assert_eq!(unsealed.payload, b"SUCCESS");
|
||||
@@ -356,61 +356,61 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn bad_version_id() {
|
||||
let (version_id, client_id, encryption_secret) = defaults();
|
||||
let (version_id, salt, encryption_secret) = defaults();
|
||||
let sealed = Sealed {
|
||||
version_id,
|
||||
payload: include_bytes!("test-bad-version-id.data").to_vec(),
|
||||
};
|
||||
|
||||
let cryptor = Cryptor::new(client_id, &Secret(encryption_secret)).unwrap();
|
||||
let cryptor = Cryptor::new(salt, &Secret(encryption_secret)).unwrap();
|
||||
assert!(cryptor.unseal(sealed).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bad_client_id() {
|
||||
let (version_id, client_id, encryption_secret) = defaults();
|
||||
fn bad_salt() {
|
||||
let (version_id, salt, encryption_secret) = defaults();
|
||||
let sealed = Sealed {
|
||||
version_id,
|
||||
payload: include_bytes!("test-bad-client-id.data").to_vec(),
|
||||
};
|
||||
|
||||
let cryptor = Cryptor::new(client_id, &Secret(encryption_secret)).unwrap();
|
||||
let cryptor = Cryptor::new(salt, &Secret(encryption_secret)).unwrap();
|
||||
assert!(cryptor.unseal(sealed).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bad_secret() {
|
||||
let (version_id, client_id, encryption_secret) = defaults();
|
||||
let (version_id, salt, encryption_secret) = defaults();
|
||||
let sealed = Sealed {
|
||||
version_id,
|
||||
payload: include_bytes!("test-bad-secret.data").to_vec(),
|
||||
};
|
||||
|
||||
let cryptor = Cryptor::new(client_id, &Secret(encryption_secret)).unwrap();
|
||||
let cryptor = Cryptor::new(salt, &Secret(encryption_secret)).unwrap();
|
||||
assert!(cryptor.unseal(sealed).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bad_version() {
|
||||
let (version_id, client_id, encryption_secret) = defaults();
|
||||
let (version_id, salt, encryption_secret) = defaults();
|
||||
let sealed = Sealed {
|
||||
version_id,
|
||||
payload: include_bytes!("test-bad-version.data").to_vec(),
|
||||
};
|
||||
|
||||
let cryptor = Cryptor::new(client_id, &Secret(encryption_secret)).unwrap();
|
||||
let cryptor = Cryptor::new(salt, &Secret(encryption_secret)).unwrap();
|
||||
assert!(cryptor.unseal(sealed).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bad_app_id() {
|
||||
let (version_id, client_id, encryption_secret) = defaults();
|
||||
let (version_id, salt, encryption_secret) = defaults();
|
||||
let sealed = Sealed {
|
||||
version_id,
|
||||
payload: include_bytes!("test-bad-app-id.data").to_vec(),
|
||||
};
|
||||
|
||||
let cryptor = Cryptor::new(client_id, &Secret(encryption_secret)).unwrap();
|
||||
let cryptor = Cryptor::new(salt, &Secret(encryption_secret)).unwrap();
|
||||
assert!(cryptor.unseal(sealed).is_err());
|
||||
}
|
||||
}
|
||||
@@ -16,12 +16,15 @@ mod local;
|
||||
mod op;
|
||||
mod types;
|
||||
|
||||
#[cfg(feature = "crypto")]
|
||||
mod crypto;
|
||||
#[cfg(feature = "encryption")]
|
||||
mod encryption;
|
||||
|
||||
#[cfg(feature = "server-sync")]
|
||||
mod sync;
|
||||
|
||||
#[cfg(feature = "cloud")]
|
||||
mod cloud;
|
||||
|
||||
pub use config::ServerConfig;
|
||||
pub use types::*;
|
||||
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
use crate::errors::Result;
|
||||
use crate::errors::{Error, Result};
|
||||
use crate::server::{
|
||||
AddVersionResult, GetVersionResult, HistorySegment, Server, Snapshot, SnapshotUrgency,
|
||||
VersionId,
|
||||
};
|
||||
use ring::digest;
|
||||
use std::time::Duration;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::crypto::{Cryptor, Sealed, Secret, Unsealed};
|
||||
use super::encryption::{Cryptor, Sealed, Secret, Unsealed};
|
||||
|
||||
pub struct SyncServer {
|
||||
origin: String,
|
||||
@@ -28,10 +29,11 @@ impl SyncServer {
|
||||
/// identify this client to the server. Multiple replicas synchronizing the same task history
|
||||
/// should use the same client_id.
|
||||
pub fn new(origin: String, client_id: Uuid, encryption_secret: Vec<u8>) -> Result<SyncServer> {
|
||||
let salt = dbg!(digest::digest(&digest::SHA256, client_id.as_ref()));
|
||||
Ok(SyncServer {
|
||||
origin,
|
||||
client_id,
|
||||
cryptor: Cryptor::new(client_id, &Secret(encryption_secret.to_vec()))?,
|
||||
cryptor: Cryptor::new(salt, &Secret(encryption_secret.to_vec()))?,
|
||||
agent: ureq::AgentBuilder::new()
|
||||
.timeout_connect(Duration::from_secs(10))
|
||||
.timeout_read(Duration::from_secs(60))
|
||||
@@ -62,6 +64,23 @@ fn get_snapshot_urgency(resp: &ureq::Response) -> SnapshotUrgency {
|
||||
}
|
||||
}
|
||||
|
||||
fn sealed_from_resp(resp: ureq::Response, version_id: Uuid, content_type: &str) -> Result<Sealed> {
|
||||
use std::io::Read;
|
||||
if resp.header("Content-Type") == Some(content_type) {
|
||||
let mut reader = resp.into_reader();
|
||||
let mut payload = vec![];
|
||||
reader.read_to_end(&mut payload)?;
|
||||
Ok(Sealed {
|
||||
version_id,
|
||||
payload,
|
||||
})
|
||||
} else {
|
||||
Err(Error::Server(String::from(
|
||||
"Response did not have expected content-type",
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
impl Server for SyncServer {
|
||||
fn add_version(
|
||||
&mut self,
|
||||
@@ -117,7 +136,7 @@ impl Server for SyncServer {
|
||||
let parent_version_id = get_uuid_header(&resp, "X-Parent-Version-Id")?;
|
||||
let version_id = get_uuid_header(&resp, "X-Version-Id")?;
|
||||
let sealed =
|
||||
Sealed::from_resp(resp, parent_version_id, HISTORY_SEGMENT_CONTENT_TYPE)?;
|
||||
sealed_from_resp(resp, parent_version_id, HISTORY_SEGMENT_CONTENT_TYPE)?;
|
||||
let history_segment = self.cryptor.unseal(sealed)?.payload;
|
||||
Ok(GetVersionResult::Version {
|
||||
version_id,
|
||||
@@ -158,7 +177,7 @@ impl Server for SyncServer {
|
||||
{
|
||||
Ok(resp) => {
|
||||
let version_id = get_uuid_header(&resp, "X-Version-Id")?;
|
||||
let sealed = Sealed::from_resp(resp, version_id, SNAPSHOT_CONTENT_TYPE)?;
|
||||
let sealed = sealed_from_resp(resp, version_id, SNAPSHOT_CONTENT_TYPE)?;
|
||||
let snapshot = self.cryptor.unseal(sealed)?.payload;
|
||||
Ok(Some((version_id, snapshot)))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user