Files
scylladb/service/address_map.hh
Pavel Emelyanov 54edb44b20 code: Stop using seastar::compat::source_location
And switch to std::source_location.
Upcoming seastar update will deprecate its compatibility layer.

The patch is

  for f in $(git grep -l 'seastar::compat::source_location'); do
    sed -e 's/seastar::compat::source_location/std::source_location/g' -i $f;
  done

and removal of a few header includes.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes scylladb/scylladb#27309
2025-11-27 19:10:11 +02:00

448 lines
16 KiB
C++

/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "utils/assert.hh"
#include "utils/replicator.hh"
#include "gms/inet_address.hh"
#include "gms/generation-number.hh"
#include <seastar/core/on_internal_error.hh>
#include <seastar/core/sharded.hh>
#include <seastar/util/log.hh>
#include <boost/intrusive/list.hpp>
#include <chrono>
#include "locator/host_id.hh"
namespace bi = boost::intrusive;
namespace db {
class system_keyspace;
}
namespace service {
extern seastar::logger rslog;
// This class provides an abstraction of expirable server address mappings
// used by the messaging service and raft rpc module to store host id to ip mapping
template <typename Clock>
class address_map_t : public peering_sharded_service<address_map_t<Clock>> {
    // Expiring mappings stay in the cache for 1 hour (if not accessed during this time period)
    static constexpr std::chrono::hours default_expiry_period{1};
    // Initial bucket count for the hash map holding the mappings.
    static constexpr size_t initial_buckets_count = 16;
    using clock_duration = typename Clock::duration;
    using clock_time_point = typename Clock::time_point;
    class expiring_entry_ptr;
public:
    // Represents a change intent to a host mapping.
    // Associative, but not commutative.
    // Not all updates provide a generation, so we must merge updates in order.
    // Expiry changes are not resolved using generation number, they rely on order.
    struct entry_mutation {
        // The address map's source of IP addresses is gossip,
        // which can reorder events it delivers. It is therefore
        // possible that we get an outdated IP address after
        // the map has been updated with a new one, and revert
        // the mapping to an incorrect one (see #14274). To
        // protect against outdated information we mark each
        // entry with its generation number, when available,
        // and drop updates with outdated generations. 0 means
        // there is no generation available - e.g. it's set when
        // we load the persisted map state from system.peers at
        // boot.
        // The generation is only used to resolve addr changes.
        gms::generation_type generation;
        // New IP address for the host, if this mutation carries one.
        std::optional<gms::inet_address> addr;
        // If engaged, indicates that this mutation changes the "expiring" status of the entry.
        // If disengaged, the "expiring" status is not changed by the update.
        // Updates of "expiring" status are always applied in order, last update wins.
        std::optional<bool> expiring;
        // True iff the mutation carries any change at all.
        explicit operator bool() const {
            return addr.has_value() || expiring.has_value();
        }
        // Merge a later mutation into this one. The address is taken only
        // if its generation is not older than ours (or we have no address
        // yet); the expiring flag is last-write-wins.
        // We rely on this to never throw for exception safety.
        void apply(const entry_mutation& m) noexcept {
            if (m.addr && (m.generation >= generation || !addr)) {
                generation = m.generation;
                addr = m.addr;
            }
            if (m.expiring) {
                expiring = m.expiring;
            }
        }
    };
    // A single-host mutation, and a batch of per-host mutations, respectively.
    using host_mutation = std::pair<locator::host_id, entry_mutation>;
    using cluster_mutation = std::unordered_map<locator::host_id, entry_mutation>;
private:
    // An `inet_address` optionally equipped with a pointer to an entry
    // in LRU list of 'expiring entries'. If the pointer is set, it means that this
    // `timestamped_entry` is expiring; the corresponding LRU list entry contains
    // the last access time and we periodically delete elements from the LRU list
    // when they become too old.
    struct timestamped_entry {
        entry_mutation _m;
        // Engaged iff the entry is expiring; owns the LRU-list node.
        std::unique_ptr<expiring_entry_ptr> _lru_entry;
        explicit timestamped_entry(entry_mutation m)
            : _m(std::move(m)), _lru_entry(nullptr)
        {
        }
        bool expiring() const {
            return _lru_entry != nullptr;
        }
    };
    class expiring_entry_ptr : public bi::list_base_hook<> {
    public:
        // Base type for LRU list of expiring entries.
        //
        // When an entry is created with state, an
        // entry in this list is created, holding a pointer to the base entry
        // which contains the data.
        //
        // The LRU list is maintained in such a way that MRU (most recently used)
        // entries are at the beginning of the list while LRU entries move to the
        // end.
        using list_type = bi::list<expiring_entry_ptr>;
        // Links itself at the MRU (front) end of the list on construction.
        explicit expiring_entry_ptr(list_type& l, const locator::host_id& entry_id)
            : _expiring_list(l), _last_accessed(Clock::now()), _entry_id(entry_id)
        {
            _expiring_list.push_front(*this);
        }
        // Unlinks itself from the LRU list.
        ~expiring_entry_ptr() {
            _expiring_list.erase(_expiring_list.iterator_to(*this));
        }
        // Update last access timestamp and move ourselves to the front of LRU list.
        void touch() {
            _last_accessed = Clock::now();
            _expiring_list.erase(_expiring_list.iterator_to(*this));
            _expiring_list.push_front(*this);
        }
        // Test whether the entry has expired or not given a base time point and
        // an expiration period (the time period since the last access lies within
        // the given expiration period time frame).
        bool expired(clock_duration expiry_period) const {
            auto last_access_delta = Clock::now() - _last_accessed;
            return expiry_period < last_access_delta;
        }
        const locator::host_id& entry_id() {
            return _entry_id;
        }
    private:
        list_type& _expiring_list;
        clock_time_point _last_accessed;
        // Reference to the host_id key of the owning _map entry; the map node
        // outlives this object (see add_expiring_entry), so it stays valid.
        const locator::host_id& _entry_id;
    };
    using map_type = std::unordered_map<locator::host_id, timestamped_entry>;
    using map_iterator = typename map_type::iterator;
    using expiring_list_type = typename expiring_entry_ptr::list_type;
    using expiring_list_iterator = typename expiring_list_type::iterator;
    // LRU list to hold expiring entries.
    //
    // Marked as `mutable` since the `find` function, which should naturally
    // be `const`, updates the entry's timestamp and thus requires
    // non-const access.
    mutable expiring_list_type _expiring_list;
    // Container to hold address mappings (both permanent and expiring).
    // Declared as `mutable` for the same reasons as `_expiring_list`.
    //
    // It's important that _map is declared after _expiring_list, so it's
    // destroyed first: when we destroy _map, the LRU entries corresponding
    // to expiring entries are also destroyed, which unlinks them from the list,
    // so the list must still exist.
    mutable map_type _map;
    // Timer that executes the cleanup procedure to erase expired
    // entries from the mappings container.
    //
    // Rearmed automatically in the following cases:
    // * A new expiring entry is created
    // * If there are still some expiring entries left in the LRU list after
    //   the cleanup is finished.
    seastar::timer<Clock> _timer;
    clock_duration _expiry_period;
    struct replicator_impl;
    // Engaged on shard 0
    std::unique_ptr<replicator_impl> _replicator;
    // Erase entries whose last access is older than _expiry_period, walking
    // from the LRU (back) end of the list. With `force`, erases every
    // expiring entry regardless of age.
    void drop_expired_entries(bool force = false) {
        auto list_it = _expiring_list.rbegin();
        while (list_it != _expiring_list.rend() && (list_it->expired(_expiry_period) || force)) {
            // Remove from both LRU list and base storage
            auto map_it = _map.find(list_it->entry_id());
            if (map_it == _map.end()) {
                on_internal_error(rslog, format(
                    "address_map::drop_expired_entries: missing entry with id {}", list_it->entry_id()));
            }
            // Erasing the map entry destroys its _lru_entry, which unlinks
            // it from _expiring_list, invalidating list_it.
            _map.erase(map_it);
            // Point at the oldest entry again
            list_it = _expiring_list.rbegin();
        }
        if (!_expiring_list.empty()) {
            // Rearm the timer in case there are still some expiring entries
            _timer.arm(_expiry_period);
        }
    }
    // Attach an LRU node to `entry`, making it expiring, and make sure the
    // cleanup timer is running. `entry_id` must be the key of the map node
    // holding `entry` (the LRU node keeps a reference to it).
    void add_expiring_entry(const locator::host_id& entry_id, timestamped_entry& entry) {
        entry._lru_entry = std::make_unique<expiring_entry_ptr>(_expiring_list, entry_id);
        if (!_timer.armed()) {
            _timer.arm(_expiry_period);
        }
    }
    // Replicate a single-host mutation to all shards. Must be called on
    // shard 0; `l` names the public entry point for the error message.
    void apply_to_all(locator::host_id id, entry_mutation m, std::source_location l = std::source_location::current());
public: // Used by replicator
    // Reserve a slot for mf's host in map so that a subsequent apply()
    // cannot throw (try_emplace below then finds the key present).
    static void prepare_apply(cluster_mutation& map, const host_mutation& mf) {
        map.try_emplace(mf.first, entry_mutation{});
    }
    // Applies mf to map.
    // Must not throw when called after prepare_apply(map, mf).
    static void apply(cluster_mutation& map, const host_mutation& mf) {
        auto [it, emplaced] = map.try_emplace(mf.first, mf.second);
        if (!emplaced) {
            it->second.apply(mf.second);
        }
    }
    // Merge every per-host mutation of src into dst.
    static void apply(cluster_mutation& dst, const cluster_mutation& src) {
        for (auto&& e : src) {
            apply(dst, e);
        }
    }
    // Invoked by the replicator on cross-shard replication failure; log only.
    void on_replication_failed(std::exception_ptr e) {
        rslog.warn("address map replication failed: {}", e);
    }
    // Apply a single-host mutation to this shard's map. A newly inserted
    // entry starts as expiring unless the mutation explicitly says otherwise;
    // with update_if_exists == false an existing entry is left untouched.
    // Strong exception guarantees: no side effects on exception.
    void apply_locally(const host_mutation& mf, bool update_if_exists = true) {
        auto&& [id, m] = mf;
        if (!m) {
            // Empty mutation - nothing to do.
            return;
        }
        auto [it, emplaced] = _map.try_emplace(id, timestamped_entry{m});
        auto& entry = it->second;
        if (emplaced) {
            if (!m.expiring || *m.expiring) {
                try {
                    add_expiring_entry(it->first, entry);
                } catch (...) {
                    // Roll back the insertion to keep the strong guarantee.
                    _map.erase(it);
                    throw;
                }
            }
        } else if (update_if_exists) {
            if (m.expiring) {
                if (*m.expiring) {
                    // Do first, so that we don't leave side effects on exception
                    if (!entry.expiring()) {
                        add_expiring_entry(it->first, entry);
                    }
                } else {
                    // Dropping the LRU node makes the mapping permanent.
                    entry._lru_entry = nullptr;
                }
            }
            // entry_mutation::apply() is noexcept, so past this point
            // nothing can throw and no rollback is needed.
            entry._m.apply(m);
            if (entry.expiring()) {
                entry._lru_entry->touch();
            }
        }
    }
    // Weak exception guarantees: It's allowed to partially apply m.
    void apply_locally(const cluster_mutation& m) {
        for (auto&& e : m) {
            apply_locally(e);
        }
    }
public:
    address_map_t();
    future<> stop();
    // Resolves when all local updates replicate everywhere.
    // Call on shard 0 only.
    future<> barrier();
    // Find a mapping with a given id.
    //
    // If a mapping is expiring, the last access timestamp is updated automatically.
    std::optional<gms::inet_address> find(locator::host_id id) const {
        auto it = _map.find(id);
        if (it == _map.end()) {
            return std::nullopt;
        }
        auto& entry = it->second;
        if (entry.expiring()) {
            // Touch the entry to update its access timestamp and move it to the front of LRU list
            entry._lru_entry->touch();
        }
        return entry._m.addr;
    }
    // Same as find() above but expects mapping to exist
    gms::inet_address get(locator::host_id id) const {
        try {
            return find(id).value();
        } catch (std::bad_optional_access& err) {
            // on_internal_error does not return (it aborts or throws).
            on_internal_error(rslog, fmt::format("No ip address for {} when one is expected", id));
        }
    }
    // Find an id with a given mapping.
    //
    // If a mapping is expiring, the last access timestamp is updated automatically.
    //
    // The only purpose of this function is to allow passing IPs with the
    // --ignore-dead-nodes parameter in raft_removenode and raft_replace. As this
    // feature is deprecated, we should also remove this function when the
    // deprecation period ends.
    std::optional<locator::host_id> find_by_addr(gms::inet_address addr) const {
        rslog.warn("Finding Raft nodes by IP addresses is deprecated. Please use Host IDs instead.");
        if (addr == gms::inet_address{}) {
            on_internal_error(rslog, "address_map::find_by_addr: called with an empty address");
        }
        // Linear scan - acceptable since this path is deprecated and rare.
        auto it = std::find_if(_map.begin(), _map.end(), [&](auto&& mapping) {
            return mapping.second._m.addr.value_or(gms::inet_address{}) == addr;
        });
        if (it == _map.end()) {
            return std::nullopt;
        }
        auto& entry = it->second;
        if (entry.expiring()) {
            entry._lru_entry->touch();
        }
        return it->first;
    }
    // Convert an expiring entry to a non-expiring one, or
    // insert a new non-expiring entry if the entry is missing.
    // Called on Raft configuration changes to mark the new
    // member of Raft configuration as a non-temporary member
    // of the address map. The configuration member may be
    // lacking an IP address but it will be added later.
    // Can only be called on shard 0.
    // The expiring state is replicated to other shards.
    void set_nonexpiring(locator::host_id id) {
        apply_to_all(id, entry_mutation{ .expiring = false });
    }
    // Convert a non-expiring entry to an expiring one,
    // eventually erasing it from the mapping. Never inserts an
    // entry if it doesn't exist.
    // Can be called only on shard 0.
    // The expiring state is replicated to other shards.
    void set_expiring(locator::host_id id) {
        apply_to_all(id, entry_mutation{ .expiring = true });
    }
    // Insert a new mapping with an IP address if it doesn't
    // exist yet. Creates a mapping only on the current shard. Doesn't
    // update the mapping if it already exists.
    // The purpose of this function is to cache an IP address
    // on a local shard while gossiper messages are still
    // arriving.
    // Used primarily from Raft RPC to speed up Raft at boot.
    void opt_add_entry(locator::host_id id, gms::inet_address addr) {
        if (addr == gms::inet_address{}) {
            on_internal_error(rslog, format("IP address missing for {}", id));
        }
        apply_locally(std::make_pair(id, entry_mutation{ .addr = addr }), false);
    }
    // Insert or update entry with a new IP address on all shards.
    // Used when we get a gossip notification about a node IP
    // address. Overrides the current IP address if present,
    // as long as the generation of the new entry is greater.
    // If no entry is present, creates an expiring entry - there
    // must be a separate Raft configuration change event (@sa
    // set_nonexpiring()) to mark the entry as non expiring.
    void add_or_update_entry(locator::host_id id, gms::inet_address addr,
            gms::generation_type generation_number = {}) {
        if (addr == gms::inet_address{}) {
            on_internal_error(rslog, format("IP address missing for {}", id));
        }
        apply_to_all(id, entry_mutation{
            .generation = generation_number,
            .addr = addr,
        });
    }
    // Drop all expiring entries immediately, without waiting for expiry.
    // Used for testing
    void force_drop_expiring_entries() {
        drop_expired_entries(true);
    }
};
// Concrete replicator instantiation for this address map type.
// Inherits the generic replicator's constructors; carries no extra state.
template <typename Clock>
struct address_map_t<Clock>::replicator_impl : public replicator<typename address_map_t<Clock>::cluster_mutation, address_map_t<Clock>> {
    using replicator<typename address_map_t<Clock>::cluster_mutation, address_map_t<Clock>>::replicator;
};
// Construct the per-shard map. The cleanup timer is wired to
// drop_expired_entries() but is only armed once an expiring entry
// appears. The replicator, which fans local updates out to every
// shard, exists on shard 0 exclusively.
template <typename Clock>
address_map_t<Clock>::address_map_t()
        : _map(initial_buckets_count)
        , _timer([this] { drop_expired_entries(); })
        , _expiry_period(default_expiry_period) {
    const bool replicating_shard = this_shard_id() == 0;
    if (replicating_shard) {
        _replicator = std::make_unique<replicator_impl>(*this);
    }
}
// Shut down cross-shard replication. Shards other than 0 hold no
// replicator, so there is nothing to stop there.
template <typename Clock>
future<> address_map_t<Clock>::stop() {
    if (!_replicator) {
        co_return;
    }
    co_await _replicator->stop();
}
// Wait until every mutation applied locally so far has been replicated
// to all shards. Only shard 0 owns the replicator, hence the check.
template <typename Clock>
future<> address_map_t<Clock>::barrier() {
    const bool on_replicating_shard = this_shard_id() == 0;
    if (!on_replicating_shard) {
        on_internal_error(rslog, "barrier() must be called on shard 0");
    }
    return _replicator->barrier();
}
// Fan a single-host mutation out to every shard via the replicator.
// `loc` identifies the public entry point in the error message should
// this be mistakenly invoked off shard 0.
template <typename Clock>
void address_map_t<Clock>::apply_to_all(locator::host_id id, entry_mutation m, std::source_location loc) {
    if (this_shard_id() != 0) {
        on_internal_error(rslog, format("address_map::{}() called on shard {} != 0", loc.function_name(), this_shard_id()));
    }
    _replicator->apply_to_all(std::make_pair(id, m));
}
} // end of namespace service