* make purge_ttl a real script
* pull data from env
* add `batches` purge
* delete to expiry
This commit is contained in:
jrconlin 2019-10-18 13:58:45 -07:00
parent 0deaade01f
commit 30a77a9d1b
No known key found for this signature in database
GPG Key ID: 91B7F708D9FC4D84
2 changed files with 21 additions and 9 deletions

View File

@ -4,21 +4,32 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
import os
from google.cloud import spanner
# Connection settings — overridable via environment for each deploy target.
# (Removed the stale hard-coded 'sync_kid' assignments that the env-based
# lookups below immediately shadowed.)
instance_id = os.environ.get("INSTANCE_ID", 'spanner-test')
database_id = os.environ.get("DATABASE_ID", 'sync_stage')

client = spanner.Client()
def spanner_read_data(request=None):
    """Purge expired rows from the `batches` and `bso` tables.

    Usable both as a Cloud Function entry point (`request` is the incoming
    HTTP request, unused) and as a plain script via the __main__ guard.

    Returns a newline-joined, human-readable summary of rows removed.
    """
    instance = client.instance(instance_id)
    database = instance.database(database_id)

    outputs = ["For {}:{}".format(instance_id, database_id)]

    # Delete expired batches.
    query = 'DELETE FROM batches WHERE expiry < CURRENT_TIMESTAMP()'
    # Partitioned DML returns the (lower-bound) count of rows modified.
    result = database.execute_partitioned_dml(query)
    outputs.append("batches: removed {} rows".format(result))

    # Delete expired BSOs (expiry-based, replacing the old 30-day ttl query).
    query = 'DELETE FROM bso WHERE expiry < CURRENT_TIMESTAMP()'
    result = database.execute_partitioned_dml(query)
    outputs.append("bso: removed {} rows".format(result))

    return '\n'.join(outputs)


if __name__ == "__main__":
    print(spanner_read_data())

View File

@ -7,6 +7,7 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
import os
import random
import string
@ -20,8 +21,8 @@ from google.cloud import spanner
from google.cloud.spanner_v1 import param_types
# Change these to reflect your Spanner instance install (env-overridable).
INSTANCE_ID = os.environ.get("INSTANCE_ID", "spanner-test")
DATABASE_ID = os.environ.get("DATABASE_ID", "sync_stage")
# Deprecated aliases — kept so any remaining INSTANCE/DB callers still work;
# defaults are identical to the old hard-coded values.
INSTANCE = INSTANCE_ID
DB = DATABASE_ID
# max batch size for this write is 2000, otherwise we run into:
"""google.api_core.exceptions.InvalidArgument: 400 The transaction
@ -180,7 +181,7 @@ def loader():
fxa_kid = "{:013d}-{}".format(22, fxa_uid)
name = threading.current_thread().getName()
print("{} -> Loading {} {}".format(name, fxa_uid, fxa_kid))
load(INSTANCE, DB, fxa_uid, fxa_kid, COLL_ID)
load(INSTANCE_ID, DATABASE_ID, fxa_uid, fxa_kid, COLL_ID)
def main():