Merge branch 'master' of github.com:mozilla-services/syncstorage-rs into bug/342

This commit is contained in:
jrconlin 2019-12-03 15:07:06 -08:00
commit 87df17c2b4
No known key found for this signature in database
GPG Key ID: D2B33CD056ABD330
5 changed files with 168 additions and 91 deletions

View File

@ -37,7 +37,8 @@ CREATE TABLE bsos (
INTERLEAVE IN user_collections;
CREATE INDEX BsoExpiry
ON bsos(expiry);
ON bsos(expiry),
INTERLEAVE IN user_collections;
CREATE TABLE collections (
collection_id INT64 NOT NULL,

View File

@ -1,9 +1,13 @@
use std::str::FromStr;
use std::{collections::HashMap, str::FromStr};
use googleapis_raw::spanner::v1::type_pb::TypeCode;
use googleapis_raw::spanner::v1::type_pb::{StructType, Type, TypeCode};
use protobuf::{
well_known_types::{ListValue, Value},
RepeatedField,
};
use uuid::Uuid;
use super::support::null_value;
use super::support::{null_value, struct_type_field};
use super::{
models::{Result, SpannerDb, DEFAULT_BSO_TTL, PRETOUCH_TS},
support::as_value,
@ -213,41 +217,83 @@ pub fn do_append(
batch_id: String,
bsos: Vec<params::PostCollectionBso>,
) -> Result<()> {
for bso in bsos {
let mut sqlparams = params! {
"fxa_uid" => user_id.fxa_uid.clone(),
"fxa_kid" => user_id.fxa_kid.clone(),
"collection_id" => collection_id.to_string(),
"batch_id" => batch_id.clone(),
"batch_bso_id" => bso.id,
};
sqlparams.insert(
"sortindex".to_string(),
bso.sortindex
// Pass an array of struct objects as @values (for UNNEST), e.g.:
// [("<fxa_uid>", "<fxa_kid>", 101, "ba1", "bso1", NULL, "payload1", NULL),
// ("<fxa_uid>", "<fxa_kid>", 101, "ba1", "bso2", NULL, "payload2", NULL)]
// https://cloud.google.com/spanner/docs/structs#creating_struct_objects
let rows: Vec<_> = bsos
.into_iter()
.map(|bso| {
let sortindex = bso
.sortindex
.map(|sortindex| as_value(sortindex.to_string()))
.unwrap_or_else(null_value),
);
sqlparams.insert(
"payload".to_string(),
bso.payload.map(as_value).unwrap_or_else(null_value),
);
sqlparams.insert(
"ttl".to_string(),
bso.ttl
.unwrap_or_else(null_value);
let payload = bso.payload.map(as_value).unwrap_or_else(null_value);
let ttl = bso
.ttl
.map(|ttl| as_value(ttl.to_string()))
.unwrap_or_else(null_value),
);
.unwrap_or_else(null_value);
let mut row = ListValue::new();
row.set_values(RepeatedField::from_vec(vec![
as_value(user_id.fxa_uid.clone()),
as_value(user_id.fxa_kid.clone()),
as_value(collection_id.to_string()),
as_value(batch_id.clone()),
as_value(bso.id),
sortindex,
payload,
ttl,
]));
let mut value = Value::new();
value.set_list_value(row);
value
})
.collect();
let mut list_values = ListValue::new();
list_values.set_values(RepeatedField::from_vec(rows));
let mut values = Value::new();
values.set_list_value(list_values);
// values' type is an ARRAY of STRUCTs
let mut param_type = Type::new();
param_type.set_code(TypeCode::ARRAY);
let mut array_type = Type::new();
array_type.set_code(TypeCode::STRUCT);
// STRUCT requires definition of all its field types
let mut struct_type = StructType::new();
let fields = vec![
("fxa_uid", TypeCode::STRING),
("fxa_kid", TypeCode::STRING),
("collection_id", TypeCode::INT64),
("batch_id", TypeCode::STRING),
("batch_bso_id", TypeCode::STRING),
("sortindex", TypeCode::INT64),
("payload", TypeCode::STRING),
("ttl", TypeCode::INT64),
]
.into_iter()
.map(|(name, field_type)| struct_type_field(name, field_type))
.collect();
struct_type.set_fields(RepeatedField::from_vec(fields));
array_type.set_struct_type(struct_type);
param_type.set_array_element_type(array_type);
let mut sqlparams = HashMap::new();
sqlparams.insert("values".to_owned(), values);
let mut sqlparam_types = HashMap::new();
sqlparam_types.insert("values".to_owned(), param_type);
db.sql(
"INSERT INTO batch_bsos (fxa_uid, fxa_kid, collection_id, batch_id, batch_bso_id,
sortindex, payload, ttl)
SELECT * FROM UNNEST(@values)",
)?
.params(sqlparams)
.param_types(sqlparam_types)
.execute(&db.conn)?;
db.sql(
"INSERT INTO batch_bsos (fxa_uid, fxa_kid, collection_id, batch_id, batch_bso_id,
sortindex, payload, ttl)
VALUES (@fxa_uid, @fxa_kid, @collection_id, @batch_id, @batch_bso_id,
@sortindex, @payload, @ttl)",
)?
.params(sqlparams)
.execute(&db.conn)?;
}
Ok(())
}

View File

@ -876,7 +876,7 @@ impl SpannerDb {
"fxa_uid" => params.user_id.fxa_uid,
"fxa_kid" => params.user_id.fxa_kid,
"collection_id" => collection_id.to_string(),
"bso_id" => params.id.to_string(),
"bso_id" => params.id,
})
.execute(&self.conn)?;
if result.affected_rows()? == 0 {
@ -978,7 +978,7 @@ impl SpannerDb {
pub fn get_bsos_sync(&self, params: params::GetBsos) -> Result<results::GetBsos> {
let query = "\
SELECT bso_id, modified, payload, sortindex, expiry
SELECT bso_id, sortindex, payload, modified, expiry
FROM bsos
WHERE fxa_uid = @fxa_uid
AND fxa_kid = @fxa_kid
@ -1053,29 +1053,25 @@ impl SpannerDb {
pub fn get_bso_sync(&self, params: params::GetBso) -> Result<Option<results::GetBso>> {
let collection_id = self.get_collection_id(&params.collection)?;
let result = self
.sql(
"SELECT bso_id, modified, payload, sortindex, expiry
self.sql(
"SELECT bso_id, sortindex, payload, modified, expiry
FROM bsos
WHERE fxa_uid = @fxa_uid
AND fxa_kid = @fxa_kid
AND collection_id = @collection_id
AND bso_id = @bso_id
AND expiry > CURRENT_TIMESTAMP()",
)?
.params(params! {
"fxa_uid" => params.user_id.fxa_uid,
"fxa_kid" => params.user_id.fxa_kid,
"collection_id" => collection_id.to_string(),
"bso_id" => params.id.to_string(),
})
.execute(&self.conn)?
.one_or_none()?;
Ok(if let Some(row) = result {
Some(bso_from_row(row)?)
} else {
None
)?
.params(params! {
"fxa_uid" => params.user_id.fxa_uid,
"fxa_kid" => params.user_id.fxa_kid,
"collection_id" => collection_id.to_string(),
"bso_id" => params.id,
})
.execute(&self.conn)?
.one_or_none()?
.map(bso_from_row)
.transpose()
}
pub fn get_bso_timestamp_sync(&self, params: params::GetBsoTimestamp) -> Result<SyncTimestamp> {

View File

@ -1,5 +1,11 @@
use std::collections::HashMap;
use googleapis_raw::spanner::v1::{
result_set::{ResultSet, ResultSetMetadata, ResultSetStats},
spanner::ExecuteSqlRequest,
type_pb::{StructType_Field, Type, TypeCode},
};
use protobuf::{
well_known_types::{ListValue, NullValue, Struct, Value},
RepeatedField,
@ -14,21 +20,7 @@ use crate::{
web::extractors::HawkIdentifier,
};
use googleapis_raw::spanner::v1::type_pb::{Type, TypeCode};
type ParamValue = protobuf::well_known_types::Value;
type ParamType = googleapis_raw::spanner::v1::type_pb::Type;
pub type ExecuteSqlRequest = googleapis_raw::spanner::v1::spanner::ExecuteSqlRequest;
type ResultSet = googleapis_raw::spanner::v1::result_set::ResultSet;
type ResultSetMetadata = googleapis_raw::spanner::v1::result_set::ResultSetMetadata;
type ResultSetStats = googleapis_raw::spanner::v1::result_set::ResultSetStats;
pub fn as_value(string_value: String) -> protobuf::well_known_types::Value {
pub fn as_value(string_value: String) -> Value {
let mut value = Value::new();
value.set_string_value(string_value);
value
@ -39,6 +31,14 @@ pub fn as_type(v: TypeCode) -> Type {
t.set_code(v);
t
}
pub fn struct_type_field(name: &str, field_type: TypeCode) -> StructType_Field {
let mut field = StructType_Field::new();
field.set_name(name.to_owned());
field.set_field_type(as_type(field_type));
field
}
pub fn as_list_value(
string_values: impl Iterator<Item = String>,
) -> protobuf::well_known_types::Value {
@ -51,7 +51,7 @@ pub fn as_list_value(
value
}
pub fn null_value() -> protobuf::well_known_types::Value {
pub fn null_value() -> Value {
let mut value = Value::new();
value.set_null_value(NullValue::NULL_VALUE);
value
@ -60,8 +60,8 @@ pub fn null_value() -> protobuf::well_known_types::Value {
#[derive(Default)]
pub struct ExecuteSqlRequestBuilder {
execute_sql: ExecuteSqlRequest,
params: Option<HashMap<String, ParamValue>>,
param_types: Option<HashMap<String, ParamType>>,
params: Option<HashMap<String, Value>>,
param_types: Option<HashMap<String, Type>>,
}
impl ExecuteSqlRequestBuilder {
@ -72,12 +72,12 @@ impl ExecuteSqlRequestBuilder {
}
}
pub fn params(mut self, params: HashMap<String, ParamValue>) -> Self {
pub fn params(mut self, params: HashMap<String, Value>) -> Self {
self.params = Some(params);
self
}
pub fn param_types(mut self, param_types: HashMap<String, ParamType>) -> Self {
pub fn param_types(mut self, param_types: HashMap<String, Type>) -> Self {
self.param_types = Some(param_types);
self
}
@ -163,18 +163,18 @@ impl Iterator for SyncResultSet {
pub fn bso_from_row(row: Vec<Value>) -> Result<results::GetBso> {
Ok(results::GetBso {
id: row[0].get_string_value().to_owned(),
modified: SyncTimestamp::from_rfc3339(&row[1].get_string_value())?,
payload: row[2].get_string_value().to_owned(),
sortindex: if row[3].has_null_value() {
sortindex: if row[1].has_null_value() {
None
} else {
Some(
row[3]
row[1]
.get_string_value()
.parse::<i32>()
.map_err(|e| DbErrorKind::Integrity(e.to_string()))?,
)
},
payload: row[2].get_string_value().to_owned(),
modified: SyncTimestamp::from_rfc3339(&row[3].get_string_value())?,
expiry: SyncTimestamp::from_rfc3339(&row[4].get_string_value())?.as_i64(),
})
}

View File

@ -242,6 +242,7 @@ pub fn post_collection_batch(
}))
};
let commit = breq.commit;
let db = coll.db.clone();
let user_id = coll.user_id.clone();
let collection = coll.collection.clone();
@ -252,23 +253,56 @@ pub fn post_collection_batch(
let mut failed = coll.bsos.invalid.clone();
let bso_ids: Vec<_> = coll.bsos.valid.iter().map(|bso| bso.id.clone()).collect();
coll.db
.append_to_batch(params::AppendToBatch {
if commit && !coll.bsos.valid.is_empty() {
// There's pending items to append to the batch but since we're
// committing, write them to bsos immediately. Otherwise under
// Spanner we would pay twice the mutations for those pending
// items (once writing them to to batch_bsos, then again
// writing them to bsos)
// NOTE: Unfortunately this means we make two calls to
// touch_collection (in post_bsos and then commit_batch). The
// second touch is redundant, writing the same timestamp
Either::A(
coll.db
.post_bsos(params::PostBsos {
user_id: coll.user_id.clone(),
collection: coll.collection.clone(),
// XXX: why does BatchBsoBody exist (it's the same struct
// as PostCollectionBso)?
bsos: coll
.bsos
.valid
.into_iter()
.map(|batch_bso| params::PostCollectionBso {
id: batch_bso.id,
sortindex: batch_bso.sortindex,
payload: batch_bso.payload,
ttl: batch_bso.ttl,
})
.collect(),
failed: Default::default(),
})
.and_then(|_| future::ok(())),
)
} else {
Either::B(coll.db.append_to_batch(params::AppendToBatch {
user_id: coll.user_id.clone(),
collection: coll.collection.clone(),
id: id.clone(),
bsos: coll.bsos.valid.into_iter().map(From::from).collect(),
})
.then(move |result| {
match result {
Ok(_) => success.extend(bso_ids),
Err(e) if e.is_conflict() => return future::err(e),
Err(_) => {
failed.extend(bso_ids.into_iter().map(|id| (id, "db error".to_owned())))
}
};
future::ok((id, success, failed))
})
}))
}
.then(move |result| {
match result {
Ok(_) => success.extend(bso_ids),
Err(e) if e.is_conflict() => return future::err(e),
Err(_) => {
failed.extend(bso_ids.into_iter().map(|id| (id, "db error".to_owned())))
}
};
future::ok((id, success, failed))
})
})
.map_err(From::from)
.and_then(move |(id, success, failed)| {