Files
scylladb/locator/topology.cc
Patryk Jędrzejczak 4160ae94c1 locator/node: preserve _excluded in clone()
We currently ignore the `_excluded` field in `clone()`. Losing
information about exclusion can have unpredictable consequences. One
observed effect (that led to finding this issue) is that the
`/storage_service/nodes/excluded` API endpoint sometimes misses excluded
nodes.
2025-11-26 13:26:11 +01:00

594 lines
21 KiB
C++

/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <bit>
#include <ranges>
#include <utility>
#include <fmt/std.h>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/core/on_internal_error.hh>
#include <seastar/util/lazy.hh>
#include <seastar/core/shard_id.hh>
#include "utils/log.hh"
#include "locator/topology.hh"
#include "locator/production_snitch_base.hh"
#include "utils/assert.hh"
#include "utils/stall_free.hh"
#include "utils/to_string.hh"
// Lightweight non-owning wrapper around a (possibly null) locator::node*,
// used so logging call sites can say node_printer(n) and get the detailed
// rendering provided by the fmt::formatter specialization in this file.
struct node_printer {
    // Pointer being printed; may be nullptr (rendered as "node=0x0").
    const locator::node* v;
    node_printer(const locator::node* n) noexcept : v(n) {}
};
// Renders a node_printer as "node=<ptr>", and for a non-null pointer also
// appends the node's verbose representation (the "{:v}" format spec).
template <>
struct fmt::formatter<node_printer> {
    constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
    auto format(const node_printer& np, fmt::format_context& ctx) const {
        auto out = fmt::format_to(ctx.out(), "node={}", fmt::ptr(np.v));
        if (np.v != nullptr) {
            out = fmt::format_to(out, " {:v}", *np.v);
        }
        return out;
    }
};
// Returns a lazily-evaluated backtrace: the (expensive) current_backtrace()
// call only runs if the log line embedding the returned value is actually
// formatted (e.g. when the trace/debug level is enabled).
static auto lazy_backtrace() {
    return seastar::value_of([] { return current_backtrace(); });
}
namespace locator {
static logging::logger tlogger("topology");

// Fallback datacenter/rack assigned to nodes whose real location is not yet
// known; the values come from the production snitch defaults. add_node()
// replaces this with the configured local location for this-node.
thread_local const endpoint_dc_rack endpoint_dc_rack::default_location = {
    .dc = locator::production_snitch_base::default_dc,
    .rack = locator::production_snitch_base::default_rack,
};
// Member-wise constructor. The _topology back-pointer and _idx default to
// "unset" at most call sites and are filled in by topology::add_node()
// when the node is attached to a topology.
node::node(const locator::topology* topology, locator::host_id id, endpoint_dc_rack dc_rack, state state, shard_id shard_count, bool excluded, this_node is_this_node, node::idx_type idx)
    : _topology(topology)
    , _host_id(id)
    , _dc_rack(std::move(dc_rack))
    , _state(state)
    , _shard_count(std::move(shard_count))
    , _excluded(excluded)
    , _is_this_node(is_this_node)
    , _idx(idx)
{}
// Factory for a heap-allocated node wrapped in its owning holder.
// `state` is an enum and `shard_count` an integer id: both are trivially
// copyable, so they are passed straight through (the former std::move on
// `state` was a no-op and has been dropped).
node_holder node::make(const locator::topology* topology, locator::host_id id, endpoint_dc_rack dc_rack, state state, shard_id shard_count, bool excluded, node::this_node is_this_node, node::idx_type idx) {
    return std::make_unique<node>(topology, std::move(id), std::move(dc_rack), state, shard_count, excluded, is_this_node, idx);
}
// Deep-copies this node for topology::clone_gently().
// All externally-visible attributes are preserved — including _excluded
// (dropping it previously caused /storage_service/nodes/excluded to miss
// excluded nodes). The topology pointer and idx are deliberately reset:
// the receiving topology assigns them when the clone is added.
node_holder node::clone() const {
    return make(nullptr, host_id(), dc_rack(), get_state(), get_shard_count(), is_excluded(), is_this_node());
}
// Human-readable name of a node lifecycle state. Exhaustive over the enum;
// the compiler warns if a state is added without extending this switch.
std::string node::to_string(node::state s) {
    switch (s) {
    case state::none:
        return "none";
    case state::bootstrapping:
        return "bootstrapping";
    case state::replacing:
        return "replacing";
    case state::normal:
        return "normal";
    case state::being_decommissioned:
        return "being_decommissioned";
    case state::being_removed:
        return "being_removed";
    case state::being_replaced:
        return "being_replaced";
    case state::left:
        return "left";
    }
    __builtin_unreachable();
}
// Incrementally tears down all indexing structures, yielding while clearing
// the potentially large containers to avoid reactor stalls.
// The owning container _nodes is cleared last, after every non-owning
// reference to its elements has been dropped.
future<> topology::clear_gently() noexcept {
    _this_node = nullptr;
    co_await utils::clear_gently(_dc_endpoints);
    co_await utils::clear_gently(_dc_racks);
    _datacenters.clear();
    _dc_rack_nodes.clear();
    _dc_nodes.clear();
    _nodes_by_host_id.clear();
    co_await utils::clear_gently(_nodes);
}
// Shallow-copy constructor used by token_metadata_impl: copies only the
// config; node containers stay empty and are populated later (see
// clone_gently(), which uses this as the clone target).
topology::topology(shallow_copy, config cfg)
    : _shard(this_shard_id())
    , _cfg(cfg)
    , _sort_by_proximity(true)
{
    // constructor for shallow copying of token_metadata_impl
}
// Full constructor: seeds the random engine used for proximity tie-breaking
// and registers the local node (from cfg) as the initial member in state
// `none`.
topology::topology(config cfg)
    : _shard(this_shard_id())
    , _cfg(cfg)
    , _sort_by_proximity(!cfg.disable_proximity_sorting)
    , _random_engine(std::random_device{}())
{
    tlogger.trace("topology[{}]: constructing using config: endpoint={} id={} dc={} rack={}", fmt::ptr(this),
            cfg.this_endpoint, cfg.this_host_id, cfg.local_dc_rack.dc, cfg.local_dc_rack.rack);
    add_node(cfg.this_host_id, cfg.local_dc_rack, node::state::none);
}
// Move constructor: steals every container from `o`, then repairs each
// moved node's back-pointer so it refers to the new topology object.
// Only valid on the shard that owns `o` (asserted below).
topology::topology(topology&& o) noexcept
    : _shard(o._shard)
    , _cfg(std::move(o._cfg))
    , _this_node(std::exchange(o._this_node, nullptr))
    , _nodes(std::move(o._nodes))
    , _nodes_by_host_id(std::move(o._nodes_by_host_id))
    , _dc_nodes(std::move(o._dc_nodes))
    , _dc_rack_nodes(std::move(o._dc_rack_nodes))
    , _dc_endpoints(std::move(o._dc_endpoints))
    , _dc_racks(std::move(o._dc_racks))
    , _sort_by_proximity(o._sort_by_proximity)
    , _datacenters(std::move(o._datacenters))
    , _random_engine(std::move(o._random_engine))
{
    SCYLLA_ASSERT(_shard == this_shard_id());
    tlogger.trace("topology[{}]: move from [{}]", fmt::ptr(this), fmt::ptr(&o));
    // Nodes keep a raw pointer to their owning topology; re-point them here.
    for (auto& n : _nodes) {
        if (n) {
            n->set_topology(this);
        }
    }
}
// Move assignment via destroy + placement-new move-construction, so the
// nontrivial back-pointer fixup in the move constructor is reused rather
// than duplicated. Both operations are noexcept, so no partially-destroyed
// state can escape.
topology& topology::operator=(topology&& o) noexcept {
    if (this != &o) {
        this->~topology();
        new (this) topology(std::move(o));
    }
    return *this;
}
// Late host-id injection for the startup case where the topology was built
// before the local host id was known: replaces the provisional zero-id
// local node with one carrying `this_host_id`.
// May be called only once, and only while the provisional local node is the
// sole member of the topology; every precondition violation is an internal
// error. (Error-message fixes: "caller" -> "called", "_this_nodes" ->
// "_this_node", and the size message now matches the `!= 1` check.)
void topology::set_host_id_cfg(host_id this_host_id) {
    if (_cfg.this_host_id) {
        on_internal_error(tlogger, fmt::format("topology[{}] set_host_id_cfg can be called only once current id {} new id {}", fmt::ptr(this), _cfg.this_host_id, this_host_id));
    }
    if (_nodes.size() != 1) {
        on_internal_error(tlogger, fmt::format("topology[{}] set_host_id_cfg called while nodes size is not 1", fmt::ptr(this)));
    }
    if (!_this_node) {
        on_internal_error(tlogger, fmt::format("topology[{}] set_host_id_cfg called while _this_node is null", fmt::ptr(this)));
    }
    if (_this_node->host_id()) {
        on_internal_error(tlogger, fmt::format("topology[{}] set_host_id_cfg called while _this_node has non null id {}", fmt::ptr(this), _this_node->host_id()));
    }
    // Drop the provisional node, record the id, and re-add the local node
    // under its real identity.
    remove_node(*_this_node);
    tlogger.trace("topology[{}]: set host id to {}", fmt::ptr(this), this_host_id);
    _cfg.this_host_id = this_host_id;
    add_or_update_endpoint(this_host_id);
}
// Clones this topology node-by-node into a fresh instance, yielding between
// nodes to avoid reactor stalls. add_node() re-indexes every clone on the
// new instance and restores its _this_node mapping; the RNG state is copied
// so the clone shuffles replicas the same way.
future<topology> topology::clone_gently() const {
    topology ret(topology::shallow_copy{}, _cfg);
    tlogger.debug("topology[{}]: clone_gently to {} from shard {}", fmt::ptr(this), fmt::ptr(&ret), _shard);
    for (const auto& nptr : _nodes) {
        if (nptr) {
            ret.add_node(nptr->clone());
        }
        co_await coroutine::maybe_yield();
    }
    ret._sort_by_proximity = _sort_by_proximity;
    ret._random_engine = _random_engine;
    co_return ret;
}
// Creates a node with the given attributes and adds it to the topology.
// A node must carry a concrete location; an empty dc or rack is an
// internal error.
const node& topology::add_node(host_id id, const endpoint_dc_rack& dr, node::state state, shard_id shard_count) {
    const bool location_missing = dr.dc.empty() || dr.rack.empty();
    if (location_missing) {
        on_internal_error(tlogger, "Node must have valid dc and rack");
    }
    return add_node(node::make(this, id, dr, state, shard_count));
}
// True iff `n` carries the host id this topology was configured with,
// i.e. `n` represents the local node.
bool topology::is_configured_this_node(const node& n) const {
    return n.host_id() == _cfg.this_host_id;
}
// Takes ownership of `nptr`, appends it to _nodes and indexes it.
// If the node matches the configured local host id it is marked as
// this-node and, if it still has the default location, inherits the
// configured local one. If indexing fails, the node is popped back out
// and the exception rethrown, leaving the containers consistent.
// Fix: the debug log used nptr.get() after `nptr` had been moved into
// _nodes, so it always printed a null pointer; it now logs `node`.
const node& topology::add_node(node_holder nptr) {
    const node* node = nptr.get();
    if (nptr->topology() != this) {
        if (nptr->topology()) {
            on_fatal_internal_error(tlogger, seastar::format("topology[{}]: {} belongs to different topology={}", fmt::ptr(this), node_printer(node), fmt::ptr(node->topology())));
        }
        nptr->set_topology(this);
    }
    if (node->idx() > 0) {
        on_internal_error(tlogger, seastar::format("topology[{}]: {}: has assigned idx", fmt::ptr(this), node_printer(node)));
    }
    // Note that _nodes contains also the this_node()
    nptr->set_idx(_nodes.size());
    _nodes.emplace_back(std::move(nptr));
    // From here on `nptr` is moved-from; use `node` to refer to the node.
    try {
        if (is_configured_this_node(*node)) {
            if (_this_node) {
                on_internal_error(tlogger, seastar::format("topology[{}]: {}: local node already mapped to {}", fmt::ptr(this), node_printer(node), node_printer(this_node())));
            }
            locator::node& n = *_nodes.back();
            n._is_this_node = node::this_node::yes;
            if (n._dc_rack == endpoint_dc_rack::default_location) {
                n._dc_rack = _cfg.local_dc_rack;
            }
        }
        tlogger.debug("topology[{}]: add_node: {}, at {}", fmt::ptr(this), node_printer(node), lazy_backtrace());
        index_node(*node);
    } catch (...) {
        pop_node(*node);
        throw;
    }
    return *node;
}
// Applies partial updates to `node`: each engaged optional replaces the
// corresponding attribute; nullopt means "leave unchanged". Updates equal
// to the current value are normalized away. Real changes are applied under
// an unindex / mutate / re-index cycle so every lookup map stays consistent
// with the node's new attributes.
void topology::update_node(node& node, std::optional<host_id> opt_id, std::optional<endpoint_dc_rack> opt_dr, std::optional<node::state> opt_st, std::optional<shard_id> opt_shard_count) {
    tlogger.debug("topology[{}]: update_node: {}: to: host_id={} dc={} rack={} state={} shard_count={}, at {}", fmt::ptr(this), node_printer(&node),
        opt_id ? format("{}", *opt_id) : "unchanged",
        opt_dr ? format("{}", opt_dr->dc) : "unchanged",
        opt_dr ? format("{}", opt_dr->rack) : "unchanged",
        opt_st ? format("{}", *opt_st) : "unchanged",
        opt_shard_count ? format("{}", *opt_shard_count) : "unchanged",
        lazy_backtrace());
    bool changed = false;
    if (opt_id) {
        if (*opt_id != node.host_id()) {
            // A host id must be non-null, may be set at most once on the
            // local node, and must not collide with an indexed node.
            if (!*opt_id) {
                on_internal_error(tlogger, seastar::format("Updating node host_id to null is disallowed: {}: new host_id={}", node_printer(&node), *opt_id));
            }
            if (node.is_this_node() && node.host_id()) {
                on_internal_error(tlogger, seastar::format("This node host_id is already set: {}: new host_id={}", node_printer(&node), *opt_id));
            }
            if (_nodes_by_host_id.contains(*opt_id)) {
                on_internal_error(tlogger, seastar::format("Cannot update node host_id: {}: new host_id already exists: {}", node_printer(&node), node_printer(find_node(*opt_id))));
            }
            changed = true;
        } else {
            // Same id as the current one - treat as "unchanged".
            opt_id.reset();
        }
    }
    if (opt_dr) {
        // Empty or snitch-default dc/rack components mean "keep current".
        if (opt_dr->dc.empty() || opt_dr->dc == production_snitch_base::default_dc) {
            opt_dr->dc = node.dc_rack().dc;
        }
        if (opt_dr->rack.empty() || opt_dr->rack == production_snitch_base::default_rack) {
            opt_dr->rack = node.dc_rack().rack;
        }
        if (*opt_dr != node.dc_rack()) {
            changed = true;
        } else {
            opt_dr.reset();
        }
    }
    if (opt_st) {
        changed |= node.get_state() != *opt_st;
    }
    if (opt_shard_count) {
        changed |= node.get_shard_count() != *opt_shard_count;
    }
    if (!changed) {
        return;
    }
    unindex_node(node);
    // The following block must not throw: the node is unindexed at this
    // point, so an escaping exception would leave it unreachable by lookup.
    // Any throw therefore aborts the process.
    try {
        if (opt_id) {
            node._host_id = *opt_id;
        }
        if (opt_dr) {
            node._dc_rack = std::move(*opt_dr);
        }
        if (opt_st) {
            node.set_state(*opt_st);
        }
        if (opt_shard_count) {
            node.set_shard_count(*opt_shard_count);
        }
    } catch (...) {
        std::terminate();
    }
    index_node(node);
}
// Removes the node with the given host id, if present.
// Returns true iff a node was found and removed.
bool topology::remove_node(host_id id) {
    auto* np = find_node(id);
    tlogger.debug("topology[{}]: remove_node: host_id={}: {}", fmt::ptr(this), id, node_printer(np));
    if (!np) {
        return false;
    }
    remove_node(*np);
    return true;
}
// Unindexes the node and releases its owning holder from _nodes.
void topology::remove_node(const node& node) {
    pop_node(node);
}
// Inserts `node` into all lookup maps. The node must already carry a valid
// idx (assigned by add_node()). On a host-id conflict this reports an
// internal error; add_node() then rolls the insertion back via pop_node().
void topology::index_node(const node& node) {
    tlogger.trace("topology[{}]: index_node: {}, at {}", fmt::ptr(this), node_printer(&node), lazy_backtrace());
    if (node.idx() < 0) {
        on_internal_error(tlogger, seastar::format("topology[{}]: {}: must already have a valid idx", fmt::ptr(this), node_printer(&node)));
    }
    // When a node is initialized before host id is known the topology is constructed with a single
    // node with zero id
    if (!node.host_id() && _nodes.size() != 1) {
        on_internal_error(tlogger, fmt::format("topology[{}] try to index node with zero id while this is not a single node in the topology", fmt::ptr(this)));
    }
    auto [nit, inserted_host_id] = _nodes_by_host_id.emplace(node.host_id(), std::cref(node));
    if (!inserted_host_id) {
        on_internal_error(tlogger, seastar::format("topology[{}]: {}: node already exists", fmt::ptr(this), node_printer(&node)));
    }
    // We keep location of left nodes because they may still appear in tablet replica sets
    // and algorithms expect to know which dc they belonged to. View replica pairing needs stable
    // replica indexes.
    // But we don't consider those nodes as members of the cluster so don't update dc registry.
    if (!node.left() && !node.is_none()) {
        const auto& dc = node.dc_rack().dc;
        const auto& rack = node.dc_rack().rack;
        const auto& endpoint = node.host_id();
        _dc_nodes[dc].emplace(std::cref(node));
        _dc_rack_nodes[dc][rack].emplace(std::cref(node));
        _dc_endpoints[dc].insert(endpoint);
        _dc_racks[dc][rack].insert(endpoint);
        _datacenters.insert(dc);
    }
    if (node.is_this_node()) {
        _this_node = &node;
    }
}
// Reverse of index_node(): removes `node` from every lookup map, erasing
// per-dc and per-rack buckets that become empty so the registries never
// hold stale empty entries.
void topology::unindex_node(const node& node) {
    tlogger.trace("topology[{}]: unindex_node: {}, at {}", fmt::ptr(this), node_printer(&node), lazy_backtrace());
    const auto& dc = node.dc_rack().dc;
    const auto& rack = node.dc_rack().rack;
    if (_dc_nodes.contains(dc)) {
        bool found = _dc_nodes.at(dc).erase(std::cref(node));
        if (found) {
            if (auto dit = _dc_endpoints.find(dc); dit != _dc_endpoints.end()) {
                const auto& ep = node.host_id();
                auto& eps = dit->second;
                eps.erase(ep);
                if (eps.empty()) {
                    // Last member of this dc - drop the dc wholesale.
                    _dc_rack_nodes.erase(dc);
                    _dc_racks.erase(dc);
                    _dc_endpoints.erase(dit);
                    _datacenters.erase(dc);
                } else {
                    // Dc still populated: remove only this node's rack
                    // entries, dropping the rack bucket if it emptied.
                    _dc_rack_nodes[dc][rack].erase(std::cref(node));
                    auto& racks = _dc_racks[dc];
                    if (auto rit = racks.find(rack); rit != racks.end()) {
                        auto& rack_eps = rit->second;
                        rack_eps.erase(ep);
                        if (rack_eps.empty()) {
                            racks.erase(rit);
                        }
                    }
                }
            }
        }
    }
    // Drop the host-id mapping only if it still refers to this very node.
    auto host_it = _nodes_by_host_id.find(node.host_id());
    if (host_it != _nodes_by_host_id.end() && host_it->second == node) {
        _nodes_by_host_id.erase(host_it);
    }
    if (_this_node == &node) {
        _this_node = nullptr;
    }
}
// Unindexes `node` and releases its owning holder from _nodes, returning
// ownership to the caller. The _nodes vector only shrinks when the popped
// node was the last element (e.g. when rolling back a newly added node that
// failed to index); otherwise an empty slot is left behind so the remaining
// nodes keep their idx values stable.
node_holder topology::pop_node(const node& node) {
    tlogger.trace("topology[{}]: pop_node: {}, at {}", fmt::ptr(this), node_printer(&node), lazy_backtrace());
    unindex_node(node);
    auto nh = std::exchange(_nodes[node.idx()], {});
    // shrink _nodes if the last node is popped
    // like when failing to index a newly added node
    if (std::cmp_equal(node.idx(), _nodes.size() - 1)) {
        _nodes.resize(node.idx());
    }
    return nh;
}
// Finds a node by its host_id.
// Returns nullptr (and logs at trace level) if no such node exists.
const node* topology::find_node(host_id id) const noexcept {
    const auto it = _nodes_by_host_id.find(id);
    if (it == _nodes_by_host_id.end()) {
        tlogger.trace("topology[{}]: find_node did not find {}", fmt::ptr(this), id);
        return nullptr;
    }
    return &it->second.get();
}
// Finds a node by its host_id (mutable overload).
// Delegates to the const overload and strips constness from the result.
// Returns nullptr if not found.
node* topology::find_node(host_id id) noexcept {
    return make_mutable(std::as_const(*this).find_node(id));
}
// Finds a node by its index in _nodes (safe signed/unsigned comparison).
// Returns nullptr if the index is out of range.
const node* topology::find_node(node::idx_type idx) const noexcept {
    if (std::cmp_less(idx, _nodes.size())) {
        return _nodes.at(idx).get();
    }
    return nullptr;
}
// Upserts a node by host id: if a node with `id` exists, its location,
// state and shard count are updated (the id itself is left untouched);
// otherwise a new node is added, using the default location / `none`
// state / 0 shards for any attribute not supplied.
const node& topology::add_or_update_endpoint(host_id id, std::optional<endpoint_dc_rack> opt_dr, std::optional<node::state> opt_st, std::optional<shard_id> shard_count)
{
    tlogger.trace("topology[{}]: add_or_update_endpoint: host_id={} dc={} rack={} state={} shards={}, at {}", fmt::ptr(this),
        id, opt_dr.value_or(endpoint_dc_rack{}).dc, opt_dr.value_or(endpoint_dc_rack{}).rack, opt_st.value_or(node::state::none), shard_count,
        lazy_backtrace());
    auto* n = find_node(id);
    if (n) {
        update_node(*n, std::nullopt, std::move(opt_dr), std::move(opt_st), std::move(shard_count));
        return *n;
    }
    return add_node(id,
            opt_dr.value_or(endpoint_dc_rack::default_location),
            opt_st.value_or(node::state::none),
            shard_count.value_or(0));
}
// Removes the node with the given host id unless it is the local node.
// The local entry must never be removed: it is needed for local writes.
// Returns true iff a node was removed.
bool topology::remove_endpoint(locator::host_id host_id)
{
    auto* np = find_node(host_id);
    tlogger.debug("topology[{}]: remove_endpoint: host_id={}: {}", fmt::ptr(this), host_id, node_printer(np));
    if (!np || np == _this_node) {
        return false;
    }
    remove_node(*np);
    return true;
}
// True iff a node with the given host id is present in the topology.
bool topology::has_node(host_id id) const noexcept {
    const auto* np = find_node(id);
    tlogger.trace("topology[{}]: has_node: host_id={}: {}", fmt::ptr(this), id, node_printer(np));
    return np != nullptr;
}
// Slow-path location lookup, reached when `id` was not found among the
// indexed nodes. Falls back to get_location() only for the configured
// local id; any other unknown id is an error.
const endpoint_dc_rack& topology::get_location_slow(host_id id) const {
    // We should do the following check after lookup in nodes.
    // In tests, there may be no config for local node, so fall back to get_location()
    // only if no mapping is found. Otherwise, get_location() will return empty location
    // from config or random node, neither of which is correct.
    if (id == _cfg.this_host_id) {
        return get_location();
    }
    throw std::runtime_error(format("Requested location for node {} not in topology. backtrace {}", id, lazy_backtrace()));
}
// Sorts `addresses` in place by increasing distance from `address`,
// unless proximity sorting is disabled for this topology.
void topology::sort_by_proximity(locator::host_id address, host_id_vector_replica_set& addresses) const {
    if (!can_sort_by_proximity()) {
        return;
    }
    do_sort_by_proximity(address, addresses);
}
// Sorts `addresses` in place by increasing distance from `address`.
// Equal-distance neighbors are swapped pseudo-randomly (one bit of the
// random word per adjacent tie) to spread load across equivalent replicas.
// Fix: bail out on an empty replica set — the pairwise pass below
// unconditionally increments the first iterator and writes through `dst`,
// which is undefined behavior when `host_infos`/`addresses` are empty.
void topology::do_sort_by_proximity(locator::host_id address, host_id_vector_replica_set& addresses) const {
    if (addresses.empty()) {
        return;
    }
    const auto& loc = get_location(address);
    struct info {
        locator::host_id id;
        int distance;
    };
    auto host_infos = addresses | std::views::transform([&] (locator::host_id id) {
        const auto& loc1 = get_location(id);
        return info{id, distance(address, loc, id, loc1)};
    }) | std::ranges::to<utils::small_vector<info, host_id_vector_replica_set::internal_capacity()>>();
    std::ranges::sort(host_infos, std::ranges::less{}, std::mem_fn(&info::distance));
    // Copy ids back in sorted order, randomly swapping adjacent entries
    // that are equidistant from `address`.
    auto dst = addresses.begin();
    auto it = host_infos.begin();
    auto prev = it;
    auto shuffler = _random_engine();
    for (++it; it < host_infos.end(); ++it, ++dst) {
        if (prev->distance == it->distance) {
            if (shuffler & 1) {
                std::swap(prev->id, it->id);
            }
            shuffler = std::rotr(shuffler, 1);
        }
        *dst = prev->id;
        prev = it;
    }
    *dst = prev->id;
}
// Distance metric between a reference node (`address`/`loc`) and a
// candidate (`a1`/`loc1`), smaller meaning closer:
//   0 - the node itself, 1 - same rack, 3 - same dc other rack,
//   7 - different dc.
int topology::distance(const locator::host_id& address, const endpoint_dc_rack& loc, const locator::host_id& a1, const endpoint_dc_rack& loc1) noexcept {
    const int same_dc = (loc1.dc == loc.dc) ? 1 : 0;
    const int same_rack = (same_dc && loc1.rack == loc.rack) ? 1 : 0;
    const int same_node = (a1 == address) ? 1 : 0;
    // Pack the three "sameness" bits and invert, so more-similar locations
    // yield smaller distances.
    return ((same_dc << 2) | (same_rack << 1) | same_node) ^ 7;
}
// Invokes `func` on every cluster member, skipping empty slots and nodes
// in `left` or `none` state (those are not considered members).
void topology::for_each_node(std::function<void(const node&)> func) const {
    for (const auto& holder : _nodes) {
        if (!holder || holder->left() || holder->is_none()) {
            continue;
        }
        func(*holder);
    }
}
// Returns references to every cluster member (skipping empty slots and
// nodes in `left`/`none` state).
std::unordered_set<std::reference_wrapper<const node>> topology::get_nodes() const {
    std::unordered_set<std::reference_wrapper<const node>> members;
    for (const auto& holder : _nodes) {
        if (!holder || holder->left() || holder->is_none()) {
            continue;
        }
        members.emplace(std::cref(*holder));
    }
    return members;
}
std::unordered_set<locator::host_id> topology::get_all_host_ids() const {
std::unordered_set<locator::host_id> ids;
for (const auto& np : _nodes) {
if (np && !np->left() && !np->is_none()) {
ids.insert(np->host_id());
}
}
return ids;
}
std::unordered_map<sstring, std::unordered_set<host_id>>
topology::get_datacenter_host_ids() const {
std::unordered_map<sstring, std::unordered_set<host_id>> ret;
for (auto& [dc, nodes] : _dc_nodes) {
ret[dc] = nodes | std::views::transform([] (const node& n) { return n.host_id(); }) | std::ranges::to<std::unordered_set>();
}
return ret;
}
// Re-seeds _random_engine, the source of the tie-breaking shuffles in
// do_sort_by_proximity(); seeding makes the shuffle order deterministic.
void topology::seed_random_engine(random_engine_type::result_type value) {
    _random_engine.seed(value);
}
} // namespace locator
namespace std {

// ostream adaptors that delegate to the fmt formatters for locator::node
// and node::state, for call sites still using iostream-style output.
std::ostream& operator<<(std::ostream& out, const locator::node& node) {
    fmt::print(out, "{}", node);
    return out;
}

std::ostream& operator<<(std::ostream& out, const locator::node::state& state) {
    fmt::print(out, "{}", state);
    return out;
}

} // namespace std