diff --git a/tools/spanner/purge_ttl.py b/tools/spanner/purge_ttl.py index c034e748..b3889020 100644 --- a/tools/spanner/purge_ttl.py +++ b/tools/spanner/purge_ttl.py @@ -14,6 +14,7 @@ from urllib import parse from google.cloud import spanner from google.cloud.spanner_v1.database import Database +from google.cloud.spanner_v1 import param_types from statsd.defaults.env import statsd # set up logger @@ -42,13 +43,19 @@ def use_dsn(args): return args -def deleter(database: Database, name: str, query: str, prefix: Optional[str]=None, dryrun: Optional[bool]=False): +def deleter(database: Database, + name: str, + query: str, + prefix: Optional[str]=None, + params: Optional[dict]=None, + param_types: Optional[dict]=None, + dryrun: Optional[bool]=False): with statsd.timer("syncstorage.purge_ttl.{}_duration".format(name)): - logging.info("Running: {}".format(query)) + logging.info("Running: {} :: {}".format(query, params)) start = datetime.now() result = 0 if not dryrun: - result = database.execute_partitioned_dml(query) + result = database.execute_partitioned_dml(query, params=params, param_types=param_types) end = datetime.now() logging.info( "{name}: removed {result} rows, {name}_duration: {time}, prefix: {prefix}".format( @@ -61,20 +68,30 @@ def add_conditions(args, query: str, prefix: Optional[str]): :param args: The program arguments :param query: The SQL query :param prefix: The current prefix, if given - :return: The updated SQL query + :return: The updated SQL query, and list of params """ + params = {} + types = {} if args.collection_ids: ids = list(filter(len, args.collection_ids)) if ids: query += " AND collection_id" if len(ids) == 1: - query += " = {}".format(ids[0]) + query += " = @collection_id".format(ids[0]) + params['collection_id'] = ids[0] + types['collection_id'] = param_types.INT64 else: - query += " in ({})".format( - ', '.join(ids)) + for count,id in enumerate(ids): + name = 'collection_id_{}'.format(count) + params[name] = id + types[name] = param_types.INT64 + query += " in (@{})".format( + ', @'.join(params.keys())) if prefix: - query += ' AND REGEXP_CONTAINS(fxa_uid, r"{}")'.format(prefix) - return query + query += ' AND STARTS_WITH(fxa_uid, @prefix)'.format(prefix) + params['prefix'] = prefix + types['prefix'] = param_types.STRING + return (query, params, types) def get_expiry_condition(args): @@ -103,7 +120,7 @@ def spanner_purge(args): if args.mode in ["batches", "both"]: # Delete Batches. Also deletes child batch_bsos rows (INTERLEAVE # IN PARENT batches ON DELETE CASCADE) - batch_query = add_conditions( + (batch_query, params, types) = add_conditions( args, 'DELETE FROM batches WHERE {}'.format(expiry_condition), prefix, @@ -112,13 +129,15 @@ def spanner_purge(args): database, name="batches", query=batch_query, + params=params, + param_types=types, prefix=prefix, dryrun=args.dryrun, ) if args.mode in ["bsos", "both"]: # Delete BSOs - bso_query = add_conditions( + (bso_query, params, types) = add_conditions( args, 'DELETE FROM bsos WHERE {}'.format(expiry_condition), prefix @@ -127,6 +146,8 @@ def spanner_purge(args): database, name="bso", query=bso_query, + params=params, + param_types=types, prefix=prefix, dryrun=args.dryrun, ) @@ -156,15 +177,17 @@ def get_args(): ) parser.add_argument( "--collection_ids", + "--ids", type=parse_args_list, default=os.environ.get("COLLECTION_IDS", "[]"), help="Array of collection IDs to purge" ) parser.add_argument( "--uid_prefixes", + "--prefix", type=parse_args_list, default=os.environ.get("PURGE_UID_PREFIXES", "[]"), - help="Array of regex strings used to limit purges based on UID. " + help="Array of strings used to limit purges based on UID. " "Each entry is a separate purge run." ) parser.add_argument(