diff --git a/syncstorage-db/src/tests/db.rs b/syncstorage-db/src/tests/db.rs index 537e9c18..6a540084 100644 --- a/syncstorage-db/src/tests/db.rs +++ b/syncstorage-db/src/tests/db.rs @@ -160,7 +160,7 @@ async fn get_bsos_limit_offset() -> Result<(), DbError> { let offset = "0".to_owned(); // XXX: validation? /* - let bsos = db.get_bsos_sync(gbsos(uid, coll, &[], MAX_TIMESTAMP, 0, Sorting::Index, -1, 0))?; + let bsos = db.get_bsos(gbsos(uid, coll, &[], MAX_TIMESTAMP, 0, Sorting::Index, -1, 0))?; .. etc */ diff --git a/syncstorage-spanner/src/batch.rs b/syncstorage-spanner/src/batch.rs index f3926a06..b7f2fc82 100644 --- a/syncstorage-spanner/src/batch.rs +++ b/syncstorage-spanner/src/batch.rs @@ -19,17 +19,14 @@ use super::models::{SpannerDb, PRETOUCH_TS}; use super::support::{as_type, null_value, struct_type_field, IntoSpannerValue}; use super::DbResult; -pub async fn create_async( - db: &SpannerDb, - params: params::CreateBatch, -) -> DbResult { +pub async fn create(db: &SpannerDb, params: params::CreateBatch) -> DbResult { let batch_id = Uuid::new_v4().simple().to_string(); - let collection_id = db.get_collection_id_async(¶ms.collection).await?; + let collection_id = db.get_collection_id(¶ms.collection).await?; let timestamp = db.checked_timestamp()?.as_i64(); // Ensure a parent record exists in user_collections before writing to batches // (INTERLEAVE IN PARENT user_collections) - pretouch_collection_async(db, ¶ms.user_id, collection_id).await?; + pretouch_collection(db, ¶ms.user_id, collection_id).await?; let new_batch = results::CreateBatch { size: db .check_quota(¶ms.user_id, ¶ms.collection, collection_id) @@ -48,13 +45,14 @@ pub async fn create_async( db.sql( "INSERT INTO batches (fxa_uid, fxa_kid, collection_id, batch_id, expiry) VALUES (@fxa_uid, @fxa_kid, @collection_id, @batch_id, @expiry)", - )? + ) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_dml_async(&db.conn) + .execute_dml(&db.conn) .await?; - do_append_async( + do_append( db, params.user_id, collection_id, @@ -66,16 +64,16 @@ pub async fn create_async( Ok(new_batch) } -pub async fn validate_async(db: &SpannerDb, params: params::ValidateBatch) -> DbResult { - let exists = get_async(db, params.into()).await?; +pub async fn validate(db: &SpannerDb, params: params::ValidateBatch) -> DbResult { + let exists = get_query(db, params.into()).await?; Ok(exists.is_some()) } // Append a collection to a pending batch (`create_batch` creates a new batch) -pub async fn append_async(db: &SpannerDb, params: params::AppendToBatch) -> DbResult<()> { +pub async fn append(db: &SpannerDb, params: params::AppendToBatch) -> DbResult<()> { let mut metrics = db.metrics.clone(); metrics.start_timer("storage.spanner.append_items_to_batch", None); - let collection_id = db.get_collection_id_async(¶ms.collection).await?; + let collection_id = db.get_collection_id(¶ms.collection).await?; let current_size = db .check_quota(¶ms.user_id, ¶ms.collection, collection_id) @@ -86,7 +84,7 @@ pub async fn append_async(db: &SpannerDb, params: params::AppendToBatch) -> DbRe } // confirm that this batch exists or has not yet been committed. - let exists = validate_async( + let exists = validate( db, params::ValidateBatch { user_id: params.user_id.clone(), @@ -101,7 +99,7 @@ pub async fn append_async(db: &SpannerDb, params: params::AppendToBatch) -> DbRe return Err(DbError::batch_not_found()); } - do_append_async( + do_append( db, params.user_id, collection_id, @@ -113,11 +111,11 @@ pub async fn append_async(db: &SpannerDb, params: params::AppendToBatch) -> DbRe Ok(()) } -pub async fn get_async( +pub async fn get_query( db: &SpannerDb, params: params::GetBatch, ) -> DbResult> { - let collection_id = db.get_collection_id_async(¶ms.collection).await?; + let collection_id = db.get_collection_id(¶ms.collection).await?; let (sqlparams, sqlparam_types) = params! { "fxa_uid" => params.user_id.fxa_uid.clone(), "fxa_kid" => params.user_id.fxa_kid.clone(), @@ -133,18 +131,19 @@ pub async fn get_async( AND collection_id = @collection_id AND batch_id = @batch_id AND expiry > CURRENT_TIMESTAMP()", - )? + ) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_async(&db.conn)? + .execute(&db.conn)? .one_or_none() .await? .map(move |_| params::Batch { id: params.id }); Ok(batch) } -pub async fn delete_async(db: &SpannerDb, params: params::DeleteBatch) -> DbResult<()> { - let collection_id = db.get_collection_id_async(¶ms.collection).await?; +pub async fn delete_query(db: &SpannerDb, params: params::DeleteBatch) -> DbResult<()> { + let collection_id = db.get_collection_id(¶ms.collection).await?; let (sqlparams, sqlparam_types) = params! { "fxa_uid" => params.user_id.fxa_uid.clone(), "fxa_kid" => params.user_id.fxa_kid.clone(), @@ -159,26 +158,27 @@ pub async fn delete_async(db: &SpannerDb, params: params::DeleteBatch) -> DbResu AND fxa_kid = @fxa_kid AND collection_id = @collection_id AND batch_id = @batch_id", - )? + ) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_dml_async(&db.conn) + .execute_dml(&db.conn) .await?; Ok(()) } -pub async fn commit_async( +pub async fn commit_query( db: &SpannerDb, params: params::CommitBatch, ) -> DbResult { let mut metrics = db.metrics.clone(); metrics.start_timer("storage.spanner.apply_batch", None); - let collection_id = db.get_collection_id_async(¶ms.collection).await?; + let collection_id = db.get_collection_id(¶ms.collection).await?; // Ensure a parent record exists in user_collections before writing to bsos // (INTERLEAVE IN PARENT user_collections) let timestamp = db - .update_collection_async(¶ms.user_id, collection_id, ¶ms.collection) + .update_collection(¶ms.user_id, collection_id, ¶ms.collection) .await?; let as_rfc3339 = timestamp.as_rfc3339()?; @@ -197,10 +197,11 @@ pub async fn commit_async( sqlparam_types.insert("timestamp".to_owned(), as_type(TypeCode::TIMESTAMP)); // NOTE: This write treats both expired and non-expired as existing // bsos. See the note in [SpannerDb::post_bsos_with_mutations] - db.sql(include_str!("batch_commit_update.sql"))? + db.sql(include_str!("batch_commit_update.sql")) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_dml_async(&db.conn) + .execute_dml(&db.conn) .await?; } @@ -220,14 +221,15 @@ pub async fn commit_async( timer3.start_timer("storage.spanner.apply_batch_insert", None); // NOTE: This write treats both expired and non-expired as existing // bsos. See the note in [SpannerDb::post_bsos_with_mutations] - db.sql(include_str!("batch_commit_insert.sql"))? + db.sql(include_str!("batch_commit_insert.sql")) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_dml_async(&db.conn) + .execute_dml(&db.conn) .await?; } - delete_async( + delete_query( db, params::DeleteBatch { user_id: params.user_id.clone(), @@ -246,7 +248,7 @@ pub async fn commit_async( } // Append a collection to an existing, pending batch. -pub async fn do_append_async( +pub async fn do_append( db: &SpannerDb, user_id: UserIdentifier, collection_id: i32, @@ -315,10 +317,11 @@ pub async fn do_append_async( AND collection_id=@collection_id AND batch_id=@batch_id AND batch_bso_id in UNNEST(@ids);", - )? + ) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_async(&db.conn)?; + .execute(&db.conn)?; while let Some(row) = existing_stream.try_next().await? { existing.insert(exist_idx( &collection_id.to_string(), @@ -439,10 +442,11 @@ pub async fn do_append_async( "INSERT INTO batch_bsos (fxa_uid, fxa_kid, collection_id, batch_id, batch_bso_id, sortindex, payload, ttl) SELECT * FROM UNNEST(@values)", - )? + ) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_dml_async(&db.conn) + .execute_dml(&db.conn) .await?; db.metrics.count_with_tags( "storage.spanner.batch.insert", @@ -490,10 +494,11 @@ pub async fn do_append_async( WHERE fxa_uid=@fxa_uid AND fxa_kid=@fxa_kid AND collection_id=@collection_id AND batch_id=@batch_id AND batch_bso_id=@batch_bso_id", updatable = updatable - ))? + )) + .await? .params(params) .param_types(param_types.clone()) - .execute_dml_async(&db.conn) + .execute_dml(&db.conn) .await?; } } @@ -509,7 +514,7 @@ pub async fn do_append_async( /// /// For the special case of a user creating a batch for a collection with no /// prior data. -async fn pretouch_collection_async( +async fn pretouch_collection( db: &SpannerDb, user_id: &UserIdentifier, collection_id: i32, @@ -526,10 +531,11 @@ async fn pretouch_collection_async( WHERE fxa_uid = @fxa_uid AND fxa_kid = @fxa_kid AND collection_id = @collection_id", - )? + ) + .await? .params(sqlparams.clone()) .param_types(sqlparam_types.clone()) - .execute_async(&db.conn)? + .execute(&db.conn)? .one_or_none() .await?; if result.is_none() { @@ -545,10 +551,11 @@ async fn pretouch_collection_async( "INSERT INTO user_collections (fxa_uid, fxa_kid, collection_id, modified) VALUES (@fxa_uid, @fxa_kid, @collection_id, @modified)" }; - db.sql(sql)? + db.sql(sql) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_dml_async(&db.conn) + .execute_dml(&db.conn) .await?; } Ok(()) diff --git a/syncstorage-spanner/src/models.rs b/syncstorage-spanner/src/models.rs index 5e232a93..23c37194 100644 --- a/syncstorage-spanner/src/models.rs +++ b/syncstorage-spanner/src/models.rs @@ -127,7 +127,7 @@ impl SpannerDb { self.coll_cache.get_name(id).await } - pub(super) async fn get_collection_id_async(&self, name: &str) -> DbResult { + pub(super) async fn get_collection_id(&self, name: &str) -> DbResult { if let Some(id) = self.coll_cache.get_id(name).await { return Ok(id); } @@ -137,10 +137,11 @@ impl SpannerDb { "SELECT collection_id FROM collections WHERE name = @name", - )? + ) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_async(&self.conn)? + .execute(&self.conn)? .one_or_none() .await? .ok_or_else(DbError::collection_not_found)?; @@ -154,7 +155,7 @@ impl SpannerDb { Ok(id) } - pub(super) async fn create_collection_async(&self, name: &str) -> DbResult { + pub(super) async fn create_collection(&self, name: &str) -> DbResult { // This should always run within a r/w transaction, so that: "If a // transaction successfully commits, then no other writer modified the // data that was read in the transaction after it was read." @@ -167,8 +168,9 @@ impl SpannerDb { .sql( "SELECT COALESCE(MAX(collection_id), 1) FROM collections", - )? - .execute_async(&self.conn)? + ) + .await? + .execute(&self.conn)? .one() .await?; let max = result[0] @@ -184,27 +186,28 @@ impl SpannerDb { self.sql( "INSERT INTO collections (collection_id, name) VALUES (@collection_id, @name)", - )? + ) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_dml_async(&self.conn) + .execute_dml(&self.conn) .await?; Ok(id) } - async fn get_or_create_collection_id_async(&self, name: &str) -> DbResult { - match self.get_collection_id_async(name).await { - Err(err) if err.is_collection_not_found() => self.create_collection_async(name).await, + async fn get_or_create_collection_id(&self, name: &str) -> DbResult { + match self.get_collection_id(name).await { + Err(err) if err.is_collection_not_found() => self.create_collection(name).await, result => result, } } - async fn lock_for_read_async(&self, params: params::LockCollection) -> DbResult<()> { + async fn lock_for_read(&self, params: params::LockCollection) -> DbResult<()> { // Begin a transaction - self.begin_async(false).await?; + self.begin(false).await?; let collection_id = self - .get_collection_id_async(¶ms.collection) + .get_collection_id(¶ms.collection) .await .or_else(|e| { if e.is_collection_not_found() { @@ -235,12 +238,10 @@ impl SpannerDb { Ok(()) } - async fn lock_for_write_async(&self, params: params::LockCollection) -> DbResult<()> { + async fn lock_for_write(&self, params: params::LockCollection) -> DbResult<()> { // Begin a transaction - self.begin_async(true).await?; - let collection_id = self - .get_or_create_collection_id_async(¶ms.collection) - .await?; + self.begin(true).await?; + let collection_id = self.get_or_create_collection_id(¶ms.collection).await?; if let Some(CollectionLock::Read) = self .inner .session @@ -268,10 +269,11 @@ impl SpannerDb { AND fxa_kid = @fxa_kid AND collection_id = @collection_id AND modified > @pretouch_ts", - )? + ) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_async(&self.conn)? + .execute(&self.conn)? .one_or_none() .await?; @@ -290,8 +292,9 @@ impl SpannerDb { now } else { let result = self - .sql("SELECT CURRENT_TIMESTAMP()")? - .execute_async(&self.conn)? + .sql("SELECT CURRENT_TIMESTAMP()") + .await? + .execute(&self.conn)? .one() .await?; sync_timestamp_from_rfc3339(result[0].get_string_value())? @@ -310,29 +313,7 @@ impl SpannerDb { self.session.borrow_mut().timestamp = Some(timestamp); } - pub(super) fn begin(&self, for_write: bool) -> DbResult<()> { - let spanner = &self.conn; - let mut options = TransactionOptions::new(); - if for_write { - options.set_read_write(TransactionOptions_ReadWrite::new()); - self.session.borrow_mut().in_write_transaction = true; - } else { - options.set_read_only(TransactionOptions_ReadOnly::new()); - } - let mut req = BeginTransactionRequest::new(); - req.set_session(spanner.session.get_name().to_owned()); - req.set_options(options); - let mut transaction = spanner - .client - .begin_transaction_opt(&req, spanner.session_opt()?)?; - - let mut ts = TransactionSelector::new(); - ts.set_id(transaction.take_id()); - self.session.borrow_mut().transaction = Some(ts); - Ok(()) - } - - pub(super) async fn begin_async(&self, for_write: bool) -> DbResult<()> { + pub(super) async fn begin(&self, for_write: bool) -> DbResult<()> { let spanner = &self.conn; let mut options = TransactionOptions::new(); if for_write { @@ -356,27 +337,18 @@ impl SpannerDb { } /// Return the current transaction metadata (TransactionSelector) if one is active. - fn get_transaction(&self) -> DbResult> { + async fn get_transaction(&self) -> DbResult> { if self.session.borrow().transaction.is_none() { - self.begin(true)?; + self.begin(true).await?; } Ok(self.session.borrow().transaction.clone()) } - /// Return the current transaction metadata (TransactionSelector) if one is active. - async fn get_transaction_async(&self) -> DbResult> { - if self.session.borrow().transaction.is_none() { - self.begin_async(true).await?; - } - - Ok(self.session.borrow().transaction.clone()) - } - - fn sql_request(&self, sql: &str) -> DbResult { + async fn sql_request(&self, sql: &str) -> DbResult { let mut sqlr = ExecuteSqlRequest::new(); sqlr.set_sql(sql.to_owned()); - if let Some(transaction) = self.get_transaction()? { + if let Some(transaction) = self.get_transaction().await? { sqlr.set_transaction(transaction); let mut session = self.session.borrow_mut(); sqlr.seqno = session @@ -388,8 +360,8 @@ impl SpannerDb { Ok(sqlr) } - pub(super) fn sql(&self, sql: &str) -> DbResult { - Ok(ExecuteSqlRequestBuilder::new(self.sql_request(sql)?)) + pub(super) async fn sql(&self, sql: &str) -> DbResult { + Ok(ExecuteSqlRequestBuilder::new(self.sql_request(sql).await?)) } #[allow(unused)] @@ -444,7 +416,7 @@ impl SpannerDb { self.session.borrow().in_write_transaction } - pub fn commit_sync(&self) -> DbResult<()> { + async fn commit(&self) -> DbResult<()> { if !self.in_write_transaction() { // read-only return Ok(()); @@ -457,34 +429,7 @@ impl SpannerDb { return Ok(()); } - if let Some(transaction) = self.get_transaction()? { - let mut req = CommitRequest::new(); - req.set_session(spanner.session.get_name().to_owned()); - req.set_transaction_id(transaction.get_id().to_vec()); - if let Some(mutations) = self.session.borrow_mut().mutations.take() { - req.set_mutations(RepeatedField::from_vec(mutations)); - } - spanner.client.commit_opt(&req, spanner.session_opt()?)?; - Ok(()) - } else { - Err(DbError::internal("No transaction to commit".to_owned())) - } - } - - async fn commit_async(&self) -> DbResult<()> { - if !self.in_write_transaction() { - // read-only - return Ok(()); - } - - let spanner = &self.conn; - - if cfg!(debug_assertions) && spanner.settings.use_test_transactions { - // don't commit test transactions - return Ok(()); - } - - if let Some(transaction) = self.get_transaction_async().await? { + if let Some(transaction) = self.get_transaction().await? { let mut req = CommitRequest::new(); req.set_session(spanner.session.get_name().to_owned()); req.set_transaction_id(transaction.get_id().to_vec()); @@ -501,31 +446,13 @@ impl SpannerDb { } } - pub fn rollback_sync(&self) -> DbResult<()> { + async fn rollback(&self) -> DbResult<()> { if !self.in_write_transaction() { // read-only return Ok(()); } - if let Some(transaction) = self.get_transaction()? { - let spanner = &self.conn; - let mut req = RollbackRequest::new(); - req.set_session(spanner.session.get_name().to_owned()); - req.set_transaction_id(transaction.get_id().to_vec()); - spanner.client.rollback_opt(&req, spanner.session_opt()?)?; - Ok(()) - } else { - Err(DbError::internal("No transaction to rollback".to_owned())) - } - } - - async fn rollback_async(&self) -> DbResult<()> { - if !self.in_write_transaction() { - // read-only - return Ok(()); - } - - if let Some(transaction) = self.get_transaction_async().await? { + if let Some(transaction) = self.get_transaction().await? { let spanner = &self.conn; let mut req = RollbackRequest::new(); req.set_session(spanner.session.get_name().to_owned()); @@ -540,11 +467,11 @@ impl SpannerDb { } } - async fn get_collection_timestamp_async( + async fn get_collection_timestamp( &self, params: params::GetCollectionTimestamp, ) -> DbResult { - let collection_id = self.get_collection_id_async(¶ms.collection).await?; + let collection_id = self.get_collection_id(¶ms.collection).await?; if let Some(modified) = self .session .borrow() @@ -569,10 +496,11 @@ impl SpannerDb { AND fxa_kid = @fxa_kid AND collection_id = @collection_id AND modified > @pretouch_ts", - )? + ) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_async(&self.conn)? + .execute(&self.conn)? .one_or_none() .await? .ok_or_else(DbError::collection_not_found)?; @@ -580,7 +508,7 @@ impl SpannerDb { Ok(modified) } - async fn get_collection_timestamps_async( + async fn get_collection_timestamps( &self, user_id: params::GetCollectionTimestamps, ) -> DbResult { @@ -599,10 +527,11 @@ impl SpannerDb { AND fxa_kid = @fxa_kid AND collection_id != @collection_id AND modified > @pretouch_ts", - )? + ) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_async(&self.conn)?; + .execute(&self.conn)?; let mut results = HashMap::new(); while let Some(row) = streaming.try_next().await? { let collection_id = row[0] @@ -655,9 +584,10 @@ impl SpannerDb { "SELECT collection_id, name FROM collections WHERE collection_id IN UNNEST(@ids)", - )? + ) + .await? .params(params) - .execute_async(&self.conn)?; + .execute(&self.conn)?; while let Some(mut row) = rs.try_next().await? { let id = row[0] .get_string_value() @@ -674,7 +604,7 @@ impl SpannerDb { Ok(names) } - async fn get_collection_counts_async( + async fn get_collection_counts( &self, user_id: params::GetCollectionCounts, ) -> DbResult { @@ -690,10 +620,11 @@ impl SpannerDb { AND fxa_kid = @fxa_kid AND expiry > CURRENT_TIMESTAMP() GROUP BY collection_id", - )? + ) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_async(&self.conn)?; + .execute(&self.conn)?; let mut counts = HashMap::new(); while let Some(row) = streaming.try_next().await? { let collection_id = row[0] @@ -709,7 +640,7 @@ impl SpannerDb { self.map_collection_names(counts).await } - async fn get_collection_usage_async( + async fn get_collection_usage( &self, user_id: params::GetCollectionUsage, ) -> DbResult { @@ -725,10 +656,11 @@ impl SpannerDb { AND fxa_kid = @fxa_kid AND expiry > CURRENT_TIMESTAMP() GROUP BY collection_id", - )? + ) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_async(&self.conn)?; + .execute(&self.conn)?; let mut usages = HashMap::new(); while let Some(row) = streaming.try_next().await? { let collection_id = row[0] @@ -761,10 +693,11 @@ impl SpannerDb { WHERE fxa_uid = @fxa_uid AND fxa_kid = @fxa_kid AND modified > @pretouch_ts", - )? + ) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_async(&self.conn)? + .execute(&self.conn)? .one() .await?; if row[0].has_null_value() { @@ -774,7 +707,7 @@ impl SpannerDb { } } - async fn get_storage_usage_async( + async fn get_storage_usage( &self, user_id: params::GetStorageUsage, ) -> DbResult { @@ -790,10 +723,11 @@ impl SpannerDb { AND fxa_kid = @fxa_kid AND expiry > CURRENT_TIMESTAMP() GROUP BY fxa_uid", - )? + ) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_async(&self.conn)? + .execute(&self.conn)? .one_or_none() .await?; if let Some(result) = result { @@ -807,7 +741,7 @@ impl SpannerDb { } } - async fn get_quota_usage_async( + async fn get_quota_usage( &self, params: params::GetQuotaUsage, ) -> DbResult { @@ -825,10 +759,11 @@ impl SpannerDb { "collection_id" => params.collection_id, }; let result = self - .sql(check_sql)? + .sql(check_sql) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_async(&self.conn)? + .execute(&self.conn)? .one_or_none() .await?; if let Some(result) = result { @@ -895,10 +830,11 @@ impl SpannerDb { "collection_id" => collection_id, }; - self.sql(calc_sql)? + self.sql(calc_sql) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_async(&self.conn)? + .execute(&self.conn)? .one_or_none() .await? }; @@ -941,10 +877,11 @@ impl SpannerDb { .sql( "SELECT 1 FROM user_collections WHERE fxa_uid=@fxa_uid AND fxa_kid=@fxa_kid AND collection_id=@collection_id", - )? + ) + .await? .params(sqlparams.clone()) .param_types(sqltypes.clone()) - .execute_async(&self.conn)? + .execute(&self.conn)? .one_or_none() .await?; if result.is_none() { @@ -968,10 +905,11 @@ impl SpannerDb { } } }; - self.sql(set_sql)? + self.sql(set_sql) + .await? .params(sqlparams) .param_types(sqltypes) - .execute_dml_async(&self.conn) + .execute_dml(&self.conn) .await?; Ok(timestamp) } @@ -990,10 +928,11 @@ impl SpannerDb { WHERE fxa_uid = @fxa_uid AND fxa_kid = @fxa_kid AND collection_id = @collection_id", - )? + ) + .await? .params(params.clone()) .param_types(param_types.clone()) - .execute_dml_async(&self.conn) + .execute_dml(&self.conn) .await?; self.update_user_collection_quotas(user_id, TOMBSTONE) .await?; @@ -1002,7 +941,7 @@ impl SpannerDb { self.checked_timestamp() } - async fn delete_storage_async(&self, user_id: params::DeleteStorage) -> DbResult<()> { + async fn delete_storage(&self, user_id: params::DeleteStorage) -> DbResult<()> { // Also deletes child bsos/batch rows (INTERLEAVE IN PARENT // user_collections ON DELETE CASCADE) let (sqlparams, sqlparam_types) = params! { @@ -1013,10 +952,11 @@ impl SpannerDb { "DELETE FROM user_collections WHERE fxa_uid = @fxa_uid AND fxa_kid = @fxa_kid", - )? + ) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_dml_async(&self.conn) + .execute_dml(&self.conn) .await?; Ok(()) } @@ -1028,13 +968,13 @@ impl SpannerDb { .ok_or_else(|| DbError::internal("CURRENT_TIMESTAMP() not read yet".to_owned())) } - async fn delete_collection_async( + async fn delete_collection( &self, params: params::DeleteCollection, ) -> DbResult { // Also deletes child bsos/batch rows (INTERLEAVE IN PARENT // user_collections ON DELETE CASCADE) - let collection_id = self.get_collection_id_async(¶ms.collection).await?; + let collection_id = self.get_collection_id(¶ms.collection).await?; let (sqlparams, mut sqlparam_types) = params! { "fxa_uid" => params.user_id.fxa_uid.clone(), "fxa_kid" => params.user_id.fxa_kid.clone(), @@ -1049,10 +989,11 @@ impl SpannerDb { AND fxa_kid = @fxa_kid AND collection_id = @collection_id AND modified > @pretouch_ts", - )? + ) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_dml_async(&self.conn) + .execute_dml(&self.conn) .await?; if affected_rows > 0 { let mut tags = HashMap::default(); @@ -1065,7 +1006,7 @@ impl SpannerDb { } } - pub(super) async fn update_collection_async( + pub(super) async fn update_collection( &self, user_id: &UserIdentifier, collection_id: i32, @@ -1101,10 +1042,11 @@ impl SpannerDb { WHERE fxa_uid = @fxa_uid AND fxa_kid = @fxa_kid AND collection_id = @collection_id", - )? + ) + .await? .params(sqlparams.clone()) .param_types(sqlparam_types.clone()) - .execute_async(&self.conn)? + .execute(&self.conn)? .one_or_none() .await?; if result.is_some() { @@ -1113,10 +1055,11 @@ impl SpannerDb { WHERE fxa_uid = @fxa_uid AND fxa_kid = @fxa_kid AND collection_id = @collection_id"; - self.sql(sql)? + self.sql(sql) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_dml_async(&self.conn) + .execute_dml(&self.conn) .await?; } else { let mut tags = HashMap::default(); @@ -1131,18 +1074,19 @@ impl SpannerDb { "INSERT INTO user_collections (fxa_uid, fxa_kid, collection_id, modified) VALUES (@fxa_uid, @fxa_kid, @collection_id, @modified)" }; - self.sql(update_sql)? + self.sql(update_sql) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_dml_async(&self.conn) + .execute_dml(&self.conn) .await?; } self.session.borrow_mut().updated_collection = true; Ok(timestamp) } - async fn delete_bso_async(&self, params: params::DeleteBso) -> DbResult { - let collection_id = self.get_collection_id_async(¶ms.collection).await?; + async fn delete_bso(&self, params: params::DeleteBso) -> DbResult { + let collection_id = self.get_collection_id(¶ms.collection).await?; let user_id = params.user_id.clone(); let (sqlparams, sqlparam_types) = params! { "fxa_uid" => params.user_id.fxa_uid, @@ -1157,10 +1101,11 @@ impl SpannerDb { AND fxa_kid = @fxa_kid AND collection_id = @collection_id AND bso_id = @bso_id", - )? + ) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_dml_async(&self.conn) + .execute_dml(&self.conn) .await?; if affected_rows == 0 { Err(DbError::bso_not_found()) @@ -1172,9 +1117,9 @@ impl SpannerDb { } } - async fn delete_bsos_async(&self, params: params::DeleteBsos) -> DbResult { + async fn delete_bsos(&self, params: params::DeleteBsos) -> DbResult { let user_id = params.user_id.clone(); - let collection_id = self.get_collection_id_async(¶ms.collection).await?; + let collection_id = self.get_collection_id(¶ms.collection).await?; let (sqlparams, sqlparam_types) = params! { "fxa_uid" => user_id.fxa_uid, @@ -1188,10 +1133,11 @@ impl SpannerDb { AND fxa_kid = @fxa_kid AND collection_id = @collection_id AND bso_id IN UNNEST(@ids)", - )? + ) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_dml_async(&self.conn) + .execute_dml(&self.conn) .await?; let mut tags = HashMap::default(); tags.insert("collection".to_string(), params.collection.clone()); @@ -1201,7 +1147,7 @@ impl SpannerDb { .await } - async fn bsos_query_async( + async fn bsos_query( &self, query_str: &str, params: params::GetBsos, @@ -1210,7 +1156,7 @@ impl SpannerDb { let (mut sqlparams, mut sqlparam_types) = params! { "fxa_uid" => params.user_id.fxa_uid, "fxa_kid" => params.user_id.fxa_kid, - "collection_id" => self.get_collection_id_async(¶ms.collection).await?, + "collection_id" => self.get_collection_id(¶ms.collection).await?, }; if !params.ids.is_empty() { @@ -1293,10 +1239,11 @@ impl SpannerDb { if let Some(offset) = params.offset { query = format!("{} OFFSET {}", query, offset.offset); } - self.sql(&query)? + self.sql(&query) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_async(&self.conn) + .execute(&self.conn) } /// Whether to stabilize the sort order for get_bsos_async @@ -1357,7 +1304,7 @@ impl SpannerDb { */ } - async fn get_bsos_async(&self, params: params::GetBsos) -> DbResult { + async fn get_bsos(&self, params: params::GetBsos) -> DbResult { let query = "\ SELECT bso_id, sortindex, payload, modified, expiry FROM bsos @@ -1369,7 +1316,7 @@ impl SpannerDb { let params::Offset { offset, timestamp } = params.offset.clone().unwrap_or_default(); let sort = params.sort; - let mut streaming = self.bsos_query_async(query, params).await?; + let mut streaming = self.bsos_query(query, params).await?; let mut bsos = vec![]; while let Some(row) = streaming.try_next().await? { bsos.push(bso_from_row(row)?); @@ -1396,7 +1343,7 @@ impl SpannerDb { }) } - async fn get_bso_ids_async(&self, params: params::GetBsos) -> DbResult { + async fn get_bso_ids(&self, params: params::GetBsos) -> DbResult { let limit = params.limit.map(i64::from).unwrap_or(-1); let params::Offset { offset, timestamp } = params.offset.clone().unwrap_or_default(); let sort = params.sort; @@ -1408,7 +1355,7 @@ impl SpannerDb { AND fxa_kid = @fxa_kid AND collection_id = @collection_id AND expiry > CURRENT_TIMESTAMP()"; - let mut stream = self.bsos_query_async(query, params).await?; + let mut stream = self.bsos_query(query, params).await?; let mut ids = vec![]; let mut modifieds = vec![]; @@ -1437,8 +1384,8 @@ impl SpannerDb { }) } - async fn get_bso_async(&self, params: params::GetBso) -> DbResult> { - let collection_id = self.get_collection_id_async(¶ms.collection).await?; + async fn get_bso(&self, params: params::GetBso) -> DbResult> { + let collection_id = self.get_collection_id(¶ms.collection).await?; let (sqlparams, sqlparam_types) = params! { "fxa_uid" => params.user_id.fxa_uid, "fxa_kid" => params.user_id.fxa_kid, @@ -1453,21 +1400,19 @@ impl SpannerDb { AND collection_id = @collection_id AND bso_id = @bso_id AND expiry > CURRENT_TIMESTAMP()", - )? + ) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_async(&self.conn)? + .execute(&self.conn)? .one_or_none() .await? .map(bso_from_row) .transpose() } - async fn get_bso_timestamp_async( - &self, - params: params::GetBsoTimestamp, - ) -> DbResult { - let collection_id = self.get_collection_id_async(¶ms.collection).await?; + async fn get_bso_timestamp(&self, params: params::GetBsoTimestamp) -> DbResult { + let collection_id = self.get_collection_id(¶ms.collection).await?; let (sqlparams, sqlparam_types) = params! { "fxa_uid" => params.user_id.fxa_uid, "fxa_kid" => params.user_id.fxa_kid, @@ -1484,10 +1429,11 @@ impl SpannerDb { AND collection_id = @collection_id AND bso_id = @bso_id AND expiry > CURRENT_TIMESTAMP()", - )? + ) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_async(&self.conn)? + .execute(&self.conn)? .one_or_none() .await?; if let Some(result) = result { @@ -1497,7 +1443,7 @@ impl SpannerDb { } } - async fn put_bso_async(&self, params: params::PutBso) -> DbResult { + async fn put_bso(&self, params: params::PutBso) -> DbResult { if self.conn.settings.use_mutations { self.put_bso_with_mutations(params).await } else { @@ -1525,7 +1471,7 @@ impl SpannerDb { Ok(result.modified) } - async fn post_bsos_async(&self, params: params::PostBsos) -> DbResult { + async fn post_bsos(&self, params: params::PostBsos) -> DbResult { if self.conn.settings.use_mutations { self.post_bsos_with_mutations(params).await } else { @@ -1538,9 +1484,7 @@ impl SpannerDb { params: params::PostBsos, ) -> DbResult { let user_id = params.user_id; - let collection_id = self - .get_or_create_collection_id_async(¶ms.collection) - .await?; + let collection_id = self.get_or_create_collection_id(¶ms.collection).await?; if !params.for_batch { self.check_quota(&user_id, ¶ms.collection, collection_id) @@ -1550,7 +1494,7 @@ impl SpannerDb { // Ensure a parent record exists in user_collections before writing to // bsos (INTERLEAVE IN PARENT user_collections) let timestamp = self - .update_collection_async(&user_id, collection_id, ¶ms.collection) + .update_collection(&user_id, collection_id, ¶ms.collection) .await?; let (sqlparams, sqlparam_types) = params! { @@ -1583,10 +1527,11 @@ impl SpannerDb { AND fxa_kid = @fxa_kid AND collection_id = @collection_id AND bso_id IN UNNEST(@ids)", - )? + ) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_async(&self.conn)?; + .execute(&self.conn)?; let mut existing = HashSet::new(); while let Some(mut row) = streaming.try_next().await? { existing.insert(row[0].take_string_value()); @@ -1652,10 +1597,11 @@ impl SpannerDb { Ok(result) } - async fn check_async(&self) -> DbResult { + async fn check(&self) -> DbResult { // TODO: is there a better check than just fetching UTC? - self.sql("SELECT CURRENT_TIMESTAMP()")? - .execute_async(&self.conn)? + self.sql("SELECT CURRENT_TIMESTAMP()") + .await? + .execute(&self.conn)? .one() .await?; Ok(true) @@ -1680,7 +1626,7 @@ impl SpannerDb { return Ok(None); } let usage = self - .get_quota_usage_async(params::GetQuotaUsage { + .get_quota_usage(params::GetQuotaUsage { user_id: user_id.clone(), collection: collection.to_owned(), collection_id, @@ -1700,9 +1646,7 @@ impl SpannerDb { // during db tests, see the with_mutations impl for the non-tests version async fn put_bso_without_mutations(&self, bso: params::PutBso) -> DbResult { use syncstorage_db_common::util::to_rfc3339; - let collection_id = self - .get_or_create_collection_id_async(&bso.collection) - .await?; + let collection_id = self.get_or_create_collection_id(&bso.collection).await?; self.check_quota(&bso.user_id, &bso.collection, collection_id) .await?; @@ -1714,7 +1658,7 @@ impl SpannerDb { "bso_id" => bso.id, }; // prewarm the collections table by ensuring that the row is added if not present. - self.update_collection_async(&bso.user_id, collection_id, &bso.collection) + self.update_collection(&bso.user_id, collection_id, &bso.collection) .await?; let timestamp = self.checked_timestamp()?; @@ -1726,10 +1670,11 @@ impl SpannerDb { AND fxa_kid = @fxa_kid AND collection_id = @collection_id AND bso_id = @bso_id", - )? + ) + .await? .params(sqlparams.clone()) .param_types(sqlparam_types.clone()) - .execute_async(&self.conn)? + .execute(&self.conn)? .one_or_none() .await?; let exists = result.is_some(); @@ -1857,10 +1802,11 @@ impl SpannerDb { sql.to_owned() }; - self.sql(&sql)? + self.sql(&sql) + .await? .params(sqlparams) .param_types(sqlparam_types) - .execute_dml_async(&self.conn) + .execute_dml(&self.conn) .await?; // update the counts for the user_collections table. self.update_user_collection_quotas(&bso.user_id, collection_id) @@ -1873,9 +1819,7 @@ impl SpannerDb { &self, input: params::PostBsos, ) -> DbResult { - let collection_id = self - .get_or_create_collection_id_async(&input.collection) - .await?; + let collection_id = self.get_or_create_collection_id(&input.collection).await?; let mut result = results::PostBsos { modified: self.checked_timestamp()?, success: Default::default(), @@ -1906,27 +1850,27 @@ impl Db for SpannerDb { fn commit(&mut self) -> DbFuture<'_, (), Self::Error> { let db = self.clone(); - Box::pin(async move { db.commit_async().map_err(Into::into).await }) + Box::pin(async move { db.commit().map_err(Into::into).await }) } fn rollback(&mut self) -> DbFuture<'_, (), Self::Error> { let db = self.clone(); - Box::pin(async move { db.rollback_async().map_err(Into::into).await }) + Box::pin(async move { db.rollback().map_err(Into::into).await }) } fn lock_for_read(&mut self, param: params::LockCollection) -> DbFuture<'_, (), Self::Error> { let db = self.clone(); - Box::pin(async move { db.lock_for_read_async(param).map_err(Into::into).await }) + Box::pin(async move { db.lock_for_read(param).map_err(Into::into).await }) } fn lock_for_write(&mut self, param: params::LockCollection) -> DbFuture<'_, (), Self::Error> { let db = self.clone(); - Box::pin(async move { db.lock_for_write_async(param).map_err(Into::into).await }) + Box::pin(async move { db.lock_for_write(param).map_err(Into::into).await }) } fn begin(&mut self, for_write: bool) -> DbFuture<'_, (), Self::Error> { let db = self.clone(); - Box::pin(async move { db.begin_async(for_write).map_err(Into::into).await }) + Box::pin(async move { db.begin(for_write).map_err(Into::into).await }) } fn get_collection_timestamp( @@ -1934,11 +1878,7 @@ impl Db for SpannerDb { param: params::GetCollectionTimestamp, ) -> DbFuture<'_, results::GetCollectionTimestamp, Self::Error> { let db = self.clone(); - Box::pin(async move { - db.get_collection_timestamp_async(param) - .map_err(Into::into) - .await - }) + Box::pin(async move { db.get_collection_timestamp(param).map_err(Into::into).await }) } fn get_storage_timestamp( @@ -1954,12 +1894,12 @@ impl Db for SpannerDb { param: params::DeleteCollection, ) -> DbFuture<'_, results::DeleteCollection, Self::Error> { let db = self.clone(); - Box::pin(async move { db.delete_collection_async(param).map_err(Into::into).await }) + Box::pin(async move { db.delete_collection(param).map_err(Into::into).await }) } fn check(&mut self) -> DbFuture<'_, results::Check, Self::Error> { let db = self.clone(); - Box::pin(async move { db.check_async().map_err(Into::into).await }) + Box::pin(async move { db.check().map_err(Into::into).await }) } fn get_collection_timestamps( @@ -1968,7 +1908,7 @@ impl Db for SpannerDb { ) -> DbFuture<'_, results::GetCollectionTimestamps, Self::Error> { let db = self.clone(); Box::pin(async move { - db.get_collection_timestamps_async(user_id) + db.get_collection_timestamps(user_id) .map_err(Into::into) .await }) @@ -1979,11 +1919,7 @@ impl Db for SpannerDb { user_id: params::GetCollectionCounts, ) -> DbFuture<'_, results::GetCollectionCounts, Self::Error> { let db = self.clone(); - Box::pin(async move { - db.get_collection_counts_async(user_id) - .map_err(Into::into) - .await - }) + Box::pin(async move { db.get_collection_counts(user_id).map_err(Into::into).await }) } fn get_collection_usage( @@ -1991,11 +1927,7 @@ impl Db for SpannerDb { user_id: params::GetCollectionUsage, ) -> DbFuture<'_, results::GetCollectionUsage, Self::Error> { let db = self.clone(); - Box::pin(async move { - db.get_collection_usage_async(user_id) - .map_err(Into::into) - .await - }) + Box::pin(async move { db.get_collection_usage(user_id).map_err(Into::into).await }) } fn get_storage_usage( @@ -2003,7 +1935,7 @@ impl Db for SpannerDb { param: params::GetStorageUsage, ) -> DbFuture<'_, results::GetStorageUsage, Self::Error> { let db = self.clone(); - Box::pin(async move { db.get_storage_usage_async(param).map_err(Into::into).await }) + Box::pin(async move { db.get_storage_usage(param).map_err(Into::into).await }) } fn get_quota_usage( @@ -2011,7 +1943,7 @@ impl Db for SpannerDb { param: params::GetQuotaUsage, ) -> DbFuture<'_, results::GetQuotaUsage, Self::Error> { let db = self.clone(); - Box::pin(async move { db.get_quota_usage_async(param).map_err(Into::into).await }) + Box::pin(async move { db.get_quota_usage(param).map_err(Into::into).await }) } fn delete_storage( @@ -2019,7 +1951,7 @@ impl Db for SpannerDb { param: params::DeleteStorage, ) -> DbFuture<'_, results::DeleteStorage, Self::Error> { let db = self.clone(); - Box::pin(async move { db.delete_storage_async(param).map_err(Into::into).await }) + Box::pin(async move { db.delete_storage(param).map_err(Into::into).await }) } fn delete_bso( @@ -2027,7 +1959,7 @@ impl Db for SpannerDb { param: params::DeleteBso, ) -> DbFuture<'_, results::DeleteBso, Self::Error> { let db = self.clone(); - Box::pin(async move { db.delete_bso_async(param).map_err(Into::into).await }) + Box::pin(async move { db.delete_bso(param).map_err(Into::into).await }) } fn delete_bsos( @@ -2035,12 +1967,12 @@ impl Db for SpannerDb { param: params::DeleteBsos, ) -> DbFuture<'_, results::DeleteBsos, Self::Error> { let db = self.clone(); - Box::pin(async move { db.delete_bsos_async(param).map_err(Into::into).await }) + Box::pin(async move { db.delete_bsos(param).map_err(Into::into).await }) } fn get_bsos(&mut self, param: params::GetBsos) -> DbFuture<'_, results::GetBsos, Self::Error> { let db = self.clone(); - Box::pin(async move { db.get_bsos_async(param).map_err(Into::into).await }) + Box::pin(async move { db.get_bsos(param).map_err(Into::into).await }) } fn get_bso_ids( @@ -2048,7 +1980,7 @@ impl Db for SpannerDb { param: params::GetBsoIds, ) -> DbFuture<'_, results::GetBsoIds, Self::Error> { let db = self.clone(); - Box::pin(async move { db.get_bso_ids_async(param).map_err(Into::into).await }) + Box::pin(async move { db.get_bso_ids(param).map_err(Into::into).await }) } fn get_bso( @@ -2056,7 +1988,7 @@ impl Db for SpannerDb { param: params::GetBso, ) -> DbFuture<'_, Option, Self::Error> { let db = self.clone(); - Box::pin(async move { db.get_bso_async(param).map_err(Into::into).await }) + Box::pin(async move { db.get_bso(param).map_err(Into::into).await }) } fn get_bso_timestamp( @@ -2064,12 +1996,12 @@ impl Db for SpannerDb { param: params::GetBsoTimestamp, ) -> DbFuture<'_, results::GetBsoTimestamp, Self::Error> { let db = self.clone(); - Box::pin(async move { db.get_bso_timestamp_async(param).map_err(Into::into).await }) + Box::pin(async move { db.get_bso_timestamp(param).map_err(Into::into).await }) } fn put_bso(&mut self, param: params::PutBso) -> DbFuture<'_, results::PutBso, Self::Error> { let db = self.clone(); - Box::pin(async move { db.put_bso_async(param).map_err(Into::into).await }) + Box::pin(async move { db.put_bso(param).map_err(Into::into).await }) } fn post_bsos( @@ -2077,7 +2009,7 @@ impl Db for SpannerDb { param: params::PostBsos, ) -> DbFuture<'_, results::PostBsos, Self::Error> { let db = self.clone(); - Box::pin(async move { db.post_bsos_async(param).map_err(Into::into).await }) + Box::pin(async move { db.post_bsos(param).map_err(Into::into).await }) } fn create_batch( @@ -2085,7 +2017,7 @@ impl Db for SpannerDb { param: params::CreateBatch, ) -> DbFuture<'_, results::CreateBatch, Self::Error> { let db = self.clone(); - Box::pin(async move { batch::create_async(&db, param).map_err(Into::into).await }) + Box::pin(async move { batch::create(&db, param).map_err(Into::into).await }) } fn validate_batch( @@ -2093,7 +2025,7 @@ impl Db for SpannerDb { param: params::ValidateBatch, ) -> DbFuture<'_, results::ValidateBatch, Self::Error> { let db = self.clone(); - Box::pin(async move { batch::validate_async(&db, param).map_err(Into::into).await }) + Box::pin(async move { batch::validate(&db, param).map_err(Into::into).await }) } fn append_to_batch( @@ -2101,7 +2033,7 @@ impl Db for SpannerDb { param: params::AppendToBatch, ) -> DbFuture<'_, results::AppendToBatch, Self::Error> { let db = self.clone(); - Box::pin(async move { batch::append_async(&db, param).map_err(Into::into).await }) + Box::pin(async move { batch::append(&db, param).map_err(Into::into).await }) } fn get_batch( @@ -2109,7 +2041,7 @@ impl Db for SpannerDb { param: params::GetBatch, ) -> DbFuture<'_, Option, Self::Error> { let db = self.clone(); - Box::pin(async move { batch::get_async(&db, param).map_err(Into::into).await }) + Box::pin(async move { batch::get_query(&db, param).map_err(Into::into).await }) } fn commit_batch( @@ -2117,12 +2049,12 @@ impl Db for SpannerDb { param: params::CommitBatch, ) -> DbFuture<'_, results::CommitBatch, Self::Error> { let db = self.clone(); - Box::pin(async move { batch::commit_async(&db, param).map_err(Into::into).await }) + Box::pin(async move { batch::commit_query(&db, param).map_err(Into::into).await }) } fn get_collection_id(&mut self, name: String) -> DbFuture<'_, i32, Self::Error> { let db = self.clone(); - Box::pin(async move { db.get_collection_id_async(&name).map_err(Into::into).await }) + Box::pin(async move { db.get_collection_id(&name).map_err(Into::into).await }) } fn get_connection_info(&self) -> results::ConnectionInfo { @@ -2145,7 +2077,7 @@ impl Db for SpannerDb { fn create_collection(&mut self, name: String) -> DbFuture<'_, i32, Self::Error> { let db = self.clone(); - Box::pin(async move { db.create_collection_async(&name).map_err(Into::into).await }) + Box::pin(async move { db.create_collection(&name).map_err(Into::into).await }) } fn update_collection( @@ -2154,7 +2086,7 @@ impl Db for SpannerDb { ) -> DbFuture<'_, SyncTimestamp, Self::Error> { let db = self.clone(); Box::pin(async move { - db.update_collection_async(¶m.user_id, param.collection_id, ¶m.collection) + db.update_collection(¶m.user_id, param.collection_id, ¶m.collection) .map_err(Into::into) .await }) @@ -2174,7 +2106,7 @@ impl Db for SpannerDb { param: params::DeleteBatch, ) -> DbFuture<'_, results::DeleteBatch, Self::Error> { let db = self.clone(); - Box::pin(async move { batch::delete_async(&db, param).map_err(Into::into).await }) + Box::pin(async move { batch::delete_query(&db, param).map_err(Into::into).await }) } fn clear_coll_cache(&mut self) -> DbFuture<'_, (), Self::Error> { diff --git a/syncstorage-spanner/src/pool.rs b/syncstorage-spanner/src/pool.rs index e7fca22b..02dc4418 100644 --- a/syncstorage-spanner/src/pool.rs +++ b/syncstorage-spanner/src/pool.rs @@ -72,7 +72,7 @@ impl SpannerDbPool { }) } - pub async fn get_async(&self) -> DbResult { + pub async fn get_spanner_db(&self) -> DbResult { let conn = self.pool.get().await.map_err(|e| match e { deadpool::managed::PoolError::Backend(dbe) => dbe, deadpool::managed::PoolError::Timeout(timeout_type) => { @@ -121,7 +121,7 @@ impl DbPool for SpannerDbPool { let mut metrics = self.metrics.clone(); metrics.start_timer("storage.spanner.get_pool", None); - self.get_async() + self.get_spanner_db() .await .map(|db| Box::new(db) as Box>) } diff --git a/syncstorage-spanner/src/support.rs b/syncstorage-spanner/src/support.rs index 533067e2..8ecb147e 100644 --- a/syncstorage-spanner/src/support.rs +++ b/syncstorage-spanner/src/support.rs @@ -160,7 +160,7 @@ impl ExecuteSqlRequestBuilder { } /// Execute a SQL read statement but return a non-blocking streaming result - pub fn execute_async(self, conn: &Conn) -> DbResult { + pub fn execute(self, conn: &Conn) -> DbResult { let stream = conn .client .execute_streaming_sql_opt(&self.prepare_request(conn), conn.session_opt()?)?; @@ -168,7 +168,7 @@ impl ExecuteSqlRequestBuilder { } /// Execute a DML statement, returning the exact count of modified rows - pub async fn execute_dml_async(self, conn: &Conn) -> DbResult { + pub async fn execute_dml(self, conn: &Conn) -> DbResult { let rs = conn .client .execute_sql_async_opt(&self.prepare_request(conn), conn.session_opt()?)?