mirror of
https://github.com/mozilla-services/syncstorage-rs.git
synced 2025-08-06 20:06:57 +02:00
Feat: add task to release unused db conns (#1640)
Feat: add task to release unused db conns
This commit is contained in:
parent
bc79ccb972
commit
c01021b87d
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -3006,6 +3006,7 @@ dependencies = [
|
|||||||
name = "syncstorage-spanner"
|
name = "syncstorage-spanner"
|
||||||
version = "0.17.15"
|
version = "0.17.15"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"actix-web",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"backtrace",
|
"backtrace",
|
||||||
"cadence",
|
"cadence",
|
||||||
|
@ -273,6 +273,13 @@ impl Server {
|
|||||||
&Metrics::from(&metrics),
|
&Metrics::from(&metrics),
|
||||||
blocking_threadpool.clone(),
|
blocking_threadpool.clone(),
|
||||||
)?;
|
)?;
|
||||||
|
// Spawns sweeper that calls Deadpool `retain` method, clearing unused connections.
|
||||||
|
db_pool.spawn_sweeper(Duration::from_secs(
|
||||||
|
settings
|
||||||
|
.syncstorage
|
||||||
|
.database_pool_sweeper_task_interval
|
||||||
|
.into(),
|
||||||
|
));
|
||||||
let glean_logger = Arc::new(GleanEventsLogger {
|
let glean_logger = Arc::new(GleanEventsLogger {
|
||||||
// app_id corresponds to probe-scraper entry.
|
// app_id corresponds to probe-scraper entry.
|
||||||
// https://github.com/mozilla/probe-scraper/blob/main/repositories.yaml
|
// https://github.com/mozilla/probe-scraper/blob/main/repositories.yaml
|
||||||
|
@ -99,6 +99,13 @@ impl MysqlDbPool {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Spawn a task to periodically evict idle connections. Calls wrapper sweeper fn
|
||||||
|
/// to use pool.retain, retaining objects only if they are shorter in duration than
|
||||||
|
/// defined max_idle. Noop for mysql impl.
|
||||||
|
pub fn spawn_sweeper(&self, _interval: Duration) {
|
||||||
|
sweeper()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn get_sync(&self) -> DbResult<MysqlDb> {
|
pub fn get_sync(&self) -> DbResult<MysqlDb> {
|
||||||
Ok(MysqlDb::new(
|
Ok(MysqlDb::new(
|
||||||
self.pool.get()?,
|
self.pool.get()?,
|
||||||
@ -110,6 +117,13 @@ impl MysqlDbPool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Sweeper to retain only the objects specified within the closure.
|
||||||
|
/// In this context, if a Spanner connection is unutilized, we want it
|
||||||
|
/// to release the given connections.
|
||||||
|
/// See: https://docs.rs/deadpool/latest/deadpool/managed/struct.Pool.html#method.retain
|
||||||
|
/// Noop for mysql impl
|
||||||
|
fn sweeper() {}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl DbPool for MysqlDbPool {
|
impl DbPool for MysqlDbPool {
|
||||||
type Error = DbError;
|
type Error = DbError;
|
||||||
|
@ -78,6 +78,8 @@ pub struct Settings {
|
|||||||
pub database_pool_connection_lifespan: Option<u32>,
|
pub database_pool_connection_lifespan: Option<u32>,
|
||||||
/// Max time a connection should sit idle before being dropped.
|
/// Max time a connection should sit idle before being dropped.
|
||||||
pub database_pool_connection_max_idle: Option<u32>,
|
pub database_pool_connection_max_idle: Option<u32>,
|
||||||
|
/// Interval for sweeper task releasing unused connections.
|
||||||
|
pub database_pool_sweeper_task_interval: u32,
|
||||||
#[cfg(debug_assertions)]
|
#[cfg(debug_assertions)]
|
||||||
pub database_use_test_transactions: bool,
|
pub database_use_test_transactions: bool,
|
||||||
#[cfg(debug_assertions)]
|
#[cfg(debug_assertions)]
|
||||||
@ -115,6 +117,7 @@ impl Default for Settings {
|
|||||||
database_pool_min_idle: None,
|
database_pool_min_idle: None,
|
||||||
database_pool_connection_lifespan: None,
|
database_pool_connection_lifespan: None,
|
||||||
database_pool_connection_max_idle: None,
|
database_pool_connection_max_idle: None,
|
||||||
|
database_pool_sweeper_task_interval: 30,
|
||||||
database_pool_connection_timeout: Some(30),
|
database_pool_connection_timeout: Some(30),
|
||||||
#[cfg(debug_assertions)]
|
#[cfg(debug_assertions)]
|
||||||
database_use_test_transactions: false,
|
database_use_test_transactions: false,
|
||||||
|
@ -6,6 +6,7 @@ authors.workspace = true
|
|||||||
edition.workspace = true
|
edition.workspace = true
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
actix-web.workspace = true
|
||||||
backtrace.workspace = true
|
backtrace.workspace = true
|
||||||
cadence.workspace = true
|
cadence.workspace = true
|
||||||
deadpool.workspace = true
|
deadpool.workspace = true
|
||||||
|
@ -13,7 +13,7 @@ use crate::error::DbError;
|
|||||||
pub(crate) type Conn = deadpool::managed::Object<SpannerSessionManager>;
|
pub(crate) type Conn = deadpool::managed::Object<SpannerSessionManager>;
|
||||||
|
|
||||||
pub(crate) struct SpannerSessionManager {
|
pub(crate) struct SpannerSessionManager {
|
||||||
settings: SpannerSessionSettings,
|
pub settings: SpannerSessionSettings,
|
||||||
/// The gRPC environment
|
/// The gRPC environment
|
||||||
env: Arc<Environment>,
|
env: Arc<Environment>,
|
||||||
metrics: Metrics,
|
metrics: Metrics,
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
use std::{collections::HashMap, fmt, sync::Arc, time::Duration};
|
use std::{collections::HashMap, fmt, sync::Arc, time::Duration};
|
||||||
|
|
||||||
|
use actix_web::rt;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use syncserver_common::{BlockingThreadpool, Metrics};
|
use syncserver_common::{BlockingThreadpool, Metrics};
|
||||||
use syncserver_db_common::{GetPoolState, PoolState};
|
use syncserver_db_common::{GetPoolState, PoolState};
|
||||||
@ -49,7 +50,9 @@ impl SpannerDbPool {
|
|||||||
let config = deadpool::managed::PoolConfig {
|
let config = deadpool::managed::PoolConfig {
|
||||||
max_size,
|
max_size,
|
||||||
timeouts,
|
timeouts,
|
||||||
..Default::default()
|
// Prefer LIFO to allow the sweeper task to evict least frequently
|
||||||
|
// used connections.
|
||||||
|
queue_mode: deadpool::managed::QueueMode::Lifo,
|
||||||
};
|
};
|
||||||
let pool = deadpool::managed::Pool::builder(manager)
|
let pool = deadpool::managed::Pool::builder(manager)
|
||||||
.config(config)
|
.config(config)
|
||||||
@ -84,6 +87,30 @@ impl SpannerDbPool {
|
|||||||
self.quota,
|
self.quota,
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Spawn a task to periodically evict idle connections. Calls wrapper sweeper fn
|
||||||
|
/// to use pool.retain, retaining objects only if they are shorter in duration than
|
||||||
|
/// defined max_idle.
|
||||||
|
pub fn spawn_sweeper(&self, interval: Duration) {
|
||||||
|
let Some(max_idle) = self.pool.manager().settings.max_idle else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let pool = self.pool.clone();
|
||||||
|
rt::spawn(async move {
|
||||||
|
loop {
|
||||||
|
sweeper(&pool, Duration::from_secs(max_idle.into()));
|
||||||
|
rt::time::sleep(interval).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sweeper to retain only the objects specified within the closure.
|
||||||
|
/// In this context, if a Spanner connection is unutilized, we want it
|
||||||
|
/// to release the given connection.
|
||||||
|
/// See: https://docs.rs/deadpool/latest/deadpool/managed/struct.Pool.html#method.retain
|
||||||
|
fn sweeper(pool: &deadpool::managed::Pool<SpannerSessionManager>, max_idle: Duration) {
|
||||||
|
pool.retain(|_, metrics| metrics.last_used() < max_idle);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
Loading…
Reference in New Issue
Block a user