test: Add Tokenserver integration tests (#1152)

Closes #1048
This commit is contained in:
Ethan Donowitz 2021-10-05 16:59:23 -04:00 committed by GitHub
parent 17f89ac5f0
commit 7209ccf551
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 1386 additions and 26 deletions

View File

@ -187,13 +187,18 @@ impl ApiError {
}
pub fn render_404<B>(res: ServiceResponse<B>) -> Result<ErrorHandlerResponse<B>> {
// Replace the outbound error message with our own.
let resp =
HttpResponseBuilder::new(StatusCode::NOT_FOUND).json(WeaveError::UnknownError as u32);
Ok(ErrorHandlerResponse::Response(ServiceResponse::new(
res.request().clone(),
resp.into_body(),
)))
if res.request().path().starts_with("/1.0/") {
// Do not use a custom response for Tokenserver requests.
Ok(ErrorHandlerResponse::Response(res))
} else {
// Replace the outbound error message with our own for Sync requests.
let resp = HttpResponseBuilder::new(StatusCode::NOT_FOUND)
.json(WeaveError::UnknownError as u32);
Ok(ErrorHandlerResponse::Response(ServiceResponse::new(
res.request().clone(),
resp.into_body(),
)))
}
}
}

View File

@ -141,7 +141,7 @@ macro_rules! build_app {
// XXX: This route will be enabled when we are ready to roll out Tokenserver
// Tokenserver
// .service(
// web::resource("/1.0/{application}/{version}".to_string())
// web::resource("/1.0/{application}/{version}")
// .route(web::get().to(tokenserver::handlers::get_tokenserver_result)),
// )
// Dockerflow

View File

@ -29,7 +29,7 @@ impl Default for TokenserverError {
impl TokenserverError {
pub fn invalid_generation() -> Self {
TokenserverError {
Self {
status: "invalid-generation",
location: ErrorLocation::Body,
..Self::default()
@ -37,7 +37,7 @@ impl TokenserverError {
}
pub fn invalid_keys_changed_at() -> Self {
TokenserverError {
Self {
status: "invalid-keysChangedAt",
location: ErrorLocation::Body,
..Self::default()
@ -45,7 +45,7 @@ impl TokenserverError {
}
pub fn invalid_key_id(description: &'static str) -> Self {
TokenserverError {
Self {
status: "invalid-key-id",
description,
..Self::default()
@ -87,6 +87,14 @@ impl TokenserverError {
..Self::default()
}
}
pub fn unauthorized(description: &'static str) -> Self {
Self {
status: "error",
description,
..Self::default()
}
}
}
#[derive(Clone, Copy, Debug)]

View File

@ -179,7 +179,7 @@ impl FromRequest for TokenData {
// The request must use Bearer auth
if let Some((auth_type, _)) = authorization_header.split_once(" ") {
if auth_type.to_ascii_lowercase() != "bearer" {
return Err(TokenserverError::invalid_credentials("Unsupported").into());
return Err(TokenserverError::unauthorized("Unsupported").into());
}
}

View File

@ -21,9 +21,10 @@ use crate::{
pub struct TokenserverResult {
id: String,
key: String,
uid: String,
uid: i64,
api_endpoint: String,
duration: u64,
hashed_fxa_uid: String,
}
pub async fn get_tokenserver_result(
@ -110,9 +111,10 @@ pub async fn get_tokenserver_result(
let result = TokenserverResult {
id: token,
key: derived_secret,
uid: tokenserver_request.fxa_uid,
uid: tokenserver_user.uid,
api_endpoint,
duration: tokenserver_request.duration,
hashed_fxa_uid: hashed_fxa_uid.to_owned(),
};
Ok(HttpResponse::build(StatusCode::OK).json(result))

View File

@ -0,0 +1,2 @@
ALTER TABLE `nodes` ADD CONSTRAINT `nodes_ibfk_1` FOREIGN KEY (`service`) REFERENCES `services` (`id`);
ALTER TABLE `users` ADD CONSTRAINT `users_ibfk_1` FOREIGN KEY (`nodeid`) REFERENCES `nodes` (`id`);

View File

@ -0,0 +1,2 @@
ALTER TABLE `nodes` DROP FOREIGN KEY `nodes_ibfk_1`;
ALTER TABLE `users` DROP FOREIGN KEY `users_ibfk_1`;

View File

@ -0,0 +1,5 @@
ALTER TABLE `nodes` ALTER `available` SET DEFAULT 0;
ALTER TABLE `nodes` ALTER `current_load` SET DEFAULT 0;
ALTER TABLE `nodes` ALTER `capacity` SET DEFAULT 0;
ALTER TABLE `nodes` ALTER `downed` SET DEFAULT 0;
ALTER TABLE `nodes` ALTER `backoff` SET DEFAULT 0;

View File

@ -0,0 +1,5 @@
ALTER TABLE `nodes` ALTER `available` DROP DEFAULT;
ALTER TABLE `nodes` ALTER `current_load` DROP DEFAULT;
ALTER TABLE `nodes` ALTER `capacity` DROP DEFAULT;
ALTER TABLE `nodes` ALTER `downed` DROP DEFAULT;
ALTER TABLE `nodes` ALTER `backoff` DROP DEFAULT;

View File

@ -0,0 +1,4 @@
ALTER TABLE `nodes` DROP INDEX `unique_idx`;
ALTER TABLE `users` DROP INDEX `lookup_idx`;
ALTER TABLE `users` DROP INDEX `replaced_at_idx`;
ALTER TABLE `users` DROP INDEX `node_idx`;

View File

@ -0,0 +1,4 @@
ALTER TABLE `nodes` ADD UNIQUE KEY `unique_idx` (`service`, `node`);
ALTER TABLE `users` ADD INDEX `lookup_idx` (`email`, `service`, `created_at`);
ALTER TABLE `users` ADD INDEX `replaced_at_idx` (`service`, `replaced_at`);
ALTER TABLE `users` ADD INDEX `node_idx` (`nodeid`);

View File

@ -0,0 +1 @@
ALTER TABLE `nodes` ADD KEY `service` (`service`);

View File

@ -0,0 +1 @@
ALTER TABLE `nodes` DROP KEY `service`;

View File

@ -0,0 +1 @@
ALTER TABLE `users` ADD KEY `nodeid` (`nodeid`);

View File

@ -0,0 +1 @@
ALTER TABLE `users` DROP KEY `nodeid`;

View File

@ -2,12 +2,14 @@ from fxa.oauth import Client
from fxa.errors import ClientError, TrustError
import json
DEFAULT_OAUTH_SCOPE = 'https://identity.mozilla.com/apps/oldsync'
def verify_token(token, server_url=None):
client = Client(server_url=server_url)
try:
token_data = client.verify_token(token)
token_data = client.verify_token(token, DEFAULT_OAUTH_SCOPE)
# Serialize the data to make it easier to parse in Rust
return json.dumps(token_data)

View File

@ -1,9 +1,14 @@
hawkauthlib
konfig
mysqlclient
psutil
pyjwt
pyramid
pyramid_hawkauth
pyfxa
requests
simplejson
sqlalchemy
tokenlib
unittest2
webtest

View File

@ -2,16 +2,26 @@
import atexit
import os.path
import psutil
import signal
import subprocess
import sys
from test_storage import TestStorage
from test_support import run_live_functional_tests
import time
DEBUG_BUILD = "target/debug/syncstorage"
RELEASE_BUILD = "/app/bin/syncstorage"
def terminate_process(process):
proc = psutil.Process(pid=process.pid)
child_proc = proc.children(recursive=True)
for p in [proc] + child_proc:
os.kill(p.pid, signal.SIGTERM)
process.wait()
if __name__ == "__main__":
# When run as a script, this file will execute the
# functional tests against a live webserver.
@ -25,18 +35,21 @@ if __name__ == "__main__":
"Neither target/debug/syncstorage \
nor /app/bin/syncstorage were found."
)
the_server_subprocess = subprocess.Popen(
"SYNC_MASTER_SECRET=secret0 " + target_binary, shell=True
)
# TODO we should change this to watch for a log message on startup
# to know when to continue instead of sleeping for a fixed amount
time.sleep(20)
def stop_subprocess():
the_server_subprocess.terminate()
the_server_subprocess.wait()
def start_server():
the_server_subprocess = subprocess.Popen(target_binary,
shell=True,
env=os.environ)
atexit.register(stop_subprocess)
# TODO we should change this to watch for a log message on startup
# to know when to continue instead of sleeping for a fixed amount
time.sleep(20)
return the_server_subprocess
os.environ.setdefault('SYNC_MASTER_SECRET', 'secret0')
the_server_subprocess = start_server()
atexit.register(lambda: terminate_process(the_server_subprocess))
res = run_live_functional_tests(TestStorage, sys.argv)
sys.exit(res)

View File

@ -0,0 +1,33 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import unittest
from tokenserver.test_authorization import TestAuthorization
from tokenserver.test_e2e import TestE2e
from tokenserver.test_misc import TestMisc
from tokenserver.test_node_assignment import TestNodeAssignment
def run_local_tests():
return run_tests([TestAuthorization, TestMisc, TestNodeAssignment])
def run_end_to_end_tests():
return run_tests([TestE2e])
def run_tests(test_cases):
loader = unittest.TestLoader()
success = True
for test_case in test_cases:
suite = loader.loadTestsFromTestCase(test_case)
runner = unittest.TextTestRunner()
res = runner.run(suite)
success = success and res.wasSuccessful()
if success:
return 0
else:
return 1

View File

@ -0,0 +1,519 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import unittest
from tokenserver.test_support import TestCase
class TestAuthorization(TestCase, unittest.TestCase):
def setUp(self):
super(TestAuthorization, self).setUp()
def tearDown(self):
super(TestAuthorization, self).tearDown()
def test_unauthorized_error_status(self):
# Totally busted auth -> generic error.
headers = {'Authorization': 'Unsupported-Auth-Scheme IHACKYOU'}
res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
expected_error_response = {
'errors': [
{
'description': 'Unsupported',
'location': 'body',
'name': ''
}
],
'status': 'error'
}
self.assertEqual(res.json, expected_error_response)
def test_no_auth(self):
self.app.get('/1.0/sync/1.5', status=401)
def test_invalid_client_state(self):
headers = {'X-KeyID': '1234-state!'}
resp = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
expected_error_response = {
'status': 'error',
'errors': [
{
'location': 'body',
'name': '',
'description': 'Unauthorized'
}
]
}
self.assertEqual(resp.json, expected_error_response)
def test_keys_changed_at_less_than_equal_to_generation(self):
self._add_user(generation=1232, keys_changed_at=1234)
# If keys_changed_at changes, that change must be less than or equal
# the new generation
oauth_token = self._forge_oauth_token(generation=1235)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1236-YWFh'
}
res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
expected_error_response = {
'status': 'invalid-keysChangedAt',
'errors': [
{
'location': 'body',
'name': '',
'description': 'Unauthorized'
}
]
}
self.assertEqual(res.json, expected_error_response)
# If the keys_changed_at on the request matches that currently stored
# on the user record, it does not need to be less than or equal to the
# generation on the request
oauth_token = self._forge_oauth_token(generation=1233)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1234-YWFh'
}
self.app.get('/1.0/sync/1.5', headers=headers)
# A request with no generation is acceptable
oauth_token = self._forge_oauth_token(generation=None)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1235-YWFh'
}
self.app.get('/1.0/sync/1.5', headers=headers)
# A request with a keys_changed_at less than the new generation
# is acceptable
oauth_token = self._forge_oauth_token(generation=1236)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1235-YWFh'
}
self.app.get('/1.0/sync/1.5', headers=headers)
def test_disallow_reusing_old_client_state(self):
# Add a user record that has already been replaced
self._add_user(client_state='616161', replaced_at=1200)
# Add the most up-to-date user record
self._add_user(client_state='626262')
# A request cannot use a client state associated with a replaced user
oauth_token = self._forge_oauth_token()
# (Note that YWFh is base64 for 'aaa', which is 0x616161 in hex)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1234-YWFh'
}
res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
expected_error_response = {
'status': 'invalid-client-state',
'errors': [
{
'location': 'header',
'name': 'X-Client-State',
'description': 'Unacceptable client-state value stale '
'value'
}
]
}
self.assertEqual(res.json, expected_error_response)
# Using the last-seen client state is okay
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1234-YmJi'
}
res1 = self.app.get('/1.0/sync/1.5', headers=headers)
# Using a new client state (with an updated generation and
# keys_changed_at) is okay
oauth_token = self._forge_oauth_token(generation=1235)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1235-Y2Nj'
}
res2 = self.app.get('/1.0/sync/1.5', headers=headers)
# This results in the creation of a new user record
self.assertNotEqual(res1.json['uid'], res2.json['uid'])
def test_generation_change_must_accompany_client_state_change(self):
self._add_user(generation=1234, client_state='616161')
# A request with a new client state must also contain a new generation
oauth_token = self._forge_oauth_token(generation=1234)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1234-YmJi'
}
res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
expected_error_response = {
'status': 'invalid-client-state',
'errors': [
{
'location': 'header',
'name': 'X-Client-State',
'description': 'Unacceptable client-state value new '
'value with no generation change'
}
]
}
self.assertEqual(res.json, expected_error_response)
# A request with no generation is acceptable
oauth_token = self._forge_oauth_token(generation=None)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1235-YmJi'
}
self.app.get('/1.0/sync/1.5', headers=headers)
# We can't use a generation of 1235 when setting a new client state
# because the generation was set to be equal to the keys_changed_at
# in the previous request, which was 1235
oauth_token = self._forge_oauth_token(generation=1235)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1235-Y2Nj'
}
expected_error_response = {
'status': 'invalid-client-state',
'errors': [
{
'location': 'header',
'name': 'X-Client-State',
'description': 'Unacceptable client-state value new '
'value with no generation change'
}
]
}
res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
self.assertEqual(res.json, expected_error_response)
# A change in client state is acceptable only with a change in
# generation (if it is present)
oauth_token = self._forge_oauth_token(generation=1236)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1236-Y2Nj'
}
self.app.get('/1.0/sync/1.5', headers=headers)
def test_keys_changed_at_change_must_accompany_client_state_change(self):
self._add_user(generation=1234, keys_changed_at=1234,
client_state='616161')
# A request with a new client state must also contain a new
# keys_changed_at
oauth_token = self._forge_oauth_token(generation=1235)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1234-YmJi'
}
res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
expected_error_response = {
'status': 'invalid-client-state',
'errors': [
{
'location': 'header',
'name': 'X-Client-State',
'description': 'Unacceptable client-state value new '
'value with no keys_changed_at change'
}
]
}
self.assertEqual(res.json, expected_error_response)
# A request with a new keys_changed_at is acceptable
oauth_token = self._forge_oauth_token(generation=1235)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1235-YmJi'
}
self.app.get('/1.0/sync/1.5', headers=headers)
def test_generation_must_not_be_less_than_last_seen_value(self):
uid = self._add_user(generation=1234)
# The generation in the request cannot be less than the generation
# currently stored on the user record
oauth_token = self._forge_oauth_token(generation=1233)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1234-YWFh'
}
res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
expected_error_response = {
'status': 'invalid-generation',
'errors': [
{
'location': 'body',
'name': '',
'description': 'Unauthorized',
}
]
}
self.assertEqual(res.json, expected_error_response)
# A request with no generation is acceptable
oauth_token = self._forge_oauth_token(generation=None)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1234-YWFh'
}
self.app.get('/1.0/sync/1.5', headers=headers)
# A request with a generation equal to the last-seen generation is
# acceptable
oauth_token = self._forge_oauth_token(generation=1234)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1234-YWFh'
}
self.app.get('/1.0/sync/1.5', headers=headers)
# A request with a generation greater than the last-seen generation is
# acceptable
oauth_token = self._forge_oauth_token(generation=1235)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1234-YWFh'
}
res = self.app.get('/1.0/sync/1.5', headers=headers)
# This should not result in the creation of a new user
self.assertEqual(res.json['uid'], uid)
def test_fxa_kid_change(self):
self._add_user(generation=1234, keys_changed_at=None,
client_state='616161')
# An OAuth client shows up, setting keys_changed_at.
# (The value matches generation number above, beause in this scenario
# FxA hasn't been updated to track and report keysChangedAt yet).
oauth_token = self._forge_oauth_token(generation=1234)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1234-YWFh',
}
res = self.app.get('/1.0/sync/1.5', headers=headers)
token0 = self.unsafelyParseToken(res.json['id'])
# Reject keys_changed_at lower than the value previously seen
headers['X-KeyID'] = '1233-YWFh'
res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
expected_error_response = {
'status': 'invalid-keysChangedAt',
'errors': [
{
'location': 'body',
'name': '',
'description': 'Unauthorized',
}
]
}
self.assertEqual(res.json, expected_error_response)
# Reject greater keys_changed_at with no corresponding update to
# generation
headers['X-KeyID'] = '2345-YmJi'
res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
self.assertEqual(res.json, expected_error_response)
# Accept equal keys_changed_at
headers['X-KeyID'] = '1234-YWFh'
self.app.get('/1.0/sync/1.5', headers=headers)
# Accept greater keys_changed_at with new generation
headers['X-KeyID'] = '2345-YmJi'
oauth_token = self._forge_oauth_token(generation=2345)
headers['Authorization'] = 'Bearer %s' % oauth_token
res = self.app.get('/1.0/sync/1.5', headers=headers)
token = self.unsafelyParseToken(res.json['id'])
self.assertEqual(token['fxa_kid'], '0000000002345-YmJi')
self.assertNotEqual(token['uid'], token0['uid'])
self.assertEqual(token['node'], token0['node'])
def test_client_specified_duration(self):
self._add_user(generation=1234, keys_changed_at=1234,
client_state='616161')
oauth_token = self._forge_oauth_token(generation=1234)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1234-YWFh',
}
# It's ok to request a shorter-duration token.
res = self.app.get('/1.0/sync/1.5?duration=12', headers=headers)
self.assertEquals(res.json['duration'], 12)
# But you can't exceed the server's default value.
res = self.app.get('/1.0/sync/1.5?duration=4000', headers=headers)
self.assertEquals(res.json['duration'], 300)
# And nonsense values are ignored.
res = self.app.get('/1.0/sync/1.5?duration=lolwut', headers=headers)
self.assertEquals(res.json['duration'], 300)
res = self.app.get('/1.0/sync/1.5?duration=-1', headers=headers)
self.assertEquals(res.json['duration'], 300)
# Although all servers are now writing keys_changed_at, we still need this
# case to be handled. See this PR for more information:
# https://github.com/mozilla-services/tokenserver/pull/176
def test_kid_change_during_gradual_tokenserver_rollout(self):
# Let's start with a user already in the db, with no keys_changed_at.
uid = self._add_user(generation=1234, client_state='616161',
keys_changed_at=None)
user1 = self._get_user(uid)
# User hits updated tokenserver node, writing keys_changed_at to db.
oauth_token = self._forge_oauth_token(generation=1234)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1200-YWFh',
}
res = self.app.get('/1.0/sync/1.5', headers=headers)
# That should not have triggered a node re-assignment.
user2 = self._get_user(res.json['uid'])
self.assertEqual(user1['uid'], user2['uid'])
self.assertEqual(user1['nodeid'], user2['nodeid'])
# That should have written keys_changed_at into the db.
self.assertEqual(user2['generation'], 1234)
self.assertEqual(user2['keys_changed_at'], 1200)
# User does a password reset on their Firefox Account.
oauth_token = self._forge_oauth_token(generation=2345)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '2345-YmJi',
}
# They sync again, but hit a tokenserver node that isn't updated yet.
# This would trigger the allocation of a new user, so we simulate this
# by adding a new user. We set keys_changed_at to be the last-used
# value, since we are simulating a server that doesn't pay attention
# to keys_changed_at.
uid = self._add_user(generation=2345, keys_changed_at=1200,
client_state='626262')
user2 = self._get_user(uid)
self.assertNotEqual(user1['uid'], user2['uid'])
self.assertEqual(user1['nodeid'], user2['nodeid'])
# They sync again, hitting an updated tokenserver node.
# This should succeed, despite keys_changed_at appearing to have
# changed without any corresponding change in generation number.
res = self.app.get('/1.0/sync/1.5', headers=headers)
# That should not have triggered a second user allocation.
user1 = user2
user2 = self._get_user(res.json['uid'])
self.assertEqual(user2['uid'], user1['uid'])
self.assertEqual(user2['nodeid'], user1['nodeid'])
def test_update_client_state(self):
uid = self._add_user(generation=0, keys_changed_at=None,
client_state='')
user1 = self._get_user(uid)
# The user starts out with no client_state
self.assertEqual(user1['generation'], 0)
self.assertEqual(user1['client_state'], '')
seen_uids = set((uid,))
orig_node = user1['nodeid']
# Changing client_state allocates a new user, resulting in a new uid
oauth_token = self._forge_oauth_token(generation=1234)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1234-YmJi'
}
res = self.app.get('/1.0/sync/1.5', headers=headers)
user2 = self._get_user(res.json['uid'])
self.assertTrue(user2['uid'] not in seen_uids)
self.assertEqual(user2['nodeid'], orig_node)
self.assertEqual(user2['generation'], 1234)
self.assertEqual(user2['keys_changed_at'], 1234)
self.assertEqual(user2['client_state'], '626262')
seen_uids.add(user2['uid'])
# We can change the client state even if no generation is present on
# the request
oauth_token = self._forge_oauth_token(generation=None)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1235-Y2Nj'
}
res = self.app.get('/1.0/sync/1.5', headers=headers)
user3 = self._get_user(res.json['uid'])
self.assertTrue(user3['uid'] not in seen_uids)
self.assertEqual(user3['nodeid'], orig_node)
# When keys_changed_at changes and generation is not present on the
# request, generation is set to be the same as keys_changed_at
self.assertEqual(user3['generation'], 1235)
self.assertEqual(user3['keys_changed_at'], 1235)
self.assertEqual(user3['client_state'], '636363')
seen_uids.add(user3['uid'])
# We cannot change client_state without a change in keys_changed_at
oauth_token = self._forge_oauth_token(generation=None)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1235-ZGRk'
}
res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
expected_error_response = {
'status': 'invalid-client-state',
'errors': [
{
'location': 'header',
'name': 'X-Client-State',
'description': 'Unacceptable client-state value new '
'value with no keys_changed_at change'
}
]
}
self.assertEqual(expected_error_response, res.json)
# We cannot use a previously-used client_state
oauth_token = self._forge_oauth_token(generation=1236)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1236-YmJi'
}
res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
expected_error_response = {
'status': 'invalid-client-state',
'errors': [
{
'location': 'header',
'name': 'X-Client-State',
'description': 'Unacceptable client-state value stale '
'value'
}
]
}
self.assertEqual(expected_error_response, res.json)
def test_set_generation_from_no_generation(self):
# Add a user that has no generation set
uid = self._add_user(generation=0, keys_changed_at=None,
client_state='616161')
oauth_token = self._forge_oauth_token(generation=1234)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1234-YWFh'
}
# Send a request to set the generation
self.app.get('/1.0/sync/1.5', headers=headers)
user = self._get_user(uid)
# Ensure that the user had the correct generation set
self.assertEqual(user['generation'], 1234)
def test_set_keys_changed_at_from_no_keys_changed_at(self):
# Add a user that has no keys_changed_at set
uid = self._add_user(generation=1234, keys_changed_at=None,
client_state='616161')
oauth_token = self._forge_oauth_token(generation=1234)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1234-YWFh'
}
# Send a request to set the keys_changed_at
self.app.get('/1.0/sync/1.5', headers=headers)
user = self._get_user(uid)
# Ensure that the user had the correct generation set
self.assertEqual(user['keys_changed_at'], 1234)
def test_x_client_state_must_have_same_client_state_as_key_id(self):
self._add_user(client_state='616161')
headers = {
'Authorization': 'Bearer %s' % self._forge_oauth_token(),
'X-KeyID': '1234-YWFh',
'X-Client-State': '626262'
}
# If present, the X-Client-State header must have the same client
# state as the X-KeyID header
res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
expected_error_response = {
'errors': [
{
'description': 'Unauthorized',
'location': 'body',
'name': ''
}
],
'status': 'invalid-client-state'
}
self.assertEqual(res.json, expected_error_response)
headers['X-Client-State'] = '616161'
res = self.app.get('/1.0/sync/1.5', headers=headers)

View File

@ -0,0 +1,208 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
from base64 import urlsafe_b64decode
import hmac
import json
import jwt
import random
import string
import time
import tokenlib
import unittest
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
from fxa.tools.bearer import get_bearer_token
from fxa.core import Client
from fxa.oauth import Client as OAuthClient
from fxa.tests.utils import TestEmailAccount
from hashlib import sha256
from tokenserver.test_support import TestCase
# This is the client ID used for Firefox Desktop. The FxA team confirmed that
# this is the proper client ID to be using for these integration tests.
CLIENT_ID = '5882386c6d801776'
DEFAULT_TOKEN_DURATION = 300
FXA_ACCOUNT_STAGE_HOST = 'https://api-accounts.stage.mozaws.net'
FXA_OAUTH_STAGE_HOST = 'https://oauth.stage.mozaws.net'
PASSWORD_CHARACTERS = string.ascii_letters + string.punctuation + string.digits
PASSWORD_LENGTH = 32
SCOPE = 'https://identity.mozilla.com/apps/oldsync'
class TestE2e(TestCase, unittest.TestCase):
def setUp(self):
super(TestE2e, self).setUp()
def tearDown(self):
super(TestE2e, self).tearDown()
@classmethod
def setUpClass(cls):
# Create an ephemeral email account to use to create an FxA account
cls.acct = TestEmailAccount()
cls.client = Client(FXA_ACCOUNT_STAGE_HOST)
cls.oauth_client = OAuthClient(CLIENT_ID, None,
server_url=FXA_OAUTH_STAGE_HOST)
cls.fxa_password = cls._generate_password()
# Create an FxA account for these end-to-end tests
cls.session = cls.client.create_account(cls.acct.email,
password=cls.fxa_password)
# Loop until we receive the verification email from FxA
while not cls.acct.messages:
time.sleep(0.5)
cls.acct.fetch()
# Find the message containing the verification code and verify the
# code
for m in cls.acct.messages:
if 'x-verify-code' in m['headers']:
cls.session.verify_email_code(m['headers']['x-verify-code'])
# Create an OAuth token to be used for the end-to-end tests
cls.oauth_token = cls.oauth_client.authorize_token(cls.session, SCOPE)
@classmethod
def tearDownClass(cls):
cls.acct.clear()
cls.client.destroy_account(cls.acct.email, cls.fxa_password)
@staticmethod
def _generate_password():
r = range(PASSWORD_LENGTH)
return ''.join(random.choice(PASSWORD_CHARACTERS) for i in r)
def _get_token_with_bad_scope(self):
bad_scope = 'bad_scope'
return get_bearer_token(TestE2e.acct.email,
TestE2e.fxa_password,
scopes=[bad_scope],
account_server_url=FXA_ACCOUNT_STAGE_HOST,
oauth_server_url=FXA_OAUTH_STAGE_HOST,
client_id=CLIENT_ID)
def _get_bad_token(self):
key = rsa.generate_private_key(backend=default_backend(),
public_exponent=65537,
key_size=2048)
format = serialization.PrivateFormat.TraditionalOpenSSL
algorithm = serialization.NoEncryption()
pem = key.private_bytes(encoding=serialization.Encoding.PEM,
format=format,
encryption_algorithm=algorithm)
private_key = pem.decode('utf-8')
claims = {
'sub': 'fake sub',
'iat': 12345,
'exp': 12345,
}
return jwt.encode(claims, private_key, algorithm='RS256')
@classmethod
def _change_password(cls):
new_password = cls._generate_password()
cls.session.change_password(cls.fxa_password, new_password)
cls.fxa_password = new_password
# Refresh the session
cls.session = cls.client.login(cls.acct.email, cls.fxa_password)
# Refresh the OAuth token
cls.oauth_token = cls.oauth_client.authorize_token(cls.session, SCOPE)
# Adapted from the original Tokenserver:
# https://github.com/mozilla-services/tokenserver/blob/master/tokenserver/util.py#L24
def _fxa_metrics_hash(self, value):
hasher = hmac.new(self.FXA_METRICS_HASH_SECRET.encode('utf-8'), b'',
sha256)
hasher.update(value.encode('utf-8'))
return hasher.hexdigest()
def test_unauthorized_error_status(self):
# Totally busted auth -> generic error.
headers = {
'Authorization': 'Unsupported-Auth-Scheme IHACKYOU',
'X-KeyID': '1234-YWFh'
}
res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
expected_error_response = {
'errors': [
{
'description': 'Unsupported',
'location': 'body',
'name': ''
}
],
'status': 'error'
}
self.assertEqual(res.json, expected_error_response)
token = self._get_bad_token()
headers = {
'Authorization': 'Bearer %s' % token,
'X-KeyID': '1234-YWFh'
}
# Bad token -> 'invalid-credentials'
res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
expected_error_response = {
'errors': [
{
'description': 'Unauthorized',
'location': 'body',
'name': ''
}
],
'status': 'invalid-credentials'
}
self.assertEqual(res.json, expected_error_response)
# Untrusted scopes -> 'invalid-credentials'
token = self._get_token_with_bad_scope()
headers = {
'Authorization': 'Bearer %s' % token,
'X-KeyID': '1234-YWFh'
}
res = self.app.get('/1.0/sync/1.5', headers=headers, status=401)
self.assertEqual(res.json, expected_error_response)
def test_valid_request(self):
oauth_token = self.oauth_token
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1234-YWFh'
}
# Send a valid request, allocating a new user
res = self.app.get('/1.0/sync/1.5', headers=headers)
fxa_uid = self.session.uid
# Retrieve the user from the database
user = self._get_user(res.json['uid'])
# First, let's verify that the token we received is valid. To do this,
# we can unpack the hawk header ID into the payload and its signature
# and then construct a tokenlib token to compute the signature
# ourselves. To obtain a matching signature, we use the same secret as
# is used by Tokenserver.
raw = urlsafe_b64decode(res.json['id'])
payload = raw[:-32]
signature = raw[-32:]
payload_dict = json.loads(payload.decode('utf-8'))
signing_secret = self.TOKEN_SIGNING_SECRET
expected_token = tokenlib.make_token(payload_dict,
secret=signing_secret)
expected_signature = urlsafe_b64decode(expected_token)[-32:]
# Using the #compare_digest method here is not strictly necessary, as
# this is not a security-sensitive situation, but it's good practice
self.assertTrue(hmac.compare_digest(expected_signature, signature))
# Check that the given key is a secret derived from the hawk ID
expected_secret = tokenlib.get_derived_secret(res.json['id'],
secret=signing_secret)
self.assertEqual(res.json['key'], expected_secret)
# Check to make sure the remainder of the fields are valid
self.assertEqual(res.json['uid'], user['uid'])
self.assertEqual(res.json['api_endpoint'],
'%s/1.5/%s' % (self.NODE_URL, user['uid']))
self.assertEqual(res.json['duration'], DEFAULT_TOKEN_DURATION)
self.assertEqual(res.json['hashalg'], 'sha256')
self.assertEqual(res.json['hashed_fxa_uid'],
self._fxa_metrics_hash(fxa_uid)[:32])
self.assertEqual(res.json['node_type'], 'example')

View File

@ -0,0 +1,173 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import unittest
from tokenserver.test_support import TestCase
class TestMisc(TestCase, unittest.TestCase):
def setUp(self):
super(TestMisc, self).setUp()
def tearDown(self):
super(TestMisc, self).tearDown()
def test_unknown_app(self):
headers = {
'Authorization': 'Bearer %s' % self._forge_oauth_token(),
'X-KeyID': '1234-YWFh'
}
res = self.app.get('/1.0/xXx/token', headers=headers, status=404)
expected_error_response = {
'errors': [
{
'description': 'Unsupported application',
'location': 'url',
'name': 'application'
}
],
'status': 'error'
}
self.assertEqual(res.json, expected_error_response)
def test_unknown_version(self):
headers = {
'Authorization': 'Bearer %s' % self._forge_oauth_token(),
'X-KeyID': '1234-YWFh'
}
res = self.app.get('/1.0/sync/1.2', headers=headers, status=404)
expected_error_response = {
'errors': [
{
'description': 'Unsupported application version',
'location': 'url',
'name': '1.2'
}
],
'status': 'error'
}
self.assertEqual(res.json, expected_error_response)
def test_valid_app(self):
self._add_user()
headers = {
'Authorization': 'Bearer %s' % self._forge_oauth_token(),
'X-KeyID': '1234-YWFh'
}
res = self.app.get('/1.0/sync/1.5', headers=headers)
self.assertIn('https://example.com/1.5', res.json['api_endpoint'])
self.assertIn('duration', res.json)
self.assertEquals(res.json['duration'], 300)
def test_current_user_is_the_most_up_to_date(self):
# Add some users
self._add_user(generation=1234, created_at=1234)
self._add_user(generation=1235, created_at=1234)
self._add_user(generation=1234, created_at=1235)
uid = self._add_user(generation=1236, created_at=1233)
# Users are sorted by (generation, created_at), so the fourth user
# record is considered to be the current user
oauth_token = self._forge_oauth_token(generation=1236)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1234-YWFh'
}
res = self.app.get('/1.0/sync/1.5', headers=headers)
self.assertEqual(res.json['uid'], uid)
def test_user_creation_when_most_current_user_is_replaced(self):
# Add some users
uid1 = self._add_user(generation=1234, created_at=1234)
uid2 = self._add_user(generation=1235, created_at=1235)
uid3 = self._add_user(generation=1236, created_at=1236,
replaced_at=1237)
seen_uids = [uid1, uid2, uid3]
# Because the current user (the one with uid3) has been replaced, a new
# user record is created
oauth_token = self._forge_oauth_token(generation=1237)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1237-YWFh'
}
res = self.app.get('/1.0/sync/1.5', headers=headers)
self.assertNotIn(res.json['uid'], seen_uids)
def test_old_users_marked_as_replaced_in_race_recovery(self):
# Add some users
uid1 = self._add_user(generation=1234, created_at=1234)
uid2 = self._add_user(generation=1235, created_at=1235)
uid3 = self._add_user(generation=1236, created_at=1240)
# Make a request
oauth_token = self._forge_oauth_token(generation=1236)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1236-YWFh'
}
res = self.app.get('/1.0/sync/1.5', headers=headers)
# uid3 is associated with the current user
self.assertEqual(res.json['uid'], uid3)
# The users associated with uid1 and uid2 have replaced_at set to be
# equal to created_at on the current user record
user1 = self._get_user(uid1)
user2 = self._get_user(uid2)
self.assertEqual(user1['replaced_at'], 1240)
self.assertEqual(user2['replaced_at'], 1240)
def test_user_updates_with_new_client_state(self):
# Start with a single user in the database
uid = self._add_user(generation=1234, keys_changed_at=1234,
client_state='616161')
# Send a request, updating the generation, keys_changed_at, and
# client_state
oauth_token = self._forge_oauth_token(generation=1235)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1235-YmJi'
}
res = self.app.get('/1.0/sync/1.5', headers=headers)
# A new user should have been created
self.assertEqual(self._count_users(), 2)
self.assertNotEqual(uid, res.json['uid'])
# The new user record should have the updated generation,
# keys_changed_at, and client_state
user = self._get_user(res.json['uid'])
self.assertEqual(user['generation'], 1235)
self.assertEqual(user['keys_changed_at'], 1235)
self.assertEqual(user['client_state'], '626262')
# The old user record should not have the updated values
user = self._get_user(uid)
self.assertEqual(user['generation'], 1234)
self.assertEqual(user['keys_changed_at'], 1234)
self.assertEqual(user['client_state'], '616161')
# Get all the replaced users
email = 'test@%s' % self.FXA_EMAIL_DOMAIN
replaced_users = self._get_replaced_users(self.SYNC_1_5_SERVICE_ID,
email)
# Only one user should be replaced
self.assertEqual(len(replaced_users), 1)
# The replaced user record should have the old generation,
# keys_changed_at, and client_state
replaced_user = replaced_users[0]
self.assertEqual(replaced_user['generation'], 1234)
self.assertEqual(replaced_user['keys_changed_at'], 1234)
self.assertEqual(replaced_user['client_state'], '616161')
def test_user_updates_with_same_client_state(self):
# Start with a single user in the database
uid = self._add_user(generation=1234, keys_changed_at=1234)
# Send a request, updating the generation and keys_changed_at but not
# the client state
oauth_token = self._forge_oauth_token(generation=1235)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1235-YWFh'
}
res = self.app.get('/1.0/sync/1.5', headers=headers)
# A new user should not have been created
self.assertEqual(self._count_users(), 1)
self.assertEqual(uid, res.json['uid'])
# The user record should have been updated
user = self._get_user(uid)
self.assertEqual(user['generation'], 1235)
self.assertEqual(user['keys_changed_at'], 1235)

View File

@ -0,0 +1,139 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import unittest
from tokenserver.test_support import TestCase
class TestNodeAssignment(TestCase, unittest.TestCase):
def setUp(self):
super(TestNodeAssignment, self).setUp()
def tearDown(self):
super(TestNodeAssignment, self).tearDown()
def test_user_creation(self):
# Add a few more nodes
self._add_node(available=0, node='https://node1')
self._add_node(available=1, node='https://node2')
self._add_node(available=5, node='https://node3')
# Send a request from an unseen user
oauth_token = self._forge_oauth_token(generation=1234)
headers = {
'Authorization': 'Bearer %s' % oauth_token,
'X-KeyID': '1234-YWFh'
}
res = self.app.get('/1.0/sync/1.5', headers=headers)
# Ensure a single user was created
self.assertEqual(self._count_users(), 1)
# Ensure the user has the correct attributes
user1 = self._get_user(res.json['uid'])
self.assertEqual(user1['generation'], 1234)
self.assertEqual(user1['keys_changed_at'], 1234)
self.assertEqual(user1['client_state'], '616161')
self.assertEqual(user1['nodeid'], self.NODE_ID)
self.assertEqual(user1['service'], self.SYNC_1_5_SERVICE_ID)
# Ensure the 'available' and 'current_load' counts on the node
# assigned to the user have been decremented appropriately
node = self._get_node(self.NODE_ID)
self.assertEqual(node['available'], 99)
self.assertEqual(node['current_load'], 1)
# Send a request from the same user
self.app.get('/1.0/sync/1.5', headers=headers)
# Ensure another user record was not created
self.assertEqual(self._count_users(), 1)
def test_new_user_allocation(self):
# Start with a clean database
cursor = self._execute_sql('DELETE FROM nodes', ())
cursor.close()
self._add_node(available=100, current_load=0, capacity=100, backoff=1,
node='https://node1')
self._add_node(available=100, current_load=0, capacity=100, downed=1,
node='https://node2')
node_id = self._add_node(available=99, current_load=1, capacity=100,
node='https://node3')
self._add_node(available=98, current_load=2, capacity=100,
node='https://node4')
self._add_node(available=97, current_load=3, capacity=100,
node='https://node5')
headers = {
'Authorization': 'Bearer %s' % self._forge_oauth_token(),
'X-KeyID': '1234-YWFh'
}
res = self.app.get('/1.0/sync/1.5', headers=headers)
# The user should have been allocated to the least-loaded node
# (computed as current_load / capacity) that has backoff and downed
# set to 0
user = self._get_user(res.json['uid'])
self.assertEqual(user['nodeid'], node_id)
# The selected node should have current_load incremented and available
# decremented
node = self._get_node(node_id)
self.assertEqual(node['current_load'], 2)
self.assertEqual(node['available'], 98)
def test_successfully_releasing_node_capacity(self):
# Start with a clean database
cursor = self._execute_sql('DELETE FROM nodes', ())
cursor.close()
node_id1 = self._add_node(available=0, current_load=99, capacity=100,
node='https://node1')
node_id2 = self._add_node(available=0, current_load=90, capacity=100,
node='https://node2')
node_id3 = self._add_node(available=0, current_load=80, capacity=81,
node='https://node3')
node_id4 = self._add_node(available=0, current_load=70, capacity=71,
node='https://node4', backoff=1)
node_id5 = self._add_node(available=0, current_load=60, capacity=61,
node='https://node5', downed=1)
headers = {
'Authorization': 'Bearer %s' % self._forge_oauth_token(),
'X-KeyID': '1234-YWFh'
}
res = self.app.get('/1.0/sync/1.5', headers=headers)
# Since every node has no available spots, capacity is added to each
# node according to the equation
# min(capacity*capacity_release_rate, capacity - current_load). Since
# capacity - current_load is 0 for every node, the node with the
# greatest capacity is chosen
user = self._get_user(res.json['uid'])
self.assertEqual(user['nodeid'], node_id2)
# min(100 * 0.1, 100 - 99) = 1
node1 = self._get_node(node_id1)
self.assertEqual(node1['available'], 1)
# min(100 * 0.1, 100 - 90) = 10, and this is the node to which the
# user was assigned, so the final available count is 9
node2 = self._get_node(node_id2)
self.assertEqual(node2['available'], 9)
# min(81 * 0.1, 81 - 80) = 1
node3 = self._get_node(node_id3)
self.assertEqual(node3['available'], 1)
# min(100 * 0.1, 71 - 70) = 1
node4 = self._get_node(node_id4)
self.assertEqual(node4['available'], 1)
# Nodes with downed set to 1 do not have their availability updated
node5 = self._get_node(node_id5)
self.assertEqual(node5['available'], 0)
def test_unsuccessfully_releasing_node_capacity(self):
# Start with a clean database
cursor = self._execute_sql('DELETE FROM nodes', ())
cursor.close()
self._add_node(available=0, current_load=100, capacity=100,
node='https://node1')
self._add_node(available=0, current_load=90, capacity=90,
node='https://node2')
self._add_node(available=0, current_load=80, capacity=80,
node='https://node3')
headers = {
'Authorization': 'Bearer %s' % self._forge_oauth_token(),
'X-KeyID': '1234-YWFh'
}
# All of these nodes are completely full, and no capacity can be
# released
self.app.get('/1.0/sync/1.5', headers=headers, status=503)

View File

@ -0,0 +1,227 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
from base64 import urlsafe_b64encode as b64encode
import json
import os
import math
import time
import urllib.parse as urlparse
from sqlalchemy import create_engine
from tokenlib.utils import decode_token_bytes
from webtest import TestApp
class TestCase:
FXA_EMAIL_DOMAIN = 'api-accounts.stage.mozaws.net'
FXA_METRICS_HASH_SECRET = 'secret'
NODE_ID = 800
NODE_URL = 'https://example.com'
SYNC_1_1_SERVICE_ID = 1
SYNC_1_5_SERVICE_ID = 2
SYNC_1_5_SERVICE_NAME = 'sync-1.5'
TOKEN_SIGNING_SECRET = 'secret'
TOKENSERVER_HOST = os.environ['TOKENSERVER_HOST']
def setUp(self):
engine = create_engine(os.environ['SYNC_TOKENSERVER__DATABASE_URL'])
self.database = engine. \
execution_options(isolation_level="AUTOCOMMIT"). \
connect()
host_url = urlparse.urlparse(self.TOKENSERVER_HOST)
self.app = TestApp(self.TOKENSERVER_HOST, extra_environ={
'HTTP_HOST': host_url.netloc,
'wsgi.url_scheme': host_url.scheme or 'http',
'SERVER_NAME': host_url.hostname,
'REMOTE_ADDR': '127.0.0.1',
'SCRIPT_NAME': host_url.path,
})
# Start each test with a blank slate.
cursor = self._execute_sql(('DELETE FROM users'), ())
cursor.close()
cursor = self._execute_sql(('DELETE FROM nodes'), ())
cursor.close()
cursor = self._execute_sql(('DELETE FROM services'), ())
cursor.close()
# Ensure the necessary services exists in the db.
self._add_service('sync-1.1', '{node}/1.1/{uid}',
self.SYNC_1_1_SERVICE_ID)
self._add_service('sync-1.5', '{node}/1.5/{uid}',
self.SYNC_1_5_SERVICE_ID)
# Ensure we have a node with enough capacity to run the tests.
self._add_node(capacity=100, node=self.NODE_URL, id=self.NODE_ID)
def tearDown(self):
# And clean up at the end, for good measure.
cursor = self._execute_sql(('DELETE FROM users'), ())
cursor.close()
cursor = self._execute_sql(('DELETE FROM nodes'), ())
cursor.close()
cursor = self._execute_sql(('DELETE FROM services'), ())
cursor.close()
self.database.close()
def _forge_oauth_token(self, generation=None, sub='test', scope='scope'):
claims = {
'fxa-generation': generation,
'sub': sub,
'client_id': 'client ID',
'scope': scope,
'fxa-profileChangedAt': None
}
header = b64encode(b'{}').strip(b'=').decode('utf-8')
claims = b64encode(json.dumps(claims).encode('utf-8')) \
.strip(b'=').decode('utf-8')
signature = b64encode(b'signature').strip(b'=').decode('utf-8')
return '%s.%s.%s' % (header, claims, signature)
def _add_node(self, service=SYNC_1_5_SERVICE_NAME, capacity=100,
available=100, node=NODE_URL, id=None, current_load=0,
backoff=0, downed=0):
service_id = self._get_service_id(service)
query = 'INSERT INTO nodes (service, node, available, capacity, \
current_load, backoff, downed'
data = (service_id, node, available, capacity, current_load, backoff,
downed)
if id:
query += ', id) VALUES(%s, %s, %s, %s, %s, %s, %s, %s)'
data += (id,)
else:
query += ') VALUES(%s, %s, %s, %s, %s, %s, %s)'
cursor = self._execute_sql(query, data)
cursor.close()
return self._last_insert_id()
def _get_node(self, id):
query = 'SELECT * FROM nodes WHERE id=%s'
cursor = self._execute_sql(query, (id,))
(id, service, node, available, current_load, capacity, downed,
backoff) = cursor.fetchone()
cursor.close()
return {
'id': id,
'service': service,
'node': node,
'available': available,
'current_load': current_load,
'capacity': capacity,
'downed': downed,
'backoff': backoff
}
def _last_insert_id(self):
cursor = self._execute_sql('SELECT LAST_INSERT_ID()', ())
(id,) = cursor.fetchone()
cursor.close()
return id
def _add_service(self, service_name, pattern, id):
query = 'INSERT INTO services (service, pattern, id) \
VALUES(%s, %s, %s)'
cursor = self._execute_sql(query, (service_name, pattern, id))
cursor.close()
def _add_user(self, service=SYNC_1_5_SERVICE_ID, email=None,
generation=1234, client_state='616161', created_at=None,
nodeid=NODE_ID, keys_changed_at=1234, replaced_at=None):
query = '''
INSERT INTO users (service, email, generation, client_state, \
created_at, nodeid, keys_changed_at, replaced_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s);
'''
created_at = created_at or math.trunc(time.time() * 1000)
cursor = self._execute_sql(query,
(service,
email or 'test@%s' % self.FXA_EMAIL_DOMAIN,
generation, client_state,
created_at, nodeid, keys_changed_at,
replaced_at))
cursor.close()
return self._last_insert_id()
def _get_user(self, uid):
query = 'SELECT * FROM users WHERE uid = %s'
cursor = self._execute_sql(query, (uid,))
(uid, service, email, generation, client_state, created_at,
replaced_at, nodeid, keys_changed_at) = cursor.fetchone()
cursor.close()
return {
'uid': uid,
'service': service,
'email': email,
'generation': generation,
'client_state': client_state,
'created_at': created_at,
'replaced_at': replaced_at,
'nodeid': nodeid,
'keys_changed_at': keys_changed_at
}
def _get_replaced_users(self, service_id, email):
query = 'SELECT * FROM users WHERE service = %s AND email = %s AND \
replaced_at IS NOT NULL'
cursor = self._execute_sql(query, (service_id, email))
users = []
for user in cursor.fetchall():
(uid, service, email, generation, client_state, created_at,
replaced_at, nodeid, keys_changed_at) = user
user_dict = {
'uid': uid,
'service': service,
'email': email,
'generation': generation,
'client_state': client_state,
'created_at': created_at,
'replaced_at': replaced_at,
'nodeid': nodeid,
'keys_changed_at': keys_changed_at
}
users.append(user_dict)
cursor.close()
return users
def _get_service_id(self, service):
query = 'SELECT id FROM services WHERE service = %s'
cursor = self._execute_sql(query, (service,))
(service_id,) = cursor.fetchone()
cursor.close()
return service_id
def _count_users(self):
query = 'SELECT COUNT(DISTINCT(uid)) FROM users'
cursor = self._execute_sql(query, ())
(count,) = cursor.fetchone()
cursor.close()
return count
def _execute_sql(self, query, args):
cursor = self.database.execute(query, args)
return cursor
def unsafelyParseToken(self, token):
# For testing purposes, don't check HMAC or anything...
return json.loads(decode_token_bytes(token)[:-32].decode('utf8'))