mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-20 00:20:47 +00:00
token_metadata: introduce topology_change_info
We plan to move pending_endpoints and read_endpoints, along with their computation logic, from token_metadata to vnode_effective_replication_map. The vnode_effective_replication_map seems more appropriate for them since it contains functionally similar _replication_map and we will be able to reuse pending_endpoints/read_endpoints across keyspaces sharing the same factory_key. At present, pending_endpoints and read_endpoints are updated in the update_pending_ranges function. The update logic comprises two parts - preparing data common to all keyspaces/replication_strategies, and calculating the migration_info for specific keyspaces. In this commit, we introduce a new topology_change_info structure to hold the first part's data, and create an update_topology_change_info function to update it. This structure will later be used in vnode_effective_replication_map to compute pending_endpoints and read_endpoints. This enables the reuse of topology_change_info across all keyspaces, unlike the current update_pending_ranges implementation, which is another benefit of this refactoring. The update_topology_change_info implementation is mostly derived from update_pending_ranges, though there are a few differences: * replacing async and thread with plain co_awaits; * adding a utils::clear_gently call for the previous value to mitigate reactor stalls if target_token_metadata grows large; * substituting immediately invoked lambdas with simple variables and blocks to reduce noise, as lambdas would need to be converted into coroutines. The original update_pending_ranges remains unchanged, and will be removed entirely upon transitioning to the new implementation. Meanwhile, we add an update_topology_change_info call to storage_service::update_pending_ranges so that we can iteratively switch the system to the new implementation.
This commit is contained in:
@@ -59,6 +59,8 @@ private:
|
||||
// The map between the existing node to be replaced and the replacing node
|
||||
std::unordered_map<inet_address, inet_address> _replacing_endpoints;
|
||||
|
||||
std::optional<topology_change_info> _topology_change_info;
|
||||
|
||||
using ring_mapping = boost::icl::interval_map<token, std::unordered_set<inet_address>>;
|
||||
// For each keyspace, migration_info contains ranges of tokens and
|
||||
// corresponding replicas to which writes or reads will be directed:
|
||||
@@ -257,6 +259,11 @@ public:
|
||||
future<> update_pending_ranges(
|
||||
const token_metadata& unpimplified_this,
|
||||
const abstract_replication_strategy& strategy, const sstring& keyspace_name, dc_rack_fn& get_dc_rack);
|
||||
|
||||
future<> update_topology_change_info(dc_rack_fn& get_dc_rack);
|
||||
const std::optional<topology_change_info>& get_topology_change_info() const {
|
||||
return _topology_change_info;
|
||||
}
|
||||
public:
|
||||
|
||||
token get_predecessor(token t) const;
|
||||
@@ -876,6 +883,86 @@ future<> token_metadata_impl::update_pending_ranges(
|
||||
});
|
||||
}
|
||||
|
||||
// Rebuilds _topology_change_info from the current bootstrapping, leaving and
// replacing state. The resulting structure is common to all keyspaces and is
// later consumed when computing pending/read endpoints, so it is recomputed
// only here rather than once per keyspace.
//
// When no topology change is in progress, the previous info (if any) is
// released gently (to avoid reactor stalls on large metadata) and cleared.
future<> token_metadata_impl::update_topology_change_info(dc_rack_fn& get_dc_rack) {
    if (_bootstrap_tokens.empty() && _leaving_endpoints.empty() && _replacing_endpoints.empty()) {
        co_await utils::clear_gently(_topology_change_info);
        _topology_change_info.reset();
        co_return;
    }

    // true if there is a node replaced with the same IP
    bool replace_with_same_endpoint = false;
    // target_token_metadata incorporates all the changes from leaving, bootstrapping and replacing
    auto target_token_metadata = co_await clone_only_token_map(false);
    {
        // construct new_normal_tokens based on _bootstrap_tokens and _replacing_endpoints
        std::unordered_map<inet_address, std::unordered_set<token>> new_normal_tokens;
        if (!_replacing_endpoints.empty()) {
            // Tokens currently owned by a replaced node move to its replacement.
            for (const auto& [token, inet_address]: _token_to_endpoint_map) {
                const auto it = _replacing_endpoints.find(inet_address);
                if (it == _replacing_endpoints.end()) {
                    continue;
                }
                new_normal_tokens[it->second].insert(token);
            }
            for (const auto& [replace_from, replace_to]: _replacing_endpoints) {
                if (replace_from == replace_to) {
                    // Same-IP replacement: removing the endpoint here would also
                    // remove its replacement; handled separately via
                    // base_token_metadata below.
                    replace_with_same_endpoint = true;
                } else {
                    target_token_metadata->remove_endpoint(replace_from);
                }
            }
        }
        for (const auto& [token, inet_address]: _bootstrap_tokens) {
            new_normal_tokens[inet_address].insert(token);
        }
        // apply new_normal_tokens
        for (auto& [endpoint, tokens]: new_normal_tokens) {
            target_token_metadata->update_topology(endpoint, get_dc_rack(endpoint), node::state::normal);
            co_await target_token_metadata->update_normal_tokens(std::move(tokens), endpoint);
        }
        // apply leaving endpoints
        for (const auto& endpoint: _leaving_endpoints) {
            target_token_metadata->remove_endpoint(endpoint);
        }
        target_token_metadata->sort_tokens();
    }

    // We require a distinct token_metadata instance when replace_from equals replace_to,
    // as it ensures the node is included in pending_ranges.
    // Otherwise, the node would be excluded from both pending_ranges and
    // get_natural_endpoints_without_node_being_replaced,
    // causing the coordinator to overlook it entirely.
    std::unique_ptr<token_metadata_impl> base_token_metadata;
    if (replace_with_same_endpoint) {
        base_token_metadata = co_await clone_only_token_map(false);
        for (const auto& [replace_from, replace_to]: _replacing_endpoints) {
            if (replace_from == replace_to) {
                base_token_metadata->remove_endpoint(replace_from);
            }
        }
        base_token_metadata->sort_tokens();
    }

    // merge tokens from token_to_endpoint and bootstrap_tokens,
    // preserving tokens of leaving endpoints
    auto all_tokens = std::vector<dht::token>();
    all_tokens.reserve(sorted_tokens().size() + get_bootstrap_tokens().size());
    all_tokens.resize(sorted_tokens().size());
    std::copy(begin(sorted_tokens()), end(sorted_tokens()), begin(all_tokens));
    for (const auto& p: get_bootstrap_tokens()) {
        all_tokens.push_back(p.first);
    }
    std::sort(begin(all_tokens), end(all_tokens));

    // Publish the new info first, then dispose of the old one incrementally
    // so large previous instances do not stall the reactor.
    auto prev_value = std::move(_topology_change_info);
    _topology_change_info.emplace(token_metadata(std::move(target_token_metadata)),
        base_token_metadata ? std::optional(token_metadata(std::move(base_token_metadata))): std::nullopt,
        std::move(all_tokens),
        _read_new);
    co_await utils::clear_gently(prev_value);
}
|
||||
|
||||
// Returns how many endpoints currently own normal (non-pending) tokens.
size_t token_metadata_impl::count_normal_token_owners() const {
    const auto num_owners = _normal_token_owners.size();
    return num_owners;
}
|
||||
@@ -969,6 +1056,23 @@ std::multimap<inet_address, token> token_metadata_impl::get_endpoint_to_token_ma
|
||||
return cloned;
|
||||
}
|
||||
|
||||
// Constructs the per-topology-change snapshot; simply takes ownership of the
// pre-computed pieces (see token_metadata_impl::update_topology_change_info).
topology_change_info::topology_change_info(token_metadata target_token_metadata_,
    std::optional<token_metadata> base_token_metadata_,
    std::vector<dht::token> all_tokens_,
    token_metadata::read_new_t read_new_)
    : target_token_metadata(std::move(target_token_metadata_))
    , base_token_metadata(std::move(base_token_metadata_))
    , all_tokens(std::move(all_tokens_))
    , read_new(read_new_)
{
}
|
||||
|
||||
// Releases the owned token metadata and token list incrementally, yielding to
// the reactor between elements so destroying a large instance cannot stall it.
// read_new is a plain enum-like value and needs no gentle clearing.
future<> topology_change_info::clear_gently() {
    co_await utils::clear_gently(target_token_metadata);
    co_await utils::clear_gently(base_token_metadata);
    co_await utils::clear_gently(all_tokens);
}
|
||||
|
||||
// token_metadata is a pimpl facade; this constructor adopts an existing
// implementation object.
token_metadata::token_metadata(std::unique_ptr<token_metadata_impl> impl)
    : _impl(std::move(impl)) {
}
|
||||
@@ -1232,6 +1336,16 @@ token_metadata::update_pending_ranges(const abstract_replication_strategy& strat
|
||||
return _impl->update_pending_ranges(*this, strategy, keyspace_name, get_dc_rack);
|
||||
}
|
||||
|
||||
// Pimpl forwarder; see token_metadata_impl::update_topology_change_info for
// the actual computation.
future<>
token_metadata::update_topology_change_info(dc_rack_fn& get_dc_rack) {
    return _impl->update_topology_change_info(get_dc_rack);
}
|
||||
|
||||
// Pimpl forwarder; disengaged when no topology change is in progress.
const std::optional<topology_change_info>&
token_metadata::get_topology_change_info() const {
    return _impl->get_topology_change_info();
}
|
||||
|
||||
token
|
||||
token_metadata::get_predecessor(token t) const {
|
||||
return _impl->get_predecessor(t);
|
||||
|
||||
@@ -69,6 +69,7 @@ struct host_id_or_endpoint {
|
||||
};
|
||||
|
||||
class token_metadata_impl;
|
||||
struct topology_change_info;
|
||||
|
||||
class token_metadata final {
|
||||
std::unique_ptr<token_metadata_impl> _impl;
|
||||
@@ -256,6 +257,10 @@ public:
|
||||
*/
|
||||
future<> update_pending_ranges(const abstract_replication_strategy& strategy, const sstring& keyspace_name, dc_rack_fn& get_dc_rack);
|
||||
|
||||
future<> update_topology_change_info(dc_rack_fn& get_dc_rack);
|
||||
|
||||
const std::optional<topology_change_info>& get_topology_change_info() const;
|
||||
|
||||
token get_predecessor(token t) const;
|
||||
|
||||
const std::unordered_set<inet_address>& get_all_endpoints() const;
|
||||
@@ -294,6 +299,19 @@ public:
|
||||
friend class token_metadata_impl;
|
||||
};
|
||||
|
||||
// Data common to all keyspaces/replication strategies that is needed to
// compute pending and read endpoints during a topology change (bootstrap,
// decommission, replace). Computed once per token_metadata update so it can
// be shared across keyspaces instead of being recomputed per keyspace.
struct topology_change_info {
    // Token metadata with all in-flight changes (leaving, bootstrapping,
    // replacing nodes) already applied.
    token_metadata target_token_metadata;
    // Engaged only for a same-IP replace: current metadata with the replaced
    // node removed, so the replacing node is still counted in pending ranges.
    std::optional<token_metadata> base_token_metadata;
    // Sorted union of current normal tokens and bootstrap tokens.
    std::vector<dht::token> all_tokens;
    token_metadata::read_new_t read_new;

    topology_change_info(token_metadata target_token_metadata_,
        std::optional<token_metadata> base_token_metadata_,
        std::vector<dht::token> all_tokens_,
        token_metadata::read_new_t read_new_);
    // Frees owned data incrementally to avoid reactor stalls.
    future<> clear_gently();
};
|
||||
|
||||
using token_metadata_ptr = lw_shared_ptr<const token_metadata>;
|
||||
using mutable_token_metadata_ptr = lw_shared_ptr<token_metadata>;
|
||||
using token_metadata_lock = semaphore_units<>;
|
||||
|
||||
@@ -4750,11 +4750,13 @@ future<> storage_service::update_pending_ranges(mutable_token_metadata_ptr tmptr
|
||||
assert(this_shard_id() == 0);
|
||||
|
||||
try {
|
||||
locator::dc_rack_fn get_dc_rack_from_gossiper([this] (inet_address ep) { return get_dc_rack_for(ep); });
|
||||
co_await tmptr->update_topology_change_info(get_dc_rack_from_gossiper);
|
||||
|
||||
auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms();
|
||||
for (const auto& [keyspace_name, erm] : ks_erms) {
|
||||
auto& strategy = erm->get_replication_strategy();
|
||||
slogger.debug("Updating pending ranges for keyspace={} starts ({})", keyspace_name, reason);
|
||||
locator::dc_rack_fn get_dc_rack_from_gossiper([this] (inet_address ep) { return get_dc_rack_for(ep); });
|
||||
co_await tmptr->update_pending_ranges(strategy, keyspace_name, get_dc_rack_from_gossiper);
|
||||
slogger.debug("Updating pending ranges for keyspace={} ends ({})", keyspace_name, reason);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user