mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-28 12:17:02 +00:00
Compare commits
1 Commits
migrate-wi
...
fix/raft-t
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9b65d6d0fc |
@@ -78,7 +78,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=2026.3.0-dev
|
||||
VERSION=2026.2.0-dev
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -72,6 +72,7 @@ struct raft_topology_cmd_result {
|
||||
success
|
||||
};
|
||||
service::raft_topology_cmd_result::command_status status;
|
||||
sstring error_message [[version 2026.2]];
|
||||
};
|
||||
|
||||
struct raft_snapshot {
|
||||
|
||||
@@ -4792,8 +4792,13 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
}
|
||||
} catch (const raft::request_aborted& e) {
|
||||
rtlogger.warn("raft_topology_cmd {} failed with: {}", cmd.cmd, e);
|
||||
result.error_message = e.what();
|
||||
} catch (const std::exception& e) {
|
||||
rtlogger.error("raft_topology_cmd {} failed with: {}", cmd.cmd, e);
|
||||
result.error_message = e.what();
|
||||
} catch (...) {
|
||||
rtlogger.error("raft_topology_cmd {} failed with: {}", cmd.cmd, std::current_exception());
|
||||
result.error_message = "unknown error";
|
||||
}
|
||||
|
||||
rtlogger.info("topology cmd rpc {} completed with status={} index={}",
|
||||
|
||||
@@ -443,8 +443,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
co_await ser::storage_service_rpc_verbs::send_raft_topology_cmd(
|
||||
&_messaging, to_host_id(id), id, _term, cmd_index, cmd);
|
||||
if (result.status == raft_topology_cmd_result::command_status::fail) {
|
||||
auto msg = result.error_message.empty()
|
||||
? ::format("failed status returned from {}", id)
|
||||
: ::format("failed status returned from {}: {}", id, result.error_message);
|
||||
co_await coroutine::exception(std::make_exception_ptr(
|
||||
std::runtime_error(::format("failed status returned from {}", id))));
|
||||
std::runtime_error(std::move(msg))));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -3909,10 +3912,15 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
throw;
|
||||
} catch (seastar::abort_requested_exception&) {
|
||||
throw;
|
||||
} catch (const std::exception& e) {
|
||||
rtlogger.error("send_raft_topology_cmd(stream_ranges) failed with exception"
|
||||
" (node state is rebuilding): {}", e);
|
||||
rtbuilder.done(e.what());
|
||||
retake = true;
|
||||
} catch (...) {
|
||||
rtlogger.error("send_raft_topology_cmd(stream_ranges) failed with exception"
|
||||
" (node state is rebuilding): {}", std::current_exception());
|
||||
rtbuilder.done("streaming failed");
|
||||
rtbuilder.done("unknown error");
|
||||
retake = true;
|
||||
}
|
||||
if (retake) {
|
||||
|
||||
@@ -318,6 +318,9 @@ struct raft_topology_cmd_result {
|
||||
success
|
||||
};
|
||||
command_status status = command_status::fail;
|
||||
// Carries the error description back to the topology coordinator
|
||||
// when the command fails.
|
||||
sstring error_message;
|
||||
};
|
||||
|
||||
// This class is used in RPC's signatures to hold the topology_version of the caller.
|
||||
|
||||
@@ -5171,16 +5171,8 @@ static void test_sstable_write_large_row_f(schema_ptr s, reader_permit permit, r
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_row) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
// Use a fixed partition key. The row-size thresholds below are chosen at exact
|
||||
// byte boundaries of the MC sstable row encoding: the first clustering row body
|
||||
// encodes prev_row_size as a vint, and prev_row_size includes the partition
|
||||
// header (which contains the partition key's serialized length+bytes). A
|
||||
// random-size partition key (as produced by simple_schema::make_pkey() /
|
||||
// tests::generate_partition_key(), which default to key_size{1,128}) would
|
||||
// perturb the encoded row size by 1-2 bytes across runs and flip the threshold
|
||||
// comparison, making this test flaky. Under smp=1 (which this test runs with),
|
||||
// a fixed key is always shard-local, so no sharding-metadata issue arises.
|
||||
mutation partition = s.new_mutation("pv");
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation partition(s.schema(), s.make_pkey());
|
||||
const partition_key& pk = partition.key();
|
||||
s.add_static_row(partition, "foo bar zed");
|
||||
|
||||
@@ -5252,16 +5244,8 @@ static void test_sstable_write_large_cell_f(schema_ptr s, reader_permit permit,
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
// Use a fixed partition key. The cell-size thresholds below are chosen at exact
|
||||
// byte boundaries of the MC sstable row encoding: the first clustering row body
|
||||
// encodes prev_row_size as a vint, and prev_row_size includes the partition
|
||||
// header (which contains the partition key's serialized length+bytes). A
|
||||
// random-size partition key (as produced by simple_schema::make_pkey() /
|
||||
// tests::generate_partition_key(), which default to key_size{1,128}) would
|
||||
// perturb the encoded row size by 1-2 bytes across runs and flip the threshold
|
||||
// comparison, making this test flaky. Under smp=1 (which this test runs with),
|
||||
// a fixed key is always shard-local, so no sharding-metadata issue arises.
|
||||
mutation partition = s.new_mutation("pv");
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation partition(s.schema(), s.make_pkey());
|
||||
const partition_key& pk = partition.key();
|
||||
s.add_static_row(partition, "foo bar zed");
|
||||
|
||||
@@ -5280,6 +5264,7 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
|
||||
static void test_sstable_log_too_many_rows_f(int rows, int range_tombstones, uint64_t threshold, bool expected, sstable_version_types version) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation p(s.schema(), s.make_pkey());
|
||||
sstring sv;
|
||||
for (auto idx = 0; idx < rows - 1; idx++) {
|
||||
@@ -5341,6 +5326,7 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_rows) {
|
||||
static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold, bool expected, sstable_version_types version) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation p(s.schema(), s.make_pkey());
|
||||
sstring sv;
|
||||
int live_rows = 0;
|
||||
@@ -5450,6 +5436,7 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_dead_rows) {
|
||||
static void test_sstable_too_many_collection_elements_f(int elements, uint64_t threshold, bool expected, sstable_version_types version) {
|
||||
simple_schema s(simple_schema::with_static::no, simple_schema::with_collection::yes);
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation p(s.schema(), s.make_pkey());
|
||||
std::map<bytes, bytes> kv_map;
|
||||
for (auto i = 0; i < elements; i++) {
|
||||
@@ -5525,6 +5512,7 @@ SEASTAR_THREAD_TEST_CASE(test_large_data_records_round_trip) {
|
||||
// Create a mutation with a clustering row whose serialized cell value
|
||||
// exceeds the 1-byte thresholds, so partition_size, row_size, and
|
||||
// cell_size records are all generated.
|
||||
// Use make_pkey() (no argument) to generate a key on this shard.
|
||||
auto pk = ss.make_pkey();
|
||||
mutation m(s, pk);
|
||||
auto ck = ss.make_ckey("ck1");
|
||||
@@ -5634,6 +5622,7 @@ SEASTAR_THREAD_TEST_CASE(test_large_data_records_top_n_bounded) {
|
||||
// Create 6 partitions, each with one row of increasing size.
|
||||
// Since each partition has exactly one row, we get 6 row_size records
|
||||
// competing for 3 slots.
|
||||
// Use make_pkeys() to generate shard-local keys.
|
||||
auto pkeys = ss.make_pkeys(6);
|
||||
utils::chunked_vector<mutation> muts;
|
||||
for (int i = 0; i < 6; i++) {
|
||||
|
||||
@@ -18,7 +18,7 @@ from concurrent.futures.thread import ThreadPoolExecutor
|
||||
from multiprocessing import Event
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
from test import TOP_SRC_DIR, MODES_TIMEOUT_FACTOR, path_to
|
||||
from test import TOP_SRC_DIR, path_to
|
||||
from test.pylib.random_tables import RandomTables
|
||||
from test.pylib.skip_types import skip_env
|
||||
from test.pylib.util import unique_name
|
||||
@@ -394,8 +394,3 @@ async def key_provider(request, tmpdir, scylla_binary):
|
||||
"""Encryption providers fixture"""
|
||||
async with make_key_provider_factory(request.param, tmpdir, scylla_binary) as res:
|
||||
yield res
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def failure_detector_timeout(build_mode):
|
||||
return 2000 * MODES_TIMEOUT_FACTOR[build_mode]
|
||||
|
||||
@@ -227,11 +227,6 @@ class ScyllaCluster:
|
||||
def flush(self) -> None:
|
||||
self.nodetool("flush")
|
||||
|
||||
def compact(self, keyspace: str = "", tables: list[str] | None = None) -> None:
|
||||
for node in self.nodelist():
|
||||
if node.is_running():
|
||||
node.compact(keyspace=keyspace, tables=tables)
|
||||
|
||||
@staticmethod
|
||||
def debug(message: str) -> None:
|
||||
logger.debug(message)
|
||||
|
||||
@@ -111,7 +111,6 @@ class ScyllaNode:
|
||||
self.data_center = server.datacenter
|
||||
self.rack = server.rack
|
||||
|
||||
self._hostid = None
|
||||
self._smp_set_during_test = None
|
||||
self._smp = None
|
||||
self._memory = None
|
||||
@@ -466,9 +465,6 @@ class ScyllaNode:
|
||||
if wait_for_binary_proto:
|
||||
self.wait_for_binary_interface(from_mark=self.mark)
|
||||
|
||||
if not self._hostid:
|
||||
self.hostid()
|
||||
|
||||
if wait_other_notice:
|
||||
timeout = self.cluster.default_wait_other_notice_timeout
|
||||
for node, mark in marks:
|
||||
@@ -651,12 +647,11 @@ class ScyllaNode:
|
||||
cmd.append(table)
|
||||
self.nodetool(" ".join(cmd), **kwargs)
|
||||
|
||||
def compact(self, keyspace: str = "", tables: list[str] | None = None) -> None:
|
||||
def compact(self, keyspace: str = "", tables: str | None = ()) -> None:
|
||||
compact_cmd = ["compact"]
|
||||
if keyspace:
|
||||
compact_cmd.append(keyspace)
|
||||
if tables:
|
||||
compact_cmd.extend(tables)
|
||||
compact_cmd += tables
|
||||
self.nodetool(" ".join(compact_cmd))
|
||||
|
||||
def drain(self, block_on_log: bool = False) -> None:
|
||||
@@ -829,13 +824,10 @@ class ScyllaNode:
|
||||
assert timeout is None, "argument `timeout` is not supported" # not used in scylla-dtest
|
||||
assert force_refresh is None, "argument `force_refresh` is not supported" # not used in scylla-dtest
|
||||
|
||||
if not self._hostid:
|
||||
try:
|
||||
self._hostid = self.cluster.manager.get_host_id(server_id=self.server_id)
|
||||
except Exception as exc:
|
||||
self.error(f"Failed to get hostid: {exc}")
|
||||
|
||||
return self._hostid
|
||||
try:
|
||||
return self.cluster.manager.get_host_id(server_id=self.server_id)
|
||||
except Exception as exc:
|
||||
self.error(f"Failed to get hostid: {exc}")
|
||||
|
||||
def rmtree(self, path: str | Path) -> None:
|
||||
"""Delete a directory content without removing the directory.
|
||||
|
||||
@@ -34,7 +34,6 @@ def pytest_addoption(parser: Parser) -> None:
|
||||
parser.addoption("--experimental-features", type=lambda s: s.split(","), action="store", help="Pass experimental features <feature>,<feature> to enable", default=None)
|
||||
parser.addoption("--tablets", action=argparse.BooleanOptionalAction, default=False, help="Whether to enable tablets support (default: %(default)s)")
|
||||
parser.addoption("--force-gossip-topology-changes", action="store_true", default=False, help="force gossip topology changes in a fresh cluster")
|
||||
parser.addoption("--compaction-strategy", action="store", default=None, help="Compaction strategy to use in tests that support it (e.g. wide_rows_test.py). One of LeveledCompactionStrategy, SizeTieredCompactionStrategy, TimeWindowCompactionStrategy, or IncrementalCompactionStrategy. If not set, a random strategy is chosen per test.")
|
||||
|
||||
|
||||
def pytest_configure(config: Config) -> None:
|
||||
|
||||
@@ -263,25 +263,3 @@ def assert_lists_equal_ignoring_order(list1, list2, sort_key=None):
|
||||
sorted_list2 = sorted(normalized_list2, key=lambda elm: str(elm[sort_key]))
|
||||
|
||||
assert sorted_list1 == sorted_list2
|
||||
|
||||
|
||||
def assert_equal_more_with_deviation(actual, expect, deviation_perc):
|
||||
"""
|
||||
Assert actual is within inclusive interval [expected...expected+deviation_perc]
|
||||
@param actual Value inspected
|
||||
@param expect Beginning of expected interval
|
||||
@param deviation_perc allowed percent increase
|
||||
"""
|
||||
deviation_high = (expect * (100 + deviation_perc)) / 100
|
||||
assert expect <= actual <= deviation_high, f"Expect result interval {expect}..{deviation_high}, received {actual}"
|
||||
|
||||
|
||||
def assert_less_equal_lists(actual_list, expected_list, msg=None):
|
||||
"""
|
||||
Assert actual_list is a subset of the expected list, prints hardcoded or parameterized error message
|
||||
@param actual_list Inspected list
|
||||
@param expected_list List that supposed to include actual_list
|
||||
@param msg Configured message default None.
|
||||
"""
|
||||
standardMsg = msg or f"{actual_list} not less than or equal to {expected_list}"
|
||||
assert set(actual_list) <= set(expected_list), standardMsg
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -48,6 +48,5 @@ run_in_dev:
|
||||
- dtest/commitlog_test
|
||||
- dtest/cfid_test
|
||||
- dtest/rebuild_test
|
||||
- dtest/wide_rows_test
|
||||
run_in_debug:
|
||||
- random_failures/test_random_failures
|
||||
|
||||
@@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout) -> None:
|
||||
async def test_kill_coordinator_during_op(manager: ManagerClient) -> None:
|
||||
""" Kill coordinator with error injection while topology operation is running for cluster: decommission,
|
||||
bootstrap, removenode, replace.
|
||||
|
||||
@@ -41,7 +41,7 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
|
||||
"""
|
||||
# Decrease the failure detector threshold so we don't have to wait for too long.
|
||||
config = {
|
||||
'failure_detector_timeout_in_ms': failure_detector_timeout
|
||||
'failure_detector_timeout_in_ms': 2000
|
||||
}
|
||||
cmdline = [
|
||||
'--logger-log-level', 'raft_topology=trace',
|
||||
|
||||
@@ -22,11 +22,11 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.nightly
|
||||
async def test_banned_node_notification(manager: ManagerClient, failure_detector_timeout) -> None:
|
||||
async def test_banned_node_notification(manager: ManagerClient) -> None:
|
||||
"""Test that a node banned from the cluster get notification about been banned"""
|
||||
# Decrease the failure detector threshold so we don't have to wait for too long.
|
||||
config = {
|
||||
'failure_detector_timeout_in_ms': failure_detector_timeout
|
||||
'failure_detector_timeout_in_ms': 2000
|
||||
}
|
||||
srvs = await manager.servers_add(3, config=config, auto_rack_dc="dc")
|
||||
cql = manager.get_cql()
|
||||
|
||||
@@ -60,14 +60,14 @@ async def make_servers(manager: ManagerClient, servers_num: int,
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_raft_replace_ignore_nodes(manager: ManagerClient, failure_detector_timeout) -> None:
|
||||
async def test_raft_replace_ignore_nodes(manager: ManagerClient) -> None:
|
||||
"""Replace 3 dead nodes.
|
||||
|
||||
This is a slow test with a 7 node cluster and 3 replace operations,
|
||||
we want to run it only in dev mode.
|
||||
"""
|
||||
logger.info("Booting initial cluster")
|
||||
servers = await make_servers(manager, 7, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
|
||||
servers = await make_servers(manager, 7, config={'failure_detector_timeout_in_ms': 2000})
|
||||
|
||||
s1_id = await manager.get_host_id(servers[1].server_id)
|
||||
s2_id = await manager.get_host_id(servers[2].server_id)
|
||||
|
||||
@@ -21,9 +21,9 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_replace_different_ip(manager: ManagerClient, failure_detector_timeout) -> None:
|
||||
async def test_replace_different_ip(manager: ManagerClient) -> None:
|
||||
"""Replace an existing node with new node using a different IP address"""
|
||||
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
|
||||
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
|
||||
logger.info(f"cluster started, servers {servers}")
|
||||
|
||||
logger.info(f"replacing server {servers[0]}")
|
||||
@@ -67,18 +67,18 @@ async def test_replace_different_ip(manager: ManagerClient, failure_detector_tim
|
||||
logger.info(f"server {s} system.peers and gossiper state is valid")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_replace_different_ip_using_host_id(manager: ManagerClient, failure_detector_timeout) -> None:
|
||||
async def test_replace_different_ip_using_host_id(manager: ManagerClient) -> None:
|
||||
"""Replace an existing node with new node reusing the replaced node host id"""
|
||||
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
|
||||
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
|
||||
await manager.server_stop(servers[0].server_id)
|
||||
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = True)
|
||||
await manager.server_add(replace_cfg)
|
||||
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_replace_reuse_ip(request, manager: ManagerClient, failure_detector_timeout) -> None:
|
||||
async def test_replace_reuse_ip(request, manager: ManagerClient) -> None:
|
||||
"""Replace an existing node with new node using the same IP address"""
|
||||
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout}, auto_rack_dc="dc1")
|
||||
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000}, auto_rack_dc="dc1")
|
||||
host2 = (await wait_for_cql_and_get_hosts(manager.get_cql(), [servers[2]], time.time() + 60))[0]
|
||||
|
||||
logger.info(f"creating test table")
|
||||
@@ -130,9 +130,9 @@ async def test_replace_reuse_ip(request, manager: ManagerClient, failure_detecto
|
||||
await manager.server_sees_other_server(servers[2].ip_addr, servers[0].ip_addr)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_replace_reuse_ip_using_host_id(manager: ManagerClient, failure_detector_timeout) -> None:
|
||||
async def test_replace_reuse_ip_using_host_id(manager: ManagerClient) -> None:
|
||||
"""Replace an existing node with new node using the same IP address and same host id"""
|
||||
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
|
||||
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
|
||||
await manager.server_stop(servers[0].server_id)
|
||||
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True, use_host_id = True)
|
||||
await manager.server_add(replace_cfg)
|
||||
|
||||
@@ -14,9 +14,9 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_replace_with_same_ip_twice(manager: ManagerClient, failure_detector_timeout) -> None:
|
||||
async def test_replace_with_same_ip_twice(manager: ManagerClient) -> None:
|
||||
logger.info("starting a cluster with two nodes")
|
||||
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
|
||||
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
|
||||
logger.info(f"cluster started {servers}")
|
||||
|
||||
async def replace_with_same_ip(s: ServerInfo) -> ServerInfo:
|
||||
|
||||
@@ -119,14 +119,14 @@ async def test_tablet_transition_sanity(manager: ManagerClient, action):
|
||||
@pytest.mark.parametrize("fail_stage", ["streaming", "allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "cleanup", "cleanup_target", "end_migration", "revert_migration"])
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage, failure_detector_timeout):
|
||||
async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage):
|
||||
if fail_stage == 'cleanup' and fail_replica == 'destination':
|
||||
skip_env('Failing destination during cleanup is pointless')
|
||||
if fail_stage == 'cleanup_target' and fail_replica == 'source':
|
||||
skip_env('Failing source during target cleanup is pointless')
|
||||
|
||||
logger.info("Bootstrapping cluster")
|
||||
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': failure_detector_timeout}
|
||||
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': 2000}
|
||||
host_ids = []
|
||||
servers = []
|
||||
|
||||
|
||||
@@ -37,6 +37,6 @@ class ServerInfo(NamedTuple):
|
||||
class ServerUpState(IntEnum):
|
||||
PROCESS_STARTED = auto()
|
||||
HOST_ID_QUERIED = auto()
|
||||
CQL_ALTERNATOR_CONNECTED = auto()
|
||||
CQL_ALTERNATOR_QUERIED = auto()
|
||||
CQL_CONNECTED = auto()
|
||||
CQL_QUERIED = auto()
|
||||
SERVING = auto() # Scylla sent sd_notify("serving")
|
||||
|
||||
@@ -389,7 +389,7 @@ class ManagerClient:
|
||||
seeds: list[IPAddress] | None = None,
|
||||
timeout: float | None = None,
|
||||
connect_driver: bool = True,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
|
||||
cmdline_options_override: list[str] | None = None,
|
||||
append_env_override: dict[str, str] | None = None,
|
||||
auth_provider: dict[str, str] | None = None) -> None:
|
||||
@@ -540,7 +540,7 @@ class ManagerClient:
|
||||
seeds: Optional[List[IPAddress]] = None,
|
||||
timeout: Optional[float] = ScyllaServer.TOPOLOGY_TIMEOUT,
|
||||
server_encryption: str = "none",
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
|
||||
connect_driver: bool = True) -> ServerInfo:
|
||||
"""Add a new server"""
|
||||
if expected_error is not None:
|
||||
|
||||
@@ -41,7 +41,6 @@ import yaml
|
||||
import signal
|
||||
import glob
|
||||
import errno
|
||||
import json
|
||||
import re
|
||||
import platform
|
||||
import contextlib
|
||||
@@ -558,7 +557,7 @@ class ScyllaServer:
|
||||
async def install_and_start(self,
|
||||
api: ScyllaRESTAPIClient,
|
||||
expected_error: Optional[str] = None,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> None:
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> None:
|
||||
"""Setup and start this server."""
|
||||
|
||||
await self.install()
|
||||
@@ -678,63 +677,10 @@ class ScyllaServer:
|
||||
return None
|
||||
return maintenance_socket_option
|
||||
|
||||
def _alternator_ports(self) -> list[tuple[str, int]]:
|
||||
"""Return (scheme, port) for every configured Alternator port."""
|
||||
ports = []
|
||||
if "alternator_port" in self.config:
|
||||
ports.append(("http", self.config["alternator_port"]))
|
||||
if "alternator_https_port" in self.config:
|
||||
ports.append(("https", self.config["alternator_https_port"]))
|
||||
return ports
|
||||
async def get_cql_up_state(self) -> ServerUpState | None:
|
||||
"""Get the CQL up state (a check we use at start up).
|
||||
|
||||
async def check_alternator_connected(self, ports: list[tuple[str, int]]) -> bool:
|
||||
"""TCP connect to every configured Alternator port.
|
||||
|
||||
Returns True if all ports accept connections.
|
||||
"""
|
||||
for _, port in ports:
|
||||
try:
|
||||
_, writer = await asyncio.wait_for(
|
||||
asyncio.open_connection(self.ip_addr, port), timeout=2)
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
except (OSError, asyncio.TimeoutError):
|
||||
return False
|
||||
return True
|
||||
|
||||
async def check_alternator_queried(self, ports: list[tuple[str, int]]) -> bool:
|
||||
"""Sends a GetItem for a randomly-named nonexistent table and validates
|
||||
that the response is a DynamoDB-shaped JSON error (contains __type),
|
||||
confirming Alternator is processing DynamoDB API requests.
|
||||
|
||||
Returns True if all ports respond correctly.
|
||||
"""
|
||||
table_name = f"nonexistent_table_{uuid.uuid4().hex}"
|
||||
headers = {
|
||||
"Content-Type": "application/x-amz-json-1.0",
|
||||
"X-Amz-Target": "DynamoDB_20120810.GetItem",
|
||||
}
|
||||
body = json.dumps({"TableName": table_name, "Key": {"k": {"S": "k"}}})
|
||||
timeout = aiohttp.ClientTimeout(total=2)
|
||||
async with aiohttp.ClientSession(timeout=timeout) as session:
|
||||
for scheme, port in ports:
|
||||
url = f"{scheme}://{self.ip_addr}:{port}/"
|
||||
try:
|
||||
# ssl=False skips certificate verification
|
||||
async with session.post(url, headers=headers, data=body, ssl=False) as resp:
|
||||
response_body = await resp.json(content_type=None)
|
||||
if "__type" not in response_body:
|
||||
return False
|
||||
except Exception as exc:
|
||||
self.logger.debug("Alternator query check failed for %s: %s", url, exc)
|
||||
return False
|
||||
return True
|
||||
|
||||
async def get_cql_up_state(self) -> tuple[bool, bool]:
|
||||
"""Check CQL connectivity.
|
||||
|
||||
Returns (connected, queried) indicating whether a CQL connection
|
||||
was established and whether a query executed successfully.
|
||||
Return None if it fails to connect.
|
||||
"""
|
||||
caslog = logging.getLogger('cassandra')
|
||||
oldlevel = caslog.getEffectiveLevel()
|
||||
@@ -762,7 +708,6 @@ class ScyllaServer:
|
||||
request_timeout=self.TOPOLOGY_TIMEOUT)
|
||||
contact_points=[self.rpc_address]
|
||||
connected = False
|
||||
cql_queried = False
|
||||
try:
|
||||
# In a cluster setup, it's possible that the CQL
|
||||
# here is directed to a node different from the initial contact
|
||||
@@ -785,32 +730,13 @@ class ScyllaServer:
|
||||
control_connection_timeout=self.TOPOLOGY_TIMEOUT,
|
||||
auth_provider=self.auth_provider)
|
||||
self.control_connection = self.control_cluster.connect()
|
||||
cql_queried = True
|
||||
return ServerUpState.CQL_QUERIED
|
||||
except (NoHostAvailable, InvalidRequest, OperationTimedOut) as exc:
|
||||
self.logger.debug("Exception when checking if CQL is up: %s", exc)
|
||||
return ServerUpState.CQL_CONNECTED if connected else None
|
||||
finally:
|
||||
caslog.setLevel(oldlevel)
|
||||
# Any other exception may indicate a problem, and is passed to the caller.
|
||||
return connected, cql_queried
|
||||
|
||||
async def get_alternator_up_state(self, ports: list[tuple[str, int]]) -> tuple[bool, bool]:
|
||||
connected = await self.check_alternator_connected(ports)
|
||||
queried = connected and await self.check_alternator_queried(ports)
|
||||
return connected, queried
|
||||
|
||||
async def get_cql_alternator_up_state(self) -> ServerUpState | None:
|
||||
"""Get the combined CQL + Alternator up state."""
|
||||
cql_connected, cql_queried = await self.get_cql_up_state()
|
||||
alt_connected, alt_queried = False, False
|
||||
alt_ports = self._alternator_ports() # `alt_ports` empty = no Alternator
|
||||
if alt_ports:
|
||||
alt_connected, alt_queried = await self.get_alternator_up_state(alt_ports)
|
||||
if cql_queried and (alt_queried or not alt_ports):
|
||||
return ServerUpState.CQL_ALTERNATOR_QUERIED
|
||||
if not cql_connected or (alt_ports and not alt_connected):
|
||||
return None
|
||||
# Here both CQL and Alternator (if exists) are at least connected
|
||||
return ServerUpState.CQL_ALTERNATOR_CONNECTED
|
||||
|
||||
def _setup_notify_socket(self) -> None:
|
||||
"""Create a Unix datagram socket for receiving sd_notify messages from Scylla."""
|
||||
@@ -898,7 +824,7 @@ class ScyllaServer:
|
||||
async def start(self,
|
||||
api: ScyllaRESTAPIClient,
|
||||
expected_error: Optional[str] = None,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
|
||||
cmdline_options_override: list[str] | None = None,
|
||||
append_env_override: dict[str, str] | None = None) -> None:
|
||||
"""Start an installed server.
|
||||
@@ -971,9 +897,9 @@ class ScyllaServer:
|
||||
if await self.try_get_host_id(api):
|
||||
if server_up_state == ServerUpState.PROCESS_STARTED:
|
||||
server_up_state = ServerUpState.HOST_ID_QUERIED
|
||||
server_up_state = await self.get_cql_alternator_up_state() or server_up_state
|
||||
server_up_state = await self.get_cql_up_state() or server_up_state
|
||||
# Check for SERVING state (sd_notify "serving" message)
|
||||
if server_up_state >= ServerUpState.CQL_ALTERNATOR_QUERIED and self.check_serving_notification():
|
||||
if server_up_state >= ServerUpState.CQL_QUERIED and self.check_serving_notification():
|
||||
server_up_state = ServerUpState.SERVING
|
||||
if server_up_state >= expected_server_up_state:
|
||||
if expected_error is not None:
|
||||
@@ -1268,7 +1194,7 @@ class ScyllaCluster:
|
||||
seeds: Optional[List[IPAddress]] = None,
|
||||
server_encryption: str = "none",
|
||||
expected_error: Optional[str] = None,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> ServerInfo:
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> ServerInfo:
|
||||
"""Add a new server to the cluster"""
|
||||
self.is_dirty = True
|
||||
|
||||
@@ -1508,7 +1434,7 @@ class ScyllaCluster:
|
||||
server_id: ServerNum,
|
||||
expected_error: str | None = None,
|
||||
seeds: list[IPAddress] | None = None,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
|
||||
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
|
||||
cmdline_options_override: list[str] | None = None,
|
||||
append_env_override: dict[str, str] | None = None,
|
||||
auth_provider: dict[str, str] | None = None) -> None:
|
||||
@@ -1991,7 +1917,7 @@ class ScyllaClusterManager:
|
||||
server_id=server_id,
|
||||
expected_error=data.get("expected_error"),
|
||||
seeds=data.get("seeds"),
|
||||
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")),
|
||||
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")),
|
||||
cmdline_options_override=data.get("cmdline_options_override"),
|
||||
append_env_override=data.get("append_env_override"),
|
||||
auth_provider=data.get("auth_provider"),
|
||||
@@ -2026,7 +1952,7 @@ class ScyllaClusterManager:
|
||||
seeds=data.get("seeds"),
|
||||
server_encryption=data.get("server_encryption", "none"),
|
||||
expected_error=data.get("expected_error"),
|
||||
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")),
|
||||
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")),
|
||||
)
|
||||
return s_info.as_dict()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user