mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
Merge '[Backport 2025.1] token_metadata: improve stale versions diagnostics' from Gleb Natapov
Backport stale version diagnostics improvement. The patch is already present in all supported versions except 2025.1. Closes scylladb/scylladb#29734 * https://github.com/scylladb/scylladb: token_metadata: improve stale versions diagnostics storage_service: barrier_and_drain – change log level to info
This commit is contained in:
@@ -1150,6 +1150,17 @@ token_metadata::set_version_tracker(version_tracker_t tracker) {
|
||||
_impl->set_version_tracker(std::move(tracker));
|
||||
}
|
||||
|
||||
version_tracker::version_tracker(utils::phased_barrier::operation op, const token_metadata& tm)
|
||||
: _op(std::move(op))
|
||||
, _version(tm.get_version())
|
||||
, _tm(&tm)
|
||||
{
|
||||
}
|
||||
|
||||
long version_tracker::version_use_count() const {
|
||||
return _tm->use_count();
|
||||
}
|
||||
|
||||
version_tracker::~version_tracker() {
|
||||
if (_expired_at) {
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
@@ -1161,8 +1172,8 @@ version_tracker::~version_tracker() {
|
||||
}
|
||||
}
|
||||
|
||||
version_tracker shared_token_metadata::new_tracker(token_metadata::version_t version) {
|
||||
auto tracker = version_tracker(_versions_barrier.start(), version);
|
||||
version_tracker shared_token_metadata::new_tracker(const token_metadata& tm) {
|
||||
auto tracker = version_tracker(_versions_barrier.start(), tm);
|
||||
_trackers.push_front(tracker);
|
||||
return tracker;
|
||||
}
|
||||
@@ -1178,6 +1189,18 @@ void shared_token_metadata::clear_and_dispose(std::unique_ptr<token_metadata_imp
|
||||
}
|
||||
}
|
||||
|
||||
std::unordered_map<service::topology::version_t, int> shared_token_metadata::describe_stale_versions() {
|
||||
std::unordered_map<service::topology::version_t, int> result;
|
||||
const auto active_version = _shared.get()->get_version();
|
||||
for (const auto& t: _trackers) {
|
||||
const auto v = t.version();
|
||||
if (v < active_version) {
|
||||
result.emplace(v, t.version_use_count());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept {
|
||||
if (_shared->get_ring_version() >= tmptr->get_ring_version()) {
|
||||
on_internal_error(tlogger, format("shared_token_metadata: must not set non-increasing ring_version: {} -> {}", _shared->get_ring_version(), tmptr->get_ring_version()));
|
||||
@@ -1191,7 +1214,7 @@ void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept {
|
||||
|
||||
tmptr->set_shared_token_metadata(*this);
|
||||
_shared = std::move(tmptr);
|
||||
_shared->set_version_tracker(new_tracker(_shared->get_version()));
|
||||
_shared->set_version_tracker(new_tracker(*_shared));
|
||||
|
||||
for (auto&& v : _trackers) {
|
||||
if (v.version() != _shared->get_version()) {
|
||||
|
||||
@@ -111,6 +111,7 @@ public:
|
||||
private:
|
||||
utils::phased_barrier::operation _op;
|
||||
service::topology::version_t _version;
|
||||
const token_metadata* _tm = nullptr;
|
||||
link_type _link;
|
||||
|
||||
// When engaged it means the version is no longer latest and should be released soon as to
|
||||
@@ -119,8 +120,7 @@ private:
|
||||
std::chrono::steady_clock::duration _log_threshold;
|
||||
public:
|
||||
version_tracker() = default;
|
||||
version_tracker(utils::phased_barrier::operation op, service::topology::version_t version)
|
||||
: _op(std::move(op)), _version(version) {}
|
||||
version_tracker(utils::phased_barrier::operation op, const token_metadata& tm);
|
||||
version_tracker(version_tracker&&) noexcept = default;
|
||||
version_tracker& operator=(version_tracker&& o) noexcept {
|
||||
if (this != &o) {
|
||||
@@ -136,6 +136,8 @@ public:
|
||||
return _version;
|
||||
}
|
||||
|
||||
long version_use_count() const;
|
||||
|
||||
void mark_expired(std::chrono::steady_clock::duration log_threshold) {
|
||||
if (!_expired_at) {
|
||||
_expired_at = std::chrono::steady_clock::now();
|
||||
@@ -171,7 +173,7 @@ private:
|
||||
friend class token_metadata_impl;
|
||||
};
|
||||
|
||||
class token_metadata final {
|
||||
class token_metadata final: public enable_lw_shared_from_this<token_metadata> {
|
||||
shared_token_metadata* _shared_token_metadata = nullptr;
|
||||
std::unique_ptr<token_metadata_impl> _impl;
|
||||
private:
|
||||
@@ -411,7 +413,7 @@ class shared_token_metadata : public peering_sharded_service<shared_token_metada
|
||||
boost::intrusive::constant_time_size<false>>;
|
||||
version_tracker_list_type _trackers;
|
||||
private:
|
||||
version_tracker new_tracker(token_metadata::version_t);
|
||||
version_tracker new_tracker(const token_metadata& tm);
|
||||
public:
|
||||
// used to construct the shared object as a sharded<> instance
|
||||
// lock_func returns semaphore_units<>
|
||||
@@ -419,7 +421,7 @@ public:
|
||||
: _shared(make_lw_shared<token_metadata>(*this, cfg))
|
||||
, _lock_func(std::move(lock_func))
|
||||
{
|
||||
_shared->set_version_tracker(new_tracker(_shared->get_version()));
|
||||
_shared->set_version_tracker(new_tracker(*_shared));
|
||||
}
|
||||
|
||||
shared_token_metadata(const shared_token_metadata& x) = delete;
|
||||
@@ -446,6 +448,9 @@ public:
|
||||
_stall_detector_threshold = threshold;
|
||||
}
|
||||
|
||||
// Returns a map version -> use_count
|
||||
std::unordered_map<service::topology::version_t, int> describe_stale_versions();
|
||||
|
||||
future<> stale_versions_in_use() const {
|
||||
return _stale_versions_in_use.get_future();
|
||||
}
|
||||
|
||||
@@ -5669,8 +5669,9 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
}
|
||||
co_await container().invoke_on_all([version] (storage_service& ss) -> future<> {
|
||||
const auto current_version = ss._shared_token_metadata.get()->get_version();
|
||||
rtlogger.debug("Got raft_topology_cmd::barrier_and_drain, version {}, current version {}",
|
||||
version, current_version);
|
||||
rtlogger.info("Got raft_topology_cmd::barrier_and_drain, version {}, "
|
||||
"current version {}, stale versions (version: use_count): {}",
|
||||
version, current_version, ss._shared_token_metadata.describe_stale_versions());
|
||||
|
||||
// This shouldn't happen under normal operation, it's only plausible
|
||||
// if the topology change coordinator has
|
||||
@@ -5690,7 +5691,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
co_await ss._shared_token_metadata.stale_versions_in_use();
|
||||
co_await get_topology_session_manager().drain_closing_sessions();
|
||||
|
||||
rtlogger.debug("raft_topology_cmd::barrier_and_drain done");
|
||||
rtlogger.info("raft_topology_cmd::barrier_and_drain done");
|
||||
});
|
||||
|
||||
co_await utils::get_local_injector().inject("raft_topology_barrier_and_drain_fail", [this] (auto& handler) -> future<> {
|
||||
|
||||
Reference in New Issue
Block a user