From 027225d2a3a62dd5443776aa1c0f4820a61cbdde Mon Sep 17 00:00:00 2001 From: dbr Date: Sat, 8 May 2021 22:09:48 +1000 Subject: [PATCH] More WIP. Workaround for !Send SQLite causing problems as each get_txn creates a new transaction --- sync-server/src/storage/sqlite.rs | 51 ++++++++++++++++--------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/sync-server/src/storage/sqlite.rs b/sync-server/src/storage/sqlite.rs index 98ac5feab..81343b35d 100644 --- a/sync-server/src/storage/sqlite.rs +++ b/sync-server/src/storage/sqlite.rs @@ -1,9 +1,9 @@ use super::{Client, Storage, StorageTxn, Uuid, Version}; +use anyhow::Context; use rusqlite::types::{FromSql, ToSql}; use rusqlite::{params, Connection, OptionalExtension}; use std::path::Path; use std::sync::{Arc, Mutex}; -use anyhow::Context; #[derive(Debug, thiserror::Error)] enum SqliteError { @@ -11,7 +11,6 @@ enum SqliteError { TransactionAlreadyCommitted, } - /// Newtype to allow implementing `FromSql` for foreign `uuid::Uuid` struct StoredUuid(Uuid); @@ -50,8 +49,6 @@ impl ToSql for Client { } } - - /// DB Key for versions: concatenation of client_key and parent_version_id type VersionDbKey = [u8; 32]; @@ -89,7 +86,8 @@ impl SqliteStorage { let txn = con.transaction()?; let queries = vec![ - "CREATE TABLE IF NOT EXISTS clients (id INTEGER PRIMARY KEY AUTOINCREMENT, data STRING);", + "CREATE TABLE IF NOT EXISTS clients (client_key STRING PRIMARY KEY, latest_version_id STRING);", + "CREATE TABLE IF NOT EXISTS versions (id STRING PRIMARY KEY, client_key STRING, parent STRING, history_segment STRING);", ]; for q in queries { txn.execute(q, []).context("Creating table")?; @@ -104,7 +102,7 @@ impl SqliteStorage { impl Storage for SqliteStorage { fn txn<'a>(&'a self) -> anyhow::Result> { let con = self.new_connection()?; - let t = Txn{con, txn: None}; + let t = Txn { con, txn: None }; Ok(Box::new(t)) } } @@ -114,23 +112,28 @@ struct Txn<'t> { txn: Option>, } -impl <'t>Txn<'t> { +impl<'t> Txn<'t> { fn get_txn(&mut self) -> Result { Ok(self.con.transaction().unwrap()) } } - -impl <'t>StorageTxn for Txn<'t> { +impl<'t> StorageTxn for Txn<'t> { fn get_client(&mut self, client_key: Uuid) -> anyhow::Result> { let t = self.get_txn()?; let result: Option = t .query_row( - "SELECT data FROM clients WHERE client_key = ? LIMIT 1", + "SELECT latest_version_id FROM clients WHERE client_key = ? LIMIT 1", [&StoredUuid(client_key)], - |r| r.get("data"), + |r| { + let latest_version_id: StoredUuid = r.get(0)?; + Ok(Client { + latest_version_id: latest_version_id.0, + }) + }, ) - .optional()?; + .optional() + .context("Get client query")?; Ok(result) } @@ -138,10 +141,9 @@ impl <'t>StorageTxn for Txn<'t> { fn new_client(&mut self, client_key: Uuid, latest_version_id: Uuid) -> anyhow::Result<()> { let t = self.get_txn()?; - let client = Client{ latest_version_id }; t.execute( "INSERT OR REPLACE INTO clients (client_key, latest_version_id) VALUES (?, ?)", - params![&StoredUuid(latest_version_id), &client], + params![&StoredUuid(client_key), &StoredUuid(latest_version_id)], ) .context("Create client query")?; Ok(()) @@ -173,16 +175,16 @@ impl <'t>StorageTxn for Txn<'t> { ) -> anyhow::Result<()> { let t = self.get_txn()?; - let version = Version { - version_id, - parent_version_id, - history_segment, - }; - t.execute( - "INSERT INTO versions (client_key, id, parent, history_segment)", - params![], - ).context("Add version query")?; + "INSERT INTO versions (id, client_key, parent, history_segment) VALUES(?, ?, ?, ?)", + params![ + StoredUuid(version_id), + StoredUuid(client_key), + StoredUuid(parent_version_id), + history_segment + ], + ) + .context("Add version query")?; Ok(()) } @@ -191,8 +193,7 @@ impl <'t>StorageTxn for Txn<'t> { let t: rusqlite::Transaction = self .txn .take() - .unwrap(); - //.ok_or(SqliteError::TransactionAlreadyCommitted)?; + .ok_or(SqliteError::TransactionAlreadyCommitted)?; t.commit().context("Committing transaction")?; Ok(()) }