Compare commits

..

1 Commit

Author SHA1 Message Date
Marcin Maliszkiewicz
90027db532 gms/gossiper: fix use-after-move in do_send_ack2_msg
The second logger.debug() call on line 405 accesses ack2_msg after
it was moved via std::move() in the co_await call on line 404, so it
reads a moved-from object whose contents are unspecified.

Fix by formatting ack2_msg to a string before the move, then using
that cached string in both debug log calls.
2026-03-25 13:04:05 +02:00
14 changed files with 88 additions and 191 deletions

View File

@@ -400,9 +400,10 @@ future<> gossiper::do_send_ack2_msg(locator::host_id from, utils::chunked_vector
}
}
gms::gossip_digest_ack2 ack2_msg(std::move(delta_ep_state_map));
logger.debug("Calling do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg);
auto ack2_msg_str = fmt::format("{}", ack2_msg);
logger.debug("Calling do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg_str);
co_await ser::gossip_rpc_verbs::send_gossip_digest_ack2(&_messaging, from, std::move(ack2_msg));
logger.debug("finished do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg);
logger.debug("finished do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg_str);
}
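
For context, a minimal standalone sketch of the bug class and of the format-before-move fix shown above. The container, the describe() helper, and the send() function are illustrative stand-ins, not ScyllaDB APIs; the sketch only demonstrates the idiom of caching the formatted string before std::move() hands the object away.

#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Stand-in for formatting a message for the log.
std::string describe(const std::vector<int>& msg) {
    return "size=" + std::to_string(msg.size());
}

// Stand-in for an API that consumes the message by value.
void send(std::vector<int> msg) { /* serialize and transmit msg */ }

int main() {
    std::vector<int> msg{1, 2, 3};

    // Buggy shape: logging msg after the move reads a moved-from object,
    // so the second log line reports an unspecified (typically empty) value.
    //   send(std::move(msg));
    //   std::cout << "sent " << describe(msg) << '\n';

    // Fixed shape, mirroring the commit: format once before the move and
    // reuse the cached string in every later log statement.
    auto msg_str = describe(msg);
    std::cout << "sending " << msg_str << '\n';
    send(std::move(msg));
    std::cout << "sent " << msg_str << '\n';
}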
// Depends on

View File

@@ -42,14 +42,7 @@ void everywhere_replication_strategy::validate_options(const gms::feature_servic
sstring everywhere_replication_strategy::sanity_check_read_replicas(const effective_replication_map& erm, const host_id_vector_replica_set& read_replicas) const {
const auto replication_factor = erm.get_replication_factor();
if (const auto& topo_info = erm.get_token_metadata().get_topology_change_info(); topo_info && topo_info->read_new) {
if (read_replicas.size() > replication_factor + 1) {
return seastar::format(
"everywhere_replication_strategy: the number of replicas for everywhere_replication_strategy is {}, "
"cannot be higher than replication factor {} + 1 during the 'read from new replicas' stage of a topology change",
read_replicas.size(), replication_factor);
}
} else if (read_replicas.size() > replication_factor) {
if (read_replicas.size() > replication_factor) {
return seastar::format("everywhere_replication_strategy: the number of replicas for everywhere_replication_strategy is {}, cannot be higher than replication factor {}", read_replicas.size(), replication_factor);
}
return {};

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:d424ce6cc7f65338c34dd35881d23f5ad3425651d66e47dc2c3a20dc798848d4
size 6598648
oid sha256:34a0955d2c5a88e18ddab0f1df085e10a17e14129c3e21de91e4f27ef949b6c4
size 6502668

View File

@@ -87,11 +87,6 @@ target_include_directories(wasmtime_bindings
target_link_libraries(wasmtime_bindings
INTERFACE Rust::rust_combined)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
# The PCH from scylla-precompiled-header is compiled with Seastar's compile
# flags, including sanitizer flags in Debug/Sanitize modes. Any target reusing
# this PCH must have matching compile options, otherwise the compiler rejects
# the PCH due to flag mismatch (e.g., -fsanitize=address).
target_link_libraries(wasmtime_bindings PRIVATE Seastar::seastar)
target_precompile_headers(wasmtime_bindings REUSE_FROM scylla-precompiled-header)
endif()
@@ -113,6 +108,5 @@ target_include_directories(inc
target_link_libraries(inc
INTERFACE Rust::rust_combined)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_link_libraries(inc PRIVATE Seastar::seastar)
target_precompile_headers(inc REUSE_FROM scylla-precompiled-header)
endif()

View File

@@ -181,7 +181,7 @@ def parse_cmd_line() -> argparse.Namespace:
help="Run only tests for given build mode(s)")
parser.add_argument('--repeat', action="store", default="1", type=int,
help="number of times to repeat test execution")
parser.add_argument('--timeout', action="store", default="3600", type=int,
parser.add_argument('--timeout', action="store", default="24000", type=int,
help="timeout value for single test execution")
parser.add_argument('--session-timeout', action="store", default="24000", type=int,
help="timeout value for test.py/pytest session execution")

View File

@@ -511,7 +511,8 @@ class AuditBackendComposite(AuditBackend):
return rows_dict
class CQLAuditTester(AuditTester):
@pytest.mark.single_node
class TestCQLAudit(AuditTester):
"""
Make sure CQL statements are audited
"""
@@ -1762,7 +1763,7 @@ class CQLAuditTester(AuditTester):
async def test_audit_table_noauth(manager: ManagerClient):
"""Table backend, no auth, single node — groups all tests that share this config."""
t = CQLAuditTester(manager)
t = TestCQLAudit(manager)
await t.test_using_non_existent_keyspace(AuditBackendTable)
await t.test_audit_keyspace(AuditBackendTable)
await t.test_audit_keyspace_extra_parameter(AuditBackendTable)
@@ -1786,7 +1787,7 @@ async def test_audit_table_noauth(manager: ManagerClient):
async def test_audit_table_auth(manager: ManagerClient):
"""Table backend, auth enabled, single node."""
t = CQLAuditTester(manager)
t = TestCQLAudit(manager)
await t.test_user_password_masking(AuditBackendTable)
await t.test_negative_audit_records_auth()
await t.test_negative_audit_records_admin()
@@ -1802,7 +1803,7 @@ async def test_audit_table_auth(manager: ManagerClient):
async def test_audit_table_auth_multinode(manager: ManagerClient):
"""Table backend, auth enabled, multi-node (rf=3)."""
t = CQLAuditTester(manager)
t = TestCQLAudit(manager)
await t.test_negative_audit_records_ddl()
@@ -1810,49 +1811,49 @@ async def test_audit_table_auth_multinode(manager: ManagerClient):
async def test_audit_type_none_standalone(manager: ManagerClient):
"""audit=None — verify no auditing occurs."""
await CQLAuditTester(manager).test_audit_type_none()
await TestCQLAudit(manager).test_audit_type_none()
async def test_audit_type_invalid_standalone(manager: ManagerClient):
"""audit=invalid — server should fail to start."""
await CQLAuditTester(manager).test_audit_type_invalid()
await TestCQLAudit(manager).test_audit_type_invalid()
async def test_composite_audit_type_invalid_standalone(manager: ManagerClient):
"""audit=table,syslog,invalid — server should fail to start."""
await CQLAuditTester(manager).test_composite_audit_type_invalid()
await TestCQLAudit(manager).test_composite_audit_type_invalid()
async def test_audit_empty_settings_standalone(manager: ManagerClient):
"""audit=none — verify no auditing occurs."""
await CQLAuditTester(manager).test_audit_empty_settings()
await TestCQLAudit(manager).test_audit_empty_settings()
async def test_composite_audit_empty_settings_standalone(manager: ManagerClient):
"""audit=table,syslog,none — verify no auditing occurs."""
await CQLAuditTester(manager).test_composite_audit_empty_settings()
await TestCQLAudit(manager).test_composite_audit_empty_settings()
async def test_audit_categories_invalid_standalone(manager: ManagerClient):
"""Invalid audit_categories — server should fail to start."""
await CQLAuditTester(manager).test_audit_categories_invalid()
await TestCQLAudit(manager).test_audit_categories_invalid()
async def test_insert_failure_standalone(manager: ManagerClient):
"""7-node topology, audit=table, no auth — standalone due to unique topology."""
await CQLAuditTester(manager).test_insert_failure_doesnt_report_success()
await TestCQLAudit(manager).test_insert_failure_doesnt_report_success()
async def test_service_level_statements_standalone(manager: ManagerClient):
"""audit=table, auth, cmdline=--smp 1 — standalone due to special cmdline."""
await CQLAuditTester(manager).test_service_level_statements()
await TestCQLAudit(manager).test_service_level_statements()
# AuditBackendSyslog, no auth, rf=1
async def test_audit_syslog_noauth(manager: ManagerClient):
"""Syslog backend, no auth, single node."""
t = CQLAuditTester(manager)
t = TestCQLAudit(manager)
Syslog = functools.partial(AuditBackendSyslog, socket_path=syslog_socket_path)
await t.test_using_non_existent_keyspace(Syslog)
await t.test_audit_keyspace(Syslog)
@@ -1869,7 +1870,7 @@ async def test_audit_syslog_noauth(manager: ManagerClient):
async def test_audit_syslog_auth(manager: ManagerClient):
"""Syslog backend, auth enabled, single node."""
t = CQLAuditTester(manager)
t = TestCQLAudit(manager)
Syslog = functools.partial(AuditBackendSyslog, socket_path=syslog_socket_path)
await t.test_user_password_masking(Syslog)
await t.test_role_password_masking(Syslog)
@@ -1880,7 +1881,7 @@ async def test_audit_syslog_auth(manager: ManagerClient):
async def test_audit_composite_noauth(manager: ManagerClient):
"""Composite backend (table+syslog), no auth, single node."""
t = CQLAuditTester(manager)
t = TestCQLAudit(manager)
Composite = functools.partial(AuditBackendComposite, socket_path=syslog_socket_path)
await t.test_using_non_existent_keyspace(Composite)
await t.test_audit_keyspace(Composite)
@@ -1897,7 +1898,7 @@ async def test_audit_composite_noauth(manager: ManagerClient):
async def test_audit_composite_auth(manager: ManagerClient):
"""Composite backend (table+syslog), auth enabled, single node."""
t = CQLAuditTester(manager)
t = TestCQLAudit(manager)
Composite = functools.partial(AuditBackendComposite, socket_path=syslog_socket_path)
await t.test_user_password_masking(Composite)
await t.test_role_password_masking(Composite)
@@ -1909,29 +1910,29 @@ _composite = functools.partial(AuditBackendComposite, socket_path=syslog_socket_
@pytest.mark.parametrize("helper_class,config_changer", [
pytest.param(AuditBackendTable, CQLAuditTester.AuditSighupConfigChanger, id="table-sighup"),
pytest.param(AuditBackendTable, CQLAuditTester.AuditCqlConfigChanger, id="table-cql"),
pytest.param(_syslog, CQLAuditTester.AuditSighupConfigChanger, id="syslog-sighup"),
pytest.param(_syslog, CQLAuditTester.AuditCqlConfigChanger, id="syslog-cql"),
pytest.param(_composite, CQLAuditTester.AuditSighupConfigChanger, id="composite-sighup"),
pytest.param(_composite, CQLAuditTester.AuditCqlConfigChanger, id="composite-cql"),
pytest.param(AuditBackendTable, TestCQLAudit.AuditSighupConfigChanger, id="table-sighup"),
pytest.param(AuditBackendTable, TestCQLAudit.AuditCqlConfigChanger, id="table-cql"),
pytest.param(_syslog, TestCQLAudit.AuditSighupConfigChanger, id="syslog-sighup"),
pytest.param(_syslog, TestCQLAudit.AuditCqlConfigChanger, id="syslog-cql"),
pytest.param(_composite, TestCQLAudit.AuditSighupConfigChanger, id="composite-sighup"),
pytest.param(_composite, TestCQLAudit.AuditCqlConfigChanger, id="composite-cql"),
])
async def test_config_no_liveupdate(manager: ManagerClient, helper_class, config_changer):
"""Non-live audit config params (audit, audit_unix_socket_path, audit_syslog_write_buffer_size) must be unmodifiable."""
await CQLAuditTester(manager).test_config_no_liveupdate(helper_class, config_changer)
await TestCQLAudit(manager).test_config_no_liveupdate(helper_class, config_changer)
@pytest.mark.parametrize("helper_class,config_changer", [
pytest.param(AuditBackendTable, CQLAuditTester.AuditSighupConfigChanger, id="table-sighup"),
pytest.param(AuditBackendTable, CQLAuditTester.AuditCqlConfigChanger, id="table-cql"),
pytest.param(_syslog, CQLAuditTester.AuditSighupConfigChanger, id="syslog-sighup"),
pytest.param(_syslog, CQLAuditTester.AuditCqlConfigChanger, id="syslog-cql"),
pytest.param(_composite, CQLAuditTester.AuditSighupConfigChanger, id="composite-sighup"),
pytest.param(_composite, CQLAuditTester.AuditCqlConfigChanger, id="composite-cql"),
pytest.param(AuditBackendTable, TestCQLAudit.AuditSighupConfigChanger, id="table-sighup"),
pytest.param(AuditBackendTable, TestCQLAudit.AuditCqlConfigChanger, id="table-cql"),
pytest.param(_syslog, TestCQLAudit.AuditSighupConfigChanger, id="syslog-sighup"),
pytest.param(_syslog, TestCQLAudit.AuditCqlConfigChanger, id="syslog-cql"),
pytest.param(_composite, TestCQLAudit.AuditSighupConfigChanger, id="composite-sighup"),
pytest.param(_composite, TestCQLAudit.AuditCqlConfigChanger, id="composite-cql"),
])
async def test_config_liveupdate(manager: ManagerClient, helper_class, config_changer):
"""Live-updatable audit config params (categories, keyspaces, tables) must be modifiable at runtime."""
await CQLAuditTester(manager).test_config_liveupdate(helper_class, config_changer)
await TestCQLAudit(manager).test_config_liveupdate(helper_class, config_changer)
@pytest.mark.parametrize("helper_class", [
@@ -1941,4 +1942,4 @@ async def test_config_liveupdate(manager: ManagerClient, helper_class, config_ch
])
async def test_parallel_syslog_audit(manager: ManagerClient, helper_class):
"""Cluster must not fail when multiple queries are audited in parallel."""
await CQLAuditTester(manager).test_parallel_syslog_audit(helper_class)
await TestCQLAudit(manager).test_parallel_syslog_audit(helper_class)

View File

@@ -17,9 +17,9 @@ from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import ScyllaMetricsClient, TCPRESTClient, inject_error
from test.pylib.tablets import get_tablet_replicas
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.util import gather_safely, wait_for
from test.pylib.util import wait_for
from test.cluster.util import get_topology_coordinator, find_server_by_host_id, keyspace_has_tablets, new_test_keyspace, new_test_table
from test.cluster.util import get_topology_coordinator, find_server_by_host_id, new_test_keyspace
logger = logging.getLogger(__name__)
@@ -51,42 +51,28 @@ async def await_sync_point(client: TCPRESTClient, server_ip: IPAddress, sync_poi
@pytest.mark.asyncio
async def test_write_cl_any_to_dead_node_generates_hints(manager: ManagerClient):
node_count = 2
cmdline = ["--logger-log-level", "hints_manager=trace"]
servers = await manager.servers_add(node_count, cmdline=cmdline)
async def wait_for_hints_written(min_hint_count: int, timeout: int):
async def aux():
hints_written = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
if hints_written >= min_hint_count:
return True
return None
assert await wait_for(aux, time.time() + timeout)
servers = await manager.servers_add(node_count)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
uses_tablets = await keyspace_has_tablets(manager, ks)
# If the keyspace uses tablets, let's explicitly require the table to use multiple tablets.
# Otherwise, it could happen that all mutations would target servers[0] only, which would
# ultimately lead to a test failure here. We rely on the assumption that mutations will be
# distributed more or less uniformly!
extra_opts = "WITH tablets = {'min_tablet_count': 16}" if uses_tablets else ""
async with new_test_table(manager, ks, "pk int PRIMARY KEY, v int", extra_opts) as table:
await manager.server_stop_gracefully(servers[1].server_id)
table = f"{ks}.t"
await cql.run_async(f"CREATE TABLE {table} (pk int primary key, v int)")
hints_before = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
await manager.server_stop_gracefully(servers[1].server_id)
stmt = cql.prepare(f"INSERT INTO {table} (pk, v) VALUES (?, ?)")
stmt.consistency_level = ConsistencyLevel.ANY
hints_before = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
# Some of the inserts will be targeted to the dead node.
# The coordinator doesn't have live targets to send the write to, but it should write a hint.
await gather_safely(*[cql.run_async(stmt, (i, i + 1)) for i in range(100)])
# Some of the inserts will be targeted to the dead node.
# The coordinator doesn't have live targets to send the write to, but it should write a hint.
for i in range(100):
await cql.run_async(SimpleStatement(f"INSERT INTO {table} (pk, v) VALUES ({i}, {i+1})", consistency_level=ConsistencyLevel.ANY))
# Verify hints are written
await wait_for_hints_written(hints_before + 1, timeout=60)
# Verify hints are written
hints_after = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
assert hints_after > hints_before
# For dropping the keyspace
await manager.server_start(servers[1].server_id)
# For dropping the keyspace
await manager.server_start(servers[1].server_id)
@pytest.mark.asyncio
async def test_limited_concurrency_of_writes(manager: ManagerClient):

View File

@@ -151,7 +151,7 @@ async def trigger_tablet_merge(manager, servers, logs):
await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
await inject_error_off(manager, "tablet_force_tablet_count_decrease", servers)
async def prepare_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdline = []):
async def preapre_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdline = []):
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, nr_keys=nr_keys, cmdline=cmdline)
repaired_keys = set(range(0, nr_keys))
unrepaired_keys = set()
@@ -164,7 +164,7 @@ async def prepare_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdlin
@pytest.mark.asyncio
async def test_tablet_repair_sstable_skipped_read_metrics(manager: ManagerClient):
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
await insert_keys(cql, ks, 0, 100)
@@ -274,7 +274,7 @@ async def test_tablet_incremental_repair_error(manager: ManagerClient):
async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline=['--logger-log-level', 'compaction=debug'])
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
token = -1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
@@ -335,7 +335,7 @@ async def test_tablet_incremental_repair_and_major(manager: ManagerClient):
@pytest.mark.asyncio
async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# Disable autocompaction
for server in servers:
@@ -381,7 +381,7 @@ async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
async def do_test_tablet_incremental_repair_with_split_and_merge(manager, do_split, do_merge):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
@@ -442,7 +442,7 @@ async def test_tablet_incremental_repair_with_merge(manager: ManagerClient):
async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(manager: ManagerClient):
nr_keys = 100
cmdline = ["--hinted-handoff-enabled", "0"]
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys, cmdline)
await manager.server_stop_gracefully(servers[1].server_id)
@@ -466,7 +466,7 @@ async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(ma
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
@@ -507,7 +507,7 @@ async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_incremental_repair_merge_correct_repaired_at_number_after_merge(manager):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
@@ -541,7 +541,7 @@ async def do_test_tablet_incremental_repair_merge_error(manager, error):
nr_keys = 100
# Make sure no data commit log replay after force server stop
cmdline = ['--enable-commitlog', '0']
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys, cmdline)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
@@ -587,7 +587,7 @@ async def test_tablet_incremental_repair_merge_error_in_merge_completion_fiber(m
@pytest.mark.asyncio
async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
token = -1
sstables_repaired_at = 0
@@ -632,7 +632,7 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
@pytest.mark.asyncio
async def test_incremental_repair_tablet_time_metrics(manager: ManagerClient):
servers, _, _, ks, _, _, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
servers, _, _, ks, _, _, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
time1 = 0
time2 = 0
@@ -820,7 +820,7 @@ async def test_repair_sigsegv_with_diff_shard_count(manager: ManagerClient, use_
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manager: ManagerClient):
cmdline = ['--logger-log-level', 'repair=debug']
servers, cql, hosts, ks, table_id, logs, _, _, _, _ = await prepare_cluster_for_incremental_repair(manager, cmdline=cmdline)
servers, cql, hosts, ks, table_id, logs, _, _, _, _ = await preapre_cluster_for_incremental_repair(manager, cmdline=cmdline)
coord = await get_topology_coordinator(manager)
coord_serv = await find_server_by_host_id(manager, servers, coord)

View File

@@ -20,7 +20,6 @@ from cassandra.query import SimpleStatement
from test.pylib.async_cql import _wrap_future
from test.pylib.manager_client import ManagerClient
from test.pylib.random_tables import RandomTables, TextType, Column
from test.pylib.rest_client import read_barrier
from test.pylib.util import unique_name
from test.cluster.conftest import cluster_con
@@ -404,7 +403,6 @@ async def test_arbiter_dc_rf_rack_valid_keyspaces(manager: ManagerClient):
for task in [*valid_keyspaces, *invalid_keyspaces]:
_ = tg.create_task(task)
@pytest.mark.asyncio
async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces(manager: ManagerClient):
"""
This test verifies that starting a Scylla node fails when there's an RF-rack-invalid keyspace.
@@ -466,50 +464,22 @@ async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces(manager:
for rfs, tablets in valid_keyspaces:
_ = tg.create_task(create_keyspace(rfs, tablets))
# Precondition: s1 has rf_rack_valid_keyspaces set to false.
# Postcondition: s1 still has rf_rack_valid_keyspaces set to false.
await manager.server_stop_gracefully(s1.server_id)
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
async def try_fail(rfs: List[int], dc: str, rf: int, rack_count: int):
running_servers = await manager.running_servers()
should_start = s1.server_id not in [server.server_id for server in running_servers]
if should_start:
await manager.server_start(s1.server_id)
ks = await create_keyspace(rfs, True)
# We need to wait for the new schema to propagate.
# Otherwise, it's not clear when the mutation
# corresponding to the created keyspace will
# arrive at server 1.
# It could happen only after the node performs
# the check upon start-up, effectively leading
# to a successful start-up, which we don't want.
# For more context, see issue: SCYLLADB-1137.
await read_barrier(manager.api, s1.ip_addr)
await manager.server_stop_gracefully(s1.server_id)
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
err = f"The keyspace '{ks}' is required to be RF-rack-valid. " \
f"That condition is violated for DC '{dc}': RF={rf} vs. rack count={rack_count}."
await manager.server_start(s1.server_id, expected_error=err)
_ = await manager.server_start(s1.server_id, expected_error=err)
await cql.run_async(f"DROP KEYSPACE {ks}")
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "false")
# Test RF-rack-invalid keyspaces.
await try_fail([2, 0], "dc1", 2, 3)
await try_fail([3, 2], "dc2", 2, 1)
await try_fail([4, 1], "dc1", 4, 3)
# We need to perform a read barrier on the node to make
# sure that it processes the last DROP KEYSPACE.
# Otherwise, the node could think the RF-rack-invalid
# keyspace still exists.
await manager.server_start(s1.server_id)
await read_barrier(manager.api, s1.ip_addr)
await manager.server_stop_gracefully(s1.server_id)
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
await manager.server_start(s1.server_id)
_ = await manager.server_start(s1.server_id)
@pytest.mark.asyncio
async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces_but_not_enforced(manager: ManagerClient):

View File

@@ -23,25 +23,10 @@ from test.cluster.object_store.conftest import format_tuples
from test.cluster.object_store.test_backup import topo, take_snapshot, do_test_streaming_scopes
from test.cluster.util import new_test_keyspace
from test.pylib.rest_client import read_barrier
from test.pylib.util import unique_name, wait_for
from test.pylib.util import unique_name
logger = logging.getLogger(__name__)
async def wait_for_upload_dir_empty(upload_dir, timeout=30):
'''
Wait until the upload directory is empty with a timeout.
SSTable unlinking is asynchronous and in rare situations, it can happen
that not all sstables are deleted from the upload dir immediately after refresh is done.
'''
deadline = time.time() + timeout
async def check_empty():
files = os.listdir(upload_dir)
if not files:
return True
return None
await wait_for(check_empty, deadline, period=0.5)
class SSTablesOnLocalStorage:
def __init__(self):
self.tmpdir = f'tmpbackup-{str(uuid.uuid4())}'
@@ -168,8 +153,7 @@ async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
for s in servers:
cf_dir = dirs[s.server_id]["cf_dir"]
upload_dir = os.path.join(cf_dir, 'upload')
assert os.path.exists(upload_dir)
await wait_for_upload_dir_empty(upload_dir)
files = os.listdir(os.path.join(cf_dir, 'upload'))
assert files == [], f'Upload dir not empty on server {s.server_id}: {files}'
shutil.rmtree(tmpbackup)

View File

@@ -196,7 +196,7 @@ async def test_group0_tombstone_gc(manager: ManagerClient):
tombstone_mark = datetime.now(timezone.utc)
# test #2: the tombstones are not cleaned up when one node is down
with pytest.raises(AssertionError, match="timed out"):
with pytest.raises(AssertionError, match="Deadline exceeded"):
# waiting for shorter time (5s normally enough for a successful case, we expect the timeout here)
await verify_tombstone_gc(tombstone_mark, timeout=5)
@@ -249,7 +249,7 @@ async def test_group0_tombstone_gc(manager: ManagerClient):
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
# test #4a: the tombstones are not cleaned up after both live nodes join the new group0
with pytest.raises(AssertionError, match="timed out"):
with pytest.raises(AssertionError, match="Deadline exceeded"):
await verify_tombstone_gc(tombstone_mark, timeout=5)
await manager.remove_node(servers[0].server_id, down_server.server_id)

View File

@@ -165,7 +165,7 @@ async def wait_for_cdc_generations_publishing(cql: Session, hosts: list[Host], d
unpublished_generations = topo_res[0].unpublished_cdc_generations
return unpublished_generations is None or len(unpublished_generations) == 0 or None
await wait_for(all_generations_published, deadline=deadline)
await wait_for(all_generations_published, deadline=deadline, period=1.0)
async def check_system_topology_and_cdc_generations_v3_consistency(manager: ManagerClient, live_hosts: list[Host], cqls: Optional[list[Session]] = None, ignored_hosts: list[Host] = []):
@@ -470,17 +470,6 @@ async def new_materialized_view(manager: ManagerClient, table, select, pk, where
await manager.get_cql().run_async(f"DROP MATERIALIZED VIEW {mv}")
async def keyspace_has_tablets(manager: ManagerClient, keyspace: str) -> bool:
"""
Checks whether the given keyspace uses tablets.
Adapted from its counterpart in the cqlpy test: cqlpy/util.py::keyspace_has_tablets.
"""
cql = manager.get_cql()
rows_iter = await cql.run_async(f"SELECT * FROM system_schema.scylla_keyspaces WHERE keyspace_name='{keyspace}'")
rows = list(rows_iter)
return len(rows) > 0 and getattr(rows[0], "initial_tablets", None) is not None
async def get_raft_log_size(cql, host) -> int:
query = "select count(\"index\") from system.raft"
return (await cql.run_async(query, host=host))[0][0]

View File

@@ -271,21 +271,10 @@ future<std::tuple<tests::proc::process_fixture, int>> tests::proc::start_docker_
// arbitrary timeout of 120s for the server to make some output. Very generous.
// but since we (maybe) run docker, and might need to pull image, this can take
// some time if we're unlucky.
auto [f1, f2] = co_await with_timeout(std::chrono::steady_clock::now() + 120s, when_all(std::move(out_fut), std::move(err_fut)));
for (auto* f : {&f1, &f2}) {
if (f->failed()) {
try {
f->get();
} catch (in_use&) {
retry = true;
p = std::current_exception();
} catch (...) {
if (!p) {
p = std::current_exception();
}
}
}
}
co_await with_timeout(std::chrono::steady_clock::now() + 120s, when_all(std::move(out_fut), std::move(err_fut)));
} catch (in_use&) {
retry = true;
p = std::current_exception();
} catch (...) {
p = std::current_exception();
}
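
The removed branch above inspects each completed future individually so that a retryable in_use failure can be told apart from other errors while the first exception is still recorded. A simplified sketch of that error-collection pattern, using plain std::future rather than Seastar futures; the two tasks and the in_use type are illustrative stand-ins, not the docker fixture itself.

#include <exception>
#include <future>
#include <iostream>
#include <stdexcept>

// Stand-in for the retryable error type in the fixture.
struct in_use : std::runtime_error {
    using std::runtime_error::runtime_error;
};

int main() {
    auto out_fut = std::async(std::launch::async, [] { /* drain stdout */ });
    auto err_fut = std::async(std::launch::async, [] {
        throw in_use("address already in use");  // simulate the retryable failure
    });

    bool retry = false;
    std::exception_ptr p;
    for (auto* f : {&out_fut, &err_fut}) {
        try {
            f->get();                   // rethrows if this task failed
        } catch (const in_use&) {
            retry = true;               // retryable: flag it and keep the error
            p = std::current_exception();
        } catch (...) {
            if (!p) {                   // otherwise remember only the first error
                p = std::current_exception();
            }
        }
    }
    std::cout << "retry=" << retry << ", has_error=" << (p ? "yes" : "no") << '\n';
}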

View File

@@ -56,25 +56,15 @@ def unique_name(unique_name_prefix = 'test_'):
async def wait_for(
pred: Callable[[], Awaitable[Optional[T]]],
deadline: float,
period: float = 0.1,
period: float = 1,
before_retry: Optional[Callable[[], Any]] = None,
backoff_factor: float = 1.5,
max_period: float = 1.0,
label: Optional[str] = None) -> T:
tag = label or getattr(pred, '__name__', 'unlabeled')
start = time.time()
retries = 0
backoff_factor: float = 1,
max_period: float = None) -> T:
while True:
elapsed = time.time() - start
assert time.time() < deadline, \
f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)"
assert(time.time() < deadline), "Deadline exceeded, failing test."
res = await pred()
if res is not None:
if retries > 0:
logger.debug(f"wait_for({tag}) completed "
f"in {elapsed:.2f}s ({retries} retries)")
return res
retries += 1
await asyncio.sleep(period)
period *= backoff_factor
if max_period is not None:
@@ -283,14 +273,14 @@ async def wait_for_view_v1(cql: Session, name: str, node_count: int, timeout: in
done = await cql.run_async(f"SELECT COUNT(*) FROM system_distributed.view_build_status WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING")
return done[0][0] == node_count or None
deadline = time.time() + timeout
await wait_for(view_is_built, deadline, label=f"view_v1_{name}")
await wait_for(view_is_built, deadline)
async def wait_for_view(cql: Session, name: str, node_count: int, timeout: int = 120):
async def view_is_built():
done = await cql.run_async(f"SELECT COUNT(*) FROM system.view_build_status_v2 WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING")
return done[0][0] == node_count or None
deadline = time.time() + timeout
await wait_for(view_is_built, deadline, label=f"view_{name}")
await wait_for(view_is_built, deadline)
async def wait_for_first_completed(coros: list[Coroutine], timeout: int|None = None):