refactor: remove user migration utils (#1710)

This commit is contained in:
Taddes 2025-06-23 16:05:57 -04:00 committed by GitHub
parent 1dc421474d
commit f01c21fef4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 0 additions and 2396 deletions

View File

@ -4,8 +4,6 @@ See each directory for details:
* [hawk](hawk) - a tool for generating test HAWK authorization headers
* [spanner](spanner) - Google Cloud Platform Spanner tools for maintenance and testing
* [user_migration](user_migration) - scripts for dumping and moving user data from SQL to Spanner
## Installation
These tools are mostly written in python. It is recommended that you create a commonly shared virtual environment using something like:

View File

@ -1,100 +0,0 @@
# User Migration Script
This is a workspace for testing user migration from the old databases
to the new durable one.
There are several candidate scripts that you can use.
These build on each other, with earlier runs providing cached results for later ones.
There are a few base files you'll want to declare:
* *dsns* - a file containing the mysql and spanner DSNs for the user databases.
Each DSN should be on its own line. Currently only one DSN of a
given type is permitted.
(e.g.)
```text
mysql://test:test@localhost/syncstorage
spanner://projects/sync-spanner-dev-225401/instances/spanner-test/databases/sync_schema3
```
* *users.csv* - a mysql dump of the token database. This file is only needed if the `--deanon` de-anonymization flag is set. By default, data is anonymized to prevent accidental movement.
You can produce this file from the following:
```bash
mysql -e "select uid, email, generation, keys_changed_at, \
client_state from users;" > users.csv`
```
The script will automatically skip the header row and presumes that fields are tab-separated.
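For reference, here is a minimal sketch (not part of the original tooling) of how that dump is consumed, assuming it was saved as `users.csv`: fields are tab-delimited, the header row is skipped, and `fxa_uid` is derived from the email column, just as the scripts themselves do.
```python
import csv

# Read the tab-separated Tokenserver dump produced above, skipping the
# header row, the same way the migration scripts do.
with open("users.csv") as csv_file:
    for (uid, email, generation, keys_changed_at,
         client_state) in csv.reader(csv_file, delimiter="\t"):
        if uid == "uid":
            continue  # header row
        fxa_uid = email.split("@")[0]  # the scripts derive fxa_uid this way
        print(uid, fxa_uid)
```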
With those files you can now run:
```bash
gen_fxa_users.py
```
which will take the `users.csv` raw data and generate an
`fxa_users_{date}.lst` file.
```bash
gen_bso_users.py --bso_num #
```
which will automatically read in the `fxa_users_{date}.lst` file,
connect to the mysql database, and generate a list of sorted users
taken from the `bso#` table. This will create the
`bso_users_{bso_num}_{date}.lst` file.
and finally:
```bash
GOOGLE_APPLICATION_CREDENTIALS=credentials.json migrate_node.py \
[--start_bso=0] \
[--end_bso=19] \
[--user_percent 1:100]
```
which will read the `bso_users_#_{date}.lst` files and move the users
based on `--user_percent`.
More importantly, `--help` is your friend; feel free to use it liberally.
## installation
```bash
virtualenv venv && venv/bin/pip install -r requirements.txt
```
## running
Since you will be connecting to the GCP Spanner API, you will need to set the `GOOGLE_APPLICATION_CREDENTIALS` env var before running these scripts. This environment variable should point to the exported Google credentials file acquired from the GCP console.
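For example, a typical invocation might look like the following (using the `credentials.json` file mentioned above; `--dryrun` is optional and simply avoids writing to Spanner):
```bash
# Point at the service-account JSON exported from the GCP console.
export GOOGLE_APPLICATION_CREDENTIALS=credentials.json
migrate_node.py --dsns move_dsns.lst --dryrun
```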
The scripts will take the following actions:
1. fetch all users from a given node.
1. compare and port all user_collections over (NOTE: this may involve remapping collectionid values.)
1. begin copying over user information from mysql to spanner.
Overall performance may be improved by "batching" BSOs to different
processes using:
* `--start_bso` - the first BSO database to copy from (default: 0, inclusive)
* `--end_bso` - the last BSO database to copy from (default: 19, inclusive)
Note that both values are inclusive, so to split the work between two
processes you would use:
```bash
migrate_node.py --start_bso=0 --end_bso=9 &
migrate_node.py --start_bso=10 --end_bso=19 &
```
(As shorthand for this case, you could also do:
```bash
migrate_node.py --end_bso=9 &
migrate_node.py --start_bso=10 &
```
and let the defaults handle the rest.)
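A single BSO's users can also be split across processes with `--user_percent` (a `block:percent` pair, as described in `--help`). For example, a sketch:
```bash
# Move the first and second halves of each BSO's users in parallel.
migrate_node.py --user_percent 1:50 &
migrate_node.py --user_percent 2:50 &
```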

View File

@ -1,15 +0,0 @@
INSERT IGNORE INTO weave0.collections (name, collectionid) VALUES
("clients", 1),
("crypto", 2),
("forms", 3),
("history", 4),
("keys", 5),
("meta", 6),
("bookmarks", 7),
("prefs", 8),
("tabs", 9),
("passwords", 10),
("addons", 11),
("addresses", 12),
("creditcards", 13),
("reserved", 99);

View File

@ -1,290 +0,0 @@
#! venv/bin/python
#
import argparse
import logging
import threading
import csv
import sys
import os
from datetime import datetime
from mysql import connector
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
def tick(count):
mark = None
if count % 1000 == 0:
mark = "|"
elif count % 100 == 0:
mark = "."
level = logging.getLogger().getEffectiveLevel()
if mark and level > logging.DEBUG:
print(mark, end='', flush=True)
class Report:
bso = "init"
_failure = None
_success = None
def __init__(self, args, lock=None):
self._success_file = args.success_file
self._failure_file = args.failure_file
self._lock = lock
def success(self, uid):
if self._lock:
self._lock.acquire()
if not self._success:
self._success = open(self._success_file, "w")
self._success.write("{}\t{}\n".format(self.bso, uid))
if self._lock:
# release so other BSO threads are not blocked forever
self._lock.release()
def fail(self, uid, reason=None):
if self._lock:
self._lock.acquire()
if not self._failure:
self._failure = open(self._failure_file, "w")
logging.debug("Skipping user {}".format(uid))
self._failure.write("{}\t{}\t{}\n".format(self.bso, uid, reason or ""))
if self._lock:
self._lock.release()
def close(self):
self._success.close()
self._failure.close()
class BSO_Users:
"""User information from Tokenserver database.
Can be constructed from
``mysql -e "select uid, email, generation, keys_changed_at, \
client_state from users;" > users.csv`
"""
users = {}
anon = False
def __init__(self, args, report, dsn):
self.args = args
self.dsn = dsn
self.report = report
self.get_users(args)
def get_users(self, args):
try:
logging.info("Reading fxa_user data.")
with open(args.fxa_users_file) as csv_file:
line = 0
for (uid, fxa_uid, fxa_kid) in csv.reader(
csv_file, delimiter="\t"
):
if uid == "uid":
continue
tick(line)
logging.debug("Read: {} {}:{}".format(
uid, fxa_uid, fxa_kid))
self.users[int(uid)] = (fxa_uid, fxa_kid)
line += 1
print("")
except Exception as ex:
logging.error(
"Unexpected error",
exc_info=ex
)
self.report.fail(uid, "Unexpected error {}".format(ex))
def run(self, bso_num):
connection = self.conf_mysql(self.dsn)
out_users = []
bso_file = self.args.output_file
bso_file = bso_file.replace("#", str(bso_num))
logging.info("Fetching users from BSO db into {}".format(
bso_file,
))
output_file = open(bso_file, "w")
try:
cursor = connection.cursor()
sql = ("""select userid, count(*) as count from bso{}"""
""" group by userid order by userid""".format(
bso_num))
if self.args.user_range:
(offset, limit) = self.args.user_range.split(':')
sql = "{} limit {} offset {}".format(
sql, limit, offset)
cursor.execute(sql)
for (uid, count) in cursor:
try:
(fxa_uid, fxa_kid) = self.users.get(uid)
if self.args.hoard_limit and count > self.args.hoard_limit:
logging.warn(
"User {} => {}:{} has too "
"many items: {} ".format(
uid, fxa_uid, fxa_kid, count
)
)
self.report.fail(uid, "hoarder {}".format(count))
continue
out_users.append((uid, fxa_uid, fxa_kid))
except TypeError:
self.report.fail(uid, "not found")
logging.error(
("User {} not found in "
"tokenserver data".format(uid)))
if self.args.sort_users:
logging.info("Sorting users...")
out_users.sort(key=lambda tup: tup[1])
# Take a block of percentage of the users.
logging.info("Writing out {} users".format(len(out_users)))
line = 0
output_file.write("uid\tfxa_uid\tfxa_kid\n")
for (uid, fxa_uid, fxa_kid) in out_users:
output_file.write("{}\t{}\t{}\n".format(
uid, fxa_uid, fxa_kid
))
tick(line)
line += 1
output_file.flush()
print("")
except connector.errors.ProgrammingError as ex:
logging.error(ex)
output_file.close()
os.unlink(bso_file)
except Exception as e:
logging.error("### Exception {}:{}", exc_info=e)
output_file.close()
os.unlink(bso_file)
finally:
cursor.close()
def conf_mysql(self, dsn):
"""create a connection to the original storage system """
logging.debug("Configuring MYSQL: {}".format(dsn))
return connector.connect(
user=dsn.username,
password=dsn.password,
host=dsn.hostname,
port=dsn.port or 3306,
database=dsn.path[1:]
)
def get_args():
pid = os.getpid()
parser = argparse.ArgumentParser(
description="Generate BSO user list")
parser.add_argument(
'--dsns', default="move_dsns.lst",
help="file of new line separated DSNs")
parser.add_argument(
'--start_bso',
default=0,
help="Start of BSO range (default 0)"
)
parser.add_argument(
'--end_bso',
default=19,
help="End of BSO range inclusive (default 19)"
)
parser.add_argument(
'--bso_num',
type=int,
default=0,
help="Only read from this bso (default num)"
)
parser.add_argument(
'--output_file',
default="bso_users_#_{}.lst".format(
datetime.now().strftime("%Y_%m_%d")),
help="List of BSO users."
)
parser.add_argument(
'--verbose',
action="store_true",
help="verbose logging"
)
parser.add_argument(
'--quiet',
action="store_true",
help="silence logging"
)
parser.add_argument(
'--user_range',
help="Range of users to extract (offset:limit)"
)
parser.add_argument(
'--hoard_limit', type=int, default=0,
help="reject any user with more than this count of records"
)
parser.add_argument(
'--sort_users', action="store_true",
help="Sort the user"
)
parser.add_argument(
'--success_file', default="success_bso_user.log".format(pid),
help="File of successfully migrated userids"
)
parser.add_argument(
'--failure_file', default="failure_bso_user.log".format(pid),
help="File of unsuccessfully migrated userids"
)
parser.add_argument(
'--fxa_users_file',
default="fxa_users_{}.lst".format(datetime.now().strftime("%Y_%m_%d")),
help="List of pre-generated FxA users."
)
parser.add_argument(
'--threading',
action="store_true",
help="use threading"
)
return parser.parse_args()
def main():
threads = []
args = get_args()
log_level = logging.INFO
if args.quiet:
log_level = logging.ERROR
if args.verbose:
log_level = logging.DEBUG
logging.basicConfig(
stream=sys.stdout,
level=log_level,
)
if args.bso_num is not None:
args.start_bso = args.end_bso = args.bso_num
locker = None
if args.threading:
locker = threading.Lock()
report = Report(args, locker)
dsns = open(args.dsns).readlines()
db_dsn = None
for line in dsns:
dsn = urlparse(line.strip())
if 'mysql' in dsn.scheme:
db_dsn = dsn
if not db_dsn:
RuntimeError("mysql dsn must be specified")
bso = BSO_Users(args, report, db_dsn)
# threading is currently in process.
if args.threading:
for bso_num in range(int(args.start_bso), int(args.end_bso) + 1):
t = threading.Thread(target=bso.run, args=(bso_num,))
threads.append(t)
t.start()
else:
bso.run(args.bso_num)
for thread in threads:
thread.join()
if __name__ == "__main__":
main()

View File

@ -1,203 +0,0 @@
#! venv/bin/python
#
import argparse
import logging
import base64
import binascii
import csv
import sys
import os
from datetime import datetime
def tick(count):
mark = None
if count % 1000 == 0:
mark = "|"
elif count % 100 == 0:
mark = "."
level = logging.getLogger().getEffectiveLevel()
if mark and level > logging.DEBUG:
print(mark, end='', flush=True)
class Report:
bso = "init"
_success = None
_failure = None
def __init__(self, args):
self._success_file = args.success_file
self._failure_file = args.failure_file
def success(self, uid):
if not self._success:
self._success = open(self._success_file, "w")
self._success.write("{}\t{}\n".format(self.bso, uid))
def fail(self, uid, reason=None):
if not self._failure:
self._failure = open(self._failure_file, "w")
logging.debug("Skipping user {}".format(uid))
self._failure.write("{}\t{}\t{}\n".format(self.bso, uid, reason or ""))
def close(self):
self._success.close()
self._failure.close()
class FxA_Generate:
"""User information from Tokenserver database.
Can be constructed from
``mysql -e "select uid, email, generation, keys_changed_at, \
client_state from users;" > users.csv`
"""
users = []
anon = False
def __init__(self, args, report):
logging.info("Processing token file: {} into {}".format(
args.users_file,
args.output_file,
))
output_file = open(args.output_file, "w")
output_file.write("uid\tfxa_uid\tfxa_kid\n")
if not os.path.isfile(args.users_file):
raise IOError("{} not found".format(args.users_file))
with open(args.users_file) as csv_file:
try:
line = 0
success = 0
for (uid, email, generation,
keys_changed_at, client_state) in csv.reader(
csv_file, delimiter="\t"):
line += 1
if uid == 'uid':
# skip the header row.
continue
tick(line)
try:
fxa_uid = email.split('@')[0]
try:
keys_changed_at = int(keys_changed_at)
except ValueError:
keys_changed_at = 0
try:
generation = int(generation)
except ValueError:
generation = 0
if (keys_changed_at or generation) == 0:
logging.warn(
"user {} has no k_c_a or "
"generation value".format(
uid))
# trap for actually blank values
if client_state is None or client_state == '':
logging.error(
"User {} "
"has an invalid, empty client state".format(
uid
)
)
report.fail(uid, "invalid client state")
continue
try:
client_state = binascii.unhexlify(client_state)
except binascii.Error:
logging.error(
"User {} has "
"invalid client state: {}".format(
uid, client_state
))
report.fail(uid, "bad client state")
continue
fxa_kid = self.format_key_id(
int(keys_changed_at or generation),
client_state
)
logging.debug("Adding user {} => {} , {}".format(
uid, fxa_uid, fxa_kid
))
output_file.write(
"{}\t{}\t{}\n".format(
uid, fxa_uid, fxa_kid))
success += 1
except Exception as ex:
logging.error(
"User {} Unexpected error".format(uid),
exc_info=ex)
report.fail(uid, "unexpected error")
except Exception as ex:
logging.critical("Error in fxa file around line {}".format(
line), exc_info=ex)
print("")
logging.info("Processed {} users, {} successful".format(line, success))
# The following two functions are taken from browserid.utils
def encode_bytes_b64(self, value):
return base64.urlsafe_b64encode(value).rstrip(b'=').decode('ascii')
def format_key_id(self, keys_changed_at, key_hash):
return "{:013d}-{}".format(
keys_changed_at,
self.encode_bytes_b64(key_hash),
)
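# Illustrative example (added for clarity): with keys_changed_at=1 and a
# client_state of hex "aabbccdd", format_key_id returns
# "0000000000001-qrvM3Q" -- the zero-padded timestamp joined to the
# urlsafe-base64 (padding stripped) encoding of the client state bytes.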
def get_args():
pid = os.getpid()
parser = argparse.ArgumentParser(
description="Generate FxA user id info")
parser.add_argument(
'--users_file',
default="users.csv",
help="FXA User info in CSV format (default users.csv)"
)
parser.add_argument(
'--output_file',
default="fxa_users_{}.lst".format(datetime.now().strftime("%Y_%m_%d")),
help="List of FxA users."
)
parser.add_argument(
'--verbose',
action="store_true",
help="verbose logging"
)
parser.add_argument(
'--quiet',
action="store_true",
help="silence logging"
)
parser.add_argument(
'--success_file', default="success_fxa_user.log".format(pid),
help="File of successfully migrated userids"
)
parser.add_argument(
'--failure_file', default="failure_fxa_user.log".format(pid),
help="File of unsuccessfully migrated userids"
)
return parser.parse_args()
def main():
args = get_args()
log_level = logging.INFO
if args.quiet:
log_level = logging.ERROR
if args.verbose:
log_level = logging.DEBUG
logging.basicConfig(
stream=sys.stdout,
level=log_level,
)
report = Report(args)
FxA_Generate(args, report)
if __name__ == "__main__":
main()

View File

@ -1,827 +0,0 @@
#! venv/bin/python
# Move user data for a storage node from MySQL to Spanner.
# See the README in this directory for the overall migration workflow.
#
#
import argparse
import logging
import base64
import binascii
import csv
import sys
import math
import json
import os
import time
from datetime import datetime
import grpc
from mysql import connector
from google.cloud import spanner
from google.cloud.spanner_v1 import param_types
from google.api_core.exceptions import AlreadyExists, InvalidArgument
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
META_GLOBAL_COLLECTION_NAME = "meta"
MAX_ROWS = 1500000
class BadDSNException(Exception):
pass
def tick(count):
mark = None
if not count % 100:
mark = "."
if not count % 1000:
mark = "|"
level = logging.getLogger().getEffectiveLevel()
if mark and level > logging.DEBUG:
print(mark, end='', flush=True)
class Report:
bso = "init"
_success = None
_failure = None
def __init__(self, args):
self._success_file = args.success_file
self._failure_file = args.failure_file
def success(self, uid):
if not self._success:
self._success = open(self._success_file, "w")
self._success.write("{}\t{}\n".format(self.bso, uid))
def fail(self, uid, reason=None):
if not self._failure:
self._failure = open(self._failure_file, "w")
logging.debug("Skipping user {}".format(uid))
self._failure.write("{}\t{}\t{}\n".format(self.bso, uid, reason or ""))
def close(self):
self._success.close()
self._failure.close()
def read_failure(self, input):
start = 19
end = 0
users = []
for line in open(input).readlines():
line = line.strip()
if line[0] in "#;/":
continue
(bso, user, reason) = line.split("\t")
start = min(start, int(bso))
end = max(end, int(bso))
users.append(user)
return (int(start), int(end), users)
class FXA_info:
"""User information from Tokenserver database.
Can be constructed from
``mysql -e "select uid, email, generation, keys_changed_at, \
client_state from users;" > users.csv`
"""
users = {}
anon = False
def __init__(self, users_file, args, report):
if args.anon:
self.anon = True
return
logging.info("Reading users file: {}".format(users_file))
if not os.path.isfile(users_file):
raise IOError("{} not found".format(users_file))
with open(users_file) as csv_file:
try:
line = 0
for (uid, fxa_uid, fxa_kid) in csv.reader(
csv_file, delimiter="\t"):
line += 1
tick(line)
if uid == 'uid':
# skip the header row.
continue
if args.user:
if int(uid) not in args.user:
continue
try:
self.users[int(uid)] = (fxa_kid, fxa_uid)
except Exception as ex:
logging.error(
"User {} Unexpected error".format(uid),
exc_info=ex)
report.fail(uid, "unexpected error")
except Exception as ex:
logging.critical("Error in fxa file around line {}".format(
line), exc_info=ex)
def get(self, userid):
if userid in self.users:
return self.users[userid]
if self.anon:
fxa_uid = "fake_" + binascii.hexlify(
os.urandom(11)).decode('utf-8')
fxa_kid = "fake_" + binascii.hexlify(
os.urandom(11)).decode('utf-8')
self.users[userid] = (fxa_kid, fxa_uid)
return (fxa_kid, fxa_uid)
class Collections:
"""Cache spanner collection list.
The spanner collection list is the (soon to be) single source of
truth regarding collection ids.
"""
_by_name = {
"clients": 1,
"crypto": 2,
"forms": 3,
"history": 4,
"keys": 5,
"meta": 6,
"bookmarks": 7,
"prefs": 8,
"tabs": 9,
"passwords": 10,
"addons": 11,
"addresses": 12,
"creditcards": 13,
"reserved": 100,
}
spanner = None
def __init__(self, databases):
"""merge the mysql user_collections into spanner"""
sql = """
SELECT
DISTINCT uc.collection, cc.name
FROM
user_collections as uc,
collections as cc
WHERE
uc.collection = cc.collectionid
ORDER BY
uc.collection
"""
cursor = databases['mysql'].cursor()
def transact(transaction, values):
transaction.insert(
'collections',
columns=('collection_id', 'name'),
values=values)
self.spanner = databases['spanner']
try:
# fetch existing:
with self.spanner.snapshot() as scursor:
rows = scursor.execute_sql(
"select collection_id, name from collections")
for (collection_id, name) in rows:
logging.debug("Loading collection: {} => {}".format(
name, collection_id
))
self._by_name[name] = collection_id
cursor.execute(sql)
for (collection_id, name) in cursor:
if name not in self._by_name:
logging.debug("Adding collection: {} => {}".format(
name, collection_id
))
values = [(collection_id, name)]
self._by_name[name] = collection_id
# Since a collection may collide, do these one at a time.
try:
self.spanner.run_in_transaction(transact, values)
except AlreadyExists:
logging.info(
"Skipping already present collection {}".format(
values
))
pass
finally:
cursor.close()
def get(self, name, collection_id=None):
"""Fetches the collection_id"""
id = self._by_name.get(name)
if id is None:
logging.warn(
"Unknown collection {}:{} encountered!".format(
name, collection_id))
# it would be swell to add these to the collection table,
# but that would mean
# an imbedded spanner transaction, and that's not allowed.
return None
return id
def conf_mysql(dsn):
"""create a connection to the original storage system """
logging.debug("Configuring MYSQL: {}".format(dsn))
connection = connector.connect(
user=dsn.username,
password=dsn.password,
host=dsn.hostname,
port=dsn.port or 3306,
database=dsn.path[1:],
auth_plugin="mysql_native_password"
)
return connection
def conf_spanner(dsn):
"""create a connection to the new Spanner system"""
logging.debug("Configuring SPANNER: {}".format(dsn))
path = dsn.path.split("/")
instance_id = path[-3]
database_id = path[-1]
client = spanner.Client()
instance = client.instance(instance_id)
database = instance.database(database_id)
return database
def conf_db(dsn):
"""read the list of storage definitions from the file and create
a set of connections.
"""
if "mysql" in dsn.scheme:
return conf_mysql(dsn)
if "spanner" in dsn.scheme:
return conf_spanner(dsn)
raise RuntimeError("Unknown DSN type: {}".format(dsn.scheme))
def dumper(columns, values):
"""verbose column and data dumper. """
result = ""
for row in values:
for i in range(0, len(columns)):
result += " {} => {}\n".format(columns[i], row[i])
return result
def newSyncID():
# return a str so the altered payload stays JSON-serializable
return base64.urlsafe_b64encode(os.urandom(9)).decode('ascii')
def alter_syncids(pay):
"""Alter the syncIDs for the meta/global record, which will cause a sync
when the client reconnects
"""
payload = json.loads(pay)
payload['syncID'] = newSyncID()
for item in payload['engines']:
payload['engines'][item]['syncID'] = newSyncID()
return json.dumps(payload)
def divvy(biglist, count):
"""Partition a list into a set of equally sized slices"""
lists = []
biglen = len(biglist)
start = 0
while start < biglen:
lists.append(biglist[start:min(start+count, biglen)])
start += count
return lists
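# Illustrative example (added for clarity): divvy([1, 2, 3, 4, 5], 2)
# returns [[1, 2], [3, 4], [5]] -- fixed-size chunks, with the final
# chunk holding whatever remains.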
def move_user(databases, user_data, collections, fxa, bso_num, args, report):
"""copy user info from original storage to new storage."""
# bso column mapping:
# id => bso_id
# collection => collection_id
# sortindex => sortindex
# modified => modified
# payload => payload
# payload_size => NONE
# ttl => expiry
uc_columns = (
'fxa_uid',
'fxa_kid',
'collection_id',
'modified',
)
bso_columns = (
'collection_id',
'fxa_uid',
'fxa_kid',
'bso_id',
'expiry',
'modified',
'payload',
'sortindex',
)
(uid, fxa_uid, fxa_kid) = user_data
# Fetch the BSO data from the original storage.
sql = """
SELECT
collections.name, bso.collection, uc.last_modified,
bso.id, bso.ttl, bso.modified, bso.payload, bso.sortindex
FROM
bso{} as bso,
collections,
user_collections as uc
WHERE
bso.userid = %s
and collections.collectionid = bso.collection
and uc.collection = bso.collection
and uc.userid = bso.userid
and bso.ttl > unix_timestamp()
ORDER BY
bso.collection, bso.id""".format(bso_num)
unique_key_filter = set()
def spanner_transact_wipe_user(
transaction, fxa_uid, fxa_kid, args):
result = transaction.execute_sql(
"""
SELECT
uc.collection_id, c.name
FROM
user_collections as uc
LEFT JOIN
collections as c
ON
uc.collection_id = c.collection_id
WHERE
uc.fxa_uid = @fxa_uid
AND uc.fxa_kid = @fxa_kid
""",
params=dict(fxa_uid=fxa_uid, fxa_kid=fxa_kid),
param_types=dict(fxa_uid=param_types.STRING, fxa_kid=param_types.STRING),
)
cols = [(row[0], row[1]) for row in result]
if not args.dryrun:
logging.debug("Wiping user, collections: {}".format(cols))
transaction.execute_update(
"""
DELETE FROM
user_collections
WHERE
fxa_uid = @fxa_uid
AND fxa_kid = @fxa_kid
""",
params=dict(fxa_uid=fxa_uid, fxa_kid=fxa_kid),
param_types=dict(fxa_uid=param_types.STRING, fxa_kid=param_types.STRING),
)
else:
logging.debug("Not wiping user, collections: {}".format(cols))
def spanner_transact_uc(
transaction, data, fxa_uid, fxa_kid, args):
# user collections require a unique key.
for (col, cid, cmod, bid, exp, bmod, pay, sid) in data:
collection_id = collections.get(col, cid)
if collection_id is None:
continue
# columns from sync_schema3
# user_collections modified should come directly from
# mysql user_collections.last_modified
mod_v = datetime.utcfromtimestamp(cmod/1000.0)
# User_Collection can only have unique values. Filter
# non-unique keys and take the most recent modified
# time. The join could be anything.
uc_key = "{}_{}_{}".format(fxa_uid, fxa_kid, col)
if uc_key not in unique_key_filter:
uc_values = [(
fxa_uid,
fxa_kid,
collection_id,
mod_v,
)]
if not args.dryrun:
transaction.insert(
'user_collections',
columns=uc_columns,
values=uc_values
)
else:
logging.debug("not writing {} => {}".format(
uc_columns, uc_values))
unique_key_filter.add(uc_key)
def spanner_transact_bso(transaction, data, fxa_uid, fxa_kid, args):
count = 0
bso_values = []
for (col, cid, cmod, bid, exp, bmod, pay, sid) in data:
collection_id = collections.get(col, cid)
if collection_id is None:
continue
if collection_id != cid:
logging.debug(
"Remapping collection '{}' from {} to {}".format(
col, cid, collection_id))
# columns from sync_schema3
mod_v = datetime.utcfromtimestamp(bmod/1000.0)
exp_v = datetime.utcfromtimestamp(exp)
# add the BSO values.
if args.full and col == META_GLOBAL_COLLECTION_NAME:
pay = alter_syncids(pay)
bso_values.append([
collection_id,
fxa_uid,
fxa_kid,
bid,
exp_v,
mod_v,
pay,
sid,
])
count += 1
if not args.dryrun:
logging.debug(
"###bso{} {}".format(
bso_num,
dumper(bso_columns, bso_values)
)
)
for i in range(0, 5):
try:
transaction.insert(
'bsos',
columns=bso_columns,
values=bso_values
)
break
except grpc._channel._InactiveRpcError as ex:
logging.warn(
"Could not write record (attempt {})".format(i),
exc_info=ex)
time.sleep(.5)
else:
logging.debug("not writing {} => {}".format(
bso_columns, bso_values))
return count
cursor = databases['mysql'].cursor()
count = 0
try:
# Note: cursor() does not support __enter__()
logging.info("Processing... {} -> {}:{}".format(
uid, fxa_uid, fxa_kid))
cursor.execute(sql, (uid,))
data = []
abort_col = None
abort_count = None
col_count = 0
if args.abort:
(abort_col, abort_count) = args.abort.split(":")
abort_count = int(abort_count)
for row in cursor:
logging.debug("col: {}".format(row[0]))
if abort_col and int(row[1]) == int(abort_col):
col_count += 1
if col_count > abort_count:
logging.debug("Skipping col: {}: {} of {}".format(
row[0], col_count, abort_count))
continue
data.append(row)
if args.abort:
logging.info("Skipped {} of {} rows for {}".format(
abort_count, col_count, abort_col
))
logging.info(
"Moving {} items for user {} => {}:{}".format(
len(data), uid, fxa_uid, fxa_kid))
if args.wipe_user:
databases['spanner'].run_in_transaction(
spanner_transact_wipe_user,
fxa_uid,
fxa_kid,
args,
)
for bunch in divvy(data, args.chunk or 1000):
# Occasionally, there is a batch fail because a
# user collection is not found before a bso is written.
# to solve that, divide the UC updates from the
# BSO updates.
# Run through the list of UserCollection updates
databases['spanner'].run_in_transaction(
spanner_transact_uc,
bunch,
fxa_uid,
fxa_kid,
args,
)
count += databases['spanner'].run_in_transaction(
spanner_transact_bso,
bunch,
fxa_uid,
fxa_kid,
args,
)
if args.ms_delay > 0:
logging.debug(
"Sleeping for {} seconds".format(args.ms_delay * .01))
time.sleep(args.ms_delay * .01)
except AlreadyExists:
logging.warn(
"User {} already imported fxa_uid:{} / fxa_kid:{}".format(
uid, fxa_uid, fxa_kid
))
report.fail(uid, "exists")
return count
except InvalidArgument as ex:
report.fail(uid, "exists")
if "already inserted" in ex.args[0]:
logging.warn(
"User {} already imported fxa_uid:{} / fxa_kid:{}".format(
uid, fxa_uid, fxa_kid
))
return count
else:
raise
except Exception as ex:
report.fail(uid, "unexpected batch error")
logging.error("Unexpected Batch failure: {}:{}".format(
fxa_uid, fxa_kid), exc_info=ex)
finally:
# cursor may complain about unread data, this should prevent
# that warning.
for result in cursor:
pass
cursor.close()
report.success(uid)
return count
def get_percentage_users(users, user_percent):
(block, percentage) = map(
int, user_percent.split(':'))
total_count = len(users)
chunk_size = max(
1, math.floor(
total_count * (int(percentage) * .01)))
chunk_count = math.ceil(total_count / chunk_size)
chunk_start = max(block - 1, 0) * chunk_size
chunk_end = min(chunk_count, block) * chunk_size
if chunk_size * chunk_count > total_count:
if block >= chunk_count - 1:
chunk_end = total_count
users = users[chunk_start:chunk_end]
logging.debug(
"moving users: {} to {}".format(
chunk_start, chunk_end))
return users
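# Illustrative example (added for clarity): with 10 users and
# user_percent="2:50", chunk_size is 5 and the slice users[5:10] is
# returned, i.e. the second 50% of the users.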
def get_users(args, databases, fxa, bso_num, report):
"""Fetch the user information from the Tokenserver Dump """
users = []
try:
if args.user:
for uid in args.user:
try:
(fxa_kid, fxa_uid) = fxa.get(uid)
users.append((uid, fxa_uid, fxa_kid))
except TypeError:
logging.error(
"User {} not found in "
"tokenserver data.".format(uid))
report.fail(uid, "not found")
else:
try:
bso_users_file = args.bso_users_file.replace('#', str(bso_num))
with open(bso_users_file) as bso_file:
line = 0
for row in csv.reader(
bso_file, delimiter="\t"
):
if row[0] == "uid":
continue
users.append(row)
tick(line)
line += 1
except Exception as ex:
logging.critical("Error reading BSO data", exc_info=ex)
exit(-1)
if args.user_percent:
users = get_percentage_users(users, args.user_percent)
except Exception as ex:
logging.critical("Unexpected Error moving database:", exc_info=ex)
exit(-1)
return users
def move_database(databases, collections, bso_num, fxa, args, report):
"""iterate over provided users and move their data from old to new"""
start = time.time()
# off chance that someone else might have written
# a new collection table since the last time we
# fetched.
rows = 0
users = get_users(args, databases, fxa, bso_num, report)
logging.info("Moving {} users".format(len(users)))
for user in users:
rows += move_user(
databases=databases,
user_data=user,
collections=collections,
fxa=fxa,
bso_num=bso_num,
args=args,
report=report)
logging.info("Finished BSO #{} ({} rows) in {} seconds".format(
bso_num,
rows,
math.ceil(time.time() - start)
))
return rows
def get_args():
pid = os.getpid()
today = datetime.now().strftime("%Y_%m_%d")
parser = argparse.ArgumentParser(
description="move user from sql to spanner")
parser.add_argument(
'--dsns', default="move_dsns.lst",
help="file of new line separated DSNs")
parser.add_argument(
'--verbose',
action="store_true",
help="verbose logging"
)
parser.add_argument(
'--quiet',
action="store_true",
help="silence logging"
)
parser.add_argument(
'--offset', type=int, default=0,
help="UID to start at (default 0)")
parser.add_argument(
"--full",
action="store_true",
help="force a full reconcile"
)
parser.add_argument(
'--anon', action='store_true',
help="Anonymize the user data"
)
parser.add_argument(
'--start_bso', default=0,
type=int,
help="start dumping BSO database (default: 0)"
)
parser.add_argument(
'--end_bso',
type=int, default=19,
help="last BSO database to dump (default: 19)"
)
parser.add_argument(
'--bso_num',
type=int,
help="only move this bso (equivalent to start_bso == end_bso)"
)
parser.add_argument(
'--write_chunk',
dest="chunk",
default=1666,
help="how many rows per transaction for spanner (default: 1666)"
)
parser.add_argument(
'--user',
type=str,
help="BSO#:userId[,userid,...] to move."
)
parser.add_argument(
'--retry_file',
type=str,
help="Copy of failure file to read user IDs to retry."
)
parser.add_argument(
'--wipe_user',
action="store_true",
help="delete any pre-existing --user data on spanner before the migration"
)
parser.add_argument(
'--bso_users_file',
default="bso_users_#_{}.lst".format(today),
help="name of the generated BSO user file. "
"(Will use bso number for `#` if present; "
"default: bso_users_#_{}.lst)".format(today),
)
parser.add_argument(
'--fxa_users_file',
default="fxa_users_{}.lst".format(today),
help="List of pre-generated FxA users. Only needed if specifying"
" the `--user` option; default: fxa_users_{}.lst)".format(today)
)
parser.add_argument(
'--dryrun',
action="store_true",
help="Do not write user records to spanner"
)
parser.add_argument(
'--abort',
type=str,
help="abort data in col after #rows (e.g. history:10)"
)
parser.add_argument(
"--user_percent", default="1:100",
help=("Offset and percent of users from this BSO"
"to move (e.g. 2:50 moves the second 50%%) "
"(default 1:100)")
)
parser.add_argument(
'--ms_delay', type=int, default=0,
help="inject a sleep between writes to spanner as a throttle"
)
parser.add_argument(
'--success_file', default="success_{}.log".format(pid),
help="File of successfully migrated userids"
)
parser.add_argument(
'--failure_file', default="failure_{}.log".format(pid),
help="File of unsuccessfully migrated userids"
)
return parser.parse_args()
def main():
args = get_args()
log_level = logging.INFO
if args.quiet:
log_level = logging.ERROR
if args.verbose:
log_level = logging.DEBUG
logging.basicConfig(
stream=sys.stdout,
level=log_level,
)
report = Report(args)
dsns = open(args.dsns).readlines()
databases = {}
rows = 0
if args.user:
args.user_percent = "1:100"
(bso, userid) = args.user.split(':')
args.start_bso = int(bso)
args.end_bso = int(bso)
user_list = []
for id in userid.split(','):
user_list.append(int(id))
args.user = user_list
elif args.wipe_user:
raise RuntimeError("--wipe_user requires --user")
if args.retry_file:
(args.start_bso, args.end_bso, args.user) = report.read_failure(
args.retry_file)
if args.bso_num is not None:
args.start_bso = args.end_bso = args.bso_num
for line in dsns:
dsn = urlparse(line.strip())
scheme = dsn.scheme
if 'mysql' in dsn.scheme:
scheme = 'mysql'
databases[scheme] = conf_db(dsn)
if not databases.get('mysql') or not databases.get('spanner'):
raise RuntimeError("Both mysql and spanner dsns must be specified")
fxa_info = FXA_info(args.fxa_users_file, args, report)
collections = Collections(databases)
logging.info("Starting:")
if args.dryrun:
logging.info("=== DRY RUN MODE ===")
start = time.time()
for bso_num in range(args.start_bso, args.end_bso+1):
logging.info("Moving users in bso # {}".format(bso_num))
report.bso = bso_num
rows += move_database(
databases, collections, bso_num, fxa_info, args, report)
logging.info(
"Moved: {} rows in {} seconds".format(
rows or 0, time.time() - start))
if __name__ == "__main__":
main()

View File

@ -1,111 +0,0 @@
#! venv/bin/python
# painfully stupid script to check out dumping a spanner database to avro.
# Avro is basically "JSON" for databases. It's not super complicated & it has
# issues (one of which is that it requires Python2).
# test run Dumped 2770783 rows in 457.566066027 seconds and produced a
# roughly 6.5GB file.
#
# Spanner also has a Deadline issue where it will kill a db connection after
# so many minutes (5?). Might be better to just divvy things up into clusters
# and have threads handle transporting records over.
#
import avro.schema
import argparse
import time
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
from google.cloud import spanner
def get_args():
parser = argparse.ArgumentParser(description="dump spanner to arvo files")
parser.add_argument(
'--instance_id', default="spanner-test",
help="Spanner instance name")
parser.add_argument(
'--database_id', default="sync_schema3",
help="Spanner database name")
parser.add_argument(
'--schema', default="sync.avsc",
help="Database schema description")
parser.add_argument(
'--output', default="output.avso",
help="Output file")
parser.add_argument(
'--limit', type=int, default=1500000,
help="Limit to n rows")
return parser.parse_args()
def conf_spanner(args):
spanner_client = spanner.Client()
instance = spanner_client.instance(args.instance_id)
database = instance.database(args.database_id)
return database
def dump_rows(offset, db, writer, args):
print("Querying.... @{}".format(offset))
sql = """
SELECT collection_id, fxa_kid, fxa_uid, bso_id,
UNIX_MICROS(expiry), UNIX_MICROS(modified), payload,
sortindex from bsos LIMIT {} OFFSET {}""".format(args.limit, offset)
try:
with db.snapshot() as snapshot:
result = snapshot.execute_sql(sql)
print("Dumping...")
for row in result:
writer.append({
"collection_id": row[0],
"fxa_kid": row[1],
"fxa_uid": row[2],
"bso_id": row[3],
"expiry": row[4],
"modified": row[5],
"payload": row[6],
"sortindex": row[7]})
offset += 1
if offset % 1000 == 0:
print("Row: {}".format(offset))
return offset
except Exception as ex:
print("Deadline hit at: {} ({})".format(offset, ex))
return offset
def count_rows(db):
with db.snapshot() as snapshot:
result = snapshot.execute_sql("SELECT Count(*) from bsos")
return result.one()[0]
def dump_data(args, schema):
offset = 0
# things time out around 1_500_000 rows.
db = conf_spanner(args)
writer = DataFileWriter(
open(args.output, "wb"), DatumWriter(), schema)
row_count = count_rows(db)
print("Dumping {} rows".format(row_count))
while offset < row_count:
old_offset = offset
offset = dump_rows(offset=offset, db=db, writer=writer, args=args)
if offset == old_offset:
break
writer.close()
return row_count
def main():
start = time.time()
args = get_args()
schema = avro.schema.parse(open(args.schema, "rb").read())
rows = dump_data(args, schema)
print("Dumped: {} rows in {} seconds".format(rows, time.time() - start))
if __name__ == "__main__":
main()

View File

@ -1,312 +0,0 @@
#! venv/bin/python
# This file is historical.
# We're using `migrate_node.py`, however this file may be useful in the future
# if we determine there's a problem with directly transcribing the data from
# mysql to spanner.
#
# painfully stupid script to check out dumping mysql databases to avro.
# Avro is basically "JSON" for databases. It's not super complicated & it has
# issues.
#
import avro.schema
import argparse
import binascii
import csv
import base64
import math
import time
import os
import random
import re
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
from mysql import connector
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
MAX_ROWS=1500000
class BadDSNException(Exception):
pass
def get_args():
parser = argparse.ArgumentParser(description="dump mysql to avro files")
parser.add_argument(
'--dsns', default="dsns.lst",
help="file of new line separated DSNs")
parser.add_argument(
'--schema', default="sync.avsc",
help="Database schema description")
parser.add_argument(
'--col_schema', default="user_collection.avsc",
help="User Collection schema description"
)
parser.add_argument(
'--output', default="output.avso",
help="Output file")
parser.add_argument(
'--limit', type=int, default=1500000,
help="Limit each read chunk to n rows")
parser.add_argument(
'--offset', type=int, default=0,
help="UID to start at")
parser.add_argument(
'--deanon', action='store_false',
dest='anon',
help="Anonymize the user data"
)
parser.add_argument(
'--start_bso', default=0,
type=int,
help="start dumping BSO database"
)
parser.add_argument(
'--end_bso',
type=int, default=19,
help="last BSO database to dump"
)
parser.add_argument(
'--token_file',
default='users.csv',
help="token user database dump CSV"
)
parser.add_argument(
'--skip_collections', action='store_true',
help="skip user_collections table"
)
return parser.parse_args()
def conf_db(dsn):
dsn = urlparse(dsn)
"""
if dsn.scheme != "mysql":
raise BadDSNException("Invalid MySQL dsn: {}".format(dsn))
"""
connection = connector.connect(
user=dsn.username,
password=dsn.password,
host=dsn.hostname,
port=dsn.port or 3306,
database=dsn.path[1:]
)
return connection
# The following two functions are taken from browserid.utils
def encode_bytes_b64(value):
return base64.urlsafe_b64encode(value).rstrip(b'=').decode('ascii')
def format_key_id(keys_changed_at, key_hash):
return "{:013d}-{}".format(
keys_changed_at,
encode_bytes_b64(key_hash),
)
user_ids = {}
def read_in_token_file(filename):
global user_ids
# you can generate the token file using
# `mysql -e "select uid, email, generation, keys_changed_at, \
# client_state from users;" > users.csv`
#
# future opt: write the transmogrified file to either sqlite3
# or static files.
print("Processing token file...")
with open(filename) as csv_file:
for (uid, email, generation,
keys_changed_at, client_state) in csv.reader(
csv_file, delimiter="\t"):
if uid == 'uid':
# skip the header row.
continue
fxa_uid = email.split('@')[0]
fxa_kid = "{:013d}-{}".format(
int(keys_changed_at or generation),
base64.urlsafe_b64encode(
binascii.unhexlify(client_state)
).rstrip(b'=').decode('ascii'))
user_ids[uid] = (fxa_kid, fxa_uid)
def get_fxa_id(user_id, anon=True):
global user_ids
if user_id in user_ids:
return user_ids[user_id]
if anon:
fxa_uid = binascii.hexlify(
os.urandom(16)).decode('utf-8')
fxa_kid = binascii.hexlify(
os.urandom(16)).decode('utf-8')
user_ids[user_id] = (fxa_kid, fxa_uid)
return (fxa_kid, fxa_uid)
def dump_user_collections(schema, dsn, args):
# userid => fxa_kid
# fxa_uid
# collection => collection_id
# last_modified => modified
db = conf_db(dsn)
cursor = db.cursor()
out_file = args.output.rsplit('.', 1)
out_file_name = "{}_user_collections.{}".format(
out_file[0], out_file[1]
)
writer = DataFileWriter(
open(out_file_name, "wb"), DatumWriter(), schema)
sql = """
SELECT userid, collection, last_modified from user_collections
"""
start = time.time()
try:
cursor.execute(sql)
row = 0
for (user_id, collection_id, last_modified) in cursor:
(fxa_uid, fxa_kid) = get_fxa_id(user_id, args.anon)
try:
writer.append({
"collection_id": collection_id,
"fxa_kid": fxa_kid,
"fxa_uid": fxa_uid,
"modified": last_modified
})
except Exception as ex:
import pdb; pdb.set_trace()
print (ex)
row += 1
print(
"Dumped {} user_collection rows in {} seconds".format(
row, time.time() - start
))
finally:
writer.close()
cursor.close()
def dump_rows(bso_number, chunk_offset, db, writer, args):
# bso column mapping:
# id => bso_id
# collection => collection_id
# sortindex => sortindex
# modified => modified
# payload => payload
# payload_size => NONE
# ttl => expiry
ivre = re.compile(r'("IV": ?"[^"]+")')
print("Querying.... bso{} @{}".format(bso_number, chunk_offset))
sql = """
SELECT userid, collection, id,
ttl, modified, payload,
sortindex from bso{} LIMIT {} OFFSET {}""".format(
bso_number, args.limit, chunk_offset)
cursor = db.cursor()
user = None
row_count = 0
try:
cursor.execute(sql)
print("Dumping...")
for (userid, cid, bid, exp, mod, pay, si) in cursor:
if args.anon:
replacement = encode_bytes_b64(os.urandom(16))
pay = ivre.sub('"IV":"{}"'.format(replacement), pay)
if userid != user:
(fxa_kid, fxa_uid) = get_fxa_id(userid, args.anon)
user = userid
writer.append({
"fxa_uid": fxa_uid,
"fxa_kid": fxa_kid,
"collection_id": cid,
"bso_id": bid,
"expiry": exp,
"modified": mod,
"payload": pay,
"sortindex": si})
row_count += 1
if (chunk_offset + row_count) % 1000 == 0:
print("BSO:{} Row: {}".format(bso_number, chunk_offset + row_count))
if row_count >= MAX_ROWS:
break
except Exception as e:
print("Deadline hit at: {} ({})".format(
chunk_offset + row_count, e))
finally:
cursor.close()
return row_count
def count_rows(db, bso_num=0):
cursor = db.cursor()
try:
cursor.execute("SELECT Count(*) from bso{}".format(bso_num))
return cursor.fetchone()[0]
finally:
cursor.close()
def dump_data(bso_number, schema, dsn, args):
offset = args.offset or 0
total_rows = 0
# things time out around 1_500_000 rows.
db = conf_db(dsn)
out_file = args.output.rsplit('.', 1)
row_count = count_rows(db, bso_number)
for chunk in range(
max(1, math.trunc(math.ceil(row_count / MAX_ROWS)))):
print(
"Dumping {} rows from bso#{} into chunk {}".format(
row_count, bso_number, chunk))
out_file_name = "{}_{}_{}.{}".format(
out_file[0], bso_number, hex(chunk), out_file[1]
)
writer = DataFileWriter(
open(out_file_name, "wb"), DatumWriter(), schema)
rows = dump_rows(
bso_number=bso_number,
chunk_offset=offset,
db=db,
writer=writer,
args=args)
writer.close()
if rows == 0:
break
offset = offset + rows
chunk += 1
return rows
def main():
args = get_args()
rows = 0
dsns = open(args.dsns).readlines()
schema = avro.schema.parse(open(args.schema, "rb").read())
col_schema = avro.schema.parse(open(args.col_schema, "rb").read())
if args.token_file:
read_in_token_file(args.token_file)
start = time.time()
for dsn in dsns:
print("Starting: {}".format(dsn))
try:
if not args.skip_collections:
dump_user_collections(col_schema, dsn, args)
for bso_num in range(args.start_bso, args.end_bso+1):
rows = dump_data(bso_num, schema, dsn, args)
except Exception as ex:
print("Could not process {}: {}".format(dsn, ex))
print("Dumped: {} rows in {} seconds".format(rows, time.time() - start))
if __name__ == "__main__":
main()

View File

@ -1,516 +0,0 @@
#! venv/bin/python
# This file is historical.
# This file will attempt to copy a user from an existing mysql database
# to a spanner table. It requires access to the tokenserver db, which may
# not be available in production environments.
#
#
import argparse
import logging
import base64
import json
import sys
import os
import time
from datetime import datetime
from mysql import connector
from mysql.connector.errors import IntegrityError
from google.cloud import spanner
from google.api_core.exceptions import AlreadyExists
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
SPANNER_NODE_ID = 800
META_GLOBAL_COLLECTION_ID = 6
class BadDSNException(Exception):
pass
# From server_syncstorage
class MigrationState:
UNKNOWN = 0
IN_PROGRESS = 1
COMPLETE = 2
class Collections:
"""Cache spanner collection list.
The spanner collection list is the (soon to be) single source of
truth regarding collection ids.
"""
_by_name = {}
databases = None
def __init__(self, databases):
"""Get the cache list of collection ids"""
sql = """
SELECT
name, collection_id
FROM
collections;
"""
self.databases = databases
logging.debug("Fetching collections...")
with self.databases['spanner'].snapshot() as cursor:
rows = cursor.execute_sql(sql)
for row in rows:
self._by_name[row[0]] = row[1]
def get_id(self, name, cursor):
""" Get/Init the ID for a given collection """
if name in self._by_name:
return self._by_name.get(name)
result = cursor.execute_sql("""
SELECT
COALESCE(MAX(collection_id), 1)
FROM
collections""")
# preserve the "reserved" / < 100 ids.
collection_id = max(result.one()[0] + 1, 101)
cursor.insert(
table="collections",
columns=('collection_id', 'name'),
values=[
(collection_id, name)
]
)
self._by_name[name] = collection_id
return collection_id
def get_args():
parser = argparse.ArgumentParser(
description="move user from sql to spanner")
parser.add_argument(
'--dsns', default="move_dsns.lst",
help="file of new line separated DSNs")
parser.add_argument(
'--users', default="move_users.lst",
help="file of new line separated users to move")
parser.add_argument(
'--token_dsn',
help="DSN to the token server database (optional)"
)
parser.add_argument(
'--verbose',
action="store_true",
help="verbose logging"
)
parser.add_argument(
'--quiet',
action="store_true",
help="silence logging"
)
parser.add_argument(
"--full",
action="store_true",
help="force a full reconcile"
)
return parser.parse_args()
def conf_mysql(dsn):
"""create a connection to the original storage system """
logging.debug("Configuring MYSQL: {}".format(dsn))
connection = connector.connect(
user=dsn.username,
password=dsn.password,
host=dsn.hostname,
port=dsn.port or 3306,
database=dsn.path[1:]
)
return connection
def conf_spanner(dsn):
"""create a connection to the new Spanner system"""
logging.debug("Configuring SPANNER: {}".format(dsn))
path = dsn.path.split("/")
instance_id = path[-3]
database_id = path[-1]
client = spanner.Client()
instance = client.instance(instance_id)
database = instance.database(database_id)
return database
def conf_db(dsn):
"""read the list of storage definitions from the file and create
a set of connections.
"""
if dsn.scheme == "mysql":
return conf_mysql(dsn)
if dsn.scheme == "spanner":
return conf_spanner(dsn)
raise RuntimeError("Unknown DNS type: {}".format(dsn.scheme))
def update_token(databases, user):
"""optionally update the TokenServer storage indicating the user
is now on Spanner
"""
if 'token' not in databases:
logging.warn(
"Skipping token update for user {}...".format(user))
return
logging.info("Updating token server for user: {}".format(user))
try:
cursor = databases['token'].cursor()
cursor.execute(
"""
UPDATE
users
SET
replaced_at = {timestamp},
nodeid = {nodeid}
WHERE
uid = {uid}
""".format(
timestamp=int(time.time() * 100),
nodeid=SPANNER_NODE_ID,
uid=user)
)
databases['token'].commit()
finally:
cursor.close()
# The following two functions are taken from browserid.utils
def encode_bytes_b64(value):
return base64.urlsafe_b64encode(value).rstrip(b'=').decode('ascii')
def format_key_id(keys_changed_at, key_hash):
return "{:013d}-{}".format(
keys_changed_at,
encode_bytes_b64(key_hash),
)
def get_fxa_id(databases, user):
"""generate the spanner user key values from the original storage
data.
"""
sql = """
SELECT
email, generation, keys_changed_at, client_state, node
FROM users
WHERE uid = {uid}
""".format(uid=user)
try:
cursor = databases.get('token', databases['mysql']).cursor()
cursor.execute(sql)
(email, generation, keys_changed_at,
client_state, node) = cursor.next()
fxa_uid = email.split('@')[0]
fxa_kid = format_key_id(
keys_changed_at or generation,
bytes.fromhex(client_state),
)
finally:
cursor.close()
return (fxa_kid, fxa_uid, node)
def create_migration_table(database):
"""create the syncstorage table
This table tells the syncstorage server to return a 5xx for a
given user. It's important that syncstorage NEVER returns a
2xx result for any user that's in migration, or only does
so after deleting the meta/global BSO record so that a full
reconcile happens. (Depends on
https://github.com/mozilla-services/server-syncstorage/pull/136)
"""
try:
cursor = database.cursor()
cursor.execute(
"""CREATE TABLE IF NOT EXISTS
migration (
fxa_uid VARCHAR(255) NOT NULL PRIMARY KEY,
started_at BIGINT NOT NULL,
state SMALLINT
)
""")
database.commit()
finally:
cursor.close()
def dumper(columns, values):
"""verbose column and data dumper. """
result = ""
for row in values:
for i in range(0, len(columns)):
result += " {} => {}\n".format(columns[i], row[i])
return result
def mark_user(databases, user, state=MigrationState.IN_PROGRESS):
""" mark a user in migration """
try:
mysql = databases['mysql'].cursor()
if state == MigrationState.IN_PROGRESS:
try:
logging.info("Marking {} as migrating...".format(user))
mysql.execute(
"INSERT INTO migration "
"(fxa_uid, started, state) VALUES (%s, %s, %s)",
(user, int(time.time()), state)
)
databases['mysql'].commit()
except IntegrityError:
return False
if state == MigrationState.COMPLETE:
logging.info("Marking {} as migrating...".format(user))
mysql.execute(
"UPDATE migration SET state = %s WHERE fxa_uid = %s",
(state, user)
)
databases['mysql'].commit()
finally:
mysql.close()
return True
def finish_user(databases, user):
"""mark a user migration complete"""
# This is not wrapped into `mark_user` so that I can reduce
# the number of db IO, since an upsert would just work instead
# of fail out with a dupe.
mysql = databases['mysql'].cursor()
try:
logging.info("Marking {} as migrating...".format(user))
mysql.execute(
"""
UPDATE
migration
SET
state = "finished"
WHERE
fxa_uid = %s
""",
(user,)
)
databases['mysql'].commit()
except IntegrityError:
return False
finally:
mysql.close()
return True
def newSyncID():
# return a str so the altered payload stays JSON-serializable
return base64.urlsafe_b64encode(os.urandom(9)).decode('ascii')
def alter_syncids(pay):
"""Alter the syncIDs for the meta/global record, which will cause a sync
when the client reconnects
"""
payload = json.loads(pay)
payload['syncID'] = newSyncID()
for item in payload['engines']:
payload['engines'][item]['syncID'] = newSyncID()
return json.dumps(payload)
def move_user(databases, user, args):
"""copy user info from original storage to new storage."""
# bso column mapping:
# id => bso_id
# collection => collection_id
# sortindex => sortindex
# modified => modified
# payload => payload
# payload_size => NONE
# ttl => expiry
# user collections require a unique key.
unique_key_filter = set()
# off chance that someone else might have written
# a new collection table since the last time we
# fetched.
collections = Collections(databases)
uc_columns = (
'fxa_kid',
'fxa_uid',
'collection_id',
'modified',
)
bso_columns = (
'collection_id',
'fxa_kid',
'fxa_uid',
'bso_id',
'expiry',
'modified',
'payload',
'sortindex',
)
# Generate the Spanner keys we'll need.
(fxa_kid, fxa_uid, original_node) = get_fxa_id(databases, user)
if not mark_user(databases, fxa_uid):
logging.error("User {} already being migrated?".format(fxa_uid))
return
# Fetch the BSO data from the original storage.
sql = """
SELECT
collections.name, bso.collection,
bso.id, bso.ttl, bso.modified, bso.payload, bso.sortindex
FROM
collections, bso
WHERE
bso.userid = %s and collections.collectionid = bso.collection
ORDER BY
modified DESC"""
count = 0
def spanner_transact(transaction):
collection_id = collections.get_id(col, transaction)
if collection_id != cid:
logging.warn(
"Remapping collection '{}' from {} to {}".format(
col, cid, collection_id))
# columns from sync_schema3
mod_v = datetime.utcfromtimestamp(mod/1000.0)
exp_v = datetime.utcfromtimestamp(exp)
# User_Collection can only have unique values. Filter
# non-unique keys and take the most recent modified
# time. The join could be anything.
uc_key = "{}_{}_{}".format(fxa_uid, fxa_kid, col)
if uc_key not in unique_key_filter:
unique_key_filter.add(uc_key)
uc_values = [(
fxa_kid,
fxa_uid,
collection_id,
mod_v,
)]
logging.debug(
"### uc: {}".format(uc_columns, uc_values))
transaction.insert(
'user_collections',
columns=uc_columns,
values=uc_values
)
# add the BSO values.
if args.full and collection_id == META_GLOBAL_COLLECTION_ID:
pay = alter_syncids(pay)
bso_values = [[
collection_id,
fxa_kid,
fxa_uid,
bid,
exp_v,
mod_v,
pay,
sid,
]]
logging.debug(
"###bso: {}".format(dumper(bso_columns, bso_values)))
transaction.insert(
'bsos',
columns=bso_columns,
values=bso_values
)
mysql = databases['mysql'].cursor()
try:
# Note: cursor() does not support __enter__()
mysql.execute(sql, (user,))
logging.info("Processing... {} -> {}:{}".format(
user, fxa_uid, fxa_kid))
for (col, cid, bid, exp, mod, pay, sid) in mysql:
databases['spanner'].run_in_transaction(spanner_transact)
update_token(databases, user)
(ck_kid, ck_uid, ck_node) = get_fxa_id(databases, user)
if ck_node != original_node:
logging.error(
("User's Node Changed! Aborting! "
"fx_uid:{}, fx_kid:{}, node: {} => {}")
.format(user, fxa_uid, fxa_kid,
original_node, ck_node)
)
return
finish_user(databases, user)
count += 1
# Closing the with automatically calls `batch.commit()`
mark_user(databases, user, MigrationState.COMPLETE)
except AlreadyExists:
logging.warn(
"User already imported fxa_uid:{} / fxa_kid:{}".format(
fxa_uid, fxa_kid
))
except Exception as e:
logging.error("### batch failure:", e)
finally:
# cursor may complain about unread data, this should prevent
# that warning.
for result in mysql:
pass
logging.debug("Closing...")
mysql.close()
return count
def move_data(databases, users, args):
"""iterate over provided users and move their data from old to new"""
for user in users:
rows = move_user(databases, user.strip(), args)
return rows
def main():
start = time.time()
args = get_args()
log_level = logging.INFO
if args.quiet:
log_level = logging.ERROR
if args.verbose:
log_level = logging.DEBUG
logging.basicConfig(
stream=sys.stdout,
level=log_level,
)
dsns = open(args.dsns).readlines()
users = open(args.users).readlines()
databases = {}
for line in dsns:
dsn = urlparse(line.strip())
databases[dsn.scheme] = conf_db(dsn)
if args.token_dsn:
dsn = urlparse(args.token_dsn)
databases['token'] = conf_db(dsn)
if not databases.get('mysql') or not databases.get('spanner'):
RuntimeError("Both mysql and spanner dsns must be specified")
# create the migration table if it's not already present.
# This table is used by the sync storage server to force a 500 return
# for a user in migration.
create_migration_table(databases['mysql'])
logging.info("Starting:")
rows = move_data(databases, users, args)
logging.info(
"Moved: {} rows in {} seconds".format(
rows or 0, time.time() - start))
if __name__ == "__main__":
main()

View File

@ -1,4 +0,0 @@
wheel
avro-python3
google-cloud-spanner
mysql-connector

View File

@ -1,13 +0,0 @@
{"namespace": "bso.avro",
"type": "record",
"name": "bso",
"fields": [
{"name": "fxa_uid", "type": ["null", "string"]},
{"name": "fxa_kid", "type": ["null", "string"]},
{"name": "collection_id", "type": ["null", "long"]},
{"name": "bso_id", "type": "string"},
{"name": "expiry", "type": "long"},
{"name": "modified", "type": "long"},
{"name": "payload", "type": "string"},
{"name": "sortindex", "type": ["null", "long"]}
]}

View File

@ -1,3 +0,0 @@
wheel
google-cloud-spanner
mysql-connector