From e2017bbc2aee60399da2e9b750b7ecce856c4559 Mon Sep 17 00:00:00 2001 From: Donovan Preston Date: Wed, 11 Mar 2020 12:35:48 -0400 Subject: [PATCH 1/3] refactor: Convert the rest of the spanner module to async await This removes usage of block() from the spanner backend implementation, even in test code. Fix #462 --- src/db/spanner/models.rs | 53 ++++++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 27 deletions(-) diff --git a/src/db/spanner/models.rs b/src/db/spanner/models.rs index 7f0a5e7e..55bafc2f 100644 --- a/src/db/spanner/models.rs +++ b/src/db/spanner/models.rs @@ -1,5 +1,3 @@ -#[cfg(test)] -use actix_web::web::block; use futures::compat::Future01CompatExt; use futures::future::TryFutureExt; @@ -1524,12 +1522,12 @@ impl SpannerDb { Ok(result) } - // NOTE: Currently this put_bso_sync impl. is only used during db tests, + // NOTE: Currently this put_bso_async_test impl. is only used during db tests, // see above for the non-tests version #[cfg(test)] - pub fn put_bso_sync(&self, bso: params::PutBso) -> Result { + pub async fn put_bso_async_test(&self, bso: params::PutBso) -> Result { use crate::db::util::to_rfc3339; - let collection_id = self.get_or_create_collection_id(&bso.collection)?; + let collection_id = self.get_or_create_collection_id_async(&bso.collection).await?; let mut sqlparams = params! { "fxa_uid" => bso.user_id.fxa_uid.clone(), "fxa_kid" => bso.user_id.fxa_kid.clone(), @@ -1537,7 +1535,7 @@ impl SpannerDb { "bso_id" => bso.id.to_string(), }; let mut sqltypes = HashMap::new(); - let touch = self.touch_collection(&bso.user_id, collection_id)?; + let touch = self.touch_collection_async(&bso.user_id, collection_id).await?; let timestamp = self.timestamp()?; let result = self @@ -1550,8 +1548,8 @@ impl SpannerDb { AND bso_id = @bso_id", )? .params(sqlparams.clone()) - .execute(&self.conn)? - .one_or_none()?; + .execute_async(&self.conn)? + .one_or_none().await?; let exists = result.is_some(); let sql = if exists { @@ -1656,7 +1654,7 @@ impl SpannerDb { let now_millis = timestamp.as_i64(); let ttl = bso.ttl.map_or(i64::from(DEFAULT_BSO_TTL), |ttl| { ttl.try_into() - .expect("Could not get ttl in put_bso_sync (test)") + .expect("Could not get ttl in put_bso_async_test") }) * 1000; let expirystring = to_rfc3339(now_millis + ttl)?; debug!( @@ -1674,16 +1672,16 @@ impl SpannerDb { self.sql(&sql)? .params(sqlparams) .param_types(sqltypes) - .execute_dml(&self.conn)?; + .execute_dml_async(&self.conn).await?; Ok(touch) } - // NOTE: Currently this post_bso_sync impl. is only used during db tests, + // NOTE: Currently this post_bso_async_test impl. is only used during db tests, // see above for the non-tests version #[cfg(test)] - pub fn post_bsos_sync(&self, input: params::PostBsos) -> Result { - let collection_id = self.get_or_create_collection_id(&input.collection)?; + pub async fn post_bsos_async_test(&self, input: params::PostBsos) -> Result { + let collection_id = self.get_or_create_collection_id_async(&input.collection).await?; let mut result = results::PostBsos { modified: self.timestamp()?, success: Default::default(), @@ -1692,17 +1690,17 @@ impl SpannerDb { for pbso in input.bsos { let id = pbso.id; - self.put_bso_sync(params::PutBso { + self.put_bso_async(params::PutBso { user_id: input.user_id.clone(), collection: input.collection.clone(), id: id.clone(), payload: pbso.payload, sortindex: pbso.sortindex, ttl: pbso.ttl, - })?; + }).await?; result.success.push(id); } - self.touch_collection(&input.user_id, collection_id)?; + self.touch_collection_async(&input.user_id, collection_id).await?; Ok(result) } @@ -1867,7 +1865,7 @@ impl Db for SpannerDb { #[cfg(test)] fn put_bso(&self, param: params::PutBso) -> DbFuture { let db = self.clone(); - Box::pin(block(move || db.put_bso_sync(param).map_err(Into::into)).map_err(Into::into)) + Box::pin(async move { db.put_bso_async_test(param).map_err(Into::into).await }) } #[cfg(not(test))] @@ -1879,7 +1877,7 @@ impl Db for SpannerDb { #[cfg(test)] fn post_bsos(&self, param: params::PostBsos) -> DbFuture { let db = self.clone(); - Box::pin(block(move || db.post_bsos_sync(param).map_err(Into::into)).map_err(Into::into)) + Box::pin(async move { db.post_bsos_async_test(param).map_err(Into::into).await }) } fn validate_batch_id(&self, id: String) -> Result<()> { @@ -1914,25 +1912,26 @@ impl Db for SpannerDb { #[cfg(test)] fn get_collection_id(&self, name: String) -> DbFuture { let db = self.clone(); - Box::pin(block(move || db.get_collection_id(&name).map_err(Into::into)).map_err(Into::into)) + Box::pin(async move { + db.get_collection_id(&name).map_err(Into::into).await + }) } #[cfg(test)] fn create_collection(&self, name: String) -> DbFuture { let db = self.clone(); - Box::pin(block(move || db.create_collection(&name).map_err(Into::into)).map_err(Into::into)) + Box::pin(async move { + db.create_collection(&name).map_err(Into::into).await + }) } #[cfg(test)] fn touch_collection(&self, param: params::TouchCollection) -> DbFuture { let db = self.clone(); - Box::pin( - block(move || { - db.touch_collection(¶m.user_id, param.collection_id) - .map_err(Into::into) - }) - .map_err(Into::into), - ) + Box::pin(async move { + db.touch_collection(¶m.user_id, param.collection_id) + .map_err(Into::into) + }) } #[cfg(test)] From 1f8cb88f6b8dc77cb6350005f75b44a31af23caf Mon Sep 17 00:00:00 2001 From: Donovan Preston Date: Wed, 11 Mar 2020 13:01:51 -0400 Subject: [PATCH 2/3] Whoops, call the async versions of these functions And remove even more code! --- src/db/spanner/models.rs | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/src/db/spanner/models.rs b/src/db/spanner/models.rs index 55bafc2f..5c82f667 100644 --- a/src/db/spanner/models.rs +++ b/src/db/spanner/models.rs @@ -155,6 +155,7 @@ impl SpannerDb { Ok(id) } + #[allow(dead_code)] pub(super) fn get_collection_id(&self, name: &str) -> Result { if let Some(id) = self.coll_cache.get_id(name)? { return Ok(id); @@ -246,14 +247,6 @@ impl SpannerDb { Ok(id) } - #[allow(dead_code)] - fn get_or_create_collection_id(&self, name: &str) -> Result { - self.get_collection_id(name).or_else(|e| match e.kind() { - DbErrorKind::CollectionNotFound => self.create_collection(name), - _ => Err(e), - }) - } - async fn get_or_create_collection_id_async(&self, name: &str) -> Result { let result = self.get_collection_id_async(name).await; if let Err(err) = result { @@ -1913,7 +1906,7 @@ impl Db for SpannerDb { fn get_collection_id(&self, name: String) -> DbFuture { let db = self.clone(); Box::pin(async move { - db.get_collection_id(&name).map_err(Into::into).await + db.get_collection_id_async(&name).map_err(Into::into).await }) } @@ -1921,7 +1914,7 @@ impl Db for SpannerDb { fn create_collection(&self, name: String) -> DbFuture { let db = self.clone(); Box::pin(async move { - db.create_collection(&name).map_err(Into::into).await + db.create_collection_async(&name).map_err(Into::into).await }) } From 8930a388dcad8ff8bc61082ae987960b2726cbed Mon Sep 17 00:00:00 2001 From: Donovan Preston Date: Wed, 11 Mar 2020 13:37:03 -0400 Subject: [PATCH 3/3] cargo fmt --- src/db/spanner/models.rs | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/src/db/spanner/models.rs b/src/db/spanner/models.rs index 5c82f667..60eeff40 100644 --- a/src/db/spanner/models.rs +++ b/src/db/spanner/models.rs @@ -1520,7 +1520,9 @@ impl SpannerDb { #[cfg(test)] pub async fn put_bso_async_test(&self, bso: params::PutBso) -> Result { use crate::db::util::to_rfc3339; - let collection_id = self.get_or_create_collection_id_async(&bso.collection).await?; + let collection_id = self + .get_or_create_collection_id_async(&bso.collection) + .await?; let mut sqlparams = params! { "fxa_uid" => bso.user_id.fxa_uid.clone(), "fxa_kid" => bso.user_id.fxa_kid.clone(), @@ -1528,7 +1530,9 @@ impl SpannerDb { "bso_id" => bso.id.to_string(), }; let mut sqltypes = HashMap::new(); - let touch = self.touch_collection_async(&bso.user_id, collection_id).await?; + let touch = self + .touch_collection_async(&bso.user_id, collection_id) + .await?; let timestamp = self.timestamp()?; let result = self @@ -1542,7 +1546,8 @@ impl SpannerDb { )? .params(sqlparams.clone()) .execute_async(&self.conn)? - .one_or_none().await?; + .one_or_none() + .await?; let exists = result.is_some(); let sql = if exists { @@ -1665,7 +1670,8 @@ impl SpannerDb { self.sql(&sql)? .params(sqlparams) .param_types(sqltypes) - .execute_dml_async(&self.conn).await?; + .execute_dml_async(&self.conn) + .await?; Ok(touch) } @@ -1674,7 +1680,9 @@ impl SpannerDb { // see above for the non-tests version #[cfg(test)] pub async fn post_bsos_async_test(&self, input: params::PostBsos) -> Result { - let collection_id = self.get_or_create_collection_id_async(&input.collection).await?; + let collection_id = self + .get_or_create_collection_id_async(&input.collection) + .await?; let mut result = results::PostBsos { modified: self.timestamp()?, success: Default::default(), @@ -1690,10 +1698,12 @@ impl SpannerDb { payload: pbso.payload, sortindex: pbso.sortindex, ttl: pbso.ttl, - }).await?; + }) + .await?; result.success.push(id); } - self.touch_collection_async(&input.user_id, collection_id).await?; + self.touch_collection_async(&input.user_id, collection_id) + .await?; Ok(result) } @@ -1905,17 +1915,13 @@ impl Db for SpannerDb { #[cfg(test)] fn get_collection_id(&self, name: String) -> DbFuture { let db = self.clone(); - Box::pin(async move { - db.get_collection_id_async(&name).map_err(Into::into).await - }) + Box::pin(async move { db.get_collection_id_async(&name).map_err(Into::into).await }) } #[cfg(test)] fn create_collection(&self, name: String) -> DbFuture { let db = self.clone(); - Box::pin(async move { - db.create_collection_async(&name).map_err(Into::into).await - }) + Box::pin(async move { db.create_collection_async(&name).map_err(Into::into).await }) } #[cfg(test)]