Merge branch 'master' into feat/syncstorage-diesel-async-STOR-364

This commit is contained in:
Philip Jenvey 2025-09-24 10:49:12 -07:00 committed by GitHub
commit 4bf2f32e38
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 235 additions and 296 deletions

View File

@ -160,7 +160,7 @@ async fn get_bsos_limit_offset() -> Result<(), DbError> {
let offset = "0".to_owned();
// XXX: validation?
/*
let bsos = db.get_bsos_sync(gbsos(uid, coll, &[], MAX_TIMESTAMP, 0, Sorting::Index, -1, 0))?;
let bsos = db.get_bsos(gbsos(uid, coll, &[], MAX_TIMESTAMP, 0, Sorting::Index, -1, 0))?;
.. etc
*/

View File

@ -19,17 +19,14 @@ use super::models::{SpannerDb, PRETOUCH_TS};
use super::support::{as_type, null_value, struct_type_field, IntoSpannerValue};
use super::DbResult;
pub async fn create_async(
db: &SpannerDb,
params: params::CreateBatch,
) -> DbResult<results::CreateBatch> {
pub async fn create(db: &SpannerDb, params: params::CreateBatch) -> DbResult<results::CreateBatch> {
let batch_id = Uuid::new_v4().simple().to_string();
let collection_id = db.get_collection_id_async(&params.collection).await?;
let collection_id = db.get_collection_id(&params.collection).await?;
let timestamp = db.checked_timestamp()?.as_i64();
// Ensure a parent record exists in user_collections before writing to batches
// (INTERLEAVE IN PARENT user_collections)
pretouch_collection_async(db, &params.user_id, collection_id).await?;
pretouch_collection(db, &params.user_id, collection_id).await?;
let new_batch = results::CreateBatch {
size: db
.check_quota(&params.user_id, &params.collection, collection_id)
@ -48,13 +45,14 @@ pub async fn create_async(
db.sql(
"INSERT INTO batches (fxa_uid, fxa_kid, collection_id, batch_id, expiry)
VALUES (@fxa_uid, @fxa_kid, @collection_id, @batch_id, @expiry)",
)?
)
.await?
.params(sqlparams)
.param_types(sqlparam_types)
.execute_dml_async(&db.conn)
.execute_dml(&db.conn)
.await?;
do_append_async(
do_append(
db,
params.user_id,
collection_id,
@ -66,16 +64,16 @@ pub async fn create_async(
Ok(new_batch)
}
pub async fn validate_async(db: &SpannerDb, params: params::ValidateBatch) -> DbResult<bool> {
let exists = get_async(db, params.into()).await?;
pub async fn validate(db: &SpannerDb, params: params::ValidateBatch) -> DbResult<bool> {
let exists = get_query(db, params.into()).await?;
Ok(exists.is_some())
}
// Append a collection to a pending batch (`create_batch` creates a new batch)
pub async fn append_async(db: &SpannerDb, params: params::AppendToBatch) -> DbResult<()> {
pub async fn append(db: &SpannerDb, params: params::AppendToBatch) -> DbResult<()> {
let mut metrics = db.metrics.clone();
metrics.start_timer("storage.spanner.append_items_to_batch", None);
let collection_id = db.get_collection_id_async(&params.collection).await?;
let collection_id = db.get_collection_id(&params.collection).await?;
let current_size = db
.check_quota(&params.user_id, &params.collection, collection_id)
@ -86,7 +84,7 @@ pub async fn append_async(db: &SpannerDb, params: params::AppendToBatch) -> DbRe
}
// confirm that this batch exists or has not yet been committed.
let exists = validate_async(
let exists = validate(
db,
params::ValidateBatch {
user_id: params.user_id.clone(),
@ -101,7 +99,7 @@ pub async fn append_async(db: &SpannerDb, params: params::AppendToBatch) -> DbRe
return Err(DbError::batch_not_found());
}
do_append_async(
do_append(
db,
params.user_id,
collection_id,
@ -113,11 +111,11 @@ pub async fn append_async(db: &SpannerDb, params: params::AppendToBatch) -> DbRe
Ok(())
}
pub async fn get_async(
pub async fn get_query(
db: &SpannerDb,
params: params::GetBatch,
) -> DbResult<Option<results::GetBatch>> {
let collection_id = db.get_collection_id_async(&params.collection).await?;
let collection_id = db.get_collection_id(&params.collection).await?;
let (sqlparams, sqlparam_types) = params! {
"fxa_uid" => params.user_id.fxa_uid.clone(),
"fxa_kid" => params.user_id.fxa_kid.clone(),
@ -133,18 +131,19 @@ pub async fn get_async(
AND collection_id = @collection_id
AND batch_id = @batch_id
AND expiry > CURRENT_TIMESTAMP()",
)?
)
.await?
.params(sqlparams)
.param_types(sqlparam_types)
.execute_async(&db.conn)?
.execute(&db.conn)?
.one_or_none()
.await?
.map(move |_| params::Batch { id: params.id });
Ok(batch)
}
pub async fn delete_async(db: &SpannerDb, params: params::DeleteBatch) -> DbResult<()> {
let collection_id = db.get_collection_id_async(&params.collection).await?;
pub async fn delete_query(db: &SpannerDb, params: params::DeleteBatch) -> DbResult<()> {
let collection_id = db.get_collection_id(&params.collection).await?;
let (sqlparams, sqlparam_types) = params! {
"fxa_uid" => params.user_id.fxa_uid.clone(),
"fxa_kid" => params.user_id.fxa_kid.clone(),
@ -159,26 +158,27 @@ pub async fn delete_async(db: &SpannerDb, params: params::DeleteBatch) -> DbResu
AND fxa_kid = @fxa_kid
AND collection_id = @collection_id
AND batch_id = @batch_id",
)?
)
.await?
.params(sqlparams)
.param_types(sqlparam_types)
.execute_dml_async(&db.conn)
.execute_dml(&db.conn)
.await?;
Ok(())
}
pub async fn commit_async(
pub async fn commit_query(
db: &SpannerDb,
params: params::CommitBatch,
) -> DbResult<results::CommitBatch> {
let mut metrics = db.metrics.clone();
metrics.start_timer("storage.spanner.apply_batch", None);
let collection_id = db.get_collection_id_async(&params.collection).await?;
let collection_id = db.get_collection_id(&params.collection).await?;
// Ensure a parent record exists in user_collections before writing to bsos
// (INTERLEAVE IN PARENT user_collections)
let timestamp = db
.update_collection_async(&params.user_id, collection_id, &params.collection)
.update_collection(&params.user_id, collection_id, &params.collection)
.await?;
let as_rfc3339 = timestamp.as_rfc3339()?;
@ -197,10 +197,11 @@ pub async fn commit_async(
sqlparam_types.insert("timestamp".to_owned(), as_type(TypeCode::TIMESTAMP));
// NOTE: This write treats both expired and non-expired as existing
// bsos. See the note in [SpannerDb::post_bsos_with_mutations]
db.sql(include_str!("batch_commit_update.sql"))?
db.sql(include_str!("batch_commit_update.sql"))
.await?
.params(sqlparams)
.param_types(sqlparam_types)
.execute_dml_async(&db.conn)
.execute_dml(&db.conn)
.await?;
}
@ -220,14 +221,15 @@ pub async fn commit_async(
timer3.start_timer("storage.spanner.apply_batch_insert", None);
// NOTE: This write treats both expired and non-expired as existing
// bsos. See the note in [SpannerDb::post_bsos_with_mutations]
db.sql(include_str!("batch_commit_insert.sql"))?
db.sql(include_str!("batch_commit_insert.sql"))
.await?
.params(sqlparams)
.param_types(sqlparam_types)
.execute_dml_async(&db.conn)
.execute_dml(&db.conn)
.await?;
}
delete_async(
delete_query(
db,
params::DeleteBatch {
user_id: params.user_id.clone(),
@ -246,7 +248,7 @@ pub async fn commit_async(
}
// Append a collection to an existing, pending batch.
pub async fn do_append_async(
pub async fn do_append(
db: &SpannerDb,
user_id: UserIdentifier,
collection_id: i32,
@ -315,10 +317,11 @@ pub async fn do_append_async(
AND collection_id=@collection_id
AND batch_id=@batch_id
AND batch_bso_id in UNNEST(@ids);",
)?
)
.await?
.params(sqlparams)
.param_types(sqlparam_types)
.execute_async(&db.conn)?;
.execute(&db.conn)?;
while let Some(row) = existing_stream.try_next().await? {
existing.insert(exist_idx(
&collection_id.to_string(),
@ -439,10 +442,11 @@ pub async fn do_append_async(
"INSERT INTO batch_bsos (fxa_uid, fxa_kid, collection_id, batch_id, batch_bso_id,
sortindex, payload, ttl)
SELECT * FROM UNNEST(@values)",
)?
)
.await?
.params(sqlparams)
.param_types(sqlparam_types)
.execute_dml_async(&db.conn)
.execute_dml(&db.conn)
.await?;
db.metrics.count_with_tags(
"storage.spanner.batch.insert",
@ -490,10 +494,11 @@ pub async fn do_append_async(
WHERE fxa_uid=@fxa_uid AND fxa_kid=@fxa_kid AND collection_id=@collection_id
AND batch_id=@batch_id AND batch_bso_id=@batch_bso_id",
updatable = updatable
))?
))
.await?
.params(params)
.param_types(param_types.clone())
.execute_dml_async(&db.conn)
.execute_dml(&db.conn)
.await?;
}
}
@ -509,7 +514,7 @@ pub async fn do_append_async(
///
/// For the special case of a user creating a batch for a collection with no
/// prior data.
async fn pretouch_collection_async(
async fn pretouch_collection(
db: &SpannerDb,
user_id: &UserIdentifier,
collection_id: i32,
@ -526,10 +531,11 @@ async fn pretouch_collection_async(
WHERE fxa_uid = @fxa_uid
AND fxa_kid = @fxa_kid
AND collection_id = @collection_id",
)?
)
.await?
.params(sqlparams.clone())
.param_types(sqlparam_types.clone())
.execute_async(&db.conn)?
.execute(&db.conn)?
.one_or_none()
.await?;
if result.is_none() {
@ -545,10 +551,11 @@ async fn pretouch_collection_async(
"INSERT INTO user_collections (fxa_uid, fxa_kid, collection_id, modified)
VALUES (@fxa_uid, @fxa_kid, @collection_id, @modified)"
};
db.sql(sql)?
db.sql(sql)
.await?
.params(sqlparams)
.param_types(sqlparam_types)
.execute_dml_async(&db.conn)
.execute_dml(&db.conn)
.await?;
}
Ok(())

File diff suppressed because it is too large Load Diff

View File

@ -72,7 +72,7 @@ impl SpannerDbPool {
})
}
pub async fn get_async(&self) -> DbResult<SpannerDb> {
pub async fn get_spanner_db(&self) -> DbResult<SpannerDb> {
let conn = self.pool.get().await.map_err(|e| match e {
deadpool::managed::PoolError::Backend(dbe) => dbe,
deadpool::managed::PoolError::Timeout(timeout_type) => {
@ -121,7 +121,7 @@ impl DbPool for SpannerDbPool {
let mut metrics = self.metrics.clone();
metrics.start_timer("storage.spanner.get_pool", None);
self.get_async()
self.get_spanner_db()
.await
.map(|db| Box::new(db) as Box<dyn Db<Error = Self::Error>>)
}

View File

@ -160,7 +160,7 @@ impl ExecuteSqlRequestBuilder {
}
/// Execute a SQL read statement but return a non-blocking streaming result
pub fn execute_async(self, conn: &Conn) -> DbResult<StreamedResultSetAsync> {
pub fn execute(self, conn: &Conn) -> DbResult<StreamedResultSetAsync> {
let stream = conn
.client
.execute_streaming_sql_opt(&self.prepare_request(conn), conn.session_opt()?)?;
@ -168,7 +168,7 @@ impl ExecuteSqlRequestBuilder {
}
/// Execute a DML statement, returning the exact count of modified rows
pub async fn execute_dml_async(self, conn: &Conn) -> DbResult<i64> {
pub async fn execute_dml(self, conn: &Conn) -> DbResult<i64> {
let rs = conn
.client
.execute_sql_async_opt(&self.prepare_request(conn), conn.session_opt()?)?