read the Spanner data from the Spanner DSN

jrconlin 2019-10-18 14:39:30 -07:00
parent ff8b38d21b
commit c085ff8b2c
2 changed files with 52 additions and 13 deletions

@@ -5,16 +5,35 @@
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
import os
from urllib import parse

from google.cloud import spanner

client = spanner.Client()


def from_env():
    try:
        url = os.environ.get("SYNC_DATABASE_URL")
        if not url:
            raise Exception("no url")
        purl = parse.urlparse(url)
        if purl.scheme != "spanner":
            raise Exception("unknown scheme: {}".format(purl.scheme))
        path = purl.path.split("/")
        instance_id = path[-3]
        database_id = path[-1]
    except Exception as e:
        # Fall back to the static settings; change these to match your
        # install.
        print("Exception {}".format(e))
        instance_id = os.environ.get("INSTANCE_ID", "spanner-test")
        database_id = os.environ.get("DATABASE_ID", "sync_stage")
    return (instance_id, database_id)


def spanner_read_data(request=None):
    (instance_id, database_id) = from_env()
    instance = client.instance(instance_id)
    database = instance.database(database_id)

    outputs = []
@@ -31,5 +50,6 @@ def spanner_read_data(request=None):
    outputs.append("bso: removed {} rows".format(result))
    return '\n'.join(outputs)


if __name__ == "__main__":
    print(spanner_read_data())
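For reference, the DSN shape that from_env() expects appears to be spanner://projects/&lt;project&gt;/instances/&lt;instance&gt;/databases/&lt;database&gt;; that shape is an assumption inferred from how the path is indexed, not something this commit documents. A quick standalone check of the parsing logic:

# A minimal sketch of the DSN parse above; the sample URL is hypothetical.
from urllib import parse

url = "spanner://projects/my-project/instances/spanner-test/databases/sync_stage"
purl = parse.urlparse(url)
parts = purl.path.split("/")
# parts == ['', 'my-project', 'instances', 'spanner-test', 'databases', 'sync_stage']
assert purl.scheme == "spanner"
assert parts[-3] == "spanner-test"   # instance_id
assert parts[-1] == "sync_stage"     # database_id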

@@ -1,6 +1,6 @@
# Preload Spanner Database
#
# Spanner increases efficiency when there is a minimum of 300G of
# data stored. This script preloads a minimal set of data to trigger
# that level of optimization.
#
@@ -8,6 +8,7 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
import os
from urllib import parse
import random
import string
@@ -20,9 +21,6 @@ from google.api_core.exceptions import AlreadyExists
from google.cloud import spanner
from google.cloud.spanner_v1 import param_types
# max batch size for this write is 2000, otherwise we run into:
"""google.api_core.exceptions.InvalidArgument: 400 The transaction
@@ -63,13 +61,14 @@ COLL_ID = 100
# PAYLOAD_SIZE = 2500000
# PAYLOAD_SIZE = 1000000
"""
google.api_core.exceptions.InvalidArgument: 400 The transaction exceeds
the maximum total bytes-size that can be handled by Spanner. Please reduce the
size or number of the writes, or use fewer indexes. (Maximum size: 104857600)
"""
# PAYLOAD_SIZE = 50000
PAYLOAD_SIZE = 25000
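Taken together, the limits quoted above explain the final value. A rough sanity check (a sketch using only the numbers already quoted in the comments):

# Back-of-envelope check of the batch sizing, using the quoted limits.
MAX_TXN_BYTES = 104857600   # Spanner's per-transaction ceiling, per the error above
BATCH_SIZE = 2000           # the write-batch cap noted earlier
PAYLOAD_SIZE = 25000

print(BATCH_SIZE * PAYLOAD_SIZE)       # 50000000 bytes (~48 MB) per full batch
print(MAX_TXN_BYTES // PAYLOAD_SIZE)   # 4194 payloads max per transaction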
# fake a base64-like payload. Not strictly necessary, but may help ML
# routines.
PAYLOAD = ''.join(
    random.choice(
        string.digits + string.ascii_uppercase + string.ascii_lowercase + "-_="
@@ -88,7 +87,7 @@ def load(instance, db, fxa_uid, fxa_kid, coll_id):
    def create_user(txn):
        txn.execute_update(
            """\
            INSERT INTO user_collections
                (fxa_uid, fxa_kid, collection_id, modified)
            VALUES (@fxa_uid, @fxa_kid, @collection_id, @modified)
            """,
@@ -174,14 +173,34 @@ def load(instance, db, fxa_uid, fxa_kid, coll_id):
    ))

def from_env():
    try:
        url = os.environ.get("SYNC_DATABASE_URL")
        if not url:
            raise Exception("no url")
        purl = parse.urlparse(url)
        if purl.scheme != "spanner":
            raise Exception("unknown scheme: {}".format(purl.scheme))
        path = purl.path.split("/")
        instance_id = path[-3]
        database_id = path[-1]
    except Exception as e:
        # Fall back to the static settings; change these to reflect your
        # Spanner instance install.
        print("Exception {}".format(e))
        instance_id = os.environ.get("INSTANCE_ID", "spanner-test")
        database_id = os.environ.get("DATABASE_ID", "sync_stage")
    return (instance_id, database_id)

def loader():
    # Prefix uaids for easy filtering later.
    # Each loader thread gets its own fake user to prevent some hotspot
    # issues.
    (instance_id, database_id) = from_env()
    fxa_uid = "DEADBEEF" + uuid.uuid4().hex[8:]
    fxa_kid = "{:013d}-{}".format(22, fxa_uid)
    name = threading.current_thread().getName()
    print("{} -> Loading {} {}".format(name, fxa_uid, fxa_kid))
    load(instance_id, database_id, fxa_uid, fxa_kid, COLL_ID)
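"Easy filtering later" presumably means the fake rows can be found (and purged) by the DEADBEEF prefix; a hedged sketch of such a lookup, using only identifiers visible in this diff (the query itself is an assumption, not part of the commit):

# Sketch: locating the fake users later via the DEADBEEF prefix.
def count_fake_users(database):
    with database.snapshot() as snapshot:
        rows = snapshot.execute_sql(
            "SELECT COUNT(*) FROM user_collections "
            "WHERE STARTS_WITH(fxa_uid, @prefix)",
            params={"prefix": "DEADBEEF"},
            param_types={"prefix": param_types.STRING},
        )
        return list(rows)[0][0]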
def main():