mirror of
https://github.com/mozilla-services/syncstorage-rs.git
synced 2026-05-05 12:16:21 +02:00
feat: spawn Tokenserver pool reporter (#1283)
This commit is contained in:
parent
1a197a6c6e
commit
ee8e179479
@ -20,10 +20,6 @@ impl DbPool for MockDbPool {
|
||||
Ok(Box::new(MockDb::new()) as Box<dyn Db<'a>>)
|
||||
}
|
||||
|
||||
fn state(&self) -> results::PoolState {
|
||||
results::PoolState::default()
|
||||
}
|
||||
|
||||
fn validate_batch_id(&self, _: params::ValidateBatchId) -> Result<(), DbError> {
|
||||
Ok(())
|
||||
}
|
||||
@ -33,6 +29,12 @@ impl DbPool for MockDbPool {
|
||||
}
|
||||
}
|
||||
|
||||
impl GetPoolState for MockDbPool {
|
||||
fn state(&self) -> PoolState {
|
||||
PoolState::default()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct MockDb;
|
||||
|
||||
|
||||
@ -60,11 +60,9 @@ pub const BATCH_LIFETIME: i64 = 2 * 60 * 60 * 1000; // 2 hours, in milliseconds
|
||||
type DbFuture<'a, T> = LocalBoxFuture<'a, Result<T, ApiError>>;
|
||||
|
||||
#[async_trait]
|
||||
pub trait DbPool: Sync + Send + Debug {
|
||||
pub trait DbPool: Sync + Send + Debug + GetPoolState {
|
||||
async fn get(&self) -> ApiResult<Box<dyn Db<'_>>>;
|
||||
|
||||
fn state(&self) -> results::PoolState;
|
||||
|
||||
fn validate_batch_id(&self, params: params::ValidateBatchId) -> Result<(), DbError>;
|
||||
|
||||
fn box_clone(&self) -> Box<dyn DbPool>;
|
||||
@ -76,6 +74,23 @@ impl Clone for Box<dyn DbPool> {
|
||||
}
|
||||
}
|
||||
|
||||
pub trait GetPoolState {
|
||||
fn state(&self) -> PoolState;
|
||||
}
|
||||
|
||||
impl GetPoolState for Box<dyn DbPool> {
|
||||
fn state(&self) -> PoolState {
|
||||
(**self).state()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
/// A mockable r2d2::State
|
||||
pub struct PoolState {
|
||||
pub connections: u32,
|
||||
pub idle_connections: u32,
|
||||
}
|
||||
|
||||
pub trait Db<'a>: Debug + 'a {
|
||||
fn lock_for_read(&self, params: params::LockCollection) -> DbFuture<'_, ()>;
|
||||
|
||||
@ -280,10 +295,10 @@ pub async fn pool_from_settings(
|
||||
}
|
||||
|
||||
/// Emit DbPool metrics periodically
|
||||
pub fn spawn_pool_periodic_reporter(
|
||||
pub fn spawn_pool_periodic_reporter<T: GetPoolState + 'static>(
|
||||
interval: Duration,
|
||||
metrics: StatsdClient,
|
||||
pool: Box<dyn DbPool>,
|
||||
pool: T,
|
||||
) -> Result<(), DbError> {
|
||||
let hostname = hostname::get()
|
||||
.expect("Couldn't get hostname")
|
||||
@ -291,7 +306,7 @@ pub fn spawn_pool_periodic_reporter(
|
||||
.expect("Couldn't get hostname");
|
||||
actix_rt::spawn(async move {
|
||||
loop {
|
||||
let results::PoolState {
|
||||
let PoolState {
|
||||
connections,
|
||||
idle_connections,
|
||||
} = pool.state();
|
||||
|
||||
@ -20,11 +20,7 @@ use diesel_logger::LoggingConnection;
|
||||
use super::models::{MysqlDb, Result};
|
||||
#[cfg(test)]
|
||||
use super::test::TestTransactionCustomizer;
|
||||
use crate::db::{
|
||||
error::DbError,
|
||||
results::{self, PoolState},
|
||||
Db, DbPool, STD_COLLS,
|
||||
};
|
||||
use crate::db::{error::DbError, Db, DbPool, GetPoolState, PoolState, STD_COLLS};
|
||||
use crate::error::{ApiError, ApiResult};
|
||||
use crate::server::metrics::Metrics;
|
||||
use crate::settings::{Quota, Settings};
|
||||
@ -114,10 +110,6 @@ impl DbPool for MysqlDbPool {
|
||||
Ok(Box::new(db) as Box<dyn Db<'a>>)
|
||||
}
|
||||
|
||||
fn state(&self) -> results::PoolState {
|
||||
self.pool.state().into()
|
||||
}
|
||||
|
||||
fn validate_batch_id(&self, id: String) -> Result<()> {
|
||||
super::batch::validate_batch_id(&id)
|
||||
}
|
||||
@ -127,6 +119,12 @@ impl DbPool for MysqlDbPool {
|
||||
}
|
||||
}
|
||||
|
||||
impl GetPoolState for MysqlDbPool {
|
||||
fn state(&self) -> PoolState {
|
||||
self.pool.state().into()
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for MysqlDbPool {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
fmt.debug_struct("MysqlDbPool")
|
||||
|
||||
@ -80,13 +80,6 @@ pub struct PostBsos {
|
||||
pub failed: HashMap<String, String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
/// A mockable r2d2::State
|
||||
pub struct PoolState {
|
||||
pub connections: u32,
|
||||
pub idle_connections: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ConnectionInfo {
|
||||
pub age: i64,
|
||||
|
||||
@ -8,7 +8,7 @@ use grpcio::{EnvBuilder, Environment};
|
||||
use crate::{
|
||||
db::{
|
||||
error::{DbError, DbErrorKind},
|
||||
results::PoolState,
|
||||
PoolState,
|
||||
},
|
||||
server::metrics::Metrics,
|
||||
settings::Settings,
|
||||
|
||||
@ -7,7 +7,7 @@ use grpcio::{EnvBuilder, Environment};
|
||||
use crate::{
|
||||
db::{
|
||||
error::{DbError, DbErrorKind},
|
||||
results::PoolState,
|
||||
PoolState,
|
||||
},
|
||||
server::metrics::Metrics,
|
||||
settings::Settings,
|
||||
|
||||
@ -5,7 +5,7 @@ use bb8::ErrorSink;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::{
|
||||
db::{error::DbError, results, Db, DbPool, STD_COLLS},
|
||||
db::{error::DbError, Db, DbPool, GetPoolState, PoolState, STD_COLLS},
|
||||
error::ApiResult,
|
||||
server::metrics::Metrics,
|
||||
settings::{Quota, Settings},
|
||||
@ -100,10 +100,6 @@ impl DbPool for SpannerDbPool {
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
fn state(&self) -> results::PoolState {
|
||||
self.pool.status().into()
|
||||
}
|
||||
|
||||
fn validate_batch_id(&self, id: String) -> Result<()> {
|
||||
super::batch::validate_batch_id(&id)
|
||||
}
|
||||
@ -113,6 +109,12 @@ impl DbPool for SpannerDbPool {
|
||||
}
|
||||
}
|
||||
|
||||
impl GetPoolState for SpannerDbPool {
|
||||
fn state(&self) -> PoolState {
|
||||
self.pool.status().into()
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for SpannerDbPool {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
fmt.debug_struct("SpannerDbPool")
|
||||
|
||||
@ -244,14 +244,22 @@ impl Server {
|
||||
..Default::default()
|
||||
}));
|
||||
let tokenserver_state = if settings.tokenserver.enabled {
|
||||
Some(tokenserver::ServerState::from_settings(
|
||||
let state = tokenserver::ServerState::from_settings(
|
||||
&settings.tokenserver,
|
||||
metrics::metrics_from_opts(
|
||||
&settings.tokenserver.statsd_label,
|
||||
settings.statsd_host.as_deref(),
|
||||
settings.statsd_port,
|
||||
)?,
|
||||
)?)
|
||||
)?;
|
||||
|
||||
spawn_pool_periodic_reporter(
|
||||
Duration::from_secs(10),
|
||||
*state.metrics.clone(),
|
||||
state.db_pool.clone(),
|
||||
)?;
|
||||
|
||||
Some(state)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@ -304,6 +312,13 @@ impl Server {
|
||||
settings.statsd_port,
|
||||
)?,
|
||||
)?;
|
||||
|
||||
spawn_pool_periodic_reporter(
|
||||
Duration::from_secs(10),
|
||||
*tokenserver_state.metrics.clone(),
|
||||
tokenserver_state.db_pool.clone(),
|
||||
)?;
|
||||
|
||||
let server = HttpServer::new(move || {
|
||||
build_app_without_syncstorage!(
|
||||
Some(tokenserver_state.clone()),
|
||||
|
||||
@ -7,7 +7,7 @@ use super::models::{Db, DbFuture};
|
||||
use super::params;
|
||||
use super::pool::DbPool;
|
||||
use super::results;
|
||||
use crate::db::error::DbError;
|
||||
use crate::db::{error::DbError, GetPoolState, PoolState};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct MockDbPool;
|
||||
@ -29,6 +29,12 @@ impl DbPool for MockDbPool {
|
||||
}
|
||||
}
|
||||
|
||||
impl GetPoolState for MockDbPool {
|
||||
fn state(&self) -> PoolState {
|
||||
PoolState::default()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct MockDb;
|
||||
|
||||
|
||||
@ -8,7 +8,7 @@ use diesel_logger::LoggingConnection;
|
||||
use std::time::Duration;
|
||||
|
||||
use super::models::{Db, DbResult, TokenserverDb};
|
||||
use crate::db::{error::DbError, DbErrorKind};
|
||||
use crate::db::{error::DbError, DbErrorKind, GetPoolState, PoolState};
|
||||
use crate::diesel::Connection;
|
||||
use crate::server::metrics::Metrics;
|
||||
use crate::tokenserver::settings::Settings;
|
||||
@ -112,12 +112,24 @@ impl DbPool for TokenserverPool {
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait DbPool: Sync + Send {
|
||||
pub trait DbPool: Sync + Send + GetPoolState {
|
||||
async fn get(&self) -> Result<Box<dyn Db>, DbError>;
|
||||
|
||||
fn box_clone(&self) -> Box<dyn DbPool>;
|
||||
}
|
||||
|
||||
impl GetPoolState for TokenserverPool {
|
||||
fn state(&self) -> PoolState {
|
||||
self.inner.state().into()
|
||||
}
|
||||
}
|
||||
|
||||
impl GetPoolState for Box<dyn DbPool> {
|
||||
fn state(&self) -> PoolState {
|
||||
(**self).state()
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for Box<dyn DbPool> {
|
||||
fn clone(&self) -> Box<dyn DbPool> {
|
||||
self.box_clone()
|
||||
|
||||
@ -639,7 +639,7 @@ pub async fn lbheartbeat(req: HttpRequest) -> Result<HttpResponse, Error> {
|
||||
let deadarc = state.deadman.clone();
|
||||
let mut deadman = *deadarc.read().await;
|
||||
let db_state = if cfg!(test) {
|
||||
use crate::db::results::PoolState;
|
||||
use crate::db::PoolState;
|
||||
use actix_web::http::header::HeaderValue;
|
||||
use std::str::FromStr;
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user