token_metadata: introduce topology_change_info

We plan to move pending_endpoints and read_endpoints, along
with their computation logic, from token_metadata to
vnode_effective_replication_map. The vnode_effective_replication_map
seems more appropriate for them since it contains functionally
similar _replication_map and we will be able to reuse
pending_endpoints/read_endpoints across keyspaces
sharing the same factory_key.

At present, pending_endpoints and read_endpoints are updated in the
update_pending_ranges function. The update logic comprises two
parts - preparing data common to all keyspaces/replication_strategies,
and calculating the migration_info for specific keyspaces. In this commit,
we introduce a new topology_change_info structure to hold the first
part's data and create an update_topology_change_info function to
update it. This structure will later be used in
vnode_effective_replication_map to compute pending_endpoints
and read_endpoints. This enables the reuse of topology_change_info
across all keyspaces, unlike the current update_pending_ranges
implementation, which is another benefit of this refactoring.

The update_topology_change_info implementation is mostly derived from
update_pending_ranges; there are a few differences, though:
* replacing async and thread with plain co_awaits;
* adding a utils::clear_gently call for the previous value
to mitigate reactor stalls if target_token_metadata grows large;
* substituting immediately invoked lambdas with simple variables and
blocks to reduce noise, as lambdas would need to be converted into coroutines.

The original update_pending_ranges remains unchanged, and will be
removed entirely upon transitioning to the new implementation.
Meanwhile, we add an update_topology_change_info call to
storage_service::update_pending_ranges so that we can
iteratively switch the system to the new implementation.
This commit is contained in:
Petr Gusev
2023-05-15 18:40:06 +04:00
parent 51e80691ef
commit 10bf8c7901
3 changed files with 135 additions and 1 deletions

View File

@@ -59,6 +59,8 @@ private:
// The map between the existing node to be replaced and the replacing node
std::unordered_map<inet_address, inet_address> _replacing_endpoints;
std::optional<topology_change_info> _topology_change_info;
using ring_mapping = boost::icl::interval_map<token, std::unordered_set<inet_address>>;
// For each keyspace, migration_info contains ranges of tokens and
// corresponding replicas to which writes or reads will be directed:
@@ -257,6 +259,11 @@ public:
future<> update_pending_ranges(
const token_metadata& unpimplified_this,
const abstract_replication_strategy& strategy, const sstring& keyspace_name, dc_rack_fn& get_dc_rack);
future<> update_topology_change_info(dc_rack_fn& get_dc_rack);
// Returns the data computed by update_topology_change_info, or a
// disengaged optional when no topology change (bootstrap / leave /
// replace) is currently in progress.
const std::optional<topology_change_info>& get_topology_change_info() const {
    return _topology_change_info;
}
public:
token get_predecessor(token t) const;
@@ -876,6 +883,86 @@ future<> token_metadata_impl::update_pending_ranges(
});
}
// Recomputes _topology_change_info from the current sets of bootstrapping,
// leaving and replacing endpoints. The result holds data common to all
// keyspaces/replication strategies and is intended to be consumed later by
// vnode_effective_replication_map when computing pending/read endpoints.
// get_dc_rack supplies the dc/rack for endpoints newly added to the topology.
future<> token_metadata_impl::update_topology_change_info(dc_rack_fn& get_dc_rack) {
    // No topology change in progress - gently drop any stale info.
    if (_bootstrap_tokens.empty() && _leaving_endpoints.empty() && _replacing_endpoints.empty()) {
        co_await utils::clear_gently(_topology_change_info);
        _topology_change_info.reset();
        co_return;
    }

    // true if there is a node replaced with the same IP
    bool replace_with_same_endpoint = false;

    // target_token_metadata incorporates all the changes from leaving, bootstrapping and replacing
    auto target_token_metadata = co_await clone_only_token_map(false);
    {
        // construct new_normal_tokens based on _bootstrap_tokens and _replacing_endpoints
        std::unordered_map<inet_address, std::unordered_set<token>> new_normal_tokens;
        if (!_replacing_endpoints.empty()) {
            // A replacing node inherits every token of the node it replaces.
            for (const auto& [token, inet_address]: _token_to_endpoint_map) {
                const auto it = _replacing_endpoints.find(inet_address);
                if (it == _replacing_endpoints.end()) {
                    continue;
                }
                new_normal_tokens[it->second].insert(token);
            }
            for (const auto& [replace_from, replace_to]: _replacing_endpoints) {
                if (replace_from == replace_to) {
                    replace_with_same_endpoint = true;
                } else {
                    target_token_metadata->remove_endpoint(replace_from);
                }
            }
        }
        for (const auto& [token, inet_address]: _bootstrap_tokens) {
            new_normal_tokens[inet_address].insert(token);
        }
        // apply new_normal_tokens
        for (auto& [endpoint, tokens]: new_normal_tokens) {
            target_token_metadata->update_topology(endpoint, get_dc_rack(endpoint), node::state::normal);
            co_await target_token_metadata->update_normal_tokens(std::move(tokens), endpoint);
        }
        // apply leaving endpoints
        for (const auto& endpoint: _leaving_endpoints) {
            target_token_metadata->remove_endpoint(endpoint);
        }
        target_token_metadata->sort_tokens();
    }

    // We require a distinct token_metadata instance when replace_from equals replace_to,
    // as it ensures the node is included in pending_ranges.
    // Otherwise, the node would be excluded from both pending_ranges and
    // get_natural_endpoints_without_node_being_replaced,
    // causing the coordinator to overlook it entirely.
    std::unique_ptr<token_metadata_impl> base_token_metadata;
    if (replace_with_same_endpoint) {
        base_token_metadata = co_await clone_only_token_map(false);
        for (const auto& [replace_from, replace_to]: _replacing_endpoints) {
            if (replace_from == replace_to) {
                base_token_metadata->remove_endpoint(replace_from);
            }
        }
        base_token_metadata->sort_tokens();
    }

    // merge tokens from token_to_endpoint and bootstrap_tokens,
    // preserving tokens of leaving endpoints
    auto all_tokens = std::vector<dht::token>();
    all_tokens.reserve(sorted_tokens().size() + get_bootstrap_tokens().size());
    // Single range-insert instead of resize() + std::copy(): avoids
    // value-initializing the elements just to overwrite them.
    all_tokens.insert(all_tokens.end(), sorted_tokens().begin(), sorted_tokens().end());
    for (const auto& p: get_bootstrap_tokens()) {
        all_tokens.push_back(p.first);
    }
    std::sort(all_tokens.begin(), all_tokens.end());

    // Publish the new info first, then dispose of the previous value
    // gently to mitigate reactor stalls if it has grown large.
    auto prev_value = std::move(_topology_change_info);
    _topology_change_info.emplace(token_metadata(std::move(target_token_metadata)),
        base_token_metadata ? std::optional(token_metadata(std::move(base_token_metadata))) : std::nullopt,
        std::move(all_tokens),
        _read_new);
    co_await utils::clear_gently(prev_value);
}
// Number of endpoints currently owning normal tokens in this ring.
size_t token_metadata_impl::count_normal_token_owners() const {
    const auto& owners = _normal_token_owners;
    return owners.size();
}
@@ -969,6 +1056,23 @@ std::multimap<inet_address, token> token_metadata_impl::get_endpoint_to_token_ma
return cloned;
}
// Bundles the keyspace-independent data of an ongoing topology change;
// all heavy arguments are taken by value and moved into place.
topology_change_info::topology_change_info(token_metadata target_token_metadata_,
        std::optional<token_metadata> base_token_metadata_,
        std::vector<dht::token> all_tokens_,
        token_metadata::read_new_t read_new_)
    : target_token_metadata(std::move(target_token_metadata_))
    , base_token_metadata(std::move(base_token_metadata_))
    , all_tokens(std::move(all_tokens_))
    , read_new(read_new_)
{
}
// Frees the owned (potentially large) token metadata copies and the token
// vector incrementally, yielding between steps to avoid reactor stalls.
future<> topology_change_info::clear_gently() {
    co_await utils::clear_gently(target_token_metadata);
    co_await utils::clear_gently(base_token_metadata);
    co_await utils::clear_gently(all_tokens);
}
// Pimpl constructor: takes ownership of the implementation object.
token_metadata::token_metadata(std::unique_ptr<token_metadata_impl> impl)
    : _impl(std::move(impl)) {
}
@@ -1232,6 +1336,16 @@ token_metadata::update_pending_ranges(const abstract_replication_strategy& strat
return _impl->update_pending_ranges(*this, strategy, keyspace_name, get_dc_rack);
}
// Forwards to token_metadata_impl::update_topology_change_info: recomputes
// the shared topology-change data from the current bootstrap/leaving/
// replacing endpoint sets (or clears it when there is no change in progress).
future<>
token_metadata::update_topology_change_info(dc_rack_fn& get_dc_rack) {
    return _impl->update_topology_change_info(get_dc_rack);
}
// Accessor for the info produced by update_topology_change_info;
// disengaged when no topology change is in progress.
const std::optional<topology_change_info>&
token_metadata::get_topology_change_info() const {
    return _impl->get_topology_change_info();
}
token
token_metadata::get_predecessor(token t) const {
return _impl->get_predecessor(t);

View File

@@ -69,6 +69,7 @@ struct host_id_or_endpoint {
};
class token_metadata_impl;
struct topology_change_info;
class token_metadata final {
std::unique_ptr<token_metadata_impl> _impl;
@@ -256,6 +257,10 @@ public:
*/
future<> update_pending_ranges(const abstract_replication_strategy& strategy, const sstring& keyspace_name, dc_rack_fn& get_dc_rack);
future<> update_topology_change_info(dc_rack_fn& get_dc_rack);
const std::optional<topology_change_info>& get_topology_change_info() const;
token get_predecessor(token t) const;
const std::unordered_set<inet_address>& get_all_endpoints() const;
@@ -294,6 +299,19 @@ public:
friend class token_metadata_impl;
};
// Data common to all keyspaces/replication strategies that is needed to
// compute pending and read endpoints during a topology change
// (bootstrap / leave / replace). Maintained by
// token_metadata::update_topology_change_info so it can be reused across
// keyspaces instead of being recomputed per keyspace.
struct topology_change_info {
    // Token metadata incorporating all changes from leaving, bootstrapping
    // and replacing nodes.
    token_metadata target_token_metadata;
    // Engaged only when some node is being replaced by a node with the same
    // IP; has that node removed, so it still shows up in pending_ranges.
    std::optional<token_metadata> base_token_metadata;
    // Sorted merge of the current ring tokens and the bootstrap tokens
    // (tokens of leaving endpoints are preserved).
    std::vector<dht::token> all_tokens;
    // Snapshot of the owning token_metadata's read_new setting
    // — presumably controls whether reads go to new replicas; confirm
    // against token_metadata's definition of read_new_t.
    token_metadata::read_new_t read_new;

    topology_change_info(token_metadata target_token_metadata_,
            std::optional<token_metadata> base_token_metadata_,
            std::vector<dht::token> all_tokens_,
            token_metadata::read_new_t read_new_);
    future<> clear_gently();
};
using token_metadata_ptr = lw_shared_ptr<const token_metadata>;
using mutable_token_metadata_ptr = lw_shared_ptr<token_metadata>;
using token_metadata_lock = semaphore_units<>;

View File

@@ -4750,11 +4750,13 @@ future<> storage_service::update_pending_ranges(mutable_token_metadata_ptr tmptr
assert(this_shard_id() == 0);
try {
locator::dc_rack_fn get_dc_rack_from_gossiper([this] (inet_address ep) { return get_dc_rack_for(ep); });
co_await tmptr->update_topology_change_info(get_dc_rack_from_gossiper);
auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms();
for (const auto& [keyspace_name, erm] : ks_erms) {
auto& strategy = erm->get_replication_strategy();
slogger.debug("Updating pending ranges for keyspace={} starts ({})", keyspace_name, reason);
locator::dc_rack_fn get_dc_rack_from_gossiper([this] (inet_address ep) { return get_dc_rack_for(ep); });
co_await tmptr->update_pending_ranges(strategy, keyspace_name, get_dc_rack_from_gossiper);
slogger.debug("Updating pending ranges for keyspace={} ends ({})", keyspace_name, reason);
}