syncstorage-rs/tools/integration_tests/tokenserver/test_e2e.py
Taddes 77254b4a6e
Some checks failed
Glean probe-scraper / glean-probe-scraper (push) Has been cancelled
refactor: python imports (#1730)
refactor: unified python imports for consistency in local, production, and test environments
2025-08-04 15:37:46 -04:00

233 lines
9.7 KiB
Python

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
from base64 import urlsafe_b64decode
import hmac
import json
import jwt
import pytest
import random
import string
import time
import tokenlib
import unittest
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
from fxa.core import Client
from fxa.oauth import Client as OAuthClient
from fxa.errors import ClientError, ServerError
from fxa.tests.utils import TestEmailAccount
from hashlib import sha256
from integration_tests.tokenserver.test_support import TestCase
# This is the client ID used for Firefox Desktop. The FxA team confirmed that
# this is the proper client ID to be using for these integration tests.
CLIENT_ID = '5882386c6d801776'
# Value the tests expect in the `duration` field of a Tokenserver response
# (presumably seconds -- TODO confirm against the server configuration).
DEFAULT_TOKEN_DURATION = 3600
# Base URLs handed to the FxA account client and the FxA OAuth client that
# are created in setUpClass.
FXA_ACCOUNT_STAGE_HOST = 'https://api-accounts.stage.mozaws.net'
FXA_OAUTH_STAGE_HOST = 'https://oauth.stage.mozaws.net'
# Alphabet and length used when generating the random FxA account password.
PASSWORD_CHARACTERS = string.ascii_letters + string.punctuation + string.digits
PASSWORD_LENGTH = 32
# OAuth scope requested when authorizing the token used by the tests.
SCOPE = 'https://identity.mozilla.com/apps/oldsync'
@pytest.mark.usefixtures('setup_server_end_to_end_testing')
class TestE2e(TestCase, unittest.TestCase):
    """End-to-end Tokenserver tests run against a real FxA stage account.

    ``setUpClass`` creates a throwaway FxA account, verifies it with the
    code delivered to an ephemeral mailbox and obtains an OAuth token;
    ``tearDownClass`` destroys the account again.

    ``self.app``, ``self._get_user``, ``self.unsafelyParseToken`` and the
    ``NODE_URL`` / ``TOKEN_SIGNING_SECRET`` / ``FXA_METRICS_HASH_SECRET``
    attributes are not defined in this class; presumably they are provided
    by the ``TestCase`` support base class -- confirm there.
    """

    # Upper bound, in seconds, on how long setUpClass waits for the FxA
    # verification email before giving up instead of hanging forever.
    VERIFICATION_EMAIL_TIMEOUT = 120
    # Delay, in seconds, between two polls of the test mailbox.
    VERIFICATION_EMAIL_POLL_INTERVAL = 0.5

    def setUp(self):
        """Run the per-test setup of the base classes."""
        super().setUp()

    def tearDown(self):
        """Run the per-test teardown of the base classes."""
        super().tearDown()

    @classmethod
    def setUpClass(cls):
        """Create and verify an FxA account and authorize an OAuth token.

        Raises:
            TimeoutError: if no email reaches the test mailbox within
                ``VERIFICATION_EMAIL_TIMEOUT`` seconds.
        """
        # Create an ephemeral email account to use to create an FxA account
        cls.acct = TestEmailAccount()
        cls.client = Client(FXA_ACCOUNT_STAGE_HOST)
        cls.oauth_client = OAuthClient(CLIENT_ID, None,
                                       server_url=FXA_OAUTH_STAGE_HOST)
        cls.fxa_password = cls._generate_password()
        # Create an FxA account for these end-to-end tests
        cls.session = cls.client.create_account(cls.acct.email,
                                                password=cls.fxa_password)
        # Poll until we receive the verification email from FxA, but never
        # longer than the configured timeout: an email that is lost would
        # otherwise block the whole test run indefinitely.
        deadline = time.monotonic() + cls.VERIFICATION_EMAIL_TIMEOUT
        while not cls.acct.messages:
            if time.monotonic() >= deadline:
                # unittest does not call tearDownClass when setUpClass
                # raises, so remove the freshly created account ourselves.
                cls.tearDownClass()
                raise TimeoutError(
                    'No FxA verification email arrived within '
                    f'{cls.VERIFICATION_EMAIL_TIMEOUT} seconds'
                )
            time.sleep(cls.VERIFICATION_EMAIL_POLL_INTERVAL)
            cls.acct.fetch()
        # Find the message(s) containing the verification code and verify
        # the code
        for message in cls.acct.messages:
            message_headers = message['headers']
            if 'x-verify-code' in message_headers:
                cls.session.verify_email_code(
                    message_headers['x-verify-code'])
        # Create an OAuth token to be used for the end-to-end tests
        cls.oauth_token = cls.oauth_client.authorize_token(cls.session, SCOPE)

    @classmethod
    def tearDownClass(cls):
        """Clear the test mailbox and destroy the FxA account (best effort)."""
        cls.acct.clear()
        # A teardown of some of the tests can produce a 401 error because
        # of a race condition, where the record had already been removed.
        # This causes `destroy_account` to return an error if it attempts
        # to parse the invalid JSON response.
        # It's also possible that the `destroy_account` is rejected due to
        # missing authentication. It is not known why the authentication
        # is considered missing.
        # This traps for those events.
        try:
            cls.client.destroy_account(cls.acct.email, cls.fxa_password)
        except (ServerError, ClientError) as ex:
            print(f"warning: Encountered error when cleaning up: {ex}")

    @staticmethod
    def _generate_password():
        """Return a random password of ``PASSWORD_LENGTH`` characters.

        The password protects a real (stage) FxA account, so the characters
        are drawn from the operating system's CSPRNG rather than from the
        default Mersenne Twister generator.
        """
        rng = random.SystemRandom()
        return ''.join(
            rng.choice(PASSWORD_CHARACTERS) for _ in range(PASSWORD_LENGTH)
        )

    @staticmethod
    def _bearer_headers(token):
        """Return request headers carrying ``token`` as a Bearer credential."""
        return {
            'Authorization': f'Bearer {token}',
            'X-KeyID': '1234-qqo',
        }

    def _get_oauth_token_with_bad_scope(self):
        """Authorize an OAuth token for a scope other than ``SCOPE``."""
        bad_scope = 'bad_scope'
        return self.oauth_client.authorize_token(self.session, bad_scope)

    def _get_bad_token(self):
        """Return an RS256 JWT signed with a freshly generated private key.

        The key is created on the spot, so the signature cannot match any
        key the server already knows about.
        """
        key = rsa.generate_private_key(backend=default_backend(),
                                       public_exponent=65537,
                                       key_size=2048)
        pem = key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption(),
        )
        private_key = pem.decode('utf-8')
        claims = {
            'sub': 'fake sub',
            'iat': 12345,
            'exp': 12345,
        }
        return jwt.encode(claims, private_key, algorithm='RS256')

    def _extract_keys_changed_at_from_assertion(self, assertion):
        """Return the ``fxa-keysChangedAt`` claim of an assertion.

        The JWT is taken from the second-to-last ``~``-separated segment of
        ``assertion`` and decoded without verifying its signature.
        """
        token = assertion.split('~')[-2]
        claims = jwt.decode(token, options={"verify_signature": False})
        return claims['fxa-keysChangedAt']

    @classmethod
    def _change_password(cls):
        """Change the FxA account password and remember the new one."""
        new_password = cls._generate_password()
        cls.session.change_password(cls.fxa_password, new_password)
        cls.fxa_password = new_password

    # Adapted from the original Tokenserver:
    # https://github.com/mozilla-services/tokenserver/blob/master/tokenserver/util.py#L24
    def _fxa_metrics_hash(self, value):
        """Return the hex HMAC-SHA256 of ``value``.

        The HMAC is keyed with ``self.FXA_METRICS_HASH_SECRET``.
        """
        secret = self.FXA_METRICS_HASH_SECRET.encode('utf-8')
        return hmac.new(secret, value.encode('utf-8'), sha256).hexdigest()

    def test_unauthorized_oauth_error_status(self):
        """Unsupported schemes, bad tokens and bad scopes all yield a 401."""
        # Totally busted auth -> generic error.
        headers = {
            'Authorization': 'Unsupported-Auth-Scheme IHACKYOU',
            'X-KeyID': '1234-qqo'
        }
        res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
        expected_error_response = {
            'errors': [
                {
                    'description': 'Unsupported',
                    'location': 'body',
                    'name': ''
                }
            ],
            'status': 'error'
        }
        self.assertEqual(res.json, expected_error_response)

        # Bad token -> 'invalid-credentials'
        headers = self._bearer_headers(self._get_bad_token())
        res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
        expected_error_response = {
            'errors': [
                {
                    'description': 'Unauthorized',
                    'location': 'body',
                    'name': ''
                }
            ],
            'status': 'invalid-credentials'
        }
        self.assertEqual(res.json, expected_error_response)

        # Untrusted scopes -> 'invalid-credentials'
        headers = self._bearer_headers(self._get_oauth_token_with_bad_scope())
        res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
        self.assertEqual(res.json, expected_error_response)

    def test_valid_oauth_request(self):
        """A valid OAuth token allocates a user and yields a signed token."""
        headers = self._bearer_headers(self.oauth_token)
        # Send a valid request, allocating a new user
        res = self.app.get('/1.0/sync/1.5', headers=headers)
        body = res.json
        fxa_uid = self.session.uid
        # Retrieve the user from the database
        user = self._get_user(body['uid'])

        # First, let's verify that the token we received is valid. To do this,
        # we can unpack the hawk header ID into the payload and its signature
        # and then construct a tokenlib token to compute the signature
        # ourselves. To obtain a matching signature, we use the same secret as
        # is used by Tokenserver.
        raw = urlsafe_b64decode(body['id'])
        # The trailing 32 bytes are the signature; the rest is the payload.
        payload, signature = raw[:-32], raw[-32:]
        payload_str = payload.decode('utf-8')
        payload_dict = json.loads(payload_str)
        # The `id` payload should include a field indicating the origin of the
        # token
        self.assertEqual(payload_dict['tokenserver_origin'], 'rust')
        signing_secret = self.TOKEN_SIGNING_SECRET
        tm = tokenlib.TokenManager(secret=signing_secret)
        expected_signature = tm._get_signature(payload_str.encode('utf8'))
        # Using the #compare_digest method here is not strictly necessary, as
        # this is not a security-sensitive situation, but it's good practice
        self.assertTrue(hmac.compare_digest(expected_signature, signature))
        # Check that the given key is a secret derived from the hawk ID
        expected_secret = tokenlib.get_derived_secret(body['id'],
                                                      secret=signing_secret)
        self.assertEqual(body['key'], expected_secret)

        # Check to make sure the remainder of the fields are valid
        self.assertEqual(body['uid'], user['uid'])
        self.assertEqual(body['api_endpoint'],
                         f"{self.NODE_URL}/1.5/{user['uid']}")
        self.assertEqual(body['duration'], DEFAULT_TOKEN_DURATION)
        self.assertEqual(body['hashalg'], 'sha256')
        self.assertEqual(body['hashed_fxa_uid'],
                         self._fxa_metrics_hash(fxa_uid)[:32])
        self.assertEqual(body['node_type'], 'spanner')

        # The response should have an X-Timestamp header that contains the
        # number of seconds since the UNIX epoch. int() raises ValueError
        # when the header is not an integer, which fails the test.
        self.assertIn('X-Timestamp', res.headers)
        self.assertGreater(int(res.headers['X-Timestamp']), 0)

        token = self.unsafelyParseToken(body['id'])
        self.assertIn('hashed_device_id', token)
        self.assertEqual(token["uid"], body["uid"])
        self.assertEqual(token["fxa_uid"], fxa_uid)
        self.assertEqual(token["fxa_kid"], "0000000001234-qqo")
        self.assertNotEqual(token["hashed_fxa_uid"], token["fxa_uid"])
        self.assertEqual(token["hashed_fxa_uid"], body["hashed_fxa_uid"])