Compare commits

..

1 Commits

Author SHA1 Message Date
Yaniv Michael Kaul
9b65d6d0fc topology: propagate error messages through raft_topology_cmd_result
When a topology command (e.g., rebuild) fails on a target node, the
exception message was being swallowed at multiple levels:

1. raft_topology_cmd_handler caught exceptions and returned a bare
   fail status with no error details.
2. exec_direct_command_helper saw the fail status and threw a generic
   "failed status returned from {id}" message.
3. The rebuilding handler caught that and stored a hardcoded
   "streaming failed" message.

This meant users only saw "rebuild failed: streaming failed" instead
of the actionable error from the safety check (e.g., "it is unsafe
to use source_dc=dc2 to rebuild keyspace=...").

Fix by:
- Adding an error_message field to raft_topology_cmd_result (with
  [[version 2026.2]] for wire compatibility).
- Populating error_message with the exception text in the handler's
  catch blocks.
- Including error_message in the exception thrown by
  exec_direct_command_helper.
- Passing the actual error through to rtbuilder.done() instead of
  the hardcoded "streaming failed".

A follow-up test is in https://github.com/scylladb/scylladb/pull/29363

Fixes: SCYLLADB-1404
2026-04-24 11:25:31 +03:00
22 changed files with 71 additions and 1636 deletions

View File

@@ -78,7 +78,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
VERSION=2026.3.0-dev
VERSION=2026.2.0-dev
if test -f version
then

View File

@@ -72,6 +72,7 @@ struct raft_topology_cmd_result {
success
};
service::raft_topology_cmd_result::command_status status;
sstring error_message [[version 2026.2]];
};
struct raft_snapshot {

View File

@@ -4792,8 +4792,13 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
}
} catch (const raft::request_aborted& e) {
rtlogger.warn("raft_topology_cmd {} failed with: {}", cmd.cmd, e);
result.error_message = e.what();
} catch (const std::exception& e) {
rtlogger.error("raft_topology_cmd {} failed with: {}", cmd.cmd, e);
result.error_message = e.what();
} catch (...) {
rtlogger.error("raft_topology_cmd {} failed with: {}", cmd.cmd, std::current_exception());
result.error_message = "unknown error";
}
rtlogger.info("topology cmd rpc {} completed with status={} index={}",

View File

@@ -443,8 +443,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
co_await ser::storage_service_rpc_verbs::send_raft_topology_cmd(
&_messaging, to_host_id(id), id, _term, cmd_index, cmd);
if (result.status == raft_topology_cmd_result::command_status::fail) {
auto msg = result.error_message.empty()
? ::format("failed status returned from {}", id)
: ::format("failed status returned from {}: {}", id, result.error_message);
co_await coroutine::exception(std::make_exception_ptr(
std::runtime_error(::format("failed status returned from {}", id))));
std::runtime_error(std::move(msg))));
}
};
@@ -3909,10 +3912,15 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
throw;
} catch (seastar::abort_requested_exception&) {
throw;
} catch (const std::exception& e) {
rtlogger.error("send_raft_topology_cmd(stream_ranges) failed with exception"
" (node state is rebuilding): {}", e);
rtbuilder.done(e.what());
retake = true;
} catch (...) {
rtlogger.error("send_raft_topology_cmd(stream_ranges) failed with exception"
" (node state is rebuilding): {}", std::current_exception());
rtbuilder.done("streaming failed");
rtbuilder.done("unknown error");
retake = true;
}
if (retake) {

View File

@@ -318,6 +318,9 @@ struct raft_topology_cmd_result {
success
};
command_status status = command_status::fail;
// Carries the error description back to the topology coordinator
// when the command fails.
sstring error_message;
};
// This class is used in RPC's signatures to hold the topology_version of the caller.

View File

@@ -5171,16 +5171,8 @@ static void test_sstable_write_large_row_f(schema_ptr s, reader_permit permit, r
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_row) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use a fixed partition key. The row-size thresholds below are chosen at exact
// byte boundaries of the MC sstable row encoding: the first clustering row body
// encodes prev_row_size as a vint, and prev_row_size includes the partition
// header (which contains the partition key's serialized length+bytes). A
// random-size partition key (as produced by simple_schema::make_pkey() /
// tests::generate_partition_key(), which default to key_size{1,128}) would
// perturb the encoded row size by 1-2 bytes across runs and flip the threshold
// comparison, making this test flaky. Under smp=1 (which this test runs with),
// a fixed key is always shard-local, so no sharding-metadata issue arises.
mutation partition = s.new_mutation("pv");
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation partition(s.schema(), s.make_pkey());
const partition_key& pk = partition.key();
s.add_static_row(partition, "foo bar zed");
@@ -5252,16 +5244,8 @@ static void test_sstable_write_large_cell_f(schema_ptr s, reader_permit permit,
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use a fixed partition key. The cell-size thresholds below are chosen at exact
// byte boundaries of the MC sstable row encoding: the first clustering row body
// encodes prev_row_size as a vint, and prev_row_size includes the partition
// header (which contains the partition key's serialized length+bytes). A
// random-size partition key (as produced by simple_schema::make_pkey() /
// tests::generate_partition_key(), which default to key_size{1,128}) would
// perturb the encoded row size by 1-2 bytes across runs and flip the threshold
// comparison, making this test flaky. Under smp=1 (which this test runs with),
// a fixed key is always shard-local, so no sharding-metadata issue arises.
mutation partition = s.new_mutation("pv");
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation partition(s.schema(), s.make_pkey());
const partition_key& pk = partition.key();
s.add_static_row(partition, "foo bar zed");
@@ -5280,6 +5264,7 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
static void test_sstable_log_too_many_rows_f(int rows, int range_tombstones, uint64_t threshold, bool expected, sstable_version_types version) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation p(s.schema(), s.make_pkey());
sstring sv;
for (auto idx = 0; idx < rows - 1; idx++) {
@@ -5341,6 +5326,7 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_rows) {
static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold, bool expected, sstable_version_types version) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation p(s.schema(), s.make_pkey());
sstring sv;
int live_rows = 0;
@@ -5450,6 +5436,7 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_dead_rows) {
static void test_sstable_too_many_collection_elements_f(int elements, uint64_t threshold, bool expected, sstable_version_types version) {
simple_schema s(simple_schema::with_static::no, simple_schema::with_collection::yes);
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation p(s.schema(), s.make_pkey());
std::map<bytes, bytes> kv_map;
for (auto i = 0; i < elements; i++) {
@@ -5525,6 +5512,7 @@ SEASTAR_THREAD_TEST_CASE(test_large_data_records_round_trip) {
// Create a mutation with a clustering row whose serialized cell value
// exceeds the 1-byte thresholds, so partition_size, row_size, and
// cell_size records are all generated.
// Use make_pkey() (no argument) to generate a key on this shard.
auto pk = ss.make_pkey();
mutation m(s, pk);
auto ck = ss.make_ckey("ck1");
@@ -5634,6 +5622,7 @@ SEASTAR_THREAD_TEST_CASE(test_large_data_records_top_n_bounded) {
// Create 6 partitions, each with one row of increasing size.
// Since each partition has exactly one row, we get 6 row_size records
// competing for 3 slots.
// Use make_pkeys() to generate shard-local keys.
auto pkeys = ss.make_pkeys(6);
utils::chunked_vector<mutation> muts;
for (int i = 0; i < 6; i++) {

View File

@@ -18,7 +18,7 @@ from concurrent.futures.thread import ThreadPoolExecutor
from multiprocessing import Event
from pathlib import Path
from typing import TYPE_CHECKING
from test import TOP_SRC_DIR, MODES_TIMEOUT_FACTOR, path_to
from test import TOP_SRC_DIR, path_to
from test.pylib.random_tables import RandomTables
from test.pylib.skip_types import skip_env
from test.pylib.util import unique_name
@@ -394,8 +394,3 @@ async def key_provider(request, tmpdir, scylla_binary):
"""Encryption providers fixture"""
async with make_key_provider_factory(request.param, tmpdir, scylla_binary) as res:
yield res
@pytest.fixture(scope="function")
def failure_detector_timeout(build_mode):
return 2000 * MODES_TIMEOUT_FACTOR[build_mode]

View File

@@ -227,11 +227,6 @@ class ScyllaCluster:
def flush(self) -> None:
self.nodetool("flush")
def compact(self, keyspace: str = "", tables: list[str] | None = None) -> None:
for node in self.nodelist():
if node.is_running():
node.compact(keyspace=keyspace, tables=tables)
@staticmethod
def debug(message: str) -> None:
logger.debug(message)

View File

@@ -111,7 +111,6 @@ class ScyllaNode:
self.data_center = server.datacenter
self.rack = server.rack
self._hostid = None
self._smp_set_during_test = None
self._smp = None
self._memory = None
@@ -466,9 +465,6 @@ class ScyllaNode:
if wait_for_binary_proto:
self.wait_for_binary_interface(from_mark=self.mark)
if not self._hostid:
self.hostid()
if wait_other_notice:
timeout = self.cluster.default_wait_other_notice_timeout
for node, mark in marks:
@@ -651,12 +647,11 @@ class ScyllaNode:
cmd.append(table)
self.nodetool(" ".join(cmd), **kwargs)
def compact(self, keyspace: str = "", tables: list[str] | None = None) -> None:
def compact(self, keyspace: str = "", tables: str | None = ()) -> None:
compact_cmd = ["compact"]
if keyspace:
compact_cmd.append(keyspace)
if tables:
compact_cmd.extend(tables)
compact_cmd += tables
self.nodetool(" ".join(compact_cmd))
def drain(self, block_on_log: bool = False) -> None:
@@ -829,13 +824,10 @@ class ScyllaNode:
assert timeout is None, "argument `timeout` is not supported" # not used in scylla-dtest
assert force_refresh is None, "argument `force_refresh` is not supported" # not used in scylla-dtest
if not self._hostid:
try:
self._hostid = self.cluster.manager.get_host_id(server_id=self.server_id)
except Exception as exc:
self.error(f"Failed to get hostid: {exc}")
return self._hostid
try:
return self.cluster.manager.get_host_id(server_id=self.server_id)
except Exception as exc:
self.error(f"Failed to get hostid: {exc}")
def rmtree(self, path: str | Path) -> None:
"""Delete a directory content without removing the directory.

View File

@@ -34,7 +34,6 @@ def pytest_addoption(parser: Parser) -> None:
parser.addoption("--experimental-features", type=lambda s: s.split(","), action="store", help="Pass experimental features <feature>,<feature> to enable", default=None)
parser.addoption("--tablets", action=argparse.BooleanOptionalAction, default=False, help="Whether to enable tablets support (default: %(default)s)")
parser.addoption("--force-gossip-topology-changes", action="store_true", default=False, help="force gossip topology changes in a fresh cluster")
parser.addoption("--compaction-strategy", action="store", default=None, help="Compaction strategy to use in tests that support it (e.g. wide_rows_test.py). One of LeveledCompactionStrategy, SizeTieredCompactionStrategy, TimeWindowCompactionStrategy, or IncrementalCompactionStrategy. If not set, a random strategy is chosen per test.")
def pytest_configure(config: Config) -> None:

View File

@@ -263,25 +263,3 @@ def assert_lists_equal_ignoring_order(list1, list2, sort_key=None):
sorted_list2 = sorted(normalized_list2, key=lambda elm: str(elm[sort_key]))
assert sorted_list1 == sorted_list2
def assert_equal_more_with_deviation(actual, expect, deviation_perc):
"""
Assert actual is within inclusive interval [expected...expected+deviation_perc]
@param actual Value inspected
@param expect Beginning of expected interval
@param deviation_perc allowed percent increase
"""
deviation_high = (expect * (100 + deviation_perc)) / 100
assert expect <= actual <= deviation_high, f"Expect result interval {expect}..{deviation_high}, received {actual}"
def assert_less_equal_lists(actual_list, expected_list, msg=None):
"""
Assert actual_list is a subset of the expected list, prints hardcoded or parameterized error message
@param actual_list Inspected list
@param expected_list List that supposed to include actual_list
@param msg Configured message default None.
"""
standardMsg = msg or f"{actual_list} not less than or equal to {expected_list}"
assert set(actual_list) <= set(expected_list), standardMsg

File diff suppressed because it is too large Load Diff

View File

@@ -48,6 +48,5 @@ run_in_dev:
- dtest/commitlog_test
- dtest/cfid_test
- dtest/rebuild_test
- dtest/wide_rows_test
run_in_debug:
- random_failures/test_random_failures

View File

@@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_kill_coordinator_during_op(manager: ManagerClient) -> None:
""" Kill coordinator with error injection while topology operation is running for cluster: decommission,
bootstrap, removenode, replace.
@@ -41,7 +41,7 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
"""
# Decrease the failure detector threshold so we don't have to wait for too long.
config = {
'failure_detector_timeout_in_ms': failure_detector_timeout
'failure_detector_timeout_in_ms': 2000
}
cmdline = [
'--logger-log-level', 'raft_topology=trace',

View File

@@ -22,11 +22,11 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.nightly
async def test_banned_node_notification(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_banned_node_notification(manager: ManagerClient) -> None:
"""Test that a node banned from the cluster get notification about been banned"""
# Decrease the failure detector threshold so we don't have to wait for too long.
config = {
'failure_detector_timeout_in_ms': failure_detector_timeout
'failure_detector_timeout_in_ms': 2000
}
srvs = await manager.servers_add(3, config=config, auto_rack_dc="dc")
cql = manager.get_cql()

View File

@@ -60,14 +60,14 @@ async def make_servers(manager: ManagerClient, servers_num: int,
@pytest.mark.asyncio
async def test_raft_replace_ignore_nodes(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_raft_replace_ignore_nodes(manager: ManagerClient) -> None:
"""Replace 3 dead nodes.
This is a slow test with a 7 node cluster and 3 replace operations,
we want to run it only in dev mode.
"""
logger.info("Booting initial cluster")
servers = await make_servers(manager, 7, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
servers = await make_servers(manager, 7, config={'failure_detector_timeout_in_ms': 2000})
s1_id = await manager.get_host_id(servers[1].server_id)
s2_id = await manager.get_host_id(servers[2].server_id)

View File

@@ -21,9 +21,9 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_replace_different_ip(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_replace_different_ip(manager: ManagerClient) -> None:
"""Replace an existing node with new node using a different IP address"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
logger.info(f"cluster started, servers {servers}")
logger.info(f"replacing server {servers[0]}")
@@ -67,18 +67,18 @@ async def test_replace_different_ip(manager: ManagerClient, failure_detector_tim
logger.info(f"server {s} system.peers and gossiper state is valid")
@pytest.mark.asyncio
async def test_replace_different_ip_using_host_id(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_replace_different_ip_using_host_id(manager: ManagerClient) -> None:
"""Replace an existing node with new node reusing the replaced node host id"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = True)
await manager.server_add(replace_cfg)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
@pytest.mark.asyncio
async def test_replace_reuse_ip(request, manager: ManagerClient, failure_detector_timeout) -> None:
async def test_replace_reuse_ip(request, manager: ManagerClient) -> None:
"""Replace an existing node with new node using the same IP address"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout}, auto_rack_dc="dc1")
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000}, auto_rack_dc="dc1")
host2 = (await wait_for_cql_and_get_hosts(manager.get_cql(), [servers[2]], time.time() + 60))[0]
logger.info(f"creating test table")
@@ -130,9 +130,9 @@ async def test_replace_reuse_ip(request, manager: ManagerClient, failure_detecto
await manager.server_sees_other_server(servers[2].ip_addr, servers[0].ip_addr)
@pytest.mark.asyncio
async def test_replace_reuse_ip_using_host_id(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_replace_reuse_ip_using_host_id(manager: ManagerClient) -> None:
"""Replace an existing node with new node using the same IP address and same host id"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True, use_host_id = True)
await manager.server_add(replace_cfg)

View File

@@ -14,9 +14,9 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_replace_with_same_ip_twice(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_replace_with_same_ip_twice(manager: ManagerClient) -> None:
logger.info("starting a cluster with two nodes")
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
logger.info(f"cluster started {servers}")
async def replace_with_same_ip(s: ServerInfo) -> ServerInfo:

View File

@@ -119,14 +119,14 @@ async def test_tablet_transition_sanity(manager: ManagerClient, action):
@pytest.mark.parametrize("fail_stage", ["streaming", "allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "cleanup", "cleanup_target", "end_migration", "revert_migration"])
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage, failure_detector_timeout):
async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage):
if fail_stage == 'cleanup' and fail_replica == 'destination':
skip_env('Failing destination during cleanup is pointless')
if fail_stage == 'cleanup_target' and fail_replica == 'source':
skip_env('Failing source during target cleanup is pointless')
logger.info("Bootstrapping cluster")
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': failure_detector_timeout}
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': 2000}
host_ids = []
servers = []

View File

@@ -37,6 +37,6 @@ class ServerInfo(NamedTuple):
class ServerUpState(IntEnum):
PROCESS_STARTED = auto()
HOST_ID_QUERIED = auto()
CQL_ALTERNATOR_CONNECTED = auto()
CQL_ALTERNATOR_QUERIED = auto()
CQL_CONNECTED = auto()
CQL_QUERIED = auto()
SERVING = auto() # Scylla sent sd_notify("serving")

View File

@@ -389,7 +389,7 @@ class ManagerClient:
seeds: list[IPAddress] | None = None,
timeout: float | None = None,
connect_driver: bool = True,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
cmdline_options_override: list[str] | None = None,
append_env_override: dict[str, str] | None = None,
auth_provider: dict[str, str] | None = None) -> None:
@@ -540,7 +540,7 @@ class ManagerClient:
seeds: Optional[List[IPAddress]] = None,
timeout: Optional[float] = ScyllaServer.TOPOLOGY_TIMEOUT,
server_encryption: str = "none",
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
connect_driver: bool = True) -> ServerInfo:
"""Add a new server"""
if expected_error is not None:

View File

@@ -41,7 +41,6 @@ import yaml
import signal
import glob
import errno
import json
import re
import platform
import contextlib
@@ -558,7 +557,7 @@ class ScyllaServer:
async def install_and_start(self,
api: ScyllaRESTAPIClient,
expected_error: Optional[str] = None,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> None:
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> None:
"""Setup and start this server."""
await self.install()
@@ -678,63 +677,10 @@ class ScyllaServer:
return None
return maintenance_socket_option
def _alternator_ports(self) -> list[tuple[str, int]]:
"""Return (scheme, port) for every configured Alternator port."""
ports = []
if "alternator_port" in self.config:
ports.append(("http", self.config["alternator_port"]))
if "alternator_https_port" in self.config:
ports.append(("https", self.config["alternator_https_port"]))
return ports
async def get_cql_up_state(self) -> ServerUpState | None:
"""Get the CQL up state (a check we use at start up).
async def check_alternator_connected(self, ports: list[tuple[str, int]]) -> bool:
"""TCP connect to every configured Alternator port.
Returns True if all ports accept connections.
"""
for _, port in ports:
try:
_, writer = await asyncio.wait_for(
asyncio.open_connection(self.ip_addr, port), timeout=2)
writer.close()
await writer.wait_closed()
except (OSError, asyncio.TimeoutError):
return False
return True
async def check_alternator_queried(self, ports: list[tuple[str, int]]) -> bool:
"""Sends a GetItem for a randomly-named nonexistent table and validates
that the response is a DynamoDB-shaped JSON error (contains __type),
confirming Alternator is processing DynamoDB API requests.
Returns True if all ports respond correctly.
"""
table_name = f"nonexistent_table_{uuid.uuid4().hex}"
headers = {
"Content-Type": "application/x-amz-json-1.0",
"X-Amz-Target": "DynamoDB_20120810.GetItem",
}
body = json.dumps({"TableName": table_name, "Key": {"k": {"S": "k"}}})
timeout = aiohttp.ClientTimeout(total=2)
async with aiohttp.ClientSession(timeout=timeout) as session:
for scheme, port in ports:
url = f"{scheme}://{self.ip_addr}:{port}/"
try:
# ssl=False skips certificate verification
async with session.post(url, headers=headers, data=body, ssl=False) as resp:
response_body = await resp.json(content_type=None)
if "__type" not in response_body:
return False
except Exception as exc:
self.logger.debug("Alternator query check failed for %s: %s", url, exc)
return False
return True
async def get_cql_up_state(self) -> tuple[bool, bool]:
"""Check CQL connectivity.
Returns (connected, queried) indicating whether a CQL connection
was established and whether a query executed successfully.
Return None if it fails to connect.
"""
caslog = logging.getLogger('cassandra')
oldlevel = caslog.getEffectiveLevel()
@@ -762,7 +708,6 @@ class ScyllaServer:
request_timeout=self.TOPOLOGY_TIMEOUT)
contact_points=[self.rpc_address]
connected = False
cql_queried = False
try:
# In a cluster setup, it's possible that the CQL
# here is directed to a node different from the initial contact
@@ -785,32 +730,13 @@ class ScyllaServer:
control_connection_timeout=self.TOPOLOGY_TIMEOUT,
auth_provider=self.auth_provider)
self.control_connection = self.control_cluster.connect()
cql_queried = True
return ServerUpState.CQL_QUERIED
except (NoHostAvailable, InvalidRequest, OperationTimedOut) as exc:
self.logger.debug("Exception when checking if CQL is up: %s", exc)
return ServerUpState.CQL_CONNECTED if connected else None
finally:
caslog.setLevel(oldlevel)
# Any other exception may indicate a problem, and is passed to the caller.
return connected, cql_queried
async def get_alternator_up_state(self, ports: list[tuple[str, int]]) -> tuple[bool, bool]:
connected = await self.check_alternator_connected(ports)
queried = connected and await self.check_alternator_queried(ports)
return connected, queried
async def get_cql_alternator_up_state(self) -> ServerUpState | None:
"""Get the combined CQL + Alternator up state."""
cql_connected, cql_queried = await self.get_cql_up_state()
alt_connected, alt_queried = False, False
alt_ports = self._alternator_ports() # `alt_ports` empty = no Alternator
if alt_ports:
alt_connected, alt_queried = await self.get_alternator_up_state(alt_ports)
if cql_queried and (alt_queried or not alt_ports):
return ServerUpState.CQL_ALTERNATOR_QUERIED
if not cql_connected or (alt_ports and not alt_connected):
return None
# Here both CQL and Alternator (if exists) are at least connected
return ServerUpState.CQL_ALTERNATOR_CONNECTED
def _setup_notify_socket(self) -> None:
"""Create a Unix datagram socket for receiving sd_notify messages from Scylla."""
@@ -898,7 +824,7 @@ class ScyllaServer:
async def start(self,
api: ScyllaRESTAPIClient,
expected_error: Optional[str] = None,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
cmdline_options_override: list[str] | None = None,
append_env_override: dict[str, str] | None = None) -> None:
"""Start an installed server.
@@ -971,9 +897,9 @@ class ScyllaServer:
if await self.try_get_host_id(api):
if server_up_state == ServerUpState.PROCESS_STARTED:
server_up_state = ServerUpState.HOST_ID_QUERIED
server_up_state = await self.get_cql_alternator_up_state() or server_up_state
server_up_state = await self.get_cql_up_state() or server_up_state
# Check for SERVING state (sd_notify "serving" message)
if server_up_state >= ServerUpState.CQL_ALTERNATOR_QUERIED and self.check_serving_notification():
if server_up_state >= ServerUpState.CQL_QUERIED and self.check_serving_notification():
server_up_state = ServerUpState.SERVING
if server_up_state >= expected_server_up_state:
if expected_error is not None:
@@ -1268,7 +1194,7 @@ class ScyllaCluster:
seeds: Optional[List[IPAddress]] = None,
server_encryption: str = "none",
expected_error: Optional[str] = None,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> ServerInfo:
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> ServerInfo:
"""Add a new server to the cluster"""
self.is_dirty = True
@@ -1508,7 +1434,7 @@ class ScyllaCluster:
server_id: ServerNum,
expected_error: str | None = None,
seeds: list[IPAddress] | None = None,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
cmdline_options_override: list[str] | None = None,
append_env_override: dict[str, str] | None = None,
auth_provider: dict[str, str] | None = None) -> None:
@@ -1991,7 +1917,7 @@ class ScyllaClusterManager:
server_id=server_id,
expected_error=data.get("expected_error"),
seeds=data.get("seeds"),
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")),
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")),
cmdline_options_override=data.get("cmdline_options_override"),
append_env_override=data.get("append_env_override"),
auth_provider=data.get("auth_provider"),
@@ -2026,7 +1952,7 @@ class ScyllaClusterManager:
seeds=data.get("seeds"),
server_encryption=data.get("server_encryption", "none"),
expected_error=data.get("expected_error"),
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")),
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")),
)
return s_info.as_dict()