Copilot found in test/alternator a bunch of places where we unnecessarily assign a variable that we never use, or have a duplicated statement that doesn't do anything. This patch fixes all of them. AI still doesn't know how to prepare a patch that looks anything close to reasonable, so I did this part manually, and also carefully investigated each and every change (this took **a lot** of human time). These patches don't change anything in the functionality of any of the tests. It's all cosmetic. Closes scylladb/scylladb#27655 * github.com:scylladb/scylladb: test/alternator: remove unnecessary duplicate statement test/alternator: remove unused variable assignments
# Copyright 2019-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0

# Various utility functions which are useful for multiple tests

import string
import random
import collections
import time
import re
import requests
import json
import pytest
from contextlib import contextmanager
from botocore.hooks import HierarchicalEmitter
# The "pytest-randomly" pytest plugins modifies the default "random" to repeat
|
|
# the same pseudo-random sequence (with the same seed) in each separate test.
|
|
# But we currently rely on random_string() at al. to return unique keys that
|
|
# can be used in different tests to create different items in the same table.
|
|
# Until we stop relying on randomness for unique keys (see issue #9988), we
|
|
# need to continue the same random sequence for all tests, so let's undo what
|
|
# pytest-randomly does by explicitly sharing the same random sequence (which
|
|
# we'll call here "global_random") for all tests.
|
|
global_random = random.Random()
|
|
|
|
def random_string(length=10, chars=string.ascii_uppercase + string.digits):
|
|
return ''.join(global_random.choice(chars) for x in range(length))
|
|
|
|
def random_bytes(length=10):
|
|
return bytearray(global_random.getrandbits(8) for _ in range(length))
|
|
|
|

# Utility functions for reading the full result of a scan or a query into an
# array of items (this may require multiple requests to read successive
# pages).
# For convenience, ConsistentRead=True is used by default, as most tests
# need it to run correctly on a multi-node cluster. Callers who need to
# override it can (this is necessary in GSI tests, where ConsistentRead=True
# is not supported).
def full_scan(table, ConsistentRead=True, **kwargs):
    response = table.scan(ConsistentRead=ConsistentRead, **kwargs)
    items = response['Items']
    while 'LastEvaluatedKey' in response:
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'],
                              ConsistentRead=ConsistentRead, **kwargs)
        items.extend(response['Items'])
    return items
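
# For example (an illustrative sketch, not executed here; 'table' is a boto3
# Table resource and the GSI name 'my_gsi' is hypothetical):
#
#   items = full_scan(table)                                       # base table
#   gsi_items = full_scan(table, ConsistentRead=False, IndexName='my_gsi')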

# full_scan_and_count returns both the items and the count as returned by the
# server. Note that the count isn't simply len(items) - the server returns
# them independently. E.g., with Select='COUNT' the items are not returned,
# but the count is.
def full_scan_and_count(table, ConsistentRead=True, **kwargs):
    response = table.scan(ConsistentRead=ConsistentRead, **kwargs)
    items = []
    count = 0
    if 'Items' in response:
        items.extend(response['Items'])
    if 'Count' in response:
        count = count + response['Count']
    while 'LastEvaluatedKey' in response:
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'],
                              ConsistentRead=ConsistentRead, **kwargs)
        if 'Items' in response:
            items.extend(response['Items'])
        if 'Count' in response:
            count = count + response['Count']
    return (count, items)

# Utility function for fetching the entire result of a query into an array of items
def full_query(table, ConsistentRead=True, **kwargs):
    response = table.query(ConsistentRead=ConsistentRead, **kwargs)
    items = response['Items']
    while 'LastEvaluatedKey' in response:
        response = table.query(ExclusiveStartKey=response['LastEvaluatedKey'],
                               ConsistentRead=ConsistentRead, **kwargs)
        items.extend(response['Items'])
    return items

# full_query_and_counts returns both the items and the counts (pre-filter and
# post-filter count) as returned by the server.
# Note that the count isn't simply len(items) - the server returns them
# independently. E.g., with Select='COUNT' the items are not returned, but
# the count is.
def full_query_and_counts(table, ConsistentRead=True, **kwargs):
    response = table.query(ConsistentRead=ConsistentRead, **kwargs)
    items = []
    prefilter_count = 0
    postfilter_count = 0
    pages = 0
    if 'Items' in response:
        items.extend(response['Items'])
    pages = pages + 1
    if 'Count' in response:
        postfilter_count = postfilter_count + response['Count']
    if 'ScannedCount' in response:
        prefilter_count = prefilter_count + response['ScannedCount']
    while 'LastEvaluatedKey' in response:
        response = table.query(ExclusiveStartKey=response['LastEvaluatedKey'],
                               ConsistentRead=ConsistentRead, **kwargs)
        if 'Items' in response:
            items.extend(response['Items'])
        pages = pages + 1
        if 'Count' in response:
            postfilter_count = postfilter_count + response['Count']
        if 'ScannedCount' in response:
            prefilter_count = prefilter_count + response['ScannedCount']
    return (prefilter_count, postfilter_count, pages, items)
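
# For example (an illustrative sketch, not executed here; the key and filter
# expressions and attribute names are hypothetical):
#
#   (scanned, matched, pages, items) = full_query_and_counts(table,
#       KeyConditionExpression='p = :p',
#       FilterExpression='x = :x',
#       ExpressionAttributeValues={':p': 'dog', ':x': 3})
#   # 'scanned' counts items examined before the filter, 'matched' counts
#   # items that passed it, and 'pages' is the number of pages fetched.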

# To compare two lists of items (each is a dict) without regard for order,
# "==" is not good enough because it will fail if the order is different.
# The following function, multiset(), converts the list into a multiset
# (a set with duplicates) where order doesn't matter, so the multisets can
# be compared.

def freeze(item):
    if isinstance(item, dict):
        return frozenset((key, freeze(value)) for key, value in item.items())
    elif isinstance(item, list):
        return tuple(freeze(value) for value in item)
    elif isinstance(item, bytearray):
        return bytes(item)
    return item

def multiset(items):
    return collections.Counter([freeze(item) for item in items])
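
# For example (an illustrative sketch, not executed here), two lists holding
# the same items in a different order compare equal as multisets:
#
#   assert multiset([{'p': 1, 'x': 'a'}, {'p': 2}]) == \
#          multiset([{'p': 2}, {'p': 1, 'x': 'a'}])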

# NOTE: alternator_Test prefix contains a capital letter on purpose,
# in order to validate case sensitivity in alternator
test_table_prefix = 'alternator_Test_'
def unique_table_name():
    current_ms = int(round(time.time() * 1000))
    # If unique_table_name() is called twice in the same millisecond...
    if unique_table_name.last_ms >= current_ms:
        current_ms = unique_table_name.last_ms + 1
    unique_table_name.last_ms = current_ms
    return test_table_prefix + str(current_ms)
unique_table_name.last_ms = 0

def create_test_table(dynamodb, name=None, **kwargs):
    if name is None:
        name = unique_table_name()
    BillingMode = 'PAY_PER_REQUEST'
    if 'BillingMode' in kwargs:
        BillingMode = kwargs['BillingMode']
        del kwargs['BillingMode']

    print("fixture creating new table {}".format(name))
    table = dynamodb.create_table(TableName=name,
                                  BillingMode=BillingMode, **kwargs)
    waiter = table.meta.client.get_waiter('table_exists')
    # Recheck every second instead of the default, lower, frequency. This can
    # save a few seconds on AWS with its very slow table creation, but even
    # more in tests on Scylla with its faster table creation turnaround.
    waiter.config.delay = 1
    waiter.config.max_attempts = 200
    waiter.wait(TableName=name)
    return table
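
# For example (an illustrative sketch, not executed here; a typical call from
# a fixture, with a hypothetical string hash key 'p'):
#
#   table = create_test_table(dynamodb,
#       KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
#       AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}])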

# A variant of create_test_table() that can be used in a "with" statement to
# automatically delete the table when the test ends - as in:
#    with new_test_table(dynamodb, ...) as table:
# When it is possible to share the same table between multiple tests, always
# prefer to use a fixture over using this function directly.
@contextmanager
def new_test_table(dynamodb, **kwargs):
    table = create_test_table(dynamodb, **kwargs)
    try:
        yield table
    # The user's "with" code is running during the yield, so if it
    # throws an exception, we need the cleanup to be in a finally block:
    finally:
        print(f"Deleting table {table.name}")
        table.delete()

# DynamoDB's ListTables request returns up to a single page of table names
# (e.g., up to 100) and it is up to the caller to call it again and again
# to get the next page. This is a utility function which calls it repeatedly,
# as much as necessary, to get the entire list.
# We deliberately return a list and not a set, because we want the caller
# to be able to recognize bugs in ListTables which cause the same table
# to be returned twice.
def list_tables(dynamodb, limit=100):
    ret = []
    pos = None
    while True:
        if pos:
            page = dynamodb.meta.client.list_tables(Limit=limit, ExclusiveStartTableName=pos)
        else:
            page = dynamodb.meta.client.list_tables(Limit=limit)
        results = page.get('TableNames', None)
        if not results:
            # DynamoDB may return a last empty page, without TableNames at
            # all. It seems it doesn't happen on us-east-1, but does happen
            # on eu-north-1 when limit=1.
            break
        ret = ret + results
        newpos = page.get('LastEvaluatedTableName', None)
        if not newpos:
            break
        # It doesn't make sense for Dynamo to tell us we need more pages, but
        # not send anything in *this* page!
        assert len(results) > 0
        assert newpos != pos
        # Note that we only checked that we got back tables, not that we got
        # any new tables not already in ret. So a buggy implementation might
        # still cause an endless loop getting the same tables again and again.
        pos = newpos
    return ret
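
# For example (an illustrative sketch, not executed here), a test can verify
# that no table name was returned twice by comparing the list with a set:
#
#   names = list_tables(dynamodb)
#   assert len(names) == len(set(names))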

# Boto3 conveniently transforms native Python types to DynamoDB JSON and back,
# for example one can use the string 'x' as a key and it is transparently
# transformed to the map {'S': 'x'} that Boto3 uses to represent a string.
# While these transformations are very convenient, they prevent us from
# checking various *errors* in the format of API parameters, because boto3
# verifies and/or modifies these parameters for us.
# So the following contextmanager presents a boto3 client which is modified
# to *not* do these transformations or validations at all.
@contextmanager
def client_no_transform(client):
    # client.meta.events is an "emitter" object listing various hooks, which
    # by default boto3 sets up as explained above. Here we temporarily
    # override it with an empty emitter:
    old_events = client.meta.events
    client.meta.events = HierarchicalEmitter()
    yield client
    client.meta.events = old_events
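
# For example (an illustrative sketch, not executed here; the key name and
# value are hypothetical), a test can pass attribute values in their raw
# DynamoDB JSON form - or deliberately malformed ones - without boto3
# transforming or validating them first:
#
#   with client_no_transform(table.meta.client) as client:
#       client.get_item(TableName=table.name, Key={'p': {'S': 'dog'}})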

def is_aws(dynamodb):
    return dynamodb.meta.client._endpoint.host.endswith('.amazonaws.com')

# Return the AWS region name, or the Scylla data center name.
def get_region(dynamodb):
    if is_aws(dynamodb):
        dc_name = dynamodb.meta.client.meta.region_name
        # Make sure we got a non-empty region name.
        assert len(dc_name) > 0
        return dc_name
    system_local = dynamodb.Table('.scylla.alternator.system.local')
    return system_local.scan(AttributesToGet=['data_center'])['Items'][0]['data_center']

# Tries to inject an error via the Scylla REST API. It only works on Scylla,
# and only in specific build modes (dev, debug, sanitize), so this function
# will cause the test to be skipped if the injection cannot be executed.
@contextmanager
def scylla_inject_error(rest_api, err, one_shot=False):
    requests.post(f'{rest_api}/v2/error_injection/injection/{err}?one_shot={one_shot}')
    response = requests.get(f'{rest_api}/v2/error_injection/injection')
    print("Enabled error injections:", response.content.decode('utf-8'))
    if response.content.decode('utf-8') == "[]":
        pytest.skip("Error injection not enabled in Scylla - try compiling in dev/debug/sanitize mode")
    try:
        yield
    finally:
        print("Disabling error injection", err)
        requests.delete(f'{rest_api}/v2/error_injection/injection/{err}')
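
# For example (an illustrative sketch, not executed here; the injection point
# name is hypothetical), a test can run requests while a specific failure is
# injected, and the injection is removed when the "with" block ends:
#
#   with scylla_inject_error(rest_api, 'some_injection_point', one_shot=True):
#       ...  # perform a request that is expected to hit the injected error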

# Send a message to the Scylla log. E.g., we can write a message to the log
# indicating that a test has started, which will make it easier to see which
# test caused which errors in the log.
def scylla_log(optional_rest_api, message, level):
    if optional_rest_api:
        requests.post(f'{optional_rest_api}/system/log?message={requests.utils.quote(message)}&level={level}')

# UpdateTable for creating a GSI is an asynchronous operation. The table's
# TableStatus changes from ACTIVE to UPDATING for a short while, and then
# goes back to ACTIVE, but the new GSI's IndexStatus appears as CREATING,
# until eventually (in Amazon DynamoDB - it takes a *long* time...) it
# becomes ACTIVE. During the CREATING phase, at some point the Backfilling
# attribute also appears, until it eventually disappears. We need to wait
# until all three markers indicate completion.
# Unfortunately, while boto3 has a client.get_waiter('table_exists') to
# wait for a table to exist, there is no such function to wait for an
# index to come up, so we need to code it ourselves.
def wait_for_gsi(table, gsi_name):
    start_time = time.time()
    # The timeout needs to be long because on Amazon DynamoDB, even on a
    # tiny table, it sometimes takes minutes.
    while time.time() < start_time + 600:
        desc = table.meta.client.describe_table(TableName=table.name)
        table_status = desc['Table']['TableStatus']
        if table_status != 'ACTIVE':
            time.sleep(0.1)
            continue
        index_desc = [x for x in desc['Table']['GlobalSecondaryIndexes'] if x['IndexName'] == gsi_name]
        assert len(index_desc) == 1
        index_status = index_desc[0]['IndexStatus']
        if index_status != 'ACTIVE':
            time.sleep(0.1)
            continue
        # When the index is ACTIVE, this must be after backfilling completed
        assert 'Backfilling' not in index_desc[0]
        return
    raise AssertionError("wait_for_gsi did not complete")

# Similarly to how wait_for_gsi() waits for a GSI to finish being added,
# this function waits for a GSI to be finally deleted.
def wait_for_gsi_gone(table, gsi_name):
    start_time = time.time()
    while time.time() < start_time + 600:
        desc = table.meta.client.describe_table(TableName=table.name)
        table_status = desc['Table']['TableStatus']
        if table_status != 'ACTIVE':
            time.sleep(0.1)
            continue
        if 'GlobalSecondaryIndexes' in desc['Table']:
            index_desc = [x for x in desc['Table']['GlobalSecondaryIndexes'] if x['IndexName'] == gsi_name]
            if len(index_desc) != 0:
                time.sleep(0.1)
                continue
        return
    raise AssertionError("wait_for_gsi_gone did not complete")

# Read a parameter from Scylla's configuration, stored in the system table
# which is also visible to Alternator. This function will only work on Scylla,
# and fail otherwise, so should only be used in Scylla-only tests.
# If this is Scylla, but the specific parameter name is not present in the
# configuration, this function returns None. If the parameter is present,
# it is returned (as a string).
def scylla_config_read(dynamodb, name):
    config_table = dynamodb.Table('.scylla.alternator.system.config')
    # We use query() here instead of the simpler get_item(), because
    # commit 44a1daf only added support for system tables in Query and
    # Scan, not in GetItem...
    r = config_table.query(
        KeyConditionExpression='#key=:val',
        ExpressionAttributeNames={'#key': 'name'},
        ExpressionAttributeValues={':val': name}
    )
    if 'Items' not in r or not r['Items']:
        return None
    return r['Items'][0]['value']

# Write a parameter in Scylla's configuration, using the system table
# which is also visible to Alternator. This function will only work on Scylla,
# and fail otherwise, so should only be used in Scylla-only tests.
# Also on Scylla, this function may fail with an exception if the
# configuration parameter alternator_allow_system_table_write is not turned
# on, so callers might want to catch such an exception and skip the test.
def scylla_config_write(dynamodb, name, value):
    config_table = dynamodb.Table('.scylla.alternator.system.config')
    config_table.update_item(
        Key={'name': name},
        UpdateExpression='SET #val = :val',
        ExpressionAttributeNames={'#val': 'value'},
        ExpressionAttributeValues={':val': value}
    )

# A context manager that can be used in a "with" to temporarily set a
# configuration parameter to a desired value, and restore its original
# value when the context ends.
# The configuration parameter has to be live-updatable.
# Note that using this mechanism is only a good idea if you're sure that
# no other workload or test is using the same Alternator cluster in parallel,
# because the changed configuration will affect the other workload too.
@contextmanager
def scylla_config_temporary(dynamodb, name, value, nop=False):
    if nop:
        yield
        return
    original_value = scylla_config_read(dynamodb, name)
    scylla_config_write(dynamodb, name, value)
    try:
        yield
    finally:
        scylla_config_write(dynamodb, name, original_value)
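
# For example (an illustrative sketch, not executed here; the parameter name
# and value are hypothetical), a test can temporarily change a live-updatable
# configuration parameter and have it restored when the "with" block ends:
#
#   with scylla_config_temporary(dynamodb, 'some_live_updatable_option', '1'):
#       ...  # run requests under the modified configuration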

# manual_request() can be used to send a DynamoDB API request without any
# boto3 involvement in preparing the request - the operation name and
# operation payload (a JSON string) are created by the caller. Use this
# function sparingly - most tests should use boto3's resource API or, when
# needed, client_no_transform(). Although manual_request() does give the test
# more control, not using it allows the test to check the natural requests
# sent by a real-life SDK.
def manual_request(dynamodb, op, payload):
    req = get_signed_request(dynamodb, op, payload)
    res = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
    if res.status_code == 200:
        return json.loads(res.text)
    else:
        err = json.loads(res.text)
        error_code = res.status_code
        error_type = err['__type'].split('#')[1]
        # Normally, DynamoDB uses lowercase 'message', but in some cases
        # it uses 'Message', and for some types of error it may be missing
        # entirely (we'll return an empty string for that).
        message = err.get('message', err.get('Message', ''))
        raise ManualRequestError(error_code, error_type, message)
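
# For example (an illustrative sketch, not executed here; the table name is
# hypothetical), a test can send a hand-built DescribeTable request and
# inspect the error reported for a nonexistent table:
#
#   try:
#       manual_request(dynamodb, 'DescribeTable',
#                      '{"TableName": "no_such_table"}')
#   except ManualRequestError as e:
#       assert e.type == 'ResourceNotFoundException'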

class ManualRequestError(Exception):
    def __init__(self, error_code, error_type, message):
        super().__init__(message) # message is the main exception text
        self.code = error_code
        self.type = error_type
        self.message = message
    def __str__(self):
        return f'{self.code} {self.type} {self.message}'
    __repr__ = __str__

def get_signed_request(dynamodb, op, payload):
    # Usually "payload" will be a Python string and we'll write it as UTF-8,
    # but in some tests we may want to write bytes directly - potentially
    # bytes which include invalid UTF-8.
    payload_bytes = payload if isinstance(payload, bytes) else payload.encode(encoding='UTF-8')
    # NOTE: Signing routines use boto3 implementation details and may be prone
    # to unexpected changes
    class Request:
        url = dynamodb.meta.client._endpoint.host
        headers = {'X-Amz-Target': 'DynamoDB_20120810.' + op, 'Content-Type': 'application/x-amz-json-1.0'}
        body = payload_bytes
        method = 'POST'
        context = {}
        params = {}
    req = Request()
    signer = dynamodb.meta.client._request_signer
    signer.get_auth(signer.signing_name, signer.region_name).add_auth(request=req)
    return req