diff --git a/spanner-2019-10-01.ddl b/spanner-2019-10-01.ddl index 706fac5f..2f955795 100644 --- a/spanner-2019-10-01.ddl +++ b/spanner-2019-10-01.ddl @@ -37,7 +37,8 @@ CREATE TABLE bsos ( INTERLEAVE IN user_collections; CREATE INDEX BsoExpiry - ON bsos(expiry); + ON bsos(expiry), +INTERLEAVE IN user_collections; CREATE TABLE collections ( collection_id INT64 NOT NULL, diff --git a/src/db/spanner/batch.rs b/src/db/spanner/batch.rs index 4484a081..ebe71d2d 100644 --- a/src/db/spanner/batch.rs +++ b/src/db/spanner/batch.rs @@ -1,9 +1,13 @@ -use std::str::FromStr; +use std::{collections::HashMap, str::FromStr}; -use googleapis_raw::spanner::v1::type_pb::TypeCode; +use googleapis_raw::spanner::v1::type_pb::{StructType, Type, TypeCode}; +use protobuf::{ + well_known_types::{ListValue, Value}, + RepeatedField, +}; use uuid::Uuid; -use super::support::null_value; +use super::support::{null_value, struct_type_field}; use super::{ models::{Result, SpannerDb, DEFAULT_BSO_TTL, PRETOUCH_TS}, support::as_value, @@ -213,41 +217,83 @@ pub fn do_append( batch_id: String, bsos: Vec, ) -> Result<()> { - for bso in bsos { - let mut sqlparams = params! 
{ - "fxa_uid" => user_id.fxa_uid.clone(), - "fxa_kid" => user_id.fxa_kid.clone(), - "collection_id" => collection_id.to_string(), - "batch_id" => batch_id.clone(), - "batch_bso_id" => bso.id, - }; - - sqlparams.insert( - "sortindex".to_string(), - bso.sortindex + // Pass an array of struct objects as @values (for UNNEST), e.g.: + // [("", "", 101, "ba1", "bso1", NULL, "payload1", NULL), + // ("", "", 101, "ba1", "bso2", NULL, "payload2", NULL)] + // https://cloud.google.com/spanner/docs/structs#creating_struct_objects + let rows: Vec<_> = bsos + .into_iter() + .map(|bso| { + let sortindex = bso + .sortindex .map(|sortindex| as_value(sortindex.to_string())) - .unwrap_or_else(null_value), - ); - sqlparams.insert( - "payload".to_string(), - bso.payload.map(as_value).unwrap_or_else(null_value), - ); - sqlparams.insert( - "ttl".to_string(), - bso.ttl + .unwrap_or_else(null_value); + let payload = bso.payload.map(as_value).unwrap_or_else(null_value); + let ttl = bso + .ttl .map(|ttl| as_value(ttl.to_string())) - .unwrap_or_else(null_value), - ); + .unwrap_or_else(null_value); + + let mut row = ListValue::new(); + row.set_values(RepeatedField::from_vec(vec![ + as_value(user_id.fxa_uid.clone()), + as_value(user_id.fxa_kid.clone()), + as_value(collection_id.to_string()), + as_value(batch_id.clone()), + as_value(bso.id), + sortindex, + payload, + ttl, + ])); + let mut value = Value::new(); + value.set_list_value(row); + value + }) + .collect(); + + let mut list_values = ListValue::new(); + list_values.set_values(RepeatedField::from_vec(rows)); + let mut values = Value::new(); + values.set_list_value(list_values); + + // values' type is an ARRAY of STRUCTs + let mut param_type = Type::new(); + param_type.set_code(TypeCode::ARRAY); + let mut array_type = Type::new(); + array_type.set_code(TypeCode::STRUCT); + + // STRUCT requires definition of all its field types + let mut struct_type = StructType::new(); + let fields = vec![ + ("fxa_uid", TypeCode::STRING), + ("fxa_kid", 
TypeCode::STRING), + ("collection_id", TypeCode::INT64), + ("batch_id", TypeCode::STRING), + ("batch_bso_id", TypeCode::STRING), + ("sortindex", TypeCode::INT64), + ("payload", TypeCode::STRING), + ("ttl", TypeCode::INT64), + ] + .into_iter() + .map(|(name, field_type)| struct_type_field(name, field_type)) + .collect(); + struct_type.set_fields(RepeatedField::from_vec(fields)); + array_type.set_struct_type(struct_type); + param_type.set_array_element_type(array_type); + + let mut sqlparams = HashMap::new(); + sqlparams.insert("values".to_owned(), values); + let mut sqlparam_types = HashMap::new(); + sqlparam_types.insert("values".to_owned(), param_type); + db.sql( + "INSERT INTO batch_bsos (fxa_uid, fxa_kid, collection_id, batch_id, batch_bso_id, + sortindex, payload, ttl) + SELECT * FROM UNNEST(@values)", + )? + .params(sqlparams) + .param_types(sqlparam_types) + .execute(&db.conn)?; - db.sql( - "INSERT INTO batch_bsos (fxa_uid, fxa_kid, collection_id, batch_id, batch_bso_id, - sortindex, payload, ttl) - VALUES (@fxa_uid, @fxa_kid, @collection_id, @batch_id, @batch_bso_id, - @sortindex, @payload, @ttl)", - )? - .params(sqlparams) - .execute(&db.conn)?; - } Ok(()) } diff --git a/src/db/spanner/models.rs b/src/db/spanner/models.rs index 69b23654..8a695b51 100644 --- a/src/db/spanner/models.rs +++ b/src/db/spanner/models.rs @@ -876,7 +876,7 @@ impl SpannerDb { "fxa_uid" => params.user_id.fxa_uid, "fxa_kid" => params.user_id.fxa_kid, "collection_id" => collection_id.to_string(), - "bso_id" => params.id.to_string(), + "bso_id" => params.id, }) .execute(&self.conn)?; if result.affected_rows()? 
== 0 { @@ -978,7 +978,7 @@ impl SpannerDb { pub fn get_bsos_sync(&self, params: params::GetBsos) -> Result { let query = "\ - SELECT bso_id, modified, payload, sortindex, expiry + SELECT bso_id, sortindex, payload, modified, expiry FROM bsos WHERE fxa_uid = @fxa_uid AND fxa_kid = @fxa_kid @@ -1053,29 +1053,25 @@ impl SpannerDb { pub fn get_bso_sync(&self, params: params::GetBso) -> Result> { let collection_id = self.get_collection_id(¶ms.collection)?; - let result = self - .sql( - "SELECT bso_id, modified, payload, sortindex, expiry + self.sql( + "SELECT bso_id, sortindex, payload, modified, expiry FROM bsos WHERE fxa_uid = @fxa_uid AND fxa_kid = @fxa_kid AND collection_id = @collection_id AND bso_id = @bso_id AND expiry > CURRENT_TIMESTAMP()", - )? - .params(params! { - "fxa_uid" => params.user_id.fxa_uid, - "fxa_kid" => params.user_id.fxa_kid, - "collection_id" => collection_id.to_string(), - "bso_id" => params.id.to_string(), - }) - .execute(&self.conn)? - .one_or_none()?; - Ok(if let Some(row) = result { - Some(bso_from_row(row)?) - } else { - None + )? + .params(params! { + "fxa_uid" => params.user_id.fxa_uid, + "fxa_kid" => params.user_id.fxa_kid, + "collection_id" => collection_id.to_string(), + "bso_id" => params.id, }) + .execute(&self.conn)? + .one_or_none()? 
+ .map(bso_from_row) + .transpose() } pub fn get_bso_timestamp_sync(&self, params: params::GetBsoTimestamp) -> Result { diff --git a/src/db/spanner/support.rs b/src/db/spanner/support.rs index ad61e9fa..945bf46d 100644 --- a/src/db/spanner/support.rs +++ b/src/db/spanner/support.rs @@ -1,5 +1,11 @@ use std::collections::HashMap; +use googleapis_raw::spanner::v1::{ + result_set::{ResultSet, ResultSetMetadata, ResultSetStats}, + spanner::ExecuteSqlRequest, + type_pb::{StructType_Field, Type, TypeCode}, +}; + use protobuf::{ well_known_types::{ListValue, NullValue, Struct, Value}, RepeatedField, @@ -14,21 +20,7 @@ use crate::{ web::extractors::HawkIdentifier, }; -use googleapis_raw::spanner::v1::type_pb::{Type, TypeCode}; - -type ParamValue = protobuf::well_known_types::Value; - -type ParamType = googleapis_raw::spanner::v1::type_pb::Type; - -pub type ExecuteSqlRequest = googleapis_raw::spanner::v1::spanner::ExecuteSqlRequest; - -type ResultSet = googleapis_raw::spanner::v1::result_set::ResultSet; - -type ResultSetMetadata = googleapis_raw::spanner::v1::result_set::ResultSetMetadata; - -type ResultSetStats = googleapis_raw::spanner::v1::result_set::ResultSetStats; - -pub fn as_value(string_value: String) -> protobuf::well_known_types::Value { +pub fn as_value(string_value: String) -> Value { let mut value = Value::new(); value.set_string_value(string_value); value @@ -39,6 +31,14 @@ pub fn as_type(v: TypeCode) -> Type { t.set_code(v); t } + +pub fn struct_type_field(name: &str, field_type: TypeCode) -> StructType_Field { + let mut field = StructType_Field::new(); + field.set_name(name.to_owned()); + field.set_field_type(as_type(field_type)); + field +} + pub fn as_list_value( string_values: impl Iterator, ) -> protobuf::well_known_types::Value { @@ -51,7 +51,7 @@ pub fn as_list_value( value } -pub fn null_value() -> protobuf::well_known_types::Value { +pub fn null_value() -> Value { let mut value = Value::new(); value.set_null_value(NullValue::NULL_VALUE); value @@ 
-60,8 +60,8 @@ pub fn null_value() -> protobuf::well_known_types::Value { #[derive(Default)] pub struct ExecuteSqlRequestBuilder { execute_sql: ExecuteSqlRequest, - params: Option>, - param_types: Option>, + params: Option>, + param_types: Option>, } impl ExecuteSqlRequestBuilder { @@ -72,12 +72,12 @@ impl ExecuteSqlRequestBuilder { } } - pub fn params(mut self, params: HashMap) -> Self { + pub fn params(mut self, params: HashMap) -> Self { self.params = Some(params); self } - pub fn param_types(mut self, param_types: HashMap) -> Self { + pub fn param_types(mut self, param_types: HashMap) -> Self { self.param_types = Some(param_types); self } @@ -163,18 +163,18 @@ impl Iterator for SyncResultSet { pub fn bso_from_row(row: Vec) -> Result { Ok(results::GetBso { id: row[0].get_string_value().to_owned(), - modified: SyncTimestamp::from_rfc3339(&row[1].get_string_value())?, - payload: row[2].get_string_value().to_owned(), - sortindex: if row[3].has_null_value() { + sortindex: if row[1].has_null_value() { None } else { Some( - row[3] + row[1] .get_string_value() .parse::() .map_err(|e| DbErrorKind::Integrity(e.to_string()))?, ) }, + payload: row[2].get_string_value().to_owned(), + modified: SyncTimestamp::from_rfc3339(&row[3].get_string_value())?, expiry: SyncTimestamp::from_rfc3339(&row[4].get_string_value())?.as_i64(), }) } diff --git a/src/web/handlers.rs b/src/web/handlers.rs index ef622c23..181fad28 100644 --- a/src/web/handlers.rs +++ b/src/web/handlers.rs @@ -242,6 +242,7 @@ pub fn post_collection_batch( })) }; + let commit = breq.commit; let db = coll.db.clone(); let user_id = coll.user_id.clone(); let collection = coll.collection.clone(); @@ -252,23 +253,56 @@ pub fn post_collection_batch( let mut failed = coll.bsos.invalid.clone(); let bso_ids: Vec<_> = coll.bsos.valid.iter().map(|bso| bso.id.clone()).collect(); - coll.db - .append_to_batch(params::AppendToBatch { + if commit && !coll.bsos.valid.is_empty() { + // There's pending items to append to the batch but 
since we're
+            // committing, write them to bsos immediately. Otherwise under
+            // Spanner we would pay twice the mutations for those pending
+            // items (once writing them to batch_bsos, then again
+            // writing them to bsos)
+
+            // NOTE: Unfortunately this means we make two calls to
+            // touch_collection (in post_bsos and then commit_batch). The
+            // second touch is redundant, writing the same timestamp
+            Either::A(
+                coll.db
+                    .post_bsos(params::PostBsos {
+                        user_id: coll.user_id.clone(),
+                        collection: coll.collection.clone(),
+                        // XXX: why does BatchBsoBody exist (it's the same struct
+                        // as PostCollectionBso)?
+                        bsos: coll
+                            .bsos
+                            .valid
+                            .into_iter()
+                            .map(|batch_bso| params::PostCollectionBso {
+                                id: batch_bso.id,
+                                sortindex: batch_bso.sortindex,
+                                payload: batch_bso.payload,
+                                ttl: batch_bso.ttl,
+                            })
+                            .collect(),
+                        failed: Default::default(),
+                    })
+                    .and_then(|_| future::ok(())),
+            )
+        } else {
+            Either::B(coll.db.append_to_batch(params::AppendToBatch {
                 user_id: coll.user_id.clone(),
                 collection: coll.collection.clone(),
                 id: id.clone(),
                 bsos: coll.bsos.valid.into_iter().map(From::from).collect(),
-            })
-            .then(move |result| {
-                match result {
-                    Ok(_) => success.extend(bso_ids),
-                    Err(e) if e.is_conflict() => return future::err(e),
-                    Err(_) => {
-                        failed.extend(bso_ids.into_iter().map(|id| (id, "db error".to_owned())))
-                    }
-                };
-                future::ok((id, success, failed))
-            })
+            }))
+        }
+        .then(move |result| {
+            match result {
+                Ok(_) => success.extend(bso_ids),
+                Err(e) if e.is_conflict() => return future::err(e),
+                Err(_) => {
+                    failed.extend(bso_ids.into_iter().map(|id| (id, "db error".to_owned())))
+                }
+            };
+            future::ok((id, success, failed))
+        })
     })
     .map_err(From::from)
     .and_then(move |(id, success, failed)| {