Mirror of https://github.com/scylladb/scylladb.git, synced 2026-05-13 03:12:13 +00:00

Compare commits: master...scylla-202 (11 commits)

| Author | SHA1 | Date |
|---|---|---|
|  | 6d09897339 |  |
|  | 5c8662d606 |  |
|  | 74a58a6757 |  |
|  | 148e05820b |  |
|  | 1438830348 |  |
|  | c25f3eced8 |  |
|  | d264fea176 |  |
|  | 9d942a5408 |  |
|  | 9622291e07 |  |
|  | b98470a860 |  |
|  | 5231c77e8e |  |
.gitmodules (vendored), 2 changed lines
@@ -1,6 +1,6 @@
[submodule "seastar"]
    path = seastar
    url = ../seastar
    url = ../scylla-seastar
    ignore = dirty
[submodule "swagger-ui"]
    path = swagger-ui
@@ -78,7 +78,7 @@ fi

# Default scylla product/version tags
PRODUCT=scylla
VERSION=2026.2.0-dev
VERSION=2026.2.0-rc1

if test -f version
then
@@ -593,6 +593,7 @@ scylla_tests = set([
    'test/boost/linearizing_input_stream_test',
    'test/boost/lister_test',
    'test/boost/locator_topology_test',
    'test/boost/lock_tables_metadata_test',
    'test/boost/log_heap_test',
    'test/boost/logalloc_standard_allocator_segment_pool_backend_test',
    'test/boost/logalloc_test',
@@ -399,9 +399,10 @@ future<> gossiper::do_send_ack2_msg(locator::host_id from, utils::chunked_vector
        }
    }
    gms::gossip_digest_ack2 ack2_msg(std::move(delta_ep_state_map));
    logger.debug("Calling do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg);
    auto ack2_msg_str = fmt::format("{}", ack2_msg);
    logger.debug("Calling do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg_str);
    co_await ser::gossip_rpc_verbs::send_gossip_digest_ack2(&_messaging, from, std::move(ack2_msg));
    logger.debug("finished do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg);
    logger.debug("finished do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg_str);
}

// Depends on
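The change above formats ack2_msg into a string once, before the message is moved into send_gossip_digest_ack2(), so the final debug line no longer reads the moved-from object. A minimal standalone sketch of that format-before-move pattern (stand-in message type and send() function, not the real gms types):

```cpp
#include <fmt/core.h>
#include <string>
#include <utility>

struct message { std::string payload; };

// Consumes the message by move, like send_gossip_digest_ack2() does.
void send(message&&) {}

int main() {
    message ack2_msg{"digest-delta"};
    // Format once, before the move; later log lines must not read ack2_msg.
    auto ack2_msg_str = fmt::format("payload={}", ack2_msg.payload);
    fmt::print("sending: {}\n", ack2_msg_str);
    send(std::move(ack2_msg));
    fmt::print("sent: {}\n", ack2_msg_str); // safe: uses the saved string
}
```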
@@ -1328,9 +1328,27 @@ future<global_table_ptr> get_table_on_all_shards(sharded<database>& sharded_db,

future<tables_metadata_lock_on_all_shards> database::lock_tables_metadata(sharded<database>& sharded_db) {
    tables_metadata_lock_on_all_shards locks;
    co_await sharded_db.invoke_on_all([&] (auto& db) -> future<> {
    // Acquire write lock on shard 0 first, and then on the remaining shards.
    //
    // Parallel acquisition on all shards could deadlock when two
    // fibers call lock_tables_metadata() concurrently: parallel_for_each
    // sends SMP messages to all shards even when the local shard's lock
    // attempt blocks. If task reordering (SEASTAR_SHUFFLE_TASK_QUEUE in
    // debug/sanitize builds) causes fiber A to win on shard X while
    // fiber B wins on shard Y, neither can make progress — classic
    // cross-shard lock-ordering deadlock.
    //
    // Acquiring the write lock on shard 0 first, and then on the remaining
    // shards, eliminates this: whichever fiber acquires shard 0 first is
    // guaranteed to acquire locks on all other shards before the other fiber
    // can acquire the lock on shard 0.
    co_await sharded_db.invoke_on(0, [&locks, &sharded_db] (auto& db) -> future<> {
        locks.assign_lock(co_await db.get_tables_metadata().hold_write_lock());
        co_await sharded_db.invoke_on_others([&locks] (auto& db) -> future<> {
            locks.assign_lock(co_await db.get_tables_metadata().hold_write_lock());
        });
    });

    co_return locks;
}
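The comment in this hunk describes the classic lock-ordering fix: every acquirer takes the shard-0 lock first, so two concurrent callers can never end up holding disjoint subsets of the shard locks. A minimal standalone sketch of that ordering rule with plain std::mutex (hypothetical names, not the Seastar per-shard rwlocks used above):

```cpp
#include <mutex>
#include <thread>
#include <vector>

// Stand-ins for per-shard locks; shard 0 acts as the "gate".
std::vector<std::mutex> shard_locks(4);

void lock_all_shards() {
    // Rule from the diff: always take shard 0 first. Whoever wins it is
    // guaranteed to finish acquiring the rest before a competitor starts,
    // so two concurrent callers cannot end up holding disjoint subsets.
    std::unique_lock gate(shard_locks[0]);
    std::vector<std::unique_lock<std::mutex>> rest;
    for (size_t i = 1; i < shard_locks.size(); ++i) {
        rest.emplace_back(shard_locks[i]);
    }
    // ... critical section over all shards ...
}   // all locks released here

int main() {
    std::thread a(lock_all_shards), b(lock_all_shards);
    a.join();
    b.join();
}
```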
@@ -438,9 +438,10 @@ static future<cql3::untyped_result_set> do_execute_cql_with_timeout(sstring req,

    const auto cache_key = qp.compute_id(req, "", cql3::internal_dialect());
    auto ps_ptr = qp.get_prepared(cache_key);
    shared_ptr<cql_transport::messages::result_message::prepared> prepared_msg;
    if (!ps_ptr) {
        const auto msg_ptr = co_await qp.prepare(req, qs, cql3::internal_dialect());
        ps_ptr = msg_ptr->get_prepared();
        prepared_msg = co_await qp.prepare(req, qs, cql3::internal_dialect());
        ps_ptr = prepared_msg->get_prepared();
        if (!ps_ptr) {
            on_internal_error(paxos_state::logger, "prepared statement is null");
        }

@@ -449,8 +450,8 @@ static future<cql3::untyped_result_set> do_execute_cql_with_timeout(sstring req,
        -1, service::node_local_only::yes);
    const auto st = ps_ptr->statement;

    const auto msg_ptr = co_await st->execute(qp, qs, qo, std::nullopt);
    co_return cql3::untyped_result_set(msg_ptr);
    const auto result_ptr = co_await st->execute(qp, qs, qo, std::nullopt);
    co_return cql3::untyped_result_set(result_ptr);
}

template <typename... Args>
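This change appears to do two things: it hoists the prepared message into prepared_msg, declared before the if-block, so the object that owns the prepared statement outlives every use of ps_ptr, and it renames the later msg_ptr to result_ptr so the two locals no longer shadow each other. A minimal sketch of the keep-the-owner-alive part (hypothetical types, not the cql3 classes):

```cpp
#include <memory>
#include <string>

struct prepared_statement { std::string text; };

struct prepared_message {
    std::shared_ptr<prepared_statement> stmt;
    prepared_statement* get_prepared() const { return stmt.get(); }
};

prepared_message prepare(const std::string& q) {
    return prepared_message{std::make_shared<prepared_statement>(prepared_statement{q})};
}

int main() {
    prepared_statement* ps_ptr = nullptr;
    // Keep the owning message in a scope that outlives every use of ps_ptr,
    // mirroring how the diff hoists prepared_msg out of the if-block.
    std::shared_ptr<prepared_message> owner;
    if (!ps_ptr) {
        owner = std::make_shared<prepared_message>(prepare("SELECT ..."));
        ps_ptr = owner->get_prepared();
    }
    // ps_ptr remains valid here because `owner` is still alive.
    return ps_ptr->text.empty() ? 1 : 0;
}
```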
@@ -434,6 +434,8 @@ future<> group0_state_machine::load_snapshot(raft::snapshot_id id) {
}

future<> group0_state_machine::enable_in_memory_state_machine() {
    co_await utils::get_local_injector().inject("group0_state_machine_enable_in_memory_fail",
        [] { return std::make_exception_ptr(std::runtime_error("injected failure in enable_in_memory_state_machine")); });
    auto read_apply_mutex_holder = co_await _client.hold_read_apply_mutex(_abort_source);
    if (!_in_memory_state_machine_enabled) {
        _in_memory_state_machine_enabled = true;
@@ -452,14 +452,16 @@ future<> raft_group0::start_server_for_group0(raft::group_id group0_id, service:
    auto srv_for_group0 = create_server_for_group0(group0_id, my_id, ss, qp, mm);
    auto& persistence = srv_for_group0.persistence;
    auto& server = *srv_for_group0.server;
    co_await with_scheduling_group(_sg, [this, &srv_for_group0] (this auto self) -> future<> {
    co_await with_scheduling_group(_sg, [this, &srv_for_group0, group0_id] (this auto self) -> future<> {
        auto& state_machine = dynamic_cast<group0_state_machine&>(srv_for_group0.state_machine);
        co_await _raft_gr.start_server_for_group(std::move(srv_for_group0));
        // Set _group0 immediately after the server is registered in _raft_gr._servers.
        // This ensures abort_and_drain()/destroy() can find and clean up the server
        // even if enable_in_memory_state_machine() or later steps throw.
        _group0.emplace<raft::group_id>(group0_id);
        co_await state_machine.enable_in_memory_state_machine();
    });

    _group0.emplace<raft::group_id>(group0_id);

    // Fix for scylladb/scylladb#16683:
    // If the snapshot index is 0, trigger creation of a new snapshot
    // so bootstrapping nodes will receive a snapshot transfer.
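The comment above (and the new test's docstring further down this page) spell out the fix: record the group0 handle immediately after the raft server is registered, before any later step can throw, so the error-path cleanup can still find and destroy the server. A minimal standalone sketch of that ordering under exception unwinding (hypothetical names, not the real raft_group0/raft_group_registry API):

```cpp
#include <cstdio>
#include <optional>
#include <stdexcept>

struct registry {
    bool has_server = false;
    void register_server() { has_server = true; }
    void destroy_server() { has_server = false; }
};

std::optional<int> group0_id;   // stand-in for _group0

void start(registry& reg, bool fail_enable) {
    reg.register_server();
    // Record the handle immediately after registration, before any step that
    // may throw, so the stop path below can find and clean the server up.
    group0_id = 42;
    if (fail_enable) {
        throw std::runtime_error("injected failure in enable_in_memory_state_machine");
    }
}

void stop(registry& reg) {
    if (group0_id) {            // without the early assignment this would be skipped
        reg.destroy_server();
    }
    if (reg.has_server) {
        std::puts("internal error: server not destroyed");  // the abort the fix avoids
    }
}

int main() {
    registry reg;
    try {
        start(reg, /*fail_enable=*/true);
    } catch (const std::exception&) {
        // startup failed; unwinding reaches stop() just like raft_group_registry::stop()
    }
    stop(reg);
}
```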
@@ -9,6 +9,7 @@
#include "service/session.hh"
#include "utils/log.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/core/timer.hh>

namespace service {

@@ -58,18 +59,35 @@ void session_manager::initiate_close_of_sessions_except(const std::unordered_set
}

future<> session_manager::drain_closing_sessions() {
    slogger.info("drain_closing_sessions: waiting for lock");
    seastar::timer<lowres_clock> lock_timer([this] {
        slogger.warn("drain_closing_sessions: still waiting for lock, available units {}",
            _session_drain_sem.available_units());
    });
    lock_timer.arm_periodic(std::chrono::minutes(5));
    auto lock = co_await get_units(_session_drain_sem, 1);
    lock_timer.cancel();
    auto n = std::distance(_closing_sessions.begin(), _closing_sessions.end());
    slogger.info("drain_closing_sessions: acquired lock, {} sessions to drain", n);
    auto i = _closing_sessions.begin();
    while (i != _closing_sessions.end()) {
        session& s = *i;
        ++i;
        auto id = s.id();
        slogger.debug("draining session {}", id);
        slogger.info("drain_closing_sessions: waiting for session {} to close, gate count {}", id, s.gate_count());
        std::optional<seastar::timer<lowres_clock>> warn_timer;
        warn_timer.emplace([&s, id] {
            slogger.warn("drain_closing_sessions: session {} still not closed, gate count {}",
                id, s.gate_count());
        });
        warn_timer->arm_periodic(std::chrono::minutes(5));
        co_await s.close();
        warn_timer.reset();
        if (_sessions.erase(id)) {
            slogger.debug("session {} closed", id);
            slogger.info("drain_closing_sessions: session {} closed", id);
        }
    }
    slogger.info("drain_closing_sessions: done");
}

} // namespace service
@@ -95,6 +95,10 @@ public:
        return _id;
    }

    size_t gate_count() const {
        return _gate.get_count();
    }

    /// Post-condition of successfully resolved future: There are no guards alive for this session,
    /// and it's impossible to create more such guards later.
    /// Can be called concurrently.
@@ -4494,10 +4494,20 @@ future<> storage_service::local_topology_barrier() {
            version, current_version)));
    }

    co_await ss._shared_token_metadata.stale_versions_in_use();
    rtlogger.info("raft_topology_cmd::barrier_and_drain version {}: waiting for stale token metadata versions to be released", version);
    {
        seastar::timer<lowres_clock> warn_timer([&ss, version] {
            rtlogger.warn("raft_topology_cmd::barrier_and_drain version {}: still waiting for stale versions, "
                "stale versions (version: use_count): {}",
                version, ss._shared_token_metadata.describe_stale_versions());
        });
        warn_timer.arm_periodic(std::chrono::minutes(5));
        co_await ss._shared_token_metadata.stale_versions_in_use();
    }
    rtlogger.info("raft_topology_cmd::barrier_and_drain version {}: stale versions released, draining closing sessions", version);
    co_await get_topology_session_manager().drain_closing_sessions();

    rtlogger.info("raft_topology_cmd::barrier_and_drain done");
    rtlogger.info("raft_topology_cmd::barrier_and_drain version {}: done", version);
});
}
@@ -4509,7 +4519,9 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
    auto& raft_server = _group0->group0_server();
    auto group0_holder = _group0->hold_group0_gate();
    // do barrier to make sure we always see the latest topology
    rtlogger.info("topology cmd rpc {} index={}: starting read_barrier, term={}", cmd.cmd, cmd_index, term);
    co_await raft_server.read_barrier(&_group0_as);
    rtlogger.info("topology cmd rpc {} index={}: read_barrier completed", cmd.cmd, cmd_index);
    if (raft_server.get_current_term() != term) {
        // Return an error since the command is from outdated leader
        co_return result;
@@ -543,11 +543,16 @@ future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept {
    // during SSTable writing and removed before sealing. If the write
    // failed before sealing, the file may still be on disk and must be
    // cleaned up explicitly.
    // The component is only defined for the `ms` sstable format; for
    // older formats it is absent from the component map and looking up
    // its filename would throw std::out_of_range.
    // Use file_exists() to avoid a C++ exception on the common path
    // where the file was already removed before sealing.
    auto temp_hashes = filename(sst, dir_name.native(), sst._generation, component_type::TemporaryHashes);
    if (co_await file_exists(temp_hashes)) {
        co_await sst.sstable_write_io_check(remove_file, std::move(temp_hashes));
    if (sstable_version_constants::get_component_map(sst.get_version()).contains(component_type::TemporaryHashes)) {
        auto temp_hashes = filename(sst, dir_name.native(), sst._generation, component_type::TemporaryHashes);
        if (co_await file_exists(temp_hashes)) {
            co_await sst.sstable_write_io_check(remove_file, std::move(temp_hashes));
        }
    }
    if (sync) {
        co_await sst.sstable_write_io_check(sync_directory, dir_name.native());
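The added comment explains the guard: TemporaryHashes only exists in the component map for the `ms` sstable format, so the filename lookup has to be gated on contains() or it throws std::out_of_range on older formats. A minimal sketch of guarding a map lookup that way (an ordinary std::map standing in for the real component map):

```cpp
#include <iostream>
#include <map>
#include <string>

int main() {
    // Stand-in for a per-format component map; this format lacks TemporaryHashes.
    std::map<std::string, std::string> me_components = {{"TOC", "TOC.txt"}};

    const std::string component = "TemporaryHashes";
    // Guard the lookup: at() on a missing key throws std::out_of_range,
    // which is what the contains() check in the diff avoids for old formats.
    if (me_components.contains(component)) {
        std::cout << "would remove " << me_components.at(component) << '\n';
    } else {
        std::cout << component << " not defined for this format; nothing to do\n";
    }
}
```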
@@ -135,7 +135,23 @@ future<> table_helper::cache_table_info(cql3::query_processor& qp, service::migr
}

future<> table_helper::insert(cql3::query_processor& qp, service::migration_manager& mm, service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker) {
    co_await cache_table_info(qp, mm, qs);
    // _prepared_stmt is a checked_weak_ptr into the prepared statements
    // cache and can be invalidated by a concurrent purge (e.g. on a schema
    // change). cache_table_info() (re-)prepares and assigns _prepared_stmt,
    // but the pin protecting the entry is dropped when try_prepare()
    // returns. In release the chain of ready-future co_awaits back to here
    // resumes synchronously, but debug builds preempt on every co_await
    // even for ready futures, opening a window for a purge to drop the
    // entry and leave _prepared_stmt null. Loop until a synchronous
    // post-resume check finds _prepared_stmt valid; nothing can run between
    // that check and the dereference below. _insert_stmt is a strong
    // shared_ptr and is not affected by cache invalidation.
    while (true) {
        co_await cache_table_info(qp, mm, qs);
        if (_prepared_stmt) {
            break;
        }
    }
    auto opts = opt_maker();
    opts.prepare(_prepared_stmt->bound_names);
    co_await _insert_stmt->execute(qp, qs, opts, std::nullopt);
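The long comment in this hunk describes the pattern: _prepared_stmt is a weak handle into the prepared-statement cache, a concurrent purge can null it between the co_await that (re-)prepares it and its use, so the code loops until a check right before the dereference sees it valid. A minimal standalone sketch of that re-acquire-until-valid loop with std::weak_ptr (hypothetical names; the real code uses checked_weak_ptr and Seastar coroutines):

```cpp
#include <cstdio>
#include <memory>

// Stand-in for the prepared-statements cache entry and its weak handle.
std::shared_ptr<int> cache_entry;
std::weak_ptr<int> prepared_stmt;   // analogue of _prepared_stmt

void prepare() {                    // analogue of cache_table_info()
    cache_entry = std::make_shared<int>(7);
    prepared_stmt = cache_entry;
}

void concurrent_purge() { cache_entry.reset(); }  // may run between prepare() and use

int main() {
    // Loop until the handle is still valid right before we dereference it;
    // nothing can invalidate it between that check and the use below.
    std::shared_ptr<int> stmt;
    while (true) {
        prepare();
        // (in the real, preemptible coroutine code a purge could run here)
        stmt = prepared_stmt.lock();
        if (stmt) {
            break;
        }
    }
    std::printf("bound value: %d\n", *stmt);
}
```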
@@ -150,6 +150,8 @@ add_scylla_test(lister_test
  KIND SEASTAR)
add_scylla_test(locator_topology_test
  KIND SEASTAR)
add_scylla_test(lock_tables_metadata_test
  KIND SEASTAR)
add_scylla_test(log_heap_test
  KIND BOOST)
add_scylla_test(logalloc_standard_allocator_segment_pool_backend_test
test/boost/lock_tables_metadata_test.cc (new file, 36 lines)
@@ -0,0 +1,36 @@
/*
 * Copyright (C) 2026-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
 */

#include <fmt/format.h>
#include <seastar/core/with_timeout.hh>
#include <seastar/testing/test_case.hh>
#include "test/lib/cql_test_env.hh"

using namespace std::chrono_literals;

// Test that two lock_tables_metadata calls don't deadlock
SEASTAR_TEST_CASE(test_lock_tables_metadata_deadlock) {
    return do_with_cql_env_thread([](cql_test_env& e) {
        try {
            // Repeat the test scenario to increase the chance of hitting the deadlock.
            // If no deadlock occurs, each repetition should complete within a fraction of a second,
            // so even with 100 repetitions, the total test time should be reasonable.
            for (int i = 0; i < 100; ++i) {
                with_timeout(lowres_clock::now() + 30s,
                    when_all_succeed(
                        e.local_db().lock_tables_metadata(e.db()).discard_result(),
                        e.local_db().lock_tables_metadata(e.db()).discard_result()
                    )).get();
            }
        } catch (seastar::timed_out_error&) {
            fmt::print(stderr, "FAIL: lock_tables_metadata deadlocked (timed out after 30s)\n");
            _exit(1);
        }
    });
}
@@ -246,6 +246,33 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_extra_temporary_toc) {
    });
}

// Reproducer for SCYLLADB-1697
SEASTAR_TEST_CASE(sstable_directory_test_unlink_sstable_leaves_no_orphans) {
    return sstables::test_env::do_with_async([] (test_env& env) {
        for (const auto version : {sstable_version_types::me, sstable_version_types::ms}) {
            testlog.info("Testing sstable version: {}", version);
            auto sst = make_sstable_for_this_shard([&env, version] {
                return env.make_sstable(test_table_schema(), version);
            });

            // Sanity: the TOC was written, otherwise the assertion below would be vacuous.
            BOOST_REQUIRE(file_exists(test(sst).filename(sstables::component_type::TOC).native()).get());

            sst->unlink().get();

            std::vector<sstring> remaining;
            lister::scan_dir(env.tempdir().path(), lister::dir_entry_types::of<directory_entry_type::regular>(),
                [&remaining] (fs::path, directory_entry de) {
                    remaining.push_back(de.name);
                    return make_ready_future<>();
                }).get();

            BOOST_REQUIRE_MESSAGE(remaining.empty(),
                fmt::format("Expected empty sstable dir after unlink for version {}, found: {}", version, remaining));
        }
    });
}

// Test the absence of TOC. Behavior is controllable by a flag
SEASTAR_TEST_CASE(sstable_directory_test_table_missing_toc) {
    return sstables::test_env::do_with_async([] (test_env& env) {
@@ -1379,7 +1379,7 @@ async def test_alternator_invalid_shard_for_lwt(manager: ManagerClient):
    # The next barrier must be for the write_both_read_new, we need a guarantee
    # that the src_shard observed it
    logger.info("Waiting for the next barrier")
    await log.wait_for(re.escape(f"[shard {src_shard}: gms] raft_topology - raft_topology_cmd::barrier_and_drain done"),
    await log.wait_for(f"\\[shard {src_shard}: gms\\] raft_topology - raft_topology_cmd::barrier_and_drain.*done",
                       from_mark=m)

    # Now we have a guarantee that a new barrier succeeded on the src_shard,
@@ -11,7 +11,8 @@ import pytest
from test.pylib.manager_client import ManagerClient
from test.pylib.scylla_cluster import ReplaceConfig
from test.cluster.util import (check_token_ring_and_group0_consistency, wait_for_token_ring_and_group0_consistency,
                               get_coordinator_host, get_coordinator_host_ids, wait_new_coordinator_elected)
                               get_coordinator_host, get_coordinator_host_ids, wait_new_coordinator_elected,
                               wait_for_no_pending_topology_transition)

logger = logging.getLogger(__name__)

@@ -19,7 +20,7 @@ logger = logging.getLogger(__name__)

@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout: int, scale_timeout: callable) -> None:
    """ Kill coordinator with error injection while topology operation is running for cluster: decommission,
    bootstrap, removenode, replace.
@@ -57,9 +58,11 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
    logger.debug("Kill coordinator during decommission")
    coordinator_host = await get_coordinator_host(manager)
    other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id]
    num_elections = len(await get_coordinator_host_ids(manager))
    await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
    await manager.decommission_node(server_id=other_nodes[-1].server_id, expected_error="Decommission failed. See earlier errors")
    await wait_new_coordinator_elected(manager, 2, time.time() + 60)
    await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
    await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
    await manager.server_restart(coordinator_host.server_id, wait_others=1)
    await manager.servers_see_each_other(await manager.running_servers())
    await check_token_ring_and_group0_consistency(manager)

@@ -73,33 +76,40 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
    node_to_remove_srv_id = other_nodes[-1].server_id
    logger.debug("Stop node with srv_id %s", node_to_remove_srv_id)
    await manager.server_stop_gracefully(node_to_remove_srv_id)
    num_elections = len(await get_coordinator_host_ids(manager))
    await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
    logger.debug("Start removenode with srv_id %s from node with srv_id %s", node_to_remove_srv_id, working_srv_id)
    await manager.remove_node(working_srv_id,
                              node_to_remove_srv_id,
                              expected_error="Removenode failed. See earlier errors")

    await wait_new_coordinator_elected(manager, 3, time.time() + 60)
    await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
    await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))

    await manager.others_not_see_server(server_ip=coordinator_host.ip_addr)
    logger.debug("Start old coordinator node with srv_id %s", coordinator_host.server_id)
    await manager.server_restart(coordinator_host.server_id, wait_others=1)
    await manager.servers_see_each_other(await manager.running_servers())
    logger.debug("Remove node with srv_id %s from node with srv_id %s because it was banned in a previous attempt", node_to_remove_srv_id, working_srv_id)
    await manager.remove_node(working_srv_id, node_to_remove_srv_id)
    await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
    await manager.servers_see_each_other(await manager.running_servers())
    await check_token_ring_and_group0_consistency(manager)
    logger.debug("Restore number of nodes in cluster")
    await manager.server_add(cmdline=cmdline)
    await manager.server_add(config=config, cmdline=cmdline)

    # kill coordinator during bootstrap
    logger.debug("Kill coordinator during bootstrap")
    nodes = await manager.running_servers()
    coordinator_host = await get_coordinator_host(manager)
    other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id]
    new_node = await manager.server_add(start=False, cmdline=cmdline)
    new_node = await manager.server_add(start=False, config=config, cmdline=cmdline)
    num_elections = len(await get_coordinator_host_ids(manager))
    await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
    await manager.server_start(new_node.server_id,
                               expected_error="Startup failed: std::runtime_error")
    await wait_new_coordinator_elected(manager, 4, time.time() + 60)
    await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
    await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
    await manager.server_restart(coordinator_host.server_id, wait_others=1)
    await manager.servers_see_each_other(await manager.running_servers())
    await check_token_ring_and_group0_consistency(manager)

@@ -111,11 +121,13 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
    other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id]
    node_to_replace_srv_id = other_nodes[-1].server_id
    await manager.server_stop_gracefully(node_to_replace_srv_id)
    num_elections = len(await get_coordinator_host_ids(manager))
    await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
    replace_cfg = ReplaceConfig(replaced_id = node_to_replace_srv_id, reuse_ip_addr = False, use_host_id = True)
    new_node = await manager.server_add(start=False, replace_cfg=replace_cfg, cmdline=cmdline)
    new_node = await manager.server_add(start=False, config=config, replace_cfg=replace_cfg, cmdline=cmdline)
    await manager.server_start(new_node.server_id, expected_error="Replace failed. See earlier errors")
    await wait_new_coordinator_elected(manager, 5, time.time() + 60)
    await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
    await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
    logger.debug("Start old coordinator node")
    await manager.others_not_see_server(server_ip=coordinator_host.ip_addr)
    await manager.server_restart(coordinator_host.server_id, wait_others=1)

@@ -123,5 +135,5 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
    logger.debug("Replaced node is already non-voter and will be banned after restart. Remove it")
    coordinator_host = await get_coordinator_host(manager)
    await manager.remove_node(coordinator_host.server_id, node_to_replace_srv_id)
    await wait_for_token_ring_and_group0_consistency(manager, time.time() + 60)
    await wait_for_token_ring_and_group0_consistency(manager, time.time() + scale_timeout(60))
    await check_token_ring_and_group0_consistency(manager)
@@ -0,0 +1,61 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
#

import logging

import pytest
from test.pylib.manager_client import ManagerClient

logger = logging.getLogger(__name__)


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_failure_after_group0_server_registration(manager: ManagerClient) -> None:
    """Test that a node shuts down cleanly when group0 startup fails after server registration.

    Reproducer for: CUSTOMER-340, CUSTOMER-335, SCYLLADB-1217

    On restart, setup_group0_if_exist() calls start_server_for_group0() which
    registers the raft server in raft_group_registry._servers, then calls
    enable_in_memory_state_machine(). If enable_in_memory_state_machine() throws
    (e.g., because reload_state() -> auth_cache().load_all() fails due to topology
    being in a transitional state), the exception propagates and stack unwinding
    calls raft_group_registry::stop().

    Previously, _group0 was set AFTER the with_scheduling_group lambda returned,
    so a throw inside the lambda left _group0 as monostate. abort_and_drain() and
    destroy() would be no-ops, leaving the server orphaned in _servers.
    raft_group_registry::stop() would then hit on_internal_error
    ("server for group ... is not destroyed") and abort.

    The fix moves _group0.emplace() inside the lambda, immediately after
    start_server_for_group(), so destroy() can always find and clean up the server.

    This test:
    1. Starts a node normally (group0 established)
    2. Stops the node
    3. Restarts with an injection that fails enable_in_memory_state_machine()
    4. Verifies the node fails startup cleanly (no abort)
    """
    # Start a node normally so group0 is established
    srv = await manager.server_add()
    logger.info("Server %s started successfully with group0", srv.server_id)

    logger.info("Stopping server %s", srv.server_id)
    await manager.server_stop_gracefully(srv.server_id)

    logger.info("Restarting server %s with injection to fail enable_in_memory_state_machine", srv.server_id)
    await manager.server_update_config(srv.server_id,
                                       key='error_injections_at_startup',
                                       value=['group0_state_machine_enable_in_memory_fail'])
    await manager.server_start(srv.server_id,
                               expected_error="injected failure in enable_in_memory_state_machine")

    # If we get here without the test framework detecting a crash/abort,
    # the node shut down cleanly. The fix ensures abort_and_drain()/destroy()
    # can find and clean up the raft server even when startup fails.
    logger.info("Server failed startup and shut down cleanly (no abort)")
@@ -8,7 +8,7 @@ from test.pylib.manager_client import ManagerClient
from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair
from test.pylib.tablets import get_all_tablet_replicas
from test.cluster.tasks.task_manager_client import TaskManagerClient
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, ensure_group0_leader_on, new_test_keyspace, new_test_table, trigger_stepdown
from test.pylib.util import wait_for_cql_and_get_hosts

from cassandra.query import ConsistencyLevel, SimpleStatement
@@ -880,41 +880,30 @@ async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manage
# affected replica but process the UNREPAIRED sstable on the others, so the classification
# divergence is never corrected. In tombstone scenarios this enables premature tombstone GC
# on the affected replica leading to data resurrection.
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
    cmdline = ['--hinted-handoff-enabled', '0']
    servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
        await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)

    # Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
    # UNREPAIRED compaction view, making the race easy to trigger deterministically.
    await cql.run_async(
        f"ALTER TABLE {ks}.test WITH compaction = "
        f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
    )
class _LeadershipTransferred(Exception):
    """Raised when leadership transferred to servers[1] during the test, requiring a retry."""
    pass

    # Disable autocompaction everywhere so we control exactly when compaction runs.
    for s in servers:
        await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')

    scylla_path = await manager.server_get_exe(servers[0].server_id)

    # Repair 1: establishes sstables_repaired_at=1 on all nodes.
    # Keys 0-9 (inserted by prepare_cluster_for_incremental_repair) end up in
    # S0'(repaired_at=1) on all nodes.
    await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')

    # Insert keys 10-19 and flush on all nodes → S1(repaired_at=0).
    # These will be the subject of repair 2.
    repair2_keys = list(range(current_key, current_key + 10))
    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
    for s in servers:
        await manager.api.flush_keyspace(s.ip_addr, ks)
    current_key += 10
async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, token, scylla_path, current_key):
    """Core logic for test_incremental_repair_race_window_promotes_unrepaired_data.

    Returns the next current_key value.
    Raises _LeadershipTransferred if servers[1] becomes coordinator after the
    restart, signalling the caller to retry.
    """
    # Ensure servers[1] is not the topology coordinator. If the coordinator is
    # restarted, the Raft leader dies, a new election occurs, and the new
    # coordinator re-initiates tablet repair -- flushing memtables on all replicas
    # and marking post-repair data as repaired. That legitimate re-repair masks
    # the compaction-merge bug this test detects.
    coord = await get_topology_coordinator(manager)
    coord_serv = await find_server_by_host_id(manager, servers, coord)
    if coord_serv == servers[1]:
        other = next(s for s in servers if s != servers[1])
        await ensure_group0_leader_on(manager, other)
        coord = await get_topology_coordinator(manager)
        coord_serv = await find_server_by_host_id(manager, servers, coord)
    coord_log = await manager.server_open_log(coord_serv.server_id)
    coord_mark = await coord_log.mark()
@@ -978,6 +967,16 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
    await manager.server_start(target.server_id)
    await manager.servers_see_each_other(servers)

    # Check if leadership transferred to servers[1] during the restart.
    # If so, the new coordinator will re-initiate repair, masking the bug.
    new_coord = await get_topology_coordinator(manager)
    new_coord_serv = await find_server_by_host_id(manager, servers, new_coord)
    if new_coord_serv == servers[1]:
        await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
        await manager.api.wait_task(servers[0].ip_addr, task_id)
        raise _LeadershipTransferred(
            "servers[1] became topology coordinator after restart")

    # Poll until compaction has produced F(repaired_at=2) containing post-repair keys,
    # confirming that the bug was triggered (S1' and E merged during the race window).
    deadline = time.time() + 60
@@ -1000,7 +999,7 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
    if not compaction_ran:
        logger.warning("Compaction did not merge S1' and E after restart during the race window; "
                       "the bug was not triggered. Skipping assertion.")
        return
        return current_key

    # Flush servers[0] and servers[2] AFTER the race window closes so their post-repair
    # keys land in G(repaired_at=0): correctly classified as UNREPAIRED.
@@ -1031,8 +1030,9 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
                f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, "
                f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}")

    # servers[0] and servers[2] flushed post-repair keys after the race window closed,
    # so those keys are in G(repaired_at=0) → correctly UNREPAIRED.
    # servers[0] and servers[2] were never restarted and the coordinator stayed
    # alive throughout, so no re-repair could have flushed their memtables.
    # Post-repair keys must NOT appear in repaired sstables on these servers.
    assert not (repaired_keys_0 & post_repair_key_set), \
        f"servers[0] should not have post-repair keys in repaired sstables, " \
        f"got: {repaired_keys_0 & post_repair_key_set}"
@@ -1053,6 +1053,54 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
        f"on servers[1] after restart lost the being_repaired markers during the race window. " \
        f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \
        f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}"
    return current_key


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
    cmdline = ['--hinted-handoff-enabled', '0']
    servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
        await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)

    # Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
    # UNREPAIRED compaction view, making the race easy to trigger deterministically.
    await cql.run_async(
        f"ALTER TABLE {ks}.test WITH compaction = "
        f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
    )

    # Disable autocompaction everywhere so we control exactly when compaction runs.
    for s in servers:
        await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')

    scylla_path = await manager.server_get_exe(servers[0].server_id)

    # Repair 1: establishes sstables_repaired_at=1 on all nodes.
    # Keys 0-9 (inserted by prepare_cluster_for_incremental_repair) end up in
    # S0'(repaired_at=1) on all nodes.
    await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')

    # Insert keys 10-19 and flush on all nodes -> S1(repaired_at=0).
    # These will be the subject of repair 2.
    repair2_keys = list(range(current_key, current_key + 10))
    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
    for s in servers:
        await manager.api.flush_keyspace(s.ip_addr, ks)
    current_key += 10

    # If leadership transfers to servers[1] between our coordinator check and the
    # restart, the coordinator change masks the bug. Detect and retry.
    max_attempts = 5
    for attempt in range(1, max_attempts + 1):
        try:
            current_key = await _do_race_window_promotes_unrepaired_data(
                manager, servers, cql, ks, token, scylla_path, current_key)
            return
        except _LeadershipTransferred as e:
            logger.warning(f"Attempt {attempt}/{max_attempts}: {e}. Retrying.")

    pytest.fail(f"Leadership kept transferring to servers[1] after {max_attempts} attempts; "
                "could not run the test without coordinator interference.")


# ----------------------------------------------------------------------------
# Tombstone GC safety tests
@@ -961,7 +961,7 @@ async def test_tablets_merge_waits_for_lwt(manager: ManagerClient, scale_timeout
    logger.info("Wait for the global barrier to start draining on shard0")
    await log0.wait_for("\\[shard 0: gms\\] raft_topology - Got raft_topology_cmd::barrier_and_drain", from_mark=m)
    # Just to confirm that the guard still holds the erm
    matches = await log0.grep("\\[shard 0: gms\\] raft_topology - raft_topology_cmd::barrier_and_drain done", from_mark=m)
    matches = await log0.grep("\\[shard 0: gms\\] raft_topology - raft_topology_cmd::barrier_and_drain.*done", from_mark=m)
    assert len(matches) == 0

    # Before the fix, the tablet migration global barrier did not wait for the LWT operation.
@@ -18,7 +18,7 @@ from cassandra.cluster import ConnectionException, ConsistencyLevel, NoHostAvail
from cassandra.pool import Host  # type: ignore # pylint: disable=no-name-in-module
from test.pylib.internal_types import ServerInfo, HostID
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import get_host_api_address, read_barrier
from test.pylib.rest_client import HTTPError, get_host_api_address, read_barrier
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, get_available_host, unique_name
from typing import Optional, List, Union
@@ -119,6 +119,42 @@ async def check_token_ring_and_group0_consistency(manager: ManagerClient) -> Non
    assert token_ring_ids == group0_ids


async def wait_for_no_pending_topology_transition(manager: ManagerClient, deadline: float) -> None:
    """Wait until there is no pending topology transition.

    Polls system.topology until the transition_state column is null,
    indicating that the topology coordinator has finished processing the
    current operation (whether it completed successfully or was rolled back).
    """
    cql = manager.get_cql()

    async def no_transition():
        try:
            host = await get_available_host(cql, deadline)
            await read_barrier(manager.api, get_host_api_address(host))
            rs = await cql.run_async(
                "select transition_state from system.topology where key = 'topology'",
                host=host)
        except NoHostAvailable as e:
            logger.info(f"Topology transition check failed, retrying: {e}")
            return None
        except ConnectionException as e:
            logger.info(f"Topology transition check failed, retrying: {e}")
            return None
        except HTTPError as e:
            logger.info(f"Read barrier failed, retrying: {e}")
            return None

        if not rs:
            logger.warning(f"Topology transition not visible: system.topology row not found, retrying")
            return None
        if rs[0].transition_state is not None:
            logger.warning(f"Topology transition still in progress: {rs[0].transition_state}")
            return None
        return True

    await wait_for(no_transition, deadline, period=.5)


async def wait_for_token_ring_and_group0_consistency(manager: ManagerClient, deadline: float) -> None:
    """
    Weaker version of the above check.
@@ -398,13 +434,14 @@ def get_uuid_from_str(string: str) -> str:
async def wait_new_coordinator_elected(manager: ManagerClient, expected_num_of_elections: int, deadline: float) -> None:
    """Wait new coordinator to be elected

    Wait while the table 'system.group0_history' will have a number of lines
    with the 'new topology coordinator' equal to the expected_num_of_elections number,
    Wait while the table 'system.group0_history' will have at least
    expected_num_of_elections lines with 'new topology coordinator',
    and the latest host_id coordinator differs from the previous one.
    """
    async def new_coordinator_elected():
        coordinators_ids = await get_coordinator_host_ids(manager)
        if len(coordinators_ids) == expected_num_of_elections \
        logger.debug(f"Coordinators ids in history: {coordinators_ids}")
        if len(coordinators_ids) >= expected_num_of_elections \
                and coordinators_ids[0] != coordinators_ids[1]:
            return True
        logger.warning("New coordinator was not elected %s", coordinators_ids)