mirror of
https://github.com/mozilla-services/syncstorage-rs.git
synced 2026-03-31 19:21:14 +02:00
Some checks failed
Glean probe-scraper / glean-probe-scraper (push) Has been cancelled
Build, Tag and Push Container Images to GAR Repository / build-and-push-syncstorage-rs (push) Has been cancelled
Build, Tag and Push Container Images to GAR Repository / build-and-push-syncserver-postgres (push) Has been cancelled
Build, Tag and Push Container Images to GAR Repository / build-and-push-syncstorage-rs-spanner-python-utils (push) Has been cancelled
Build, Tag and Push Container Images to GAR Repository / build-and-push-syncserver-postgres-python-utils (push) Has been cancelled
Build, Tag and Push Container Images to GAR Repository / build-and-push-syncserver-mysql (push) Has been cancelled
MySQL Build and Test / build-and-test-mysql (push) Has been cancelled
MySQL Build and Test / build-mysql-image (push) Has been cancelled
MySQL Build and Test / mysql-e2e-tests (push) Has been cancelled
Postgres Build and Test / build-and-test-postgres (push) Has been cancelled
Postgres Build and Test / build-postgres-image (push) Has been cancelled
Postgres Build and Test / postgres-e2e-tests (push) Has been cancelled
Publish Sync docs to pages / build-mdbook (push) Has been cancelled
Publish Sync docs to pages / build-openapi (push) Has been cancelled
Publish Sync docs to pages / combine-and-prepare (push) Has been cancelled
Publish Sync docs to pages / deploy (push) Has been cancelled
Spanner Build, Test, and Push / build-and-test-spanner (push) Has been cancelled
Spanner Build, Test, and Push / build-spanner-image (push) Has been cancelled
Spanner Build, Test, and Push / spanner-e2e-tests (push) Has been cancelled
test: molotov sync loadtests
271 lines
7.5 KiB
Python
271 lines
7.5 KiB
Python
"""FxA authentication and JWT token management."""
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import random
|
|
import string
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
import jwt
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives import serialization
|
|
from fxa.core import Client
|
|
from fxa.oauth import Client as OAuthClient
|
|
from fxa.tests.utils import TestEmailAccount
|
|
|
|
# Client ID for Firefox Desktop
|
|
CLIENT_ID = "5882386c6d801776"
|
|
FXA_API_HOST = os.environ.get("FXA_API_HOST", "https://api-accounts.stage.mozaws.net")
|
|
FXA_OAUTH_HOST = os.environ.get("FXA_OAUTH_HOST", "https://oauth.stage.mozaws.net")
|
|
OAUTH_SCOPE = "https://identity.mozilla.com/apps/oldsync"
|
|
PASSWORD_LENGTH = 20
|
|
|
|
# Assertions are good for one year (in seconds).
|
|
# This avoids having to deal with clock-skew in tokenserver requests.
|
|
ASSERTION_LIFETIME = 60 * 60 * 24 * 365
|
|
|
|
# OAuth JWT signing to test without FxA
|
|
OAUTH_PRIVATE_KEY_FILE = os.environ.get("OAUTH_PRIVATE_KEY_FILE")
|
|
OAUTH_ISSUER = os.environ.get("OAUTH_ISSUER", "http://mock-fxa-server:6000")
|
|
OAUTH_JWT_ALGORITHM = os.environ.get("OAUTH_JWT_ALGORITHM", "RS256")
|
|
|
|
_ACCT_TRACKING_FILE = Path(__file__).parent.parent / ".accounts_tracking.json"
|
|
|
|
|
|
def _generate_password() -> str:
|
|
"""Generate a random password for FxA account.
|
|
|
|
Returns:
|
|
str: Random password string.
|
|
|
|
"""
|
|
return "".join(random.choice(string.printable) for i in range(PASSWORD_LENGTH))
|
|
|
|
|
|
def _track_account_creation(email: str, password: str, fxa_uid: str) -> None:
|
|
"""Track FxA account creation in a local file for cleanup.
|
|
|
|
Args:
|
|
email: Account email address.
|
|
password: Account password.
|
|
fxa_uid: FxA user ID.
|
|
|
|
"""
|
|
try:
|
|
if _ACCT_TRACKING_FILE.exists():
|
|
with open(_ACCT_TRACKING_FILE, "r") as f:
|
|
accounts = json.load(f)
|
|
else:
|
|
accounts = []
|
|
|
|
for acc in accounts:
|
|
# already tracked
|
|
if acc["email"] == email:
|
|
return
|
|
|
|
accounts.append(
|
|
{
|
|
"email": email,
|
|
"password": password,
|
|
"fxa_uid": fxa_uid,
|
|
"created_at": int(time.time()),
|
|
}
|
|
)
|
|
|
|
with open(_ACCT_TRACKING_FILE, "w") as f:
|
|
json.dump(accounts, f, indent=2)
|
|
|
|
except Exception:
|
|
# continue with tests
|
|
pass
|
|
|
|
|
|
def _remove_account_from_tracking(email: str) -> None:
|
|
"""Remove an account from the tracking file.
|
|
|
|
Args:
|
|
email: Account email address to remove.
|
|
|
|
"""
|
|
try:
|
|
if not _ACCT_TRACKING_FILE.exists():
|
|
return
|
|
|
|
with open(_ACCT_TRACKING_FILE, "r") as f:
|
|
accounts = json.load(f)
|
|
|
|
accounts = [acc for acc in accounts if acc["email"] != email]
|
|
|
|
if not accounts:
|
|
_ACCT_TRACKING_FILE.unlink()
|
|
else:
|
|
with open(_ACCT_TRACKING_FILE, "w") as f:
|
|
json.dump(accounts, f, indent=2)
|
|
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def _create_self_signed_jwt(
|
|
email: str, client_id: str = CLIENT_ID
|
|
) -> tuple[str, str, str, str, None, None, None]:
|
|
"""Create a self-signed OAuth JWT.
|
|
|
|
Requires OAUTH_PRIVATE_KEY_FILE env var.
|
|
|
|
Args:
|
|
email: Email address for the JWT.
|
|
client_id: OAuth client ID (default: CLIENT_ID constant).
|
|
|
|
Returns:
|
|
tuple: A tuple containing:
|
|
- oauth_token: Self-signed JWT
|
|
- email: Email address
|
|
- fxa_uid: Generated FxA uid
|
|
- key_id: Key id
|
|
- None: FxA session not needed
|
|
- None: OAuth client not needed
|
|
- None: cleanup info not needed
|
|
|
|
"""
|
|
if not OAUTH_PRIVATE_KEY_FILE:
|
|
raise ValueError("OAUTH_PRIVATE_KEY_FILE must be set")
|
|
|
|
with open(OAUTH_PRIVATE_KEY_FILE, "rb") as f:
|
|
private_key = serialization.load_pem_private_key(
|
|
f.read(), password=None, backend=default_backend()
|
|
)
|
|
|
|
# fake fxa uid
|
|
fxa_uid = hashlib.sha256(email.encode()).hexdigest()[:32]
|
|
|
|
# JWT payload
|
|
now = int(time.time())
|
|
payload = {
|
|
"sub": fxa_uid,
|
|
"scope": OAUTH_SCOPE,
|
|
"fxa-generation": 0,
|
|
"client_id": client_id,
|
|
"iat": now,
|
|
"exp": now + (12 * 3600),
|
|
"iss": OAUTH_ISSUER,
|
|
}
|
|
|
|
# sign JWT
|
|
oauth_token = jwt.encode(
|
|
payload,
|
|
private_key,
|
|
algorithm=OAUTH_JWT_ALGORITHM,
|
|
headers={"typ": "application/at+jwt"},
|
|
)
|
|
|
|
key_id = "1234-qqo"
|
|
|
|
if os.environ.get("DEBUG_OAUTH"):
|
|
print(f"DEBUG: Created self-signed JWT for: {email} / {fxa_uid}")
|
|
|
|
return oauth_token, email, fxa_uid, key_id, None, None, None
|
|
|
|
|
|
def _is_oauth_token_expired(oauth_token: str, buffer_seconds: int = 300) -> bool:
|
|
"""Check if an OAuth token is expired or will expire soon.
|
|
|
|
Args:
|
|
oauth_token: JWT token to check.
|
|
buffer_seconds: Consider token expired if it expires within this many seconds.
|
|
|
|
Returns:
|
|
bool: True if token is expired or will expire soon, False otherwise.
|
|
|
|
"""
|
|
try:
|
|
decoded = jwt.decode(oauth_token, options={"verify_signature": False})
|
|
exp = decoded.get("exp")
|
|
|
|
if exp is None:
|
|
return False
|
|
|
|
current_time = time.time()
|
|
return current_time >= (exp - buffer_seconds)
|
|
|
|
except Exception:
|
|
return True
|
|
|
|
|
|
def _create_fxa_account() -> tuple[
|
|
str, str, str, str, Optional[Any], Optional[OAuthClient], Optional[dict[str, Any]]
|
|
]:
|
|
"""Create account credentials.
|
|
|
|
If OAUTH_PRIVATE_KEY_FILE is provided, creates self-signed JWT. Otherwise,
|
|
creates real, testing account via FxA API. The account is deleted at the
|
|
end of the test.
|
|
|
|
Returns:
|
|
tuple: A tuple containing:
|
|
- oauth_token: OAuth access token
|
|
- acct_email: Account email address
|
|
- fxa_uid: FxA uid
|
|
- key_id: Key id
|
|
- fxa_session: FxA session, or None for self-signed
|
|
- oauth_client: OAuthClient instance, or None for self-signed
|
|
- cleanup_info: Dict with cleanup info, or None for self-signed
|
|
|
|
"""
|
|
if OAUTH_PRIVATE_KEY_FILE:
|
|
email = f"molotov-{int(time.time())}-{random.randint(1000, 9999)}@example.com"
|
|
return _create_self_signed_jwt(email)
|
|
|
|
# use FxA to create account and fetch JWT
|
|
acct = TestEmailAccount()
|
|
client = Client(FXA_API_HOST)
|
|
oauth_client = OAuthClient(CLIENT_ID, None, server_url=FXA_OAUTH_HOST)
|
|
fxa_password = _generate_password()
|
|
session = client.create_account(acct.email, password=fxa_password)
|
|
|
|
# wait for account verification email
|
|
max_retries = 20
|
|
for _ in range(max_retries):
|
|
if acct.messages:
|
|
break
|
|
time.sleep(0.5)
|
|
acct.fetch()
|
|
|
|
if not acct.messages:
|
|
raise ValueError("Failed to receive FxA verification email")
|
|
|
|
# verify account with code
|
|
verified = False
|
|
for m in acct.messages:
|
|
if "x-verify-code" in m["headers"]:
|
|
session.verify_email_code(m["headers"]["x-verify-code"])
|
|
verified = True
|
|
break
|
|
|
|
if not verified:
|
|
raise ValueError("Failed to find verification code in email")
|
|
|
|
oauth_token = oauth_client.authorize_token(session, OAUTH_SCOPE)
|
|
key_id = "1234-qqo"
|
|
cleanup_info = {
|
|
"client": client,
|
|
"acct": acct,
|
|
"email": acct.email,
|
|
"password": fxa_password,
|
|
}
|
|
|
|
_track_account_creation(acct.email, fxa_password, session.uid)
|
|
|
|
return (
|
|
oauth_token,
|
|
acct.email,
|
|
session.uid,
|
|
key_id,
|
|
session,
|
|
oauth_client,
|
|
cleanup_info,
|
|
)
|