test: test_storage test_support pytest refactor (#2239)
Some checks are pending
Glean probe-scraper / glean-probe-scraper (push) Waiting to run
Main Workflow - Lint, Build, Test / python-env (push) Waiting to run
Main Workflow - Lint, Build, Test / rust-env (push) Waiting to run
Main Workflow - Lint, Build, Test / python-checks (push) Blocked by required conditions
Main Workflow - Lint, Build, Test / rust-checks (push) Blocked by required conditions
Main Workflow - Lint, Build, Test / clippy (mysql) (push) Blocked by required conditions
Main Workflow - Lint, Build, Test / clippy (postgres) (push) Blocked by required conditions
Main Workflow - Lint, Build, Test / clippy (spanner) (push) Blocked by required conditions
Main Workflow - Lint, Build, Test / clippy-release (mysql) (push) Blocked by required conditions
Main Workflow - Lint, Build, Test / clippy-release (postgres) (push) Blocked by required conditions
Main Workflow - Lint, Build, Test / clippy-release (spanner) (push) Blocked by required conditions
Main Workflow - Lint, Build, Test / build-and-unit-test-postgres (push) Blocked by required conditions
Main Workflow - Lint, Build, Test / build-postgres-image (push) Blocked by required conditions
Main Workflow - Lint, Build, Test / postgres-e2e-tests (push) Blocked by required conditions
Main Workflow - Lint, Build, Test / build-and-unit-test-mysql (push) Blocked by required conditions
Main Workflow - Lint, Build, Test / build-mysql-image (push) Blocked by required conditions
Main Workflow - Lint, Build, Test / mysql-e2e-tests (push) Blocked by required conditions
Main Workflow - Lint, Build, Test / build-and-unit-test-spanner (push) Blocked by required conditions
Main Workflow - Lint, Build, Test / build-spanner-image (push) Blocked by required conditions
Main Workflow - Lint, Build, Test / spanner-e2e-tests (push) Blocked by required conditions
Build, Tag and Push Container Images to GAR / check (push) Waiting to run
Build, Tag and Push Container Images to GAR / build-and-push-syncstorage-rs (push) Blocked by required conditions
Build, Tag and Push Container Images to GAR / build-and-push-syncserver-postgres (push) Blocked by required conditions
Build, Tag and Push Container Images to GAR / build-and-push-syncstorage-rs-spanner-python-utils (push) Blocked by required conditions
Build, Tag and Push Container Images to GAR / build-and-push-syncserver-postgres-python-utils (push) Blocked by required conditions
Build, Tag and Push Container Images to GAR / build-and-push-syncserver-mysql (push) Blocked by required conditions
Publish Sync docs to pages / build-mdbook (push) Waiting to run
Publish Sync docs to pages / build-openapi (push) Waiting to run
Publish Sync docs to pages / combine-and-prepare (push) Blocked by required conditions
Publish Sync docs to pages / deploy (push) Blocked by required conditions

* core integration test conftest

* added test storage functional tests

* remove extraneous conf

* rmv dead weight TestStorage from test_storage

* fmt

* rmv test_support
This commit is contained in:
Taddes 2026-05-04 15:56:14 -04:00 committed by GitHub
parent cd7847d471
commit 6476f5b47a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 2349 additions and 2580 deletions

View File

@@ -1,7 +1,20 @@
"""Pytest configuration and fixtures for integration tests."""
import os
import contextlib
import logging
import os
import random
import time
import uuid
import hawkauthlib
import pytest
import webtest
from pyramid.interfaces import IAuthenticationPolicy
from pyramid.request import Request
from webtest import TestApp
from tools.integration_tests.test_support import get_test_configurator
# max number of attempts to check server heartbeat
SYNC_SERVER_STARTUP_MAX_ATTEMPTS = 35
@ -10,8 +23,6 @@ SYNC_SERVER_URL = os.environ.get("SYNC_SERVER_URL", "http://localhost:8000")
logger = logging.getLogger("tools.integration-tests")
if os.environ.get("SYNC_TEST_LOG_HTTP"):
import webtest
_orig_do_request = webtest.TestApp.do_request
def _logged_do_request(self, req, *args, **kwargs):
@ -25,3 +36,187 @@ if os.environ.get("SYNC_TEST_LOG_HTTP"):
return resp
webtest.TestApp.do_request = _logged_do_request
def _retry_send(func, *args, **kwargs):
"""Call a webtest method, retrying once on 409/503."""
try:
return func(*args, **kwargs)
except webtest.AppError as ex:
if "409 " not in ex.args[0] and "503 " not in ex.args[0]:
raise
time.sleep(0.01)
return func(*args, **kwargs)
def retry_post_json(app, *call_args, **call_kwargs):
    """POST a JSON body via *app*, retrying once on a transient 409/503."""
    return _retry_send(app.post_json, *call_args, **call_kwargs)
def retry_put_json(app, *call_args, **call_kwargs):
    """PUT a JSON body via *app*, retrying once on a transient 409/503."""
    return _retry_send(app.put_json, *call_args, **call_kwargs)
def retry_delete(app, *call_args, **call_kwargs):
    """Issue a DELETE via *app*, retrying once on a transient 409/503."""
    return _retry_send(app.delete, *call_args, **call_kwargs)
def _make_auth_state(config, host_url):
    """Generate hawk credentials for a new random user.

    Returns a dict holding the random user id, the fake FxA identifiers,
    and the hawk auth token/secret encoded by the registered
    authentication policy.
    """
    policy = config.registry.getUtility(IAuthenticationPolicy)
    master_secret = os.environ.get("SYNC_MASTER_SECRET")
    if master_secret is not None:
        # Make the policy sign with the shared master secret so tokens
        # validate against the running server.
        policy.secrets._secrets = [master_secret]
    uid = random.randint(1, 100000)
    extra = {
        "hashed_fxa_uid": str(uuid.uuid4().hex),
        "fxa_uid": "DECAFBAD" + str(uuid.uuid4().hex)[8:],
        "fxa_kid": "0000000000000-DECAFBAD" + str(uuid.uuid4().hex)[8:],
    }
    token, secret = policy.encode_hawk_id(
        Request.blank(host_url),
        uid,
        extra=extra,
    )
    state = dict(extra)
    state.update(user_id=uid, auth_token=token, auth_secret=secret)
    return state
@pytest.fixture(scope="function")
def st_ctx():
    """Functional test context for storage API tests.

    Sets up a Pyramid configurator, creates a TestApp with hawk signing,
    authenticates a random user, clears that user's data, and yields a
    context dict. Tears down configurator on exit.
    """
    ini_file = os.environ.get("MOZSVC_TEST_INI_FILE", "tests.ini")
    # Unique per-run id; also keys the on-disk SQLite fallback filename.
    os.environ["MOZSVC_UUID"] = str(uuid.uuid4())
    if "MOZSVC_SQLURI" not in os.environ:
        os.environ["MOZSVC_SQLURI"] = "sqlite:///:memory:"
    if "MOZSVC_ONDISK_SQLURI" not in os.environ:
        ondisk = os.environ["MOZSVC_SQLURI"]
        if ":memory:" in ondisk:
            # Replace an in-memory URI with a per-run temp file.
            ondisk = "sqlite:////tmp/tests-sync-%s.db" % os.environ["MOZSVC_UUID"]
        os.environ["MOZSVC_ONDISK_SQLURI"] = ondisk
    # Locate tests.ini relative to test_storage.py
    this_dir = os.path.dirname(os.path.abspath(__file__))
    config = get_test_configurator(this_dir, ini_file)
    config.commit()
    config.make_wsgi_app()
    host_url = os.environ.get("SYNC_SERVER_URL", "http://localhost:8000")
    import urllib.parse as urlparse

    host_parts = urlparse.urlparse(host_url)
    # Requests go to the live server at host_url; the extra_environ keys
    # mirror the parsed URL so generated URLs match the target host.
    app = TestApp(
        host_url,
        extra_environ={
            "HTTP_HOST": host_parts.netloc,
            "wsgi.url_scheme": host_parts.scheme or "http",
            "SERVER_NAME": host_parts.hostname,
            "REMOTE_ADDR": "127.0.0.1",
            "SCRIPT_NAME": host_parts.path,
        },
    )
    # Mutable auth state — shared with the do_request closure so that
    # switch_user() and the expired-token test can swap credentials at runtime.
    auth = _make_auth_state(config, host_url)
    auth_state = {
        "auth_token": auth["auth_token"],
        "auth_secret": auth["auth_secret"],
    }
    orig_do_request = app.do_request

    def new_do_request(req, *args, **kwds):
        # Hawk-sign every outgoing request with the *current* credentials
        # (read at call time, so swaps in auth_state take effect).
        hawkauthlib.sign_request(
            req, auth_state["auth_token"], auth_state["auth_secret"]
        )
        return orig_do_request(req, *args, **kwds)

    app.do_request = new_do_request
    root = "/1.5/%d" % auth["user_id"]
    # Start the test from a clean slate for this user.
    retry_delete(app, root)
    ctx = {
        "app": app,
        "root": root,
        "user_id": auth["user_id"],
        "fxa_uid": auth["fxa_uid"],
        "hashed_fxa_uid": auth["hashed_fxa_uid"],
        "fxa_kid": auth["fxa_kid"],
        "auth_state": auth_state,
        "config": config,
        "host_url": host_url,
    }
    yield ctx
    # Teardown: release the configurator and drop the per-run UUID.
    config.end()
    del os.environ["MOZSVC_UUID"]
@contextlib.contextmanager
def switch_user(st_ctx):
    """Temporarily impersonate a freshly-generated random user.

    Swaps new credentials into both st_ctx and its shared auth_state dict
    (which the app's do_request hook reads), clears the new user's data,
    and restores the original identity when the block exits.
    """
    identity_keys = ("user_id", "fxa_uid", "hashed_fxa_uid", "fxa_kid", "root")
    saved = {key: st_ctx[key] for key in identity_keys}
    saved["auth_token"] = st_ctx["auth_state"]["auth_token"]
    saved["auth_secret"] = st_ctx["auth_state"]["auth_secret"]
    # Draw random users until we get one distinct from the current user.
    for _attempt in range(10):
        fresh = _make_auth_state(st_ctx["config"], st_ctx["host_url"])
        if fresh["user_id"] != saved["user_id"]:
            break
    else:
        raise RuntimeError("Failed to switch to new user id")
    st_ctx["auth_state"]["auth_token"] = fresh["auth_token"]
    st_ctx["auth_state"]["auth_secret"] = fresh["auth_secret"]
    for key in ("user_id", "fxa_uid", "hashed_fxa_uid", "fxa_kid"):
        st_ctx[key] = fresh[key]
    st_ctx["root"] = "/1.5/%d" % fresh["user_id"]
    # Give the temporary user a clean slate too.
    retry_delete(st_ctx["app"], st_ctx["root"])
    try:
        yield
    finally:
        st_ctx["auth_state"]["auth_token"] = saved["auth_token"]
        st_ctx["auth_state"]["auth_secret"] = saved["auth_secret"]
        for key in identity_keys:
            st_ctx[key] = saved[key]

File diff suppressed because it is too large Load Diff

View File

@ -1,382 +0,0 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
"""Test support utilities for tokenserver integration tests."""
from base64 import urlsafe_b64encode as b64encode
import binascii
import json
import os
import math
import time
import urllib.parse as urlparse
from sqlalchemy import create_engine
from sqlalchemy.sql import text as sqltext
from tokenlib.utils import decode_token_bytes
from webtest import TestApp
DEFAULT_OAUTH_SCOPE = "https://identity.mozilla.com/apps/oldsync"
class TestCase:
    """Base test case for tokenserver integration tests.

    Seeds and inspects the tokenserver's ``services``, ``nodes`` and
    ``users`` tables directly through SQLAlchemy, and builds mock OAuth
    headers for requests made via webtest. Supports MySQL and Postgres
    backends; ``self.db_mode`` selects the SQL variant (Postgres uses
    ``RETURNING`` where MySQL uses ``cursor.lastrowid``).
    """

    # Domain used to build fake FxA email addresses for test users.
    FXA_EMAIL_DOMAIN = "api-accounts.stage.mozaws.net"
    FXA_METRICS_HASH_SECRET = os.environ.get("SYNC_MASTER_SECRET", "secret0")
    # Fixed node registered in setUp() so tests get predictable assignments.
    NODE_ID = 800
    NODE_URL = "https://example.com"
    TOKEN_SIGNING_SECRET = os.environ.get("SYNC_MASTER_SECRET", "secret0")
    # Base URL of the tokenserver under test (required env var).
    TOKENSERVER_HOST = os.environ["TOKENSERVER_HOST"]

    @classmethod
    def setUpClass(cls):
        """Set up class-level fixtures for the tokenserver test case."""
        # OAuth is the only auth scheme exercised here, so alias it.
        cls._build_auth_headers = cls._build_oauth_headers

    def setUp(self):
        """Set up test fixtures including database connection and test node."""
        db_url = os.environ["SYNC_TOKENSERVER__DATABASE_URL"]
        # SQLAlchemy 1.4+ wants postgresql
        if db_url.startswith("postgres://"):
            db_url = db_url.replace("postgres://", "postgresql://", 1)
        engine = create_engine(db_url)
        # AUTOCOMMIT so seed/cleanup statements are immediately visible to
        # the server under test, without explicit transaction management.
        self.database = engine.execution_options(isolation_level="AUTOCOMMIT").connect()
        # Scheme of the *original* URL, e.g. "mysql" or "postgres".
        # NOTE(review): a "postgresql://" URL yields db_mode "postgresql",
        # which will NOT match the `== "postgres"` checks below — confirm
        # the environment always uses the "postgres://" form.
        self.db_mode = os.environ["SYNC_TOKENSERVER__DATABASE_URL"].split(":")[0]
        # Extract the expected node_type from the syncstorage database URL
        # This matches the storage backend type (mysql, postgres, spanner)
        syncstorage_url = os.environ.get("SYNC_SYNCSTORAGE__DATABASE_URL", "spanner://")
        self.expected_node_type = syncstorage_url.split(":")[0]
        # Normalize database URL schemes to match node_type values
        if self.expected_node_type == "postgresql":
            self.expected_node_type = "postgres"
        # MySQL URLs might use "mysql" or other variants - normalize to "mysql"
        if self.expected_node_type.startswith("mysql"):
            self.expected_node_type = "mysql"
        host_url = urlparse.urlparse(self.TOKENSERVER_HOST)
        self.app = TestApp(
            self.TOKENSERVER_HOST,
            extra_environ={
                "HTTP_HOST": host_url.netloc,
                "wsgi.url_scheme": host_url.scheme or "http",
                "SERVER_NAME": host_url.hostname,
                "REMOTE_ADDR": "127.0.0.1",
                "SCRIPT_NAME": host_url.path,
            },
        )
        # Start each test with a blank slate.
        self._clear_db()
        # TODO: tokenserver-mysql's migration should add this
        # service entry for us (if possible)
        self.service_id = self._get_or_add_service("sync-1.5", r"{node}/1.5/{uid}")
        # Ensure we have a node with enough capacity to run the tests.
        self._add_node(capacity=100, node=self.NODE_URL, id=self.NODE_ID)

    def tearDown(self):
        """Tear down test fixtures and clean up the database."""
        # And clean up at the end, for good measure.
        self._clear_db()
        self.database.close()

    def _clear_db(self):
        """Delete every row from the ``users`` and ``nodes`` tables."""
        cursor = self._execute_sql(sqltext(("DELETE FROM users")), {})
        cursor.close()
        cursor = self._execute_sql(sqltext(("DELETE FROM nodes")), {})
        cursor.close()
        # NOTE: don't clear the services between tests as tokenserver
        # may have already cached its "sync-1.5" service_id

    def _build_oauth_headers(
        self,
        generation=None,
        user="test",
        keys_changed_at=None,
        client_state=None,
        status=200,
        **additional_headers,
    ):
        """Build Authorization/X-KeyID headers for a mock OAuth request.

        The Bearer token body is JSON carrying the desired claims plus a
        ``status`` code — NOTE(review): presumably consumed by a mocked
        FxA verifier; confirm against the server's test configuration.
        Assumes ``client_state`` is a hex string — TODO confirm all
        callers supply one (None would crash ``unhexlify``).
        """
        claims = {
            "user": user,
            "generation": generation,
            "client_id": "fake client id",
            "scope": [DEFAULT_OAUTH_SCOPE],
        }
        # NOTE(review): redundant — "generation" is already set above to
        # the same value; this branch never changes the claims.
        if generation is not None:
            claims["generation"] = generation
        body = {"body": claims, "status": status}
        headers = {}
        headers["Authorization"] = f"Bearer {json.dumps(body)}"
        # X-KeyID format: "<keys_changed_at>-<unpadded urlsafe-b64(client_state)>".
        client_state = binascii.unhexlify(client_state)
        client_state = b64encode(client_state).strip(b"=").decode("utf-8")
        headers["X-KeyID"] = f"{keys_changed_at}-{client_state}"
        headers.update(additional_headers)
        return headers

    def _add_node(
        self,
        capacity=100,
        available=100,
        node=NODE_URL,
        id=None,
        current_load=0,
        backoff=0,
        downed=0,
    ):
        """Insert a row into ``nodes`` and return its assigned id.

        ``id`` shadows the builtin deliberately to mirror the column
        name; when falsy, the database assigns the id.
        """
        if not id:
            params = {
                "service": self.service_id,
                "node": node,
                "available": available,
                "capacity": capacity,
                "current_load": current_load,
                "backoff": backoff,
                "downed": downed,
            }
            query = sqltext("""\
                insert into nodes (service, node, available, capacity, \
                current_load, backoff, downed)
                values (:service, :node, :available, :capacity, :current_load,
                :backoff, :downed)
            """)
            # Postgres variant: RETURNING replaces cursor.lastrowid.
            query_pg = sqltext("""\
                insert into nodes (service, node, available, capacity, \
                current_load, backoff, downed)
                values (:service, :node, :available, :capacity, :current_load,
                :backoff, :downed)
                RETURNING id
            """)
        else:
            query = sqltext("""\
                insert into nodes (service, node, available, capacity, \
                current_load, backoff, downed, id)
                values (:service, :node, :available, :capacity, :current_load,
                :backoff, :downed, :id)
            """)
            query_pg = sqltext("""\
                insert into nodes (service, node, available, capacity, \
                current_load, backoff, downed, id)
                values (:service, :node, :available, :capacity, :current_load,
                :backoff, :downed, :id)
                RETURNING id
            """)
            params = {
                "service": self.service_id,
                "node": node,
                "available": available,
                "capacity": capacity,
                "current_load": current_load,
                "backoff": backoff,
                "downed": downed,
                "id": id,
            }
        if self.db_mode == "postgres":
            cursor = self._execute_sql(query_pg, params)
            result = cursor.fetchone()[0]
            cursor.close()
            return result
        else:
            cursor = self._execute_sql(query, params)
            result = cursor.lastrowid
            cursor.close()
            return result

    def _get_node(self, id):
        """Fetch one row from ``nodes`` by id and return it as a dict."""
        query = sqltext("select * from nodes where id = :id")
        cursor = self._execute_sql(query, {"id": id})
        # NOTE(review): the unpack order must match the table's physical
        # column order ("select *") — confirm against the nodes schema.
        (id, service, node, available, current_load, capacity, downed, backoff) = (
            cursor.fetchone()
        )
        cursor.close()
        return {
            "id": id,
            "service": service,
            "node": node,
            "available": available,
            "current_load": current_load,
            "capacity": capacity,
            "downed": downed,
            "backoff": backoff,
        }

    def _add_service(self, service, pattern):
        """Add definition for a new service."""
        if self.db_mode == "postgres":
            insert_sql = sqltext("""
                insert into services (service, pattern)
                values (:service, :pattern)
                RETURNING id
            """)
        else:
            insert_sql = sqltext("""
                insert into services (service, pattern)
                values (:service, :pattern)
            """)
        cursor = self._execute_sql(insert_sql, {"service": service, "pattern": pattern})
        if self.db_mode == "postgres":
            result = cursor.fetchone()[0]
            cursor.close()
            return result
        else:
            result = cursor.lastrowid
            cursor.close()
            return result

    def _add_user(
        self,
        email=None,
        nodeid=NODE_ID,
        generation=1234,
        keys_changed_at=1234,
        client_state="aaaa",
        created_at=None,
        replaced_at=None,
    ):
        """Insert a row into ``users`` and return its assigned uid."""
        if self.db_mode == "postgres":
            insert_sql = sqltext("""\
                insert into users (service, email, nodeid, generation, keys_changed_at, client_state, created_at, replaced_at)
                values (:service, :email, :nodeid, :generation, :keys_changed_at, :client_state, :created_at, :replaced_at)
                RETURNING uid
            """)
        else:
            insert_sql = sqltext("""\
                insert into
                users
                (service, email, nodeid, generation, keys_changed_at, client_state,
                created_at, replaced_at)
                values
                (:service, :email, :nodeid, :generation, :keys_changed_at,
                :client_state, :created_at, :replaced_at)
            """)
        # Default created_at is "now" in integer milliseconds.
        created_at = created_at or math.trunc(time.time() * 1000)
        params = {
            "service": self.service_id,
            "email": email or f"test@{self.FXA_EMAIL_DOMAIN}",
            "nodeid": nodeid,
            "generation": generation,
            "keys_changed_at": keys_changed_at,
            "client_state": client_state,
            "created_at": created_at,
            "replaced_at": replaced_at,
        }
        cursor = self._execute_sql(insert_sql, params)
        if self.db_mode == "postgres":
            result = cursor.fetchone()[0]
            cursor.close()
            return result
        else:
            result = cursor.lastrowid
            cursor.close()
            return result

    def _get_user(self, uid):
        """Fetch one row from ``users`` by uid and return it as a dict."""
        query = sqltext("select * from users where uid = :uid")
        cursor = self._execute_sql(query, {"uid": uid})
        # NOTE(review): unpack order must match the users table's physical
        # column order — confirm against the schema.
        (
            uid,
            service,
            email,
            generation,
            client_state,
            created_at,
            replaced_at,
            nodeid,
            keys_changed_at,
        ) = cursor.fetchone()
        cursor.close()
        return {
            "uid": uid,
            "service": service,
            "email": email,
            "generation": generation,
            "client_state": client_state,
            "created_at": created_at,
            "replaced_at": replaced_at,
            "nodeid": nodeid,
            "keys_changed_at": keys_changed_at,
        }

    def _get_replaced_users(self, service, email):
        """Return all replaced (retired) user rows for a service/email pair."""
        query = sqltext("""\
            select * from users
            where service = :service
            and email = :email
            and replaced_at is not null
        """)
        params = {"service": service, "email": email}
        cursor = self._execute_sql(query, params)
        users = []
        for user in cursor.fetchall():
            # Same column-order caveat as _get_user above.
            (
                uid,
                service,
                email,
                generation,
                client_state,
                created_at,
                replaced_at,
                nodeid,
                keys_changed_at,
            ) = user
            user_dict = {
                "uid": uid,
                "service": service,
                "email": email,
                "generation": generation,
                "client_state": client_state,
                "created_at": created_at,
                "replaced_at": replaced_at,
                "nodeid": nodeid,
                "keys_changed_at": keys_changed_at,
            }
            users.append(user_dict)
        cursor.close()
        return users

    def _get_service_id(self, service):
        """Return the id for a named service, or None if not present."""
        query = sqltext("select id from services where service = :service")
        cursor = self._execute_sql(query, {"service": service})
        row = cursor.fetchone()
        cursor.close()
        return None if row is None else row[0]

    def _get_or_add_service(self, service, pattern):
        """Return the id of an existing service, inserting it if missing."""
        service_id = self._get_service_id(service)
        if service_id is not None:
            return service_id
        return self._add_service(service, pattern)

    def _count_users(self):
        """Return the number of distinct uids in the ``users`` table."""
        query = sqltext("select COUNT(DISTINCT(uid)) from users")
        cursor = self._execute_sql(query, {})
        (count,) = cursor.fetchone()
        cursor.close()
        return count

    def _execute_sql(self, *args, **kwds):
        """Execute SQL statement.

        *args is the query and **kwds are the keyword argument parameters.
        """
        cursor = self.database.execute(*args, **kwds)
        return cursor

    def unsafelyParseToken(self, token):
        """Parse a token without verifying its HMAC signature."""
        # For testing purposes, don't check HMAC or anything...
        # The slice drops the trailing 32 bytes — NOTE(review): presumably
        # the HMAC signature; the remainder is the JSON payload.
        return json.loads(decode_token_bytes(token)[:-32].decode("utf8"))