Merge pull request #482 from mozilla-services/462-finish-async-await-conversion-in-spanner

refactor: Convert the rest of the spanner module to async await
This commit is contained in:
Donovan Preston 2020-03-11 19:28:23 -04:00 committed by GitHub
commit e1596bf47e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,5 +1,3 @@
#[cfg(test)]
use actix_web::web::block;
use futures::compat::Future01CompatExt;
use futures::future::TryFutureExt;
@ -157,6 +155,7 @@ impl SpannerDb {
Ok(id)
}
#[allow(dead_code)]
pub(super) fn get_collection_id(&self, name: &str) -> Result<i32> {
if let Some(id) = self.coll_cache.get_id(name)? {
return Ok(id);
@ -248,14 +247,6 @@ impl SpannerDb {
Ok(id)
}
#[allow(dead_code)]
fn get_or_create_collection_id(&self, name: &str) -> Result<i32> {
self.get_collection_id(name).or_else(|e| match e.kind() {
DbErrorKind::CollectionNotFound => self.create_collection(name),
_ => Err(e),
})
}
async fn get_or_create_collection_id_async(&self, name: &str) -> Result<i32> {
let result = self.get_collection_id_async(name).await;
if let Err(err) = result {
@ -1524,12 +1515,14 @@ impl SpannerDb {
Ok(result)
}
// NOTE: Currently this put_bso_sync impl. is only used during db tests,
// NOTE: Currently this put_bso_async_test impl. is only used during db tests,
// see above for the non-tests version
#[cfg(test)]
pub fn put_bso_sync(&self, bso: params::PutBso) -> Result<results::PutBso> {
pub async fn put_bso_async_test(&self, bso: params::PutBso) -> Result<results::PutBso> {
use crate::db::util::to_rfc3339;
let collection_id = self.get_or_create_collection_id(&bso.collection)?;
let collection_id = self
.get_or_create_collection_id_async(&bso.collection)
.await?;
let mut sqlparams = params! {
"fxa_uid" => bso.user_id.fxa_uid.clone(),
"fxa_kid" => bso.user_id.fxa_kid.clone(),
@ -1537,7 +1530,9 @@ impl SpannerDb {
"bso_id" => bso.id.to_string(),
};
let mut sqltypes = HashMap::new();
let touch = self.touch_collection(&bso.user_id, collection_id)?;
let touch = self
.touch_collection_async(&bso.user_id, collection_id)
.await?;
let timestamp = self.timestamp()?;
let result = self
@ -1550,8 +1545,9 @@ impl SpannerDb {
AND bso_id = @bso_id",
)?
.params(sqlparams.clone())
.execute(&self.conn)?
.one_or_none()?;
.execute_async(&self.conn)?
.one_or_none()
.await?;
let exists = result.is_some();
let sql = if exists {
@ -1656,7 +1652,7 @@ impl SpannerDb {
let now_millis = timestamp.as_i64();
let ttl = bso.ttl.map_or(i64::from(DEFAULT_BSO_TTL), |ttl| {
ttl.try_into()
.expect("Could not get ttl in put_bso_sync (test)")
.expect("Could not get ttl in put_bso_async_test")
}) * 1000;
let expirystring = to_rfc3339(now_millis + ttl)?;
debug!(
@ -1674,16 +1670,19 @@ impl SpannerDb {
self.sql(&sql)?
.params(sqlparams)
.param_types(sqltypes)
.execute_dml(&self.conn)?;
.execute_dml_async(&self.conn)
.await?;
Ok(touch)
}
// NOTE: Currently this post_bso_sync impl. is only used during db tests,
// NOTE: Currently this post_bso_async_test impl. is only used during db tests,
// see above for the non-tests version
#[cfg(test)]
pub fn post_bsos_sync(&self, input: params::PostBsos) -> Result<results::PostBsos> {
let collection_id = self.get_or_create_collection_id(&input.collection)?;
pub async fn post_bsos_async_test(&self, input: params::PostBsos) -> Result<results::PostBsos> {
let collection_id = self
.get_or_create_collection_id_async(&input.collection)
.await?;
let mut result = results::PostBsos {
modified: self.timestamp()?,
success: Default::default(),
@ -1692,17 +1691,19 @@ impl SpannerDb {
for pbso in input.bsos {
let id = pbso.id;
self.put_bso_sync(params::PutBso {
self.put_bso_async(params::PutBso {
user_id: input.user_id.clone(),
collection: input.collection.clone(),
id: id.clone(),
payload: pbso.payload,
sortindex: pbso.sortindex,
ttl: pbso.ttl,
})?;
})
.await?;
result.success.push(id);
}
self.touch_collection(&input.user_id, collection_id)?;
self.touch_collection_async(&input.user_id, collection_id)
.await?;
Ok(result)
}
@ -1867,7 +1868,7 @@ impl Db for SpannerDb {
#[cfg(test)]
fn put_bso(&self, param: params::PutBso) -> DbFuture<results::PutBso> {
let db = self.clone();
Box::pin(block(move || db.put_bso_sync(param).map_err(Into::into)).map_err(Into::into))
Box::pin(async move { db.put_bso_async_test(param).map_err(Into::into).await })
}
#[cfg(not(test))]
@ -1879,7 +1880,7 @@ impl Db for SpannerDb {
#[cfg(test)]
fn post_bsos(&self, param: params::PostBsos) -> DbFuture<results::PostBsos> {
let db = self.clone();
Box::pin(block(move || db.post_bsos_sync(param).map_err(Into::into)).map_err(Into::into))
Box::pin(async move { db.post_bsos_async_test(param).map_err(Into::into).await })
}
fn validate_batch_id(&self, id: String) -> Result<()> {
@ -1914,25 +1915,22 @@ impl Db for SpannerDb {
#[cfg(test)]
fn get_collection_id(&self, name: String) -> DbFuture<i32> {
let db = self.clone();
Box::pin(block(move || db.get_collection_id(&name).map_err(Into::into)).map_err(Into::into))
Box::pin(async move { db.get_collection_id_async(&name).map_err(Into::into).await })
}
#[cfg(test)]
fn create_collection(&self, name: String) -> DbFuture<i32> {
let db = self.clone();
Box::pin(block(move || db.create_collection(&name).map_err(Into::into)).map_err(Into::into))
Box::pin(async move { db.create_collection_async(&name).map_err(Into::into).await })
}
#[cfg(test)]
fn touch_collection(&self, param: params::TouchCollection) -> DbFuture<SyncTimestamp> {
let db = self.clone();
Box::pin(
block(move || {
db.touch_collection(&param.user_id, param.collection_id)
.map_err(Into::into)
})
.map_err(Into::into),
)
Box::pin(async move {
db.touch_collection(&param.user_id, param.collection_id)
.map_err(Into::into)
})
}
#[cfg(test)]