Very WIP (i.e broken) start of SQLite storage in server
This commit is contained in:
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -2228,8 +2228,11 @@ dependencies = [
|
|||||||
"futures",
|
"futures",
|
||||||
"kv",
|
"kv",
|
||||||
"log",
|
"log",
|
||||||
|
"rusqlite",
|
||||||
"serde",
|
"serde",
|
||||||
|
"serde_json",
|
||||||
"tempfile",
|
"tempfile",
|
||||||
|
"thiserror",
|
||||||
"uuid",
|
"uuid",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|||||||
@@ -10,12 +10,15 @@ edition = "2018"
|
|||||||
uuid = { version = "^0.8.2", features = ["serde", "v4"] }
|
uuid = { version = "^0.8.2", features = ["serde", "v4"] }
|
||||||
actix-web = "^3.3.2"
|
actix-web = "^3.3.2"
|
||||||
anyhow = "1.0"
|
anyhow = "1.0"
|
||||||
|
thiserror = "1.0"
|
||||||
futures = "^0.3.8"
|
futures = "^0.3.8"
|
||||||
serde = "^1.0.125"
|
serde = "^1.0.125"
|
||||||
|
serde_json = "^1.0"
|
||||||
kv = {version = "^0.10.0", features = ["msgpack-value"]}
|
kv = {version = "^0.10.0", features = ["msgpack-value"]}
|
||||||
clap = "^2.33.0"
|
clap = "^2.33.0"
|
||||||
log = "^0.4.14"
|
log = "^0.4.14"
|
||||||
env_logger = "^0.8.3"
|
env_logger = "^0.8.3"
|
||||||
|
rusqlite = { version = "0.25", features = ["bundled"] }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-rt = "^2.2.0"
|
actix-rt = "^2.2.0"
|
||||||
|
|||||||
@@ -6,6 +6,9 @@ mod inmemory;
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub(crate) use inmemory::InMemoryStorage;
|
pub(crate) use inmemory::InMemoryStorage;
|
||||||
|
|
||||||
|
mod sqlite;
|
||||||
|
pub(crate) use self::sqlite::SqliteStorage;
|
||||||
|
|
||||||
mod kv;
|
mod kv;
|
||||||
pub(crate) use self::kv::KvStorage;
|
pub(crate) use self::kv::KvStorage;
|
||||||
|
|
||||||
|
|||||||
270
sync-server/src/storage/sqlite.rs
Normal file
270
sync-server/src/storage/sqlite.rs
Normal file
@@ -0,0 +1,270 @@
|
|||||||
|
use super::{Client, Storage, StorageTxn, Uuid, Version};
|
||||||
|
use rusqlite::types::{FromSql, ToSql};
|
||||||
|
use rusqlite::{params, Connection, OptionalExtension};
|
||||||
|
use std::path::Path;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use anyhow::Context;
|
||||||
|
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
enum SqliteError {
|
||||||
|
#[error("SQLite transaction already committted")]
|
||||||
|
TransactionAlreadyCommitted,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Newtype to allow implementing `FromSql` for foreign `uuid::Uuid`
|
||||||
|
struct StoredUuid(Uuid);
|
||||||
|
|
||||||
|
/// Conversion from Uuid stored as a string (rusqlite's uuid feature stores as binary blob)
|
||||||
|
impl FromSql for StoredUuid {
|
||||||
|
fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
|
||||||
|
let u = Uuid::parse_str(value.as_str()?)
|
||||||
|
.map_err(|_| rusqlite::types::FromSqlError::InvalidType)?;
|
||||||
|
Ok(StoredUuid(u))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Store Uuid as string in database
|
||||||
|
impl ToSql for StoredUuid {
|
||||||
|
fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
|
||||||
|
let s = self.0.to_string();
|
||||||
|
Ok(s.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stores [`Client`] in SQLite
|
||||||
|
impl FromSql for Client {
|
||||||
|
fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
|
||||||
|
let o: Client = serde_json::from_str(value.as_str()?)
|
||||||
|
.map_err(|_| rusqlite::types::FromSqlError::InvalidType)?;
|
||||||
|
Ok(o)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parsers Operation stored as JSON in string column
|
||||||
|
impl ToSql for Client {
|
||||||
|
fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
|
||||||
|
let s = serde_json::to_string(&self)
|
||||||
|
.map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
|
||||||
|
Ok(s.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/// DB Key for versions: concatenation of client_key and parent_version_id
|
||||||
|
type VersionDbKey = [u8; 32];
|
||||||
|
|
||||||
|
fn version_db_key(client_key: Uuid, parent_version_id: Uuid) -> VersionDbKey {
|
||||||
|
let mut key = [0u8; 32];
|
||||||
|
key[..16].clone_from_slice(client_key.as_bytes());
|
||||||
|
key[16..].clone_from_slice(parent_version_id.as_bytes());
|
||||||
|
key
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Key for clients: just the client_key
|
||||||
|
type ClientDbKey = [u8; 16];
|
||||||
|
|
||||||
|
fn client_db_key(client_key: Uuid) -> ClientDbKey {
|
||||||
|
*client_key.as_bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An on-disk storage backend which uses SQLite
|
||||||
|
pub(crate) struct SqliteStorage {
|
||||||
|
db_file: std::path::PathBuf,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SqliteStorage {
|
||||||
|
fn new_connection(&self) -> anyhow::Result<Connection> {
|
||||||
|
Ok(Connection::open(&self.db_file)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn new<P: AsRef<Path>>(directory: P) -> anyhow::Result<SqliteStorage> {
|
||||||
|
let db_file = directory.as_ref().join("taskchampion-sync-server.sqlite3");
|
||||||
|
|
||||||
|
let o = SqliteStorage { db_file };
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut con = o.new_connection()?;
|
||||||
|
let txn = con.transaction()?;
|
||||||
|
|
||||||
|
let queries = vec![
|
||||||
|
"CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY AUTOINCREMENT, data STRING);",
|
||||||
|
];
|
||||||
|
for q in queries {
|
||||||
|
txn.execute(q, []).context("Creating table")?;
|
||||||
|
}
|
||||||
|
txn.commit()?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(o)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Storage for SqliteStorage {
|
||||||
|
fn txn<'a>(&'a self) -> anyhow::Result<Box<dyn StorageTxn + 'a>> {
|
||||||
|
let mut con = self.new_connection()?;
|
||||||
|
let mut t = Txn{con, txn: None};
|
||||||
|
Ok(Box::new(t))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Txn<'t> {
|
||||||
|
con: Connection,
|
||||||
|
txn: Option<&'t rusqlite::Transaction<'t>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl <'t>Txn<'t> {
|
||||||
|
fn get_txn(&mut self) -> Result<&'t rusqlite::Transaction, SqliteError> {
|
||||||
|
Ok(&self.con.transaction().unwrap())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl <'t>StorageTxn for Txn<'t> {
|
||||||
|
fn get_client(&mut self, client_key: Uuid) -> anyhow::Result<Option<Client>> {
|
||||||
|
let t = self.get_txn()?;
|
||||||
|
let result: Option<Client> = t
|
||||||
|
.query_row(
|
||||||
|
"SELECT data FROM clients WHERE client_key = ? LIMIT 1",
|
||||||
|
[&StoredUuid(client_key)],
|
||||||
|
|r| r.get("data"),
|
||||||
|
)
|
||||||
|
.optional()?;
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn new_client(&mut self, client_key: Uuid, latest_version_id: Uuid) -> anyhow::Result<()> {
|
||||||
|
let t = self.get_txn()?;
|
||||||
|
|
||||||
|
let client = Client{ latest_version_id };
|
||||||
|
t.execute(
|
||||||
|
"INSERT OR REPLACE INTO clients (client_key, latest_version_id) VALUES (?, ?)",
|
||||||
|
params![&StoredUuid(latest_version_id), &client],
|
||||||
|
)
|
||||||
|
.context("Create client query")?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_client_latest_version_id(
|
||||||
|
&mut self,
|
||||||
|
client_key: Uuid,
|
||||||
|
latest_version_id: Uuid,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
// Implementation is same as new_client
|
||||||
|
self.new_client(client_key, latest_version_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_version_by_parent(
|
||||||
|
&mut self,
|
||||||
|
client_key: Uuid,
|
||||||
|
parent_version_id: Uuid,
|
||||||
|
) -> anyhow::Result<Option<Version>> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_version(
|
||||||
|
&mut self,
|
||||||
|
client_key: Uuid,
|
||||||
|
version_id: Uuid,
|
||||||
|
parent_version_id: Uuid,
|
||||||
|
history_segment: Vec<u8>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let version = Version {
|
||||||
|
version_id,
|
||||||
|
parent_version_id,
|
||||||
|
history_segment,
|
||||||
|
};
|
||||||
|
todo!();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn commit(&mut self) -> anyhow::Result<()> {
|
||||||
|
let t = self
|
||||||
|
.txn
|
||||||
|
.take()
|
||||||
|
.ok_or(SqliteError::TransactionAlreadyCommitted)?;
|
||||||
|
t.commit().context("Committing transaction")?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use super::*;
|
||||||
|
use tempfile::TempDir;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_get_client_empty() -> anyhow::Result<()> {
|
||||||
|
let tmp_dir = TempDir::new()?;
|
||||||
|
let storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||||
|
let mut txn = storage.txn()?;
|
||||||
|
let maybe_client = txn.get_client(Uuid::new_v4())?;
|
||||||
|
assert!(maybe_client.is_none());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_client_storage() -> anyhow::Result<()> {
|
||||||
|
let tmp_dir = TempDir::new()?;
|
||||||
|
let storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||||
|
let mut txn = storage.txn()?;
|
||||||
|
|
||||||
|
let client_key = Uuid::new_v4();
|
||||||
|
let latest_version_id = Uuid::new_v4();
|
||||||
|
txn.new_client(client_key, latest_version_id)?;
|
||||||
|
|
||||||
|
let client = txn.get_client(client_key)?.unwrap();
|
||||||
|
assert_eq!(client.latest_version_id, latest_version_id);
|
||||||
|
|
||||||
|
let latest_version_id = Uuid::new_v4();
|
||||||
|
txn.set_client_latest_version_id(client_key, latest_version_id)?;
|
||||||
|
|
||||||
|
let client = txn.get_client(client_key)?.unwrap();
|
||||||
|
assert_eq!(client.latest_version_id, latest_version_id);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_gvbp_empty() -> anyhow::Result<()> {
|
||||||
|
let tmp_dir = TempDir::new()?;
|
||||||
|
let storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||||
|
let mut txn = storage.txn()?;
|
||||||
|
let maybe_version = txn.get_version_by_parent(Uuid::new_v4(), Uuid::new_v4())?;
|
||||||
|
assert!(maybe_version.is_none());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_add_version_and_gvbp() -> anyhow::Result<()> {
|
||||||
|
let tmp_dir = TempDir::new()?;
|
||||||
|
let storage = SqliteStorage::new(&tmp_dir.path())?;
|
||||||
|
let mut txn = storage.txn()?;
|
||||||
|
|
||||||
|
let client_key = Uuid::new_v4();
|
||||||
|
let version_id = Uuid::new_v4();
|
||||||
|
let parent_version_id = Uuid::new_v4();
|
||||||
|
let history_segment = b"abc".to_vec();
|
||||||
|
txn.add_version(
|
||||||
|
client_key,
|
||||||
|
version_id,
|
||||||
|
parent_version_id,
|
||||||
|
history_segment.clone(),
|
||||||
|
)?;
|
||||||
|
let version = txn
|
||||||
|
.get_version_by_parent(client_key, parent_version_id)?
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
version,
|
||||||
|
Version {
|
||||||
|
version_id,
|
||||||
|
parent_version_id,
|
||||||
|
history_segment,
|
||||||
|
}
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user