From c085ff8b2cb64f7b97c41c2caada22b2984db86b Mon Sep 17 00:00:00 2001 From: jrconlin Date: Fri, 18 Oct 2019 14:39:30 -0700 Subject: [PATCH] f read the spanner data from the spanner DSN --- tools/spanner/purge_ttl.py | 28 +++++++++++++++++++++++---- tools/spanner/write_batch.py | 37 +++++++++++++++++++++++++++--------- 2 files changed, 52 insertions(+), 13 deletions(-) diff --git a/tools/spanner/purge_ttl.py b/tools/spanner/purge_ttl.py index e9aaab9a..64e24814 100644 --- a/tools/spanner/purge_ttl.py +++ b/tools/spanner/purge_ttl.py @@ -5,16 +5,35 @@ # file, You can obtain one at https://mozilla.org/MPL/2.0/. import os +from urllib import parse + from google.cloud import spanner -# Change these to match your install. -instance_id = os.environ.get("INSTANCE_ID", 'spanner-test') -database_id = os.environ.get("DATABASE_ID", 'sync_stage') +# Change these to match your install. client = spanner.Client() +def from_env(): + try: + url = os.environ.get("SYNC_DATABASE_URL") + if not url: + raise Exception("no url") + purl = parse.urlparse(url) + if purl.scheme == "spanner": + path = purl.path.split("/") + instance_id = path[-3] + database_id = path[-1] + except Exception as e: + # Change these to reflect your Spanner instance install + print("Exception {}".format(e)) + instance_id = os.environ.get("INSTANCE_ID", "spanner-test") + database_id = os.environ.get("DATABASE_ID", "sync_stage") + return (instance_id, database_id) + + def spanner_read_data(request=None): + (instance_id, database_id) = from_env() instance = client.instance(instance_id) database = instance.database(database_id) outputs = [] @@ -31,5 +50,6 @@ def spanner_read_data(request=None): outputs.append("bso: removed {} rows".format(result)) return '\n'.join(outputs) + if __name__ == "__main__": - print(spanner_read_data()) \ No newline at end of file + print(spanner_read_data()) diff --git a/tools/spanner/write_batch.py b/tools/spanner/write_batch.py index 2da26ce8..17176479 100644 --- a/tools/spanner/write_batch.py +++ b/tools/spanner/write_batch.py @@ -1,6 +1,6 @@ # Preload Spanner Database # -# Spanner increases efficiency when there is a minimum of 300G of +# Spanner increases efficiency when there is a minimum of 300G of # data stored. This script preloads a minimal set of data to trigger # that level of optimization. # @@ -8,6 +8,7 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at https://mozilla.org/MPL/2.0/. import os +from urllib import parse import random import string @@ -20,9 +21,6 @@ from google.api_core.exceptions import AlreadyExists from google.cloud import spanner from google.cloud.spanner_v1 import param_types -# Change these to reflect your Spanner instance install -INSTANCE_ID = os.environ.get("INSTANCE_ID", "spanner-test") -DATABASE_ID = os.environ.get("DATABASE_ID", "sync_stage") # max batch size for this write is 2000, otherwise we run into: """google.api_core.exceptions.InvalidArgument: 400 The transaction @@ -63,13 +61,14 @@ COLL_ID = 100 # PAYLOAD_SIZE = 2500000 # PAYLOAD_SIZE = 1000000 """ -google.api_core.exceptions.InvalidArgument: 400 The transaction exceeds +google.api_core.exceptions.InvalidArgument: 400 The transaction exceeds the maximum total bytes-size that can be handled by Spanner. Please reduce the size or number of the writes, or use fewer indexes. (Maximum size: 104857600) """ # PAYLOAD_SIZE = 50000 PAYLOAD_SIZE = 25000 -# fake a base64 like payload. Not strictly neccessary, but may help ML routines. +# fake a base64 like payload. Not strictly neccessary, but may help ML +# routines. PAYLOAD = ''.join( random.choice( string.digits + string.ascii_uppercase + string.ascii_lowercase + "-_=" @@ -88,7 +87,7 @@ def load(instance, db, fxa_uid, fxa_kid, coll_id): def create_user(txn): txn.execute_update( """\ - INSERT INTO user_collections + INSERT INTO user_collections (fxa_uid, fxa_kid, collection_id, modified) VALUES (@fxa_uid, @fxa_kid, @collection_id, @modified) """, @@ -174,14 +173,34 @@ def load(instance, db, fxa_uid, fxa_kid, coll_id): )) +def from_env(): + try: + url = os.environ.get("SYNC_DATABASE_URL") + if not url: + raise Exception("no url") + purl = parse.urlparse(url) + if purl.scheme == "spanner": + path = purl.path.split("/") + instance_id = path[-3] + database_id = path[-1] + except Exception as e: + # Change these to reflect your Spanner instance install + print("Exception {}".format(e)) + instance_id = os.environ.get("INSTANCE_ID", "spanner-test") + database_id = os.environ.get("DATABASE_ID", "sync_stage") + return (instance_id, database_id) + + def loader(): # Prefix uaids for easy filtering later - # Each loader thread gets it's own fake user to prevent some hotspot issues. + # Each loader thread gets it's own fake user to prevent some hotspot + # issues. + (instance_id, database_id) = from_env() fxa_uid = "DEADBEEF" + uuid.uuid4().hex[8:] fxa_kid = "{:013d}-{}".format(22, fxa_uid) name = threading.current_thread().getName() print("{} -> Loading {} {}".format(name, fxa_uid, fxa_kid)) - load(INSTANCE_ID, DATABASE_ID, fxa_uid, fxa_kid, COLL_ID) + load(instance_id, database_id, fxa_uid, fxa_kid, COLL_ID) def main():