mirror of
https://github.com/mozilla-services/syncstorage-rs.git
synced 2026-05-05 12:16:21 +02:00
Merge pull request #108 from mozilla-services/feat/105
fix: add the batch extractor + handler
This commit is contained in:
commit
ca7b6dc9b3
@ -75,7 +75,7 @@ impl Db for MockDb {
|
||||
mock_db_method!(validate_batch, ValidateBatch);
|
||||
mock_db_method!(append_to_batch, AppendToBatch);
|
||||
mock_db_method!(get_batch, GetBatch, Option<results::GetBatch>);
|
||||
mock_db_method!(delete_batch, DeleteBatch);
|
||||
mock_db_method!(commit_batch, CommitBatch);
|
||||
}
|
||||
|
||||
unsafe impl Send for MockDb {}
|
||||
|
||||
@ -119,7 +119,7 @@ pub trait Db: Send + Debug {
|
||||
|
||||
fn get_batch(&self, params: params::GetBatch) -> DbFuture<Option<results::GetBatch>>;
|
||||
|
||||
fn delete_batch(&self, params: params::DeleteBatch) -> DbFuture<results::DeleteBatch>;
|
||||
fn commit_batch(&self, params: params::CommitBatch) -> DbFuture<results::CommitBatch>;
|
||||
|
||||
fn box_clone(&self) -> Box<dyn Db>;
|
||||
|
||||
@ -145,7 +145,8 @@ pub trait Db: Send + Debug {
|
||||
self.get_collection_timestamp(params::GetCollectionTimestamp {
|
||||
user_id,
|
||||
collection,
|
||||
}).or_else(|e| {
|
||||
})
|
||||
.or_else(|e| {
|
||||
if e.is_colllection_not_found() {
|
||||
Ok(SyncTimestamp::from_seconds(0f64))
|
||||
} else {
|
||||
@ -160,7 +161,8 @@ pub trait Db: Send + Debug {
|
||||
user_id,
|
||||
collection,
|
||||
id: bso,
|
||||
}).or_else(|e| {
|
||||
})
|
||||
.or_else(|e| {
|
||||
if e.is_colllection_not_found() {
|
||||
Ok(SyncTimestamp::from_seconds(0f64))
|
||||
} else {
|
||||
|
||||
@ -27,7 +27,8 @@ pub fn create(db: &MysqlDb, params: params::CreateBatch) -> Result<results::Crea
|
||||
batches::id.eq(×tamp),
|
||||
batches::bsos.eq(&bsos),
|
||||
batches::expiry.eq(timestamp + BATCH_LIFETIME),
|
||||
)).execute(&db.conn)?;
|
||||
))
|
||||
.execute(&db.conn)?;
|
||||
Ok(timestamp)
|
||||
}
|
||||
|
||||
@ -76,7 +77,7 @@ pub fn get(db: &MysqlDb, params: params::GetBatch) -> Result<Option<results::Get
|
||||
.optional()?)
|
||||
}
|
||||
|
||||
pub fn delete(db: &MysqlDb, params: params::DeleteBatch) -> Result<()> {
|
||||
fn delete(db: &MysqlDb, params: params::DeleteBatch) -> Result<()> {
|
||||
let user_id = params.user_id.legacy_id as i32;
|
||||
let collection_id = db.get_collection_id(¶ms.collection)?;
|
||||
diesel::delete(batches::table)
|
||||
@ -114,7 +115,8 @@ fn batch_string_to_bsos(bsos: &str) -> Result<Vec<params::PostCollectionBso>> {
|
||||
serde_json::from_str(line).map_err(|e| {
|
||||
DbError::internal(&format!("Couldn't deserialize batch::load_bsos bso: {}", e))
|
||||
})
|
||||
}).collect()
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Serialize bsos into strings separated by newlines
|
||||
@ -125,7 +127,8 @@ fn bsos_to_batch_string(bsos: &[params::PostCollectionBso]) -> Result<String> {
|
||||
serde_json::to_string(bso).map_err(|e| {
|
||||
DbError::internal(&format!("Couldn't serialize batch::create bso: {}", e))
|
||||
})
|
||||
}).collect();
|
||||
})
|
||||
.collect();
|
||||
batch_strings.map(|bs| {
|
||||
format!(
|
||||
"{}{}",
|
||||
|
||||
@ -362,7 +362,8 @@ impl MysqlDb {
|
||||
bso::payload.eq(payload),
|
||||
bso::modified.eq(timestamp),
|
||||
bso::expiry.eq(timestamp + (ttl as i64 * 1000)),
|
||||
)).execute(&self.conn)?;
|
||||
))
|
||||
.execute(&self.conn)?;
|
||||
}
|
||||
self.touch_collection(user_id as u32, collection_id)
|
||||
})
|
||||
@ -388,7 +389,8 @@ impl MysqlDb {
|
||||
bso::payload,
|
||||
bso::sortindex,
|
||||
bso::expiry,
|
||||
)).filter(bso::user_id.eq(user_id))
|
||||
))
|
||||
.filter(bso::user_id.eq(user_id))
|
||||
.filter(bso::collection_id.eq(collection_id as i32)) // XXX:
|
||||
.filter(bso::expiry.gt(self.timestamp().as_i64()))
|
||||
.into_boxed();
|
||||
@ -461,7 +463,8 @@ impl MysqlDb {
|
||||
bso::payload,
|
||||
bso::sortindex,
|
||||
bso::expiry,
|
||||
)).filter(bso::user_id.eq(user_id as i32))
|
||||
))
|
||||
.filter(bso::user_id.eq(user_id as i32))
|
||||
.filter(bso::collection_id.eq(&collection_id))
|
||||
.filter(bso::id.eq(¶ms.id))
|
||||
.filter(bso::expiry.ge(self.timestamp().as_i64()))
|
||||
@ -584,7 +587,8 @@ impl MysqlDb {
|
||||
.into_iter()
|
||||
.map(|cr| {
|
||||
SyncTimestamp::from_i64(cr.modified).and_then(|ts| Ok((cr.collection_id, ts)))
|
||||
}).collect::<Result<HashMap<_, _>>>()?;
|
||||
})
|
||||
.collect::<Result<HashMap<_, _>>>()?;
|
||||
self.map_collection_names(modifieds)
|
||||
}
|
||||
|
||||
@ -597,7 +601,8 @@ impl MysqlDb {
|
||||
.remove(&id)
|
||||
.map(|name| (name, value))
|
||||
.ok_or_else(|| DbError::internal("load_collection_names get"))
|
||||
}).collect()
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn load_collection_names<'a>(
|
||||
@ -693,7 +698,6 @@ impl MysqlDb {
|
||||
batch_db_method!(create_batch_sync, create, CreateBatch);
|
||||
batch_db_method!(validate_batch_sync, validate, ValidateBatch);
|
||||
batch_db_method!(append_to_batch_sync, append, AppendToBatch);
|
||||
batch_db_method!(delete_batch_sync, delete, DeleteBatch);
|
||||
batch_db_method!(commit_batch_sync, commit, CommitBatch);
|
||||
|
||||
pub fn get_batch_sync(&self, params: params::GetBatch) -> Result<Option<results::GetBatch>> {
|
||||
@ -803,7 +807,7 @@ impl Db for MysqlDb {
|
||||
GetBatch,
|
||||
Option<results::GetBatch>
|
||||
);
|
||||
sync_db_method!(delete_batch, delete_batch_sync, DeleteBatch);
|
||||
sync_db_method!(commit_batch, commit_batch_sync, CommitBatch);
|
||||
}
|
||||
|
||||
#[derive(Debug, QueryableByName)]
|
||||
|
||||
@ -21,11 +21,7 @@ pub fn ms_since_epoch() -> i64 {
|
||||
/// Internally represents a Sync timestamp as a u64 representing milliseconds since the epoch.
|
||||
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Deserialize, Serialize, FromSqlRow)]
|
||||
pub struct SyncTimestamp(
|
||||
#[serde(
|
||||
deserialize_with = "deserialize_ts",
|
||||
serialize_with = "serialize_ts"
|
||||
)]
|
||||
u64,
|
||||
#[serde(deserialize_with = "deserialize_ts", serialize_with = "serialize_ts")] u64,
|
||||
);
|
||||
|
||||
impl SyncTimestamp {
|
||||
@ -48,7 +44,8 @@ impl SyncTimestamp {
|
||||
} else {
|
||||
Ok(v)
|
||||
}
|
||||
}).map(|v: f64| (v * 1_000f64) as u64)
|
||||
})
|
||||
.map(|v: f64| (v * 1_000f64) as u64)
|
||||
.map(SyncTimestamp::from_milliseconds)
|
||||
}
|
||||
|
||||
|
||||
10
src/error.rs
10
src/error.rs
@ -85,7 +85,7 @@ impl ApiError {
|
||||
false
|
||||
}
|
||||
|
||||
fn is_conflict(&self) -> bool {
|
||||
pub fn is_conflict(&self) -> bool {
|
||||
match self.kind() {
|
||||
ApiErrorKind::Db(dbe) => match dbe.kind() {
|
||||
DbErrorKind::Conflict => return true,
|
||||
@ -99,7 +99,10 @@ impl ApiError {
|
||||
fn weave_error_code(&self) -> WeaveError {
|
||||
match self.kind() {
|
||||
ApiErrorKind::Validation(ver) => match ver.kind() {
|
||||
ValidationErrorKind::FromDetails(ref _description, ref location, name) => {
|
||||
ValidationErrorKind::FromDetails(ref description, ref location, name) => {
|
||||
if description == "size-limit-exceeded" {
|
||||
return WeaveError::SizeLimitExceeded;
|
||||
}
|
||||
let name = name.clone().unwrap_or("".to_owned());
|
||||
if *location == RequestErrorLocation::Body
|
||||
&& ["bso", "bsos"].contains(&name.as_str())
|
||||
@ -145,7 +148,8 @@ impl ResponseError for ApiError {
|
||||
HttpResponse::build(self.status)
|
||||
.if_true(self.is_conflict(), |resp| {
|
||||
resp.header("Retry-After", RETRY_AFTER.to_string());
|
||||
}).json(self.weave_error_code() as i32)
|
||||
})
|
||||
.json(self.weave_error_code() as i32)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -12,7 +12,6 @@ extern crate diesel_logger;
|
||||
extern crate diesel_migrations;
|
||||
extern crate docopt;
|
||||
extern crate env_logger;
|
||||
#[macro_use]
|
||||
extern crate failure;
|
||||
extern crate futures;
|
||||
#[macro_use]
|
||||
|
||||
@ -15,27 +15,35 @@ macro_rules! init_routes {
|
||||
($app:expr) => {
|
||||
$app.resource("/1.5/{uid}/info/collections", |r| {
|
||||
r.method(http::Method::GET).with(handlers::get_collections);
|
||||
}).resource("/1.5/{uid}/info/collection_counts", |r| {
|
||||
})
|
||||
.resource("/1.5/{uid}/info/collection_counts", |r| {
|
||||
r.method(http::Method::GET)
|
||||
.with(handlers::get_collection_counts);
|
||||
}).resource("/1.5/{uid}/info/collection_usage", |r| {
|
||||
})
|
||||
.resource("/1.5/{uid}/info/collection_usage", |r| {
|
||||
r.method(http::Method::GET)
|
||||
.with(handlers::get_collection_usage);
|
||||
}).resource("/1.5/{uid}/info/configuration", |r| {
|
||||
})
|
||||
.resource("/1.5/{uid}/info/configuration", |r| {
|
||||
r.method(http::Method::GET)
|
||||
.with(handlers::get_configuration);
|
||||
}).resource("/1.5/{uid}/info/quota", |r| {
|
||||
})
|
||||
.resource("/1.5/{uid}/info/quota", |r| {
|
||||
r.method(http::Method::GET).with(handlers::get_quota);
|
||||
}).resource("/1.5/{uid}", |r| {
|
||||
})
|
||||
.resource("/1.5/{uid}", |r| {
|
||||
r.method(http::Method::DELETE).with(handlers::delete_all);
|
||||
}).resource("/1.5/{uid}/storage", |r| {
|
||||
})
|
||||
.resource("/1.5/{uid}/storage", |r| {
|
||||
r.method(http::Method::DELETE).with(handlers::delete_all);
|
||||
}).resource("/1.5/{uid}/storage/{collection}", |r| {
|
||||
})
|
||||
.resource("/1.5/{uid}/storage/{collection}", |r| {
|
||||
r.method(http::Method::DELETE)
|
||||
.with(handlers::delete_collection);
|
||||
r.method(http::Method::GET).with(handlers::get_collection);
|
||||
r.method(http::Method::POST).with(handlers::post_collection);
|
||||
}).resource("/1.5/{uid}/storage/{collection}/{bso}", |r| {
|
||||
})
|
||||
.resource("/1.5/{uid}/storage/{collection}/{bso}", |r| {
|
||||
r.method(http::Method::DELETE).with(handlers::delete_bso);
|
||||
r.method(http::Method::GET).with(handlers::get_bso);
|
||||
r.method(http::Method::PUT).with(handlers::put_bso);
|
||||
@ -89,7 +97,8 @@ impl Server {
|
||||
};
|
||||
|
||||
build_app(state)
|
||||
}).bind(format!("127.0.0.1:{}", settings.port))
|
||||
})
|
||||
.bind(format!("127.0.0.1:{}", settings.port))
|
||||
.unwrap()
|
||||
.start();
|
||||
Ok(sys)
|
||||
|
||||
@ -64,7 +64,8 @@ fn create_request(server: &TestServer, method: http::Method, path: &str) -> Clie
|
||||
.set_header(
|
||||
"Authorization",
|
||||
create_hawk_header(method.as_str(), server.addr().port(), path),
|
||||
).finish()
|
||||
)
|
||||
.finish()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
@ -291,13 +292,15 @@ fn invalid_content_type() {
|
||||
server.addr().port(),
|
||||
"/1.5/42/storage/bookmarks/wibble",
|
||||
),
|
||||
).set_header("Content-Type", "application/javascript")
|
||||
)
|
||||
.set_header("Content-Type", "application/javascript")
|
||||
.json(BsoBody {
|
||||
id: Some("wibble".to_string()),
|
||||
sortindex: Some(0),
|
||||
payload: Some("wibble".to_string()),
|
||||
ttl: Some(31536000),
|
||||
}).unwrap();
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let response = server.execute(request.send()).unwrap();
|
||||
assert_eq!(response.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE);
|
||||
@ -307,13 +310,15 @@ fn invalid_content_type() {
|
||||
.set_header(
|
||||
"Authorization",
|
||||
create_hawk_header("POST", server.addr().port(), "/1.5/42/storage/bookmarks"),
|
||||
).set_header("Content-Type", "application/javascript")
|
||||
)
|
||||
.set_header("Content-Type", "application/javascript")
|
||||
.json(json!([BsoBody {
|
||||
id: Some("wibble".to_string()),
|
||||
sortindex: Some(0),
|
||||
payload: Some("wibble".to_string()),
|
||||
ttl: Some(31536000),
|
||||
}])).unwrap();
|
||||
}]))
|
||||
.unwrap();
|
||||
|
||||
let response = server.execute(request.send()).unwrap();
|
||||
assert_eq!(response.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE);
|
||||
|
||||
@ -2,8 +2,7 @@
|
||||
//!
|
||||
//! Handles ensuring the header's, body, and query parameters are correct, extraction to
|
||||
//! relevant types, and failing correctly with the appropriate errors if issues arise.
|
||||
use std::collections::HashMap;
|
||||
use std::str::FromStr;
|
||||
use std::{self, collections::HashMap, str::FromStr};
|
||||
|
||||
use actix_web::http::header::{HeaderValue, ACCEPT, CONTENT_TYPE};
|
||||
use actix_web::{
|
||||
@ -17,7 +16,7 @@ use serde::de::{Deserialize, Deserializer, Error as SerdeError};
|
||||
use serde_json::Value;
|
||||
use validator::{Validate, ValidationError};
|
||||
|
||||
use db::{util::SyncTimestamp, Db, Sorting};
|
||||
use db::{util::SyncTimestamp, Db, DbError, DbErrorKind, Sorting};
|
||||
use error::{ApiError, ApiResult};
|
||||
use server::ServerState;
|
||||
use web::{auth::HawkPayload, error::ValidationErrorKind};
|
||||
@ -34,6 +33,7 @@ lazy_static! {
|
||||
Regex::new(r#"IV":\s*"AAAAAAAAAAAAAAAAAAAAAA=="#).unwrap();
|
||||
static ref VALID_ID_REGEX: Regex = Regex::new(r"^[ -~]{1,64}$").unwrap();
|
||||
static ref VALID_COLLECTION_ID_REGEX: Regex = Regex::new(r"^[a-zA-Z0-9._-]{1,32}$").unwrap();
|
||||
static ref TRUE_REGEX: Regex = Regex::new("^(?i)true$").unwrap();
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
@ -106,7 +106,8 @@ impl FromRequest<ServerState> for BsoBodies {
|
||||
"Invalid Content-Type".to_owned(),
|
||||
RequestErrorLocation::Header,
|
||||
Some("Content-Type".to_owned()),
|
||||
).into(),
|
||||
)
|
||||
.into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
@ -125,7 +126,8 @@ impl FromRequest<ServerState> for BsoBodies {
|
||||
"Mimetype/encoding/content-length error".to_owned(),
|
||||
RequestErrorLocation::Header,
|
||||
None,
|
||||
).into(),
|
||||
)
|
||||
.into(),
|
||||
));
|
||||
};
|
||||
|
||||
@ -135,7 +137,8 @@ impl FromRequest<ServerState> for BsoBodies {
|
||||
"Invalid JSON in request body".to_owned(),
|
||||
RequestErrorLocation::Body,
|
||||
Some("bsos".to_owned()),
|
||||
).into()
|
||||
)
|
||||
.into()
|
||||
}
|
||||
|
||||
// Define a new bool to check from a static closure to release the reference on the
|
||||
@ -201,7 +204,8 @@ impl FromRequest<ServerState> for BsoBodies {
|
||||
"Input BSO has duplicate ID".to_owned(),
|
||||
RequestErrorLocation::Body,
|
||||
Some("bsos".to_owned()),
|
||||
).into(),
|
||||
)
|
||||
.into(),
|
||||
);
|
||||
} else {
|
||||
bso_ids.push(id.clone());
|
||||
@ -213,7 +217,8 @@ impl FromRequest<ServerState> for BsoBodies {
|
||||
"Input BSO has no ID".to_owned(),
|
||||
RequestErrorLocation::Body,
|
||||
Some("bsos".to_owned()),
|
||||
).into(),
|
||||
)
|
||||
.into(),
|
||||
);
|
||||
};
|
||||
match BatchBsoBody::from_raw_bso(&bso) {
|
||||
@ -267,7 +272,8 @@ impl FromRequest<ServerState> for BsoBody {
|
||||
"Invalid Content-Type".to_owned(),
|
||||
RequestErrorLocation::Header,
|
||||
Some("Content-Type".to_owned()),
|
||||
).into(),
|
||||
)
|
||||
.into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
@ -282,16 +288,19 @@ impl FromRequest<ServerState> for BsoBody {
|
||||
e.to_string(),
|
||||
RequestErrorLocation::Body,
|
||||
Some("bso".to_owned()),
|
||||
).into();
|
||||
)
|
||||
.into();
|
||||
err.into()
|
||||
}).and_then(move |bso: Json<BsoBody>| {
|
||||
})
|
||||
.and_then(move |bso: Json<BsoBody>| {
|
||||
// Check the max payload size manually with our desired limit
|
||||
if bso.payload.as_ref().map(|s| s.len()).unwrap_or_default() > max_payload_size {
|
||||
let err: ApiError = ValidationErrorKind::FromDetails(
|
||||
"payload too large".to_owned(),
|
||||
RequestErrorLocation::Body,
|
||||
Some("bso".to_owned()),
|
||||
).into();
|
||||
)
|
||||
.into();
|
||||
return future::err(err.into());
|
||||
}
|
||||
if let Err(e) = bso.validate() {
|
||||
@ -330,7 +339,8 @@ impl FromRequest<ServerState> for BsoParam {
|
||||
RequestErrorLocation::Path,
|
||||
Some("bso".to_owned()),
|
||||
)
|
||||
})?.into_inner();
|
||||
})?
|
||||
.into_inner();
|
||||
bso.validate().map_err(|e| {
|
||||
ValidationErrorKind::FromValidationErrors(e, RequestErrorLocation::Path)
|
||||
})?;
|
||||
@ -362,7 +372,8 @@ impl FromRequest<ServerState> for CollectionParam {
|
||||
RequestErrorLocation::Path,
|
||||
Some("collection".to_owned()),
|
||||
)
|
||||
})?.into_inner();
|
||||
})?
|
||||
.into_inner();
|
||||
collection.validate().map_err(|e| {
|
||||
ValidationErrorKind::FromValidationErrors(e, RequestErrorLocation::Path)
|
||||
})?;
|
||||
@ -426,7 +437,8 @@ impl FromRequest<ServerState> for CollectionRequest {
|
||||
"Invalid accept".to_string(),
|
||||
RequestErrorLocation::Header,
|
||||
Some("accept".to_string()),
|
||||
).into());
|
||||
)
|
||||
.into());
|
||||
}
|
||||
None => ReplyFormat::Json,
|
||||
};
|
||||
@ -450,6 +462,7 @@ pub struct CollectionPostRequest {
|
||||
pub user_id: HawkIdentifier,
|
||||
pub query: BsoQueryParams,
|
||||
pub bsos: BsoBodies,
|
||||
pub batch: Option<BatchRequest>,
|
||||
}
|
||||
|
||||
impl FromRequest<ServerState> for CollectionPostRequest {
|
||||
@ -463,6 +476,7 @@ impl FromRequest<ServerState> for CollectionPostRequest {
|
||||
/// - If the collection is 'crypto', known bad payloads are checked for
|
||||
/// - Any valid BSO's beyond `BATCH_MAX_RECORDS` are moved to invalid
|
||||
fn from_request(req: &HttpRequest<ServerState>, _: &Self::Config) -> Self::Result {
|
||||
let req = req.clone();
|
||||
let max_post_records = req.state().limits.max_post_records as i64;
|
||||
let fut = <(
|
||||
HawkIdentifier,
|
||||
@ -470,7 +484,8 @@ impl FromRequest<ServerState> for CollectionPostRequest {
|
||||
CollectionParam,
|
||||
BsoQueryParams,
|
||||
BsoBodies,
|
||||
)>::extract(req).and_then(move |(user_id, db, collection, query, mut bsos)| {
|
||||
)>::extract(&req)
|
||||
.and_then(move |(user_id, db, collection, query, mut bsos)| {
|
||||
let collection = collection.collection.clone();
|
||||
if collection == "crypto" {
|
||||
// Verify the client didn't mess up the crypto if we have a payload
|
||||
@ -482,7 +497,8 @@ impl FromRequest<ServerState> for CollectionPostRequest {
|
||||
"Known-bad BSO payload".to_owned(),
|
||||
RequestErrorLocation::Body,
|
||||
Some("bsos".to_owned()),
|
||||
).into(),
|
||||
)
|
||||
.into(),
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -499,12 +515,18 @@ impl FromRequest<ServerState> for CollectionPostRequest {
|
||||
}
|
||||
}
|
||||
|
||||
let batch = match <Option<BatchRequest>>::extract(&req) {
|
||||
Ok(batch) => batch,
|
||||
Err(e) => return future::err(e.into()),
|
||||
};
|
||||
|
||||
future::ok(CollectionPostRequest {
|
||||
collection,
|
||||
db,
|
||||
user_id,
|
||||
query,
|
||||
bsos,
|
||||
batch,
|
||||
})
|
||||
});
|
||||
|
||||
@ -571,7 +593,8 @@ impl FromRequest<ServerState> for BsoPutRequest {
|
||||
BsoQueryParams,
|
||||
BsoParam,
|
||||
BsoBody,
|
||||
)>::extract(req).and_then(|(user_id, db, collection, query, bso, body)| {
|
||||
)>::extract(req)
|
||||
.and_then(|(user_id, db, collection, query, bso, body)| {
|
||||
let collection = collection.collection.clone();
|
||||
if collection == "crypto" {
|
||||
// Verify the client didn't mess up the crypto if we have a payload
|
||||
@ -582,7 +605,8 @@ impl FromRequest<ServerState> for BsoPutRequest {
|
||||
"Known-bad BSO payload".to_owned(),
|
||||
RequestErrorLocation::Body,
|
||||
Some("bsos".to_owned()),
|
||||
).into(),
|
||||
)
|
||||
.into(),
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -688,11 +712,11 @@ impl FromRequest<ServerState> for Box<dyn Db> {
|
||||
#[serde(default)]
|
||||
pub struct BsoQueryParams {
|
||||
/// lower-bound on last-modified time
|
||||
#[serde(deserialize_with = "deserialize_sync_timestamp",)]
|
||||
#[serde(deserialize_with = "deserialize_sync_timestamp")]
|
||||
pub newer: Option<SyncTimestamp>,
|
||||
|
||||
/// upper-bound on last-modified time
|
||||
#[serde(deserialize_with = "deserialize_sync_timestamp",)]
|
||||
#[serde(deserialize_with = "deserialize_sync_timestamp")]
|
||||
pub older: Option<SyncTimestamp>,
|
||||
|
||||
/// order in which to return results (string)
|
||||
@ -706,12 +730,12 @@ pub struct BsoQueryParams {
|
||||
pub offset: Option<u64>,
|
||||
|
||||
/// a comma-separated list of BSO ids (list of strings)
|
||||
#[validate(custom = "validate_qs_ids")]
|
||||
#[serde(deserialize_with = "deserialize_comma_sep_string", default)]
|
||||
#[validate(custom = "validate_qs_ids")]
|
||||
pub ids: Vec<String>,
|
||||
|
||||
// flag, whether to include full bodies (bool)
|
||||
#[serde(deserialize_with = "deserialize_present_value",)]
|
||||
#[serde(deserialize_with = "deserialize_present_value")]
|
||||
pub full: bool,
|
||||
}
|
||||
|
||||
@ -729,7 +753,8 @@ impl FromRequest<ServerState> for BsoQueryParams {
|
||||
RequestErrorLocation::QueryString,
|
||||
None,
|
||||
)
|
||||
})?.into_inner();
|
||||
})?
|
||||
.into_inner();
|
||||
params.validate().map_err(|e| {
|
||||
ValidationErrorKind::FromValidationErrors(e, RequestErrorLocation::QueryString)
|
||||
})?;
|
||||
@ -737,6 +762,110 @@ impl FromRequest<ServerState> for BsoQueryParams {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone, Deserialize, Validate)]
|
||||
#[serde(default)]
|
||||
pub struct BatchParams {
|
||||
pub batch: Option<String>,
|
||||
#[validate(custom = "validate_qs_commit")]
|
||||
pub commit: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone, Deserialize)]
|
||||
pub struct BatchRequest {
|
||||
pub id: Option<i64>,
|
||||
pub commit: bool,
|
||||
}
|
||||
|
||||
impl FromRequest<ServerState> for Option<BatchRequest> {
|
||||
type Config = ();
|
||||
type Result = ApiResult<Option<BatchRequest>>;
|
||||
|
||||
fn from_request(req: &HttpRequest<ServerState>, _: &Self::Config) -> Self::Result {
|
||||
let params = Query::<BatchParams>::from_request(req, &())
|
||||
.map_err(|e| {
|
||||
ValidationErrorKind::FromDetails(
|
||||
e.to_string(),
|
||||
RequestErrorLocation::QueryString,
|
||||
None,
|
||||
)
|
||||
})?
|
||||
.into_inner();
|
||||
|
||||
let limits = &req.state().limits;
|
||||
let checks = [
|
||||
("X-Weave-Records", limits.max_post_records),
|
||||
("X-Weave-Bytes", limits.max_post_bytes),
|
||||
("X-Weave-Total-Records", limits.max_total_records),
|
||||
("X-Weave-Total-Bytes", limits.max_total_bytes),
|
||||
];
|
||||
for (header, limit) in &checks {
|
||||
let value = match req.headers().get(*header) {
|
||||
Some(value) => value.to_str().map_err(|e| {
|
||||
let err: ApiError = ValidationErrorKind::FromDetails(
|
||||
e.to_string(),
|
||||
RequestErrorLocation::Header,
|
||||
Some((*header).to_owned()),
|
||||
)
|
||||
.into();
|
||||
err
|
||||
})?,
|
||||
None => continue,
|
||||
};
|
||||
let count = value.parse::<(u32)>().map_err(|_| {
|
||||
let err: ApiError = ValidationErrorKind::FromDetails(
|
||||
format!("Invalid integer value: {}", value),
|
||||
RequestErrorLocation::Header,
|
||||
Some((*header).to_owned()),
|
||||
)
|
||||
.into();
|
||||
err
|
||||
})?;
|
||||
if count > *limit {
|
||||
return Err(ValidationErrorKind::FromDetails(
|
||||
"size-limit-exceeded".to_owned(),
|
||||
RequestErrorLocation::Header,
|
||||
None,
|
||||
)
|
||||
.into());
|
||||
}
|
||||
}
|
||||
|
||||
if params.batch.is_none() && params.commit.is_none() {
|
||||
// No batch options requested
|
||||
return Ok(None);
|
||||
} else if params.batch.is_none() {
|
||||
// commit w/ no batch ID is an error
|
||||
let err: DbError = DbErrorKind::BatchNotFound.into();
|
||||
return Err(err.into());
|
||||
}
|
||||
|
||||
params.validate().map_err(|e| {
|
||||
ValidationErrorKind::FromValidationErrors(e, RequestErrorLocation::QueryString)
|
||||
})?;
|
||||
|
||||
let id = match params.batch {
|
||||
None => None,
|
||||
Some(ref batch) if batch == "" || TRUE_REGEX.is_match(batch) => None,
|
||||
Some(ref batch) => {
|
||||
let bytes = base64::decode(batch).unwrap_or(batch.as_bytes().to_vec());
|
||||
let decoded = std::str::from_utf8(&bytes).unwrap_or(batch);
|
||||
Some(decoded.parse::<i64>().map_err(|_| {
|
||||
ValidationErrorKind::FromDetails(
|
||||
format!(r#"Invalid batch ID: "{}""#, batch),
|
||||
RequestErrorLocation::QueryString,
|
||||
Some("batch".to_owned()),
|
||||
)
|
||||
})?)
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Some(BatchRequest {
|
||||
id,
|
||||
commit: params.commit.is_some(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
/// PreCondition Header
|
||||
///
|
||||
/// It's valid to include a X-If-Modified-Since or X-If-Unmodified-Since header but not
|
||||
@ -783,16 +912,20 @@ impl FromRequest<ServerState> for Option<PreConditionHeader> {
|
||||
e.to_string(),
|
||||
RequestErrorLocation::Header,
|
||||
Some(field_name.to_owned()),
|
||||
).into()
|
||||
}).and_then(|v| {
|
||||
)
|
||||
.into()
|
||||
})
|
||||
.and_then(|v| {
|
||||
SyncTimestamp::from_header(v).map_err(|e| {
|
||||
ValidationErrorKind::FromDetails(
|
||||
e.to_string(),
|
||||
RequestErrorLocation::Header,
|
||||
Some(field_name.to_owned()),
|
||||
).into()
|
||||
)
|
||||
.into()
|
||||
})
|
||||
}).map(|v| {
|
||||
})
|
||||
.map(|v| {
|
||||
let header = if field_name == "X-If-Modified-Since" {
|
||||
PreConditionHeader::IfModifiedSince(v)
|
||||
} else {
|
||||
@ -844,6 +977,17 @@ fn validate_qs_ids(ids: &Vec<String>) -> Result<(), ValidationError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Verifies the batch commit field is valid
|
||||
fn validate_qs_commit(commit: &String) -> Result<(), ValidationError> {
|
||||
if !TRUE_REGEX.is_match(commit) {
|
||||
return Err(request_error(
|
||||
r#"commit parameter must be "true" to apply batches"#,
|
||||
RequestErrorLocation::QueryString,
|
||||
));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Verifies the BSO sortindex is in the valid range
|
||||
fn validate_body_bso_sortindex(sort: i32) -> Result<(), ValidationError> {
|
||||
if BSO_MIN_SORTINDEX_VALUE <= sort && sort <= BSO_MAX_SORTINDEX_VALUE {
|
||||
@ -922,8 +1066,8 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use actix_web::test::TestRequest;
|
||||
use actix_web::HttpResponse;
|
||||
use actix_web::{http::Method, Binary, Body};
|
||||
use actix_web::{Error, HttpResponse};
|
||||
use base64;
|
||||
use hawk::{Credentials, Key, RequestBuilder};
|
||||
use hmac::{Hmac, Mac};
|
||||
@ -1005,6 +1149,28 @@ mod tests {
|
||||
format!("Hawk {}", request.make_header(&credentials).unwrap())
|
||||
}
|
||||
|
||||
fn post_collection(qs: &str, body: &serde_json::Value) -> Result<CollectionPostRequest, Error> {
|
||||
let payload = HawkPayload::test_default();
|
||||
let state = make_state();
|
||||
let path = format!(
|
||||
"/storage/1.5/1/storage/tabs{}{}",
|
||||
if !qs.is_empty() { "?" } else { "" },
|
||||
qs
|
||||
);
|
||||
let header = create_valid_hawk_header(&payload, &state, "POST", &path, "localhost", 5000);
|
||||
let req = TestRequest::with_state(state)
|
||||
.header("authorization", header)
|
||||
.header("content-type", "application/json")
|
||||
.method(Method::POST)
|
||||
.uri(&format!("http://localhost:5000{}", path))
|
||||
.set_payload(body.to_string())
|
||||
.param("uid", "1")
|
||||
.param("collection", "tabs")
|
||||
.finish();
|
||||
req.extensions_mut().insert(make_db());
|
||||
CollectionPostRequest::extract(&req).wait()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_invalid_query_args() {
|
||||
let req = TestRequest::with_state(make_state())
|
||||
@ -1250,70 +1416,79 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_valid_collection_post_request() {
|
||||
let payload = HawkPayload::test_default();
|
||||
let state = make_state();
|
||||
let header = create_valid_hawk_header(
|
||||
&payload,
|
||||
&state,
|
||||
"POST",
|
||||
"/storage/1.5/1/storage/tabs",
|
||||
"localhost",
|
||||
5000,
|
||||
);
|
||||
// Batch requests require id's on each BSO
|
||||
let bso_body = json!([
|
||||
{"id": "123", "payload": "xxx", "sortindex": 23},
|
||||
{"id": "456", "payload": "xxxasdf", "sortindex": 23}
|
||||
]);
|
||||
let req = TestRequest::with_state(state)
|
||||
.header("authorization", header)
|
||||
.header("content-type", "application/json")
|
||||
.method(Method::POST)
|
||||
.uri("http://localhost:5000/storage/1.5/1/storage/tabs")
|
||||
.set_payload(bso_body.to_string())
|
||||
.param("uid", "1")
|
||||
.param("collection", "tabs")
|
||||
.finish();
|
||||
req.extensions_mut().insert(make_db());
|
||||
let result = CollectionPostRequest::extract(&req).wait().unwrap();
|
||||
let result = post_collection("", &bso_body).unwrap();
|
||||
assert_eq!(result.user_id.legacy_id, 1);
|
||||
assert_eq!(&result.collection, "tabs");
|
||||
assert_eq!(result.bsos.valid.len(), 2);
|
||||
assert!(result.batch.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_invalid_collection_post_request() {
|
||||
let payload = HawkPayload::test_default();
|
||||
let state = make_state();
|
||||
let header = create_valid_hawk_header(
|
||||
&payload,
|
||||
&state,
|
||||
"POST",
|
||||
"/storage/1.5/1/storage/tabs",
|
||||
"localhost",
|
||||
5000,
|
||||
);
|
||||
// Add extra fields, these will be invalid
|
||||
let bso_body = json!([
|
||||
{"id": "1", "sortindex": 23, "jump": 1},
|
||||
{"id": "2", "sortindex": -99, "hop": "low"}
|
||||
]);
|
||||
let req = TestRequest::with_state(state)
|
||||
.header("authorization", header)
|
||||
.header("content-type", "application/json")
|
||||
.method(Method::POST)
|
||||
.uri("http://localhost:5000/storage/1.5/1/storage/tabs")
|
||||
.set_payload(bso_body.to_string())
|
||||
.param("uid", "1")
|
||||
.param("collection", "tabs")
|
||||
.finish();
|
||||
req.extensions_mut().insert(make_db());
|
||||
let result = CollectionPostRequest::extract(&req).wait().unwrap();
|
||||
let result = post_collection("", &bso_body).unwrap();
|
||||
assert_eq!(result.user_id.legacy_id, 1);
|
||||
assert_eq!(&result.collection, "tabs");
|
||||
assert_eq!(result.bsos.invalid.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_valid_collection_batch_post_request() {
|
||||
// If the "batch" parameter is has no value or has a value of "true"
|
||||
// then a new batch will be created.
|
||||
let bso_body = json!([
|
||||
{"id": "123", "payload": "xxx", "sortindex": 23},
|
||||
{"id": "456", "payload": "xxxasdf", "sortindex": 23}
|
||||
]);
|
||||
let result = post_collection("batch=True", &bso_body).unwrap();
|
||||
assert_eq!(result.user_id.legacy_id, 1);
|
||||
assert_eq!(&result.collection, "tabs");
|
||||
assert_eq!(result.bsos.valid.len(), 2);
|
||||
let batch = result.batch.unwrap();
|
||||
assert_eq!(batch.id, None);
|
||||
assert_eq!(batch.commit, false);
|
||||
|
||||
let result = post_collection("batch", &bso_body).unwrap();
|
||||
let batch = result.batch.unwrap();
|
||||
assert_eq!(batch.id, None);
|
||||
assert_eq!(batch.commit, false);
|
||||
|
||||
let result = post_collection("batch=MTI%3D&commit=true", &bso_body).unwrap();
|
||||
let batch = result.batch.unwrap();
|
||||
assert_eq!(batch.id, Some(12));
|
||||
assert_eq!(batch.commit, true);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_invalid_collection_batch_post_request() {
|
||||
let bso_body = json!([
|
||||
{"id": "123", "payload": "xxx", "sortindex": 23},
|
||||
{"id": "456", "payload": "xxxasdf", "sortindex": 23}
|
||||
]);
|
||||
let result = post_collection("batch=sammich", &bso_body);
|
||||
assert!(result.is_err());
|
||||
let response: HttpResponse = result.err().unwrap().into();
|
||||
assert_eq!(response.status(), 400);
|
||||
let body = extract_body_as_str(&response);
|
||||
assert_eq!(body, "0");
|
||||
|
||||
let result = post_collection("commit=true", &bso_body);
|
||||
assert!(result.is_err());
|
||||
let response: HttpResponse = result.err().unwrap().into();
|
||||
assert_eq!(response.status(), 400);
|
||||
let body = extract_body_as_str(&response);
|
||||
assert_eq!(body, "0");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_invalid_precondition_headers() {
|
||||
fn assert_invalid_header(
|
||||
|
||||
@ -2,10 +2,10 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use actix_web::{http::StatusCode, FutureResponse, HttpResponse, State};
|
||||
use futures::future::{self, Future};
|
||||
use futures::future::{self, Either, Future};
|
||||
use serde::Serialize;
|
||||
|
||||
use db::{params, results::Paginated};
|
||||
use db::{params, results::Paginated, DbError, DbErrorKind};
|
||||
use error::ApiError;
|
||||
use server::ServerState;
|
||||
use web::extractors::{
|
||||
@ -98,12 +98,14 @@ pub fn delete_collection(coll: CollectionRequest) -> FutureResponse<HttpResponse
|
||||
} else {
|
||||
Box::new(future::err(e))
|
||||
}
|
||||
}).map_err(From::from)
|
||||
})
|
||||
.map_err(From::from)
|
||||
.map(move |result| {
|
||||
HttpResponse::Ok()
|
||||
.if_true(delete_bsos, |resp| {
|
||||
resp.header("X-Last-Modified", result.as_header());
|
||||
}).json(result)
|
||||
})
|
||||
.json(result)
|
||||
}),
|
||||
)
|
||||
}
|
||||
@ -138,13 +140,15 @@ where
|
||||
} else {
|
||||
Err(e)
|
||||
}
|
||||
}).map_err(From::from)
|
||||
})
|
||||
.map_err(From::from)
|
||||
.and_then(|result| {
|
||||
coll.db
|
||||
.extract_resource(coll.user_id, Some(coll.collection), None)
|
||||
.map_err(From::from)
|
||||
.map(move |ts| (result, ts))
|
||||
}).map(move |(result, ts)| {
|
||||
})
|
||||
.map(move |(result, ts)| {
|
||||
let mut builder = HttpResponse::build(StatusCode::OK);
|
||||
let resp = builder
|
||||
.header("X-Last-Modified", ts.as_header())
|
||||
@ -172,6 +176,9 @@ where
|
||||
}
|
||||
|
||||
pub fn post_collection(coll: CollectionPostRequest) -> FutureResponse<HttpResponse> {
|
||||
if coll.batch.is_some() {
|
||||
return post_collection_batch(coll);
|
||||
}
|
||||
Box::new(
|
||||
coll.db
|
||||
.post_bsos(params::PostBsos {
|
||||
@ -179,7 +186,8 @@ pub fn post_collection(coll: CollectionPostRequest) -> FutureResponse<HttpRespon
|
||||
collection: coll.collection,
|
||||
bsos: coll.bsos.valid.into_iter().map(From::from).collect(),
|
||||
failed: coll.bsos.invalid,
|
||||
}).map_err(From::from)
|
||||
})
|
||||
.map_err(From::from)
|
||||
.map(|result| {
|
||||
HttpResponse::build(StatusCode::OK)
|
||||
.header("X-Last-Modified", result.modified.as_header())
|
||||
@ -188,6 +196,119 @@ pub fn post_collection(coll: CollectionPostRequest) -> FutureResponse<HttpRespon
|
||||
)
|
||||
}
|
||||
|
||||
pub fn post_collection_batch(coll: CollectionPostRequest) -> FutureResponse<HttpResponse> {
|
||||
// Bail early if we have nonsensical arguments
|
||||
let breq = match coll.batch.clone() {
|
||||
Some(breq) => breq,
|
||||
None => {
|
||||
let err: DbError = DbErrorKind::BatchNotFound.into();
|
||||
let err: ApiError = err.into();
|
||||
return Box::new(future::err(err.into()));
|
||||
}
|
||||
};
|
||||
|
||||
let fut = if let Some(id) = breq.id {
|
||||
// Validate the batch before attempting a full append (for efficiency)
|
||||
Either::A(
|
||||
coll.db
|
||||
.validate_batch(params::ValidateBatch {
|
||||
user_id: coll.user_id.clone(),
|
||||
collection: coll.collection.clone(),
|
||||
id,
|
||||
})
|
||||
.and_then(move |is_valid| {
|
||||
if is_valid {
|
||||
Box::new(future::ok(id))
|
||||
} else {
|
||||
let err: DbError = DbErrorKind::BatchNotFound.into();
|
||||
Box::new(future::err(err.into()))
|
||||
}
|
||||
}),
|
||||
)
|
||||
} else {
|
||||
Either::B(coll.db.create_batch(params::CreateBatch {
|
||||
user_id: coll.user_id.clone(),
|
||||
collection: coll.collection.clone(),
|
||||
bsos: vec![],
|
||||
}))
|
||||
};
|
||||
|
||||
let db = coll.db.clone();
|
||||
let user_id = coll.user_id.clone();
|
||||
let collection = coll.collection.clone();
|
||||
|
||||
let fut = fut
|
||||
.and_then(move |id| {
|
||||
let mut success = vec![];
|
||||
let mut failed = coll.bsos.invalid.clone();
|
||||
let bso_ids: Vec<_> = coll.bsos.valid.iter().map(|bso| bso.id.clone()).collect();
|
||||
|
||||
coll.db
|
||||
.append_to_batch(params::AppendToBatch {
|
||||
user_id: coll.user_id.clone(),
|
||||
collection: coll.collection.clone(),
|
||||
id,
|
||||
bsos: coll.bsos.valid.into_iter().map(From::from).collect(),
|
||||
})
|
||||
.then(move |result| {
|
||||
match result {
|
||||
Ok(_) => success.extend(bso_ids),
|
||||
Err(e) => {
|
||||
// NLL: not a guard as: (E0008) "moves value into
|
||||
// pattern guard"
|
||||
if e.is_conflict() {
|
||||
return future::err(e);
|
||||
}
|
||||
failed.extend(bso_ids.into_iter().map(|id| (id, "db error".to_owned())))
|
||||
}
|
||||
};
|
||||
future::ok((id, success, failed))
|
||||
})
|
||||
})
|
||||
.map_err(From::from);
|
||||
|
||||
Box::new(fut.and_then(move |(id, success, failed)| {
|
||||
let mut resp = json!({
|
||||
"success": success,
|
||||
"failed": failed,
|
||||
});
|
||||
|
||||
if !breq.commit {
|
||||
resp["batch"] = json!(base64::encode(&id.to_string()));
|
||||
return Either::A(future::ok(HttpResponse::Accepted().json(resp)));
|
||||
}
|
||||
|
||||
let fut = db
|
||||
.get_batch(params::GetBatch {
|
||||
user_id: user_id.clone(),
|
||||
collection: collection.clone(),
|
||||
id,
|
||||
})
|
||||
.and_then(move |batch| {
|
||||
// TODO: validate *actual* sizes of the batch items
|
||||
// (max_total_records, max_total_bytes)
|
||||
if let Some(batch) = batch {
|
||||
db.commit_batch(params::CommitBatch {
|
||||
user_id: user_id.clone(),
|
||||
collection: collection.clone(),
|
||||
batch,
|
||||
})
|
||||
} else {
|
||||
let err: DbError = DbErrorKind::BatchNotFound.into();
|
||||
Box::new(future::err(err.into()))
|
||||
}
|
||||
})
|
||||
.map_err(From::from)
|
||||
.map(|result| {
|
||||
resp["modified"] = json!(result.modified);
|
||||
HttpResponse::build(StatusCode::OK)
|
||||
.header("X-Last-Modified", result.modified.as_header())
|
||||
.json(resp)
|
||||
});
|
||||
Either::B(fut)
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn delete_bso(bso_req: BsoRequest) -> FutureResponse<HttpResponse> {
|
||||
Box::new(
|
||||
bso_req
|
||||
@ -196,7 +317,8 @@ pub fn delete_bso(bso_req: BsoRequest) -> FutureResponse<HttpResponse> {
|
||||
user_id: bso_req.user_id,
|
||||
collection: bso_req.collection,
|
||||
id: bso_req.bso,
|
||||
}).map_err(From::from)
|
||||
})
|
||||
.map_err(From::from)
|
||||
.map(|result| HttpResponse::Ok().json(json!({ "modified": result }))),
|
||||
)
|
||||
}
|
||||
@ -209,7 +331,8 @@ pub fn get_bso(bso_req: BsoRequest) -> FutureResponse<HttpResponse> {
|
||||
user_id: bso_req.user_id,
|
||||
collection: bso_req.collection,
|
||||
id: bso_req.bso,
|
||||
}).map_err(From::from)
|
||||
})
|
||||
.map_err(From::from)
|
||||
.map(|result| {
|
||||
result.map_or_else(
|
||||
|| HttpResponse::NotFound().finish(),
|
||||
@ -230,7 +353,8 @@ pub fn put_bso(bso_req: BsoPutRequest) -> FutureResponse<HttpResponse> {
|
||||
sortindex: bso_req.body.sortindex,
|
||||
payload: bso_req.body.payload,
|
||||
ttl: bso_req.body.ttl,
|
||||
}).map_err(From::from)
|
||||
})
|
||||
.map_err(From::from)
|
||||
.map(|result| {
|
||||
HttpResponse::build(StatusCode::OK)
|
||||
.header("X-Last-Modified", result.as_header())
|
||||
|
||||
@ -46,14 +46,17 @@ impl<S> Middleware<S> for WeaveTimestamp {
|
||||
let error: ApiError = ApiErrorKind::Internal(format!(
|
||||
"Invalid X-Last-Modified response header: {}",
|
||||
e
|
||||
)).into();
|
||||
))
|
||||
.into();
|
||||
error
|
||||
})?.parse::<f64>()
|
||||
})?
|
||||
.parse::<f64>()
|
||||
.map_err(|e| {
|
||||
let error: ApiError = ApiErrorKind::Internal(format!(
|
||||
"Invalid X-Last-Modified response header: {}",
|
||||
e
|
||||
)).into();
|
||||
))
|
||||
.into();
|
||||
error
|
||||
})?;
|
||||
if resp_ts > ts {
|
||||
@ -70,7 +73,8 @@ impl<S> Middleware<S> for WeaveTimestamp {
|
||||
let error: ApiError = ApiErrorKind::Internal(format!(
|
||||
"Invalid X-Weave-Timestamp response header: {}",
|
||||
e
|
||||
)).into();
|
||||
))
|
||||
.into();
|
||||
error
|
||||
})?,
|
||||
);
|
||||
@ -108,7 +112,8 @@ impl Middleware<ServerState> for DbTransaction {
|
||||
match *req.method() {
|
||||
Method::GET | Method::HEAD => db.lock_for_read(lc),
|
||||
_ => db.lock_for_write(lc),
|
||||
}.or_else(move |e| {
|
||||
}
|
||||
.or_else(move |e| {
|
||||
// Middleware::response won't be called: rollback immediately
|
||||
db2.rollback().and_then(|_| future::err(e))
|
||||
}),
|
||||
@ -123,7 +128,8 @@ impl Middleware<ServerState> for DbTransaction {
|
||||
req.extensions_mut().insert((db, in_transaction));
|
||||
future::ok(None)
|
||||
})
|
||||
}).map_err(Into::into);
|
||||
})
|
||||
.map_err(Into::into);
|
||||
Ok(Started::Future(Box::new(fut)))
|
||||
}
|
||||
|
||||
@ -185,7 +191,8 @@ impl Middleware<ServerState> for PreConditionCheck {
|
||||
.header("X-Last-Modified", resource_ts.as_header())
|
||||
.body(""); // 304 can't return any content
|
||||
future::ok(Some(resp))
|
||||
}).map_err(Into::into);
|
||||
})
|
||||
.map_err(Into::into);
|
||||
Ok(Started::Future(Box::new(fut)))
|
||||
}
|
||||
|
||||
@ -218,7 +225,8 @@ impl Middleware<ServerState> for PreConditionCheck {
|
||||
resp.headers_mut().insert("X-Last-Modified", ts_header);
|
||||
}
|
||||
future::ok(resp)
|
||||
}).map_err(Into::into);
|
||||
})
|
||||
.map_err(Into::into);
|
||||
Ok(Response::Future(Box::new(fut)))
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user