diff --git a/syncserver-settings/src/lib.rs b/syncserver-settings/src/lib.rs index f4fa887f..cefabd23 100644 --- a/syncserver-settings/src/lib.rs +++ b/syncserver-settings/src/lib.rs @@ -141,6 +141,7 @@ impl Settings { settings.port = 8000; settings.syncstorage.database_pool_max_size = 1; settings.syncstorage.database_use_test_transactions = true; + settings.syncstorage.database_spanner_use_mutations = false; settings.syncstorage.database_pool_connection_max_idle = Some(300); settings.syncstorage.database_pool_connection_lifespan = Some(300); settings diff --git a/syncstorage-db-common/src/lib.rs b/syncstorage-db-common/src/lib.rs index 8cf3e684..81b119b4 100644 --- a/syncstorage-db-common/src/lib.rs +++ b/syncstorage-db-common/src/lib.rs @@ -278,22 +278,3 @@ pub struct UserIdentifier { pub fxa_uid: String, pub fxa_kid: String, } - -impl UserIdentifier { - /// Create a new legacy id user identifier - pub fn new_legacy(user_id: u64) -> Self { - Self { - legacy_id: user_id, - ..Default::default() - } - } -} - -impl From for UserIdentifier { - fn from(val: u32) -> Self { - Self { - legacy_id: val.into(), - ..Default::default() - } - } -} diff --git a/syncstorage-db/src/tests/db.rs b/syncstorage-db/src/tests/db.rs index 4e479d09..efe34b33 100644 --- a/syncstorage-db/src/tests/db.rs +++ b/syncstorage-db/src/tests/db.rs @@ -5,7 +5,7 @@ use lazy_static::lazy_static; use rand::{distributions::Alphanumeric, thread_rng, Rng}; use syncserver_settings::Settings; use syncstorage_db_common::{ - error::DbErrorIntrospect, params, util::SyncTimestamp, Sorting, UserIdentifier, DEFAULT_BSO_TTL, + error::DbErrorIntrospect, params, util::SyncTimestamp, Sorting, DEFAULT_BSO_TTL, }; use super::support::{db_pool, dbso, dbsos, gbso, gbsos, hid, pbso, postbso, test_db}; @@ -489,7 +489,7 @@ async fn delete_collection() -> Result<(), DbError> { let result = db .get_collection_timestamp(params::GetCollectionTimestamp { - user_id: uid.into(), + user_id: hid(uid), collection: coll.to_string(), }) .await; @@ -577,7 +577,7 @@ async fn get_collection_timestamps() -> Result<(), DbError> { let ts = db .get_collection_timestamp(params::GetCollectionTimestamp { - user_id: uid.into(), + user_id: hid(uid), collection: coll.clone(), }) .await?; @@ -615,7 +615,7 @@ async fn get_collection_usage() -> Result<(), DbError> { let pool = db_pool(None).await?; let db = test_db(pool).await?; - let uid = 5; + let uid = *UID; let mut expected = HashMap::new(); for &coll in ["bookmarks", "history", "prefs"].iter() { @@ -648,7 +648,7 @@ async fn get_collection_usage() -> Result<(), DbError> { let collection_id = db.get_collection_id("bookmarks".to_owned()).await?; let quota = db .get_quota_usage(params::GetQuotaUsage { - user_id: UserIdentifier::new_legacy(uid as u64), + user_id: hid(uid), collection: "ignored".to_owned(), collection_id, }) @@ -674,7 +674,7 @@ async fn test_quota() -> Result<(), DbError> { let pool = db_pool(None).await?; let mut db = test_db(pool).await?; - let uid = 5; + let uid = *UID; let coll = "bookmarks"; let size = 5000; @@ -709,7 +709,7 @@ async fn get_collection_counts() -> Result<(), DbError> { let pool = db_pool(None).await?; let db = test_db(pool).await?; - let uid = 4; + let uid = *UID; let mut expected = HashMap::new(); let mut rng = thread_rng(); @@ -739,7 +739,7 @@ async fn put_bso() -> Result<(), DbError> { db.put_bso(bso1).await?; let ts = db .get_collection_timestamp(params::GetCollectionTimestamp { - user_id: uid.into(), + user_id: hid(uid), collection: coll.to_string(), }) .await?; @@ -754,7 +754,7 @@ async fn put_bso() -> Result<(), DbError> { db.put_bso(bso2).await?; let ts = db .get_collection_timestamp(params::GetCollectionTimestamp { - user_id: uid.into(), + user_id: hid(uid), collection: coll.to_string(), }) .await?; @@ -796,7 +796,7 @@ async fn post_bsos() -> Result<(), DbError> { let ts = db .get_collection_timestamp(params::GetCollectionTimestamp { - user_id: uid.into(), + user_id: hid(uid), collection: coll.to_string(), }) .await?; @@ -830,7 +830,7 @@ async fn post_bsos() -> Result<(), DbError> { let ts = db .get_collection_timestamp(params::GetCollectionTimestamp { - user_id: uid.into(), + user_id: hid(uid), collection: coll.to_string(), }) .await?; @@ -864,7 +864,7 @@ async fn get_bsos() -> Result<(), DbError> { let pool = db_pool(None).await?; let db = test_db(pool).await?; - let uid = 2; + let uid = *UID; let coll = "clients"; let sortindexes = [1, 3, 4, 2, 0]; for (i, (revi, sortindex)) in sortindexes.iter().enumerate().rev().enumerate() { @@ -942,7 +942,7 @@ async fn get_bso_timestamp() -> Result<(), DbError> { db.put_bso(bso).await?; let ts = db .get_bso_timestamp(params::GetBsoTimestamp { - user_id: uid.into(), + user_id: hid(uid), collection: coll.to_string(), id: bid.to_string(), }) diff --git a/syncstorage-db/src/tests/support.rs b/syncstorage-db/src/tests/support.rs index a952799e..18876b55 100644 --- a/syncstorage-db/src/tests/support.rs +++ b/syncstorage-db/src/tests/support.rs @@ -54,7 +54,7 @@ pub fn pbso( ttl: Option, ) -> params::PutBso { params::PutBso { - user_id: UserIdentifier::new_legacy(u64::from(user_id)), + user_id: hid(user_id), collection: coll.to_owned(), id: bid.to_owned(), payload: payload.map(|payload| payload.to_owned()), @@ -126,5 +126,9 @@ pub fn dbsos(user_id: u32, coll: &str, bids: &[&str]) -> params::DeleteBsos { } pub fn hid(user_id: u32) -> UserIdentifier { - UserIdentifier::new_legacy(u64::from(user_id)) + UserIdentifier { + legacy_id: u64::from(user_id), + fxa_uid: format!("xxx_unit_tests_fxa_uid{}", user_id), + fxa_kid: format!("xxx_unit_tests_fxa_kid{}", user_id), + } } diff --git a/syncstorage-mysql/src/models.rs b/syncstorage-mysql/src/models.rs index 3ff2793a..0514f8ea 100644 --- a/syncstorage-mysql/src/models.rs +++ b/syncstorage-mysql/src/models.rs @@ -399,7 +399,7 @@ impl MysqlDb { let timestamp = self.timestamp().as_i64(); if self.quota.enabled { let usage = self.get_quota_usage_sync(params::GetQuotaUsage { - user_id: UserIdentifier::new_legacy(user_id), + user_id: bso.user_id.clone(), collection: bso.collection.clone(), collection_id, })?; diff --git a/syncstorage-settings/src/lib.rs b/syncstorage-settings/src/lib.rs index 3011e5c8..721073aa 100644 --- a/syncstorage-settings/src/lib.rs +++ b/syncstorage-settings/src/lib.rs @@ -77,6 +77,8 @@ pub struct Settings { pub database_pool_connection_max_idle: Option, #[cfg(debug_assertions)] pub database_use_test_transactions: bool, + #[cfg(debug_assertions)] + pub database_spanner_use_mutations: bool, /// Server-enforced limits for request payloads. pub limits: ServerLimits, @@ -108,6 +110,8 @@ impl Default for Settings { database_pool_connection_timeout: Some(30), #[cfg(debug_assertions)] database_use_test_transactions: false, + #[cfg(debug_assertions)] + database_spanner_use_mutations: true, limits: ServerLimits::default(), statsd_label: "syncstorage".to_string(), enable_quota: false, diff --git a/syncstorage-spanner/src/manager/deadpool.rs b/syncstorage-spanner/src/manager/deadpool.rs index ededa3fa..29447532 100644 --- a/syncstorage-spanner/src/manager/deadpool.rs +++ b/syncstorage-spanner/src/manager/deadpool.rs @@ -46,9 +46,9 @@ impl SpannerSessionManager { .to_owned(); let env = Arc::new(EnvBuilder::new().build()); - #[cfg(not(test))] + #[cfg(not(debug_assertions))] let test_transactions = false; - #[cfg(test)] + #[cfg(debug_assertions)] let test_transactions = settings.database_use_test_transactions; Ok(Self { diff --git a/syncstorage-spanner/src/models.rs b/syncstorage-spanner/src/models.rs index fb56702d..7206de36 100644 --- a/syncstorage-spanner/src/models.rs +++ b/syncstorage-spanner/src/models.rs @@ -74,6 +74,10 @@ struct SpannerDbSession { pub struct SpannerDb { pub(super) inner: Arc, + /// Whether the put/post_bsos methods use Spanner mutations (which should + /// be more efficient for those specific bulk operations) + use_mutations: bool, + /// Pool level cache of collection_ids and their names coll_cache: Arc, @@ -104,6 +108,7 @@ impl Deref for SpannerDb { impl SpannerDb { pub(super) fn new( conn: Conn, + use_mutations: bool, coll_cache: Arc, metrics: &Metrics, quota: Quota, @@ -117,6 +122,7 @@ impl SpannerDb { // https://github.com/mozilla-services/syncstorage-rs/issues/1480 #[allow(clippy::arc_with_non_send_sync)] inner: Arc::new(inner), + use_mutations, coll_cache, metrics: metrics.clone(), quota, @@ -158,7 +164,7 @@ impl SpannerDb { // This should always run within a r/w transaction, so that: "If a // transaction successfully commits, then no other writer modified the // data that was read in the transaction after it was read." - if !cfg!(test) && !self.in_write_transaction() { + if !cfg!(debug_assertions) && !self.in_write_transaction() { return Err(DbError::internal( "Can't escalate read-lock to write-lock".to_owned(), )); @@ -448,7 +454,7 @@ impl SpannerDb { let spanner = &self.conn; - if cfg!(test) && spanner.use_test_transactions { + if cfg!(debug_assertions) && spanner.use_test_transactions { // don't commit test transactions return Ok(()); } @@ -475,7 +481,7 @@ impl SpannerDb { let spanner = &self.conn; - if cfg!(test) && spanner.use_test_transactions { + if cfg!(debug_assertions) && spanner.use_test_transactions { // don't commit test transactions return Ok(()); } @@ -1076,7 +1082,7 @@ impl SpannerDb { // buffered on the client side and only issued to Spanner in the final // transaction Commit. let timestamp = self.checked_timestamp()?; - if !cfg!(test) && self.session.borrow().updated_collection { + if !cfg!(debug_assertions) && self.session.borrow().updated_collection { // No need to touch it again (except during tests where we // currently reuse Dbs for multiple requests) return Ok(timestamp); @@ -1499,8 +1505,15 @@ impl SpannerDb { .map_err(Into::into) } - #[allow(unused)] async fn put_bso_async(&self, params: params::PutBso) -> DbResult { + if self.use_mutations { + self.put_bso_with_mutations(params).await + } else { + self.put_bso_without_mutations(params).await + } + } + + async fn put_bso_with_mutations(&self, params: params::PutBso) -> DbResult { let bsos = vec![params::PostCollectionBso { id: params.id, sortindex: params.sortindex, @@ -1508,7 +1521,7 @@ impl SpannerDb { ttl: params.ttl, }]; let result = self - .post_bsos_async(params::PostBsos { + .post_bsos_with_mutations(params::PostBsos { user_id: params.user_id, collection: params.collection, bsos, @@ -1521,6 +1534,17 @@ impl SpannerDb { } async fn post_bsos_async(&self, params: params::PostBsos) -> DbResult { + if self.use_mutations { + self.post_bsos_with_mutations(params).await + } else { + self.post_bsos_without_mutations(params).await + } + } + + async fn post_bsos_with_mutations( + &self, + params: params::PostBsos, + ) -> DbResult { let user_id = params.user_id; let collection_id = self .get_or_create_collection_id_async(¶ms.collection) @@ -1669,10 +1693,9 @@ impl SpannerDb { Ok(Some(usage.total_bytes)) } - // NOTE: Currently this put_bso_async_test impl. is only used during db tests, - // see above for the non-tests version - #[allow(unused)] - async fn put_bso_async_test(&self, bso: params::PutBso) -> DbResult { + // NOTE: Currently this put_bso_async_without_mutations impl is only used + // during db tests, see the with_mutations impl for the non-tests version + async fn put_bso_without_mutations(&self, bso: params::PutBso) -> DbResult { use syncstorage_db_common::util::to_rfc3339; let collection_id = self .get_or_create_collection_id_async(&bso.collection) @@ -1844,10 +1867,12 @@ impl SpannerDb { .await } - // NOTE: Currently this post_bso_async_test impl. is only used during db tests, - // see above for the non-tests version - #[allow(unused)] - async fn post_bsos_async_test(&self, input: params::PostBsos) -> DbResult { + // NOTE: Currently this post_bsos_without_mutations impl is only used + // during db tests, see the with_mutations impl for the non-tests version + async fn post_bsos_without_mutations( + &self, + input: params::PostBsos, + ) -> DbResult { let collection_id = self .get_or_create_collection_id_async(&input.collection) .await?; @@ -1859,7 +1884,7 @@ impl SpannerDb { for pbso in input.bsos { let id = pbso.id; - self.put_bso_async_test(params::PutBso { + self.put_bso_without_mutations(params::PutBso { user_id: input.user_id.clone(), collection: input.collection.clone(), id: id.clone(), @@ -2039,30 +2064,16 @@ impl Db for SpannerDb { Box::pin(async move { db.get_bso_timestamp_async(param).map_err(Into::into).await }) } - #[cfg(not(test))] fn put_bso(&self, param: params::PutBso) -> DbFuture<'_, results::PutBso, Self::Error> { let db = self.clone(); Box::pin(async move { db.put_bso_async(param).map_err(Into::into).await }) } - #[cfg(test)] - fn put_bso(&self, param: params::PutBso) -> DbFuture<'_, results::PutBso, Self::Error> { - let db = self.clone(); - Box::pin(async move { db.put_bso_async_test(param).map_err(Into::into).await }) - } - - #[cfg(not(test))] fn post_bsos(&self, param: params::PostBsos) -> DbFuture<'_, results::PostBsos, Self::Error> { let db = self.clone(); Box::pin(async move { db.post_bsos_async(param).map_err(Into::into).await }) } - #[cfg(test)] - fn post_bsos(&self, param: params::PostBsos) -> DbFuture<'_, results::PostBsos, Self::Error> { - let db = self.clone(); - Box::pin(async move { db.post_bsos_async_test(param).map_err(Into::into).await }) - } - fn create_batch( &self, param: params::CreateBatch, diff --git a/syncstorage-spanner/src/pool.rs b/syncstorage-spanner/src/pool.rs index 35de15dd..0c976046 100644 --- a/syncstorage-spanner/src/pool.rs +++ b/syncstorage-spanner/src/pool.rs @@ -19,6 +19,9 @@ use super::{ pub struct SpannerDbPool { /// Pool of db connections pool: deadpool::managed::Pool, + /// Whether `SpannerDb` use Spanner mutations (which should be more + /// efficient for their bulk operations) + use_mutations: bool, /// In-memory cache of collection_ids and their names coll_cache: Arc, @@ -53,9 +56,14 @@ impl SpannerDbPool { }; let config = deadpool::managed::PoolConfig { max_size, timeouts }; let pool = deadpool::managed::Pool::from_config(manager, config); + #[cfg(not(debug_assertions))] + let use_mutations = true; + #[cfg(debug_assertions)] + let use_mutations = settings.database_spanner_use_mutations; Ok(Self { pool, + use_mutations, coll_cache: Default::default(), metrics: metrics.clone(), quota: Quota { @@ -75,6 +83,7 @@ impl SpannerDbPool { })?; Ok(SpannerDb::new( conn, + self.use_mutations, Arc::clone(&self.coll_cache), &self.metrics, self.quota,