Merge 'Barrier and drain logging' from Gleb Natapov

Add more logging to barrier and drain rpc to try and pinpoint https://github.com/scylladb/scylladb/issues/26281

Bakport since we want to have it if it happens in the field.

Fixes: SCYLLADB-1836
Refs: #26281

Closes scylladb/scylladb#29735

* https://github.com/scylladb/scylladb:
  session, raft_topology: add periodic warnings for hung drain and stale version waits
  session: add info-level logging to drain_closing_sessions
  raft_topology: log sub-step progress in local_topology_barrier
  raft_topology: log read_barrier progress in topology cmd handler

(cherry picked from commit b69d00b0a7)

Closes scylladb/scylladb#29763
This commit is contained in:
Patryk Jędrzejczak
2026-05-05 15:04:50 +02:00
parent 5c8662d606
commit 6d09897339
5 changed files with 40 additions and 6 deletions

View File

@@ -9,6 +9,7 @@
#include "service/session.hh"
#include "utils/log.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/core/timer.hh>
namespace service {
@@ -58,18 +59,35 @@ void session_manager::initiate_close_of_sessions_except(const std::unordered_set
}
future<> session_manager::drain_closing_sessions() {
slogger.info("drain_closing_sessions: waiting for lock");
seastar::timer<lowres_clock> lock_timer([this] {
slogger.warn("drain_closing_sessions: still waiting for lock, available units {}",
_session_drain_sem.available_units());
});
lock_timer.arm_periodic(std::chrono::minutes(5));
auto lock = co_await get_units(_session_drain_sem, 1);
lock_timer.cancel();
auto n = std::distance(_closing_sessions.begin(), _closing_sessions.end());
slogger.info("drain_closing_sessions: acquired lock, {} sessions to drain", n);
auto i = _closing_sessions.begin();
while (i != _closing_sessions.end()) {
session& s = *i;
++i;
auto id = s.id();
slogger.debug("draining session {}", id);
slogger.info("drain_closing_sessions: waiting for session {} to close, gate count {}", id, s.gate_count());
std::optional<seastar::timer<lowres_clock>> warn_timer;
warn_timer.emplace([&s, id] {
slogger.warn("drain_closing_sessions: session {} still not closed, gate count {}",
id, s.gate_count());
});
warn_timer->arm_periodic(std::chrono::minutes(5));
co_await s.close();
warn_timer.reset();
if (_sessions.erase(id)) {
slogger.debug("session {} closed", id);
slogger.info("drain_closing_sessions: session {} closed", id);
}
}
slogger.info("drain_closing_sessions: done");
}
} // namespace service

View File

@@ -95,6 +95,10 @@ public:
return _id;
}
size_t gate_count() const {
return _gate.get_count();
}
/// Post-condition of successfully resolved future: There are no guards alive for this session, and
/// and it's impossible to create more such guards later.
/// Can be called concurrently.

View File

@@ -4494,10 +4494,20 @@ future<> storage_service::local_topology_barrier() {
version, current_version)));
}
co_await ss._shared_token_metadata.stale_versions_in_use();
rtlogger.info("raft_topology_cmd::barrier_and_drain version {}: waiting for stale token metadata versions to be released", version);
{
seastar::timer<lowres_clock> warn_timer([&ss, version] {
rtlogger.warn("raft_topology_cmd::barrier_and_drain version {}: still waiting for stale versions, "
"stale versions (version: use_count): {}",
version, ss._shared_token_metadata.describe_stale_versions());
});
warn_timer.arm_periodic(std::chrono::minutes(5));
co_await ss._shared_token_metadata.stale_versions_in_use();
}
rtlogger.info("raft_topology_cmd::barrier_and_drain version {}: stale versions released, draining closing sessions", version);
co_await get_topology_session_manager().drain_closing_sessions();
rtlogger.info("raft_topology_cmd::barrier_and_drain done");
rtlogger.info("raft_topology_cmd::barrier_and_drain version {}: done", version);
});
}
@@ -4509,7 +4519,9 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
auto& raft_server = _group0->group0_server();
auto group0_holder = _group0->hold_group0_gate();
// do barrier to make sure we always see the latest topology
rtlogger.info("topology cmd rpc {} index={}: starting read_barrier, term={}", cmd.cmd, cmd_index, term);
co_await raft_server.read_barrier(&_group0_as);
rtlogger.info("topology cmd rpc {} index={}: read_barrier completed", cmd.cmd, cmd_index);
if (raft_server.get_current_term() != term) {
// Return an error since the command is from outdated leader
co_return result;

View File

@@ -1379,7 +1379,7 @@ async def test_alternator_invalid_shard_for_lwt(manager: ManagerClient):
# The next barrier must be for the write_both_read_new, we need a guarantee
# that the src_shard observed it
logger.info("Waiting for the next barrier")
await log.wait_for(re.escape(f"[shard {src_shard}: gms] raft_topology - raft_topology_cmd::barrier_and_drain done"),
await log.wait_for(f"\\[shard {src_shard}: gms\\] raft_topology - raft_topology_cmd::barrier_and_drain.*done",
from_mark=m)
# Now we have a guarantee that a new barrier succeeded on the src_shard,

View File

@@ -961,7 +961,7 @@ async def test_tablets_merge_waits_for_lwt(manager: ManagerClient, scale_timeout
logger.info("Wait for the global barrier to start draining on shard0")
await log0.wait_for("\\[shard 0: gms\\] raft_topology - Got raft_topology_cmd::barrier_and_drain", from_mark=m)
# Just to confirm that the guard still holds the erm
matches = await log0.grep("\\[shard 0: gms\\] raft_topology - raft_topology_cmd::barrier_and_drain done", from_mark=m)
matches = await log0.grep("\\[shard 0: gms\\] raft_topology - raft_topology_cmd::barrier_and_drain.*done", from_mark=m)
assert len(matches) == 0
# Before the fix, the tablet migration global barrier did not wait for the LWT operation.