Files
scylladb/test/cluster/dtest/alternator_tests.py
Piotr Szymaniak f511264831 alternator/test: fix test_ttl_with_load_and_decommission flaky Connection refused error
The native Scylla nodetool reports ECONNREFUSED as 'Connection refused',
not as 'ConnectException' (which is the Java nodetool format). Add
'Connection refused' to the valid_errors list so that transient
connection failures during concurrent decommission/bootstrap topology
changes are properly tolerated.

Fixes SCYLLADB-1167

Closes scylladb/scylladb#29156
2026-03-22 11:01:45 +02:00

723 lines
36 KiB
Python

#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import logging
import os
import random
import string
import tempfile
import time
from concurrent.futures.thread import ThreadPoolExecutor
from pprint import pformat
import pytest
import requests
from botocore.exceptions import ClientError, EndpointConnectionError
from ccmlib.scylla_node import NodetoolError, ScyllaNode
from deepdiff import DeepDiff
from alternator.utils import schemas
from alternator_utils import (
ALTERNATOR_SECURE_PORT,
ALTERNATOR_SNAPSHOT_FOLDER,
DEFAULT_STRING_LENGTH,
LONGEST_TABLE_SIZE,
NUM_OF_ELEMENTS_IN_SET,
NUM_OF_ITEMS,
SHORTEST_TABLE_SIZE,
TABLE_NAME,
BaseAlternator,
Gsi,
WriteIsolation,
full_query,
generate_put_request_items,
random_string,
set_write_isolation,
)
from dtest_class import get_ip_from_node, wait_for
from tools.cluster import new_node
from tools.retrying import retrying
# Module-level logger used by the tests and helpers in this file.
logger = logging.getLogger(__name__)
class SlowQueriesLoggingError(Exception):
    """Raised when the slow-query-logging state or its log content is not the expected one."""
class ConcurrencyLimitNotExceededError(Exception):
    """Raised when a request succeeded although the concurrency limit was expected to reject it."""
class TesterAlternator(BaseAlternator):
def _num_of_nodes_for_test(self, rf):
    """Return the number of cluster nodes a test needs for replication factor ``rf``.

    :param rf: the replication factor the test relies on; must be at least 3.
    :return: ``rf``, plus one spare node when "tablets" is among the scylla features.
    """
    assert rf >= 3
    # Alternator uses RF=3 on clusters with three or more live nodes and
    # writes with QUORUM consistency. Tablets enforce the RF constraints as
    # well, so a spare node is required before a node may be decommissioned.
    spare_nodes = 1 if "tablets" in self.scylla_features else 0
    return rf + spare_nodes
def test_load_older_snapshot_and_refresh(self):
    """
    Load snapshot files of an older version and check the `refresh` command works.

    Test will:
    - Create a cluster of 3 nodes.
    - Create the table and load the older snapshot files.
    - Execute the nodetool `refresh` command.
    - Verify the table data after `refresh` equals the expected data.
    """
    older_snapshot_dir = os.path.join(ALTERNATOR_SNAPSHOT_FOLDER, "scylla4.0.rc1")
    self.prepare_dynamodb_cluster(num_of_nodes=3)
    target_node = self.cluster.nodelist()[0]
    self.create_table(table_name=TABLE_NAME, node=target_node)
    expected_data = self.create_items()
    self.load_snapshot_and_refresh(table_name=TABLE_NAME, node=target_node, snapshot_folder=older_snapshot_dir)
    diff = self.compare_table_data(table_name=TABLE_NAME, expected_table_data=expected_data, node=target_node)
    assert not diff, f"The following items are missing:\n{pformat(diff)}"
def test_create_snapshot_and_refresh(self, request):
    """
    Check the behavior of the `snapshot` and `refresh` commands for Alternator.

    Test will:
    - Create a single-node cluster.
    - Create a new table and write NUM_OF_ITEMS items into it.
    - Create a snapshot of that table into a temporary folder.
    - Delete and re-create the table, then load the snapshot and run `refresh`.
    - Verify the data before `snapshot` and after `refresh` are equal.
    """
    self.prepare_dynamodb_cluster(num_of_nodes=1)
    node1 = self.cluster.nodelist()[0]
    self.create_table(table_name=TABLE_NAME, node=node1)
    written_items = self.create_items(num_of_items=NUM_OF_ITEMS)
    self.batch_write_actions(table_name=TABLE_NAME, node=node1, new_items=written_items)
    data_before_refresh = self.scan_table(table_name=TABLE_NAME, node=node1)

    snapshot_dir = tempfile.mkdtemp()
    # Remove the temporary snapshot folder when the test is torn down.
    request.addfinalizer(lambda: node1.rmtree(snapshot_dir))
    self.create_snapshot(table_name=TABLE_NAME, node=node1, snapshot_folder=snapshot_dir)

    # Start from an empty table so that only refreshed data can be found in it.
    self.delete_table(table_name=TABLE_NAME, node=node1)
    self.create_table(table_name=TABLE_NAME, node=node1)
    self.load_snapshot_and_refresh(table_name=TABLE_NAME, node=node1, snapshot_folder=snapshot_dir)

    diff = self.compare_table_data(table_name=TABLE_NAME, expected_table_data=data_before_refresh, node=node1)
    assert not diff, f"The following items are missing:\n{pformat(diff)}"
def test_dynamo_gsi(self):
    """Query a table through its GSI while one node is stopped and validate the returned items."""
    self.prepare_dynamodb_cluster(num_of_nodes=4)
    node1 = self.cluster.nodelist()[0]
    self.create_table(node=node1, create_gsi=True)

    logger.info("Writing Alternator data on a table with GSI")
    items = generate_put_request_items(num_of_items=NUM_OF_ITEMS, add_gsi=True)
    node_resource_table = self.batch_write_actions(table_name=TABLE_NAME, node=node1, new_items=items)

    node = self.cluster.nodelist()[1]
    logger.info(f"Stopping {node.name} before testing GSI query")
    node.stop()

    logger.info("Testing and validating a query using GSI")
    # Pick the GSI attribute value of a random item and query for all items sharing it.
    random_item = items[random.randint(0, NUM_OF_ITEMS - 1)]
    gsi_filtered_val = random_item[Gsi.ATTRIBUTE_NAME]
    expected_items = [candidate for candidate in items if candidate[Gsi.ATTRIBUTE_NAME] == gsi_filtered_val]
    key_condition = {
        Gsi.ATTRIBUTE_NAME: {
            "AttributeValueList": [gsi_filtered_val],
            "ComparisonOperator": "EQ",
        },
    }
    result_items = full_query(node_resource_table, IndexName=Gsi.NAME, KeyConditions=key_condition)
    diff_result = DeepDiff(t1=result_items, t2=expected_items, ignore_order=True)
    assert not diff_result, f"The following items differs:\n{pformat(diff_result)}"
def test_drain_during_dynamo_load(self):
    """
    1. Create a load of read + update-items delete-set-elements.
    2. Run nodetool drain on one node.
    3. Wait for the load to finish; the load is expected to run fine during the drain.
    """
    self.prepare_dynamodb_cluster(num_of_nodes=3)
    node1, _, drained_node = self.cluster.nodelist()
    self.create_table(table_name=TABLE_NAME, node=node1)
    set_items = self.create_items(num_of_items=NUM_OF_ITEMS, use_set_data_type=True)
    self.batch_write_actions(table_name=TABLE_NAME, node=node1, new_items=set_items)
    stress_thread = self.run_delete_set_elements_stress(table_name=TABLE_NAME, node=node1)
    logger.info(f"Start drain for: {drained_node.name}")
    drained_node.drain()
    logger.info("Drain finished")
    stress_thread.join()
def test_decommission_during_dynamo_load(self):
    """
    Decommission two of three nodes while read stress is running, then check
    the errors returned for a consistent read and for a decommissioned node.
    """
    self.prepare_dynamodb_cluster(num_of_nodes=3)
    node1, node2, node3 = self.cluster.nodelist()
    self.create_table(table_name=TABLE_NAME, node=node1)
    items = self.create_items(num_of_items=NUM_OF_ITEMS)
    self.batch_write_actions(table_name=TABLE_NAME, node=node1, new_items=items)

    # First decommission runs under a consistent-read load.
    consistent_stress = self.run_read_stress(table_name=TABLE_NAME, node=node1)
    logger.info(f"Start first decommission during consistent Alternator-load for: {node2.name}")
    node2.decommission()
    logger.info("Decommission finished")
    consistent_stress.join()

    # Second decommission runs under a non-consistent-read load.
    non_consistent_stress = self.run_read_stress(table_name=TABLE_NAME, node=node1, consistent_read=False)
    logger.info(f"Start a second decommission during non-consistent alternator-load for: {node3.name}")
    node3.decommission()
    logger.info("Decommission finished")
    non_consistent_stress.join()

    logger.info("Check that the correct error is returned for a consistent read where cluster has no quorum")
    with pytest.raises(expected_exception=(ClientError,), match="Cannot achieve consistency level for cl LOCAL_QUORUM"):
        self.get_table_items(table_name=TABLE_NAME, node=node1, num_of_items=10, consistent_read=True)

    logger.info("Check that the correct error is returned for a resource of a decommissioned node")
    node2_api = self.get_dynamodb_api(node=node2)
    # pylint:disable = attribute-defined-outside-init
    self.node2_resource_table = node2_api.resource.Table(TABLE_NAME)
    with pytest.raises(expected_exception=(EndpointConnectionError,), match="Could not connect to the endpoint URL"):
        self.get_table_items(table_name=TABLE_NAME, node=node2, num_of_items=10, consistent_read=True)
def test_dynamo_reads_after_repair(self):
    """Write data while one node is down, repair that node and read the data through it."""
    self.prepare_dynamodb_cluster(num_of_nodes=3)
    node1, repaired_node = self.cluster.nodelist()[:2]

    logger.info(f"Adding data for all nodes except {repaired_node.name}...")
    repaired_node.flush()
    logger.info(f"Stopping {repaired_node.name}")
    repaired_node.stop(wait_other_notice=True)

    # The table and its items are written while the second node is down.
    self.create_table(table_name=TABLE_NAME, node=node1)
    items = self.create_items(num_of_items=NUM_OF_ITEMS)
    self.batch_write_actions(table_name=TABLE_NAME, node=node1, new_items=items)

    logger.info(f"Starting {repaired_node.name}")
    repaired_node.start(wait_other_notice=True, wait_for_binary_proto=True)
    logger.info(f"starting repair on {repaired_node.name}...")
    repair_output = repaired_node.repair()
    logger.info(f"{repair_output[0]}\n{repair_output[1]}")

    logger.info(f"Reading Alternator queries from node {repaired_node.name}")
    self.get_table_items(table_name=TABLE_NAME, node=repaired_node)
def test_dynamo_queries_on_multi_dc(self):
    """Write items through a node of one data-center and wait until a node of another one returns them all."""
    self.prepare_dynamodb_cluster(num_of_nodes=3, is_multi_dc=True)
    dc1_node = self.cluster.nodelist()[0]
    self.create_table(table_name=TABLE_NAME, node=dc1_node)
    logger.info(f"Writing Alternator queries to node {dc1_node.name} on data-center {dc1_node.data_center}")
    items = self.create_items(num_of_items=NUM_OF_ITEMS)
    self.batch_write_actions(table_name=TABLE_NAME, node=dc1_node, new_items=items)

    # First node that belongs to a data-center other than the one written to.
    dc2_node = next(candidate for candidate in self.cluster.nodelist() if candidate.data_center != dc1_node.data_center)
    logger.info(f"Reading Alternator queries from node {dc2_node.name} on data-center {dc2_node.data_center}")

    def dc2_has_all_items():
        # An empty diff means every expected item is readable from the second data-center.
        return not self.compare_table_data(expected_table_data=items, table_name=TABLE_NAME, node=dc2_node, consistent_read=False)

    wait_for(func=dc2_has_all_items, timeout=5 * 60, text="Waiting until the DC2 will contain all items that insert in DC1")
def test_dynamo_reads_after_new_node_repair(self):
    """Replace a decommissioned node with a new one, repair it, stop two old nodes and read from the new node."""
    self.prepare_dynamodb_cluster(num_of_nodes=self._num_of_nodes_for_test(rf=3))
    node1, node2, node3, *_ = self.cluster.nodelist()

    logger.info("Adding data for all nodes")
    self.prefill_dynamodb_table(node=node1)

    logger.info(f"Decommissioning {node3.name}")
    node3.decommission()

    logger.info("Add node4..")
    node4 = new_node(self.cluster, bootstrap=True)
    logger.info("Start node4..")
    node4.start(wait_for_binary_proto=True, wait_other_notice=True)

    logger.info(f"starting repair on {node4.name}...")
    stdout, stderr = node4.repair()
    logger.info(f"nodetool repair : stdout={stdout}, stderr={stderr}")

    # Stop the first two nodes so the read below cannot be served by them.
    for stopped_node in (node1, node2):
        logger.info(f"Stopping {stopped_node.name}")
        stopped_node.stop(wait_other_notice=True)

    tested_node = node4
    logger.info(f"Reading Alternator queries from node {tested_node.name}")
    self.get_table_items(table_name=TABLE_NAME, node=tested_node, consistent_read=False)
def test_batch_with_auto_snapshot_false(self):
    """Test triggers scylladb/scylladb#6995"""
    self.prepare_dynamodb_cluster(num_of_nodes=1, extra_config={"auto_snapshot": False})
    node1 = self.cluster.nodelist()[0]
    table = self.create_table(node=node1, schema=schemas.CONDITION_EXPRESSION_SCHEMA)
    # Every item carries the same 10 KiB payload under a random partition key.
    payload = "x" * 10240
    with table.batch_writer() as writer:
        for index in range(10000):
            writer.put_item({"pk": random_string(length=DEFAULT_STRING_LENGTH), "c": index, "a": payload})
    self.delete_table(TABLE_NAME, node1)
def test_update_condition_unused_entries_short_circuit(self):
    """
    A test for https://github.com/scylladb/scylla/issues/6572 plus a multi DC configuration
    """
    self.prepare_dynamodb_cluster(num_of_nodes=3, is_multi_dc=True)
    node1 = self.cluster.nodelist()[0]
    logger.info("Creating a table..")
    table = self.create_table(table_name=TABLE_NAME, node=node1)
    new_pk_val = random_string(length=DEFAULT_STRING_LENGTH)
    item_key = {self._table_primary_key: new_pk_val}

    logger.info("simple update item")
    table.update_item(Key={self._table_primary_key: new_pk_val}, AttributeUpdates={"a": {"Value": 1, "Action": "PUT"}})

    # The first OR-operand already holds ("a" == 1), so "#name2"/":val2" are
    # entries the condition does not need to evaluate.
    conditional_update_short_circuit = {
        "Key": {self._table_primary_key: new_pk_val},
        "ConditionExpression": "#name1 = :val1 OR #name2 = :val2 OR :val3 = :val2",
        "UpdateExpression": "SET #name3 = :val3",
        "ExpressionAttributeNames": {"#name1": "a", "#name2": "b", "#name3": "c"},
        "ExpressionAttributeValues": {":val1": 1, ":val2": 2, ":val3": 3},
    }

    dc2_node = next(candidate for candidate in self.cluster.nodelist() if candidate.data_center != node1.data_center)
    dc2_table = self.get_table(table_name=TABLE_NAME, node=dc2_node)
    wait_for(self.is_table_schema_synced, timeout=30, text="Waiting until table schema is updated", table_name=TABLE_NAME, nodes=[node1, dc2_node])

    # Apply the conditional update through the other data-center while node1 is down.
    node1.stop()
    logger.info("Testing and validating an update query using key condition expression")
    logger.info(f"ConditionExpression update of short circuit is: {conditional_update_short_circuit}")
    dc2_table.update_item(**conditional_update_short_circuit)

    # Read the result back through node1 while the updating node is down.
    dc2_node.stop()
    node1.start()
    self.wait_for_alternator(node=node1)
    logger.info(f"Reading Alternator queries from node {node1.name} on data-center {node1.data_center}")
    item = table.get_item(Key=item_key, ConsistentRead=True)["Item"]
    assert item == {self._table_primary_key: new_pk_val, "a": 1, "c": 3}
def test_modified_tag_is_propagated_to_other_dc(self):
    """Change the write-isolation tag through one data-center and wait for the schema to sync with another."""
    self.prepare_dynamodb_cluster(num_of_nodes=1, is_multi_dc=True)
    node1 = self.cluster.nodelist()[0]
    table = self.prefill_dynamodb_table(node=node1)
    dc2_node = next(candidate for candidate in self.cluster.nodelist() if candidate.data_center != node1.data_center)
    logger.info("Check that updating write-isolation tag on one DC is propagated to a node of the other DC (dc2)")
    set_write_isolation(table, WriteIsolation.FORBID_RMW)
    wait_for(
        self.is_table_schema_synced,
        timeout=30,
        step=3,
        text="Waiting until table schema is updated",
        table_name=TABLE_NAME,
        nodes=[node1, dc2_node],
    )
def test_read_system_tables_via_dynamodb_api(self):
"""
Make sure system tables can be read, but not written, via the DynamoDB API.
https://github.com/scylladb/scylla/issues/6122
"""
self.prepare_dynamodb_cluster(num_of_nodes=3)
all_nodes = self.cluster.nodelist()
# Check that the peers each node reports are exactly the other nodes of the cluster.
for node in all_nodes:
results = self.scan_table(".scylla.alternator.system.peers", node=node)
peers = set(item["peer"] for item in results)
# Get the IP addresses of all the other nodes.
other_nodes_ips = set(get_ip_from_node(n) for n in set(all_nodes).difference({node}))
assert peers == other_nodes_ips, f"peers in {node.name} are not as expected {other_nodes_ips}"
# NOTE(review): indentation is not visible here, so it is unclear whether the
# write attempt below runs for every node or only once after the loop (using
# the last `node`) - confirm against the original file.
logger.info("trying to write into system table, which should be readonly")
# The write is expected to be rejected with a ResourceNotFoundException error.
with pytest.raises(expected_exception=(ClientError,), match="ResourceNotFoundException"):
self.batch_write_actions(table_name=".scylla.alternator.system.peers", new_items=[dict(pk=1)], node=node)
def test_table_name_with_dot_prefix(self):
# Creates a table whose name starts with a dot and runs "nodetool tablestats"
# on its keyspace, first without and then with a trailing slash.
valid_dynamodb_chars = list(string.digits) + list(string.ascii_uppercase) + ["_", "-", "."]
self.prepare_dynamodb_cluster(num_of_nodes=3)
node1 = self.cluster.nodelist()[0]
# Random name: a leading dot followed by a random number of valid characters, capped at 100.
table_name_with_dot_prefix = "." + "".join(random.choices(valid_dynamodb_chars, k=min(random.choice(range(SHORTEST_TABLE_SIZE, LONGEST_TABLE_SIZE + 1)), 100)))
logger.info("Creating new table with dot ('.') char prefix")
self.create_table(node=node1, table_name=table_name_with_dot_prefix)
cmd = f"tablestats alternator_{table_name_with_dot_prefix}"
logger.info(f"Executing the following command '{cmd}' (expected to fail)")
# If the command fails, the failure must be the "Unknown keyspace" error;
# a command that does not fail is not treated as an error by this code.
try:
node1.nodetool(cmd)
except Exception as e: # noqa: BLE001
msg = str(e)
assert "Unknown keyspace: alternator_" in msg, msg
# NOTE(review): indentation is not visible here, so it is unclear whether the
# retry below runs only after a failure or unconditionally - confirm.
# A trailing slash is appended to the command because nodetool requires it
# when the keyspace name contains dot(s).
cmd = f"{cmd}/"
logger.info(f"Executing the following command '{cmd}'")
node1.nodetool(cmd)
@pytest.mark.parametrize("create_gsi", [False, True])
def test_alternator_nodetool_tablestats(self, create_gsi):
    """nodetool_additional_test.py::TesterAlternator::test_cfstats_syntax
    tests "nodetool cfstats" with various combinations of keyspace
    and table names. This test checks the same thing on an Alternator
    table, using the new command name "tablestats".
    With create_gsi=True, this test reproduces enterprise issue #3522,
    where "nodetool tablestats" produces no output if we have an
    Alternator GSI.
    """
    self.prepare_dynamodb_cluster(num_of_nodes=1)
    node1 = self.cluster.nodelist()[0]
    table_name = "some_long_table_name_abc"
    self.create_table(node=node1, table_name=table_name, create_gsi=create_gsi)

    logger.info('Trying "nodetool tablestats" without parameters')
    output = node1.nodetool("tablestats")
    assert table_name in str(output)

    logger.info('Trying "nodetool tablestats" of keyspace')
    # Alternator always creates a keyspace named after the table, with the
    # extra prefix "alternator_".
    output = node1.nodetool(f"tablestats alternator_{table_name}")
    assert table_name in str(output)

    logger.info('Trying "nodetool tablestats" of keyspace.table')
    output = node1.nodetool(f"tablestats alternator_{table_name}.{table_name}")
    assert table_name in str(output)
@pytest.mark.parametrize("create_gsi", [False, True])
def test_alternator_nodetool_info(self, create_gsi):
    """Check that "nodetool info" works when Alternator tables exist.

    With create_gsi=True it reproduces Scylla Enterprise issue #3512,
    where "nodetool info" throws an exception if an Alternator GSI exists.
    """
    self.prepare_dynamodb_cluster(num_of_nodes=1)
    node1 = self.cluster.nodelist()[0]
    self.create_table(node=node1, table_name="tbl", create_gsi=create_gsi)
    # Nothing specific is checked in the output; the test only requires that
    # "nodetool info" does not fail as it used to.
    node1.nodetool("info")
def test_putitem_contention(self):  # pylint:disable=too-many-locals
    """
    This test reproduces issue #7218, where PutItem operations sometimes
    lost part of the item being written - some attributes were lost, and
    the name of other attributes replaced by empty strings. The problem
    happens when the write-isolation policy is LWT and there is
    contention of writes to the same partition (not necessarily the same
    item) happening on more than one coordinator.
    To reproduce this contention, we need to start (at least) two nodes,
    and connect to them concurrently from two threads.
    """
    # Create table with 1 node cluster so that it gets RF=1.
    # Otherwise, with RF=2 the contention will cause write timeouts.
    # With rf_rack_valid_keyspaces being enabled by default, RF is equal to number of nodes.
    # Prior to that, if number of nodes is smaller than 3, alternator creates the keyspace with RF=1.
    self.prepare_dynamodb_cluster(num_of_nodes=1)
    [node1] = self.cluster.nodelist()
    node1_resource = self.get_dynamodb_api(node=node1).resource

    # Create the table, access it through the two connections, r1 and r2:
    table_name = "test_putitem_contention_table"
    table_r1 = node1_resource.create_table(
        TableName=table_name,
        KeySchema=[{"AttributeName": "p", "KeyType": "HASH"}, {"AttributeName": "c", "KeyType": "RANGE"}],
        AttributeDefinitions=[{"AttributeName": "p", "AttributeType": "S"}, {"AttributeName": "c", "AttributeType": "S"}],
        BillingMode="PAY_PER_REQUEST",
    )
    waiter = node1_resource.meta.client.get_waiter("table_exists")
    waiter.wait(TableName=table_name)

    # The second coordinator is added only after the table was created.
    node2 = new_node(self.cluster, bootstrap=True)
    node2.start(wait_for_binary_proto=True, wait_other_notice=True)
    node2_resource = self.get_dynamodb_api(node=node2).resource
    table_r2 = node2_resource.Table(table_name)

    def writes(tab, rang):
        """Write one item per index in `rang`, all into the same partition."""
        for i in rang:
            tab.put_item(Item={"p": "hi", "c": f"item{i}", "v1": "dog", "v2": "cat"})

    # Create two writing threads, each writing 1000 *different* rows to
    # one different connection:
    first_range = range(1000)
    second_range = range(1000, 2000)
    total_items = len(first_range) + len(second_range)
    # The context manager shuts the executor down even if one of the writers fails.
    with ThreadPoolExecutor(max_workers=2) as executor:
        thread1 = executor.submit(writes, tab=table_r1, rang=first_range)
        thread2 = executor.submit(writes, tab=table_r2, rang=second_range)
        # result() re-raises any exception raised inside the writer.
        thread1.result()
        thread2.result()

    # Scan the table page by page, looking for broken items (issue #7218)
    n_items = 0
    bad_items = []
    scan_kwargs = {"ConsistentRead": True}
    while True:
        response = table_r1.scan(**scan_kwargs)
        for item in response["Items"]:
            n_items += 1
            if "v1" not in item or "v2" not in item:
                bad_items.append(item)
                logger.error(f"Bad item: {item}")
        if "LastEvaluatedKey" not in response:
            break
        scan_kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]

    assert n_items == total_items, f"Expected {total_items} items but the scan returned {n_items}"
    assert not bad_items, f"Found {len(bad_items)} broken items:\n{pformat(bad_items)}"
def test_tls_connection(self):
    """
    Create a HTTPS (SSL/TLS) connection, and verify the test can create a table and insert data into it.
    Also, look in the log file of each node for the message saying that only the HTTPS (SSL/TLS)
    port is open and the unsecured HTTP port is off.
    """
    new_items = []
    table_name = TABLE_NAME
    logger.info('Configuring secured Alternator session with "self signed x509 certificate"')
    self.prepare_dynamodb_cluster(num_of_nodes=1, is_encrypted=True)
    nodes = self.cluster.nodelist()
    node1 = nodes[0]
    logger.debug("Create table")
    self.create_table(table_name=table_name, node=node1)
    for node_idx, node in enumerate(nodes):
        # NOTE(review): the result of grep_log() is not checked here; if it only
        # returns the matches (rather than raising when there are none) this
        # line verifies nothing - confirm and assert on it if so.
        node.grep_log(f"Alternator server listening on {get_ip_from_node(node=node)}, HTTP port OFF, HTTPS port {ALTERNATOR_SECURE_PORT}")
        new_items = self.create_items(num_of_items=(node_idx + 1) * 10)
        self.batch_write_actions(table_name=table_name, node=node, new_items=new_items)
    # compare_table_data() returns the difference, so it has to be asserted on;
    # previously the result was dropped and the data was never verified.
    diff = self.compare_table_data(expected_table_data=new_items, table_name=table_name, node=node1)
    assert not diff, f"The following items are missing:\n{pformat(diff)}"
def test_limit_concurrent_requests(self):
    """
    Test Support limiting the number of concurrent requests in alternator.
    Verifies https://github.com/scylladb/scylla/issues/7294
    Test scenario:
    1) Configure cluster requests-concurrency-limit to a low number.
    2) Issue Alternator 'heavy' requests concurrently (create-table)
    3) wait for RequestLimitExceeded error response.
    """
    concurrent_requests_limit = 5
    extra_config = {"max_concurrent_requests_per_shard": concurrent_requests_limit, "num_tokens": 1}
    self.prepare_dynamodb_cluster(num_of_nodes=1, extra_config=extra_config)
    node1 = self.cluster.nodelist()[0]
    # Start five times more background create-table threads than the configured limit.
    create_tables_threads = [self.run_create_table_thread() for _ in range(concurrent_requests_limit * 5)]

    @retrying(num_attempts=150, sleep_time=0.2, allowed_exceptions=ConcurrencyLimitNotExceededError, message="Running create-table request")
    def wait_for_create_table_request_failure():
        """Issue a create-table request and succeed only once it is rejected with RequestLimitExceeded."""
        try:
            self.create_table(table_name=random_string(length=10), node=node1, wait_until_table_exists=False)
        except Exception as error:  # noqa: BLE001
            # Match on the whole message: args may be empty or hold a non-string
            # first element, in which case indexing it would hide the real error.
            if "RequestLimitExceeded" in str(error):
                return
            raise
        # The request went through, so the limit was not hit yet; retry.
        raise ConcurrencyLimitNotExceededError

    try:
        wait_for_create_table_request_failure()
    finally:
        # Always wait for the background threads, also when the check above failed.
        for thread in create_tables_threads:
            thread.join()
@staticmethod
def _set_slow_query_logging_api(run_on_node: ScyllaNode, is_enable: bool = True, threshold: int | None = None):
    """
    Enable or disable slow-query logging through the node REST API and verify the resulting state.

    :param run_on_node: node to send the REST API command.
    :param is_enable: enable/disable slow-query logging
    :param threshold: a numeric value for the minimum duration to a query as slow.
        When None, the threshold is neither set nor verified.
    :raises SlowQueriesLoggingError: if the state reported back by the node differs from the requested one.
    :raises requests.HTTPError: if any of the REST requests returns an error status.
    """
    slow_query_url = f"http://{run_on_node.address()}:10000/storage_service/slow_query"

    enable = "true" if is_enable else "false"
    api_cmd = f"{slow_query_url}?enable={enable}"
    logger.info(f"Send restful api: {api_cmd}")
    requests.post(api_cmd).raise_for_status()

    if threshold is not None:
        api_cmd = f"{slow_query_url}?threshold={threshold}"
        logger.info(f"Send restful api: {api_cmd}")
        requests.post(api_cmd).raise_for_status()

    # Read the configuration back and make sure the node applied it.
    logger.info(f"Send restful api: {slow_query_url}")
    response = requests.get(slow_query_url)
    # Check the status of the read-back request too, like the POST requests above.
    response.raise_for_status()
    response_json = response.json()
    if response_json["enable"] != is_enable:
        raise SlowQueriesLoggingError(f"Got unexpected slow-query-logging values. enable: {response_json['enable']}")
    if threshold is not None and response_json["threshold"] != threshold:
        raise SlowQueriesLoggingError(f"Got unexpected threshold value: {response_json['threshold']}")
@retrying(num_attempts=100, sleep_time=0.2, allowed_exceptions=SlowQueriesLoggingError, message="wait_for_slow_query_logs")
def wait_for_slow_query_logs(self, node):
    """
    Scan the .scylla.alternator.system_traces.node_slow_log table until it is not empty.

    :param node: the node for running table full scan
    :return: full scan of node_slow_log table
    :raises SlowQueriesLoggingError: when the scan result is empty (this triggers a retry).
    """
    slow_log_rows = self.scan_table(".scylla.alternator.system_traces.node_slow_log", node=node, consistent_read=False)
    if not slow_log_rows:
        raise SlowQueriesLoggingError
    return slow_log_rows
@staticmethod
def is_found_in_slow_queries_log(name: str, log_result: list) -> bool:
    """
    Tell whether `name` appears in the "parameters" field of any slow-query-log entry.

    :param name: name of alternator operation name of object name to search for.
    :param log_result: a slow-query-logging table full scan result to search in; may be empty.
    :return: is name found in the given result full-scan query.
    """
    if any(name in result_op["parameters"] for result_op in log_result):
        return True
    if log_result:
        logger.info(f"{name} is not found in slow-query-log result\n Log result start: {log_result[0]}\n Log result end: {log_result[-1]}")
    else:
        # An empty result has no first/last entry to show; indexing it would
        # raise IndexError instead of reporting "not found".
        logger.info(f"{name} is not found in slow-query-log result: the log result is empty")
    return False
def create_tables(self, count: int, node) -> list:
    """
    Create `count` tables with random 10-character names.

    :param count: number of tables to create
    :param node: the node to run create-table commands on.
    :return: a list of table names.
    """
    created_names = []
    for _ in range(count):
        new_name = random_string(length=10)
        self.create_table(table_name=new_name, node=node)
        created_names += [new_name]
    return created_names
@retrying(num_attempts=100, sleep_time=0.2, allowed_exceptions=SlowQueriesLoggingError, message="wait_for_a_specific_slow_query_logs")
def wait_for_create_table_slow_query_logs(self, node, table_names: list):
    """
    Verify all created tables are logged as slow-query operation.

    :param node: the node to scan the slow-query log on.
    :param table_names: names of the tables expected in "CreateTable" log entries.
    :raises SlowQueriesLoggingError: when one of the tables is not found (this triggers a retry).
    """
    logger.info("Get logging results for 'createTable' queries")
    all_entries = self.wait_for_slow_query_logs(node=node)
    # Only the entries of CreateTable operations are relevant here.
    create_table_results = [entry for entry in all_entries if "CreateTable" in entry["parameters"]]
    for table in table_names:
        if self.is_found_in_slow_queries_log(name=table, log_result=create_table_results):
            continue
        raise SlowQueriesLoggingError(f"Table {table} not found in slow-query-log full-scan")
def test_slow_query_logging(self):
    """
    Test slow query logging for alternator queries.
    Verifies https://github.com/scylladb/scylla/pull/8298
    Test scenario:
    1) Configure slow-query threshold to a low number.
    2) Issue Alternator long-enough queries
    3) Verify slow queries of create-table are logged.
    4) Verify slow queries of create-table are stopped being logged after set to disabled.
    """
    # A threshold small enough to be below the minimum duration of a createTable operation.
    slow_query_threshold = 2
    self.prepare_dynamodb_cluster(num_of_nodes=self._num_of_nodes_for_test(rf=3))
    node1 = self.cluster.nodelist()[0]

    logger.info("Running background stress and topology changes..")
    self.create_table(table_name=TABLE_NAME, node=node1)
    stress_thread = self.run_write_stress(table_name=TABLE_NAME, node=node1, num_of_item=1000, ignore_errors=True)
    decommission_thread = self.run_decommission_add_node_thread()

    logger.info("Enable slow-query-logging with specified threshold")
    self._set_slow_query_logging_api(run_on_node=node1, threshold=slow_query_threshold)
    logger.info("Execute 5 'createTable' slow-enough queries")
    logged_tables = self.create_tables(count=5, node=node1)
    logger.info("Verify all created tables are found in log results")
    self.wait_for_create_table_slow_query_logs(node=node1, table_names=logged_tables)

    logger.info("Disable slow-query-logging")
    self._set_slow_query_logging_api(run_on_node=node1, is_enable=False)
    logger.info("Execute 5 additional 'createTable' slow-enough queries after slow-query-logging is disabled")
    unlogged_tables = self.create_tables(count=5, node=node1)
    slow_log = self.wait_for_slow_query_logs(node=node1)
    logger.info("Verify latter created tables are not found in log results")
    for table_name in unlogged_tables:
        is_logged = self.is_found_in_slow_queries_log(name=table_name, log_result=slow_log)
        assert not is_logged, f"Found unexpected logged slow query: {table_name}"

    stress_thread.join()
    decommission_thread.join()
def test_delete_elements_from_a_set(self):
    """
    Verifies https://github.com/scylladb/scylla/commit/253387ea07962d4fd8cb221eb90298b9127caf9f
    alternator: implement AttributeUpdates DELETE operation with Value
    Test scenario:
    1) Generate a load with set-type data.
    2) Issue topology-change operations (add/remove node)
    3) Run AttributeUpdates DELETE operation on a set of strings.
    4) Verify the data can be read and no unexpected errors.
    """
    num_of_items = 300
    self.prepare_dynamodb_cluster(num_of_nodes=self._num_of_nodes_for_test(rf=3))
    node1 = self.cluster.nodelist()[0]
    initial_items = self.create_items(num_of_items=num_of_items, use_set_data_type=True)
    self.create_table(table_name=TABLE_NAME, node=node1)
    self.batch_write_actions(table_name=TABLE_NAME, node=node1, new_items=initial_items)

    logger.info("Running background stress and topology changes..")
    stress_thread = self.run_write_stress(table_name=TABLE_NAME, node=node1, num_of_item=num_of_items, use_set_data_type=True, ignore_errors=True)
    decommission_thread = self.run_decommission_add_node_thread()

    logger.info("Run AttributeUpdates DELETE operations")
    # Delete set elements once while the write stress runs and once more after it finished.
    self.update_table_delete_set_elements(table_name=TABLE_NAME, node=node1, num_of_items=num_of_items, consistent_read=False)
    stress_thread.join()
    self.update_table_delete_set_elements(table_name=TABLE_NAME, node=node1, num_of_items=num_of_items, consistent_read=False)
    decommission_thread.join()

    logger.info("Reading all existing data after delete-operations are completed")
    read_items = self.get_table_items(table_name=TABLE_NAME, node=node1, consistent_read=False, num_of_items=num_of_items)
    # At least one item must hold a set that is not empty and is smaller than NUM_OF_ELEMENTS_IN_SET.
    partially_deleted = [entry for entry in read_items if NUM_OF_ELEMENTS_IN_SET > len(entry["Item"]["hello_set"]) > 0]
    assert partially_deleted
def test_ttl_with_load_and_decommission(self):
    """
    1. Configure a table with TTL enabled on the 'expiration' attribute.
    2. Run background write stress, read/delete-set-elements stress and a decommission/add-node thread.
    3. Run a loop of flush and compaction on cluster nodes.
    4. Verify all data is eventually deleted.
    """
    ttl_polling_interval = 4
    expiration_sec = 10
    num_of_items = 100
    is_debug_mode = self.cluster.scylla_mode == "debug"
    if is_debug_mode:
        nodes = 2
        topo = {"dc1": {"rack1": 2}}
    else:
        nodes = 4
        topo = {"dc1": {"rack1": 1, "rack2": 1, "rack3": 2}}
    self.prepare_dynamodb_cluster(num_of_nodes=nodes, extra_config={"alternator_ttl_period_in_seconds": ttl_polling_interval}, topo=topo)
    node1, *_ = self.cluster.nodelist()
    table = self.create_table(node=node1)

    # Enable TTL for table
    ttl_specification = {"AttributeName": "expiration", "Enabled": True}
    self.get_dynamodb_api(node=node1).client.update_time_to_live(TableName=table.name, TimeToLiveSpecification=ttl_specification)

    logger.info("Running background stress and topology changes..")
    stress_thread = self.run_write_stress(table_name=TABLE_NAME, node=node1, num_of_item=num_of_items, use_set_data_type=True, expiration_sec=expiration_sec, random_start_index=True, ignore_errors=True)
    read_and_delete_set_elements_thread = self.run_delete_set_elements_stress(table_name=TABLE_NAME, node=node1, random_start_index=True)
    # hardcoded decommission that repeats picking logic of run_decommission_add_node_thread
    node_to_decommission = self.cluster.nodelist()[-1]
    decommission_thread = self.run_decommission_add_node_thread()

    logger.info("Run flush and compaction on cluster nodes")
    iterations = 1 if is_debug_mode else 3
    # Live nodes, excluding the one picked for decommission; computed once, before the loop.
    nodes_for_maintenance = [
        candidate
        for candidate in self.cluster.nodelist()
        if candidate.is_live() and candidate.server_id != node_to_decommission.server_id
    ]
    # nodetool errors containing one of these texts are tolerated while the topology changes.
    valid_errors = ["ConnectException", "Connection refused", "status code 404 Not Found"]
    for _ in range(iterations):
        try:
            random.choice(nodes_for_maintenance).flush()
            random.choice(nodes_for_maintenance).compact()
        except NodetoolError as exc:
            error_message = str(exc)
            if all(err not in error_message for err in valid_errors):
                raise

    logger.info("Stopping background stress and decommission")
    read_and_delete_set_elements_thread.join()
    stress_thread.join()
    decommission_thread.join()

    logger.info("Verify all data is eventually deleted")

    @retrying(num_attempts=expiration_sec + ttl_polling_interval, sleep_time=2, allowed_exceptions=TtlNotExpiredError, message="Wait for TTL expiration threshold of a configured table")
    def wait_for_deletion_post_ttl_expiration(node: ScyllaNode):
        """
        Scan the table and raise TtlNotExpiredError (which triggers a retry) while it still holds data.

        :param node: the node for running table full scan
        """
        table_data = self.scan_table(table_name=TABLE_NAME, node=node, ConsistentRead=False)
        logger.info("scan table result: %s", table_data)
        if table_data:
            raise TtlNotExpiredError

    # Check the first node of the cluster for TTL expiration of all data
    for db_node in self.cluster.nodelist()[:1]:
        wait_for_deletion_post_ttl_expiration(node=db_node)
class TtlNotExpiredError(Exception):
    """Raised while a TTL-enabled table still holds data that is expected to expire."""