Add the enforce_rack_list option. When the option is set to true, all tablet keyspaces must use a rack-list replication factor. While the option is on:
- CREATE KEYSPACE always auto-extends a numeric rf to a rack list;
- ALTER KEYSPACE fails when there is a numeric rf in any DC.
The flag is false by default, and a node needs to be restarted to change its value. Starting a node with the enforce_rack_list option enabled will fail if any tablet keyspace has a numeric rf in any DC. enforce_rack_list is a per-node option, so the user needs to ensure that no tablet keyspace is created or altered while nodes in the cluster hold inconsistent values of it.
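For illustration, a sketch of the intended behavior (the keyspace name is hypothetical, and the rack-list form that the numeric rf is extended to is elided, since its exact syntax is not shown here):

    CREATE KEYSPACE ks WITH replication =
        {'class': 'NetworkTopologyStrategy', 'dc1': 3}
        AND tablets = {'enabled': true};
    -- with enforce_rack_list: true, the numeric rf 3 is auto-extended
    -- to a list of 3 racks in dc1 at creation time

    ALTER KEYSPACE ks WITH replication =
        {'class': 'NetworkTopologyStrategy', 'dc1': 4};
    -- fails while enforce_rack_list is on: the ALTER would leave
    -- a numeric rf in dc1
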
/*
 * Copyright (C) 2015-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#include "locator/abstract_replication_strategy.hh"
#include "locator/tablet_replication_strategy.hh"
#include "locator/local_strategy.hh"
#include "utils/class_registrator.hh"
#include "exceptions/exceptions.hh"
#include <fmt/ranges.h>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include "replica/database.hh"
#include "utils/stall_free.hh"

#include <boost/icl/interval.hpp>
#include <boost/icl/interval_map.hpp>
#include <variant>

namespace locator {

using ring_mapping_underlying_type = boost::icl::interval_map<token, std::unordered_set<locator::host_id>>;

struct ring_mapping_impl {
    ring_mapping_underlying_type map;
};

ring_mapping::ring_mapping()
    : _impl(std::make_unique<ring_mapping_impl>()) {
}

ring_mapping::ring_mapping(ring_mapping&&) noexcept = default;

ring_mapping& ring_mapping::operator=(ring_mapping&&) noexcept = default;

ring_mapping::~ring_mapping() = default;

auto* ring_mapping::operator->() {
    return &_impl->map;
}

auto* ring_mapping::operator->() const {
    return &std::as_const(_impl->map);
}

auto& ring_mapping::operator*() {
    return _impl->map;
}

auto& ring_mapping::operator*() const {
    return std::as_const(_impl->map);
}

logging::logger rslogger("replication_strategy");

abstract_replication_strategy::abstract_replication_strategy(
        replication_strategy_params params,
        replication_strategy_type my_type)
    : _config_options(params.options)
    , _my_type(my_type) {
    if (params.initial_tablets.has_value()) {
        _uses_tablets = true;
    }
}

abstract_replication_strategy::ptr_type abstract_replication_strategy::create_replication_strategy(const sstring& strategy_name, replication_strategy_params params, const locator::topology* topo) {
    try {
        return create_object<abstract_replication_strategy, replication_strategy_params, const locator::topology*>(strategy_name, std::move(params), std::move(topo));
    } catch (const no_such_class& e) {
        throw exceptions::configuration_exception(e.what());
    }
}

// class registry signature must match the actual replication strategies' signature
using strategy_class_registry = class_registry<
    locator::abstract_replication_strategy,
    replication_strategy_params,
    const topology*>;

sstring abstract_replication_strategy::to_qualified_class_name(std::string_view strategy_class_name) {
    return strategy_class_registry::to_qualified_class_name(strategy_class_name);
}

static const std::unordered_set<locator::host_id>* find_token(const ring_mapping& ring_mapping, const token& token) {
    if (ring_mapping->empty()) {
        return nullptr;
    }
    const auto interval = token_metadata::range_to_interval(wrapping_interval<dht::token>(token));
    const auto it = ring_mapping->find(interval);
    return it != ring_mapping->end() ? &it->second : nullptr;
}

host_id_vector_topology_change vnode_effective_replication_map::get_pending_replicas(const token& search_token) const {
    const auto* pending_endpoints = find_token(_pending_endpoints, search_token);
    if (!pending_endpoints) {
        return host_id_vector_topology_change{};
    }
    return *pending_endpoints | std::ranges::to<host_id_vector_topology_change>();
}

host_id_vector_replica_set vnode_effective_replication_map::get_replicas_for_reading(const token& token, bool is_vnode) const {
    const auto* endpoints = find_token(_read_endpoints, token);
    if (endpoints == nullptr) {
        return get_natural_replicas(token, is_vnode);
    }
    return *endpoints | std::ranges::to<host_id_vector_replica_set>();
}

std::optional<tablet_routing_info> vnode_effective_replication_map::check_locality(const token& token) const {
    return {};
}

bool vnode_effective_replication_map::has_pending_ranges(locator::host_id endpoint) const {
    for (const auto& item : *_pending_endpoints) {
        const auto& nodes = item.second;
        if (nodes.contains(endpoint)) {
            return true;
        }
    }
    return false;
}

std::unordered_set<locator::host_id> vnode_effective_replication_map::get_all_pending_nodes() const {
    std::unordered_set<locator::host_id> endpoints;
    for (const auto& item : *_pending_endpoints) {
        endpoints.insert(item.second.begin(), item.second.end());
    }

    return endpoints;
}

std::unique_ptr<token_range_splitter> vnode_effective_replication_map::make_splitter() const {
    return locator::make_splitter(_tmptr);
}

const dht::sharder& vnode_effective_replication_map::get_sharder(const schema& s) const {
    return s.get_sharder();
}

const per_table_replication_strategy* abstract_replication_strategy::maybe_as_per_table() const {
    if (!_per_table) {
        return nullptr;
    }
    return dynamic_cast<const per_table_replication_strategy*>(this);
}

const tablet_aware_replication_strategy* abstract_replication_strategy::maybe_as_tablet_aware() const {
    if (!_uses_tablets) {
        return nullptr;
    }
    return dynamic_cast<const tablet_aware_replication_strategy*>(this);
}

size_t replication_factor_data::parse(const sstring& rf) {
    // Cast to unsigned char: passing a (possibly negative) plain char
    // to isdigit() is undefined behavior.
    if (rf.empty() || std::any_of(rf.begin(), rf.end(), [] (unsigned char c) { return !isdigit(c); })) {
        throw exceptions::configuration_exception(
                format("Replication factor must be numeric and non-negative, found '{}'", rf));
    }
    try {
        return std::stol(rf);
    } catch (...) {
        // Only digit strings reach this point, so the remaining failure
        // mode is an out-of-range value.
        throw exceptions::configuration_exception(
                sstring("Replication factor is out of range: ") + rf);
    }
}

replication_factor_data abstract_replication_strategy::parse_replication_factor(const replication_strategy_config_option& rf)
{
    return replication_factor_data(rf);
}

size_t get_replication_factor(const replication_strategy_config_option& opt) {
    return replication_factor_data(opt).count();
}

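// Illustrative examples (editorial, not in the original source):
//   parse("3")  returns 3;
//   parse("")   throws configuration_exception;
//   parse("-1") throws configuration_exception ('-' is not a digit).
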
bool uses_rack_list_exclusively(const replication_strategy_config_options& opts) {
    for (auto& [_, val] : opts) {
        if (!std::holds_alternative<rack_list>(val)) {
            return false;
        }
    }
    return true;
}

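// Illustrative example (editorial, assumed option shapes): for options
//   { "dc1" -> rack_list{"r1", "r2"}, "dc2" -> rack_list{"r3"} }
// the function returns true; if any DC instead holds a numeric rf string
// such as "3", it returns false.
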
replication_factor_data::replication_factor_data(const replication_strategy_config_option& rf) {
    std::visit(overloaded_functor {
        [&] (const sstring& rf) {
            auto rf_value = parse(rf);
            _data.emplace<size_t>(rf_value);
            _count = rf_value;
        },
        [&] (const std::vector<sstring>& racks) {
            _data.emplace<std::vector<sstring>>(racks);
            _count = racks.size();
        }
    }, rf);
}

void replication_factor_data::validate(const std::unordered_set<sstring>& allowed_racks) {
    std::visit(overloaded_functor {
        [&] (const size_t& rf) {},
        [&] (const std::vector<sstring>& racks) {
            for (const auto& rack : racks) {
                if (!allowed_racks.contains(rack)) {
                    throw exceptions::configuration_exception(
                        fmt::format("Unrecognized rack name '{}'. allowed_racks={}", rack, allowed_racks));
                }
            }
        }
    }, _data);
}

rack_diff diff_racks(const rack_list& old_racks, const rack_list& new_racks) {
    std::set<sstring> old_racks_set(old_racks.begin(), old_racks.end());
    std::set<sstring> new_racks_set(new_racks.begin(), new_racks.end());

    rack_diff diff;
    std::ranges::set_difference(new_racks_set, old_racks_set, std::back_inserter(diff.added));
    std::ranges::set_difference(old_racks_set, new_racks_set, std::back_inserter(diff.removed));
    return diff;
}

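// Illustrative example (editorial, not in the original source):
//   diff_racks({"r1", "r2"}, {"r2", "r3"}) yields
//   diff.added == {"r3"} and diff.removed == {"r1"}.
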
static
void
insert_token_range_to_sorted_container_while_unwrapping(
        const dht::token& prev_tok,
        const dht::token& tok,
        dht::token_range_vector& ret) {
    if (prev_tok < tok) {
        auto pos = ret.end();
        if (!ret.empty() && !std::prev(pos)->end()) {
            // We inserted a wrapped range (a, b] previously as
            // (-inf, b], (a, +inf). So now we insert in the next-to-last
            // position to keep the last range (a, +inf) at the end.
            pos = std::prev(pos);
        }
        ret.insert(pos,
                dht::token_range{
                        dht::token_range::bound(prev_tok, false),
                        dht::token_range::bound(tok, true)});
    } else {
        ret.emplace_back(
                dht::token_range::bound(prev_tok, false),
                std::nullopt);
        // Insert in front to maintain sorted order
        ret.emplace(
                ret.begin(),
                std::nullopt,
                dht::token_range::bound(tok, true));
    }
}

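// Illustrative example (editorial, not in the original source): inserting the
// wrapped range (t_max, t_min] (i.e. prev_tok == t_max >= tok == t_min)
// produces the two unwrapped ranges (-inf, t_min] at the front and
// (t_max, +inf) at the back, keeping the container sorted.
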
future<dht::token_range_vector>
vnode_effective_replication_map::do_get_ranges(noncopyable_function<stop_iteration(bool&, const host_id&)> consider_range_for_endpoint) const {
    dht::token_range_vector ret;
    const auto& tm = *_tmptr;
    const auto& sorted_tokens = tm.sorted_tokens();
    if (sorted_tokens.empty()) {
        on_internal_error(rslogger, "Token metadata is empty");
    }
    auto prev_tok = sorted_tokens.back();
    for (const auto& tok : sorted_tokens) {
        bool add_range = false;
        for_each_natural_endpoint_until(tok, [&] (const host_id& ep) {
            return consider_range_for_endpoint(add_range, ep);
        });
        if (add_range) {
            insert_token_range_to_sorted_container_while_unwrapping(prev_tok, tok, ret);
        }
        prev_tok = tok;
        co_await coroutine::maybe_yield();
    }
    co_return ret;
}

future<dht::token_range_vector>
vnode_effective_replication_map::get_ranges(host_id ep) const {
    // The callback function below is called for each endpoint
    // in each token's natural endpoints.
    // Add the range if `ep` is found in the token's natural endpoints.
    return do_get_ranges([ep] (bool& add_range, const host_id& e) {
        if ((add_range = (e == ep))) {
            // stop the iteration once a match is found
            return stop_iteration::yes;
        }
        return stop_iteration::no;
    });
}

// Caller must ensure that token_metadata will not change throughout the call.
future<dht::token_range_vector>
abstract_replication_strategy::get_ranges(locator::host_id ep, token_metadata_ptr tmptr) const {
    co_return co_await get_ranges(ep, *tmptr);
}

// Caller must ensure that token_metadata will not change throughout the call.
future<dht::token_range_vector>
abstract_replication_strategy::get_ranges(locator::host_id ep, const token_metadata& tm) const {
    dht::token_range_vector ret;
    if (!tm.is_normal_token_owner(ep)) {
        co_return ret;
    }
    const auto& sorted_tokens = tm.sorted_tokens();
    if (sorted_tokens.empty()) {
        on_internal_error(rslogger, "Token metadata is empty");
    }
    auto prev_tok = sorted_tokens.back();
    for (auto tok : sorted_tokens) {
        bool should_add = false;
        if (get_type() == replication_strategy_type::everywhere_topology) {
            // everywhere_topology deserves this special fast path.
            // Using the common path would make the function quadratic in the number of endpoints.
            should_add = true;
        } else {
            auto eps = co_await calculate_natural_endpoints(tok, tm);
            should_add = eps.contains(ep);
        }
        if (should_add) {
            insert_token_range_to_sorted_container_while_unwrapping(prev_tok, tok, ret);
        }
        prev_tok = tok;
    }
    co_return ret;
}

future<dht::token_range_vector>
vnode_effective_replication_map::get_primary_ranges(locator::host_id ep) const {
    // The callback function below is called for each endpoint
    // in each token's natural endpoints.
    // Add the range if `ep` is the primary replica in the token's natural endpoints.
    // The primary replica is first in the natural endpoints list.
    return do_get_ranges([ep] (bool& add_range, const locator::host_id& e) {
        add_range = (e == ep);
        // stop the iteration once the first node was considered.
        return stop_iteration::yes;
    });
}

future<dht::token_range_vector>
vnode_effective_replication_map::get_primary_ranges_within_dc(locator::host_id ep) const {
    const topology& topo = _tmptr->get_topology();
    sstring local_dc = topo.get_datacenter(ep);
    std::unordered_set<locator::host_id> local_dc_nodes = _tmptr->get_datacenter_token_owners().at(local_dc);
    // The callback function below is called for each endpoint
    // in each token's natural endpoints.
    // Add the range if `ep` is the datacenter primary replica in the token's natural endpoints.
    // The primary replica in each datacenter is determined by the natural endpoints list order.
    return do_get_ranges([ep, local_dc_nodes = std::move(local_dc_nodes)] (bool& add_range, const locator::host_id& e) {
        // Unlike get_primary_ranges() which checks if ep is the first
        // owner of this range, here we check if ep is the first just
        // among nodes which belong to the local dc of ep.
        if (!local_dc_nodes.contains(e)) {
            return stop_iteration::no;
        }
        add_range = (e == ep);
        // stop the iteration once the first node contained in the local datacenter was considered.
        return stop_iteration::yes;
    });
}

future<std::unordered_map<dht::token_range, host_id_vector_replica_set>>
vnode_effective_replication_map::get_range_host_ids() const {
    const token_metadata& tm = *_tmptr;
    std::unordered_map<dht::token_range, host_id_vector_replica_set> ret;
    for (auto& t : tm.sorted_tokens()) {
        dht::token_range_vector ranges = tm.get_primary_ranges_for(t);
        for (auto& r : ranges) {
            ret.emplace(r, do_get_replicas(t, true));
        }
        co_await coroutine::maybe_yield();
    }
    co_return ret;
}

future<std::unordered_map<dht::token_range, host_id_vector_replica_set>>
abstract_replication_strategy::get_range_host_ids(const token_metadata& tm) const {
    std::unordered_map<dht::token_range, host_id_vector_replica_set> ret;
    for (auto& t : tm.sorted_tokens()) {
        dht::token_range_vector ranges = tm.get_primary_ranges_for(t);
        auto eps = co_await calculate_natural_endpoints(t, tm);
        for (auto& r : ranges) {
            ret.emplace(r, eps.get_vector());
        }
    }
    co_return ret;
}

future<dht::token_range_vector>
abstract_replication_strategy::get_pending_address_ranges(const token_metadata_ptr tmptr, std::unordered_set<token> pending_tokens, locator::host_id pending_address, locator::endpoint_dc_rack dr) const {
    dht::token_range_vector ret;
    auto temp = co_await tmptr->clone_only_token_map();
    temp.update_topology(pending_address, std::move(dr));
    co_await temp.update_normal_tokens(pending_tokens, pending_address);
    for (const auto& t : temp.sorted_tokens()) {
        auto eps = co_await calculate_natural_endpoints(t, temp);
        if (eps.contains(pending_address)) {
            dht::token_range_vector r = temp.get_primary_ranges_for(t);
            rslogger.debug("get_pending_address_ranges: token={} primary_range={} endpoint={}", t, r, pending_address);
            ret.insert(ret.end(), r.begin(), r.end());
        }
    }
    co_await temp.clear_gently();
    co_return ret;
}

static const auto default_replication_map_key = dht::token::from_int64(0);

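// Editorial note, inferred from the code below: when a strategy's natural
// endpoints do not depend on the token (natural_endpoints_depend_on_token()
// is false), the replication map stores a single entry under
// default_replication_map_key rather than one entry per vnode.
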
future<mutable_static_effective_replication_map_ptr> calculate_vnode_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr) {
    replication_map replication_map;
    ring_mapping pending_endpoints;
    ring_mapping read_endpoints;
    std::unordered_set<locator::host_id> dirty_endpoints;

    const auto depend_on_token = rs->natural_endpoints_depend_on_token();
    const auto& sorted_tokens = tmptr->sorted_tokens();
    replication_map.reserve(depend_on_token ? sorted_tokens.size() : 1);
    if (const auto& topology_changes = tmptr->get_topology_change_info(); topology_changes) {
        const auto& all_tokens = topology_changes->all_tokens;
        const auto& current_tokens = tmptr->get_token_to_endpoint();
        for (size_t i = 0, size = all_tokens.size(); i < size; ++i) {
            co_await coroutine::maybe_yield();

            const auto token = all_tokens[i];

            auto current_endpoints = co_await rs->calculate_natural_endpoints(token, *tmptr);
            auto target_endpoints = co_await rs->calculate_natural_endpoints(token, *topology_changes->target_token_metadata);

            auto add_mapping = [&](ring_mapping& target, std::unordered_set<locator::host_id>&& endpoints) {
                using interval = ring_mapping_underlying_type::interval_type;
                if (!depend_on_token) {
                    *target += std::make_pair(
                        interval::open(dht::minimum_token(), dht::maximum_token()),
                        std::move(endpoints));
                } else if (i == 0) {
                    *target += std::make_pair(
                        interval::open(all_tokens.back(), dht::maximum_token()),
                        endpoints);
                    *target += std::make_pair(
                        interval::left_open(dht::minimum_token(), token),
                        std::move(endpoints));
                } else {
                    *target += std::make_pair(
                        interval::left_open(all_tokens[i - 1], token),
                        std::move(endpoints));
                }
            };

            {
                host_id_set endpoints_diff;
                for (const auto& e: target_endpoints) {
                    if (!current_endpoints.contains(e)) {
                        endpoints_diff.insert(e);
                    }
                }
                if (!endpoints_diff.empty()) {
                    add_mapping(pending_endpoints, std::move(endpoints_diff).extract_set());
                }
            }

            // If an endpoint is in the current endpoints, but not in the
            // target endpoints, it loses a range and becomes dirty.
            for (auto& h : current_endpoints) {
                if (!target_endpoints.contains(h)) {
                    dirty_endpoints.emplace(h);
                }
            }

            // In order not to waste memory, we update read_endpoints only if
            // the new endpoints differ from the old ones.
            if (topology_changes->read_new && target_endpoints.get_vector() != current_endpoints.get_vector()) {
                add_mapping(read_endpoints, std::move(target_endpoints).extract_set());
            }

            if (!depend_on_token) {
                replication_map.emplace(default_replication_map_key, std::move(current_endpoints).extract_vector());
                break;
            } else if (current_tokens.contains(token)) {
                replication_map.emplace(token, std::move(current_endpoints).extract_vector());
            }
        }
    } else if (depend_on_token) {
        for (const auto& t : sorted_tokens) {
            auto eps = co_await rs->calculate_natural_endpoints(t, *tmptr);
            replication_map.emplace(t, std::move(eps).extract_vector());
        }
    } else {
        auto eps = co_await rs->calculate_natural_endpoints(default_replication_map_key, *tmptr);
        replication_map.emplace(default_replication_map_key, std::move(eps).extract_vector());
    }

    auto rf = rs->get_replication_factor(*tmptr);
    co_return make_vnode_effective_replication_map_ptr(std::move(rs), std::move(tmptr), std::move(replication_map),
        std::move(pending_endpoints), std::move(read_endpoints), std::move(dirty_endpoints), rf);
}

future<mutable_static_effective_replication_map_ptr> vnode_effective_replication_map::clone_gently(replication_strategy_ptr rs, token_metadata_ptr tmptr) const {
    replication_map replication_map;
    ring_mapping pending_endpoints;
    ring_mapping read_endpoints;
    std::unordered_set<locator::host_id> dirty_endpoints;

    for (auto& i : _replication_map) {
        replication_map.emplace(i.first, i.second);
        co_await coroutine::maybe_yield();
    }

    for (const auto& i : *_pending_endpoints) {
        *pending_endpoints += i;
        co_await coroutine::maybe_yield();
    }

    for (const auto& i : *_read_endpoints) {
        *read_endpoints += i;
        co_await coroutine::maybe_yield();
    }

    // no need to yield while copying since this is bound by nodes, not vnodes
    dirty_endpoints = _dirty_endpoints;

    co_return make_vnode_effective_replication_map_ptr(std::move(rs), std::move(tmptr), std::move(replication_map),
        std::move(pending_endpoints), std::move(read_endpoints), std::move(dirty_endpoints), _replication_factor);
}

host_id_vector_replica_set vnode_effective_replication_map::do_get_replicas(const token& tok, bool is_vnode) const
{
    const token& key_token = _rs->natural_endpoints_depend_on_token()
        ? (is_vnode ? tok : _tmptr->first_token(tok))
        : default_replication_map_key;
    const auto it = _replication_map.find(key_token);
    if (it == _replication_map.end()) {
        on_internal_error(rslogger, format("Token {} not found in replication map: natural_endpoints_depend_on_token={} token={} vnode_token={}",
            key_token, _rs->natural_endpoints_depend_on_token(), tok, _tmptr->first_token(tok)));
    }
    return it->second;
}

host_id_vector_replica_set vnode_effective_replication_map::get_replicas(const token& tok, bool is_vnode) const {
    return do_get_replicas(tok, is_vnode);
}

host_id_vector_replica_set vnode_effective_replication_map::get_natural_replicas(const token& search_token, bool is_vnode) const {
    return get_replicas(search_token, is_vnode);
}

stop_iteration vnode_effective_replication_map::for_each_natural_endpoint_until(const token& vnode_tok, const noncopyable_function<stop_iteration(const host_id&)>& func) const {
    for (const auto& ep : do_get_replicas(vnode_tok, true)) {
        if (func(ep) == stop_iteration::yes) {
            return stop_iteration::yes;
        }
    }
    return stop_iteration::no;
}

static_effective_replication_map::~static_effective_replication_map() {
    if (is_registered()) {
        _factory->erase_effective_replication_map(this);
    }
}

vnode_effective_replication_map::~vnode_effective_replication_map() {
    if (is_registered()) {
        try {
            _factory->submit_background_work(dispose_gently(std::move(_replication_map),
                std::move(*_pending_endpoints),
                std::move(*_read_endpoints),
                std::move(_tmptr)));
        } catch (...) {
            // ignore
        }
    }
}

effective_replication_map::effective_replication_map(replication_strategy_ptr rs,
        token_metadata_ptr tmptr,
        size_t replication_factor) noexcept
    : _rs(std::move(rs))
    , _tmptr(std::move(tmptr))
    , _replication_factor(replication_factor)
    , _validity_abort_source(std::make_unique<abort_source>())
{ }

static_effective_replication_map::factory_key static_effective_replication_map::make_factory_key(const replication_strategy_ptr& rs, const token_metadata_ptr& tmptr) {
    return factory_key(rs->get_type(), rs->get_config_options(), tmptr->get_ring_version());
}

future<static_effective_replication_map_ptr> effective_replication_map_factory::create_static_effective_replication_map(replication_strategy_ptr rs, const token_metadata_ptr& tmptr) {
    // lookup key on local shard
    auto key = static_effective_replication_map::make_factory_key(rs, tmptr);
    auto erm = find_effective_replication_map(key);
    if (erm) {
        rslogger.debug("create_static_effective_replication_map: found {} [{}]", key, fmt::ptr(erm.get()));
        co_return erm;
    }

    mutable_static_effective_replication_map_ptr new_erm;

    if (rs->is_local()) {
        // The local replication strategy does not benefit from cloning across
        // shards, since it has no expensive calculate function like
        // `calculate_vnode_effective_replication_map` to save.
        new_erm = make_local_effective_replication_map_ptr(std::move(rs), std::move(tmptr));
    } else {
        // try to find a reference erm on shard 0
        // TODO:
        // - use hash of key to distribute the load
        // - instantiate only on NUMA nodes
        auto ref_erm = co_await container().invoke_on(0, [key] (effective_replication_map_factory& ermf) -> future<foreign_ptr<static_effective_replication_map_ptr>> {
            auto erm = ermf.find_effective_replication_map(key);
            co_return make_foreign<static_effective_replication_map_ptr>(std::move(erm));
        });
        if (ref_erm) {
            new_erm = co_await ref_erm->clone_gently(std::move(rs), std::move(tmptr));
        } else {
            new_erm = co_await calculate_vnode_effective_replication_map(std::move(rs), std::move(tmptr));
        }
    }
    co_return insert_effective_replication_map(std::move(new_erm), std::move(key));
}

static_effective_replication_map_ptr effective_replication_map_factory::find_effective_replication_map(const static_effective_replication_map::factory_key& key) const {
    auto it = _effective_replication_maps.find(key);
    if (it != _effective_replication_maps.end()) {
        return it->second->shared_from_this();
    }
    return {};
}

static_effective_replication_map_ptr effective_replication_map_factory::insert_effective_replication_map(mutable_static_effective_replication_map_ptr erm, static_effective_replication_map::factory_key key) {
    auto [it, inserted] = _effective_replication_maps.insert({key, erm.get()});
    if (inserted) {
        rslogger.debug("insert_effective_replication_map: inserted {} [{}]", key, fmt::ptr(erm.get()));
        erm->set_factory(*this, std::move(key));
        return erm;
    }
    auto res = it->second->shared_from_this();
    rslogger.debug("insert_effective_replication_map: found {} [{}]", key, fmt::ptr(res.get()));
    return res;
}

bool effective_replication_map_factory::erase_effective_replication_map(static_effective_replication_map* erm) {
    const auto& key = erm->get_factory_key();
    auto it = _effective_replication_maps.find(key);
    if (it == _effective_replication_maps.end()) {
        rslogger.warn("Could not unregister effective_replication_map {} [{}]: key not found", key, fmt::ptr(erm));
        return false;
    }
    if (it->second != erm) {
        rslogger.warn("Could not unregister effective_replication_map {} [{}]: different instance [{}] is currently registered", key, fmt::ptr(erm), fmt::ptr(it->second));
        return false;
    }
    rslogger.debug("erase_effective_replication_map: erased {} [{}]", key, fmt::ptr(erm));
    _effective_replication_maps.erase(it);
    return true;
}

future<> effective_replication_map_factory::stop() noexcept {
    _stopped = true;
    if (!_effective_replication_maps.empty()) {
        for (auto it = _effective_replication_maps.begin(); it != _effective_replication_maps.end(); it = _effective_replication_maps.erase(it)) {
            auto& [key, erm] = *it;
            rslogger.debug("effective_replication_map_factory::stop found outstanding map {} [{}]", key, fmt::ptr(erm));
            // unregister outstanding effective_replication_maps
            // so they won't try to submit background work
            // to gently clear their contents when they are destroyed.
            erm->unregister();
        }

        // FIXME: reinstate the internal error
        // when https://github.com/scylladb/scylla/issues/8995
        // is fixed and shutdown order ensures that no outstanding maps
        // are expected here.
        // (see also https://github.com/scylladb/scylla/issues/9684)
        // on_internal_error_noexcept(rslogger, "effective_replication_map_factory stopped with outstanding maps");
    }

    return std::exchange(_background_work, make_ready_future<>());
}

void effective_replication_map_factory::submit_background_work(future<> fut) {
    if (fut.available() && !fut.failed()) {
        return;
    }
    if (_stopped) {
        on_internal_error(rslogger, "Cannot submit background work: registry already stopped");
    }
    _background_work = _background_work.then([fut = std::move(fut)] () mutable {
        return std::move(fut).handle_exception([] (std::exception_ptr ex) {
            // Ignore errors since we have nothing else to do about them.
            rslogger.warn("effective_replication_map_factory background task failed: {}. Ignored.", std::move(ex));
        });
    });
}

future<> global_static_effective_replication_map::get_keyspace_erms(sharded<replica::database>& sharded_db, std::string_view keyspace_name) {
    return sharded_db.invoke_on(0, [this, &sharded_db, keyspace_name] (replica::database& db) -> future<> {
        // To ensure we get the same effective_replication_map
        // on all shards, acquire the shared_token_metadata lock.
        //
        // As a sanity check compare the ring_version on each shard
        // to the reference version on shard 0.
        //
        // This invariant is achieved by storage_service::mutate_token_metadata
        // and storage_service::replicate_to_all_cores that first acquire the
        // shared_token_metadata lock, then prepare a mutated token metadata
        // that will have an incremented ring_version, use it to re-calculate
        // all e_r_m:s, and clone them, including the ring version, on all
        // shards, all under the lock.
        auto lk = co_await db.get_shared_token_metadata().get_lock();
        auto erm = db.find_keyspace(keyspace_name).get_static_effective_replication_map();
        utils::get_local_injector().inject("get_keyspace_erms_throw_no_such_keyspace",
            [&keyspace_name] { throw data_dictionary::no_such_keyspace{keyspace_name}; });
        auto ring_version = erm->get_token_metadata().get_ring_version();
        _erms[0] = make_foreign(std::move(erm));
        co_await coroutine::parallel_for_each(std::views::iota(1u, smp::count), [this, &sharded_db, keyspace_name, ring_version] (unsigned shard) -> future<> {
            _erms[shard] = co_await sharded_db.invoke_on(shard, [keyspace_name, ring_version] (const replica::database& db) {
                const auto& ks = db.find_keyspace(keyspace_name);
                auto erm = ks.get_static_effective_replication_map();
                auto local_ring_version = erm->get_token_metadata().get_ring_version();
                if (local_ring_version != ring_version) {
                    on_internal_error(rslogger, format("Inconsistent effective_replication_map ring_version {}, expected {}", local_ring_version, ring_version));
                }
                return make_foreign(std::move(erm));
            });
        });
    });
}

future<global_static_effective_replication_map> make_global_static_effective_replication_map(sharded<replica::database>& sharded_db, std::string_view keyspace_name) {
    global_static_effective_replication_map ret;
    co_await ret.get_keyspace_erms(sharded_db, keyspace_name);
    co_return ret;
}

} // namespace locator

auto fmt::formatter<locator::replication_strategy_type>::format(locator::replication_strategy_type t,
        fmt::format_context& ctx) const -> decltype(ctx.out()) {
    std::string_view name;
    switch (t) {
    using enum locator::replication_strategy_type;
    case simple:
        name = "simple";
        break;
    case local:
        name = "local";
        break;
    case network_topology:
        name = "network_topology";
        break;
    case everywhere_topology:
        name = "everywhere_topology";
        break;
    }
    return fmt::format_to(ctx.out(), "{}", name);
}

auto fmt::formatter<locator::static_effective_replication_map::factory_key>::format(const locator::static_effective_replication_map::factory_key& key,
        fmt::format_context& ctx) const -> decltype(ctx.out()) {
    auto out = fmt::format_to(ctx.out(), "{}.{}", key.rs_type, key.ring_version);
    char sep = ':';
    for (const auto& [opt, val] : key.rs_config_options) {
        out = fmt::format_to(out, "{}{}={}", sep, opt, val);
        sep = ',';
    }
    return out;
}

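// Illustrative output (editorial, not in the original source): a
// network_topology strategy at ring version 5 with options
// {"dc1": "3", "dc2": "2"} formats as "network_topology.5:dc1=3,dc2=2".
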
auto fmt::formatter<locator::replication_factor_data>::format(const locator::replication_factor_data& rf, fmt::format_context& ctx) const -> decltype(ctx.out()) {
    return fmt::format_to(ctx.out(), "{}", rf.count());
}