mirror of
https://github.com/mozilla-services/syncstorage-rs.git
synced 2026-05-04 19:56:11 +02:00
bug: write bsos contained within a commit after the batch has been committed. (#980)
* bug: fold commit message bsos into pending batch The client may sometimes include bsos in the batch commit message. The problem is that due to the way that data is written to spanner, mutations do not retain the ability to see data previously written in the same transaction. This causes collisions. To solve this, treat the bsos included in the commit as another batch update, then commit all the data. This does run the risk of bumping up against the mutation limit, but it ensures the best chance of data consistency. Writing the commit bsos prior to batch commit will result in the "newer" records being overwritten by "older" ones in the batch. Writing the commit bsos after the batch commit runs into the same mutation index conflict problem. Closes #882 Co-authored-by: Philip Jenvey <pjenvey@underboss.org>
This commit is contained in:
parent
8c280ccda0
commit
ba6e5f4b94
32
Cargo.lock
generated
32
Cargo.lock
generated
@ -85,7 +85,7 @@ dependencies = [
|
||||
"log",
|
||||
"mime",
|
||||
"percent-encoding 2.1.0",
|
||||
"pin-project 1.0.4",
|
||||
"pin-project 1.0.5",
|
||||
"rand 0.7.3",
|
||||
"regex",
|
||||
"serde 1.0.123",
|
||||
@ -253,7 +253,7 @@ dependencies = [
|
||||
"fxhash",
|
||||
"log",
|
||||
"mime",
|
||||
"pin-project 1.0.4",
|
||||
"pin-project 1.0.5",
|
||||
"regex",
|
||||
"serde 1.0.123",
|
||||
"serde_json",
|
||||
@ -748,9 +748,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "ctor"
|
||||
version = "0.1.18"
|
||||
version = "0.1.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "10bcb9d7dcbf7002aaffbb53eac22906b64cdcc127971dcc387d8eb7c95d5560"
|
||||
checksum = "e8f45d9ad417bcef4817d614a501ab55cdd96a6fdb24f49aab89a54acfd66b19"
|
||||
dependencies = [
|
||||
"quote",
|
||||
"syn",
|
||||
@ -773,9 +773,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "curl-sys"
|
||||
version = "0.4.39+curl-7.74.0"
|
||||
version = "0.4.40+curl-7.75.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "07a8ce861e7b68a0b394e814d7ee9f1b2750ff8bd10372c6ad3bacc10e86f874"
|
||||
checksum = "2ffafc1c35958318bd7fdd0582995ce4c72f4f461a8e70499ccee83a619fd562"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
@ -1393,7 +1393,7 @@ dependencies = [
|
||||
"httparse",
|
||||
"httpdate",
|
||||
"itoa",
|
||||
"pin-project 1.0.4",
|
||||
"pin-project 1.0.5",
|
||||
"socket2",
|
||||
"tokio",
|
||||
"tower-service",
|
||||
@ -1599,9 +1599,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.84"
|
||||
version = "0.2.85"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1cca32fa0182e8c0989459524dc356b8f2b5c10f1b9eb521b7d182c03cf8c5ff"
|
||||
checksum = "7ccac4b00700875e6a07c6cde370d44d32fa01c5a65cdd2fca6858c479d28bb3"
|
||||
|
||||
[[package]]
|
||||
name = "libloading"
|
||||
@ -2012,11 +2012,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "pin-project"
|
||||
version = "1.0.4"
|
||||
version = "1.0.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "95b70b68509f17aa2857863b6fa00bf21fc93674c7a8893de2f469f6aa7ca2f2"
|
||||
checksum = "96fa8ebb90271c4477f144354485b8068bd8f6b78b428b01ba892ca26caf0b63"
|
||||
dependencies = [
|
||||
"pin-project-internal 1.0.4",
|
||||
"pin-project-internal 1.0.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -2032,9 +2032,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "pin-project-internal"
|
||||
version = "1.0.4"
|
||||
version = "1.0.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "caa25a6393f22ce819b0f50e0be89287292fda8d425be38ee0ca14c4931d9e71"
|
||||
checksum = "758669ae3558c6f74bd2a18b41f7ac0b5a195aea6639d6a9b5e5d1ad5ba24c0b"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@ -2378,9 +2378,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "ring"
|
||||
version = "0.16.19"
|
||||
version = "0.16.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "024a1e66fea74c66c66624ee5622a7ff0e4b73a13b4f5c326ddb50c708944226"
|
||||
checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
|
||||
@ -49,7 +49,7 @@ hostname = "0.3.1"
|
||||
hkdf = "0.10"
|
||||
hmac = "0.10"
|
||||
jsonwebtoken = "7.2.0"
|
||||
log = { version = "0.4", features = ["max_level_info", "release_max_level_info"] }
|
||||
log = { version = "0.4", features = ["max_level_debug", "release_max_level_info"] }
|
||||
mime = "0.3"
|
||||
num_cpus = "1"
|
||||
# must match what's used by googleapis-raw
|
||||
|
||||
@ -320,7 +320,6 @@ pub async fn post_collection_batch(
|
||||
.await?
|
||||
};
|
||||
|
||||
let commit = breq.commit;
|
||||
let user_id = coll.user_id.clone();
|
||||
let collection = coll.collection.clone();
|
||||
|
||||
@ -328,69 +327,56 @@ pub async fn post_collection_batch(
|
||||
let mut failed = coll.bsos.invalid;
|
||||
let bso_ids: Vec<_> = coll.bsos.valid.iter().map(|bso| bso.id.clone()).collect();
|
||||
|
||||
let result = if commit && !coll.bsos.valid.is_empty() {
|
||||
// There's pending items to append to the batch but since we're
|
||||
// committing, write them to bsos immediately. Otherwise under
|
||||
// Spanner we would pay twice the mutations for those pending
|
||||
// items (once writing them to batch_bsos, then again
|
||||
// writing them to bsos)
|
||||
trace!("Batch: Committing {}", &new_batch.id);
|
||||
db.post_bsos(params::PostBsos {
|
||||
user_id: coll.user_id.clone(),
|
||||
collection: coll.collection.clone(),
|
||||
// XXX: why does BatchBsoBody exist (it's the same struct
|
||||
// as PostCollectionBso)?
|
||||
bsos: coll
|
||||
.bsos
|
||||
.valid
|
||||
.into_iter()
|
||||
.map(|batch_bso| params::PostCollectionBso {
|
||||
id: batch_bso.id,
|
||||
sortindex: batch_bso.sortindex,
|
||||
payload: batch_bso.payload,
|
||||
ttl: batch_bso.ttl,
|
||||
})
|
||||
.collect(),
|
||||
for_batch: true,
|
||||
failed: Default::default(),
|
||||
})
|
||||
.await
|
||||
.map(|_| ())
|
||||
} else {
|
||||
// We're not yet to commit the accumulated batch, but there are some
|
||||
// additional records we need to add.
|
||||
trace!("Batch: Appending to {}", &new_batch.id);
|
||||
db.append_to_batch(params::AppendToBatch {
|
||||
user_id: coll.user_id.clone(),
|
||||
collection: coll.collection.clone(),
|
||||
batch: new_batch.clone(),
|
||||
bsos: coll.bsos.valid.into_iter().map(From::from).collect(),
|
||||
})
|
||||
.await
|
||||
};
|
||||
let mut resp: Value = json!({});
|
||||
|
||||
// collect up the successful and failed bso_ids into a response.
|
||||
match result {
|
||||
Ok(_) => success.extend(bso_ids),
|
||||
Err(e) if e.is_conflict() => return Err(e.into()),
|
||||
Err(apperr) => {
|
||||
if let ApiErrorKind::Db(dberr) = apperr.kind() {
|
||||
// If we're over quota, return immediately with a 403 to let the client know.
|
||||
// Otherwise the client will simply keep retrying records.
|
||||
if let DbErrorKind::Quota = dberr.kind() {
|
||||
return Err(apperr.into());
|
||||
macro_rules! handle_result {
|
||||
// collect up the successful and failed bso_ids into a response.
|
||||
( $r: expr) => {
|
||||
match $r {
|
||||
Ok(_) => success.extend(bso_ids.clone()),
|
||||
Err(e) if e.is_conflict() => return Err(e.into()),
|
||||
Err(apperr) => {
|
||||
if let ApiErrorKind::Db(dberr) = apperr.kind() {
|
||||
// If we're over quota, return immediately with a 403 to let the client know.
|
||||
// Otherwise the client will simply keep retrying records.
|
||||
if let DbErrorKind::Quota = dberr.kind() {
|
||||
return Err(apperr.into());
|
||||
}
|
||||
};
|
||||
failed.extend(
|
||||
bso_ids
|
||||
.clone()
|
||||
.into_iter()
|
||||
.map(|id| (id, "db error".to_owned())),
|
||||
)
|
||||
}
|
||||
};
|
||||
failed.extend(bso_ids.into_iter().map(|id| (id, "db error".to_owned())))
|
||||
}
|
||||
};
|
||||
|
||||
let mut resp = json!({
|
||||
"success": success,
|
||||
"failed": failed,
|
||||
});
|
||||
}
|
||||
|
||||
// If we're not committing the current set of records yet.
|
||||
if !breq.commit {
|
||||
// and there are bsos included in this message.
|
||||
if !coll.bsos.valid.is_empty() {
|
||||
// Append the data to the requested batch.
|
||||
let result = {
|
||||
dbg!("Batch: Appending to {}", &new_batch.id);
|
||||
db.append_to_batch(params::AppendToBatch {
|
||||
user_id: coll.user_id.clone(),
|
||||
collection: coll.collection.clone(),
|
||||
batch: new_batch.clone(),
|
||||
bsos: coll.bsos.valid.into_iter().map(From::from).collect(),
|
||||
})
|
||||
.await
|
||||
};
|
||||
handle_result!(result);
|
||||
}
|
||||
|
||||
// Return the batch append response without committing the current
|
||||
// batch to the BSO table.
|
||||
resp["success"] = json!(success);
|
||||
resp["failed"] = json!(failed);
|
||||
|
||||
resp["batch"] = json!(&new_batch.id);
|
||||
return Ok(HttpResponse::Accepted().json(resp));
|
||||
}
|
||||
@ -406,6 +392,8 @@ pub async fn post_collection_batch(
|
||||
|
||||
// TODO: validate *actual* sizes of the batch items
|
||||
// (max_total_records, max_total_bytes)
|
||||
//
|
||||
// First, write the pending batch BSO data into the BSO table.
|
||||
let modified = if let Some(batch) = batch {
|
||||
db.commit_batch(params::CommitBatch {
|
||||
user_id: user_id.clone(),
|
||||
@ -418,6 +406,41 @@ pub async fn post_collection_batch(
|
||||
return Err(ApiError::from(err).into());
|
||||
};
|
||||
|
||||
// Then, write the BSOs contained in the commit request into the BSO table.
|
||||
// (This presumes that the BSOs contained in the final "commit" message are
|
||||
// newer, and thus more "correct", than any prior BSO info that may have been
|
||||
// included in the prior batch creation messages. The client shouldn't really
|
||||
// be including BSOs with the commit message, however it does and we should
|
||||
// handle that case.)
|
||||
if !coll.bsos.valid.is_empty() {
|
||||
trace!("Batch: writing commit message bsos");
|
||||
let result = db
|
||||
.post_bsos(params::PostBsos {
|
||||
user_id: coll.user_id.clone(),
|
||||
collection: coll.collection.clone(),
|
||||
bsos: coll
|
||||
.bsos
|
||||
.valid
|
||||
.into_iter()
|
||||
.map(|batch_bso| params::PostCollectionBso {
|
||||
id: batch_bso.id,
|
||||
sortindex: batch_bso.sortindex,
|
||||
payload: batch_bso.payload,
|
||||
ttl: batch_bso.ttl,
|
||||
})
|
||||
.collect(),
|
||||
for_batch: false,
|
||||
failed: Default::default(),
|
||||
})
|
||||
.await
|
||||
.map(|_| ());
|
||||
|
||||
handle_result!(result);
|
||||
}
|
||||
|
||||
// Always return success, failed, & modified
|
||||
resp["success"] = json!(success);
|
||||
resp["failed"] = json!(failed);
|
||||
resp["modified"] = json!(modified);
|
||||
trace!("Batch: Returning result: {}", &resp);
|
||||
Ok(HttpResponse::build(StatusCode::OK)
|
||||
|
||||
@ -1582,6 +1582,35 @@ class TestStorage(StorageFunctionalTestCase):
|
||||
resp3 = self.app.get(endpoint + '/e')
|
||||
self.assertEquals(committed, resp3.json['modified'])
|
||||
|
||||
|
||||
def test_aaa_batch_commit_collision(self):
|
||||
# It's possible for a batch to contain a BSO inside the batch as well
|
||||
# as inside the final "commit" message. This is a bit of a problem
|
||||
# for spanner because of conflicting ways that the data is written
|
||||
# to the database and the discoverability of IDs in previously
|
||||
# submitted batches.
|
||||
endpoint = self.root + '/storage/xxx_col2'
|
||||
orig = "Letting the days go by"
|
||||
repl = "Same as it ever was"
|
||||
|
||||
batch_num = self.retry_post_json(
|
||||
endpoint + "?batch=true",
|
||||
[{"id":"b0", "payload": orig}]
|
||||
).json["batch"]
|
||||
|
||||
resp = self.retry_post_json(
|
||||
endpoint + "?batch={}&commit=true".format(batch_num),
|
||||
[{"id":"b0", "payload": repl}]
|
||||
)
|
||||
|
||||
# this should succeed, using the newer payload value.
|
||||
assert resp.json["failed"] == {}, "batch commit failed"
|
||||
assert resp.json["success"] == ["b0"], "batch commit id incorrect"
|
||||
resp = self.app.get(endpoint+"?full=1")
|
||||
assert resp.json[0].get(
|
||||
"payload") == repl, "wrong payload returned"
|
||||
|
||||
|
||||
def test_we_dont_need_no_stinkin_batches(self):
|
||||
endpoint = self.root + '/storage/xxx_col2'
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user