Compare commits

..

1 Commits

Author SHA1 Message Date
Yaniv Michael Kaul
958b6ed166 compaction: set_skip_when_empty() for validation_errors metric
Add .set_skip_when_empty() to compaction_manager::validation_errors.
This metric only increments when scrubbing encounters out-of-order or
invalid mutation fragments in SSTables, indicating data corruption.
It is almost always zero and creates unnecessary reporting overhead.

AI-Assisted: yes
Signed-off-by: Yaniv Kaul <yaniv.kaul@scylladb.com>
2026-04-24 11:10:54 +03:00
13 changed files with 47 additions and 137 deletions

View File

@@ -78,7 +78,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
VERSION=2026.2.0-rc0
VERSION=2026.2.0-dev
if test -f version
then

View File

@@ -1088,7 +1088,7 @@ void compaction_manager::register_metrics() {
sm::make_gauge("normalized_backlog", [this] { return _last_backlog / available_memory(); },
sm::description("Holds the sum of normalized compaction backlog for all tables in the system. Backlog is normalized by dividing backlog by shard's available memory.")),
sm::make_counter("validation_errors", [this] { return _validation_errors; },
sm::description("Holds the number of encountered validation errors.")),
sm::description("Holds the number of encountered validation errors.")).set_skip_when_empty(),
});
}

View File

@@ -5171,16 +5171,8 @@ static void test_sstable_write_large_row_f(schema_ptr s, reader_permit permit, r
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_row) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use a fixed partition key. The row-size thresholds below are chosen at exact
// byte boundaries of the MC sstable row encoding: the first clustering row body
// encodes prev_row_size as a vint, and prev_row_size includes the partition
// header (which contains the partition key's serialized length+bytes). A
// random-size partition key (as produced by simple_schema::make_pkey() /
// tests::generate_partition_key(), which default to key_size{1,128}) would
// perturb the encoded row size by 1-2 bytes across runs and flip the threshold
// comparison, making this test flaky. Under smp=1 (which this test runs with),
// a fixed key is always shard-local, so no sharding-metadata issue arises.
mutation partition = s.new_mutation("pv");
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation partition(s.schema(), s.make_pkey());
const partition_key& pk = partition.key();
s.add_static_row(partition, "foo bar zed");
@@ -5252,16 +5244,8 @@ static void test_sstable_write_large_cell_f(schema_ptr s, reader_permit permit,
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use a fixed partition key. The cell-size thresholds below are chosen at exact
// byte boundaries of the MC sstable row encoding: the first clustering row body
// encodes prev_row_size as a vint, and prev_row_size includes the partition
// header (which contains the partition key's serialized length+bytes). A
// random-size partition key (as produced by simple_schema::make_pkey() /
// tests::generate_partition_key(), which default to key_size{1,128}) would
// perturb the encoded row size by 1-2 bytes across runs and flip the threshold
// comparison, making this test flaky. Under smp=1 (which this test runs with),
// a fixed key is always shard-local, so no sharding-metadata issue arises.
mutation partition = s.new_mutation("pv");
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation partition(s.schema(), s.make_pkey());
const partition_key& pk = partition.key();
s.add_static_row(partition, "foo bar zed");
@@ -5280,6 +5264,7 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
static void test_sstable_log_too_many_rows_f(int rows, int range_tombstones, uint64_t threshold, bool expected, sstable_version_types version) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation p(s.schema(), s.make_pkey());
sstring sv;
for (auto idx = 0; idx < rows - 1; idx++) {
@@ -5341,6 +5326,7 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_rows) {
static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold, bool expected, sstable_version_types version) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation p(s.schema(), s.make_pkey());
sstring sv;
int live_rows = 0;
@@ -5450,6 +5436,7 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_dead_rows) {
static void test_sstable_too_many_collection_elements_f(int elements, uint64_t threshold, bool expected, sstable_version_types version) {
simple_schema s(simple_schema::with_static::no, simple_schema::with_collection::yes);
tests::reader_concurrency_semaphore_wrapper semaphore;
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation p(s.schema(), s.make_pkey());
std::map<bytes, bytes> kv_map;
for (auto i = 0; i < elements; i++) {
@@ -5525,6 +5512,7 @@ SEASTAR_THREAD_TEST_CASE(test_large_data_records_round_trip) {
// Create a mutation with a clustering row whose serialized cell value
// exceeds the 1-byte thresholds, so partition_size, row_size, and
// cell_size records are all generated.
// Use make_pkey() (no argument) to generate a key on this shard.
auto pk = ss.make_pkey();
mutation m(s, pk);
auto ck = ss.make_ckey("ck1");
@@ -5634,6 +5622,7 @@ SEASTAR_THREAD_TEST_CASE(test_large_data_records_top_n_bounded) {
// Create 6 partitions, each with one row of increasing size.
// Since each partition has exactly one row, we get 6 row_size records
// competing for 3 slots.
// Use make_pkeys() to generate shard-local keys.
auto pkeys = ss.make_pkeys(6);
utils::chunked_vector<mutation> muts;
for (int i = 0; i < 6; i++) {

View File

@@ -18,7 +18,7 @@ from concurrent.futures.thread import ThreadPoolExecutor
from multiprocessing import Event
from pathlib import Path
from typing import TYPE_CHECKING
from test import TOP_SRC_DIR, MODES_TIMEOUT_FACTOR, path_to
from test import TOP_SRC_DIR, path_to
from test.pylib.random_tables import RandomTables
from test.pylib.skip_types import skip_env
from test.pylib.util import unique_name
@@ -394,8 +394,3 @@ async def key_provider(request, tmpdir, scylla_binary):
"""Encryption providers fixture"""
async with make_key_provider_factory(request.param, tmpdir, scylla_binary) as res:
yield res
@pytest.fixture(scope="function")
def failure_detector_timeout(build_mode):
return 2000 * MODES_TIMEOUT_FACTOR[build_mode]

View File

@@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_kill_coordinator_during_op(manager: ManagerClient) -> None:
""" Kill coordinator with error injection while topology operation is running for cluster: decommission,
bootstrap, removenode, replace.
@@ -41,7 +41,7 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
"""
# Decrease the failure detector threshold so we don't have to wait for too long.
config = {
'failure_detector_timeout_in_ms': failure_detector_timeout
'failure_detector_timeout_in_ms': 2000
}
cmdline = [
'--logger-log-level', 'raft_topology=trace',

View File

@@ -22,11 +22,11 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.nightly
async def test_banned_node_notification(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_banned_node_notification(manager: ManagerClient) -> None:
"""Test that a node banned from the cluster get notification about been banned"""
# Decrease the failure detector threshold so we don't have to wait for too long.
config = {
'failure_detector_timeout_in_ms': failure_detector_timeout
'failure_detector_timeout_in_ms': 2000
}
srvs = await manager.servers_add(3, config=config, auto_rack_dc="dc")
cql = manager.get_cql()

View File

@@ -60,14 +60,14 @@ async def make_servers(manager: ManagerClient, servers_num: int,
@pytest.mark.asyncio
async def test_raft_replace_ignore_nodes(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_raft_replace_ignore_nodes(manager: ManagerClient) -> None:
"""Replace 3 dead nodes.
This is a slow test with a 7 node cluster and 3 replace operations,
we want to run it only in dev mode.
"""
logger.info("Booting initial cluster")
servers = await make_servers(manager, 7, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
servers = await make_servers(manager, 7, config={'failure_detector_timeout_in_ms': 2000})
s1_id = await manager.get_host_id(servers[1].server_id)
s2_id = await manager.get_host_id(servers[2].server_id)

View File

@@ -21,9 +21,9 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_replace_different_ip(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_replace_different_ip(manager: ManagerClient) -> None:
"""Replace an existing node with new node using a different IP address"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
logger.info(f"cluster started, servers {servers}")
logger.info(f"replacing server {servers[0]}")
@@ -67,18 +67,18 @@ async def test_replace_different_ip(manager: ManagerClient, failure_detector_tim
logger.info(f"server {s} system.peers and gossiper state is valid")
@pytest.mark.asyncio
async def test_replace_different_ip_using_host_id(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_replace_different_ip_using_host_id(manager: ManagerClient) -> None:
"""Replace an existing node with new node reusing the replaced node host id"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = True)
await manager.server_add(replace_cfg)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
@pytest.mark.asyncio
async def test_replace_reuse_ip(request, manager: ManagerClient, failure_detector_timeout) -> None:
async def test_replace_reuse_ip(request, manager: ManagerClient) -> None:
"""Replace an existing node with new node using the same IP address"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout}, auto_rack_dc="dc1")
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000}, auto_rack_dc="dc1")
host2 = (await wait_for_cql_and_get_hosts(manager.get_cql(), [servers[2]], time.time() + 60))[0]
logger.info(f"creating test table")
@@ -130,9 +130,9 @@ async def test_replace_reuse_ip(request, manager: ManagerClient, failure_detecto
await manager.server_sees_other_server(servers[2].ip_addr, servers[0].ip_addr)
@pytest.mark.asyncio
async def test_replace_reuse_ip_using_host_id(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_replace_reuse_ip_using_host_id(manager: ManagerClient) -> None:
"""Replace an existing node with new node using the same IP address and same host id"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True, use_host_id = True)
await manager.server_add(replace_cfg)

View File

@@ -14,9 +14,9 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_replace_with_same_ip_twice(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_replace_with_same_ip_twice(manager: ManagerClient) -> None:
logger.info("starting a cluster with two nodes")
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
logger.info(f"cluster started {servers}")
async def replace_with_same_ip(s: ServerInfo) -> ServerInfo:

View File

@@ -119,14 +119,14 @@ async def test_tablet_transition_sanity(manager: ManagerClient, action):
@pytest.mark.parametrize("fail_stage", ["streaming", "allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "cleanup", "cleanup_target", "end_migration", "revert_migration"])
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage, failure_detector_timeout):
async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage):
if fail_stage == 'cleanup' and fail_replica == 'destination':
skip_env('Failing destination during cleanup is pointless')
if fail_stage == 'cleanup_target' and fail_replica == 'source':
skip_env('Failing source during target cleanup is pointless')
logger.info("Bootstrapping cluster")
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': failure_detector_timeout}
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': 2000}
host_ids = []
servers = []

View File

@@ -37,6 +37,6 @@ class ServerInfo(NamedTuple):
class ServerUpState(IntEnum):
PROCESS_STARTED = auto()
HOST_ID_QUERIED = auto()
CQL_ALTERNATOR_CONNECTED = auto()
CQL_ALTERNATOR_QUERIED = auto()
CQL_CONNECTED = auto()
CQL_QUERIED = auto()
SERVING = auto() # Scylla sent sd_notify("serving")

View File

@@ -389,7 +389,7 @@ class ManagerClient:
seeds: list[IPAddress] | None = None,
timeout: float | None = None,
connect_driver: bool = True,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
cmdline_options_override: list[str] | None = None,
append_env_override: dict[str, str] | None = None,
auth_provider: dict[str, str] | None = None) -> None:
@@ -540,7 +540,7 @@ class ManagerClient:
seeds: Optional[List[IPAddress]] = None,
timeout: Optional[float] = ScyllaServer.TOPOLOGY_TIMEOUT,
server_encryption: str = "none",
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
connect_driver: bool = True) -> ServerInfo:
"""Add a new server"""
if expected_error is not None:

View File

@@ -41,7 +41,6 @@ import yaml
import signal
import glob
import errno
import json
import re
import platform
import contextlib
@@ -558,7 +557,7 @@ class ScyllaServer:
async def install_and_start(self,
api: ScyllaRESTAPIClient,
expected_error: Optional[str] = None,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> None:
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> None:
"""Setup and start this server."""
await self.install()
@@ -678,63 +677,10 @@ class ScyllaServer:
return None
return maintenance_socket_option
def _alternator_ports(self) -> list[tuple[str, int]]:
"""Return (scheme, port) for every configured Alternator port."""
ports = []
if "alternator_port" in self.config:
ports.append(("http", self.config["alternator_port"]))
if "alternator_https_port" in self.config:
ports.append(("https", self.config["alternator_https_port"]))
return ports
async def get_cql_up_state(self) -> ServerUpState | None:
"""Get the CQL up state (a check we use at start up).
async def check_alternator_connected(self, ports: list[tuple[str, int]]) -> bool:
"""TCP connect to every configured Alternator port.
Returns True if all ports accept connections.
"""
for _, port in ports:
try:
_, writer = await asyncio.wait_for(
asyncio.open_connection(self.ip_addr, port), timeout=2)
writer.close()
await writer.wait_closed()
except (OSError, asyncio.TimeoutError):
return False
return True
async def check_alternator_queried(self, ports: list[tuple[str, int]]) -> bool:
"""Sends a GetItem for a randomly-named nonexistent table and validates
that the response is a DynamoDB-shaped JSON error (contains __type),
confirming Alternator is processing DynamoDB API requests.
Returns True if all ports respond correctly.
"""
table_name = f"nonexistent_table_{uuid.uuid4().hex}"
headers = {
"Content-Type": "application/x-amz-json-1.0",
"X-Amz-Target": "DynamoDB_20120810.GetItem",
}
body = json.dumps({"TableName": table_name, "Key": {"k": {"S": "k"}}})
timeout = aiohttp.ClientTimeout(total=2)
async with aiohttp.ClientSession(timeout=timeout) as session:
for scheme, port in ports:
url = f"{scheme}://{self.ip_addr}:{port}/"
try:
# ssl=False skips certificate verification
async with session.post(url, headers=headers, data=body, ssl=False) as resp:
response_body = await resp.json(content_type=None)
if "__type" not in response_body:
return False
except Exception as exc:
self.logger.debug("Alternator query check failed for %s: %s", url, exc)
return False
return True
async def get_cql_up_state(self) -> tuple[bool, bool]:
"""Check CQL connectivity.
Returns (connected, queried) indicating whether a CQL connection
was established and whether a query executed successfully.
Return None if it fails to connect.
"""
caslog = logging.getLogger('cassandra')
oldlevel = caslog.getEffectiveLevel()
@@ -762,7 +708,6 @@ class ScyllaServer:
request_timeout=self.TOPOLOGY_TIMEOUT)
contact_points=[self.rpc_address]
connected = False
cql_queried = False
try:
# In a cluster setup, it's possible that the CQL
# here is directed to a node different from the initial contact
@@ -785,32 +730,13 @@ class ScyllaServer:
control_connection_timeout=self.TOPOLOGY_TIMEOUT,
auth_provider=self.auth_provider)
self.control_connection = self.control_cluster.connect()
cql_queried = True
return ServerUpState.CQL_QUERIED
except (NoHostAvailable, InvalidRequest, OperationTimedOut) as exc:
self.logger.debug("Exception when checking if CQL is up: %s", exc)
return ServerUpState.CQL_CONNECTED if connected else None
finally:
caslog.setLevel(oldlevel)
# Any other exception may indicate a problem, and is passed to the caller.
return connected, cql_queried
async def get_alternator_up_state(self, ports: list[tuple[str, int]]) -> tuple[bool, bool]:
connected = await self.check_alternator_connected(ports)
queried = connected and await self.check_alternator_queried(ports)
return connected, queried
async def get_cql_alternator_up_state(self) -> ServerUpState | None:
"""Get the combined CQL + Alternator up state."""
cql_connected, cql_queried = await self.get_cql_up_state()
alt_connected, alt_queried = False, False
alt_ports = self._alternator_ports() # `alt_ports` empty = no Alternator
if alt_ports:
alt_connected, alt_queried = await self.get_alternator_up_state(alt_ports)
if cql_queried and (alt_queried or not alt_ports):
return ServerUpState.CQL_ALTERNATOR_QUERIED
if not cql_connected or (alt_ports and not alt_connected):
return None
# Here both CQL and Alternator (if exists) are at least connected
return ServerUpState.CQL_ALTERNATOR_CONNECTED
def _setup_notify_socket(self) -> None:
"""Create a Unix datagram socket for receiving sd_notify messages from Scylla."""
@@ -898,7 +824,7 @@ class ScyllaServer:
async def start(self,
api: ScyllaRESTAPIClient,
expected_error: Optional[str] = None,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
cmdline_options_override: list[str] | None = None,
append_env_override: dict[str, str] | None = None) -> None:
"""Start an installed server.
@@ -971,9 +897,9 @@ class ScyllaServer:
if await self.try_get_host_id(api):
if server_up_state == ServerUpState.PROCESS_STARTED:
server_up_state = ServerUpState.HOST_ID_QUERIED
server_up_state = await self.get_cql_alternator_up_state() or server_up_state
server_up_state = await self.get_cql_up_state() or server_up_state
# Check for SERVING state (sd_notify "serving" message)
if server_up_state >= ServerUpState.CQL_ALTERNATOR_QUERIED and self.check_serving_notification():
if server_up_state >= ServerUpState.CQL_QUERIED and self.check_serving_notification():
server_up_state = ServerUpState.SERVING
if server_up_state >= expected_server_up_state:
if expected_error is not None:
@@ -1268,7 +1194,7 @@ class ScyllaCluster:
seeds: Optional[List[IPAddress]] = None,
server_encryption: str = "none",
expected_error: Optional[str] = None,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> ServerInfo:
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> ServerInfo:
"""Add a new server to the cluster"""
self.is_dirty = True
@@ -1508,7 +1434,7 @@ class ScyllaCluster:
server_id: ServerNum,
expected_error: str | None = None,
seeds: list[IPAddress] | None = None,
expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
cmdline_options_override: list[str] | None = None,
append_env_override: dict[str, str] | None = None,
auth_provider: dict[str, str] | None = None) -> None:
@@ -1991,7 +1917,7 @@ class ScyllaClusterManager:
server_id=server_id,
expected_error=data.get("expected_error"),
seeds=data.get("seeds"),
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")),
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")),
cmdline_options_override=data.get("cmdline_options_override"),
append_env_override=data.get("append_env_override"),
auth_provider=data.get("auth_provider"),
@@ -2026,7 +1952,7 @@ class ScyllaClusterManager:
seeds=data.get("seeds"),
server_encryption=data.get("server_encryption", "none"),
expected_error=data.get("expected_error"),
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")),
expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")),
)
return s_info.as_dict()