diff --git a/service/session.cc b/service/session.cc
index ae9acef0ff..1c2a5bb343 100644
--- a/service/session.cc
+++ b/service/session.cc
@@ -9,6 +9,7 @@
 #include "service/session.hh"
 #include "utils/log.hh"
 #include <seastar/core/coroutine.hh>
+#include <seastar/core/timer.hh>
 
 namespace service {
 
@@ -58,18 +59,35 @@ void session_manager::initiate_close_of_sessions_except(const std::unordered_set
 }
 
 future<> session_manager::drain_closing_sessions() {
+    slogger.info("drain_closing_sessions: waiting for lock");
+    seastar::timer<seastar::lowres_clock> lock_timer([this] {
+        slogger.warn("drain_closing_sessions: still waiting for lock, available units {}",
+                _session_drain_sem.available_units());
+    });
+    lock_timer.arm_periodic(std::chrono::minutes(5));
     auto lock = co_await get_units(_session_drain_sem, 1);
+    lock_timer.cancel();
+    auto n = std::distance(_closing_sessions.begin(), _closing_sessions.end());
+    slogger.info("drain_closing_sessions: acquired lock, {} sessions to drain", n);
    auto i = _closing_sessions.begin();
    while (i != _closing_sessions.end()) {
        session& s = *i;
        ++i;
        auto id = s.id();
-        slogger.debug("draining session {}", id);
+        slogger.info("drain_closing_sessions: waiting for session {} to close, gate count {}", id, s.gate_count());
+        std::optional<seastar::timer<seastar::lowres_clock>> warn_timer;
+        warn_timer.emplace([&s, id] {
+            slogger.warn("drain_closing_sessions: session {} still not closed, gate count {}",
+                    id, s.gate_count());
+        });
+        warn_timer->arm_periodic(std::chrono::minutes(5));
        co_await s.close();
+        warn_timer.reset();
        if (_sessions.erase(id)) {
-            slogger.debug("session {} closed", id);
+            slogger.info("drain_closing_sessions: session {} closed", id);
        }
    }
+    slogger.info("drain_closing_sessions: done");
 }
 
 } // namespace service
diff --git a/service/session.hh b/service/session.hh
index 90792e9c8c..2dedf4a5af 100644
--- a/service/session.hh
+++ b/service/session.hh
@@ -95,6 +95,10 @@ public:
        return _id;
    }
 
+    size_t gate_count() const {
+        return _gate.get_count();
+    }
+
    /// Post-condition of successfully resolved future: There are no guards alive for this session, and
    /// and it's impossible to create more such guards later.
    /// Can be called concurrently.
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 6b264b8e04..59d17a1b4c 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -4494,10 +4504,20 @@ future<> storage_service::local_topology_barrier() {
                        version, current_version)));
        }
 
-        co_await ss._shared_token_metadata.stale_versions_in_use();
+        rtlogger.info("raft_topology_cmd::barrier_and_drain version {}: waiting for stale token metadata versions to be released", version);
+        {
+            seastar::timer<seastar::lowres_clock> warn_timer([&ss, version] {
+                rtlogger.warn("raft_topology_cmd::barrier_and_drain version {}: still waiting for stale versions, "
+                        "stale versions (version: use_count): {}",
+                        version, ss._shared_token_metadata.describe_stale_versions());
+            });
+            warn_timer.arm_periodic(std::chrono::minutes(5));
+            co_await ss._shared_token_metadata.stale_versions_in_use();
+        }
+        rtlogger.info("raft_topology_cmd::barrier_and_drain version {}: stale versions released, draining closing sessions", version);
 
        co_await get_topology_session_manager().drain_closing_sessions();
-        rtlogger.info("raft_topology_cmd::barrier_and_drain done");
+        rtlogger.info("raft_topology_cmd::barrier_and_drain version {}: done", version);
    });
 
 }
@@ -4509,7 +4519,9 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
    auto& raft_server = _group0->group0_server();
    auto group0_holder = _group0->hold_group0_gate();
    // do barrier to make sure we always see the latest topology
+    rtlogger.info("topology cmd rpc {} index={}: starting read_barrier, term={}", cmd.cmd, cmd_index, term);
    co_await raft_server.read_barrier(&_group0_as);
+    rtlogger.info("topology cmd rpc {} index={}: read_barrier completed", cmd.cmd, cmd_index);
    if (raft_server.get_current_term() != term) {
        // Return an error since the command is from outdated leader
        co_return result;
diff --git a/test/cluster/test_alternator.py b/test/cluster/test_alternator.py
index 3bdf009171..c73d339272 100644
--- a/test/cluster/test_alternator.py
+++ b/test/cluster/test_alternator.py
@@ -1379,7 +1379,7 @@ async def test_alternator_invalid_shard_for_lwt(manager: ManagerClient):
    # The next barrier must be for the write_both_read_new, we need a guarantee
    # that the src_shard observed it
    logger.info("Waiting for the next barrier")
-    await log.wait_for(re.escape(f"[shard {src_shard}: gms] raft_topology - raft_topology_cmd::barrier_and_drain done"),
+    await log.wait_for(f"\\[shard {src_shard}: gms\\] raft_topology - raft_topology_cmd::barrier_and_drain.*done",
                       from_mark=m)
 
    # Now we have a guarantee that a new barrier succeeded on the src_shard,
diff --git a/test/cluster/test_tablets_lwt.py b/test/cluster/test_tablets_lwt.py
index 01993be8ef..95418a6104 100644
--- a/test/cluster/test_tablets_lwt.py
+++ b/test/cluster/test_tablets_lwt.py
@@ -961,7 +961,7 @@ async def test_tablets_merge_waits_for_lwt(manager: ManagerClient, scale_timeout
    logger.info("Wait for the global barrier to start draining on shard0")
    await log0.wait_for("\\[shard 0: gms\\] raft_topology - Got raft_topology_cmd::barrier_and_drain", from_mark=m)
    # Just to confirm that the guard still holds the erm
-    matches = await log0.grep("\\[shard 0: gms\\] raft_topology - raft_topology_cmd::barrier_and_drain done", from_mark=m)
+    matches = await log0.grep("\\[shard 0: gms\\] raft_topology - raft_topology_cmd::barrier_and_drain.*done", from_mark=m)
    assert len(matches) == 0
 
    # Before the fix, the tablet migration global barrier did not wait for the LWT operation.
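---

Aside on the pattern (not part of the patch): the three wait sites above all share the same shape — arm a periodic timer before a potentially long co_await, log progress and diagnostic state from the callback, and cancel once the wait resolves. If this spreads to more call sites, it could be factored into a helper. A minimal standalone sketch of that shape, assuming only Seastar's timer, logger, and coroutine APIs; `wlogger` and `wait_with_progress_log` are hypothetical names, not identifiers from this tree:

#include <chrono>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/timer.hh>
#include <seastar/util/log.hh>

// Hypothetical logger for the sketch; a real call site would use its own.
static seastar::logger wlogger("wait_watchdog");

// Awaits `fut`, logging `what` every `period` until the future resolves.
// The timer callback runs on the same shard, so it must not block; a
// log-only body satisfies that.
seastar::future<> wait_with_progress_log(seastar::future<> fut,
                                         seastar::sstring what,
                                         seastar::lowres_clock::duration period = std::chrono::minutes(5)) {
    seastar::timer<seastar::lowres_clock> watchdog([what] {
        wlogger.warn("still waiting for: {}", what);
    });
    watchdog.arm_periodic(period);
    // If the co_await throws, the timer's destructor deregisters it, so an
    // exception cannot leave the watchdog running.
    co_await std::move(fut);
    watchdog.cancel();
}

A call site would read `co_await wait_with_progress_log(gate.close(), "session gate");`. The trade-off versus the open-coded version in the patch is losing per-site state in the warning (gate counts, semaphore units, stale version lists), which is exactly the diagnostic payload this patch is after — hence the repetition here may well be deliberate.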