Merge pull request #486 from mozilla-services/483-convert-erect_tomstone-to-async-await

fix: Convert erect_tombstone to async/await
This commit is contained in:
Donovan Preston 2020-03-12 15:49:16 -04:00 committed by GitHub
commit 2e761e44fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 10 additions and 324 deletions

View File

@ -379,7 +379,7 @@ impl MysqlDb {
let timestamp = self.timestamp().as_i64();
self.conn.transaction(|| {
let payload = bso.payload.as_ref().map(Deref::deref).unwrap_or_default();
let payload = bso.payload.as_deref().unwrap_or_default();
let sortindex = bso.sortindex;
let ttl = bso.ttl.map_or(DEFAULT_BSO_TTL, |ttl| ttl);
let q = format!(r#"

View File

@ -155,31 +155,6 @@ impl SpannerDb {
Ok(id)
}
#[allow(dead_code)]
/// Look up the numeric id of a named collection.
///
/// Checks the in-process collection cache first; on a miss, queries the
/// `collections` table and (outside of a write transaction) back-fills the
/// cache with the result. Errors with `CollectionNotFound` when no row
/// matches.
pub(super) fn get_collection_id(&self, name: &str) -> Result<i32> {
    // Fast path: serve the id straight from the cache when present.
    if let Some(cached) = self.coll_cache.get_id(name)? {
        return Ok(cached);
    }
    let row = self
        .sql(
            "SELECT collection_id
FROM collections
WHERE name = @name",
        )?
        .params(params! {"name" => name.to_string()})
        .execute(&self.conn)?
        .one_or_none()?
        .ok_or(DbErrorKind::CollectionNotFound)?;
    // Spanner returns INT64 columns as strings; parse back to i32.
    let id = row[0]
        .get_string_value()
        .parse::<i32>()
        .map_err(|e| DbErrorKind::Integrity(e.to_string()))?;
    // NOTE(review): the cache is only populated outside write transactions —
    // presumably to avoid caching state that could still be rolled back.
    if !self.in_write_transaction() {
        self.coll_cache.put(id, name.to_owned())?;
    }
    Ok(id)
}
pub(super) async fn create_collection_async(&self, name: &str) -> Result<i32> {
// This should always run within a r/w transaction, so that: "If a
// transaction successfully commits, then no other writer modified the
@ -214,39 +189,6 @@ impl SpannerDb {
Ok(id)
}
#[allow(dead_code)]
/// Create a new row in `collections` for `name`, returning its freshly
/// allocated id.
///
/// Must run inside a read/write transaction so the MAX(collection_id)
/// read cannot race another writer.
pub(super) fn create_collection(&self, name: &str) -> Result<i32> {
    // This should always run within a r/w transaction, so that: "If a
    // transaction successfully commits, then no other writer modified the
    // data that was read in the transaction after it was read."
    if !cfg!(test) && !self.in_write_transaction() {
        Err(DbError::internal("Can't escalate read-lock to write-lock"))?
    }
    // Find the current highest id (COALESCE handles an empty table).
    let row = self
        .sql(
            "SELECT COALESCE(MAX(collection_id), 1)
FROM collections",
        )?
        .execute(&self.conn)?
        .one()?;
    let current_max = row[0]
        .get_string_value()
        .parse::<i32>()
        .map_err(|e| DbErrorKind::Integrity(e.to_string()))?;
    // Never allocate below the reserved first-custom-collection id.
    let new_id = FIRST_CUSTOM_COLLECTION_ID.max(current_max + 1);
    self.sql(
        "INSERT INTO collections (collection_id, name)
VALUES (@collection_id, @name)",
    )?
    .params(params! {
        "name" => name.to_string(),
        "collection_id" => new_id.to_string(),
    })
    .execute_dml(&self.conn)?;
    Ok(new_id)
}
async fn get_or_create_collection_id_async(&self, name: &str) -> Result<i32> {
let result = self.get_collection_id_async(name).await;
if let Err(err) = result {
@ -854,7 +796,7 @@ impl SpannerDb {
}
}
fn erect_tombstone(&self, user_id: &HawkIdentifier) -> Result<SyncTimestamp> {
async fn erect_tombstone(&self, user_id: &HawkIdentifier) -> Result<SyncTimestamp> {
// Delete the old tombstone (if it exists)
let params = params! {
"fxa_uid" => user_id.fxa_uid.clone(),
@ -874,7 +816,8 @@ impl SpannerDb {
)?
.params(params.clone())
.param_types(types.clone())
.execute_dml(&self.conn)?;
.execute_dml_async(&self.conn)
.await?;
self.sql(
"INSERT INTO user_collections (fxa_uid, fxa_kid, collection_id, modified)
@ -882,7 +825,8 @@ impl SpannerDb {
)?
.params(params)
.param_types(types)
.execute_dml(&self.conn)?;
.execute_dml_async(&self.conn)
.await?;
// Return timestamp, because sometimes there's a delay between writing and
// reading the database.
Ok(self.timestamp()?)
@ -938,81 +882,12 @@ impl SpannerDb {
.execute_dml_async(&self.conn)
.await?;
if affected_rows > 0 {
self.erect_tombstone(&params.user_id)
self.erect_tombstone(&params.user_id).await
} else {
self.get_storage_timestamp(params.user_id).await
}
}
// I think we can remove this but I'm not 100% sure if the db tests use it or not.
// Updates the `modified` timestamp on a user's `user_collections` row,
// inserting the row first when it doesn't exist yet. Returns the
// transaction timestamp that was written. Blocking variant; see
// `touch_collection_async` below for the async equivalent.
#[allow(dead_code)]
pub(super) fn touch_collection(
    &self,
    user_id: &HawkIdentifier,
    collection_id: i32,
) -> Result<SyncTimestamp> {
    // NOTE: Spanner supports upserts via its InsertOrUpdate mutation but
    // lacks a SQL equivalent. This call could be 1 InsertOrUpdate instead
    // of 2 queries but would require put/post_bsos to also use mutations.
    // Due to case of when no parent row exists (in user_collections)
    // before writing to bsos. Spanner requires a parent table row exist
    // before child table rows are written.
    // Mutations don't run in the same order as ExecuteSql calls, they are
    // buffered on the client side and only issued to Spanner in the final
    // transaction Commit.
    let timestamp = self.timestamp()?;
    if !cfg!(test) && self.session.borrow().touched_collection {
        // No need to touch it again (except during tests where we
        // currently reuse Dbs for multiple requests)
        return Ok(timestamp);
    }
    let sqlparams = params! {
        "fxa_uid" => user_id.fxa_uid.clone(),
        "fxa_kid" => user_id.fxa_kid.clone(),
        "collection_id" => collection_id.to_string(),
        "modified" => timestamp.as_rfc3339()?,
    };
    let sql_types = param_types! {
        "modified" => TypeCode::TIMESTAMP,
    };
    // Probe for an existing row so we can choose between UPDATE and
    // INSERT (no SQL upsert available — see NOTE above).
    let result = self
        .sql(
            "SELECT 1 AS count
FROM user_collections
WHERE fxa_uid = @fxa_uid
AND fxa_kid = @fxa_kid
AND collection_id = @collection_id",
        )?
        .params(sqlparams.clone())
        .execute(&self.conn)?
        .one_or_none()?;
    let exists = result.is_some();
    if exists {
        self.sql(
            "UPDATE user_collections
SET modified = @modified
WHERE fxa_uid = @fxa_uid
AND fxa_kid = @fxa_kid
AND collection_id = @collection_id",
        )?
        .params(sqlparams)
        .param_types(sql_types)
        .execute_dml(&self.conn)?;
    } else {
        self.sql(
            "INSERT INTO user_collections (fxa_uid, fxa_kid, collection_id, modified)
VALUES (@fxa_uid, @fxa_kid, @collection_id, @modified)",
        )?
        .params(sqlparams)
        .param_types(sql_types)
        .execute_dml(&self.conn)?;
    }
    // Record that this session already touched the collection so repeat
    // calls within the same request short-circuit above.
    self.session.borrow_mut().touched_collection = true;
    Ok(timestamp)
}
pub(super) async fn touch_collection_async(
&self,
user_id: &HawkIdentifier,
@ -1928,8 +1803,9 @@ impl Db for SpannerDb {
fn touch_collection(&self, param: params::TouchCollection) -> DbFuture<SyncTimestamp> {
let db = self.clone();
Box::pin(async move {
db.touch_collection(&param.user_id, param.collection_id)
db.touch_collection_async(&param.user_id, param.collection_id)
.map_err(Into::into)
.await
})
}

View File

@ -1,11 +1,9 @@
use std::{
cell::RefCell,
collections::{HashMap, VecDeque},
fmt, mem,
mem,
result::Result as StdResult,
};
use actix_rt::{System, SystemRunner};
use futures::compat::{Compat01As03, Future01CompatExt, Stream01CompatExt};
use futures::stream::{StreamExt, StreamFuture};
use googleapis_raw::spanner::v1::{
@ -103,14 +101,6 @@ impl ExecuteSqlRequestBuilder {
request
}
/// Execute a SQL read statement, wrapping the streaming gRPC response in
/// a blocking `StreamedResultSet`.
pub fn execute(self, conn: &Conn) -> Result<StreamedResultSet> {
    let request = self.prepare_request(conn);
    let stream = conn.client.execute_streaming_sql(&request)?;
    Ok(StreamedResultSet::new(stream))
}
/// Execute a SQL read statement but return a non-blocking streaming result
pub fn execute_async(self, conn: &Conn) -> Result<StreamedResultSetAsync> {
let stream = conn
@ -119,12 +109,6 @@ impl ExecuteSqlRequestBuilder {
Ok(StreamedResultSetAsync::new(stream))
}
/// Execute a DML statement, returning the exact count of modified rows.
pub fn execute_dml(self, conn: &Conn) -> Result<i64> {
    let request = self.prepare_request(conn);
    let result_set = conn.client.execute_sql(&request)?;
    Ok(result_set.get_stats().get_row_count_exact())
}
/// Execute a DML statement, returning the exact count of modified rows
pub async fn execute_dml_async(self, conn: &Conn) -> Result<i64> {
let rs = conn
@ -136,180 +120,6 @@ impl ExecuteSqlRequestBuilder {
}
}
/// Streams results from an ExecuteStreamingSql PartialResultSet
///
/// Utilizes block_on, so should *always* be called from a thread
/// outside of an event loop
pub struct StreamedResultSet {
    /// Stream from execute_streaming_sql
    stream: Option<StreamFuture<Compat01As03<ClientSStreamReceiver<PartialResultSet>>>>,
    /// Row schema; populated from the first partial response
    metadata: Option<ResultSetMetadata>,
    /// Query stats; populated from the final partial response
    stats: Option<ResultSetStats>,
    /// Fully-processed rows
    rows: VecDeque<Vec<Value>>,
    /// Accumulated values for incomplete row
    current_row: Vec<Value>,
    /// Incomplete value (a chunked value split across partial responses)
    pending_chunk: Option<Value>,
}
thread_local! {
    // Per-thread actix runtime used by StreamedResultSet::consume_next to
    // block_on the gRPC stream from outside an event loop.
    static SYSTEM: RefCell<SystemRunner> = {
        RefCell::new(System::new("syncstorage"))
    };
}
impl StreamedResultSet {
    /// Wrap a raw gRPC partial-result stream for blocking, row-at-a-time
    /// consumption.
    pub fn new(stream: ClientSStreamReceiver<PartialResultSet>) -> Self {
        Self {
            stream: Some(stream.compat().into_future()),
            metadata: None,
            stats: None,
            rows: Default::default(),
            current_row: vec![],
            pending_chunk: None,
        }
    }

    /// Result metadata, available once the first partial response arrives.
    #[allow(dead_code)]
    pub fn metadata(&self) -> Option<&ResultSetMetadata> {
        self.metadata.as_ref()
    }

    /// Result stats, available once the final partial response arrives.
    #[allow(dead_code)]
    pub fn stats(&self) -> Option<&ResultSetStats> {
        self.stats.as_ref()
    }

    /// The row schema (empty until the first response supplies metadata).
    pub fn fields(&self) -> &[StructType_Field] {
        match self.metadata {
            Some(ref metadata) => metadata.get_row_type().get_fields(),
            None => &[],
        }
    }

    /// Return exactly one row, erroring when the query matched none.
    pub fn one(&mut self) -> Result<Vec<Value>> {
        if let Some(result) = self.one_or_none()? {
            Ok(result)
        } else {
            Err(DbError::internal("No rows matched the given query."))?
        }
    }

    /// Return at most one row, erroring when the query matched more than
    /// one.
    pub fn one_or_none(&mut self) -> Result<Option<Vec<Value>>> {
        let result = self.next();
        if result.is_none() {
            Ok(None)
        } else if self.next().is_some() {
            Err(DbError::internal("Expected one result; got more."))?
        } else {
            result.transpose()
        }
    }

    /// Pull and process the next values from the Stream
    ///
    /// Returns false when the stream is finished
    fn consume_next(&mut self) -> Result<bool> {
        // Block this thread on the thread-local actix System until the
        // next PartialResultSet arrives.
        let (result, stream) = SYSTEM.with(|system| {
            system.borrow_mut().block_on(
                self.stream
                    .take()
                    .expect("Could not get next stream element"),
            )
        });
        self.stream = Some(stream.into_future());
        let mut partial_rs = if let Some(result) = result {
            result?
        } else {
            // Stream finished
            return Ok(false);
        };
        if self.metadata.is_none() && partial_rs.has_metadata() {
            // first response
            self.metadata = Some(partial_rs.take_metadata());
        }
        if partial_rs.has_stats() {
            // last response
            self.stats = Some(partial_rs.take_stats());
        }
        let mut values = partial_rs.take_values().into_vec();
        if values.is_empty() {
            // sanity check
            return Ok(true);
        }
        if let Some(pending_chunk) = self.pending_chunk.take() {
            // The previous response ended mid-value: merge its leftover
            // chunk into the first value of this response.
            let fields = self.fields();
            let current_row_i = self.current_row.len();
            if fields.len() <= current_row_i {
                Err(DbErrorKind::Integrity(
                    "Invalid PartialResultSet fields".to_owned(),
                ))?;
            }
            let field = &fields[current_row_i];
            values[0] = merge_by_type(pending_chunk, &values[0], field.get_field_type())?;
        }
        if partial_rs.get_chunked_value() {
            // The last value is incomplete; hold it back for the next
            // response to complete.
            self.pending_chunk = values.pop();
        }
        self.consume_values(values);
        Ok(true)
    }

    /// Accumulate a flat run of values into complete rows of
    /// `fields().len()` columns each.
    fn consume_values(&mut self, values: Vec<Value>) {
        let width = self.fields().len();
        for value in values {
            self.current_row.push(value);
            if self.current_row.len() == width {
                let current_row = mem::replace(&mut self.current_row, vec![]);
                self.rows.push_back(current_row);
            }
        }
    }
}
/// Iteration over fully-assembled rows of the result stream
///
/// `Item` is a `Result` because errors can surface mid-stream.
/// `itertools::IterTools::map_results` and
/// `MapAndThenIterator::map_and_then` can aid in removing boilerplate
/// around handling of said Results
impl Iterator for StreamedResultSet {
    type Item = Result<Vec<Value>>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Serve any row already buffered before touching the stream.
            if let Some(row) = self.rows.pop_front() {
                return Some(Ok(row));
            }
            match self.consume_next() {
                // More data buffered; try to pop a complete row again.
                Ok(true) => continue,
                // Stream exhausted.
                Ok(false) => return None,
                // Note: Iteration may continue after an error. We may want
                // to stop afterwards instead for safety sake (it's not
                // really recoverable)
                Err(e) => return Some(Err(e)),
            }
        }
    }
}
// Hand-written Debug: every field except `stream` is included.
// NOTE(review): `stream` is omitted — presumably because the gRPC stream
// type has no useful Debug representation; confirm before relying on it.
impl fmt::Debug for StreamedResultSet {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("StreamedResultSet")
            .field("metadata", &self.metadata)
            .field("stats", &self.stats)
            .field("rows", &self.rows)
            .field("current_row", &self.current_row)
            .field("pending_chunk", &self.pending_chunk)
            .finish()
    }
}
pub struct StreamedResultSetAsync {
/// Stream from execute_streaming_sql
stream: Option<StreamFuture<Compat01As03<ClientSStreamReceiver<PartialResultSet>>>>,