Merge pull request #798 from mozilla-services/feat/796

feat: switch spanner's db pool to deadpool
Philip Jenvey, 2020-08-21 18:07:05 -07:00 (committed by GitHub)
commit 0a2fecccd7
10 changed files with 208 additions and 61 deletions
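
The gist of the change: the Spanner connection pool moves from bb8 to deadpool 0.5 (pinned in Cargo.toml below). As a minimal, hedged sketch of the deadpool managed-pool pattern being adopted — the `Counter`/`CounterManager` types are hypothetical stand-ins, not the PR's `SpannerSession` types:

```rust
// Hypothetical illustration of deadpool 0.5's managed pool: a Manager knows
// how to create and recycle objects, and the Pool hands out owned Object
// handles that return to the pool when dropped.
use async_trait::async_trait;
use deadpool::managed::{Pool, PoolConfig, RecycleResult};

struct Counter(u32);
struct CounterManager;

#[async_trait]
impl deadpool::managed::Manager<Counter, std::io::Error> for CounterManager {
    // Called when the pool needs a fresh object (this PR creates a Spanner session here).
    async fn create(&self) -> Result<Counter, std::io::Error> {
        Ok(Counter(0))
    }
    // Called on reuse to decide whether the object is still healthy (this PR
    // pings the Spanner session and re-creates it if it has expired).
    async fn recycle(&self, _counter: &mut Counter) -> RecycleResult<std::io::Error> {
        Ok(())
    }
}

async fn demo() -> Result<(), deadpool::managed::PoolError<std::io::Error>> {
    let pool = Pool::from_config(CounterManager, PoolConfig::new(16));
    let obj = pool.get().await?; // deadpool::managed::Object<Counter, io::Error>
    let _value = obj.0;          // Object derefs to the managed type
    Ok(())
}
```

The PR's real Manager (the new manager/deadpool.rs file below) creates a Spanner gRPC session in `create()` and checks/recreates it in `recycle()`.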

Cargo.lock (generated)
View File

@@ -777,6 +777,20 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "deadpool"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4aaff9a7a1de9893f4004fa08527b31cb2ae4121c44e053cf53f29203c73bd23"
dependencies = [
"async-trait",
"config",
"crossbeam-queue",
"num_cpus",
"serde 1.0.114",
"tokio",
]
[[package]]
name = "debugid"
version = "0.7.2"
@@ -2719,6 +2733,7 @@ dependencies = [
"cadence",
"chrono",
"config",
"deadpool",
"diesel",
"diesel_logger",
"diesel_migrations",

View File

@@ -26,6 +26,7 @@ bytes = "0.5"
cadence = "0.20.0"
chrono = "0.4"
config = "0.10"
deadpool = "0.5.2"
diesel = { version = "1.4.4", features = ["mysql", "r2d2"] }
diesel_logger = "0.1.1"
diesel_migrations = { version = "1.4.0", features = ["mysql"] }

View File

@@ -94,6 +94,15 @@ impl From<bb8::State> for PoolState {
}
}
impl From<deadpool::Status> for PoolState {
fn from(status: deadpool::Status) -> PoolState {
PoolState {
connections: status.size as u32,
idle_connections: status.available as u32,
}
}
}
#[cfg(test)]
pub type GetCollectionId = i32;
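
The new `From<deadpool::Status>` impl keeps the existing `PoolState` metrics working: `Status::size` maps to total connections and `Status::available` to idle ones. A hypothetical helper (not part of the PR, and assuming this module's `PoolState` plus the pool type from pool.rs are in scope) showing the call site it enables:

```rust
// Sketch: a deadpool pool can report the same PoolState that bb8 did,
// via the From impl above.
fn report_state(pool: &deadpool::managed::Pool<SpannerSession, DbError>) -> PoolState {
    // Status::size -> connections, Status::available -> idle_connections
    pool.status().into()
}
```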

View File

@@ -18,7 +18,7 @@ use crate::{
};
pub async fn create_async(
db: &SpannerDb<'_>,
db: &SpannerDb,
params: params::CreateBatch,
) -> Result<results::CreateBatch> {
let batch_id = Uuid::new_v4().to_simple().to_string();
@@ -57,7 +57,7 @@ pub async fn create_async(
Ok(batch_id)
}
pub async fn validate_async(db: &SpannerDb<'_>, params: params::ValidateBatch) -> Result<bool> {
pub async fn validate_async(db: &SpannerDb, params: params::ValidateBatch) -> Result<bool> {
let collection_id = db.get_collection_id_async(&params.collection).await?;
let exists = db
.sql(
@@ -81,7 +81,7 @@ pub async fn validate_async(db: &SpannerDb<'_>, params: params::ValidateBatch) -
Ok(exists.is_some())
}
pub async fn append_async(db: &SpannerDb<'_>, params: params::AppendToBatch) -> Result<()> {
pub async fn append_async(db: &SpannerDb, params: params::AppendToBatch) -> Result<()> {
let mut metrics = db.metrics.clone();
metrics.start_timer("storage.spanner.append_items_to_batch", None);
@@ -106,7 +106,7 @@ pub async fn append_async(db: &SpannerDb<'_>, params: params::AppendToBatch) ->
}
pub async fn get_async(
db: &SpannerDb<'_>,
db: &SpannerDb,
params: params::GetBatch,
) -> Result<Option<results::GetBatch>> {
let collection_id = db.get_collection_id_async(&params.collection).await?;
@@ -142,7 +142,7 @@ pub async fn get_async(
Ok(batch)
}
pub async fn delete_async(db: &SpannerDb<'_>, params: params::DeleteBatch) -> Result<()> {
pub async fn delete_async(db: &SpannerDb, params: params::DeleteBatch) -> Result<()> {
let collection_id = db.get_collection_id_async(&params.collection).await?;
// Also deletes child batch_bsos rows (INTERLEAVE IN PARENT batches ON
// DELETE CASCADE)
@@ -165,7 +165,7 @@ pub async fn delete_async(db: &SpannerDb<'_>, params: params::DeleteBatch) -> Re
}
pub async fn commit_async(
db: &SpannerDb<'_>,
db: &SpannerDb,
params: params::CommitBatch,
) -> Result<results::CommitBatch> {
let mut metrics = db.metrics.clone();
@@ -239,7 +239,7 @@ pub async fn commit_async(
}
pub async fn do_append_async(
db: &SpannerDb<'_>,
db: &SpannerDb,
user_id: HawkIdentifier,
collection_id: i32,
batch_id: String,
@@ -335,7 +335,7 @@ pub async fn do_append_async(
/// For the special case of a user creating a batch for a collection with no
/// prior data.
async fn pretouch_collection_async(
db: &SpannerDb<'_>,
db: &SpannerDb,
user_id: &HawkIdentifier,
collection_id: i32,
) -> Result<()> {

View File

@@ -18,7 +18,7 @@ use crate::{
settings::Settings,
};
const SPANNER_ADDRESS: &str = "spanner.googleapis.com:443";
pub const SPANNER_ADDRESS: &str = "spanner.googleapis.com:443";
pub struct SpannerConnectionManager<T> {
database_name: String,
@@ -33,6 +33,7 @@ impl<_T> fmt::Debug for SpannerConnectionManager<_T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("SpannerConnectionManager")
.field("database_name", &self.database_name)
.field("test_transactions", &self.test_transactions)
.finish()
}
}
@@ -65,7 +66,7 @@ pub struct SpannerSession {
pub client: SpannerClient,
pub session: Session,
pub(super) use_test_transactions: bool,
pub(in crate::db::spanner) use_test_transactions: bool,
}
#[async_trait]
@@ -130,7 +131,7 @@ impl<T: std::marker::Send + std::marker::Sync + 'static> ManageConnection
}
}
async fn create_session(
pub async fn create_session(
client: &SpannerClient,
database_name: &str,
) -> Result<Session, grpcio::Error> {

View File

@@ -0,0 +1,115 @@
use std::{fmt, sync::Arc};
use actix_web::web::block;
use async_trait::async_trait;
use deadpool::managed::{RecycleError, RecycleResult};
use googleapis_raw::spanner::v1::{spanner::GetSessionRequest, spanner_grpc::SpannerClient};
use grpcio::{ChannelBuilder, ChannelCredentials, EnvBuilder, Environment};
use crate::{
db::error::{DbError, DbErrorKind},
server::metrics::Metrics,
settings::Settings,
};
use super::bb8::{create_session, SpannerSession, SPANNER_ADDRESS};
pub struct Manager {
database_name: String,
/// The gRPC environment
env: Arc<Environment>,
metrics: Metrics,
test_transactions: bool,
}
impl fmt::Debug for Manager {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Manager")
.field("database_name", &self.database_name)
.field("test_transactions", &self.test_transactions)
.finish()
}
}
impl Manager {
pub fn new(settings: &Settings, metrics: &Metrics) -> Result<Self, DbError> {
let url = &settings.database_url;
if !url.starts_with("spanner://") {
Err(DbErrorKind::InvalidUrl(url.to_owned()))?;
}
let database_name = url["spanner://".len()..].to_owned();
let env = Arc::new(EnvBuilder::new().build());
#[cfg(not(test))]
let test_transactions = false;
#[cfg(test)]
let test_transactions = settings.database_use_test_transactions;
Ok(Manager {
database_name,
env,
metrics: metrics.clone(),
test_transactions,
})
}
}
#[async_trait]
impl deadpool::managed::Manager<SpannerSession, DbError> for Manager {
async fn create(&self) -> Result<SpannerSession, DbError> {
let env = self.env.clone();
let mut metrics = self.metrics.clone();
// XXX: issue732: Could google_default_credentials (or
// ChannelBuilder::secure_connect) block?!
let chan = block(move || -> Result<grpcio::Channel, grpcio::Error> {
metrics.start_timer("storage.pool.grpc_auth", None);
// Requires
// GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
let creds = ChannelCredentials::google_default_credentials()?;
Ok(ChannelBuilder::new(env)
.max_send_message_len(100 << 20)
.max_receive_message_len(100 << 20)
.secure_connect(SPANNER_ADDRESS, creds))
})
.await
.map_err(|e| match e {
actix_web::error::BlockingError::Error(e) => e.into(),
actix_web::error::BlockingError::Canceled => {
DbError::internal("web::block Manager operation canceled")
}
})?;
let client = SpannerClient::new(chan);
// Connect to the instance and create a Spanner session.
let session = create_session(&client, &self.database_name).await?;
Ok(SpannerSession {
client,
session,
use_test_transactions: self.test_transactions,
})
}
async fn recycle(&self, conn: &mut SpannerSession) -> RecycleResult<DbError> {
let mut req = GetSessionRequest::new();
req.set_name(conn.session.get_name().to_owned());
if let Err(e) = conn
.client
.get_session_async(&req)
.map_err(|e| RecycleError::Backend(e.into()))?
.await
{
match e {
grpcio::Error::RpcFailure(ref status)
if status.status == grpcio::RpcStatusCode::NOT_FOUND =>
{
conn.session = create_session(&conn.client, &self.database_name)
.await
.map_err(|e| RecycleError::Backend(e.into()))?;
}
_ => return Err(RecycleError::Backend(e.into())),
}
}
Ok(())
}
}
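
For context, a sketch of how this Manager is consumed; the SpannerDbPool changes further down do essentially this. `Settings`, `Metrics`, and `DbError` come from the imports above; `checkout_example` itself is illustrative, not part of the PR, and the error mapping copies the new `get_async`:

```rust
// Sketch: build the deadpool pool from the Manager above and check out a
// session. Dropping `conn` returns the session to the pool; recycle() later
// pings it and re-creates it if Spanner reports NOT_FOUND.
async fn checkout_example(settings: &Settings, metrics: &Metrics) -> Result<(), DbError> {
    let manager = Manager::new(settings, metrics)?;
    let max_size = settings.database_pool_max_size.unwrap_or(10) as usize;
    let pool = deadpool::managed::Pool::from_config(
        manager,
        deadpool::managed::PoolConfig::new(max_size),
    );

    let conn = pool.get().await.map_err(|e| match e {
        deadpool::managed::PoolError::Backend(dbe) => dbe,
        deadpool::managed::PoolError::Timeout(t) => {
            DbError::internal(&format!("deadpool Timeout: {:?}", t))
        }
    })?;
    let _session_name = conn.session.get_name(); // Object derefs to SpannerSession
    Ok(())
}
```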

View File

@@ -0,0 +1,2 @@
pub mod bb8;
pub mod deadpool;

View File

@@ -7,47 +7,46 @@ use std::{
sync::Arc,
};
use bb8::PooledConnection;
use futures::future::TryFutureExt;
use googleapis_raw::spanner::v1::transaction::{
self, TransactionOptions, TransactionOptions_ReadOnly, TransactionOptions_ReadWrite,
};
use googleapis_raw::spanner::v1::{
mutation::{Mutation, Mutation_Write},
spanner::{BeginTransactionRequest, CommitRequest, ExecuteSqlRequest, RollbackRequest},
transaction::{
TransactionOptions, TransactionOptions_ReadOnly, TransactionOptions_ReadWrite,
TransactionSelector,
},
type_pb::TypeCode,
};
#[allow(unused_imports)]
use protobuf::{well_known_types::ListValue, Message, RepeatedField};
use super::manager::{SpannerConnectionManager, SpannerSession};
use super::pool::CollectionCache;
use crate::db::{
error::{DbError, DbErrorKind},
params, results,
spanner::support::{as_type, StreamedResultSetAsync},
util::SyncTimestamp,
Db, DbFuture, Sorting, FIRST_CUSTOM_COLLECTION_ID,
use crate::{
db::{
error::{DbError, DbErrorKind},
params, results,
util::SyncTimestamp,
Db, DbFuture, Sorting, FIRST_CUSTOM_COLLECTION_ID,
},
server::metrics::Metrics,
web::extractors::{BsoQueryParams, HawkIdentifier, Offset},
};
use crate::server::metrics::Metrics;
use crate::web::extractors::{BsoQueryParams, HawkIdentifier, Offset};
use super::support::{bso_to_insert_row, bso_to_update_row};
use super::{
batch,
support::{as_list_value, as_value, bso_from_row, ExecuteSqlRequestBuilder},
pool::{CollectionCache, Conn},
support::{
as_list_value, as_type, as_value, bso_from_row, ExecuteSqlRequestBuilder,
StreamedResultSetAsync,
},
support::{bso_to_insert_row, bso_to_update_row},
};
pub type TransactionSelector = transaction::TransactionSelector;
#[derive(Debug, Eq, PartialEq)]
pub enum CollectionLock {
Read,
Write,
}
pub(super) type Conn<'a> = PooledConnection<'a, SpannerConnectionManager<SpannerSession>>;
pub type Result<T> = std::result::Result<T, DbError>;
/// The ttl to use for rows that are never supposed to expire (in seconds)
@@ -81,8 +80,8 @@ struct SpannerDbSession {
}
#[derive(Clone, Debug)]
pub struct SpannerDb<'a> {
pub(super) inner: Arc<SpannerDbInner<'a>>,
pub struct SpannerDb {
pub(super) inner: Arc<SpannerDbInner>,
/// Pool level cache of collection_ids and their names
coll_cache: Arc<CollectionCache>,
@@ -90,28 +89,28 @@ pub struct SpannerDb<'a> {
pub metrics: Metrics,
}
pub struct SpannerDbInner<'a> {
pub(super) conn: Conn<'a>,
pub struct SpannerDbInner {
pub(super) conn: Conn,
session: RefCell<SpannerDbSession>,
}
impl fmt::Debug for SpannerDbInner<'_> {
impl fmt::Debug for SpannerDbInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "SpannerDbInner")
}
}
impl<'a> Deref for SpannerDb<'a> {
type Target = SpannerDbInner<'a>;
impl Deref for SpannerDb {
type Target = SpannerDbInner;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<'a> SpannerDb<'a> {
pub fn new(conn: Conn<'a>, coll_cache: Arc<CollectionCache>, metrics: &Metrics) -> Self {
impl SpannerDb {
pub fn new(conn: Conn, coll_cache: Arc<CollectionCache>, metrics: &Metrics) -> Self {
let inner = SpannerDbInner {
conn,
session: RefCell::new(Default::default()),
@@ -1605,7 +1604,7 @@ impl<'a> SpannerDb<'a> {
}
}
impl<'a> Db<'a> for SpannerDb<'a> {
impl<'a> Db<'a> for SpannerDb {
fn commit(&self) -> DbFuture<'_, ()> {
let db = self.clone();
Box::pin(async move { db.commit_async().map_err(Into::into).await })
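
The dropped lifetime parameters follow directly from the pool change: a checked-out connection is no longer a borrow of the pool. Side by side, the old alias removed above and the new one added in pool.rs below:

```rust
// Old (bb8): the checkout borrows the pool for 'a, so SpannerDbInner,
// SpannerDb and the Db<'a> impl all had to carry a lifetime parameter.
//   pub(super) type Conn<'a> =
//       PooledConnection<'a, SpannerConnectionManager<SpannerSession>>;
//
// New (deadpool, defined in pool.rs): the checkout is an owned handle that
// returns the session to the pool when dropped, so no lifetime is needed.
//   pub(super) type Conn = deadpool::managed::Object<SpannerSession, DbError>;
```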

View File

@@ -1,5 +1,5 @@
use async_trait::async_trait;
use bb8::{ErrorSink, Pool};
use bb8::ErrorSink;
use std::{
collections::HashMap,
@@ -12,10 +12,12 @@ use crate::db::{error::DbError, results, Db, DbPool, STD_COLLS};
use crate::server::metrics::Metrics;
use crate::settings::Settings;
use super::manager::{SpannerConnectionManager, SpannerSession};
use super::manager::bb8::SpannerSession;
use super::models::SpannerDb;
use crate::error::ApiResult;
pub(super) type Conn = deadpool::managed::Object<SpannerSession, DbError>;
embed_migrations!();
/// Run the diesel embedded migrations
@@ -30,7 +32,8 @@ embed_migrations!();
#[derive(Clone)]
pub struct SpannerDbPool {
/// Pool of db connections
pool: Pool<SpannerConnectionManager<SpannerSession>>,
//pool: Pool<SpannerConnectionManager<SpannerSession>>,
pool: deadpool::managed::Pool<SpannerSession, DbError>,
/// In-memory cache of collection_ids and their names
coll_cache: Arc<CollectionCache>,
@@ -45,24 +48,24 @@ impl SpannerDbPool {
}
pub async fn new_without_migrations(settings: &Settings, metrics: &Metrics) -> Result<Self> {
let manager = SpannerConnectionManager::<SpannerSession>::new(settings, metrics)?;
let max_size = settings.database_pool_max_size.unwrap_or(10);
let builder = bb8::Pool::builder()
.max_size(max_size)
.min_idle(settings.database_pool_min_idle)
.error_sink(Box::new(LoggingErrorSink));
let manager = super::manager::deadpool::Manager::new(settings, metrics)?;
let config = deadpool::managed::PoolConfig::new(max_size as usize);
let pool = deadpool::managed::Pool::from_config(manager, config);
Ok(Self {
pool: builder.build(manager).await?,
pool,
coll_cache: Default::default(),
metrics: metrics.clone(),
})
}
pub async fn get_async(&self) -> Result<SpannerDb<'_>> {
pub async fn get_async(&self) -> Result<SpannerDb> {
let conn = self.pool.get().await.map_err(|e| match e {
bb8::RunError::User(dbe) => dbe,
bb8::RunError::TimedOut => DbError::internal("bb8:TimedOut"),
deadpool::managed::PoolError::Backend(dbe) => dbe,
deadpool::managed::PoolError::Timeout(timeout_type) => {
DbError::internal(&format!("deadpool Timeout: {:?}", timeout_type))
}
})?;
Ok(SpannerDb::new(
conn,
@@ -82,7 +85,7 @@ impl DbPool for SpannerDbPool {
}
fn state(&self) -> results::PoolState {
self.pool.state().into()
self.pool.status().into()
}
fn validate_batch_id(&self, id: String) -> Result<()> {

View File

@@ -16,14 +16,16 @@ use protobuf::{
RepeatedField,
};
use super::models::{Conn, Result};
use crate::db::{results, util::SyncTimestamp, DbError, DbErrorKind};
use crate::{
db::{params, spanner::models::DEFAULT_BSO_TTL, util::to_rfc3339},
db::{
params, results, spanner::models::DEFAULT_BSO_TTL, util::to_rfc3339, util::SyncTimestamp,
DbError, DbErrorKind,
},
web::extractors::HawkIdentifier,
};
use super::{models::Result, pool::Conn};
pub fn as_value(string_value: String) -> Value {
let mut value = Value::new();
value.set_string_value(string_value);
@@ -86,7 +88,7 @@ impl ExecuteSqlRequestBuilder {
self
}
fn prepare_request(self, conn: &Conn<'_>) -> ExecuteSqlRequest {
fn prepare_request(self, conn: &Conn) -> ExecuteSqlRequest {
let mut request = self.execute_sql;
request.set_session(conn.session.get_name().to_owned());
if let Some(params) = self.params {
@@ -101,7 +103,7 @@ impl ExecuteSqlRequestBuilder {
}
/// Execute a SQL read statement but return a non-blocking streaming result
pub fn execute_async(self, conn: &Conn<'_>) -> Result<StreamedResultSetAsync> {
pub fn execute_async(self, conn: &Conn) -> Result<StreamedResultSetAsync> {
let stream = conn
.client
.execute_streaming_sql(&self.prepare_request(conn))?;
@@ -109,7 +111,7 @@ impl ExecuteSqlRequestBuilder {
}
/// Execute a DML statement, returning the exact count of modified rows
pub async fn execute_dml_async(self, conn: &Conn<'_>) -> Result<i64> {
pub async fn execute_dml_async(self, conn: &Conn) -> Result<i64> {
let rs = conn
.client
.execute_sql_async(&self.prepare_request(conn))?