Compare commits

..

1 Commits

Author SHA1 Message Date
Yaniv Michael Kaul
958b6ed166 compaction: set_skip_when_empty() for validation_errors metric
Add .set_skip_when_empty() to compaction_manager::validation_errors.
This metric only increments when scrubbing encounters out-of-order or
invalid mutation fragments in SSTables, indicating data corruption.
It is almost always zero and creates unnecessary reporting overhead.

AI-Assisted: yes
Signed-off-by: Yaniv Kaul <yaniv.kaul@scylladb.com>
2026-04-24 11:10:54 +03:00
21 changed files with 64 additions and 154 deletions

View File

@@ -78,7 +78,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
VERSION=2026.3.0-dev
VERSION=2026.2.0-dev
if test -f version
then

View File

@@ -1088,7 +1088,7 @@ void compaction_manager::register_metrics() {
sm::make_gauge("normalized_backlog", [this] { return _last_backlog / available_memory(); },
sm::description("Holds the sum of normalized compaction backlog for all tables in the system. Backlog is normalized by dividing backlog by shard's available memory.")),
sm::make_counter("validation_errors", [this] { return _validation_errors; },
sm::description("Holds the number of encountered validation errors.")),
sm::description("Holds the number of encountered validation errors.")).set_skip_when_empty(),
});
}

View File

@@ -45,7 +45,7 @@ Example:
.. code-block:: console
nodetool removenode 675ed9f4-6564-6dbd-ca08-43fddce952de
nodetool removenode 675ed9f4-6564-6dbd-can8-43fddce952gy
To only mark the node as permanently down without doing actual removal, use :doc:`nodetool excludenode </operating-scylla/nodetool-commands/excludenode>`:
@@ -79,6 +79,6 @@ Example:
.. code-block:: console
nodetool removenode --ignore-dead-nodes 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c,125ed9f4-7777-1db0-aac8-43fddce9123e 675ed9f4-6564-6dbd-ca08-43fddce952de
nodetool removenode --ignore-dead-nodes 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c,125ed9f4-7777-1dbn-mac8-43fddce9123e 675ed9f4-6564-6dbd-can8-43fddce952gy
.. include:: nodetool-index.rst

View File

@@ -74,7 +74,7 @@ Procedure
-- Address Load Tokens Owns (effective) Host ID Rack
UN 192.168.1.201 112.82 KB 256 32.7% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c B1
UN 192.168.1.202 91.11 KB 256 32.9% 125ed9f4-7777-1dbn-mac8-43fddce9123e B1
UJ 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-ca08-43fddce952de B1
UJ 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-can8-43fddce952gy B1
Nodes in the cluster finished streaming data to the new node:
@@ -86,7 +86,7 @@ Procedure
-- Address Load Tokens Owns (effective) Host ID Rack
UN 192.168.1.201 112.82 KB 256 32.7% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c B1
UN 192.168.1.202 91.11 KB 256 32.9% 125ed9f4-7777-1dbn-mac8-43fddce9123e B1
UN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-ca08-43fddce952de B1
UN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-can8-43fddce952gy B1
#. When the new node status is Up Normal (UN), run the :doc:`nodetool cleanup </operating-scylla/nodetool-commands/cleanup>` command on all nodes in the cluster except for the new node that has just been added. Cleanup removes keys that were streamed to the newly added node and are no longer owned by the node.

View File

@@ -192,7 +192,7 @@ Adding new nodes
-- Address Load Tokens Owns Host ID Rack
UN 192.168.1.10 500 MB 256 33.3% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c RACK0
UN 192.168.1.11 500 MB 256 33.3% 125ed9f4-7777-1dbn-mac8-43fddce9123e RACK1
UN 192.168.1.12 500 MB 256 33.3% 675ed9f4-6564-6dbd-ca08-43fddce952de RACK2
UN 192.168.1.12 500 MB 256 33.3% 675ed9f4-6564-6dbd-can8-43fddce952gy RACK2
UJ 192.168.2.10 250 MB 256 ? a1b2c3d4-5678-90ab-cdef-112233445566 RACK0
**Example output after bootstrap completes:**
@@ -205,7 +205,7 @@ Adding new nodes
-- Address Load Tokens Owns Host ID Rack
UN 192.168.1.10 400 MB 256 25.0% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c RACK0
UN 192.168.1.11 400 MB 256 25.0% 125ed9f4-7777-1dbn-mac8-43fddce9123e RACK1
UN 192.168.1.12 400 MB 256 25.0% 675ed9f4-6564-6dbd-ca08-43fddce952de RACK2
UN 192.168.1.12 400 MB 256 25.0% 675ed9f4-6564-6dbd-can8-43fddce952gy RACK2
UN 192.168.2.10 400 MB 256 25.0% a1b2c3d4-5678-90ab-cdef-112233445566 RACK0
#. For tablets-enabled clusters, wait for tablet load balancing to complete.

View File

@@ -163,5 +163,5 @@ This example shows how to install and configure a three-node cluster using Gossi
-- Address Load Tokens Owns (effective) Host ID Rack
UN 192.168.1.201 112.82 KB 256 32.7% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c 43
UN 192.168.1.202 91.11 KB 256 32.9% 125ed9f4-7777-1dbn-mac8-43fddce9123e 44
UN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-ca08-43fddce952de 45
UN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-can8-43fddce952gy 45

View File

@@ -19,7 +19,7 @@ Prerequisites
-- Address Load Tokens Owns (effective) Host ID Rack
UN 192.168.1.201 112.82 KB 256 32.7% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c B1
UN 192.168.1.202 91.11 KB 256 32.9% 125ed9f4-7777-1dbn-lac8-23fddce9123e B1
UN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-ca08-43fddce952de B1
UN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-can8-43fddce952gy B1
Datacenter: ASIA-DC
Status=Up/Down
@@ -165,7 +165,7 @@ Procedure
-- Address Load Tokens Owns (effective) Host ID Rack
UN 192.168.1.201 112.82 KB 256 32.7% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c B1
UN 192.168.1.202 91.11 KB 256 32.9% 125ed9f4-7777-1dbn-mac8-43fddce9123e B1
UN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-ca08-43fddce952de B1
UN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-can8-43fddce952gy B1
Datacenter: EUROPE-DC
Status=Up/Down

View File

@@ -18,7 +18,7 @@ Removing a Running Node
-- Address Load Tokens Owns (effective) Host ID Rack
UN 192.168.1.201 112.82 KB 256 32.7% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c B1
UN 192.168.1.202 91.11 KB 256 32.9% 125ed9f4-7777-1dbn-mac8-43fddce9123e B1
UN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-ca08-43fddce952de B1
UN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-can8-43fddce952gy B1
#. If the node status is **Up Normal (UN)**, run the :doc:`nodetool decommission </operating-scylla/nodetool-commands/decommission>` command
to remove the node you are connected to. Using ``nodetool decommission`` is the recommended method for cluster scale-down operations. It prevents data loss
@@ -75,7 +75,7 @@ command providing the Host ID of the node you are removing. See :doc:`nodetool r
.. code-block:: console
nodetool removenode 675ed9f4-6564-6dbd-ca08-43fddce952de
nodetool removenode 675ed9f4-6564-6dbd-can8-43fddce952gy
The ``nodetool removenode`` command notifies other nodes that the token range it owns needs to be moved and
the nodes should redistribute the data using streaming. Using the command does not guarantee the consistency of the rebalanced data if

View File

@@ -23,7 +23,7 @@ Prerequisites
-- Address Load Tokens Owns (effective) Host ID Rack
UN 192.168.1.201 112.82 KB 256 32.7% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c B1
DN 192.168.1.202 91.11 KB 256 32.9% 125ed9f4-7777-1dbn-mac8-43fddce9123e B1
DN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-ca08-43fddce952de B1
DN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-can8-43fddce952gy B1
Log in to one of the nodes in the cluster with (UN) status, and collect the following info from the node:

View File

@@ -29,7 +29,7 @@ Down (DN), and the node can be replaced.
-- Address Load Tokens Owns (effective) Host ID Rack
UN 192.168.1.201 112.82 KB 256 32.7% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c B1
UN 192.168.1.202 91.11 KB 256 32.9% 125ed9f4-7777-1dbn-mac8-43fddce9123e B1
DN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-ca08-43fddce952de B1
DN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-can8-43fddce952gy B1
Remove the Data
==================
@@ -72,7 +72,7 @@ Procedure
For example (using the Host ID of the failed node from above):
``replace_node_first_boot: 675ed9f4-6564-6dbd-ca08-43fddce952de``
``replace_node_first_boot: 675ed9f4-6564-6dbd-can8-43fddce952gy``
#. Start the new node.
@@ -90,7 +90,7 @@ Procedure
-- Address Load Tokens Owns (effective) Host ID Rack
UN 192.168.1.201 112.82 KB 256 32.7% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c B1
UN 192.168.1.202 91.11 KB 256 32.9% 125ed9f4-7777-1dbn-mac8-43fddce9123e B1
DN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-ca08-43fddce952de B1
DN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-can8-43fddce952gy B1
``192.168.1.203`` is the dead node.
@@ -121,7 +121,7 @@ Procedure
/192.168.1.203
generation:1553759866
heartbeat:2147483647
HOST_ID:675ed9f4-6564-6dbd-ca08-43fddce952de
HOST_ID:675ed9f4-6564-6dbd-can8-43fddce952gy
STATUS:shutdown,true
RELEASE_VERSION:3.0.8
X3:3
@@ -178,7 +178,7 @@ In this case, the node's data will be cleaned after restart. To remedy this, you
.. code-block:: none
echo 'replace_node_first_boot: 675ed9f4-6564-6dbd-ca08-43fddce952de' | sudo tee --append /etc/scylla/scylla.yaml
echo 'replace_node_first_boot: 675ed9f4-6564-6dbd-can8-43fddce952gy' | sudo tee --append /etc/scylla/scylla.yaml
#. Run the following command to re-setup RAID

View File

@@ -5171,16 +5171,8 @@ static void test_sstable_write_large_row_f(schema_ptr s, reader_permit permit, r
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_row) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use a fixed partition key. The row-size thresholds below are chosen at exact
// byte boundaries of the MC sstable row encoding: the first clustering row body
// encodes prev_row_size as a vint, and prev_row_size includes the partition
// header (which contains the partition key's serialized length+bytes). A
// random-size partition key (as produced by simple_schema::make_pkey() /
// tests::generate_partition_key(), which default to key_size{1,128}) would
// perturb the encoded row size by 1-2 bytes across runs and flip the threshold
// comparison, making this test flaky. Under smp=1 (which this test runs with),
// a fixed key is always shard-local, so no sharding-metadata issue arises.
mutation partition = s.new_mutation("pv");
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation partition(s.schema(), s.make_pkey());
const partition_key& pk = partition.key();
s.add_static_row(partition, "foo bar zed");
@@ -5252,16 +5244,8 @@ static void test_sstable_write_large_cell_f(schema_ptr s, reader_permit permit,
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use a fixed partition key. The cell-size thresholds below are chosen at exact
// byte boundaries of the MC sstable row encoding: the first clustering row body
// encodes prev_row_size as a vint, and prev_row_size includes the partition
// header (which contains the partition key's serialized length+bytes). A
// random-size partition key (as produced by simple_schema::make_pkey() /
// tests::generate_partition_key(), which default to key_size{1,128}) would
// perturb the encoded row size by 1-2 bytes across runs and flip the threshold
// comparison, making this test flaky. Under smp=1 (which this test runs with),
// a fixed key is always shard-local, so no sharding-metadata issue arises.
mutation partition = s.new_mutation("pv");
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation partition(s.schema(), s.make_pkey());
const partition_key& pk = partition.key();
s.add_static_row(partition, "foo bar zed");
@@ -5280,6 +5264,7 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
static void test_sstable_log_too_many_rows_f(int rows, int range_tombstones, uint64_t threshold, bool expected, sstable_version_types version) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation p(s.schema(), s.make_pkey());
sstring sv;
for (auto idx = 0; idx < rows - 1; idx++) {
@@ -5341,6 +5326,7 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_rows) {
static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold, bool expected, sstable_version_types version) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation p(s.schema(), s.make_pkey());
sstring sv;
int live_rows = 0;
@@ -5450,6 +5436,7 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_dead_rows) {
static void test_sstable_too_many_collection_elements_f(int elements, uint64_t threshold, bool expected, sstable_version_types version) {
simple_schema s(simple_schema::with_static::no, simple_schema::with_collection::yes);
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation p(s.schema(), s.make_pkey());
std::map<bytes, bytes> kv_map;
for (auto i = 0; i < elements; i++) {
@@ -5525,6 +5512,7 @@ SEASTAR_THREAD_TEST_CASE(test_large_data_records_round_trip) {
// Create a mutation with a clustering row whose serialized cell value
// exceeds the 1-byte thresholds, so partition_size, row_size, and
// cell_size records are all generated.
// Use make_pkey() (no argument) to generate a key on this shard.
auto pk = ss.make_pkey();
mutation m(s, pk);
auto ck = ss.make_ckey("ck1");
@@ -5634,6 +5622,7 @@ SEASTAR_THREAD_TEST_CASE(test_large_data_records_top_n_bounded) {
// Create 6 partitions, each with one row of increasing size.
// Since each partition has exactly one row, we get 6 row_size records
// competing for 3 slots.
// Use make_pkeys() to generate shard-local keys.
auto pkeys = ss.make_pkeys(6);
utils::chunked_vector<mutation> muts;
for (int i = 0; i < 6; i++) {

View File

@@ -18,7 +18,7 @@ from concurrent.futures.thread import ThreadPoolExecutor
from multiprocessing import Event
from pathlib import Path
from typing import TYPE_CHECKING
from test import TOP_SRC_DIR, MODES_TIMEOUT_FACTOR, path_to
from test import TOP_SRC_DIR, path_to
from test.pylib.random_tables import RandomTables
from test.pylib.skip_types import skip_env
from test.pylib.util import unique_name
@@ -394,8 +394,3 @@ async def key_provider(request, tmpdir, scylla_binary):
"""Encryption providers fixture"""
async with make_key_provider_factory(request.param, tmpdir, scylla_binary) as res:
yield res
@pytest.fixture(scope="function")
def failure_detector_timeout(build_mode):
return 2000 * MODES_TIMEOUT_FACTOR[build_mode]

View File

@@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_kill_coordinator_during_op(manager: ManagerClient) -> None:
""" Kill coordinator with error injection while topology operation is running for cluster: decommission,
bootstrap, removenode, replace.
@@ -41,7 +41,7 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
"""
# Decrease the failure detector threshold so we don't have to wait for too long.
config = {
'failure_detector_timeout_in_ms': failure_detector_timeout
'failure_detector_timeout_in_ms': 2000
}
cmdline = [
'--logger-log-level', 'raft_topology=trace',

View File

@@ -22,11 +22,11 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.nightly
async def test_banned_node_notification(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_banned_node_notification(manager: ManagerClient) -> None:
"""Test that a node banned from the cluster gets a notification about being banned"""
# Decrease the failure detector threshold so we don't have to wait for too long.
config = {
'failure_detector_timeout_in_ms': failure_detector_timeout
'failure_detector_timeout_in_ms': 2000
}
srvs = await manager.servers_add(3, config=config, auto_rack_dc="dc")
cql = manager.get_cql()

View File

@@ -60,14 +60,14 @@ async def make_servers(manager: ManagerClient, servers_num: int,
@pytest.mark.asyncio
async def test_raft_replace_ignore_nodes(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_raft_replace_ignore_nodes(manager: ManagerClient) -> None:
"""Replace 3 dead nodes.
This is a slow test with a 7 node cluster and 3 replace operations,
we want to run it only in dev mode.
"""
logger.info("Booting initial cluster")
servers = await make_servers(manager, 7, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
servers = await make_servers(manager, 7, config={'failure_detector_timeout_in_ms': 2000})
s1_id = await manager.get_host_id(servers[1].server_id)
s2_id = await manager.get_host_id(servers[2].server_id)

View File

@@ -21,9 +21,9 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_replace_different_ip(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_replace_different_ip(manager: ManagerClient) -> None:
"""Replace an existing node with new node using a different IP address"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
logger.info(f"cluster started, servers {servers}")
logger.info(f"replacing server {servers[0]}")
@@ -67,18 +67,18 @@ async def test_replace_different_ip(manager: ManagerClient, failure_detector_tim
logger.info(f"server {s} system.peers and gossiper state is valid")
@pytest.mark.asyncio
async def test_replace_different_ip_using_host_id(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_replace_different_ip_using_host_id(manager: ManagerClient) -> None:
"""Replace an existing node with new node reusing the replaced node host id"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = True)
await manager.server_add(replace_cfg)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
@pytest.mark.asyncio
async def test_replace_reuse_ip(request, manager: ManagerClient, failure_detector_timeout) -> None:
async def test_replace_reuse_ip(request, manager: ManagerClient) -> None:
"""Replace an existing node with new node using the same IP address"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout}, auto_rack_dc="dc1")
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000}, auto_rack_dc="dc1")
host2 = (await wait_for_cql_and_get_hosts(manager.get_cql(), [servers[2]], time.time() + 60))[0]
logger.info(f"creating test table")
@@ -130,9 +130,9 @@ async def test_replace_reuse_ip(request, manager: ManagerClient, failure_detecto
await manager.server_sees_other_server(servers[2].ip_addr, servers[0].ip_addr)
@pytest.mark.asyncio
async def test_replace_reuse_ip_using_host_id(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_replace_reuse_ip_using_host_id(manager: ManagerClient) -> None:
"""Replace an existing node with new node using the same IP address and same host id"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True, use_host_id = True)
await manager.server_add(replace_cfg)

View File

@@ -14,9 +14,9 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_replace_with_same_ip_twice(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_replace_with_same_ip_twice(manager: ManagerClient) -> None:
logger.info("starting a cluster with two nodes")
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
logger.info(f"cluster started {servers}")
async def replace_with_same_ip(s: ServerInfo) -> ServerInfo:

View File

@@ -119,14 +119,14 @@ async def test_tablet_transition_sanity(manager: ManagerClient, action):
@pytest.mark.parametrize("fail_stage", ["streaming", "allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "cleanup", "cleanup_target", "end_migration", "revert_migration"])
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage, failure_detector_timeout):
async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage):
if fail_stage == 'cleanup' and fail_replica == 'destination':
skip_env('Failing destination during cleanup is pointless')
if fail_stage == 'cleanup_target' and fail_replica == 'source':
skip_env('Failing source during target cleanup is pointless')
logger.info("Bootstrapping cluster")
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': failure_detector_timeout}
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': 2000}
host_ids = []
servers = []

View File

@@ -37,6 +37,6 @@ class ServerInfo(NamedTuple):
class ServerUpState(IntEnum):
PROCESS_STARTED = auto()
HOST_ID_QUERIED = auto()
CQL_ALTERNATOR_CONNECTED = auto()
CQL_ALTERNATOR_QUERIED = auto()
CQL_CONNECTED = auto()
CQL_QUERIED = auto()
SERVING = auto() # Scylla sent sd_notify("serving")

View File

@@ -389,7 +389,7 @@ class ManagerClient:
seeds: list[IPAddress] | None = None,
timeout: float | None = None,
connect_driver: bool = True,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
cmdline_options_override: list[str] | None = None,
append_env_override: dict[str, str] | None = None,
auth_provider: dict[str, str] | None = None) -> None:
@@ -540,7 +540,7 @@ class ManagerClient:
seeds: Optional[List[IPAddress]] = None,
timeout: Optional[float] = ScyllaServer.TOPOLOGY_TIMEOUT,
server_encryption: str = "none",
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
connect_driver: bool = True) -> ServerInfo:
"""Add a new server"""
if expected_error is not None:

View File

@@ -41,7 +41,6 @@ import yaml
import signal
import glob
import errno
import json
import re
import platform
import contextlib
@@ -558,7 +557,7 @@ class ScyllaServer:
async def install_and_start(self,
api: ScyllaRESTAPIClient,
expected_error: Optional[str] = None,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> None:
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> None:
"""Setup and start this server."""
await self.install()
@@ -678,63 +677,10 @@ class ScyllaServer:
return None
return maintenance_socket_option
def _alternator_ports(self) -> list[tuple[str, int]]:
"""Return (scheme, port) for every configured Alternator port."""
ports = []
if "alternator_port" in self.config:
ports.append(("http", self.config["alternator_port"]))
if "alternator_https_port" in self.config:
ports.append(("https", self.config["alternator_https_port"]))
return ports
async def get_cql_up_state(self) -> ServerUpState | None:
"""Get the CQL up state (a check we use at start up).
async def check_alternator_connected(self, ports: list[tuple[str, int]]) -> bool:
"""TCP connect to every configured Alternator port.
Returns True if all ports accept connections.
"""
for _, port in ports:
try:
_, writer = await asyncio.wait_for(
asyncio.open_connection(self.ip_addr, port), timeout=2)
writer.close()
await writer.wait_closed()
except (OSError, asyncio.TimeoutError):
return False
return True
async def check_alternator_queried(self, ports: list[tuple[str, int]]) -> bool:
"""Sends a GetItem for a randomly-named nonexistent table and validates
that the response is a DynamoDB-shaped JSON error (contains __type),
confirming Alternator is processing DynamoDB API requests.
Returns True if all ports respond correctly.
"""
table_name = f"nonexistent_table_{uuid.uuid4().hex}"
headers = {
"Content-Type": "application/x-amz-json-1.0",
"X-Amz-Target": "DynamoDB_20120810.GetItem",
}
body = json.dumps({"TableName": table_name, "Key": {"k": {"S": "k"}}})
timeout = aiohttp.ClientTimeout(total=2)
async with aiohttp.ClientSession(timeout=timeout) as session:
for scheme, port in ports:
url = f"{scheme}://{self.ip_addr}:{port}/"
try:
# ssl=False skips certificate verification
async with session.post(url, headers=headers, data=body, ssl=False) as resp:
response_body = await resp.json(content_type=None)
if "__type" not in response_body:
return False
except Exception as exc:
self.logger.debug("Alternator query check failed for %s: %s", url, exc)
return False
return True
async def get_cql_up_state(self) -> tuple[bool, bool]:
"""Check CQL connectivity.
Returns (connected, queried) indicating whether a CQL connection
was established and whether a query executed successfully.
Return None if it fails to connect.
"""
caslog = logging.getLogger('cassandra')
oldlevel = caslog.getEffectiveLevel()
@@ -762,7 +708,6 @@ class ScyllaServer:
request_timeout=self.TOPOLOGY_TIMEOUT)
contact_points=[self.rpc_address]
connected = False
cql_queried = False
try:
# In a cluster setup, it's possible that the CQL
# here is directed to a node different from the initial contact
@@ -785,32 +730,13 @@ class ScyllaServer:
control_connection_timeout=self.TOPOLOGY_TIMEOUT,
auth_provider=self.auth_provider)
self.control_connection = self.control_cluster.connect()
cql_queried = True
return ServerUpState.CQL_QUERIED
except (NoHostAvailable, InvalidRequest, OperationTimedOut) as exc:
self.logger.debug("Exception when checking if CQL is up: %s", exc)
return ServerUpState.CQL_CONNECTED if connected else None
finally:
caslog.setLevel(oldlevel)
# Any other exception may indicate a problem, and is passed to the caller.
return connected, cql_queried
async def get_alternator_up_state(self, ports: list[tuple[str, int]]) -> tuple[bool, bool]:
connected = await self.check_alternator_connected(ports)
queried = connected and await self.check_alternator_queried(ports)
return connected, queried
async def get_cql_alternator_up_state(self) -> ServerUpState | None:
"""Get the combined CQL + Alternator up state."""
cql_connected, cql_queried = await self.get_cql_up_state()
alt_connected, alt_queried = False, False
alt_ports = self._alternator_ports() # `alt_ports` empty = no Alternator
if alt_ports:
alt_connected, alt_queried = await self.get_alternator_up_state(alt_ports)
if cql_queried and (alt_queried or not alt_ports):
return ServerUpState.CQL_ALTERNATOR_QUERIED
if not cql_connected or (alt_ports and not alt_connected):
return None
# Here both CQL and Alternator (if exists) are at least connected
return ServerUpState.CQL_ALTERNATOR_CONNECTED
def _setup_notify_socket(self) -> None:
"""Create a Unix datagram socket for receiving sd_notify messages from Scylla."""
@@ -898,7 +824,7 @@ class ScyllaServer:
async def start(self,
api: ScyllaRESTAPIClient,
expected_error: Optional[str] = None,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
cmdline_options_override: list[str] | None = None,
append_env_override: dict[str, str] | None = None) -> None:
"""Start an installed server.
@@ -971,9 +897,9 @@ class ScyllaServer:
if await self.try_get_host_id(api):
if server_up_state == ServerUpState.PROCESS_STARTED:
server_up_state = ServerUpState.HOST_ID_QUERIED
server_up_state = await self.get_cql_alternator_up_state() or server_up_state
server_up_state = await self.get_cql_up_state() or server_up_state
# Check for SERVING state (sd_notify "serving" message)
if server_up_state >= ServerUpState.CQL_ALTERNATOR_QUERIED and self.check_serving_notification():
if server_up_state >= ServerUpState.CQL_QUERIED and self.check_serving_notification():
server_up_state = ServerUpState.SERVING
if server_up_state >= expected_server_up_state:
if expected_error is not None:
@@ -1268,7 +1194,7 @@ class ScyllaCluster:
seeds: Optional[List[IPAddress]] = None,
server_encryption: str = "none",
expected_error: Optional[str] = None,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> ServerInfo:
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> ServerInfo:
"""Add a new server to the cluster"""
self.is_dirty = True
@@ -1508,7 +1434,7 @@ class ScyllaCluster:
server_id: ServerNum,
expected_error: str | None = None,
seeds: list[IPAddress] | None = None,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
cmdline_options_override: list[str] | None = None,
append_env_override: dict[str, str] | None = None,
auth_provider: dict[str, str] | None = None) -> None:
@@ -1991,7 +1917,7 @@ class ScyllaClusterManager:
server_id=server_id,
expected_error=data.get("expected_error"),
seeds=data.get("seeds"),
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")),
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")),
cmdline_options_override=data.get("cmdline_options_override"),
append_env_override=data.get("append_env_override"),
auth_provider=data.get("auth_provider"),
@@ -2026,7 +1952,7 @@ class ScyllaClusterManager:
seeds=data.get("seeds"),
server_encryption=data.get("server_encryption", "none"),
expected_error=data.get("expected_error"),
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")),
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")),
)
return s_info.as_dict()