Merge pull request #1882 from mozilla-services/feat/syncstorage-mysql-db-impl-STOR-403

refactor: syncstorage-mysql models/batch/schema/diesel_ext -> db
Philip Jenvey 2025-11-06 11:04:30 -08:00 committed by GitHub
commit 831e89347b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 590 additions and 768 deletions

View File

@ -1,308 +0,0 @@
use base64::Engine;
use std::collections::HashSet;
use diesel::{
self,
dsl::sql,
insert_into,
result::{DatabaseErrorKind::UniqueViolation, Error as DieselError},
sql_query,
sql_types::{BigInt, Integer},
ExpressionMethods, OptionalExtension, QueryDsl,
};
use diesel_async::RunQueryDsl;
use syncstorage_db_common::{params, results, UserIdentifier, BATCH_LIFETIME};
use super::{
models::MysqlDb,
schema::{batch_upload_items, batch_uploads},
DbError, DbResult,
};
const MAX_TTL: i32 = 2_100_000_000;
const MAX_BATCH_CREATE_RETRY: u8 = 5;
pub async fn create(
db: &mut MysqlDb,
params: params::CreateBatch,
) -> DbResult<results::CreateBatch> {
let user_id = params.user_id.legacy_id as i64;
let collection_id = db.get_collection_id(&params.collection).await?;
// Careful, there's some weirdness here!
//
// Sync timestamps are in seconds and quantized to two decimal places, so
// when we convert one to a bigint in milliseconds, the final digit is
// always zero. But we want to use the lower digits of the batchid for
// sharding writes via (batchid % num_tables), and leaving it as zero would
// skew the sharding distribution.
//
// So we mix in the lowest digit of the uid to improve the distribution
// while still letting us treat these ids as millisecond timestamps. It's
// yuck, but it works and it keeps the weirdness contained to this single
// line of code.
let mut batch_id = db.timestamp().as_i64() + (user_id % 10);
// Occasionally batch_ids clash (usually during unit testing), so also
// retry w/ increments
for i in 1..=MAX_BATCH_CREATE_RETRY {
let result = insert_into(batch_uploads::table)
.values((
batch_uploads::batch_id.eq(&batch_id),
batch_uploads::user_id.eq(&user_id),
batch_uploads::collection_id.eq(&collection_id),
))
.execute(&mut db.conn)
.await;
match result {
Ok(_) => break,
Err(DieselError::DatabaseError(UniqueViolation, _)) => {
if i == MAX_BATCH_CREATE_RETRY {
return Err(DbError::conflict());
}
batch_id += 1;
}
Err(e) => return Err(e.into()),
}
}
do_append(db, batch_id, params.user_id, collection_id, params.bsos).await?;
Ok(results::CreateBatch {
id: encode_id(batch_id),
size: None,
})
}
pub async fn validate(db: &mut MysqlDb, params: params::ValidateBatch) -> DbResult<bool> {
let batch_id = decode_id(&params.id)?;
// Avoid hitting the db for batches that are obviously too old. Recall
// that the batchid is a millisecond timestamp.
if (batch_id + BATCH_LIFETIME) < db.timestamp().as_i64() {
return Ok(false);
}
let user_id = params.user_id.legacy_id as i64;
let collection_id = db.get_collection_id(&params.collection).await?;
let exists = batch_uploads::table
.select(sql::<Integer>("1"))
.filter(batch_uploads::batch_id.eq(&batch_id))
.filter(batch_uploads::user_id.eq(&user_id))
.filter(batch_uploads::collection_id.eq(&collection_id))
.get_result::<i32>(&mut db.conn)
.await
.optional()?;
Ok(exists.is_some())
}
pub async fn append(db: &mut MysqlDb, params: params::AppendToBatch) -> DbResult<()> {
let exists = validate(
db,
params::ValidateBatch {
user_id: params.user_id.clone(),
collection: params.collection.clone(),
id: params.batch.id.clone(),
},
)
.await?;
if !exists {
return Err(DbError::batch_not_found());
}
let batch_id = decode_id(&params.batch.id)?;
let collection_id = db.get_collection_id(&params.collection).await?;
do_append(db, batch_id, params.user_id, collection_id, params.bsos).await?;
Ok(())
}
pub async fn get(
db: &mut MysqlDb,
params: params::GetBatch,
) -> DbResult<Option<results::GetBatch>> {
let is_valid = validate(
db,
params::ValidateBatch {
user_id: params.user_id,
collection: params.collection,
id: params.id.clone(),
},
)
.await?;
let batch = if is_valid {
Some(results::GetBatch { id: params.id })
} else {
None
};
Ok(batch)
}
pub async fn delete(db: &mut MysqlDb, params: params::DeleteBatch) -> DbResult<()> {
let batch_id = decode_id(&params.id)?;
let user_id = params.user_id.legacy_id as i64;
let collection_id = db.get_collection_id(&params.collection).await?;
diesel::delete(batch_uploads::table)
.filter(batch_uploads::batch_id.eq(&batch_id))
.filter(batch_uploads::user_id.eq(&user_id))
.filter(batch_uploads::collection_id.eq(&collection_id))
.execute(&mut db.conn)
.await?;
diesel::delete(batch_upload_items::table)
.filter(batch_upload_items::batch_id.eq(&batch_id))
.filter(batch_upload_items::user_id.eq(&user_id))
.execute(&mut db.conn)
.await?;
Ok(())
}
/// Commits a batch to the bsos table, deleting the batch when successful
pub async fn commit(
db: &mut MysqlDb,
params: params::CommitBatch,
) -> DbResult<results::CommitBatch> {
let batch_id = decode_id(&params.batch.id)?;
let user_id = params.user_id.legacy_id as i64;
let collection_id = db.get_collection_id(&params.collection).await?;
let timestamp = db.timestamp();
sql_query(include_str!("batch_commit.sql"))
.bind::<BigInt, _>(user_id)
.bind::<Integer, _>(&collection_id)
.bind::<BigInt, _>(&db.timestamp().as_i64())
.bind::<BigInt, _>(&db.timestamp().as_i64())
.bind::<BigInt, _>((MAX_TTL as i64) * 1000) // XXX:
.bind::<BigInt, _>(&batch_id)
.bind::<BigInt, _>(user_id)
.bind::<BigInt, _>(&db.timestamp().as_i64())
.bind::<BigInt, _>(&db.timestamp().as_i64())
.execute(&mut db.conn)
.await?;
db.update_collection(user_id as u32, collection_id).await?;
delete(
db,
params::DeleteBatch {
user_id: params.user_id,
collection: params.collection,
id: params.batch.id,
},
)
.await?;
Ok(timestamp)
}
pub async fn do_append(
db: &mut MysqlDb,
batch_id: i64,
user_id: UserIdentifier,
_collection_id: i32,
bsos: Vec<params::PostCollectionBso>,
) -> DbResult<()> {
fn exist_idx(user_id: u64, batch_id: i64, bso_id: &str) -> String {
// Construct something that matches the key for batch_upload_items
format!(
"{batch_id}-{user_id}-{bso_id}",
batch_id = batch_id,
user_id = user_id,
bso_id = bso_id,
)
}
// It's possible for the list of items to contain a duplicate key entry.
// This means that we can't really call `ON DUPLICATE` here, because that's
// more about inserting one item at a time. (e.g. it works great if the
// values contain a key that's already in the database, less so if the
// duplicate is in the value set we're inserting.)
#[derive(Debug, QueryableByName)]
#[diesel(table_name = batch_upload_items)]
struct ExistsResult {
batch_id: i64,
id: String,
}
#[derive(AsChangeset)]
#[diesel(table_name = batch_upload_items)]
struct UpdateBatches {
payload: Option<String>,
payload_size: Option<i64>,
ttl_offset: Option<i32>,
}
let mut existing = HashSet::new();
// pre-load the "existing" hashset with any batched uploads that are already in the table.
for item in sql_query(
"SELECT userid as user_id, batch as batch_id, id FROM batch_upload_items WHERE userid=? AND batch=?;",
)
.bind::<BigInt, _>(user_id.legacy_id as i64)
.bind::<BigInt, _>(batch_id)
.get_results::<ExistsResult>(&mut db.conn).await?
{
existing.insert(exist_idx(
user_id.legacy_id,
item.batch_id,
&item.id.to_string(),
));
}
for bso in bsos {
let payload_size = bso.payload.as_ref().map(|p| p.len() as i64);
let exist_idx = exist_idx(user_id.legacy_id, batch_id, &bso.id);
if existing.contains(&exist_idx) {
diesel::update(
batch_upload_items::table
.filter(batch_upload_items::user_id.eq(user_id.legacy_id as i64))
.filter(batch_upload_items::batch_id.eq(batch_id)),
)
.set(&UpdateBatches {
payload: bso.payload,
payload_size,
ttl_offset: bso.ttl.map(|ttl| ttl as i32),
})
.execute(&mut db.conn)
.await?;
} else {
diesel::insert_into(batch_upload_items::table)
.values((
batch_upload_items::batch_id.eq(&batch_id),
batch_upload_items::user_id.eq(user_id.legacy_id as i64),
batch_upload_items::id.eq(bso.id.clone()),
batch_upload_items::sortindex.eq(bso.sortindex),
batch_upload_items::payload.eq(bso.payload),
batch_upload_items::payload_size.eq(payload_size),
batch_upload_items::ttl_offset.eq(bso.ttl.map(|ttl| ttl as i32)),
))
.execute(&mut db.conn)
.await?;
// make sure to include the key in our table check.
existing.insert(exist_idx);
}
}
Ok(())
}
pub fn validate_batch_id(id: &str) -> DbResult<()> {
decode_id(id).map(|_| ())
}
fn encode_id(id: i64) -> String {
base64::engine::general_purpose::STANDARD.encode(id.to_string())
}
fn decode_id(id: &str) -> DbResult<i64> {
let bytes = base64::engine::general_purpose::STANDARD
.decode(id)
.unwrap_or_else(|_| id.as_bytes().to_vec());
let decoded = std::str::from_utf8(&bytes).unwrap_or(id);
decoded
.parse::<i64>()
.map_err(|e| DbError::internal(format!("Invalid batch_id: {}", e)))
}
macro_rules! batch_db_method {
($name:ident, $batch_name:ident, $type:ident) => {
pub async fn $name(&mut self, params: params::$type) -> DbResult<results::$type> {
batch::$batch_name(self, params).await
}
};
}

View File

@ -0,0 +1,301 @@
use base64::Engine;
use std::collections::HashSet;
use async_trait::async_trait;
use diesel::{
self,
dsl::sql,
insert_into,
result::{DatabaseErrorKind::UniqueViolation, Error as DieselError},
sql_query,
sql_types::{BigInt, Integer},
ExpressionMethods, OptionalExtension, QueryDsl,
};
use diesel_async::RunQueryDsl;
use syncstorage_db_common::{params, results, BatchDb, Db, UserIdentifier, BATCH_LIFETIME};
use super::{
schema::{batch_upload_items, batch_uploads},
MysqlDb,
};
use crate::{DbError, DbResult};
const MAX_TTL: i32 = 2_100_000_000;
const MAX_BATCH_CREATE_RETRY: u8 = 5;
#[async_trait(?Send)]
impl BatchDb for MysqlDb {
type Error = DbError;
async fn create_batch(
&mut self,
params: params::CreateBatch,
) -> DbResult<results::CreateBatch> {
let user_id = params.user_id.legacy_id as i64;
let collection_id = self.get_collection_id(&params.collection).await?;
// Careful, there's some weirdness here!
//
// Sync timestamps are in seconds and quantized to two decimal places, so
// when we convert one to a bigint in milliseconds, the final digit is
// always zero. But we want to use the lower digits of the batchid for
// sharding writes via (batchid % num_tables), and leaving it as zero would
// skew the sharding distribution.
//
// So we mix in the lowest digit of the uid to improve the distribution
// while still letting us treat these ids as millisecond timestamps. It's
// yuck, but it works and it keeps the weirdness contained to this single
// line of code.
let mut batch_id = self.timestamp().as_i64() + (user_id % 10);
// Occasionally batch_ids clash (usually during unit testing), so also
// retry w/ increments
for i in 1..=MAX_BATCH_CREATE_RETRY {
let result = insert_into(batch_uploads::table)
.values((
batch_uploads::batch_id.eq(&batch_id),
batch_uploads::user_id.eq(&user_id),
batch_uploads::collection_id.eq(&collection_id),
))
.execute(&mut self.conn)
.await;
match result {
Ok(_) => break,
Err(DieselError::DatabaseError(UniqueViolation, _)) => {
if i == MAX_BATCH_CREATE_RETRY {
return Err(DbError::conflict());
}
batch_id += 1;
}
Err(e) => return Err(e.into()),
}
}
do_append(self, batch_id, params.user_id, collection_id, params.bsos).await?;
Ok(results::CreateBatch {
id: encode_id(batch_id),
size: None,
})
}
async fn validate_batch(&mut self, params: params::ValidateBatch) -> DbResult<bool> {
let batch_id = decode_id(&params.id)?;
// Avoid hitting the db for batches that are obviously too old. Recall
// that the batchid is a millisecond timestamp.
if (batch_id + BATCH_LIFETIME) < self.timestamp().as_i64() {
return Ok(false);
}
let user_id = params.user_id.legacy_id as i64;
let collection_id = self.get_collection_id(&params.collection).await?;
let exists = batch_uploads::table
.select(sql::<Integer>("1"))
.filter(batch_uploads::batch_id.eq(&batch_id))
.filter(batch_uploads::user_id.eq(&user_id))
.filter(batch_uploads::collection_id.eq(&collection_id))
.get_result::<i32>(&mut self.conn)
.await
.optional()?;
Ok(exists.is_some())
}
async fn append_to_batch(&mut self, params: params::AppendToBatch) -> DbResult<()> {
let exists = self
.validate_batch(params::ValidateBatch {
user_id: params.user_id.clone(),
collection: params.collection.clone(),
id: params.batch.id.clone(),
})
.await?;
if !exists {
return Err(DbError::batch_not_found());
}
let batch_id = decode_id(&params.batch.id)?;
let collection_id = self.get_collection_id(&params.collection).await?;
do_append(self, batch_id, params.user_id, collection_id, params.bsos).await?;
Ok(())
}
async fn get_batch(&mut self, params: params::GetBatch) -> DbResult<Option<results::GetBatch>> {
let is_valid = self
.validate_batch(params::ValidateBatch {
user_id: params.user_id,
collection: params.collection,
id: params.id.clone(),
})
.await?;
let batch = if is_valid {
Some(results::GetBatch { id: params.id })
} else {
None
};
Ok(batch)
}
async fn delete_batch(&mut self, params: params::DeleteBatch) -> DbResult<()> {
let batch_id = decode_id(&params.id)?;
let user_id = params.user_id.legacy_id as i64;
let collection_id = self.get_collection_id(&params.collection).await?;
diesel::delete(batch_uploads::table)
.filter(batch_uploads::batch_id.eq(&batch_id))
.filter(batch_uploads::user_id.eq(&user_id))
.filter(batch_uploads::collection_id.eq(&collection_id))
.execute(&mut self.conn)
.await?;
diesel::delete(batch_upload_items::table)
.filter(batch_upload_items::batch_id.eq(&batch_id))
.filter(batch_upload_items::user_id.eq(&user_id))
.execute(&mut self.conn)
.await?;
Ok(())
}
/// Commits a batch to the bsos table, deleting the batch when successful
async fn commit_batch(
&mut self,
params: params::CommitBatch,
) -> DbResult<results::CommitBatch> {
let batch_id = decode_id(&params.batch.id)?;
let user_id = params.user_id.legacy_id as i64;
let collection_id = self.get_collection_id(&params.collection).await?;
let timestamp = self.timestamp();
sql_query(include_str!("batch_commit.sql"))
.bind::<BigInt, _>(user_id)
.bind::<Integer, _>(&collection_id)
.bind::<BigInt, _>(&self.timestamp().as_i64())
.bind::<BigInt, _>(&self.timestamp().as_i64())
.bind::<BigInt, _>((MAX_TTL as i64) * 1000) // XXX:
.bind::<BigInt, _>(&batch_id)
.bind::<BigInt, _>(user_id)
.bind::<BigInt, _>(&self.timestamp().as_i64())
.bind::<BigInt, _>(&self.timestamp().as_i64())
.execute(&mut self.conn)
.await?;
self.update_collection(params::UpdateCollection {
user_id: params.user_id.clone(),
collection_id,
collection: params.collection.clone(),
})
.await?;
self.delete_batch(params::DeleteBatch {
user_id: params.user_id,
collection: params.collection,
id: params.batch.id,
})
.await?;
Ok(timestamp)
}
}
pub async fn do_append(
db: &mut MysqlDb,
batch_id: i64,
user_id: UserIdentifier,
_collection_id: i32,
bsos: Vec<params::PostCollectionBso>,
) -> DbResult<()> {
fn exist_idx(user_id: u64, batch_id: i64, bso_id: &str) -> String {
// Construct something that matches the key for batch_upload_items
format!(
"{batch_id}-{user_id}-{bso_id}",
batch_id = batch_id,
user_id = user_id,
bso_id = bso_id,
)
}
// It's possible for the list of items to contain a duplicate key entry.
// This means that we can't really call `ON DUPLICATE` here, because that's
// more about inserting one item at a time. (e.g. it works great if the
// values contain a key that's already in the database, less so if the
// duplicate is in the value set we're inserting.)
#[derive(Debug, QueryableByName)]
#[diesel(table_name = batch_upload_items)]
struct ExistsResult {
batch_id: i64,
id: String,
}
#[derive(AsChangeset)]
#[diesel(table_name = batch_upload_items)]
struct UpdateBatches {
payload: Option<String>,
payload_size: Option<i64>,
ttl_offset: Option<i32>,
}
let mut existing = HashSet::new();
// pre-load the "existing" hashset with any batched uploads that are already in the table.
for item in sql_query(
"SELECT userid as user_id, batch as batch_id, id FROM batch_upload_items WHERE userid=? AND batch=?;",
)
.bind::<BigInt, _>(user_id.legacy_id as i64)
.bind::<BigInt, _>(batch_id)
.get_results::<ExistsResult>(&mut db.conn).await?
{
existing.insert(exist_idx(
user_id.legacy_id,
item.batch_id,
&item.id.to_string(),
));
}
for bso in bsos {
let payload_size = bso.payload.as_ref().map(|p| p.len() as i64);
let exist_idx = exist_idx(user_id.legacy_id, batch_id, &bso.id);
if existing.contains(&exist_idx) {
diesel::update(
batch_upload_items::table
.filter(batch_upload_items::user_id.eq(user_id.legacy_id as i64))
.filter(batch_upload_items::batch_id.eq(batch_id)),
)
.set(&UpdateBatches {
payload: bso.payload,
payload_size,
ttl_offset: bso.ttl.map(|ttl| ttl as i32),
})
.execute(&mut db.conn)
.await?;
} else {
diesel::insert_into(batch_upload_items::table)
.values((
batch_upload_items::batch_id.eq(&batch_id),
batch_upload_items::user_id.eq(user_id.legacy_id as i64),
batch_upload_items::id.eq(bso.id.clone()),
batch_upload_items::sortindex.eq(bso.sortindex),
batch_upload_items::payload.eq(bso.payload),
batch_upload_items::payload_size.eq(payload_size),
batch_upload_items::ttl_offset.eq(bso.ttl.map(|ttl| ttl as i32)),
))
.execute(&mut db.conn)
.await?;
// make sure to include the key in our table check.
existing.insert(exist_idx);
}
}
Ok(())
}
pub fn validate_batch_id(id: &str) -> DbResult<()> {
decode_id(id).map(|_| ())
}
fn encode_id(id: i64) -> String {
base64::engine::general_purpose::STANDARD.encode(id.to_string())
}
fn decode_id(id: &str) -> DbResult<i64> {
let bytes = base64::engine::general_purpose::STANDARD
.decode(id)
.unwrap_or_else(|_| id.as_bytes().to_vec());
let decoded = std::str::from_utf8(&bytes).unwrap_or(id);
decoded
.parse::<i64>()
.map_err(|e| DbError::internal(format!("Invalid batch_id: {}", e)))
}
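Both the old and new versions share the batch-id scheme described in the comments above: a millisecond timestamp whose always-zero final digit is replaced with the user id's lowest digit for sharding, then base64-wrapped for transport. A sketch of the round trip and the validate-time early-out; the BATCH_LIFETIME value here is an assumption for illustration, the real constant comes from syncstorage_db_common:

use base64::Engine;

// Assumed value; the real constant is imported from syncstorage_db_common.
const BATCH_LIFETIME: i64 = 2 * 60 * 60 * 1000; // 2 hours, in ms

fn encode_id(id: i64) -> String {
    base64::engine::general_purpose::STANDARD.encode(id.to_string())
}

// Lenient decode: accepts both base64-wrapped and bare numeric ids.
fn decode_id(id: &str) -> Result<i64, std::num::ParseIntError> {
    let bytes = base64::engine::general_purpose::STANDARD
        .decode(id)
        .unwrap_or_else(|_| id.as_bytes().to_vec());
    std::str::from_utf8(&bytes).unwrap_or(id).parse::<i64>()
}

fn main() {
    let now_ms: i64 = 1_730_000_000_000; // quantized sync timestamp: ends in 0
    let user_id: i64 = 12_347;
    // Mix the uid's lowest digit into the otherwise-zero final digit so
    // (batch_id % num_tables) shards evenly across tables.
    let batch_id = now_ms + (user_id % 10);
    assert_eq!(batch_id % 10, 7);

    // Transport round trip.
    let wire = encode_id(batch_id);
    assert_eq!(decode_id(&wire).unwrap(), batch_id);
    assert_eq!(decode_id("12345").unwrap(), 12345); // bare ids also accepted

    // validate_batch()'s early-out: once BATCH_LIFETIME ms have elapsed, the
    // id alone proves the batch expired, so no query is issued.
    let later = now_ms + BATCH_LIFETIME + 1_000;
    assert!(batch_id + BATCH_LIFETIME < later); // -> Ok(false)
}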

View File

@ -1,4 +1,4 @@
use std::{collections::HashMap, fmt, sync::Arc};
use std::collections::HashMap;
use async_trait::async_trait;
use diesel::{
@ -10,91 +10,25 @@ use diesel::{
ExpressionMethods, OptionalExtension, QueryDsl,
};
use diesel_async::{AsyncConnection, RunQueryDsl, TransactionManager};
use syncserver_common::Metrics;
use syncstorage_db_common::{
error::DbErrorIntrospect, params, results, util::SyncTimestamp, BatchDb, Db, Sorting,
UserIdentifier, DEFAULT_BSO_TTL,
error::DbErrorIntrospect, params, results, util::SyncTimestamp, Db, Sorting, UserIdentifier,
DEFAULT_BSO_TTL,
};
use syncstorage_settings::{Quota, DEFAULT_MAX_TOTAL_RECORDS};
use super::{
batch,
diesel_ext::LockInShareModeDsl,
pool::{CollectionCache, Conn},
schema::{bso, collections, user_collections},
DbError, DbResult,
schema::{bso, user_collections},
CollectionLock, MysqlDb, COLLECTION_ID, COUNT, EXPIRY, LAST_MODIFIED, MODIFIED, TOMBSTONE,
TOTAL_BYTES, USER_ID,
};
use crate::{pool::Conn, DbError, DbResult};
// this is the max number of records we will return.
static DEFAULT_LIMIT: u32 = DEFAULT_MAX_TOTAL_RECORDS;
const TOMBSTONE: i32 = 0;
/// SQL Variable remapping
/// These names are the legacy values mapped to the new names.
const COLLECTION_ID: &str = "collection";
const USER_ID: &str = "userid";
const MODIFIED: &str = "modified";
const EXPIRY: &str = "ttl";
const LAST_MODIFIED: &str = "last_modified";
const COUNT: &str = "count";
const TOTAL_BYTES: &str = "total_bytes";
#[derive(Debug)]
enum CollectionLock {
Read,
Write,
}
/// Per session Db metadata
#[derive(Debug, Default)]
struct MysqlDbSession {
/// The "current time" on the server used for this session's operations
timestamp: SyncTimestamp,
/// Cache of collection modified timestamps per (user_id, collection_id)
coll_modified_cache: HashMap<(u32, i32), SyncTimestamp>,
/// Currently locked collections
coll_locks: HashMap<(u32, i32), CollectionLock>,
/// Whether a transaction was started (begin() called)
in_transaction: bool,
in_write_transaction: bool,
}
pub struct MysqlDb {
pub(super) conn: Conn,
session: MysqlDbSession,
/// Pool level cache of collection_ids and their names
coll_cache: Arc<CollectionCache>,
metrics: Metrics,
quota: Quota,
}
impl fmt::Debug for MysqlDb {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MysqlDb")
.field("session", &self.session)
.field("coll_cache", &self.coll_cache)
.field("metrics", &self.metrics)
.field("quota", &self.quota)
.finish()
}
}
impl MysqlDb {
pub(super) fn new(
conn: Conn,
coll_cache: Arc<CollectionCache>,
metrics: &Metrics,
quota: &Quota,
) -> Self {
MysqlDb {
conn,
session: Default::default(),
coll_cache,
metrics: metrics.clone(),
quota: *quota,
}
}
#[async_trait(?Send)]
impl Db for MysqlDb {
/// APIs for collection-level locking
///
/// Explicitly lock the matching row in the user_collections table. Read
@ -190,7 +124,7 @@ impl MysqlDb {
Ok(())
}
pub(super) async fn begin(&mut self, for_write: bool) -> DbResult<()> {
async fn begin(&mut self, for_write: bool) -> DbResult<()> {
<Conn as AsyncConnection>::TransactionManager::begin_transaction(&mut self.conn).await?;
self.session.in_transaction = true;
if for_write {
@ -215,24 +149,6 @@ impl MysqlDb {
Ok(())
}
async fn erect_tombstone(&mut self, user_id: i32) -> DbResult<()> {
sql_query(format!(
r#"INSERT INTO user_collections ({user_id}, {collection_id}, {modified})
VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE
{modified} = VALUES({modified})"#,
user_id = USER_ID,
collection_id = COLLECTION_ID,
modified = LAST_MODIFIED
))
.bind::<BigInt, _>(user_id as i64)
.bind::<Integer, _>(TOMBSTONE)
.bind::<BigInt, _>(self.timestamp().as_i64())
.execute(&mut self.conn)
.await?;
Ok(())
}
async fn delete_storage(&mut self, user_id: UserIdentifier) -> DbResult<()> {
let user_id = user_id.legacy_id as i64;
// Delete user data.
@ -275,30 +191,7 @@ impl MysqlDb {
self.get_storage_timestamp(params.user_id).await
}
pub(super) async fn get_or_create_collection_id(&mut self, name: &str) -> DbResult<i32> {
if let Some(id) = self.coll_cache.get_id(name)? {
return Ok(id);
}
diesel::insert_or_ignore_into(collections::table)
.values(collections::name.eq(name))
.execute(&mut self.conn)
.await?;
let id = collections::table
.select(collections::id)
.filter(collections::name.eq(name))
.first(&mut self.conn)
.await?;
if !self.session.in_write_transaction {
self.coll_cache.put(id, name.to_owned())?;
}
Ok(id)
}
pub(super) async fn get_collection_id(&mut self, name: &str) -> DbResult<i32> {
async fn get_collection_id(&mut self, name: &str) -> DbResult<i32> {
if let Some(id) = self.coll_cache.get_id(name)? {
return Ok(id);
}
@ -320,25 +213,6 @@ impl MysqlDb {
Ok(id)
}
async fn _get_collection_name(&mut self, id: i32) -> DbResult<String> {
let name = if let Some(name) = self.coll_cache.get_name(id)? {
name
} else {
sql_query(
"SELECT name
FROM collections
WHERE id = ?",
)
.bind::<Integer, _>(&id)
.get_result::<NameResult>(&mut self.conn)
.await
.optional()?
.ok_or_else(DbError::collection_not_found)?
.name
};
Ok(name)
}
async fn put_bso(&mut self, bso: params::PutBso) -> DbResult<results::PutBso> {
/*
if bso.payload.is_none() && bso.sortindex.is_none() && bso.ttl.is_none() {
@ -434,7 +308,12 @@ impl MysqlDb {
.bind::<BigInt, _>(timestamp + (i64::from(ttl) * 1000)) // remember: this is in millis
.execute(&mut self.conn)
.await?;
self.update_collection(user_id as u32, collection_id).await
self.update_collection(params::UpdateCollection {
user_id: bso.user_id,
collection_id,
collection: bso.collection,
})
.await
}
async fn get_bsos(&mut self, params: params::GetBsos) -> DbResult<results::GetBsos> {
@ -623,7 +502,12 @@ impl MysqlDb {
if affected_rows == 0 {
return Err(DbError::bso_not_found());
}
self.update_collection(user_id as u32, collection_id).await
self.update_collection(params::UpdateCollection {
user_id: params.user_id,
collection_id,
collection: params.collection,
})
.await
}
async fn delete_bsos(&mut self, params: params::DeleteBsos) -> DbResult<results::DeleteBsos> {
@ -635,7 +519,12 @@ impl MysqlDb {
.filter(bso::id.eq_any(params.ids))
.execute(&mut self.conn)
.await?;
self.update_collection(user_id as u32, collection_id).await
self.update_collection(params::UpdateCollection {
user_id: params.user_id,
collection_id,
collection: params.collection,
})
.await
}
async fn post_bsos(&mut self, input: params::PostBsos) -> DbResult<SyncTimestamp> {
@ -653,8 +542,12 @@ impl MysqlDb {
})
.await?;
}
self.update_collection(input.user_id.legacy_id as u32, collection_id)
.await?;
self.update_collection(params::UpdateCollection {
user_id: input.user_id,
collection_id,
collection: input.collection,
})
.await?;
Ok(modified)
}
@ -743,60 +636,13 @@ impl MysqlDb {
Ok(true)
}
async fn map_collection_names<T>(
async fn update_collection(
&mut self,
by_id: HashMap<i32, T>,
) -> DbResult<HashMap<String, T>> {
let mut names = self.load_collection_names(by_id.keys()).await?;
by_id
.into_iter()
.map(|(id, value)| {
names.remove(&id).map(|name| (name, value)).ok_or_else(|| {
DbError::internal("load_collection_names unknown collection id".to_owned())
})
})
.collect()
}
async fn load_collection_names<'a>(
&mut self,
collection_ids: impl Iterator<Item = &'a i32>,
) -> DbResult<HashMap<i32, String>> {
let mut names = HashMap::new();
let mut uncached = Vec::new();
for &id in collection_ids {
if let Some(name) = self.coll_cache.get_name(id)? {
names.insert(id, name);
} else {
uncached.push(id);
}
}
if !uncached.is_empty() {
let result = collections::table
.select((collections::id, collections::name))
.filter(collections::id.eq_any(uncached))
.load::<(i32, String)>(&mut self.conn)
.await?;
for (id, name) in result {
names.insert(id, name.clone());
if !self.session.in_write_transaction {
self.coll_cache.put(id, name)?;
}
}
}
Ok(names)
}
pub(super) async fn update_collection(
&mut self,
user_id: u32,
collection_id: i32,
params: params::UpdateCollection,
) -> DbResult<SyncTimestamp> {
let quota = if self.quota.enabled {
self.calc_quota_usage(user_id, collection_id).await?
self.calc_quota_usage(params.user_id.legacy_id as i64, params.collection_id)
.await?
} else {
results::GetQuotaUsage {
count: 0,
@ -821,8 +667,8 @@ impl MysqlDb {
let total_bytes = quota.total_bytes as i64;
let timestamp = self.timestamp().as_i64();
sql_query(upsert)
.bind::<BigInt, _>(user_id as i64)
.bind::<Integer, _>(&collection_id)
.bind::<BigInt, _>(params.user_id.legacy_id as i64)
.bind::<Integer, _>(&params.collection_id)
.bind::<BigInt, _>(&timestamp)
.bind::<BigInt, _>(&total_bytes)
.bind::<Integer, _>(&quota.count)
@ -872,30 +718,6 @@ impl MysqlDb {
})
}
// perform a heavier-weight quota calculation
async fn calc_quota_usage(
&mut self,
user_id: u32,
collection_id: i32,
) -> DbResult<results::GetQuotaUsage> {
let (total_bytes, count): (i64, i32) = bso::table
.select((
sql::<BigInt>(r#"COALESCE(SUM(LENGTH(COALESCE(payload, ""))),0)"#),
sql::<Integer>("COALESCE(COUNT(*),0)"),
))
.filter(bso::user_id.eq(user_id as i64))
.filter(bso::expiry.gt(self.timestamp().as_i64()))
.filter(bso::collection_id.eq(collection_id))
.get_result(&mut self.conn)
.await
.optional()?
.unwrap_or_default();
Ok(results::GetQuotaUsage {
total_bytes: total_bytes as usize,
count,
})
}
async fn get_collection_usage(
&mut self,
user_id: UserIdentifier,
@ -934,169 +756,9 @@ impl MysqlDb {
self.map_collection_names(counts).await
}
batch_db_method!(create_batch, create, CreateBatch);
batch_db_method!(validate_batch, validate, ValidateBatch);
batch_db_method!(append_to_batch, append, AppendToBatch);
batch_db_method!(commit_batch, commit, CommitBatch);
batch_db_method!(delete_batch, delete, DeleteBatch);
async fn get_batch(&mut self, params: params::GetBatch) -> DbResult<Option<results::GetBatch>> {
batch::get(self, params).await
}
pub(super) fn timestamp(&self) -> SyncTimestamp {
fn timestamp(&self) -> SyncTimestamp {
self.session.timestamp
}
}
#[async_trait(?Send)]
impl Db for MysqlDb {
async fn commit(&mut self) -> Result<(), Self::Error> {
MysqlDb::commit(self).await
}
async fn rollback(&mut self) -> Result<(), Self::Error> {
MysqlDb::rollback(self).await
}
async fn begin(&mut self, for_write: bool) -> Result<(), Self::Error> {
MysqlDb::begin(self, for_write).await
}
async fn check(&mut self) -> Result<results::Check, Self::Error> {
MysqlDb::check(self).await
}
async fn lock_for_read(
&mut self,
params: params::LockCollection,
) -> Result<results::LockCollection, Self::Error> {
MysqlDb::lock_for_read(self, params).await
}
async fn lock_for_write(
&mut self,
params: params::LockCollection,
) -> Result<results::LockCollection, Self::Error> {
MysqlDb::lock_for_write(self, params).await
}
async fn get_collection_timestamps(
&mut self,
params: params::GetCollectionTimestamps,
) -> Result<results::GetCollectionTimestamps, Self::Error> {
MysqlDb::get_collection_timestamps(self, params).await
}
async fn get_collection_timestamp(
&mut self,
params: params::GetCollectionTimestamp,
) -> Result<results::GetCollectionTimestamp, Self::Error> {
MysqlDb::get_collection_timestamp(self, params).await
}
async fn get_collection_counts(
&mut self,
params: params::GetCollectionCounts,
) -> Result<results::GetCollectionCounts, Self::Error> {
MysqlDb::get_collection_counts(self, params).await
}
async fn get_collection_usage(
&mut self,
params: params::GetCollectionUsage,
) -> Result<results::GetCollectionUsage, Self::Error> {
MysqlDb::get_collection_usage(self, params).await
}
async fn get_storage_timestamp(
&mut self,
params: params::GetStorageTimestamp,
) -> Result<results::GetStorageTimestamp, Self::Error> {
MysqlDb::get_storage_timestamp(self, params).await
}
async fn get_storage_usage(
&mut self,
params: params::GetStorageUsage,
) -> Result<results::GetStorageUsage, Self::Error> {
MysqlDb::get_storage_usage(self, params).await
}
async fn get_quota_usage(
&mut self,
params: params::GetQuotaUsage,
) -> Result<results::GetQuotaUsage, Self::Error> {
MysqlDb::get_quota_usage(self, params).await
}
async fn delete_storage(
&mut self,
params: params::DeleteStorage,
) -> Result<results::DeleteStorage, Self::Error> {
MysqlDb::delete_storage(self, params).await
}
async fn delete_collection(
&mut self,
params: params::DeleteCollection,
) -> Result<results::DeleteCollection, Self::Error> {
MysqlDb::delete_collection(self, params).await
}
async fn delete_bsos(
&mut self,
params: params::DeleteBsos,
) -> Result<results::DeleteBsos, Self::Error> {
MysqlDb::delete_bsos(self, params).await
}
async fn get_bsos(&mut self, params: params::GetBsos) -> Result<results::GetBsos, Self::Error> {
MysqlDb::get_bsos(self, params).await
}
async fn get_bso_ids(
&mut self,
params: params::GetBsoIds,
) -> Result<results::GetBsoIds, Self::Error> {
MysqlDb::get_bso_ids(self, params).await
}
async fn post_bsos(
&mut self,
params: params::PostBsos,
) -> Result<results::PostBsos, Self::Error> {
MysqlDb::post_bsos(self, params).await
}
async fn delete_bso(
&mut self,
params: params::DeleteBso,
) -> Result<results::DeleteBso, Self::Error> {
MysqlDb::delete_bso(self, params).await
}
async fn get_bso(
&mut self,
params: params::GetBso,
) -> Result<Option<results::GetBso>, Self::Error> {
MysqlDb::get_bso(self, params).await
}
async fn get_bso_timestamp(
&mut self,
params: params::GetBsoTimestamp,
) -> Result<results::GetBsoTimestamp, Self::Error> {
MysqlDb::get_bso_timestamp(self, params).await
}
async fn put_bso(&mut self, params: params::PutBso) -> Result<results::PutBso, Self::Error> {
MysqlDb::put_bso(self, params).await
}
async fn get_collection_id(&mut self, name: &str) -> Result<i32, Self::Error> {
MysqlDb::get_collection_id(self, name).await
}
fn get_connection_info(&self) -> results::ConnectionInfo {
results::ConnectionInfo::default()
@ -1106,17 +768,6 @@ impl Db for MysqlDb {
self.get_or_create_collection_id(name).await
}
async fn update_collection(
&mut self,
param: params::UpdateCollection,
) -> Result<SyncTimestamp, Self::Error> {
MysqlDb::update_collection(self, param.user_id.legacy_id as u32, param.collection_id).await
}
fn timestamp(&self) -> SyncTimestamp {
MysqlDb::timestamp(self)
}
fn set_timestamp(&mut self, timestamp: SyncTimestamp) {
self.session.timestamp = timestamp;
}
@ -1135,66 +786,12 @@ impl Db for MysqlDb {
}
}
#[async_trait(?Send)]
impl BatchDb for MysqlDb {
type Error = DbError;
async fn create_batch(
&mut self,
params: params::CreateBatch,
) -> Result<results::CreateBatch, Self::Error> {
MysqlDb::create_batch(self, params).await
}
async fn validate_batch(
&mut self,
params: params::ValidateBatch,
) -> Result<results::ValidateBatch, Self::Error> {
MysqlDb::validate_batch(self, params).await
}
async fn append_to_batch(
&mut self,
params: params::AppendToBatch,
) -> Result<results::AppendToBatch, Self::Error> {
MysqlDb::append_to_batch(self, params).await
}
async fn get_batch(
&mut self,
params: params::GetBatch,
) -> Result<Option<results::GetBatch>, Self::Error> {
MysqlDb::get_batch(self, params).await
}
async fn commit_batch(
&mut self,
params: params::CommitBatch,
) -> Result<results::CommitBatch, Self::Error> {
MysqlDb::commit_batch(self, params).await
}
async fn delete_batch(
&mut self,
params: params::DeleteBatch,
) -> Result<results::DeleteBatch, Self::Error> {
MysqlDb::delete_batch(self, params).await
}
}
#[derive(Debug, QueryableByName)]
struct IdResult {
#[diesel(sql_type = Integer)]
id: i32,
}
#[allow(dead_code)] // Not really dead, Rust can't see the use above
#[derive(Debug, QueryableByName)]
struct NameResult {
#[diesel(sql_type = Text)]
name: String,
}
#[derive(Debug, QueryableByName)]
struct UserCollectionsResult {
// Can't substitute column names here.
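Most of this file's deletions are the forwarding layer visible above: each Db method existed twice, once as an inherent pub(super) method and once as a trait method that merely delegated to it (with the batch_db_method! macro doing the same for BatchDb). A condensed sketch of the before/after shape, using a hypothetical one-method trait; the same refactor also replaces update_collection's loose (u32, i32) arguments with the params::UpdateCollection struct:

use async_trait::async_trait; // same crate the diff already uses

struct MysqlDb;

#[async_trait(?Send)]
trait Db {
    type Error;
    async fn check(&mut self) -> Result<bool, Self::Error>;
}

// Before: the logic lived in an inherent method...
//     impl MysqlDb { pub(super) async fn check(&mut self) -> DbResult<bool> { .. } }
// ...and the trait impl was pure forwarding boilerplate:
//     impl Db for MysqlDb {
//         async fn check(&mut self) -> Result<bool, Self::Error> {
//             MysqlDb::check(self).await
//         }
//     }

// After: one definition, directly on the trait.
#[async_trait(?Send)]
impl Db for MysqlDb {
    type Error = std::convert::Infallible;
    async fn check(&mut self) -> Result<bool, Self::Error> {
        Ok(true)
    }
}

fn main() {
    // futures::executor is used here only to drive the example.
    let mut db = MysqlDb;
    assert!(futures::executor::block_on(db.check()).unwrap());
}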

View File

@ -0,0 +1,231 @@
use std::{collections::HashMap, fmt, sync::Arc};
use diesel::{
dsl::sql,
sql_query,
sql_types::{BigInt, Integer, Text},
ExpressionMethods, OptionalExtension, QueryDsl,
};
use diesel_async::RunQueryDsl;
use syncserver_common::Metrics;
use syncstorage_db_common::{results, util::SyncTimestamp, Db};
use syncstorage_settings::Quota;
use crate::{
pool::{CollectionCache, Conn},
DbError, DbResult,
};
use schema::{bso, collections};
mod batch_impl;
mod db_impl;
mod diesel_ext;
pub(crate) mod schema;
pub use batch_impl::validate_batch_id;
const TOMBSTONE: i32 = 0;
/// SQL Variable remapping
/// These names are the legacy values mapped to the new names.
const COLLECTION_ID: &str = "collection";
const USER_ID: &str = "userid";
const MODIFIED: &str = "modified";
const EXPIRY: &str = "ttl";
const LAST_MODIFIED: &str = "last_modified";
const COUNT: &str = "count";
const TOTAL_BYTES: &str = "total_bytes";
#[derive(Debug)]
enum CollectionLock {
Read,
Write,
}
/// Per session Db metadata
#[derive(Debug, Default)]
struct MysqlDbSession {
/// The "current time" on the server used for this session's operations
timestamp: SyncTimestamp,
/// Cache of collection modified timestamps per (user_id, collection_id)
coll_modified_cache: HashMap<(u32, i32), SyncTimestamp>,
/// Currently locked collections
coll_locks: HashMap<(u32, i32), CollectionLock>,
/// Whether a transaction was started (begin() called)
in_transaction: bool,
in_write_transaction: bool,
}
pub struct MysqlDb {
pub(super) conn: Conn,
session: MysqlDbSession,
/// Pool level cache of collection_ids and their names
coll_cache: Arc<CollectionCache>,
metrics: Metrics,
quota: Quota,
}
impl fmt::Debug for MysqlDb {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MysqlDb")
.field("session", &self.session)
.field("coll_cache", &self.coll_cache)
.field("metrics", &self.metrics)
.field("quota", &self.quota)
.finish()
}
}
impl MysqlDb {
pub(super) fn new(
conn: Conn,
coll_cache: Arc<CollectionCache>,
metrics: &Metrics,
quota: &Quota,
) -> Self {
MysqlDb {
conn,
session: Default::default(),
coll_cache,
metrics: metrics.clone(),
quota: *quota,
}
}
async fn erect_tombstone(&mut self, user_id: i32) -> DbResult<()> {
sql_query(format!(
r#"INSERT INTO user_collections ({user_id}, {collection_id}, {modified})
VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE
{modified} = VALUES({modified})"#,
user_id = USER_ID,
collection_id = COLLECTION_ID,
modified = LAST_MODIFIED
))
.bind::<BigInt, _>(user_id as i64)
.bind::<Integer, _>(TOMBSTONE)
.bind::<BigInt, _>(self.timestamp().as_i64())
.execute(&mut self.conn)
.await?;
Ok(())
}
pub(super) async fn get_or_create_collection_id(&mut self, name: &str) -> DbResult<i32> {
if let Some(id) = self.coll_cache.get_id(name)? {
return Ok(id);
}
diesel::insert_or_ignore_into(collections::table)
.values(collections::name.eq(name))
.execute(&mut self.conn)
.await?;
let id = collections::table
.select(collections::id)
.filter(collections::name.eq(name))
.first(&mut self.conn)
.await?;
if !self.session.in_write_transaction {
self.coll_cache.put(id, name.to_owned())?;
}
Ok(id)
}
async fn _get_collection_name(&mut self, id: i32) -> DbResult<String> {
let name = if let Some(name) = self.coll_cache.get_name(id)? {
name
} else {
sql_query(
"SELECT name
FROM collections
WHERE id = ?",
)
.bind::<Integer, _>(&id)
.get_result::<NameResult>(&mut self.conn)
.await
.optional()?
.ok_or_else(DbError::collection_not_found)?
.name
};
Ok(name)
}
async fn map_collection_names<T>(
&mut self,
by_id: HashMap<i32, T>,
) -> DbResult<HashMap<String, T>> {
let mut names = self.load_collection_names(by_id.keys()).await?;
by_id
.into_iter()
.map(|(id, value)| {
names.remove(&id).map(|name| (name, value)).ok_or_else(|| {
DbError::internal("load_collection_names unknown collection id".to_owned())
})
})
.collect()
}
async fn load_collection_names<'a>(
&mut self,
collection_ids: impl Iterator<Item = &'a i32>,
) -> DbResult<HashMap<i32, String>> {
let mut names = HashMap::new();
let mut uncached = Vec::new();
for &id in collection_ids {
if let Some(name) = self.coll_cache.get_name(id)? {
names.insert(id, name);
} else {
uncached.push(id);
}
}
if !uncached.is_empty() {
let result = collections::table
.select((collections::id, collections::name))
.filter(collections::id.eq_any(uncached))
.load::<(i32, String)>(&mut self.conn)
.await?;
for (id, name) in result {
names.insert(id, name.clone());
if !self.session.in_write_transaction {
self.coll_cache.put(id, name)?;
}
}
}
Ok(names)
}
// perform a heavier-weight quota calculation
async fn calc_quota_usage(
&mut self,
user_id: i64,
collection_id: i32,
) -> DbResult<results::GetQuotaUsage> {
let (total_bytes, count): (i64, i32) = bso::table
.select((
sql::<BigInt>(r#"COALESCE(SUM(LENGTH(COALESCE(payload, ""))),0)"#),
sql::<Integer>("COALESCE(COUNT(*),0)"),
))
.filter(bso::user_id.eq(user_id))
.filter(bso::expiry.gt(self.timestamp().as_i64()))
.filter(bso::collection_id.eq(collection_id))
.get_result(&mut self.conn)
.await
.optional()?
.unwrap_or_default();
Ok(results::GetQuotaUsage {
total_bytes: total_bytes as usize,
count,
})
}
}
#[allow(dead_code)] // Not really dead, Rust can't see the use above
#[derive(Debug, QueryableByName)]
struct NameResult {
#[diesel(sql_type = Text)]
name: String,
}
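get_or_create_collection_id above is a cache-aside lookup with one subtlety: the cache is only populated outside write transactions, so a rolled-back INSERT can never strand a phantom id in the pool-level cache. A minimal in-memory sketch of that rule (Cache and the table map are hypothetical stand-ins for CollectionCache and the collections table):

use std::collections::HashMap;

#[derive(Default)]
struct Cache(HashMap<String, i32>);

#[derive(Default)]
struct Db {
    cache: Cache,
    table: HashMap<String, i32>, // stand-in for the collections table
    next_id: i32,
    in_write_transaction: bool,
}

impl Db {
    fn get_or_create_collection_id(&mut self, name: &str) -> i32 {
        if let Some(&id) = self.cache.0.get(name) {
            return id; // cache hit: no query at all
        }
        // INSERT ... OR IGNORE, then SELECT (mirrors the Diesel calls above).
        let next = self.next_id + 1;
        let id = *self.table.entry(name.to_owned()).or_insert(next);
        if id == next {
            self.next_id = next;
        }
        // Only cache when the row is guaranteed to survive: a rollback of a
        // write transaction would otherwise leave a stale id cached.
        if !self.in_write_transaction {
            self.cache.0.insert(name.to_owned(), id);
        }
        id
    }
}

fn main() {
    let mut db = Db::default();
    let a = db.get_or_create_collection_id("bookmarks");
    assert_eq!(db.get_or_create_collection_id("bookmarks"), a);
}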

View File

@ -6,15 +6,12 @@ extern crate diesel_migrations;
extern crate slog_scope;
#[macro_use]
mod batch;
mod diesel_ext;
mod models;
mod db;
mod pool;
mod schema;
#[cfg(test)]
mod test;
pub use models::MysqlDb;
pub use db::MysqlDb;
pub use pool::MysqlDbPool;
pub use syncstorage_db_common::diesel::DbError;
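The net effect on the crate layout: four sibling modules collapse into a single db module, matching the spanner backend's structure (see the last file below). A sketch of the resulting tree; the contents of schema.rs are assumed to be the Diesel table definitions:

// Old layout (lib.rs)       New layout
// mod batch;                mod db;  // db/mod.rs: MysqlDb, session state, cache helpers
// mod diesel_ext;           //  ├── batch_impl.rs  impl BatchDb for MysqlDb
// mod models;               //  ├── db_impl.rs     impl Db for MysqlDb
// mod schema;               //  ├── diesel_ext.rs  LockInShareModeDsl
//                           //  └── schema.rs      Diesel table definitions (assumed)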

View File

@ -28,7 +28,7 @@ use syncstorage_db_common::{Db, DbPool, STD_COLLS};
use syncstorage_settings::{Quota, Settings};
use tokio::task::spawn_blocking;
use super::{models::MysqlDb, DbError, DbResult};
use super::{db::MysqlDb, DbError, DbResult};
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
@ -171,7 +171,7 @@ impl DbPool for MysqlDbPool {
}
fn validate_batch_id(&self, id: String) -> DbResult<()> {
super::batch::validate_batch_id(&id)
super::db::validate_batch_id(&id)
}
fn box_clone(&self) -> Box<dyn DbPool<Error = Self::Error>> {

View File

@ -8,11 +8,15 @@ use diesel::{
use diesel_async::RunQueryDsl;
use syncserver_common::{BlockingThreadpool, Metrics};
use syncserver_settings::Settings as SyncserverSettings;
use syncstorage_db_common::DbPool;
use syncstorage_db_common::{Db, DbPool};
use syncstorage_settings::Settings as SyncstorageSettings;
use url::Url;
use crate::{models::MysqlDb, pool::MysqlDbPool, schema::collections, DbResult};
use crate::{
db::{schema::collections, MysqlDb},
pool::MysqlDbPool,
DbResult,
};
async fn db(settings: &SyncstorageSettings) -> DbResult<MysqlDb> {
let _ = env_logger::try_init();
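The added Db import in this file is required, not cosmetic: once methods like timestamp() move from inherent impls into the Db trait impl, callers must bring the trait into scope for method resolution. A minimal illustration:

mod common {
    pub trait Db {
        fn timestamp(&self) -> i64;
    }
}

struct MysqlDb;

impl common::Db for MysqlDb {
    fn timestamp(&self) -> i64 {
        0
    }
}

fn main() {
    use common::Db; // without this, db.timestamp() fails to resolve
    let db = MysqlDb;
    let _ = db.timestamp();
}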

View File

@ -33,12 +33,6 @@ use support::{
StreamedResultSetAsync,
};
#[derive(Debug, Eq, PartialEq)]
enum CollectionLock {
Read,
Write,
}
mod batch_impl;
mod db_impl;
mod stream;
@ -46,6 +40,12 @@ pub(crate) mod support;
pub use batch_impl::validate_batch_id;
#[derive(Debug, Eq, PartialEq)]
enum CollectionLock {
Read,
Write,
}
const TOMBSTONE: i32 = 0;
pub const PRETOUCH_TS: &str = "0001-01-01T00:00:00.00Z";