Merge pull request #206 from dbr/sqlstore
Switch to SQLite storage backend
This commit is contained in:
1
.changelogs/2021-09-10-sqlstore.md
Normal file
1
.changelogs/2021-09-10-sqlstore.md
Normal file
@@ -0,0 +1 @@
|
||||
- Breaking: Removed the KV based storage backend in client and server, and replaced with SQLite ([Issue #131](https://github.com/taskchampion/taskchampion/issues/131), [PR #206](https://github.com/taskchampion/taskchampion/pull/206))
|
||||
517
Cargo.lock
generated
517
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -72,7 +72,6 @@ mod test {
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
#[test]
|
||||
fn test_cleaning_command_name() {
|
||||
assert_eq!(
|
||||
|
||||
@@ -10,12 +10,14 @@ edition = "2018"
|
||||
uuid = { version = "^0.8.2", features = ["serde", "v4"] }
|
||||
actix-web = "^3.3.2"
|
||||
anyhow = "1.0"
|
||||
thiserror = "1.0"
|
||||
futures = "^0.3.8"
|
||||
serde = "^1.0.125"
|
||||
kv = {version = "^0.10.0", features = ["msgpack-value"]}
|
||||
serde_json = "^1.0"
|
||||
clap = "^2.33.0"
|
||||
log = "^0.4.14"
|
||||
env_logger = "^0.8.3"
|
||||
rusqlite = { version = "0.25", features = ["bundled"] }
|
||||
|
||||
[dev-dependencies]
|
||||
actix-rt = "^2.2.0"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#![deny(clippy::all)]
|
||||
|
||||
use crate::storage::{KvStorage, Storage};
|
||||
use crate::storage::{SqliteStorage, Storage};
|
||||
use actix_web::{get, middleware::Logger, web, App, HttpServer, Responder, Scope};
|
||||
use api::{api_scope, ServerState};
|
||||
use clap::Arg;
|
||||
@@ -56,7 +56,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
let data_dir = matches.value_of("data-dir").unwrap();
|
||||
let port = matches.value_of("port").unwrap();
|
||||
|
||||
let server_box: Box<dyn Storage> = Box::new(KvStorage::new(data_dir)?);
|
||||
let server_box: Box<dyn Storage> = Box::new(SqliteStorage::new(data_dir)?);
|
||||
let server_state = ServerState::new(server_box);
|
||||
|
||||
log::warn!("Serving on port {}", port);
|
||||
|
||||
@@ -1,241 +0,0 @@
|
||||
use super::{Client, Storage, StorageTxn, Uuid, Version};
|
||||
use kv::msgpack::Msgpack;
|
||||
use kv::{Bucket, Config, Error, Serde, Store, ValueBuf};
|
||||
use std::path::Path;
|
||||
|
||||
/// DB Key for versions: concatenation of client_key and parent_version_id
|
||||
type VersionDbKey = [u8; 32];
|
||||
|
||||
fn version_db_key(client_key: Uuid, parent_version_id: Uuid) -> VersionDbKey {
|
||||
let mut key = [0u8; 32];
|
||||
key[..16].clone_from_slice(client_key.as_bytes());
|
||||
key[16..].clone_from_slice(parent_version_id.as_bytes());
|
||||
key
|
||||
}
|
||||
|
||||
/// Key for clients: just the client_key
|
||||
type ClientDbKey = [u8; 16];
|
||||
|
||||
fn client_db_key(client_key: Uuid) -> ClientDbKey {
|
||||
*client_key.as_bytes()
|
||||
}
|
||||
|
||||
/// KvStorage is an on-disk storage backend which uses LMDB via the `kv` crate.
|
||||
pub(crate) struct KvStorage<'t> {
|
||||
store: Store,
|
||||
clients_bucket: Bucket<'t, ClientDbKey, ValueBuf<Msgpack<Client>>>,
|
||||
versions_bucket: Bucket<'t, VersionDbKey, ValueBuf<Msgpack<Version>>>,
|
||||
}
|
||||
|
||||
impl<'t> KvStorage<'t> {
|
||||
pub fn new<P: AsRef<Path>>(directory: P) -> anyhow::Result<KvStorage<'t>> {
|
||||
let mut config = Config::default(directory);
|
||||
config.bucket("clients", None);
|
||||
config.bucket("versions", None);
|
||||
|
||||
let store = Store::new(config)?;
|
||||
|
||||
let clients_bucket =
|
||||
store.bucket::<ClientDbKey, ValueBuf<Msgpack<Client>>>(Some("clients"))?;
|
||||
let versions_bucket =
|
||||
store.bucket::<VersionDbKey, ValueBuf<Msgpack<Version>>>(Some("versions"))?;
|
||||
|
||||
Ok(KvStorage {
|
||||
store,
|
||||
clients_bucket,
|
||||
versions_bucket,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<'t> Storage for KvStorage<'t> {
|
||||
fn txn<'a>(&'a self) -> anyhow::Result<Box<dyn StorageTxn + 'a>> {
|
||||
Ok(Box::new(Txn {
|
||||
storage: self,
|
||||
txn: Some(self.store.write_txn()?),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
struct Txn<'t> {
|
||||
storage: &'t KvStorage<'t>,
|
||||
txn: Option<kv::Txn<'t>>,
|
||||
}
|
||||
|
||||
impl<'t> Txn<'t> {
|
||||
// get the underlying kv Txn
|
||||
fn kvtxn(&mut self) -> &mut kv::Txn<'t> {
|
||||
if let Some(ref mut txn) = self.txn {
|
||||
txn
|
||||
} else {
|
||||
panic!("cannot use transaction after commit");
|
||||
}
|
||||
}
|
||||
|
||||
fn clients_bucket(&self) -> &'t Bucket<'t, ClientDbKey, ValueBuf<Msgpack<Client>>> {
|
||||
&self.storage.clients_bucket
|
||||
}
|
||||
fn versions_bucket(&self) -> &'t Bucket<'t, VersionDbKey, ValueBuf<Msgpack<Version>>> {
|
||||
&self.storage.versions_bucket
|
||||
}
|
||||
}
|
||||
|
||||
impl<'t> StorageTxn for Txn<'t> {
|
||||
fn get_client(&mut self, client_key: Uuid) -> anyhow::Result<Option<Client>> {
|
||||
let key = client_db_key(client_key);
|
||||
let bucket = self.clients_bucket();
|
||||
let kvtxn = self.kvtxn();
|
||||
|
||||
let client = match kvtxn.get(bucket, key) {
|
||||
Ok(buf) => buf,
|
||||
Err(Error::NotFound) => return Ok(None),
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
.inner()?
|
||||
.to_serde();
|
||||
Ok(Some(client))
|
||||
}
|
||||
|
||||
fn new_client(&mut self, client_key: Uuid, latest_version_id: Uuid) -> anyhow::Result<()> {
|
||||
let key = client_db_key(client_key);
|
||||
let bucket = self.clients_bucket();
|
||||
let kvtxn = self.kvtxn();
|
||||
let client = Client { latest_version_id };
|
||||
kvtxn.set(bucket, key, Msgpack::to_value_buf(client)?)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_client_latest_version_id(
|
||||
&mut self,
|
||||
client_key: Uuid,
|
||||
latest_version_id: Uuid,
|
||||
) -> anyhow::Result<()> {
|
||||
// implementation is the same as new_client..
|
||||
self.new_client(client_key, latest_version_id)
|
||||
}
|
||||
|
||||
fn get_version_by_parent(
|
||||
&mut self,
|
||||
client_key: Uuid,
|
||||
parent_version_id: Uuid,
|
||||
) -> anyhow::Result<Option<Version>> {
|
||||
let key = version_db_key(client_key, parent_version_id);
|
||||
let bucket = self.versions_bucket();
|
||||
let kvtxn = self.kvtxn();
|
||||
let version = match kvtxn.get(bucket, key) {
|
||||
Ok(buf) => buf,
|
||||
Err(Error::NotFound) => return Ok(None),
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
.inner()?
|
||||
.to_serde();
|
||||
Ok(Some(version))
|
||||
}
|
||||
|
||||
fn add_version(
|
||||
&mut self,
|
||||
client_key: Uuid,
|
||||
version_id: Uuid,
|
||||
parent_version_id: Uuid,
|
||||
history_segment: Vec<u8>,
|
||||
) -> anyhow::Result<()> {
|
||||
let key = version_db_key(client_key, parent_version_id);
|
||||
let bucket = self.versions_bucket();
|
||||
let kvtxn = self.kvtxn();
|
||||
let version = Version {
|
||||
version_id,
|
||||
parent_version_id,
|
||||
history_segment,
|
||||
};
|
||||
kvtxn.set(bucket, key, Msgpack::to_value_buf(version)?)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn commit(&mut self) -> anyhow::Result<()> {
|
||||
if let Some(kvtxn) = self.txn.take() {
|
||||
kvtxn.commit()?;
|
||||
} else {
|
||||
panic!("transaction already committed");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use tempfile::TempDir;
|
||||
|
||||
#[test]
|
||||
fn test_get_client_empty() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let mut txn = storage.txn()?;
|
||||
let maybe_client = txn.get_client(Uuid::new_v4())?;
|
||||
assert!(maybe_client.is_none());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_client_storage() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let mut txn = storage.txn()?;
|
||||
|
||||
let client_key = Uuid::new_v4();
|
||||
let latest_version_id = Uuid::new_v4();
|
||||
txn.new_client(client_key, latest_version_id)?;
|
||||
|
||||
let client = txn.get_client(client_key)?.unwrap();
|
||||
assert_eq!(client.latest_version_id, latest_version_id);
|
||||
|
||||
let latest_version_id = Uuid::new_v4();
|
||||
txn.set_client_latest_version_id(client_key, latest_version_id)?;
|
||||
|
||||
let client = txn.get_client(client_key)?.unwrap();
|
||||
assert_eq!(client.latest_version_id, latest_version_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_gvbp_empty() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let mut txn = storage.txn()?;
|
||||
let maybe_version = txn.get_version_by_parent(Uuid::new_v4(), Uuid::new_v4())?;
|
||||
assert!(maybe_version.is_none());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_add_version_and_gvbp() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let mut txn = storage.txn()?;
|
||||
|
||||
let client_key = Uuid::new_v4();
|
||||
let version_id = Uuid::new_v4();
|
||||
let parent_version_id = Uuid::new_v4();
|
||||
let history_segment = b"abc".to_vec();
|
||||
txn.add_version(
|
||||
client_key,
|
||||
version_id,
|
||||
parent_version_id,
|
||||
history_segment.clone(),
|
||||
)?;
|
||||
let version = txn
|
||||
.get_version_by_parent(client_key, parent_version_id)?
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
version,
|
||||
Version {
|
||||
version_id,
|
||||
parent_version_id,
|
||||
history_segment,
|
||||
}
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -6,8 +6,8 @@ mod inmemory;
|
||||
#[cfg(test)]
|
||||
pub(crate) use inmemory::InMemoryStorage;
|
||||
|
||||
mod kv;
|
||||
pub(crate) use self::kv::KvStorage;
|
||||
mod sqlite;
|
||||
pub(crate) use self::sqlite::SqliteStorage;
|
||||
|
||||
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
|
||||
pub(crate) struct Client {
|
||||
|
||||
291
sync-server/src/storage/sqlite.rs
Normal file
291
sync-server/src/storage/sqlite.rs
Normal file
@@ -0,0 +1,291 @@
|
||||
use super::{Client, Storage, StorageTxn, Uuid, Version};
|
||||
use anyhow::Context;
|
||||
use rusqlite::types::{FromSql, ToSql};
|
||||
use rusqlite::{params, Connection, OptionalExtension};
|
||||
use std::path::Path;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
enum SqliteError {
|
||||
#[error("Failed to create SQLite transaction")]
|
||||
CreateTransactionFailed,
|
||||
}
|
||||
|
||||
/// Newtype to allow implementing `FromSql` for foreign `uuid::Uuid`
|
||||
struct StoredUuid(Uuid);
|
||||
|
||||
/// Conversion from Uuid stored as a string (rusqlite's uuid feature stores as binary blob)
|
||||
impl FromSql for StoredUuid {
|
||||
fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
|
||||
let u = Uuid::parse_str(value.as_str()?)
|
||||
.map_err(|_| rusqlite::types::FromSqlError::InvalidType)?;
|
||||
Ok(StoredUuid(u))
|
||||
}
|
||||
}
|
||||
|
||||
/// Store Uuid as string in database
|
||||
impl ToSql for StoredUuid {
|
||||
fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
|
||||
let s = self.0.to_string();
|
||||
Ok(s.into())
|
||||
}
|
||||
}
|
||||
|
||||
/// Stores [`Client`] in SQLite
|
||||
impl FromSql for Client {
|
||||
fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
|
||||
let o: Client = serde_json::from_str(value.as_str()?)
|
||||
.map_err(|_| rusqlite::types::FromSqlError::InvalidType)?;
|
||||
Ok(o)
|
||||
}
|
||||
}
|
||||
|
||||
/// Parses Operation stored as JSON in string column
|
||||
impl ToSql for Client {
|
||||
fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
|
||||
let s = serde_json::to_string(&self)
|
||||
.map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
|
||||
Ok(s.into())
|
||||
}
|
||||
}
|
||||
|
||||
/// An on-disk storage backend which uses SQLite
|
||||
pub(crate) struct SqliteStorage {
|
||||
db_file: std::path::PathBuf,
|
||||
}
|
||||
|
||||
impl SqliteStorage {
|
||||
fn new_connection(&self) -> anyhow::Result<Connection> {
|
||||
Ok(Connection::open(&self.db_file)?)
|
||||
}
|
||||
|
||||
pub fn new<P: AsRef<Path>>(directory: P) -> anyhow::Result<SqliteStorage> {
|
||||
std::fs::create_dir_all(&directory)?;
|
||||
let db_file = directory.as_ref().join("taskchampion-sync-server.sqlite3");
|
||||
|
||||
let o = SqliteStorage { db_file };
|
||||
|
||||
{
|
||||
let mut con = o.new_connection()?;
|
||||
let txn = con.transaction()?;
|
||||
|
||||
let queries = vec![
|
||||
"CREATE TABLE IF NOT EXISTS clients (client_key STRING PRIMARY KEY, latest_version_id STRING);",
|
||||
"CREATE TABLE IF NOT EXISTS versions (version_id STRING PRIMARY KEY, client_key STRING, parent_version_id STRING, history_segment BLOB);",
|
||||
];
|
||||
for q in queries {
|
||||
txn.execute(q, []).context("Creating table")?;
|
||||
}
|
||||
txn.commit()?;
|
||||
}
|
||||
|
||||
Ok(o)
|
||||
}
|
||||
}
|
||||
|
||||
impl Storage for SqliteStorage {
|
||||
fn txn<'a>(&'a self) -> anyhow::Result<Box<dyn StorageTxn + 'a>> {
|
||||
let con = self.new_connection()?;
|
||||
let t = Txn { con };
|
||||
Ok(Box::new(t))
|
||||
}
|
||||
}
|
||||
|
||||
struct Txn {
|
||||
con: Connection,
|
||||
}
|
||||
|
||||
impl Txn {
|
||||
fn get_txn(&mut self) -> Result<rusqlite::Transaction, SqliteError> {
|
||||
self.con
|
||||
.transaction()
|
||||
.map_err(|_e| SqliteError::CreateTransactionFailed)
|
||||
}
|
||||
}
|
||||
|
||||
impl StorageTxn for Txn {
|
||||
fn get_client(&mut self, client_key: Uuid) -> anyhow::Result<Option<Client>> {
|
||||
let t = self.get_txn()?;
|
||||
let result: Option<Client> = t
|
||||
.query_row(
|
||||
"SELECT latest_version_id FROM clients WHERE client_key = ? LIMIT 1",
|
||||
[&StoredUuid(client_key)],
|
||||
|r| {
|
||||
let latest_version_id: StoredUuid = r.get(0)?;
|
||||
Ok(Client {
|
||||
latest_version_id: latest_version_id.0,
|
||||
})
|
||||
},
|
||||
)
|
||||
.optional()
|
||||
.context("Get client query")?;
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn new_client(&mut self, client_key: Uuid, latest_version_id: Uuid) -> anyhow::Result<()> {
|
||||
let t = self.get_txn()?;
|
||||
|
||||
t.execute(
|
||||
"INSERT OR REPLACE INTO clients (client_key, latest_version_id) VALUES (?, ?)",
|
||||
params![&StoredUuid(client_key), &StoredUuid(latest_version_id)],
|
||||
)
|
||||
.context("Create client query")?;
|
||||
t.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_client_latest_version_id(
|
||||
&mut self,
|
||||
client_key: Uuid,
|
||||
latest_version_id: Uuid,
|
||||
) -> anyhow::Result<()> {
|
||||
// Implementation is same as new_client
|
||||
self.new_client(client_key, latest_version_id)
|
||||
}
|
||||
|
||||
fn get_version_by_parent(
|
||||
&mut self,
|
||||
client_key: Uuid,
|
||||
parent_version_id: Uuid,
|
||||
) -> anyhow::Result<Option<Version>> {
|
||||
let t = self.get_txn()?;
|
||||
let r = t.query_row(
|
||||
"SELECT version_id, parent_version_id, history_segment FROM versions WHERE parent_version_id = ? AND client_key = ?",
|
||||
params![&StoredUuid(parent_version_id), &StoredUuid(client_key)],
|
||||
|r| {
|
||||
let version_id: StoredUuid = r.get("version_id")?;
|
||||
let parent_version_id: StoredUuid = r.get("parent_version_id")?;
|
||||
|
||||
Ok(Version{
|
||||
version_id: version_id.0,
|
||||
parent_version_id: parent_version_id.0,
|
||||
history_segment: r.get("history_segment")?,
|
||||
})}
|
||||
)
|
||||
.optional()
|
||||
.context("Get version query")
|
||||
?;
|
||||
Ok(r)
|
||||
}
|
||||
|
||||
fn add_version(
|
||||
&mut self,
|
||||
client_key: Uuid,
|
||||
version_id: Uuid,
|
||||
parent_version_id: Uuid,
|
||||
history_segment: Vec<u8>,
|
||||
) -> anyhow::Result<()> {
|
||||
let t = self.get_txn()?;
|
||||
|
||||
t.execute(
|
||||
"INSERT INTO versions (version_id, client_key, parent_version_id, history_segment) VALUES(?, ?, ?, ?)",
|
||||
params![
|
||||
StoredUuid(version_id),
|
||||
StoredUuid(client_key),
|
||||
StoredUuid(parent_version_id),
|
||||
history_segment
|
||||
],
|
||||
)
|
||||
.context("Add version query")?;
|
||||
t.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn commit(&mut self) -> anyhow::Result<()> {
|
||||
// FIXME: Note the queries aren't currently run in a
|
||||
// transaction, as storing the transaction object and a pooled
|
||||
// connection in the `Txn` object is complex.
|
||||
// https://github.com/taskchampion/taskchampion/pull/206#issuecomment-860336073
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use tempfile::TempDir;
|
||||
|
||||
#[test]
|
||||
fn test_emtpy_dir() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let non_existant = tmp_dir.path().join("subdir");
|
||||
let storage = SqliteStorage::new(&non_existant)?;
|
||||
let mut txn = storage.txn()?;
|
||||
let maybe_client = txn.get_client(Uuid::new_v4())?;
|
||||
assert!(maybe_client.is_none());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_client_empty() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||
let mut txn = storage.txn()?;
|
||||
let maybe_client = txn.get_client(Uuid::new_v4())?;
|
||||
assert!(maybe_client.is_none());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_client_storage() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||
let mut txn = storage.txn()?;
|
||||
|
||||
let client_key = Uuid::new_v4();
|
||||
let latest_version_id = Uuid::new_v4();
|
||||
txn.new_client(client_key, latest_version_id)?;
|
||||
|
||||
let client = txn.get_client(client_key)?.unwrap();
|
||||
assert_eq!(client.latest_version_id, latest_version_id);
|
||||
|
||||
let latest_version_id = Uuid::new_v4();
|
||||
txn.set_client_latest_version_id(client_key, latest_version_id)?;
|
||||
|
||||
let client = txn.get_client(client_key)?.unwrap();
|
||||
assert_eq!(client.latest_version_id, latest_version_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_gvbp_empty() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||
let mut txn = storage.txn()?;
|
||||
let maybe_version = txn.get_version_by_parent(Uuid::new_v4(), Uuid::new_v4())?;
|
||||
assert!(maybe_version.is_none());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_add_version_and_gvbp() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||
let mut txn = storage.txn()?;
|
||||
|
||||
let client_key = Uuid::new_v4();
|
||||
let version_id = Uuid::new_v4();
|
||||
let parent_version_id = Uuid::new_v4();
|
||||
let history_segment = b"abc".to_vec();
|
||||
txn.add_version(
|
||||
client_key,
|
||||
version_id,
|
||||
parent_version_id,
|
||||
history_segment.clone(),
|
||||
)?;
|
||||
let version = txn
|
||||
.get_version_by_parent(client_key, parent_version_id)?
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
version,
|
||||
Version {
|
||||
version_id,
|
||||
parent_version_id,
|
||||
history_segment,
|
||||
}
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -17,11 +17,10 @@ serde_json = "^1.0"
|
||||
chrono = { version = "^0.4.10", features = ["serde"] }
|
||||
anyhow = "1.0"
|
||||
thiserror = "1.0"
|
||||
kv = {version = "^0.10.0", features = ["msgpack-value"]}
|
||||
lmdb-rkv = {version = "^0.14.0"}
|
||||
ureq = "^2.1.0"
|
||||
log = "^0.4.14"
|
||||
tindercrypt = { version = "^0.2.2", default-features = false }
|
||||
rusqlite = { version = "0.25", features = ["bundled"] }
|
||||
strum = "0.21"
|
||||
strum_macros = "0.21"
|
||||
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
use crate::server::{
|
||||
AddVersionResult, GetVersionResult, HistorySegment, Server, VersionId, NO_VERSION_ID,
|
||||
};
|
||||
use crate::utils::Key;
|
||||
use kv::msgpack::Msgpack;
|
||||
use kv::{Bucket, Config, Error, Integer, Serde, Store, ValueBuf};
|
||||
use crate::storage::sqlite::StoredUuid;
|
||||
use anyhow::Context;
|
||||
use rusqlite::params;
|
||||
use rusqlite::OptionalExtension;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::path::Path;
|
||||
use uuid::Uuid;
|
||||
@@ -15,58 +16,54 @@ struct Version {
|
||||
history_segment: HistorySegment,
|
||||
}
|
||||
|
||||
pub struct LocalServer<'t> {
|
||||
store: Store,
|
||||
// NOTE: indexed by parent_version_id!
|
||||
versions_bucket: Bucket<'t, Key, ValueBuf<Msgpack<Version>>>,
|
||||
latest_version_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<Uuid>>>,
|
||||
pub struct LocalServer {
|
||||
con: rusqlite::Connection,
|
||||
}
|
||||
|
||||
impl<'t> LocalServer<'t> {
|
||||
/// A test server has no notion of clients, signatures, encryption, etc.
|
||||
pub fn new<P: AsRef<Path>>(directory: P) -> anyhow::Result<LocalServer<'t>> {
|
||||
let mut config = Config::default(directory);
|
||||
config.bucket("versions", None);
|
||||
config.bucket("numbers", None);
|
||||
config.bucket("latest_version", None);
|
||||
config.bucket("operations", None);
|
||||
config.bucket("working_set", None);
|
||||
let store = Store::new(config)?;
|
||||
impl LocalServer {
|
||||
fn txn(&mut self) -> anyhow::Result<rusqlite::Transaction> {
|
||||
let txn = self.con.transaction()?;
|
||||
Ok(txn)
|
||||
}
|
||||
|
||||
// versions are stored indexed by VersionId (uuid)
|
||||
let versions_bucket = store.bucket::<Key, ValueBuf<Msgpack<Version>>>(Some("versions"))?;
|
||||
/// A server which has no notion of clients, signatures, encryption, etc.
|
||||
pub fn new<P: AsRef<Path>>(directory: P) -> anyhow::Result<LocalServer> {
|
||||
let db_file = directory
|
||||
.as_ref()
|
||||
.join("taskchampion-local-sync-server.sqlite3");
|
||||
let con = rusqlite::Connection::open(&db_file)?;
|
||||
|
||||
// this bucket contains the latest version at key 0
|
||||
let latest_version_bucket =
|
||||
store.int_bucket::<ValueBuf<Msgpack<Uuid>>>(Some("latest_version"))?;
|
||||
let queries = vec![
|
||||
"CREATE TABLE IF NOT EXISTS data (key STRING PRIMARY KEY, value STRING);",
|
||||
"CREATE TABLE IF NOT EXISTS versions (version_id STRING PRIMARY KEY, parent_version_id STRING, data STRING);",
|
||||
];
|
||||
for q in queries {
|
||||
con.execute(q, []).context("Creating table")?;
|
||||
}
|
||||
|
||||
Ok(LocalServer {
|
||||
store,
|
||||
versions_bucket,
|
||||
latest_version_bucket,
|
||||
})
|
||||
Ok(LocalServer { con })
|
||||
}
|
||||
|
||||
fn get_latest_version_id(&mut self) -> anyhow::Result<VersionId> {
|
||||
let txn = self.store.read_txn()?;
|
||||
let base_version = match txn.get(&self.latest_version_bucket, 0.into()) {
|
||||
Ok(buf) => buf,
|
||||
Err(Error::NotFound) => return Ok(NO_VERSION_ID),
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
.inner()?
|
||||
.to_serde();
|
||||
Ok(base_version as VersionId)
|
||||
let t = self.txn()?;
|
||||
let result: Option<StoredUuid> = t
|
||||
.query_row(
|
||||
"SELECT value FROM data WHERE key = 'latest_version_id' LIMIT 1",
|
||||
rusqlite::params![],
|
||||
|r| r.get(0),
|
||||
)
|
||||
.optional()?;
|
||||
Ok(result.map(|x| x.0).unwrap_or(NO_VERSION_ID))
|
||||
}
|
||||
|
||||
fn set_latest_version_id(&mut self, version_id: VersionId) -> anyhow::Result<()> {
|
||||
let mut txn = self.store.write_txn()?;
|
||||
txn.set(
|
||||
&self.latest_version_bucket,
|
||||
0.into(),
|
||||
Msgpack::to_value_buf(version_id as Uuid)?,
|
||||
)?;
|
||||
txn.commit()?;
|
||||
let t = self.txn()?;
|
||||
t.execute(
|
||||
"INSERT OR REPLACE INTO data (key, value) VALUES ('latest_version_id', ?)",
|
||||
params![&StoredUuid(version_id)],
|
||||
)
|
||||
.context("Update task query")?;
|
||||
t.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -74,31 +71,42 @@ impl<'t> LocalServer<'t> {
|
||||
&mut self,
|
||||
parent_version_id: VersionId,
|
||||
) -> anyhow::Result<Option<Version>> {
|
||||
let txn = self.store.read_txn()?;
|
||||
let t = self.txn()?;
|
||||
let r = t.query_row(
|
||||
"SELECT version_id, parent_version_id, data FROM versions WHERE parent_version_id = ?",
|
||||
params![&StoredUuid(parent_version_id)],
|
||||
|r| {
|
||||
let version_id: StoredUuid = r.get("version_id")?;
|
||||
let parent_version_id: StoredUuid = r.get("parent_version_id")?;
|
||||
|
||||
let version = match txn.get(&self.versions_bucket, parent_version_id.into()) {
|
||||
Ok(buf) => buf,
|
||||
Err(Error::NotFound) => return Ok(None),
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
.inner()?
|
||||
.to_serde();
|
||||
Ok(Some(version))
|
||||
Ok(Version{
|
||||
version_id: version_id.0,
|
||||
parent_version_id: parent_version_id.0,
|
||||
history_segment: r.get("data")?,
|
||||
})}
|
||||
)
|
||||
.optional()
|
||||
.context("Get version query")
|
||||
?;
|
||||
Ok(r)
|
||||
}
|
||||
|
||||
fn add_version_by_parent_version_id(&mut self, version: Version) -> anyhow::Result<()> {
|
||||
let mut txn = self.store.write_txn()?;
|
||||
txn.set(
|
||||
&self.versions_bucket,
|
||||
version.parent_version_id.into(),
|
||||
Msgpack::to_value_buf(version)?,
|
||||
let t = self.txn()?;
|
||||
t.execute(
|
||||
"INSERT INTO versions (version_id, parent_version_id, data) VALUES (?, ?, ?)",
|
||||
params![
|
||||
StoredUuid(version.version_id),
|
||||
StoredUuid(version.parent_version_id),
|
||||
version.history_segment
|
||||
],
|
||||
)?;
|
||||
txn.commit()?;
|
||||
t.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<'t> Server for LocalServer<'t> {
|
||||
impl Server for LocalServer {
|
||||
// TODO: better transaction isolation for add_version (gets and sets should be in the same
|
||||
// transaction)
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use super::{InMemoryStorage, KvStorage, Storage};
|
||||
use super::{InMemoryStorage, SqliteStorage, Storage};
|
||||
use std::path::PathBuf;
|
||||
|
||||
/// The configuration required for a replica's storage.
|
||||
@@ -15,7 +15,7 @@ pub enum StorageConfig {
|
||||
impl StorageConfig {
|
||||
pub fn into_storage(self) -> anyhow::Result<Box<dyn Storage>> {
|
||||
Ok(match self {
|
||||
StorageConfig::OnDisk { taskdb_dir } => Box::new(KvStorage::new(taskdb_dir)?),
|
||||
StorageConfig::OnDisk { taskdb_dir } => Box::new(SqliteStorage::new(taskdb_dir)?),
|
||||
StorageConfig::InMemory => Box::new(InMemoryStorage::new()),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -11,12 +11,12 @@ use uuid::Uuid;
|
||||
|
||||
mod config;
|
||||
mod inmemory;
|
||||
mod kv;
|
||||
mod operation;
|
||||
pub(crate) mod sqlite;
|
||||
|
||||
pub use self::kv::KvStorage;
|
||||
pub use config::StorageConfig;
|
||||
pub use inmemory::InMemoryStorage;
|
||||
pub use sqlite::SqliteStorage;
|
||||
|
||||
pub use operation::Operation;
|
||||
|
||||
|
||||
@@ -1,355 +1,348 @@
|
||||
use crate::storage::{Operation, Storage, StorageTxn, TaskMap, VersionId, DEFAULT_BASE_VERSION};
|
||||
use crate::utils::Key;
|
||||
use kv::msgpack::Msgpack;
|
||||
use kv::{Bucket, Config, Error, Integer, Serde, Store, ValueBuf};
|
||||
use anyhow::Context;
|
||||
use rusqlite::types::{FromSql, ToSql};
|
||||
use rusqlite::{params, Connection, OptionalExtension};
|
||||
use std::path::Path;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// KvStorage is an on-disk storage backend which uses LMDB via the `kv` crate.
|
||||
pub struct KvStorage<'t> {
|
||||
store: Store,
|
||||
tasks_bucket: Bucket<'t, Key, ValueBuf<Msgpack<TaskMap>>>,
|
||||
numbers_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<u64>>>,
|
||||
uuids_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<Uuid>>>,
|
||||
operations_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<Operation>>>,
|
||||
working_set_bucket: Bucket<'t, Integer, ValueBuf<Msgpack<Uuid>>>,
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
enum SqliteError {
|
||||
#[error("SQLite transaction already committted")]
|
||||
TransactionAlreadyCommitted,
|
||||
}
|
||||
|
||||
const BASE_VERSION: u64 = 1;
|
||||
const NEXT_OPERATION: u64 = 2;
|
||||
const NEXT_WORKING_SET_INDEX: u64 = 3;
|
||||
/// Newtype to allow implementing `FromSql` for foreign `uuid::Uuid`
|
||||
pub(crate) struct StoredUuid(pub(crate) Uuid);
|
||||
|
||||
impl<'t> KvStorage<'t> {
|
||||
pub fn new<P: AsRef<Path>>(directory: P) -> anyhow::Result<KvStorage<'t>> {
|
||||
let mut config = Config::default(directory);
|
||||
config.bucket("tasks", None);
|
||||
config.bucket("numbers", None);
|
||||
config.bucket("uuids", None);
|
||||
config.bucket("operations", None);
|
||||
config.bucket("working_set", None);
|
||||
let store = Store::new(config)?;
|
||||
|
||||
// tasks are stored indexed by uuid
|
||||
let tasks_bucket = store.bucket::<Key, ValueBuf<Msgpack<TaskMap>>>(Some("tasks"))?;
|
||||
|
||||
// this bucket contains various u64s, indexed by constants above
|
||||
let numbers_bucket = store.int_bucket::<ValueBuf<Msgpack<u64>>>(Some("numbers"))?;
|
||||
|
||||
// this bucket contains various Uuids, indexed by constants above
|
||||
let uuids_bucket = store.int_bucket::<ValueBuf<Msgpack<Uuid>>>(Some("uuids"))?;
|
||||
|
||||
// this bucket contains operations, numbered consecutively; the NEXT_OPERATION number gives
|
||||
// the index of the next operation to insert
|
||||
let operations_bucket =
|
||||
store.int_bucket::<ValueBuf<Msgpack<Operation>>>(Some("operations"))?;
|
||||
|
||||
// this bucket contains operations, numbered consecutively; the NEXT_WORKING_SET_INDEX
|
||||
// number gives the index of the next operation to insert
|
||||
let working_set_bucket =
|
||||
store.int_bucket::<ValueBuf<Msgpack<Uuid>>>(Some("working_set"))?;
|
||||
|
||||
Ok(KvStorage {
|
||||
store,
|
||||
tasks_bucket,
|
||||
numbers_bucket,
|
||||
uuids_bucket,
|
||||
operations_bucket,
|
||||
working_set_bucket,
|
||||
})
|
||||
/// Conversion from Uuid stored as a string (rusqlite's uuid feature stores as binary blob)
|
||||
impl FromSql for StoredUuid {
|
||||
fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
|
||||
let u = Uuid::parse_str(value.as_str()?)
|
||||
.map_err(|_| rusqlite::types::FromSqlError::InvalidType)?;
|
||||
Ok(StoredUuid(u))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'t> Storage for KvStorage<'t> {
|
||||
fn txn<'a>(&'a mut self) -> anyhow::Result<Box<dyn StorageTxn + 'a>> {
|
||||
Ok(Box::new(Txn {
|
||||
storage: self,
|
||||
txn: Some(self.store.write_txn()?),
|
||||
}))
|
||||
/// Store Uuid as string in database
|
||||
impl ToSql for StoredUuid {
|
||||
fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
|
||||
let s = self.0.to_string();
|
||||
Ok(s.into())
|
||||
}
|
||||
}
|
||||
|
||||
/// Wraps [`TaskMap`] (type alias for HashMap) so we can implement rusqlite conversion traits for it
|
||||
struct StoredTaskMap(TaskMap);
|
||||
|
||||
/// Parses TaskMap stored as JSON in string column
|
||||
impl FromSql for StoredTaskMap {
|
||||
fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
|
||||
let o: TaskMap = serde_json::from_str(value.as_str()?)
|
||||
.map_err(|_| rusqlite::types::FromSqlError::InvalidType)?;
|
||||
Ok(StoredTaskMap(o))
|
||||
}
|
||||
}
|
||||
|
||||
/// Stores TaskMap in string column
|
||||
impl ToSql for StoredTaskMap {
|
||||
fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
|
||||
let s = serde_json::to_string(&self.0)
|
||||
.map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
|
||||
Ok(s.into())
|
||||
}
|
||||
}
|
||||
|
||||
/// Stores [`Operation`] in SQLite
|
||||
impl FromSql for Operation {
|
||||
fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
|
||||
let o: Operation = serde_json::from_str(value.as_str()?)
|
||||
.map_err(|_| rusqlite::types::FromSqlError::InvalidType)?;
|
||||
Ok(o)
|
||||
}
|
||||
}
|
||||
|
||||
/// Parsers Operation stored as JSON in string column
|
||||
impl ToSql for Operation {
|
||||
fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
|
||||
let s = serde_json::to_string(&self)
|
||||
.map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
|
||||
Ok(s.into())
|
||||
}
|
||||
}
|
||||
|
||||
/// SqliteStorage is an on-disk storage backed by SQLite3.
|
||||
pub struct SqliteStorage {
|
||||
con: Connection,
|
||||
}
|
||||
|
||||
impl SqliteStorage {
|
||||
pub fn new<P: AsRef<Path>>(directory: P) -> anyhow::Result<SqliteStorage> {
|
||||
// Ensure parent folder exists
|
||||
std::fs::create_dir_all(&directory)?;
|
||||
|
||||
// Open (or create) database
|
||||
let db_file = directory.as_ref().join("taskchampion.sqlite3");
|
||||
let con = Connection::open(db_file)?;
|
||||
|
||||
// Initialize database
|
||||
let queries = vec![
|
||||
"CREATE TABLE IF NOT EXISTS operations (id INTEGER PRIMARY KEY AUTOINCREMENT, data STRING);",
|
||||
"CREATE TABLE IF NOT EXISTS sync_meta (key STRING PRIMARY KEY, value STRING);",
|
||||
"CREATE TABLE IF NOT EXISTS tasks (uuid STRING PRIMARY KEY, data STRING);",
|
||||
"CREATE TABLE IF NOT EXISTS working_set (id INTEGER PRIMARY KEY, uuid STRING);",
|
||||
];
|
||||
for q in queries {
|
||||
con.execute(q, []).context("Creating table")?;
|
||||
}
|
||||
|
||||
Ok(SqliteStorage { con })
|
||||
}
|
||||
}
|
||||
|
||||
struct Txn<'t> {
|
||||
storage: &'t KvStorage<'t>,
|
||||
txn: Option<kv::Txn<'t>>,
|
||||
txn: Option<rusqlite::Transaction<'t>>,
|
||||
}
|
||||
|
||||
impl<'t> Txn<'t> {
|
||||
// get the underlying kv Txn
|
||||
fn kvtxn(&mut self) -> &mut kv::Txn<'t> {
|
||||
if let Some(ref mut txn) = self.txn {
|
||||
txn
|
||||
} else {
|
||||
panic!("cannot use transaction after commit");
|
||||
}
|
||||
fn get_txn(&self) -> Result<&rusqlite::Transaction<'t>, SqliteError> {
|
||||
self.txn
|
||||
.as_ref()
|
||||
.ok_or(SqliteError::TransactionAlreadyCommitted)
|
||||
}
|
||||
|
||||
// Access to buckets
|
||||
fn tasks_bucket(&self) -> &'t Bucket<'t, Key, ValueBuf<Msgpack<TaskMap>>> {
|
||||
&self.storage.tasks_bucket
|
||||
fn get_next_working_set_number(&self) -> anyhow::Result<usize> {
|
||||
let t = self.get_txn()?;
|
||||
let next_id: Option<usize> = t
|
||||
.query_row("SELECT COALESCE(MAX(id), 0) + 1 FROM working_set", [], |r| {
|
||||
r.get(0)
|
||||
})
|
||||
.optional()
|
||||
.context("Getting highest working set ID")?;
|
||||
|
||||
Ok(next_id.unwrap_or(0))
|
||||
}
|
||||
fn numbers_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf<Msgpack<u64>>> {
|
||||
&self.storage.numbers_bucket
|
||||
}
|
||||
fn uuids_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf<Msgpack<Uuid>>> {
|
||||
&self.storage.uuids_bucket
|
||||
}
|
||||
fn operations_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf<Msgpack<Operation>>> {
|
||||
&self.storage.operations_bucket
|
||||
}
|
||||
fn working_set_bucket(&self) -> &'t Bucket<'t, Integer, ValueBuf<Msgpack<Uuid>>> {
|
||||
&self.storage.working_set_bucket
|
||||
}
|
||||
|
||||
impl Storage for SqliteStorage {
|
||||
fn txn<'a>(&'a mut self) -> anyhow::Result<Box<dyn StorageTxn + 'a>> {
|
||||
let txn = self.con.transaction()?;
|
||||
Ok(Box::new(Txn { txn: Some(txn) }))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'t> StorageTxn for Txn<'t> {
|
||||
fn get_task(&mut self, uuid: Uuid) -> anyhow::Result<Option<TaskMap>> {
|
||||
let bucket = self.tasks_bucket();
|
||||
let buf = match self.kvtxn().get(bucket, uuid.into()) {
|
||||
Ok(buf) => buf,
|
||||
Err(Error::NotFound) => return Ok(None),
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
let value = buf.inner()?.to_serde();
|
||||
Ok(Some(value))
|
||||
let t = self.get_txn()?;
|
||||
let result: Option<StoredTaskMap> = t
|
||||
.query_row(
|
||||
"SELECT data FROM tasks WHERE uuid = ? LIMIT 1",
|
||||
[&StoredUuid(uuid)],
|
||||
|r| r.get("data"),
|
||||
)
|
||||
.optional()?;
|
||||
|
||||
// Get task from "stored" wrapper
|
||||
Ok(result.map(|t| t.0))
|
||||
}
|
||||
|
||||
fn create_task(&mut self, uuid: Uuid) -> anyhow::Result<bool> {
|
||||
let bucket = self.tasks_bucket();
|
||||
let kvtxn = self.kvtxn();
|
||||
match kvtxn.get(bucket, uuid.into()) {
|
||||
Err(Error::NotFound) => {
|
||||
kvtxn.set(bucket, uuid.into(), Msgpack::to_value_buf(TaskMap::new())?)?;
|
||||
Ok(true)
|
||||
}
|
||||
Err(e) => Err(e.into()),
|
||||
Ok(_) => Ok(false),
|
||||
let t = self.get_txn()?;
|
||||
let count: usize = t.query_row(
|
||||
"SELECT count(uuid) FROM tasks WHERE uuid = ?",
|
||||
[&StoredUuid(uuid)],
|
||||
|x| x.get(0),
|
||||
)?;
|
||||
if count > 0 {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let data = TaskMap::default();
|
||||
t.execute(
|
||||
"INSERT INTO tasks (uuid, data) VALUES (?, ?)",
|
||||
params![&StoredUuid(uuid), &StoredTaskMap(data)],
|
||||
)
|
||||
.context("Create task query")?;
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
fn set_task(&mut self, uuid: Uuid, task: TaskMap) -> anyhow::Result<()> {
|
||||
let bucket = self.tasks_bucket();
|
||||
let kvtxn = self.kvtxn();
|
||||
kvtxn.set(bucket, uuid.into(), Msgpack::to_value_buf(task)?)?;
|
||||
let t = self.get_txn()?;
|
||||
t.execute(
|
||||
"INSERT OR REPLACE INTO tasks (uuid, data) VALUES (?, ?)",
|
||||
params![&StoredUuid(uuid), &StoredTaskMap(task)],
|
||||
)
|
||||
.context("Update task query")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn delete_task(&mut self, uuid: Uuid) -> anyhow::Result<bool> {
|
||||
let bucket = self.tasks_bucket();
|
||||
let kvtxn = self.kvtxn();
|
||||
match kvtxn.del(bucket, uuid.into()) {
|
||||
Err(Error::NotFound) => Ok(false),
|
||||
Err(e) => Err(e.into()),
|
||||
Ok(_) => Ok(true),
|
||||
}
|
||||
let t = self.get_txn()?;
|
||||
let changed = t
|
||||
.execute("DELETE FROM tasks WHERE uuid = ?", [&StoredUuid(uuid)])
|
||||
.context("Delete task query")?;
|
||||
Ok(changed > 0)
|
||||
}
|
||||
|
||||
fn all_tasks(&mut self) -> anyhow::Result<Vec<(Uuid, TaskMap)>> {
|
||||
let bucket = self.tasks_bucket();
|
||||
let kvtxn = self.kvtxn();
|
||||
let all_tasks: Result<Vec<(Uuid, TaskMap)>, Error> = kvtxn
|
||||
.read_cursor(bucket)?
|
||||
.iter()
|
||||
.map(|(k, v)| Ok((k.into(), v.inner()?.to_serde())))
|
||||
.collect();
|
||||
Ok(all_tasks?)
|
||||
let t = self.get_txn()?;
|
||||
|
||||
let mut q = t.prepare("SELECT uuid, data FROM tasks")?;
|
||||
let rows = q.query_map([], |r| {
|
||||
let uuid: StoredUuid = r.get("uuid")?;
|
||||
let data: StoredTaskMap = r.get("data")?;
|
||||
Ok((uuid.0, data.0))
|
||||
})?;
|
||||
|
||||
let mut ret = vec![];
|
||||
for r in rows {
|
||||
ret.push(r?);
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
fn all_task_uuids(&mut self) -> anyhow::Result<Vec<Uuid>> {
|
||||
let bucket = self.tasks_bucket();
|
||||
let kvtxn = self.kvtxn();
|
||||
Ok(kvtxn
|
||||
.read_cursor(bucket)?
|
||||
.iter()
|
||||
.map(|(k, _)| k.into())
|
||||
.collect())
|
||||
let t = self.get_txn()?;
|
||||
|
||||
let mut q = t.prepare("SELECT uuid FROM tasks")?;
|
||||
let rows = q.query_map([], |r| {
|
||||
let uuid: StoredUuid = r.get("uuid")?;
|
||||
Ok(uuid.0)
|
||||
})?;
|
||||
|
||||
let mut ret = vec![];
|
||||
for r in rows {
|
||||
ret.push(r?);
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
fn base_version(&mut self) -> anyhow::Result<VersionId> {
|
||||
let bucket = self.uuids_bucket();
|
||||
let base_version = match self.kvtxn().get(bucket, BASE_VERSION.into()) {
|
||||
Ok(buf) => buf,
|
||||
Err(Error::NotFound) => return Ok(DEFAULT_BASE_VERSION),
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
.inner()?
|
||||
.to_serde();
|
||||
Ok(base_version as VersionId)
|
||||
let t = self.get_txn()?;
|
||||
|
||||
let version: Option<StoredUuid> = t
|
||||
.query_row(
|
||||
"SELECT value FROM sync_meta WHERE key = 'base_version'",
|
||||
[],
|
||||
|r| r.get("value"),
|
||||
)
|
||||
.optional()?;
|
||||
Ok(version.map(|u| u.0).unwrap_or(DEFAULT_BASE_VERSION))
|
||||
}
|
||||
|
||||
fn set_base_version(&mut self, version: VersionId) -> anyhow::Result<()> {
|
||||
let uuids_bucket = self.uuids_bucket();
|
||||
let kvtxn = self.kvtxn();
|
||||
|
||||
kvtxn.set(
|
||||
uuids_bucket,
|
||||
BASE_VERSION.into(),
|
||||
Msgpack::to_value_buf(version as Uuid)?,
|
||||
)?;
|
||||
let t = self.get_txn()?;
|
||||
t.execute(
|
||||
"INSERT OR REPLACE INTO sync_meta (key, value) VALUES (?, ?)",
|
||||
params!["base_version", &StoredUuid(version)],
|
||||
)
|
||||
.context("Set base version")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn operations(&mut self) -> anyhow::Result<Vec<Operation>> {
|
||||
let bucket = self.operations_bucket();
|
||||
let kvtxn = self.kvtxn();
|
||||
let all_ops: Result<Vec<(u64, Operation)>, Error> = kvtxn
|
||||
.read_cursor(bucket)?
|
||||
.iter()
|
||||
.map(|(i, v)| Ok((i.into(), v.inner()?.to_serde())))
|
||||
.collect();
|
||||
let mut all_ops = all_ops?;
|
||||
// sort by key..
|
||||
all_ops.sort_by(|a, b| a.0.cmp(&b.0));
|
||||
// and return the values..
|
||||
Ok(all_ops.iter().map(|(_, v)| v.clone()).collect())
|
||||
let t = self.get_txn()?;
|
||||
|
||||
let mut q = t.prepare("SELECT data FROM operations ORDER BY id ASC")?;
|
||||
let rows = q.query_map([], |r| {
|
||||
let data: Operation = r.get("data")?;
|
||||
Ok(data)
|
||||
})?;
|
||||
|
||||
let mut ret = vec![];
|
||||
for r in rows {
|
||||
ret.push(r?);
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
fn add_operation(&mut self, op: Operation) -> anyhow::Result<()> {
|
||||
let numbers_bucket = self.numbers_bucket();
|
||||
let operations_bucket = self.operations_bucket();
|
||||
let kvtxn = self.kvtxn();
|
||||
let t = self.get_txn()?;
|
||||
|
||||
let next_op = match kvtxn.get(numbers_bucket, NEXT_OPERATION.into()) {
|
||||
Ok(buf) => buf.inner()?.to_serde(),
|
||||
Err(Error::NotFound) => 0,
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
kvtxn.set(
|
||||
operations_bucket,
|
||||
next_op.into(),
|
||||
Msgpack::to_value_buf(op)?,
|
||||
)?;
|
||||
kvtxn.set(
|
||||
numbers_bucket,
|
||||
NEXT_OPERATION.into(),
|
||||
Msgpack::to_value_buf(next_op + 1)?,
|
||||
)?;
|
||||
t.execute("INSERT INTO operations (data) VALUES (?)", params![&op])
|
||||
.context("Add operation query")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_operations(&mut self, ops: Vec<Operation>) -> anyhow::Result<()> {
|
||||
let numbers_bucket = self.numbers_bucket();
|
||||
let operations_bucket = self.operations_bucket();
|
||||
let kvtxn = self.kvtxn();
|
||||
let t = self.get_txn()?;
|
||||
t.execute("DELETE FROM operations", [])
|
||||
.context("Clear all existing operations")?;
|
||||
t.execute("DELETE FROM sqlite_sequence WHERE name = 'operations'", [])
|
||||
.context("Clear all existing operations")?;
|
||||
|
||||
kvtxn.clear_db(operations_bucket)?;
|
||||
|
||||
let mut i = 0u64;
|
||||
for op in ops {
|
||||
kvtxn.set(operations_bucket, i.into(), Msgpack::to_value_buf(op)?)?;
|
||||
i += 1;
|
||||
for o in ops {
|
||||
self.add_operation(o)?;
|
||||
}
|
||||
|
||||
kvtxn.set(
|
||||
numbers_bucket,
|
||||
NEXT_OPERATION.into(),
|
||||
Msgpack::to_value_buf(i)?,
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_working_set(&mut self) -> anyhow::Result<Vec<Option<Uuid>>> {
|
||||
let working_set_bucket = self.working_set_bucket();
|
||||
let numbers_bucket = self.numbers_bucket();
|
||||
let kvtxn = self.kvtxn();
|
||||
let t = self.get_txn()?;
|
||||
|
||||
let next_index = match kvtxn.get(numbers_bucket, NEXT_WORKING_SET_INDEX.into()) {
|
||||
Ok(buf) => buf.inner()?.to_serde(),
|
||||
Err(Error::NotFound) => 1,
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
let mut q = t.prepare("SELECT id, uuid FROM working_set ORDER BY id ASC")?;
|
||||
let rows = q
|
||||
.query_map([], |r| {
|
||||
let id: usize = r.get("id")?;
|
||||
let uuid: StoredUuid = r.get("uuid")?;
|
||||
Ok((id, uuid.0))
|
||||
})
|
||||
.context("Get working set query")?;
|
||||
|
||||
let mut res = Vec::with_capacity(next_index as usize);
|
||||
for _ in 0..next_index {
|
||||
res.push(None)
|
||||
let rows: Vec<Result<(usize, Uuid), _>> = rows.collect();
|
||||
let mut res = Vec::with_capacity(rows.len());
|
||||
for _ in 0..self.get_next_working_set_number().context("Getting working set number")? {
|
||||
res.push(None);
|
||||
}
|
||||
for r in rows {
|
||||
let (id, uuid) = r?;
|
||||
res[id as usize] = Some(uuid);
|
||||
}
|
||||
|
||||
for (i, u) in kvtxn.read_cursor(working_set_bucket)?.iter() {
|
||||
let i: u64 = i.into();
|
||||
res[i as usize] = Some(u.inner()?.to_serde());
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
fn add_to_working_set(&mut self, uuid: Uuid) -> anyhow::Result<usize> {
|
||||
let working_set_bucket = self.working_set_bucket();
|
||||
let numbers_bucket = self.numbers_bucket();
|
||||
let kvtxn = self.kvtxn();
|
||||
let t = self.get_txn()?;
|
||||
|
||||
let next_index = match kvtxn.get(numbers_bucket, NEXT_WORKING_SET_INDEX.into()) {
|
||||
Ok(buf) => buf.inner()?.to_serde(),
|
||||
Err(Error::NotFound) => 1,
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
let next_working_id = self.get_next_working_set_number()?;
|
||||
|
||||
kvtxn.set(
|
||||
working_set_bucket,
|
||||
next_index.into(),
|
||||
Msgpack::to_value_buf(uuid)?,
|
||||
)?;
|
||||
kvtxn.set(
|
||||
numbers_bucket,
|
||||
NEXT_WORKING_SET_INDEX.into(),
|
||||
Msgpack::to_value_buf(next_index + 1)?,
|
||||
)?;
|
||||
Ok(next_index as usize)
|
||||
t.execute(
|
||||
"INSERT INTO working_set (id, uuid) VALUES (?, ?)",
|
||||
params![next_working_id, &StoredUuid(uuid)],
|
||||
)
|
||||
.context("Create task query")?;
|
||||
|
||||
Ok(next_working_id)
|
||||
}
|
||||
|
||||
fn set_working_set_item(&mut self, index: usize, uuid: Option<Uuid>) -> anyhow::Result<()> {
|
||||
let working_set_bucket = self.working_set_bucket();
|
||||
let numbers_bucket = self.numbers_bucket();
|
||||
let kvtxn = self.kvtxn();
|
||||
let index = index as u64;
|
||||
|
||||
let next_index = match kvtxn.get(numbers_bucket, NEXT_WORKING_SET_INDEX.into()) {
|
||||
Ok(buf) => buf.inner()?.to_serde(),
|
||||
Err(Error::NotFound) => 1,
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
if index < 1 || index >= next_index {
|
||||
anyhow::bail!("Index {} is not in the working set", index);
|
||||
let t = self.get_txn()?;
|
||||
match uuid {
|
||||
// Add or override item
|
||||
Some(uuid) => t.execute(
|
||||
"INSERT OR REPLACE INTO working_set (id, uuid) VALUES (?, ?)",
|
||||
params![index, &StoredUuid(uuid)],
|
||||
),
|
||||
// Setting to None removes the row from database
|
||||
None => t.execute("DELETE FROM working_set WHERE id = ?", [index]),
|
||||
}
|
||||
|
||||
if let Some(uuid) = uuid {
|
||||
kvtxn.set(
|
||||
working_set_bucket,
|
||||
index.into(),
|
||||
Msgpack::to_value_buf(uuid)?,
|
||||
)?;
|
||||
} else {
|
||||
match kvtxn.del(working_set_bucket, index.into()) {
|
||||
Ok(_) => {}
|
||||
Err(Error::NotFound) => {}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
}
|
||||
|
||||
.context("Set working set item query")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn clear_working_set(&mut self) -> anyhow::Result<()> {
|
||||
let working_set_bucket = self.working_set_bucket();
|
||||
let numbers_bucket = self.numbers_bucket();
|
||||
let kvtxn = self.kvtxn();
|
||||
|
||||
kvtxn.clear_db(working_set_bucket)?;
|
||||
kvtxn.set(
|
||||
numbers_bucket,
|
||||
NEXT_WORKING_SET_INDEX.into(),
|
||||
Msgpack::to_value_buf(1)?,
|
||||
)?;
|
||||
|
||||
let t = self.get_txn()?;
|
||||
t.execute("DELETE FROM working_set", [])
|
||||
.context("Clear working set query")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn commit(&mut self) -> anyhow::Result<()> {
|
||||
if let Some(kvtxn) = self.txn.take() {
|
||||
kvtxn.commit()?;
|
||||
} else {
|
||||
panic!("transaction already committed");
|
||||
}
|
||||
let t = self
|
||||
.txn
|
||||
.take()
|
||||
.ok_or(SqliteError::TransactionAlreadyCommitted)?;
|
||||
t.commit().context("Committing transaction")?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -360,10 +353,58 @@ mod test {
|
||||
use crate::storage::taskmap_with;
|
||||
use tempfile::TempDir;
|
||||
|
||||
#[test]
|
||||
fn test_empty_dir() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let non_existant = tmp_dir.path().join("subdir");
|
||||
let mut storage = SqliteStorage::new(&non_existant)?;
|
||||
let uuid = Uuid::new_v4();
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
assert!(txn.create_task(uuid)?);
|
||||
txn.commit()?;
|
||||
}
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
let task = txn.get_task(uuid)?;
|
||||
assert_eq!(task, Some(taskmap_with(vec![])));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn drop_transaction() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||
let uuid1 = Uuid::new_v4();
|
||||
let uuid2 = Uuid::new_v4();
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
assert!(txn.create_task(uuid1)?);
|
||||
txn.commit()?;
|
||||
}
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
assert!(txn.create_task(uuid2)?);
|
||||
std::mem::drop(txn); // Unnecessary explicit drop of transaction
|
||||
}
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
let uuids = txn.all_task_uuids()?;
|
||||
|
||||
assert_eq!(uuids, [uuid1]);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let mut storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||
let uuid = Uuid::new_v4();
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
@@ -381,7 +422,7 @@ mod test {
|
||||
#[test]
|
||||
fn test_create_exists() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let mut storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||
let uuid = Uuid::new_v4();
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
@@ -399,7 +440,7 @@ mod test {
|
||||
#[test]
|
||||
fn test_get_missing() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let mut storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||
let uuid = Uuid::new_v4();
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
@@ -412,7 +453,7 @@ mod test {
|
||||
#[test]
|
||||
fn test_set_task() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let mut storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||
let uuid = Uuid::new_v4();
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
@@ -433,7 +474,7 @@ mod test {
|
||||
#[test]
|
||||
fn test_delete_task_missing() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let mut storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||
let uuid = Uuid::new_v4();
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
@@ -445,7 +486,7 @@ mod test {
|
||||
#[test]
|
||||
fn test_delete_task_exists() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let mut storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||
let uuid = Uuid::new_v4();
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
@@ -462,7 +503,7 @@ mod test {
|
||||
#[test]
|
||||
fn test_all_tasks_empty() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let mut storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
let tasks = txn.all_tasks()?;
|
||||
@@ -474,7 +515,7 @@ mod test {
|
||||
#[test]
|
||||
fn test_all_tasks_and_uuids() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let mut storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||
let uuid1 = Uuid::new_v4();
|
||||
let uuid2 = Uuid::new_v4();
|
||||
{
|
||||
@@ -528,7 +569,7 @@ mod test {
|
||||
#[test]
|
||||
fn test_base_version_default() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let mut storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
assert_eq!(txn.base_version()?, DEFAULT_BASE_VERSION);
|
||||
@@ -539,7 +580,7 @@ mod test {
|
||||
#[test]
|
||||
fn test_base_version_setting() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let mut storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||
let u = Uuid::new_v4();
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
@@ -556,7 +597,7 @@ mod test {
|
||||
#[test]
|
||||
fn test_operations() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let mut storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||
let uuid1 = Uuid::new_v4();
|
||||
let uuid2 = Uuid::new_v4();
|
||||
let uuid3 = Uuid::new_v4();
|
||||
@@ -620,7 +661,7 @@ mod test {
|
||||
#[test]
|
||||
fn get_working_set_empty() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let mut storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
@@ -634,7 +675,7 @@ mod test {
|
||||
#[test]
|
||||
fn add_to_working_set() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let mut storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||
let uuid1 = Uuid::new_v4();
|
||||
let uuid2 = Uuid::new_v4();
|
||||
|
||||
@@ -654,86 +695,10 @@ mod test {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn set_working_set_item() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let uuid1 = Uuid::new_v4();
|
||||
let uuid2 = Uuid::new_v4();
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
txn.add_to_working_set(uuid1)?;
|
||||
txn.add_to_working_set(uuid2)?;
|
||||
txn.commit()?;
|
||||
}
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
txn.set_working_set_item(1, Some(uuid2))?;
|
||||
txn.set_working_set_item(2, None)?;
|
||||
txn.commit()?;
|
||||
}
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
let ws = txn.get_working_set()?;
|
||||
assert_eq!(ws, vec![None, Some(uuid2), None]);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn set_working_set_item_nonexistent() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let uuid1 = Uuid::new_v4();
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
txn.add_to_working_set(uuid1)?;
|
||||
txn.commit()?;
|
||||
}
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
txn.set_working_set_item(1, None)?;
|
||||
txn.commit()?;
|
||||
}
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
// set it to None again, to check idempotency
|
||||
txn.set_working_set_item(1, None)?;
|
||||
txn.commit()?;
|
||||
}
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
let ws = txn.get_working_set()?;
|
||||
assert_eq!(ws, vec![None, None]);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn set_working_set_item_zero() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let uuid1 = Uuid::new_v4();
|
||||
|
||||
let mut txn = storage.txn()?;
|
||||
assert!(txn.set_working_set_item(0, Some(uuid1)).is_err());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn clear_working_set() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = KvStorage::new(&tmp_dir.path())?;
|
||||
let mut storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||
let uuid1 = Uuid::new_v4();
|
||||
let uuid2 = Uuid::new_v4();
|
||||
|
||||
@@ -760,4 +725,53 @@ mod test {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn set_working_set_item() -> anyhow::Result<()> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
let mut storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||
let uuid1 = Uuid::new_v4();
|
||||
let uuid2 = Uuid::new_v4();
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
txn.add_to_working_set(uuid1)?;
|
||||
txn.add_to_working_set(uuid2)?;
|
||||
txn.commit()?;
|
||||
}
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
let ws = txn.get_working_set()?;
|
||||
assert_eq!(ws, vec![None, Some(uuid1), Some(uuid2)]);
|
||||
}
|
||||
|
||||
// Clear one item
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
txn.set_working_set_item(1, None)?;
|
||||
txn.commit()?;
|
||||
}
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
let ws = txn.get_working_set()?;
|
||||
assert_eq!(ws, vec![None, None, Some(uuid2)]);
|
||||
}
|
||||
|
||||
// Override item
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
txn.set_working_set_item(2, Some(uuid1))?;
|
||||
txn.commit()?;
|
||||
}
|
||||
|
||||
{
|
||||
let mut txn = storage.txn()?;
|
||||
let ws = txn.get_working_set()?;
|
||||
assert_eq!(ws, vec![None, None, Some(uuid1)]);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user