# Scraper residue from the git web UI (commit metadata), preserved here as a
# comment so the file remains valid Python:
#   Taddes 6100529da7
#   Some checks failed
#   Glean probe-scraper / glean-probe-scraper (push) Has been cancelled
#   test: add max total records e2e test (#1796)
#   test: add max total records e2e test
#   2025-09-09 14:59:39 -04:00
#   2255 lines / 94 KiB / Python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
"""
Functional tests for the SyncStorage server protocol.
This file runs tests to ensure the correct operation of the server
as specified in:
http://docs.services.mozilla.com/storage/apis-1.5.html
If there's an aspect of that spec that's not covered by a test in this file,
consider it a bug.
"""
import contextlib
import json
# import math
import random
import re
import string
import time
import urllib
import urllib.parse

import pytest
import simplejson
import tokenlib
import webtest
from pyramid.interfaces import IAuthenticationPolicy
from webtest.app import AppError

from tools.integration_tests.test_support import StorageFunctionalTestCase
class ConflictError(Exception):
    """Raised when a storage operation conflicts with a concurrent write."""
class BackendError(Exception):
    """Raised when the storage backend reports an internal failure."""
# Weave/Sync protocol error codes, returned in response bodies.
WEAVE_INVALID_WBO = 8  # Invalid Weave Basic Object
WEAVE_SIZE_LIMIT_EXCEEDED = 17  # Size limit exceeded
# Maximum number of ids accepted in a single ?ids= batch query.
BATCH_MAX_IDS = 100
def get_limit_config(request, limit):
    """Look up the configured value of the named size limit.

    Reads the ``storage.<limit>`` key from the registry settings
    attached to *request*.
    """
    settings = request.registry.settings
    key = "storage.%s" % (limit,)
    return settings[key]
def json_dumps(value):
    """Decimal-aware version of json.dumps().

    Uses simplejson with ``use_decimal=True`` so that decimal.Decimal
    values serialize as plain JSON numbers (as used for the server's
    two-decimal-place timestamps).
    """
    return simplejson.dumps(value, use_decimal=True)
def json_loads(value):
    """Decimal-aware version of json.loads().

    Uses simplejson with ``use_decimal=True`` so JSON numbers with a
    fractional part parse as decimal.Decimal rather than float,
    preserving exact timestamp precision.
    """
    return simplejson.loads(value, use_decimal=True)
# Filler payload used by most tests: 500 bytes of '*'.
_PLD = "*" * 500
# Alphabet used by randtext(): ASCII letters plus digits.
_ASCII = string.ascii_letters + string.digits
def randtext(size=10):
    """Return *size* random characters drawn from letters and digits."""
    return "".join(random.choice(_ASCII) for _ in range(size))
@pytest.mark.usefixtures("setup_server_local_testing")
class TestStorage(StorageFunctionalTestCase):
"""Storage testcases that only use the web API.
These tests are suitable for running against both in-process and live
external web servers.
"""
def setUp(self):
    """Prepare a clean slate for each test.

    Delegates to the parent fixture, computes this user's API root,
    and deletes any data left under it by earlier tests.
    """
    super().setUp()
    self.root = "/1.5/%d" % (self.user_id,)
    self.retry_delete(self.root)
@contextlib.contextmanager
def _switch_user(self):
    """Temporarily point ``self.root`` at another user's URL space.

    Delegates the credential swap to the parent class, rewrites
    ``self.root`` for the new user id for the duration of the ``with``
    block, and always restores the original root afterwards —
    regardless of whether the body raised.
    """
    saved_root = self.root
    try:
        with super()._switch_user():
            self.root = "/1.5/%d" % (self.user_id,)
            yield
    finally:
        self.root = saved_root
def retry_post_json(self, *args, **kwargs):
    """POST a JSON body via the app, with one retry on 409/503."""
    send = self.app.post_json
    return self._retry_send(send, *args, **kwargs)
def retry_put_json(self, *args, **kwargs):
    """PUT a JSON body via the app, with one retry on 409/503."""
    send = self.app.put_json
    return self._retry_send(send, *args, **kwargs)
def retry_delete(self, *args, **kwargs):
    """DELETE via the app, with one retry on 409/503."""
    send = self.app.delete
    return self._retry_send(send, *args, **kwargs)
def _retry_send(self, func, *args, **kwargs):
    """Invoke a webtest request method, retrying once on 409/503.

    409 (Conflict) and 503 (Service Unavailable) are treated as
    transient; any other error status re-raises immediately.
    """
    try:
        return func(*args, **kwargs)
    except webtest.AppError as ex:
        message = ex.args[0]
        transient = any(code in message for code in ("409 ", "503 "))
        if not transient:
            raise
        # Brief pause before the single retry.
        time.sleep(0.01)
        return func(*args, **kwargs)
def test_get_info_collections(self):
    """/info/collections lists each non-empty collection with its
    last-modified timestamp, and timestamps advance on update."""
    # xxx_col1 gets 3 items, xxx_col2 gets 5 items.
    bsos = [{"id": str(i).zfill(2), "payload": "xxx"} for i in range(3)]
    resp = self.retry_post_json(self.root + "/storage/xxx_col1", bsos)
    ts1 = resp.json["modified"]
    bsos = [{"id": str(i).zfill(2), "payload": "xxx"} for i in range(5)]
    resp = self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
    ts2 = resp.json["modified"]
    # only those collections should appear in the query.
    resp = self.app.get(self.root + "/info/collections")
    res = resp.json
    keys = sorted(list(res.keys()))
    self.assertEqual(keys, ["xxx_col1", "xxx_col2"])
    self.assertEqual(res["xxx_col1"], ts1)
    self.assertEqual(res["xxx_col2"], ts2)
    # Updating items in xxx_col2, check timestamps.
    bsos = [{"id": str(i).zfill(2), "payload": "yyy"} for i in range(2)]
    resp = self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
    # The collection timestamp must move strictly forward on update.
    self.assertTrue(ts2 < resp.json["modified"])
    ts2 = resp.json["modified"]
    resp = self.app.get(self.root + "/info/collections")
    res = resp.json
    keys = sorted(list(res.keys()))
    self.assertEqual(keys, ["xxx_col1", "xxx_col2"])
    # xxx_col1 is untouched; only xxx_col2's timestamp changed.
    self.assertEqual(res["xxx_col1"], ts1)
    self.assertEqual(res["xxx_col2"], ts2)
def test_get_collection_count(self):
    """/info/collection_counts reports the item count per collection."""
    # Populate two collections with different numbers of items.
    for name, count in (("xxx_col1", 3), ("xxx_col2", 5)):
        items = [{"id": str(i).zfill(2), "payload": "xxx"} for i in range(count)]
        self.retry_post_json(self.root + "/storage/" + name, items)
    # Exactly those two collections must be reported, with their counts.
    counts = self.app.get(self.root + "/info/collection_counts").json
    self.assertEqual(len(counts), 2)
    self.assertEqual(counts["xxx_col1"], 3)
    self.assertEqual(counts["xxx_col2"], 5)
def test_bad_cache(self):
    """Regression test for #637332: the collection name<->id cache must
    be invalidated when a new collection is created."""
    # 1. Snapshot how many collections currently exist.
    before = len(self.app.get(self.root + "/info/collections").json)
    # 2. Create a brand-new collection by writing an item into it.
    self.retry_put_json(
        self.root + "/storage/xxxx/125", {"id": "125", "payload": _PLD}
    )
    # 3. The new collection must show up immediately.
    after = len(self.app.get(self.root + "/info/collections").json)
    self.assertEqual(after, before + 1)
def test_get_collection_only(self):
    """Exercise collection-level GET: the ids/newer/older/full filters,
    limit/offset paging under each sort order, and sorting itself."""
    bsos = [{"id": str(i).zfill(2), "payload": "xxx"} for i in range(5)]
    self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
    # non-existent collections appear as empty
    resp = self.app.get(self.root + "/storage/nonexistent")
    res = resp.json
    self.assertEqual(res, [])
    # try just getting all items at once.
    resp = self.app.get(self.root + "/storage/xxx_col2")
    res = resp.json
    res.sort()
    self.assertEqual(res, ["00", "01", "02", "03", "04"])
    self.assertEqual(int(resp.headers["X-Weave-Records"]), 5)
    # trying various filters
    # "ids"
    # Returns the ids for objects in the collection that are in the
    # provided comma-separated list.  Unknown ids ("17") are ignored.
    res = self.app.get(self.root + "/storage/xxx_col2?ids=01,03,17")
    res = res.json
    res.sort()
    self.assertEqual(res, ["01", "03"])
    # "newer"
    # Returns only ids for objects in the collection that have been last
    # modified after the timestamp given (strictly greater).
    self.retry_delete(self.root + "/storage/xxx_col2")
    bso = {"id": "128", "payload": "x"}
    res = self.retry_put_json(self.root + "/storage/xxx_col2/128", bso)
    ts1 = float(res.headers["X-Last-Modified"])
    bso = {"id": "129", "payload": "x"}
    res = self.retry_put_json(self.root + "/storage/xxx_col2/129", bso)
    ts2 = float(res.headers["X-Last-Modified"])
    self.assertTrue(ts1 < ts2)
    res = self.app.get(self.root + "/storage/xxx_col2?newer=%s" % ts1)
    self.assertEqual(res.json, ["129"])
    res = self.app.get(self.root + "/storage/xxx_col2?newer=%s" % ts2)
    self.assertEqual(res.json, [])
    res = self.app.get(self.root + "/storage/xxx_col2?newer=%s" % (ts1 - 1))
    self.assertEqual(sorted(res.json), ["128", "129"])
    # "older"
    # Returns only ids for objects in the collection that have been last
    # modified before the timestamp given (strictly less).
    self.retry_delete(self.root + "/storage/xxx_col2")
    bso = {"id": "128", "payload": "x"}
    res = self.retry_put_json(self.root + "/storage/xxx_col2/128", bso)
    ts1 = float(res.headers["X-Last-Modified"])
    bso = {"id": "129", "payload": "x"}
    res = self.retry_put_json(self.root + "/storage/xxx_col2/129", bso)
    ts2 = float(res.headers["X-Last-Modified"])
    self.assertTrue(ts1 < ts2)
    res = self.app.get(self.root + "/storage/xxx_col2?older=%s" % ts1)
    self.assertEqual(res.json, [])
    res = self.app.get(self.root + "/storage/xxx_col2?older=%s" % ts2)
    self.assertEqual(res.json, ["128"])
    res = self.app.get(self.root + "/storage/xxx_col2?older=%s" % (ts2 + 1))
    self.assertEqual(sorted(res.json), ["128", "129"])
    # "older" and "newer" combine into an open interval (ts1, ts2+1).
    qs = "?older=%s&newer=%s" % (ts2 + 1, ts1)
    res = self.app.get(self.root + "/storage/xxx_col2" + qs)
    self.assertEqual(sorted(res.json), ["129"])
    # "full"
    # If defined, returns the full BSO, rather than just the id.
    res = self.app.get(self.root + "/storage/xxx_col2?full=1")
    keys = list(res.json[0].keys())
    keys.sort()
    wanted = ["id", "modified", "payload"]
    self.assertEqual(keys, wanted)
    res = self.app.get(self.root + "/storage/xxx_col2")
    self.assertTrue(isinstance(res.json, list))
    # "limit"
    # Sets the maximum number of ids that will be returned
    self.retry_delete(self.root + "/storage/xxx_col2")
    bsos = []
    for i in range(10):
        bso = {"id": str(i).zfill(2), "payload": "x", "sortindex": i}
        bsos.append(bso)
    self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
    query_url = self.root + "/storage/xxx_col2?sort=index"
    res = self.app.get(query_url)
    all_items = res.json
    self.assertEqual(len(all_items), 10)
    res = self.app.get(query_url + "&limit=2")
    self.assertEqual(res.json, all_items[:2])
    # "offset"
    # Skips over items that have already been returned.  The server hands
    # back an opaque X-Weave-Next-Offset token while items remain.
    next_offset = res.headers["X-Weave-Next-Offset"]
    res = self.app.get(query_url + "&limit=3&offset=" + next_offset)
    self.assertEqual(res.json, all_items[2:5])
    next_offset = res.headers["X-Weave-Next-Offset"]
    res = self.app.get(query_url + "&offset=" + next_offset)
    self.assertEqual(res.json, all_items[5:])
    self.assertTrue("X-Weave-Next-Offset" not in res.headers)
    # A limit larger than the remainder returns everything, no token.
    res = self.app.get(query_url + "&limit=10000&offset=" + next_offset)
    self.assertEqual(res.json, all_items[5:])
    self.assertTrue("X-Weave-Next-Offset" not in res.headers)
    # "offset" again, this time ordering by descending timestamp.
    query_url = self.root + "/storage/xxx_col2?sort=newest"
    res = self.app.get(query_url)
    all_items = res.json
    self.assertEqual(len(all_items), 10)
    res = self.app.get(query_url + "&limit=2")
    self.assertEqual(res.json, all_items[:2])
    next_offset = res.headers["X-Weave-Next-Offset"]
    res = self.app.get(query_url + "&limit=3&offset=" + next_offset)
    self.assertEqual(res.json, all_items[2:5])
    next_offset = res.headers["X-Weave-Next-Offset"]
    res = self.app.get(query_url + "&offset=" + next_offset)
    self.assertEqual(res.json, all_items[5:])
    self.assertTrue("X-Weave-Next-Offset" not in res.headers)
    res = self.app.get(query_url + "&limit=10000&offset=" + next_offset)
    self.assertEqual(res.json, all_items[5:])
    # "offset" again, this time ordering by ascending timestamp.
    query_url = self.root + "/storage/xxx_col2?sort=oldest"
    res = self.app.get(query_url)
    all_items = res.json
    self.assertEqual(len(all_items), 10)
    res = self.app.get(query_url + "&limit=2")
    self.assertEqual(res.json, all_items[:2])
    next_offset = res.headers["X-Weave-Next-Offset"]
    res = self.app.get(query_url + "&limit=3&offset=" + next_offset)
    self.assertEqual(res.json, all_items[2:5])
    next_offset = res.headers["X-Weave-Next-Offset"]
    res = self.app.get(query_url + "&offset=" + next_offset)
    self.assertEqual(res.json, all_items[5:])
    self.assertTrue("X-Weave-Next-Offset" not in res.headers)
    res = self.app.get(query_url + "&limit=10000&offset=" + next_offset)
    self.assertEqual(res.json, all_items[5:])
    # "offset" once more, this time with no explicit ordering
    query_url = self.root + "/storage/xxx_col2?"
    res = self.app.get(query_url)
    all_items = res.json
    self.assertEqual(len(all_items), 10)
    res = self.app.get(query_url + "&limit=2")
    self.assertEqual(res.json, all_items[:2])
    next_offset = res.headers["X-Weave-Next-Offset"]
    res = self.app.get(query_url + "&limit=3&offset=" + next_offset)
    self.assertEqual(res.json, all_items[2:5])
    next_offset = res.headers["X-Weave-Next-Offset"]
    res = self.app.get(query_url + "&offset=" + next_offset)
    self.assertEqual(res.json, all_items[5:])
    self.assertTrue("X-Weave-Next-Offset" not in res.headers)
    # NOTE(review): this final paged response is fetched but never
    # asserted on in the original test.
    res = self.app.get(query_url + "&limit=10000&offset=" + next_offset)
    # "sort"
    # 'newest': Orders by timestamp number (newest first)
    # 'oldest': Orders by timestamp number (oldest first)
    # 'index': Orders by the sortindex descending (highest weight first)
    self.retry_delete(self.root + "/storage/xxx_col2")
    for index, sortindex in (("00", -1), ("01", 34), ("02", 12)):
        bso = {"id": index, "payload": "x", "sortindex": sortindex}
        self.retry_post_json(self.root + "/storage/xxx_col2", [bso])
    res = self.app.get(self.root + "/storage/xxx_col2?sort=newest")
    res = res.json
    self.assertEqual(res, ["02", "01", "00"])
    res = self.app.get(self.root + "/storage/xxx_col2?sort=oldest")
    res = res.json
    self.assertEqual(res, ["00", "01", "02"])
    res = self.app.get(self.root + "/storage/xxx_col2?sort=index")
    res = res.json
    self.assertEqual(res, ["01", "02", "00"])
def test_alternative_formats(self):
    """Collection GET honours Accept: application/json and
    application/newlines; unknown types get a 406."""
    bsos = [{"id": str(i).zfill(2), "payload": "xxx"} for i in range(5)]
    self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
    # application/json
    res = self.app.get(
        self.root + "/storage/xxx_col2",
        headers=[("Accept", "application/json")],
    )
    self.assertEqual(res.content_type.split(";")[0], "application/json")
    res = res.json
    res.sort()
    self.assertEqual(res, ["00", "01", "02", "03", "04"])
    # application/newlines: one JSON document per line, trailing newline.
    res = self.app.get(
        self.root + "/storage/xxx_col2",
        headers=[("Accept", "application/newlines")],
    )
    self.assertEqual(res.content_type, "application/newlines")
    self.assertTrue(res.body.endswith(b"\n"))
    res = [
        json_loads(line) for line in res.body.decode("utf-8").strip().split("\n")
    ]
    res.sort()
    self.assertEqual(res, ["00", "01", "02", "03", "04"])
    # unspecified format defaults to json
    res = self.app.get(self.root + "/storage/xxx_col2")
    self.assertEqual(res.content_type.split(";")[0], "application/json")
    # unknown format gets a 406
    self.app.get(
        self.root + "/storage/xxx_col2",
        headers=[("Accept", "x/yy")],
        status=406,
    )
def test_set_collection_with_if_modified_since(self):
    """X-If-Modified-Since on a collection GET is compared against the
    newest item timestamp in the collection."""
    # Write five items one at a time so they get distinct timestamps.
    for i in range(5):
        item = {"id": str(i).zfill(2), "payload": "xxx"}
        self.retry_post_json(self.root + "/storage/xxx_col2", [item])
    # Fetch them all along with their timestamps.
    fetched = self.app.get(self.root + "/storage/xxx_col2?full=true").json
    self.assertEqual(len(fetched), 5)
    stamps = sorted(item["modified"] for item in fetched)
    # An If-Modified-Since older than the newest item returns data.
    self.app.get(
        self.root + "/storage/xxx_col2",
        headers={"X-If-Modified-Since": str(stamps[0])},
        status=200,
    )
    # Matching the newest timestamp yields 304, which still carries
    # an X-Last-Modified header.
    res = self.app.get(
        self.root + "/storage/xxx_col2",
        headers={"X-If-Modified-Since": str(stamps[-1])},
        status=304,
    )
    self.assertTrue("X-Last-Modified" in res.headers)
def test_get_item(self):
    """Single-item GET returns id/modified/payload, 404s for unknown
    ids, and supports X-If-Modified-Since conditional requests."""
    bsos = [{"id": str(i).zfill(2), "payload": "xxx"} for i in range(5)]
    self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
    # grabbing object 1 from xxx_col2
    res = self.app.get(self.root + "/storage/xxx_col2/01")
    res = res.json
    keys = list(res.keys())
    keys.sort()
    self.assertEqual(keys, ["id", "modified", "payload"])
    self.assertEqual(res["id"], "01")
    # unexisting object
    self.app.get(self.root + "/storage/xxx_col2/99", status=404)
    # using x-if-modified-since header.
    self.app.get(
        self.root + "/storage/xxx_col2/01",
        headers={"X-If-Modified-Since": str(res["modified"])},
        status=304,
    )
    # A timestamp later than the item's also yields 304: the item
    # hasn't been modified since that point either.
    self.app.get(
        self.root + "/storage/xxx_col2/01",
        headers={"X-If-Modified-Since": str(res["modified"] + 1)},
        status=304,
    )
    # An earlier timestamp returns the full item.
    res = self.app.get(
        self.root + "/storage/xxx_col2/01",
        headers={"X-If-Modified-Since": str(res["modified"] - 1)},
    )
    self.assertEqual(res.json["id"], "01")
def test_set_item(self):
    """PUT creates an item; a second PUT overwrites its payload."""
    item_url = self.root + "/storage/xxx_col2/12345"
    # Create.
    self.retry_put_json(item_url, {"payload": _PLD})
    fetched = self.app.get(item_url).json
    self.assertEqual(fetched["payload"], _PLD)
    # Overwrite.
    self.retry_put_json(item_url, {"payload": "YYY"})
    fetched = self.app.get(item_url).json
    self.assertEqual(fetched["payload"], "YYY")
def test_set_collection(self):
    """Batch POST creates items, updates existing ones, and drops
    individual invalid BSOs without failing the whole batch."""
    # sending two bsos
    bso1 = {"id": "12", "payload": _PLD}
    bso2 = {"id": "13", "payload": _PLD}
    bsos = [bso1, bso2]
    self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
    # checking what we did
    res = self.app.get(self.root + "/storage/xxx_col2/12")
    res = res.json
    self.assertEqual(res["payload"], _PLD)
    res = self.app.get(self.root + "/storage/xxx_col2/13")
    res = res.json
    self.assertEqual(res["payload"], _PLD)
    # one more time, with changes
    bso1 = {"id": "13", "payload": "XyX"}
    bso2 = {"id": "14", "payload": _PLD}
    bsos = [bso1, bso2]
    self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
    # checking what we did
    res = self.app.get(self.root + "/storage/xxx_col2/14")
    res = res.json
    self.assertEqual(res["payload"], _PLD)
    res = self.app.get(self.root + "/storage/xxx_col2/13")
    res = res.json
    self.assertEqual(res["payload"], "XyX")
    # sending two bsos with one bad sortindex: the valid item is stored,
    # the invalid one is rejected, so fetching it gives a 404.
    bso1 = {"id": "one", "payload": _PLD}
    bso2 = {"id": "two", "payload": _PLD, "sortindex": "FAIL"}
    bsos = [bso1, bso2]
    self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
    self.app.get(self.root + "/storage/xxx_col2/two", status=404)
def test_set_collection_input_formats(self):
    """Batch POST accepts application/newlines bodies; unknown content
    types are rejected with a 415 and store nothing."""
    # If we send with application/newlines it should work.
    bso1 = {"id": "12", "payload": _PLD}
    bso2 = {"id": "13", "payload": _PLD}
    bsos = [bso1, bso2]
    body = "\n".join(json_dumps(bso) for bso in bsos)
    self.app.post(
        self.root + "/storage/xxx_col2",
        body,
        headers={"Content-Type": "application/newlines"},
    )
    items = self.app.get(self.root + "/storage/xxx_col2").json
    self.assertEqual(len(items), 2)
    # If we send an unknown content type, we get an error.
    self.retry_delete(self.root + "/storage/xxx_col2")
    body = json_dumps(bsos)
    self.app.post(
        self.root + "/storage/xxx_col2",
        body,
        headers={"Content-Type": "application/octet-stream"},
        status=415,
    )
    # Nothing may have been stored by the rejected request.
    items = self.app.get(self.root + "/storage/xxx_col2").json
    self.assertEqual(len(items), 0)
def test_set_item_input_formats(self):
    """Item PUT accepts application/json (and text/plain for backwards
    compatibility) but rejects other content types with a 415."""
    # If we send with application/json it should work.
    body = json_dumps({"payload": _PLD})
    self.app.put(
        self.root + "/storage/xxx_col2/TEST",
        body,
        headers={"Content-Type": "application/json"},
    )
    item = self.app.get(self.root + "/storage/xxx_col2/TEST").json
    self.assertEqual(item["payload"], _PLD)
    # If we send json with some other content type, it should fail
    self.retry_delete(self.root + "/storage/xxx_col2")
    self.app.put(
        self.root + "/storage/xxx_col2/TEST",
        body,
        headers={"Content-Type": "application/octet-stream"},
        status=415,
    )
    # The rejected PUT must not have created the item.
    self.app.get(self.root + "/storage/xxx_col2/TEST", status=404)
    # Unless we use text/plain, which is a special bw-compat case.
    self.app.put(
        self.root + "/storage/xxx_col2/TEST",
        body,
        headers={"Content-Type": "text/plain"},
    )
    item = self.app.get(self.root + "/storage/xxx_col2/TEST").json
    self.assertEqual(item["payload"], _PLD)
def test_app_newlines_when_payloads_contain_newlines(self):
    """Payloads containing literal newline characters survive the
    application/newlines round-trip: JSON escaping keeps each BSO on
    one physical line."""
    # Send some application/newlines with embedded newline chars.
    bsos = [
        {"id": "01", "payload": "hello\nworld"},
        {"id": "02", "payload": "\nmarco\npolo\n"},
    ]
    # json_dumps escapes the embedded newlines, so the body is 2 lines.
    body = "\n".join(json_dumps(bso) for bso in bsos)
    self.assertEqual(len(body.split("\n")), 2)
    self.app.post(
        self.root + "/storage/xxx_col2",
        body,
        headers={"Content-Type": "application/newlines"},
    )
    # Read them back as JSON list, check payloads.
    items = self.app.get(self.root + "/storage/xxx_col2?full=1").json
    self.assertEqual(len(items), 2)
    items.sort(key=lambda bso: bso["id"])
    self.assertEqual(items[0]["payload"], bsos[0]["payload"])
    self.assertEqual(items[1]["payload"], bsos[1]["payload"])
    # Read them back as application/newlines, check payloads.
    res = self.app.get(
        self.root + "/storage/xxx_col2?full=1",
        headers={
            "Accept": "application/newlines",
        },
    )
    items = [
        json_loads(line) for line in res.body.decode("utf-8").strip().split("\n")
    ]
    self.assertEqual(len(items), 2)
    items.sort(key=lambda bso: bso["id"])
    self.assertEqual(items[0]["payload"], bsos[0]["payload"])
    self.assertEqual(items[1]["payload"], bsos[1]["payload"])
def test_collection_usage(self):
    """/info/collection_usage reports per-collection payload size in kB."""
    # Start from an empty storage so usage is exactly what we write.
    self.retry_delete(self.root + "/storage")
    first = {"id": "13", "payload": "XyX"}
    second = {"id": "14", "payload": _PLD}
    self.retry_post_json(self.root + "/storage/xxx_col2", [first, second])
    usage = self.app.get(self.root + "/info/collection_usage").json
    expected_kb = (len(first["payload"]) + len(second["payload"])) / 1024.0
    self.assertEqual(round(usage["xxx_col2"], 2), round(expected_kb, 2))
def test_delete_collection_items(self):
    """DELETE on a collection removes everything; ?ids=... removes
    only the listed items."""
    collection = self.root + "/storage/xxx_col2"
    bsos = [{"id": str(n), "payload": _PLD} for n in (12, 13, 14)]
    self.retry_post_json(collection, bsos)
    self.assertEqual(len(self.app.get(collection).json), 3)
    # Deleting the collection itself removes every item.
    self.retry_delete(collection)
    self.assertEqual(len(self.app.get(collection).json), 0)
    # Re-populate, then delete by explicit comma-separated id list.
    self.retry_post_json(collection, bsos)
    self.assertEqual(len(self.app.get(collection).json), 3)
    self.retry_delete(collection + "?ids=12,14")
    self.assertEqual(len(self.app.get(collection).json), 1)
    self.retry_delete(collection + "?ids=13")
    self.assertEqual(len(self.app.get(collection).json), 0)
def test_delete_item(self):
    """Deleting an item removes it, 404s for unknown ids, and bumps
    the collection's last-modified timestamp."""
    # creating a collection of three
    bso1 = {"id": "12", "payload": _PLD}
    bso2 = {"id": "13", "payload": _PLD}
    bso3 = {"id": "14", "payload": _PLD}
    bsos = [bso1, bso2, bso3]
    self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
    res = self.app.get(self.root + "/storage/xxx_col2")
    self.assertEqual(len(res.json), 3)
    ts = float(res.headers["X-Last-Modified"])
    # deleting item 13
    self.retry_delete(self.root + "/storage/xxx_col2/13")
    res = self.app.get(self.root + "/storage/xxx_col2")
    self.assertEqual(len(res.json), 2)
    # unexisting item should return a 404
    self.retry_delete(self.root + "/storage/xxx_col2/12982", status=404)
    # The collection should get an updated timestamp.
    res = self.app.get(self.root + "/info/collections")
    self.assertTrue(ts < float(res.headers["X-Last-Modified"]))
def test_delete_storage(self):
    """DELETE /storage wipes everything, and deleting an already-empty
    collection afterwards still returns 200 with no items.

    Bug fix: the final assertion previously re-checked the stale
    ``items`` list fetched *before* the second delete, so it could
    never fail; the collection is now re-fetched after that delete.
    """
    # creating a collection of three
    bso1 = {"id": "12", "payload": _PLD}
    bso2 = {"id": "13", "payload": _PLD}
    bso3 = {"id": "14", "payload": _PLD}
    bsos = [bso1, bso2, bso3]
    self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
    res = self.app.get(self.root + "/storage/xxx_col2")
    self.assertEqual(len(res.json), 3)
    # deleting all
    self.retry_delete(self.root + "/storage")
    items = self.app.get(self.root + "/storage/xxx_col2").json
    self.assertEqual(len(items), 0)
    # Deleting the (now empty) collection again must still be a 200,
    # and the collection must remain empty.
    self.retry_delete(self.root + "/storage/xxx_col2", status=200)
    items = self.app.get(self.root + "/storage/xxx_col2").json
    self.assertEqual(len(items), 0)
def test_x_timestamp_header(self):
    """X-Weave-Timestamp on GET/PUT/POST responses tracks server time.

    Compares against the local clock, so it is skipped when running
    against a distant live server whose clock may differ.
    """
    if self.distant:
        pytest.skip("Test cannot be run against a live server.")
    bsos = [{"id": str(i).zfill(2), "payload": "xxx"} for i in range(5)]
    self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
    now = round(time.time(), 2)
    time.sleep(0.01)
    res = self.app.get(self.root + "/storage/xxx_col2")
    self.assertTrue(now <= float(res.headers["X-Weave-Timestamp"]))
    # getting the timestamp with a PUT
    now = round(time.time(), 2)
    time.sleep(0.01)
    bso = {"payload": _PLD}
    res = self.retry_put_json(self.root + "/storage/xxx_col2/12345", bso)
    self.assertTrue(now <= float(res.headers["X-Weave-Timestamp"]))
    # Allow generous slack between the local and server clocks.
    self.assertTrue(abs(now - float(res.headers["X-Weave-Timestamp"])) <= 200)
    # getting the timestamp with a POST
    now = round(time.time(), 2)
    time.sleep(0.01)
    bso1 = {"id": "12", "payload": _PLD}
    bso2 = {"id": "13", "payload": _PLD}
    bsos = [bso1, bso2]
    res = self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
    self.assertTrue(now <= float(res.headers["X-Weave-Timestamp"]))
def test_ifunmodifiedsince(self):
    """X-If-Unmodified-Since yields 412 whenever the target has been
    modified after the given timestamp, and lets the request through
    otherwise — for PUT, POST, GET and DELETE alike."""
    bso = {"id": "12345", "payload": _PLD}
    res = self.retry_put_json(self.root + "/storage/xxx_col2/12345", bso)
    # Using an X-If-Unmodified-Since in the past should cause 412s.
    ts = str(float(res.headers["X-Last-Modified"]) - 1)
    bso = {"id": "12345", "payload": _PLD + "XXX"}
    res = self.retry_put_json(
        self.root + "/storage/xxx_col2/12345",
        bso,
        headers=[("X-If-Unmodified-Since", ts)],
        status=412,
    )
    # Even 412 responses must carry X-Last-Modified.
    self.assertTrue("X-Last-Modified" in res.headers)
    res = self.retry_delete(
        self.root + "/storage/xxx_col2/12345",
        headers=[("X-If-Unmodified-Since", ts)],
        status=412,
    )
    self.assertTrue("X-Last-Modified" in res.headers)
    self.retry_post_json(
        self.root + "/storage/xxx_col2",
        [bso],
        headers=[("X-If-Unmodified-Since", ts)],
        status=412,
    )
    self.retry_delete(
        self.root + "/storage/xxx_col2?ids=12345",
        headers=[("X-If-Unmodified-Since", ts)],
        status=412,
    )
    self.app.get(
        self.root + "/storage/xxx_col2/12345",
        headers=[("X-If-Unmodified-Since", ts)],
        status=412,
    )
    self.app.get(
        self.root + "/storage/xxx_col2",
        headers=[("X-If-Unmodified-Since", ts)],
        status=412,
    )
    # Deleting items from a collection should give 412 even if some
    # other, unrelated item in the collection has been modified.
    ts = res.headers["X-Last-Modified"]
    res2 = self.retry_put_json(
        self.root + "/storage/xxx_col2/54321",
        {
            "payload": _PLD,
        },
    )
    self.retry_delete(
        self.root + "/storage/xxx_col2?ids=12345",
        headers=[("X-If-Unmodified-Since", ts)],
        status=412,
    )
    ts = res2.headers["X-Last-Modified"]
    # All of those should have left the BSO unchanged
    res2 = self.app.get(self.root + "/storage/xxx_col2/12345")
    self.assertEqual(res2.json["payload"], _PLD)
    self.assertEqual(
        res2.headers["X-Last-Modified"], res.headers["X-Last-Modified"]
    )
    # Using an X-If-Unmodified-Since equal to
    # X-Last-Modified should allow the request to succeed.
    res = self.retry_post_json(
        self.root + "/storage/xxx_col2",
        [bso],
        headers=[("X-If-Unmodified-Since", ts)],
        status=200,
    )
    ts = res.headers["X-Last-Modified"]
    self.app.get(
        self.root + "/storage/xxx_col2/12345",
        headers=[("X-If-Unmodified-Since", ts)],
        status=200,
    )
    self.retry_delete(
        self.root + "/storage/xxx_col2/12345",
        headers=[("X-If-Unmodified-Since", ts)],
        status=200,
    )
    # The item was deleted just above, so recreating it even with an
    # ancient X-If-Unmodified-Since of "0" succeeds.
    res = self.retry_put_json(
        self.root + "/storage/xxx_col2/12345",
        bso,
        headers=[("X-If-Unmodified-Since", "0")],
        status=200,
    )
    ts = res.headers["X-Last-Modified"]
    self.app.get(
        self.root + "/storage/xxx_col2",
        headers=[("X-If-Unmodified-Since", ts)],
        status=200,
    )
    self.retry_delete(
        self.root + "/storage/xxx_col2?ids=12345",
        headers=[("X-If-Unmodified-Since", ts)],
        status=200,
    )
def test_quota(self):
    """/info/quota usage grows by the payload size (in kB) of new items."""
    usage_before = self.app.get(self.root + "/info/quota").json[0]
    self.retry_put_json(
        self.root + "/storage/xxx_col2/12345", {"payload": _PLD}
    )
    usage_after = self.app.get(self.root + "/info/quota").json[0]
    self.assertEqual(usage_after - usage_before, len(_PLD) / 1024.0)
def test_get_collection_ttl(self):
    """Items expire after their ttl (seconds) and vanish from GETs."""
    # ttl=0 expires immediately.
    bso = {"payload": _PLD, "ttl": 0}
    res = self.retry_put_json(self.root + "/storage/xxx_col2/12345", bso)
    time.sleep(1.1)
    res = self.app.get(self.root + "/storage/xxx_col2")
    self.assertEqual(res.json, [])
    bso = {"payload": _PLD, "ttl": 2}
    res = self.retry_put_json(self.root + "/storage/xxx_col2/123456", bso)
    # it should exist now
    res = self.app.get(self.root + "/storage/xxx_col2")
    self.assertEqual(len(res.json), 1)
    # trying a second put again
    self.retry_put_json(self.root + "/storage/xxx_col2/123456", bso)
    res = self.app.get(self.root + "/storage/xxx_col2")
    self.assertEqual(len(res.json), 1)
    # Once the ttl has elapsed, the item is gone.
    time.sleep(2.1)
    res = self.app.get(self.root + "/storage/xxx_col2")
    self.assertEqual(len(res.json), 0)
def test_multi_item_post_limits(self):
    """Batch POSTs respect max_post_records and max_post_bytes,
    reporting per-item failures rather than rejecting the batch."""
    # Discover the server's configured limits, falling back to the
    # local config when /info/configuration doesn't report them.
    res = self.app.get(self.root + "/info/configuration")
    try:
        max_bytes = res.json["max_post_bytes"]
        max_count = res.json["max_post_records"]
        max_req_bytes = res.json["max_request_bytes"]
    except KeyError:
        # Can't run against live server if it doesn't
        # report the right config options.
        if self.distant:
            pytest.skip("")
        max_bytes = get_limit_config(self.config, "max_post_bytes")
        max_count = get_limit_config(self.config, "max_post_records")
        max_req_bytes = get_limit_config(self.config, "max_request_bytes")
    # Uploading max_count-5 small objects should succeed.
    bsos = [{"id": str(i).zfill(2), "payload": "X"} for i in range(max_count - 5)]
    res = self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
    res = res.json
    self.assertEqual(len(res["success"]), max_count - 5)
    self.assertEqual(len(res["failed"]), 0)
    # Uploading max_count+5 items should produce five failures.
    bsos = [{"id": str(i).zfill(2), "payload": "X"} for i in range(max_count + 5)]
    res = self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
    res = res.json
    self.assertEqual(len(res["success"]), max_count)
    self.assertEqual(len(res["failed"]), 5)
    # Uploading items such that the last item puts us over the
    # cumulative limit on payload size, should produce 1 failure.
    # The item_size here is arbitrary, so I made it a prime in kB.
    item_size = 227 * 1024
    max_items, leftover = divmod(max_bytes, item_size)
    bsos = [
        {"id": str(i).zfill(2), "payload": "X" * item_size}
        for i in range(max_items)
    ]
    # This final item exceeds the remaining byte budget by one.
    bsos.append({"id": str(max_items), "payload": "X" * (leftover + 1)})
    # Check that we don't go over the limit on raw request bytes,
    # which would get us rejected in production with a 413.
    self.assertTrue(len(json.dumps(bsos)) < max_req_bytes)
    res = self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
    res = res.json
    self.assertEqual(len(res["success"]), max_items)
    self.assertEqual(len(res["failed"]), 1)
def test_weird_args(self):
    """Malformed query parameters yield 400; unknown ones are ignored."""
    bsos = [{"id": str(i).zfill(2), "payload": _PLD} for i in range(10)]
    self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
    # Random garbage in the numeric filters must be rejected with 400.
    # ("offset" is excluded: a bsoid-style offset can be any string.)
    for param in ("newer", "older", "limit"):
        garbage = randtext()
        url = "%s/storage/xxx_col2?%s=%s" % (self.root, param, garbage)
        self.app.get(url, status=400)
    # A pile of unknown ids simply matches nothing.
    ids = ",".join(randtext(10) for _ in range(100))
    res = self.app.get(self.root + "/storage/xxx_col2?ids=%s" % ids)
    self.assertEqual(res.json, [])
    # Unrecognised query args are ignored rather than rejected.
    self.app.get(self.root + "/storage/xxx_col2?blabla=1", status=200)
def test_guid_deletion(self):
    """GUID-style ids can be stored and then deleted in bulk by id."""
    guid = "6820f3ca-6e8a-4ff4-8af7-8b3625d7d65%d"
    bsos = [{"id": guid % i, "payload": _PLD} for i in range(5)]
    res = self.retry_post_json(self.root + "/storage/passwords", bsos).json
    self.assertEqual(len(res["success"]), 5)
    # Delete the first two guids via an ?ids= list.
    ids = ",".join(guid % i for i in range(2))
    self.retry_delete(self.root + "/storage/passwords?ids=%s" % ids)
    # The deleted ids match nothing; the other three remain.
    res = self.app.get(self.root + "/storage/passwords?ids=%s" % ids)
    self.assertEqual(len(res.json), 0)
    res = self.app.get(self.root + "/storage/passwords")
    self.assertEqual(len(res.json), 3)
def test_specifying_ids_with_percent_encoded_query_string(self):
    """Percent-encoded ?ids= values are decoded before matching.

    Creates five items, deletes two of them using a percent-encoded
    id list, and checks that exactly those two are gone.

    Bug fix: this previously called ``urllib.request.quote``, which is
    not bound by ``import urllib`` alone and only worked because some
    other module happened to import ``urllib.request``; ``quote()``
    properly lives in ``urllib.parse``.
    """
    # create some items
    bsos = [{"id": "test-%d" % i, "payload": _PLD} for i in range(5)]
    res = self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
    res = res.json
    self.assertEqual(len(res["success"]), 5)
    # now delete some of them, percent-encoding the id list
    ids = ",".join(["test-%d" % i for i in range(2)])
    ids = urllib.parse.quote(ids)
    self.retry_delete(self.root + "/storage/xxx_col2?ids=%s" % ids)
    # check that the correct items were deleted
    res = self.app.get(self.root + "/storage/xxx_col2?ids=%s" % ids)
    self.assertEqual(len(res.json), 0)
    res = self.app.get(self.root + "/storage/xxx_col2")
    self.assertEqual(len(res.json), 3)
def test_timestamp_numbers_are_decimals(self):
# Create five items with different timestamps.
for i in range(5):
bsos = [{"id": str(i).zfill(2), "payload": "xxx"}]
self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
# make sure the server returns only proper precision timestamps.
resp = self.app.get(self.root + "/storage/xxx_col2?full=1")
bsos = json_loads(resp.body)
timestamps = []
for bso in bsos:
ts = bso["modified"]
# timestamps could be on the hundred seconds (.10) or on the
# second (.0) and the zero could be dropped. We just don't want
# anything beyond milisecond.
self.assertLessEqual(len(str(ts).split(".")[-1]), 2)
timestamps.append(ts)
timestamps.sort()
# try a newer filter now, to get the last two objects
ts = float(timestamps[-3])
# Returns only ids for objects in the collection that have been
# last modified since the timestamp given.
res = self.app.get(self.root + "/storage/xxx_col2?newer=%s" % ts)
res = res.json
try:
self.assertEqual(sorted(res), ["03", "04"])
except AssertionError:
# need to display the whole collection to understand the issue
msg = "Timestamp used: %s" % ts
msg += " " + self.app.get(self.root + "/storage/xxx_col2?full=1").body
msg += " Timestamps received: %s" % str(timestamps)
msg += " Result of newer query: %s" % res
raise AssertionError(msg)
def test_strict_newer(self):
# send two bsos in the 'meh' collection
bso1 = {"id": "01", "payload": _PLD}
bso2 = {"id": "02", "payload": _PLD}
bsos = [bso1, bso2]
res = self.retry_post_json(self.root + "/storage/xxx_meh", bsos)
ts = float(res.headers["X-Last-Modified"])
# send two more bsos
bso3 = {"id": "03", "payload": _PLD}
bso4 = {"id": "04", "payload": _PLD}
bsos = [bso3, bso4]
res = self.retry_post_json(self.root + "/storage/xxx_meh", bsos)
# asking for bsos using newer=ts where newer is the timestamp
# of bso 1 and 2, should not return them
res = self.app.get(self.root + "/storage/xxx_meh?newer=%s" % ts)
res = res.json
self.assertEqual(sorted(res), ["03", "04"])
def test_strict_older(self):
# send two bsos in the 'xxx_meh' collection
bso1 = {"id": "01", "payload": _PLD}
bso2 = {"id": "02", "payload": _PLD}
bsos = [bso1, bso2]
res = self.retry_post_json(self.root + "/storage/xxx_meh", bsos)
# send two more bsos
bso3 = {"id": "03", "payload": _PLD}
bso4 = {"id": "04", "payload": _PLD}
bsos = [bso3, bso4]
res = self.retry_post_json(self.root + "/storage/xxx_meh", bsos)
ts = float(res.headers["X-Last-Modified"])
# asking for bsos using older=ts where older is the timestamp
# of bso 3 and 4, should not return them
res = self.app.get(self.root + "/storage/xxx_meh?older=%s" % ts)
res = res.json
self.assertEqual(sorted(res), ["01", "02"])
    def test_handling_of_invalid_json_in_bso_uploads(self):
        """Malformed BSO JSON is rejected: PUTs 400 with WEAVE_INVALID_WBO,
        POSTs either 400 outright or report per-item failures.
        """
        # Single upload with JSON that's not a BSO.
        bso = "notabso"
        res = self.retry_put_json(
            self.root + "/storage/xxx_col2/invalid", bso, status=400
        )
        self.assertEqual(res.json, WEAVE_INVALID_WBO)
        # A bare number is not a BSO either.
        bso = 42
        res = self.retry_put_json(
            self.root + "/storage/xxx_col2/invalid", bso, status=400
        )
        self.assertEqual(res.json, WEAVE_INVALID_WBO)
        # Right keys, wrong value types (list id, dict payload).
        bso = {"id": ["01", "02"], "payload": {"3": "4"}}
        res = self.retry_put_json(
            self.root + "/storage/xxx_col2/invalid", bso, status=400
        )
        self.assertEqual(res.json, WEAVE_INVALID_WBO)
        # Batch upload with JSON that's not a list of BSOs
        bsos = "notalist"
        res = self.retry_post_json(self.root + "/storage/xxx_col2", bsos, status=400)
        self.assertEqual(res.json, WEAVE_INVALID_WBO)
        bsos = 42
        res = self.retry_post_json(self.root + "/storage/xxx_col2", bsos, status=400)
        self.assertEqual(res.json, WEAVE_INVALID_WBO)
        # Batch upload a list with something that's not a valid data dict.
        # It should fail out entirely, as the input is seriously broken.
        bsos = [{"id": "01", "payload": "GOOD"}, "BAD"]
        res = self.retry_post_json(self.root + "/storage/xxx_col2", bsos, status=400)
        # Batch upload a list with something that's an invalid BSO.
        # It should process the good entry and fail for the bad.
        bsos = [{"id": "01", "payload": "GOOD"}, {"id": "02", "invalid": "ya"}]
        res = self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
        res = res.json
        self.assertEqual(len(res["success"]), 1)
        self.assertEqual(len(res["failed"]), 1)
    def test_handling_of_invalid_bso_fields(self):
        """Invalid BSO field values are rejected.

        For each bad value, a batch POST must report the item in "failed"
        (and nothing in "success"), and a direct PUT must return a 400
        with the WEAVE_INVALID_WBO error code.
        """
        coll_url = self.root + "/storage/xxx_col2"
        # Invalid ID - unacceptable characters.
        # The newline cases are especially nuanced because \n
        # gets special treatment from the regex library.
        bso = {"id": "A\nB", "payload": "testing"}
        res = self.retry_post_json(coll_url, [bso])
        self.assertTrue(res.json["failed"] and not res.json["success"])
        bso = {"id": "A\n", "payload": "testing"}
        res = self.retry_post_json(coll_url, [bso])
        self.assertTrue(res.json["failed"] and not res.json["success"])
        bso = {"id": "\nN", "payload": "testing"}
        res = self.retry_post_json(coll_url, [bso])
        self.assertTrue(res.json["failed"] and not res.json["success"])
        bso = {"id": "A\tB", "payload": "testing"}
        res = self.retry_post_json(coll_url, [bso])
        self.assertTrue(res.json["failed"] and not res.json["success"])
        # Invalid ID - empty string is not acceptable.
        bso = {"id": "", "payload": "testing"}
        res = self.retry_post_json(coll_url, [bso])
        self.assertTrue(res.json["failed"] and not res.json["success"])
        # Invalid ID - too long
        bso = {"id": "X" * 65, "payload": "testing"}
        res = self.retry_post_json(coll_url, [bso])
        self.assertTrue(res.json["failed"] and not res.json["success"])
        # Commenting out this test.
        # This uses the same invalid BSO from above, which should return a 400
        """
        res = self.retry_put_json(coll_url + "/" + bso["id"], bso, status=404)
        """
        # Invalid sortindex - not an integer
        bso = {"id": "TEST", "payload": "testing", "sortindex": "xxx_meh"}
        res = self.retry_post_json(coll_url, [bso])
        self.assertTrue(res.json["failed"] and not res.json["success"])
        res = self.retry_put_json(coll_url + "/" + bso["id"], bso, status=400)
        self.assertEqual(res.json, WEAVE_INVALID_WBO)
        # Invalid sortindex - a float-like string is not an integer either
        bso = {"id": "TEST", "payload": "testing", "sortindex": "2.6"}
        res = self.retry_post_json(coll_url, [bso])
        self.assertTrue(res.json["failed"] and not res.json["success"])
        res = self.retry_put_json(coll_url + "/" + bso["id"], bso, status=400)
        self.assertEqual(res.json, WEAVE_INVALID_WBO)
        # Invalid sortindex - larger than max value
        bso = {"id": "TEST", "payload": "testing", "sortindex": "1" + "0" * 9}
        res = self.retry_post_json(coll_url, [bso])
        self.assertTrue(res.json["failed"] and not res.json["success"])
        res = self.retry_put_json(coll_url + "/" + bso["id"], bso, status=400)
        self.assertEqual(res.json, WEAVE_INVALID_WBO)
        # Invalid payload - not a string
        bso = {"id": "TEST", "payload": 42}
        res = self.retry_post_json(coll_url, [bso])
        self.assertTrue(res.json["failed"] and not res.json["success"])
        res = self.retry_put_json(coll_url + "/" + bso["id"], bso, status=400)
        self.assertEqual(res.json, WEAVE_INVALID_WBO)
        # Invalid ttl - not an integer
        bso = {"id": "TEST", "payload": "testing", "ttl": "eh?"}
        res = self.retry_post_json(coll_url, [bso])
        self.assertTrue(res.json["failed"] and not res.json["success"])
        res = self.retry_put_json(coll_url + "/" + bso["id"], bso, status=400)
        self.assertEqual(res.json, WEAVE_INVALID_WBO)
        # Invalid ttl - a float-like string is not an integer either
        bso = {"id": "TEST", "payload": "testing", "ttl": "4.2"}
        res = self.retry_post_json(coll_url, [bso])
        self.assertTrue(res.json["failed"] and not res.json["success"])
        res = self.retry_put_json(coll_url + "/" + bso["id"], bso, status=400)
        self.assertEqual(res.json, WEAVE_INVALID_WBO)
        # Invalid BSO - unknown field
        bso = {"id": "TEST", "unexpected": "spanish-inquisition"}
        res = self.retry_post_json(coll_url, [bso])
        self.assertTrue(res.json["failed"] and not res.json["success"])
        res = self.retry_put_json(coll_url + "/" + bso["id"], bso, status=400)
        self.assertEqual(res.json, WEAVE_INVALID_WBO)
def test_that_batch_gets_are_limited_to_max_number_of_ids(self):
bso = {"id": "01", "payload": "testing"}
self.retry_put_json(self.root + "/storage/xxx_col2/01", bso)
# Getting with less than the limit works OK.
ids = ",".join(str(i).zfill(2) for i in range(BATCH_MAX_IDS - 1))
res = self.app.get(self.root + "/storage/xxx_col2?ids=" + ids)
self.assertEqual(res.json, ["01"])
# Getting with equal to the limit works OK.
ids = ",".join(str(i).zfill(2) for i in range(BATCH_MAX_IDS))
res = self.app.get(self.root + "/storage/xxx_col2?ids=" + ids)
self.assertEqual(res.json, ["01"])
# Getting with more than the limit fails.
ids = ",".join(str(i).zfill(2) for i in range(BATCH_MAX_IDS + 1))
self.app.get(self.root + "/storage/xxx_col2?ids=" + ids, status=400)
def test_that_batch_deletes_are_limited_to_max_number_of_ids(self):
bso = {"id": "01", "payload": "testing"}
# Deleting with less than the limit works OK.
self.retry_put_json(self.root + "/storage/xxx_col2/01", bso)
ids = ",".join(str(i).zfill(2) for i in range(BATCH_MAX_IDS - 1))
self.retry_delete(self.root + "/storage/xxx_col2?ids=" + ids)
# Deleting with equal to the limit works OK.
self.retry_put_json(self.root + "/storage/xxx_col2/01", bso)
ids = ",".join(str(i).zfill(2) for i in range(BATCH_MAX_IDS))
self.retry_delete(self.root + "/storage/xxx_col2?ids=" + ids)
# Deleting with more than the limit fails.
self.retry_put_json(self.root + "/storage/xxx_col2/01", bso)
ids = ",".join(str(i).zfill(2) for i in range(BATCH_MAX_IDS + 1))
self.retry_delete(self.root + "/storage/xxx_col2?ids=" + ids, status=400)
def test_that_expired_items_can_be_overwritten_via_PUT(self):
# Upload something with a small ttl.
bso = {"payload": "XYZ", "ttl": 0}
self.retry_put_json(self.root + "/storage/xxx_col2/TEST", bso)
# Wait for it to expire.
time.sleep(0.02)
self.app.get(self.root + "/storage/xxx_col2/TEST", status=404)
# Overwriting it should still work.
bso = {"payload": "XYZ", "ttl": 42}
self.retry_put_json(self.root + "/storage/xxx_col2/TEST", bso)
    def test_if_modified_since_on_info_views(self):
        """All /info views honour X-If-Modified-Since: 200 when data changed
        after the given timestamp, 304 when it has not.
        """
        # Store something, so the views have a modified time > 0.
        bsos = [{"id": str(i).zfill(2), "payload": "xxx"} for i in range(3)]
        self.retry_post_json(self.root + "/storage/xxx_col1", bsos)
        INFO_VIEWS = (
            "/info/collections",
            "/info/quota",
            "/info/collection_usage",
            "/info/collection_counts",
        )
        # Get the initial last-modified version.
        r = self.app.get(self.root + "/info/collections")
        ts1 = float(r.headers["X-Last-Modified"])
        self.assertTrue(ts1 > 0)
        # With X-I-M-S set before latest change, all should give a 200.
        headers = {"X-If-Modified-Since": str(ts1 - 1)}
        for view in INFO_VIEWS:
            self.app.get(self.root + view, headers=headers, status=200)
        # With X-I-M-S set to after latest change , all should give a 304.
        headers = {"X-If-Modified-Since": str(ts1)}
        for view in INFO_VIEWS:
            self.app.get(self.root + view, headers=headers, status=304)
        # Change a collection.
        bso = {"payload": "TEST"}
        r = self.retry_put_json(self.root + "/storage/xxx_col2/TEST", bso)
        ts2 = r.headers["X-Last-Modified"]
        # Using the previous version should read the updated data.
        headers = {"X-If-Modified-Since": str(ts1)}
        for view in INFO_VIEWS:
            self.app.get(self.root + view, headers=headers, status=200)
        # Using the new timestamp should produce 304s.
        headers = {"X-If-Modified-Since": str(ts2)}
        for view in INFO_VIEWS:
            self.app.get(self.root + view, headers=headers, status=304)
        # XXX TODO: the storage-level timestamp is not tracked correctly
        # after deleting a collection, so this test fails for now.
        # # Delete a collection.
        # r = self.retry_delete(self.root + "/storage/xxx_col2")
        # ts3 = r.headers["X-Last-Modified"]
        # # Using the previous timestamp should read the updated data.
        # headers = {"X-If-Modified-Since": str(ts2)}
        # for view in INFO_VIEWS:
        #     self.app.get(self.root + view, headers=headers, status=200)
        # # Using the new timestamp should produce 304s.
        # headers = {"X-If-Modified-Since": str(ts3)}
        # for view in INFO_VIEWS:
        #     self.app.get(self.root + view, headers=headers, status=304)
def test_that_x_last_modified_is_sent_for_all_get_requests(self):
bsos = [{"id": str(i).zfill(2), "payload": "xxx"} for i in range(5)]
self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
r = self.app.get(self.root + "/info/collections")
self.assertTrue("X-Last-Modified" in r.headers)
r = self.app.get(self.root + "/info/collection_counts")
self.assertTrue("X-Last-Modified" in r.headers)
r = self.app.get(self.root + "/storage/xxx_col2")
self.assertTrue("X-Last-Modified" in r.headers)
r = self.app.get(self.root + "/storage/xxx_col2/01")
self.assertTrue("X-Last-Modified" in r.headers)
    def test_update_of_ttl_without_sending_data(self):
        """A PUT carrying only a ttl refreshes expiry without clobbering the
        payload; a ttl-only PUT to a missing id creates an empty-payload item.
        """
        bso = {"payload": "x", "ttl": 1}
        self.retry_put_json(self.root + "/storage/xxx_col2/TEST1", bso)
        self.retry_put_json(self.root + "/storage/xxx_col2/TEST2", bso)
        # Before those expire, update ttl on one that exists
        # and on one that does not.
        time.sleep(0.2)
        bso = {"ttl": 10}
        self.retry_put_json(self.root + "/storage/xxx_col2/TEST2", bso)
        self.retry_put_json(self.root + "/storage/xxx_col2/TEST3", bso)
        # Update some other field on TEST1, which should leave ttl untouched.
        bso = {"sortindex": 3}
        self.retry_put_json(self.root + "/storage/xxx_col2/TEST1", bso)
        # If we wait, TEST1 should expire but the others should not.
        time.sleep(0.8)
        items = self.app.get(self.root + "/storage/xxx_col2?full=1").json
        items = dict((item["id"], item) for item in items)
        self.assertEqual(sorted(list(items.keys())), ["TEST2", "TEST3"])
        # The existing item should have retained its payload.
        # The new item should have got a default payload of empty string.
        self.assertEqual(items["TEST2"]["payload"], "x")
        self.assertEqual(items["TEST3"]["payload"], "")
        # TEST3 was created later than TEST2 was, so its timestamp is newer.
        ts2 = items["TEST2"]["modified"]
        ts3 = items["TEST3"]["modified"]
        self.assertTrue(ts2 < ts3)
    def test_bulk_update_of_ttls_without_sending_data(self):
        """Bulk POSTs can refresh ttls; only a payload change (or item
        creation) bumps the per-item "modified" timestamp.
        """
        # Create 5 BSOs with a ttl of 1 second.
        bsos = [{"id": str(i).zfill(2), "payload": "x", "ttl": 1} for i in range(5)]
        r = self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
        ts1 = float(r.headers["X-Last-Modified"])
        # Before they expire, bulk-update the ttl to something longer.
        # Also send data for some that don't exist yet.
        # And just to be really tricky, we're also going to update
        # one of the payloads at the same time.
        time.sleep(0.2)
        bsos = [{"id": str(i).zfill(2), "ttl": 10} for i in range(3, 7)]
        bsos[0]["payload"] = "xx"
        r = self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
        self.assertEqual(len(r.json["success"]), 4)
        ts2 = float(r.headers["X-Last-Modified"])
        # If we wait then items 0, 1, 2 should have expired.
        # Items 3, 4, 5, 6 should still exist.
        time.sleep(0.8)
        items = self.app.get(self.root + "/storage/xxx_col2?full=1").json
        items = dict((item["id"], item) for item in items)
        self.assertEqual(sorted(list(items.keys())), ["03", "04", "05", "06"])
        # Items 3 and 4 should have the specified payloads.
        # Items 5 and 6 should have payload defaulted to empty string.
        self.assertEqual(items["03"]["payload"], "xx")
        self.assertEqual(items["04"]["payload"], "x")
        self.assertEqual(items["05"]["payload"], "")
        self.assertEqual(items["06"]["payload"], "")
        # All items created or modified by the request should get their
        # timestamps updated. Just bumping the ttl should not bump timestamp.
        self.assertEqual(items["03"]["modified"], ts2)
        self.assertEqual(items["04"]["modified"], ts1)
        self.assertEqual(items["05"]["modified"], ts2)
        self.assertEqual(items["06"]["modified"], ts2)
    def test_that_negative_integer_fields_are_not_accepted(self):
        """Negative ttl, limit, X-If-Modified-Since and X-If-Unmodified-Since
        are all rejected with a 400; sortindex is the one signed exception.
        """
        # ttls cannot be negative
        self.retry_put_json(
            self.root + "/storage/xxx_col2/TEST",
            {
                "payload": "TEST",
                "ttl": -1,
            },
            status=400,
        )
        # limit cannot be negative
        self.retry_put_json(self.root + "/storage/xxx_col2/TEST", {"payload": "X"})
        self.app.get(self.root + "/storage/xxx_col2?limit=-1", status=400)
        # X-If-Modified-Since cannot be negative
        self.app.get(
            self.root + "/storage/xxx_col2",
            headers={
                "X-If-Modified-Since": "-3",
            },
            status=400,
        )
        # X-If-Unmodified-Since cannot be negative
        self.retry_put_json(
            self.root + "/storage/xxx_col2/TEST",
            {
                "payload": "TEST",
            },
            headers={
                "X-If-Unmodified-Since": "-3",
            },
            status=400,
        )
        # sortindex actually *can* be negative
        self.retry_put_json(
            self.root + "/storage/xxx_col2/TEST",
            {
                "payload": "TEST",
                "sortindex": -42,
            },
            status=200,
        )
    def test_meta_global_sanity(self):
        """Round-trip sanity checks for the special meta/global record:
        creation, field set, timestamp format, and that any client-supplied
        "modified" value is ignored.
        """
        # Memcache backend is configured to store 'meta' in write-through
        # cache, so we want to check it explicitly. We might as well put it
        # in the base tests because there's nothing memcached-specific here.
        self.app.get(self.root + "/storage/meta/global", status=404)
        res = self.app.get(self.root + "/storage/meta")
        self.assertEqual(res.json, [])
        self.retry_put_json(self.root + "/storage/meta/global", {"payload": "blob"})
        res = self.app.get(self.root + "/storage/meta")
        self.assertEqual(res.json, ["global"])
        res = self.app.get(self.root + "/storage/meta/global")
        self.assertEqual(res.json["payload"], "blob")
        # It should not have extra keys.
        keys = list(res.json.keys())
        keys.sort()
        self.assertEqual(keys, ["id", "modified", "payload"])
        # It should have a properly-formatted "modified" field,
        # i.e. a decimal with exactly two fractional digits.
        modified_re = r"['\"]modified['\"]:\s*[0-9]+\.[0-9][0-9]\s*[,}]"
        self.assertTrue(re.search(modified_re, res.body.decode("utf-8")))
        # Any client-specified "modified" field should be ignored;
        # the server assigns its own timestamp.
        res = self.retry_put_json(
            self.root + "/storage/meta/global",
            {"payload": "blob", "modified": 12},
        )
        ts = float(res.headers["X-Weave-Timestamp"])
        res = self.app.get(self.root + "/storage/meta/global")
        self.assertEqual(res.json["modified"], ts)
def test_that_404_responses_have_a_json_body(self):
res = self.app.get(self.root + "/nonexistent/url", status=404)
self.assertEqual(res.content_type, "application/json")
self.assertEqual(res.json, 0)
def test_that_internal_server_fields_are_not_echoed(self):
self.retry_post_json(
self.root + "/storage/xxx_col1", [{"id": "one", "payload": "blob"}]
)
self.retry_put_json(self.root + "/storage/xxx_col1/two", {"payload": "blub"})
res = self.app.get(self.root + "/storage/xxx_col1?full=1")
self.assertEqual(len(res.json), 2)
for item in res.json:
self.assertTrue("id" in item)
self.assertTrue("payload" in item)
self.assertFalse("payload_size" in item)
self.assertFalse("ttl" in item)
for id in ("one", "two"):
res = self.app.get(self.root + "/storage/xxx_col1/" + id)
self.assertTrue("id" in res.json)
self.assertTrue("payload" in res.json)
self.assertFalse("payload_size" in res.json)
self.assertFalse("ttl" in res.json)
    def test_accessing_info_collections_with_an_expired_token(self):
        """An expired auth token is rejected for storage operations but is
        still accepted for read-only access to /info/collections.
        """
        # This can't be run against a live server because we
        # have to forge an auth token to test things properly.
        if self.distant:
            pytest.skip("Test cannot be run against a live server.")
        # Write some items while we've got a good token.
        bsos = [{"id": str(i).zfill(2), "payload": "xxx"} for i in range(3)]
        resp = self.retry_post_json(self.root + "/storage/xxx_col1", bsos)
        ts = float(resp.headers["X-Last-Modified"])
        # Check that we can read the info correctly.
        resp = self.app.get(self.root + "/info/collections")
        self.assertEqual(list(resp.json.keys()), ["xxx_col1"])
        self.assertEqual(resp.json["xxx_col1"], ts)
        # Forge an expired token to use for the test, signed with the
        # server's own token secret so only "expires" is invalid.
        auth_policy = self.config.registry.getUtility(IAuthenticationPolicy)
        secret = auth_policy._get_token_secrets(self.host_url)[-1]
        tm = tokenlib.TokenManager(secret=secret)
        exp = time.time() - 60
        data = {
            "uid": self.user_id,
            "node": self.host_url,
            "expires": exp,
            "hashed_fxa_uid": self.hashed_fxa_uid,
            "fxa_uid": self.fxa_uid,
            "fxa_kid": self.fxa_kid,
        }
        self.auth_token = tm.make_token(data)
        self.auth_secret = tm.get_derived_secret(self.auth_token)
        # The expired token cannot be used for normal operations.
        bsos = [{"id": str(i).zfill(2), "payload": "aaa"} for i in range(3)]
        self.retry_post_json(self.root + "/storage/xxx_col1", bsos, status=401)
        self.app.get(self.root + "/storage/xxx_col1", status=401)
        # But it still allows access to /info/collections.
        resp = self.app.get(self.root + "/info/collections")
        self.assertEqual(list(resp.json.keys()), ["xxx_col1"])
        self.assertEqual(resp.json["xxx_col1"], ts)
    def test_pagination_with_newer_and_sort_by_oldest(self):
        """Paginating a newer= query with sort=oldest walks every later item
        exactly once, in non-decreasing timestamp order, across page sizes.
        """
        # Twelve bsos with three different modification times.
        NUM_ITEMS = 12
        bsos = []
        timestamps = []
        for i in range(NUM_ITEMS):
            bso = {"id": str(i).zfill(2), "payload": "x"}
            bsos.append(bso)
            if i % 4 == 3:
                res = self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
                ts = float(res.headers["X-Last-Modified"])
                timestamps.append((i, ts))
                bsos = []
        # Try with several different pagination sizes,
        # to hit various boundary conditions.
        for limit in (2, 3, 4, 5, 6):
            for start, ts in timestamps:
                query_url = self.root + "/storage/xxx_col2?full=true&sort=oldest"
                query_url += "&newer=%s&limit=%s" % (ts, limit)
                # Paginated-ly fetch all items.
                items = []
                res = self.app.get(query_url)
                for item in res.json:
                    if items:
                        assert items[-1]["modified"] <= item["modified"]
                    items.append(item)
                next_offset = res.headers.get("X-Weave-Next-Offset")
                while next_offset is not None:
                    res = self.app.get(query_url + "&offset=" + next_offset)
                    for item in res.json:
                        assert items[-1]["modified"] <= item["modified"]
                        items.append(item)
                    next_offset = res.headers.get("X-Weave-Next-Offset")
                # They should all be in order, starting from the item
                # *after* the one that was used for the newer= timestamp.
                self.assertEqual(
                    sorted(int(item["id"]) for item in items),
                    list(range(start + 1, NUM_ITEMS)),
                )
    def test_pagination_with_older_and_sort_by_newest(self):
        """Paginating an older= query with sort=newest walks every earlier
        item exactly once, in non-increasing timestamp order, across page
        sizes.
        """
        # Twelve bsos with three different modification times.
        NUM_ITEMS = 12
        bsos = []
        timestamps = []
        for i in range(NUM_ITEMS):
            bso = {"id": str(i).zfill(2), "payload": "x"}
            bsos.append(bso)
            if i % 4 == 3:
                res = self.retry_post_json(self.root + "/storage/xxx_col2", bsos)
                ts = float(res.headers["X-Last-Modified"])
                # Record the index of the *first* item in this write batch.
                timestamps.append((i - 3, ts))
                bsos = []
        # Try with several different pagination sizes,
        # to hit various boundary conditions.
        for limit in (2, 3, 4, 5, 6):
            for start, ts in timestamps:
                query_url = self.root + "/storage/xxx_col2?full=true&sort=newest"
                query_url += "&older=%s&limit=%s" % (ts, limit)
                # Paginated-ly fetch all items.
                items = []
                res = self.app.get(query_url)
                for item in res.json:
                    if items:
                        assert items[-1]["modified"] >= item["modified"]
                    items.append(item)
                next_offset = res.headers.get("X-Weave-Next-Offset")
                while next_offset is not None:
                    res = self.app.get(query_url + "&offset=" + next_offset)
                    for item in res.json:
                        assert items[-1]["modified"] >= item["modified"]
                        items.append(item)
                    next_offset = res.headers.get("X-Weave-Next-Offset")
                # They should all be in order, up to the item *before*
                # the one that was used for the older= timestamp.
                self.assertEqual(
                    sorted(int(item["id"]) for item in items),
                    list(range(0, start)),
                )
def assertCloseEnough(self, val1, val2, delta=0.05):
if abs(val1 - val2) < delta:
return True
raise AssertionError(
"abs(%.2f - %.2f) = %.2f > %.2f" % (val1, val2, abs(val1 - val2), delta)
)
def test_batches(self):
endpoint = self.root + "/storage/xxx_col2"
bso1 = {"id": "12", "payload": "elegance"}
bso2 = {"id": "13", "payload": "slovenly"}
bsos = [bso1, bso2]
self.retry_post_json(endpoint, bsos)
resp = self.app.get(endpoint + "/12")
orig_modified = resp.headers["X-Last-Modified"]
bso3 = {"id": "a", "payload": "internal"}
bso4 = {"id": "b", "payload": "pancreas"}
resp = self.retry_post_json(endpoint + "?batch=true", [bso3, bso4])
batch = resp.json["batch"]
# The collection should not be reported as modified.
self.assertEqual(orig_modified, resp.headers["X-Last-Modified"])
# And reading from it shouldn't show the new records yet.
resp = self.app.get(endpoint)
res = resp.json
res.sort()
self.assertEqual(res, ["12", "13"])
self.assertEqual(int(resp.headers["X-Weave-Records"]), 2)
self.assertEqual(orig_modified, resp.headers["X-Last-Modified"])
bso5 = {"id": "c", "payload": "tinsel"}
bso6 = {"id": "13", "payload": "portnoy"}
bso0 = {"id": "14", "payload": "itsybitsy"}
commit = "?batch={0}&commit=true".format(batch)
resp = self.retry_post_json(endpoint + commit, [bso5, bso6, bso0])
committed = resp.json["modified"]
print(committed)
self.assertEqual(resp.json["modified"], float(resp.headers["X-Last-Modified"]))
# make sure /info/collections got updated
resp = self.app.get(self.root + "/info/collections")
self.assertEqual(float(resp.headers["X-Last-Modified"]), committed)
self.assertEqual(resp.json["xxx_col2"], committed)
# make sure the changes applied
resp = self.app.get(endpoint)
res = resp.json
res.sort()
self.assertEqual(res, ["12", "13", "14", "a", "b", "c"])
self.assertEqual(int(resp.headers["X-Weave-Records"]), 6)
resp = self.app.get(endpoint + "/13")
self.assertEqual(resp.json["payload"], "portnoy")
self.assertEqual(committed, float(resp.headers["X-Last-Modified"]))
self.assertEqual(committed, resp.json["modified"])
resp = self.app.get(endpoint + "/c")
self.assertEqual(resp.json["payload"], "tinsel")
self.assertEqual(committed, resp.json["modified"])
resp = self.app.get(endpoint + "/14")
self.assertEqual(resp.json["payload"], "itsybitsy")
self.assertEqual(committed, resp.json["modified"])
# empty commit POST
bso7 = {"id": "a", "payload": "burrito"}
bso8 = {"id": "e", "payload": "chocolate"}
resp = self.retry_post_json(endpoint + "?batch=true", [bso7, bso8])
batch = resp.json["batch"]
time.sleep(1)
commit = "?batch={0}&commit=true".format(batch)
resp1 = self.retry_post_json(endpoint + commit, [])
committed = resp1.json["modified"]
self.assertEqual(committed, float(resp1.headers["X-Last-Modified"]))
resp2 = self.app.get(endpoint + "/a")
self.assertEqual(committed, float(resp2.headers["X-Last-Modified"]))
self.assertEqual(committed, resp2.json["modified"])
self.assertEqual(resp2.json["payload"], "burrito")
resp3 = self.app.get(endpoint + "/e")
self.assertEqual(committed, resp3.json["modified"])
    def test_aaa_batch_commit_collision(self):
        """A BSO present both in a staged batch and in the commit POST must
        resolve to the commit-time payload.
        """
        # It's possible that a batch contain a BSO inside a batch as well
        # as inside the final "commit" message. This is a bit of a problem
        # for spanner because of conflicting ways that the data is written
        # to the database and the discoverability of IDs in previously
        # submitted batches.
        endpoint = self.root + "/storage/xxx_col2"
        orig = "Letting the days go by"
        repl = "Same as it ever was"
        batch_num = self.retry_post_json(
            endpoint + "?batch=true", [{"id": "b0", "payload": orig}]
        ).json["batch"]
        resp = self.retry_post_json(
            endpoint + "?batch={}&commit=true".format(batch_num),
            [{"id": "b0", "payload": repl}],
        )
        # this should succeed, using the newerer payload value.
        assert resp.json["failed"] == {}, "batch commit failed"
        assert resp.json["success"] == ["b0"], "batch commit id incorrect"
        resp = self.app.get(endpoint + "?full=1")
        assert resp.json[0].get("payload") == repl, "wrong payload returned"
def test_we_dont_need_no_stinkin_batches(self):
endpoint = self.root + "/storage/xxx_col2"
# invalid batch ID
bso1 = {"id": "f", "payload": "pantomime"}
self.retry_post_json(endpoint + "?batch=sammich", [bso1], status=400)
# commit with no batch ID
self.retry_post_json(endpoint + "?commit=true", [], status=400)
    def test_batch_size_limits(self):
        """Size limits are advertised via /info/configuration and enforced.

        Most of the enforcement checks below are currently commented out;
        only the max_post_bytes and max_total_bytes paths are exercised.
        """
        limits = self.app.get(self.root + "/info/configuration").json
        self.assertTrue("max_post_records" in limits)
        self.assertTrue("max_post_bytes" in limits)
        self.assertTrue("max_total_records" in limits)
        self.assertTrue("max_total_bytes" in limits)
        self.assertTrue("max_record_payload_bytes" in limits)
        self.assertTrue("max_request_bytes" in limits)
        endpoint = self.root + "/storage/xxx_col2?batch=true"
        # There are certain obvious constraints on these limits,
        # violations of which would be very confusing for clients.
        #
        # self.assertTrue(
        #     limits['max_request_bytes'] > limits['max_post_bytes']
        # )
        # self.assertTrue(
        #     limits['max_post_bytes'] >= limits['max_record_payload_bytes']
        # )
        # self.assertTrue(
        #     limits['max_total_records'] >= limits['max_post_records']
        # )
        # self.assertTrue(
        #     limits['max_total_bytes'] >= limits['max_post_bytes']
        # )
        #
        # # `max_post_records` is an (inclusive) limit on
        # # the number of items in a single post.
        #
        # res = self.retry_post_json(endpoint, [], headers={
        #     'X-Weave-Records': str(limits['max_post_records'])
        # })
        # self.assertFalse(res.json['failed'])
        # res = self.retry_post_json(endpoint, [], headers={
        #     'X-Weave-Records': str(limits['max_post_records'] + 1)
        # }, status=400)
        # self.assertEqual(res.json, WEAVE_SIZE_LIMIT_EXCEEDED)
        #
        # bsos = [{'id': str(x), 'payload': ''}
        #         for x in range(limits['max_post_records'])]
        # res = self.retry_post_json(endpoint, bsos)
        # self.assertFalse(res.json['failed'])
        # bsos.append({'id': 'toomany', 'payload': ''})
        # res = self.retry_post_json(endpoint, bsos)
        # self.assertEqual(res.json['failed']['toomany'], 'retry bso')
        #
        # # `max_total_records` is an (inclusive) limit on the
        # # total number of items in a batch. We can only enforce
        # # it if the client tells us this via header.
        #
        # self.retry_post_json(endpoint, [], headers={
        #     'X-Weave-Total-Records': str(limits['max_total_records'])
        # })
        # res = self.retry_post_json(endpoint, [], headers={
        #     'X-Weave-Total-Records': str(limits['max_total_records'] + 1)
        # }, status=400)
        # self.assertEqual(res.json, WEAVE_SIZE_LIMIT_EXCEEDED)
        #
        # # `max_post_bytes` is an (inclusive) limit on the
        # # total size of payloads in a single post.
        #
        # self.retry_post_json(endpoint, [], headers={
        #     'X-Weave-Bytes': str(limits['max_post_bytes'])
        # })
        # res = self.retry_post_json(endpoint, [], headers={
        #     'X-Weave-Bytes': str(limits['max_post_bytes'] + 1)
        # }, status=400)
        # self.assertEqual(res.json, WEAVE_SIZE_LIMIT_EXCEEDED)
        # Payloads exactly at max_post_bytes succeed; one byte over fails
        # the oversized record with a "retry bytes" error.
        bsos = [
            {"id": "little", "payload": "XXX"},
            {"id": "big", "payload": "X" * (limits["max_post_bytes"] - 3)},
        ]
        res = self.retry_post_json(endpoint, bsos)
        self.assertFalse(res.json["failed"])
        bsos[1]["payload"] += "X"
        res = self.retry_post_json(endpoint, bsos)
        self.assertEqual(res.json["success"], ["little"])
        self.assertEqual(res.json["failed"]["big"], "retry bytes")
        # `max_total_bytes` is an (inclusive) limit on the
        # total size of all payloads in a batch. We can only enforce
        # it if the client tells us this via header.
        self.retry_post_json(
            endpoint,
            [],
            headers={"X-Weave-Total-Bytes": str(limits["max_total_bytes"])},
        )
        res = self.retry_post_json(
            endpoint,
            [],
            headers={"X-Weave-Total-Bytes": str(limits["max_total_bytes"] + 1)},
            status=400,
        )
        self.assertEqual(res.json, WEAVE_SIZE_LIMIT_EXCEEDED)
def text_max_total_records(self):
"""Test ensuring `max_total_records` setting of 1664 in the
`/info/configuration` endpoint` is validated.
`max_total_records` is an (inclusive) limit on the total
number of items in a batch
Is related directly to `X-Weave-Total-Records` header.
"""
endpoint = self.root + "/storage/xxx_col2"
endpoint_batch = self.root + "/storage/xxx_col2?batch=true"
conf_limits = self.app.get(self.root + "/info/configuration")
try:
limit = conf_limits.json["max_total_records"]
except KeyError:
# This can't be run against a live server because we
# have to forge an auth token to test things properly.
if self.distant:
pytest.skip("Test cannot be run against a live server.")
raise
max_total_records = limit
self.assertTrue("max_total_records" in conf_limits)
self.assertTrue(max_total_records == 1664)
# We can only enforce it if the client tells us this via the
# 'X-Weave-Total-Records' header.
self.retry_post_json(endpoint_batch, [], headers={
'X-Weave-Total-Records': str(conf_limits['max_total_records'])
})
res = self.retry_post_json(endpoint_batch, [], headers={
'X-Weave-Total-Records': str(conf_limits['max_total_records'] + 1)
}, status=400)
self.assertEqual(res.json, WEAVE_SIZE_LIMIT_EXCEEDED)
# Success case within limit
bsos = [{'id': str(i), 'payload': 'X'} for i in range(max_total_records)]
resp = self.retry_post_json(endpoint_batch, bsos)
batch = resp.json["batch"]
resp = self.retry_post_json(f"{endpoint}?batch={batch}&commit=true", [])
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.json['modified'], max_total_records)
# Fail case above limit (+1)
bsos = [{'id': str(i), 'payload': 'X'} for i in range(max_total_records + 1)]
resp = self.retry_post_json(endpoint_batch, bsos)
batch = resp.json["batch"]
resp = self.retry_post_json(f"{endpoint}?batch={batch}&commit=true", [])
self.assertIn('error', resp.json)
self.assertEqual(resp.status_code, 400)
def test_batch_partial_update(self):
collection = self.root + "/storage/xxx_col2"
bsos = [
{"id": "a", "payload": "aai"},
{"id": "b", "payload": "bee", "sortindex": 17},
]
resp = self.retry_post_json(collection, bsos)
orig_ts = float(resp.headers["X-Last-Modified"])
# Update one, and add a new one.
bsos = [
{"id": "b", "payload": "bii"},
{"id": "c", "payload": "sea"},
]
resp = self.retry_post_json(collection + "?batch=true", bsos)
batch = resp.json["batch"]
self.assertEqual(orig_ts, float(resp.headers["X-Last-Modified"]))
# The updated item hasn't been written yet.
resp = self.app.get(collection + "?full=1")
res = resp.json
res.sort(key=lambda bso: bso["id"])
self.assertEqual(len(res), 2)
self.assertEqual(res[0]["payload"], "aai")
self.assertEqual(res[1]["payload"], "bee")
self.assertEqual(res[0]["modified"], orig_ts)
self.assertEqual(res[1]["modified"], orig_ts)
self.assertEqual(res[1]["sortindex"], 17)
endpoint = collection + "?batch={0}&commit=true".format(batch)
resp = self.retry_post_json(endpoint, [])
commit_ts = float(resp.headers["X-Last-Modified"])
# The changes have now been applied.
resp = self.app.get(collection + "?full=1")
res = resp.json
res.sort(key=lambda bso: bso["id"])
self.assertEqual(len(res), 3)
self.assertEqual(res[0]["payload"], "aai")
self.assertEqual(res[1]["payload"], "bii")
self.assertEqual(res[2]["payload"], "sea")
self.assertEqual(res[0]["modified"], orig_ts)
self.assertEqual(res[1]["modified"], commit_ts)
self.assertEqual(res[2]["modified"], commit_ts)
# Fields not touched by the batch, should have been preserved.
self.assertEqual(res[1]["sortindex"], 17)
def test_batch_ttl_update(self):
collection = self.root + "/storage/xxx_col2"
bsos = [
{"id": "a", "payload": "ayy"},
{"id": "b", "payload": "bea"},
{"id": "c", "payload": "see"},
]
resp = self.retry_post_json(collection, bsos)
# Bump ttls as a series of individual batch operations.
resp = self.retry_post_json(collection + "?batch=true", [], status=202)
orig_ts = float(resp.headers["X-Last-Modified"])
batch = resp.json["batch"]
endpoint = collection + "?batch={0}".format(batch)
resp = self.retry_post_json(endpoint, [{"id": "a", "ttl": 2}], status=202)
self.assertEqual(orig_ts, float(resp.headers["X-Last-Modified"]))
resp = self.retry_post_json(endpoint, [{"id": "b", "ttl": 2}], status=202)
self.assertEqual(orig_ts, float(resp.headers["X-Last-Modified"]))
resp = self.retry_post_json(endpoint + "&commit=true", [], status=200)
# The payloads should be unchanged
resp = self.app.get(collection + "?full=1")
res = resp.json
res.sort(key=lambda bso: bso["id"])
self.assertEqual(len(res), 3)
self.assertEqual(res[0]["payload"], "ayy")
self.assertEqual(res[1]["payload"], "bea")
self.assertEqual(res[2]["payload"], "see")
# If we wait, the ttls should kick in
time.sleep(2.1)
resp = self.app.get(collection + "?full=1")
res = resp.json
self.assertEqual(len(res), 1)
self.assertEqual(res[0]["payload"], "see")
    def test_batch_ttl_is_based_on_commit_timestamp(self):
        """An item's ttl countdown starts at batch commit, not at upload.

        An item uploaded into a batch with ttl=3 should still be alive
        ~3 seconds after the *commit*, even though more than 3 seconds may
        have elapsed since it was uploaded into the pending batch.
        NOTE: the sleep durations below are tuned against ttl=3; keep them
        in sync if the ttl changes.
        """
        collection = self.root + "/storage/xxx_col2"
        resp = self.retry_post_json(collection + "?batch=true", [], status=202)
        batch = resp.json["batch"]
        endpoint = collection + "?batch={0}".format(batch)
        resp = self.retry_post_json(endpoint, [{"id": "a", "ttl": 3}], status=202)
        # Put some time between upload timestamp and commit timestamp.
        time.sleep(1.5)
        resp = self.retry_post_json(endpoint + "&commit=true", [], status=200)
        # Wait a little; if ttl is taken from the time of the commit
        # then it should not kick in just yet (1.6s < 3s since commit).
        time.sleep(1.6)
        resp = self.app.get(collection)
        res = resp.json
        self.assertEqual(len(res), 1)
        self.assertEqual(res[0], "a")
        # Wait some more, and the ttl should kick in (3.2s > 3s since commit).
        time.sleep(1.6)
        resp = self.app.get(collection)
        res = resp.json
        self.assertEqual(len(res), 0)
def test_batch_with_immediate_commit(self):
collection = self.root + "/storage/xxx_col2"
bsos = [
{"id": "a", "payload": "aih"},
{"id": "b", "payload": "bie"},
{"id": "c", "payload": "cee"},
]
resp = self.retry_post_json(
collection + "?batch=true&commit=true", bsos, status=200
)
self.assertTrue("batch" not in resp.json)
self.assertTrue("modified" in resp.json)
committed = resp.json["modified"]
resp = self.app.get(self.root + "/info/collections")
self.assertEqual(float(resp.headers["X-Last-Modified"]), committed)
self.assertEqual(resp.json["xxx_col2"], committed)
resp = self.app.get(collection + "?full=1")
self.assertEqual(float(resp.headers["X-Last-Modified"]), committed)
res = resp.json
res.sort(key=lambda bso: bso["id"])
self.assertEqual(len(res), 3)
self.assertEqual(res[0]["payload"], "aih")
self.assertEqual(res[1]["payload"], "bie")
self.assertEqual(res[2]["payload"], "cee")
    def test_batch_uploads_properly_update_info_collections(self):
        """Batch commits must bump the per-collection timestamps reported
        by /info/collections, exactly like plain (batchless) uploads do.
        """
        collection1 = self.root + "/storage/xxx_col1"
        collection2 = self.root + "/storage/xxx_col2"
        bsos = [
            {"id": "a", "payload": "aih"},
            {"id": "b", "payload": "bie"},
            {"id": "c", "payload": "cee"},
        ]
        # Seed both collections with plain uploads and record their stamps.
        resp = self.retry_post_json(collection1, bsos)
        ts1 = resp.json["modified"]
        resp = self.retry_post_json(collection2, bsos)
        ts2 = resp.json["modified"]
        resp = self.app.get(self.root + "/info/collections")
        self.assertEqual(float(resp.headers["X-Last-Modified"]), ts2)
        self.assertEqual(resp.json["xxx_col1"], ts1)
        self.assertEqual(resp.json["xxx_col2"], ts2)
        # Overwrite in place, timestamp should change.
        resp = self.retry_post_json(collection2 + "?batch=true&commit=true", bsos[:2])
        self.assertTrue(resp.json["modified"] > ts2)
        ts2 = resp.json["modified"]
        resp = self.app.get(self.root + "/info/collections")
        self.assertEqual(float(resp.headers["X-Last-Modified"]), ts2)
        self.assertEqual(resp.json["xxx_col1"], ts1)
        self.assertEqual(resp.json["xxx_col2"], ts2)
        # Add new items, timestamp should change
        resp = self.retry_post_json(
            collection1 + "?batch=true&commit=true",
            [{"id": "d", "payload": "dee"}],
        )
        self.assertTrue(resp.json["modified"] > ts1)
        self.assertTrue(resp.json["modified"] >= ts2)
        ts1 = resp.json["modified"]
        resp = self.app.get(self.root + "/info/collections")
        self.assertEqual(float(resp.headers["X-Last-Modified"]), ts1)
        self.assertEqual(resp.json["xxx_col1"], ts1)
        self.assertEqual(resp.json["xxx_col2"], ts2)
def test_batch_with_failing_bsos(self):
collection = self.root + "/storage/xxx_col2"
bsos = [
{"id": "a", "payload": "aai"},
{"id": "b\n", "payload": "i am invalid", "sortindex": 17},
]
resp = self.retry_post_json(collection + "?batch=true", bsos)
self.assertEqual(len(resp.json["failed"]), 1)
self.assertEqual(len(resp.json["success"]), 1)
batch = resp.json["batch"]
bsos = [
{"id": "c", "payload": "sea"},
{"id": "d", "payload": "dii", "ttl": -12},
]
endpoint = collection + "?batch={0}&commit=true".format(batch)
resp = self.retry_post_json(endpoint, bsos)
self.assertEqual(len(resp.json["failed"]), 1)
self.assertEqual(len(resp.json["success"]), 1)
# To correctly match semantics of batchless POST, the batch
# should be committed including only the successful items.
# It is the client's responsibility to detect that some items
# failed, and decide whether to commit the batch.
resp = self.app.get(collection + "?full=1")
res = resp.json
res.sort(key=lambda bso: bso["id"])
self.assertEqual(len(res), 2)
self.assertEqual(res[0]["payload"], "aai")
self.assertEqual(res[1]["payload"], "sea")
def test_batch_id_is_correctly_scoped_to_a_collection(self):
collection1 = self.root + "/storage/xxx_col1"
bsos = [
{"id": "a", "payload": "aih"},
{"id": "b", "payload": "bie"},
{"id": "c", "payload": "cee"},
]
resp = self.retry_post_json(collection1 + "?batch=true", bsos)
batch = resp.json["batch"]
# I should not be able to add to that batch in a different collection.
endpoint2 = self.root + "/storage/xxx_col2?batch={0}".format(batch)
resp = self.retry_post_json(
endpoint2, [{"id": "d", "payload": "dii"}], status=400
)
# I should not be able to commit that batch in a different collection.
resp = self.retry_post_json(endpoint2 + "&commit=true", [], status=400)
# I should still be able to use the batch in the correct collection.
endpoint1 = collection1 + "?batch={0}".format(batch)
resp = self.retry_post_json(endpoint1, [{"id": "d", "payload": "dii"}])
resp = self.retry_post_json(endpoint1 + "&commit=true", [])
resp = self.app.get(collection1 + "?full=1")
res = resp.json
res.sort(key=lambda bso: bso["id"])
self.assertEqual(len(res), 4)
self.assertEqual(res[0]["payload"], "aih")
self.assertEqual(res[1]["payload"], "bie")
self.assertEqual(res[2]["payload"], "cee")
self.assertEqual(res[3]["payload"], "dii")
    def test_users_with_the_same_batch_id_get_separate_data(self):
        """Identical batch ids issued to different users must not collide.

        Repeatedly opens a pending batch for each of two users, hoping the
        server hands out the same batch id to both; once it does, each
        user's commit must contain only that user's own items.
        """
        # Try to generate two users with the same batch-id.
        # It might take a couple of attempts...
        for _ in range(100):
            bsos = [{"id": "a", "payload": "aih"}]
            req = "/storage/xxx_col1?batch=true"
            resp = self.retry_post_json(self.root + req, bsos)
            batch1 = resp.json["batch"]
            with self._switch_user():
                bsos = [{"id": "b", "payload": "bee"}]
                req = "/storage/xxx_col1?batch=true"
                resp = self.retry_post_json(self.root + req, bsos)
                batch2 = resp.json["batch"]
                # Let the second user commit their batch.
                req = "/storage/xxx_col1?batch={0}&commit=true".format(batch2)
                self.retry_post_json(self.root + req, [])
                # It should only have a single item.
                resp = self.app.get(self.root + "/storage/xxx_col1")
                self.assertEqual(resp.json, ["b"])
            # The first user's collection should still be empty.
            # Now have the first user commit their batch.
            req = "/storage/xxx_col1?batch={0}&commit=true".format(batch1)
            self.retry_post_json(self.root + req, [])
            # It should only have a single item.
            resp = self.app.get(self.root + "/storage/xxx_col1")
            self.assertEqual(resp.json, ["a"])
            # Stop once we have managed to produce a conflicting batchid;
            # otherwise loop around and try again.
            if batch1 == batch2:
                break
        else:
            pytest.skip("failed to generate conflicting batchid")
    def test_that_we_dont_resurrect_committed_batches(self):
        """A re-used batchid must not resurrect previously-committed data.

        This retry loop tries to trigger a situation where we:
        * create a batch with a single item
        * successfully commit that batch
        * create a new batch that re-uses the same batchid
        """
        for _ in range(100):
            bsos = [{"id": "i", "payload": "aye"}]
            req = "/storage/xxx_col1?batch=true"
            resp = self.retry_post_json(self.root + req, bsos)
            batch1 = resp.json["batch"]
            req = "/storage/xxx_col1?batch={0}&commit=true".format(batch1)
            self.retry_post_json(self.root + req, [])
            req = "/storage/xxx_col2?batch=true"
            resp = self.retry_post_json(self.root + req, [])
            batch2 = resp.json["batch"]
            bsos = [{"id": "j", "payload": "jay"}]
            req = "/storage/xxx_col2?batch={0}&commit=true".format(batch2)
            self.retry_post_json(self.root + req, bsos)
            # Stop once batchid re-use has been triggered; otherwise retry.
            if batch1 == batch2:
                break
        else:
            pytest.skip("failed to trigger re-use of batchid")
        # Despite having the same batchid, the second batch should
        # be completely independent of the first.
        resp = self.app.get(self.root + "/storage/xxx_col2")
        self.assertEqual(resp.json, ["j"])
def test_batch_id_is_correctly_scoped_to_a_user(self):
collection = self.root + "/storage/xxx_col1"
bsos = [
{"id": "a", "payload": "aih"},
{"id": "b", "payload": "bie"},
{"id": "c", "payload": "cee"},
]
resp = self.retry_post_json(collection + "?batch=true", bsos)
batch = resp.json["batch"]
with self._switch_user():
# I should not be able to add to that batch as a different user.
endpoint = self.root + "/storage/xxx_col1?batch={0}".format(batch)
resp = self.retry_post_json(
endpoint, [{"id": "d", "payload": "di"}], status=400
)
# I should not be able to commit that batch as a different user.
resp = self.retry_post_json(endpoint + "&commit=true", [], status=400)
# I should still be able to use the batch in the original user.
endpoint = collection + "?batch={0}".format(batch)
resp = self.retry_post_json(endpoint, [{"id": "d", "payload": "di"}])
resp = self.retry_post_json(endpoint + "&commit=true", [])
resp = self.app.get(collection + "?full=1")
res = resp.json
res.sort(key=lambda bso: bso["id"])
self.assertEqual(len(res), 4)
self.assertEqual(res[0]["payload"], "aih")
self.assertEqual(res[1]["payload"], "bie")
self.assertEqual(res[2]["payload"], "cee")
self.assertEqual(res[3]["payload"], "di")
# bug 1332552 make sure ttl:null use the default ttl
def test_create_bso_with_null_ttl(self):
bso = {"payload": "x", "ttl": None}
self.retry_put_json(self.root + "/storage/xxx_col2/TEST1", bso)
time.sleep(0.1)
res = self.app.get(self.root + "/storage/xxx_col2/TEST1?full=1")
self.assertEqual(res.json["payload"], "x")
def test_rejection_of_known_bad_payloads(self):
bso = {
"id": "keys",
"payload": json_dumps(
{
"ciphertext": "IDontKnowWhatImDoing",
"IV": "AAAAAAAAAAAAAAAAAAAAAA==",
}
),
}
# Fishy IVs are rejected on the "crypto" collection.
self.retry_put_json(self.root + "/storage/crypto/keys", bso, status=400)
self.retry_put_json(self.root + "/storage/crypto/blerg", bso, status=400)
self.retry_post_json(self.root + "/storage/crypto", [bso], status=400)
# But are allowed on other collections.
self.retry_put_json(self.root + "/storage/xxx_col2/keys", bso, status=200)
self.retry_post_json(self.root + "/storage/xxx_col2", [bso], status=200)
# bug 1397357
def test_batch_empty_commit(self):
def testEmptyCommit(contentType, body, status=200):
bsos = [{"id": str(i).zfill(2), "payload": "X"} for i in range(5)]
res = self.retry_post_json(self.root + "/storage/xxx_col?batch=true", bsos)
self.assertEqual(len(res.json["success"]), 5)
self.assertEqual(len(res.json["failed"]), 0)
batch = res.json["batch"]
self.app.post(
self.root + "/storage/xxx_col?commit=true&batch=" + batch,
body,
headers={"Content-Type": contentType},
status=status,
)
testEmptyCommit("application/json", "[]")
testEmptyCommit("application/json", "{}", status=400)
testEmptyCommit("application/json", "", status=400)
testEmptyCommit("application/newlines", "")
testEmptyCommit("application/newlines", "\n", status=400)
testEmptyCommit("application/newlines", "{}", status=400)
testEmptyCommit("application/newlines", "[]", status=400)
def test_cors_settings_are_set(self):
res = self.app.options(
self.root + "/__heartbeat__",
headers={
"Access-Control-Request-Method": "GET",
"Origin": "localhost",
"Access-Control-Request-Headers": "Content-Type",
},
)
self.assertEqual(int(res.headers["access-control-max-age"]), 555)
self.assertEqual(res.headers["access-control-allow-origin"], "localhost")
def test_cors_allows_any_origin(self):
self.app.options(
self.root + "/__heartbeat__",
headers={
"Access-Control-Request-Method": "GET",
"Origin": "http://test-website.com",
"Access-Control-Request-Headers": "Content-Type",
},
status=200,
)
# PATCH is not a default allowed method, so request should return 405
def test_patch_is_not_allowed(self):
collection = self.root + "/storage/xxx_col1"
with self.assertRaises(AppError) as error:
self.app.patch_json(collection)
self.assertIn("405", error)