mirror of
https://github.com/mozilla-services/syncstorage-rs.git
synced 2026-03-29 01:02:20 +01:00
Some checks failed
Glean probe-scraper / glean-probe-scraper (push) Has been cancelled
Build, Tag and Push Container Images to GAR Repository / build-and-push-syncstorage-rs (push) Has been cancelled
Build, Tag and Push Container Images to GAR Repository / build-and-push-syncserver-postgres (push) Has been cancelled
Build, Tag and Push Container Images to GAR Repository / build-and-push-syncstorage-rs-spanner-python-utils (push) Has been cancelled
Build, Tag and Push Container Images to GAR Repository / build-and-push-syncserver-postgres-python-utils (push) Has been cancelled
Build, Tag and Push Container Images to GAR Repository / build-and-push-syncserver-mysql (push) Has been cancelled
MySQL Build and Test / build-and-test-mysql (push) Has been cancelled
MySQL Build and Test / build-mysql-image (push) Has been cancelled
MySQL Build and Test / mysql-e2e-tests (push) Has been cancelled
Postgres Build and Test / build-and-test-postgres (push) Has been cancelled
Postgres Build and Test / build-postgres-image (push) Has been cancelled
Postgres Build and Test / postgres-e2e-tests (push) Has been cancelled
Publish Sync docs to pages / build-mdbook (push) Has been cancelled
Publish Sync docs to pages / build-openapi (push) Has been cancelled
Publish Sync docs to pages / combine-and-prepare (push) Has been cancelled
Publish Sync docs to pages / deploy (push) Has been cancelled
Spanner Build, Test, and Push / build-and-test-spanner (push) Has been cancelled
Spanner Build, Test, and Push / build-spanner-image (push) Has been cancelled
Spanner Build, Test, and Push / spanner-e2e-tests (push) Has been cancelled
test: molotov sync loadtests
239 lines
7.7 KiB
Python
239 lines
7.7 KiB
Python
"""Load testing scenarios for SyncStorage using Molotov."""
|
|
|
|
import sys
|
|
|
|
sys.path.append(".") # NOQA
|
|
import base64
|
|
import json
|
|
import os
|
|
import random
|
|
import time
|
|
from typing import Any
|
|
|
|
from molotov import scenario, setup_session, teardown_session
|
|
from storage import StorageClient
|
|
|
|
_PAYLOAD = """\
|
|
This is the metaglobal payload which contains
|
|
some client data that doesnt look much
|
|
like this
|
|
"""
|
|
_WEIGHTS = {
|
|
"metaglobal": [40, 60, 0, 0, 0],
|
|
"distribution": [80, 15, 4, 1],
|
|
"count_distribution": [71, 15, 7, 4, 3],
|
|
"post_count_distribution": [67, 18, 9, 4, 2],
|
|
"delete_count_distribution": [99, 1, 0, 0, 0],
|
|
}
|
|
|
|
_PROBS = {"get": 0.1, "post": 0.2, "deleteall": 0.01}
|
|
_COLLS = ["bookmarks", "forms", "passwords", "history", "prefs"]
|
|
_BATCH_MAX_COUNT = 100
|
|
|
|
_DISABLE_DELETES = os.environ.get("DISABLE_DELETES", "false").lower() in ("true", "1")
|
|
|
|
|
|
def should_do(name: str) -> bool:
|
|
"""Determine if an action should be performed based on probability.
|
|
|
|
Args:
|
|
name: The name of the action to check.
|
|
|
|
Returns:
|
|
bool: True if the action should be performed, False otherwise.
|
|
|
|
"""
|
|
return random.random() <= _PROBS[name]
|
|
|
|
|
|
def get_num_requests(name: str) -> int:
|
|
"""Get the number of requests to make based on weighted distribution.
|
|
|
|
Args:
|
|
name: The name of the request type to get count for.
|
|
|
|
Returns:
|
|
int: The number of requests to make.
|
|
|
|
"""
|
|
weights = _WEIGHTS[name]
|
|
i = random.randint(1, sum(weights))
|
|
count = 0
|
|
base = 0
|
|
for weight in weights:
|
|
base += weight
|
|
if i <= base:
|
|
break
|
|
count += 1
|
|
return count
|
|
|
|
|
|
@setup_session()
|
|
async def _session(worker_num: int, session: Any) -> None:
|
|
exc = []
|
|
|
|
def _run():
|
|
try:
|
|
session.storage = StorageClient(session)
|
|
except Exception as e:
|
|
exc.append(e)
|
|
|
|
# XXX code will be migrated to Molotov
|
|
# see https://github.com/loads/molotov/issues/100
|
|
import threading
|
|
|
|
t = threading.Thread(target=_run)
|
|
t.start()
|
|
t.join()
|
|
if len(exc) > 0:
|
|
raise exc[0]
|
|
|
|
|
|
@teardown_session()
|
|
async def _teardown_session(worker_num: int, session: Any) -> None:
|
|
if hasattr(session, "storage") and session.storage:
|
|
session.storage.cleanup()
|
|
|
|
|
|
@scenario(1)
|
|
async def test(session: Any) -> None:
|
|
"""Main load test scenario for SyncStorage.
|
|
|
|
Args:
|
|
session: The Molotov session object.
|
|
|
|
"""
|
|
storage = session.storage
|
|
|
|
# Respect the server limits.
|
|
_, config = await storage.get("/info/configuration")
|
|
# print("Config: {}".format(json.dumps(config, indent=3)))
|
|
# fix up consts
|
|
payload = _PAYLOAD[: config.get("max_record_payload_bytes")]
|
|
# GET requests to meta/global
|
|
num_requests = min(get_num_requests("metaglobal"), config.get("max_post_records"))
|
|
batch_max_count = min(_BATCH_MAX_COUNT, config.get("max_total_records"))
|
|
|
|
# Always GET info/collections
|
|
# This is also a good opportunity to correct for timeskew.
|
|
url = "/info/collections"
|
|
resp, _ = await storage.get(url, (200, 404))
|
|
|
|
url = "/storage/meta/global"
|
|
|
|
for x in range(num_requests):
|
|
resp, __ = await storage.get(url, (200, 404))
|
|
if resp.status == 404:
|
|
data = json.dumps({"id": "global", "payload": payload})
|
|
await storage.put(url, data=data, statuses=(200,))
|
|
|
|
# Occasional reads of client records.
|
|
if should_do("get"):
|
|
url = "/storage/clients"
|
|
newer = int(time.time() - random.randint(3600, 360000))
|
|
params = {"full": "1", "newer": str(newer)}
|
|
await storage.get(url, params=params, statuses=(200, 404))
|
|
|
|
# Occasional updates to client records.
|
|
if should_do("post"):
|
|
cid = str(get_num_requests("distribution"))
|
|
url = "/storage/clients"
|
|
wbo = {"id": "client" + cid, "payload": cid * 300}
|
|
data = json.dumps([wbo])
|
|
resp, result = await storage.post(url, data=data, statuses=(200,))
|
|
assert len(result["success"]) == 1, "No success records"
|
|
assert len(result["failed"]) == 0, "Found failed record"
|
|
|
|
# GET requests to individual collections.
|
|
num_requests = get_num_requests("count_distribution")
|
|
cols = random.sample(_COLLS, num_requests)
|
|
for x in range(num_requests):
|
|
url = "/storage/" + cols[x]
|
|
newer = int(time.time() - random.randint(3600, 360000))
|
|
params = {"full": "1", "newer": str(newer)}
|
|
await storage.get(url, params=params, statuses=(200, 404))
|
|
|
|
# POST requests with several WBOs batched together
|
|
num_requests = get_num_requests("post_count_distribution")
|
|
# Let's do roughly 50% transactional batches.
|
|
transact = random.randint(0, 1)
|
|
batch_id = None
|
|
committing = False
|
|
|
|
# Collections should be a single static entry if we're "transactional"
|
|
if transact:
|
|
col = random.sample(_COLLS, 1)[0]
|
|
cols = [col for x in range(num_requests)]
|
|
else:
|
|
cols = random.sample(_COLLS, num_requests)
|
|
|
|
for x in range(num_requests):
|
|
url = "/storage/" + cols[x]
|
|
wbo_list: list[dict[str, str]] = []
|
|
# Random batch size, skewed slightly towards the upper limit.
|
|
items_per_batch = min(random.randint(20, batch_max_count + 80), batch_max_count)
|
|
for _i in range(items_per_batch):
|
|
randomness = os.urandom(10)
|
|
id_bytes = base64.urlsafe_b64encode(randomness).rstrip(b"=")
|
|
id_str = id_bytes.decode("utf8")
|
|
id_str += str(int((time.time() % 100) * 100000))
|
|
# Random payload length. They can be big, but skew small.
|
|
# This gives min=300, mean=450, max=config.max_record_payload_bytes
|
|
payload_length = min(
|
|
int(random.paretovariate(3) * 300),
|
|
config.get("max_record_payload_bytes"),
|
|
)
|
|
|
|
# XXX should be in the class
|
|
token = storage.auth_token.decode("utf8")
|
|
payload_chunks = int((payload_length / len(token)) + 1)
|
|
payload = (token * payload_chunks)[:payload_length]
|
|
wbo = {"id": id_str, "payload": payload}
|
|
wbo_list.append(wbo)
|
|
|
|
data = json.dumps(wbo_list)
|
|
status = 200
|
|
if transact:
|
|
# Batch uploads only return a 200 on commit. An Accepted(202)
|
|
# is returned for batch creation & appends
|
|
status = 202
|
|
if x == 0:
|
|
committing = False
|
|
url += "?batch=true"
|
|
elif x == num_requests - 1:
|
|
url += "?commit=true&batch=%s" % batch_id
|
|
committing = True
|
|
batch_id = None
|
|
status = 200
|
|
else:
|
|
url += "?batch=%s" % batch_id
|
|
|
|
resp, result = await storage.post(url, data=data, statuses=(status,))
|
|
assert len(result["success"]) == items_per_batch, (
|
|
"Result success did not have expected number ofitems in batch {}".format(
|
|
result
|
|
)
|
|
)
|
|
assert len(result["failed"]) == 0, "Result contained failed records: {}".format(
|
|
result
|
|
)
|
|
|
|
if transact and not committing:
|
|
batch_id = result["batch"]
|
|
|
|
if not _DISABLE_DELETES:
|
|
# DELETE requests.
|
|
# We might choose to delete some individual collections, or to
|
|
# do a full reset and delete all the data. Never both in the
|
|
# same run.
|
|
num_requests = get_num_requests("delete_count_distribution")
|
|
if num_requests:
|
|
cols = random.sample(_COLLS, num_requests)
|
|
for x in range(num_requests):
|
|
url = "/storage/" + cols[x]
|
|
resp, result = await storage.delete(url, statuses=(200, 204))
|
|
else:
|
|
if should_do("deleteall"):
|
|
url = "/storage"
|
|
resp, result = await storage.delete(url, statuses=(200,))
|