We have a test in test_compressed_response.py that reproduces a bug where in Alternator's signature checking code, if a header had multiple consecutive spaces its signature isn't checked correctly. This patch fixes this and that xfailing test begins to pass. But it turns out that the handling of multiple consecutive spaces in headers when calculating the authentication signature is just one example of "header canonization" that the AWS Signature V4 specification requires us to do. There are additional types of header canonization that Alternator must do, and this patch also adds new tests in test_authorization.py for checking *all* the types of canonization. Fortunately, for all other types of canonizations, we already handled them correctly - Alternator already lowercases header names, sorts them alphabetically and removes leading and trailing spaces before calculating the signature. So most of the new tests added pass also without this patch, and only one of them, test_canonization_middle_whitespace, needs this patch to pass. As usual, all the new tests also pass on DynamoDB. Fixes #27775 Signed-off-by: Nadav Har'El <nyh@scylladb.com> Closes scylladb/scylladb#28102
425 lines
19 KiB
Python
425 lines
19 KiB
Python
# Copyright 2019-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
|
|
# Various utility functions which are useful for multiple tests
|
|
|
|
import string
|
|
import random
|
|
import collections
|
|
import time
|
|
import requests
|
|
import json
|
|
import pytest
|
|
from contextlib import contextmanager
|
|
from botocore.hooks import HierarchicalEmitter
|
|
|
|
# The "pytest-randomly" pytest plugin modifies the default "random" to repeat
|
|
# the same pseudo-random sequence (with the same seed) in each separate test.
|
|
# But we currently rely on random_string() et al. to return unique keys that
|
|
# can be used in different tests to create different items in the same table.
|
|
# Until we stop relying on randomness for unique keys (see issue #9988), we
|
|
# need to continue the same random sequence for all tests, so let's undo what
|
|
# pytest-randomly does by explicitly sharing the same random sequence (which
|
|
# we'll call here "global_random") for all tests.
|
|
global_random = random.Random()


def random_string(length=10, chars=string.ascii_uppercase + string.digits):
    """Return a string of `length` characters drawn from `chars`.

    Characters are picked one at a time from the module-wide
    `global_random` generator, so successive calls (in any test) continue
    one shared pseudo-random sequence.
    """
    picked = []
    for _ in range(length):
        picked.append(global_random.choice(chars))
    return ''.join(picked)
|
|
|
|
def random_bytes(length=10):
    """Return a bytearray of `length` bytes from the shared generator.

    Each byte is obtained with a separate 8-bit draw from `global_random`.
    """
    octets = bytearray()
    for _ in range(length):
        octets.append(global_random.getrandbits(8))
    return octets
|
|
|
|
# Utility functions for scan and query into an array of items, reading
|
|
# the full result (possibly requiring multiple requests to read successive pages).
|
|
# For convenience, ConsistentRead=True is used by default, as most tests
|
|
# need it to run correctly on a multi-node cluster. Callers who need to
|
|
# override it, can (this is necessary in GSI tests, where ConsistentRead=True
|
|
# is not supported).
|
|
def full_scan(table, ConsistentRead=True, **kwargs):
    """Scan `table` to completion and return all items in a single list.

    The first request is sent with the given ConsistentRead and any extra
    keyword arguments; as long as a response carries 'LastEvaluatedKey',
    another request is sent with that key as ExclusiveStartKey.
    """
    page = table.scan(ConsistentRead=ConsistentRead, **kwargs)
    # Accumulate into the first page's own 'Items' list.
    collected = page['Items']
    while 'LastEvaluatedKey' in page:
        resume_from = page['LastEvaluatedKey']
        page = table.scan(ExclusiveStartKey=resume_from,
                          ConsistentRead=ConsistentRead, **kwargs)
        collected += page['Items']
    return collected
|
|
|
|
# full_scan_and_count returns both items and count as returned by the server.
|
|
# Note that count isn't simply len(items) - the server returns them
|
|
# independently. e.g., with Select='COUNT' the items are not returned, but
|
|
# count is.
|
|
def full_scan_and_count(table, ConsistentRead=True, **kwargs):
    """Scan `table` to completion, returning `(count, items)`.

    `count` is the sum of the 'Count' fields of all response pages and
    `items` is the concatenation of their 'Items' lists; a page missing
    either field simply contributes nothing to it.
    """
    items = []
    total = 0
    page = table.scan(ConsistentRead=ConsistentRead, **kwargs)
    while True:
        if 'Items' in page:
            items += page['Items']
        if 'Count' in page:
            total += page['Count']
        if 'LastEvaluatedKey' not in page:
            break
        page = table.scan(ExclusiveStartKey=page['LastEvaluatedKey'],
                          ConsistentRead=ConsistentRead, **kwargs)
    return (total, items)
|
|
|
|
# Utility function for fetching the entire results of a query into an array of items
|
|
def full_query(table, ConsistentRead=True, **kwargs):
    """Run a query on `table` to completion and return all items as a list.

    Keeps re-issuing the query with ExclusiveStartKey set to the previous
    response's 'LastEvaluatedKey' until a response no longer has one.
    """
    page = table.query(ConsistentRead=ConsistentRead, **kwargs)
    # Accumulate into the first page's own 'Items' list.
    collected = page['Items']
    while 'LastEvaluatedKey' in page:
        resume_from = page['LastEvaluatedKey']
        page = table.query(ExclusiveStartKey=resume_from,
                           ConsistentRead=ConsistentRead, **kwargs)
        collected += page['Items']
    return collected
|
|
|
|
# full_query_and_counts returns both items and counts (pre-filter and
|
|
# post-filter count) as returned by the server.
|
|
# Note that count isn't simply len(items) - the server returns them
|
|
# independently. e.g., with Select='COUNT' the items are not returned, but
|
|
# count is.
|
|
def full_query_and_counts(table, ConsistentRead=True, **kwargs):
    """Run a query to completion, returning counts, page count and items.

    Returns the tuple `(prefilter_count, postfilter_count, pages, items)`:
    the summed 'ScannedCount' fields, the summed 'Count' fields, the number
    of response pages that contained an 'Items' field, and the
    concatenation of those 'Items' lists.
    """
    items = []
    scanned_total = 0
    matched_total = 0
    pages_with_items = 0
    page = table.query(ConsistentRead=ConsistentRead, **kwargs)
    while True:
        if 'Items' in page:
            items += page['Items']
            pages_with_items += 1
        if 'Count' in page:
            matched_total += page['Count']
        if 'ScannedCount' in page:
            scanned_total += page['ScannedCount']
        if 'LastEvaluatedKey' not in page:
            break
        page = table.query(ExclusiveStartKey=page['LastEvaluatedKey'],
                           ConsistentRead=ConsistentRead, **kwargs)
    return (scanned_total, matched_total, pages_with_items, items)
|
|
|
|
# To compare two lists of items (each is a dict) without regard for order,
|
|
# "==" is not good enough because it will fail if the order is different.
|
|
# The following function, multiset() converts the list into a multiset
|
|
# (set with duplicates) where order doesn't matter, so the multisets can
|
|
# be compared.
|
|
|
|
def freeze(item):
    """Return a hashable, order-insensitive-where-appropriate copy of `item`.

    Conversions, applied recursively to dict values and list elements:
      * dict      -> frozenset of (key, frozen value) pairs
      * list      -> tuple of frozen elements (order is preserved)
      * bytearray -> bytes
      * set       -> frozenset
    Anything else is returned unchanged.
    """
    if isinstance(item, dict):
        return frozenset((key, freeze(value)) for key, value in item.items())
    if isinstance(item, list):
        return tuple(freeze(value) for value in item)
    if isinstance(item, bytearray):
        return bytes(item)
    if isinstance(item, set):
        # A plain set is unhashable, so an item holding one could not be
        # placed inside the frozenset/Counter built from frozen items.
        # Its elements are already hashable, so no recursion is needed.
        return frozenset(item)
    return item
|
|
|
|
def multiset(items):
    """Return a Counter of the frozen form of every element of `items`.

    Two lists converted this way compare equal regardless of element order,
    while still distinguishing how many times each element occurs.
    """
    return collections.Counter(map(freeze, items))
|
|
|
|
# NOTE: alternator_Test prefix contains a capital letter on purpose,
|
|
# in order to validate case sensitivity in alternator
|
|
test_table_prefix = 'alternator_Test_'


def unique_table_name():
    """Return `test_table_prefix` followed by a strictly increasing number.

    The number is the current time in milliseconds, bumped to one past the
    previously returned value when the clock has not advanced beyond it
    (e.g. two calls within the same millisecond). The last value handed out
    is remembered in the function attribute `unique_table_name.last_ms`.
    """
    now_ms = int(round(time.time() * 1000))
    stamp = max(now_ms, unique_table_name.last_ms + 1)
    unique_table_name.last_ms = stamp
    return test_table_prefix + str(stamp)


unique_table_name.last_ms = 0
|
|
|
|
def create_test_table(dynamodb, name=None, **kwargs):
    """Create a table, wait until it exists, and return it.

    `name` defaults to a fresh unique_table_name(). BillingMode defaults
    to 'PAY_PER_REQUEST' unless the caller passes one in `kwargs`; all
    other keyword arguments are forwarded to dynamodb.create_table().
    """
    table_name = unique_table_name() if name is None else name
    billing_mode = kwargs.pop('BillingMode', 'PAY_PER_REQUEST')

    print("fixture creating new table {}".format(table_name))
    table = dynamodb.create_table(TableName=table_name,
                                  BillingMode=billing_mode, **kwargs)
    waiter = table.meta.client.get_waiter('table_exists')
    # Re-check every second instead of at the default, lower, frequency.
    # This can save a few seconds on AWS with its very slow table creation,
    # and even more in tests on Scylla with its faster table creation
    # turnaround.
    waiter.config.delay = 1
    waiter.config.max_attempts = 200
    waiter.wait(TableName=table_name)
    return table
|
|
|
|
# A variant of create_test_table() that can be used in a "with" to
|
|
# automatically delete the table when the test ends - as:
|
|
# with new_test_table(dynamodb, ...) as table:
|
|
# When possible to share the same table between multiple tests, always
|
|
# prefer to use a fixture over using this function directly.
|
|
@contextmanager
def new_test_table(dynamodb, **kwargs):
    """Context manager yielding a freshly created table, deleted on exit.

    All arguments are passed through to create_test_table().
    """
    table = create_test_table(dynamodb, **kwargs)
    try:
        yield table
    finally:
        # The caller's "with" body runs while we are suspended at the
        # yield; the finally clause makes sure the table is deleted even
        # when that body raises.
        print(f"Deleting table {table.name}")
        table.delete()
|
|
|
|
# DynamoDB's ListTables request returns up to a single page of table names
|
|
# (e.g., up to 100) and it is up to the caller to call it again and again
|
|
# to get the next page. This is a utility function which calls it repeatedly
|
|
# as much as necessary to get the entire list.
|
|
# We deliberately return a list and not a set, because we want the caller
|
|
# to be able to recognize bugs in ListTables which causes the same table
|
|
# to be returned twice.
|
|
def list_tables(dynamodb, limit=100):
    """Return the names of all tables, fetched page by page via ListTables.

    Each request asks for at most `limit` names. The result is a list (not
    a set), so a name returned twice by the server shows up twice.
    """
    client = dynamodb.meta.client
    names = []
    cursor = None
    while True:
        request = {'Limit': limit}
        if cursor:
            request['ExclusiveStartTableName'] = cursor
        page = client.list_tables(**request)
        batch = page.get('TableNames', None)
        if not batch:
            # DynamoDB may return a last empty page, without TableNames at
            # all. It seems it doesn't happen on us-east-1, but does happen
            # on eu-north-1 when limit=1.
            break
        names = names + batch
        next_cursor = page.get('LastEvaluatedTableName', None)
        if not next_cursor:
            break
        # It makes no sense for the server to announce more pages without
        # having sent anything in *this* page.
        assert len(batch) > 0
        assert next_cursor != cursor
        # We only checked that the cursor moved, not that the page held
        # names we had not seen before, so a buggy server could still keep
        # us looping over the same tables forever.
        cursor = next_cursor
    return names
|
|
|
|
# Boto3 conveniently transforms native Python types to DynamoDB JSON and back,
|
|
# for example one can use the string 'x' as a key and it is transparently
|
|
# transformed to the map {'S': 'x'} that Boto3 uses to represent a string.
|
|
# While these transformations are very convenient, they prevent us from
|
|
# checking various *errors* in the format of API parameters, because boto3
|
|
# verifies and/or modifies these parameters for us.
|
|
# So the following contextmanager presents a boto3 client which is modified
|
|
# to *not* do these transformations or validations at all.
|
|
@contextmanager
def client_no_transform(client):
    """Context manager yielding `client` with its event hooks disabled.

    client.meta.events is an "emitter" object listing various hooks, which
    boto3 sets up by default. While the context is active it is replaced
    by an empty HierarchicalEmitter; the original emitter is put back when
    the context exits.
    """
    old_events = client.meta.events
    client.meta.events = HierarchicalEmitter()
    try:
        yield client
    finally:
        # The caller's "with" body runs during the yield. Restoring in a
        # finally clause guarantees the client gets its hooks back even if
        # that body raises - otherwise a failing test would leave a shared
        # client stripped of its hooks for every later user.
        client.meta.events = old_events
|
|
|
|
def is_aws(dynamodb):
    """Return True if the endpoint host ends with '.amazonaws.com'.

    The host is read from `dynamodb.meta.client._endpoint.host`.
    """
    endpoint_host = dynamodb.meta.client._endpoint.host
    return endpoint_host.endswith('.amazonaws.com')
|
|
|
|
# Return the AWS region name, or the Scylla data center name.
|
|
def get_region(dynamodb):
    """Return the AWS region name, or the Scylla data center name.

    When is_aws() is false, the name is the 'data_center' attribute of the
    first item scanned from the '.scylla.alternator.system.local' table.
    """
    if not is_aws(dynamodb):
        local_table = dynamodb.Table('.scylla.alternator.system.local')
        rows = local_table.scan(AttributesToGet=['data_center'])['Items']
        return rows[0]['data_center']
    region = dynamodb.meta.client.meta.region_name
    # Make sure we got a non-empty region name.
    assert len(region) > 0
    return region
|
|
|
|
# Tries to inject an error via Scylla REST API. It only works on Scylla,
|
|
# and only in specific build modes (dev, debug, sanitize), so this function
|
|
# will trigger a test to be skipped if it cannot be executed.
|
|
@contextmanager
def scylla_inject_error(rest_api, err, one_shot=False):
    """Context manager enabling error injection `err` via the REST API.

    After requesting the injection, the list of enabled injections is
    fetched; if that list is empty ("[]") the calling test is skipped.
    Otherwise the injection is removed again when the context exits.
    """
    requests.post(f'{rest_api}/v2/error_injection/injection/{err}?one_shot={one_shot}')
    listing = requests.get(f'{rest_api}/v2/error_injection/injection')
    enabled = listing.content.decode('utf-8')
    print("Enabled error injections:", enabled)
    if enabled == "[]":
        pytest.skip("Error injection not enabled in Scylla - try compiling in dev/debug/sanitize mode")
    try:
        yield
    finally:
        print("Disabling error injection", err)
        requests.delete(f'{rest_api}/v2/error_injection/injection/{err}')
|
|
|
|
# Send a message to the Scylla log. E.g., we can write a message to the log
|
|
# indicating that a test has started, which will make it easier to see which
|
|
# test caused which errors in the log.
|
|
def scylla_log(optional_rest_api, message, level):
    """POST `message` at `level` to the REST API's /system/log endpoint.

    Does nothing when `optional_rest_api` is falsy (e.g. None).
    """
    if not optional_rest_api:
        return
    requests.post(f'{optional_rest_api}/system/log?message={requests.utils.quote(message)}&level={level}')
|
|
|
|
# UpdateTable for creating a GSI is an asynchronous operation. The table's
|
|
# TableStatus changes from ACTIVE to UPDATING for a short while, and then
|
|
# goes back to ACTIVE, but the new GSI's IndexStatus appears as CREATING,
|
|
# until eventually (in Amazon DynamoDB - it takes a *long* time...) it
|
|
# becomes ACTIVE. During the CREATING phase, at some point the Backfilling
|
|
# attribute also appears, until it eventually disappears. We need to wait
|
|
# until all three markers indicate completion.
|
|
# Unfortunately, while boto3 has a client.get_waiter('table_exists') to
|
|
# wait for a table to exist, there is no such function to wait for an
|
|
# index to come up, so we need to code it ourselves.
|
|
def wait_for_gsi(table, gsi_name):
    """Block until GSI `gsi_name` of `table` is fully created.

    Polls DescribeTable until the table's TableStatus and the index's
    IndexStatus are both 'ACTIVE'. Raises AssertionError if that does not
    happen within the timeout, if the index is not listed exactly once, or
    if an ACTIVE index still reports 'Backfilling'.
    """
    started = time.time()
    # On Amazon DynamoDB, adding a GSI, even to a tiny table, seems to take
    # as much as 20 minutes, so on AWS we need an absurdly long timeout,
    # and we also poll less often there.
    if is_aws(table):
        timeout, delay = 1800, 3
    else:
        timeout, delay = 60, 0.1
    deadline = started + timeout
    while time.time() < deadline:
        description = table.meta.client.describe_table(TableName=table.name)['Table']
        if description['TableStatus'] == 'ACTIVE':
            matching = [x for x in description['GlobalSecondaryIndexes'] if x['IndexName'] == gsi_name]
            assert len(matching) == 1
            if matching[0]['IndexStatus'] == 'ACTIVE':
                # When the index is ACTIVE, backfilling must be over.
                assert 'Backfilling' not in matching[0]
                return
        time.sleep(delay)
    raise AssertionError("wait_for_gsi did not complete")
|
|
|
|
# Similarly to how wait_for_gsi() waits for a GSI to finish adding,
|
|
# this function waits for a GSI to be finally deleted.
|
|
def wait_for_gsi_gone(table, gsi_name):
    """Block until GSI `gsi_name` no longer appears on `table`.

    Polls DescribeTable every 0.1 seconds, for up to 600 seconds, until the
    table is 'ACTIVE' and lists no index with that name. Raises
    AssertionError if the time runs out first.
    """
    deadline = time.time() + 600
    while time.time() < deadline:
        description = table.meta.client.describe_table(TableName=table.name)['Table']
        gone = False
        if description['TableStatus'] == 'ACTIVE':
            if 'GlobalSecondaryIndexes' in description:
                leftovers = [x for x in description['GlobalSecondaryIndexes'] if x['IndexName'] == gsi_name]
                gone = len(leftovers) == 0
            else:
                # No GSI list at all means the index is certainly gone.
                gone = True
        if gone:
            return
        time.sleep(0.1)
    raise AssertionError("wait_for_gsi_gone did not complete")
|
|
|
|
# Read a parameter from Scylla's configuration, stored in the system table
|
|
# which is also visible to Alternator. This function will only work on Scylla,
|
|
# and fail otherwise, so should only be used in Scylla-only tests.
|
|
# If this is Scylla, but the specific parameter name is not present in the
|
|
# configuration, this function returns None. If the parameter is present,
|
|
# it is returned (as a string).
|
|
def scylla_config_read(dynamodb, name):
    """Return the 'value' of configuration parameter `name`, or None.

    The parameter is looked up with a Query on the
    '.scylla.alternator.system.config' table; None is returned when the
    response has no items.
    """
    config_table = dynamodb.Table('.scylla.alternator.system.config')
    # We use query() here instead of the simpler get_item(), because
    # commit 44a1daf only added support for system tables in Query and
    # Scan, not in GetItem...
    query_args = {
        'KeyConditionExpression': '#key=:val',
        'ExpressionAttributeNames': {'#key': 'name'},
        'ExpressionAttributeValues': {':val': name},
    }
    r = config_table.query(**query_args)
    rows = r['Items'] if 'Items' in r else None
    if not rows:
        return None
    return rows[0]['value']
|
|
|
|
# Write a parameter in Scylla's configuration, using the system table
|
|
# which is also visible to Alternator. This function will only work on Scylla,
|
|
# and fail otherwise, so should only be used in Scylla-only tests.
|
|
# Also on Scylla, this function may fail with an exception if the
|
|
# configuration parameter alternator_allow_system_table_write is not turned
|
|
# on, so callers might want to catch such an exception and skip the test.
|
|
def scylla_config_write(dynamodb, name, value):
    """Set configuration parameter `name` to `value`.

    Implemented as an UpdateItem on the '.scylla.alternator.system.config'
    table that SETs the item's 'value' attribute.
    """
    update_args = {
        'Key': {'name': name},
        'UpdateExpression': 'SET #val = :val',
        'ExpressionAttributeNames': {'#val': 'value'},
        'ExpressionAttributeValues': {':val': value},
    }
    dynamodb.Table('.scylla.alternator.system.config').update_item(**update_args)
|
|
|
|
# A context manager that can be used in a "with" to temporarily set a
|
|
# configuration parameter to a desired value, and restore its original
|
|
# value when the context ends.
|
|
# The configuration parameter has to be live-updatable.
|
|
# Note that using this mechanism is only a good idea if you're sure that
|
|
# no other workload or test is using the same Alternator cluster in parallel,
|
|
# because the changed configuration will affect the other workload too.
|
|
@contextmanager
def scylla_config_temporary(dynamodb, name, value, nop = False):
    """Context manager temporarily setting config parameter `name`.

    The current value is read, `value` is written, and on exit (including
    exit by exception) the value that was read is written back. With
    `nop` true, nothing is read or written at all.
    """
    if not nop:
        saved = scylla_config_read(dynamodb, name)
        scylla_config_write(dynamodb, name, value)
        try:
            yield
        finally:
            scylla_config_write(dynamodb, name, saved)
    else:
        yield
|
|
|
|
# manual_request() can be used to send a DynamoDB API request without any
|
|
# boto3 involvement in preparing the request - the operation name and
|
|
# operation payload (a JSON string) are created by the caller. Use this
|
|
# function sparingly - most tests should use boto3's resource API or when
|
|
# needed, client_no_transform(). Although manual_request() does give the test
|
|
# more control, not using it allows the test to check the natural requests
|
|
# sent by a real-life SDK.
|
|
def manual_request(dynamodb, op, payload):
    """Send operation `op` with body `payload` and return the parsed reply.

    The request is signed by get_signed_request() and POSTed with
    certificate verification disabled. A 200 response is returned as the
    decoded JSON body; any other status raises ManualRequestError carrying
    the status code, the error type (the part of '__type' after '#') and
    the error message.
    """
    req = get_signed_request(dynamodb, op, payload)
    res = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
    parsed = json.loads(res.text)
    if res.status_code == 200:
        return parsed
    error_type = parsed['__type'].split('#')[1]
    # Normally, DynamoDB uses lowercase 'message', but in some cases
    # it uses 'Message', and for some types of error it may be missing
    # entirely (we'll use an empty string for that).
    message = parsed.get('message', parsed.get('Message', ''))
    raise ManualRequestError(res.status_code, error_type, message)
|
|
|
|
class ManualRequestError(Exception):
    """Error carrying an HTTP status code, an error type and a message.

    Attributes:
        code: the value passed as `error_code`.
        type: the value passed as `error_type`.
        message: the error message, which is also the exception's only arg.
    """

    def __init__(self, error_code, error_type, message):
        # The message alone serves as the main exception text.
        super().__init__(message)
        self.message = message
        self.type = error_type
        self.code = error_code

    def __str__(self):
        return f'{self.code} {self.type} {self.message}'

    # repr() shows the same "<code> <type> <message>" text as str().
    __repr__ = __str__
|
|
|
|
def get_signed_request(dynamodb, op, payload, extra_headers=None):
    """Build a request object for operation `op` and have it signed.

    `payload` may be a str (encoded as UTF-8) or bytes (used as-is, so a
    test can send bytes that are not valid UTF-8). `extra_headers`, if
    given, is merged over the default X-Amz-Target and Content-Type
    headers. The returned object exposes url, headers, body, method,
    context and params attributes, after the client's request signer has
    run add_auth() on it.
    """
    if isinstance(payload, bytes):
        payload_bytes = payload
    else:
        payload_bytes = payload.encode(encoding='UTF-8')
    endpoint_url = dynamodb.meta.client._endpoint.host
    base_headers = {'X-Amz-Target': 'DynamoDB_20120810.' + op, 'Content-Type': 'application/x-amz-json-1.0'}
    request_headers = base_headers | (extra_headers or {})

    # NOTE: Signing routines use boto3 implementation details and may be
    # prone to unexpected changes
    class Request:
        url = endpoint_url
        headers = request_headers
        body = payload_bytes
        method = 'POST'
        context = {}
        params = {}

    req = Request()
    signer = dynamodb.meta.client._request_signer
    auth = signer.get_auth(signer.signing_name, signer.region_name)
    auth.add_auth(request=req)
    return req
|