diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index ecd2acaccd..74c4853e06 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -1150,6 +1150,17 @@ token_metadata::set_version_tracker(version_tracker_t tracker) { _impl->set_version_tracker(std::move(tracker)); } +version_tracker::version_tracker(utils::phased_barrier::operation op, const token_metadata& tm) + : _op(std::move(op)) + , _version(tm.get_version()) + , _tm(&tm) +{ +} + +long version_tracker::version_use_count() const { + return _tm->use_count(); +} + version_tracker::~version_tracker() { if (_expired_at) { auto now = std::chrono::steady_clock::now(); @@ -1161,8 +1172,8 @@ version_tracker::~version_tracker() { } } -version_tracker shared_token_metadata::new_tracker(token_metadata::version_t version) { - auto tracker = version_tracker(_versions_barrier.start(), version); +version_tracker shared_token_metadata::new_tracker(const token_metadata& tm) { + auto tracker = version_tracker(_versions_barrier.start(), tm); _trackers.push_front(tracker); return tracker; } @@ -1178,6 +1189,18 @@ void shared_token_metadata::clear_and_dispose(std::unique_ptr shared_token_metadata::describe_stale_versions() { + std::unordered_map result; + const auto active_version = _shared.get()->get_version(); + for (const auto& t: _trackers) { + const auto v = t.version(); + if (v < active_version) { + result.emplace(v, t.version_use_count()); + } + } + return result; +} + void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept { if (_shared->get_ring_version() >= tmptr->get_ring_version()) { on_internal_error(tlogger, format("shared_token_metadata: must not set non-increasing ring_version: {} -> {}", _shared->get_ring_version(), tmptr->get_ring_version())); @@ -1191,7 +1214,7 @@ void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept { tmptr->set_shared_token_metadata(*this); _shared = std::move(tmptr); - _shared->set_version_tracker(new_tracker(_shared->get_version())); + _shared->set_version_tracker(new_tracker(*_shared)); for (auto&& v : _trackers) { if (v.version() != _shared->get_version()) { diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 03f88a1d4d..3c77fa241c 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -111,6 +111,7 @@ public: private: utils::phased_barrier::operation _op; service::topology::version_t _version; + const token_metadata* _tm = nullptr; link_type _link; // When engaged it means the version is no longer latest and should be released soon as to @@ -119,8 +120,7 @@ private: std::chrono::steady_clock::duration _log_threshold; public: version_tracker() = default; - version_tracker(utils::phased_barrier::operation op, service::topology::version_t version) - : _op(std::move(op)), _version(version) {} + version_tracker(utils::phased_barrier::operation op, const token_metadata& tm); version_tracker(version_tracker&&) noexcept = default; version_tracker& operator=(version_tracker&& o) noexcept { if (this != &o) { @@ -136,6 +136,8 @@ public: return _version; } + long version_use_count() const; + void mark_expired(std::chrono::steady_clock::duration log_threshold) { if (!_expired_at) { _expired_at = std::chrono::steady_clock::now(); @@ -171,7 +173,7 @@ private: friend class token_metadata_impl; }; -class token_metadata final { +class token_metadata final: public enable_lw_shared_from_this { shared_token_metadata* _shared_token_metadata = nullptr; std::unique_ptr _impl; private: @@ -411,7 +413,7 @@ class shared_token_metadata : public peering_sharded_service>; version_tracker_list_type _trackers; private: - version_tracker new_tracker(token_metadata::version_t); + version_tracker new_tracker(const token_metadata& tm); public: // used to construct the shared object as a sharded<> instance // lock_func returns semaphore_units<> @@ -419,7 +421,7 @@ public: : _shared(make_lw_shared(*this, cfg)) , _lock_func(std::move(lock_func)) { - _shared->set_version_tracker(new_tracker(_shared->get_version())); + _shared->set_version_tracker(new_tracker(*_shared)); } shared_token_metadata(const shared_token_metadata& x) = delete; @@ -446,6 +448,9 @@ public: _stall_detector_threshold = threshold; } + // Returns a map version -> use_count + std::unordered_map describe_stale_versions(); + future<> stale_versions_in_use() const { return _stale_versions_in_use.get_future(); } diff --git a/service/storage_service.cc b/service/storage_service.cc index 0b7f32d6e9..b04e26a57a 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -5669,8 +5669,9 @@ future storage_service::raft_topology_cmd_handler(raft } co_await container().invoke_on_all([version] (storage_service& ss) -> future<> { const auto current_version = ss._shared_token_metadata.get()->get_version(); - rtlogger.debug("Got raft_topology_cmd::barrier_and_drain, version {}, current version {}", - version, current_version); + rtlogger.info("Got raft_topology_cmd::barrier_and_drain, version {}, " + "current version {}, stale versions (version: use_count): {}", + version, current_version, ss._shared_token_metadata.describe_stale_versions()); // This shouldn't happen under normal operation, it's only plausible // if the topology change coordinator has @@ -5690,7 +5691,7 @@ future storage_service::raft_topology_cmd_handler(raft co_await ss._shared_token_metadata.stale_versions_in_use(); co_await get_topology_session_manager().drain_closing_sessions(); - rtlogger.debug("raft_topology_cmd::barrier_and_drain done"); + rtlogger.info("raft_topology_cmd::barrier_and_drain done"); }); co_await utils::get_local_injector().inject("raft_topology_barrier_and_drain_fail", [this] (auto& handler) -> future<> {