Before waiting on stale_versions_in_use(), we log the stale versions the barrier_and_drain handler will wait for, along with the number of token_metadata references representing each version. To achieve this, we store a pointer to token_metadata in version_tracker, traverse the _trackers list, and output all items with a version smaller than the latest. Since token_metadata contains the version_tracker instance, it is guaranteed to remain alive during traversal. To count references, token_metadata now inherits from enable_lw_shared_from_this. This helps diagnose tablet migration stalls and allows more deterministic tests: when a barrier is expected to block, we can verify that the log contains the expected stale versions rather than checking that the barrier_and_drain is blocked on stale_versions_in_use() for a fixed amount of time.
1370 lines
46 KiB
C++
1370 lines
46 KiB
C++
/*
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include "token_metadata.hh"
|
|
#include <optional>
|
|
#include "locator/snitch_base.hh"
|
|
#include "locator/abstract_replication_strategy.hh"
|
|
#include "locator/tablets.hh"
|
|
#include "utils/chunked_vector.hh"
|
|
#include "utils/log.hh"
|
|
#include "partition_range_compat.hh"
|
|
#include <unordered_map>
|
|
#include <algorithm>
|
|
#include <boost/icl/interval.hpp>
|
|
#include <boost/icl/interval_map.hpp>
|
|
#include <seastar/core/coroutine.hh>
|
|
#include <seastar/coroutine/maybe_yield.hh>
|
|
#include <seastar/core/smp.hh>
|
|
#include "utils/assert.hh"
|
|
#include "utils/stall_free.hh"
|
|
#include "gms/gossiper.hh"
|
|
namespace locator {
|
|
|
|
// File-local logger used by every function in this file; it is registered
// under the name "token_metadata".
static logging::logger tlogger("token_metadata");
|
|
|
|
// Erases every entry of an associative container whose mapped value
// (`->second`) compares equal to `value`. Keys are never inspected.
//
// The container must provide begin()/end() and an erase(iterator) overload
// that returns the iterator following the erased element.
template <typename C, typename V>
static void remove_by_value(C& container, V value) {
    auto cursor = container.begin();
    while (cursor != container.end()) {
        if (!(cursor->second == value)) {
            // Not a match: keep the entry and move on.
            ++cursor;
            continue;
        }
        // erase() hands back the next valid position, so no extra increment.
        cursor = container.erase(cursor);
    }
}
|
|
|
|
class token_metadata_impl final {
|
|
private:
|
|
/**
|
|
* Maintains token to endpoint map of every node in the cluster.
|
|
* Each Token is associated with exactly one Address, but each Address may have
|
|
* multiple tokens. Hence, the BiMultiValMap collection.
|
|
*/
|
|
// FIXME: have to be BiMultiValMap
|
|
std::unordered_map<token, host_id> _token_to_endpoint_map;
|
|
|
|
// Track the unique set of nodes in _token_to_endpoint_map
|
|
std::unordered_set<host_id> _normal_token_owners;
|
|
|
|
std::unordered_map<token, host_id> _bootstrap_tokens;
|
|
std::unordered_set<host_id> _leaving_endpoints;
|
|
// The map between the existing node to be replaced and the replacing node
|
|
std::unordered_map<host_id, host_id> _replacing_endpoints;
|
|
|
|
std::optional<topology_change_info> _topology_change_info;
|
|
|
|
utils::chunked_vector<token> _sorted_tokens;
|
|
|
|
tablet_metadata _tablets;
|
|
|
|
topology _topology;
|
|
|
|
token_metadata::read_new_t _read_new = token_metadata::read_new_t::no;
|
|
|
|
long _ring_version = 0;
|
|
static thread_local long _static_ring_version;
|
|
|
|
// Zero means that token_metadata versions are not supported,
|
|
// this will be used in RPC handling to decide whether we
|
|
// need to apply fencing or not.
|
|
// The initial valid version is 1;
|
|
token_metadata::version_t _version = 0;
|
|
token_metadata::version_tracker_t _version_tracker;
|
|
|
|
// Note: if any member is added to this class
|
|
// clone_async() must be updated to copy that member.
|
|
|
|
void sort_tokens();
|
|
|
|
const tablet_metadata& tablets() const { return _tablets; }
|
|
tablet_metadata& tablets() { return _tablets; }
|
|
|
|
void set_tablets(tablet_metadata&& tablets) {
|
|
_tablets = std::move(tablets);
|
|
invalidate_cached_rings();
|
|
}
|
|
|
|
struct shallow_copy {};
|
|
public:
|
|
token_metadata_impl(shallow_copy, const token_metadata_impl& o) noexcept
|
|
: _topology(topology::shallow_copy{}, topology::config{})
|
|
{}
|
|
token_metadata_impl(token_metadata::config cfg) noexcept : _topology(std::move(cfg.topo_cfg)) {};
|
|
token_metadata_impl(const token_metadata_impl&) = delete; // it's too huge for direct copy, use clone_async()
|
|
token_metadata_impl(token_metadata_impl&&) noexcept = default;
|
|
const utils::chunked_vector<token>& sorted_tokens() const;
|
|
future<> update_normal_tokens(std::unordered_set<token> tokens, host_id endpoint);
|
|
const token& first_token(const token& start) const;
|
|
size_t first_token_index(const token& start) const;
|
|
std::optional<host_id> get_endpoint(const token& token) const;
|
|
std::vector<token> get_tokens(const host_id& addr) const;
|
|
const std::unordered_map<token, host_id>& get_token_to_endpoint() const {
|
|
return _token_to_endpoint_map;
|
|
}
|
|
|
|
const std::unordered_set<host_id>& get_leaving_endpoints() const {
|
|
return _leaving_endpoints;
|
|
}
|
|
|
|
const std::unordered_map<token, host_id>& get_bootstrap_tokens() const {
|
|
return _bootstrap_tokens;
|
|
}
|
|
|
|
void update_topology(host_id id, std::optional<endpoint_dc_rack> opt_dr, std::optional<node::state> opt_st, std::optional<shard_id> shard_count = std::nullopt) {
|
|
_topology.add_or_update_endpoint(id, std::move(opt_dr), std::move(opt_st), std::move(shard_count));
|
|
}
|
|
|
|
/**
|
|
* Creates an iterable range of the sorted tokens starting at the token next
|
|
* after the given one.
|
|
*
|
|
* @param start A token that will define the beginning of the range
|
|
*
|
|
* @return The requested range (see the description above)
|
|
*/
|
|
std::ranges::subrange<token_metadata::tokens_iterator> ring_range(const token& start) const;
|
|
|
|
std::ranges::subrange<token_metadata::tokens_iterator> ring_range(dht::ring_position_view pos) const;
|
|
|
|
topology& get_topology() {
|
|
return _topology;
|
|
}
|
|
|
|
const topology& get_topology() const {
|
|
return _topology;
|
|
}
|
|
|
|
void debug_show() const;
|
|
|
|
/** @return a copy of host id set for read-only operations */
|
|
std::unordered_set<host_id> get_host_ids() const;
|
|
|
|
void add_bootstrap_token(token t, host_id endpoint);
|
|
|
|
void add_bootstrap_tokens(std::unordered_set<token> tokens, host_id endpoint);
|
|
|
|
void remove_bootstrap_tokens(std::unordered_set<token> tokens);
|
|
|
|
void add_leaving_endpoint(host_id endpoint);
|
|
void del_leaving_endpoint(host_id endpoint);
|
|
public:
|
|
void remove_endpoint(host_id endpoint);
|
|
|
|
bool is_normal_token_owner(host_id endpoint) const;
|
|
|
|
bool is_leaving(host_id endpoint) const;
|
|
|
|
// Is this node being replaced by another node
|
|
bool is_being_replaced(host_id endpoint) const;
|
|
|
|
// Is any node being replaced by another node
|
|
bool is_any_node_being_replaced() const;
|
|
|
|
void add_replacing_endpoint(host_id existing_node, host_id replacing_node);
|
|
|
|
void del_replacing_endpoint(host_id existing_node);
|
|
public:
|
|
|
|
/**
|
|
* Create a full copy of token_metadata_impl using asynchronous continuations.
|
|
* The caller must ensure that the cloned object will not change if
|
|
* the function yields.
|
|
*/
|
|
future<std::unique_ptr<token_metadata_impl>> clone_async() const noexcept;
|
|
|
|
/**
|
|
* Create a copy of TokenMetadata with only tokenToEndpointMap. That is, pending ranges,
|
|
* bootstrap tokens and leaving endpoints are not included in the copy.
|
|
* The caller must ensure that the cloned object will not change if
|
|
* the function yields.
|
|
*/
|
|
future<std::unique_ptr<token_metadata_impl>> clone_only_token_map(bool clone_sorted_tokens = true) const noexcept;
|
|
|
|
/**
|
|
* Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all
|
|
* current leave operations have finished.
|
|
*
|
|
* @return new token metadata
|
|
*/
|
|
future<std::unique_ptr<token_metadata_impl>> clone_after_all_left() const noexcept {
|
|
return clone_only_token_map(false).then([this] (std::unique_ptr<token_metadata_impl> all_left_metadata) {
|
|
for (auto endpoint : _leaving_endpoints) {
|
|
all_left_metadata->remove_endpoint(endpoint);
|
|
}
|
|
all_left_metadata->sort_tokens();
|
|
|
|
return all_left_metadata;
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Destroy the token_metadata members using continuations
|
|
* to prevent reactor stalls.
|
|
*/
|
|
future<> clear_gently() noexcept;
|
|
|
|
public:
|
|
dht::token_range_vector get_primary_ranges_for(std::unordered_set<token> tokens) const;
|
|
|
|
dht::token_range_vector get_primary_ranges_for(token right) const;
|
|
static boost::icl::interval<token>::interval_type range_to_interval(wrapping_interval<dht::token> r);
|
|
static wrapping_interval<dht::token> interval_to_range(boost::icl::interval<token>::interval_type i);
|
|
|
|
public:
|
|
future<> update_topology_change_info(dc_rack_fn& get_dc_rack);
|
|
const std::optional<topology_change_info>& get_topology_change_info() const {
|
|
return _topology_change_info;
|
|
}
|
|
public:
|
|
|
|
token get_predecessor(token t) const;
|
|
|
|
// Returns nodes that are officially part of the ring. It does not include:
|
|
// - nodes that are still joining the cluster, e.g., a node that is still
|
|
// streaming data before it finishes the bootstrap process and turns into
|
|
// NORMAL status,
|
|
// - zero-token nodes (the ones with join_ring=false).
|
|
const std::unordered_set<host_id>& get_normal_token_owners() const noexcept {
|
|
return _normal_token_owners;
|
|
}
|
|
|
|
void for_each_token_owner(std::function<void(const node&)> func) const;
|
|
|
|
/* Returns the number of different endpoints that own tokens in the ring.
|
|
* Bootstrapping tokens are not taken into account. */
|
|
size_t count_normal_token_owners() const;
|
|
|
|
std::unordered_map<sstring, std::unordered_set<host_id>> get_datacenter_token_owners() const;
|
|
|
|
std::unordered_map<sstring, std::unordered_map<sstring, std::unordered_set<host_id>>>
|
|
get_datacenter_racks_token_owners() const;
|
|
|
|
std::unordered_map<sstring, std::unordered_set<std::reference_wrapper<const node>>> get_datacenter_token_owners_nodes() const;
|
|
|
|
std::unordered_map<sstring, std::unordered_map<sstring, std::unordered_set<std::reference_wrapper<const node>>>>
|
|
get_datacenter_racks_token_owners_nodes() const;
|
|
private:
|
|
future<> update_normal_token_owners();
|
|
public:
|
|
void set_read_new(token_metadata::read_new_t read_new) {
|
|
_read_new = read_new;
|
|
}
|
|
|
|
public:
|
|
long get_ring_version() const {
|
|
return _ring_version;
|
|
}
|
|
|
|
void invalidate_cached_rings() {
|
|
_ring_version = ++_static_ring_version;
|
|
tlogger.debug("ring_version={}", _ring_version);
|
|
}
|
|
|
|
token_metadata::version_t get_version() const {
|
|
return _version;
|
|
}
|
|
void set_version(token_metadata::version_t version) {
|
|
if (version <= 0) {
|
|
on_internal_error(tlogger,
|
|
format("token_metadata_impl::set_version: invalid new version {}", version));
|
|
}
|
|
if (version < _version) {
|
|
on_internal_error(tlogger,
|
|
format("token_metadata_impl::set_version: new version can't be smaller than the previous one, "
|
|
"new version {}, previous version {}", version, _version));
|
|
}
|
|
_version = version;
|
|
}
|
|
void set_version_tracker(token_metadata::version_tracker_t tracker) {
|
|
_version_tracker = std::move(tracker);
|
|
}
|
|
|
|
friend class token_metadata;
|
|
};
|
|
|
|
// Out-of-class definition of the per-thread counter declared in
// token_metadata_impl; invalidate_cached_rings() pre-increments it to obtain
// each new ring version.
thread_local long token_metadata_impl::_static_ring_version;
|
|
|
|
// Positions the iterator on the sorted-token slot returned by
// first_token_index(start) and sets the number of remaining steps to the
// total number of sorted tokens.
tokens_iterator::tokens_iterator(const token& start, const token_metadata_impl* token_metadata)
    : _token_metadata(token_metadata)
{
    const auto& ring = _token_metadata->sorted_tokens();
    _cur_it = ring.begin() + _token_metadata->first_token_index(start);
    _remaining = ring.size();
}
|
|
|
|
// Two iterators compare equal when they have the same number of steps left;
// the current position itself is not compared.
bool tokens_iterator::operator==(const tokens_iterator& it) const {
    const bool same_steps_left = (_remaining == it._remaining);
    return same_steps_left;
}
|
|
|
|
// Yields the token at the current position.
const token& tokens_iterator::operator*() const {
    const token& current = *_cur_it;
    return current;
}
|
|
|
|
// Advances by one token, wrapping from the end of the sorted tokens back to
// the beginning, and consumes one of the remaining steps.
tokens_iterator& tokens_iterator::operator++() {
    const auto& ring = _token_metadata->sorted_tokens();
    if (++_cur_it == ring.end()) {
        _cur_it = ring.begin();
    }
    --_remaining;
    return *this;
}
|
|
|
|
// Returns the `this_host_id` field of the topology configuration.
host_id token_metadata::get_my_id() const {
    const auto& topo_cfg = get_topology().get_config();
    return topo_cfg.this_host_id;
}
|
|
|
|
inline
|
|
std::ranges::subrange<token_metadata::tokens_iterator>
|
|
token_metadata_impl::ring_range(const token& start) const {
|
|
auto begin = token_metadata::tokens_iterator(start, this);
|
|
auto end = token_metadata::tokens_iterator();
|
|
return std::ranges::subrange(begin, end);
|
|
}
|
|
|
|
// Produces a copy of this object: starts from clone_only_token_map() and then
// adds the bootstrap tokens (yielding between entries), the leaving and
// replacing endpoints, the ring version and the version.
future<std::unique_ptr<token_metadata_impl>> token_metadata_impl::clone_async() const noexcept {
    auto copy = co_await clone_only_token_map();

    copy->_bootstrap_tokens.reserve(_bootstrap_tokens.size());
    for (const auto& entry : _bootstrap_tokens) {
        copy->_bootstrap_tokens.emplace(entry);
        co_await coroutine::maybe_yield();
    }

    copy->_leaving_endpoints = _leaving_endpoints;
    copy->_replacing_endpoints = _replacing_endpoints;
    copy->_ring_version = _ring_version;
    copy->_version = _version;
    co_return copy;
}
|
|
|
|
// Produces a copy that carries the token -> endpoint map (copied entry by
// entry with yields in between), the normal token owners, a gently cloned
// topology, the tablets and the read_new flag. The sorted token vector is
// copied only when `clone_sorted_tokens` is true.
future<std::unique_ptr<token_metadata_impl>> token_metadata_impl::clone_only_token_map(bool clone_sorted_tokens) const noexcept {
    auto copy = std::make_unique<token_metadata_impl>(shallow_copy{}, *this);

    copy->_token_to_endpoint_map.reserve(_token_to_endpoint_map.size());
    for (const auto& entry : _token_to_endpoint_map) {
        copy->_token_to_endpoint_map.emplace(entry);
        co_await coroutine::maybe_yield();
    }

    copy->_normal_token_owners = _normal_token_owners;
    copy->_topology = co_await _topology.clone_gently();

    if (clone_sorted_tokens) {
        copy->_sorted_tokens = _sorted_tokens;
        co_await coroutine::maybe_yield();
    }

    copy->_tablets = co_await _tablets.copy();
    copy->_read_new = _read_new;
    co_return copy;
}
|
|
|
|
// Resets the version tracker first, then empties each container, the
// topology and the tablets one at a time through their gentle-clear
// operations.
future<> token_metadata_impl::clear_gently() noexcept {
    _version_tracker = {};

    // Plain containers.
    co_await utils::clear_gently(_token_to_endpoint_map);
    co_await utils::clear_gently(_normal_token_owners);
    co_await utils::clear_gently(_bootstrap_tokens);
    co_await utils::clear_gently(_leaving_endpoints);
    co_await utils::clear_gently(_replacing_endpoints);
    co_await utils::clear_gently(_sorted_tokens);

    // Members that provide their own clear_gently().
    co_await _topology.clear_gently();
    co_await _tablets.clear_gently();
}
|
|
|
|
void token_metadata_impl::sort_tokens() {
|
|
utils::chunked_vector<token> sorted;
|
|
sorted.reserve(_token_to_endpoint_map.size());
|
|
|
|
for (auto&& i : _token_to_endpoint_map) {
|
|
sorted.push_back(i.first);
|
|
}
|
|
|
|
std::sort(sorted.begin(), sorted.end());
|
|
|
|
_sorted_tokens = std::move(sorted);
|
|
}
|
|
|
|
// Read-only access to the tablet metadata held by the implementation object.
const tablet_metadata& token_metadata::tablets() const {
    const tablet_metadata& held = _impl->tablets();
    return held;
}
|
|
|
|
// Mutable access to the tablet metadata held by the implementation object.
tablet_metadata& token_metadata::tablets() {
    tablet_metadata& held = _impl->tablets();
    return held;
}
|
|
|
|
// Hands the given tablet metadata over to the implementation object, which
// stores it and invalidates the cached rings.
void token_metadata::set_tablets(tablet_metadata tm) {
    auto& impl = *_impl;
    impl.set_tablets(std::move(tm));
}
|
|
|
|
// Returns the sorted token vector as last built by sort_tokens() or copied
// in by a clone.
const utils::chunked_vector<token>& token_metadata_impl::sorted_tokens() const {
    const auto& ring = _sorted_tokens;
    return ring;
}
|
|
|
|
std::vector<token> token_metadata_impl::get_tokens(const host_id& addr) const {
|
|
std::vector<token> res;
|
|
for (auto&& i : _token_to_endpoint_map) {
|
|
if (i.second == addr) {
|
|
res.push_back(i.first);
|
|
}
|
|
}
|
|
std::sort(res.begin(), res.end());
|
|
return res;
|
|
}
|
|
|
|
// Makes `endpoint` the owner of exactly the tokens in `tokens`.
//
// An empty `tokens` set is a no-op. The endpoint must already be known to the
// topology, otherwise on_internal_error() is raised. The function yields
// between map entries, so it may be suspended part-way through the update.
future<> token_metadata_impl::update_normal_tokens(std::unordered_set<token> tokens, host_id endpoint) {
    if (tokens.empty()) {
        co_return;
    }

    if (!_topology.has_node(endpoint)) {
        on_internal_error(tlogger, format("token_metadata_impl: {} must be a member of topology to update normal tokens", endpoint));
    }

    bool should_sort_tokens = false;

    // Phase 1: walk the current map. Tokens the endpoint owned but which are
    // absent from `tokens` are erased; tokens it already owns are dropped
    // from `tokens`, since nothing changes for them.
    auto cur = _token_to_endpoint_map.begin();
    const auto last = _token_to_endpoint_map.end();
    while (cur != last) {
        co_await coroutine::maybe_yield();
        if (cur->second != endpoint) {
            ++cur;
            continue;
        }
        const auto still_owned = tokens.find(cur->first);
        if (still_owned == tokens.end()) {
            // token no longer owned by endpoint
            cur = _token_to_endpoint_map.erase(cur);
            continue;
        }
        // ownership unchanged, nothing more to do for this token
        tokens.erase(still_owned);
        ++cur;
    }

    // Phase 2:
    // a. ...
    // b. update pending _bootstrap_tokens and _leaving_endpoints
    // c. update _token_to_endpoint_map with the new endpoint->token mappings
    //    - set `should_sort_tokens` if new tokens were added
    remove_by_value(_bootstrap_tokens, endpoint);
    _leaving_endpoints.erase(endpoint);
    invalidate_cached_rings();
    for (const token& t : tokens) {
        co_await coroutine::maybe_yield();
        const auto [slot, inserted] = _token_to_endpoint_map.insert(std::pair<token, host_id>(t, endpoint));
        should_sort_tokens = should_sort_tokens || inserted; // new token inserted -> sort
        if (slot->second != endpoint) {
            tlogger.debug("Token {} changing ownership from {} to {}", t, slot->second, endpoint);
            slot->second = endpoint;
        }
    }

    co_await update_normal_token_owners();

    // _token_to_endpoint_map gained new keys, so the sorted vector is stale.
    if (should_sort_tokens) {
        sort_tokens();
    }
}
|
|
|
|
size_t token_metadata_impl::first_token_index(const token& start) const {
|
|
if (_sorted_tokens.empty()) {
|
|
auto msg = format("sorted_tokens is empty in first_token_index!");
|
|
tlogger.error("{}", msg);
|
|
throw std::runtime_error(msg);
|
|
}
|
|
auto it = std::lower_bound(_sorted_tokens.begin(), _sorted_tokens.end(), start);
|
|
if (it == _sorted_tokens.end()) {
|
|
return 0;
|
|
} else {
|
|
return std::distance(_sorted_tokens.begin(), it);
|
|
}
|
|
}
|
|
|
|
// Returns the sorted token found at first_token_index(start); propagates the
// exception thrown by first_token_index() when there are no sorted tokens.
const token& token_metadata_impl::first_token(const token& start) const {
    const size_t idx = first_token_index(start);
    return _sorted_tokens[idx];
}
|
|
|
|
std::optional<host_id> token_metadata_impl::get_endpoint(const token& token) const {
|
|
auto it = _token_to_endpoint_map.find(token);
|
|
if (it == _token_to_endpoint_map.end()) {
|
|
return std::nullopt;
|
|
} else {
|
|
return it->second;
|
|
}
|
|
}
|
|
|
|
void token_metadata_impl::debug_show() const {
|
|
auto reporter = std::make_shared<timer<lowres_clock>>();
|
|
reporter->set_callback ([reporter, this] {
|
|
fmt::print("Endpoint -> Token\n");
|
|
for (auto x : _token_to_endpoint_map) {
|
|
fmt::print("inet_address={}, token={}\n", x.second, x.first);
|
|
}
|
|
fmt::print("Sorted Token\n");
|
|
for (auto x : _sorted_tokens) {
|
|
fmt::print("token={}\n", x);
|
|
}
|
|
});
|
|
reporter->arm_periodic(std::chrono::seconds(1));
|
|
}
|
|
|
|
std::unordered_set<host_id> token_metadata_impl::get_host_ids() const {
|
|
return _topology.get_nodes() |
|
|
std::views::filter([&] (const node& n) { return !n.left() && !n.is_none(); }) |
|
|
std::views::transform([] (const node& n) { return n.host_id(); }) |
|
|
std::ranges::to<std::unordered_set>();
|
|
}
|
|
|
|
// True when `endpoint` is a member of _normal_token_owners.
bool token_metadata_impl::is_normal_token_owner(host_id endpoint) const {
    const bool owns_tokens = _normal_token_owners.contains(endpoint);
    return owns_tokens;
}
|
|
|
|
// Single-token convenience form: wraps `t` in a one-element set and defers to
// add_bootstrap_tokens().
void token_metadata_impl::add_bootstrap_token(token t, host_id endpoint) {
    add_bootstrap_tokens(std::unordered_set<token>{t}, endpoint);
}
|
|
|
|
// Ring-position overload: uses only the token part of the position and defers
// to the token-based ring_range().
std::ranges::subrange<token_metadata::tokens_iterator>
token_metadata_impl::ring_range(const dht::ring_position_view start) const {
    const auto& start_token = start.token();
    return ring_range(start_token);
}
|
|
|
|
void token_metadata_impl::add_bootstrap_tokens(std::unordered_set<token> tokens, host_id endpoint) {
|
|
for (auto t : tokens) {
|
|
auto old_endpoint = _bootstrap_tokens.find(t);
|
|
if (old_endpoint != _bootstrap_tokens.end() && (*old_endpoint).second != endpoint) {
|
|
auto msg = format("Bootstrap Token collision between {} and {} (token {}", (*old_endpoint).second, endpoint, t);
|
|
throw std::runtime_error(msg);
|
|
}
|
|
|
|
auto old_endpoint2 = _token_to_endpoint_map.find(t);
|
|
if (old_endpoint2 != _token_to_endpoint_map.end() && (*old_endpoint2).second != endpoint) {
|
|
auto msg = format("Bootstrap Token collision between {} and {} (token {}", (*old_endpoint2).second, endpoint, t);
|
|
throw std::runtime_error(msg);
|
|
}
|
|
}
|
|
|
|
std::erase_if(_bootstrap_tokens, [endpoint] (const std::pair<token, host_id>& n) { return n.second == endpoint; });
|
|
|
|
for (auto t : tokens) {
|
|
_bootstrap_tokens[t] = endpoint;
|
|
}
|
|
}
|
|
|
|
void token_metadata_impl::remove_bootstrap_tokens(std::unordered_set<token> tokens) {
|
|
if (tokens.empty()) {
|
|
tlogger.warn("tokens is empty in remove_bootstrap_tokens!");
|
|
return;
|
|
}
|
|
for (auto t : tokens) {
|
|
_bootstrap_tokens.erase(t);
|
|
}
|
|
}
|
|
|
|
// True when `endpoint` is a member of _leaving_endpoints.
bool token_metadata_impl::is_leaving(host_id endpoint) const {
    const bool leaving = _leaving_endpoints.contains(endpoint);
    return leaving;
}
|
|
|
|
// True when `endpoint` is a key of _replacing_endpoints, i.e. it is recorded
// as an existing node that another node is replacing.
bool token_metadata_impl::is_being_replaced(host_id endpoint) const {
    const bool replaced = _replacing_endpoints.contains(endpoint);
    return replaced;
}
|
|
|
|
bool token_metadata_impl::is_any_node_being_replaced() const {
|
|
return !_replacing_endpoints.empty();
|
|
}
|
|
|
|
// Erases every trace of `endpoint`: its bootstrap tokens, its normal tokens,
// its membership in the token-owner and leaving sets, its topology entry and
// any replacement recorded for it. Finishes by invalidating the cached rings.
// Note that _sorted_tokens is not rebuilt here.
void token_metadata_impl::remove_endpoint(host_id endpoint) {
    // Token maps (matched by owner).
    remove_by_value(_bootstrap_tokens, endpoint);
    remove_by_value(_token_to_endpoint_map, endpoint);

    // Membership sets and topology.
    _normal_token_owners.erase(endpoint);
    _topology.remove_endpoint(endpoint);
    _leaving_endpoints.erase(endpoint);
    del_replacing_endpoint(endpoint);

    invalidate_cached_rings();
}
|
|
|
|
// Returns the token that precedes `t` in the sorted tokens, wrapping around:
// the predecessor of the first token is the last one.
//
// `t` must itself be one of the sorted tokens; otherwise an error is logged
// and std::runtime_error is thrown.
token token_metadata_impl::get_predecessor(token t) const {
    const auto& ring = sorted_tokens();
    auto pos = std::lower_bound(ring.begin(), ring.end(), t);

    const bool present = (pos != ring.end()) && !(*pos != t);
    if (!present) {
        auto msg = format("token error in get_predecessor!");
        tlogger.error("{}", msg);
        throw std::runtime_error(msg);
    }

    if (pos != ring.begin()) {
        return *(--pos);
    }
    // `t` is the first element, so its predecessor is the last element.
    return ring.back();
}
|
|
|
|
// For every token in `tokens`, builds the interval (predecessor, token] -
// exclusive start, inclusive end - and appends its unwrapped pieces to the
// result. get_predecessor() throws if a token is not one of the sorted tokens.
dht::token_range_vector token_metadata_impl::get_primary_ranges_for(std::unordered_set<token> tokens) const {
    dht::token_range_vector ranges;
    ranges.reserve(tokens.size() + 1); // one of the ranges will wrap

    const auto collect = [&ranges] (auto&& rng) {
        ranges.push_back(std::move(rng));
    };

    for (const auto& right : tokens) {
        const auto left = get_predecessor(right);
        ::compat::unwrap_into(
            wrapping_interval<token>(interval_bound<token>(left, false), interval_bound<token>(right)),
            dht::token_comparator(),
            collect);
    }
    return ranges;
}
|
|
|
|
// Single-token convenience form: wraps `right` in a one-element set and
// defers to the set-based overload.
dht::token_range_vector token_metadata_impl::get_primary_ranges_for(token right) const {
    std::unordered_set<token> single{right};
    return get_primary_ranges_for(std::move(single));
}
|
|
|
|
// Converts a wrapping_interval of tokens into a boost::icl interval.
//
// A missing start bound becomes dht::minimum_token() and a missing end bound
// becomes dht::maximum_token(); in both cases the missing side is treated as
// exclusive. Present bounds keep their value and inclusiveness.
boost::icl::interval<token>::interval_type
token_metadata_impl::range_to_interval(wrapping_interval<dht::token> r) {
    using icl_interval = boost::icl::interval<token>;

    token start = dht::minimum_token();
    token end = dht::maximum_token();
    bool start_inclusive = false;
    bool end_inclusive = false;

    if (const auto& lower = r.start(); lower) {
        start = lower->value();
        start_inclusive = lower->is_inclusive();
    }
    if (const auto& upper = r.end(); upper) {
        end = upper->value();
        end_inclusive = upper->is_inclusive();
    }

    // Pick the factory matching the (start, end) inclusiveness pair.
    if (start_inclusive) {
        if (end_inclusive) {
            return icl_interval::closed(std::move(start), std::move(end));
        }
        return icl_interval::right_open(std::move(start), std::move(end));
    }
    if (end_inclusive) {
        return icl_interval::left_open(std::move(start), std::move(end));
    }
    return icl_interval::open(std::move(start), std::move(end));
}
|
|
|
|
// Converts a boost::icl interval of tokens back into a wrapping_interval,
// translating the four icl bound kinds (open, left-open, right-open, closed)
// into per-bound inclusiveness. Any other bound encoding raises
// std::runtime_error.
wrapping_interval<dht::token>
token_metadata_impl::interval_to_range(boost::icl::interval<token>::interval_type i) {
    using bound_kind = boost::icl::interval_bounds;
    const auto bits = i.bounds().bits();

    const bool is_open = (bits == bound_kind::static_open);
    const bool is_left_open = (bits == bound_kind::static_left_open);
    const bool is_right_open = (bits == bound_kind::static_right_open);
    const bool is_closed = (bits == bound_kind::static_closed);

    if (!(is_open || is_left_open || is_right_open || is_closed)) {
        throw std::runtime_error("Invalid boost::icl::interval<token> bounds");
    }

    // The start is included for right-open and closed intervals, the end for
    // left-open and closed ones.
    const bool start_inclusive = is_right_open || is_closed;
    const bool end_inclusive = is_left_open || is_closed;

    return wrapping_interval<dht::token>({{i.lower(), start_inclusive}}, {{i.upper(), end_inclusive}});
}
|
|
|
|
// Recomputes _topology_change_info from the pending bootstrap, leaving and
// replacing state.
//
// With nothing pending the stored info is gently cleared and reset.
// Otherwise a target token_metadata_impl is derived from a clone of the token
// map, and a sorted vector of the current plus the bootstrap tokens is stored
// alongside it together with the current read_new flag. `get_dc_rack` is
// called for each node that receives tokens in the target.
future<> token_metadata_impl::update_topology_change_info(dc_rack_fn& get_dc_rack) {
    const bool nothing_pending = _bootstrap_tokens.empty()
            && _leaving_endpoints.empty()
            && _replacing_endpoints.empty();
    if (nothing_pending) {
        co_await utils::clear_gently(_topology_change_info);
        _topology_change_info.reset();
        co_return;
    }

    // `target` incorporates all the changes from leaving, bootstrapping and replacing
    auto target = co_await clone_only_token_map(false);
    {
        // Tokens each node will own in the target, derived from
        // _replacing_endpoints and _bootstrap_tokens.
        std::unordered_map<host_id, std::unordered_set<token>> tokens_by_new_owner;

        if (!_replacing_endpoints.empty()) {
            // Tokens of a replaced node go to the node replacing it.
            for (const auto& [tok, owner] : _token_to_endpoint_map) {
                const auto replacement = _replacing_endpoints.find(owner);
                if (replacement != _replacing_endpoints.end()) {
                    tokens_by_new_owner[replacement->second].insert(tok);
                }
            }
            for (const auto& entry : _replacing_endpoints) {
                target->remove_endpoint(entry.first);
            }
        }

        for (const auto& [tok, owner] : _bootstrap_tokens) {
            tokens_by_new_owner[owner].insert(tok);
        }

        // Apply the collected ownership to the target.
        for (auto& [endpoint, tokens] : tokens_by_new_owner) {
            target->update_topology(endpoint, get_dc_rack(endpoint), node::state::normal);
            co_await target->update_normal_tokens(std::move(tokens), endpoint);
        }

        // Leaving endpoints are gone in the target.
        for (const auto& leaving : _leaving_endpoints) {
            target->remove_endpoint(leaving);
        }
        target->sort_tokens();
    }

    // Merge the tokens from token_to_endpoint and bootstrap_tokens,
    // preserving the tokens of leaving endpoints.
    auto all_tokens = sorted_tokens();
    all_tokens.reserve(sorted_tokens().size() + get_bootstrap_tokens().size());
    for (const auto& bootstrap_entry : get_bootstrap_tokens()) {
        all_tokens.push_back(bootstrap_entry.first);
    }
    std::sort(all_tokens.begin(), all_tokens.end());

    // Install the new info first, then dispose of the old one gently.
    auto prev_value = std::move(_topology_change_info);
    _topology_change_info.emplace(make_lw_shared<token_metadata>(std::move(target)),
        std::move(all_tokens),
        _read_new);
    co_await utils::clear_gently(prev_value);
}
|
|
|
|
size_t token_metadata_impl::count_normal_token_owners() const {
|
|
return _normal_token_owners.size();
|
|
}
|
|
|
|
// Calls `func` for every topology node whose host id is a normal token owner.
void token_metadata_impl::for_each_token_owner(std::function<void(const node&)> func) const {
    _topology.for_each_node([this, func = std::move(func)] (const node& n) {
        if (!is_normal_token_owner(n.host_id())) {
            return;
        }
        func(n);
    });
}
|
|
|
|
std::unordered_map<sstring, std::unordered_set<host_id>>
|
|
token_metadata_impl::get_datacenter_token_owners() const {
|
|
std::unordered_map<sstring, std::unordered_set<host_id>> datacenter_token_owners;
|
|
_topology.for_each_node([&] (const node& n) {
|
|
if (is_normal_token_owner(n.host_id())) {
|
|
datacenter_token_owners[n.dc_rack().dc].insert(n.host_id());
|
|
}
|
|
});
|
|
return datacenter_token_owners;
|
|
}
|
|
|
|
std::unordered_map<sstring, std::unordered_map<sstring, std::unordered_set<host_id>>>
|
|
token_metadata_impl::get_datacenter_racks_token_owners() const {
|
|
std::unordered_map<sstring, std::unordered_map<sstring, std::unordered_set<host_id>>> dc_racks_token_owners;
|
|
_topology.for_each_node([&] (const node& n) {
|
|
const auto& dc_rack = n.dc_rack();
|
|
if (is_normal_token_owner(n.host_id())) {
|
|
dc_racks_token_owners[dc_rack.dc][dc_rack.rack].insert(n.host_id());
|
|
}
|
|
});
|
|
return dc_racks_token_owners;
|
|
}
|
|
|
|
// Same grouping as get_datacenter_token_owners(), but the sets hold
// references to the topology's node objects instead of host ids.
std::unordered_map<sstring, std::unordered_set<std::reference_wrapper<const node>>>
token_metadata_impl::get_datacenter_token_owners_nodes() const {
    std::unordered_map<sstring, std::unordered_set<std::reference_wrapper<const node>>> nodes_by_dc;
    _topology.for_each_node([&] (const node& n) {
        if (!is_normal_token_owner(n.host_id())) {
            return;
        }
        nodes_by_dc[n.dc_rack().dc].insert(std::cref(n));
    });
    return nodes_by_dc;
}
|
|
|
|
// Same grouping as get_datacenter_racks_token_owners(), but the sets hold
// references to the topology's node objects instead of host ids.
std::unordered_map<sstring, std::unordered_map<sstring, std::unordered_set<std::reference_wrapper<const node>>>>
token_metadata_impl::get_datacenter_racks_token_owners_nodes() const {
    std::unordered_map<sstring, std::unordered_map<sstring, std::unordered_set<std::reference_wrapper<const node>>>> nodes_by_dc_rack;
    _topology.for_each_node([&] (const node& n) {
        const auto& location = n.dc_rack();
        if (!is_normal_token_owner(n.host_id())) {
            return;
        }
        nodes_by_dc_rack[location.dc][location.rack].insert(std::cref(n));
    });
    return nodes_by_dc_rack;
}
|
|
|
|
// Rebuilds _normal_token_owners as the set of distinct owners found in
// _token_to_endpoint_map, yielding between entries. The new set replaces the
// old one only after the whole map has been scanned.
future<> token_metadata_impl::update_normal_token_owners() {
    std::unordered_set<host_id> owners;
    for (const auto& entry : _token_to_endpoint_map) {
        owners.insert(entry.second);
        co_await coroutine::maybe_yield();
    }
    _normal_token_owners = std::move(owners);
}
|
|
|
|
// Adds `endpoint` to _leaving_endpoints; a no-op if it is already there.
void token_metadata_impl::add_leaving_endpoint(host_id endpoint) {
    auto& leaving = _leaving_endpoints;
    leaving.emplace(endpoint);
}
|
|
|
|
// Removes `endpoint` from _leaving_endpoints; a no-op if it is not there.
void token_metadata_impl::del_leaving_endpoint(host_id endpoint) {
    auto& leaving = _leaving_endpoints;
    leaving.erase(endpoint);
}
|
|
|
|
// Records that `replacing_node` replaces `existing_node`, overwriting any
// replacement previously recorded for `existing_node`, and logs the change.
// Asking a node to replace itself is reported through on_internal_error().
void token_metadata_impl::add_replacing_endpoint(host_id existing_node, host_id replacing_node) {
    const bool self_replacement = (existing_node == replacing_node);
    if (self_replacement) {
        on_internal_error(tlogger, seastar::format("Can't replace node {} with itself", existing_node));
    }

    tlogger.info("Added node {} as pending replacing endpoint which replaces existing node {}",
            replacing_node, existing_node);
    _replacing_endpoints[existing_node] = replacing_node;
}
|
|
|
|
// Forgets the replacement recorded for `existing_node`, logging which node
// was going to replace it. Does nothing (and logs nothing) when no
// replacement is recorded for that node.
void token_metadata_impl::del_replacing_endpoint(host_id existing_node) {
    // One lookup serves the existence check, the log message and the erase.
    const auto it = _replacing_endpoints.find(existing_node);
    if (it == _replacing_endpoints.end()) {
        return;
    }
    tlogger.info("Removed node {} as pending replacing endpoint which replaces existing node {}",
            it->second, existing_node);
    _replacing_endpoints.erase(it);
}
|
|
|
|
// Stores the three components as given: the target token metadata and the
// token vector are moved in, the read_new flag is copied.
topology_change_info::topology_change_info(
        lw_shared_ptr<token_metadata> target_token_metadata_,
        utils::chunked_vector<dht::token> all_tokens_,
        token_metadata::read_new_t read_new_)
    : target_token_metadata(std::move(target_token_metadata_))
    , all_tokens(std::move(all_tokens_))
    , read_new(read_new_)
{
}
|
|
|
|
// Gently clears the target token metadata first and the token vector second.
future<> topology_change_info::clear_gently() {
    auto& target = target_token_metadata;
    co_await utils::clear_gently(target);

    auto& tokens = all_tokens;
    co_await utils::clear_gently(tokens);
}
|
|
|
|
// Wraps an already built implementation object. _shared_token_metadata is not
// set by this constructor.
token_metadata::token_metadata(std::unique_ptr<token_metadata_impl> impl)
    : _impl(std::move(impl))
{
}
|
|
|
|
// Creates a fresh implementation object from `cfg` and remembers the address
// of the owning shared_token_metadata.
token_metadata::token_metadata(shared_token_metadata& stm, config cfg)
    : _shared_token_metadata(&stm)
    , _impl(std::make_unique<token_metadata_impl>(std::move(cfg)))
{
}
|
|
|
|
// Destruction is delegated to clear_and_dispose_impl(), which is defined
// outside this part of the file.
token_metadata::~token_metadata() {
    this->clear_and_dispose_impl();
}
|
|
|
|
// Move construction is compiler-generated (member-wise) and declared noexcept.
token_metadata::token_metadata(token_metadata&&) noexcept = default;
|
|
|
|
// Move assignment: disposes of the currently held implementation via
// clear_and_dispose_impl(), then takes over the other object's
// shared_token_metadata pointer and implementation, leaving both null in `o`.
// Self-assignment is a no-op.
token_metadata& token_metadata::operator=(token_metadata&& o) noexcept {
    if (this == &o) {
        return *this;
    }
    clear_and_dispose_impl();
    _shared_token_metadata = std::exchange(o._shared_token_metadata, nullptr);
    _impl = std::exchange(o._impl, nullptr);
    return *this;
}
|
|
|
|
// Stores the address of `stm` as the associated shared_token_metadata.
void
token_metadata::set_shared_token_metadata(shared_token_metadata& stm) {
    _shared_token_metadata = &stm;
}
|
|
|
|
// Returns the associated shared_token_metadata.
// The stored pointer is dereferenced without a null check.
shared_token_metadata&
token_metadata::get_shared_token_metadata() {
    return *_shared_token_metadata;
}
|
|
|
|
// Forwards to token_metadata_impl::sorted_tokens().
const utils::chunked_vector<token>& token_metadata::sorted_tokens() const {
    return _impl->sorted_tokens();
}
|
|
|
|
// Forwards to token_metadata_impl::update_normal_tokens(), handing over
// ownership of `tokens`.
future<> token_metadata::update_normal_tokens(std::unordered_set<token> tokens, host_id endpoint) {
    return _impl->update_normal_tokens(std::move(tokens), endpoint);
}
|
|
|
|
// Forwards to token_metadata_impl::first_token().
const token& token_metadata::first_token(const token& start) const {
    return _impl->first_token(start);
}
|
|
|
|
size_t
|
|
token_metadata::first_token_index(const token& start) const {
|
|
return _impl->first_token_index(start);
|
|
}
|
|
|
|
std::optional<host_id>
|
|
token_metadata::get_endpoint(const token& token) const {
|
|
return _impl->get_endpoint(token);
|
|
}
|
|
|
|
std::vector<token>
|
|
token_metadata::get_tokens(const host_id& addr) const {
|
|
return _impl->get_tokens(addr);
|
|
}
|
|
|
|
const std::unordered_map<token, host_id>&
|
|
token_metadata::get_token_to_endpoint() const {
|
|
return _impl->get_token_to_endpoint();
|
|
}
|
|
|
|
const std::unordered_set<host_id>&
|
|
token_metadata::get_leaving_endpoints() const {
|
|
return _impl->get_leaving_endpoints();
|
|
}
|
|
|
|
const std::unordered_map<token, host_id>&
|
|
token_metadata::get_bootstrap_tokens() const {
|
|
return _impl->get_bootstrap_tokens();
|
|
}
|
|
|
|
void
|
|
token_metadata::update_topology(host_id ep, std::optional<endpoint_dc_rack> opt_dr, std::optional<node::state> opt_st, std::optional<shard_id> shard_count) {
|
|
_impl->update_topology(ep, std::move(opt_dr), std::move(opt_st), std::move(shard_count));
|
|
}
|
|
|
|
// Forwards to token_metadata_impl::ring_range() (token overload).
std::ranges::subrange<token_metadata::tokens_iterator> token_metadata::ring_range(const token& start) const {
    return _impl->ring_range(start);
}
|
|
|
|
// Forwards to token_metadata_impl::ring_range() (ring_position_view overload).
std::ranges::subrange<token_metadata::tokens_iterator> token_metadata::ring_range(dht::ring_position_view start) const {
    return _impl->ring_range(start);
}
|
|
|
|
class token_metadata_ring_splitter : public locator::token_range_splitter {
|
|
token_metadata_ptr _tmptr;
|
|
std::ranges::subrange<token_metadata::tokens_iterator> _range;
|
|
public:
|
|
token_metadata_ring_splitter(token_metadata_ptr tmptr)
|
|
: _tmptr(std::move(tmptr))
|
|
, _range(_tmptr->sorted_tokens().empty() // ring_range() throws if the ring is empty
|
|
? std::ranges::subrange(token_metadata::tokens_iterator(), token_metadata::tokens_iterator())
|
|
: _tmptr->ring_range(dht::minimum_token()))
|
|
{ }
|
|
|
|
void reset(dht::ring_position_view pos) override {
|
|
_range = _tmptr->ring_range(pos);
|
|
}
|
|
|
|
std::optional<dht::token> next_token() override {
|
|
if (_range.empty()) {
|
|
return std::nullopt;
|
|
}
|
|
auto t = *_range.begin();
|
|
_range.advance(1);
|
|
return t;
|
|
}
|
|
};
|
|
|
|
// Creates a token_metadata_ring_splitter over `tmptr`, returned through the
// token_range_splitter interface.
std::unique_ptr<locator::token_range_splitter>
make_splitter(token_metadata_ptr tmptr) {
    return std::make_unique<token_metadata_ring_splitter>(std::move(tmptr));
}
|
|
|
|
class local_splitter : public locator::token_range_splitter {
|
|
std::optional<dht::token> _next_token;
|
|
public:
|
|
local_splitter() : _next_token(dht::minimum_token()) {}
|
|
|
|
void reset(dht::ring_position_view pos) override {
|
|
_next_token = pos.token();
|
|
}
|
|
|
|
std::optional<dht::token> next_token() override {
|
|
if (auto ret = std::exchange(_next_token, std::nullopt)) {
|
|
return ret;
|
|
}
|
|
return std::nullopt;
|
|
}
|
|
};
|
|
|
|
// Forwards to token_metadata_impl::get_topology() (mutable access).
topology& token_metadata::get_topology() {
    return _impl->get_topology();
}
|
|
|
|
// Forwards to token_metadata_impl::get_topology() (read-only access).
const topology& token_metadata::get_topology() const {
    return _impl->get_topology();
}
|
|
|
|
void
|
|
token_metadata::debug_show() const {
|
|
_impl->debug_show();
|
|
}
|
|
|
|
std::unordered_set<host_id>
|
|
token_metadata::get_host_ids() const {
|
|
return _impl->get_host_ids();
|
|
}
|
|
|
|
void
|
|
token_metadata::add_bootstrap_token(token t, host_id endpoint) {
|
|
_impl->add_bootstrap_token(t, endpoint);
|
|
}
|
|
|
|
void
|
|
token_metadata::add_bootstrap_tokens(std::unordered_set<token> tokens, host_id endpoint) {
|
|
_impl->add_bootstrap_tokens(std::move(tokens), endpoint);
|
|
}
|
|
|
|
void
|
|
token_metadata::remove_bootstrap_tokens(std::unordered_set<token> tokens) {
|
|
_impl->remove_bootstrap_tokens(std::move(tokens));
|
|
}
|
|
|
|
void
|
|
token_metadata::add_leaving_endpoint(host_id endpoint) {
|
|
_impl->add_leaving_endpoint(endpoint);
|
|
}
|
|
|
|
void
|
|
token_metadata::del_leaving_endpoint(host_id endpoint) {
|
|
_impl->del_leaving_endpoint(endpoint);
|
|
}
|
|
|
|
void
|
|
token_metadata::remove_endpoint(host_id endpoint) {
|
|
_impl->remove_endpoint(endpoint);
|
|
_impl->sort_tokens();
|
|
}
|
|
|
|
bool
|
|
token_metadata::is_normal_token_owner(host_id endpoint) const {
|
|
return _impl->is_normal_token_owner(endpoint);
|
|
}
|
|
|
|
bool
|
|
token_metadata::is_leaving(host_id endpoint) const {
|
|
return _impl->is_leaving(endpoint);
|
|
}
|
|
|
|
bool
|
|
token_metadata::is_being_replaced(host_id endpoint) const {
|
|
return _impl->is_being_replaced(endpoint);
|
|
}
|
|
|
|
bool
|
|
token_metadata::is_any_node_being_replaced() const {
|
|
return _impl->is_any_node_being_replaced();
|
|
}
|
|
|
|
// Forwards to token_metadata_impl::add_replacing_endpoint().
void
token_metadata::add_replacing_endpoint(host_id existing_node, host_id replacing_node) {
    _impl->add_replacing_endpoint(existing_node, replacing_node);
}
|
|
|
|
// Forwards to token_metadata_impl::del_replacing_endpoint().
void
token_metadata::del_replacing_endpoint(host_id existing_node) {
    _impl->del_replacing_endpoint(existing_node);
}
|
|
|
|
// Awaits token_metadata_impl::clone_async() and wraps the result in a new
// token_metadata.
future<token_metadata>
token_metadata::clone_async() const noexcept {
    auto cloned_impl = co_await _impl->clone_async();
    co_return token_metadata(std::move(cloned_impl));
}
|
|
|
|
// Awaits token_metadata_impl::clone_only_token_map() and wraps the result in a
// new token_metadata.
future<token_metadata> token_metadata::clone_only_token_map() const noexcept {
    auto cloned_impl = co_await _impl->clone_only_token_map();
    co_return token_metadata(std::move(cloned_impl));
}
|
|
|
|
// Awaits token_metadata_impl::clone_after_all_left() and wraps the result in a
// new token_metadata.
future<token_metadata> token_metadata::clone_after_all_left() const noexcept {
    auto cloned_impl = co_await _impl->clone_after_all_left();
    co_return token_metadata(std::move(cloned_impl));
}
|
|
|
|
void token_metadata::clear_and_dispose_impl() noexcept {
|
|
if (!_shared_token_metadata) {
|
|
return;
|
|
}
|
|
if (auto impl = std::exchange(_impl, nullptr)) {
|
|
_shared_token_metadata->clear_and_dispose(std::move(impl));
|
|
}
|
|
}
|
|
|
|
// Forwards to token_metadata_impl::clear_gently().
future<>
token_metadata::clear_gently() noexcept {
    return _impl->clear_gently();
}
|
|
|
|
// Forwards to token_metadata_impl::get_primary_ranges_for() (token-set
// overload), handing over ownership of `tokens`.
dht::token_range_vector token_metadata::get_primary_ranges_for(std::unordered_set<token> tokens) const {
    return _impl->get_primary_ranges_for(std::move(tokens));
}
|
|
|
|
// Forwards to token_metadata_impl::get_primary_ranges_for() (single-token
// overload).
dht::token_range_vector token_metadata::get_primary_ranges_for(token right) const {
    return _impl->get_primary_ranges_for(right);
}
|
|
|
|
// Forwards to the static token_metadata_impl::range_to_interval().
boost::icl::interval<token>::interval_type token_metadata::range_to_interval(wrapping_interval<dht::token> r) {
    return token_metadata_impl::range_to_interval(std::move(r));
}
|
|
|
|
// Forwards to the static token_metadata_impl::interval_to_range().
wrapping_interval<dht::token> token_metadata::interval_to_range(boost::icl::interval<token>::interval_type i) {
    return token_metadata_impl::interval_to_range(std::move(i));
}
|
|
|
|
// Forwards to token_metadata_impl::update_topology_change_info().
future<> token_metadata::update_topology_change_info(dc_rack_fn& get_dc_rack) {
    return _impl->update_topology_change_info(get_dc_rack);
}
|
|
|
|
const std::optional<topology_change_info>&
|
|
token_metadata::get_topology_change_info() const {
|
|
return _impl->get_topology_change_info();
|
|
}
|
|
|
|
// Forwards to token_metadata_impl::get_predecessor().
token token_metadata::get_predecessor(token t) const {
    return _impl->get_predecessor(t);
}
|
|
|
|
const std::unordered_set<host_id>&
|
|
token_metadata::get_normal_token_owners() const {
|
|
return _impl->get_normal_token_owners();
|
|
}
|
|
|
|
void token_metadata::for_each_token_owner(std::function<void(const node&)> func) const {
|
|
return _impl->for_each_token_owner(func);
|
|
}
|
|
|
|
size_t
|
|
token_metadata::count_normal_token_owners() const {
|
|
return _impl->count_normal_token_owners();
|
|
}
|
|
|
|
std::unordered_map<sstring, std::unordered_set<host_id>> token_metadata::get_datacenter_token_owners() const {
|
|
return _impl->get_datacenter_token_owners();
|
|
}
|
|
|
|
std::unordered_map<sstring, std::unordered_map<sstring, std::unordered_set<host_id>>>
|
|
token_metadata::get_datacenter_racks_token_owners() const {
|
|
return _impl->get_datacenter_racks_token_owners();
|
|
}
|
|
|
|
// Forwards to token_metadata_impl::get_datacenter_token_owners_nodes().
std::unordered_map<sstring, std::unordered_set<std::reference_wrapper<const node>>>
token_metadata::get_datacenter_token_owners_nodes() const {
    return _impl->get_datacenter_token_owners_nodes();
}
|
|
|
|
// Forwards to token_metadata_impl::get_datacenter_racks_token_owners_nodes().
std::unordered_map<sstring, std::unordered_map<sstring, std::unordered_set<std::reference_wrapper<const node>>>> token_metadata::get_datacenter_racks_token_owners_nodes() const {
    return _impl->get_datacenter_racks_token_owners_nodes();
}
|
|
|
|
void
|
|
token_metadata::set_read_new(read_new_t read_new) {
|
|
_impl->set_read_new(read_new);
|
|
}
|
|
|
|
long
|
|
token_metadata::get_ring_version() const {
|
|
return _impl->get_ring_version();
|
|
}
|
|
|
|
void
|
|
token_metadata::invalidate_cached_rings() {
|
|
_impl->invalidate_cached_rings();
|
|
}
|
|
|
|
auto
|
|
token_metadata::get_version() const -> version_t {
|
|
return _impl->get_version();
|
|
}
|
|
void
|
|
token_metadata::set_version(version_t version) {
|
|
_impl->set_version(version);
|
|
}
|
|
void
|
|
token_metadata::set_version_tracker(version_tracker_t tracker) {
|
|
_impl->set_version_tracker(std::move(tracker));
|
|
}
|
|
|
|
// Captures the barrier operation, the version reported by `tm` at construction
// time, and the address of `tm` itself. Only a raw pointer to `tm` is kept, so
// the token_metadata has to outlive any use of that pointer.
version_tracker::version_tracker(utils::phased_barrier::operation op, const token_metadata& tm)
    : _op(std::move(op)),
      _version(tm.get_version()),
      _tm(&tm) {}
|
|
|
|
long version_tracker::version_use_count() const {
|
|
return _tm->use_count();
|
|
}
|
|
|
|
// On destruction, warns (with a backtrace of the release point) if this
// tracker had been marked expired and was released more than _log_threshold
// after the expiry time. Trackers that were never marked expired stay silent.
version_tracker::~version_tracker() {
    if (!_expired_at) {
        return;
    }
    const auto released_at = std::chrono::steady_clock::now();
    const bool overdue = *_expired_at + _log_threshold < released_at;
    if (!overdue) {
        return;
    }
    // Express the overrun as fractional seconds for the log message.
    const auto held_for = std::chrono::duration_cast<std::chrono::duration<float>>(released_at - *_expired_at);
    tlogger.warn("topology version {} held for {:.3f} [s] past expiry, released at: {}", _version, held_for.count(),
            seastar::current_backtrace());
}
|
|
|
|
// Starts a new operation on the versions barrier, wraps it in a
// version_tracker bound to `tm`, and registers the tracker at the front of
// _trackers before returning it.
version_tracker shared_token_metadata::new_tracker(const token_metadata& tm) {
    // Kept as a named local that is returned by name, exactly as before: the
    // tracker is linked into _trackers prior to being returned.
    version_tracker fresh(_versions_barrier.start(), tm);
    _trackers.push_front(fresh);
    return fresh;
}
|
|
|
|
// Closes the background-dispose gate and resolves once closing has completed.
future<>
shared_token_metadata::stop() noexcept {
    co_await _background_dispose_gate.close();
}
|
|
|
|
void shared_token_metadata::clear_and_dispose(std::unique_ptr<token_metadata_impl> impl) noexcept {
|
|
// Safe to drop the future since the gate is closed in stop()
|
|
if (auto gh = _background_dispose_gate.try_hold()) {
|
|
(void)impl->clear_gently().finally([i = std::move(impl), gh = std::move(gh)] {});
|
|
}
|
|
}
|
|
|
|
std::unordered_map<service::topology::version_t, int> shared_token_metadata::describe_stale_versions() {
|
|
std::unordered_map<service::topology::version_t, int> result;
|
|
const auto active_version = _shared.get()->get_version();
|
|
for (const auto& t: _trackers) {
|
|
const auto v = t.version();
|
|
if (v < active_version) {
|
|
result.emplace(v, t.version_use_count());
|
|
}
|
|
}
|
|
return result;
|
|
}
|
|
|
|
// Publishes `tmptr` as the current token metadata.
//
// The incoming ring_version must be strictly greater than the published one
// and the incoming version must not be lower; violations are reported through
// on_internal_error(). When the version increases, the versions barrier is
// advanced and the resulting future is stored in _stale_versions_in_use.
// After publishing, a fresh tracker is attached to the new metadata and every
// tracker whose version differs from the published one is marked expired.
void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept {
    const auto current_ring_version = _shared->get_ring_version();
    const auto incoming_ring_version = tmptr->get_ring_version();
    if (current_ring_version >= incoming_ring_version) {
        on_internal_error(tlogger, format("shared_token_metadata: must not set non-increasing ring_version: {} -> {}", current_ring_version, incoming_ring_version));
    }

    const auto current_version = _shared->get_version();
    const auto incoming_version = tmptr->get_version();
    if (current_version > incoming_version) {
        on_internal_error(tlogger, format("shared_token_metadata: must not set decreasing version: {} -> {}", current_version, incoming_version));
    } else if (current_version < incoming_version) {
        // A newer version is being published: start waiting for users of the
        // older versions to go away.
        _stale_versions_in_use = _versions_barrier.advance_and_await();
    }

    tmptr->set_shared_token_metadata(*this);
    _shared = std::move(tmptr);
    _shared->set_version_tracker(new_tracker(*_shared));

    const auto published_version = _shared->get_version();
    for (auto& tracker : _trackers) {
        if (tracker.version() != published_version) {
            tracker.mark_expired(_stall_detector_threshold);
        }
    }

    tlogger.debug("new token_metadata is set, version {}", published_version);
}
|
|
|
|
// Applies `func` to a private clone of the published token metadata while
// holding the lock returned by get_lock(), then publishes the clone via set().
// If `func` fails, nothing is published.
future<> shared_token_metadata::mutate_token_metadata(seastar::noncopyable_function<future<> (token_metadata&)> func) {
    auto guard = co_await get_lock();
    auto draft = co_await _shared->clone_async();
    // Bump the ring_version of the draft so that cached token/replication
    // mappings are invalidated once the draft is committed.
    draft.invalidate_cached_rings();
    co_await func(draft);
    set(make_token_metadata_ptr(std::move(draft)));
}
|
|
|
|
// Test helper: runs `func` directly on the currently published token metadata.
// Unlike mutate_token_metadata(), no lock is taken, no clone is made and set()
// is not called.
void shared_token_metadata::mutate_token_metadata_for_test(seastar::noncopyable_function<void (token_metadata&)> func) {
    auto& published = *_shared;
    // Bump the ring_version first so that cached token/replication mappings
    // are invalidated.
    published.invalidate_cached_rings();
    func(published);
}
|
|
|
|
// Mutates the token metadata on the calling shard (asserted to be shard 0) and
// then publishes it on every shard.
//
// Steps: take the local lock, clone the local metadata, apply `func` to the
// clone, clone the mutated result on every other shard, and only then call
// set() on all shards with the per-shard copies.
future<> shared_token_metadata::mutate_on_all_shards(sharded<shared_token_metadata>& stm, seastar::noncopyable_function<future<> (token_metadata&)> func) {
    auto base_shard = this_shard_id();
    SCYLLA_ASSERT(base_shard == 0);
    auto guard = co_await stm.local().get_lock();

    // One slot per shard; each slot is written only by its own shard.
    std::vector<mutable_token_metadata_ptr> per_shard(smp::count);
    auto mutated = stm.local().make_token_metadata_ptr(co_await stm.local().get()->clone_async());
    auto& tm = *mutated;
    // Bump the ring_version so that cached token/replication mappings are
    // invalidated when the modified token_metadata is committed.
    tm.invalidate_cached_rings();
    co_await func(tm);

    // Apply the mutated token_metadata only after successfully cloning it on all shards.
    per_shard[base_shard] = mutated;
    co_await smp::invoke_on_others(base_shard, [&] () -> future<> {
        per_shard[this_shard_id()] = stm.local().make_token_metadata_ptr(co_await tm.clone_async());
    });

    co_await stm.invoke_on_all([&] (shared_token_metadata& local_stm) {
        local_stm.set(std::move(per_shard[this_shard_id()]));
    });
}
|
|
|
|
// Parses `s` as a host id, an inet address, or either, depending on `restrict`:
//   - param_type::host_id:     `s` must parse as a UUID.
//   - param_type::endpoint:    `s` must parse as an inet address.
//   - param_type::auto_detect: a UUID is tried first, then an inet address.
//
// Throws std::invalid_argument when `s` cannot be parsed in the requested
// form. Only marshal_exception (UUID parsing) and std::invalid_argument
// (address parsing) are translated; any other exception propagates unchanged.
host_id_or_endpoint::host_id_or_endpoint(const sstring& s, param_type restrict) {
    switch (restrict) {
    case param_type::host_id:
        try {
            _value = host_id(utils::UUID(s));
        } catch (const marshal_exception& e) {
            throw std::invalid_argument(format("Invalid host_id {}: {}", s, e.what()));
        }
        break;
    case param_type::endpoint:
        try {
            _value = gms::inet_address(s);
        } catch (const std::invalid_argument& e) {
            throw std::invalid_argument(format("Invalid inet_address {}: {}", s, e.what()));
        }
        break;
    case param_type::auto_detect:
        try {
            _value = host_id(utils::UUID(s));
        } catch (const marshal_exception&) {
            // Not a UUID: fall back to interpreting the string as an address.
            try {
                _value = gms::inet_address(s);
            } catch (const std::invalid_argument&) {
                throw std::invalid_argument(format("Invalid host_id or inet_address {}", s));
            }
        }
        break;
    }
}
|
|
|
|
// Returns the held host id, or asks the gossiper for the host id of the held
// endpoint. Any failure of that lookup is reported as std::runtime_error.
host_id
host_id_or_endpoint::resolve_id(const gms::gossiper& g) const {
    if (has_host_id()) {
        return id();
    }
    try {
        return g.get_host_id(endpoint());
    } catch (...) {
        // Every failure is mapped to the same message; the original exception
        // is discarded.
        throw std::runtime_error(format("Host inet address {} not found in the cluster", endpoint()));
    }
}
|
|
|
|
// Returns the held endpoint, or looks up the held host id in the gossiper's
// address map. Throws std::runtime_error when the map has no entry for it.
gms::inet_address
host_id_or_endpoint::resolve_endpoint(const gms::gossiper& g) const {
    if (has_endpoint()) {
        return endpoint();
    }
    auto mapped_address = g.get_address_map().find(id());
    if (mapped_address) {
        return *mapped_address;
    }
    throw std::runtime_error(format("Host ID {} not found in the cluster", id()));
}
|
|
|
|
// Stores `new_token_metadata` in the calling shard's slot and a freshly cloned
// copy in the slot of every other shard. The local slot is written last, after
// all remote clones have completed.
future<> pending_token_metadata::assign(locator::mutable_token_metadata_ptr new_token_metadata) {
    auto& stm_container = new_token_metadata->get_shared_token_metadata().container();
    // Each remote shard clones the metadata and wraps the clone with its own
    // shard-local shared_token_metadata.
    co_await smp::invoke_on_others([this, &new_token_metadata, &stm_container] () -> future<> {
        local() = stm_container.local().make_token_metadata_ptr(
                co_await new_token_metadata->clone_async());
    });
    local() = std::move(new_token_metadata);
}
|
|
|
|
// Returns a mutable reference to the slot that belongs to the calling shard.
locator::mutable_token_metadata_ptr&
pending_token_metadata::local() {
    return _shards[this_shard_id()];
}
|
|
|
|
// Returns, by value, the pointer stored in the calling shard's slot.
locator::token_metadata_ptr
pending_token_metadata::local() const {
    return _shards[this_shard_id()];
}
|
|
|
|
// Resets every shard's slot to null; each slot is cleared on its own shard.
future<>
pending_token_metadata::destroy() {
    return smp::invoke_on_all([this] () {
        local() = nullptr;
    });
}
|
|
|
|
} // namespace locator
|