feat: add dynamic routing headers to all Spanner ops (#1491)

and support the seemingly undocumented new leader aware routing header recently
added to other spanner clients (via a setting, disabled by default)

SYNC-3922
This commit is contained in:
Philip Jenvey 2023-10-18 14:58:15 -07:00 committed by GitHub
parent 1b9ebbf40f
commit af416fc29f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 397 additions and 184 deletions

195
Cargo.lock generated
View File

@ -90,7 +90,7 @@ dependencies = [
"pin-project 1.1.3",
"rand 0.7.3",
"regex",
"serde 1.0.188",
"serde 1.0.189",
"serde_json",
"serde_urlencoded",
"sha-1",
@ -118,7 +118,7 @@ dependencies = [
"http",
"log",
"regex",
"serde 1.0.188",
"serde 1.0.189",
]
[[package]]
@ -257,7 +257,7 @@ dependencies = [
"mime",
"pin-project 1.1.3",
"regex",
"serde 1.0.188",
"serde 1.0.189",
"serde_json",
"serde_urlencoded",
"socket2 0.3.19",
@ -364,15 +364,15 @@ version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12"
dependencies = [
"serde 1.0.188",
"serde 1.0.189",
"serde_json",
]
[[package]]
name = "async-trait"
version = "0.1.73"
version = "0.1.74"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0"
checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9"
dependencies = [
"proc-macro2",
"quote",
@ -415,7 +415,7 @@ dependencies = [
"mime",
"percent-encoding 2.3.0",
"rand 0.7.3",
"serde 1.0.188",
"serde 1.0.189",
"serde_json",
"serde_urlencoded",
]
@ -490,9 +490,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.4.0"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635"
checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07"
[[package]]
name = "block-buffer"
@ -682,7 +682,7 @@ checksum = "19b076e143e1d9538dde65da30f8481c2a6c44040edb8e02b9bf1351edb92ce3"
dependencies = [
"lazy_static",
"nom 5.1.3",
"serde 1.0.188",
"serde 1.0.189",
]
[[package]]
@ -694,7 +694,7 @@ dependencies = [
"lazy_static",
"nom 5.1.3",
"rust-ini",
"serde 1.0.188",
"serde 1.0.189",
"serde-hjson",
"serde_json",
"toml",
@ -832,9 +832,9 @@ dependencies = [
[[package]]
name = "curl-sys"
version = "0.4.67+curl-8.3.0"
version = "0.4.68+curl-8.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cc35d066510b197a0f72de863736641539957628c8a42e70e27c66849e77c34"
checksum = "b4a0d18d88360e374b16b2273c832b5e57258ffc1d4aa4f96b108e0738d5752f"
dependencies = [
"cc",
"libc",
@ -854,7 +854,7 @@ dependencies = [
"config 0.10.1",
"crossbeam-queue",
"num_cpus",
"serde 1.0.188",
"serde 1.0.189",
"tokio 0.2.25",
]
@ -864,15 +864,18 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef552e6f588e446098f6ba40d89ac146c8c7b64aade83c051ee00bb5d2bc18d"
dependencies = [
"serde 1.0.188",
"uuid 1.4.1",
"serde 1.0.189",
"uuid 1.5.0",
]
[[package]]
name = "deranged"
version = "0.3.8"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2696e8a945f658fd14dc3b87242e6b80cd0f36ff04ea560fa39082368847946"
checksum = "0f32d04922c60427da6f9fef14d042d9edddef64cb9d4ce0d64d0685fbeb1fd3"
dependencies = [
"powerfmt",
]
[[package]]
name = "derive_more"
@ -992,7 +995,7 @@ checksum = "7f3f119846c823f9eafcf953a8f6ffb6ed69bf6240883261a7f13b634579a51f"
dependencies = [
"lazy_static",
"regex",
"serde 1.0.188",
"serde 1.0.189",
"strsim 0.10.0",
]
@ -1061,7 +1064,7 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c138974f9d5e7fe373eb04df7cae98833802ae4b11c24ac7039a21d5af4b26c"
dependencies = [
"serde 1.0.188",
"serde 1.0.189",
]
[[package]]
@ -1094,9 +1097,9 @@ dependencies = [
[[package]]
name = "flate2"
version = "1.0.27"
version = "1.0.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6c98ee8095e9d1dcbf2fcc6d95acccb90d1c81db1e44725c6a984b1dbdfb010"
checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e"
dependencies = [
"crc32fast",
"miniz_oxide",
@ -1612,16 +1615,16 @@ dependencies = [
[[package]]
name = "iana-time-zone"
version = "0.1.57"
version = "0.1.58"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fad5b825842d2b38bd206f3e81d6957625fd7f0a361e345c30e01a0ae2dd613"
checksum = "8326b86b6cff230b97d0d312a6c40a60726df3332e721f72a1b035f451663b20"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows",
"windows-core",
]
[[package]]
@ -1855,9 +1858,9 @@ checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f"
[[package]]
name = "lock_api"
version = "0.4.10"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1cc9717a20b1bb222f333e6a92fd32f7d8a18ddc5a3191a11af45dcbf4dcd16"
checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45"
dependencies = [
"autocfg",
"scopeguard",
@ -2149,7 +2152,7 @@ version = "0.10.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bac25ee399abb46215765b1cb35bc0212377e58a061560d8b29b024fd0430e7c"
dependencies = [
"bitflags 2.4.0",
"bitflags 2.4.1",
"cfg-if 1.0.0",
"foreign-types",
"libc",
@ -2194,7 +2197,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "006e42d5b888366f1880eda20371fedde764ed2213dc8496f49622fa0c99cd5e"
dependencies = [
"log",
"serde 1.0.188",
"serde 1.0.189",
"winapi 0.3.9",
]
@ -2216,7 +2219,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core 0.9.8",
"parking_lot_core 0.9.9",
]
[[package]]
@ -2235,13 +2238,13 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.9.8"
version = "0.9.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447"
checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e"
dependencies = [
"cfg-if 1.0.0",
"libc",
"redox_syscall 0.3.5",
"redox_syscall 0.4.1",
"smallvec",
"windows-targets",
]
@ -2347,6 +2350,12 @@ version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964"
[[package]]
name = "powerfmt"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "ppv-lite86"
version = "0.2.17"
@ -2561,6 +2570,15 @@ dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "redox_syscall"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa"
dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "redox_users"
version = "0.4.3"
@ -2574,9 +2592,9 @@ dependencies = [
[[package]]
name = "regex"
version = "1.10.0"
version = "1.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d119d7c7ca818f8a53c300863d4f87566aac09943aef5b355bb83969dae75d87"
checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343"
dependencies = [
"aho-corasick",
"memchr",
@ -2586,9 +2604,9 @@ dependencies = [
[[package]]
name = "regex-automata"
version = "0.4.1"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "465c6fc0621e4abc4187a2bda0937bfd4f722c2730b29562e19689ea796c9a4b"
checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f"
dependencies = [
"aho-corasick",
"memchr",
@ -2597,9 +2615,9 @@ dependencies = [
[[package]]
name = "regex-syntax"
version = "0.8.1"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56d84fdd47036b038fc80dd333d10b6aab10d5d31f4a366e20014def75328d33"
checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]]
name = "reqwest"
@ -2627,7 +2645,7 @@ dependencies = [
"percent-encoding 2.3.0",
"pin-project-lite 0.2.13",
"rustls",
"serde 1.0.188",
"serde 1.0.189",
"serde_json",
"serde_urlencoded",
"tokio 0.2.25",
@ -2665,7 +2683,7 @@ dependencies = [
"once_cell",
"percent-encoding 2.3.0",
"pin-project-lite 0.2.13",
"serde 1.0.188",
"serde 1.0.189",
"serde_json",
"serde_urlencoded",
"system-configuration",
@ -2742,11 +2760,11 @@ dependencies = [
[[package]]
name = "rustix"
version = "0.38.18"
version = "0.38.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a74ee2d7c2581cd139b42447d7d9389b889bdaad3a73f1ebb16f2a3237bb19c"
checksum = "745ecfa778e66b2b63c88a61cb36e0eea109e803b0b86bf9879fbc77c70e86ed"
dependencies = [
"bitflags 2.4.0",
"bitflags 2.4.1",
"errno",
"libc",
"linux-raw-sys",
@ -2920,7 +2938,7 @@ dependencies = [
"once_cell",
"rand 0.8.5",
"sentry-types",
"serde 1.0.188",
"serde 1.0.189",
"serde_json",
]
@ -2966,12 +2984,12 @@ dependencies = [
"debugid",
"hex",
"rand 0.8.5",
"serde 1.0.188",
"serde 1.0.189",
"serde_json",
"thiserror",
"time 0.3.29",
"time 0.3.30",
"url 2.4.1",
"uuid 1.4.1",
"uuid 1.5.0",
]
[[package]]
@ -2982,9 +3000,9 @@ checksum = "9dad3f759919b92c3068c696c15c3d17238234498bbdcc80f2c469606f948ac8"
[[package]]
name = "serde"
version = "1.0.188"
version = "1.0.189"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e"
checksum = "8e422a44e74ad4001bdc8eede9a4570ab52f71190e9c076d14369f38b9200537"
dependencies = [
"serde_derive",
]
@ -3003,9 +3021,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.188"
version = "1.0.189"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2"
checksum = "1e48d1f918009ce3145511378cf68d613e3b3d9137d67272562080d68a2b32d5"
dependencies = [
"proc-macro2",
"quote",
@ -3020,7 +3038,7 @@ checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
dependencies = [
"itoa 1.0.9",
"ryu",
"serde 1.0.188",
"serde 1.0.189",
]
[[package]]
@ -3032,7 +3050,7 @@ dependencies = [
"form_urlencoded",
"itoa 1.0.9",
"ryu",
"serde 1.0.188",
"serde 1.0.189",
]
[[package]]
@ -3141,7 +3159,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f400f1c5db96f1f52065e8931ca0c524cceb029f7537c9e6d5424488ca137ca0"
dependencies = [
"chrono",
"serde 1.0.188",
"serde 1.0.189",
"serde_json",
"slog",
]
@ -3178,7 +3196,7 @@ dependencies = [
"slog",
"term",
"thread_local",
"time 0.3.29",
"time 0.3.30",
]
[[package]]
@ -3261,7 +3279,7 @@ checksum = "c87a60a40fccc84bef0652345bbbbbe20a605bf5d0ce81719fc476f5c03b50ef"
dependencies = [
"proc-macro2",
"quote",
"serde 1.0.188",
"serde 1.0.189",
"serde_derive",
"syn 1.0.109",
]
@ -3275,7 +3293,7 @@ dependencies = [
"base-x",
"proc-macro2",
"quote",
"serde 1.0.188",
"serde 1.0.189",
"serde_derive",
"serde_json",
"sha1",
@ -3356,7 +3374,7 @@ dependencies = [
"reqwest 0.10.10",
"sentry",
"sentry-backtrace",
"serde 1.0.188",
"serde 1.0.189",
"serde_derive",
"serde_json",
"sha2",
@ -3373,7 +3391,7 @@ dependencies = [
"syncstorage-db",
"syncstorage-settings",
"thiserror",
"time 0.3.29",
"time 0.3.30",
"tokenserver-auth",
"tokenserver-common",
"tokenserver-db",
@ -3393,7 +3411,7 @@ dependencies = [
"cadence",
"futures 0.3.28",
"hkdf",
"serde 1.0.188",
"serde 1.0.189",
"serde_json",
"sha2",
"slog",
@ -3420,7 +3438,7 @@ version = "0.14.0"
dependencies = [
"config 0.11.0",
"num_cpus",
"serde 1.0.188",
"serde 1.0.189",
"slog-scope",
"syncserver-common",
"syncstorage-settings",
@ -3463,7 +3481,7 @@ dependencies = [
"futures 0.3.28",
"http",
"lazy_static",
"serde 1.0.188",
"serde 1.0.189",
"serde_json",
"syncserver-common",
"syncserver-db-common",
@ -3498,9 +3516,9 @@ name = "syncstorage-settings"
version = "0.14.0"
dependencies = [
"rand 0.8.5",
"serde 1.0.188",
"serde 1.0.189",
"syncserver-common",
"time 0.3.29",
"time 0.3.30",
]
[[package]]
@ -3512,6 +3530,7 @@ dependencies = [
"cadence",
"deadpool",
"env_logger 0.10.0",
"form_urlencoded",
"futures 0.3.28",
"google-cloud-rust-raw",
"grpcio",
@ -3654,15 +3673,16 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.29"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "426f806f4089c493dcac0d24c29c01e2c38baf8e30f1b716ee37e83d200b18fe"
checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5"
dependencies = [
"deranged",
"itoa 1.0.9",
"libc",
"num_threads",
"serde 1.0.188",
"powerfmt",
"serde 1.0.189",
"time-core",
"time-macros 0.2.15",
]
@ -3730,7 +3750,7 @@ dependencies = [
"mockito",
"pyo3",
"reqwest 0.10.10",
"serde 1.0.188",
"serde 1.0.189",
"serde_json",
"syncserver-common",
"tokenserver-common",
@ -3744,7 +3764,7 @@ version = "0.14.0"
dependencies = [
"actix-web",
"backtrace",
"serde 1.0.188",
"serde 1.0.189",
"serde_json",
"syncserver-common",
"thiserror",
@ -3762,7 +3782,7 @@ dependencies = [
"env_logger 0.10.0",
"futures 0.3.28",
"http",
"serde 1.0.188",
"serde 1.0.189",
"serde_derive",
"serde_json",
"slog-scope",
@ -3779,7 +3799,7 @@ dependencies = [
name = "tokenserver-settings"
version = "0.14.0"
dependencies = [
"serde 1.0.188",
"serde 1.0.189",
"tokenserver-common",
]
@ -3898,7 +3918,7 @@ version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4f7f0dd8d50a853a531c426359045b1998f04219d88799810762cd4ad314234"
dependencies = [
"serde 1.0.188",
"serde 1.0.189",
]
[[package]]
@ -3909,11 +3929,10 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
[[package]]
name = "tracing"
version = "0.1.37"
version = "0.1.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
checksum = "ee2ef2af84856a50c1d430afce2fdded0a4ec7eda868db86409b4543df0797f9"
dependencies = [
"cfg-if 1.0.0",
"log",
"pin-project-lite 0.2.13",
"tracing-core",
@ -3921,9 +3940,9 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.31"
version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [
"once_cell",
"valuable",
@ -4089,7 +4108,7 @@ dependencies = [
"form_urlencoded",
"idna 0.4.0",
"percent-encoding 2.3.0",
"serde 1.0.188",
"serde 1.0.189",
]
[[package]]
@ -4105,16 +4124,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
dependencies = [
"getrandom 0.2.10",
"serde 1.0.188",
"serde 1.0.189",
]
[[package]]
name = "uuid"
version = "1.4.1"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d"
checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc"
dependencies = [
"serde 1.0.188",
"serde 1.0.189",
]
[[package]]
@ -4126,7 +4145,7 @@ dependencies = [
"idna 0.4.0",
"lazy_static",
"regex",
"serde 1.0.188",
"serde 1.0.189",
"serde_derive",
"serde_json",
"url 2.4.1",
@ -4220,7 +4239,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342"
dependencies = [
"cfg-if 1.0.0",
"serde 1.0.188",
"serde 1.0.189",
"serde_json",
"wasm-bindgen-macro",
]
@ -4372,10 +4391,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows"
version = "0.48.0"
name = "windows-core"
version = "0.51.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f"
checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64"
dependencies = [
"windows-targets",
]

View File

@ -79,6 +79,8 @@ pub struct Settings {
pub database_use_test_transactions: bool,
#[cfg(debug_assertions)]
pub database_spanner_use_mutations: bool,
/// Whether leader aware router headers are sent to Spanner
pub database_spanner_route_to_leader: bool,
/// Server-enforced limits for request payloads.
pub limits: ServerLimits,
@ -112,6 +114,7 @@ impl Default for Settings {
database_use_test_transactions: false,
#[cfg(debug_assertions)]
database_spanner_use_mutations: true,
database_spanner_route_to_leader: false,
limits: ServerLimits::default(),
statsd_label: "syncstorage".to_string(),
enable_quota: false,

View File

@ -19,6 +19,7 @@ async-trait = "0.1.40"
#deadpool = "0.5" # pin to 0.5
deadpool = { git = "https://github.com/mozilla-services/deadpool", branch = "deadpool-v0.5.2-issue92" }
google-cloud-rust-raw = "0.15.0"
form_urlencoded = "1.2"
# Some versions of OpenSSL 1.1.1 conflict with grpcio's built-in boringssl which can cause
# syncserver to either fail to either compile, or start. In those cases, try
# `cargo build --features grpcio/openssl ...`

View File

@ -9,6 +9,7 @@ mod macros;
mod batch;
mod error;
mod manager;
mod metadata;
mod models;
mod pool;
mod support;

View File

@ -6,28 +6,26 @@ use grpcio::{EnvBuilder, Environment};
use syncserver_common::{BlockingThreadpool, Metrics};
use syncstorage_settings::Settings;
use super::session::{create_spanner_session, recycle_spanner_session, SpannerSession};
use super::session::{
create_spanner_session, recycle_spanner_session, SpannerSession, SpannerSessionSettings,
};
use crate::error::DbError;
pub(crate) type Conn = deadpool::managed::Object<SpannerSession, DbError>;
pub(crate) struct SpannerSessionManager {
database_name: String,
settings: SpannerSessionSettings,
/// The gRPC environment
env: Arc<Environment>,
metrics: Metrics,
test_transactions: bool,
max_lifespan: Option<u32>,
max_idle: Option<u32>,
emulator_host: Option<String>,
blocking_threadpool: Arc<BlockingThreadpool>,
}
impl fmt::Debug for SpannerSessionManager {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("deadpool::SpannerSessionManager")
.field("database_name", &self.database_name)
.field("test_transactions", &self.test_transactions)
.field("settings", &self.settings)
.field("blocking_threadpool", &self.blocking_threadpool)
.finish()
}
}
@ -38,27 +36,10 @@ impl SpannerSessionManager {
metrics: &Metrics,
blocking_threadpool: Arc<BlockingThreadpool>,
) -> Result<Self, DbError> {
let database_name = settings
.spanner_database_name()
.ok_or_else(|| {
DbError::internal(format!("invalid database url: {}", settings.database_url))
})?
.to_owned();
let env = Arc::new(EnvBuilder::new().build());
#[cfg(not(debug_assertions))]
let test_transactions = false;
#[cfg(debug_assertions)]
let test_transactions = settings.database_use_test_transactions;
Ok(Self {
database_name,
env,
settings: SpannerSessionSettings::from_settings(settings)?,
env: Arc::new(EnvBuilder::new().build()),
metrics: metrics.clone(),
test_transactions,
max_lifespan: settings.database_pool_connection_lifespan,
max_idle: settings.database_pool_connection_max_idle,
emulator_host: settings.spanner_emulator_host.clone(),
blocking_threadpool,
})
}
@ -68,11 +49,9 @@ impl SpannerSessionManager {
impl Manager<SpannerSession, DbError> for SpannerSessionManager {
async fn create(&self) -> Result<SpannerSession, DbError> {
let session = create_spanner_session(
&self.settings,
Arc::clone(&self.env),
self.metrics.clone(),
&self.database_name,
self.test_transactions,
self.emulator_host.clone(),
self.blocking_threadpool.clone(),
)
.await?;
@ -80,14 +59,8 @@ impl Manager<SpannerSession, DbError> for SpannerSessionManager {
}
async fn recycle(&self, conn: &mut SpannerSession) -> RecycleResult<DbError> {
recycle_spanner_session(
conn,
&self.database_name,
&self.metrics,
self.max_lifespan,
self.max_idle,
)
.await
.map_err(RecycleError::Backend)
recycle_spanner_session(conn, &self.metrics)
.await
.map_err(RecycleError::Backend)
}
}

View File

@ -4,10 +4,11 @@ use google_cloud_rust_raw::spanner::v1::{
spanner::{CreateSessionRequest, GetSessionRequest, Session},
spanner_grpc::SpannerClient,
};
use grpcio::{CallOption, ChannelBuilder, ChannelCredentials, Environment, MetadataBuilder};
use grpcio::{CallOption, ChannelBuilder, ChannelCredentials, Environment};
use syncserver_common::{BlockingThreadpool, Metrics};
use syncstorage_settings::Settings;
use crate::error::DbError;
use crate::{error::DbError, metadata::MetadataBuilder};
const SPANNER_ADDRESS: &str = "spanner.googleapis.com:443";
@ -21,25 +22,103 @@ pub struct SpannerSession {
pub session: Session,
/// The underlying client (Connection/Channel) for interacting with spanner
pub client: SpannerClient,
pub(crate) use_test_transactions: bool,
/// Session Settings
pub settings: SpannerSessionSettings,
/// A second based UTC for SpannerSession creation.
/// Session has a similar `create_time` value that is managed by protobuf,
/// but some clock skew issues are possible.
pub(crate) create_time: i64,
/// Whether we are using the Spanner emulator
pub using_spanner_emulator: bool,
}
impl SpannerSession {
/// Return [CallOption] including a Dynamic Routing header for this
/// [SpannerSession] in its [grpcio::Metdata]
///
/// The more common form of [grpcio::Metdata] used by Spanner operations
/// such as `begin`, `commit`, `execute_sql`, etc.
pub fn session_opt(&self) -> Result<CallOption, grpcio::Error> {
// NOTE: this could also be cached (then cloned)
let meta = self
.settings
.metadata_builder()
.routing_param("session", self.session.get_name())
.build()?;
Ok(CallOption::default().headers(meta))
}
}
#[derive(Clone, Debug)]
pub struct SpannerSessionSettings {
/// The database name
pub database: String,
/// Whether [SpannerDb] uses mutations, which should be more efficient for
/// some of its bulk operations
pub use_mutations: bool,
/// Whether the Leader Aware Routing header should be included in gRPC
/// metdata
pub route_to_leader: bool,
/// Max age of a Session
pub max_lifespan: Option<u32>,
/// Max idle time of a Session
pub max_idle: Option<u32>,
/// For tests: disables transactions from committing
pub(crate) use_test_transactions: bool,
/// Spanner emulator hostname when set to Spanner emulator mode
pub emulator_host: Option<String>,
}
impl SpannerSessionSettings {
pub fn from_settings(settings: &Settings) -> Result<Self, DbError> {
let database = settings
.spanner_database_name()
.ok_or_else(|| {
DbError::internal(format!("Invalid database url: {}", settings.database_url))
})?
.to_owned();
#[cfg(not(debug_assertions))]
let (use_test_transactions, use_mutations) = (false, true);
#[cfg(debug_assertions)]
let (use_test_transactions, use_mutations) = (
settings.database_use_test_transactions,
settings.database_spanner_use_mutations,
);
Ok(Self {
database,
use_mutations,
route_to_leader: settings.database_spanner_route_to_leader,
max_lifespan: settings.database_pool_connection_lifespan,
max_idle: settings.database_pool_connection_max_idle,
use_test_transactions,
emulator_host: settings.spanner_emulator_host.clone(),
})
}
/// Whether the Spanner emulator's in use
pub fn using_spanner_emulator(&self) -> bool {
self.emulator_host.is_some()
}
/// Build [grpcio::Metadata] with a Resource prefix and other applicable
/// settings
pub fn metadata_builder(&self) -> MetadataBuilder {
MetadataBuilder::with_prefix(&self.database).route_to_leader(self.route_to_leader)
}
}
/// Create a Session (and the underlying gRPC Channel)
pub async fn create_spanner_session(
settings: &SpannerSessionSettings,
env: Arc<Environment>,
mut metrics: Metrics,
database_name: &str,
use_test_transactions: bool,
emulator_host: Option<String>,
blocking_threadpool: Arc<BlockingThreadpool>,
) -> Result<SpannerSession, DbError> {
let using_spanner_emulator = emulator_host.is_some();
let emulator_host = settings.emulator_host.clone();
let chan = blocking_threadpool
.spawn(move || -> Result<grpcio::Channel, DbError> {
if let Some(spanner_emulator_address) = emulator_host {
@ -66,28 +145,34 @@ pub async fn create_spanner_session(
let client = SpannerClient::new(chan);
// Connect to the instance and create a Spanner session.
let session = create_session(&client, database_name).await?;
let session = create_session(&client, settings).await?;
Ok(SpannerSession {
session,
client,
use_test_transactions,
// NOTE: later versions of deadpool provide an Object::pool method
// where we could get Settings from (via Manager) instead of cloning
settings: settings.clone(),
create_time: crate::now(),
using_spanner_emulator,
})
}
/// Recycle a cached Session for reuse
pub async fn recycle_spanner_session(
conn: &mut SpannerSession,
database_name: &str,
metrics: &Metrics,
max_lifetime: Option<u32>,
max_idle: Option<u32>,
) -> Result<(), DbError> {
let settings = &conn.settings;
let session = conn.session.get_name();
let now = crate::now();
let mut req = GetSessionRequest::new();
req.set_name(conn.session.get_name().to_owned());
req.set_name(session.to_owned());
let meta = settings
.metadata_builder()
.routing_param("name", session)
.build()?;
let opt = CallOption::default().headers(meta);
/*
Connections can sometimes produce GOAWAY errors. GOAWAYs are HTTP2 frame
errors that are (usually) sent before a given connection is shut down. It
@ -108,20 +193,20 @@ pub async fn recycle_spanner_session(
result in the client re-trying.
*/
match conn.client.get_session_async(&req)?.await {
match conn.client.get_session_async_opt(&req, opt)?.await {
Ok(this_session) => {
// Remember, this_session may not be related to
// the SpannerSession.session, so you may need
// to reflect changes if you want a more permanent
// data reference.
if this_session.get_name() != conn.session.get_name() {
if this_session.get_name() != session {
warn!(
"This session may not be the session you want {} != {}",
this_session.get_name(),
conn.session.get_name()
);
}
if let Some(max_life) = max_lifetime {
if let Some(max_life) = settings.max_lifespan {
// use our create time. (this_session has it's own
// `create_time` timestamp, but clock drift could
// be an issue.)
@ -133,7 +218,7 @@ pub async fn recycle_spanner_session(
}
}
// check how long that this has been idle...
if let Some(max_idle) = max_idle {
if let Some(max_idle) = settings.max_idle {
// use the Protobuf last use time from the saved
// reference Session. It's not perfect, but it's good enough.
let idle = conn
@ -157,7 +242,7 @@ pub async fn recycle_spanner_session(
grpcio::Error::RpcFailure(ref status)
if status.code() == grpcio::RpcStatusCode::NOT_FOUND =>
{
conn.session = create_session(&conn.client, database_name).await?;
conn.session = create_session(&conn.client, settings).await?;
Ok(())
}
_ => Err(e.into()),
@ -167,13 +252,14 @@ pub async fn recycle_spanner_session(
async fn create_session(
client: &SpannerClient,
database_name: &str,
settings: &SpannerSessionSettings,
) -> Result<Session, grpcio::Error> {
let mut req = CreateSessionRequest::new();
req.database = database_name.to_owned();
let mut meta = MetadataBuilder::new();
meta.add_str("google-cloud-resource-prefix", database_name)?;
meta.add_str("x-goog-api-client", "gcp-grpc-rs")?;
let opt = CallOption::default().headers(meta.build());
req.database = settings.database.clone();
let meta = settings
.metadata_builder()
.routing_param("database", &settings.database)
.build()?;
let opt = CallOption::default().headers(meta);
client.create_session_async_opt(&req, opt)?.await
}

View File

@ -0,0 +1,134 @@
/// gRPC metadata Resource prefix header
///
/// Generic across Google APIs. This "improves routing by the backend" as
/// described by other clients
const PREFIX_KEY: &str = "google-cloud-resource-prefix";
/// gRPC metadata Client information header
///
/// A `User-Agent` like header, likely its main use is for GCP's metrics
const METRICS_KEY: &str = "x-goog-api-client";
/// gRPC metadata Dynamic Routing header:
/// https://google.aip.dev/client-libraries/4222
///
/// See the googleapis protobuf for which routing header params are used for
/// each Spanner operation (under the `google.api.http` option).
///
/// https://github.com/googleapis/googleapis/blob/master/google/spanner/v1/spanner.proto
const ROUTING_KEY: &str = "x-goog-request-params";
/// gRPC metadata Leader Aware Routing header
///
/// Not well documented. Added to clients in early 2023 defaulting to disabled.
/// Clients have began defaulting it to enabled in late 2023.
///
/// "Enabling leader aware routing would route all requests in RW/PDML
/// transactions to the leader region." as described by other Spanner clients
const LEADER_AWARE_KEY: &str = "x-goog-spanner-route-to-leader";
/// Our user agent value for [METRICS_KEY]
const USER_AGENT: &str = "gcp-grpc-rs";
/// Builds the [grpcio::Metadata] for all db operations
#[derive(Default)]
pub struct MetadataBuilder<'a> {
prefix: &'a str,
routing_params: Vec<(&'a str, &'a str)>,
route_to_leader: bool,
}
impl<'a> MetadataBuilder<'a> {
/// Initialize a new builder with a [PREFIX_KEY] header for the given
/// resource
pub fn with_prefix(prefix: &'a str) -> Self {
Self {
prefix,
..Default::default()
}
}
/// Add a [ROUTING_KEY] header
pub fn routing_param(mut self, key: &'a str, value: &'a str) -> Self {
self.routing_params.push((key, value));
self
}
/// Toggle the [LEADER_AWARE_KEY] header
pub fn route_to_leader(mut self, route_to_leader: bool) -> Self {
self.route_to_leader = route_to_leader;
self
}
/// Build the [grpcio::Metadata]
pub fn build(self) -> Result<grpcio::Metadata, grpcio::Error> {
let mut meta = grpcio::MetadataBuilder::new();
meta.add_str(PREFIX_KEY, self.prefix)?;
meta.add_str(METRICS_KEY, USER_AGENT)?;
if self.route_to_leader {
meta.add_str(LEADER_AWARE_KEY, "true")?;
}
if !self.routing_params.is_empty() {
meta.add_str(ROUTING_KEY, &self.routing_header())?;
}
Ok(meta.build())
}
fn routing_header(self) -> String {
let mut ser = form_urlencoded::Serializer::new(String::new());
for (key, val) in self.routing_params {
ser.append_pair(key, val);
}
// python-spanner (python-api-core) doesn't encode '/':
// https://github.com/googleapis/python-api-core/blob/6251eab/google/api_core/gapic_v1/routing_header.py#L85
ser.finish().replace("%2F", "/")
}
}
#[cfg(test)]
mod tests {
use std::{collections::HashMap, str};
use super::{
MetadataBuilder, LEADER_AWARE_KEY, METRICS_KEY, PREFIX_KEY, ROUTING_KEY, USER_AGENT,
};
pub const DB: &str = "/projects/sync/instances/test/databases/sync1";
pub const SESSION: &str = "/projects/sync/instances/test/databases/sync1/sessions/f00B4r_quuX";
#[test]
fn basic() {
let meta = MetadataBuilder::with_prefix(DB)
.routing_param("session", SESSION)
.routing_param("foo", "bar baz")
.build()
.unwrap();
let meta: HashMap<_, _> = meta.into_iter().collect();
assert_eq!(meta.len(), 3);
assert_eq!(str::from_utf8(meta.get(PREFIX_KEY).unwrap()).unwrap(), DB);
assert_eq!(
str::from_utf8(meta.get(METRICS_KEY).unwrap()).unwrap(),
USER_AGENT
);
assert_eq!(
str::from_utf8(meta.get(ROUTING_KEY).unwrap()).unwrap(),
format!("session={SESSION}&foo=bar+baz")
);
}
#[test]
fn leader_aware() {
let meta = MetadataBuilder::with_prefix(DB)
.route_to_leader(true)
.build()
.unwrap();
let meta: HashMap<_, _> = meta.into_iter().collect();
assert_eq!(meta.len(), 3);
assert_eq!(
str::from_utf8(meta.get(LEADER_AWARE_KEY).unwrap()).unwrap(),
"true"
);
}
}

View File

@ -30,7 +30,7 @@ use syncstorage_db_common::{
};
use syncstorage_settings::Quota;
use super::{
use crate::{
batch,
error::DbError,
pool::{CollectionCache, Conn},
@ -74,10 +74,6 @@ struct SpannerDbSession {
pub struct SpannerDb {
pub(super) inner: Arc<SpannerDbInner>,
/// Whether the put/post_bsos methods use Spanner mutations (which should
/// be more efficient for those specific bulk operations)
use_mutations: bool,
/// Pool level cache of collection_ids and their names
coll_cache: Arc<CollectionCache>,
@ -108,7 +104,6 @@ impl Deref for SpannerDb {
impl SpannerDb {
pub(super) fn new(
conn: Conn,
use_mutations: bool,
coll_cache: Arc<CollectionCache>,
metrics: &Metrics,
quota: Quota,
@ -122,7 +117,6 @@ impl SpannerDb {
// https://github.com/mozilla-services/syncstorage-rs/issues/1480
#[allow(clippy::arc_with_non_send_sync)]
inner: Arc::new(inner),
use_mutations,
coll_cache,
metrics: metrics.clone(),
quota,
@ -329,7 +323,9 @@ impl SpannerDb {
let mut req = BeginTransactionRequest::new();
req.set_session(spanner.session.get_name().to_owned());
req.set_options(options);
let mut transaction = spanner.client.begin_transaction(&req)?;
let mut transaction = spanner
.client
.begin_transaction_opt(&req, spanner.session_opt()?)?;
let mut ts = TransactionSelector::new();
ts.set_id(transaction.take_id());
@ -349,7 +345,10 @@ impl SpannerDb {
let mut req = BeginTransactionRequest::new();
req.set_session(spanner.session.get_name().to_owned());
req.set_options(options);
let mut transaction = spanner.client.begin_transaction_async(&req)?.await?;
let mut transaction = spanner
.client
.begin_transaction_async_opt(&req, spanner.session_opt()?)?
.await?;
let mut ts = TransactionSelector::new();
ts.set_id(transaction.take_id());
@ -454,7 +453,7 @@ impl SpannerDb {
let spanner = &self.conn;
if cfg!(debug_assertions) && spanner.use_test_transactions {
if cfg!(debug_assertions) && spanner.settings.use_test_transactions {
// don't commit test transactions
return Ok(());
}
@ -466,7 +465,7 @@ impl SpannerDb {
if let Some(mutations) = self.session.borrow_mut().mutations.take() {
req.set_mutations(RepeatedField::from_vec(mutations));
}
spanner.client.commit(&req)?;
spanner.client.commit_opt(&req, spanner.session_opt()?)?;
Ok(())
} else {
Err(DbError::internal("No transaction to commit".to_owned()))
@ -481,7 +480,7 @@ impl SpannerDb {
let spanner = &self.conn;
if cfg!(debug_assertions) && spanner.use_test_transactions {
if cfg!(debug_assertions) && spanner.settings.use_test_transactions {
// don't commit test transactions
return Ok(());
}
@ -493,7 +492,10 @@ impl SpannerDb {
if let Some(mutations) = self.session.borrow_mut().mutations.take() {
req.set_mutations(RepeatedField::from_vec(mutations));
}
spanner.client.commit_async(&req)?.await?;
spanner
.client
.commit_async_opt(&req, spanner.session_opt()?)?
.await?;
Ok(())
} else {
Err(DbError::internal("No transaction to commit".to_owned()))
@ -511,7 +513,7 @@ impl SpannerDb {
let mut req = RollbackRequest::new();
req.set_session(spanner.session.get_name().to_owned());
req.set_transaction_id(transaction.get_id().to_vec());
spanner.client.rollback(&req)?;
spanner.client.rollback_opt(&req, spanner.session_opt()?)?;
Ok(())
} else {
Err(DbError::internal("No transaction to rollback".to_owned()))
@ -529,7 +531,10 @@ impl SpannerDb {
let mut req = RollbackRequest::new();
req.set_session(spanner.session.get_name().to_owned());
req.set_transaction_id(transaction.get_id().to_vec());
spanner.client.rollback_async(&req)?.await?;
spanner
.client
.rollback_async_opt(&req, spanner.session_opt()?)?
.await?;
Ok(())
} else {
Err(DbError::internal("No transaction to rollback".to_owned()))
@ -1306,7 +1311,7 @@ impl SpannerDb {
/// Whether to stabilize the sort order for get_bsos_async
fn stabilize_bsos_sort_order(&self) -> bool {
self.inner.conn.using_spanner_emulator
self.inner.conn.settings.using_spanner_emulator()
}
pub fn encode_next_offset(
@ -1506,7 +1511,7 @@ impl SpannerDb {
}
async fn put_bso_async(&self, params: params::PutBso) -> DbResult<results::PutBso> {
if self.use_mutations {
if self.conn.settings.use_mutations {
self.put_bso_with_mutations(params).await
} else {
self.put_bso_without_mutations(params).await
@ -1534,7 +1539,7 @@ impl SpannerDb {
}
async fn post_bsos_async(&self, params: params::PostBsos) -> DbResult<results::PostBsos> {
if self.use_mutations {
if self.conn.settings.use_mutations {
self.post_bsos_with_mutations(params).await
} else {
self.post_bsos_without_mutations(params).await

View File

@ -19,9 +19,6 @@ use super::{
pub struct SpannerDbPool {
/// Pool of db connections
pool: deadpool::managed::Pool<SpannerSession, DbError>,
/// Whether `SpannerDb` use Spanner mutations (which should be more
/// efficient for their bulk operations)
use_mutations: bool,
/// In-memory cache of collection_ids and their names
coll_cache: Arc<CollectionCache>,
@ -56,14 +53,9 @@ impl SpannerDbPool {
};
let config = deadpool::managed::PoolConfig { max_size, timeouts };
let pool = deadpool::managed::Pool::from_config(manager, config);
#[cfg(not(debug_assertions))]
let use_mutations = true;
#[cfg(debug_assertions)]
let use_mutations = settings.database_spanner_use_mutations;
Ok(Self {
pool,
use_mutations,
coll_cache: Default::default(),
metrics: metrics.clone(),
quota: Quota {
@ -83,7 +75,6 @@ impl SpannerDbPool {
})?;
Ok(SpannerDb::new(
conn,
self.use_mutations,
Arc::clone(&self.coll_cache),
&self.metrics,
self.quota,

View File

@ -18,7 +18,7 @@ use syncstorage_db_common::{
params, results, util::to_rfc3339, util::SyncTimestamp, UserIdentifier, DEFAULT_BSO_TTL,
};
use super::{error::DbError, pool::Conn, DbResult};
use crate::{error::DbError, pool::Conn, DbResult};
pub trait IntoSpannerValue {
const TYPE_CODE: TypeCode;
@ -167,7 +167,7 @@ impl ExecuteSqlRequestBuilder {
pub fn execute_async(self, conn: &Conn) -> DbResult<StreamedResultSetAsync> {
let stream = conn
.client
.execute_streaming_sql(&self.prepare_request(conn))?;
.execute_streaming_sql_opt(&self.prepare_request(conn), conn.session_opt()?)?;
Ok(StreamedResultSetAsync::new(stream))
}
@ -175,7 +175,7 @@ impl ExecuteSqlRequestBuilder {
pub async fn execute_dml_async(self, conn: &Conn) -> DbResult<i64> {
let rs = conn
.client
.execute_sql_async(&self.prepare_request(conn))?
.execute_sql_async_opt(&self.prepare_request(conn), conn.session_opt()?)?
.await?;
Ok(rs.get_stats().get_row_count_exact())
}