From 4145a469f8eb34686723be946d53edec44b040c2 Mon Sep 17 00:00:00 2001 From: Philip Jenvey Date: Tue, 4 Nov 2025 17:11:10 -0800 Subject: [PATCH 1/2] refactor: models/batch/schema/diesel_ext -> db Issue STOR-403 --- syncstorage-mysql/src/{ => db}/batch_commit.sql | 0 syncstorage-mysql/src/{batch.rs => db/batch_impl.rs} | 3 +-- syncstorage-mysql/src/{models.rs => db/db_impl.rs} | 10 +++++++--- syncstorage-mysql/src/{ => db}/diesel_ext.rs | 0 syncstorage-mysql/src/{ => db}/schema.rs | 0 syncstorage-mysql/src/lib.rs | 7 ++----- syncstorage-mysql/src/pool.rs | 4 ++-- syncstorage-mysql/src/test.rs | 2 +- syncstorage-spanner/src/db/mod.rs | 12 ++++++------ 9 files changed, 19 insertions(+), 19 deletions(-) rename syncstorage-mysql/src/{ => db}/batch_commit.sql (100%) rename syncstorage-mysql/src/{batch.rs => db/batch_impl.rs} (99%) rename syncstorage-mysql/src/{models.rs => db/db_impl.rs} (99%) rename syncstorage-mysql/src/{ => db}/diesel_ext.rs (100%) rename syncstorage-mysql/src/{ => db}/schema.rs (100%) diff --git a/syncstorage-mysql/src/batch_commit.sql b/syncstorage-mysql/src/db/batch_commit.sql similarity index 100% rename from syncstorage-mysql/src/batch_commit.sql rename to syncstorage-mysql/src/db/batch_commit.sql diff --git a/syncstorage-mysql/src/batch.rs b/syncstorage-mysql/src/db/batch_impl.rs similarity index 99% rename from syncstorage-mysql/src/batch.rs rename to syncstorage-mysql/src/db/batch_impl.rs index 8dfc7bac..41c7d762 100644 --- a/syncstorage-mysql/src/batch.rs +++ b/syncstorage-mysql/src/db/batch_impl.rs @@ -14,9 +14,8 @@ use diesel_async::RunQueryDsl; use syncstorage_db_common::{params, results, UserIdentifier, BATCH_LIFETIME}; use super::{ - models::MysqlDb, schema::{batch_upload_items, batch_uploads}, - DbError, DbResult, + DbError, DbResult, MysqlDb, }; const MAX_TTL: i32 = 2_100_000_000; diff --git a/syncstorage-mysql/src/models.rs b/syncstorage-mysql/src/db/db_impl.rs similarity index 99% rename from syncstorage-mysql/src/models.rs rename to syncstorage-mysql/src/db/db_impl.rs index ddcb73a1..c422bf81 100644 --- a/syncstorage-mysql/src/models.rs +++ b/syncstorage-mysql/src/db/db_impl.rs @@ -18,12 +18,16 @@ use syncstorage_db_common::{ use syncstorage_settings::{Quota, DEFAULT_MAX_TOTAL_RECORDS}; use super::{ - batch, diesel_ext::LockInShareModeDsl, pool::{CollectionCache, Conn}, - schema::{bso, collections, user_collections}, DbError, DbResult, }; +use schema::{bso, collections, user_collections}; + +mod batch_impl; +pub(crate) mod schema; + +pub use batch_impl::validate_batch_id; // this is the max number of records we will return. 
static DEFAULT_LIMIT: u32 = DEFAULT_MAX_TOTAL_RECORDS; @@ -941,7 +945,7 @@ impl MysqlDb { batch_db_method!(delete_batch, delete, DeleteBatch); async fn get_batch(&mut self, params: params::GetBatch) -> DbResult> { - batch::get(self, params).await + batch_impl::get(self, params).await } pub(super) fn timestamp(&self) -> SyncTimestamp { diff --git a/syncstorage-mysql/src/diesel_ext.rs b/syncstorage-mysql/src/db/diesel_ext.rs similarity index 100% rename from syncstorage-mysql/src/diesel_ext.rs rename to syncstorage-mysql/src/db/diesel_ext.rs diff --git a/syncstorage-mysql/src/schema.rs b/syncstorage-mysql/src/db/schema.rs similarity index 100% rename from syncstorage-mysql/src/schema.rs rename to syncstorage-mysql/src/db/schema.rs diff --git a/syncstorage-mysql/src/lib.rs b/syncstorage-mysql/src/lib.rs index 5dff416f..ae98bb74 100644 --- a/syncstorage-mysql/src/lib.rs +++ b/syncstorage-mysql/src/lib.rs @@ -6,15 +6,12 @@ extern crate diesel_migrations; extern crate slog_scope; #[macro_use] -mod batch; -mod diesel_ext; -mod models; +mod db; mod pool; -mod schema; #[cfg(test)] mod test; -pub use models::MysqlDb; +pub use db::MysqlDb; pub use pool::MysqlDbPool; pub use syncstorage_db_common::diesel::DbError; diff --git a/syncstorage-mysql/src/pool.rs b/syncstorage-mysql/src/pool.rs index 495d2c1b..0ed9d5b1 100644 --- a/syncstorage-mysql/src/pool.rs +++ b/syncstorage-mysql/src/pool.rs @@ -28,7 +28,7 @@ use syncstorage_db_common::{Db, DbPool, STD_COLLS}; use syncstorage_settings::{Quota, Settings}; use tokio::task::spawn_blocking; -use super::{models::MysqlDb, DbError, DbResult}; +use super::{db::MysqlDb, DbError, DbResult}; pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); @@ -171,7 +171,7 @@ impl DbPool for MysqlDbPool { } fn validate_batch_id(&self, id: String) -> DbResult<()> { - super::batch::validate_batch_id(&id) + super::db::validate_batch_id(&id) } fn box_clone(&self) -> Box> { diff --git a/syncstorage-mysql/src/test.rs b/syncstorage-mysql/src/test.rs index 9a6a3ac6..233d28e6 100644 --- a/syncstorage-mysql/src/test.rs +++ b/syncstorage-mysql/src/test.rs @@ -12,7 +12,7 @@ use syncstorage_db_common::DbPool; use syncstorage_settings::Settings as SyncstorageSettings; use url::Url; -use crate::{models::MysqlDb, pool::MysqlDbPool, schema::collections, DbResult}; +use crate::{db::{schema::collections, MysqlDb}, pool::MysqlDbPool, DbResult}; async fn db(settings: &SyncstorageSettings) -> DbResult { let _ = env_logger::try_init(); diff --git a/syncstorage-spanner/src/db/mod.rs b/syncstorage-spanner/src/db/mod.rs index ed377302..bdfc767f 100644 --- a/syncstorage-spanner/src/db/mod.rs +++ b/syncstorage-spanner/src/db/mod.rs @@ -33,12 +33,6 @@ use support::{ StreamedResultSetAsync, }; -#[derive(Debug, Eq, PartialEq)] -enum CollectionLock { - Read, - Write, -} - mod batch_impl; mod db_impl; mod stream; @@ -46,6 +40,12 @@ pub(crate) mod support; pub use batch_impl::validate_batch_id; +#[derive(Debug, Eq, PartialEq)] +enum CollectionLock { + Read, + Write, +} + const TOMBSTONE: i32 = 0; pub const PRETOUCH_TS: &str = "0001-01-01T00:00:00.00Z"; From 7ecab8bb3967ab7f85352e58c286b4bfa71ebdfe Mon Sep 17 00:00:00 2001 From: Philip Jenvey Date: Tue, 4 Nov 2025 17:56:28 -0800 Subject: [PATCH 2/2] refactor: divvy up the db/batch impls into their own mods Closes STOR-403 --- syncstorage-mysql/src/db/batch_impl.rs | 310 ++++++++-------- syncstorage-mysql/src/db/db_impl.rs | 491 +++---------------------- syncstorage-mysql/src/db/mod.rs | 231 ++++++++++++ syncstorage-mysql/src/test.rs | 8 +- 4 files 
changed, 431 insertions(+), 609 deletions(-) create mode 100644 syncstorage-mysql/src/db/mod.rs diff --git a/syncstorage-mysql/src/db/batch_impl.rs b/syncstorage-mysql/src/db/batch_impl.rs index 41c7d762..ea918adf 100644 --- a/syncstorage-mysql/src/db/batch_impl.rs +++ b/syncstorage-mysql/src/db/batch_impl.rs @@ -1,6 +1,7 @@ use base64::Engine; use std::collections::HashSet; +use async_trait::async_trait; use diesel::{ self, dsl::sql, @@ -11,181 +12,182 @@ use diesel::{ ExpressionMethods, OptionalExtension, QueryDsl, }; use diesel_async::RunQueryDsl; -use syncstorage_db_common::{params, results, UserIdentifier, BATCH_LIFETIME}; +use syncstorage_db_common::{params, results, BatchDb, Db, UserIdentifier, BATCH_LIFETIME}; use super::{ schema::{batch_upload_items, batch_uploads}, - DbError, DbResult, MysqlDb, + MysqlDb, }; +use crate::{DbError, DbResult}; const MAX_TTL: i32 = 2_100_000_000; const MAX_BATCH_CREATE_RETRY: u8 = 5; -pub async fn create( - db: &mut MysqlDb, - params: params::CreateBatch, -) -> DbResult { - let user_id = params.user_id.legacy_id as i64; - let collection_id = db.get_collection_id(¶ms.collection).await?; - // Careful, there's some weirdness here! - // - // Sync timestamps are in seconds and quantized to two decimal places, so - // when we convert one to a bigint in milliseconds, the final digit is - // always zero. But we want to use the lower digits of the batchid for - // sharding writes via (batchid % num_tables), and leaving it as zero would - // skew the sharding distribution. - // - // So we mix in the lowest digit of the uid to improve the distribution - // while still letting us treat these ids as millisecond timestamps. It's - // yuck, but it works and it keeps the weirdness contained to this single - // line of code. - let mut batch_id = db.timestamp().as_i64() + (user_id % 10); - // Occasionally batch_ids clash (usually during unit testing), so also - // retry w/ increments - for i in 1..=MAX_BATCH_CREATE_RETRY { - let result = insert_into(batch_uploads::table) - .values(( - batch_uploads::batch_id.eq(&batch_id), - batch_uploads::user_id.eq(&user_id), - batch_uploads::collection_id.eq(&collection_id), - )) - .execute(&mut db.conn) - .await; - match result { - Ok(_) => break, - Err(DieselError::DatabaseError(UniqueViolation, _)) => { - if i == MAX_BATCH_CREATE_RETRY { - return Err(DbError::conflict()); +#[async_trait(?Send)] +impl BatchDb for MysqlDb { + type Error = DbError; + + async fn create_batch( + &mut self, + params: params::CreateBatch, + ) -> DbResult { + let user_id = params.user_id.legacy_id as i64; + let collection_id = self.get_collection_id(¶ms.collection).await?; + // Careful, there's some weirdness here! + // + // Sync timestamps are in seconds and quantized to two decimal places, so + // when we convert one to a bigint in milliseconds, the final digit is + // always zero. But we want to use the lower digits of the batchid for + // sharding writes via (batchid % num_tables), and leaving it as zero would + // skew the sharding distribution. + // + // So we mix in the lowest digit of the uid to improve the distribution + // while still letting us treat these ids as millisecond timestamps. It's + // yuck, but it works and it keeps the weirdness contained to this single + // line of code. 
+ let mut batch_id = self.timestamp().as_i64() + (user_id % 10); + // Occasionally batch_ids clash (usually during unit testing), so also + // retry w/ increments + for i in 1..=MAX_BATCH_CREATE_RETRY { + let result = insert_into(batch_uploads::table) + .values(( + batch_uploads::batch_id.eq(&batch_id), + batch_uploads::user_id.eq(&user_id), + batch_uploads::collection_id.eq(&collection_id), + )) + .execute(&mut self.conn) + .await; + match result { + Ok(_) => break, + Err(DieselError::DatabaseError(UniqueViolation, _)) => { + if i == MAX_BATCH_CREATE_RETRY { + return Err(DbError::conflict()); + } + batch_id += 1; } - batch_id += 1; + Err(e) => return Err(e.into()), } - Err(e) => return Err(e.into()), } + + do_append(self, batch_id, params.user_id, collection_id, params.bsos).await?; + Ok(results::CreateBatch { + id: encode_id(batch_id), + size: None, + }) } - do_append(db, batch_id, params.user_id, collection_id, params.bsos).await?; - Ok(results::CreateBatch { - id: encode_id(batch_id), - size: None, - }) -} + async fn validate_batch(&mut self, params: params::ValidateBatch) -> DbResult { + let batch_id = decode_id(¶ms.id)?; + // Avoid hitting the db for batches that are obviously too old. Recall + // that the batchid is a millisecond timestamp. + if (batch_id + BATCH_LIFETIME) < self.timestamp().as_i64() { + return Ok(false); + } -pub async fn validate(db: &mut MysqlDb, params: params::ValidateBatch) -> DbResult { - let batch_id = decode_id(¶ms.id)?; - // Avoid hitting the db for batches that are obviously too old. Recall - // that the batchid is a millisecond timestamp. - if (batch_id + BATCH_LIFETIME) < db.timestamp().as_i64() { - return Ok(false); + let user_id = params.user_id.legacy_id as i64; + let collection_id = self.get_collection_id(¶ms.collection).await?; + let exists = batch_uploads::table + .select(sql::("1")) + .filter(batch_uploads::batch_id.eq(&batch_id)) + .filter(batch_uploads::user_id.eq(&user_id)) + .filter(batch_uploads::collection_id.eq(&collection_id)) + .get_result::(&mut self.conn) + .await + .optional()?; + Ok(exists.is_some()) } - let user_id = params.user_id.legacy_id as i64; - let collection_id = db.get_collection_id(¶ms.collection).await?; - let exists = batch_uploads::table - .select(sql::("1")) - .filter(batch_uploads::batch_id.eq(&batch_id)) - .filter(batch_uploads::user_id.eq(&user_id)) - .filter(batch_uploads::collection_id.eq(&collection_id)) - .get_result::(&mut db.conn) - .await - .optional()?; - Ok(exists.is_some()) -} + async fn append_to_batch(&mut self, params: params::AppendToBatch) -> DbResult<()> { + let exists = self + .validate_batch(params::ValidateBatch { + user_id: params.user_id.clone(), + collection: params.collection.clone(), + id: params.batch.id.clone(), + }) + .await?; -pub async fn append(db: &mut MysqlDb, params: params::AppendToBatch) -> DbResult<()> { - let exists = validate( - db, - params::ValidateBatch { + if !exists { + return Err(DbError::batch_not_found()); + } + + let batch_id = decode_id(¶ms.batch.id)?; + let collection_id = self.get_collection_id(¶ms.collection).await?; + do_append(self, batch_id, params.user_id, collection_id, params.bsos).await?; + Ok(()) + } + + async fn get_batch(&mut self, params: params::GetBatch) -> DbResult> { + let is_valid = self + .validate_batch(params::ValidateBatch { + user_id: params.user_id, + collection: params.collection, + id: params.id.clone(), + }) + .await?; + let batch = if is_valid { + Some(results::GetBatch { id: params.id }) + } else { + None + }; + Ok(batch) + } + + async fn 
delete_batch(&mut self, params: params::DeleteBatch) -> DbResult<()> { + let batch_id = decode_id(¶ms.id)?; + let user_id = params.user_id.legacy_id as i64; + let collection_id = self.get_collection_id(¶ms.collection).await?; + diesel::delete(batch_uploads::table) + .filter(batch_uploads::batch_id.eq(&batch_id)) + .filter(batch_uploads::user_id.eq(&user_id)) + .filter(batch_uploads::collection_id.eq(&collection_id)) + .execute(&mut self.conn) + .await?; + diesel::delete(batch_upload_items::table) + .filter(batch_upload_items::batch_id.eq(&batch_id)) + .filter(batch_upload_items::user_id.eq(&user_id)) + .execute(&mut self.conn) + .await?; + Ok(()) + } + + /// Commits a batch to the bsos table, deleting the batch when succesful + async fn commit_batch( + &mut self, + params: params::CommitBatch, + ) -> DbResult { + let batch_id = decode_id(¶ms.batch.id)?; + let user_id = params.user_id.legacy_id as i64; + let collection_id = self.get_collection_id(¶ms.collection).await?; + let timestamp = self.timestamp(); + sql_query(include_str!("batch_commit.sql")) + .bind::(user_id) + .bind::(&collection_id) + .bind::(&self.timestamp().as_i64()) + .bind::(&self.timestamp().as_i64()) + .bind::((MAX_TTL as i64) * 1000) // XXX: + .bind::(&batch_id) + .bind::(user_id) + .bind::(&self.timestamp().as_i64()) + .bind::(&self.timestamp().as_i64()) + .execute(&mut self.conn) + .await?; + + self.update_collection(params::UpdateCollection { user_id: params.user_id.clone(), + collection_id, collection: params.collection.clone(), - id: params.batch.id.clone(), - }, - ) - .await?; - - if !exists { - return Err(DbError::batch_not_found()); - } - - let batch_id = decode_id(¶ms.batch.id)?; - let collection_id = db.get_collection_id(¶ms.collection).await?; - do_append(db, batch_id, params.user_id, collection_id, params.bsos).await?; - Ok(()) -} - -pub async fn get( - db: &mut MysqlDb, - params: params::GetBatch, -) -> DbResult> { - let is_valid = validate( - db, - params::ValidateBatch { - user_id: params.user_id, - collection: params.collection, - id: params.id.clone(), - }, - ) - .await?; - let batch = if is_valid { - Some(results::GetBatch { id: params.id }) - } else { - None - }; - Ok(batch) -} - -pub async fn delete(db: &mut MysqlDb, params: params::DeleteBatch) -> DbResult<()> { - let batch_id = decode_id(¶ms.id)?; - let user_id = params.user_id.legacy_id as i64; - let collection_id = db.get_collection_id(¶ms.collection).await?; - diesel::delete(batch_uploads::table) - .filter(batch_uploads::batch_id.eq(&batch_id)) - .filter(batch_uploads::user_id.eq(&user_id)) - .filter(batch_uploads::collection_id.eq(&collection_id)) - .execute(&mut db.conn) - .await?; - diesel::delete(batch_upload_items::table) - .filter(batch_upload_items::batch_id.eq(&batch_id)) - .filter(batch_upload_items::user_id.eq(&user_id)) - .execute(&mut db.conn) - .await?; - Ok(()) -} - -/// Commits a batch to the bsos table, deleting the batch when succesful -pub async fn commit( - db: &mut MysqlDb, - params: params::CommitBatch, -) -> DbResult { - let batch_id = decode_id(¶ms.batch.id)?; - let user_id = params.user_id.legacy_id as i64; - let collection_id = db.get_collection_id(¶ms.collection).await?; - let timestamp = db.timestamp(); - sql_query(include_str!("batch_commit.sql")) - .bind::(user_id) - .bind::(&collection_id) - .bind::(&db.timestamp().as_i64()) - .bind::(&db.timestamp().as_i64()) - .bind::((MAX_TTL as i64) * 1000) // XXX: - .bind::(&batch_id) - .bind::(user_id) - .bind::(&db.timestamp().as_i64()) - .bind::(&db.timestamp().as_i64()) - 
.execute(&mut db.conn) + }) .await?; - db.update_collection(user_id as u32, collection_id).await?; - - delete( - db, - params::DeleteBatch { + self.delete_batch(params::DeleteBatch { user_id: params.user_id, collection: params.collection, id: params.batch.id, - }, - ) - .await?; - Ok(timestamp) + }) + .await?; + Ok(timestamp) + } } pub async fn do_append( @@ -297,11 +299,3 @@ fn decode_id(id: &str) -> DbResult { .parse::() .map_err(|e| DbError::internal(format!("Invalid batch_id: {}", e))) } - -macro_rules! batch_db_method { - ($name:ident, $batch_name:ident, $type:ident) => { - pub async fn $name(&mut self, params: params::$type) -> DbResult { - batch::$batch_name(self, params).await - } - }; -} diff --git a/syncstorage-mysql/src/db/db_impl.rs b/syncstorage-mysql/src/db/db_impl.rs index c422bf81..8f692757 100644 --- a/syncstorage-mysql/src/db/db_impl.rs +++ b/syncstorage-mysql/src/db/db_impl.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, fmt, sync::Arc}; +use std::collections::HashMap; use async_trait::async_trait; use diesel::{ @@ -10,95 +10,25 @@ use diesel::{ ExpressionMethods, OptionalExtension, QueryDsl, }; use diesel_async::{AsyncConnection, RunQueryDsl, TransactionManager}; -use syncserver_common::Metrics; use syncstorage_db_common::{ - error::DbErrorIntrospect, params, results, util::SyncTimestamp, BatchDb, Db, Sorting, - UserIdentifier, DEFAULT_BSO_TTL, + error::DbErrorIntrospect, params, results, util::SyncTimestamp, Db, Sorting, UserIdentifier, + DEFAULT_BSO_TTL, }; use syncstorage_settings::{Quota, DEFAULT_MAX_TOTAL_RECORDS}; use super::{ diesel_ext::LockInShareModeDsl, - pool::{CollectionCache, Conn}, - DbError, DbResult, + schema::{bso, user_collections}, + CollectionLock, MysqlDb, COLLECTION_ID, COUNT, EXPIRY, LAST_MODIFIED, MODIFIED, TOMBSTONE, + TOTAL_BYTES, USER_ID, }; -use schema::{bso, collections, user_collections}; - -mod batch_impl; -pub(crate) mod schema; - -pub use batch_impl::validate_batch_id; +use crate::{pool::Conn, DbError, DbResult}; // this is the max number of records we will return. static DEFAULT_LIMIT: u32 = DEFAULT_MAX_TOTAL_RECORDS; -const TOMBSTONE: i32 = 0; -/// SQL Variable remapping -/// These names are the legacy values mapped to the new names. 
-const COLLECTION_ID: &str = "collection"; -const USER_ID: &str = "userid"; -const MODIFIED: &str = "modified"; -const EXPIRY: &str = "ttl"; -const LAST_MODIFIED: &str = "last_modified"; -const COUNT: &str = "count"; -const TOTAL_BYTES: &str = "total_bytes"; - -#[derive(Debug)] -enum CollectionLock { - Read, - Write, -} - -/// Per session Db metadata -#[derive(Debug, Default)] -struct MysqlDbSession { - /// The "current time" on the server used for this session's operations - timestamp: SyncTimestamp, - /// Cache of collection modified timestamps per (user_id, collection_id) - coll_modified_cache: HashMap<(u32, i32), SyncTimestamp>, - /// Currently locked collections - coll_locks: HashMap<(u32, i32), CollectionLock>, - /// Whether a transaction was started (begin() called) - in_transaction: bool, - in_write_transaction: bool, -} - -pub struct MysqlDb { - pub(super) conn: Conn, - session: MysqlDbSession, - /// Pool level cache of collection_ids and their names - coll_cache: Arc, - metrics: Metrics, - quota: Quota, -} - -impl fmt::Debug for MysqlDb { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("MysqlDb") - .field("session", &self.session) - .field("coll_cache", &self.coll_cache) - .field("metrics", &self.metrics) - .field("quota", &self.quota) - .finish() - } -} - -impl MysqlDb { - pub(super) fn new( - conn: Conn, - coll_cache: Arc, - metrics: &Metrics, - quota: &Quota, - ) -> Self { - MysqlDb { - conn, - session: Default::default(), - coll_cache, - metrics: metrics.clone(), - quota: *quota, - } - } - +#[async_trait(?Send)] +impl Db for MysqlDb { /// APIs for collection-level locking /// /// Explicitly lock the matching row in the user_collections table. Read @@ -194,7 +124,7 @@ impl MysqlDb { Ok(()) } - pub(super) async fn begin(&mut self, for_write: bool) -> DbResult<()> { + async fn begin(&mut self, for_write: bool) -> DbResult<()> { ::TransactionManager::begin_transaction(&mut self.conn).await?; self.session.in_transaction = true; if for_write { @@ -219,24 +149,6 @@ impl MysqlDb { Ok(()) } - async fn erect_tombstone(&mut self, user_id: i32) -> DbResult<()> { - sql_query(format!( - r#"INSERT INTO user_collections ({user_id}, {collection_id}, {modified}) - VALUES (?, ?, ?) - ON DUPLICATE KEY UPDATE - {modified} = VALUES({modified})"#, - user_id = USER_ID, - collection_id = COLLECTION_ID, - modified = LAST_MODIFIED - )) - .bind::(user_id as i64) - .bind::(TOMBSTONE) - .bind::(self.timestamp().as_i64()) - .execute(&mut self.conn) - .await?; - Ok(()) - } - async fn delete_storage(&mut self, user_id: UserIdentifier) -> DbResult<()> { let user_id = user_id.legacy_id as i64; // Delete user data. @@ -279,30 +191,7 @@ impl MysqlDb { self.get_storage_timestamp(params.user_id).await } - pub(super) async fn get_or_create_collection_id(&mut self, name: &str) -> DbResult { - if let Some(id) = self.coll_cache.get_id(name)? { - return Ok(id); - } - - diesel::insert_or_ignore_into(collections::table) - .values(collections::name.eq(name)) - .execute(&mut self.conn) - .await?; - - let id = collections::table - .select(collections::id) - .filter(collections::name.eq(name)) - .first(&mut self.conn) - .await?; - - if !self.session.in_write_transaction { - self.coll_cache.put(id, name.to_owned())?; - } - - Ok(id) - } - - pub(super) async fn get_collection_id(&mut self, name: &str) -> DbResult { + async fn get_collection_id(&mut self, name: &str) -> DbResult { if let Some(id) = self.coll_cache.get_id(name)? 
{ return Ok(id); } @@ -324,25 +213,6 @@ impl MysqlDb { Ok(id) } - async fn _get_collection_name(&mut self, id: i32) -> DbResult { - let name = if let Some(name) = self.coll_cache.get_name(id)? { - name - } else { - sql_query( - "SELECT name - FROM collections - WHERE id = ?", - ) - .bind::(&id) - .get_result::(&mut self.conn) - .await - .optional()? - .ok_or_else(DbError::collection_not_found)? - .name - }; - Ok(name) - } - async fn put_bso(&mut self, bso: params::PutBso) -> DbResult { /* if bso.payload.is_none() && bso.sortindex.is_none() && bso.ttl.is_none() { @@ -438,7 +308,12 @@ impl MysqlDb { .bind::(timestamp + (i64::from(ttl) * 1000)) // remember: this is in millis .execute(&mut self.conn) .await?; - self.update_collection(user_id as u32, collection_id).await + self.update_collection(params::UpdateCollection { + user_id: bso.user_id, + collection_id, + collection: bso.collection, + }) + .await } async fn get_bsos(&mut self, params: params::GetBsos) -> DbResult { @@ -627,7 +502,12 @@ impl MysqlDb { if affected_rows == 0 { return Err(DbError::bso_not_found()); } - self.update_collection(user_id as u32, collection_id).await + self.update_collection(params::UpdateCollection { + user_id: params.user_id, + collection_id, + collection: params.collection, + }) + .await } async fn delete_bsos(&mut self, params: params::DeleteBsos) -> DbResult { @@ -639,7 +519,12 @@ impl MysqlDb { .filter(bso::id.eq_any(params.ids)) .execute(&mut self.conn) .await?; - self.update_collection(user_id as u32, collection_id).await + self.update_collection(params::UpdateCollection { + user_id: params.user_id, + collection_id, + collection: params.collection, + }) + .await } async fn post_bsos(&mut self, input: params::PostBsos) -> DbResult { @@ -657,8 +542,12 @@ impl MysqlDb { }) .await?; } - self.update_collection(input.user_id.legacy_id as u32, collection_id) - .await?; + self.update_collection(params::UpdateCollection { + user_id: input.user_id, + collection_id, + collection: input.collection, + }) + .await?; Ok(modified) } @@ -747,60 +636,13 @@ impl MysqlDb { Ok(true) } - async fn map_collection_names( + async fn update_collection( &mut self, - by_id: HashMap, - ) -> DbResult> { - let mut names = self.load_collection_names(by_id.keys()).await?; - by_id - .into_iter() - .map(|(id, value)| { - names.remove(&id).map(|name| (name, value)).ok_or_else(|| { - DbError::internal("load_collection_names unknown collection id".to_owned()) - }) - }) - .collect() - } - - async fn load_collection_names<'a>( - &mut self, - collection_ids: impl Iterator, - ) -> DbResult> { - let mut names = HashMap::new(); - let mut uncached = Vec::new(); - for &id in collection_ids { - if let Some(name) = self.coll_cache.get_name(id)? { - names.insert(id, name); - } else { - uncached.push(id); - } - } - - if !uncached.is_empty() { - let result = collections::table - .select((collections::id, collections::name)) - .filter(collections::id.eq_any(uncached)) - .load::<(i32, String)>(&mut self.conn) - .await?; - - for (id, name) in result { - names.insert(id, name.clone()); - if !self.session.in_write_transaction { - self.coll_cache.put(id, name)?; - } - } - } - - Ok(names) - } - - pub(super) async fn update_collection( - &mut self, - user_id: u32, - collection_id: i32, + params: params::UpdateCollection, ) -> DbResult { let quota = if self.quota.enabled { - self.calc_quota_usage(user_id, collection_id).await? + self.calc_quota_usage(params.user_id.legacy_id as i64, params.collection_id) + .await? 
} else { results::GetQuotaUsage { count: 0, @@ -825,8 +667,8 @@ impl MysqlDb { let total_bytes = quota.total_bytes as i64; let timestamp = self.timestamp().as_i64(); sql_query(upsert) - .bind::(user_id as i64) - .bind::(&collection_id) + .bind::(params.user_id.legacy_id as i64) + .bind::(¶ms.collection_id) .bind::(×tamp) .bind::(&total_bytes) .bind::("a.count) @@ -876,30 +718,6 @@ impl MysqlDb { }) } - // perform a heavier weight quota calculation - async fn calc_quota_usage( - &mut self, - user_id: u32, - collection_id: i32, - ) -> DbResult { - let (total_bytes, count): (i64, i32) = bso::table - .select(( - sql::(r#"COALESCE(SUM(LENGTH(COALESCE(payload, ""))),0)"#), - sql::("COALESCE(COUNT(*),0)"), - )) - .filter(bso::user_id.eq(user_id as i64)) - .filter(bso::expiry.gt(self.timestamp().as_i64())) - .filter(bso::collection_id.eq(collection_id)) - .get_result(&mut self.conn) - .await - .optional()? - .unwrap_or_default(); - Ok(results::GetQuotaUsage { - total_bytes: total_bytes as usize, - count, - }) - } - async fn get_collection_usage( &mut self, user_id: UserIdentifier, @@ -938,169 +756,9 @@ impl MysqlDb { self.map_collection_names(counts).await } - batch_db_method!(create_batch, create, CreateBatch); - batch_db_method!(validate_batch, validate, ValidateBatch); - batch_db_method!(append_to_batch, append, AppendToBatch); - batch_db_method!(commit_batch, commit, CommitBatch); - batch_db_method!(delete_batch, delete, DeleteBatch); - - async fn get_batch(&mut self, params: params::GetBatch) -> DbResult> { - batch_impl::get(self, params).await - } - - pub(super) fn timestamp(&self) -> SyncTimestamp { + fn timestamp(&self) -> SyncTimestamp { self.session.timestamp } -} - -#[async_trait(?Send)] -impl Db for MysqlDb { - async fn commit(&mut self) -> Result<(), Self::Error> { - MysqlDb::commit(self).await - } - - async fn rollback(&mut self) -> Result<(), Self::Error> { - MysqlDb::rollback(self).await - } - - async fn begin(&mut self, for_write: bool) -> Result<(), Self::Error> { - MysqlDb::begin(self, for_write).await - } - - async fn check(&mut self) -> Result { - MysqlDb::check(self).await - } - - async fn lock_for_read( - &mut self, - params: params::LockCollection, - ) -> Result { - MysqlDb::lock_for_read(self, params).await - } - - async fn lock_for_write( - &mut self, - params: params::LockCollection, - ) -> Result { - MysqlDb::lock_for_write(self, params).await - } - - async fn get_collection_timestamps( - &mut self, - params: params::GetCollectionTimestamps, - ) -> Result { - MysqlDb::get_collection_timestamps(self, params).await - } - - async fn get_collection_timestamp( - &mut self, - params: params::GetCollectionTimestamp, - ) -> Result { - MysqlDb::get_collection_timestamp(self, params).await - } - - async fn get_collection_counts( - &mut self, - params: params::GetCollectionCounts, - ) -> Result { - MysqlDb::get_collection_counts(self, params).await - } - - async fn get_collection_usage( - &mut self, - params: params::GetCollectionUsage, - ) -> Result { - MysqlDb::get_collection_usage(self, params).await - } - - async fn get_storage_timestamp( - &mut self, - params: params::GetStorageTimestamp, - ) -> Result { - MysqlDb::get_storage_timestamp(self, params).await - } - - async fn get_storage_usage( - &mut self, - params: params::GetStorageUsage, - ) -> Result { - MysqlDb::get_storage_usage(self, params).await - } - - async fn get_quota_usage( - &mut self, - params: params::GetQuotaUsage, - ) -> Result { - MysqlDb::get_quota_usage(self, params).await - } - - async fn delete_storage( 
- &mut self, - params: params::DeleteStorage, - ) -> Result { - MysqlDb::delete_storage(self, params).await - } - - async fn delete_collection( - &mut self, - params: params::DeleteCollection, - ) -> Result { - MysqlDb::delete_collection(self, params).await - } - - async fn delete_bsos( - &mut self, - params: params::DeleteBsos, - ) -> Result { - MysqlDb::delete_bsos(self, params).await - } - - async fn get_bsos(&mut self, params: params::GetBsos) -> Result { - MysqlDb::get_bsos(self, params).await - } - - async fn get_bso_ids( - &mut self, - params: params::GetBsoIds, - ) -> Result { - MysqlDb::get_bso_ids(self, params).await - } - - async fn post_bsos( - &mut self, - params: params::PostBsos, - ) -> Result { - MysqlDb::post_bsos(self, params).await - } - - async fn delete_bso( - &mut self, - params: params::DeleteBso, - ) -> Result { - MysqlDb::delete_bso(self, params).await - } - - async fn get_bso( - &mut self, - params: params::GetBso, - ) -> Result, Self::Error> { - MysqlDb::get_bso(self, params).await - } - - async fn get_bso_timestamp( - &mut self, - params: params::GetBsoTimestamp, - ) -> Result { - MysqlDb::get_bso_timestamp(self, params).await - } - - async fn put_bso(&mut self, params: params::PutBso) -> Result { - MysqlDb::put_bso(self, params).await - } - - async fn get_collection_id(&mut self, name: &str) -> Result { - MysqlDb::get_collection_id(self, name).await - } fn get_connection_info(&self) -> results::ConnectionInfo { results::ConnectionInfo::default() @@ -1110,17 +768,6 @@ impl Db for MysqlDb { self.get_or_create_collection_id(name).await } - async fn update_collection( - &mut self, - param: params::UpdateCollection, - ) -> Result { - MysqlDb::update_collection(self, param.user_id.legacy_id as u32, param.collection_id).await - } - - fn timestamp(&self) -> SyncTimestamp { - MysqlDb::timestamp(self) - } - fn set_timestamp(&mut self, timestamp: SyncTimestamp) { self.session.timestamp = timestamp; } @@ -1139,66 +786,12 @@ impl Db for MysqlDb { } } -#[async_trait(?Send)] -impl BatchDb for MysqlDb { - type Error = DbError; - - async fn create_batch( - &mut self, - params: params::CreateBatch, - ) -> Result { - MysqlDb::create_batch(self, params).await - } - - async fn validate_batch( - &mut self, - params: params::ValidateBatch, - ) -> Result { - MysqlDb::validate_batch(self, params).await - } - - async fn append_to_batch( - &mut self, - params: params::AppendToBatch, - ) -> Result { - MysqlDb::append_to_batch(self, params).await - } - - async fn get_batch( - &mut self, - params: params::GetBatch, - ) -> Result, Self::Error> { - MysqlDb::get_batch(self, params).await - } - - async fn commit_batch( - &mut self, - params: params::CommitBatch, - ) -> Result { - MysqlDb::commit_batch(self, params).await - } - - async fn delete_batch( - &mut self, - params: params::DeleteBatch, - ) -> Result { - MysqlDb::delete_batch(self, params).await - } -} - #[derive(Debug, QueryableByName)] struct IdResult { #[diesel(sql_type = Integer)] id: i32, } -#[allow(dead_code)] // Not really dead, Rust can't see the use above -#[derive(Debug, QueryableByName)] -struct NameResult { - #[diesel(sql_type = Text)] - name: String, -} - #[derive(Debug, QueryableByName)] struct UserCollectionsResult { // Can't substitute column names here. 
diff --git a/syncstorage-mysql/src/db/mod.rs b/syncstorage-mysql/src/db/mod.rs new file mode 100644 index 00000000..7fd089f6 --- /dev/null +++ b/syncstorage-mysql/src/db/mod.rs @@ -0,0 +1,231 @@ +use std::{collections::HashMap, fmt, sync::Arc}; + +use diesel::{ + dsl::sql, + sql_query, + sql_types::{BigInt, Integer, Text}, + ExpressionMethods, OptionalExtension, QueryDsl, +}; +use diesel_async::RunQueryDsl; +use syncserver_common::Metrics; +use syncstorage_db_common::{results, util::SyncTimestamp, Db}; +use syncstorage_settings::Quota; + +use crate::{ + pool::{CollectionCache, Conn}, + DbError, DbResult, +}; +use schema::{bso, collections}; + +mod batch_impl; +mod db_impl; +mod diesel_ext; +pub(crate) mod schema; + +pub use batch_impl::validate_batch_id; + +const TOMBSTONE: i32 = 0; +/// SQL Variable remapping +/// These names are the legacy values mapped to the new names. +const COLLECTION_ID: &str = "collection"; +const USER_ID: &str = "userid"; +const MODIFIED: &str = "modified"; +const EXPIRY: &str = "ttl"; +const LAST_MODIFIED: &str = "last_modified"; +const COUNT: &str = "count"; +const TOTAL_BYTES: &str = "total_bytes"; + +#[derive(Debug)] +enum CollectionLock { + Read, + Write, +} + +/// Per session Db metadata +#[derive(Debug, Default)] +struct MysqlDbSession { + /// The "current time" on the server used for this session's operations + timestamp: SyncTimestamp, + /// Cache of collection modified timestamps per (user_id, collection_id) + coll_modified_cache: HashMap<(u32, i32), SyncTimestamp>, + /// Currently locked collections + coll_locks: HashMap<(u32, i32), CollectionLock>, + /// Whether a transaction was started (begin() called) + in_transaction: bool, + in_write_transaction: bool, +} + +pub struct MysqlDb { + pub(super) conn: Conn, + session: MysqlDbSession, + /// Pool level cache of collection_ids and their names + coll_cache: Arc, + metrics: Metrics, + quota: Quota, +} + +impl fmt::Debug for MysqlDb { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MysqlDb") + .field("session", &self.session) + .field("coll_cache", &self.coll_cache) + .field("metrics", &self.metrics) + .field("quota", &self.quota) + .finish() + } +} + +impl MysqlDb { + pub(super) fn new( + conn: Conn, + coll_cache: Arc, + metrics: &Metrics, + quota: &Quota, + ) -> Self { + MysqlDb { + conn, + session: Default::default(), + coll_cache, + metrics: metrics.clone(), + quota: *quota, + } + } + + async fn erect_tombstone(&mut self, user_id: i32) -> DbResult<()> { + sql_query(format!( + r#"INSERT INTO user_collections ({user_id}, {collection_id}, {modified}) + VALUES (?, ?, ?) + ON DUPLICATE KEY UPDATE + {modified} = VALUES({modified})"#, + user_id = USER_ID, + collection_id = COLLECTION_ID, + modified = LAST_MODIFIED + )) + .bind::(user_id as i64) + .bind::(TOMBSTONE) + .bind::(self.timestamp().as_i64()) + .execute(&mut self.conn) + .await?; + Ok(()) + } + + pub(super) async fn get_or_create_collection_id(&mut self, name: &str) -> DbResult { + if let Some(id) = self.coll_cache.get_id(name)? 
{ + return Ok(id); + } + + diesel::insert_or_ignore_into(collections::table) + .values(collections::name.eq(name)) + .execute(&mut self.conn) + .await?; + + let id = collections::table + .select(collections::id) + .filter(collections::name.eq(name)) + .first(&mut self.conn) + .await?; + + if !self.session.in_write_transaction { + self.coll_cache.put(id, name.to_owned())?; + } + + Ok(id) + } + + async fn _get_collection_name(&mut self, id: i32) -> DbResult { + let name = if let Some(name) = self.coll_cache.get_name(id)? { + name + } else { + sql_query( + "SELECT name + FROM collections + WHERE id = ?", + ) + .bind::(&id) + .get_result::(&mut self.conn) + .await + .optional()? + .ok_or_else(DbError::collection_not_found)? + .name + }; + Ok(name) + } + + async fn map_collection_names( + &mut self, + by_id: HashMap, + ) -> DbResult> { + let mut names = self.load_collection_names(by_id.keys()).await?; + by_id + .into_iter() + .map(|(id, value)| { + names.remove(&id).map(|name| (name, value)).ok_or_else(|| { + DbError::internal("load_collection_names unknown collection id".to_owned()) + }) + }) + .collect() + } + + async fn load_collection_names<'a>( + &mut self, + collection_ids: impl Iterator, + ) -> DbResult> { + let mut names = HashMap::new(); + let mut uncached = Vec::new(); + for &id in collection_ids { + if let Some(name) = self.coll_cache.get_name(id)? { + names.insert(id, name); + } else { + uncached.push(id); + } + } + + if !uncached.is_empty() { + let result = collections::table + .select((collections::id, collections::name)) + .filter(collections::id.eq_any(uncached)) + .load::<(i32, String)>(&mut self.conn) + .await?; + + for (id, name) in result { + names.insert(id, name.clone()); + if !self.session.in_write_transaction { + self.coll_cache.put(id, name)?; + } + } + } + + Ok(names) + } + + // perform a heavier weight quota calculation + async fn calc_quota_usage( + &mut self, + user_id: i64, + collection_id: i32, + ) -> DbResult { + let (total_bytes, count): (i64, i32) = bso::table + .select(( + sql::(r#"COALESCE(SUM(LENGTH(COALESCE(payload, ""))),0)"#), + sql::("COALESCE(COUNT(*),0)"), + )) + .filter(bso::user_id.eq(user_id)) + .filter(bso::expiry.gt(self.timestamp().as_i64())) + .filter(bso::collection_id.eq(collection_id)) + .get_result(&mut self.conn) + .await + .optional()? + .unwrap_or_default(); + Ok(results::GetQuotaUsage { + total_bytes: total_bytes as usize, + count, + }) + } +} + +#[allow(dead_code)] // Not really dead, Rust can't see the use above +#[derive(Debug, QueryableByName)] +struct NameResult { + #[diesel(sql_type = Text)] + name: String, +} diff --git a/syncstorage-mysql/src/test.rs b/syncstorage-mysql/src/test.rs index 233d28e6..10bdde03 100644 --- a/syncstorage-mysql/src/test.rs +++ b/syncstorage-mysql/src/test.rs @@ -8,11 +8,15 @@ use diesel::{ use diesel_async::RunQueryDsl; use syncserver_common::{BlockingThreadpool, Metrics}; use syncserver_settings::Settings as SyncserverSettings; -use syncstorage_db_common::DbPool; +use syncstorage_db_common::{Db, DbPool}; use syncstorage_settings::Settings as SyncstorageSettings; use url::Url; -use crate::{db::{schema::collections, MysqlDb}, pool::MysqlDbPool, DbResult}; +use crate::{ + db::{schema::collections, MysqlDb}, + pool::MysqlDbPool, + DbResult, +}; async fn db(settings: &SyncstorageSettings) -> DbResult { let _ = env_logger::try_init();
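
Reviewer note (a sketch inferred from the renames and mod declarations above, not part of the patches themselves): after both commits, the MySQL backend's database code is consolidated under a single db module. The per-file annotations below are my reading of the diffs, not generated output:

    syncstorage-mysql/src/
        lib.rs              re-exports MysqlDb (now from db), MysqlDbPool, DbError
        pool.rs             pool + DbPool impl; validate_batch_id now routed through db::validate_batch_id
        db/
            mod.rs          MysqlDb struct, per-session state, shared helpers, SQL name constants
            db_impl.rs      the Db trait impl for MysqlDb
            batch_impl.rs   the BatchDb trait impl for MysqlDb, plus validate_batch_id
            diesel_ext.rs   LockInShareModeDsl extension
            schema.rs       diesel schema (bso, collections, user_collections, batch_uploads, ...)
            batch_commit.sql

The crate's external surface stays the same (lib.rs keeps re-exporting MysqlDb and MysqlDbPool under the same paths). Internally, the second commit replaces the old inherent methods and batch_db_method! forwarding wrappers with direct Db/BatchDb trait impls, which presumably is why test.rs now imports Db alongside DbPool: the trait must be in scope for callers inside the crate to invoke those methods.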