feat: switch from regex_contains to starts_with (#805)

* added add'l command aliases
  * `--ids` == `--collection_ids`
  * `--prefix` == `--uid_prefixes`
* switched to parameterized DML

Issue #799

Co-authored-by: Donovan Preston <donovanpreston@gmail.com>
This commit is contained in:
JR Conlin 2020-09-01 08:11:38 -07:00 committed by GitHub
parent 5969d74e95
commit a79f8407de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -14,6 +14,7 @@ from urllib import parse
from google.cloud import spanner
from google.cloud.spanner_v1.database import Database
from google.cloud.spanner_v1 import param_types
from statsd.defaults.env import statsd
# set up logger
@ -42,13 +43,19 @@ def use_dsn(args):
return args
def deleter(database: Database, name: str, query: str, prefix: Optional[str]=None, dryrun: Optional[bool]=False):
def deleter(database: Database,
name: str,
query: str,
prefix: Optional[str]=None,
params: Optional[dict]=None,
param_types: Optional[dict]=None,
dryrun: Optional[bool]=False):
with statsd.timer("syncstorage.purge_ttl.{}_duration".format(name)):
logging.info("Running: {}".format(query))
logging.info("Running: {} :: {}".format(query, params))
start = datetime.now()
result = 0
if not dryrun:
result = database.execute_partitioned_dml(query)
result = database.execute_partitioned_dml(query, params=params, param_types=param_types)
end = datetime.now()
logging.info(
"{name}: removed {result} rows, {name}_duration: {time}, prefix: {prefix}".format(
@ -61,20 +68,30 @@ def add_conditions(args, query: str, prefix: Optional[str]):
:param args: The program arguments
:param query: The SQL query
:param prefix: The current prefix, if given
:return: The updated SQL query
:return: The updated SQL query, and list of params
"""
params = {}
types = {}
if args.collection_ids:
ids = list(filter(len, args.collection_ids))
if ids:
query += " AND collection_id"
if len(ids) == 1:
query += " = {}".format(ids[0])
query += " = @collection_id".format(ids[0])
params['collection_id'] = ids[0]
types['collection_id'] = param_types.INT64
else:
query += " in ({})".format(
', '.join(ids))
for count,id in enumerate(ids):
name = 'collection_id_{}'.format(count)
params[name] = id
types[name] = param_types.INT64
query += " in (@{})".format(
', @'.join(params.keys()))
if prefix:
query += ' AND REGEXP_CONTAINS(fxa_uid, r"{}")'.format(prefix)
return query
query += ' AND STARTS_WITH(fxa_uid, @prefix)'.format(prefix)
params['prefix'] = prefix
types['prefix'] = param_types.STRING
return (query, params, types)
def get_expiry_condition(args):
@ -103,7 +120,7 @@ def spanner_purge(args):
if args.mode in ["batches", "both"]:
# Delete Batches. Also deletes child batch_bsos rows (INTERLEAVE
# IN PARENT batches ON DELETE CASCADE)
batch_query = add_conditions(
(batch_query, params, types) = add_conditions(
args,
'DELETE FROM batches WHERE {}'.format(expiry_condition),
prefix,
@ -112,13 +129,15 @@ def spanner_purge(args):
database,
name="batches",
query=batch_query,
params=params,
param_types=types,
prefix=prefix,
dryrun=args.dryrun,
)
if args.mode in ["bsos", "both"]:
# Delete BSOs
bso_query = add_conditions(
(bso_query, params, types) = add_conditions(
args,
'DELETE FROM bsos WHERE {}'.format(expiry_condition),
prefix
@ -127,6 +146,8 @@ def spanner_purge(args):
database,
name="bso",
query=bso_query,
params=params,
param_types=types,
prefix=prefix,
dryrun=args.dryrun,
)
@ -156,15 +177,17 @@ def get_args():
)
parser.add_argument(
"--collection_ids",
"--ids",
type=parse_args_list,
default=os.environ.get("COLLECTION_IDS", "[]"),
help="Array of collection IDs to purge"
)
parser.add_argument(
"--uid_prefixes",
"--prefix",
type=parse_args_list,
default=os.environ.get("PURGE_UID_PREFIXES", "[]"),
help="Array of regex strings used to limit purges based on UID. "
help="Array of strings used to limit purges based on UID. "
"Each entry is a separate purge run."
)
parser.add_argument(