diff --git a/Cargo.lock b/Cargo.lock index add81b9a..7549eeea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -777,6 +777,20 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "deadpool" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4aaff9a7a1de9893f4004fa08527b31cb2ae4121c44e053cf53f29203c73bd23" +dependencies = [ + "async-trait", + "config", + "crossbeam-queue", + "num_cpus", + "serde 1.0.114", + "tokio", +] + [[package]] name = "debugid" version = "0.7.2" @@ -2719,6 +2733,7 @@ dependencies = [ "cadence", "chrono", "config", + "deadpool", "diesel", "diesel_logger", "diesel_migrations", diff --git a/Cargo.toml b/Cargo.toml index ea17f1a7..102cb806 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ bytes = "0.5" cadence = "0.20.0" chrono = "0.4" config = "0.10" +deadpool = "0.5.2" diesel = { version = "1.4.4", features = ["mysql", "r2d2"] } diesel_logger = "0.1.1" diesel_migrations = { version = "1.4.0", features = ["mysql"] } diff --git a/src/db/results.rs b/src/db/results.rs index 3b063f44..93f49f70 100644 --- a/src/db/results.rs +++ b/src/db/results.rs @@ -94,6 +94,15 @@ impl From for PoolState { } } +impl From for PoolState { + fn from(status: deadpool::Status) -> PoolState { + PoolState { + connections: status.size as u32, + idle_connections: status.available as u32, + } + } +} + #[cfg(test)] pub type GetCollectionId = i32; diff --git a/src/db/spanner/batch.rs b/src/db/spanner/batch.rs index 6869e480..66ba26b4 100644 --- a/src/db/spanner/batch.rs +++ b/src/db/spanner/batch.rs @@ -18,7 +18,7 @@ use crate::{ }; pub async fn create_async( - db: &SpannerDb<'_>, + db: &SpannerDb, params: params::CreateBatch, ) -> Result { let batch_id = Uuid::new_v4().to_simple().to_string(); @@ -57,7 +57,7 @@ pub async fn create_async( Ok(batch_id) } -pub async fn validate_async(db: &SpannerDb<'_>, params: params::ValidateBatch) -> Result { +pub async fn validate_async(db: &SpannerDb, params: params::ValidateBatch) -> Result { let collection_id = db.get_collection_id_async(¶ms.collection).await?; let exists = db .sql( @@ -81,7 +81,7 @@ pub async fn validate_async(db: &SpannerDb<'_>, params: params::ValidateBatch) - Ok(exists.is_some()) } -pub async fn append_async(db: &SpannerDb<'_>, params: params::AppendToBatch) -> Result<()> { +pub async fn append_async(db: &SpannerDb, params: params::AppendToBatch) -> Result<()> { let mut metrics = db.metrics.clone(); metrics.start_timer("storage.spanner.append_items_to_batch", None); @@ -106,7 +106,7 @@ pub async fn append_async(db: &SpannerDb<'_>, params: params::AppendToBatch) -> } pub async fn get_async( - db: &SpannerDb<'_>, + db: &SpannerDb, params: params::GetBatch, ) -> Result> { let collection_id = db.get_collection_id_async(¶ms.collection).await?; @@ -142,7 +142,7 @@ pub async fn get_async( Ok(batch) } -pub async fn delete_async(db: &SpannerDb<'_>, params: params::DeleteBatch) -> Result<()> { +pub async fn delete_async(db: &SpannerDb, params: params::DeleteBatch) -> Result<()> { let collection_id = db.get_collection_id_async(¶ms.collection).await?; // Also deletes child batch_bsos rows (INTERLEAVE IN PARENT batches ON // DELETE CASCADE) @@ -165,7 +165,7 @@ pub async fn delete_async(db: &SpannerDb<'_>, params: params::DeleteBatch) -> Re } pub async fn commit_async( - db: &SpannerDb<'_>, + db: &SpannerDb, params: params::CommitBatch, ) -> Result { let mut metrics = db.metrics.clone(); @@ -239,7 +239,7 @@ pub async fn commit_async( } pub async fn do_append_async( - db: &SpannerDb<'_>, + db: &SpannerDb, user_id: HawkIdentifier, collection_id: i32, batch_id: String, @@ -335,7 +335,7 @@ pub async fn do_append_async( /// For the special case of a user creating a batch for a collection with no /// prior data. async fn pretouch_collection_async( - db: &SpannerDb<'_>, + db: &SpannerDb, user_id: &HawkIdentifier, collection_id: i32, ) -> Result<()> { diff --git a/src/db/spanner/manager.rs b/src/db/spanner/manager/bb8.rs similarity index 95% rename from src/db/spanner/manager.rs rename to src/db/spanner/manager/bb8.rs index 344625a9..b1548890 100644 --- a/src/db/spanner/manager.rs +++ b/src/db/spanner/manager/bb8.rs @@ -18,7 +18,7 @@ use crate::{ settings::Settings, }; -const SPANNER_ADDRESS: &str = "spanner.googleapis.com:443"; +pub const SPANNER_ADDRESS: &str = "spanner.googleapis.com:443"; pub struct SpannerConnectionManager { database_name: String, @@ -33,6 +33,7 @@ impl<_T> fmt::Debug for SpannerConnectionManager<_T> { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("SpannerConnectionManager") .field("database_name", &self.database_name) + .field("test_transactions", &self.test_transactions) .finish() } } @@ -65,7 +66,7 @@ pub struct SpannerSession { pub client: SpannerClient, pub session: Session, - pub(super) use_test_transactions: bool, + pub(in crate::db::spanner) use_test_transactions: bool, } #[async_trait] @@ -130,7 +131,7 @@ impl ManageConnection } } -async fn create_session( +pub async fn create_session( client: &SpannerClient, database_name: &str, ) -> Result { diff --git a/src/db/spanner/manager/deadpool.rs b/src/db/spanner/manager/deadpool.rs new file mode 100644 index 00000000..e75e7dd3 --- /dev/null +++ b/src/db/spanner/manager/deadpool.rs @@ -0,0 +1,115 @@ +use std::{fmt, sync::Arc}; + +use actix_web::web::block; +use async_trait::async_trait; +use deadpool::managed::{RecycleError, RecycleResult}; +use googleapis_raw::spanner::v1::{spanner::GetSessionRequest, spanner_grpc::SpannerClient}; +use grpcio::{ChannelBuilder, ChannelCredentials, EnvBuilder, Environment}; + +use crate::{ + db::error::{DbError, DbErrorKind}, + server::metrics::Metrics, + settings::Settings, +}; + +use super::bb8::{create_session, SpannerSession, SPANNER_ADDRESS}; + +pub struct Manager { + database_name: String, + /// The gRPC environment + env: Arc, + metrics: Metrics, + test_transactions: bool, +} + +impl fmt::Debug for Manager { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Manager") + .field("database_name", &self.database_name) + .field("test_transactions", &self.test_transactions) + .finish() + } +} + +impl Manager { + pub fn new(settings: &Settings, metrics: &Metrics) -> Result { + let url = &settings.database_url; + if !url.starts_with("spanner://") { + Err(DbErrorKind::InvalidUrl(url.to_owned()))?; + } + let database_name = url["spanner://".len()..].to_owned(); + let env = Arc::new(EnvBuilder::new().build()); + + #[cfg(not(test))] + let test_transactions = false; + #[cfg(test)] + let test_transactions = settings.database_use_test_transactions; + + Ok(Manager { + database_name, + env, + metrics: metrics.clone(), + test_transactions, + }) + } +} + +#[async_trait] +impl deadpool::managed::Manager for Manager { + async fn create(&self) -> Result { + let env = self.env.clone(); + let mut metrics = self.metrics.clone(); + // XXX: issue732: Could google_default_credentials (or + // ChannelBuilder::secure_connect) block?! + let chan = block(move || -> Result { + metrics.start_timer("storage.pool.grpc_auth", None); + // Requires + // GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json + let creds = ChannelCredentials::google_default_credentials()?; + Ok(ChannelBuilder::new(env) + .max_send_message_len(100 << 20) + .max_receive_message_len(100 << 20) + .secure_connect(SPANNER_ADDRESS, creds)) + }) + .await + .map_err(|e| match e { + actix_web::error::BlockingError::Error(e) => e.into(), + actix_web::error::BlockingError::Canceled => { + DbError::internal("web::block Manager operation canceled") + } + })?; + let client = SpannerClient::new(chan); + + // Connect to the instance and create a Spanner session. + let session = create_session(&client, &self.database_name).await?; + + Ok(SpannerSession { + client, + session, + use_test_transactions: self.test_transactions, + }) + } + + async fn recycle(&self, conn: &mut SpannerSession) -> RecycleResult { + let mut req = GetSessionRequest::new(); + req.set_name(conn.session.get_name().to_owned()); + if let Err(e) = conn + .client + .get_session_async(&req) + .map_err(|e| RecycleError::Backend(e.into()))? + .await + { + match e { + grpcio::Error::RpcFailure(ref status) + if status.status == grpcio::RpcStatusCode::NOT_FOUND => + { + conn.session = create_session(&conn.client, &self.database_name) + .await + .map_err(|e| RecycleError::Backend(e.into()))?; + } + _ => return Err(RecycleError::Backend(e.into())), + } + } + Ok(()) + } +} diff --git a/src/db/spanner/manager/mod.rs b/src/db/spanner/manager/mod.rs new file mode 100644 index 00000000..d29dca6f --- /dev/null +++ b/src/db/spanner/manager/mod.rs @@ -0,0 +1,2 @@ +pub mod bb8; +pub mod deadpool; diff --git a/src/db/spanner/models.rs b/src/db/spanner/models.rs index b45d1b9e..cba4bbb8 100644 --- a/src/db/spanner/models.rs +++ b/src/db/spanner/models.rs @@ -7,47 +7,46 @@ use std::{ sync::Arc, }; -use bb8::PooledConnection; use futures::future::TryFutureExt; -use googleapis_raw::spanner::v1::transaction::{ - self, TransactionOptions, TransactionOptions_ReadOnly, TransactionOptions_ReadWrite, -}; use googleapis_raw::spanner::v1::{ mutation::{Mutation, Mutation_Write}, spanner::{BeginTransactionRequest, CommitRequest, ExecuteSqlRequest, RollbackRequest}, + transaction::{ + TransactionOptions, TransactionOptions_ReadOnly, TransactionOptions_ReadWrite, + TransactionSelector, + }, type_pb::TypeCode, }; #[allow(unused_imports)] use protobuf::{well_known_types::ListValue, Message, RepeatedField}; -use super::manager::{SpannerConnectionManager, SpannerSession}; -use super::pool::CollectionCache; - -use crate::db::{ - error::{DbError, DbErrorKind}, - params, results, - spanner::support::{as_type, StreamedResultSetAsync}, - util::SyncTimestamp, - Db, DbFuture, Sorting, FIRST_CUSTOM_COLLECTION_ID, +use crate::{ + db::{ + error::{DbError, DbErrorKind}, + params, results, + util::SyncTimestamp, + Db, DbFuture, Sorting, FIRST_CUSTOM_COLLECTION_ID, + }, + server::metrics::Metrics, + web::extractors::{BsoQueryParams, HawkIdentifier, Offset}, }; -use crate::server::metrics::Metrics; -use crate::web::extractors::{BsoQueryParams, HawkIdentifier, Offset}; -use super::support::{bso_to_insert_row, bso_to_update_row}; use super::{ batch, - support::{as_list_value, as_value, bso_from_row, ExecuteSqlRequestBuilder}, + pool::{CollectionCache, Conn}, + support::{ + as_list_value, as_type, as_value, bso_from_row, ExecuteSqlRequestBuilder, + StreamedResultSetAsync, + }, + support::{bso_to_insert_row, bso_to_update_row}, }; -pub type TransactionSelector = transaction::TransactionSelector; - #[derive(Debug, Eq, PartialEq)] pub enum CollectionLock { Read, Write, } -pub(super) type Conn<'a> = PooledConnection<'a, SpannerConnectionManager>; pub type Result = std::result::Result; /// The ttl to use for rows that are never supposed to expire (in seconds) @@ -81,8 +80,8 @@ struct SpannerDbSession { } #[derive(Clone, Debug)] -pub struct SpannerDb<'a> { - pub(super) inner: Arc>, +pub struct SpannerDb { + pub(super) inner: Arc, /// Pool level cache of collection_ids and their names coll_cache: Arc, @@ -90,28 +89,28 @@ pub struct SpannerDb<'a> { pub metrics: Metrics, } -pub struct SpannerDbInner<'a> { - pub(super) conn: Conn<'a>, +pub struct SpannerDbInner { + pub(super) conn: Conn, session: RefCell, } -impl fmt::Debug for SpannerDbInner<'_> { +impl fmt::Debug for SpannerDbInner { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "SpannerDbInner") } } -impl<'a> Deref for SpannerDb<'a> { - type Target = SpannerDbInner<'a>; +impl Deref for SpannerDb { + type Target = SpannerDbInner; fn deref(&self) -> &Self::Target { &self.inner } } -impl<'a> SpannerDb<'a> { - pub fn new(conn: Conn<'a>, coll_cache: Arc, metrics: &Metrics) -> Self { +impl SpannerDb { + pub fn new(conn: Conn, coll_cache: Arc, metrics: &Metrics) -> Self { let inner = SpannerDbInner { conn, session: RefCell::new(Default::default()), @@ -1605,7 +1604,7 @@ impl<'a> SpannerDb<'a> { } } -impl<'a> Db<'a> for SpannerDb<'a> { +impl<'a> Db<'a> for SpannerDb { fn commit(&self) -> DbFuture<'_, ()> { let db = self.clone(); Box::pin(async move { db.commit_async().map_err(Into::into).await }) diff --git a/src/db/spanner/pool.rs b/src/db/spanner/pool.rs index 610335c7..74150e09 100644 --- a/src/db/spanner/pool.rs +++ b/src/db/spanner/pool.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use bb8::{ErrorSink, Pool}; +use bb8::ErrorSink; use std::{ collections::HashMap, @@ -12,10 +12,12 @@ use crate::db::{error::DbError, results, Db, DbPool, STD_COLLS}; use crate::server::metrics::Metrics; use crate::settings::Settings; -use super::manager::{SpannerConnectionManager, SpannerSession}; +use super::manager::bb8::SpannerSession; use super::models::SpannerDb; use crate::error::ApiResult; +pub(super) type Conn = deadpool::managed::Object; + embed_migrations!(); /// Run the diesel embedded migrations @@ -30,7 +32,8 @@ embed_migrations!(); #[derive(Clone)] pub struct SpannerDbPool { /// Pool of db connections - pool: Pool>, + //pool: Pool>, + pool: deadpool::managed::Pool, /// In-memory cache of collection_ids and their names coll_cache: Arc, @@ -45,24 +48,24 @@ impl SpannerDbPool { } pub async fn new_without_migrations(settings: &Settings, metrics: &Metrics) -> Result { - let manager = SpannerConnectionManager::::new(settings, metrics)?; let max_size = settings.database_pool_max_size.unwrap_or(10); - let builder = bb8::Pool::builder() - .max_size(max_size) - .min_idle(settings.database_pool_min_idle) - .error_sink(Box::new(LoggingErrorSink)); + let manager = super::manager::deadpool::Manager::new(settings, metrics)?; + let config = deadpool::managed::PoolConfig::new(max_size as usize); + let pool = deadpool::managed::Pool::from_config(manager, config); Ok(Self { - pool: builder.build(manager).await?, + pool, coll_cache: Default::default(), metrics: metrics.clone(), }) } - pub async fn get_async(&self) -> Result> { + pub async fn get_async(&self) -> Result { let conn = self.pool.get().await.map_err(|e| match e { - bb8::RunError::User(dbe) => dbe, - bb8::RunError::TimedOut => DbError::internal("bb8:TimedOut"), + deadpool::managed::PoolError::Backend(dbe) => dbe, + deadpool::managed::PoolError::Timeout(timeout_type) => { + DbError::internal(&format!("deadpool Timeout: {:?}", timeout_type)) + } })?; Ok(SpannerDb::new( conn, @@ -82,7 +85,7 @@ impl DbPool for SpannerDbPool { } fn state(&self) -> results::PoolState { - self.pool.state().into() + self.pool.status().into() } fn validate_batch_id(&self, id: String) -> Result<()> { diff --git a/src/db/spanner/support.rs b/src/db/spanner/support.rs index 2a9b1715..6c6ca0cc 100644 --- a/src/db/spanner/support.rs +++ b/src/db/spanner/support.rs @@ -16,14 +16,16 @@ use protobuf::{ RepeatedField, }; -use super::models::{Conn, Result}; -use crate::db::{results, util::SyncTimestamp, DbError, DbErrorKind}; - use crate::{ - db::{params, spanner::models::DEFAULT_BSO_TTL, util::to_rfc3339}, + db::{ + params, results, spanner::models::DEFAULT_BSO_TTL, util::to_rfc3339, util::SyncTimestamp, + DbError, DbErrorKind, + }, web::extractors::HawkIdentifier, }; +use super::{models::Result, pool::Conn}; + pub fn as_value(string_value: String) -> Value { let mut value = Value::new(); value.set_string_value(string_value); @@ -86,7 +88,7 @@ impl ExecuteSqlRequestBuilder { self } - fn prepare_request(self, conn: &Conn<'_>) -> ExecuteSqlRequest { + fn prepare_request(self, conn: &Conn) -> ExecuteSqlRequest { let mut request = self.execute_sql; request.set_session(conn.session.get_name().to_owned()); if let Some(params) = self.params { @@ -101,7 +103,7 @@ impl ExecuteSqlRequestBuilder { } /// Execute a SQL read statement but return a non-blocking streaming result - pub fn execute_async(self, conn: &Conn<'_>) -> Result { + pub fn execute_async(self, conn: &Conn) -> Result { let stream = conn .client .execute_streaming_sql(&self.prepare_request(conn))?; @@ -109,7 +111,7 @@ impl ExecuteSqlRequestBuilder { } /// Execute a DML statement, returning the exact count of modified rows - pub async fn execute_dml_async(self, conn: &Conn<'_>) -> Result { + pub async fn execute_dml_async(self, conn: &Conn) -> Result { let rs = conn .client .execute_sql_async(&self.prepare_request(conn))?