Files
scylladb/test/alternator/util.py
Pavel Emelyanov 31f90c089c Merge 'test/alternator: remove unused variable assignments and statements' from Nadav Har'El
Copilot found in test/alternator a bunch of places where we unnecessarily assign a variable that we don't use, or had a duplicated statement which doesn't do anything. This patch fixes all of them. AI still doesn't know how to prepare a patch that looks anything close to reasonable, so I did this part manually, and also carefully investigated each and every change (this took **a lot** of human time).

These patches don't change anything in the functionality of any of the tests. It's all cosmetic.

Closes scylladb/scylladb#27655

* github.com:scylladb/scylladb:
  test/alternator: remove unnecessary duplicate statement
  test/alternator: remove unused variable assignments
2025-12-16 19:23:34 +03:00

418 lines
18 KiB
Python

# Copyright 2019-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
# Various utility functions which are useful for multiple tests
import string
import random
import collections
import time
import re
import requests
import json
import pytest
from contextlib import contextmanager
from botocore.hooks import HierarchicalEmitter
# The "pytest-randomly" pytest plugins modifies the default "random" to repeat
# the same pseudo-random sequence (with the same seed) in each separate test.
# But we currently rely on random_string() at al. to return unique keys that
# can be used in different tests to create different items in the same table.
# Until we stop relying on randomness for unique keys (see issue #9988), we
# need to continue the same random sequence for all tests, so let's undo what
# pytest-randomly does by explicitly sharing the same random sequence (which
# we'll call here "global_random") for all tests.
global_random = random.Random()
def random_string(length=10, chars=string.ascii_uppercase + string.digits):
return ''.join(global_random.choice(chars) for x in range(length))
def random_bytes(length=10):
return bytearray(global_random.getrandbits(8) for _ in range(length))
# Utility functions for scan and query into an array of items, reading
# the full (possibly requiring multiple requests to read successive pages).
# For convenience, ConsistentRead=True is used by default, as most tests
# need it to run correctly on a multi-node cluster. Callers who need to
# override it, can (this is necessary in GSI tests, where ConsistentRead=True
# is not supported).
def full_scan(table, ConsistentRead=True, **kwargs):
response = table.scan(ConsistentRead=ConsistentRead, **kwargs)
items = response['Items']
while 'LastEvaluatedKey' in response:
response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'],
ConsistentRead=ConsistentRead, **kwargs)
items.extend(response['Items'])
return items
# full_scan_and_count returns both items and count as returned by the server.
# Note that count isn't simply len(items) - the server returns them
# independently. e.g., with Select='COUNT' the items are not returned, but
# count is.
def full_scan_and_count(table, ConsistentRead=True, **kwargs):
response = table.scan(ConsistentRead=ConsistentRead, **kwargs)
items = []
count = 0
if 'Items' in response:
items.extend(response['Items'])
if 'Count' in response:
count = count + response['Count']
while 'LastEvaluatedKey' in response:
response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'],
ConsistentRead=ConsistentRead, **kwargs)
if 'Items' in response:
items.extend(response['Items'])
if 'Count' in response:
count = count + response['Count']
return (count, items)
# Utility function for fetching the entire results of a query into an array of items
def full_query(table, ConsistentRead=True, **kwargs):
response = table.query(ConsistentRead=ConsistentRead, **kwargs)
items = response['Items']
while 'LastEvaluatedKey' in response:
response = table.query(ExclusiveStartKey=response['LastEvaluatedKey'],
ConsistentRead=ConsistentRead, **kwargs)
items.extend(response['Items'])
return items
# full_query_and_counts returns both items and counts (pre-filter and
# post-filter count) as returned by the server.
# Note that count isn't simply len(items) - the server returns them
# independently. e.g., with Select='COUNT' the items are not returned, but
# count is.
def full_query_and_counts(table, ConsistentRead=True, **kwargs):
response = table.query(ConsistentRead=ConsistentRead, **kwargs)
items = []
prefilter_count = 0
postfilter_count = 0
pages = 0
if 'Items' in response:
items.extend(response['Items'])
pages = pages + 1
if 'Count' in response:
postfilter_count = postfilter_count + response['Count']
if 'ScannedCount' in response:
prefilter_count = prefilter_count + response['ScannedCount']
while 'LastEvaluatedKey' in response:
response = table.query(ExclusiveStartKey=response['LastEvaluatedKey'],
ConsistentRead=ConsistentRead, **kwargs)
if 'Items' in response:
items.extend(response['Items'])
pages = pages + 1
if 'Count' in response:
postfilter_count = postfilter_count + response['Count']
if 'ScannedCount' in response:
prefilter_count = prefilter_count + response['ScannedCount']
return (prefilter_count, postfilter_count, pages, items)
# To compare two lists of items (each is a dict) without regard for order,
# "==" is not good enough because it will fail if the order is different.
# The following function, multiset() converts the list into a multiset
# (set with duplicates) where order doesn't matter, so the multisets can
# be compared.
def freeze(item):
if isinstance(item, dict):
return frozenset((key, freeze(value)) for key, value in item.items())
elif isinstance(item, list):
return tuple(freeze(value) for value in item)
elif isinstance(item, bytearray):
return bytes(item)
return item
def multiset(items):
return collections.Counter([freeze(item) for item in items])
# NOTE: alternator_Test prefix contains a capital letter on purpose,
#in order to validate case sensitivity in alternator
test_table_prefix = 'alternator_Test_'
def unique_table_name():
current_ms = int(round(time.time() * 1000))
# If unique_table_name() is called twice in the same millisecond...
if unique_table_name.last_ms >= current_ms:
current_ms = unique_table_name.last_ms + 1
unique_table_name.last_ms = current_ms
return test_table_prefix + str(current_ms)
unique_table_name.last_ms = 0
def create_test_table(dynamodb, name=None, **kwargs):
if name is None:
name = unique_table_name()
BillingMode = 'PAY_PER_REQUEST'
if 'BillingMode' in kwargs:
BillingMode = kwargs['BillingMode']
del kwargs['BillingMode']
print("fixture creating new table {}".format(name))
table = dynamodb.create_table(TableName=name,
BillingMode=BillingMode, **kwargs)
waiter = table.meta.client.get_waiter('table_exists')
# recheck every second instead of the default, lower, frequency. This can
# save a few seconds on AWS with its very slow table creation, but can
# more on tests on Scylla with its faster table creation turnaround.
waiter.config.delay = 1
waiter.config.max_attempts = 200
waiter.wait(TableName=name)
return table
# A variant of create_test_table() that can be used in a "with" to
# automatically delete the table when the test ends - as:
# with new_test_table(dynamodb, ...) as table:
# When possible to share the same table between multiple tests, always
# prefer to use a fixture over using this function directly.
@contextmanager
def new_test_table(dynamodb, **kwargs):
table = create_test_table(dynamodb, **kwargs)
try:
yield table
# The user's "with" code is running during the yield, so if it
# throws an exception, we need the cleanup to be in a finally block:
finally:
print(f"Deleting table {table.name}")
table.delete()
# DynamoDB's ListTables request returns up to a single page of table names
# (e.g., up to 100) and it is up to the caller to call it again and again
# to get the next page. This is a utility function which calls it repeatedly
# as much as necessary to get the entire list.
# We deliberately return a list and not a set, because we want the caller
# to be able to recognize bugs in ListTables which causes the same table
# to be returned twice.
def list_tables(dynamodb, limit=100):
ret = []
pos = None
while True:
if pos:
page = dynamodb.meta.client.list_tables(Limit=limit, ExclusiveStartTableName=pos);
else:
page = dynamodb.meta.client.list_tables(Limit=limit);
results = page.get('TableNames', None)
if not results:
# DynamoDB may return a last empty page, without TableNames at
# all. It seems it doesn't happen on us-east-1, but does happen
# on eu-north-1 when limit=1.
break;
ret = ret + results
newpos = page.get('LastEvaluatedTableName', None)
if not newpos:
break;
# It doesn't make sense for Dynamo to tell us we need more pages, but
# not send anything in *this* page!
assert len(results) > 0
assert newpos != pos
# Note that we only checked that we got back tables, not that we got
# any new tables not already in ret. So a buggy implementation might
# still cause an endless loop getting the same tables again and again.
pos = newpos
return ret
# Boto3 conveniently transforms native Python types to DynamoDB JSON and back,
# for example one can use the string 'x' as a key and it is transparently
# transformed to the map {'S': 'x'} that Boto3 uses to represent a string.
# While these transformations are very convenient, they prevent us from
# checking various *errors* in the format of API parameters, because boto3
# verifies and/or modifies these parameters for us.
# So the following contextmanager presents a boto3 client which is modified
# to *not* do these transformations or validations at all.
@contextmanager
def client_no_transform(client):
# client.meta.events is an "emitter" object listing various hooks, which
# by default boto3 sets up as explained above. Here we temporarily
# override it with an empty emitter:
old_events = client.meta.events
client.meta.events = HierarchicalEmitter()
yield client
client.meta.events = old_events
def is_aws(dynamodb):
return dynamodb.meta.client._endpoint.host.endswith('.amazonaws.com')
# Return the AWS region name, or the Scylla data center name.
def get_region(dynamodb):
if is_aws(dynamodb):
dc_name = dynamodb.meta.client.meta.region_name
# Make sure we got a non-empty region name.
assert len(dc_name) > 0
return dc_name
system_local = dynamodb.Table('.scylla.alternator.system.local')
return system_local.scan(AttributesToGet=['data_center'])['Items'][0]['data_center']
# Tries to inject an error via Scylla REST API. It only works on Scylla,
# and only in specific build modes (dev, debug, sanitize), so this function
# will trigger a test to be skipped if it cannot be executed.
@contextmanager
def scylla_inject_error(rest_api, err, one_shot=False):
requests.post(f'{rest_api}/v2/error_injection/injection/{err}?one_shot={one_shot}')
response = requests.get(f'{rest_api}/v2/error_injection/injection')
print("Enabled error injections:", response.content.decode('utf-8'))
if response.content.decode('utf-8') == "[]":
pytest.skip("Error injection not enabled in Scylla - try compiling in dev/debug/sanitize mode")
try:
yield
finally:
print("Disabling error injection", err)
requests.delete(f'{rest_api}/v2/error_injection/injection/{err}')
# Send a message to the Scylla log. E.g., we can write a message to the log
# indicating that a test has started, which will make it easier to see which
# test caused which errors in the log.
def scylla_log(optional_rest_api, message, level):
if optional_rest_api:
requests.post(f'{optional_rest_api}/system/log?message={requests.utils.quote(message)}&level={level}')
# UpdateTable for creating a GSI is an asynchronous operation. The table's
# TableStatus changes from ACTIVE to UPDATING for a short while, and then
# goes back to ACTIVE, but the new GSI's IndexStatus appears as CREATING,
# until eventually (in Amazon DynamoDB - it tests a *long* time...) it
# becomes ACTIVE. During the CREATING phase, at some point the Backfilling
# attribute also appears, until it eventually disappears. We need to wait
# until all three markers indicate completion.
# Unfortunately, while boto3 has a client.get_waiter('table_exists') to
# wait for a table to exists, there is no such function to wait for an
# index to come up, so we need to code it ourselves.
def wait_for_gsi(table, gsi_name):
start_time = time.time()
# The timeout needs to be long because on Amazon DynamoDB, even on a
# a tiny table, it sometimes takes minutes.
while time.time() < start_time + 600:
desc = table.meta.client.describe_table(TableName=table.name)
table_status = desc['Table']['TableStatus']
if table_status != 'ACTIVE':
time.sleep(0.1)
continue
index_desc = [x for x in desc['Table']['GlobalSecondaryIndexes'] if x['IndexName'] == gsi_name]
assert len(index_desc) == 1
index_status = index_desc[0]['IndexStatus']
if index_status != 'ACTIVE':
time.sleep(0.1)
continue
# When the index is ACTIVE, this must be after backfilling completed
assert not 'Backfilling' in index_desc[0]
return
raise AssertionError("wait_for_gsi did not complete")
# Similarly to how wait_for_gsi() waits for a GSI to finish adding,
# this function waits for a GSI to be finally deleted.
def wait_for_gsi_gone(table, gsi_name):
start_time = time.time()
while time.time() < start_time + 600:
desc = table.meta.client.describe_table(TableName=table.name)
table_status = desc['Table']['TableStatus']
if table_status != 'ACTIVE':
time.sleep(0.1)
continue
if 'GlobalSecondaryIndexes' in desc['Table']:
index_desc = [x for x in desc['Table']['GlobalSecondaryIndexes'] if x['IndexName'] == gsi_name]
if len(index_desc) != 0:
time.sleep(0.1)
continue
return
raise AssertionError("wait_for_gsi_gone did not complete")
# Read a parameter from Scylla's configuration, stored in the system table
# which is also visible to Alternator. This function will only work on Scylla,
# and fail otherwise, so should only be used in Scylla-only tests.
# If this is Scylla, but the specific parameter name is not present in the
# configuration, this function returns None. If the parameter is present,
# it is returned (as a string).
def scylla_config_read(dynamodb, name):
config_table = dynamodb.Table('.scylla.alternator.system.config')
# We use query() here instead of the simpler get_item(), because
# commit 44a1daf only added support for system tables in Query and
# Scan, not in GetItem...
r = config_table.query(
KeyConditionExpression='#key=:val',
ExpressionAttributeNames={'#key': 'name'},
ExpressionAttributeValues={':val': name}
)
if not 'Items' in r or not r['Items']:
return None
return r['Items'][0]['value']
# Write a parameter in Scylla's configuration, using in the system table
# which is also visible to Alternator. This function will only work on Scylla,
# and fail otherwise, so should only be used in Scylla-only tests.
# Also on Scylla, this function may fail with an exception if the
# configuration parameter alternator_allow_system_table_write is not turned
# on, so callers might want to catch such an exception and skip the test.
def scylla_config_write(dynamodb, name, value):
config_table = dynamodb.Table('.scylla.alternator.system.config')
config_table.update_item(
Key={'name': name},
UpdateExpression='SET #val = :val',
ExpressionAttributeNames={'#val': 'value'},
ExpressionAttributeValues={':val': value}
)
# A context manager that can be used in a "with" to temporarily set a
# configuration parameter to a desired value, and restore its original
# value when the context ends.
# The configuration parameter has to be live-updatable.
# Note that using this mechanism is only a good idea if you're sure that
# no other workload or test is using the same Alternator cluster in parallel,
# because the changed configuration will affect the other workload too.
@contextmanager
def scylla_config_temporary(dynamodb, name, value, nop = False):
if nop:
yield
return
original_value = scylla_config_read(dynamodb, name)
scylla_config_write(dynamodb, name, value)
try:
yield
finally:
scylla_config_write(dynamodb, name, original_value)
# manual_request() can be used to send a DynamoDB API request without any
# boto3 involvement in preparing the request - the operation name and
# operation payload (a JSON string) are created by the caller. Use this
# function sparingly - most tests should use boto3's resource API or when
# needed, client_no_transform(). Although manual_request() does give the test
# more control, not using it allows the test to check the natural requests
# sent by a real-life SDK.
def manual_request(dynamodb, op, payload):
req = get_signed_request(dynamodb, op, payload)
res = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
if res.status_code == 200:
return json.loads(res.text)
else:
err = json.loads(res.text)
error_code = res.status_code
error_type = err['__type'].split('#')[1]
# Normally, DynamoDB uses lowercase 'message', but in some cases
# it uses 'Message', and for some types of error it may be missing
# entirely (we'll return an empty string for that).
message = err.get('message', err.get('Message', ''))
raise ManualRequestError(error_code, error_type, message)
class ManualRequestError(Exception):
def __init__(self, error_code, error_type, message):
super().__init__(message) # message is the main exception text
self.code = error_code
self.type = error_type
self.message = message
def __str__(self):
return f'{self.code} {self.type} {self.message}'
__repr__ = __str__
def get_signed_request(dynamodb, op, payload):
# Usually "payload" will be a Python string and we'll write it as UTF-8.
# but in some tests we may want to write bytes directly - potentially
# bytes which include invalid UTF-8.
payload_bytes = payload if isinstance(payload, bytes) else payload.encode(encoding='UTF-8')
# NOTE: Signing routines use boto3 implementation details and may be prone
# to unexpected changes
class Request:
url=dynamodb.meta.client._endpoint.host
headers={'X-Amz-Target': 'DynamoDB_20120810.' + op, 'Content-Type': 'application/x-amz-json-1.0'}
body=payload_bytes
method='POST'
context={}
params={}
req = Request()
signer = dynamodb.meta.client._request_signer
signer.get_auth(signer.signing_name, signer.region_name).add_auth(request=req)
return req