Compare commits

..

6 Commits

Author SHA1 Message Date
Anna Mikhlin
5231c77e8e Update ScyllaDB version to: 2026.2.0-rc0 2026-04-26 15:28:16 +03:00
Piotr Szymaniak
d5efd1f676 test/cluster: wait for Alternator readiness in server startup
server_add() only waits for CQL readiness before returning. The
Alternator HTTP port may not be listening yet, causing
ConnectionRefused with Alternator tests.

Extend the ServerUpState enum and startup loop to also check Alternator
port readiness when configured. Whenever Alternator port(s) is/are
configured, each is verified if connectable and queryable,
similar to how CQL ports are probed.

Fixes SCYLLADB-1701

Closes scylladb/scylladb#29625
2026-04-25 16:35:44 +03:00
Piotr Smaron
d14d07a079 test: fix flaky test_sstable_write_large_{row,cell} by using a fixed partition key
Commit ce00d61917 ("db: implement large_data virtual tables with feature
flag gating") changed these two tests to construct their mutation with
a randomly generated partition key (simple_schema::make_pkey()) instead
of the previously fixed pk "pv", with the comment that this avoids a
"Failed to generate sharding metadata" error.

simple_schema::make_pkey() delegates to tests::generate_partition_key(),
which defaults to key_size{1, 128}, i.e. the partition key length is
uniformly random in [1, 128] bytes. That interacts badly with the fact
that both tests pick thresholds at exact byte boundaries of the MC
sstable row encoding:

  - The large-data handler records a row's size as
      _data_writer->offset() - current_pos
    (sstables/mx/writer.cc: collect_row_stats()), i.e. the number of
    bytes the row took on disk.
  - For the first clustering row, the body includes a vint-encoded
    prev_row_size = pos - _prev_row_start.
  - _prev_row_start is captured at the start of the partition
    (consume_new_partition()) before the partition key is written to
    the data stream, so prev_row_size rolls in the partition key's
    serialized length (2-byte prefix + pk bytes) + deletion_time +
    static row size.

A random-size partition key therefore perturbs the first clustering
row's encoded size by 1-2 bytes across runs (the vint of prev_row_size
crosses the 128 boundary), flipping the test's byte-exact threshold
comparison. On seed 2104744000 this produced:

  critical check row_size_count == expected.size() has failed [3 != 2]

Fix the two byte-exact-sensitive tests by reverting their partition key
to the fixed s.new_mutation("pv") used before ce00d61917. Under smp=1
(which these tests run with, per -c1 in the test invocation) a fixed
key is always shard-local, so no sharding-metadata issue arises here.

The other tests modified by ce00d61917 (test_sstable_log_too_many_rows,
test_sstable_log_too_many_dead_rows, test_sstable_too_many_collection_elements,
test_large_data_records_round_trip, etc.) assert on row/element counts
or use thresholds with enough slack that the partition key size does
not matter, and are left unchanged.

Add an explanatory comment to each fixed site so the pitfall is not
re-introduced by a future refactor.

Verified stable with:
  ./test.py --mode=dev     test/boost/sstable_3_x_test.cc::test_sstable_write_large_row  --repeat 100 --max-failures 1
  ./test.py --mode=dev     test/boost/sstable_3_x_test.cc::test_sstable_write_large_cell --repeat 100 --max-failures 1
  ./test.py --mode=release test/boost/sstable_3_x_test.cc::test_sstable_write_large_row  --repeat 100 --max-failures 1
  ./test.py --mode=release test/boost/sstable_3_x_test.cc::test_sstable_write_large_cell --repeat 100 --max-failures 1

All four invocations: 100/100 passed.

Fixes: SCYLLADB-1685

Closes scylladb/scylladb#29621
2026-04-25 16:32:02 +03:00
Botond Dénes
70261dc674 Merge 'test/cluster: scale failure_detector_timeout_in_ms by build mode' from Marcin Maliszkiewicz
The failure_detector_timeout_in_ms override of 2000ms in 6 cluster test files is too aggressive for debug/sanitize builds. During node joins, the coordinator's failure detector times out on RPC pings to the joining node while it is still applying schema snapshots, marks it DOWN, and bans it — causing flaky test failures.

Scale the timeout by MODES_TIMEOUT_FACTOR (3x for debug/sanitize, 2x for dev, 1x for release) via a shared failure_detector_timeout fixture in conftest.py.

Fixes https://scylladb.atlassian.net/browse/SCYLLADB-1587
Backport: no, elasticsearch analyser shows only a single failure

Closes scylladb/scylladb#29522

* github.com:scylladb/scylladb:
  test/cluster: scale failure_detector_timeout_in_ms by build mode
  test/cluster: add failure_detector_timeout fixture
2026-04-24 09:10:43 +03:00
Marcin Maliszkiewicz
e414b2b0b9 test/cluster: scale failure_detector_timeout_in_ms by build mode
Six cluster test files override failure_detector_timeout_in_ms to 2000ms
for faster failure detection. In debug and sanitize builds, this causes
flaky node join failures. The following log analysis shows how.

The coordinator (server 614, IP 127.2.115.3) accepts the joining node
(server 615, host_id 53b01f0b, IP 127.2.115.2) into group0:

  20:10:57,049 [shard 0] raft_group0 - server 614 entered
    'join group0' transition state for 53b01f0b

The joining node begins receiving the raft snapshot 100ms later:

  20:10:57,150 [shard 0] raft_group0 - transfer snapshot from 9fa48539

It then spends ~280ms applying schema changes -- creating 6 keyspaces
and 12+ tables from the snapshot:

  20:10:57,511 [shard 0] migration_manager - Creating keyspace
    system_auth_v2
  ...
  20:10:57,788 [shard 0] migration_manager - Creating
    system_auth_v2.role_members

Meanwhile, the coordinator's failure detector pings the joining node.
Under debug+ASan load the RPC call times out after ~4.6 seconds:

  20:11:01,643 [shard 0] direct_failure_detector - unexpected exception
    when pinging 53b01f0b: seastar::rpc::timeout_error
    (rpc call timed out)

25ms later, the coordinator marks the joining node DOWN and removes it:

  20:11:01,668 [shard 0] raft_group0 - failure_detector_loop:
    Mark node 53b01f0b as DOWN
  20:11:01,717 [shard 0] raft_group0 - bootstrap: failed to accept
    53b01f0b

The joining node was still retrying the snapshot transfer at that point:

  20:11:01,745 [shard 0] raft_group0 - transfer snapshot from 9fa48539

It then receives the ban notification and aborts:

  20:11:01,844 [shard 0] raft_group0 - received notification of being
    banned from the cluster

Replace the hardcoded 2000ms with the failure_detector_timeout fixture
from conftest.py, which scales by MODES_TIMEOUT_FACTOR: 3x for
debug/sanitize (6000ms), 2x for dev (4000ms), 1x for release (2000ms).

Test measurements (before -> after fix):

  debug mode:
  test_replace_with_same_ip_twice           24.02s ->  25.02s
  test_banned_node_notification            217.22s -> 221.72s
  test_kill_coordinator_during_op          116.11s -> 127.13s
  test_node_failure_during_tablet_migration
    [streaming-source]                     183.25s -> 192.69s
  test_replace (4 tests)        skipped in debug (skip_in_debug)
  test_raft_replace_ignore_nodes  skipped in debug (run_in_dev only)

  dev mode:
  test_replace_different_ip                 10.51s ->  11.50s
  test_replace_different_ip_using_host_id   10.01s ->  12.01s
  test_replace_reuse_ip                     10.51s ->  12.03s
  test_replace_reuse_ip_using_host_id       13.01s ->  12.01s
  test_raft_replace_ignore_nodes            19.52s ->  19.52s
2026-04-20 15:28:34 +02:00
Marcin Maliszkiewicz
99ac36b353 test/cluster: add failure_detector_timeout fixture
Add a shared pytest fixture that scales the failure detector timeout
by build mode factor (e.g. 3x for debug/sanitize, 2x for dev).
2026-04-20 15:28:33 +02:00
13 changed files with 137 additions and 47 deletions

View File

@@ -78,7 +78,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
VERSION=2026.2.0-dev
VERSION=2026.2.0-rc0
if test -f version
then

View File

@@ -1088,7 +1088,7 @@ void compaction_manager::register_metrics() {
sm::make_gauge("normalized_backlog", [this] { return _last_backlog / available_memory(); },
sm::description("Holds the sum of normalized compaction backlog for all tables in the system. Backlog is normalized by dividing backlog by shard's available memory.")),
sm::make_counter("validation_errors", [this] { return _validation_errors; },
sm::description("Holds the number of encountered validation errors.")).set_skip_when_empty(),
sm::description("Holds the number of encountered validation errors.")),
});
}

View File

@@ -5171,8 +5171,16 @@ static void test_sstable_write_large_row_f(schema_ptr s, reader_permit permit, r
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_row) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation partition(s.schema(), s.make_pkey());
// Use a fixed partition key. The row-size thresholds below are chosen at exact
// byte boundaries of the MC sstable row encoding: the first clustering row body
// encodes prev_row_size as a vint, and prev_row_size includes the partition
// header (which contains the partition key's serialized length+bytes). A
// random-size partition key (as produced by simple_schema::make_pkey() /
// tests::generate_partition_key(), which default to key_size{1,128}) would
// perturb the encoded row size by 1-2 bytes across runs and flip the threshold
// comparison, making this test flaky. Under smp=1 (which this test runs with),
// a fixed key is always shard-local, so no sharding-metadata issue arises.
mutation partition = s.new_mutation("pv");
const partition_key& pk = partition.key();
s.add_static_row(partition, "foo bar zed");
@@ -5244,8 +5252,16 @@ static void test_sstable_write_large_cell_f(schema_ptr s, reader_permit permit,
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation partition(s.schema(), s.make_pkey());
// Use a fixed partition key. The cell-size thresholds below are chosen at exact
// byte boundaries of the MC sstable row encoding: the first clustering row body
// encodes prev_row_size as a vint, and prev_row_size includes the partition
// header (which contains the partition key's serialized length+bytes). A
// random-size partition key (as produced by simple_schema::make_pkey() /
// tests::generate_partition_key(), which default to key_size{1,128}) would
// perturb the encoded row size by 1-2 bytes across runs and flip the threshold
// comparison, making this test flaky. Under smp=1 (which this test runs with),
// a fixed key is always shard-local, so no sharding-metadata issue arises.
mutation partition = s.new_mutation("pv");
const partition_key& pk = partition.key();
s.add_static_row(partition, "foo bar zed");
@@ -5264,7 +5280,6 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
static void test_sstable_log_too_many_rows_f(int rows, int range_tombstones, uint64_t threshold, bool expected, sstable_version_types version) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation p(s.schema(), s.make_pkey());
sstring sv;
for (auto idx = 0; idx < rows - 1; idx++) {
@@ -5326,7 +5341,6 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_rows) {
static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold, bool expected, sstable_version_types version) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation p(s.schema(), s.make_pkey());
sstring sv;
int live_rows = 0;
@@ -5436,7 +5450,6 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_dead_rows) {
static void test_sstable_too_many_collection_elements_f(int elements, uint64_t threshold, bool expected, sstable_version_types version) {
simple_schema s(simple_schema::with_static::no, simple_schema::with_collection::yes);
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation p(s.schema(), s.make_pkey());
std::map<bytes, bytes> kv_map;
for (auto i = 0; i < elements; i++) {
@@ -5512,7 +5525,6 @@ SEASTAR_THREAD_TEST_CASE(test_large_data_records_round_trip) {
// Create a mutation with a clustering row whose serialized cell value
// exceeds the 1-byte thresholds, so partition_size, row_size, and
// cell_size records are all generated.
// Use make_pkey() (no argument) to generate a key on this shard.
auto pk = ss.make_pkey();
mutation m(s, pk);
auto ck = ss.make_ckey("ck1");
@@ -5622,7 +5634,6 @@ SEASTAR_THREAD_TEST_CASE(test_large_data_records_top_n_bounded) {
// Create 6 partitions, each with one row of increasing size.
// Since each partition has exactly one row, we get 6 row_size records
// competing for 3 slots.
// Use make_pkeys() to generate shard-local keys.
auto pkeys = ss.make_pkeys(6);
utils::chunked_vector<mutation> muts;
for (int i = 0; i < 6; i++) {

View File

@@ -18,7 +18,7 @@ from concurrent.futures.thread import ThreadPoolExecutor
from multiprocessing import Event
from pathlib import Path
from typing import TYPE_CHECKING
from test import TOP_SRC_DIR, path_to
from test import TOP_SRC_DIR, MODES_TIMEOUT_FACTOR, path_to
from test.pylib.random_tables import RandomTables
from test.pylib.skip_types import skip_env
from test.pylib.util import unique_name
@@ -394,3 +394,8 @@ async def key_provider(request, tmpdir, scylla_binary):
"""Encryption providers fixture"""
async with make_key_provider_factory(request.param, tmpdir, scylla_binary) as res:
yield res
@pytest.fixture(scope="function")
def failure_detector_timeout(build_mode):
return 2000 * MODES_TIMEOUT_FACTOR[build_mode]

View File

@@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_kill_coordinator_during_op(manager: ManagerClient) -> None:
async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout) -> None:
""" Kill coordinator with error injection while topology operation is running for cluster: decommission,
bootstrap, removenode, replace.
@@ -41,7 +41,7 @@ async def test_kill_coordinator_during_op(manager: ManagerClient) -> None:
"""
# Decrease the failure detector threshold so we don't have to wait for too long.
config = {
'failure_detector_timeout_in_ms': 2000
'failure_detector_timeout_in_ms': failure_detector_timeout
}
cmdline = [
'--logger-log-level', 'raft_topology=trace',

View File

@@ -22,11 +22,11 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.nightly
async def test_banned_node_notification(manager: ManagerClient) -> None:
async def test_banned_node_notification(manager: ManagerClient, failure_detector_timeout) -> None:
"""Test that a node banned from the cluster get notification about been banned"""
# Decrease the failure detector threshold so we don't have to wait for too long.
config = {
'failure_detector_timeout_in_ms': 2000
'failure_detector_timeout_in_ms': failure_detector_timeout
}
srvs = await manager.servers_add(3, config=config, auto_rack_dc="dc")
cql = manager.get_cql()

View File

@@ -60,14 +60,14 @@ async def make_servers(manager: ManagerClient, servers_num: int,
@pytest.mark.asyncio
async def test_raft_replace_ignore_nodes(manager: ManagerClient) -> None:
async def test_raft_replace_ignore_nodes(manager: ManagerClient, failure_detector_timeout) -> None:
"""Replace 3 dead nodes.
This is a slow test with a 7 node cluster and 3 replace operations,
we want to run it only in dev mode.
"""
logger.info("Booting initial cluster")
servers = await make_servers(manager, 7, config={'failure_detector_timeout_in_ms': 2000})
servers = await make_servers(manager, 7, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
s1_id = await manager.get_host_id(servers[1].server_id)
s2_id = await manager.get_host_id(servers[2].server_id)

View File

@@ -21,9 +21,9 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_replace_different_ip(manager: ManagerClient) -> None:
async def test_replace_different_ip(manager: ManagerClient, failure_detector_timeout) -> None:
"""Replace an existing node with new node using a different IP address"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
logger.info(f"cluster started, servers {servers}")
logger.info(f"replacing server {servers[0]}")
@@ -67,18 +67,18 @@ async def test_replace_different_ip(manager: ManagerClient) -> None:
logger.info(f"server {s} system.peers and gossiper state is valid")
@pytest.mark.asyncio
async def test_replace_different_ip_using_host_id(manager: ManagerClient) -> None:
async def test_replace_different_ip_using_host_id(manager: ManagerClient, failure_detector_timeout) -> None:
"""Replace an existing node with new node reusing the replaced node host id"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = True)
await manager.server_add(replace_cfg)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
@pytest.mark.asyncio
async def test_replace_reuse_ip(request, manager: ManagerClient) -> None:
async def test_replace_reuse_ip(request, manager: ManagerClient, failure_detector_timeout) -> None:
"""Replace an existing node with new node using the same IP address"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000}, auto_rack_dc="dc1")
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout}, auto_rack_dc="dc1")
host2 = (await wait_for_cql_and_get_hosts(manager.get_cql(), [servers[2]], time.time() + 60))[0]
logger.info(f"creating test table")
@@ -130,9 +130,9 @@ async def test_replace_reuse_ip(request, manager: ManagerClient) -> None:
await manager.server_sees_other_server(servers[2].ip_addr, servers[0].ip_addr)
@pytest.mark.asyncio
async def test_replace_reuse_ip_using_host_id(manager: ManagerClient) -> None:
async def test_replace_reuse_ip_using_host_id(manager: ManagerClient, failure_detector_timeout) -> None:
"""Replace an existing node with new node using the same IP address and same host id"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True, use_host_id = True)
await manager.server_add(replace_cfg)

View File

@@ -14,9 +14,9 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_replace_with_same_ip_twice(manager: ManagerClient) -> None:
async def test_replace_with_same_ip_twice(manager: ManagerClient, failure_detector_timeout) -> None:
logger.info("starting a cluster with two nodes")
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
logger.info(f"cluster started {servers}")
async def replace_with_same_ip(s: ServerInfo) -> ServerInfo:

View File

@@ -119,14 +119,14 @@ async def test_tablet_transition_sanity(manager: ManagerClient, action):
@pytest.mark.parametrize("fail_stage", ["streaming", "allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "cleanup", "cleanup_target", "end_migration", "revert_migration"])
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage):
async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage, failure_detector_timeout):
if fail_stage == 'cleanup' and fail_replica == 'destination':
skip_env('Failing destination during cleanup is pointless')
if fail_stage == 'cleanup_target' and fail_replica == 'source':
skip_env('Failing source during target cleanup is pointless')
logger.info("Bootstrapping cluster")
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': 2000}
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': failure_detector_timeout}
host_ids = []
servers = []

View File

@@ -37,6 +37,6 @@ class ServerInfo(NamedTuple):
class ServerUpState(IntEnum):
PROCESS_STARTED = auto()
HOST_ID_QUERIED = auto()
CQL_CONNECTED = auto()
CQL_QUERIED = auto()
CQL_ALTERNATOR_CONNECTED = auto()
CQL_ALTERNATOR_QUERIED = auto()
SERVING = auto() # Scylla sent sd_notify("serving")

View File

@@ -389,7 +389,7 @@ class ManagerClient:
seeds: list[IPAddress] | None = None,
timeout: float | None = None,
connect_driver: bool = True,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
cmdline_options_override: list[str] | None = None,
append_env_override: dict[str, str] | None = None,
auth_provider: dict[str, str] | None = None) -> None:
@@ -540,7 +540,7 @@ class ManagerClient:
seeds: Optional[List[IPAddress]] = None,
timeout: Optional[float] = ScyllaServer.TOPOLOGY_TIMEOUT,
server_encryption: str = "none",
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
connect_driver: bool = True) -> ServerInfo:
"""Add a new server"""
if expected_error is not None:

View File

@@ -41,6 +41,7 @@ import yaml
import signal
import glob
import errno
import json
import re
import platform
import contextlib
@@ -557,7 +558,7 @@ class ScyllaServer:
async def install_and_start(self,
api: ScyllaRESTAPIClient,
expected_error: Optional[str] = None,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> None:
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> None:
"""Setup and start this server."""
await self.install()
@@ -677,10 +678,63 @@ class ScyllaServer:
return None
return maintenance_socket_option
async def get_cql_up_state(self) -> ServerUpState | None:
"""Get the CQL up state (a check we use at start up).
def _alternator_ports(self) -> list[tuple[str, int]]:
"""Return (scheme, port) for every configured Alternator port."""
ports = []
if "alternator_port" in self.config:
ports.append(("http", self.config["alternator_port"]))
if "alternator_https_port" in self.config:
ports.append(("https", self.config["alternator_https_port"]))
return ports
Return None if it fails to connect.
async def check_alternator_connected(self, ports: list[tuple[str, int]]) -> bool:
"""TCP connect to every configured Alternator port.
Returns True if all ports accept connections.
"""
for _, port in ports:
try:
_, writer = await asyncio.wait_for(
asyncio.open_connection(self.ip_addr, port), timeout=2)
writer.close()
await writer.wait_closed()
except (OSError, asyncio.TimeoutError):
return False
return True
async def check_alternator_queried(self, ports: list[tuple[str, int]]) -> bool:
"""Sends a GetItem for a randomly-named nonexistent table and validates
that the response is a DynamoDB-shaped JSON error (contains __type),
confirming Alternator is processing DynamoDB API requests.
Returns True if all ports respond correctly.
"""
table_name = f"nonexistent_table_{uuid.uuid4().hex}"
headers = {
"Content-Type": "application/x-amz-json-1.0",
"X-Amz-Target": "DynamoDB_20120810.GetItem",
}
body = json.dumps({"TableName": table_name, "Key": {"k": {"S": "k"}}})
timeout = aiohttp.ClientTimeout(total=2)
async with aiohttp.ClientSession(timeout=timeout) as session:
for scheme, port in ports:
url = f"{scheme}://{self.ip_addr}:{port}/"
try:
# ssl=False skips certificate verification
async with session.post(url, headers=headers, data=body, ssl=False) as resp:
response_body = await resp.json(content_type=None)
if "__type" not in response_body:
return False
except Exception as exc:
self.logger.debug("Alternator query check failed for %s: %s", url, exc)
return False
return True
async def get_cql_up_state(self) -> tuple[bool, bool]:
"""Check CQL connectivity.
Returns (connected, queried) indicating whether a CQL connection
was established and whether a query executed successfully.
"""
caslog = logging.getLogger('cassandra')
oldlevel = caslog.getEffectiveLevel()
@@ -708,6 +762,7 @@ class ScyllaServer:
request_timeout=self.TOPOLOGY_TIMEOUT)
contact_points=[self.rpc_address]
connected = False
cql_queried = False
try:
# In a cluster setup, it's possible that the CQL
# here is directed to a node different from the initial contact
@@ -730,13 +785,32 @@ class ScyllaServer:
control_connection_timeout=self.TOPOLOGY_TIMEOUT,
auth_provider=self.auth_provider)
self.control_connection = self.control_cluster.connect()
return ServerUpState.CQL_QUERIED
cql_queried = True
except (NoHostAvailable, InvalidRequest, OperationTimedOut) as exc:
self.logger.debug("Exception when checking if CQL is up: %s", exc)
return ServerUpState.CQL_CONNECTED if connected else None
finally:
caslog.setLevel(oldlevel)
# Any other exception may indicate a problem, and is passed to the caller.
return connected, cql_queried
async def get_alternator_up_state(self, ports: list[tuple[str, int]]) -> tuple[bool, bool]:
connected = await self.check_alternator_connected(ports)
queried = connected and await self.check_alternator_queried(ports)
return connected, queried
async def get_cql_alternator_up_state(self) -> ServerUpState | None:
"""Get the combined CQL + Alternator up state."""
cql_connected, cql_queried = await self.get_cql_up_state()
alt_connected, alt_queried = False, False
alt_ports = self._alternator_ports() # `alt_ports` empty = no Alternator
if alt_ports:
alt_connected, alt_queried = await self.get_alternator_up_state(alt_ports)
if cql_queried and (alt_queried or not alt_ports):
return ServerUpState.CQL_ALTERNATOR_QUERIED
if not cql_connected or (alt_ports and not alt_connected):
return None
# Here both CQL and Alternator (if exists) are at least connected
return ServerUpState.CQL_ALTERNATOR_CONNECTED
def _setup_notify_socket(self) -> None:
"""Create a Unix datagram socket for receiving sd_notify messages from Scylla."""
@@ -824,7 +898,7 @@ class ScyllaServer:
async def start(self,
api: ScyllaRESTAPIClient,
expected_error: Optional[str] = None,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
cmdline_options_override: list[str] | None = None,
append_env_override: dict[str, str] | None = None) -> None:
"""Start an installed server.
@@ -897,9 +971,9 @@ class ScyllaServer:
if await self.try_get_host_id(api):
if server_up_state == ServerUpState.PROCESS_STARTED:
server_up_state = ServerUpState.HOST_ID_QUERIED
server_up_state = await self.get_cql_up_state() or server_up_state
server_up_state = await self.get_cql_alternator_up_state() or server_up_state
# Check for SERVING state (sd_notify "serving" message)
if server_up_state >= ServerUpState.CQL_QUERIED and self.check_serving_notification():
if server_up_state >= ServerUpState.CQL_ALTERNATOR_QUERIED and self.check_serving_notification():
server_up_state = ServerUpState.SERVING
if server_up_state >= expected_server_up_state:
if expected_error is not None:
@@ -1194,7 +1268,7 @@ class ScyllaCluster:
seeds: Optional[List[IPAddress]] = None,
server_encryption: str = "none",
expected_error: Optional[str] = None,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> ServerInfo:
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> ServerInfo:
"""Add a new server to the cluster"""
self.is_dirty = True
@@ -1434,7 +1508,7 @@ class ScyllaCluster:
server_id: ServerNum,
expected_error: str | None = None,
seeds: list[IPAddress] | None = None,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
cmdline_options_override: list[str] | None = None,
append_env_override: dict[str, str] | None = None,
auth_provider: dict[str, str] | None = None) -> None:
@@ -1917,7 +1991,7 @@ class ScyllaClusterManager:
server_id=server_id,
expected_error=data.get("expected_error"),
seeds=data.get("seeds"),
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")),
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")),
cmdline_options_override=data.get("cmdline_options_override"),
append_env_override=data.get("append_env_override"),
auth_provider=data.get("auth_provider"),
@@ -1952,7 +2026,7 @@ class ScyllaClusterManager:
seeds=data.get("seeds"),
server_encryption=data.get("server_encryption", "none"),
expected_error=data.get("expected_error"),
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")),
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")),
)
return s_info.as_dict()