diff --git a/test/cluster/dtest/alternator_tests.py b/test/cluster/dtest/alternator_tests.py index 20ea02ce4e..2cfbfeae01 100644 --- a/test/cluster/dtest/alternator_tests.py +++ b/test/cluster/dtest/alternator_tests.py @@ -9,6 +9,7 @@ import os import random import string import tempfile +import threading from concurrent.futures.thread import ThreadPoolExecutor from pprint import pformat @@ -484,24 +485,27 @@ class TesterAlternator(BaseAlternator): extra_config = {"max_concurrent_requests_per_shard": concurrent_requests_limit, "num_tokens": 1} self.prepare_dynamodb_cluster(num_of_nodes=1, extra_config=extra_config) node1 = self.cluster.nodelist()[0] - create_tables_threads = [] - for tables_num in range(concurrent_requests_limit * 5): - create_tables_threads.append(self.run_create_table_thread()) + stop_workers = threading.Event() - @retrying(num_attempts=150, sleep_time=0.2, allowed_exceptions=ConcurrencyLimitNotExceededError, message="Running create-table request") - def wait_for_create_table_request_failure(): - try: - self.create_table(table_name=random_string(length=10), node=node1, wait_until_table_exists=False) - except Exception as error: - if "RequestLimitExceeded" in error.args[0]: - return - raise - raise ConcurrencyLimitNotExceededError + def run_create_table_until_limited() -> None: + while not stop_workers.is_set(): + try: + self.create_table(table_name=random_string(length=10), node=node1, wait_until_table_exists=False) + except Exception as error: # noqa: BLE001 + if "RequestLimitExceeded" in str(error): + stop_workers.set() + return + raise - wait_for_create_table_request_failure() + with ThreadPoolExecutor(max_workers=concurrent_requests_limit * 5) as executor: + create_table_futures = [executor.submit(run_create_table_until_limited) for _ in range(concurrent_requests_limit * 5)] - for thread in create_tables_threads: - thread.join() + if not stop_workers.wait(timeout=30): + raise ConcurrencyLimitNotExceededError + + stop_workers.set() + for future in create_table_futures: + future.result(timeout=60) @staticmethod def _set_slow_query_logging_api(run_on_node: ScyllaNode, is_enable: bool = True, threshold: int | None = None):