From 1d3a8ae3ad2d4f876a4da7d1ccdb546f93818d0f Mon Sep 17 00:00:00 2001 From: Philip Jenvey Date: Fri, 30 Nov 2018 14:47:52 -0800 Subject: [PATCH 1/2] fix: add the batch extractor + handler Closes #105, #86 --- src/db/mock.rs | 2 +- src/db/mod.rs | 2 +- src/db/mysql/batch.rs | 2 +- src/db/mysql/models.rs | 3 +- src/error.rs | 7 +- src/main.rs | 1 - src/web/extractors.rs | 247 +++++++++++++++++++++++++++++++++-------- src/web/handlers.rs | 119 +++++++++++++++++++- 8 files changed, 325 insertions(+), 58 deletions(-) diff --git a/src/db/mock.rs b/src/db/mock.rs index 3a9f524b..61b64ecb 100644 --- a/src/db/mock.rs +++ b/src/db/mock.rs @@ -75,7 +75,7 @@ impl Db for MockDb { mock_db_method!(validate_batch, ValidateBatch); mock_db_method!(append_to_batch, AppendToBatch); mock_db_method!(get_batch, GetBatch, Option); - mock_db_method!(delete_batch, DeleteBatch); + mock_db_method!(commit_batch, CommitBatch); } unsafe impl Send for MockDb {} diff --git a/src/db/mod.rs b/src/db/mod.rs index 45d5ee7a..79f53956 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -119,7 +119,7 @@ pub trait Db: Send + Debug { fn get_batch(&self, params: params::GetBatch) -> DbFuture>; - fn delete_batch(&self, params: params::DeleteBatch) -> DbFuture; + fn commit_batch(&self, params: params::CommitBatch) -> DbFuture; fn box_clone(&self) -> Box; diff --git a/src/db/mysql/batch.rs b/src/db/mysql/batch.rs index 574e51cb..5c3dcf38 100644 --- a/src/db/mysql/batch.rs +++ b/src/db/mysql/batch.rs @@ -76,7 +76,7 @@ pub fn get(db: &MysqlDb, params: params::GetBatch) -> Result Result<()> { +fn delete(db: &MysqlDb, params: params::DeleteBatch) -> Result<()> { let user_id = params.user_id.legacy_id as i32; let collection_id = db.get_collection_id(¶ms.collection)?; diesel::delete(batches::table) diff --git a/src/db/mysql/models.rs b/src/db/mysql/models.rs index 101e6e38..a7d43310 100644 --- a/src/db/mysql/models.rs +++ b/src/db/mysql/models.rs @@ -693,7 +693,6 @@ impl MysqlDb { batch_db_method!(create_batch_sync, create, CreateBatch); batch_db_method!(validate_batch_sync, validate, ValidateBatch); batch_db_method!(append_to_batch_sync, append, AppendToBatch); - batch_db_method!(delete_batch_sync, delete, DeleteBatch); batch_db_method!(commit_batch_sync, commit, CommitBatch); pub fn get_batch_sync(&self, params: params::GetBatch) -> Result> { @@ -803,7 +802,7 @@ impl Db for MysqlDb { GetBatch, Option ); - sync_db_method!(delete_batch, delete_batch_sync, DeleteBatch); + sync_db_method!(commit_batch, commit_batch_sync, CommitBatch); } #[derive(Debug, QueryableByName)] diff --git a/src/error.rs b/src/error.rs index 2ba0fe30..bfe1286d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -85,7 +85,7 @@ impl ApiError { false } - fn is_conflict(&self) -> bool { + pub fn is_conflict(&self) -> bool { match self.kind() { ApiErrorKind::Db(dbe) => match dbe.kind() { DbErrorKind::Conflict => return true, @@ -99,7 +99,10 @@ impl ApiError { fn weave_error_code(&self) -> WeaveError { match self.kind() { ApiErrorKind::Validation(ver) => match ver.kind() { - ValidationErrorKind::FromDetails(ref _description, ref location, name) => { + ValidationErrorKind::FromDetails(ref description, ref location, name) => { + if description == "size-limit-exceeded" { + return WeaveError::SizeLimitExceeded; + } let name = name.clone().unwrap_or("".to_owned()); if *location == RequestErrorLocation::Body && ["bso", "bsos"].contains(&name.as_str()) diff --git a/src/main.rs b/src/main.rs index 8fb75e40..06cdc277 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,6 @@ extern crate diesel_logger; extern crate diesel_migrations; extern crate docopt; extern crate env_logger; -#[macro_use] extern crate failure; extern crate futures; #[macro_use] diff --git a/src/web/extractors.rs b/src/web/extractors.rs index bf1daf97..0eca4607 100644 --- a/src/web/extractors.rs +++ b/src/web/extractors.rs @@ -2,8 +2,7 @@ //! //! Handles ensuring the header's, body, and query parameters are correct, extraction to //! relevant types, and failing correctly with the appropriate errors if issues arise. -use std::collections::HashMap; -use std::str::FromStr; +use std::{self, collections::HashMap, str::FromStr}; use actix_web::http::header::{HeaderValue, ACCEPT, CONTENT_TYPE}; use actix_web::{ @@ -17,7 +16,7 @@ use serde::de::{Deserialize, Deserializer, Error as SerdeError}; use serde_json::Value; use validator::{Validate, ValidationError}; -use db::{util::SyncTimestamp, Db, Sorting}; +use db::{util::SyncTimestamp, Db, DbError, DbErrorKind, Sorting}; use error::{ApiError, ApiResult}; use server::ServerState; use web::{auth::HawkPayload, error::ValidationErrorKind}; @@ -34,6 +33,7 @@ lazy_static! { Regex::new(r#"IV":\s*"AAAAAAAAAAAAAAAAAAAAAA=="#).unwrap(); static ref VALID_ID_REGEX: Regex = Regex::new(r"^[ -~]{1,64}$").unwrap(); static ref VALID_COLLECTION_ID_REGEX: Regex = Regex::new(r"^[a-zA-Z0-9._-]{1,32}$").unwrap(); + static ref TRUE_REGEX: Regex = Regex::new("^(?i)true$").unwrap(); } #[derive(Deserialize)] @@ -450,6 +450,7 @@ pub struct CollectionPostRequest { pub user_id: HawkIdentifier, pub query: BsoQueryParams, pub bsos: BsoBodies, + pub batch: Option, } impl FromRequest for CollectionPostRequest { @@ -463,6 +464,7 @@ impl FromRequest for CollectionPostRequest { /// - If the collection is 'crypto', known bad payloads are checked for /// - Any valid BSO's beyond `BATCH_MAX_RECORDS` are moved to invalid fn from_request(req: &HttpRequest, _: &Self::Config) -> Self::Result { + let req = req.clone(); let max_post_records = req.state().limits.max_post_records as i64; let fut = <( HawkIdentifier, @@ -470,7 +472,7 @@ impl FromRequest for CollectionPostRequest { CollectionParam, BsoQueryParams, BsoBodies, - )>::extract(req).and_then(move |(user_id, db, collection, query, mut bsos)| { + )>::extract(&req).and_then(move |(user_id, db, collection, query, mut bsos)| { let collection = collection.collection.clone(); if collection == "crypto" { // Verify the client didn't mess up the crypto if we have a payload @@ -499,12 +501,18 @@ impl FromRequest for CollectionPostRequest { } } + let batch = match >::extract(&req) { + Ok(batch) => batch, + Err(e) => return future::err(e.into()), + }; + future::ok(CollectionPostRequest { collection, db, user_id, query, bsos, + batch, }) }); @@ -706,8 +714,8 @@ pub struct BsoQueryParams { pub offset: Option, /// a comma-separated list of BSO ids (list of strings) - #[validate(custom = "validate_qs_ids")] #[serde(deserialize_with = "deserialize_comma_sep_string", default)] + #[validate(custom = "validate_qs_ids")] pub ids: Vec, // flag, whether to include full bodies (bool) @@ -737,6 +745,107 @@ impl FromRequest for BsoQueryParams { } } +#[derive(Debug, Default, Clone, Deserialize, Validate)] +#[serde(default)] +pub struct BatchParams { + pub batch: Option, + #[validate(custom = "validate_qs_commit")] + pub commit: Option, +} + +#[derive(Debug, Default, Clone, Deserialize)] +pub struct BatchRequest { + pub id: Option, + pub commit: bool, +} + +impl FromRequest for Option { + type Config = (); + type Result = ApiResult>; + + fn from_request(req: &HttpRequest, _: &Self::Config) -> Self::Result { + let params = Query::::from_request(req, &()) + .map_err(|e| { + ValidationErrorKind::FromDetails( + e.to_string(), + RequestErrorLocation::QueryString, + None, + ) + })? + .into_inner(); + + let limits = &req.state().limits; + let checks = [ + ("X-Weave-Records", limits.max_post_records), + ("X-Weave-Bytes", limits.max_post_bytes), + ("X-Weave-Total-Records", limits.max_total_records), + ("X-Weave-Total-Bytes", limits.max_total_bytes), + ]; + for (header, limit) in &checks { + let value = match req.headers().get(*header) { + Some(value) => value.to_str().map_err(|e| { + let err: ApiError = ValidationErrorKind::FromDetails( + e.to_string(), + RequestErrorLocation::Header, + Some((*header).to_owned()), + ).into(); + err + })?, + None => continue, + }; + let count = value.parse::<(u32)>().map_err(|_| { + let err: ApiError = ValidationErrorKind::FromDetails( + format!("Invalid integer value: {}", value), + RequestErrorLocation::Header, + Some((*header).to_owned()), + ).into(); + err + })?; + if count > *limit { + return Err(ValidationErrorKind::FromDetails( + "size-limit-exceeded".to_owned(), + RequestErrorLocation::Header, + None, + ).into()) + } + } + + if params.batch.is_none() && params.commit.is_none() { + // No batch options requested + return Ok(None); + } else if params.batch.is_none() { + // commit w/ no batch ID is an error + let err: DbError = DbErrorKind::BatchNotFound.into(); + return Err(err.into()); + } + + params.validate().map_err(|e| { + ValidationErrorKind::FromValidationErrors(e, RequestErrorLocation::QueryString) + })?; + + let id = match params.batch { + None => None, + Some(ref batch) if batch == "" || TRUE_REGEX.is_match(batch) => None, + Some(ref batch) => { + let bytes = base64::decode(batch).unwrap_or(batch.as_bytes().to_vec()); + let decoded = std::str::from_utf8(&bytes).unwrap_or(batch); + Some(decoded.parse::().map_err(|_| { + ValidationErrorKind::FromDetails( + format!(r#"Invalid batch ID: "{}""#, batch), + RequestErrorLocation::QueryString, + Some("batch".to_owned()), + ) + })?) + } + }; + + Ok(Some(BatchRequest { + id, + commit: params.commit.is_some(), + })) + } +} + /// PreCondition Header /// /// It's valid to include a X-If-Modified-Since or X-If-Unmodified-Since header but not @@ -844,6 +953,17 @@ fn validate_qs_ids(ids: &Vec) -> Result<(), ValidationError> { Ok(()) } +/// Verifies the batch commit field is valid +fn validate_qs_commit(commit: &String) -> Result<(), ValidationError> { + if !TRUE_REGEX.is_match(commit) { + return Err(request_error( + r#"commit parameter must be "true" to apply batches"#, + RequestErrorLocation::QueryString, + )); + } + Ok(()) +} + /// Verifies the BSO sortindex is in the valid range fn validate_body_bso_sortindex(sort: i32) -> Result<(), ValidationError> { if BSO_MIN_SORTINDEX_VALUE <= sort && sort <= BSO_MAX_SORTINDEX_VALUE { @@ -922,8 +1042,8 @@ mod tests { use std::sync::Arc; use actix_web::test::TestRequest; - use actix_web::HttpResponse; use actix_web::{http::Method, Binary, Body}; + use actix_web::{Error, HttpResponse}; use base64; use hawk::{Credentials, Key, RequestBuilder}; use hmac::{Hmac, Mac}; @@ -1005,6 +1125,28 @@ mod tests { format!("Hawk {}", request.make_header(&credentials).unwrap()) } + fn post_collection(qs: &str, body: &serde_json::Value) -> Result { + let payload = HawkPayload::test_default(); + let state = make_state(); + let path = format!( + "/storage/1.5/1/storage/tabs{}{}", + if !qs.is_empty() { "?" } else { "" }, + qs + ); + let header = create_valid_hawk_header(&payload, &state, "POST", &path, "localhost", 5000); + let req = TestRequest::with_state(state) + .header("authorization", header) + .header("content-type", "application/json") + .method(Method::POST) + .uri(&format!("http://localhost:5000{}", path)) + .set_payload(body.to_string()) + .param("uid", "1") + .param("collection", "tabs") + .finish(); + req.extensions_mut().insert(make_db()); + CollectionPostRequest::extract(&req).wait() + } + #[test] fn test_invalid_query_args() { let req = TestRequest::with_state(make_state()) @@ -1250,70 +1392,79 @@ mod tests { #[test] fn test_valid_collection_post_request() { - let payload = HawkPayload::test_default(); - let state = make_state(); - let header = create_valid_hawk_header( - &payload, - &state, - "POST", - "/storage/1.5/1/storage/tabs", - "localhost", - 5000, - ); // Batch requests require id's on each BSO let bso_body = json!([ {"id": "123", "payload": "xxx", "sortindex": 23}, {"id": "456", "payload": "xxxasdf", "sortindex": 23} ]); - let req = TestRequest::with_state(state) - .header("authorization", header) - .header("content-type", "application/json") - .method(Method::POST) - .uri("http://localhost:5000/storage/1.5/1/storage/tabs") - .set_payload(bso_body.to_string()) - .param("uid", "1") - .param("collection", "tabs") - .finish(); - req.extensions_mut().insert(make_db()); - let result = CollectionPostRequest::extract(&req).wait().unwrap(); + let result = post_collection("", &bso_body).unwrap(); assert_eq!(result.user_id.legacy_id, 1); assert_eq!(&result.collection, "tabs"); assert_eq!(result.bsos.valid.len(), 2); + assert!(result.batch.is_none()); } #[test] fn test_invalid_collection_post_request() { - let payload = HawkPayload::test_default(); - let state = make_state(); - let header = create_valid_hawk_header( - &payload, - &state, - "POST", - "/storage/1.5/1/storage/tabs", - "localhost", - 5000, - ); // Add extra fields, these will be invalid let bso_body = json!([ {"id": "1", "sortindex": 23, "jump": 1}, {"id": "2", "sortindex": -99, "hop": "low"} ]); - let req = TestRequest::with_state(state) - .header("authorization", header) - .header("content-type", "application/json") - .method(Method::POST) - .uri("http://localhost:5000/storage/1.5/1/storage/tabs") - .set_payload(bso_body.to_string()) - .param("uid", "1") - .param("collection", "tabs") - .finish(); - req.extensions_mut().insert(make_db()); - let result = CollectionPostRequest::extract(&req).wait().unwrap(); + let result = post_collection("", &bso_body).unwrap(); assert_eq!(result.user_id.legacy_id, 1); assert_eq!(&result.collection, "tabs"); assert_eq!(result.bsos.invalid.len(), 2); } + #[test] + fn test_valid_collection_batch_post_request() { + // If the "batch" parameter is has no value or has a value of "true" + // then a new batch will be created. + let bso_body = json!([ + {"id": "123", "payload": "xxx", "sortindex": 23}, + {"id": "456", "payload": "xxxasdf", "sortindex": 23} + ]); + let result = post_collection("batch=True", &bso_body).unwrap(); + assert_eq!(result.user_id.legacy_id, 1); + assert_eq!(&result.collection, "tabs"); + assert_eq!(result.bsos.valid.len(), 2); + let batch = result.batch.unwrap(); + assert_eq!(batch.id, None); + assert_eq!(batch.commit, false); + + let result = post_collection("batch", &bso_body).unwrap(); + let batch = result.batch.unwrap(); + assert_eq!(batch.id, None); + assert_eq!(batch.commit, false); + + let result = post_collection("batch=MTI%3D&commit=true", &bso_body).unwrap(); + let batch = result.batch.unwrap(); + assert_eq!(batch.id, Some(12)); + assert_eq!(batch.commit, true); + } + + #[test] + fn test_invalid_collection_batch_post_request() { + let bso_body = json!([ + {"id": "123", "payload": "xxx", "sortindex": 23}, + {"id": "456", "payload": "xxxasdf", "sortindex": 23} + ]); + let result = post_collection("batch=sammich", &bso_body); + assert!(result.is_err()); + let response: HttpResponse = result.err().unwrap().into(); + assert_eq!(response.status(), 400); + let body = extract_body_as_str(&response); + assert_eq!(body, "0"); + + let result = post_collection("commit=true", &bso_body); + assert!(result.is_err()); + let response: HttpResponse = result.err().unwrap().into(); + assert_eq!(response.status(), 400); + let body = extract_body_as_str(&response); + assert_eq!(body, "0"); + } + #[test] fn test_invalid_precondition_headers() { fn assert_invalid_header( diff --git a/src/web/handlers.rs b/src/web/handlers.rs index fd0b9ea5..8d533d50 100644 --- a/src/web/handlers.rs +++ b/src/web/handlers.rs @@ -2,10 +2,10 @@ use std::collections::HashMap; use actix_web::{http::StatusCode, FutureResponse, HttpResponse, State}; -use futures::future::{self, Future}; +use futures::future::{self, Either, Future}; use serde::Serialize; -use db::{params, results::Paginated}; +use db::{params, results::Paginated, DbError, DbErrorKind}; use error::ApiError; use server::ServerState; use web::extractors::{ @@ -172,6 +172,9 @@ where } pub fn post_collection(coll: CollectionPostRequest) -> FutureResponse { + if coll.batch.is_some() { + return post_collection_batch(coll); + } Box::new( coll.db .post_bsos(params::PostBsos { @@ -188,6 +191,118 @@ pub fn post_collection(coll: CollectionPostRequest) -> FutureResponse FutureResponse { + // Bail early if we have nonsensical arguments + let breq = match coll.batch.clone() { + Some(breq) => breq, + None => { + let err: DbError = DbErrorKind::BatchNotFound.into(); + let err: ApiError = err.into(); + return Box::new(future::err(err.into())) + }, + }; + + let fut = if let Some(id) = breq.id { + // Validate the batch before attempting a full append (for efficiency) + Either::A( + coll.db + .validate_batch(params::ValidateBatch { + user_id: coll.user_id.clone(), + collection: coll.collection.clone(), + id, + }).and_then(move |is_valid| { + if is_valid { + Box::new(future::ok(id)) + } else { + let err: DbError = DbErrorKind::BatchNotFound.into(); + Box::new(future::err(err.into())) + } + }), + ) + } else { + Either::B(coll.db.create_batch(params::CreateBatch { + user_id: coll.user_id.clone(), + collection: coll.collection.clone(), + bsos: vec![], + }) + ) + }; + + let db = coll.db.clone(); + let user_id = coll.user_id.clone(); + let collection = coll.collection.clone(); + + let fut = fut + .and_then(move |id| { + let mut success = vec![]; + let mut failed = coll.bsos.invalid.clone(); + let bso_ids: Vec<_> = coll.bsos.valid.iter().map(|bso| bso.id.clone()).collect(); + + coll.db + .append_to_batch(params::AppendToBatch { + user_id: coll.user_id.clone(), + collection: coll.collection.clone(), + id, + bsos: coll.bsos.valid.into_iter().map(From::from).collect(), + }) + .then(move |result| { + match result { + Ok(_) => success.extend(bso_ids), + Err(e) => { + // NLL: not a guard as: (E0008) "moves value into + // pattern guard" + if e.is_conflict() { + return future::err(e); + } + failed.extend( + bso_ids.into_iter().map(|id| (id, "db error".to_owned())), + ) + } + }; + future::ok((id, success, failed)) + }) + }).map_err(From::from); + + Box::new(fut.and_then(move |(id, success, failed)| { + let mut resp = json!({ + "success": success, + "failed": failed, + }); + + if !breq.commit { + resp["batch"] = json!(base64::encode(&id.to_string())); + return Either::A(future::ok(HttpResponse::Accepted().json(resp))); + } + + let fut = db + .get_batch(params::GetBatch { + user_id: user_id.clone(), + collection: collection.clone(), + id, + }).and_then(move |batch| { + // TODO: validate *actual* sizes of the batch items + // (max_total_records, max_total_bytes) + if let Some(batch) = batch { + db.commit_batch(params::CommitBatch { + user_id: user_id.clone(), + collection: collection.clone(), + batch, + }) + } else { + let err: DbError = DbErrorKind::BatchNotFound.into(); + Box::new(future::err(err.into())) + } + }).map_err(From::from) + .map(|result| { + resp["modified"] = json!(result.modified); + HttpResponse::build(StatusCode::OK) + .header("X-Last-Modified", result.modified.as_header()) + .json(resp) + }); + Either::B(fut) + })) +} + pub fn delete_bso(bso_req: BsoRequest) -> FutureResponse { Box::new( bso_req From fe07704495f9e099b95fdb86853d5711ada16ee2 Mon Sep 17 00:00:00 2001 From: Philip Jenvey Date: Wed, 12 Dec 2018 13:05:16 -0800 Subject: [PATCH 2/2] chore: cargo fmt (1.31.0) --- src/db/mod.rs | 6 ++- src/db/mysql/batch.rs | 9 +++-- src/db/mysql/models.rs | 15 ++++--- src/db/util.rs | 9 ++--- src/error.rs | 3 +- src/server/mod.rs | 27 ++++++++----- src/server/test.rs | 15 ++++--- src/web/extractors.rs | 90 ++++++++++++++++++++++++++---------------- src/web/handlers.rs | 47 +++++++++++++--------- src/web/middleware.rs | 24 +++++++---- 10 files changed, 154 insertions(+), 91 deletions(-) diff --git a/src/db/mod.rs b/src/db/mod.rs index 79f53956..b98a411a 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -145,7 +145,8 @@ pub trait Db: Send + Debug { self.get_collection_timestamp(params::GetCollectionTimestamp { user_id, collection, - }).or_else(|e| { + }) + .or_else(|e| { if e.is_colllection_not_found() { Ok(SyncTimestamp::from_seconds(0f64)) } else { @@ -160,7 +161,8 @@ pub trait Db: Send + Debug { user_id, collection, id: bso, - }).or_else(|e| { + }) + .or_else(|e| { if e.is_colllection_not_found() { Ok(SyncTimestamp::from_seconds(0f64)) } else { diff --git a/src/db/mysql/batch.rs b/src/db/mysql/batch.rs index 5c3dcf38..c7e94de5 100644 --- a/src/db/mysql/batch.rs +++ b/src/db/mysql/batch.rs @@ -27,7 +27,8 @@ pub fn create(db: &MysqlDb, params: params::CreateBatch) -> Result Result> { serde_json::from_str(line).map_err(|e| { DbError::internal(&format!("Couldn't deserialize batch::load_bsos bso: {}", e)) }) - }).collect() + }) + .collect() } /// Serialize bsos into strings separated by newlines @@ -125,7 +127,8 @@ fn bsos_to_batch_string(bsos: &[params::PostCollectionBso]) -> Result { serde_json::to_string(bso).map_err(|e| { DbError::internal(&format!("Couldn't serialize batch::create bso: {}", e)) }) - }).collect(); + }) + .collect(); batch_strings.map(|bs| { format!( "{}{}", diff --git a/src/db/mysql/models.rs b/src/db/mysql/models.rs index a7d43310..53d324f9 100644 --- a/src/db/mysql/models.rs +++ b/src/db/mysql/models.rs @@ -362,7 +362,8 @@ impl MysqlDb { bso::payload.eq(payload), bso::modified.eq(timestamp), bso::expiry.eq(timestamp + (ttl as i64 * 1000)), - )).execute(&self.conn)?; + )) + .execute(&self.conn)?; } self.touch_collection(user_id as u32, collection_id) }) @@ -388,7 +389,8 @@ impl MysqlDb { bso::payload, bso::sortindex, bso::expiry, - )).filter(bso::user_id.eq(user_id)) + )) + .filter(bso::user_id.eq(user_id)) .filter(bso::collection_id.eq(collection_id as i32)) // XXX: .filter(bso::expiry.gt(self.timestamp().as_i64())) .into_boxed(); @@ -461,7 +463,8 @@ impl MysqlDb { bso::payload, bso::sortindex, bso::expiry, - )).filter(bso::user_id.eq(user_id as i32)) + )) + .filter(bso::user_id.eq(user_id as i32)) .filter(bso::collection_id.eq(&collection_id)) .filter(bso::id.eq(¶ms.id)) .filter(bso::expiry.ge(self.timestamp().as_i64())) @@ -584,7 +587,8 @@ impl MysqlDb { .into_iter() .map(|cr| { SyncTimestamp::from_i64(cr.modified).and_then(|ts| Ok((cr.collection_id, ts))) - }).collect::>>()?; + }) + .collect::>>()?; self.map_collection_names(modifieds) } @@ -597,7 +601,8 @@ impl MysqlDb { .remove(&id) .map(|name| (name, value)) .ok_or_else(|| DbError::internal("load_collection_names get")) - }).collect() + }) + .collect() } fn load_collection_names<'a>( diff --git a/src/db/util.rs b/src/db/util.rs index c3f5ea00..00d488e2 100644 --- a/src/db/util.rs +++ b/src/db/util.rs @@ -21,11 +21,7 @@ pub fn ms_since_epoch() -> i64 { /// Internally represents a Sync timestamp as a u64 representing milliseconds since the epoch. #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Deserialize, Serialize, FromSqlRow)] pub struct SyncTimestamp( - #[serde( - deserialize_with = "deserialize_ts", - serialize_with = "serialize_ts" - )] - u64, + #[serde(deserialize_with = "deserialize_ts", serialize_with = "serialize_ts")] u64, ); impl SyncTimestamp { @@ -48,7 +44,8 @@ impl SyncTimestamp { } else { Ok(v) } - }).map(|v: f64| (v * 1_000f64) as u64) + }) + .map(|v: f64| (v * 1_000f64) as u64) .map(SyncTimestamp::from_milliseconds) } diff --git a/src/error.rs b/src/error.rs index bfe1286d..7b1bd860 100644 --- a/src/error.rs +++ b/src/error.rs @@ -148,7 +148,8 @@ impl ResponseError for ApiError { HttpResponse::build(self.status) .if_true(self.is_conflict(), |resp| { resp.header("Retry-After", RETRY_AFTER.to_string()); - }).json(self.weave_error_code() as i32) + }) + .json(self.weave_error_code() as i32) } } diff --git a/src/server/mod.rs b/src/server/mod.rs index 7949a4c3..05301003 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -15,27 +15,35 @@ macro_rules! init_routes { ($app:expr) => { $app.resource("/1.5/{uid}/info/collections", |r| { r.method(http::Method::GET).with(handlers::get_collections); - }).resource("/1.5/{uid}/info/collection_counts", |r| { + }) + .resource("/1.5/{uid}/info/collection_counts", |r| { r.method(http::Method::GET) .with(handlers::get_collection_counts); - }).resource("/1.5/{uid}/info/collection_usage", |r| { + }) + .resource("/1.5/{uid}/info/collection_usage", |r| { r.method(http::Method::GET) .with(handlers::get_collection_usage); - }).resource("/1.5/{uid}/info/configuration", |r| { + }) + .resource("/1.5/{uid}/info/configuration", |r| { r.method(http::Method::GET) .with(handlers::get_configuration); - }).resource("/1.5/{uid}/info/quota", |r| { + }) + .resource("/1.5/{uid}/info/quota", |r| { r.method(http::Method::GET).with(handlers::get_quota); - }).resource("/1.5/{uid}", |r| { + }) + .resource("/1.5/{uid}", |r| { r.method(http::Method::DELETE).with(handlers::delete_all); - }).resource("/1.5/{uid}/storage", |r| { + }) + .resource("/1.5/{uid}/storage", |r| { r.method(http::Method::DELETE).with(handlers::delete_all); - }).resource("/1.5/{uid}/storage/{collection}", |r| { + }) + .resource("/1.5/{uid}/storage/{collection}", |r| { r.method(http::Method::DELETE) .with(handlers::delete_collection); r.method(http::Method::GET).with(handlers::get_collection); r.method(http::Method::POST).with(handlers::post_collection); - }).resource("/1.5/{uid}/storage/{collection}/{bso}", |r| { + }) + .resource("/1.5/{uid}/storage/{collection}/{bso}", |r| { r.method(http::Method::DELETE).with(handlers::delete_bso); r.method(http::Method::GET).with(handlers::get_bso); r.method(http::Method::PUT).with(handlers::put_bso); @@ -89,7 +97,8 @@ impl Server { }; build_app(state) - }).bind(format!("127.0.0.1:{}", settings.port)) + }) + .bind(format!("127.0.0.1:{}", settings.port)) .unwrap() .start(); Ok(sys) diff --git a/src/server/test.rs b/src/server/test.rs index b2b6b9b0..06f05d28 100644 --- a/src/server/test.rs +++ b/src/server/test.rs @@ -64,7 +64,8 @@ fn create_request(server: &TestServer, method: http::Method, path: &str) -> Clie .set_header( "Authorization", create_hawk_header(method.as_str(), server.addr().port(), path), - ).finish() + ) + .finish() .unwrap() } @@ -291,13 +292,15 @@ fn invalid_content_type() { server.addr().port(), "/1.5/42/storage/bookmarks/wibble", ), - ).set_header("Content-Type", "application/javascript") + ) + .set_header("Content-Type", "application/javascript") .json(BsoBody { id: Some("wibble".to_string()), sortindex: Some(0), payload: Some("wibble".to_string()), ttl: Some(31536000), - }).unwrap(); + }) + .unwrap(); let response = server.execute(request.send()).unwrap(); assert_eq!(response.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE); @@ -307,13 +310,15 @@ fn invalid_content_type() { .set_header( "Authorization", create_hawk_header("POST", server.addr().port(), "/1.5/42/storage/bookmarks"), - ).set_header("Content-Type", "application/javascript") + ) + .set_header("Content-Type", "application/javascript") .json(json!([BsoBody { id: Some("wibble".to_string()), sortindex: Some(0), payload: Some("wibble".to_string()), ttl: Some(31536000), - }])).unwrap(); + }])) + .unwrap(); let response = server.execute(request.send()).unwrap(); assert_eq!(response.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE); diff --git a/src/web/extractors.rs b/src/web/extractors.rs index 0eca4607..f1625ea1 100644 --- a/src/web/extractors.rs +++ b/src/web/extractors.rs @@ -106,7 +106,8 @@ impl FromRequest for BsoBodies { "Invalid Content-Type".to_owned(), RequestErrorLocation::Header, Some("Content-Type".to_owned()), - ).into(), + ) + .into(), )); } } @@ -125,7 +126,8 @@ impl FromRequest for BsoBodies { "Mimetype/encoding/content-length error".to_owned(), RequestErrorLocation::Header, None, - ).into(), + ) + .into(), )); }; @@ -135,7 +137,8 @@ impl FromRequest for BsoBodies { "Invalid JSON in request body".to_owned(), RequestErrorLocation::Body, Some("bsos".to_owned()), - ).into() + ) + .into() } // Define a new bool to check from a static closure to release the reference on the @@ -201,7 +204,8 @@ impl FromRequest for BsoBodies { "Input BSO has duplicate ID".to_owned(), RequestErrorLocation::Body, Some("bsos".to_owned()), - ).into(), + ) + .into(), ); } else { bso_ids.push(id.clone()); @@ -213,7 +217,8 @@ impl FromRequest for BsoBodies { "Input BSO has no ID".to_owned(), RequestErrorLocation::Body, Some("bsos".to_owned()), - ).into(), + ) + .into(), ); }; match BatchBsoBody::from_raw_bso(&bso) { @@ -267,7 +272,8 @@ impl FromRequest for BsoBody { "Invalid Content-Type".to_owned(), RequestErrorLocation::Header, Some("Content-Type".to_owned()), - ).into(), + ) + .into(), )); } } @@ -282,16 +288,19 @@ impl FromRequest for BsoBody { e.to_string(), RequestErrorLocation::Body, Some("bso".to_owned()), - ).into(); + ) + .into(); err.into() - }).and_then(move |bso: Json| { + }) + .and_then(move |bso: Json| { // Check the max payload size manually with our desired limit if bso.payload.as_ref().map(|s| s.len()).unwrap_or_default() > max_payload_size { let err: ApiError = ValidationErrorKind::FromDetails( "payload too large".to_owned(), RequestErrorLocation::Body, Some("bso".to_owned()), - ).into(); + ) + .into(); return future::err(err.into()); } if let Err(e) = bso.validate() { @@ -330,7 +339,8 @@ impl FromRequest for BsoParam { RequestErrorLocation::Path, Some("bso".to_owned()), ) - })?.into_inner(); + })? + .into_inner(); bso.validate().map_err(|e| { ValidationErrorKind::FromValidationErrors(e, RequestErrorLocation::Path) })?; @@ -362,7 +372,8 @@ impl FromRequest for CollectionParam { RequestErrorLocation::Path, Some("collection".to_owned()), ) - })?.into_inner(); + })? + .into_inner(); collection.validate().map_err(|e| { ValidationErrorKind::FromValidationErrors(e, RequestErrorLocation::Path) })?; @@ -426,7 +437,8 @@ impl FromRequest for CollectionRequest { "Invalid accept".to_string(), RequestErrorLocation::Header, Some("accept".to_string()), - ).into()); + ) + .into()); } None => ReplyFormat::Json, }; @@ -472,7 +484,8 @@ impl FromRequest for CollectionPostRequest { CollectionParam, BsoQueryParams, BsoBodies, - )>::extract(&req).and_then(move |(user_id, db, collection, query, mut bsos)| { + )>::extract(&req) + .and_then(move |(user_id, db, collection, query, mut bsos)| { let collection = collection.collection.clone(); if collection == "crypto" { // Verify the client didn't mess up the crypto if we have a payload @@ -484,7 +497,8 @@ impl FromRequest for CollectionPostRequest { "Known-bad BSO payload".to_owned(), RequestErrorLocation::Body, Some("bsos".to_owned()), - ).into(), + ) + .into(), ); } } @@ -579,7 +593,8 @@ impl FromRequest for BsoPutRequest { BsoQueryParams, BsoParam, BsoBody, - )>::extract(req).and_then(|(user_id, db, collection, query, bso, body)| { + )>::extract(req) + .and_then(|(user_id, db, collection, query, bso, body)| { let collection = collection.collection.clone(); if collection == "crypto" { // Verify the client didn't mess up the crypto if we have a payload @@ -590,7 +605,8 @@ impl FromRequest for BsoPutRequest { "Known-bad BSO payload".to_owned(), RequestErrorLocation::Body, Some("bsos".to_owned()), - ).into(), + ) + .into(), ); } } @@ -696,11 +712,11 @@ impl FromRequest for Box { #[serde(default)] pub struct BsoQueryParams { /// lower-bound on last-modified time - #[serde(deserialize_with = "deserialize_sync_timestamp",)] + #[serde(deserialize_with = "deserialize_sync_timestamp")] pub newer: Option, /// upper-bound on last-modified time - #[serde(deserialize_with = "deserialize_sync_timestamp",)] + #[serde(deserialize_with = "deserialize_sync_timestamp")] pub older: Option, /// order in which to return results (string) @@ -719,7 +735,7 @@ pub struct BsoQueryParams { pub ids: Vec, // flag, whether to include full bodies (bool) - #[serde(deserialize_with = "deserialize_present_value",)] + #[serde(deserialize_with = "deserialize_present_value")] pub full: bool, } @@ -737,7 +753,8 @@ impl FromRequest for BsoQueryParams { RequestErrorLocation::QueryString, None, ) - })?.into_inner(); + })? + .into_inner(); params.validate().map_err(|e| { ValidationErrorKind::FromValidationErrors(e, RequestErrorLocation::QueryString) })?; @@ -788,25 +805,28 @@ impl FromRequest for Option { e.to_string(), RequestErrorLocation::Header, Some((*header).to_owned()), - ).into(); + ) + .into(); err })?, None => continue, }; let count = value.parse::<(u32)>().map_err(|_| { - let err: ApiError = ValidationErrorKind::FromDetails( - format!("Invalid integer value: {}", value), - RequestErrorLocation::Header, - Some((*header).to_owned()), - ).into(); - err - })?; + let err: ApiError = ValidationErrorKind::FromDetails( + format!("Invalid integer value: {}", value), + RequestErrorLocation::Header, + Some((*header).to_owned()), + ) + .into(); + err + })?; if count > *limit { return Err(ValidationErrorKind::FromDetails( "size-limit-exceeded".to_owned(), RequestErrorLocation::Header, None, - ).into()) + ) + .into()); } } @@ -892,16 +912,20 @@ impl FromRequest for Option { e.to_string(), RequestErrorLocation::Header, Some(field_name.to_owned()), - ).into() - }).and_then(|v| { + ) + .into() + }) + .and_then(|v| { SyncTimestamp::from_header(v).map_err(|e| { ValidationErrorKind::FromDetails( e.to_string(), RequestErrorLocation::Header, Some(field_name.to_owned()), - ).into() + ) + .into() }) - }).map(|v| { + }) + .map(|v| { let header = if field_name == "X-If-Modified-Since" { PreConditionHeader::IfModifiedSince(v) } else { diff --git a/src/web/handlers.rs b/src/web/handlers.rs index 8d533d50..685f831c 100644 --- a/src/web/handlers.rs +++ b/src/web/handlers.rs @@ -98,12 +98,14 @@ pub fn delete_collection(coll: CollectionRequest) -> FutureResponse FutureResponse FutureResponse { let err: DbError = DbErrorKind::BatchNotFound.into(); let err: ApiError = err.into(); - return Box::new(future::err(err.into())) - }, + return Box::new(future::err(err.into())); + } }; let fut = if let Some(id) = breq.id { @@ -210,7 +215,8 @@ pub fn post_collection_batch(coll: CollectionPostRequest) -> FutureResponse FutureResponse FutureResponse FutureResponse FutureResponse FutureResponse { user_id: bso_req.user_id, collection: bso_req.collection, id: bso_req.bso, - }).map_err(From::from) + }) + .map_err(From::from) .map(|result| HttpResponse::Ok().json(json!({ "modified": result }))), ) } @@ -324,7 +331,8 @@ pub fn get_bso(bso_req: BsoRequest) -> FutureResponse { user_id: bso_req.user_id, collection: bso_req.collection, id: bso_req.bso, - }).map_err(From::from) + }) + .map_err(From::from) .map(|result| { result.map_or_else( || HttpResponse::NotFound().finish(), @@ -345,7 +353,8 @@ pub fn put_bso(bso_req: BsoPutRequest) -> FutureResponse { sortindex: bso_req.body.sortindex, payload: bso_req.body.payload, ttl: bso_req.body.ttl, - }).map_err(From::from) + }) + .map_err(From::from) .map(|result| { HttpResponse::build(StatusCode::OK) .header("X-Last-Modified", result.as_header()) diff --git a/src/web/middleware.rs b/src/web/middleware.rs index e89c6b15..d047e4e4 100644 --- a/src/web/middleware.rs +++ b/src/web/middleware.rs @@ -46,14 +46,17 @@ impl Middleware for WeaveTimestamp { let error: ApiError = ApiErrorKind::Internal(format!( "Invalid X-Last-Modified response header: {}", e - )).into(); + )) + .into(); error - })?.parse::() + })? + .parse::() .map_err(|e| { let error: ApiError = ApiErrorKind::Internal(format!( "Invalid X-Last-Modified response header: {}", e - )).into(); + )) + .into(); error })?; if resp_ts > ts { @@ -70,7 +73,8 @@ impl Middleware for WeaveTimestamp { let error: ApiError = ApiErrorKind::Internal(format!( "Invalid X-Weave-Timestamp response header: {}", e - )).into(); + )) + .into(); error })?, ); @@ -108,7 +112,8 @@ impl Middleware for DbTransaction { match *req.method() { Method::GET | Method::HEAD => db.lock_for_read(lc), _ => db.lock_for_write(lc), - }.or_else(move |e| { + } + .or_else(move |e| { // Middleware::response won't be called: rollback immediately db2.rollback().and_then(|_| future::err(e)) }), @@ -123,7 +128,8 @@ impl Middleware for DbTransaction { req.extensions_mut().insert((db, in_transaction)); future::ok(None) }) - }).map_err(Into::into); + }) + .map_err(Into::into); Ok(Started::Future(Box::new(fut))) } @@ -185,7 +191,8 @@ impl Middleware for PreConditionCheck { .header("X-Last-Modified", resource_ts.as_header()) .body(""); // 304 can't return any content future::ok(Some(resp)) - }).map_err(Into::into); + }) + .map_err(Into::into); Ok(Started::Future(Box::new(fut))) } @@ -218,7 +225,8 @@ impl Middleware for PreConditionCheck { resp.headers_mut().insert("X-Last-Modified", ts_header); } future::ok(resp) - }).map_err(Into::into); + }) + .map_err(Into::into); Ok(Response::Future(Box::new(fut))) } }