From 2a14eb2973997e2637ff0894e593642ba9a729f3 Mon Sep 17 00:00:00 2001
From: JR Conlin
Date: Fri, 12 Jun 2020 15:35:39 -0700
Subject: [PATCH] feat: add conditions, args to purge_ttl script (#668)

* feat: add conditions, args to purge_ttl script

Attempt to provide a way for the purge_ttl script to run to completion.

* Adds arguments (ENV VARS):

--instance_id (INSTANCE_ID) Spanner instance id
--database_id (DATABASE_ID) Spanner database id
--sync_database_url (SYNC_DATABASE_URL) Spanner DSN
   `spanner://instance/database`
--collection_ids (COLLECTION_IDS) JSON-formatted list of collections to
   limit deletions, e.g.
   `--collection_ids=123` limits to just collection 123
   `--collection_ids=[123,456]` limits to both 123 & 456
   default is all collections

Issue #631

Co-authored-by: Philip Jenvey
---
 tools/spanner/purge_ttl.py     | 117 +++++++++++++++++++++++++--------
 tools/spanner/requirements.txt |   2 +-
 2 files changed, 90 insertions(+), 29 deletions(-)

diff --git a/tools/spanner/purge_ttl.py b/tools/spanner/purge_ttl.py
index 7ac9b694..22011e46 100644
--- a/tools/spanner/purge_ttl.py
+++ b/tools/spanner/purge_ttl.py
@@ -4,6 +4,8 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at https://mozilla.org/MPL/2.0/.
 
+import argparse
+import json
 import os
 import sys
 import logging
@@ -23,57 +25,116 @@ logging.basicConfig(
 client = spanner.Client()
 
 
-def from_env():
+def use_dsn(args):
     try:
-        url = os.environ.get("SYNC_DATABASE_URL")
-        if not url:
+        if not args.sync_database_url:
             raise Exception("no url")
+        url = args.sync_database_url
         purl = parse.urlparse(url)
         if purl.scheme == "spanner":
             path = purl.path.split("/")
-            instance_id = path[-3]
-            database_id = path[-1]
+            args.instance_id = path[-3]
+            args.database_id = path[-1]
     except Exception as e:
         # Change these to reflect your Spanner instance install
         print("Exception {}".format(e))
-        instance_id = os.environ.get("INSTANCE_ID", "spanner-test")
-        database_id = os.environ.get("DATABASE_ID", "sync_stage")
-    return (instance_id, database_id)
+    return args
 
 
-def spanner_read_data(request=None):
-    (instance_id, database_id) = from_env()
-    instance = client.instance(instance_id)
-    database = instance.database(database_id)
+def deleter(database, name, query):
+    with statsd.timer("syncstorage.purge_ttl.{}_duration".format(name)):
+        logging.info("Running: {}".format(query))
+        start = datetime.now()
+        result = database.execute_partitioned_dml(query)
+        end = datetime.now()
+        logging.info(
+            "{name}: removed {result} rows, {name}_duration: {time}".format(
+                name=name, result=result, time=end - start))
 
-    logging.info("For {}:{}".format(instance_id, database_id))
+
+def add_conditions(args, query):
+    if args.collection_ids:
+        query += " AND collection_id"
+        if len(args.collection_ids) == 1:
+            query += " = {:d}".format(args.collection_ids[0])
+        else:
+            query += " in ({})".format(
+                ', '.join(map(str, args.collection_ids)))
+    return query
+
+
+def spanner_purge(args, request=None):
+    instance = client.instance(args.instance_id)
+    database = instance.database(args.database_id)
+
+    logging.info("For {}:{}".format(args.instance_id, args.database_id))
+    batch_query = 'DELETE FROM batches WHERE expiry < CURRENT_TIMESTAMP()'
+    bso_query = add_conditions(
+        args,
+        'DELETE FROM bsos WHERE expiry < CURRENT_TIMESTAMP()'
+    )
     # Delete Batches. Also deletes child batch_bsos rows (INTERLEAVE
     # IN PARENT batches ON DELETE CASCADE)
-    with statsd.timer("syncstorage.purge_ttl.batches_duration"):
-        batches_start = datetime.now()
-        query = 'DELETE FROM batches WHERE expiry < CURRENT_TIMESTAMP()'
-        result = database.execute_partitioned_dml(query)
-        batches_end = datetime.now()
-        logging.info("batches: removed {} rows, batches_duration: {}".format(
-            result, batches_end - batches_start))
+    deleter(
+        database,
+        name="batches",
+        query=batch_query
+    )
 
     # Delete BSOs
-    with statsd.timer("syncstorage.purge_ttl.bso_duration"):
-        bso_start = datetime.now()
-        query = 'DELETE FROM bsos WHERE expiry < CURRENT_TIMESTAMP()'
-        result = database.execute_partitioned_dml(query)
-        bso_end = datetime.now()
-        logging.info("bso: removed {} rows, bso_duration: {}".format(
-            result, bso_end - bso_start))
+    deleter(
+        database,
+        name="bso",
+        query=bso_query
+    )
+
+
+def get_args():
+    parser = argparse.ArgumentParser(
+        description="Purge old TTLs"
+    )
+    parser.add_argument(
+        "-i",
+        "--instance_id",
+        default=os.environ.get("INSTANCE_ID", "spanner-test"),
+        help="Spanner instance ID"
+    )
+    parser.add_argument(
+        "-d",
+        "--database_id",
+        default=os.environ.get("DATABASE_ID", "sync_schema3"),
+        help="Spanner Database ID"
+    )
+    parser.add_argument(
+        "-u",
+        "--sync_database_url",
+        default=os.environ.get("SYNC_DATABASE_URL"),
+        help="Spanner Database DSN"
+    )
+    parser.add_argument(
+        "--collection_ids",
+        default=os.environ.get("COLLECTION_IDS", "[]"),
+        help="JSON array of collection IDs to purge"
+    )
+    args = parser.parse_args()
+    collections = json.loads(args.collection_ids)
+    if not isinstance(collections, list):
+        collections = [collections]
+    args.collection_ids = collections
+    # override using the DSN URL:
+    if args.sync_database_url:
+        args = use_dsn(args)
+    return args
 
 
 if __name__ == "__main__":
+    args = get_args()
     with statsd.timer("syncstorage.purge_ttl.total_duration"):
         start_time = datetime.now()
         logging.info('Starting purge_ttl.py')
-        spanner_read_data()
+        spanner_purge(args)
         end_time = datetime.now()
         duration = end_time - start_time
diff --git a/tools/spanner/requirements.txt b/tools/spanner/requirements.txt
index 2d3e3e12..8c7e35f3 100644
--- a/tools/spanner/requirements.txt
+++ b/tools/spanner/requirements.txt
@@ -1,2 +1,2 @@
-google-cloud-spanner >=1.16.0
+google-cloud-spanner >= 1.16.0
 statsd
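Note (not part of the patch): a minimal, standalone sketch of what the new add_conditions() helper produces for the --collection_ids examples in the commit message. The Namespace objects below stand in for parsed arguments and are illustrative only; in the script itself they come from get_args().

    from argparse import Namespace

    # Copy of the patched add_conditions(), reproduced here for illustration.
    def add_conditions(args, query):
        if args.collection_ids:
            query += " AND collection_id"
            if len(args.collection_ids) == 1:
                query += " = {:d}".format(args.collection_ids[0])
            else:
                query += " in ({})".format(
                    ', '.join(map(str, args.collection_ids)))
        return query

    base = 'DELETE FROM bsos WHERE expiry < CURRENT_TIMESTAMP()'

    # --collection_ids=123 -> single-collection filter
    print(add_conditions(Namespace(collection_ids=[123]), base))
    # DELETE FROM bsos WHERE expiry < CURRENT_TIMESTAMP() AND collection_id = 123

    # --collection_ids=[123,456] -> filter on both collections
    print(add_conditions(Namespace(collection_ids=[123, 456]), base))
    # DELETE FROM bsos WHERE expiry < CURRENT_TIMESTAMP() AND collection_id in (123, 456)

    # default "[]" -> no filter; purges all collections
    print(add_conditions(Namespace(collection_ids=[]), base))
    # DELETE FROM bsos WHERE expiry < CURRENT_TIMESTAMP()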