diff --git a/Cargo.lock b/Cargo.lock index 3a4b19d6..3e6f5320 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -85,7 +85,7 @@ dependencies = [ "log", "mime", "percent-encoding 2.1.0", - "pin-project 1.0.4", + "pin-project 1.0.5", "rand 0.7.3", "regex", "serde 1.0.123", @@ -253,7 +253,7 @@ dependencies = [ "fxhash", "log", "mime", - "pin-project 1.0.4", + "pin-project 1.0.5", "regex", "serde 1.0.123", "serde_json", @@ -748,9 +748,9 @@ dependencies = [ [[package]] name = "ctor" -version = "0.1.18" +version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10bcb9d7dcbf7002aaffbb53eac22906b64cdcc127971dcc387d8eb7c95d5560" +checksum = "e8f45d9ad417bcef4817d614a501ab55cdd96a6fdb24f49aab89a54acfd66b19" dependencies = [ "quote", "syn", @@ -773,9 +773,9 @@ dependencies = [ [[package]] name = "curl-sys" -version = "0.4.39+curl-7.74.0" +version = "0.4.40+curl-7.75.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07a8ce861e7b68a0b394e814d7ee9f1b2750ff8bd10372c6ad3bacc10e86f874" +checksum = "2ffafc1c35958318bd7fdd0582995ce4c72f4f461a8e70499ccee83a619fd562" dependencies = [ "cc", "libc", @@ -1393,7 +1393,7 @@ dependencies = [ "httparse", "httpdate", "itoa", - "pin-project 1.0.4", + "pin-project 1.0.5", "socket2", "tokio", "tower-service", @@ -1599,9 +1599,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.84" +version = "0.2.85" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cca32fa0182e8c0989459524dc356b8f2b5c10f1b9eb521b7d182c03cf8c5ff" +checksum = "7ccac4b00700875e6a07c6cde370d44d32fa01c5a65cdd2fca6858c479d28bb3" [[package]] name = "libloading" @@ -2012,11 +2012,11 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95b70b68509f17aa2857863b6fa00bf21fc93674c7a8893de2f469f6aa7ca2f2" +checksum = 
"96fa8ebb90271c4477f144354485b8068bd8f6b78b428b01ba892ca26caf0b63" dependencies = [ - "pin-project-internal 1.0.4", + "pin-project-internal 1.0.5", ] [[package]] @@ -2032,9 +2032,9 @@ dependencies = [ [[package]] name = "pin-project-internal" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "caa25a6393f22ce819b0f50e0be89287292fda8d425be38ee0ca14c4931d9e71" +checksum = "758669ae3558c6f74bd2a18b41f7ac0b5a195aea6639d6a9b5e5d1ad5ba24c0b" dependencies = [ "proc-macro2", "quote", @@ -2378,9 +2378,9 @@ dependencies = [ [[package]] name = "ring" -version = "0.16.19" +version = "0.16.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "024a1e66fea74c66c66624ee5622a7ff0e4b73a13b4f5c326ddb50c708944226" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" dependencies = [ "cc", "libc", diff --git a/Cargo.toml b/Cargo.toml index d154ffc7..35c70f6d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,7 +49,7 @@ hostname = "0.3.1" hkdf = "0.10" hmac = "0.10" jsonwebtoken = "7.2.0" -log = { version = "0.4", features = ["max_level_info", "release_max_level_info"] } +log = { version = "0.4", features = ["max_level_debug", "release_max_level_info"] } mime = "0.3" num_cpus = "1" # must match what's used by googleapis-raw diff --git a/src/web/handlers.rs b/src/web/handlers.rs index 345da2ae..8c2a08fe 100644 --- a/src/web/handlers.rs +++ b/src/web/handlers.rs @@ -320,7 +320,6 @@ pub async fn post_collection_batch( .await? }; - let commit = breq.commit; let user_id = coll.user_id.clone(); let collection = coll.collection.clone(); @@ -328,69 +327,56 @@ pub async fn post_collection_batch( let mut failed = coll.bsos.invalid; let bso_ids: Vec<_> = coll.bsos.valid.iter().map(|bso| bso.id.clone()).collect(); - let result = if commit && !coll.bsos.valid.is_empty() { - // There's pending items to append to the batch but since we're - // committing, write them to bsos immediately. 
Otherwise under - // Spanner we would pay twice the mutations for those pending - // items (once writing them to to batch_bsos, then again - // writing them to bsos) - trace!("Batch: Committing {}", &new_batch.id); - db.post_bsos(params::PostBsos { - user_id: coll.user_id.clone(), - collection: coll.collection.clone(), - // XXX: why does BatchBsoBody exist (it's the same struct - // as PostCollectionBso)? - bsos: coll - .bsos - .valid - .into_iter() - .map(|batch_bso| params::PostCollectionBso { - id: batch_bso.id, - sortindex: batch_bso.sortindex, - payload: batch_bso.payload, - ttl: batch_bso.ttl, - }) - .collect(), - for_batch: true, - failed: Default::default(), - }) - .await - .map(|_| ()) - } else { - // We're not yet to commit the accumulated batch, but there are some - // additional records we need to add. - trace!("Batch: Appending to {}", &new_batch.id); - db.append_to_batch(params::AppendToBatch { - user_id: coll.user_id.clone(), - collection: coll.collection.clone(), - batch: new_batch.clone(), - bsos: coll.bsos.valid.into_iter().map(From::from).collect(), - }) - .await - }; + let mut resp: Value = json!({}); - // collect up the successful and failed bso_ids into a response. - match result { - Ok(_) => success.extend(bso_ids), - Err(e) if e.is_conflict() => return Err(e.into()), - Err(apperr) => { - if let ApiErrorKind::Db(dberr) = apperr.kind() { - // If we're over quota, return immediately with a 403 to let the client know. - // Otherwise the client will simply keep retrying records. - if let DbErrorKind::Quota = dberr.kind() { - return Err(apperr.into()); + macro_rules! handle_result { + // collect up the successful and failed bso_ids into a response. + ( $r: expr) => { + match $r { + Ok(_) => success.extend(bso_ids.clone()), + Err(e) if e.is_conflict() => return Err(e.into()), + Err(apperr) => { + if let ApiErrorKind::Db(dberr) = apperr.kind() { + // If we're over quota, return immediately with a 403 to let the client know. 
+ // Otherwise the client will simply keep retrying records. + if let DbErrorKind::Quota = dberr.kind() { + return Err(apperr.into()); + } + }; + failed.extend( + bso_ids + .clone() + .into_iter() + .map(|id| (id, "db error".to_owned())), + ) } } }; - failed.extend(bso_ids.into_iter().map(|id| (id, "db error".to_owned()))) } - }; - - let mut resp = json!({ - "success": success, - "failed": failed, - }); + } + // If we're not committing the current set of records yet. if !breq.commit { + // and there are bsos included in this message. + if !coll.bsos.valid.is_empty() { + // Append the data to the requested batch. + let result = { + trace!("Batch: Appending to {}", &new_batch.id); + db.append_to_batch(params::AppendToBatch { + user_id: coll.user_id.clone(), + collection: coll.collection.clone(), + batch: new_batch.clone(), + bsos: coll.bsos.valid.into_iter().map(From::from).collect(), + }) + .await + }; + handle_result!(result); + } + + // Return the batch append response without committing the current + // batch to the BSO table. + resp["success"] = json!(success); + resp["failed"] = json!(failed); + resp["batch"] = json!(&new_batch.id); return Ok(HttpResponse::Accepted().json(resp)); } @@ -406,6 +392,8 @@ pub async fn post_collection_batch( // TODO: validate *actual* sizes of the batch items // (max_total_records, max_total_bytes) + // + // First, write the pending batch BSO data into the BSO table. let modified = if let Some(batch) = batch { db.commit_batch(params::CommitBatch { user_id: user_id.clone(), @@ -418,6 +406,41 @@ pub async fn post_collection_batch( return Err(ApiError::from(err).into()); }; + // Then, write the BSOs contained in the commit request into the BSO table. + // (This presumes that the BSOs contained in the final "commit" message are + // newer, and thus more "correct", than any prior BSO info that may have been + // included in the prior batch creation messages. 
The client shouldn't really + be including BSOs with the commit message, however it does and we should + handle that case.) + if !coll.bsos.valid.is_empty() { + trace!("Batch: writing commit message bsos"); + let result = db + .post_bsos(params::PostBsos { + user_id: coll.user_id.clone(), + collection: coll.collection.clone(), + bsos: coll + .bsos + .valid + .into_iter() + .map(|batch_bso| params::PostCollectionBso { + id: batch_bso.id, + sortindex: batch_bso.sortindex, + payload: batch_bso.payload, + ttl: batch_bso.ttl, + }) + .collect(), + for_batch: false, + failed: Default::default(), + }) + .await + .map(|_| ()); + + handle_result!(result); + } + + // Always return success, failed, & modified + resp["success"] = json!(success); + resp["failed"] = json!(failed); resp["modified"] = json!(modified); trace!("Batch: Returning result: {}", &resp); Ok(HttpResponse::build(StatusCode::OK) diff --git a/tools/integration_tests/test_storage.py b/tools/integration_tests/test_storage.py index c50ddb34..37b53d35 100644 --- a/tools/integration_tests/test_storage.py +++ b/tools/integration_tests/test_storage.py @@ -1582,6 +1582,35 @@ class TestStorage(StorageFunctionalTestCase): resp3 = self.app.get(endpoint + '/e') self.assertEquals(committed, resp3.json['modified']) + + def test_aaa_batch_commit_collision(self): + # It's possible that a batch contains a BSO inside the batch as well + # as inside the final "commit" message. This is a bit of a problem + # for spanner because of conflicting ways that the data is written + # to the database and the discoverability of IDs in previously + # submitted batches. 
+ endpoint = self.root + '/storage/xxx_col2' + orig = "Letting the days go by" + repl = "Same as it ever was" + + batch_num = self.retry_post_json( + endpoint + "?batch=true", + [{"id":"b0", "payload": orig}] + ).json["batch"] + + resp = self.retry_post_json( + endpoint + "?batch={}&commit=true".format(batch_num), + [{"id":"b0", "payload": repl}] + ) + + # this should succeed, using the newer payload value. + assert resp.json["failed"] == {}, "batch commit failed" + assert resp.json["success"] == ["b0"], "batch commit id incorrect" + resp = self.app.get(endpoint+"?full=1") + assert resp.json[0].get( + "payload") == repl, "wrong payload returned" + + def test_we_dont_need_no_stinkin_batches(self): endpoint = self.root + '/storage/xxx_col2'