Compare commits

...

11 Commits

Author SHA1 Message Date
Patryk Jędrzejczak
6d09897339 Merge 'Barrier and drain logging' from Gleb Natapov
Add more logging to barrier and drain rpc to try and pinpoint https://github.com/scylladb/scylladb/issues/26281

Backport since we want to have this logging in place if the issue happens in the field.

Fixes: SCYLLADB-1836
Refs: #26281

Closes scylladb/scylladb#29735

* https://github.com/scylladb/scylladb:
  session, raft_topology: add periodic warnings for hung drain and stale version waits
  session: add info-level logging to drain_closing_sessions
  raft_topology: log sub-step progress in local_topology_barrier
  raft_topology: log read_barrier progress in topology cmd handler

(cherry picked from commit b69d00b0a7)

Closes scylladb/scylladb#29763
2026-05-06 10:26:44 +02:00
Yaniv Michael Kaul
5c8662d606 raft/group0: fix destroy assertion on startup failure
If start_server_for_group0() successfully registers a server in
_raft_gr._servers but a subsequent step (e.g. enable_in_memory_state_machine())
throws, the server is never destroyed because abort_and_drain()/destroy()
check std::get_if<raft::group_id>(&_group0) which was only set after the
entire with_scheduling_group block completed.

Move _group0.emplace<raft::group_id>() inside the lambda, immediately after
start_server_for_group() succeeds, so that cleanup paths can always find
and destroy the registered server.

This fixes the assertion:
  "raft_group_registry - stop(): server for group ... is not destroyed"

which manifests during shutdown after an upgrade where topology_state_load()
fails due to netw::unknown_address.
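
A minimal standalone sketch of the ordering involved (hypothetical, simplified plain C++ with invented names; a boolean stands in for the server registry, and the real change is visible in the raft_group0.cc hunk below):

  #include <cstdio>
  #include <optional>
  #include <stdexcept>

  struct registry {
      bool registered = false;          // stands in for _raft_gr._servers
  };

  struct group0 {
      registry& reg;
      std::optional<int> group_id;      // stands in for the _group0 variant

      void start(int id) {
          reg.registered = true;        // start_server_for_group() succeeded
          group_id = id;                // fix: record the group id right away
          // Before the fix, group_id was only assigned after the whole
          // with_scheduling_group block completed, so the throw below left
          // the registered server untracked.
          throw std::runtime_error("enable_in_memory_state_machine failed");
      }

      void cleanup() {                  // models abort_and_drain()/destroy()
          if (group_id && reg.registered) {
              reg.registered = false;   // server destroyed
          }
      }
  };

  int main() {
      registry reg;
      group0 g{reg};
      try {
          g.start(0);
      } catch (const std::exception&) {
          g.cleanup();
      }
      // With the old ordering this would print "yes", and the real code hit
      // the "server for group ... is not destroyed" assertion in stop().
      std::printf("server still registered: %s\n", reg.registered ? "yes" : "no");
  }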

Backport: Yes, to 2026.1, 2026.2, as it causes a crash on upgrades

Refs: SCYLLADB-1217
Refs: CUSTOMER-340
Refs: CUSTOMER-335
Fixes: SCYLLADB-1809
Signed-off-by: Yaniv Kaul <yaniv.kaul@scylladb.com>
AI-assisted: Yes, Opencode/Opus 4.6

Closes scylladb/scylladb#29702

(cherry picked from commit 6179406467)

Closes scylladb/scylladb#29742
2026-05-05 10:48:13 +02:00
Patryk Jędrzejczak
74a58a6757 Merge 'paxos_state: keep prepared message alive across statement execution' from Petr Gusev
In do_execute_cql_with_timeout(), when the prepared statement was not found in the cache, we called qp.prepare() and stored the returned result_message::prepared in a local variable scoped to the 'if' block. We then extracted ps_ptr (a checked_weak_ptr to the prepared statement) from the message, let the message go out of scope at the end of the 'if', and used ps_ptr after a co_await on st->execute().

Since 3ac4e258e8 ("transport/messages: hold pinned prepared entry in PREPARE result"), result_message::prepared owns a strong pinned reference to the prepared cache entry. While qp.prepare() runs it also holds its own pin on the entry, so on return the entry has at least the pin owned by the returned message. As long as that message is alive, the cache entry cannot be purged and the weak handle inside ps_ptr remains promotable.

The lifetime gap manifested only in debug builds. qp.prepare() returns a ready future on the cache-miss path, so in release builds the co_await resumes synchronously: control flows from the assignment of ps_ptr straight into st->execute() with no opportunity for any other task (in particular, prepared cache invalidation triggered by a concurrent schema change) to run in between. Debug builds, however, force a reactor preemption point on every co_await even when the awaited future is ready. With prepared_msg already destroyed at the end of the 'if' block, the only remaining handle on the cache entry was the weak ps_ptr, and the preemption gave a concurrent cache purge (triggered, for example, by Raft schema changes received during a node restart) the chance to drop the entry. The subsequent execute() then failed when promoting the weak pointer with checked_ptr_is_null_exception.

The exception propagated out of the Paxos prepare path as a generic std::exception with no type information in the log, surfacing on the coordinator as:

  WriteFailure: Failed to prepare ballot ... Replica errors:
  host_id ... -> seastar::rpc::remote_verb_error (std::exception)

Hoist the result_message::prepared into the outer scope so the pinned cache entry stays alive across co_await st->execute(...), closing the window in which a concurrent cache purge could invalidate the weak handle.
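
A minimal standalone sketch of the lifetime gap (hypothetical, simplified plain C++; std::shared_ptr/std::weak_ptr stand in for the pinned cache entry and checked_weak_ptr, and the names are invented):

  #include <cassert>
  #include <memory>

  struct prepared_entry {};             // stands in for the prepared-statement cache entry

  int main() {
      auto cache_entry = std::make_shared<prepared_entry>();   // the cache's own reference
      std::weak_ptr<prepared_entry> ps_ptr;                     // stands in for checked_weak_ptr

      {
          // result_message::prepared returned by qp.prepare() pins the entry.
          std::shared_ptr<prepared_entry> prepared_msg = cache_entry;
          ps_ptr = prepared_msg;
          // Bug: prepared_msg went out of scope here, at the end of the 'if' block.
      }

      // Models a concurrent cache purge during the debug-build preemption point
      // at co_await st->execute(...).
      cache_entry.reset();

      // Promotion now fails; in the real code this surfaced as
      // checked_ptr_is_null_exception.
      assert(ps_ptr.expired());

      // The fix hoists prepared_msg into the outer scope so the pin stays alive
      // across the co_await, keeping ps_ptr promotable.
  }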

Fixes SCYLLADB-1173

Backport: the patch is simple; we can backport it to all versions with the "LWT over tablets" feature. Note that the problem manifests only in test runs in the debug configuration; production is not affected.

Closes scylladb/scylladb#29675

* https://github.com/scylladb/scylladb:
  table_helper: retry insert prepare on concurrent cache invalidation
  paxos_state: keep prepared message alive across statement execution

(cherry picked from commit 15f35577ed)

Closes scylladb/scylladb#29701
2026-05-05 10:02:19 +02:00
Aleksandr Bykov
148e05820b test: fix flaky test_kill_coordinator_during_op
The test hardcoded the expected number of coordinator elections
(2, 3, 4, 5) for each phase. If a prior phase triggered an extra
election, subsequent phases would wait for a count that was already
reached or would never match.

Fix by reading the current election count before each operation and
expecting exactly one more, making each phase independent of prior
history.

Also add wait_for_no_pending_topology_transition() calls after each
coordinator election to ensure the topology state machine has fully
settled before proceeding with restarts and further operations.

Decrease the failure detector timeout (failure_detector_timeout_in_ms)
to 2000 ms on all test nodes so that coordinator crashes are detected
faster, reducing test wallclock time and timeout-related flakiness.

Enable raft_topology=trace logging on all test nodes to aid
post-failure diagnosis. Add diagnostic logging in
wait_new_coordinator_elected().

Fixes: SCYLLADB-1790

Closes scylladb/scylladb#29284

(cherry picked from commit 8afdae24d2)

Closes scylladb/scylladb#29723
2026-05-02 16:27:16 +03:00
Łukasz Paszkowski
1438830348 sstables: only wipe TemporaryHashes for sstable formats that have it
Commit 8d34127684 ("sstables: clean up TemporaryHashes file in wipe()")
unconditionally calls filename(..., component_type::TemporaryHashes)
inside filesystem_storage::wipe(). However, the TemporaryHashes
component is only registered in the component map of the 'ms' sstable
format. For older formats (ka, la, mc, md, me) the lookup goes through
sstable_version_constants::get_component_map(version).at(...) and throws
std::out_of_range.

The exception is then swallowed by the outer catch(...) in wipe(), which
just logs and ignores. As a side effect, the subsequent
remove_file(new_toc_name) is never reached and the TemporaryTOC
('*-TOC.txt.tmp') file is left as an orphan on disk after every unlink()
of a non-'ms' sstable.

Guard the lookup with get_component_map(version).contains() so the
cleanup is only attempted for formats that actually define the
component.
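
A minimal sketch of the guard (hypothetical: a plain std::map with invented component names stands in for the per-version component map):

  #include <cstdio>
  #include <map>
  #include <string>

  using component_map = std::map<std::string, std::string>;

  void wipe(const component_map& components) {
      // Before the fix this was an unconditional .at("TemporaryHashes"), which
      // throws std::out_of_range for formats without the component, and the
      // swallowed exception skipped the rest of the cleanup.
      if (components.contains("TemporaryHashes")) {
          std::printf("removing %s\n", components.at("TemporaryHashes").c_str());
      }
      std::printf("removing TemporaryTOC\n");    // now always reached
  }

  int main() {
      component_map me_format = {{"TOC", "me-1-big-TOC.txt"}};   // no TemporaryHashes
      component_map ms_format = {{"TOC", "ms-1-big-TOC.txt"},
                                 {"TemporaryHashes", "ms-1-big-Hashes.tmp"}};
      wipe(me_format);    // skips the TemporaryHashes lookup, still cleans up the TOC
      wipe(ms_format);    // removes both
  }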

Add a regression test in test/boost/sstable_directory_test.cc that
creates an 'me'-format sstable, unlinks it and asserts that the sstable
directory is left empty. Without the fix the test fails with a leftover
'me-...-TOC.txt.tmp' file.

Fixes: SCYLLADB-1767

Closes scylladb/scylladb#29620

(cherry picked from commit 7e14ea5ac8)

Closes scylladb/scylladb#29692
2026-04-30 21:49:31 +03:00
Yaniv Michael Kaul
c25f3eced8 gms/gossiper: fix use-after-move in do_send_ack2_msg
The second logger.debug() call accesses ack2_msg after it was moved
via std::move() in the co_await send_gossip_digest_ack2 call.
This is undefined behavior.

Fix by formatting ack2_msg to a string before the move, then using
that cached string in both debug log calls.
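
A minimal sketch of the pattern (hypothetical, invented names; fmt is the formatting library the codebase already uses, as in the gossiper hunk below):

  #include <fmt/core.h>
  #include <string>
  #include <utility>

  // Stands in for send_gossip_digest_ack2(): takes the message by value and consumes it.
  void send(std::string /*ack2_msg*/) {}

  int main() {
      std::string ack2_msg = "digest ack2 payload";
      auto ack2_msg_str = fmt::format("{}", ack2_msg);   // cache the string before the move
      fmt::print("Calling do_send_ack2_msg, ack2_msg={}\n", ack2_msg_str);
      send(std::move(ack2_msg));
      // Logging ack2_msg itself here would read a moved-from object (the bug);
      // the cached string is safe to reuse after the move.
      fmt::print("finished do_send_ack2_msg, ack2_msg={}\n", ack2_msg_str);
  }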

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-1778

Closes scylladb/scylladb#29227

(cherry picked from commit 93722f2c89)

Closes scylladb/scylladb#29710
2026-04-30 21:16:19 +03:00
Wojciech Mitros
d264fea176 replica/database: fix cross-shard deadlock in lock_tables_metadata()
lock_tables_metadata() acquires a write lock on tables_metadata._cf_lock
on every shard.  It used invoke_on_all(), which dispatches lock
acquisitions to all shards in parallel via parallel_for_each +
smp::submit_to.

When two fibers call lock_tables_metadata() concurrently, this can
deadlock.  parallel_for_each starts all iterations unconditionally:
even when the local shard's lock attempt blocks (because the other
fiber already holds it), SMP messages are still sent to remote shards.
Both fibers' lock-acquisition messages land in the per-shard SMP
queues.  The SMP queue itself is FIFO, but process_incoming() drains
it and schedules each item as a reactor task via add_task(), which —
in debug and sanitize builds with SEASTAR_SHUFFLE_TASK_QUEUE — shuffles
each newly added task against all pending tasks in the same scheduling
group's reactor task queue.  This means fiber A's lock acquisition can
be reordered past fiber B's (and past unrelated tasks) on a given shard.
If fiber A wins the lock on shard X while fiber B wins on shard Y, this
creates a classic cross-shard lock-ordering deadlock (circular wait).

In production builds without SEASTAR_SHUFFLE_TASK_QUEUE, the reactor
task queue is FIFO. Still, even in release builds, the SMP queues can
reorder messages, so the deadlock remains possible, just much less
likely. In debug and sanitize builds, the task-queue shuffle
makes the deadlock very likely whenever both fibers' lock-acquisition
tasks are pending simultaneously in the reactor task queue on any shard.

This deadlock was exposed by ce00d61917 ("db: implement large_data
virtual tables with feature flag gating", merged as 88a8324e68),
which introduced legacy_drop_table_on_all_shards as a second caller
of lock_tables_metadata().  When LARGE_DATA_VIRTUAL_TABLES is enabled
during topology_state_load (via feature_service::enable), two fibers
can race:

  1. activate_large_data_virtual_tables() — calls
     legacy_drop_table_on_all_shards() which calls
     lock_tables_metadata() synchronously via .get()

  2. reload_schema_in_bg() — fires as a background fiber from
     TABLE_DIGEST_INSENSITIVE_TO_EXPIRY, eventually reaches
     schema_applier::commit() which also calls lock_tables_metadata()

If both reach lock_tables_metadata() while the lock is free on all
shards, the parallel acquisition creates the deadlock opportunity.
The deadlock blocks topology_state_load() from completing, which
prevents the bootstrapping node from finishing its topology state
transitions.  The coordinator's topology coordinator then waits for
the node to reach the expected state, but the node is stuck, so
eventually the read_barrier times out after 300 seconds.

Fix by acquiring the shard 0 lock first before attempting to
acquire any other lock. Whichever fiber wins shard 0 is
guaranteed to acquire all remaining shards before the other fiber
can proceed past shard 0, eliminating the circular-wait condition.
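
A minimal standalone sketch of the ordering fix (hypothetical: plain std::thread/std::mutex stand in for fibers and per-shard locks; names are invented, and the real change is in the database.cc hunk below):

  #include <cstddef>
  #include <cstdio>
  #include <mutex>
  #include <thread>
  #include <vector>

  std::vector<std::mutex> locks(4);     // one "tables_metadata" lock per shard

  void lock_tables_metadata(int fiber) {
      // Take the shard-0 lock first: whichever fiber wins it is guaranteed to
      // acquire all remaining shards before the other fiber can proceed past
      // shard 0, so no circular wait is possible.
      std::unique_lock shard0(locks[0]);
      std::vector<std::unique_lock<std::mutex>> rest;
      for (std::size_t s = 1; s < locks.size(); ++s) {
          rest.emplace_back(locks[s]);
      }
      std::printf("fiber %d holds all shard locks\n", fiber);
  }   // all locks released here

  int main() {
      std::thread a(lock_tables_metadata, 0);
      std::thread b(lock_tables_metadata, 1);
      a.join();
      b.join();
  }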

Tested manually with two approaches:
1. Forcing different lock_tables_metadata() calls to acquire different
shard locks first, by adding sleeps that depend on the calling fiber and
the target shard - this reproduced the issue consistently.
2. Aligning the moment at which both fibers reach lock_tables_metadata()
by adding a single sleep to one of them - the timing is heavily
machine-dependent, so this cannot serve as a universal reproducer, but it
did trigger the observed failure on my machine after finding the right
sleep time.

Also added a unit test for concurrent lock_tables_metadata() calls.

Fixes: SCYLLADB-1784
Fixes: SCYLLADB-1785
Fixes: SCYLLADB-1786

Closes scylladb/scylladb#29678

(cherry picked from commit ebaf536449)

Closes scylladb/scylladb#29709
2026-04-30 21:08:15 +03:00
Avi Kivity
9d942a5408 build: point seastar submodule at scylla-seastar.git
This allows us to backport seastar commits as the need arises.
2026-04-30 11:49:38 +03:00
Botond Dénes
9622291e07 Merge 'test/cluster/test_incremental_repair: fix flaky coordinator-change scenario' from Avi Kivity
- Ensure servers[1] is not the topology coordinator before restarting it, preventing the leader death + re-election + re-repair sequence that masked the compaction-merge bug
- Add a retry loop that detects post-restart leadership transfer to servers[1] via direct coordinator query, retrying up to 5 times

Fixes: SCYLLADB-1743

Backporting to 2026.2, which sees the failure regularly.

Closes scylladb/scylladb#29671

* github.com:scylladb/scylladb:
  test/cluster/test_incremental_repair: add retry for residual leadership race
  test/cluster/test_incremental_repair: fix flaky coordinator-change scenario

(cherry picked from commit 3ea4af1c8c)

Closes scylladb/scylladb#29677
2026-04-30 08:46:36 +03:00
Jenkins Promoter
b98470a860 Update ScyllaDB version to: 2026.2.0-rc1 2026-04-28 15:59:54 +03:00
Anna Mikhlin
5231c77e8e Update ScyllaDB version to: 2026.2.0-rc0 2026-04-26 15:28:16 +03:00
22 changed files with 373 additions and 70 deletions

.gitmodules vendored
View File

@@ -1,6 +1,6 @@
[submodule "seastar"]
path = seastar
url = ../seastar
url = ../scylla-seastar
ignore = dirty
[submodule "swagger-ui"]
path = swagger-ui

View File

@@ -78,7 +78,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
VERSION=2026.2.0-dev
VERSION=2026.2.0-rc1
if test -f version
then

View File

@@ -593,6 +593,7 @@ scylla_tests = set([
'test/boost/linearizing_input_stream_test',
'test/boost/lister_test',
'test/boost/locator_topology_test',
'test/boost/lock_tables_metadata_test',
'test/boost/log_heap_test',
'test/boost/logalloc_standard_allocator_segment_pool_backend_test',
'test/boost/logalloc_test',

View File

@@ -399,9 +399,10 @@ future<> gossiper::do_send_ack2_msg(locator::host_id from, utils::chunked_vector
}
}
gms::gossip_digest_ack2 ack2_msg(std::move(delta_ep_state_map));
logger.debug("Calling do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg);
auto ack2_msg_str = fmt::format("{}", ack2_msg);
logger.debug("Calling do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg_str);
co_await ser::gossip_rpc_verbs::send_gossip_digest_ack2(&_messaging, from, std::move(ack2_msg));
logger.debug("finished do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg);
logger.debug("finished do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg_str);
}
// Depends on

View File

@@ -1328,9 +1328,27 @@ future<global_table_ptr> get_table_on_all_shards(sharded<database>& sharded_db,
future<tables_metadata_lock_on_all_shards> database::lock_tables_metadata(sharded<database>& sharded_db) {
tables_metadata_lock_on_all_shards locks;
co_await sharded_db.invoke_on_all([&] (auto& db) -> future<> {
// Acquire write lock on shard 0 first, and then on the remaining shards.
//
// Parallel acquisition on all shards could deadlock when two
// fibers call lock_tables_metadata() concurrently: parallel_for_each
// sends SMP messages to all shards even when the local shard's lock
// attempt blocks. If task reordering (SEASTAR_SHUFFLE_TASK_QUEUE in
// debug/sanitize builds) causes fiber A to win on shard X while
// fiber B wins on shard Y, neither can make progress — classic
// cross-shard lock-ordering deadlock.
//
// Acquiring the write lock on shard 0 first, and then on the remaining
// shards, eliminates this: whichever fiber acquires shard 0 first is
// guaranteed to acquire locks on all other shards before the other fiber
// can acquire the lock on shard 0.
co_await sharded_db.invoke_on(0, [&locks, &sharded_db] (auto& db) -> future<> {
locks.assign_lock(co_await db.get_tables_metadata().hold_write_lock());
co_await sharded_db.invoke_on_others([&locks] (auto& db) -> future<> {
locks.assign_lock(co_await db.get_tables_metadata().hold_write_lock());
});
});
co_return locks;
}

View File

@@ -438,9 +438,10 @@ static future<cql3::untyped_result_set> do_execute_cql_with_timeout(sstring req,
const auto cache_key = qp.compute_id(req, "", cql3::internal_dialect());
auto ps_ptr = qp.get_prepared(cache_key);
shared_ptr<cql_transport::messages::result_message::prepared> prepared_msg;
if (!ps_ptr) {
const auto msg_ptr = co_await qp.prepare(req, qs, cql3::internal_dialect());
ps_ptr = msg_ptr->get_prepared();
prepared_msg = co_await qp.prepare(req, qs, cql3::internal_dialect());
ps_ptr = prepared_msg->get_prepared();
if (!ps_ptr) {
on_internal_error(paxos_state::logger, "prepared statement is null");
}
@@ -449,8 +450,8 @@ static future<cql3::untyped_result_set> do_execute_cql_with_timeout(sstring req,
-1, service::node_local_only::yes);
const auto st = ps_ptr->statement;
const auto msg_ptr = co_await st->execute(qp, qs, qo, std::nullopt);
co_return cql3::untyped_result_set(msg_ptr);
const auto result_ptr = co_await st->execute(qp, qs, qo, std::nullopt);
co_return cql3::untyped_result_set(result_ptr);
}
template <typename... Args>

View File

@@ -434,6 +434,8 @@ future<> group0_state_machine::load_snapshot(raft::snapshot_id id) {
}
future<> group0_state_machine::enable_in_memory_state_machine() {
co_await utils::get_local_injector().inject("group0_state_machine_enable_in_memory_fail",
[] { return std::make_exception_ptr(std::runtime_error("injected failure in enable_in_memory_state_machine")); });
auto read_apply_mutex_holder = co_await _client.hold_read_apply_mutex(_abort_source);
if (!_in_memory_state_machine_enabled) {
_in_memory_state_machine_enabled = true;

View File

@@ -452,14 +452,16 @@ future<> raft_group0::start_server_for_group0(raft::group_id group0_id, service:
auto srv_for_group0 = create_server_for_group0(group0_id, my_id, ss, qp, mm);
auto& persistence = srv_for_group0.persistence;
auto& server = *srv_for_group0.server;
co_await with_scheduling_group(_sg, [this, &srv_for_group0] (this auto self) -> future<> {
co_await with_scheduling_group(_sg, [this, &srv_for_group0, group0_id] (this auto self) -> future<> {
auto& state_machine = dynamic_cast<group0_state_machine&>(srv_for_group0.state_machine);
co_await _raft_gr.start_server_for_group(std::move(srv_for_group0));
// Set _group0 immediately after the server is registered in _raft_gr._servers.
// This ensures abort_and_drain()/destroy() can find and clean up the server
// even if enable_in_memory_state_machine() or later steps throw.
_group0.emplace<raft::group_id>(group0_id);
co_await state_machine.enable_in_memory_state_machine();
});
_group0.emplace<raft::group_id>(group0_id);
// Fix for scylladb/scylladb#16683:
// If the snapshot index is 0, trigger creation of a new snapshot
// so bootstrapping nodes will receive a snapshot transfer.

View File

@@ -9,6 +9,7 @@
#include "service/session.hh"
#include "utils/log.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/core/timer.hh>
namespace service {
@@ -58,18 +59,35 @@ void session_manager::initiate_close_of_sessions_except(const std::unordered_set
}
future<> session_manager::drain_closing_sessions() {
slogger.info("drain_closing_sessions: waiting for lock");
seastar::timer<lowres_clock> lock_timer([this] {
slogger.warn("drain_closing_sessions: still waiting for lock, available units {}",
_session_drain_sem.available_units());
});
lock_timer.arm_periodic(std::chrono::minutes(5));
auto lock = co_await get_units(_session_drain_sem, 1);
lock_timer.cancel();
auto n = std::distance(_closing_sessions.begin(), _closing_sessions.end());
slogger.info("drain_closing_sessions: acquired lock, {} sessions to drain", n);
auto i = _closing_sessions.begin();
while (i != _closing_sessions.end()) {
session& s = *i;
++i;
auto id = s.id();
slogger.debug("draining session {}", id);
slogger.info("drain_closing_sessions: waiting for session {} to close, gate count {}", id, s.gate_count());
std::optional<seastar::timer<lowres_clock>> warn_timer;
warn_timer.emplace([&s, id] {
slogger.warn("drain_closing_sessions: session {} still not closed, gate count {}",
id, s.gate_count());
});
warn_timer->arm_periodic(std::chrono::minutes(5));
co_await s.close();
warn_timer.reset();
if (_sessions.erase(id)) {
slogger.debug("session {} closed", id);
slogger.info("drain_closing_sessions: session {} closed", id);
}
}
slogger.info("drain_closing_sessions: done");
}
} // namespace service

View File

@@ -95,6 +95,10 @@ public:
return _id;
}
size_t gate_count() const {
return _gate.get_count();
}
/// Post-condition of successfully resolved future: There are no guards alive for this session, and
/// and it's impossible to create more such guards later.
/// Can be called concurrently.

View File

@@ -4494,10 +4494,20 @@ future<> storage_service::local_topology_barrier() {
version, current_version)));
}
co_await ss._shared_token_metadata.stale_versions_in_use();
rtlogger.info("raft_topology_cmd::barrier_and_drain version {}: waiting for stale token metadata versions to be released", version);
{
seastar::timer<lowres_clock> warn_timer([&ss, version] {
rtlogger.warn("raft_topology_cmd::barrier_and_drain version {}: still waiting for stale versions, "
"stale versions (version: use_count): {}",
version, ss._shared_token_metadata.describe_stale_versions());
});
warn_timer.arm_periodic(std::chrono::minutes(5));
co_await ss._shared_token_metadata.stale_versions_in_use();
}
rtlogger.info("raft_topology_cmd::barrier_and_drain version {}: stale versions released, draining closing sessions", version);
co_await get_topology_session_manager().drain_closing_sessions();
rtlogger.info("raft_topology_cmd::barrier_and_drain done");
rtlogger.info("raft_topology_cmd::barrier_and_drain version {}: done", version);
});
}
@@ -4509,7 +4519,9 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
auto& raft_server = _group0->group0_server();
auto group0_holder = _group0->hold_group0_gate();
// do barrier to make sure we always see the latest topology
rtlogger.info("topology cmd rpc {} index={}: starting read_barrier, term={}", cmd.cmd, cmd_index, term);
co_await raft_server.read_barrier(&_group0_as);
rtlogger.info("topology cmd rpc {} index={}: read_barrier completed", cmd.cmd, cmd_index);
if (raft_server.get_current_term() != term) {
// Return an error since the command is from outdated leader
co_return result;

View File

@@ -543,11 +543,16 @@ future<> filesystem_storage::wipe(const sstable& sst, sync_dir sync) noexcept {
// during SSTable writing and removed before sealing. If the write
// failed before sealing, the file may still be on disk and must be
// cleaned up explicitly.
// The component is only defined for the `ms` sstable format; for
// older formats it is absent from the component map and looking up
// its filename would throw std::out_of_range.
// Use file_exists() to avoid a C++ exception on the common path
// where the file was already removed before sealing.
auto temp_hashes = filename(sst, dir_name.native(), sst._generation, component_type::TemporaryHashes);
if (co_await file_exists(temp_hashes)) {
co_await sst.sstable_write_io_check(remove_file, std::move(temp_hashes));
if (sstable_version_constants::get_component_map(sst.get_version()).contains(component_type::TemporaryHashes)) {
auto temp_hashes = filename(sst, dir_name.native(), sst._generation, component_type::TemporaryHashes);
if (co_await file_exists(temp_hashes)) {
co_await sst.sstable_write_io_check(remove_file, std::move(temp_hashes));
}
}
if (sync) {
co_await sst.sstable_write_io_check(sync_directory, dir_name.native());

View File

@@ -135,7 +135,23 @@ future<> table_helper::cache_table_info(cql3::query_processor& qp, service::migr
}
future<> table_helper::insert(cql3::query_processor& qp, service::migration_manager& mm, service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker) {
co_await cache_table_info(qp, mm, qs);
// _prepared_stmt is a checked_weak_ptr into the prepared statements
// cache and can be invalidated by a concurrent purge (e.g. on a schema
// change). cache_table_info() (re-)prepares and assigns _prepared_stmt,
// but the pin protecting the entry is dropped when try_prepare()
// returns. In release the chain of ready-future co_awaits back to here
// resumes synchronously, but debug builds preempt on every co_await
// even for ready futures, opening a window for a purge to drop the
// entry and leave _prepared_stmt null. Loop until a synchronous
// post-resume check finds _prepared_stmt valid; nothing can run between
// that check and the dereference below. _insert_stmt is a strong
// shared_ptr and is not affected by cache invalidation.
while (true) {
co_await cache_table_info(qp, mm, qs);
if (_prepared_stmt) {
break;
}
}
auto opts = opt_maker();
opts.prepare(_prepared_stmt->bound_names);
co_await _insert_stmt->execute(qp, qs, opts, std::nullopt);

View File

@@ -150,6 +150,8 @@ add_scylla_test(lister_test
KIND SEASTAR)
add_scylla_test(locator_topology_test
KIND SEASTAR)
add_scylla_test(lock_tables_metadata_test
KIND SEASTAR)
add_scylla_test(log_heap_test
KIND BOOST)
add_scylla_test(logalloc_standard_allocator_segment_pool_backend_test

View File

@@ -0,0 +1,36 @@
/*
* Copyright (C) 2026-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#include <fmt/format.h>
#include <seastar/core/with_timeout.hh>
#include <seastar/testing/test_case.hh>
#include "test/lib/cql_test_env.hh"
using namespace std::chrono_literals;
// Test that two lock_tables_metadata calls don't deadlock
SEASTAR_TEST_CASE(test_lock_tables_metadata_deadlock) {
return do_with_cql_env_thread([](cql_test_env& e) {
try {
// Repeat the test scenario to increase the chance of hitting the deadlock.
// If no deadlock occurs, each repetition should complete within a fraction of a second,
// so even with 100 repetitions, the total test time should be reasonable.
for (int i = 0; i < 100; ++i) {
with_timeout(lowres_clock::now() + 30s,
when_all_succeed(
e.local_db().lock_tables_metadata(e.db()).discard_result(),
e.local_db().lock_tables_metadata(e.db()).discard_result()
)).get();
}
} catch (seastar::timed_out_error&) {
fmt::print(stderr, "FAIL: lock_tables_metadata deadlocked (timed out after 30s)\n");
_exit(1);
}
});
}

View File

@@ -246,6 +246,33 @@ SEASTAR_TEST_CASE(sstable_directory_test_table_extra_temporary_toc) {
});
}
// Reproducer for SCYLLADB-1697
SEASTAR_TEST_CASE(sstable_directory_test_unlink_sstable_leaves_no_orphans) {
return sstables::test_env::do_with_async([] (test_env& env) {
for (const auto version : {sstable_version_types::me, sstable_version_types::ms}) {
testlog.info("Testing sstable version: {}", version);
auto sst = make_sstable_for_this_shard([&env, version] {
return env.make_sstable(test_table_schema(), version);
});
// Sanity: the TOC was written, otherwise the assertion below would be vacuous.
BOOST_REQUIRE(file_exists(test(sst).filename(sstables::component_type::TOC).native()).get());
sst->unlink().get();
std::vector<sstring> remaining;
lister::scan_dir(env.tempdir().path(), lister::dir_entry_types::of<directory_entry_type::regular>(),
[&remaining] (fs::path, directory_entry de) {
remaining.push_back(de.name);
return make_ready_future<>();
}).get();
BOOST_REQUIRE_MESSAGE(remaining.empty(),
fmt::format("Expected empty sstable dir after unlink for version {}, found: {}", version, remaining));
}
});
}
// Test the absence of TOC. Behavior is controllable by a flag
SEASTAR_TEST_CASE(sstable_directory_test_table_missing_toc) {
return sstables::test_env::do_with_async([] (test_env& env) {

View File

@@ -1379,7 +1379,7 @@ async def test_alternator_invalid_shard_for_lwt(manager: ManagerClient):
# The next barrier must be for the write_both_read_new, we need a guarantee
# that the src_shard observed it
logger.info("Waiting for the next barrier")
await log.wait_for(re.escape(f"[shard {src_shard}: gms] raft_topology - raft_topology_cmd::barrier_and_drain done"),
await log.wait_for(f"\\[shard {src_shard}: gms\\] raft_topology - raft_topology_cmd::barrier_and_drain.*done",
from_mark=m)
# Now we have a guarantee that a new barrier succeeded on the src_shard,

View File

@@ -11,7 +11,8 @@ import pytest
from test.pylib.manager_client import ManagerClient
from test.pylib.scylla_cluster import ReplaceConfig
from test.cluster.util import (check_token_ring_and_group0_consistency, wait_for_token_ring_and_group0_consistency,
get_coordinator_host, get_coordinator_host_ids, wait_new_coordinator_elected)
get_coordinator_host, get_coordinator_host_ids, wait_new_coordinator_elected,
wait_for_no_pending_topology_transition)
logger = logging.getLogger(__name__)
@@ -19,7 +20,7 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout) -> None:
async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout: int, scale_timeout: callable) -> None:
""" Kill coordinator with error injection while topology operation is running for cluster: decommission,
bootstrap, removenode, replace.
@@ -57,9 +58,11 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
logger.debug("Kill coordinator during decommission")
coordinator_host = await get_coordinator_host(manager)
other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id]
num_elections = len(await get_coordinator_host_ids(manager))
await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
await manager.decommission_node(server_id=other_nodes[-1].server_id, expected_error="Decommission failed. See earlier errors")
await wait_new_coordinator_elected(manager, 2, time.time() + 60)
await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
await manager.server_restart(coordinator_host.server_id, wait_others=1)
await manager.servers_see_each_other(await manager.running_servers())
await check_token_ring_and_group0_consistency(manager)
@@ -73,33 +76,40 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
node_to_remove_srv_id = other_nodes[-1].server_id
logger.debug("Stop node with srv_id %s", node_to_remove_srv_id)
await manager.server_stop_gracefully(node_to_remove_srv_id)
num_elections = len(await get_coordinator_host_ids(manager))
await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
logger.debug("Start removenode with srv_id %s from node with srv_id %s", node_to_remove_srv_id, working_srv_id)
await manager.remove_node(working_srv_id,
node_to_remove_srv_id,
expected_error="Removenode failed. See earlier errors")
await wait_new_coordinator_elected(manager, 3, time.time() + 60)
await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
await manager.others_not_see_server(server_ip=coordinator_host.ip_addr)
logger.debug("Start old coordinator node with srv_id %s", coordinator_host.server_id)
await manager.server_restart(coordinator_host.server_id, wait_others=1)
await manager.servers_see_each_other(await manager.running_servers())
logger.debug("Remove node with srv_id %s from node with srv_id %s because it was banned in a previous attempt", node_to_remove_srv_id, working_srv_id)
await manager.remove_node(working_srv_id, node_to_remove_srv_id)
await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
await manager.servers_see_each_other(await manager.running_servers())
await check_token_ring_and_group0_consistency(manager)
logger.debug("Restore number of nodes in cluster")
await manager.server_add(cmdline=cmdline)
await manager.server_add(config=config, cmdline=cmdline)
# kill coordinator during bootstrap
logger.debug("Kill coordinator during bootstrap")
nodes = await manager.running_servers()
coordinator_host = await get_coordinator_host(manager)
other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id]
new_node = await manager.server_add(start=False, cmdline=cmdline)
new_node = await manager.server_add(start=False, config=config, cmdline=cmdline)
num_elections = len(await get_coordinator_host_ids(manager))
await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
await manager.server_start(new_node.server_id,
expected_error="Startup failed: std::runtime_error")
await wait_new_coordinator_elected(manager, 4, time.time() + 60)
await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
await manager.server_restart(coordinator_host.server_id, wait_others=1)
await manager.servers_see_each_other(await manager.running_servers())
await check_token_ring_and_group0_consistency(manager)
@@ -111,11 +121,13 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
other_nodes = [srv for srv in nodes if srv.server_id != coordinator_host.server_id]
node_to_replace_srv_id = other_nodes[-1].server_id
await manager.server_stop_gracefully(node_to_replace_srv_id)
num_elections = len(await get_coordinator_host_ids(manager))
await manager.api.enable_injection(coordinator_host.ip_addr, "crash_coordinator_before_stream", one_shot=True)
replace_cfg = ReplaceConfig(replaced_id = node_to_replace_srv_id, reuse_ip_addr = False, use_host_id = True)
new_node = await manager.server_add(start=False, replace_cfg=replace_cfg, cmdline=cmdline)
new_node = await manager.server_add(start=False, config=config, replace_cfg=replace_cfg, cmdline=cmdline)
await manager.server_start(new_node.server_id, expected_error="Replace failed. See earlier errors")
await wait_new_coordinator_elected(manager, 5, time.time() + 60)
await wait_new_coordinator_elected(manager, num_elections + 1, time.time() + scale_timeout(60))
await wait_for_no_pending_topology_transition(manager, time.time() + scale_timeout(60))
logger.debug("Start old coordinator node")
await manager.others_not_see_server(server_ip=coordinator_host.ip_addr)
await manager.server_restart(coordinator_host.server_id, wait_others=1)
@@ -123,5 +135,5 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
logger.debug("Replaced node is already non-voter and will be banned after restart. Remove it")
coordinator_host = await get_coordinator_host(manager)
await manager.remove_node(coordinator_host.server_id, node_to_replace_srv_id)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 60)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + scale_timeout(60))
await check_token_ring_and_group0_consistency(manager)

View File

@@ -0,0 +1,61 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
#
import logging
import pytest
from test.pylib.manager_client import ManagerClient
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_failure_after_group0_server_registration(manager: ManagerClient) -> None:
"""Test that a node shuts down cleanly when group0 startup fails after server registration.
Reproducer for: CUSTOMER-340, CUSTOMER-335, SCYLLADB-1217
On restart, setup_group0_if_exist() calls start_server_for_group0() which
registers the raft server in raft_group_registry._servers, then calls
enable_in_memory_state_machine(). If enable_in_memory_state_machine() throws
(e.g., because reload_state() -> auth_cache().load_all() fails due to topology
being in a transitional state), the exception propagates and stack unwinding
calls raft_group_registry::stop().
Previously, _group0 was set AFTER the with_scheduling_group lambda returned,
so a throw inside the lambda left _group0 as monostate. abort_and_drain() and
destroy() would be no-ops, leaving the server orphaned in _servers.
raft_group_registry::stop() would then hit on_internal_error
("server for group ... is not destroyed") and abort.
The fix moves _group0.emplace() inside the lambda, immediately after
start_server_for_group(), so destroy() can always find and clean up the server.
This test:
1. Starts a node normally (group0 established)
2. Stops the node
3. Restarts with an injection that fails enable_in_memory_state_machine()
4. Verifies the node fails startup cleanly (no abort)
"""
# Start a node normally so group0 is established
srv = await manager.server_add()
logger.info("Server %s started successfully with group0", srv.server_id)
logger.info("Stopping server %s", srv.server_id)
await manager.server_stop_gracefully(srv.server_id)
logger.info("Restarting server %s with injection to fail enable_in_memory_state_machine", srv.server_id)
await manager.server_update_config(srv.server_id,
key='error_injections_at_startup',
value=['group0_state_machine_enable_in_memory_fail'])
await manager.server_start(srv.server_id,
expected_error="injected failure in enable_in_memory_state_machine")
# If we get here without the test framework detecting a crash/abort,
# the node shut down cleanly. The fix ensures abort_and_drain()/destroy()
# can find and clean up the raft server even when startup fails.
logger.info("Server failed startup and shut down cleanly (no abort)")

View File

@@ -8,7 +8,7 @@ from test.pylib.manager_client import ManagerClient
from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair
from test.pylib.tablets import get_all_tablet_replicas
from test.cluster.tasks.task_manager_client import TaskManagerClient
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, ensure_group0_leader_on, new_test_keyspace, new_test_table, trigger_stepdown
from test.pylib.util import wait_for_cql_and_get_hosts
from cassandra.query import ConsistencyLevel, SimpleStatement
@@ -880,41 +880,30 @@ async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manage
# affected replica but process the UNREPAIRED sstable on the others, so the classification
# divergence is never corrected. In tombstone scenarios this enables premature tombstone GC
# on the affected replica leading to data resurrection.
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
cmdline = ['--hinted-handoff-enabled', '0']
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
# Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
# UNREPAIRED compaction view, making the race easy to trigger deterministically.
await cql.run_async(
f"ALTER TABLE {ks}.test WITH compaction = "
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
)
class _LeadershipTransferred(Exception):
"""Raised when leadership transferred to servers[1] during the test, requiring a retry."""
pass
# Disable autocompaction everywhere so we control exactly when compaction runs.
for s in servers:
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
scylla_path = await manager.server_get_exe(servers[0].server_id)
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
# S0'(repaired_at=1) on all nodes.
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# Insert keys 10-19 and flush on all nodes → S1(repaired_at=0).
# These will be the subject of repair 2.
repair2_keys = list(range(current_key, current_key + 10))
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
for s in servers:
await manager.api.flush_keyspace(s.ip_addr, ks)
current_key += 10
async def _do_race_window_promotes_unrepaired_data(manager, servers, cql, ks, token, scylla_path, current_key):
"""Core logic for test_incremental_repair_race_window_promotes_unrepaired_data.
Returns the next current_key value.
Raises _LeadershipTransferred if servers[1] becomes coordinator after the
restart, signalling the caller to retry.
"""
# Ensure servers[1] is not the topology coordinator. If the coordinator is
# restarted, the Raft leader dies, a new election occurs, and the new
# coordinator re-initiates tablet repair -- flushing memtables on all replicas
# and marking post-repair data as repaired. That legitimate re-repair masks
# the compaction-merge bug this test detects.
coord = await get_topology_coordinator(manager)
coord_serv = await find_server_by_host_id(manager, servers, coord)
if coord_serv == servers[1]:
other = next(s for s in servers if s != servers[1])
await ensure_group0_leader_on(manager, other)
coord = await get_topology_coordinator(manager)
coord_serv = await find_server_by_host_id(manager, servers, coord)
coord_log = await manager.server_open_log(coord_serv.server_id)
coord_mark = await coord_log.mark()
@@ -978,6 +967,16 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
await manager.server_start(target.server_id)
await manager.servers_see_each_other(servers)
# Check if leadership transferred to servers[1] during the restart.
# If so, the new coordinator will re-initiate repair, masking the bug.
new_coord = await get_topology_coordinator(manager)
new_coord_serv = await find_server_by_host_id(manager, servers, new_coord)
if new_coord_serv == servers[1]:
await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
await manager.api.wait_task(servers[0].ip_addr, task_id)
raise _LeadershipTransferred(
"servers[1] became topology coordinator after restart")
# Poll until compaction has produced F(repaired_at=2) containing post-repair keys,
# confirming that the bug was triggered (S1' and E merged during the race window).
deadline = time.time() + 60
@@ -1000,7 +999,7 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
if not compaction_ran:
logger.warning("Compaction did not merge S1' and E after restart during the race window; "
"the bug was not triggered. Skipping assertion.")
return
return current_key
# Flush servers[0] and servers[2] AFTER the race window closes so their post-repair
# keys land in G(repaired_at=0): correctly classified as UNREPAIRED.
@@ -1031,8 +1030,9 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, "
f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}")
# servers[0] and servers[2] flushed post-repair keys after the race window closed,
# so those keys are in G(repaired_at=0) → correctly UNREPAIRED.
# servers[0] and servers[2] were never restarted and the coordinator stayed
# alive throughout, so no re-repair could have flushed their memtables.
# Post-repair keys must NOT appear in repaired sstables on these servers.
assert not (repaired_keys_0 & post_repair_key_set), \
f"servers[0] should not have post-repair keys in repaired sstables, " \
f"got: {repaired_keys_0 & post_repair_key_set}"
@@ -1053,6 +1053,54 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
f"on servers[1] after restart lost the being_repaired markers during the race window. " \
f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \
f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}"
return current_key
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
cmdline = ['--hinted-handoff-enabled', '0']
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
# Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
# UNREPAIRED compaction view, making the race easy to trigger deterministically.
await cql.run_async(
f"ALTER TABLE {ks}.test WITH compaction = "
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
)
# Disable autocompaction everywhere so we control exactly when compaction runs.
for s in servers:
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
scylla_path = await manager.server_get_exe(servers[0].server_id)
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
# S0'(repaired_at=1) on all nodes.
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# Insert keys 10-19 and flush on all nodes -> S1(repaired_at=0).
# These will be the subject of repair 2.
repair2_keys = list(range(current_key, current_key + 10))
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
for s in servers:
await manager.api.flush_keyspace(s.ip_addr, ks)
current_key += 10
# If leadership transfers to servers[1] between our coordinator check and the
# restart, the coordinator change masks the bug. Detect and retry.
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
current_key = await _do_race_window_promotes_unrepaired_data(
manager, servers, cql, ks, token, scylla_path, current_key)
return
except _LeadershipTransferred as e:
logger.warning(f"Attempt {attempt}/{max_attempts}: {e}. Retrying.")
pytest.fail(f"Leadership kept transferring to servers[1] after {max_attempts} attempts; "
"could not run the test without coordinator interference.")
# ----------------------------------------------------------------------------
# Tombstone GC safety tests

View File

@@ -961,7 +961,7 @@ async def test_tablets_merge_waits_for_lwt(manager: ManagerClient, scale_timeout
logger.info("Wait for the global barrier to start draining on shard0")
await log0.wait_for("\\[shard 0: gms\\] raft_topology - Got raft_topology_cmd::barrier_and_drain", from_mark=m)
# Just to confirm that the guard still holds the erm
matches = await log0.grep("\\[shard 0: gms\\] raft_topology - raft_topology_cmd::barrier_and_drain done", from_mark=m)
matches = await log0.grep("\\[shard 0: gms\\] raft_topology - raft_topology_cmd::barrier_and_drain.*done", from_mark=m)
assert len(matches) == 0
# Before the fix, the tablet migration global barrier did not wait for the LWT operation.

View File

@@ -18,7 +18,7 @@ from cassandra.cluster import ConnectionException, ConsistencyLevel, NoHostAvail
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
from test.pylib.internal_types import ServerInfo, HostID
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import get_host_api_address, read_barrier
from test.pylib.rest_client import HTTPError, get_host_api_address, read_barrier
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, get_available_host, unique_name
from typing import Optional, List, Union
@@ -119,6 +119,42 @@ async def check_token_ring_and_group0_consistency(manager: ManagerClient) -> Non
assert token_ring_ids == group0_ids
async def wait_for_no_pending_topology_transition(manager: ManagerClient, deadline: float) -> None:
"""Wait until there is no pending topology transition.
Polls system.topology until the transition_state column is null,
indicating that the topology coordinator has finished processing the
current operation (whether it completed successfully or was rolled back).
"""
cql = manager.get_cql()
async def no_transition():
try:
host = await get_available_host(cql, deadline)
await read_barrier(manager.api, get_host_api_address(host))
rs = await cql.run_async(
"select transition_state from system.topology where key = 'topology'",
host=host)
except NoHostAvailable as e:
logger.info(f"Topology transition check failed, retrying: {e}")
return None
except ConnectionException as e:
logger.info(f"Topology transition check failed, retrying: {e}")
return None
except HTTPError as e:
logger.info(f"Read barrier failed, retrying: {e}")
return None
if not rs:
logger.warning(f"Topology transition not visible: system.topology row not found, retrying")
return None
if rs[0].transition_state is not None:
logger.warning(f"Topology transition still in progress: {rs[0].transition_state}")
return None
return True
await wait_for(no_transition, deadline, period=.5)
async def wait_for_token_ring_and_group0_consistency(manager: ManagerClient, deadline: float) -> None:
"""
Weaker version of the above check.
@@ -398,13 +434,14 @@ def get_uuid_from_str(string: str) -> str:
async def wait_new_coordinator_elected(manager: ManagerClient, expected_num_of_elections: int, deadline: float) -> None:
"""Wait new coordinator to be elected
Wait while the table 'system.group0_history' will have a number of lines
with the 'new topology coordinator' equal to the expected_num_of_elections number,
Wait while the table 'system.group0_history' will have at least
expected_num_of_elections lines with 'new topology coordinator',
and the latest host_id coordinator differs from the previous one.
"""
async def new_coordinator_elected():
coordinators_ids = await get_coordinator_host_ids(manager)
if len(coordinators_ids) == expected_num_of_elections \
logger.debug(f"Coordinators ids in history: {coordinators_ids}")
if len(coordinators_ids) >= expected_num_of_elections \
and coordinators_ids[0] != coordinators_ids[1]:
return True
logger.warning("New coordinator was not elected %s", coordinators_ids)