mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-02 04:56:58 +00:00
Merge 'Move pending_ranges and endpoints_for_reading from token_metadata to erm' from Gusev Petr
This refactoring is a follow-up to https://github.com/scylladb/scylladb/pull/13376: it moves the per-keyspace data structures related to topology changes from `token_metadata` to `erm`. We move `pending_endpoints` and `read_endpoints`, along with their computation logic, from `token_metadata` to `vnode_effective_replication_map`. The `vnode_effective_replication_map` seems more appropriate for them, since it already contains the functionally similar `replication_map`, and we will be able to reuse `pending_endpoints`/`read_endpoints` across keyspaces sharing the same `factory_key`. At present, `pending_endpoints` and `read_endpoints` are updated in the `update_pending_ranges` function. The update logic comprises two parts: preparing data common to all keyspaces/replication_strategies, and calculating the `migration_info` for specific keyspaces. In this PR we introduce a new `topology_change_info` structure to hold the first part's data and create an `update_topology_change_info` function to update it. This structure will be used in `vnode_effective_replication_map` to compute `pending_endpoints` and `read_endpoints`. This enables the reuse of `topology_change_info` across all keyspaces, unlike the current `update_pending_ranges` implementation, which is another benefit of this refactoring. The PR also optimises `replication_map` memory usage for the case `natural_endpoints_depend_on_token == false`: we store the endpoint list only once, under a special key, instead of duplicating it for each `vnode` token. The original `update_pending_ranges` remains unchanged during the PR commits, and will be removed entirely upon transitioning to the new implementation.
Closes #13715

* github.com:scylladb/scylladb:
  token_metadata_test: add a test for everywhere strategy
  token_metadata_test: check read_endpoints when bootstrapping first node
  token_metadata_test: refactor tests, extract create_erm
  token_metadata: drop has_pending_ranges and migration_info
  effective_replication_map: add has_pending_ranges
  token_metadata: drop update_pending_ranges
  effective_replication_map: use new get_pending_endpoints and get_endpoints_for_reading
  token_metadata_test.cc: create token_metadata and replication_strategy as shared pointers
  vnode_effective_replication_map: get_pending_endpoints and get_endpoints_for_reading
  calculate_effective_replication_map: compute pending_endpoints and read_endpoints
  vnode_erm: optimize replication_map
  vnode_erm::get_range_addresses: use sorted_tokens
  abstract_replication_strategy.hh: de-virtualize natural_endpoints_depend_on_token
  sequenced_set: add extract_vector method
  effective_replication_map: clone_endpoints_gently -> clone_data_gently
  vnode_erm: gentle destruction of _pending_endpoints and _read_endpoints
  stall_free.hh: add clear_gently for rvalues
  stall_free.hh: relax Container requirement
  token_metadata: add pending_endpoints and read_endpoints to vnode_effective_replication_map
  token_metadata: introduce topology_change_info
  token_metadata: replace set_topology_transition_state with set_read_new
This commit is contained in:
@@ -1637,7 +1637,7 @@ future<> view_update_generator::mutate_MV(
|
||||
auto view_token = dht::get_token(*mut.s, mut.fm.key());
|
||||
auto& keyspace_name = mut.s->ks_name();
|
||||
auto target_endpoint = get_view_natural_endpoint(_proxy.local().local_db(), keyspace_name, base_token, view_token);
|
||||
auto remote_endpoints = _proxy.local().get_token_metadata_ptr()->pending_endpoints_for(view_token, keyspace_name);
|
||||
auto remote_endpoints = _proxy.local().local_db().find_keyspace(keyspace_name).get_effective_replication_map()->get_pending_endpoints(view_token);
|
||||
auto sem_units = pending_view_updates.split(mut.fm.representation().size());
|
||||
|
||||
const bool update_synchronously = should_update_synchronously(*mut.s);
|
||||
|
||||
@@ -64,23 +64,6 @@ sstring abstract_replication_strategy::to_qualified_class_name(std::string_view
|
||||
return strategy_class_registry::to_qualified_class_name(strategy_class_name);
|
||||
}
|
||||
|
||||
inet_address_vector_replica_set abstract_replication_strategy::get_natural_endpoints(const token& search_token, const vnode_effective_replication_map& erm) const {
|
||||
const token& key_token = erm.get_token_metadata_ptr()->first_token(search_token);
|
||||
auto res = erm.get_replication_map().find(key_token);
|
||||
return res->second;
|
||||
}
|
||||
|
||||
stop_iteration abstract_replication_strategy::for_each_natural_endpoint_until(const token& search_token, const vnode_effective_replication_map& erm, const noncopyable_function<stop_iteration(const inet_address&)>& func) const {
|
||||
const token& key_token = erm.get_token_metadata_ptr()->first_token(search_token);
|
||||
auto res = erm.get_replication_map().find(key_token);
|
||||
for (const auto& ep : res->second) {
|
||||
if (func(ep) == stop_iteration::yes) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
}
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
inet_address_vector_replica_set vnode_effective_replication_map::get_natural_endpoints_without_node_being_replaced(const token& search_token) const {
|
||||
inet_address_vector_replica_set natural_endpoints = get_natural_endpoints(search_token);
|
||||
maybe_remove_node_being_replaced(*_tmptr, *_rs, natural_endpoints);
|
||||
@@ -110,8 +93,41 @@ void maybe_remove_node_being_replaced(const token_metadata& tm,
|
||||
}
|
||||
}
|
||||
|
||||
inet_address_vector_topology_change vnode_effective_replication_map::get_pending_endpoints(const token& search_token, const sstring& ks_name) const {
|
||||
return _tmptr->pending_endpoints_for(search_token, ks_name);
|
||||
static const std::unordered_set<inet_address>* find_token(const ring_mapping& ring_mapping, const token& token) {
|
||||
if (ring_mapping.empty()) {
|
||||
return nullptr;
|
||||
}
|
||||
const auto interval = token_metadata::range_to_interval(range<dht::token>(token));
|
||||
const auto it = ring_mapping.find(interval);
|
||||
return it != ring_mapping.end() ? &it->second : nullptr;
|
||||
}
|
||||
|
||||
inet_address_vector_topology_change vnode_effective_replication_map::get_pending_endpoints(const token& search_token) const {
|
||||
inet_address_vector_topology_change endpoints;
|
||||
const auto* pending_endpoints = find_token(_pending_endpoints, search_token);
|
||||
if (pending_endpoints) {
|
||||
// interval_map does not work with std::vector, convert to inet_address_vector_topology_change
|
||||
endpoints = inet_address_vector_topology_change(pending_endpoints->begin(), pending_endpoints->end());
|
||||
}
|
||||
return endpoints;
|
||||
}
|
||||
|
||||
std::optional<inet_address_vector_replica_set> vnode_effective_replication_map::get_endpoints_for_reading(const token& token) const {
|
||||
const auto* endpoints = find_token(_read_endpoints, token);
|
||||
if (endpoints == nullptr) {
|
||||
return {};
|
||||
}
|
||||
return inet_address_vector_replica_set(endpoints->begin(), endpoints->end());
|
||||
}
|
||||
|
||||
bool vnode_effective_replication_map::has_pending_ranges(inet_address endpoint) const {
|
||||
for (const auto& item : _pending_endpoints) {
|
||||
const auto& nodes = item.second;
|
||||
if (nodes.contains(endpoint)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
std::unique_ptr<token_range_splitter> vnode_effective_replication_map::make_splitter() const {
|
||||
@@ -287,10 +303,10 @@ future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
|
||||
vnode_effective_replication_map::get_range_addresses() const {
|
||||
const token_metadata& tm = *_tmptr;
|
||||
std::unordered_map<dht::token_range, inet_address_vector_replica_set> ret;
|
||||
for (const auto& [t, eps] : _replication_map) {
|
||||
for (auto& t : tm.sorted_tokens()) {
|
||||
dht::token_range_vector ranges = tm.get_primary_ranges_for(t);
|
||||
for (auto& r : ranges) {
|
||||
ret.emplace(r, eps);
|
||||
ret.emplace(r, get_natural_endpoints(t));
|
||||
}
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
@@ -328,65 +344,138 @@ abstract_replication_strategy::get_pending_address_ranges(const token_metadata_p
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
static const auto default_replication_map_key = dht::token::from_int64(0);
|
||||
|
||||
future<mutable_vnode_effective_replication_map_ptr> calculate_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr) {
|
||||
replication_map replication_map;
|
||||
ring_mapping pending_endpoints;
|
||||
ring_mapping read_endpoints;
|
||||
const auto depend_on_token = rs->natural_endpoints_depend_on_token();
|
||||
const auto& sorted_tokens = tmptr->sorted_tokens();
|
||||
replication_map.reserve(depend_on_token ? sorted_tokens.size() : 1);
|
||||
if (const auto& topology_changes = tmptr->get_topology_change_info(); topology_changes) {
|
||||
const auto& all_tokens = topology_changes->all_tokens;
|
||||
const auto& base_token_metadata = topology_changes->base_token_metadata
|
||||
? *topology_changes->base_token_metadata
|
||||
: *tmptr;
|
||||
const auto& current_tokens = tmptr->get_token_to_endpoint();
|
||||
for (size_t i = 0, size = all_tokens.size(); i < size; ++i) {
|
||||
co_await coroutine::maybe_yield();
|
||||
|
||||
if (!sorted_tokens.empty()) {
|
||||
replication_map.reserve(sorted_tokens.size());
|
||||
if (rs->natural_endpoints_depend_on_token()) {
|
||||
for (const auto &t : sorted_tokens) {
|
||||
auto eps = co_await rs->calculate_natural_endpoints(t, *tmptr);
|
||||
replication_map.emplace(t, eps.get_vector());
|
||||
const auto token = all_tokens[i];
|
||||
|
||||
auto current_endpoints = co_await rs->calculate_natural_endpoints(token, base_token_metadata);
|
||||
auto target_endpoints = co_await rs->calculate_natural_endpoints(token, topology_changes->target_token_metadata);
|
||||
|
||||
auto add_mapping = [&](ring_mapping& target, std::unordered_set<inet_address>&& endpoints) {
|
||||
using interval = ring_mapping::interval_type;
|
||||
if (!depend_on_token) {
|
||||
target += std::make_pair(
|
||||
interval::open(dht::minimum_token(), dht::maximum_token()),
|
||||
std::move(endpoints));
|
||||
} else if (i == 0) {
|
||||
target += std::make_pair(
|
||||
interval::open(all_tokens.back(), dht::maximum_token()),
|
||||
endpoints);
|
||||
target += std::make_pair(
|
||||
interval::left_open(dht::minimum_token(), token),
|
||||
std::move(endpoints));
|
||||
} else {
|
||||
target += std::make_pair(
|
||||
interval::left_open(all_tokens[i - 1], token),
|
||||
std::move(endpoints));
|
||||
}
|
||||
};
|
||||
|
||||
{
|
||||
std::unordered_set<inet_address> endpoints_diff;
|
||||
for (const auto& e: target_endpoints) {
|
||||
if (!current_endpoints.contains(e)) {
|
||||
endpoints_diff.insert(e);
|
||||
}
|
||||
}
|
||||
if (!endpoints_diff.empty()) {
|
||||
add_mapping(pending_endpoints, std::move(endpoints_diff));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
auto eps = co_await rs->calculate_natural_endpoints(sorted_tokens.front(), *tmptr);
|
||||
for (const auto &t : sorted_tokens) {
|
||||
replication_map.emplace(t, eps.get_vector());
|
||||
co_await coroutine::maybe_yield();
|
||||
|
||||
// in order not to waste memory, we update read_endpoints only if the
|
||||
// new endpoints differ from the old ones
|
||||
if (topology_changes->read_new && target_endpoints.get_vector() != current_endpoints.get_vector()) {
|
||||
add_mapping(read_endpoints, std::move(target_endpoints).extract_set());
|
||||
}
|
||||
|
||||
if (!depend_on_token) {
|
||||
replication_map.emplace(default_replication_map_key, std::move(current_endpoints).extract_vector());
|
||||
break;
|
||||
} else if (current_tokens.contains(token)) {
|
||||
replication_map.emplace(token, std::move(current_endpoints).extract_vector());
|
||||
}
|
||||
}
|
||||
} else if (depend_on_token) {
|
||||
for (const auto &t : sorted_tokens) {
|
||||
auto eps = co_await rs->calculate_natural_endpoints(t, *tmptr);
|
||||
replication_map.emplace(t, std::move(eps).extract_vector());
|
||||
}
|
||||
} else {
|
||||
auto eps = co_await rs->calculate_natural_endpoints(default_replication_map_key, *tmptr);
|
||||
replication_map.emplace(default_replication_map_key, std::move(eps).extract_vector());
|
||||
}
|
||||
|
||||
auto rf = rs->get_replication_factor(*tmptr);
|
||||
co_return make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(replication_map), rf);
|
||||
co_return make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(replication_map),
|
||||
std::move(pending_endpoints), std::move(read_endpoints), rf);
|
||||
}
|
||||
|
||||
future<replication_map> vnode_effective_replication_map::clone_endpoints_gently() const {
|
||||
replication_map cloned_endpoints;
|
||||
auto vnode_effective_replication_map::clone_data_gently() const -> future<std::unique_ptr<cloned_data>> {
|
||||
auto result = std::make_unique<cloned_data>();
|
||||
|
||||
for (auto& i : _replication_map) {
|
||||
cloned_endpoints.emplace(i.first, i.second);
|
||||
result->replication_map.emplace(i.first, i.second);
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
|
||||
co_return cloned_endpoints;
|
||||
for (const auto& i : _pending_endpoints) {
|
||||
result->pending_endpoints += i;
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
|
||||
for (const auto& i : _read_endpoints) {
|
||||
result->read_endpoints += i;
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
co_return std::move(result);
|
||||
}
|
||||
|
||||
const inet_address_vector_replica_set& vnode_effective_replication_map::do_get_natural_endpoints(const token& search_token) const {
|
||||
const token& key_token = _rs->natural_endpoints_depend_on_token()
|
||||
? _tmptr->first_token(search_token)
|
||||
: default_replication_map_key;
|
||||
const auto it = _replication_map.find(key_token);
|
||||
return it->second;
|
||||
}
|
||||
|
||||
inet_address_vector_replica_set vnode_effective_replication_map::get_natural_endpoints(const token& search_token) const {
|
||||
return _rs->get_natural_endpoints(search_token, *this);
|
||||
return do_get_natural_endpoints(search_token);
|
||||
}
|
||||
|
||||
stop_iteration vnode_effective_replication_map::for_each_natural_endpoint_until(const token& search_token, const noncopyable_function<stop_iteration(const inet_address&)>& func) const {
|
||||
return _rs->for_each_natural_endpoint_until(search_token, *this, func);
|
||||
}
|
||||
|
||||
future<> vnode_effective_replication_map::clear_gently() noexcept {
|
||||
co_await utils::clear_gently(_replication_map);
|
||||
co_await utils::clear_gently(_tmptr);
|
||||
for (const auto& ep : do_get_natural_endpoints(search_token)) {
|
||||
if (func(ep) == stop_iteration::yes) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
}
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
vnode_effective_replication_map::~vnode_effective_replication_map() {
|
||||
if (is_registered()) {
|
||||
_factory->erase_effective_replication_map(this);
|
||||
try {
|
||||
struct background_clear_holder {
|
||||
locator::replication_map replication_map;
|
||||
locator::token_metadata_ptr tmptr;
|
||||
};
|
||||
auto holder = make_lw_shared<background_clear_holder>({std::move(_replication_map), std::move(_tmptr)});
|
||||
auto fut = when_all(utils::clear_gently(holder->replication_map), utils::clear_gently(holder->tmptr)).discard_result().then([holder] {});
|
||||
_factory->submit_background_work(std::move(fut));
|
||||
_factory->submit_background_work(clear_gently(std::move(_replication_map),
|
||||
std::move(_pending_endpoints),
|
||||
std::move(_read_endpoints),
|
||||
std::move(_tmptr)));
|
||||
} catch (...) {
|
||||
// ignore
|
||||
}
|
||||
@@ -425,8 +514,9 @@ future<vnode_effective_replication_map_ptr> effective_replication_map_factory::c
|
||||
mutable_vnode_effective_replication_map_ptr new_erm;
|
||||
if (ref_erm) {
|
||||
auto rf = ref_erm->get_replication_factor();
|
||||
auto local_replication_map = co_await ref_erm->clone_endpoints_gently();
|
||||
new_erm = make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(local_replication_map), rf);
|
||||
auto local_data = co_await ref_erm->clone_data_gently();
|
||||
new_erm = make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(local_data->replication_map),
|
||||
std::move(local_data->pending_endpoints), std::move(local_data->read_endpoints), rf);
|
||||
} else {
|
||||
new_erm = co_await calculate_effective_replication_map(std::move(rs), std::move(tmptr));
|
||||
}
|
||||
|
||||
@@ -11,6 +11,8 @@
|
||||
#include <memory>
|
||||
#include <functional>
|
||||
#include <unordered_map>
|
||||
#include <boost/icl/interval.hpp>
|
||||
#include <boost/icl/interval_map.hpp>
|
||||
#include "gms/inet_address.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "locator/snitch_base.hh"
|
||||
@@ -65,6 +67,7 @@ protected:
|
||||
replication_strategy_type _my_type;
|
||||
bool _per_table = false;
|
||||
bool _uses_tablets = false;
|
||||
bool _natural_endpoints_depend_on_token = true;
|
||||
|
||||
template <typename... Args>
|
||||
void err(const char* fmt, Args&&... args) const {
|
||||
@@ -90,7 +93,7 @@ public:
|
||||
|
||||
// Evaluates to true iff calculate_natural_endpoints
|
||||
// returns different results for different tokens.
|
||||
virtual bool natural_endpoints_depend_on_token() const noexcept { return true; }
|
||||
bool natural_endpoints_depend_on_token() const noexcept { return _natural_endpoints_depend_on_token; }
|
||||
|
||||
// The returned vector has size O(number of normal token owners), which is O(number of nodes in the cluster).
|
||||
// Note: it is not guaranteed that the function will actually yield. If the complexity of a particular implementation
|
||||
@@ -110,9 +113,6 @@ public:
|
||||
|
||||
static sstring to_qualified_class_name(std::string_view strategy_class_name);
|
||||
|
||||
virtual inet_address_vector_replica_set get_natural_endpoints(const token& search_token, const vnode_effective_replication_map& erm) const;
|
||||
// Returns the last stop_iteration result of the called func
|
||||
virtual stop_iteration for_each_natural_endpoint_until(const token& search_token, const vnode_effective_replication_map& erm, const noncopyable_function<stop_iteration(const inet_address&)>& func) const;
|
||||
virtual void validate_options(const gms::feature_service&) const = 0;
|
||||
virtual std::optional<std::unordered_set<sstring>> recognized_options(const topology&) const = 0;
|
||||
virtual size_t get_replication_factor(const token_metadata& tm) const = 0;
|
||||
@@ -154,6 +154,7 @@ public:
|
||||
future<dht::token_range_vector> get_pending_address_ranges(const token_metadata_ptr tmptr, std::unordered_set<token> pending_tokens, inet_address pending_address, locator::endpoint_dc_rack dr) const;
|
||||
};
|
||||
|
||||
using ring_mapping = boost::icl::interval_map<token, std::unordered_set<inet_address>>;
|
||||
using replication_strategy_ptr = seastar::shared_ptr<const abstract_replication_strategy>;
|
||||
using mutable_replication_strategy_ptr = seastar::shared_ptr<abstract_replication_strategy>;
|
||||
|
||||
@@ -197,7 +198,17 @@ public:
|
||||
/// Returns the set of pending replicas for a given token.
|
||||
/// Pending replica is a replica which gains ownership of data.
|
||||
/// Non-empty only during topology change.
|
||||
virtual inet_address_vector_topology_change get_pending_endpoints(const token& search_token, const sstring& ks_name) const = 0;
|
||||
virtual inet_address_vector_topology_change get_pending_endpoints(const token& search_token) const = 0;
|
||||
|
||||
/// Returns a list of nodes to which a read request should be directed.
|
||||
/// Returns not null only during topology changes, if request_read_new was called and
|
||||
/// new set of replicas differs from the old one.
|
||||
virtual std::optional<inet_address_vector_replica_set> get_endpoints_for_reading(const token& search_token) const = 0;
|
||||
|
||||
/// Returns true if there are any pending ranges for this endpoint.
|
||||
/// This operation is expensive, for vnode_erm it iterates
|
||||
/// over all pending ranges which is O(number of tokens).
|
||||
virtual bool has_pending_ranges(inet_address endpoint) const = 0;
|
||||
|
||||
/// Returns a token_range_splitter which is in line with the replica assignment of this replication map.
|
||||
/// The splitter can live longer than this instance.
|
||||
@@ -247,9 +258,10 @@ public:
|
||||
|
||||
sstring to_sstring() const;
|
||||
};
|
||||
|
||||
private:
|
||||
replication_map _replication_map;
|
||||
ring_mapping _pending_endpoints;
|
||||
ring_mapping _read_endpoints;
|
||||
std::optional<factory_key> _factory_key = std::nullopt;
|
||||
effective_replication_map_factory* _factory = nullptr;
|
||||
|
||||
@@ -258,24 +270,30 @@ private:
|
||||
public: // effective_replication_map
|
||||
inet_address_vector_replica_set get_natural_endpoints(const token& search_token) const override;
|
||||
inet_address_vector_replica_set get_natural_endpoints_without_node_being_replaced(const token& search_token) const override;
|
||||
inet_address_vector_topology_change get_pending_endpoints(const token& search_token, const sstring& ks_name) const override;
|
||||
inet_address_vector_topology_change get_pending_endpoints(const token& search_token) const override;
|
||||
std::optional<inet_address_vector_replica_set> get_endpoints_for_reading(const token& search_token) const override;
|
||||
bool has_pending_ranges(inet_address endpoint) const override;
|
||||
std::unique_ptr<token_range_splitter> make_splitter() const override;
|
||||
public:
|
||||
explicit vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, replication_map replication_map, size_t replication_factor) noexcept
|
||||
explicit vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, replication_map replication_map,
|
||||
ring_mapping pending_endpoints, ring_mapping read_endpoints, size_t replication_factor) noexcept
|
||||
: effective_replication_map(std::move(rs), std::move(tmptr), replication_factor)
|
||||
, _replication_map(std::move(replication_map))
|
||||
, _pending_endpoints(std::move(pending_endpoints))
|
||||
, _read_endpoints(std::move(read_endpoints))
|
||||
{ }
|
||||
vnode_effective_replication_map() = delete;
|
||||
vnode_effective_replication_map(vnode_effective_replication_map&&) = default;
|
||||
~vnode_effective_replication_map();
|
||||
|
||||
const replication_map& get_replication_map() const noexcept {
|
||||
return _replication_map;
|
||||
}
|
||||
|
||||
future<> clear_gently() noexcept;
|
||||
|
||||
future<replication_map> clone_endpoints_gently() const;
|
||||
struct cloned_data {
|
||||
replication_map replication_map;
|
||||
ring_mapping pending_endpoints;
|
||||
ring_mapping read_endpoints;
|
||||
};
|
||||
// boost::icl::interval_map is not no_throw_move_constructible -> can't return cloned_data by val,
|
||||
// since future_state requires T to be no_throw_move_constructible.
|
||||
future<std::unique_ptr<cloned_data>> clone_data_gently() const;
|
||||
|
||||
stop_iteration for_each_natural_endpoint_until(const token& search_token, const noncopyable_function<stop_iteration(const inet_address&)>& func) const;
|
||||
|
||||
@@ -309,6 +327,7 @@ public:
|
||||
|
||||
private:
|
||||
dht::token_range_vector do_get_ranges(noncopyable_function<stop_iteration(bool& add_range, const inet_address& natural_endpoint)> consider_range_for_endpoint) const;
|
||||
const inet_address_vector_replica_set& do_get_natural_endpoints(const token& search_token) const;
|
||||
|
||||
public:
|
||||
static factory_key make_factory_key(const replication_strategy_ptr& rs, const token_metadata_ptr& tmptr);
|
||||
@@ -336,9 +355,11 @@ using mutable_vnode_effective_replication_map_ptr = shared_ptr<vnode_effective_r
|
||||
using vnode_erm_ptr = vnode_effective_replication_map_ptr;
|
||||
using mutable_vnode_erm_ptr = mutable_vnode_effective_replication_map_ptr;
|
||||
|
||||
inline mutable_vnode_erm_ptr make_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, replication_map replication_map, size_t replication_factor) {
|
||||
inline mutable_vnode_erm_ptr make_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr, replication_map replication_map, ring_mapping pending_endpoints,
|
||||
ring_mapping read_endpoints, size_t replication_factor) {
|
||||
return seastar::make_shared<vnode_effective_replication_map>(
|
||||
std::move(rs), std::move(tmptr), std::move(replication_map), replication_factor);
|
||||
std::move(rs), std::move(tmptr), std::move(replication_map),
|
||||
std::move(pending_endpoints), std::move(read_endpoints), replication_factor);
|
||||
}
|
||||
|
||||
// Apply the replication strategy over the current configuration and the given token_metadata.
|
||||
|
||||
@@ -17,38 +17,23 @@
|
||||
namespace locator {
|
||||
|
||||
everywhere_replication_strategy::everywhere_replication_strategy(const replication_strategy_config_options& config_options) :
|
||||
abstract_replication_strategy(config_options, replication_strategy_type::everywhere_topology) {}
|
||||
abstract_replication_strategy(config_options, replication_strategy_type::everywhere_topology) {
|
||||
_natural_endpoints_depend_on_token = false;
|
||||
}
|
||||
|
||||
future<endpoint_set> everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const {
|
||||
auto eps = tm.get_all_endpoints();
|
||||
return make_ready_future<endpoint_set>(endpoint_set(eps.begin(), eps.end()));
|
||||
if (tm.sorted_tokens().empty()) {
|
||||
endpoint_set result{inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()})};
|
||||
return make_ready_future<endpoint_set>(std::move(result));
|
||||
}
|
||||
const auto& all_endpoints = tm.get_all_endpoints();
|
||||
return make_ready_future<endpoint_set>(endpoint_set(all_endpoints.begin(), all_endpoints.end()));
|
||||
}
|
||||
|
||||
size_t everywhere_replication_strategy::get_replication_factor(const token_metadata& tm) const {
|
||||
return tm.sorted_tokens().empty() ? 1 : tm.count_normal_token_owners();
|
||||
}
|
||||
|
||||
inet_address_vector_replica_set everywhere_replication_strategy::get_natural_endpoints(const token&, const vnode_effective_replication_map& erm) const {
|
||||
const auto& tm = *erm.get_token_metadata_ptr();
|
||||
if (tm.sorted_tokens().empty()) {
|
||||
return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()});
|
||||
}
|
||||
return boost::copy_range<inet_address_vector_replica_set>(tm.get_all_endpoints());
|
||||
}
|
||||
|
||||
stop_iteration everywhere_replication_strategy::for_each_natural_endpoint_until(const token&, const vnode_effective_replication_map& erm, const noncopyable_function<stop_iteration(const inet_address&)>& func) const {
|
||||
const auto& tm = *erm.get_token_metadata_ptr();
|
||||
if (tm.sorted_tokens().empty()) {
|
||||
return func(utils::fb_utilities::get_broadcast_address());
|
||||
}
|
||||
for (const auto& ep : tm.get_all_endpoints()) {
|
||||
if (func(ep)) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
}
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
using registry = class_registrator<abstract_replication_strategy, everywhere_replication_strategy, const replication_strategy_config_options&>;
|
||||
static registry registrator("org.apache.cassandra.locator.EverywhereStrategy");
|
||||
static registry registrator_short_name("EverywhereStrategy");
|
||||
|
||||
@@ -18,8 +18,6 @@ class everywhere_replication_strategy : public abstract_replication_strategy {
|
||||
public:
|
||||
everywhere_replication_strategy(const replication_strategy_config_options& config_options);
|
||||
|
||||
virtual bool natural_endpoints_depend_on_token() const noexcept override { return false; }
|
||||
|
||||
virtual future<endpoint_set> calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override;
|
||||
|
||||
virtual void validate_options(const gms::feature_service&) const override { /* noop */ }
|
||||
@@ -34,12 +32,5 @@ public:
|
||||
virtual bool allow_remove_node_being_replaced_from_natural_endpoints() const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* We need to override this because the default implementation depends
|
||||
* on token calculations but everywhere_replication_strategy may be used before tokens are set up.
|
||||
*/
|
||||
virtual inet_address_vector_replica_set get_natural_endpoints(const token&, const vnode_effective_replication_map&) const override;
|
||||
virtual stop_iteration for_each_natural_endpoint_until(const token&, const vnode_effective_replication_map&, const noncopyable_function<stop_iteration(const inet_address&)>& func) const override;
|
||||
};
|
||||
}
|
||||
|
||||
@@ -15,7 +15,9 @@
|
||||
namespace locator {
|
||||
|
||||
local_strategy::local_strategy(const replication_strategy_config_options& config_options) :
|
||||
abstract_replication_strategy(config_options, replication_strategy_type::local) {}
|
||||
abstract_replication_strategy(config_options, replication_strategy_type::local) {
|
||||
_natural_endpoints_depend_on_token = false;
|
||||
}
|
||||
|
||||
future<endpoint_set> local_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm) const {
|
||||
return make_ready_future<endpoint_set>(endpoint_set({utils::fb_utilities::get_broadcast_address()}));
|
||||
@@ -33,14 +35,6 @@ size_t local_strategy::get_replication_factor(const token_metadata&) const {
|
||||
return 1;
|
||||
}
|
||||
|
||||
inet_address_vector_replica_set local_strategy::get_natural_endpoints(const token&, const vnode_effective_replication_map&) const {
|
||||
return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()});
|
||||
}
|
||||
|
||||
stop_iteration local_strategy::for_each_natural_endpoint_until(const token&, const vnode_effective_replication_map&, const noncopyable_function<stop_iteration(const inet_address&)>& func) const {
|
||||
return func(utils::fb_utilities::get_broadcast_address());
|
||||
}
|
||||
|
||||
using registry = class_registrator<abstract_replication_strategy, local_strategy, const replication_strategy_config_options&>;
|
||||
static registry registrator("org.apache.cassandra.locator.LocalStrategy");
|
||||
static registry registrator_short_name("LocalStrategy");
|
||||
|
||||
@@ -27,8 +27,6 @@ public:
|
||||
virtual ~local_strategy() {};
|
||||
virtual size_t get_replication_factor(const token_metadata&) const override;
|
||||
|
||||
virtual bool natural_endpoints_depend_on_token() const noexcept override { return false; }
|
||||
|
||||
virtual future<endpoint_set> calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override;
|
||||
|
||||
virtual void validate_options(const gms::feature_service&) const override;
|
||||
@@ -38,13 +36,6 @@ public:
|
||||
virtual bool allow_remove_node_being_replaced_from_natural_endpoints() const override {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* We need to override this because the default implementation depends
|
||||
* on token calculations but LocalStrategy may be used before tokens are set up.
|
||||
*/
|
||||
inet_address_vector_replica_set get_natural_endpoints(const token&, const vnode_effective_replication_map&) const override;
|
||||
virtual stop_iteration for_each_natural_endpoint_until(const token&, const vnode_effective_replication_map&, const noncopyable_function<stop_iteration(const inet_address&)>& func) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -228,7 +228,7 @@ public:
|
||||
return result;
|
||||
}
|
||||
|
||||
virtual inet_address_vector_topology_change get_pending_endpoints(const token& search_token, const sstring& ks_name) const override {
|
||||
virtual inet_address_vector_topology_change get_pending_endpoints(const token& search_token) const override {
|
||||
auto&& tablets = get_tablet_map();
|
||||
auto tablet = tablets.get_tablet_id(search_token);
|
||||
auto&& info = tablets.get_tablet_transition_info(tablet);
|
||||
@@ -240,6 +240,23 @@ public:
|
||||
return {get_endpoint_for_host_id(info->pending_replica.host)};
|
||||
}
|
||||
|
||||
// This implementation never supplies a dedicated replica set for reads:
// it returns an empty optional for every token.
virtual std::optional<inet_address_vector_replica_set> get_endpoints_for_reading(const token& search_token) const override {
    return std::optional<inet_address_vector_replica_set>(std::nullopt);
}
|
||||
|
||||
// Returns true when at least one tablet transition names the host behind
// `endpoint` as its pending replica. An endpoint whose host id is not known
// to the token metadata is reported as having no pending ranges.
virtual bool has_pending_ranges(inet_address endpoint) const override {
    if (const auto known_host = _tmptr->get_host_id_if_known(endpoint); known_host.has_value()) {
        for (const auto& [tablet, transition] : get_tablet_map().transitions()) {
            const bool is_pending_here = transition.pending_replica.host == *known_host;
            if (is_pending_here) {
                return true;
            }
        }
    }
    return false;
}
|
||||
|
||||
virtual std::unique_ptr<token_range_splitter> make_splitter() const override {
|
||||
class splitter : public token_range_splitter {
|
||||
token_metadata_ptr _tmptr; // To keep the tablet map alive.
|
||||
|
||||
@@ -152,6 +152,10 @@ public:
|
||||
return _tablets;
|
||||
}
|
||||
|
||||
const auto& transitions() const {
|
||||
return _transitions;
|
||||
}
|
||||
|
||||
/// Returns an iterable range over tablet_id:s which includes all tablets in token ring order.
|
||||
auto tablet_ids() const {
|
||||
return boost::irange<size_t>(0, tablet_count()) | boost::adaptors::transformed([] (size_t i) {
|
||||
|
||||
@@ -59,20 +59,7 @@ private:
|
||||
// The map between the existing node to be replaced and the replacing node
|
||||
std::unordered_map<inet_address, inet_address> _replacing_endpoints;
|
||||
|
||||
using ring_mapping = boost::icl::interval_map<token, std::unordered_set<inet_address>>;
|
||||
// For each keyspace, migration_info contains ranges of tokens and
|
||||
// corresponding replicas to which writes or reads will be directed:
|
||||
// - pending_endpoints - will be appended to normal endpoints for writes;
|
||||
// - read_endpoints - will completely replace normal endpoints for reads.
|
||||
// This data structure is filled only during data migration between nodes
|
||||
// when they are added or removed from the cluster.
|
||||
// During normal operation, token mapping to nodes is
|
||||
// implemented in the effective_replication_map.
|
||||
struct migration_info {
|
||||
ring_mapping pending_endpoints;
|
||||
ring_mapping read_endpoints;
|
||||
};
|
||||
std::unordered_map<sstring, migration_info> _keyspace_to_migration_info;
|
||||
std::optional<topology_change_info> _topology_change_info;
|
||||
|
||||
std::vector<token> _sorted_tokens;
|
||||
|
||||
@@ -80,7 +67,7 @@ private:
|
||||
|
||||
topology _topology;
|
||||
|
||||
std::optional<service::topology::transition_state> _topology_transition_state;
|
||||
token_metadata::read_new_t _read_new = token_metadata::read_new_t::no;
|
||||
|
||||
long _ring_version = 0;
|
||||
static thread_local long _static_ring_version;
|
||||
@@ -242,21 +229,10 @@ public:
|
||||
static range<dht::token> interval_to_range(boost::icl::interval<token>::interval_type i);
|
||||
|
||||
public:
|
||||
bool has_pending_ranges(sstring keyspace_name, inet_address endpoint) const;
|
||||
|
||||
/**
|
||||
* Calculate pending ranges according to bootstrapping, leaving and replacing nodes.
|
||||
*
|
||||
* We construct an updated version of the token_metadata by incorporating
|
||||
* all proposed modifications (join, bootstrap, and replace operations).
|
||||
* Subsequently, for each token range, we compare the outcomes of the calculate_natural_endpoints
|
||||
* function applied to both the previous and the new token_metadata.
|
||||
* Endpoints present in the updated version but absent in the original one
|
||||
* ought to be appended to the pending_ranges.
|
||||
*/
|
||||
future<> update_pending_ranges(
|
||||
const token_metadata& unpimplified_this,
|
||||
const abstract_replication_strategy& strategy, const sstring& keyspace_name, dc_rack_fn& get_dc_rack);
|
||||
future<> update_topology_change_info(dc_rack_fn& get_dc_rack);
|
||||
const std::optional<topology_change_info>& get_topology_change_info() const {
|
||||
return _topology_change_info;
|
||||
}
|
||||
public:
|
||||
|
||||
token get_predecessor(token t) const;
|
||||
@@ -274,22 +250,14 @@ public:
|
||||
size_t count_normal_token_owners() const;
|
||||
private:
|
||||
future<> update_normal_token_owners();
|
||||
|
||||
enum class endpoints_field {
|
||||
pending_endpoints,
|
||||
read_endpoints
|
||||
};
|
||||
const std::unordered_set<inet_address>* maybe_migration_endpoints(endpoints_field field,
|
||||
const token& token,
|
||||
const sstring& keyspace_name) const;
|
||||
public:
|
||||
// returns empty vector if keyspace_name not found.
|
||||
inet_address_vector_topology_change pending_endpoints_for(const token& token, const sstring& keyspace_name) const;
|
||||
|
||||
std::optional<inet_address_vector_replica_set> endpoints_for_reading(const token& token, const sstring& keyspace_name) const;
|
||||
|
||||
void set_topology_transition_state(std::optional<service::topology::transition_state> state) {
|
||||
_topology_transition_state = state;
|
||||
void set_read_new(token_metadata::read_new_t read_new) {
|
||||
_read_new = read_new;
|
||||
}
|
||||
|
||||
public:
|
||||
@@ -359,10 +327,6 @@ future<std::unique_ptr<token_metadata_impl>> token_metadata_impl::clone_async()
|
||||
}
|
||||
ret->_leaving_endpoints = _leaving_endpoints;
|
||||
ret->_replacing_endpoints = _replacing_endpoints;
|
||||
for (const auto& p : _keyspace_to_migration_info) {
|
||||
ret->_keyspace_to_migration_info.emplace(p);
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
ret->_ring_version = _ring_version;
|
||||
co_return ret;
|
||||
}
|
||||
@@ -381,7 +345,7 @@ future<std::unique_ptr<token_metadata_impl>> token_metadata_impl::clone_only_tok
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
ret->_tablets = _tablets;
|
||||
ret->_topology_transition_state =_topology_transition_state;
|
||||
ret->_read_new = _read_new;
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
@@ -391,7 +355,6 @@ future<> token_metadata_impl::clear_gently() noexcept {
|
||||
co_await utils::clear_gently(_bootstrap_tokens);
|
||||
co_await utils::clear_gently(_leaving_endpoints);
|
||||
co_await utils::clear_gently(_replacing_endpoints);
|
||||
co_await utils::clear_gently(_keyspace_to_migration_info);
|
||||
co_await utils::clear_gently(_sorted_tokens);
|
||||
co_await _topology.clear_gently();
|
||||
co_await _tablets.clear_gently();
|
||||
@@ -723,157 +686,84 @@ token_metadata_impl::interval_to_range(boost::icl::interval<token>::interval_typ
|
||||
return range<dht::token>({{i.lower(), start_inclusive}}, {{i.upper(), end_inclusive}});
|
||||
}
|
||||
|
||||
bool
|
||||
token_metadata_impl::has_pending_ranges(sstring keyspace_name, inet_address endpoint) const {
|
||||
const auto it = _keyspace_to_migration_info.find(keyspace_name);
|
||||
if (it == _keyspace_to_migration_info.end()) {
|
||||
return false;
|
||||
}
|
||||
for (const auto& item : it->second.pending_endpoints) {
|
||||
const auto& nodes = item.second;
|
||||
if (nodes.contains(endpoint)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
future<> token_metadata_impl::update_pending_ranges(
|
||||
const token_metadata& unpimplified_this,
|
||||
const abstract_replication_strategy& strategy, const sstring& keyspace_name, dc_rack_fn& get_dc_rack) {
|
||||
tlogger.debug("calculate_pending_ranges: keyspace_name={}, bootstrap_tokens={}, leaving nodes={}, replacing_endpoints={}",
|
||||
keyspace_name, _bootstrap_tokens, _leaving_endpoints, _replacing_endpoints);
|
||||
future<> token_metadata_impl::update_topology_change_info(dc_rack_fn& get_dc_rack) {
|
||||
if (_bootstrap_tokens.empty() && _leaving_endpoints.empty() && _replacing_endpoints.empty()) {
|
||||
tlogger.debug("No bootstrapping, leaving nodes, replacing nodes -> empty pending ranges for {}", keyspace_name);
|
||||
_keyspace_to_migration_info.erase(keyspace_name);
|
||||
return make_ready_future<>();
|
||||
co_await utils::clear_gently(_topology_change_info);
|
||||
_topology_change_info.reset();
|
||||
co_return;
|
||||
}
|
||||
|
||||
return async([this, &unpimplified_this, &strategy, keyspace_name, &get_dc_rack] () mutable {
|
||||
// true if there is a node replaced with the same IP
|
||||
bool replace_with_same_endpoint = false;
|
||||
// new_token_metadata incorporates all the changes from leaving, bootstrapping and replacing
|
||||
const auto new_token_metadata = token_metadata(std::invoke([&]() -> std::unique_ptr<token_metadata_impl> {
|
||||
auto result = clone_only_token_map(false).get0();
|
||||
|
||||
// construct new_normal_tokens based on _bootstrap_tokens and _replacing_endpoints
|
||||
std::unordered_map<inet_address, std::unordered_set<token>> new_normal_tokens;
|
||||
if (!_replacing_endpoints.empty()) {
|
||||
for (const auto& [token, inet_address]: _token_to_endpoint_map) {
|
||||
const auto it = _replacing_endpoints.find(inet_address);
|
||||
if (it == _replacing_endpoints.end()) {
|
||||
continue;
|
||||
}
|
||||
new_normal_tokens[it->second].insert(token);
|
||||
// true if there is a node replaced with the same IP
|
||||
bool replace_with_same_endpoint = false;
|
||||
// target_token_metadata incorporates all the changes from leaving, bootstrapping and replacing
|
||||
auto target_token_metadata = co_await clone_only_token_map(false);
|
||||
{
|
||||
// construct new_normal_tokens based on _bootstrap_tokens and _replacing_endpoints
|
||||
std::unordered_map<inet_address, std::unordered_set<token>> new_normal_tokens;
|
||||
if (!_replacing_endpoints.empty()) {
|
||||
for (const auto& [token, inet_address]: _token_to_endpoint_map) {
|
||||
const auto it = _replacing_endpoints.find(inet_address);
|
||||
if (it == _replacing_endpoints.end()) {
|
||||
continue;
|
||||
}
|
||||
for (const auto& [replace_from, replace_to]: _replacing_endpoints) {
|
||||
if (replace_from == replace_to) {
|
||||
replace_with_same_endpoint = true;
|
||||
} else {
|
||||
result->remove_endpoint(replace_from);
|
||||
}
|
||||
new_normal_tokens[it->second].insert(token);
|
||||
}
|
||||
for (const auto& [replace_from, replace_to]: _replacing_endpoints) {
|
||||
if (replace_from == replace_to) {
|
||||
replace_with_same_endpoint = true;
|
||||
} else {
|
||||
target_token_metadata->remove_endpoint(replace_from);
|
||||
}
|
||||
}
|
||||
for (const auto& [token, inet_address]: _bootstrap_tokens) {
|
||||
new_normal_tokens[inet_address].insert(token);
|
||||
}
|
||||
// apply new_normal_tokens
|
||||
for (auto& [endpoint, tokens]: new_normal_tokens) {
|
||||
result->update_topology(endpoint, get_dc_rack(endpoint), node::state::normal);
|
||||
result->update_normal_tokens(std::move(tokens), endpoint).get();
|
||||
}
|
||||
// apply leaving endpoints
|
||||
for (const auto& endpoint: _leaving_endpoints) {
|
||||
result->remove_endpoint(endpoint);
|
||||
}
|
||||
result->sort_tokens();
|
||||
return result;
|
||||
}));
|
||||
|
||||
// We require a distinct token_metadata instance when replace_from equals replace_to,
|
||||
// as it ensures the node is included in pending_ranges.
|
||||
// Otherwise, the node would be excluded from both pending_ranges and
|
||||
// get_natural_endpoints_without_node_being_replaced,
|
||||
// causing the coordinator to overlook it entirely.
|
||||
const token_metadata* base_token_metadata;
|
||||
std::optional<token_metadata> self_copy;
|
||||
if (replace_with_same_endpoint) {
|
||||
self_copy = token_metadata(std::invoke([&]() -> std::unique_ptr<token_metadata_impl> {
|
||||
auto result = clone_only_token_map(false).get0();
|
||||
for (const auto& [replace_from, replace_to]: _replacing_endpoints) {
|
||||
if (replace_from == replace_to) {
|
||||
result->remove_endpoint(replace_from);
|
||||
}
|
||||
}
|
||||
result->sort_tokens();
|
||||
return result;
|
||||
}));
|
||||
base_token_metadata = &*self_copy;
|
||||
} else {
|
||||
base_token_metadata = &unpimplified_this;
|
||||
}
|
||||
for (const auto& [token, inet_address]: _bootstrap_tokens) {
|
||||
new_normal_tokens[inet_address].insert(token);
|
||||
}
|
||||
// apply new_normal_tokens
|
||||
for (auto& [endpoint, tokens]: new_normal_tokens) {
|
||||
target_token_metadata->update_topology(endpoint, get_dc_rack(endpoint), node::state::normal);
|
||||
co_await target_token_metadata->update_normal_tokens(std::move(tokens), endpoint);
|
||||
}
|
||||
// apply leaving endpoints
|
||||
for (const auto& endpoint: _leaving_endpoints) {
|
||||
target_token_metadata->remove_endpoint(endpoint);
|
||||
}
|
||||
target_token_metadata->sort_tokens();
|
||||
}
|
||||
|
||||
// merge tokens from token_to_endpoint and bootstrap_tokens,
|
||||
// preserving tokens of leaving endpoints
|
||||
const auto tokens = std::invoke([&]() -> std::vector<dht::token> {
|
||||
auto tokens = std::vector<dht::token>();
|
||||
tokens.reserve(sorted_tokens().size() + get_bootstrap_tokens().size());
|
||||
tokens.resize(sorted_tokens().size());
|
||||
std::copy(begin(sorted_tokens()), end(sorted_tokens()), begin(tokens));
|
||||
for (const auto& p: get_bootstrap_tokens()) {
|
||||
tokens.push_back(p.first);
|
||||
// We require a distinct token_metadata instance when replace_from equals replace_to,
|
||||
// as it ensures the node is included in pending_ranges.
|
||||
// Otherwise, the node would be excluded from both pending_ranges and
|
||||
// get_natural_endpoints_without_node_being_replaced,
|
||||
// causing the coordinator to overlook it entirely.
|
||||
std::unique_ptr<token_metadata_impl> base_token_metadata;
|
||||
if (replace_with_same_endpoint) {
|
||||
base_token_metadata = co_await clone_only_token_map(false);
|
||||
for (const auto& [replace_from, replace_to]: _replacing_endpoints) {
|
||||
if (replace_from == replace_to) {
|
||||
base_token_metadata->remove_endpoint(replace_from);
|
||||
}
|
||||
std::sort(begin(tokens), end(tokens));
|
||||
return tokens;
|
||||
});
|
||||
}
|
||||
base_token_metadata->sort_tokens();
|
||||
}
|
||||
|
||||
_keyspace_to_migration_info[keyspace_name] = std::invoke([&]() -> migration_info {
|
||||
migration_info migration_info;
|
||||
for (size_t i = 0, size = tokens.size(); i < size; ++i) {
|
||||
seastar::thread::maybe_yield();
|
||||
// merge tokens from token_to_endpoint and bootstrap_tokens,
|
||||
// preserving tokens of leaving endpoints
|
||||
auto all_tokens = std::vector<dht::token>();
|
||||
all_tokens.reserve(sorted_tokens().size() + get_bootstrap_tokens().size());
|
||||
all_tokens.resize(sorted_tokens().size());
|
||||
std::copy(begin(sorted_tokens()), end(sorted_tokens()), begin(all_tokens));
|
||||
for (const auto& p: get_bootstrap_tokens()) {
|
||||
all_tokens.push_back(p.first);
|
||||
}
|
||||
std::sort(begin(all_tokens), end(all_tokens));
|
||||
|
||||
const auto token = tokens[i];
|
||||
|
||||
const auto old_endpoints = strategy.calculate_natural_endpoints(token, *base_token_metadata).get0();
|
||||
auto new_endpoints = strategy.calculate_natural_endpoints(token, new_token_metadata).get0();
|
||||
|
||||
auto add_mapping = [&](ring_mapping& target, std::unordered_set<inet_address>&& endpoints) {
|
||||
using interval = ring_mapping::interval_type;
|
||||
if (i == 0) {
|
||||
target += std::make_pair(
|
||||
interval::open(tokens.back(), dht::maximum_token()),
|
||||
endpoints);
|
||||
target += std::make_pair(
|
||||
interval::left_open(dht::minimum_token(), token),
|
||||
std::move(endpoints));
|
||||
} else {
|
||||
target += std::make_pair(
|
||||
interval::left_open(tokens[i - 1], token),
|
||||
std::move(endpoints));
|
||||
}
|
||||
};
|
||||
|
||||
std::unordered_set<inet_address> pending_endpoints;
|
||||
for (const auto& e: new_endpoints) {
|
||||
if (!old_endpoints.contains(e)) {
|
||||
pending_endpoints.insert(e);
|
||||
}
|
||||
}
|
||||
if (!pending_endpoints.empty()) {
|
||||
add_mapping(migration_info.pending_endpoints, std::move(pending_endpoints));
|
||||
}
|
||||
|
||||
// in order not to waste memory, we update read_endpoints only if the
|
||||
// new endpoints differ from the old ones
|
||||
if (_topology_transition_state == service::topology::transition_state::write_both_read_new &&
|
||||
new_endpoints.get_vector() != old_endpoints.get_vector())
|
||||
{
|
||||
add_mapping(migration_info.read_endpoints, std::move(new_endpoints).extract_set());
|
||||
}
|
||||
}
|
||||
return migration_info;
|
||||
});
|
||||
});
|
||||
auto prev_value = std::move(_topology_change_info);
|
||||
_topology_change_info.emplace(token_metadata(std::move(target_token_metadata)),
|
||||
base_token_metadata ? std::optional(token_metadata(std::move(base_token_metadata))): std::nullopt,
|
||||
std::move(all_tokens),
|
||||
_read_new);
|
||||
co_await utils::clear_gently(prev_value);
|
||||
}
|
||||
|
||||
size_t token_metadata_impl::count_normal_token_owners() const {
|
||||
@@ -911,50 +801,6 @@ void token_metadata_impl::del_replacing_endpoint(inet_address existing_node) {
|
||||
_replacing_endpoints.erase(existing_node);
|
||||
}
|
||||
|
||||
// Looks up the migration endpoints registered for `token` in the given keyspace.
//
// `field` selects which mapping of the keyspace's migration_info is consulted
// (pending_endpoints or read_endpoints). Returns a pointer to the endpoint set
// stored for the interval containing `token`, or nullptr when the keyspace has
// no migration info, the selected mapping is empty, or no interval matches.
const std::unordered_set<inet_address>* token_metadata_impl::maybe_migration_endpoints(endpoints_field field,
        const token& token,
        const sstring& keyspace_name) const
{
    // Cheapest exit: nothing is recorded for this keyspace at all.
    const auto info_it = _keyspace_to_migration_info.find(keyspace_name);
    if (info_it == _keyspace_to_migration_info.end()) {
        return nullptr;
    }

    // Pick the requested mapping; bail out early if it holds no intervals.
    const auto& info = info_it->second;
    const auto* mapping = &info.read_endpoints;
    if (field == endpoints_field::pending_endpoints) {
        mapping = &info.pending_endpoints;
    }
    if (mapping->empty()) {
        return nullptr;
    }

    // Full lookup: find the interval that covers the single-token range.
    const auto token_interval = range_to_interval(range<dht::token>(token));
    const auto found = mapping->find(token_interval);
    if (found == mapping->end()) {
        return nullptr;
    }
    return &found->second;
}
|
||||
|
||||
// Returns the pending endpoints recorded for `token` in `keyspace_name`,
// or an empty vector when there are none.
inet_address_vector_topology_change token_metadata_impl::pending_endpoints_for(const token& token, const sstring& keyspace_name) const {
    const auto* pending = maybe_migration_endpoints(endpoints_field::pending_endpoints, token, keyspace_name);
    if (pending == nullptr) {
        inet_address_vector_topology_change none;
        return none;
    }
    // The mapping stores unordered sets; copy the elements into the vector type callers expect.
    return inet_address_vector_topology_change(pending->begin(), pending->end());
}
|
||||
|
||||
std::optional<inet_address_vector_replica_set> token_metadata_impl::endpoints_for_reading(const token& token, const sstring& keyspace_name) const {
|
||||
const auto* endpoints = maybe_migration_endpoints(endpoints_field::read_endpoints, token, keyspace_name);
|
||||
if (endpoints == nullptr) {
|
||||
return std::nullopt;
|
||||
}
|
||||
return inet_address_vector_replica_set(endpoints->begin(), endpoints->end());
|
||||
}
|
||||
|
||||
std::map<token, inet_address> token_metadata_impl::get_normal_and_bootstrapping_token_to_endpoint_map() const {
|
||||
std::map<token, inet_address> ret(_token_to_endpoint_map.begin(), _token_to_endpoint_map.end());
|
||||
ret.insert(_bootstrap_tokens.begin(), _bootstrap_tokens.end());
|
||||
@@ -969,6 +815,23 @@ std::multimap<inet_address, token> token_metadata_impl::get_endpoint_to_token_ma
|
||||
return cloned;
|
||||
}
|
||||
|
||||
topology_change_info::topology_change_info(token_metadata target_token_metadata_,
|
||||
std::optional<token_metadata> base_token_metadata_,
|
||||
std::vector<dht::token> all_tokens_,
|
||||
token_metadata::read_new_t read_new_)
|
||||
: target_token_metadata(std::move(target_token_metadata_))
|
||||
, base_token_metadata(std::move(base_token_metadata_))
|
||||
, all_tokens(std::move(all_tokens_))
|
||||
, read_new(read_new_)
|
||||
{
|
||||
}
|
||||
|
||||
// Releases the contents of this object piece by piece through
// utils::clear_gently, awaiting each member in turn: the target token
// metadata, then the optional base token metadata, then the token vector.
future<> topology_change_info::clear_gently() {
    co_await utils::clear_gently(target_token_metadata);

    co_await utils::clear_gently(base_token_metadata);

    co_await utils::clear_gently(all_tokens);
}
|
||||
|
||||
token_metadata::token_metadata(std::unique_ptr<token_metadata_impl> impl)
|
||||
: _impl(std::move(impl)) {
|
||||
}
|
||||
@@ -1222,14 +1085,14 @@ token_metadata::interval_to_range(boost::icl::interval<token>::interval_type i)
|
||||
return token_metadata_impl::interval_to_range(std::move(i));
|
||||
}
|
||||
|
||||
bool
|
||||
token_metadata::has_pending_ranges(sstring keyspace_name, inet_address endpoint) const {
|
||||
return _impl->has_pending_ranges(std::move(keyspace_name), endpoint);
|
||||
future<>
|
||||
token_metadata::update_topology_change_info(dc_rack_fn& get_dc_rack) {
|
||||
return _impl->update_topology_change_info(get_dc_rack);
|
||||
}
|
||||
|
||||
future<>
|
||||
token_metadata::update_pending_ranges(const abstract_replication_strategy& strategy, const sstring& keyspace_name, dc_rack_fn& get_dc_rack) {
|
||||
return _impl->update_pending_ranges(*this, strategy, keyspace_name, get_dc_rack);
|
||||
const std::optional<topology_change_info>&
|
||||
token_metadata::get_topology_change_info() const {
|
||||
return _impl->get_topology_change_info();
|
||||
}
|
||||
|
||||
token
|
||||
@@ -1247,19 +1110,9 @@ token_metadata::count_normal_token_owners() const {
|
||||
return _impl->count_normal_token_owners();
|
||||
}
|
||||
|
||||
// Thin forwarder to the implementation object.
inet_address_vector_topology_change
token_metadata::pending_endpoints_for(const token& token, const sstring& keyspace_name) const {
    const auto& impl = *_impl;
    return impl.pending_endpoints_for(token, keyspace_name);
}
|
||||
|
||||
std::optional<inet_address_vector_replica_set>
|
||||
token_metadata::endpoints_for_reading(const token& token, const sstring& keyspace_name) const {
|
||||
return _impl->endpoints_for_reading(token, keyspace_name);
|
||||
}
|
||||
|
||||
void
|
||||
token_metadata::set_topology_transition_state(std::optional<service::topology::transition_state> state) {
|
||||
_impl->set_topology_transition_state(state);
|
||||
token_metadata::set_read_new(read_new_t read_new) {
|
||||
_impl->set_read_new(read_new);
|
||||
}
|
||||
|
||||
std::multimap<inet_address, token>
|
||||
|
||||
@@ -28,7 +28,6 @@
|
||||
|
||||
#include "locator/types.hh"
|
||||
#include "locator/topology.hh"
|
||||
#include "service/topology_state_machine.hh"
|
||||
|
||||
// forward declaration since replica/database.hh includes this file
|
||||
namespace replica {
|
||||
@@ -70,6 +69,7 @@ struct host_id_or_endpoint {
|
||||
};
|
||||
|
||||
class token_metadata_impl;
|
||||
struct topology_change_info;
|
||||
|
||||
class token_metadata final {
|
||||
std::unique_ptr<token_metadata_impl> _impl;
|
||||
@@ -244,18 +244,9 @@ public:
|
||||
static boost::icl::interval<token>::interval_type range_to_interval(range<dht::token> r);
|
||||
static range<dht::token> interval_to_range(boost::icl::interval<token>::interval_type i);
|
||||
|
||||
bool has_pending_ranges(sstring keyspace_name, inet_address endpoint) const;
|
||||
/**
|
||||
* Calculate pending ranges according to bootstrapping, leaving and replacing nodes.
|
||||
*
|
||||
* We construct an updated version of the token_metadata by incorporating
|
||||
* all proposed modifications (join, bootstrap, and replace operations).
|
||||
* Subsequently, for each token range, we compare the outcomes of the calculate_natural_endpoints
|
||||
* function applied to both the previous and the new token_metadata.
|
||||
* Endpoints present in the updated version but absent in the original one
|
||||
* ought to be appended to the pending_ranges.
|
||||
*/
|
||||
future<> update_pending_ranges(const abstract_replication_strategy& strategy, const sstring& keyspace_name, dc_rack_fn& get_dc_rack);
|
||||
future<> update_topology_change_info(dc_rack_fn& get_dc_rack);
|
||||
|
||||
const std::optional<topology_change_info>& get_topology_change_info() const;
|
||||
|
||||
token get_predecessor(token t) const;
|
||||
|
||||
@@ -265,18 +256,13 @@ public:
|
||||
* Bootstrapping tokens are not taken into account. */
|
||||
size_t count_normal_token_owners() const;
|
||||
|
||||
// returns empty vector if keyspace_name not found.
|
||||
inet_address_vector_topology_change pending_endpoints_for(const token& token, const sstring& keyspace_name) const;
|
||||
|
||||
// This function returns a list of nodes to which a read request should be directed.
|
||||
// Returns not null only during topology changes, if _topology_change_stage == read_new and
|
||||
// new set of replicas differs from the old one.
|
||||
std::optional<inet_address_vector_replica_set> endpoints_for_reading(const token& token, const sstring& keyspace_name) const;
|
||||
|
||||
// updates the current topology_transition_state of this instance,
|
||||
// this value is preserved in all clone functions,
|
||||
// by default it's not set
|
||||
void set_topology_transition_state(std::optional<service::topology::transition_state> state);
|
||||
// Updates the read_new flag, switching read requests from
|
||||
// the old endpoints to the new ones during topology changes:
|
||||
// read_new_t::no - no read_endpoints will be stored on update_pending_ranges, all reads go to normal endpoints;
|
||||
// read_new_t::yes - triggers update_pending_ranges to compute and store new ranges for read requests.
|
||||
// The value is preserved in all clone functions, the default is read_new_t::no.
|
||||
using read_new_t = bool_class<class read_new_tag>;
|
||||
void set_read_new(read_new_t value);
|
||||
|
||||
/** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */
|
||||
std::multimap<inet_address, token> get_endpoint_to_token_map_for_reading() const;
|
||||
@@ -292,6 +278,19 @@ public:
|
||||
friend class token_metadata_impl;
|
||||
};
|
||||
|
||||
struct topology_change_info {
|
||||
token_metadata target_token_metadata;
|
||||
std::optional<token_metadata> base_token_metadata;
|
||||
std::vector<dht::token> all_tokens;
|
||||
token_metadata::read_new_t read_new;
|
||||
|
||||
topology_change_info(token_metadata target_token_metadata_,
|
||||
std::optional<token_metadata> base_token_metadata_,
|
||||
std::vector<dht::token> all_tokens_,
|
||||
token_metadata::read_new_t read_new_);
|
||||
future<> clear_gently();
|
||||
};
|
||||
|
||||
using token_metadata_ptr = lw_shared_ptr<const token_metadata>;
|
||||
using mutable_token_metadata_ptr = lw_shared_ptr<token_metadata>;
|
||||
using token_metadata_lock = semaphore_units<>;
|
||||
|
||||
@@ -2845,7 +2845,7 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok
|
||||
replica::table& table = _db.local().find_column_family(s->id());
|
||||
auto erm = table.get_effective_replication_map();
|
||||
inet_address_vector_replica_set natural_endpoints = erm->get_natural_endpoints_without_node_being_replaced(token);
|
||||
inet_address_vector_topology_change pending_endpoints = erm->get_pending_endpoints(token, s->ks_name());
|
||||
inet_address_vector_topology_change pending_endpoints = erm->get_pending_endpoints(token);
|
||||
|
||||
slogger.trace("creating write handler for token: {} natural: {} pending: {}", token, natural_endpoints, pending_endpoints);
|
||||
tracing::trace(tr_state, "Creating write handler for token: {} natural: {} pending: {}", token, natural_endpoints ,pending_endpoints);
|
||||
@@ -3208,7 +3208,7 @@ future<> storage_proxy::mutate_counters(Range&& mutations, db::consistency_level
|
||||
storage_proxy::paxos_participants
|
||||
storage_proxy::get_paxos_participants(const sstring& ks_name, const locator::effective_replication_map& erm, const dht::token &token, db::consistency_level cl_for_paxos) {
|
||||
inet_address_vector_replica_set natural_endpoints = erm.get_natural_endpoints_without_node_being_replaced(token);
|
||||
inet_address_vector_topology_change pending_endpoints = erm.get_pending_endpoints(token, ks_name);
|
||||
inet_address_vector_topology_change pending_endpoints = erm.get_pending_endpoints(token);
|
||||
|
||||
if (cl_for_paxos == db::consistency_level::LOCAL_SERIAL) {
|
||||
auto local_dc_filter = erm.get_topology().get_local_dc_filter();
|
||||
@@ -6008,7 +6008,7 @@ void storage_proxy::sort_endpoints_by_proximity(const locator::topology& topo, i
|
||||
}
|
||||
|
||||
inet_address_vector_replica_set storage_proxy::get_endpoints_for_reading(const sstring& ks_name, const locator::effective_replication_map& erm, const dht::token& token) const {
|
||||
auto endpoints = erm.get_token_metadata_ptr()->endpoints_for_reading(token, ks_name);
|
||||
auto endpoints = erm.get_endpoints_for_reading(token);
|
||||
if (!endpoints) {
|
||||
endpoints = erm.get_natural_endpoints_without_node_being_replaced(token);
|
||||
}
|
||||
|
||||
@@ -263,7 +263,7 @@ future<> storage_service::wait_for_ring_to_settle(std::chrono::milliseconds dela
|
||||
slogger.info("waiting for schema information to complete");
|
||||
co_await sleep_abortable(std::chrono::seconds(1), _abort_source);
|
||||
}
|
||||
co_await update_pending_ranges("joining");
|
||||
co_await update_topology_change_info("joining");
|
||||
|
||||
auto tmptr = get_token_metadata_ptr();
|
||||
if (!_db.local().get_config().consistent_rangemovement() ||
|
||||
@@ -360,7 +360,20 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s
|
||||
co_await add_normal_node(id, rs);
|
||||
}
|
||||
|
||||
tmptr->set_topology_transition_state(_topology_state_machine._topology.tstate);
|
||||
tmptr->set_read_new(std::invoke([](std::optional<topology::transition_state> state) {
|
||||
using read_new_t = locator::token_metadata::read_new_t;
|
||||
if (!state.has_value()) {
|
||||
return read_new_t::no;
|
||||
}
|
||||
switch (*state) {
|
||||
case topology::transition_state::commit_cdc_generation:
|
||||
case topology::transition_state::publish_cdc_generation:
|
||||
case topology::transition_state::write_both_read_old:
|
||||
return read_new_t::no;
|
||||
case topology::transition_state::write_both_read_new:
|
||||
return read_new_t::yes;
|
||||
}
|
||||
}, _topology_state_machine._topology.tstate));
|
||||
|
||||
for (const auto& [id, rs]: _topology_state_machine._topology.transition_nodes) {
|
||||
locator::host_id host_id{id.uuid()};
|
||||
@@ -385,7 +398,7 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s
|
||||
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, ip);
|
||||
} else {
|
||||
tmptr->add_bootstrap_tokens(rs.ring.value().tokens, ip);
|
||||
co_await update_pending_ranges(tmptr, ::format("bootstrapping node {}/{}", id, ip));
|
||||
co_await update_topology_change_info(tmptr, ::format("bootstrapping node {}/{}", id, ip));
|
||||
}
|
||||
break;
|
||||
case node_state::decommissioning:
|
||||
@@ -394,7 +407,7 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s
|
||||
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, ip);
|
||||
tmptr->update_host_id(host_id, ip);
|
||||
tmptr->add_leaving_endpoint(ip);
|
||||
co_await update_pending_ranges(tmptr, ::format("{} {}/{}", rs.state, id, ip));
|
||||
co_await update_topology_change_info(tmptr, ::format("{} {}/{}", rs.state, id, ip));
|
||||
break;
|
||||
case node_state::replacing: {
|
||||
assert(_topology_state_machine._topology.req_param.contains(id));
|
||||
@@ -407,7 +420,7 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s
|
||||
assert(existing_ip);
|
||||
tmptr->update_topology(ip, locator::endpoint_dc_rack{rs.datacenter, rs.rack});
|
||||
tmptr->add_replacing_endpoint(*existing_ip, ip);
|
||||
co_await update_pending_ranges(tmptr, ::format("replacing {}/{} by {}/{}", replaced_id, *existing_ip, id, ip));
|
||||
co_await update_topology_change_info(tmptr, ::format("replacing {}/{} by {}/{}", replaced_id, *existing_ip, id, ip));
|
||||
}
|
||||
break;
|
||||
case node_state::rebuilding:
|
||||
@@ -2037,7 +2050,7 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st
|
||||
auto endpoint = get_broadcast_address();
|
||||
tmptr->update_topology(endpoint, _sys_ks.local().local_dc_rack(), locator::node::state::joining);
|
||||
tmptr->add_bootstrap_tokens(bootstrap_tokens, endpoint);
|
||||
return update_pending_ranges(std::move(tmptr), ::format("bootstrapping node {}", endpoint));
|
||||
return update_topology_change_info(std::move(tmptr), ::format("bootstrapping node {}", endpoint));
|
||||
}).get();
|
||||
}
|
||||
|
||||
@@ -2137,7 +2150,7 @@ future<> storage_service::handle_state_replacing_update_pending_ranges(mutable_t
|
||||
replacing_node, std::current_exception());
|
||||
}
|
||||
slogger.info("handle_state_replacing: Update pending ranges for replacing node {}", replacing_node);
|
||||
co_await update_pending_ranges(tmptr, ::format("handle_state_replacing {}", replacing_node));
|
||||
co_await update_topology_change_info(tmptr, ::format("handle_state_replacing {}", replacing_node));
|
||||
}
|
||||
|
||||
future<> storage_service::handle_state_bootstrap(inet_address endpoint) {
|
||||
@@ -2169,7 +2182,7 @@ future<> storage_service::handle_state_bootstrap(inet_address endpoint) {
|
||||
if (_gossiper.uses_host_id(endpoint)) {
|
||||
tmptr->update_host_id(_gossiper.get_host_id(endpoint), endpoint);
|
||||
}
|
||||
co_await update_pending_ranges(tmptr, ::format("handle_state_bootstrap {}", endpoint));
|
||||
co_await update_topology_change_info(tmptr, ::format("handle_state_bootstrap {}", endpoint));
|
||||
co_await replicate_to_all_cores(std::move(tmptr));
|
||||
}
|
||||
|
||||
@@ -2311,7 +2324,7 @@ future<> storage_service::handle_state_normal(inet_address endpoint) {
|
||||
co_await tmptr->update_normal_tokens(owned_tokens, endpoint);
|
||||
}
|
||||
|
||||
co_await update_pending_ranges(tmptr, ::format("handle_state_normal {}", endpoint));
|
||||
co_await update_topology_change_info(tmptr, ::format("handle_state_normal {}", endpoint));
|
||||
co_await replicate_to_all_cores(std::move(tmptr));
|
||||
tmlock.reset();
|
||||
|
||||
@@ -2376,7 +2389,7 @@ future<> storage_service::handle_state_leaving(inet_address endpoint) {
|
||||
// normally
|
||||
tmptr->add_leaving_endpoint(endpoint);
|
||||
|
||||
co_await update_pending_ranges(tmptr, ::format("handle_state_leaving", endpoint));
|
||||
co_await update_topology_change_info(tmptr, ::format("handle_state_leaving", endpoint));
|
||||
co_await replicate_to_all_cores(std::move(tmptr));
|
||||
}
|
||||
|
||||
@@ -2439,7 +2452,7 @@ future<> storage_service::handle_state_removing(inet_address endpoint, std::vect
|
||||
slogger.debug("Tokens {} removed manually (endpoint was {})", remove_tokens, endpoint);
|
||||
// Note that the endpoint is being removed
|
||||
tmptr->add_leaving_endpoint(endpoint);
|
||||
return update_pending_ranges(std::move(tmptr), ::format("handle_state_removing {}", endpoint));
|
||||
return update_topology_change_info(std::move(tmptr), ::format("handle_state_removing {}", endpoint));
|
||||
});
|
||||
// find the endpoint coordinating this removal that we need to notify when we're done
|
||||
auto* value = _gossiper.get_application_state_ptr(endpoint, application_state::REMOVAL_COORDINATOR);
|
||||
@@ -2589,7 +2602,7 @@ future<> storage_service::on_remove(gms::inet_address endpoint) {
|
||||
auto tmlock = co_await get_token_metadata_lock();
|
||||
auto tmptr = co_await get_mutable_token_metadata_ptr();
|
||||
tmptr->remove_endpoint(endpoint);
|
||||
co_await update_pending_ranges(tmptr, ::format("on_remove {}", endpoint));
|
||||
co_await update_topology_change_info(tmptr, ::format("on_remove {}", endpoint));
|
||||
co_await replicate_to_all_cores(std::move(tmptr));
|
||||
}
|
||||
|
||||
@@ -3568,11 +3581,11 @@ future<> storage_service::decommission() {
|
||||
throw std::runtime_error(::format("Node in {} state; wait for status to become normal or restart", ss._operation_mode));
|
||||
}
|
||||
|
||||
ss.update_pending_ranges(::format("decommission {}", endpoint)).get();
|
||||
ss.update_topology_change_info(::format("decommission {}", endpoint)).get();
|
||||
|
||||
auto non_system_keyspaces = db.get_non_local_vnode_based_strategy_keyspaces();
|
||||
for (const auto& keyspace_name : non_system_keyspaces) {
|
||||
if (ss.get_token_metadata().has_pending_ranges(keyspace_name, ss.get_broadcast_address())) {
|
||||
if (ss._db.local().find_keyspace(keyspace_name).get_effective_replication_map()->has_pending_ranges(ss.get_broadcast_address())) {
|
||||
throw std::runtime_error("data is currently moving to this node; unable to leave the ring");
|
||||
}
|
||||
}
|
||||
@@ -4128,7 +4141,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
slogger.info("removenode[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->add_leaving_endpoint(node);
|
||||
}
|
||||
return update_pending_ranges(tmptr, ::format("removenode {}", req.leaving_nodes));
|
||||
return update_topology_change_info(tmptr, ::format("removenode {}", req.leaving_nodes));
|
||||
}).get();
|
||||
node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
@@ -4136,7 +4149,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
slogger.info("removenode[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->del_leaving_endpoint(node);
|
||||
}
|
||||
return update_pending_ranges(tmptr, ::format("removenode {}", req.leaving_nodes));
|
||||
return update_topology_change_info(tmptr, ::format("removenode {}", req.leaving_nodes));
|
||||
});
|
||||
});
|
||||
} else if (req.cmd == node_ops_cmd::removenode_heartbeat) {
|
||||
@@ -4176,7 +4189,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
slogger.info("decommission[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->add_leaving_endpoint(node);
|
||||
}
|
||||
return update_pending_ranges(tmptr, ::format("decommission {}", req.leaving_nodes));
|
||||
return update_topology_change_info(tmptr, ::format("decommission {}", req.leaving_nodes));
|
||||
}).get();
|
||||
node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
@@ -4184,7 +4197,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
slogger.info("decommission[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->del_leaving_endpoint(node);
|
||||
}
|
||||
return update_pending_ranges(tmptr, ::format("decommission {}", req.leaving_nodes));
|
||||
return update_topology_change_info(tmptr, ::format("decommission {}", req.leaving_nodes));
|
||||
});
|
||||
});
|
||||
} else if (req.cmd == node_ops_cmd::decommission_heartbeat) {
|
||||
@@ -4246,7 +4259,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
slogger.info("replace[{}]: Removed replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator);
|
||||
tmptr->del_replacing_endpoint(existing_node);
|
||||
}
|
||||
return update_pending_ranges(tmptr, ::format("replace {}", req.replace_nodes));
|
||||
return update_topology_change_info(tmptr, ::format("replace {}", req.replace_nodes));
|
||||
});
|
||||
});
|
||||
} else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) {
|
||||
@@ -4263,7 +4276,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
// Update the pending_ranges for the replacing node
|
||||
slogger.debug("replace[{}]: Updated pending_ranges from coordinator={}", req.ops_uuid, coordinator);
|
||||
mutate_token_metadata([&req, this] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
return update_pending_ranges(tmptr, ::format("replace {}", req.replace_nodes));
|
||||
return update_topology_change_info(tmptr, ::format("replace {}", req.replace_nodes));
|
||||
}).get();
|
||||
} else if (req.cmd == node_ops_cmd::replace_heartbeat) {
|
||||
slogger.debug("replace[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
@@ -4288,7 +4301,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::joining);
|
||||
tmptr->add_bootstrap_tokens(tokens, endpoint);
|
||||
}
|
||||
return update_pending_ranges(tmptr, ::format("bootstrap {}", req.bootstrap_nodes));
|
||||
return update_topology_change_info(tmptr, ::format("bootstrap {}", req.bootstrap_nodes));
|
||||
}).get();
|
||||
node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
@@ -4298,7 +4311,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
slogger.info("bootstrap[{}]: Removed node={} as bootstrap, coordinator={}", req.ops_uuid, endpoint, coordinator);
|
||||
tmptr->remove_bootstrap_tokens(tokens);
|
||||
}
|
||||
return update_pending_ranges(tmptr, ::format("bootstrap {}", req.bootstrap_nodes));
|
||||
return update_topology_change_info(tmptr, ::format("bootstrap {}", req.bootstrap_nodes));
|
||||
});
|
||||
});
|
||||
} else if (req.cmd == node_ops_cmd::bootstrap_heartbeat) {
|
||||
@@ -4714,7 +4727,7 @@ future<> storage_service::excise(std::unordered_set<token> tokens, inet_address
|
||||
tmptr->remove_endpoint(endpoint);
|
||||
tmptr->remove_bootstrap_tokens(tokens);
|
||||
|
||||
co_await update_pending_ranges(tmptr, ::format("excise {}", endpoint));
|
||||
co_await update_topology_change_info(tmptr, ::format("excise {}", endpoint));
|
||||
co_await replicate_to_all_cores(std::move(tmptr));
|
||||
tmlock.reset();
|
||||
|
||||
@@ -4760,7 +4773,7 @@ future<> storage_service::leave_ring() {
|
||||
co_await mutate_token_metadata([this] (mutable_token_metadata_ptr tmptr) {
|
||||
auto endpoint = get_broadcast_address();
|
||||
tmptr->remove_endpoint(endpoint);
|
||||
return update_pending_ranges(std::move(tmptr), ::format("leave_ring {}", endpoint));
|
||||
return update_topology_change_info(std::move(tmptr), ::format("leave_ring {}", endpoint));
|
||||
});
|
||||
|
||||
auto expire_time = _gossiper.compute_expire_time().time_since_epoch().count();
|
||||
@@ -4922,28 +4935,22 @@ future<> storage_service::mutate_token_metadata(std::function<future<> (mutable_
|
||||
co_await replicate_to_all_cores(std::move(tmptr));
|
||||
}
|
||||
|
||||
// Recomputes the topology change info stored in `tmptr`.
//
// tmptr  - mutable token metadata to update in place.
// reason - free-form description of the triggering event; only used in the
//          error log below.
//
// Must run on shard 0 (asserted). Any exception raised by the update is
// logged together with `reason` and then rethrown to the caller.
future<> storage_service::update_topology_change_info(mutable_token_metadata_ptr tmptr, sstring reason) {
    assert(this_shard_id() == 0);

    try {
        // DC/rack lookup handed to token_metadata; it resolves endpoints through get_dc_rack_for().
        locator::dc_rack_fn get_dc_rack_from_gossiper([this] (inet_address ep) { return get_dc_rack_for(ep); });
        co_await tmptr->update_topology_change_info(get_dc_rack_from_gossiper);
    } catch (...) {
        auto ep = std::current_exception();
        slogger.error("Failed to update topology change info for {}: {}", reason, ep);
        std::rethrow_exception(std::move(ep));
    }
}
|
||||
|
||||
// Convenience overload: runs the tmptr-based update_topology_change_info()
// inside mutate_token_metadata().
//
// reason             - description forwarded to the tmptr-based overload.
// acquire_merge_lock - forwarded unchanged to mutate_token_metadata().
future<> storage_service::update_topology_change_info(sstring reason, acquire_merge_lock acquire_merge_lock) {
    // `mutable` is required so that `reason` can be moved out of the closure.
    return mutate_token_metadata([this, reason = std::move(reason)] (mutable_token_metadata_ptr tmptr) mutable {
        return update_topology_change_info(std::move(tmptr), std::move(reason));
    }, acquire_merge_lock);
}
|
||||
|
||||
@@ -4951,7 +4958,7 @@ future<> storage_service::keyspace_changed(const sstring& ks_name) {
|
||||
// Update pending ranges since keyspace can be changed after we calculate pending ranges.
|
||||
sstring reason = ::format("keyspace {}", ks_name);
|
||||
return container().invoke_on(0, [reason = std::move(reason)] (auto& ss) mutable {
|
||||
return ss.update_pending_ranges(reason, acquire_merge_lock::no).handle_exception([reason = std::move(reason)] (auto ep) {
|
||||
return ss.update_topology_change_info(reason, acquire_merge_lock::no).handle_exception([reason = std::move(reason)] (auto ep) {
|
||||
slogger.warn("Failure to update pending ranges for {} ignored", reason);
|
||||
});
|
||||
});
|
||||
@@ -5556,7 +5563,7 @@ future<> storage_service::wait_for_normal_state_handled_on_boot(const std::unord
|
||||
future<bool> storage_service::is_cleanup_allowed(sstring keyspace) {
|
||||
return container().invoke_on(0, [keyspace = std::move(keyspace)] (storage_service& ss) {
|
||||
auto my_address = ss.get_broadcast_address();
|
||||
auto pending_ranges = ss.get_token_metadata().has_pending_ranges(keyspace, my_address);
|
||||
auto pending_ranges = ss._db.local().find_keyspace(keyspace).get_effective_replication_map()->has_pending_ranges(my_address);
|
||||
bool is_bootstrap_mode = ss._operation_mode == mode::BOOTSTRAP;
|
||||
slogger.debug("is_cleanup_allowed: keyspace={}, is_bootstrap_mode={}, pending_ranges={}",
|
||||
keyspace, is_bootstrap_mode, pending_ranges);
|
||||
|
||||
@@ -190,8 +190,8 @@ private:
|
||||
// Update topology change info (the input for pending ranges) locally and then replicate to all cores.
|
||||
// Should be serialized under token_metadata_lock.
|
||||
// Must be called on shard 0.
|
||||
future<> update_pending_ranges(mutable_token_metadata_ptr tmptr, sstring reason);
|
||||
future<> update_pending_ranges(sstring reason, acquire_merge_lock aml = acquire_merge_lock::yes);
|
||||
future<> update_topology_change_info(mutable_token_metadata_ptr tmptr, sstring reason);
|
||||
future<> update_topology_change_info(sstring reason, acquire_merge_lock aml = acquire_merge_lock::yes);
|
||||
future<> keyspace_changed(const sstring& ks_name);
|
||||
void register_metrics();
|
||||
future<> snitch_reconfigured();
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include "utils/fb_utilities.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
#include "locator/simple_strategy.hh"
|
||||
#include "locator/everywhere_replication_strategy.hh"
|
||||
|
||||
using namespace locator;
|
||||
|
||||
@@ -24,8 +25,8 @@ namespace {
|
||||
};
|
||||
}
|
||||
|
||||
token_metadata create_token_metadata(inet_address this_endpoint) {
|
||||
return token_metadata({
|
||||
mutable_token_metadata_ptr create_token_metadata(inet_address this_endpoint) {
|
||||
return make_lw_shared<token_metadata>(token_metadata::config {
|
||||
topology::config {
|
||||
.this_host_id = host_id::create_random_id(),
|
||||
.this_endpoint = this_endpoint,
|
||||
@@ -33,59 +34,87 @@ namespace {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
template <typename Strategy>
|
||||
mutable_vnode_erm_ptr create_erm(mutable_token_metadata_ptr tmptr, replication_strategy_config_options opts = {}) {
|
||||
dc_rack_fn get_dc_rack_fn = get_dc_rack;
|
||||
tmptr->update_topology_change_info(get_dc_rack_fn).get();
|
||||
auto strategy = seastar::make_shared<Strategy>(std::move(opts));
|
||||
return calculate_effective_replication_map(std::move(strategy), std::move(tmptr)).get0();
|
||||
}
|
||||
}
|
||||
|
||||
// A single node (e1) bootstraps with token 1 into an otherwise empty ring,
// using simple_strategy with replication_factor=1.
// Checks that e1 is the pending endpoint for tokens 0, 1 and 2, and that
// endpoints-for-reading are only reported once read_new is switched to yes.
SEASTAR_THREAD_TEST_CASE(test_pending_and_read_endpoints_for_bootstrap_first_node) {
    const auto e1 = inet_address("192.168.0.1");
    const auto t1 = dht::token::from_int64(1);

    // Named tmptr so that `token_metadata::` below unambiguously refers to the type.
    auto tmptr = create_token_metadata(e1);
    tmptr->update_topology(e1, get_dc_rack(e1));
    tmptr->add_bootstrap_token(t1, e1);

    {
        // read_new has not been set yet: pending endpoints exist, read endpoints do not.
        auto erm = create_erm<simple_strategy>(tmptr, {{"replication_factor", "1"}});
        BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(0)),
            inet_address_vector_topology_change{e1});
        BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)),
            inet_address_vector_topology_change{e1});
        BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)),
            inet_address_vector_topology_change{e1});
        BOOST_REQUIRE(!erm->get_endpoints_for_reading(t1).has_value());
    }
    {
        // With read_new enabled a freshly built erm reports e1 for reading.
        tmptr->set_read_new(token_metadata::read_new_t::yes);
        auto erm = create_erm<simple_strategy>(tmptr, {{"replication_factor", "1"}});
        BOOST_REQUIRE_EQUAL(erm->get_endpoints_for_reading(t1), inet_address_vector_replica_set{e1});
    }
}
|
||||
|
||||
// e1 is a normal node owning token 10; e2 is bootstrapping with token 20 and
// read_new is enabled. With everywhere_replication_strategy the test expects
// e2 as the pending endpoint for t2 and {e2, e1} as the endpoints for reading t2.
SEASTAR_THREAD_TEST_CASE(test_pending_and_read_endpoints_for_everywhere_strategy) {
    const auto e1 = inet_address("192.168.0.1");
    const auto e2 = inet_address("192.168.0.2");
    const auto t1 = dht::token::from_int64(10);
    const auto t2 = dht::token::from_int64(20);

    // Named tmptr so that `token_metadata::` below unambiguously refers to the type.
    auto tmptr = create_token_metadata(e1);
    tmptr->update_topology(e1, get_dc_rack(e1));
    tmptr->update_topology(e2, get_dc_rack(e2));
    tmptr->update_normal_tokens({t1}, e1).get();
    tmptr->add_bootstrap_token(t2, e2);
    tmptr->set_read_new(token_metadata::read_new_t::yes);

    auto erm = create_erm<everywhere_replication_strategy>(tmptr);
    BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(t2),
        inet_address_vector_topology_change{e2});
    BOOST_REQUIRE_EQUAL(erm->get_endpoints_for_reading(t2),
        (inet_address_vector_replica_set{e2, e1}));
}
|
||||
|
||||
// e1 is a normal node owning token 1; e2 bootstraps with token 100, using
// simple_strategy with replication_factor=1.
// Checks that e2 is pending for tokens 2 and 100, and that tokens 0, 1 and 101
// have no pending endpoints.
SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_second_node) {
    const auto e1 = inet_address("192.168.0.1");
    const auto t1 = dht::token::from_int64(1);
    const auto e2 = inet_address("192.168.0.2");
    const auto t2 = dht::token::from_int64(100);

    auto token_metadata = create_token_metadata(e1);
    token_metadata->update_topology(e1, get_dc_rack(e1));
    token_metadata->update_topology(e2, get_dc_rack(e2));
    token_metadata->update_normal_tokens({t1}, e1).get();
    token_metadata->add_bootstrap_token(t2, e2);

    auto erm = create_erm<simple_strategy>(token_metadata, {{"replication_factor", "1"}});
    BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(0)),
        inet_address_vector_topology_change{});
    BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)),
        inet_address_vector_topology_change{});
    BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)),
        inet_address_vector_topology_change{e2});
    BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)),
        inet_address_vector_topology_change{e2});
    BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(101)),
        inet_address_vector_topology_change{});
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_with_replicas) {
|
||||
dc_rack_fn get_dc_rack_fn = get_dc_rack;
|
||||
|
||||
const auto t1 = dht::token::from_int64(1);
|
||||
const auto t10 = dht::token::from_int64(10);
|
||||
const auto t100 = dht::token::from_int64(100);
|
||||
@@ -93,32 +122,29 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_with_replicas) {
|
||||
const auto e1 = inet_address("192.168.0.1");
|
||||
const auto e2 = inet_address("192.168.0.2");
|
||||
const auto e3 = inet_address("192.168.0.3");
|
||||
|
||||
auto token_metadata = create_token_metadata(e1);
|
||||
token_metadata.update_topology(e1, get_dc_rack(e1));
|
||||
token_metadata.update_topology(e2, get_dc_rack(e2));
|
||||
token_metadata.update_topology(e3, get_dc_rack(e3));
|
||||
const auto replication_strategy = simple_strategy(replication_strategy_config_options {
|
||||
{"replication_factor", "2"}
|
||||
});
|
||||
token_metadata.update_normal_tokens({t1, t1000}, e2).get();
|
||||
token_metadata.update_normal_tokens({t10}, e3).get();
|
||||
token_metadata.add_bootstrap_token(t100, e1);
|
||||
token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get();
|
||||
BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(1), ks_name),
|
||||
token_metadata->update_topology(e1, get_dc_rack(e1));
|
||||
token_metadata->update_topology(e2, get_dc_rack(e2));
|
||||
token_metadata->update_topology(e3, get_dc_rack(e3));
|
||||
token_metadata->update_normal_tokens({t1, t1000}, e2).get();
|
||||
token_metadata->update_normal_tokens({t10}, e3).get();
|
||||
token_metadata->add_bootstrap_token(t100, e1);
|
||||
|
||||
auto erm = create_erm<simple_strategy>(token_metadata, {{"replication_factor", "2"}});
|
||||
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)),
|
||||
inet_address_vector_topology_change{});
|
||||
BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(2), ks_name),
|
||||
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)),
|
||||
inet_address_vector_topology_change{e1});
|
||||
BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(11), ks_name),
|
||||
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(11)),
|
||||
inet_address_vector_topology_change{e1});
|
||||
BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(100), ks_name),
|
||||
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)),
|
||||
inet_address_vector_topology_change{e1});
|
||||
BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(101), ks_name),
|
||||
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(101)),
|
||||
inet_address_vector_topology_change{});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_leave_with_replicas) {
|
||||
dc_rack_fn get_dc_rack_fn = get_dc_rack;
|
||||
|
||||
const auto t1 = dht::token::from_int64(1);
|
||||
const auto t10 = dht::token::from_int64(10);
|
||||
const auto t100 = dht::token::from_int64(100);
|
||||
@@ -126,33 +152,30 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_leave_with_replicas) {
|
||||
const auto e1 = inet_address("192.168.0.1");
|
||||
const auto e2 = inet_address("192.168.0.2");
|
||||
const auto e3 = inet_address("192.168.0.3");
|
||||
|
||||
auto token_metadata = create_token_metadata(e1);
|
||||
token_metadata.update_topology(e1, get_dc_rack(e1));
|
||||
token_metadata.update_topology(e2, get_dc_rack(e2));
|
||||
token_metadata.update_topology(e3, get_dc_rack(e3));
|
||||
const auto replication_strategy = simple_strategy(replication_strategy_config_options {
|
||||
{"replication_factor", "2"}
|
||||
});
|
||||
token_metadata.update_normal_tokens({t1, t1000}, e2).get();
|
||||
token_metadata.update_normal_tokens({t10}, e3).get();
|
||||
token_metadata.update_normal_tokens({t100}, e1).get();
|
||||
token_metadata.add_leaving_endpoint(e1);
|
||||
token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get();
|
||||
BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(1), ks_name),
|
||||
token_metadata->update_topology(e1, get_dc_rack(e1));
|
||||
token_metadata->update_topology(e2, get_dc_rack(e2));
|
||||
token_metadata->update_topology(e3, get_dc_rack(e3));
|
||||
token_metadata->update_normal_tokens({t1, t1000}, e2).get();
|
||||
token_metadata->update_normal_tokens({t10}, e3).get();
|
||||
token_metadata->update_normal_tokens({t100}, e1).get();
|
||||
token_metadata->add_leaving_endpoint(e1);
|
||||
|
||||
auto erm = create_erm<simple_strategy>(token_metadata, {{"replication_factor", "2"}});
|
||||
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)),
|
||||
inet_address_vector_topology_change{});
|
||||
BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(2), ks_name),
|
||||
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)),
|
||||
inet_address_vector_topology_change{e2});
|
||||
BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(11), ks_name),
|
||||
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(11)),
|
||||
inet_address_vector_topology_change{e3});
|
||||
BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(100), ks_name),
|
||||
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)),
|
||||
inet_address_vector_topology_change{e3});
|
||||
BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(101), ks_name),
|
||||
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(101)),
|
||||
inet_address_vector_topology_change{});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_replace_with_replicas) {
|
||||
dc_rack_fn get_dc_rack_fn = get_dc_rack;
|
||||
|
||||
const auto t1 = dht::token::from_int64(1);
|
||||
const auto t10 = dht::token::from_int64(10);
|
||||
const auto t100 = dht::token::from_int64(100);
|
||||
@@ -161,38 +184,35 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_replace_with_replicas) {
|
||||
const auto e2 = inet_address("192.168.0.2");
|
||||
const auto e3 = inet_address("192.168.0.3");
|
||||
const auto e4 = inet_address("192.168.0.4");
|
||||
|
||||
auto token_metadata = create_token_metadata(e1);
|
||||
token_metadata.update_topology(e1, get_dc_rack(e1));
|
||||
token_metadata.update_topology(e2, get_dc_rack(e2));
|
||||
token_metadata.update_topology(e3, get_dc_rack(e3));
|
||||
token_metadata.update_topology(e4, get_dc_rack(e4));
|
||||
const auto replication_strategy = simple_strategy(replication_strategy_config_options {
|
||||
{"replication_factor", "2"}
|
||||
});
|
||||
token_metadata.update_normal_tokens({t1000}, e1).get();
|
||||
token_metadata.update_normal_tokens({t1, t100}, e2).get();
|
||||
token_metadata.update_normal_tokens({t10}, e3).get();
|
||||
token_metadata.add_replacing_endpoint(e3, e4);
|
||||
token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get();
|
||||
BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(100), ks_name),
|
||||
token_metadata->update_topology(e1, get_dc_rack(e1));
|
||||
token_metadata->update_topology(e2, get_dc_rack(e2));
|
||||
token_metadata->update_topology(e3, get_dc_rack(e3));
|
||||
token_metadata->update_topology(e4, get_dc_rack(e4));
|
||||
token_metadata->update_normal_tokens({t1000}, e1).get();
|
||||
token_metadata->update_normal_tokens({t1, t100}, e2).get();
|
||||
token_metadata->update_normal_tokens({t10}, e3).get();
|
||||
token_metadata->add_replacing_endpoint(e3, e4);
|
||||
|
||||
auto erm = create_erm<simple_strategy>(token_metadata, {{"replication_factor", "2"}});
|
||||
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)),
|
||||
inet_address_vector_topology_change{});
|
||||
BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(1000), ks_name),
|
||||
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1000)),
|
||||
inet_address_vector_topology_change{});
|
||||
BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(1001), ks_name),
|
||||
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1001)),
|
||||
inet_address_vector_topology_change{e4});
|
||||
BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(1), ks_name),
|
||||
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)),
|
||||
inet_address_vector_topology_change{e4});
|
||||
BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(2), ks_name),
|
||||
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)),
|
||||
inet_address_vector_topology_change{e4});
|
||||
BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(10), ks_name),
|
||||
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(10)),
|
||||
inet_address_vector_topology_change{e4});
|
||||
BOOST_REQUIRE_EQUAL(token_metadata.pending_endpoints_for(dht::token::from_int64(11), ks_name),
|
||||
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(11)),
|
||||
inet_address_vector_topology_change{});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas) {
|
||||
dc_rack_fn get_dc_rack_fn = get_dc_rack;
|
||||
|
||||
const auto t1 = dht::token::from_int64(1);
|
||||
const auto t10 = dht::token::from_int64(10);
|
||||
const auto t100 = dht::token::from_int64(100);
|
||||
@@ -200,66 +220,66 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas
|
||||
const auto e1 = inet_address("192.168.0.1");
|
||||
const auto e2 = inet_address("192.168.0.2");
|
||||
const auto e3 = inet_address("192.168.0.3");
|
||||
auto token_metadata = create_token_metadata(e1);
|
||||
token_metadata.update_topology(e1, get_dc_rack(e1));
|
||||
token_metadata.update_topology(e2, get_dc_rack(e2));
|
||||
token_metadata.update_topology(e3, get_dc_rack(e3));
|
||||
const auto replication_strategy = simple_strategy(replication_strategy_config_options {
|
||||
{"replication_factor", "2"}
|
||||
});
|
||||
token_metadata.update_normal_tokens({t1, t1000}, e2).get();
|
||||
token_metadata.update_normal_tokens({t10}, e3).get();
|
||||
token_metadata.add_bootstrap_token(t100, e1);
|
||||
|
||||
auto check_endpoints = [&](int64_t t, inet_address_vector_replica_set expected_replicas,
|
||||
auto token_metadata = create_token_metadata(e1);
|
||||
token_metadata->update_topology(e1, get_dc_rack(e1));
|
||||
token_metadata->update_topology(e2, get_dc_rack(e2));
|
||||
token_metadata->update_topology(e3, get_dc_rack(e3));
|
||||
token_metadata->update_normal_tokens({t1, t1000}, e2).get();
|
||||
token_metadata->update_normal_tokens({t10}, e3).get();
|
||||
token_metadata->add_bootstrap_token(t100, e1);
|
||||
|
||||
auto check_endpoints = [](mutable_vnode_erm_ptr erm, int64_t t,
|
||||
inet_address_vector_replica_set expected_replicas,
|
||||
seastar::compat::source_location sl = seastar::compat::source_location::current())
|
||||
{
|
||||
BOOST_TEST_INFO("line: " << sl.line());
|
||||
const auto expected_set = std::unordered_set<inet_address>(expected_replicas.begin(),
|
||||
expected_replicas.end());
|
||||
const auto actual_replicas = token_metadata.endpoints_for_reading(dht::token::from_int64(t), ks_name);
|
||||
const auto actual_replicas = erm->get_endpoints_for_reading(dht::token::from_int64(t));
|
||||
BOOST_REQUIRE(actual_replicas.has_value());
|
||||
const auto actual_set = std::unordered_set<inet_address>(actual_replicas->begin(),
|
||||
actual_replicas->end());
|
||||
BOOST_REQUIRE_EQUAL(expected_set, actual_set);
|
||||
};
|
||||
|
||||
auto check_no_endpoints = [&](int64_t t,
|
||||
auto check_no_endpoints = [](mutable_vnode_erm_ptr erm, int64_t t,
|
||||
seastar::compat::source_location sl = seastar::compat::source_location::current())
|
||||
{
|
||||
BOOST_TEST_INFO("line: " << sl.line());
|
||||
BOOST_REQUIRE(!token_metadata.endpoints_for_reading(dht::token::from_int64(t), ks_name).has_value());
|
||||
BOOST_REQUIRE(!erm->get_endpoints_for_reading(dht::token::from_int64(t)).has_value());
|
||||
};
|
||||
|
||||
token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get();
|
||||
check_no_endpoints(2);
|
||||
{
|
||||
auto erm = create_erm<simple_strategy>(token_metadata, {{"replication_factor", "2"}});
|
||||
check_no_endpoints(erm, 2);
|
||||
}
|
||||
|
||||
token_metadata.set_topology_transition_state(service::topology::transition_state::write_both_read_new);
|
||||
token_metadata.update_pending_ranges(replication_strategy, ks_name, get_dc_rack_fn).get();
|
||||
{
|
||||
token_metadata->set_read_new(locator::token_metadata::read_new_t::yes);
|
||||
auto erm = create_erm<simple_strategy>(token_metadata, {{"replication_factor", "2"}});
|
||||
|
||||
check_endpoints(2, {e3, e1});
|
||||
check_endpoints(10, {e3, e1});
|
||||
check_endpoints(11, {e1, e2});
|
||||
check_endpoints(100, {e1, e2});
|
||||
check_no_endpoints(101);
|
||||
check_no_endpoints(1001);
|
||||
check_no_endpoints(1);
|
||||
check_endpoints(erm, 2, {e3, e1});
|
||||
check_endpoints(erm, 10, {e3, e1});
|
||||
check_endpoints(erm, 11, {e1, e2});
|
||||
check_endpoints(erm, 100, {e1, e2});
|
||||
check_no_endpoints(erm, 101);
|
||||
check_no_endpoints(erm, 1001);
|
||||
check_no_endpoints(erm, 1);
|
||||
}
|
||||
}
|
||||
|
||||
// Replaces a node with a node that has the same endpoint address and checks
// that the endpoint is reported as pending for its own token while the token
// still maps to that endpoint.
SEASTAR_THREAD_TEST_CASE(test_replace_node_with_same_endpoint) {
    dc_rack_fn get_dc_rack_fn = get_dc_rack;

    const auto t1 = dht::token::from_int64(1);
    const auto e1 = inet_address("192.168.0.1");

    // token_metadata is accessed through a pointer here; the erm built below
    // is what exposes the pending endpoints.
    auto token_metadata = create_token_metadata(e1);
    token_metadata->update_topology(e1, get_dc_rack(e1));
    token_metadata->update_normal_tokens({t1}, e1).get();
    // e1 is registered as replacing itself.
    token_metadata->add_replacing_endpoint(e1, e1);

    auto erm = create_erm<simple_strategy>(token_metadata, {{"replication_factor", "2"}});
    BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)),
        inet_address_vector_topology_change{e1});
    BOOST_REQUIRE_EQUAL(token_metadata->get_endpoint(t1), e1);
}
|
||||
|
||||
@@ -129,6 +129,10 @@ public:
|
||||
return _set;
|
||||
}
|
||||
|
||||
auto extract_vector() && noexcept {
|
||||
return std::move(_vec);
|
||||
}
|
||||
|
||||
auto extract_set() && noexcept {
|
||||
return std::move(_set);
|
||||
}
|
||||
|
||||
@@ -14,6 +14,8 @@
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/future-util.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/core/when_all.hh>
|
||||
#include <seastar/core/do_with.hh>
|
||||
#include "utils/collection-concepts.hh"
|
||||
|
||||
using namespace seastar;
|
||||
@@ -95,7 +97,7 @@ concept TriviallyClearableSequence =
|
||||
|
||||
template <typename T>
|
||||
concept Container = Iterable<T> && requires (T x, typename T::iterator it) {
|
||||
{ x.erase(it) } -> std::same_as<typename T::iterator>;
|
||||
x.erase(it);
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
@@ -174,6 +176,14 @@ future<> clear_gently(foreign_ptr<T>& o) noexcept {
|
||||
});
|
||||
}
|
||||
|
||||
template <typename... T>
|
||||
requires (std::is_rvalue_reference_v<T&&> && ...)
|
||||
future<> clear_gently(T&&... o) noexcept {
|
||||
return do_with(std::move(o)..., [](auto&... args) {
|
||||
return when_all(clear_gently(args)...).discard_result();
|
||||
});
|
||||
}
|
||||
|
||||
template <SharedPointer T>
|
||||
future<> clear_gently(T& o) noexcept {
|
||||
if (o.use_count() == 1) {
|
||||
|
||||
Reference in New Issue
Block a user