From 442c4c05a1939b70d9632ce2228e036ef8d7589c Mon Sep 17 00:00:00 2001 From: Donovan Preston Date: Thu, 12 Mar 2020 12:50:41 -0400 Subject: [PATCH 1/2] fix: Convert erect_tombstone to async/await And in the process, remove all the spanner synchronous support code. The spanner backend is now truly 100% sync code free. Fix #483 --- src/db/spanner/models.rs | 140 ++------------------------- src/db/spanner/support.rs | 192 +------------------------------------- 2 files changed, 9 insertions(+), 323 deletions(-) diff --git a/src/db/spanner/models.rs b/src/db/spanner/models.rs index 60eeff40..e15a4f1c 100644 --- a/src/db/spanner/models.rs +++ b/src/db/spanner/models.rs @@ -155,31 +155,6 @@ impl SpannerDb { Ok(id) } - #[allow(dead_code)] - pub(super) fn get_collection_id(&self, name: &str) -> Result { - if let Some(id) = self.coll_cache.get_id(name)? { - return Ok(id); - } - let result = self - .sql( - "SELECT collection_id - FROM collections - WHERE name = @name", - )? - .params(params! {"name" => name.to_string()}) - .execute(&self.conn)? - .one_or_none()? - .ok_or(DbErrorKind::CollectionNotFound)?; - let id = result[0] - .get_string_value() - .parse::() - .map_err(|e| DbErrorKind::Integrity(e.to_string()))?; - if !self.in_write_transaction() { - self.coll_cache.put(id, name.to_owned())?; - } - Ok(id) - } - pub(super) async fn create_collection_async(&self, name: &str) -> Result { // This should always run within a r/w transaction, so that: "If a // transaction successfully commits, then no other writer modified the @@ -214,39 +189,6 @@ impl SpannerDb { Ok(id) } - #[allow(dead_code)] - pub(super) fn create_collection(&self, name: &str) -> Result { - // This should always run within a r/w transaction, so that: "If a - // transaction successfully commits, then no other writer modified the - // data that was read in the transaction after it was read." - if !cfg!(test) && !self.in_write_transaction() { - Err(DbError::internal("Can't escalate read-lock to write-lock"))? - } - let result = self - .sql( - "SELECT COALESCE(MAX(collection_id), 1) - FROM collections", - )? - .execute(&self.conn)? - .one()?; - let max = result[0] - .get_string_value() - .parse::() - .map_err(|e| DbErrorKind::Integrity(e.to_string()))?; - let id = FIRST_CUSTOM_COLLECTION_ID.max(max + 1); - - self.sql( - "INSERT INTO collections (collection_id, name) - VALUES (@collection_id, @name)", - )? - .params(params! { - "name" => name.to_string(), - "collection_id" => id.to_string(), - }) - .execute_dml(&self.conn)?; - Ok(id) - } - async fn get_or_create_collection_id_async(&self, name: &str) -> Result { let result = self.get_collection_id_async(name).await; if let Err(err) = result { @@ -854,7 +796,7 @@ impl SpannerDb { } } - fn erect_tombstone(&self, user_id: &HawkIdentifier) -> Result { + async fn erect_tombstone(&self, user_id: &HawkIdentifier) -> Result { // Delete the old tombstone (if it exists) let params = params! { "fxa_uid" => user_id.fxa_uid.clone(), @@ -874,7 +816,8 @@ impl SpannerDb { )? .params(params.clone()) .param_types(types.clone()) - .execute_dml(&self.conn)?; + .execute_dml_async(&self.conn) + .await?; self.sql( "INSERT INTO user_collections (fxa_uid, fxa_kid, collection_id, modified) @@ -882,7 +825,8 @@ impl SpannerDb { )? .params(params) .param_types(types) - .execute_dml(&self.conn)?; + .execute_dml_async(&self.conn) + .await?; // Return timestamp, because sometimes there's a delay between writing and // reading the database. Ok(self.timestamp()?) @@ -938,81 +882,12 @@ impl SpannerDb { .execute_dml_async(&self.conn) .await?; if affected_rows > 0 { - self.erect_tombstone(¶ms.user_id) + self.erect_tombstone(¶ms.user_id).await } else { self.get_storage_timestamp(params.user_id).await } } - // I think we can remove this but I'm not 100% sure if the db tests use it or not. - #[allow(dead_code)] - pub(super) fn touch_collection( - &self, - user_id: &HawkIdentifier, - collection_id: i32, - ) -> Result { - // NOTE: Spanner supports upserts via its InsertOrUpdate mutation but - // lacks a SQL equivalent. This call could be 1 InsertOrUpdate instead - // of 2 queries but would require put/post_bsos to also use mutations. - // Due to case of when no parent row exists (in user_collections) - // before writing to bsos. Spanner requires a parent table row exist - // before child table rows are written. - // Mutations don't run in the same order as ExecuteSql calls, they are - // buffered on the client side and only issued to Spanner in the final - // transaction Commit. - let timestamp = self.timestamp()?; - if !cfg!(test) && self.session.borrow().touched_collection { - // No need to touch it again (except during tests where we - // currently reuse Dbs for multiple requests) - return Ok(timestamp); - } - - let sqlparams = params! { - "fxa_uid" => user_id.fxa_uid.clone(), - "fxa_kid" => user_id.fxa_kid.clone(), - "collection_id" => collection_id.to_string(), - "modified" => timestamp.as_rfc3339()?, - }; - let sql_types = param_types! { - "modified" => TypeCode::TIMESTAMP, - }; - let result = self - .sql( - "SELECT 1 AS count - FROM user_collections - WHERE fxa_uid = @fxa_uid - AND fxa_kid = @fxa_kid - AND collection_id = @collection_id", - )? - .params(sqlparams.clone()) - .execute(&self.conn)? - .one_or_none()?; - let exists = result.is_some(); - - if exists { - self.sql( - "UPDATE user_collections - SET modified = @modified - WHERE fxa_uid = @fxa_uid - AND fxa_kid = @fxa_kid - AND collection_id = @collection_id", - )? - .params(sqlparams) - .param_types(sql_types) - .execute_dml(&self.conn)?; - } else { - self.sql( - "INSERT INTO user_collections (fxa_uid, fxa_kid, collection_id, modified) - VALUES (@fxa_uid, @fxa_kid, @collection_id, @modified)", - )? - .params(sqlparams) - .param_types(sql_types) - .execute_dml(&self.conn)?; - } - self.session.borrow_mut().touched_collection = true; - Ok(timestamp) - } - pub(super) async fn touch_collection_async( &self, user_id: &HawkIdentifier, @@ -1928,8 +1803,9 @@ impl Db for SpannerDb { fn touch_collection(&self, param: params::TouchCollection) -> DbFuture { let db = self.clone(); Box::pin(async move { - db.touch_collection(¶m.user_id, param.collection_id) + db.touch_collection_async(¶m.user_id, param.collection_id) .map_err(Into::into) + .await }) } diff --git a/src/db/spanner/support.rs b/src/db/spanner/support.rs index 4878fe8f..25bf8b48 100644 --- a/src/db/spanner/support.rs +++ b/src/db/spanner/support.rs @@ -1,11 +1,9 @@ use std::{ - cell::RefCell, collections::{HashMap, VecDeque}, - fmt, mem, + mem, result::Result as StdResult, }; -use actix_rt::{System, SystemRunner}; use futures::compat::{Compat01As03, Future01CompatExt, Stream01CompatExt}; use futures::stream::{StreamExt, StreamFuture}; use googleapis_raw::spanner::v1::{ @@ -103,14 +101,6 @@ impl ExecuteSqlRequestBuilder { request } - /// Execute a SQL read statement - pub fn execute(self, conn: &Conn) -> Result { - let stream = conn - .client - .execute_streaming_sql(&self.prepare_request(conn))?; - Ok(StreamedResultSet::new(stream)) - } - /// Execute a SQL read statement but return a non-blocking streaming result pub fn execute_async(self, conn: &Conn) -> Result { let stream = conn @@ -119,12 +109,6 @@ impl ExecuteSqlRequestBuilder { Ok(StreamedResultSetAsync::new(stream)) } - /// Execute a DML statement, returning the exact count of modified rows - pub fn execute_dml(self, conn: &Conn) -> Result { - let rs = conn.client.execute_sql(&self.prepare_request(conn))?; - Ok(rs.get_stats().get_row_count_exact()) - } - /// Execute a DML statement, returning the exact count of modified rows pub async fn execute_dml_async(self, conn: &Conn) -> Result { let rs = conn @@ -136,180 +120,6 @@ impl ExecuteSqlRequestBuilder { } } -/// Streams results from an ExecuteStreamingSql PartialResultSet -/// -/// Utilizies block_on, so should *always* be called from a thread -/// outside of an event loop -pub struct StreamedResultSet { - /// Stream from execute_streaming_sql - stream: Option>>>, - - metadata: Option, - stats: Option, - - /// Fully-processed rows - rows: VecDeque>, - /// Accumulated values for incomplete row - current_row: Vec, - /// Incomplete value - pending_chunk: Option, -} - -thread_local! { - static SYSTEM: RefCell = { - RefCell::new(System::new("syncstorage")) - }; -} - -impl StreamedResultSet { - pub fn new(stream: ClientSStreamReceiver) -> Self { - Self { - stream: Some(stream.compat().into_future()), - metadata: None, - stats: None, - rows: Default::default(), - current_row: vec![], - pending_chunk: None, - } - } - - #[allow(dead_code)] - pub fn metadata(&self) -> Option<&ResultSetMetadata> { - self.metadata.as_ref() - } - - #[allow(dead_code)] - pub fn stats(&self) -> Option<&ResultSetStats> { - self.stats.as_ref() - } - - pub fn fields(&self) -> &[StructType_Field] { - match self.metadata { - Some(ref metadata) => metadata.get_row_type().get_fields(), - None => &[], - } - } - - pub fn one(&mut self) -> Result> { - if let Some(result) = self.one_or_none()? { - Ok(result) - } else { - Err(DbError::internal("No rows matched the given query."))? - } - } - - pub fn one_or_none(&mut self) -> Result>> { - let result = self.next(); - if result.is_none() { - Ok(None) - } else if self.next().is_some() { - Err(DbError::internal("Execpted one result; got more."))? - } else { - result.transpose() - } - } - - /// Pull and process the next values from the Stream - /// - /// Returns false when the stream is finished - fn consume_next(&mut self) -> Result { - let (result, stream) = SYSTEM.with(|system| { - system.borrow_mut().block_on( - self.stream - .take() - .expect("Could not get next stream element"), - ) - }); - self.stream = Some(stream.into_future()); - let mut partial_rs = if let Some(result) = result { - result? - } else { - // Stream finished - return Ok(false); - }; - - if self.metadata.is_none() && partial_rs.has_metadata() { - // first response - self.metadata = Some(partial_rs.take_metadata()); - } - if partial_rs.has_stats() { - // last response - self.stats = Some(partial_rs.take_stats()); - } - - let mut values = partial_rs.take_values().into_vec(); - if values.is_empty() { - // sanity check - return Ok(true); - } - - if let Some(pending_chunk) = self.pending_chunk.take() { - let fields = self.fields(); - let current_row_i = self.current_row.len(); - if fields.len() <= current_row_i { - Err(DbErrorKind::Integrity( - "Invalid PartialResultSet fields".to_owned(), - ))?; - } - let field = &fields[current_row_i]; - values[0] = merge_by_type(pending_chunk, &values[0], field.get_field_type())?; - } - if partial_rs.get_chunked_value() { - self.pending_chunk = values.pop(); - } - - self.consume_values(values); - Ok(true) - } - - fn consume_values(&mut self, values: Vec) { - let width = self.fields().len(); - for value in values { - self.current_row.push(value); - if self.current_row.len() == width { - let current_row = mem::replace(&mut self.current_row, vec![]); - self.rows.push_back(current_row); - } - } - } -} - -/// Iteration around the result stream -/// -/// `Item` is a Result as errors may happen -/// mid-stream. `itertools::IterTools::map_results` and -/// `MapAndThenIterator::map_and_then` can aid in removing boilerplate around -/// handling of said Results -impl Iterator for StreamedResultSet { - type Item = Result>; - - fn next(&mut self) -> Option { - while self.rows.is_empty() { - match self.consume_next() { - Ok(true) => (), - Ok(false) => return None, - // Note: Iteration may continue after an error. We may want to - // stop afterwards instead for safety sake (it's not really - // recoverable) - Err(e) => return Some(Err(e)), - } - } - Ok(self.rows.pop_front()).transpose() - } -} - -impl fmt::Debug for StreamedResultSet { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("StreamedResultSet") - .field("metadata", &self.metadata) - .field("stats", &self.stats) - .field("rows", &self.rows) - .field("current_row", &self.current_row) - .field("pending_chunk", &self.pending_chunk) - .finish() - } -} - pub struct StreamedResultSetAsync { /// Stream from execute_streaming_sql stream: Option>>>, From 546d96ca2885003e4d912a72bccf33f2f6fcb1f2 Mon Sep 17 00:00:00 2001 From: Donovan Preston Date: Thu, 12 Mar 2020 13:08:44 -0400 Subject: [PATCH 2/2] fix: `cargo clippy` for rust 1.42 --- src/db/mysql/models.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db/mysql/models.rs b/src/db/mysql/models.rs index 8672485b..950ee230 100644 --- a/src/db/mysql/models.rs +++ b/src/db/mysql/models.rs @@ -379,7 +379,7 @@ impl MysqlDb { let timestamp = self.timestamp().as_i64(); self.conn.transaction(|| { - let payload = bso.payload.as_ref().map(Deref::deref).unwrap_or_default(); + let payload = bso.payload.as_deref().unwrap_or_default(); let sortindex = bso.sortindex; let ttl = bso.ttl.map_or(DEFAULT_BSO_TTL, |ttl| ttl); let q = format!(r#"