mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-26 19:35:12 +00:00
The fix for SCYLLADB-1373 (b4f652b7c1) changed get_session() to use
the default timeout=30 for the retry loop in patient_*_cql_connection
(previously timeout=0.1). This correctly allowed retrying transient
NoHostAvailable errors during node startup, but introduced new
flakiness in test_login and other auth tests.
The failure chain:
1. test_login connects with bad credentials (e.g. user="doesntexist")
2. get_session() calls patient_exclusive_cql_connection(), which calls
retry_till_success() with bypassed_exception=NoHostAvailable
3. The first attempt correctly fails: the server rejects the credentials
with AuthenticationFailed, wrapped in NoHostAvailable
4. retry_till_success() catches NoHostAvailable indiscriminately and
retries, not distinguishing between transient errors (node not ready)
and permanent errors (bad credentials)
5. A subsequent retry attempt times out (connect_timeout=5), producing
OperationTimedOut wrapped in NoHostAvailable
6. After 30 seconds, the last NoHostAvailable is raised -- now wrapping
OperationTimedOut instead of the original AuthenticationFailed
7. The assertion `isinstance(..., AuthenticationFailed)` fails
With the old timeout=0.1, the deadline was already exceeded after the
first attempt, so the original AuthenticationFailed propagated.
Fix: Add a `should_retry` predicate parameter to retry_till_success()
and use it in patient_cql_connection() and
patient_exclusive_cql_connection() to immediately re-raise
NoHostAvailable when it wraps AuthenticationFailed. Retrying
authentication failures is never useful since the credentials won't
change between attempts.
Fixes: SCYLLADB-1382
Closes scylladb/scylladb#29348
79 lines
3.0 KiB
Python
79 lines
3.0 KiB
Python
#
|
|
# Copyright (C) 2025-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import logging
|
|
import time
|
|
from typing import TYPE_CHECKING
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import Callable
|
|
|
|
from test.cluster.dtest.ccmlib.scylla_node import ScyllaNode
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def retry_till_success[T, **P](fun: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> T:
|
|
timeout = kwargs.pop("timeout", 60)
|
|
bypassed_exception = kwargs.pop("bypassed_exception", Exception)
|
|
should_retry = kwargs.pop("should_retry", None)
|
|
|
|
deadline = time.perf_counter() + timeout
|
|
while True:
|
|
try:
|
|
return fun(*args, **kwargs)
|
|
except bypassed_exception as e:
|
|
if (should_retry and not should_retry(e)) or time.perf_counter() > deadline:
|
|
raise
|
|
|
|
# Brief pause before next attempt.
|
|
time.sleep(0.1)
|
|
|
|
|
|
def list_to_hashed_dict(query_response_list: list) -> dict:
    """
    Index the rows of a query response by content digest so that two responses can be
    compared without regard to row order.

    Rows are normalized before hashing: the driver may hand back map-like values
    (e.g. ``[0, 9, OrderedMapSerializedKey([(10, 11)])]``) while an "expected" list holds
    plain elements or nested lists. Any element exposing ``items()`` is therefore
    flattened into ``[key1, value1, key2, value2, ...]`` so both sides hash identically.

    :param query_response_list: the list of rows to convert
    :return: dict mapping the SHA-256 hex digest of ``str(normalized_row)`` to the
        normalized row itself; rows with identical content collapse into one entry
    """
    rows_by_digest = {}
    for row in query_response_list:
        normalized_row = [
            [part for key, value in cell.items() for part in (key, value)] if hasattr(cell, "items") else cell
            for cell in row
        ]
        encoded_row = str(normalized_row).encode("utf-8", "ignore")
        rows_by_digest[hashlib.sha256(encoded_row).hexdigest()] = normalized_row
    return rows_by_digest
|
|
|
|
|
|
def set_trace_probability(nodes: list[ScyllaNode], probability_value: float) -> None:
    """
    Set the trace probability on every node in ``nodes``, one worker thread per node.

    Each worker logs the change and then calls
    ``node.cluster.manager.api.set_trace_probability`` with the node's address. The
    function blocks until all workers have finished; an exception raised by a worker is
    re-raised here when its result is collected (results are collected in ``nodes`` order).

    :param nodes: nodes to update; an empty list is a no-op
    :param probability_value: probability passed to the API; a falsy value is logged as
        "Disable", anything else as "Enable"
    """
    if not nodes:
        # ThreadPoolExecutor rejects max_workers=0 with ValueError, so there is
        # nothing to schedule and nothing to wait for.
        return

    def _set_trace_probability_for_node(_node: ScyllaNode) -> None:
        logger.debug(f'{"Enable" if probability_value else "Disable"} trace for {_node.name} with {probability_value=}')
        _node.cluster.manager.api.set_trace_probability(node_ip=_node.address(), probability=probability_value)

    with ThreadPoolExecutor(max_workers=len(nodes)) as executor:
        futures = [executor.submit(_set_trace_probability_for_node, node) for node in nodes]
        # Collect every result so that a failure inside a worker is not silently dropped.
        for future in futures:
            future.result()
|