feat: add conditions, args to purge_ttl script (#668)

* feat: add conditions, args to purge_ttl script

Provide a way to allow the purge_ttl script to run to completion.
* Adds arguments (ENV VARS):

  --instance_id (INSTANCE_ID)  Spanner instance id
  --database_id (DATABASE_ID)  Spanner database id
  --sync_database_url (SYNC_DATABASE_URL) Spanner DSN
        `spanner://instance/database`
  --collection_ids (COLLECTION_IDS)
        JSON formatted list of collections to limit deletions
        e.g. `--collection_ids=123` limits to just collection 123
             `--collection_ids=[123,456]` limits to both 123 & 456
             default is all collections

Issue #631

Co-authored-by: Philip Jenvey <pjenvey@underboss.org>
This commit is contained in:
JR Conlin 2020-06-12 15:35:39 -07:00 committed by GitHub
parent 8839e52f87
commit 2a14eb2973
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 90 additions and 29 deletions

View File

@ -4,6 +4,8 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
import argparse
import json
import os
import sys
import logging
@ -23,57 +25,116 @@ logging.basicConfig(
# Module-level Spanner client, shared by the purge functions below.
# NOTE(review): constructed at import time — assumes credentials are
# available in the environment; confirm for non-GCP local runs.
client = spanner.Client()
def use_dsn(args):
    """Override args.instance_id / args.database_id from a spanner:// DSN.

    Accepts both the short form ``spanner://<instance>/<database>`` (the
    form documented in the commit message) and the fully-qualified form
    ``spanner://projects/<p>/instances/<instance>/databases/<database>``.

    On any parse failure the exception is printed and *args* is returned
    unchanged, so the INSTANCE_ID / DATABASE_ID defaults still apply.
    """
    try:
        if not args.sync_database_url:
            raise Exception("no url")
        purl = parse.urlparse(args.sync_database_url)
        if purl.scheme == "spanner":
            # urlparse places the first path segment in ``netloc`` and the
            # rest in ``path``; recombine so both DSN forms parse. The old
            # ``purl.path.split("/")[-3]`` raised IndexError for the short
            # form and silently discarded the DSN.
            segments = [purl.netloc] + [p for p in purl.path.split("/") if p]
            if len(segments) >= 3:
                args.instance_id = segments[-3]
            else:
                args.instance_id = segments[0]
            args.database_id = segments[-1]
    except Exception as e:
        print("Exception {}".format(e))
    return args
def deleter(database, name, query):
    """Execute *query* as partitioned DML on *database*, reporting timing.

    Emits a statsd timer named after *name* and logs the row count and
    wall-clock duration of the delete.
    """
    timer_key = "syncstorage.purge_ttl.{}_duration".format(name)
    with statsd.timer(timer_key):
        logging.info("Running: {}".format(query))
        started = datetime.now()
        rows = database.execute_partitioned_dml(query)
        elapsed = datetime.now() - started
        logging.info(
            "{name}: removed {result} rows, {name}_duration: {time}".format(
                name=name, result=rows, time=elapsed))
def add_conditions(args, query):
    """Append a collection_id filter to *query* when collection ids are set.

    A single id becomes ``AND collection_id = N``; several ids become
    ``AND collection_id in (N1, N2, ...)``. With no ids, *query* is
    returned untouched (all collections are purged).
    """
    ids = args.collection_ids
    if not ids:
        return query
    if len(ids) == 1:
        return "{} AND collection_id = {:d}".format(query, ids[0])
    id_csv = ', '.join(str(cid) for cid in ids)
    return "{} AND collection_id in ({})".format(query, id_csv)
def spanner_purge(args, request=None):
    """Purge expired rows from the batches and bsos tables.

    Connects to the Spanner instance/database named in *args* and runs
    one partitioned-DML delete per table via ``deleter``. The bsos delete
    is optionally restricted to ``args.collection_ids``.
    """
    instance = client.instance(args.instance_id)
    database = instance.database(args.database_id)

    logging.info("For {}:{}".format(args.instance_id, args.database_id))

    batch_query = 'DELETE FROM batches WHERE expiry < CURRENT_TIMESTAMP()'
    bso_query = add_conditions(
        args,
        'DELETE FROM bsos WHERE expiry < CURRENT_TIMESTAMP()'
    )

    # Delete Batches. Also deletes child batch_bsos rows (INTERLEAVE
    # IN PARENT batches ON DELETE CASCADE)
    deleter(database, name="batches", query=batch_query)

    # Delete BSOs
    deleter(database, name="bso", query=bso_query)
def get_args(argv=None):
    """Parse command-line arguments, falling back to environment variables.

    argv: optional explicit argument list (defaults to ``sys.argv[1:]``);
          exposed so tests can drive the parser directly without touching
          the process arguments.

    Returns an ``argparse.Namespace`` with ``collection_ids`` normalized
    to a Python list and, when a DSN was supplied, the instance/database
    ids overridden from it.
    """
    parser = argparse.ArgumentParser(
        description="Purge old TTLs"
    )
    parser.add_argument(
        "-i",
        "--instance_id",
        default=os.environ.get("INSTANCE_ID", "spanner-test"),
        help="Spanner instance ID"
    )
    parser.add_argument(
        "-d",
        "--database_id",
        default=os.environ.get("DATABASE_ID", "sync_schema3"),
        help="Spanner Database ID"
    )
    parser.add_argument(
        "-u",
        "--sync_database_url",
        default=os.environ.get("SYNC_DATABASE_URL"),
        help="Spanner Database DSN"
    )
    parser.add_argument(
        "--collection_ids",
        default=os.environ.get("COLLECTION_IDS", "[]"),
        help="JSON array of collection IDs to purge"
    )
    args = parser.parse_args(argv)

    # COLLECTION_IDS may be a bare scalar ("123") or a JSON list
    # ("[123,456]"); normalize to a list either way.
    collections = json.loads(args.collection_ids)
    if not isinstance(collections, list):
        collections = [collections]
    args.collection_ids = collections

    # override instance/database ids using the DSN URL:
    if args.sync_database_url:
        args = use_dsn(args)
    return args
if __name__ == "__main__":
args = get_args()
with statsd.timer("syncstorage.purge_ttl.total_duration"):
start_time = datetime.now()
logging.info('Starting purge_ttl.py')
spanner_read_data()
spanner_purge(args)
end_time = datetime.now()
duration = end_time - start_time

View File

@ -1,2 +1,2 @@
google-cloud-spanner >=1.16.0
google-cloud-spanner >= 1.16.0
statsd