feat: More purge_ttl features (#776)

* Support a mode option in purge_ttl

* Support an expiry mode option in purge_ttl

* Support serially deleting prefixes by regex

Also changes the collection IDs option from a JSON list to an args list
(i.e. [item1,item2,item3]).

Closes #735
Closes #743
This commit is contained in:
Mark Drobnak 2020-08-11 15:05:11 -04:00 committed by GitHub
parent 7d1061f719
commit 59aa28a4e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -5,15 +5,16 @@
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
import argparse
import json
import logging
import os
import sys
import logging
from datetime import datetime
from statsd.defaults.env import statsd
from typing import List, Optional
from urllib import parse
from google.cloud import spanner
from google.cloud.spanner_v1.database import Database
from statsd.defaults.env import statsd
# set up logger
logging.basicConfig(
@ -41,18 +42,25 @@ def use_dsn(args):
return args
def deleter(database, name, query):
def deleter(database: Database, name: str, query: str, prefix: Optional[str]):
with statsd.timer("syncstorage.purge_ttl.{}_duration".format(name)):
logging.info("Running: {}".format(query))
start = datetime.now()
result = database.execute_partitioned_dml(query)
end = datetime.now()
logging.info(
"{name}: removed {result} rows, {name}_duration: {time}".format(
name=name, result=result, time=end - start))
"{name}: removed {result} rows, {name}_duration: {time}, prefix: {prefix}".format(
name=name, result=result, time=end - start, prefix=prefix))
def add_conditions(args, query):
def add_conditions(args, query: str, prefix: Optional[str]):
"""
Add SQL conditions to a query.
:param args: The program arguments
:param query: The SQL query
:param prefix: The current prefix, if given
:return: The updated SQL query
"""
if args.collection_ids:
query += " AND collection_id"
if len(args.collection_ids) == 1:
@ -60,40 +68,58 @@ def add_conditions(args, query):
else:
query += " in ({})".format(
', '.join(map(str, args.collection_ids)))
if args.uid_starts:
query += " AND fxa_uid LIKE \"{}%\"".format(args.uid_starts)
if prefix:
query += ' AND REGEXP_CONTAINS(fxa_uaid, r"{}")'.format(prefix)
return query
def spanner_purge(args, request=None):
def get_expiry_condition(args):
"""
Get the expiry SQL WHERE condition to use
:param args: The program arguments
:return: A SQL snippet to use in the WHERE clause
"""
if args.expiry_mode == "now":
return 'expiry < CURRENT_TIMESTAMP()'
elif args.expiry_mode == "midnight":
return 'expiry < TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY, "UTC")'
else:
raise Exception("Invalid expiry mode: {}".format(args.expiry_mode))
def spanner_purge(args):
instance = client.instance(args.instance_id)
database = instance.database(args.database_id)
expiry_condition = get_expiry_condition(args)
prefixes = args.uid_prefixes if args.uid_prefixes else [None]
logging.info("For {}:{}".format(args.instance_id, args.database_id))
batch_query = (
'DELETE FROM batches WHERE '
'expiry < TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY, "UTC")'
)
bso_query = add_conditions(
args,
'DELETE FROM bsos WHERE '
'expiry < TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY, "UTC")'
)
for prefix in prefixes:
logging.info("For {}:{}, prefix = {}".format(args.instance_id, args.database_id, prefix))
# Delete Batches. Also deletes child batch_bsos rows (INTERLEAVE
# IN PARENT batches ON DELETE CASCADE)
if args.mode in ["batches", "both"]:
# Delete Batches. Also deletes child batch_bsos rows (INTERLEAVE
# IN PARENT batches ON DELETE CASCADE)
batch_query = 'DELETE FROM batches WHERE {}'.format(expiry_condition)
deleter(
database,
name="batches",
query=batch_query,
prefix=prefix
)
deleter(
database,
name="batches",
query=batch_query
)
# Delete BSOs
deleter(
database,
name="bso",
query=bso_query
)
if args.mode in ["bsos", "both"]:
# Delete BSOs
bso_query = add_conditions(
args,
'DELETE FROM bsos WHERE {}'.format(expiry_condition),
prefix
)
deleter(
database,
name="bso",
query=bso_query,
prefix=prefix
)
def get_args():
@ -120,25 +146,54 @@ def get_args():
)
parser.add_argument(
"--collection_ids",
type=parse_args_list,
default=os.environ.get("COLLECTION_IDS", "[]"),
help="JSON array of collection IDs to purge"
help="Array of collection IDs to purge"
)
parser.add_argument(
"--uid_starts",
"--uid_prefixes",
type=parse_args_list,
default=os.environ.get("PURGE_UID_PREFIXES", "[]"),
help="Array of regex strings used to limit purges based on UID. "
"Each entry is a separate purge run."
)
parser.add_argument(
"--mode",
type=str,
help="Limit to UIDs starting with specified characters"
choices=["batches", "bsos", "both"],
default=os.environ.get("PURGE_MODE", "both"),
help="Purge TTLs in batches, bsos, or both"
)
parser.add_argument(
"--expiry_mode",
type=str,
choices=["now", "midnight"],
default=os.environ.get("PURGE_EXPIRY_MODE", "midnight"),
help="Choose the timestamp used to check if an entry is expired"
)
args = parser.parse_args()
collections = json.loads(args.collection_ids)
if not isinstance(collections, list):
collections = [collections]
args.collection_ids = collections
# override using the DSN URL:
if args.sync_database_url:
args = use_dsn(args)
return args
def parse_args_list(args_list: str) -> List[str]:
"""
Parse a list of items (or a single string) into a list of strings.
Example input: [item1,item2,item3]
:param args_list: The list/string
:return: A list of strings
"""
if args_list[0] != "[" or args_list[-1] != "]":
# Assume it's a single item
return [args_list]
return args_list[1:-1].split(",")
if __name__ == "__main__":
args = get_args()
with statsd.timer("syncstorage.purge_ttl.total_duration"):