Merge branch 'master' into release/0.21

This commit is contained in:
Philip Jenvey 2025-09-25 16:20:00 -07:00
commit 0a56042d6a
No known key found for this signature in database
GPG Key ID: 5B9F83DE4F7EB7FA
24 changed files with 450 additions and 467 deletions

18
Cargo.lock generated
View File

@ -824,11 +824,8 @@ dependencies = [
"byteorder",
"diesel_derives",
"itoa",
"mysqlclient-sys",
"percent-encoding",
"pq-sys",
"r2d2",
"url",
]
[[package]]
@ -2092,17 +2089,6 @@ dependencies = [
"uuid",
]
[[package]]
name = "mysqlclient-sys"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86a34a2bdec189f1060343ba712983e14cad7e87515cfd9ac4653e207535b6b1"
dependencies = [
"pkg-config",
"semver",
"vcpkg",
]
[[package]]
name = "nom"
version = "5.1.3"
@ -3435,7 +3421,9 @@ dependencies = [
"async-trait",
"backtrace",
"base64",
"deadpool",
"diesel",
"diesel-async",
"diesel_logger",
"diesel_migrations",
"env_logger 0.11.8",
@ -3448,6 +3436,7 @@ dependencies = [
"syncstorage-db-common",
"syncstorage-settings",
"thiserror 1.0.69",
"tokio",
"url",
]
@ -3709,6 +3698,7 @@ dependencies = [
"backtrace",
"deadpool",
"diesel",
"diesel-async",
"diesel_migrations",
"http 1.3.1",
"serde 1.0.219",

View File

@ -11,8 +11,7 @@ deadpool.workspace = true
futures.workspace = true
http.workspace = true
thiserror.workspace = true
diesel = { workspace = true, features = ["mysql", "r2d2"] }
diesel-async = { workspace = true }
diesel_migrations = { workspace = true, features = ["mysql"] }
diesel.workspace = true
diesel-async.workspace = true
diesel_migrations.workspace = true
syncserver-common = { path = "../syncserver-common" }

View File

@ -22,9 +22,6 @@ enum SqlErrorKind {
#[error("An error occurred while establishing a db connection: {}", _0)]
DieselConnection(#[from] diesel::result::ConnectionError),
#[error("A database pool error occurred: {}", _0)]
Pool(diesel::r2d2::PoolError),
#[error("Error migrating the database: {}", _0)]
Migration(diesel_migrations::MigrationError),
}
@ -41,18 +38,13 @@ impl From<SqlErrorKind> for SqlError {
impl ReportableError for SqlError {
fn is_sentry_event(&self) -> bool {
#[allow(clippy::match_like_matches_macro)]
match &self.kind {
SqlErrorKind::Pool(_) => false,
_ => true,
}
true
}
fn metric_label(&self) -> Option<&'static str> {
Some(match self.kind {
SqlErrorKind::DieselQuery(_) => "storage.sql.error.diesel_query",
SqlErrorKind::DieselConnection(_) => "storage.sql.error.diesel_connection",
SqlErrorKind::Pool(_) => "storage.sql.error.pool",
SqlErrorKind::Migration(_) => "storage.sql.error.migration",
})
}
@ -70,7 +62,6 @@ from_error!(
SqlError,
SqlErrorKind::DieselConnection
);
from_error!(diesel::r2d2::PoolError, SqlError, SqlErrorKind::Pool);
from_error!(
diesel_migrations::MigrationError,
SqlError,

View File

@ -20,14 +20,6 @@ pub struct PoolState {
pub idle_connections: u32,
}
impl From<diesel::r2d2::State> for PoolState {
fn from(state: diesel::r2d2::State) -> PoolState {
PoolState {
connections: state.connections,
idle_connections: state.idle_connections,
}
}
}
impl From<deadpool::Status> for PoolState {
fn from(status: deadpool::Status) -> PoolState {
PoolState {
@ -38,17 +30,13 @@ impl From<deadpool::Status> for PoolState {
}
#[macro_export]
macro_rules! sync_db_method {
($name:ident, $sync_name:ident, $type:ident) => {
sync_db_method!($name, $sync_name, $type, results::$type);
macro_rules! async_db_method {
($name:ident, $async_name:path, $type:ident) => {
async_db_method!($name, $async_name, $type, results::$type);
};
($name:ident, $sync_name:ident, $type:ident, $result:ty) => {
($name:ident, $async_name:path, $type:ident, $result:ty) => {
fn $name(&mut self, params: params::$type) -> DbFuture<'_, $result, DbError> {
let mut db = self.clone();
Box::pin(
self.blocking_threadpool
.spawn(move || db.$sync_name(params)),
)
Box::pin($async_name(self, params))
}
};
}

View File

@ -1,17 +1,6 @@
use deadpool::managed::{HookError, HookResult};
use diesel::{mysql::MysqlConnection, r2d2::CustomizeConnection, Connection};
use diesel_async::{pooled_connection::PoolError, AsyncConnection};
#[derive(Debug)]
pub struct TestTransactionCustomizer;
impl CustomizeConnection<MysqlConnection, diesel::r2d2::Error> for TestTransactionCustomizer {
fn on_acquire(&self, conn: &mut MysqlConnection) -> Result<(), diesel::r2d2::Error> {
conn.begin_test_transaction()
.map_err(diesel::r2d2::Error::QueryError)
}
}
pub async fn test_transaction_hook<T>(conn: &mut T) -> HookResult<PoolError>
where
T: AsyncConnection,

View File

@ -268,11 +268,12 @@ impl Server {
let blocking_threadpool = Arc::new(BlockingThreadpool::new(
settings.worker_max_blocking_threads,
));
let db_pool = DbPoolImpl::new(
let mut db_pool = DbPoolImpl::new(
&settings.syncstorage,
&Metrics::from(&metrics),
blocking_threadpool.clone(),
)?;
db_pool.init().await?;
// Spawns sweeper that calls Deadpool `retain` method, clearing unused connections.
db_pool.spawn_sweeper(Duration::from_secs(
settings

View File

@ -93,15 +93,21 @@ async fn get_test_state(settings: &Settings) -> ServerState {
app_channel: settings.environment.clone(),
});
let mut db_pool = Box::new(
DbPoolImpl::new(
&settings.syncstorage,
&Metrics::from(&metrics),
blocking_threadpool,
)
.expect("Could not get db_pool in get_test_state"),
);
db_pool
.init()
.await
.expect("Could not init db_pool in get_test_state");
ServerState {
db_pool: Box::new(
DbPoolImpl::new(
&settings.syncstorage,
&Metrics::from(&metrics),
blocking_threadpool,
)
.expect("Could not get db_pool in get_test_state"),
),
db_pool,
limits: Arc::clone(&SERVER_LIMITS),
limits_json: serde_json::to_string(&**SERVER_LIMITS).unwrap(),
metrics,

View File

@ -38,7 +38,7 @@ impl ServerState {
pub fn from_settings(
settings: &Settings,
metrics: Arc<StatsdClient>,
blocking_threadpool: Arc<BlockingThreadpool>,
#[allow(unused_variables)] blocking_threadpool: Arc<BlockingThreadpool>,
) -> Result<Self, ApiError> {
#[cfg(not(feature = "py_verifier"))]
let oauth_verifier = {

View File

@ -8,6 +8,7 @@ edition.workspace = true
[dependencies]
backtrace.workspace = true
chrono.workspace = true
diesel.workspace = true
futures.workspace = true
lazy_static.workspace = true
http.workspace = true
@ -16,6 +17,5 @@ serde_json.workspace = true
thiserror.workspace = true
async-trait = "0.1.88"
diesel = { workspace = true, features = ["mysql", "r2d2"] }
syncserver-common = { path = "../syncserver-common" }
syncserver-db-common = { path = "../syncserver-db-common" }

View File

@ -51,6 +51,10 @@ pub const FIRST_CUSTOM_COLLECTION_ID: i32 = 101;
pub trait DbPool: Sync + Send + Debug + GetPoolState {
type Error;
async fn init(&mut self) -> Result<(), Self::Error> {
Ok(())
}
async fn get(&self) -> Result<Box<dyn Db<Error = Self::Error>>, Self::Error>;
fn validate_batch_id(&self, params: params::ValidateBatchId) -> Result<(), Self::Error>;

View File

@ -23,7 +23,8 @@ pub async fn db_pool(settings: Option<SyncstorageSettings>) -> Result<DbPoolImpl
settings.database_use_test_transactions = use_test_transactions;
let metrics = Metrics::noop();
let pool = DbPoolImpl::new(&settings, &metrics, Arc::new(BlockingThreadpool::new(512)))?;
let mut pool = DbPoolImpl::new(&settings, &metrics, Arc::new(BlockingThreadpool::new(512)))?;
pool.init().await?;
Ok(pool)
}

View File

@ -8,20 +8,23 @@ edition.workspace = true
[dependencies]
backtrace.workspace = true
base64.workspace = true
deadpool.workspace = true
diesel.workspace = true
diesel-async.workspace = true
diesel_logger.workspace = true
diesel_migrations.workspace = true
futures.workspace = true
http.workspace = true
slog-scope.workspace = true
thiserror.workspace = true
async-trait = "0.1.88"
diesel = { workspace = true, features = ["mysql", "r2d2"] }
diesel_logger = { workspace = true }
diesel_migrations = { workspace = true, features = ["mysql"] }
tokio = { workspace = true, features = ["macros", "sync"] }
url = "2.1"
syncserver-common = { path = "../syncserver-common" }
syncserver-db-common = { path = "../syncserver-db-common" }
syncstorage-db-common = { path = "../syncstorage-db-common" }
syncstorage-settings = { path = "../syncstorage-settings" }
url = "2.1"
[dev-dependencies]
env_logger.workspace = true

View File

@ -8,8 +8,9 @@ use diesel::{
result::{DatabaseErrorKind::UniqueViolation, Error as DieselError},
sql_query,
sql_types::{BigInt, Integer},
ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl,
ExpressionMethods, OptionalExtension, QueryDsl,
};
use diesel_async::RunQueryDsl;
use syncstorage_db_common::{params, results, UserIdentifier, BATCH_LIFETIME};
use super::{
@ -23,9 +24,12 @@ const MAX_TTL: i32 = 2_100_000_000;
const MAX_BATCH_CREATE_RETRY: u8 = 5;
pub fn create(db: &mut MysqlDb, params: params::CreateBatch) -> DbResult<results::CreateBatch> {
pub async fn create(
db: &mut MysqlDb,
params: params::CreateBatch,
) -> DbResult<results::CreateBatch> {
let user_id = params.user_id.legacy_id as i64;
let collection_id = db.get_collection_id(&params.collection)?;
let collection_id = db.get_collection_id(&params.collection).await?;
// Careful, there's some weirdness here!
//
// Sync timestamps are in seconds and quantized to two decimal places, so
@ -48,7 +52,8 @@ pub fn create(db: &mut MysqlDb, params: params::CreateBatch) -> DbResult<results
batch_uploads::user_id.eq(&user_id),
batch_uploads::collection_id.eq(&collection_id),
))
.execute(&mut *db.conn.write()?);
.execute(&mut db.conn)
.await;
match result {
Ok(_) => break,
Err(DieselError::DatabaseError(UniqueViolation, _)) => {
@ -61,14 +66,14 @@ pub fn create(db: &mut MysqlDb, params: params::CreateBatch) -> DbResult<results
}
}
do_append(db, batch_id, params.user_id, collection_id, params.bsos)?;
do_append(db, batch_id, params.user_id, collection_id, params.bsos).await?;
Ok(results::CreateBatch {
id: encode_id(batch_id),
size: None,
})
}
pub fn validate(db: &mut MysqlDb, params: params::ValidateBatch) -> DbResult<bool> {
pub async fn validate(db: &mut MysqlDb, params: params::ValidateBatch) -> DbResult<bool> {
let batch_id = decode_id(&params.id)?;
// Avoid hitting the db for batches that are obviously too old. Recall
// that the batchid is a millisecond timestamp.
@ -77,18 +82,19 @@ pub fn validate(db: &mut MysqlDb, params: params::ValidateBatch) -> DbResult<boo
}
let user_id = params.user_id.legacy_id as i64;
let collection_id = db.get_collection_id(&params.collection)?;
let collection_id = db.get_collection_id(&params.collection).await?;
let exists = batch_uploads::table
.select(sql::<Integer>("1"))
.filter(batch_uploads::batch_id.eq(&batch_id))
.filter(batch_uploads::user_id.eq(&user_id))
.filter(batch_uploads::collection_id.eq(&collection_id))
.get_result::<i32>(&mut *db.conn.write()?)
.get_result::<i32>(&mut db.conn)
.await
.optional()?;
Ok(exists.is_some())
}
pub fn append(db: &mut MysqlDb, params: params::AppendToBatch) -> DbResult<()> {
pub async fn append(db: &mut MysqlDb, params: params::AppendToBatch) -> DbResult<()> {
let exists = validate(
db,
params::ValidateBatch {
@ -96,19 +102,23 @@ pub fn append(db: &mut MysqlDb, params: params::AppendToBatch) -> DbResult<()> {
collection: params.collection.clone(),
id: params.batch.id.clone(),
},
)?;
)
.await?;
if !exists {
return Err(DbError::batch_not_found());
}
let batch_id = decode_id(&params.batch.id)?;
let collection_id = db.get_collection_id(&params.collection)?;
do_append(db, batch_id, params.user_id, collection_id, params.bsos)?;
let collection_id = db.get_collection_id(&params.collection).await?;
do_append(db, batch_id, params.user_id, collection_id, params.bsos).await?;
Ok(())
}
pub fn get(db: &mut MysqlDb, params: params::GetBatch) -> DbResult<Option<results::GetBatch>> {
pub async fn get(
db: &mut MysqlDb,
params: params::GetBatch,
) -> DbResult<Option<results::GetBatch>> {
let is_valid = validate(
db,
params::ValidateBatch {
@ -116,7 +126,8 @@ pub fn get(db: &mut MysqlDb, params: params::GetBatch) -> DbResult<Option<result
collection: params.collection,
id: params.id.clone(),
},
)?;
)
.await?;
let batch = if is_valid {
Some(results::GetBatch { id: params.id })
} else {
@ -125,27 +136,32 @@ pub fn get(db: &mut MysqlDb, params: params::GetBatch) -> DbResult<Option<result
Ok(batch)
}
pub fn delete(db: &mut MysqlDb, params: params::DeleteBatch) -> DbResult<()> {
pub async fn delete(db: &mut MysqlDb, params: params::DeleteBatch) -> DbResult<()> {
let batch_id = decode_id(&params.id)?;
let user_id = params.user_id.legacy_id as i64;
let collection_id = db.get_collection_id(&params.collection)?;
let collection_id = db.get_collection_id(&params.collection).await?;
diesel::delete(batch_uploads::table)
.filter(batch_uploads::batch_id.eq(&batch_id))
.filter(batch_uploads::user_id.eq(&user_id))
.filter(batch_uploads::collection_id.eq(&collection_id))
.execute(&mut *db.conn.write()?)?;
.execute(&mut db.conn)
.await?;
diesel::delete(batch_upload_items::table)
.filter(batch_upload_items::batch_id.eq(&batch_id))
.filter(batch_upload_items::user_id.eq(&user_id))
.execute(&mut *db.conn.write()?)?;
.execute(&mut db.conn)
.await?;
Ok(())
}
/// Commits a batch to the bsos table, deleting the batch when succesful
pub fn commit(db: &mut MysqlDb, params: params::CommitBatch) -> DbResult<results::CommitBatch> {
pub async fn commit(
db: &mut MysqlDb,
params: params::CommitBatch,
) -> DbResult<results::CommitBatch> {
let batch_id = decode_id(&params.batch.id)?;
let user_id = params.user_id.legacy_id as i64;
let collection_id = db.get_collection_id(&params.collection)?;
let collection_id = db.get_collection_id(&params.collection).await?;
let timestamp = db.timestamp();
sql_query(include_str!("batch_commit.sql"))
.bind::<BigInt, _>(user_id)
@ -157,9 +173,10 @@ pub fn commit(db: &mut MysqlDb, params: params::CommitBatch) -> DbResult<results
.bind::<BigInt, _>(user_id)
.bind::<BigInt, _>(&db.timestamp().as_i64())
.bind::<BigInt, _>(&db.timestamp().as_i64())
.execute(&mut *db.conn.write()?)?;
.execute(&mut db.conn)
.await?;
db.update_collection(user_id as u32, collection_id)?;
db.update_collection(user_id as u32, collection_id).await?;
delete(
db,
@ -168,11 +185,12 @@ pub fn commit(db: &mut MysqlDb, params: params::CommitBatch) -> DbResult<results
collection: params.collection,
id: params.batch.id,
},
)?;
)
.await?;
Ok(timestamp)
}
pub fn do_append(
pub async fn do_append(
db: &mut MysqlDb,
batch_id: i64,
user_id: UserIdentifier,
@ -217,7 +235,7 @@ pub fn do_append(
)
.bind::<BigInt, _>(user_id.legacy_id as i64)
.bind::<BigInt, _>(batch_id)
.get_results::<ExistsResult>(&mut *db.conn.write()?)?
.get_results::<ExistsResult>(&mut db.conn).await?
{
existing.insert(exist_idx(
user_id.legacy_id,
@ -241,7 +259,8 @@ pub fn do_append(
payload_size,
ttl_offset: bso.ttl.map(|ttl| ttl as i32),
})
.execute(&mut *db.conn.write()?)?;
.execute(&mut db.conn)
.await?;
} else {
diesel::insert_into(batch_upload_items::table)
.values((
@ -253,7 +272,8 @@ pub fn do_append(
batch_upload_items::payload_size.eq(payload_size),
batch_upload_items::ttl_offset.eq(bso.ttl.map(|ttl| ttl as i32)),
))
.execute(&mut *db.conn.write()?)?;
.execute(&mut db.conn)
.await?;
// make sure to include the key into our table check.
existing.insert(exist_idx);
}
@ -282,8 +302,8 @@ fn decode_id(id: &str) -> DbResult<i64> {
macro_rules! batch_db_method {
($name:ident, $batch_name:ident, $type:ident) => {
pub fn $name(&mut self, params: params::$type) -> DbResult<results::$type> {
batch::$batch_name(self, params)
pub async fn $name(&mut self, params: params::$type) -> DbResult<results::$type> {
batch::$batch_name(self, params).await
}
};
}

View File

@ -41,6 +41,10 @@ impl DbError {
pub fn quota() -> Self {
DbErrorKind::Common(SyncstorageDbError::quota()).into()
}
pub fn pool_timeout(timeout_type: deadpool::managed::TimeoutType) -> Self {
DbErrorKind::PoolTimeout(timeout_type).into()
}
}
#[derive(Debug, Error)]
@ -50,6 +54,9 @@ enum DbErrorKind {
#[error("{}", _0)]
Mysql(SqlError),
#[error("A database pool timeout occurred, type: {:?}", _0)]
PoolTimeout(deadpool::managed::TimeoutType),
}
impl From<DbErrorKind> for DbError {
@ -96,6 +103,7 @@ impl ReportableError for DbError {
Some(match &self.kind {
DbErrorKind::Common(e) => e,
DbErrorKind::Mysql(e) => e,
_ => return None,
})
}
@ -103,6 +111,7 @@ impl ReportableError for DbError {
match &self.kind {
DbErrorKind::Common(e) => e.is_sentry_event(),
DbErrorKind::Mysql(e) => e.is_sentry_event(),
DbErrorKind::PoolTimeout(_) => false,
}
}
@ -110,6 +119,7 @@ impl ReportableError for DbError {
match &self.kind {
DbErrorKind::Common(e) => e.metric_label(),
DbErrorKind::Mysql(e) => e.metric_label(),
DbErrorKind::PoolTimeout(_) => Some("storage.diesel.pool.timeout"),
}
}
@ -117,6 +127,7 @@ impl ReportableError for DbError {
match &self.kind {
DbErrorKind::Common(e) => e.backtrace(),
DbErrorKind::Mysql(e) => e.backtrace(),
_ => None,
}
}
@ -124,6 +135,7 @@ impl ReportableError for DbError {
match &self.kind {
DbErrorKind::Common(e) => e.tags(),
DbErrorKind::Mysql(e) => e.tags(),
_ => vec![],
}
}
}
@ -149,11 +161,6 @@ from_error!(
error
)))
);
from_error!(
diesel::r2d2::PoolError,
DbError,
|error: diesel::r2d2::PoolError| DbError::from(DbErrorKind::Mysql(SqlError::from(error)))
);
from_error!(
diesel_migrations::MigrationError,
DbError,
@ -166,9 +173,3 @@ from_error!(
DbError,
|error: std::boxed::Box<dyn std::error::Error>| DbError::internal_error(error.to_string())
);
impl<Guard> From<std::sync::PoisonError<Guard>> for DbError {
fn from(inner: std::sync::PoisonError<Guard>) -> DbError {
DbError::internal_error(inner.to_string())
}
}

File diff suppressed because it is too large Load Diff

View File

@ -7,31 +7,43 @@ use std::{
time::Duration,
};
use diesel::{
mysql::MysqlConnection,
r2d2::{ConnectionManager, Pool},
Connection,
use deadpool::managed::PoolError;
use diesel::Connection;
use diesel_async::{
async_connection_wrapper::AsyncConnectionWrapper,
pooled_connection::{
deadpool::{Object, Pool},
AsyncDieselConnectionManager,
},
AsyncMysqlConnection,
};
#[cfg(debug_assertions)]
use diesel_logger::LoggingConnection;
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
use syncserver_common::{BlockingThreadpool, Metrics};
#[cfg(debug_assertions)]
use syncserver_db_common::test::TestTransactionCustomizer;
use syncserver_db_common::test::test_transaction_hook;
use syncserver_db_common::{GetPoolState, PoolState};
use syncstorage_db_common::{Db, DbPool, STD_COLLS};
use syncstorage_settings::{Quota, Settings};
use tokio::task::spawn_blocking;
use super::{error::DbError, models::MysqlDb, DbResult};
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
pub(crate) type Conn = Object<AsyncMysqlConnection>;
/// Run the diesel embedded migrations
///
/// Mysql DDL statements implicitly commit which could disrupt MysqlPool's
/// begin_test_transaction during tests. So this runs on its own separate conn.
///
/// Note that this runs as a plain diesel blocking method as diesel_async
/// doesn't support async migrations (but we utilize its connection via its
/// [AsyncConnectionWrapper])
fn run_embedded_migrations(database_url: &str) -> DbResult<()> {
let conn = MysqlConnection::establish(database_url)?;
let conn = AsyncConnectionWrapper::<AsyncMysqlConnection>::establish(database_url)?;
// This conn2 charade is to make mut-ness the same for both cases.
#[cfg(debug_assertions)]
@ -46,51 +58,58 @@ fn run_embedded_migrations(database_url: &str) -> DbResult<()> {
#[derive(Clone)]
pub struct MysqlDbPool {
/// Pool of db connections
pool: Pool<ConnectionManager<MysqlConnection>>,
pool: Pool<AsyncMysqlConnection>,
/// Thread Pool for running synchronous db calls
/// In-memory cache of collection_ids and their names
coll_cache: Arc<CollectionCache>,
metrics: Metrics,
quota: Quota,
blocking_threadpool: Arc<BlockingThreadpool>,
database_url: String,
}
impl MysqlDbPool {
/// Creates a new pool of Mysql db connections.
///
/// Also initializes the Mysql db, ensuring all migrations are ran.
/// Doesn't initialize the db (does not run migrations).
pub fn new(
settings: &Settings,
metrics: &Metrics,
blocking_threadpool: Arc<BlockingThreadpool>,
_blocking_threadpool: Arc<BlockingThreadpool>,
) -> DbResult<Self> {
run_embedded_migrations(&settings.database_url)?;
Self::new_without_migrations(settings, metrics, blocking_threadpool)
}
let manager =
AsyncDieselConnectionManager::<AsyncMysqlConnection>::new(&settings.database_url);
pub fn new_without_migrations(
settings: &Settings,
metrics: &Metrics,
blocking_threadpool: Arc<BlockingThreadpool>,
) -> DbResult<Self> {
let manager = ConnectionManager::<MysqlConnection>::new(settings.database_url.clone());
let builder = Pool::builder()
.max_size(settings.database_pool_max_size)
.connection_timeout(Duration::from_secs(
settings.database_pool_connection_timeout.unwrap_or(30) as u64,
))
.min_idle(settings.database_pool_min_idle);
let wait = settings
.database_pool_connection_timeout
.map(|seconds| Duration::from_secs(seconds as u64));
let timeouts = deadpool::managed::Timeouts {
wait,
..Default::default()
};
let config = deadpool::managed::PoolConfig {
max_size: settings.database_pool_max_size as usize,
timeouts,
..Default::default()
};
let builder = Pool::builder(manager)
.config(config)
.runtime(deadpool::Runtime::Tokio1);
#[cfg(debug_assertions)]
let builder = if settings.database_use_test_transactions {
builder.connection_customizer(Box::new(TestTransactionCustomizer))
builder.post_create(deadpool::managed::Hook::async_fn(|conn, _| {
Box::pin(async { test_transaction_hook(conn).await })
}))
} else {
builder
};
let pool = builder
.build()
.map_err(|e| DbError::internal(format!("Couldn't build Db Pool: {e}")))?;
Ok(Self {
pool: builder.build(manager)?,
pool,
coll_cache: Default::default(),
metrics: metrics.clone(),
quota: Quota {
@ -98,7 +117,7 @@ impl MysqlDbPool {
enabled: settings.enable_quota,
enforced: settings.enforce_quota,
},
blocking_threadpool,
database_url: settings.database_url.clone(),
})
}
@ -109,13 +128,21 @@ impl MysqlDbPool {
sweeper()
}
pub fn get_sync(&self) -> DbResult<MysqlDb> {
pub async fn get_mysql_db(&self) -> DbResult<MysqlDb> {
let conn = self.pool.get().await.map_err(|e| match e {
PoolError::Backend(be) => match be {
diesel_async::pooled_connection::PoolError::ConnectionError(ce) => ce.into(),
diesel_async::pooled_connection::PoolError::QueryError(dbe) => dbe.into(),
},
PoolError::Timeout(timeout_type) => DbError::pool_timeout(timeout_type),
_ => DbError::internal(format!("deadpool PoolError: {e}")),
})?;
Ok(MysqlDb::new(
self.pool.get()?,
conn,
Arc::clone(&self.coll_cache),
&self.metrics,
&self.quota,
self.blocking_threadpool.clone(),
))
}
}
@ -131,12 +158,16 @@ fn sweeper() {}
impl DbPool for MysqlDbPool {
type Error = DbError;
async fn get<'a>(&'a self) -> DbResult<Box<dyn Db<Error = Self::Error>>> {
let pool = self.clone();
self.blocking_threadpool
.spawn(move || pool.get_sync())
async fn init(&mut self) -> Result<(), Self::Error> {
let database_url = self.database_url.clone();
spawn_blocking(move || run_embedded_migrations(&database_url))
.await
.map(|db| Box::new(db) as Box<dyn Db<Error = Self::Error>>)
.map_err(|e| DbError::internal(format!("Couldn't spawn migrations: {e}")))??;
Ok(())
}
async fn get<'a>(&'a self) -> DbResult<Box<dyn Db<Error = Self::Error>>> {
Ok(Box::new(self.get_mysql_db().await?) as Box<dyn Db<Error = Self::Error>>)
}
fn validate_batch_id(&self, id: String) -> DbResult<()> {
@ -158,7 +189,7 @@ impl fmt::Debug for MysqlDbPool {
impl GetPoolState for MysqlDbPool {
fn state(&self) -> PoolState {
self.pool.state().into()
self.pool.status().into()
}
}

View File

@ -4,35 +4,37 @@ use diesel::{
// expression_methods::TextExpressionMethods, // See note below about `not_like` becoming swedish
ExpressionMethods,
QueryDsl,
RunQueryDsl,
};
use diesel_async::RunQueryDsl;
use syncserver_common::{BlockingThreadpool, Metrics};
use syncserver_settings::Settings as SyncserverSettings;
use syncstorage_db_common::DbPool;
use syncstorage_settings::Settings as SyncstorageSettings;
use url::Url;
use crate::{models::MysqlDb, pool::MysqlDbPool, schema::collections, DbResult};
pub fn db(settings: &SyncstorageSettings) -> DbResult<MysqlDb> {
async fn db(settings: &SyncstorageSettings) -> DbResult<MysqlDb> {
let _ = env_logger::try_init();
// inherit SYNC_SYNCSTORAGE__DATABASE_URL from the env
let pool = MysqlDbPool::new(
let mut pool = MysqlDbPool::new(
settings,
&Metrics::noop(),
Arc::new(BlockingThreadpool::new(512)),
)?;
pool.get_sync()
pool.init().await?;
pool.get_mysql_db().await
}
#[test]
fn static_collection_id() -> DbResult<()> {
#[tokio::test]
async fn static_collection_id() -> DbResult<()> {
let settings = SyncserverSettings::test_settings().syncstorage;
if Url::parse(&settings.database_url).unwrap().scheme() != "mysql" {
// Skip this test if we're not using mysql
return Ok(());
}
let mut db = db(&settings)?;
let mut db = db(&settings).await?;
// ensure DB actually has predefined common collections
let cols: Vec<(i32, _)> = vec![
@ -58,7 +60,8 @@ fn static_collection_id() -> DbResult<()> {
.filter(collections::name.ne(""))
.filter(collections::name.ne("xxx_col2")) // from server::test
.filter(collections::name.ne("col2")) // from older intergration tests
.load(&mut *db.inner.conn.write()?)?
.load(&mut db.conn)
.await?
.into_iter()
.collect();
assert_eq!(results.len(), cols.len(), "mismatched columns");
@ -67,11 +70,11 @@ fn static_collection_id() -> DbResult<()> {
}
for (id, name) in &cols {
let result = db.get_collection_id(name)?;
let result = db.get_collection_id(name).await?;
assert_eq!(result, *id);
}
let cid = db.get_or_create_collection_id("col1")?;
let cid = db.get_or_create_collection_id("col1").await?;
assert!(cid >= 100);
Ok(())
}

View File

@ -72,8 +72,6 @@ impl From<&Settings> for Deadman {
pub struct Settings {
pub database_url: String,
pub database_pool_max_size: u32,
// NOTE: Not supported by deadpool!
pub database_pool_min_idle: Option<u32>,
/// Pool timeout when waiting for a slot to become available, in seconds
pub database_pool_connection_timeout: Option<u32>,
/// Max age a given connection should live, in seconds
@ -116,7 +114,6 @@ impl Default for Settings {
Settings {
database_url: "mysql://root@127.0.0.1/syncstorage".to_string(),
database_pool_max_size: 10,
database_pool_min_idle: None,
database_pool_connection_lifespan: None,
database_pool_connection_max_idle: None,
database_pool_sweeper_task_interval: 30,

View File

@ -10,6 +10,7 @@ async-trait.workspace = true
backtrace.workspace = true
deadpool.workspace = true
diesel.workspace = true
diesel-async.workspace = true
diesel_migrations.workspace = true
http.workspace = true
serde.workspace = true

View File

@ -1,6 +1,7 @@
use std::fmt;
use backtrace::Backtrace;
use deadpool::managed::PoolError;
use http::StatusCode;
use syncserver_common::{from_error, impl_fmt_display, InternalError, ReportableError};
use syncserver_db_common::error::SqlError;
@ -118,11 +119,6 @@ from_error!(
DbError,
|error: diesel::result::ConnectionError| DbError::from(DbErrorKind::Sql(SqlError::from(error)))
);
from_error!(
diesel::r2d2::PoolError,
DbError,
|error: diesel::r2d2::PoolError| DbError::from(DbErrorKind::Sql(SqlError::from(error)))
);
from_error!(
diesel_migrations::MigrationError,
DbError,
@ -135,3 +131,16 @@ from_error!(
DbError,
|error: std::boxed::Box<dyn std::error::Error>| DbError::internal_error(error.to_string())
);
impl From<PoolError<diesel_async::pooled_connection::PoolError>> for DbError {
fn from(pe: PoolError<diesel_async::pooled_connection::PoolError>) -> DbError {
match pe {
PoolError::Backend(be) => match be {
diesel_async::pooled_connection::PoolError::ConnectionError(ce) => ce.into(),
diesel_async::pooled_connection::PoolError::QueryError(dbe) => dbe.into(),
},
PoolError::Timeout(timeout_type) => DbError::pool_timeout(timeout_type),
_ => DbError::internal(format!("deadpool PoolError: {pe}")),
}
}
}

View File

@ -8,14 +8,14 @@ license.workspace = true
[dependencies]
async-trait.workspace = true
deadpool.workspace = true
diesel = { workspace = true, features = ["postgres", "r2d2"] }
diesel = { workspace = true, features = ["postgres"] }
diesel-async = { workspace = true, features = ["postgres"] }
diesel_logger.workspace = true
diesel_migrations = { workspace = true, features = ["postgres"] }
diesel_logger.workspace = true
diesel_migrations.workspace = true
tokio = { workspace = true, features = ["macros", "sync"] }
syncserver-common = { path = "../syncserver-common" }
syncserver-db-common = { path = "../syncserver-db-common" }
tokenserver-common = { path = "../tokenserver-common" }
tokenserver-db-common = { path = "../tokenserver-db-common" }
tokenserver-settings = { path = "../tokenserver-settings" }
tokenserver-settings = { path = "../tokenserver-settings" }

View File

@ -1,7 +1,6 @@
use std::time::Duration;
use async_trait::async_trait;
use deadpool::managed::PoolError;
use diesel::Connection;
use diesel_async::{
async_connection_wrapper::AsyncConnectionWrapper,
@ -112,21 +111,8 @@ impl TokenserverPgPool {
}
async fn get_tokenserver_db(&self) -> Result<TokenserverPgDb, DbError> {
let conn = self.inner.get().await.map_err(|e| match e {
PoolError::Backend(backend_err) => match backend_err {
diesel_async::pooled_connection::PoolError::ConnectionError(conn_err) => {
conn_err.into()
}
diesel_async::pooled_connection::PoolError::QueryError(query_err) => {
query_err.into()
}
},
PoolError::Timeout(timeout_type) => DbError::pool_timeout(timeout_type),
_ => DbError::internal(format!("Deadpool PoolError: {e}")),
})?;
Ok(TokenserverPgDb::new(
conn,
self.inner.get().await?,
&self.metrics,
self.service_id,
self.spanner_node_id,

View File

@ -8,12 +8,11 @@ edition.workspace = true
[dependencies]
async-trait.workspace = true
http.workspace = true
deadpool = { workspace = true }
diesel = { workspace = true }
diesel-async = { workspace = true }
diesel_logger = { workspace = true }
diesel_migrations = { workspace = true, features = ["mysql"] }
deadpool.workspace = true
diesel.workspace = true
diesel-async.workspace = true
diesel_logger.workspace = true
diesel_migrations.workspace = true
syncserver-common = { path = "../syncserver-common" }
syncserver-db-common = { path = "../syncserver-db-common" }
tokenserver-common = { path = "../tokenserver-common" }

View File

@ -1,7 +1,6 @@
use std::time::Duration;
use async_trait::async_trait;
use deadpool::managed::PoolError;
use diesel::Connection;
use diesel_async::{
async_connection_wrapper::AsyncConnectionWrapper,
@ -110,17 +109,8 @@ impl TokenserverPool {
}
pub async fn get_tokenserver_db(&self) -> Result<TokenserverDb, DbError> {
let conn = self.inner.get().await.map_err(|e| match e {
PoolError::Backend(be) => match be {
diesel_async::pooled_connection::PoolError::ConnectionError(ce) => ce.into(),
diesel_async::pooled_connection::PoolError::QueryError(dbe) => dbe.into(),
},
PoolError::Timeout(timeout_type) => DbError::pool_timeout(timeout_type),
_ => DbError::internal(format!("deadpool PoolError: {e}")),
})?;
Ok(TokenserverDb::new(
conn,
self.inner.get().await?,
&self.metrics,
self.service_id,
self.spanner_node_id,