From 59aa28a4e5fdcfe2acc3f767487066d30b998af0 Mon Sep 17 00:00:00 2001 From: Mark Drobnak Date: Tue, 11 Aug 2020 15:05:11 -0400 Subject: [PATCH] feat: More purge_ttl features (#776) * Support a mode option in purge_ttl * Support an expiry mode option in purge_ttl * Support serially deleting prefixes by regex Also changes the collection IDs option from a JSON list to an args list (i.e. [item1,item2,item3]). Closes #735 Closes #743 --- tools/spanner/purge_ttl.py | 135 ++++++++++++++++++++++++++----------- 1 file changed, 95 insertions(+), 40 deletions(-) diff --git a/tools/spanner/purge_ttl.py b/tools/spanner/purge_ttl.py index 24a0821b..2ee8e1d2 100644 --- a/tools/spanner/purge_ttl.py +++ b/tools/spanner/purge_ttl.py @@ -5,15 +5,16 @@ # file, You can obtain one at https://mozilla.org/MPL/2.0/. import argparse -import json +import logging import os import sys -import logging from datetime import datetime -from statsd.defaults.env import statsd +from typing import List, Optional from urllib import parse from google.cloud import spanner +from google.cloud.spanner_v1.database import Database +from statsd.defaults.env import statsd # set up logger logging.basicConfig( @@ -41,18 +42,25 @@ def use_dsn(args): return args -def deleter(database, name, query): +def deleter(database: Database, name: str, query: str, prefix: Optional[str]): with statsd.timer("syncstorage.purge_ttl.{}_duration".format(name)): logging.info("Running: {}".format(query)) start = datetime.now() result = database.execute_partitioned_dml(query) end = datetime.now() logging.info( - "{name}: removed {result} rows, {name}_duration: {time}".format( - name=name, result=result, time=end - start)) + "{name}: removed {result} rows, {name}_duration: {time}, prefix: {prefix}".format( + name=name, result=result, time=end - start, prefix=prefix)) -def add_conditions(args, query): +def add_conditions(args, query: str, prefix: Optional[str]): + """ + Add SQL conditions to a query. 
+ :param args: The program arguments + :param query: The SQL query + :param prefix: The current prefix, if given + :return: The updated SQL query + """ if args.collection_ids: query += " AND collection_id" if len(args.collection_ids) == 1: @@ -60,40 +68,58 @@ def add_conditions(args, query): else: query += " in ({})".format( ', '.join(map(str, args.collection_ids))) - if args.uid_starts: - query += " AND fxa_uid LIKE \"{}%\"".format(args.uid_starts) + if prefix: + query += ' AND REGEXP_CONTAINS(fxa_uid, r"{}")'.format(prefix) return query -def spanner_purge(args, request=None): +def get_expiry_condition(args): + """ + Get the expiry SQL WHERE condition to use + :param args: The program arguments + :return: A SQL snippet to use in the WHERE clause + """ + if args.expiry_mode == "now": + return 'expiry < CURRENT_TIMESTAMP()' + elif args.expiry_mode == "midnight": + return 'expiry < TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY, "UTC")' + else: + raise Exception("Invalid expiry mode: {}".format(args.expiry_mode)) + + +def spanner_purge(args): instance = client.instance(args.instance_id) database = instance.database(args.database_id) + expiry_condition = get_expiry_condition(args) + prefixes = args.uid_prefixes if args.uid_prefixes else [None] - logging.info("For {}:{}".format(args.instance_id, args.database_id)) - batch_query = ( - 'DELETE FROM batches WHERE ' - 'expiry < TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY, "UTC")' - ) - bso_query = add_conditions( - args, - 'DELETE FROM bsos WHERE ' - 'expiry < TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY, "UTC")' - ) + for prefix in prefixes: + logging.info("For {}:{}, prefix = {}".format(args.instance_id, args.database_id, prefix)) - # Delete Batches.
Also deletes child batch_bsos rows (INTERLEAVE + # IN PARENT batches ON DELETE CASCADE) + batch_query = 'DELETE FROM batches WHERE {}'.format(expiry_condition) + deleter( + database, + name="batches", + query=batch_query, + prefix=prefix + ) - deleter( - database, - name="batches", - query=batch_query - ) - # Delete BSOs - deleter( - database, - name="bso", - query=bso_query - ) + if args.mode in ["bsos", "both"]: + # Delete BSOs + bso_query = add_conditions( + args, + 'DELETE FROM bsos WHERE {}'.format(expiry_condition), + prefix + ) + deleter( + database, + name="bso", + query=bso_query, + prefix=prefix + ) def get_args(): @@ -120,25 +146,54 @@ def get_args(): ) parser.add_argument( "--collection_ids", + type=parse_args_list, default=os.environ.get("COLLECTION_IDS", "[]"), - help="JSON array of collection IDs to purge" + help="Array of collection IDs to purge" ) parser.add_argument( - "--uid_starts", + "--uid_prefixes", + type=parse_args_list, + default=os.environ.get("PURGE_UID_PREFIXES", "[]"), + help="Array of regex strings used to limit purges based on UID. " + "Each entry is a separate purge run." + ) + parser.add_argument( + "--mode", type=str, - help="Limit to UIDs starting with specified characters" + choices=["batches", "bsos", "both"], + default=os.environ.get("PURGE_MODE", "both"), + help="Purge TTLs in batches, bsos, or both" + ) + parser.add_argument( + "--expiry_mode", + type=str, + choices=["now", "midnight"], + default=os.environ.get("PURGE_EXPIRY_MODE", "midnight"), + help="Choose the timestamp used to check if an entry is expired" ) args = parser.parse_args() - collections = json.loads(args.collection_ids) - if not isinstance(collections, list): - collections = [collections] - args.collection_ids = collections + # override using the DSN URL: if args.sync_database_url: args = use_dsn(args) + return args +def parse_args_list(args_list: str) -> List[str]: + """ + Parse a list of items (or a single string) into a list of strings. 
+ Example input: [item1,item2,item3] + :param args_list: The list/string + :return: A list of strings (empty items are dropped) + """ + if args_list[:1] != "[" or args_list[-1:] != "]": + # Assume it's a single item; an empty string means no items + return [args_list] if args_list else [] + + return [item for item in args_list[1:-1].split(",") if item] + + if __name__ == "__main__": args = get_args() with statsd.timer("syncstorage.purge_ttl.total_duration"):