mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-26 19:35:12 +00:00
Compare commits
6 Commits
ykaul/skip
...
next-2026.
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5231c77e8e | ||
|
|
d5efd1f676 | ||
|
|
d14d07a079 | ||
|
|
70261dc674 | ||
|
|
e414b2b0b9 | ||
|
|
99ac36b353 |
@@ -78,7 +78,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=2026.2.0-dev
|
||||
VERSION=2026.2.0-rc0
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -1088,7 +1088,7 @@ void compaction_manager::register_metrics() {
|
||||
sm::make_gauge("normalized_backlog", [this] { return _last_backlog / available_memory(); },
|
||||
sm::description("Holds the sum of normalized compaction backlog for all tables in the system. Backlog is normalized by dividing backlog by shard's available memory.")),
|
||||
sm::make_counter("validation_errors", [this] { return _validation_errors; },
|
||||
sm::description("Holds the number of encountered validation errors.")).set_skip_when_empty(),
|
||||
sm::description("Holds the number of encountered validation errors.")),
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -5171,8 +5171,16 @@ static void test_sstable_write_large_row_f(schema_ptr s, reader_permit permit, r
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_row) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation partition(s.schema(), s.make_pkey());
|
||||
// Use a fixed partition key. The row-size thresholds below are chosen at exact
|
||||
// byte boundaries of the MC sstable row encoding: the first clustering row body
|
||||
// encodes prev_row_size as a vint, and prev_row_size includes the partition
|
||||
// header (which contains the partition key's serialized length+bytes). A
|
||||
// random-size partition key (as produced by simple_schema::make_pkey() /
|
||||
// tests::generate_partition_key(), which default to key_size{1,128}) would
|
||||
// perturb the encoded row size by 1-2 bytes across runs and flip the threshold
|
||||
// comparison, making this test flaky. Under smp=1 (which this test runs with),
|
||||
// a fixed key is always shard-local, so no sharding-metadata issue arises.
|
||||
mutation partition = s.new_mutation("pv");
|
||||
const partition_key& pk = partition.key();
|
||||
s.add_static_row(partition, "foo bar zed");
|
||||
|
||||
@@ -5244,8 +5252,16 @@ static void test_sstable_write_large_cell_f(schema_ptr s, reader_permit permit,
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation partition(s.schema(), s.make_pkey());
|
||||
// Use a fixed partition key. The cell-size thresholds below are chosen at exact
|
||||
// byte boundaries of the MC sstable row encoding: the first clustering row body
|
||||
// encodes prev_row_size as a vint, and prev_row_size includes the partition
|
||||
// header (which contains the partition key's serialized length+bytes). A
|
||||
// random-size partition key (as produced by simple_schema::make_pkey() /
|
||||
// tests::generate_partition_key(), which default to key_size{1,128}) would
|
||||
// perturb the encoded row size by 1-2 bytes across runs and flip the threshold
|
||||
// comparison, making this test flaky. Under smp=1 (which this test runs with),
|
||||
// a fixed key is always shard-local, so no sharding-metadata issue arises.
|
||||
mutation partition = s.new_mutation("pv");
|
||||
const partition_key& pk = partition.key();
|
||||
s.add_static_row(partition, "foo bar zed");
|
||||
|
||||
@@ -5264,7 +5280,6 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
|
||||
static void test_sstable_log_too_many_rows_f(int rows, int range_tombstones, uint64_t threshold, bool expected, sstable_version_types version) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation p(s.schema(), s.make_pkey());
|
||||
sstring sv;
|
||||
for (auto idx = 0; idx < rows - 1; idx++) {
|
||||
@@ -5326,7 +5341,6 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_rows) {
|
||||
static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold, bool expected, sstable_version_types version) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation p(s.schema(), s.make_pkey());
|
||||
sstring sv;
|
||||
int live_rows = 0;
|
||||
@@ -5436,7 +5450,6 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_dead_rows) {
|
||||
static void test_sstable_too_many_collection_elements_f(int elements, uint64_t threshold, bool expected, sstable_version_types version) {
|
||||
simple_schema s(simple_schema::with_static::no, simple_schema::with_collection::yes);
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation p(s.schema(), s.make_pkey());
|
||||
std::map<bytes, bytes> kv_map;
|
||||
for (auto i = 0; i < elements; i++) {
|
||||
@@ -5512,7 +5525,6 @@ SEASTAR_THREAD_TEST_CASE(test_large_data_records_round_trip) {
|
||||
// Create a mutation with a clustering row whose serialized cell value
|
||||
// exceeds the 1-byte thresholds, so partition_size, row_size, and
|
||||
// cell_size records are all generated.
|
||||
// Use make_pkey() (no argument) to generate a key on this shard.
|
||||
auto pk = ss.make_pkey();
|
||||
mutation m(s, pk);
|
||||
auto ck = ss.make_ckey("ck1");
|
||||
@@ -5622,7 +5634,6 @@ SEASTAR_THREAD_TEST_CASE(test_large_data_records_top_n_bounded) {
|
||||
// Create 6 partitions, each with one row of increasing size.
|
||||
// Since each partition has exactly one row, we get 6 row_size records
|
||||
// competing for 3 slots.
|
||||
// Use make_pkeys() to generate shard-local keys.
|
||||
auto pkeys = ss.make_pkeys(6);
|
||||
utils::chunked_vector<mutation> muts;
|
||||
for (int i = 0; i < 6; i++) {
|
||||
|
||||
@@ -18,7 +18,7 @@ from concurrent.futures.thread import ThreadPoolExecutor
|
||||
from multiprocessing import Event
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
from test import TOP_SRC_DIR, path_to
|
||||
from test import TOP_SRC_DIR, MODES_TIMEOUT_FACTOR, path_to
|
||||
from test.pylib.random_tables import RandomTables
|
||||
from test.pylib.skip_types import skip_env
|
||||
from test.pylib.util import unique_name
|
||||
@@ -394,3 +394,8 @@ async def key_provider(request, tmpdir, scylla_binary):
|
||||
"""Encryption providers fixture"""
|
||||
async with make_key_provider_factory(request.param, tmpdir, scylla_binary) as res:
|
||||
yield res
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def failure_detector_timeout(build_mode):
|
||||
return 2000 * MODES_TIMEOUT_FACTOR[build_mode]
|
||||
|
||||
@@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_kill_coordinator_during_op(manager: ManagerClient) -> None:
|
||||
async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout) -> None:
|
||||
""" Kill coordinator with error injection while topology operation is running for cluster: decommission,
|
||||
bootstrap, removenode, replace.
|
||||
|
||||
@@ -41,7 +41,7 @@ async def test_kill_coordinator_during_op(manager: ManagerClient) -> None:
|
||||
"""
|
||||
# Decrease the failure detector threshold so we don't have to wait for too long.
|
||||
config = {
|
||||
'failure_detector_timeout_in_ms': 2000
|
||||
'failure_detector_timeout_in_ms': failure_detector_timeout
|
||||
}
|
||||
cmdline = [
|
||||
'--logger-log-level', 'raft_topology=trace',
|
||||
|
||||
@@ -22,11 +22,11 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.nightly
|
||||
async def test_banned_node_notification(manager: ManagerClient) -> None:
|
||||
async def test_banned_node_notification(manager: ManagerClient, failure_detector_timeout) -> None:
|
||||
"""Test that a node banned from the cluster get notification about been banned"""
|
||||
# Decrease the failure detector threshold so we don't have to wait for too long.
|
||||
config = {
|
||||
'failure_detector_timeout_in_ms': 2000
|
||||
'failure_detector_timeout_in_ms': failure_detector_timeout
|
||||
}
|
||||
srvs = await manager.servers_add(3, config=config, auto_rack_dc="dc")
|
||||
cql = manager.get_cql()
|
||||
|
||||
@@ -60,14 +60,14 @@ async def make_servers(manager: ManagerClient, servers_num: int,
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_raft_replace_ignore_nodes(manager: ManagerClient) -> None:
|
||||
async def test_raft_replace_ignore_nodes(manager: ManagerClient, failure_detector_timeout) -> None:
|
||||
"""Replace 3 dead nodes.
|
||||
|
||||
This is a slow test with a 7 node cluster and 3 replace operations,
|
||||
we want to run it only in dev mode.
|
||||
"""
|
||||
logger.info("Booting initial cluster")
|
||||
servers = await make_servers(manager, 7, config={'failure_detector_timeout_in_ms': 2000})
|
||||
servers = await make_servers(manager, 7, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
|
||||
|
||||
s1_id = await manager.get_host_id(servers[1].server_id)
|
||||
s2_id = await manager.get_host_id(servers[2].server_id)
|
||||
|
||||
@@ -21,9 +21,9 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_replace_different_ip(manager: ManagerClient) -> None:
|
||||
async def test_replace_different_ip(manager: ManagerClient, failure_detector_timeout) -> None:
|
||||
"""Replace an existing node with new node using a different IP address"""
|
||||
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
|
||||
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
|
||||
logger.info(f"cluster started, servers {servers}")
|
||||
|
||||
logger.info(f"replacing server {servers[0]}")
|
||||
@@ -67,18 +67,18 @@ async def test_replace_different_ip(manager: ManagerClient) -> None:
|
||||
logger.info(f"server {s} system.peers and gossiper state is valid")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_replace_different_ip_using_host_id(manager: ManagerClient) -> None:
|
||||
async def test_replace_different_ip_using_host_id(manager: ManagerClient, failure_detector_timeout) -> None:
|
||||
"""Replace an existing node with new node reusing the replaced node host id"""
|
||||
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
|
||||
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
|
||||
await manager.server_stop(servers[0].server_id)
|
||||
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = True)
|
||||
await manager.server_add(replace_cfg)
|
||||
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_replace_reuse_ip(request, manager: ManagerClient) -> None:
|
||||
async def test_replace_reuse_ip(request, manager: ManagerClient, failure_detector_timeout) -> None:
|
||||
"""Replace an existing node with new node using the same IP address"""
|
||||
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000}, auto_rack_dc="dc1")
|
||||
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout}, auto_rack_dc="dc1")
|
||||
host2 = (await wait_for_cql_and_get_hosts(manager.get_cql(), [servers[2]], time.time() + 60))[0]
|
||||
|
||||
logger.info(f"creating test table")
|
||||
@@ -130,9 +130,9 @@ async def test_replace_reuse_ip(request, manager: ManagerClient) -> None:
|
||||
await manager.server_sees_other_server(servers[2].ip_addr, servers[0].ip_addr)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_replace_reuse_ip_using_host_id(manager: ManagerClient) -> None:
|
||||
async def test_replace_reuse_ip_using_host_id(manager: ManagerClient, failure_detector_timeout) -> None:
|
||||
"""Replace an existing node with new node using the same IP address and same host id"""
|
||||
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
|
||||
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
|
||||
await manager.server_stop(servers[0].server_id)
|
||||
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True, use_host_id = True)
|
||||
await manager.server_add(replace_cfg)
|
||||
|
||||
@@ -14,9 +14,9 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_replace_with_same_ip_twice(manager: ManagerClient) -> None:
|
||||
async def test_replace_with_same_ip_twice(manager: ManagerClient, failure_detector_timeout) -> None:
|
||||
logger.info("starting a cluster with two nodes")
|
||||
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
|
||||
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
|
||||
logger.info(f"cluster started {servers}")
|
||||
|
||||
async def replace_with_same_ip(s: ServerInfo) -> ServerInfo:
|
||||
|
||||
@@ -119,14 +119,14 @@ async def test_tablet_transition_sanity(manager: ManagerClient, action):
|
||||
@pytest.mark.parametrize("fail_stage", ["streaming", "allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "cleanup", "cleanup_target", "end_migration", "revert_migration"])
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage):
|
||||
async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage, failure_detector_timeout):
|
||||
if fail_stage == 'cleanup' and fail_replica == 'destination':
|
||||
skip_env('Failing destination during cleanup is pointless')
|
||||
if fail_stage == 'cleanup_target' and fail_replica == 'source':
|
||||
skip_env('Failing source during target cleanup is pointless')
|
||||
|
||||
logger.info("Bootstrapping cluster")
|
||||
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': 2000}
|
||||
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': failure_detector_timeout}
|
||||
host_ids = []
|
||||
servers = []
|
||||
|
||||
|
||||
@@ -37,6 +37,6 @@ class ServerInfo(NamedTuple):
|
||||
class ServerUpState(IntEnum):
|
||||
PROCESS_STARTED = auto()
|
||||
HOST_ID_QUERIED = auto()
|
||||
CQL_CONNECTED = auto()
|
||||
CQL_QUERIED = auto()
|
||||
CQL_ALTERNATOR_CONNECTED = auto()
|
||||
CQL_ALTERNATOR_QUERIED = auto()
|
||||
SERVING = auto() # Scylla sent sd_notify("serving")
|
||||
|
||||
@@ -389,7 +389,7 @@ class ManagerClient:
|
||||
seeds: list[IPAddress] | None = None,
|
||||
timeout: float | None = None,
|
||||
connect_driver: bool = True,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
|
||||
cmdline_options_override: list[str] | None = None,
|
||||
append_env_override: dict[str, str] | None = None,
|
||||
auth_provider: dict[str, str] | None = None) -> None:
|
||||
@@ -540,7 +540,7 @@ class ManagerClient:
|
||||
seeds: Optional[List[IPAddress]] = None,
|
||||
timeout: Optional[float] = ScyllaServer.TOPOLOGY_TIMEOUT,
|
||||
server_encryption: str = "none",
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
|
||||
connect_driver: bool = True) -> ServerInfo:
|
||||
"""Add a new server"""
|
||||
if expected_error is not None:
|
||||
|
||||
@@ -41,6 +41,7 @@ import yaml
|
||||
import signal
|
||||
import glob
|
||||
import errno
|
||||
import json
|
||||
import re
|
||||
import platform
|
||||
import contextlib
|
||||
@@ -557,7 +558,7 @@ class ScyllaServer:
|
||||
async def install_and_start(self,
|
||||
api: ScyllaRESTAPIClient,
|
||||
expected_error: Optional[str] = None,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> None:
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> None:
|
||||
"""Setup and start this server."""
|
||||
|
||||
await self.install()
|
||||
@@ -677,10 +678,63 @@ class ScyllaServer:
|
||||
return None
|
||||
return maintenance_socket_option
|
||||
|
||||
async def get_cql_up_state(self) -> ServerUpState | None:
|
||||
"""Get the CQL up state (a check we use at start up).
|
||||
def _alternator_ports(self) -> list[tuple[str, int]]:
|
||||
"""Return (scheme, port) for every configured Alternator port."""
|
||||
ports = []
|
||||
if "alternator_port" in self.config:
|
||||
ports.append(("http", self.config["alternator_port"]))
|
||||
if "alternator_https_port" in self.config:
|
||||
ports.append(("https", self.config["alternator_https_port"]))
|
||||
return ports
|
||||
|
||||
Return None if it fails to connect.
|
||||
async def check_alternator_connected(self, ports: list[tuple[str, int]]) -> bool:
|
||||
"""TCP connect to every configured Alternator port.
|
||||
|
||||
Returns True if all ports accept connections.
|
||||
"""
|
||||
for _, port in ports:
|
||||
try:
|
||||
_, writer = await asyncio.wait_for(
|
||||
asyncio.open_connection(self.ip_addr, port), timeout=2)
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
except (OSError, asyncio.TimeoutError):
|
||||
return False
|
||||
return True
|
||||
|
||||
async def check_alternator_queried(self, ports: list[tuple[str, int]]) -> bool:
|
||||
"""Sends a GetItem for a randomly-named nonexistent table and validates
|
||||
that the response is a DynamoDB-shaped JSON error (contains __type),
|
||||
confirming Alternator is processing DynamoDB API requests.
|
||||
|
||||
Returns True if all ports respond correctly.
|
||||
"""
|
||||
table_name = f"nonexistent_table_{uuid.uuid4().hex}"
|
||||
headers = {
|
||||
"Content-Type": "application/x-amz-json-1.0",
|
||||
"X-Amz-Target": "DynamoDB_20120810.GetItem",
|
||||
}
|
||||
body = json.dumps({"TableName": table_name, "Key": {"k": {"S": "k"}}})
|
||||
timeout = aiohttp.ClientTimeout(total=2)
|
||||
async with aiohttp.ClientSession(timeout=timeout) as session:
|
||||
for scheme, port in ports:
|
||||
url = f"{scheme}://{self.ip_addr}:{port}/"
|
||||
try:
|
||||
# ssl=False skips certificate verification
|
||||
async with session.post(url, headers=headers, data=body, ssl=False) as resp:
|
||||
response_body = await resp.json(content_type=None)
|
||||
if "__type" not in response_body:
|
||||
return False
|
||||
except Exception as exc:
|
||||
self.logger.debug("Alternator query check failed for %s: %s", url, exc)
|
||||
return False
|
||||
return True
|
||||
|
||||
async def get_cql_up_state(self) -> tuple[bool, bool]:
|
||||
"""Check CQL connectivity.
|
||||
|
||||
Returns (connected, queried) indicating whether a CQL connection
|
||||
was established and whether a query executed successfully.
|
||||
"""
|
||||
caslog = logging.getLogger('cassandra')
|
||||
oldlevel = caslog.getEffectiveLevel()
|
||||
@@ -708,6 +762,7 @@ class ScyllaServer:
|
||||
request_timeout=self.TOPOLOGY_TIMEOUT)
|
||||
contact_points=[self.rpc_address]
|
||||
connected = False
|
||||
cql_queried = False
|
||||
try:
|
||||
# In a cluster setup, it's possible that the CQL
|
||||
# here is directed to a node different from the initial contact
|
||||
@@ -730,13 +785,32 @@ class ScyllaServer:
|
||||
control_connection_timeout=self.TOPOLOGY_TIMEOUT,
|
||||
auth_provider=self.auth_provider)
|
||||
self.control_connection = self.control_cluster.connect()
|
||||
return ServerUpState.CQL_QUERIED
|
||||
cql_queried = True
|
||||
except (NoHostAvailable, InvalidRequest, OperationTimedOut) as exc:
|
||||
self.logger.debug("Exception when checking if CQL is up: %s", exc)
|
||||
return ServerUpState.CQL_CONNECTED if connected else None
|
||||
finally:
|
||||
caslog.setLevel(oldlevel)
|
||||
# Any other exception may indicate a problem, and is passed to the caller.
|
||||
return connected, cql_queried
|
||||
|
||||
async def get_alternator_up_state(self, ports: list[tuple[str, int]]) -> tuple[bool, bool]:
|
||||
connected = await self.check_alternator_connected(ports)
|
||||
queried = connected and await self.check_alternator_queried(ports)
|
||||
return connected, queried
|
||||
|
||||
async def get_cql_alternator_up_state(self) -> ServerUpState | None:
|
||||
"""Get the combined CQL + Alternator up state."""
|
||||
cql_connected, cql_queried = await self.get_cql_up_state()
|
||||
alt_connected, alt_queried = False, False
|
||||
alt_ports = self._alternator_ports() # `alt_ports` empty = no Alternator
|
||||
if alt_ports:
|
||||
alt_connected, alt_queried = await self.get_alternator_up_state(alt_ports)
|
||||
if cql_queried and (alt_queried or not alt_ports):
|
||||
return ServerUpState.CQL_ALTERNATOR_QUERIED
|
||||
if not cql_connected or (alt_ports and not alt_connected):
|
||||
return None
|
||||
# Here both CQL and Alternator (if exists) are at least connected
|
||||
return ServerUpState.CQL_ALTERNATOR_CONNECTED
|
||||
|
||||
def _setup_notify_socket(self) -> None:
|
||||
"""Create a Unix datagram socket for receiving sd_notify messages from Scylla."""
|
||||
@@ -824,7 +898,7 @@ class ScyllaServer:
|
||||
async def start(self,
|
||||
api: ScyllaRESTAPIClient,
|
||||
expected_error: Optional[str] = None,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
|
||||
cmdline_options_override: list[str] | None = None,
|
||||
append_env_override: dict[str, str] | None = None) -> None:
|
||||
"""Start an installed server.
|
||||
@@ -897,9 +971,9 @@ class ScyllaServer:
|
||||
if await self.try_get_host_id(api):
|
||||
if server_up_state == ServerUpState.PROCESS_STARTED:
|
||||
server_up_state = ServerUpState.HOST_ID_QUERIED
|
||||
server_up_state = await self.get_cql_up_state() or server_up_state
|
||||
server_up_state = await self.get_cql_alternator_up_state() or server_up_state
|
||||
# Check for SERVING state (sd_notify "serving" message)
|
||||
if server_up_state >= ServerUpState.CQL_QUERIED and self.check_serving_notification():
|
||||
if server_up_state >= ServerUpState.CQL_ALTERNATOR_QUERIED and self.check_serving_notification():
|
||||
server_up_state = ServerUpState.SERVING
|
||||
if server_up_state >= expected_server_up_state:
|
||||
if expected_error is not None:
|
||||
@@ -1194,7 +1268,7 @@ class ScyllaCluster:
|
||||
seeds: Optional[List[IPAddress]] = None,
|
||||
server_encryption: str = "none",
|
||||
expected_error: Optional[str] = None,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> ServerInfo:
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> ServerInfo:
|
||||
"""Add a new server to the cluster"""
|
||||
self.is_dirty = True
|
||||
|
||||
@@ -1434,7 +1508,7 @@ class ScyllaCluster:
|
||||
server_id: ServerNum,
|
||||
expected_error: str | None = None,
|
||||
seeds: list[IPAddress] | None = None,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
|
||||
cmdline_options_override: list[str] | None = None,
|
||||
append_env_override: dict[str, str] | None = None,
|
||||
auth_provider: dict[str, str] | None = None) -> None:
|
||||
@@ -1917,7 +1991,7 @@ class ScyllaClusterManager:
|
||||
server_id=server_id,
|
||||
expected_error=data.get("expected_error"),
|
||||
seeds=data.get("seeds"),
|
||||
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")),
|
||||
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")),
|
||||
cmdline_options_override=data.get("cmdline_options_override"),
|
||||
append_env_override=data.get("append_env_override"),
|
||||
auth_provider=data.get("auth_provider"),
|
||||
@@ -1952,7 +2026,7 @@ class ScyllaClusterManager:
|
||||
seeds=data.get("seeds"),
|
||||
server_encryption=data.get("server_encryption", "none"),
|
||||
expected_error=data.get("expected_error"),
|
||||
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")),
|
||||
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")),
|
||||
)
|
||||
return s_info.as_dict()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user