Merge 'hinted handoff: Use host IDs instead of IPs in the module' from Dawid Mędrek
This pull request introduces host IDs in the hinted handoff module. Nodes are now identified by their host IDs instead of their IP addresses. The conversion occurs at the boundary between the module and `storage_proxy.hh`; beyond that boundary, IPs have been removed. The changes take into consideration that there may still be old hints on disk that are identified by IPs: at start-up, we map them to host IDs if possible so that they are not lost.

Refs scylladb/scylladb#6403
Fixes scylladb/scylladb#12278
Closes scylladb/scylladb#15567

* github.com:scylladb/scylladb:
  docs: Update Hinted Handoff documentation
  db/hints: Add endpoint_downtime_not_bigger_than()
  db/hints: Migrate hinted handoff when cluster feature is enabled
  db/hints: Handle arbitrary directories in resource manager
  db/hints: Start using hint_directory_manager
  db/hints: Enforce providing IP in get_ep_manager()
  db/hints: Introduce hint_directory_manager
  db/hints/resource_manager: Update function description
  db/hints: Coroutinize space_watchdog::scan_one_ep_dir()
  db/hints: Expose update lock of space watchdog
  db/hints: Add function for migrating hint directories to host ID
  db/hints: Take both IP and host ID when storing hints
  db/hints: Prepare initializing endpoint managers for migrating from IP to host ID
  db/hints: Migrate to locator::host_id
  db/hints: Remove noexcept in do_send_one_mutation()
  service: Add locator::host_id to on_leave_cluster
  service: Fix indentation
  db/hints: Fix indentation
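To make the upgrade path concrete, here is a minimal standalone sketch (not ScyllaDB code) of the start-up idea: directories named after IPs are renamed to the corresponding host IDs, and directories that can no longer be resolved are dropped. The `ip_to_host_id` map is a hypothetical stand-in for the lookups the real code performs via `locator::token_metadata`.

```cpp
// Hypothetical, simplified sketch of the start-up migration described above.
#include <filesystem>
#include <map>
#include <string>
#include <vector>

namespace fs = std::filesystem;

void migrate_hint_directories(const fs::path& hints_root,
                              const std::map<std::string, std::string>& ip_to_host_id) {
    // Collect names first; renaming while iterating the directory is unsafe.
    std::vector<std::string> names;
    for (const auto& entry : fs::directory_iterator(hints_root)) {
        if (entry.is_directory()) {
            names.push_back(entry.path().filename().string());
        }
    }

    for (const auto& name : names) {
        const bool looks_like_ip = name.find('.') != std::string::npos
                                || name.find(':') != std::string::npos;
        if (!looks_like_ip) {
            continue; // assumed to already be a host-ID directory
        }
        if (auto it = ip_to_host_id.find(name); it != ip_to_host_id.end()) {
            fs::rename(hints_root / name, hints_root / it->second); // hints survive
        } else {
            fs::remove_all(hints_root / name); // undeliverable: no mapping anymore
        }
    }
}
```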
@@ -28,7 +28,7 @@ host_filter::host_filter(std::unordered_set<sstring> allowed_dcs)
     , _dcs(std::move(allowed_dcs)) {
 }
 
-bool host_filter::can_hint_for(const locator::topology& topo, gms::inet_address ep) const {
+bool host_filter::can_hint_for(const locator::topology& topo, endpoint_id ep) const {
     switch (_enabled_kind) {
     case enabled_kind::enabled_for_all:
         return true;
@@ -15,6 +15,7 @@
 
 #include <seastar/core/sstring.hh>
 #include "seastarx.hh"
+#include "db/hints/internal/common.hh"
 
 namespace gms {
 class inet_address;
@@ -28,6 +29,8 @@ namespace hints {
 // host_filter tells hints_manager towards which endpoints it is allowed to generate hints.
 class host_filter final {
 private:
+    using endpoint_id = internal::endpoint_id;
+
     enum class enabled_kind {
         enabled_for_all,
         enabled_selectively,
@@ -58,7 +61,7 @@ public:
     // Parses hint filtering configuration from a list of DCs.
     static host_filter parse_from_dc_list(sstring opt);
 
-    bool can_hint_for(const locator::topology& topo, gms::inet_address ep) const;
+    bool can_hint_for(const locator::topology& topo, endpoint_id ep) const;
 
     inline const std::unordered_set<sstring>& get_dcs() const {
         return _dcs;
@@ -12,7 +12,7 @@
 #include <seastar/util/bool_class.hh>
 
 // Scylla includes.
-#include "gms/inet_address.hh"
+#include "locator/host_id.hh"
 
 // STD.
 #include <cstdint>
@@ -21,7 +21,7 @@ namespace db::hints {
 namespace internal {
 
 /// Type identifying the host a specific subset of hints should be sent to.
-using endpoint_id = gms::inet_address;
+using endpoint_id = locator::host_id;
 
 /// Tag specifying if hint sending should enter the so-called "drain mode".
 /// If it should, that means that if a failure while sending a hint occurs,
@@ -119,13 +119,13 @@ future<> hint_endpoint_manager::stop(drain should_drain) noexcept {
     });
 }
 
-hint_endpoint_manager::hint_endpoint_manager(const endpoint_id& key, manager& shard_manager)
+hint_endpoint_manager::hint_endpoint_manager(const endpoint_id& key, fs::path hint_directory, manager& shard_manager)
     : _key(key)
     , _shard_manager(shard_manager)
     , _file_update_mutex_ptr(make_lw_shared<seastar::shared_mutex>())
     , _file_update_mutex(*_file_update_mutex_ptr)
     , _state(state_set::of<state::stopped>())
-    , _hints_dir(_shard_manager.hints_dir() / format("{}", _key).c_str())
+    , _hints_dir(std::move(hint_directory))
     // Approximate the position of the last written hint by using the same formula as for segment id calculation in commitlog
     // TODO: Should this logic be deduplicated with what is in the commitlog?
     , _last_written_rp(this_shard_id(), std::chrono::duration_cast<std::chrono::milliseconds>(runtime::get_boot_time().time_since_epoch()).count())
@@ -68,7 +68,7 @@ private:
     hint_sender _sender;
 
 public:
-    hint_endpoint_manager(const endpoint_id& key, manager& shard_manager);
+    hint_endpoint_manager(const endpoint_id& key, std::filesystem::path hint_directory, manager& shard_manager);
    hint_endpoint_manager(hint_endpoint_manager&&);
    ~hint_endpoint_manager();
 
@@ -76,13 +76,16 @@ future<timespec> hint_sender::get_last_file_modification(const sstring& fname) {
|
||||
});
|
||||
}
|
||||
|
||||
future<> hint_sender::do_send_one_mutation(frozen_mutation_and_schema m, locator::effective_replication_map_ptr ermp, const inet_address_vector_replica_set& natural_endpoints) noexcept {
|
||||
future<> hint_sender::do_send_one_mutation(frozen_mutation_and_schema m, locator::effective_replication_map_ptr ermp, const inet_address_vector_replica_set& natural_endpoints) {
|
||||
return futurize_invoke([this, m = std::move(m), ermp = std::move(ermp), &natural_endpoints] () mutable -> future<> {
|
||||
// The fact that we send with CL::ALL in both cases below ensures that new hints are not going
|
||||
// to be generated as a result of hints sending.
|
||||
if (boost::range::find(natural_endpoints, end_point_key()) != natural_endpoints.end()) {
|
||||
const auto& tm = ermp->get_token_metadata();
|
||||
const auto maybe_addr = tm.get_endpoint_for_host_id_if_known(end_point_key());
|
||||
|
||||
if (maybe_addr && boost::range::find(natural_endpoints, *maybe_addr) != natural_endpoints.end()) {
|
||||
manager_logger.trace("Sending directly to {}", end_point_key());
|
||||
return _proxy.send_hint_to_endpoint(std::move(m), std::move(ermp), end_point_key());
|
||||
return _proxy.send_hint_to_endpoint(std::move(m), std::move(ermp), *maybe_addr);
|
||||
} else {
|
||||
manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", end_point_key());
|
||||
return _proxy.send_hint_to_all_replicas(std::move(m));
|
||||
@@ -95,17 +98,30 @@ bool hint_sender::can_send() noexcept {
         return false;
     }
 
+    const auto tmptr = _shard_manager._proxy.get_token_metadata_ptr();
+    const auto maybe_ep = std::invoke([&] () noexcept -> std::optional<gms::inet_address> {
+        try {
+            return tmptr->get_endpoint_for_host_id_if_known(_ep_key);
+        } catch (...) {
+            return std::nullopt;
+        }
+    });
+
     try {
-        if (_gossiper.is_alive(end_point_key())) {
+        // `hint_sender` can never target this node, so if the returned optional is empty,
+        // that must mean the current locator::token_metadata doesn't store the information
+        // about the target node.
+        if (maybe_ep && _gossiper.is_alive(*maybe_ep)) {
             _state.remove(state::ep_state_left_the_ring);
             return true;
         } else {
             if (!_state.contains(state::ep_state_left_the_ring)) {
-                const auto& tm = _shard_manager.local_db().get_token_metadata();
-                const auto host_id = tm.get_host_id_if_known(end_point_key());
-                _state.set_if<state::ep_state_left_the_ring>(!host_id || !tm.is_normal_token_owner(*host_id));
+                _state.set_if<state::ep_state_left_the_ring>(!tmptr->is_normal_token_owner(_ep_key));
             }
-            // send the hints out if the destination Node is part of the ring - we will send to all new replicas in this case
+            // If the node is not part of the ring, we will send hints to all new replicas.
+            // Note that if the optional -- `maybe_ep` -- is empty, that could mean that `_ep_key`
+            // is the locator::host_id of THIS node. However, that's impossible because instances
+            // of `hint_sender` are only created for OTHER nodes, so this logic is correct.
             return _state.contains(state::ep_state_left_the_ring);
         }
     } catch (...) {
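The `std::invoke` of an immediately invoked lambda above is a small idiom worth noting: it turns a lookup that may throw into a `std::optional` that the surrounding `noexcept` function can handle safely. A standalone sketch of the same pattern, with `std::map::at` standing in for the token-metadata lookup (`get_endpoint_for_host_id_if_known` in the real code):

```cpp
// Sketch: wrap a throwing lookup in std::optional inside a noexcept function.
#include <functional>
#include <map>
#include <optional>
#include <string>

std::optional<std::string> lookup_ip(const std::map<std::string, std::string>& tm,
                                     const std::string& host_id) noexcept {
    return std::invoke([&] () noexcept -> std::optional<std::string> {
        try {
            return tm.at(host_id);   // may throw std::out_of_range
        } catch (...) {
            return std::nullopt;     // treat any failure as "unknown endpoint"
        }
    });
}
```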
@@ -242,7 +242,7 @@ private:
     /// \param ermp points to the effective_replication_map used to obtain \c natural_endpoints
     /// \param natural_endpoints current replicas for the given mutation
     /// \return future that resolves when the operation is complete
-    future<> do_send_one_mutation(frozen_mutation_and_schema m, locator::effective_replication_map_ptr ermp, const inet_address_vector_replica_set& natural_endpoints) noexcept;
+    future<> do_send_one_mutation(frozen_mutation_and_schema m, locator::effective_replication_map_ptr ermp, const inet_address_vector_replica_set& natural_endpoints);
 
     /// \brief Send one mutation out.
     ///
@@ -90,7 +90,7 @@ future<hints_segments_map> get_current_hints_segments(const fs::path& hint_direc
     return lister::scan_dir(dir / de.name, lister::dir_entry_types::of<directory_entry_type::directory>(),
             [&current_hint_segments, shard_id] (fs::path dir, directory_entry de) {
         manager_logger.trace("\tIP: {}", de.name);
 
         // Hint files.
         return lister::scan_dir(dir / de.name, lister::dir_entry_types::of<directory_entry_type::regular>(),
                 [&current_hint_segments, shard_id, ep = de.name] (fs::path dir, directory_entry de) {
@@ -255,7 +255,7 @@ future<> remove_irrelevant_shards_directories(const fs::path& hint_directory) {
             lister::show_hidden::yes, [] (fs::path dir, directory_entry de) {
-        return io_check(remove_file, (dir / de.name).native());
-    });
+        co_await io_check(remove_file, (dir / de.name).native());
+    }
     });
@@ -275,5 +275,76 @@ future<> rebalance_hints(fs::path hint_directory) {
     co_await remove_irrelevant_shards_directories(hint_directory);
 }
 
+std::pair<locator::host_id, gms::inet_address> hint_directory_manager::insert_mapping(const locator::host_id& host_id,
+        const gms::inet_address& ip)
+{
+    const auto maybe_mapping = get_mapping(host_id, ip);
+    if (maybe_mapping) {
+        return *maybe_mapping;
+    }
+
+    _mappings.emplace(host_id, ip);
+    return std::make_pair(host_id, ip);
+}
+
+std::optional<gms::inet_address> hint_directory_manager::get_mapping(const locator::host_id& host_id) const noexcept {
+    auto it = _mappings.find(host_id);
+    if (it != _mappings.end()) {
+        return it->second;
+    }
+    return {};
+}
+
+std::optional<locator::host_id> hint_directory_manager::get_mapping(const gms::inet_address& ip) const noexcept {
+    for (const auto& [host_id, ep] : _mappings) {
+        if (ep == ip) {
+            return host_id;
+        }
+    }
+    return {};
+}
+
+std::optional<std::pair<locator::host_id, gms::inet_address>> hint_directory_manager::get_mapping(
+        const locator::host_id& host_id, const gms::inet_address& ip) const noexcept
+{
+    for (const auto& [hid, ep] : _mappings) {
+        if (hid == host_id || ep == ip) {
+            return std::make_pair(hid, ep);
+        }
+    }
+    return {};
+}
+
+void hint_directory_manager::remove_mapping(const locator::host_id& host_id) noexcept {
+    _mappings.erase(host_id);
+}
+
+void hint_directory_manager::remove_mapping(const gms::inet_address& ip) noexcept {
+    for (const auto& [host_id, ep] : _mappings) {
+        if (ep == ip) {
+            _mappings.erase(host_id);
+            break;
+        }
+    }
+}
+
+bool hint_directory_manager::has_mapping(const locator::host_id& host_id) const noexcept {
+    return _mappings.contains(host_id);
+}
+
+bool hint_directory_manager::has_mapping(const gms::inet_address& ip) const noexcept {
+    for (const auto& [_, ep] : _mappings) {
+        if (ip == ep) {
+            return true;
+        }
+    }
+    return false;
+}
+
+void hint_directory_manager::clear() noexcept {
+    _mappings.clear();
+}
+
 } // namespace internal
 } // namespace db::hints
@@ -46,5 +46,41 @@ using hint_entry_reader = commitlog_entry_reader;
 /// \return A future that resolves when the operation is complete.
 future<> rebalance_hints(std::filesystem::path hint_directory);
 
+class hint_directory_manager {
+private:
+    std::map<locator::host_id, gms::inet_address> _mappings;
+
+public:
+    // Inserts a new mapping and returns it.
+    // If either the host ID or the IP is already in the map, the function inserts nothing
+    // and returns the existing mapping instead.
+    std::pair<locator::host_id, gms::inet_address> insert_mapping(const locator::host_id& host_id,
+            const gms::inet_address& ip);
+
+    // Returns the corresponding IP for a given host ID if a mapping is present in the directory manager.
+    // Otherwise, an empty optional is returned.
+    [[nodiscard]] std::optional<gms::inet_address> get_mapping(const locator::host_id& host_id) const noexcept;
+
+    // Returns the corresponding host ID for a given IP if a mapping is present in the directory manager.
+    // Otherwise, an empty optional is returned.
+    [[nodiscard]] std::optional<locator::host_id> get_mapping(const gms::inet_address& ip) const noexcept;
+
+    // Returns a mapping corresponding to either the passed host ID or the passed IP if the mapping exists.
+    // Otherwise, an empty optional is returned.
+    [[nodiscard]] std::optional<std::pair<locator::host_id, gms::inet_address>> get_mapping(
+            const locator::host_id& host_id, const gms::inet_address& ip) const noexcept;
+
+    // Removes a mapping corresponding to the passed host ID if the mapping exists.
+    void remove_mapping(const locator::host_id& host_id) noexcept;
+    // Removes a mapping corresponding to the passed IP if the mapping exists.
+    void remove_mapping(const gms::inet_address& ip) noexcept;
+
+    bool has_mapping(const locator::host_id& host_id) const noexcept;
+    bool has_mapping(const gms::inet_address& ip) const noexcept;
+
+    // Removes all of the mappings.
+    void clear() noexcept;
+};
+
 } // namespace internal
 } // namespace db::hints
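Since `hint_directory_manager` enforces a one-to-one correspondence in both directions, a second insert that collides on either key is a no-op that hands back the existing pair. A standalone sketch of that contract, with `std::string` standing in for `locator::host_id` and `gms::inet_address` (illustrative only, not the real class):

```cpp
// Minimal analogue of hint_directory_manager's insert-or-return-existing behavior.
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <utility>

class directory_mappings {
    std::map<std::string, std::string> _mappings; // host ID -> IP
public:
    // Insert, unless either side is already mapped; then return the existing pair.
    std::pair<std::string, std::string> insert_mapping(const std::string& host_id,
                                                       const std::string& ip) {
        for (const auto& [hid, ep] : _mappings) {
            if (hid == host_id || ep == ip) {
                return {hid, ep};
            }
        }
        _mappings.emplace(host_id, ip);
        return {host_id, ip};
    }
    std::optional<std::string> ip_for(const std::string& host_id) const {
        auto it = _mappings.find(host_id);
        return it == _mappings.end() ? std::nullopt : std::optional{it->second};
    }
};

int main() {
    directory_mappings m;
    m.insert_mapping("hid-1", "10.0.0.1");
    // A second insert for the same IP does not create a conflicting entry:
    auto existing = m.insert_mapping("hid-2", "10.0.0.1");
    assert(existing.first == "hid-1");
    assert(m.ip_for("hid-2") == std::nullopt);
}
```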
@@ -14,8 +14,11 @@
 // Seastar features.
 #include <seastar/core/abort_source.hh>
 #include <seastar/core/coroutine.hh>
 #include <seastar/core/file-types.hh>
 #include <seastar/core/file.hh>
 #include <seastar/core/future.hh>
 #include <seastar/core/gate.hh>
 #include <seastar/core/on_internal_error.hh>
 #include <seastar/core/seastar.hh>
 #include <seastar/core/semaphore.hh>
 #include <seastar/core/sleep.hh>
@@ -28,11 +31,18 @@
 
 // Scylla includes.
 #include "db/hints/internal/hint_logger.hh"
 #include "gms/application_state.hh"
 #include "gms/endpoint_state.hh"
 #include "gms/feature_service.hh"
 #include "gms/gossiper.hh"
 #include "gms/inet_address.hh"
 #include "locator/abstract_replication_strategy.hh"
 #include "locator/host_id.hh"
 #include "locator/token_metadata.hh"
 #include "replica/database.hh"
 #include "service/storage_proxy.hh"
 #include "utils/directories.hh"
 #include "utils/disk-error-handler.hh"
 #include "utils/error_injection.hh"
 #include "utils/lister.hh"
 #include "seastarx.hh"
@@ -40,6 +50,8 @@
 // STD.
 #include <algorithm>
 #include <exception>
+#include <shared_mutex>
+#include <variant>
 
 namespace db::hints {
 
@@ -72,13 +84,13 @@ public:
     }
 
     const auto units = co_await seastar::get_units(_lock, 1);
 
     utils::directories::set dir_set;
     dir_set.add_sharded(_hints_directory);
 
     manager_logger.debug("Creating and validating hint directories: {}", _hints_directory);
     co_await _dirs.create_and_verify(std::move(dir_set));
 
     _state = state::created_and_validated;
 }
@@ -92,7 +104,7 @@ public:
     }
 
     const auto units = co_await seastar::get_units(_lock, 1);
 
     manager_logger.debug("Rebalancing hints in {}", _hints_directory);
     co_await rebalance_hints(fs::path{_hints_directory});
@@ -171,7 +183,7 @@ void manager::register_metrics(const sstring& group_name) {
         sm::make_counter("corrupted_files", _stats.corrupted_files,
             sm::description("Number of hints files that were discarded during sending because the file was corrupted.")),
 
         sm::make_gauge("pending_drains",
             sm::description("Number of tasks waiting in the queue for draining hints"),
             [this] { return _drain_lock.waiters(); }),
@@ -184,20 +196,21 @@ void manager::register_metrics(const sstring& group_name) {
 future<> manager::start(shared_ptr<gms::gossiper> gossiper_ptr) {
     _gossiper_anchor = std::move(gossiper_ptr);
 
+    if (_proxy.features().host_id_based_hinted_handoff) {
+        _uses_host_id = true;
+        co_await migrate_ip_directories();
+    }
+
-    co_await lister::scan_dir(_hints_dir, lister::dir_entry_types::of<directory_entry_type::directory>(),
-            [this] (fs::path datadir, directory_entry de) {
-        endpoint_id ep = endpoint_id{de.name};
+    co_await initialize_endpoint_managers();
 
-        if (!check_dc_for(ep)) {
-            return make_ready_future<>();
-        }
-
-        return get_ep_manager(ep).populate_segments_to_replay();
-    });
-
     co_await compute_hints_dir_device_id();
     set_started();
 
+    if (!_uses_host_id) {
+        _migration_callback = _proxy.features().host_id_based_hinted_handoff.when_enabled([this] {
+            _migrating_done = perform_migration();
+        });
+    }
 }
 
 future<> manager::stop() {
@@ -205,11 +218,14 @@ future<> manager::stop() {
 
     set_stopping();
 
-    return _draining_eps_gate.close().finally([this] {
+    return _migrating_done.finally([this] {
+        return _draining_eps_gate.close();
+    }).finally([this] {
         return parallel_for_each(_ep_managers | boost::adaptors::map_values, [] (hint_endpoint_manager& ep_man) {
             return ep_man.stop();
         }).finally([this] {
             _ep_managers.clear();
+            _hint_directory_manager.clear();
             manager_logger.info("Shard hint manager has stopped");
         });
     });
@@ -238,10 +254,9 @@ void manager::forbid_hints() {
 }
 
 void manager::forbid_hints_for_eps_with_pending_hints() {
     manager_logger.trace("space_watchdog: Going to block hints to: {}", _eps_with_pending_hints);
 
-    for (auto& [_, ep_man] : _ep_managers) {
-        if (has_ep_with_pending_hints(ep_man.end_point_key())) {
+    for (auto& [host_id, ep_man] : _ep_managers) {
+        const auto ip = *_hint_directory_manager.get_mapping(host_id);
+        if (has_ep_with_pending_hints(host_id) || has_ep_with_pending_hints(ip)) {
             ep_man.forbid_hints();
         } else {
             ep_man.allow_hints();
@@ -249,14 +264,21 @@ void manager::forbid_hints_for_eps_with_pending_hints() {
     }
 }
 
-sync_point::shard_rps manager::calculate_current_sync_point(std::span<const endpoint_id> target_eps) const {
+sync_point::shard_rps manager::calculate_current_sync_point(std::span<const gms::inet_address> target_eps) const {
     sync_point::shard_rps rps;
+    const auto tmptr = _proxy.get_token_metadata_ptr();
 
     for (auto addr : target_eps) {
-        auto it = _ep_managers.find(addr);
+        const auto hid = tmptr->get_host_id_if_known(addr);
+        // Ignore the IPs that we cannot map.
+        if (!hid) {
+            continue;
+        }
+
+        auto it = _ep_managers.find(*hid);
         if (it != _ep_managers.end()) {
             const hint_endpoint_manager& ep_man = it->second;
-            rps[ep_man.end_point_key()] = ep_man.last_written_replay_position();
+            rps[addr] = ep_man.last_written_replay_position();
         }
     }
 
@@ -264,6 +286,19 @@ sync_point::shard_rps manager::calculate_current_sync_point(std::span<const endp
 }
 
 future<> manager::wait_for_sync_point(abort_source& as, const sync_point::shard_rps& rps) {
+    using lock_type = std::shared_lock<seastar::shared_mutex>;
+    // Prevent the migration to host-ID-based hinted handoff until this function finishes its execution.
+    const auto shared_lock = co_await std::invoke(coroutine::lambda([&] () -> future<lock_type> {
+        co_await _migration_mutex.lock_shared();
+
+        try {
+            co_return lock_type{_migration_mutex, std::adopt_lock_t{}};
+        } catch (...) {
+            _migration_mutex.unlock_shared();
+            throw;
+        }
+    }));
+
     abort_source local_as;
 
     auto sub = as.subscribe([&local_as] () noexcept {
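The lock-then-adopt pattern above is worth spelling out: the coroutine acquires the shared side of the mutex manually, then hands ownership to an RAII guard via adopt-lock semantics; if constructing or returning the guard fails (in a coroutine, the `co_return` path can fail on allocation), the raw hold is released. A standard-library sketch of the same idiom, with `std::shared_mutex` standing in for `seastar::shared_mutex`:

```cpp
// Sketch: acquire a shared lock, then adopt it into an RAII guard.
#include <mutex>
#include <shared_mutex>

std::shared_lock<std::shared_mutex> acquire_shared(std::shared_mutex& m) {
    m.lock_shared();
    try {
        // Adoption transfers ownership to the guard without locking again.
        return std::shared_lock<std::shared_mutex>{m, std::adopt_lock};
    } catch (...) {
        // If the guard could not be constructed, release the raw hold.
        m.unlock_shared();
        throw;
    }
}
```

With `std::shared_lock` the adopting constructor itself does not throw, so the catch block is belt-and-braces here; in the coroutine version above it guards the real failure mode.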
@@ -276,27 +311,39 @@ future<> manager::wait_for_sync_point(abort_source& as, const sync_point::shard_
         local_as.request_abort();
     }
 
+    const auto tmptr = _proxy.get_token_metadata_ptr();
+    std::unordered_map<endpoint_id, replay_position> hid_rps{};
+    hid_rps.reserve(rps.size());
+
+    for (const auto& [addr, rp] : rps) {
+        const auto maybe_hid = tmptr->get_host_id_if_known(addr);
+        // Ignore the IPs we cannot map.
+        if (maybe_hid) [[likely]] {
+            hid_rps.emplace(*maybe_hid, rp);
+        }
+    }
+
     bool was_aborted = false;
     co_await coroutine::parallel_for_each(_ep_managers,
-            coroutine::lambda([&rps, &local_as, &was_aborted] (auto& pair) -> future<> {
+            coroutine::lambda([&hid_rps, &local_as, &was_aborted] (auto& pair) -> future<> {
         auto& [ep, ep_man] = pair;
 
-        // When `rps` doesn't specify a replay position for a given endpoint, we use
+        // When `hid_rps` doesn't specify a replay position for a given endpoint, we use
         // its default value. Normally, it should be equal to returning a ready future here.
         // However, foreign segments (i.e. segments that were moved from another shard at start-up)
         // are treated differently from "regular" segments -- we can think of their replay positions
         // as equal to negative infinity or simply smaller than any other replay position, which
         // also includes the default value. Because of that, we don't have a choice -- we have to
-        // pass either rps[ep] or the default replay position to the endpoint manager because
+        // pass either hid_rps[ep] or the default replay position to the endpoint manager because
         // some hints MIGHT need to be sent.
         const replay_position rp = [&] {
-            auto it = rps.find(ep);
-            if (it == rps.end()) {
+            auto it = hid_rps.find(ep);
+            if (it == hid_rps.end()) {
                 return replay_position{};
             }
             return it->second;
         } ();
 
         try {
             co_await ep_man.wait_until_hints_are_replayed_up_to(local_as, rp);
         } catch (abort_requested_exception&) {
@@ -312,55 +359,132 @@ future<> manager::wait_for_sync_point(abort_source& as, const sync_point::shard_
     }
 }
 
-hint_endpoint_manager& manager::get_ep_manager(endpoint_id ep) {
-    auto [it, emplaced] = _ep_managers.try_emplace(ep, ep, *this);
-    hint_endpoint_manager& ep_man = it->second;
+hint_endpoint_manager& manager::get_ep_manager(const endpoint_id& host_id, const gms::inet_address& ip) {
+    // If this is enabled, we can't rely on the information obtained from `_hint_directory_manager`.
+    if (_uses_host_id) {
+        if (auto it = _ep_managers.find(host_id); it != _ep_managers.end()) {
+            return it->second;
+        }
+    } else {
+        if (const auto maybe_mapping = _hint_directory_manager.get_mapping(host_id, ip)) {
+            return _ep_managers.at(maybe_mapping->first);
+        }
 
-    if (emplaced) {
-        manager_logger.trace("Created an endpoint manager for {}", ep);
-        ep_man.start();
+        // If there is no mapping in `_hint_directory_manager` corresponding to either `host_id` or `ip`,
+        // we need to create a new endpoint manager.
+        _hint_directory_manager.insert_mapping(host_id, ip);
     }
 
-    return ep_man;
+    try {
+        const auto hint_directory = std::invoke([&] () -> std::filesystem::path {
+            if (_uses_host_id) {
+                return hints_dir() / host_id.to_sstring();
+            } else {
+                return hints_dir() / ip.to_sstring();
+            }
+        });
+
+        auto [it, _] = _ep_managers.emplace(host_id, hint_endpoint_manager{host_id, std::move(hint_directory), *this});
+        hint_endpoint_manager& ep_man = it->second;
+
+        manager_logger.trace("Created an endpoint manager for {}", host_id);
+        ep_man.start();
+
+        return ep_man;
+    } catch (...) {
+        manager_logger.warn("Starting a hint endpoint manager {}/{} has failed", host_id, ip);
+        _hint_directory_manager.remove_mapping(host_id);
+        _ep_managers.erase(host_id);
+        throw;
+    }
 }
 
-bool manager::have_ep_manager(endpoint_id ep) const noexcept {
-    return _ep_managers.contains(ep);
+bool manager::have_ep_manager(const std::variant<locator::host_id, gms::inet_address>& ep) const noexcept {
+    if (std::holds_alternative<locator::host_id>(ep)) {
+        return _ep_managers.contains(std::get<locator::host_id>(ep));
+    }
+    return _hint_directory_manager.has_mapping(std::get<gms::inet_address>(ep));
 }
 
-bool manager::store_hint(endpoint_id ep, schema_ptr s, lw_shared_ptr<const frozen_mutation> fm,
+bool manager::store_hint(endpoint_id host_id, gms::inet_address ip, schema_ptr s, lw_shared_ptr<const frozen_mutation> fm,
         tracing::trace_state_ptr tr_state) noexcept
 {
-    if (stopping() || draining_all() || !started() || !can_hint_for(ep)) {
-        manager_logger.trace("Can't store a hint to {}", ep);
+    if (stopping() || draining_all() || !started() || !can_hint_for(host_id)) {
+        manager_logger.trace("Can't store a hint to {}", host_id);
         ++_stats.dropped;
         return false;
     }
 
     try {
-        manager_logger.trace("Going to store a hint to {}", ep);
-        tracing::trace(tr_state, "Going to store a hint to {}", ep);
+        manager_logger.trace("Going to store a hint to {}", host_id);
+        tracing::trace(tr_state, "Going to store a hint to {}", host_id);
 
-        return get_ep_manager(ep).store_hint(std::move(s), std::move(fm), tr_state);
+        return get_ep_manager(host_id, ip).store_hint(std::move(s), std::move(fm), tr_state);
     } catch (...) {
-        manager_logger.trace("Failed to store a hint to {}: {}", ep, std::current_exception());
-        tracing::trace(tr_state, "Failed to store a hint to {}: {}", ep, std::current_exception());
+        manager_logger.trace("Failed to store a hint to {}: {}", host_id, std::current_exception());
+        tracing::trace(tr_state, "Failed to store a hint to {}: {}", host_id, std::current_exception());
 
         ++_stats.errors;
         return false;
     }
 }
 
+/// Checks if there is a node corresponding to a given host ID that hasn't been down for longer
+/// than a given amount of time. The function relies on information obtained from the passed `gms::gossiper`.
+static bool endpoint_downtime_not_bigger_than(const gms::gossiper& gossiper, const locator::host_id& host_id,
+        uint64_t max_downtime_us)
+{
+    // We want to enforce small buffer optimization in the call
+    // to `gms::gossiper::for_each_endpoint_state_until()` below
+    // to avoid an unnecessary allocation.
+    // Since we need all these four pieces of information in the lambda,
+    // the function object passed to the function might be too big.
+    // That's why we create it locally on the stack and only pass a reference to it.
+    struct sbo_info {
+        locator::host_id host_id;
+        const gms::gossiper& gossiper;
+        int64_t max_hint_window_us;
+        bool small_node_downtime;
+    };
+
+    sbo_info info {
+        .host_id = host_id,
+        .gossiper = gossiper,
+        .max_hint_window_us = max_downtime_us,
+        .small_node_downtime = false
+    };
+
+    gossiper.for_each_endpoint_state_until(
+            [&info] (const gms::inet_address& ip, const gms::endpoint_state& state) {
+        const auto app_state = state.get_application_state_ptr(gms::application_state::HOST_ID);
+        if (!app_state) {
+            return stop_iteration::no;
+        }
+        const auto host_id = locator::host_id{utils::UUID{app_state->value()}};
+        if (host_id != info.host_id) {
+            return stop_iteration::no;
+        }
+        if (info.gossiper.get_endpoint_downtime(ip) <= info.max_hint_window_us) {
+            info.small_node_downtime = true;
+            return stop_iteration::yes;
+        }
+        return stop_iteration::no;
+    });
+
+    return info.small_node_downtime;
+}
+
 bool manager::too_many_in_flight_hints_for(endpoint_id ep) const noexcept {
     // There is no need to check the DC here because if there is an in-flight hint for this
     // endpoint, then this means that its DC has already been checked and found to be ok.
     return _stats.size_of_hints_in_progress > MAX_SIZE_OF_HINTS_IN_PROGRESS
             && !_proxy.local_db().get_token_metadata().get_topology().is_me(ep)
             && hints_in_progress_for(ep) > 0
-            && local_gossiper().get_endpoint_downtime(ep) <= _max_hint_window_us;
+            && endpoint_downtime_not_bigger_than(local_gossiper(), ep, _max_hint_window_us);
 }
 
 bool manager::can_hint_for(endpoint_id ep) const noexcept {
+    if (_state.contains(state::migrating)) {
+        return false;
+    }
+
     if (_proxy.local_db().get_token_metadata().get_topology().is_me(ep)) {
         return false;
     }
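The small-buffer argument in `endpoint_downtime_not_bigger_than()` generalizes: type-erased function wrappers (Seastar's, or `std::function` below) store sufficiently small closures inline and heap-allocate larger ones, and implementations differ in where the cutoff is. Capturing one reference to a stack struct keeps the closure pointer-sized no matter how much state the callback needs. A hedged standalone sketch, with a plain vector standing in for the gossiper's endpoint states:

```cpp
// Sketch: pack the callback's state into a stack struct, capture one reference.
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

struct search_state {
    std::string wanted_host_id;
    int64_t max_downtime_us;
    bool found = false;
};

// Stand-in for gossiper state: (host ID, downtime) pairs.
using endpoint_states = std::vector<std::pair<std::string, int64_t>>;

bool downtime_not_bigger_than(const endpoint_states& states,
                              const std::string& target, int64_t limit_us) {
    search_state state{.wanted_host_id = target, .max_downtime_us = limit_us};

    // The closure captures a single reference, so the type-erased wrapper
    // can usually store it inline instead of allocating.
    std::function<bool(const std::string&, int64_t)> visit =
            [&state] (const std::string& host_id, int64_t downtime_us) {
        if (host_id == state.wanted_host_id && downtime_us <= state.max_downtime_us) {
            state.found = true;
            return true; // stop iteration
        }
        return false;
    };

    for (const auto& [host_id, downtime_us] : states) {
        if (visit(host_id, downtime_us)) {
            break;
        }
    }
    return state.found;
}
```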
@@ -388,10 +512,9 @@ bool manager::can_hint_for(endpoint_id ep) const noexcept {
         return false;
     }
 
-    // Check if the endpoint has been down for too long.
-    const auto ep_downtime = local_gossiper().get_endpoint_downtime(ep);
-    if (ep_downtime > _max_hint_window_us) {
-        manager_logger.trace("{} has been down for {}, not hinting", ep, ep_downtime);
+    const bool node_is_alive = endpoint_downtime_not_bigger_than(local_gossiper(), ep, _max_hint_window_us);
+    if (!node_is_alive) {
+        manager_logger.trace("{} has been down for too long, not hinting", ep);
         return false;
     }
 
@@ -420,18 +543,39 @@ future<> manager::change_host_filter(host_filter filter) {
     std::exception_ptr eptr = nullptr;
 
     try {
+        const auto tmptr = _proxy.get_token_metadata_ptr();
+
         // Iterate over existing hint directories and see if we can enable an endpoint manager
         // for some of them.
         co_await lister::scan_dir(_hints_dir, lister::dir_entry_types::of<directory_entry_type::directory>(),
-                [this] (fs::path datadir, directory_entry de) {
-            const endpoint_id ep = endpoint_id{de.name};
+                [&] (fs::path datadir, directory_entry de) -> future<> {
+            using pair_type = std::pair<locator::host_id, gms::inet_address>;
 
-            const auto& topology = _proxy.get_token_metadata_ptr()->get_topology();
-            if (_ep_managers.contains(ep) || !_host_filter.can_hint_for(topology, ep)) {
-                return make_ready_future();
-            }
+            const auto maybe_host_id_and_ip = std::invoke([&] () -> std::optional<pair_type> {
+                try {
+                    locator::host_id_or_endpoint hid_or_ep{de.name};
+                    if (hid_or_ep.has_host_id()) {
+                        return std::make_optional(pair_type{hid_or_ep.id(), hid_or_ep.resolve_endpoint(*tmptr)});
+                    } else {
+                        return std::make_optional(pair_type{hid_or_ep.resolve_id(*tmptr), hid_or_ep.endpoint()});
+                    }
+                } catch (...) {
+                    return std::nullopt;
+                }
+            });
 
-            return get_ep_manager(ep).populate_segments_to_replay();
+            if (!maybe_host_id_and_ip) {
+                co_return;
+            }
+
+            const auto& topology = _proxy.get_token_metadata_ptr()->get_topology();
+            const auto& [host_id, ip] = *maybe_host_id_and_ip;
+
+            if (_ep_managers.contains(host_id) || !_host_filter.can_hint_for(topology, host_id)) {
+                co_return;
+            }
+
+            co_await get_ep_manager(host_id, ip).populate_segments_to_replay();
         });
     } catch (...) {
         // Revert the changes in the filter. The code below will stop the additional managers
@@ -439,7 +583,7 @@ future<> manager::change_host_filter(host_filter filter) {
         _host_filter = std::move(filter);
         eptr = std::current_exception();
     }
 
     try {
         // Remove endpoint managers which are rejected by the filter.
         co_await coroutine::parallel_for_each(_ep_managers, [this] (auto& pair) {
@@ -448,9 +592,10 @@ future<> manager::change_host_filter(host_filter filter) {
         if (_host_filter.can_hint_for(_proxy.get_token_metadata_ptr()->get_topology(), ep)) {
             return make_ready_future<>();
         }
 
         return ep_man.stop(drain::no).finally([this, ep] {
             _ep_managers.erase(ep);
+            _hint_directory_manager.remove_mapping(ep);
         });
     });
 } catch (...) {
@@ -481,6 +626,9 @@ future<> manager::drain_for(endpoint_id endpoint) noexcept {
     manager_logger.trace("on_leave_cluster: {} is removed/decommissioned", endpoint);
 
     const auto holder = seastar::gate::holder{_draining_eps_gate};
+    // As long as we hold on to this lock, no migration of hinted handoff to host IDs
+    // can be in progress, because `manager::perform_migration()` takes it
+    // at the beginning of its execution too.
     const auto sem_unit = co_await seastar::get_units(_drain_lock, 1);
 
     // After an endpoint has been drained, we remove its directory with all of its contents.
@@ -496,7 +644,7 @@ future<> manager::drain_for(endpoint_id endpoint) noexcept {
 
     if (_proxy.local_db().get_token_metadata().get_topology().is_me(endpoint)) {
         set_draining_all();
 
         try {
             co_await coroutine::parallel_for_each(_ep_managers | boost::adaptors::map_values,
                     [&drain_ep_manager] (hint_endpoint_manager& ep_man) {
@@ -507,9 +655,10 @@ future<> manager::drain_for(endpoint_id endpoint) noexcept {
         }
 
         _ep_managers.clear();
+        _hint_directory_manager.clear();
     } else {
         auto it = _ep_managers.find(endpoint);
 
         if (it != _ep_managers.end()) {
             try {
                 co_await drain_ep_manager(it->second);
@@ -521,6 +670,7 @@ future<> manager::drain_for(endpoint_id endpoint) noexcept {
             // so iterators could have been invalidated.
             // This never throws.
             _ep_managers.erase(endpoint);
+            _hint_directory_manager.remove_mapping(endpoint);
         }
     }
 
@@ -539,8 +689,224 @@ void manager::update_backlog(size_t backlog, size_t max_backlog) {
     }
 }
 
-future<> manager::with_file_update_mutex_for(endpoint_id ep, noncopyable_function<future<> ()> func) {
-    return _ep_managers.at(ep).with_file_update_mutex(std::move(func));
+future<> manager::with_file_update_mutex_for(const std::variant<locator::host_id, gms::inet_address>& ep,
+        noncopyable_function<future<> ()> func) {
+    const locator::host_id host_id = std::invoke([&] {
+        if (std::holds_alternative<locator::host_id>(ep)) {
+            return std::get<locator::host_id>(ep);
+        }
+        return *_hint_directory_manager.get_mapping(std::get<gms::inet_address>(ep));
+    });
+    return _ep_managers.at(host_id).with_file_update_mutex(std::move(func));
 }
 
+// The function assumes that if `_uses_host_id == true`, then there are no directories that represent IP addresses,
+// i.e. every directory is either valid and represents a host ID, or is invalid (so it should be ignored anyway).
+future<> manager::initialize_endpoint_managers() {
+    auto maybe_create_ep_mgr = [this] (const locator::host_id& host_id, const gms::inet_address& ip) -> future<> {
+        if (!check_dc_for(host_id)) {
+            co_return;
+        }
+
+        co_await get_ep_manager(host_id, ip).populate_segments_to_replay();
+    };
+
+    // We dispatch here to not hold on to the token metadata if hinted handoff is host-ID-based.
+    // In that case, there are no directories that represent IP addresses, so we won't need to use it.
+    // We want to avoid a situation where topology changes are prevented while we hold on to this pointer.
+    const auto tmptr = _uses_host_id ? nullptr : _proxy.get_token_metadata_ptr();
+
+    co_await lister::scan_dir(_hints_dir, lister::dir_entry_types::of<directory_entry_type::directory>(),
+            [&] (fs::path directory, directory_entry de) -> future<> {
+        auto maybe_host_id_or_ep = std::invoke([&] () -> std::optional<locator::host_id_or_endpoint> {
+            try {
+                return locator::host_id_or_endpoint{de.name};
+            } catch (...) {
+                // The name represents neither an IP address, nor a host ID.
+                return std::nullopt;
+            }
+        });
+
+        // The directory is invalid, so there's nothing more to do.
+        if (!maybe_host_id_or_ep) {
+            co_return;
+        }
+
+        if (_uses_host_id) {
+            // If hinted handoff is host-ID-based, `get_ep_manager` will NOT use the passed IP address,
+            // so we simply pass the default value there.
+            co_return co_await maybe_create_ep_mgr(maybe_host_id_or_ep->id(), gms::inet_address{});
+        }
+
+        // If we have got to this line, hinted handoff is still IP-based.
+        const locator::host_id host_id = maybe_host_id_or_ep->resolve_id(*tmptr);
+        co_await maybe_create_ep_mgr(host_id, maybe_host_id_or_ep->endpoint());
+    });
+}
+
+// This function assumes that the hint directory is NOT modified as long as this function is being executed.
+future<> manager::migrate_ip_directories() {
+    std::vector<sstring> hint_directories{};
+
+    // Step 1. Gather the names of the hint directories.
+    co_await lister::scan_dir(_hints_dir, lister::dir_entry_types::of<directory_entry_type::directory>(),
+            [&] (std::filesystem::path, directory_entry de) -> future<> {
+        hint_directories.push_back(std::move(de.name));
+        co_return;
+    });
+
+    struct hint_dir_mapping {
+        sstring current_name;
+        sstring new_name;
+    };
+
+    std::vector<hint_dir_mapping> dirs_to_rename{};
+    std::vector<std::filesystem::path> dirs_to_remove{};
+
+    /* RAII lock for token metadata */ {
+        // We need to keep the topology consistent throughout the loop below to
+        // ensure that, for example, two different IPs won't be mapped to
+        // the same host ID.
+        //
+        // We don't want to hold on to this pointer for longer than necessary.
+        // Topology changes might be postponed otherwise.
+        auto tmptr = _proxy.get_token_metadata_ptr();
+
+        // Step 2. Obtain mappings IP -> host ID for the directories.
+        for (auto& directory : hint_directories) {
+            try {
+                locator::host_id_or_endpoint hid_or_ep{directory};
+
+                // If the directory's name already represents a host ID, there is nothing to do.
+                if (hid_or_ep.has_host_id()) {
+                    continue;
+                }
+
+                const locator::host_id host_id = hid_or_ep.resolve_id(*tmptr);
+                dirs_to_rename.push_back({.current_name = std::move(directory), .new_name = host_id.to_sstring()});
+            } catch (...) {
+                // We cannot map the IP to the corresponding host ID, either because
+                // the relevant mapping doesn't exist anymore or because an error occurred. Drop it.
+                //
+                // We only care about directories named after IPs during an upgrade,
+                // so we don't want to make this more complex than necessary.
+                manager_logger.warn("No IP-host ID mapping for hint directory {}. It is going to be removed", directory);
+                dirs_to_remove.push_back(_hints_dir / std::move(directory));
+            }
+        }
+    }
+
+    // We don't need this memory anymore. The only remaining elements are the names of the directories
+    // that already represent valid host IDs. We won't do anything with them. The rest have been moved
+    // to either `dirs_to_rename` or `dirs_to_remove`.
+    hint_directories.clear();
+
+    // Step 3. Try to rename the directories.
+    co_await coroutine::parallel_for_each(dirs_to_rename, [&] (auto& mapping) -> future<> {
+        std::filesystem::path old_name = _hints_dir / std::move(mapping.current_name);
+        std::filesystem::path new_name = _hints_dir / std::move(mapping.new_name);
+
+        try {
+            manager_logger.info("Renaming hint directory {} to {}", old_name.native(), new_name.native());
+            co_await rename_file(old_name.native(), new_name.native());
+        } catch (...) {
+            manager_logger.warn("Renaming directory {} to {} has failed: {}",
+                    old_name.native(), new_name.native(), std::current_exception());
+            dirs_to_remove.push_back(std::move(old_name));
+        }
+    });
+
+    // Step 4. Remove directories that don't represent host IDs.
+    co_await coroutine::parallel_for_each(dirs_to_remove, [] (auto& directory) -> future<> {
+        try {
+            manager_logger.warn("Removing hint directory {}", directory.native());
+            co_await lister::rmdir(directory);
+        } catch (...) {
+            on_internal_error(manager_logger,
+                    seastar::format("Removing a hint directory has failed. Reason: {}", std::current_exception()));
+        }
+    });
+
+    co_await io_check(sync_directory, _hints_dir.native());
+}
+
+future<> manager::perform_migration() {
+    // This function isn't marked as noexcept, but the only parts of the code that
+    // can throw an exception are:
+    // 1. the call to `migrate_ip_directories()`: if we fail there, the failure is critical.
+    //    It doesn't lead to any data corruption, but the node must be stopped;
+    // 2. the re-initialization of the endpoint managers: a failure there is the same failure
+    //    that can happen when starting a node. It may be seen as critical, but it should only
+    //    boil down to not initializing some of the endpoint managers. No data corruption
+    //    is possible.
+    if (_state.contains(state::stopping) || _state.contains(state::draining_all)) {
+        // It's possible the cluster feature is enabled right after the local node decides
+        // to leave the cluster. In that case, the migration callback might still potentially
+        // be called, but we don't want to perform it. We need to stop the node as soon as possible.
+        //
+        // The `state::draining_all` case is more tricky. The semantics of self-draining is not
+        // specified, but based on the description of the state in the header file, it means
+        // the node is leaving the cluster, and it works like that indeed, so we apply the same reasoning.
+        co_return;
+    }
+
+    manager_logger.info("Migration of hinted handoff to host ID is starting");
+    // Step 1. Prevent accepting incoming hints.
+    _state.set(state::migrating);
+
+    // Step 2. Make sure that during the migration there is no draining process and that we don't await any sync points.
+
+    // We're taking this lock for two reasons:
+    // 1. we're waiting for the ongoing drains to finish so that there's no data race,
+    // 2. we suspend new drain requests -- to prevent data races.
+    const auto lock = co_await seastar::get_units(_drain_lock, 1);
+
+    using lock_type = std::unique_lock<seastar::shared_mutex>;
+    // We're taking this lock because we're about to stop endpoint managers here, whereas
+    // `manager::wait_for_sync_point` browses them and awaits their corresponding sync points.
+    // If we stop them during that process, that function will get exceptions.
+    //
+    // Although in the current implementation there is no danger of race conditions
+    // (or at least race conditions that could be harmful in any way), it's better
+    // to avoid them anyway. Hence this lock.
+    const auto unique_lock = co_await std::invoke(coroutine::lambda([&] () -> future<lock_type> {
+        co_await _migration_mutex.lock();
+
+        try {
+            co_return lock_type{_migration_mutex, std::adopt_lock_t{}};
+        } catch (...) {
+            _migration_mutex.unlock();
+            throw;
+        }
+    }));
+
+    // Step 3. Stop endpoint managers. We will modify the hint directory contents, so this is necessary.
+    co_await coroutine::parallel_for_each(_ep_managers | std::views::values, [] (auto& ep_manager) -> future<> {
+        return ep_manager.stop(drain::no);
+    });
+
+    // Step 4. Prevent the resource manager from scanning the hint directory. Race conditions are unacceptable.
+    auto resource_manager_lock = co_await seastar::get_units(_resource_manager.update_lock(), 1);
+
+    // Once the resource manager cannot scan anything anymore, we can safely get rid of these.
+    _ep_managers.clear();
+    _eps_with_pending_hints.clear();
+
+    // We won't need this anymore.
+    _hint_directory_manager.clear();
+
+    // Step 5. Rename the hint directories so that those that remain all represent valid host IDs.
+    co_await migrate_ip_directories();
+    _uses_host_id = true;
+
+    // Step 6. Make the resource manager scan the hint directory again.
+    resource_manager_lock.return_all();
+    // Step 7. Once the resource manager is working again, endpoint managers can be safely recreated.
+    // We won't modify the contents of the hint directory anymore.
+    co_await initialize_endpoint_managers();
+    // Step 8. Start accepting incoming hints again.
+    _state.remove(state::migrating);
+    manager_logger.info("Migration of hinted handoff to host ID has finished successfully");
+}
+
 } // namespace db::hints
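The control flow that triggers `perform_migration()` deserves a summary: at start-up, if the cluster feature is already enabled, the manager migrates immediately; otherwise it registers a one-shot callback that fires once every node supports host IDs. A hedged standalone sketch of that pattern (all names here are invented; the real mechanism is `when_enabled()` on the cluster feature plus the stored `_migrating_done` future that `stop()` waits on):

```cpp
// Sketch of the "migrate when the cluster feature lights up" control flow.
#include <functional>
#include <utility>
#include <vector>

struct cluster_feature {
    bool enabled = false;
    std::vector<std::function<void()>> listeners;

    // Run `f` now if the feature is already on, otherwise remember it.
    void when_enabled(std::function<void()> f) {
        if (enabled) { f(); } else { listeners.push_back(std::move(f)); }
    }
    void enable() {
        enabled = true;
        for (auto& f : listeners) { f(); }
        listeners.clear();
    }
};

struct hints_manager_sketch {
    bool uses_host_id = false;

    void perform_migration() {
        // rename IP-named directories, recreate endpoint managers...
        uses_host_id = true;
    }

    void start(cluster_feature& host_id_feature) {
        if (host_id_feature.enabled) {
            // All peers already support host IDs: migrate before serving hints.
            perform_migration();
        } else {
            // Otherwise, defer until every node in the cluster supports it.
            host_id_feature.when_enabled([this] { perform_migration(); });
        }
    }
};
```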
@@ -14,6 +14,7 @@
 #include <seastar/core/gate.hh>
 #include <seastar/core/lowres_clock.hh>
 #include <seastar/core/sharded.hh>
 #include <seastar/core/shared_future.hh>
 #include <seastar/core/shared_mutex.hh>
 #include <seastar/core/timer.hh>
 #include <seastar/util/noncopyable_function.hh>
@@ -26,6 +27,7 @@
 #include "db/hints/resource_manager.hh"
 #include "db/hints/host_filter.hh"
 #include "db/hints/sync_point.hh"
 #include "gms/inet_address.hh"
 #include "locator/abstract_replication_strategy.hh"
 
 // STD.
@@ -78,8 +80,13 @@ private:
     using hint_endpoint_manager = internal::hint_endpoint_manager;
     using node_to_hint_store_factory_type = internal::node_to_hint_store_factory_type;
+    using hint_directory_manager = internal::hint_directory_manager;
 
     enum class state {
         started,        // Hinting is currently allowed (start() has completed).
+        migrating,      // The hint manager is being migrated from using IPs to name
+                        // hint directories to using host IDs for that purpose. No new
+                        // incoming hints will be accepted as long as this is the state.
         replay_allowed, // Replaying (sending) hints is allowed.
         draining_all,   // Accepting new hints is not allowed. All endpoint managers
                         // are being drained because the node is leaving the cluster.
@@ -89,6 +96,7 @@ private:
 
     using state_set = enum_set<super_enum<state,
         state::started,
+        state::migrating,
         state::replay_allowed,
         state::draining_all,
         state::stopping>>;
@@ -118,21 +126,48 @@ private:
     resource_manager& _resource_manager;
 
     std::unordered_map<endpoint_id, hint_endpoint_manager> _ep_managers;
+
+    // This is ONLY used when `_uses_host_id` is false. Otherwise, this map should stay EMPTY.
+    //
+    // Invariants:
+    //   (1) there is an endpoint manager in `_ep_managers` identified by host ID `H` if and only if
+    //       there is a mapping corresponding to `H` in `_hint_directory_manager`,
+    //   (2) a hint directory representing an IP address `I` is managed by an endpoint manager
+    //       if and only if there is a mapping corresponding to `I` in `_hint_directory_manager`.
+    hint_directory_manager _hint_directory_manager;
+
     hint_stats _stats;
     seastar::metrics::metric_groups _metrics;
-    std::unordered_set<endpoint_id> _eps_with_pending_hints;
+
+    // We need to keep a variant here. Before migrating hinted handoff to using host IDs, hint directories will
+    // still represent IP addresses. But after the migration, they will start representing host IDs.
+    // We need to handle either case.
+    //
+    // It's especially important when dealing with the scenario where there is an IP directory, but there is
+    // no mapping for it in locator::token_metadata. Since we sometimes have to save a directory like that
+    // in this set as well, this variant is necessary.
+    std::unordered_set<std::variant<locator::host_id, gms::inet_address>> _eps_with_pending_hints;
 
     seastar::named_semaphore _drain_lock = {1, named_semaphore_exception_factory{"drain lock"}};
 
+    bool _uses_host_id = false;
+    std::any _migration_callback = std::nullopt;
+    future<> _migrating_done = make_ready_future();
+
+    // Unique lock if and only if there is an ongoing migration to the host-ID-based hinted handoff.
+    // Shared lock if and only if there is a fiber already executing `manager::wait_for_sync_point`.
+    seastar::shared_mutex _migration_mutex{};
+
 public:
     manager(service::storage_proxy& proxy, sstring hints_directory, host_filter filter,
             int64_t max_hint_window_ms, resource_manager& res_manager, sharded<replica::database>& db);
 
     manager(const manager&) = delete;
     manager& operator=(const manager&) = delete;
 
     manager(manager&&) = delete;
     manager& operator=(manager&&) = delete;
 
     ~manager() noexcept {
         assert(_ep_managers.empty());
     }
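A note on the `std::variant` key above: `std::unordered_set<std::variant<A, B>>` works out of the box as long as each alternative is hashable, because the standard library provides `std::hash<std::variant<...>>` in that case, and the active alternative is part of the key's identity. A small standalone illustration with hypothetical stand-ins for `locator::host_id` and `gms::inet_address`:

```cpp
// Sketch: variant keys in an unordered_set.
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <unordered_set>
#include <variant>

struct host_id { std::string uuid; bool operator==(const host_id&) const = default; };
struct ip_addr { std::string text; bool operator==(const ip_addr&) const = default; };

namespace std {
template <> struct hash<host_id> {
    size_t operator()(const host_id& h) const noexcept { return hash<string>{}(h.uuid); }
};
template <> struct hash<ip_addr> {
    size_t operator()(const ip_addr& a) const noexcept { return hash<string>{}(a.text); }
};
}

int main() {
    using key = std::variant<host_id, ip_addr>;
    std::unordered_set<key> pending;

    pending.insert(key{host_id{"1f4b"}});
    pending.insert(key{ip_addr{"10.0.0.1"}});

    // The active alternative is part of the key's identity:
    assert(pending.contains(key{ip_addr{"10.0.0.1"}}));
    assert(!pending.contains(key{ip_addr{"1f4b"}}));
}
```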
@@ -141,7 +176,8 @@ public:
     void register_metrics(const sstring& group_name);
     future<> start(shared_ptr<gms::gossiper> gossiper_ptr);
     future<> stop();
-    bool store_hint(endpoint_id ep, schema_ptr s, lw_shared_ptr<const frozen_mutation> fm, tracing::trace_state_ptr tr_state) noexcept;
+    bool store_hint(endpoint_id host_id, gms::inet_address ip, schema_ptr s, lw_shared_ptr<const frozen_mutation> fm,
+            tracing::trace_state_ptr tr_state) noexcept;
 
     /// \brief Changes the host_filter currently used, stopping and starting endpoint_managers relevant to the new host_filter.
     /// \param filter the new host_filter
@@ -185,7 +221,8 @@ public:
     ///
     /// \param ep endpoint whose file update mutex should be locked
     /// \param func functor to be executed
-    future<> with_file_update_mutex_for(endpoint_id ep, noncopyable_function<future<> ()> func);
+    future<> with_file_update_mutex_for(const std::variant<locator::host_id, gms::inet_address>& ep,
+            noncopyable_function<future<> ()> func);
 
     /// \brief Checks if hints are disabled for all endpoints
     /// \return TRUE if hints are disabled.
@@ -209,7 +246,7 @@ public:
         return it->second.hints_in_progress();
     }
 
-    void add_ep_with_pending_hints(endpoint_id key) {
+    void add_ep_with_pending_hints(const std::variant<locator::host_id, gms::inet_address>& key) {
         _eps_with_pending_hints.insert(key);
     }
 
@@ -218,7 +255,7 @@ public:
         _eps_with_pending_hints.reserve(_ep_managers.size());
     }
 
-    bool has_ep_with_pending_hints(endpoint_id key) const {
+    bool has_ep_with_pending_hints(const std::variant<locator::host_id, gms::inet_address>& key) const {
         return _eps_with_pending_hints.contains(key);
     }
 
@@ -243,7 +280,7 @@ public:
     }
 
     /// \brief Returns a set of replay positions for hint queues towards endpoints from the `target_eps`.
-    sync_point::shard_rps calculate_current_sync_point(std::span<const endpoint_id> target_eps) const;
+    sync_point::shard_rps calculate_current_sync_point(std::span<const gms::inet_address> target_eps) const;
 
     /// \brief Waits until hint replay reaches the replay positions described in `rps`.
     future<> wait_for_sync_point(abort_source& as, const sync_point::shard_rps& rps);
@@ -267,10 +304,10 @@ private:
         return _local_db;
     }
 
-    hint_endpoint_manager& get_ep_manager(endpoint_id ep);
+    hint_endpoint_manager& get_ep_manager(const endpoint_id& host_id, const gms::inet_address& ip);
 
 public:
-    bool have_ep_manager(endpoint_id ep) const noexcept;
+    bool have_ep_manager(const std::variant<locator::host_id, gms::inet_address>& ep) const noexcept;
 
 public:
     /// \brief Initiate the draining when we detect that the node has left the cluster.
@@ -316,6 +353,30 @@ private:
     bool draining_all() noexcept {
         return _state.contains(state::draining_all);
     }
 
+    /// Iterates over existing hint directories and, for each one whose corresponding endpoint is present
+    /// in locator::topology, creates an endpoint manager.
+    future<> initialize_endpoint_managers();
+
+    /// Renames hint directories named after IPs to host IDs.
+    ///
+    /// In the past, hosts were identified by their IPs. Now we use host IDs for that purpose,
+    /// but we want to ensure that old hints don't get lost if possible. This function serves
+    /// that purpose. It's only necessary when upgrading Scylla.
+    ///
+    /// This function should ONLY be called by `manager::start()` and `manager::perform_migration()`.
+    ///
+    /// Calling this function again while the previous call has not yet finished
+    /// is undefined behavior.
+    future<> migrate_ip_directories();
+
+    /// Migrates this hint manager to using host IDs, i.e. when a call to this function ends,
+    /// the names of hint directories will be represented by host IDs instead of IPs.
+    ///
+    /// This function suspends hinted handoff throughout its execution. Among other consequences,
+    /// ALL requested sync points will be canceled, i.e. an exception will be issued
+    /// in the corresponding futures.
+    future<> perform_migration();
 };
 
 } // namespace db::hints
@@ -7,6 +7,8 @@
 */
 
 #include "resource_manager.hh"
+#include "gms/inet_address.hh"
+#include "locator/token_metadata.hh"
 #include "manager.hh"
 #include "log.hh"
 #include <boost/range/algorithm/for_each.hpp>
@@ -91,28 +93,25 @@ future<> space_watchdog::stop() noexcept {
|
||||
}
|
||||
|
||||
// Called under the end_point_hints_manager::file_update_mutex() of the corresponding end_point_hints_manager instance.
future<> space_watchdog::scan_one_ep_dir(fs::path path, manager& shard_manager, ep_key_type ep_key) {
    return do_with(std::move(path), [this, ep_key, &shard_manager] (fs::path& path) {
        // It may happen that we get here and the directory has already been deleted in the context of manager::drain_for().
        // In this case simply bail out.
        return file_exists(path.native()).then([this, ep_key, &shard_manager, &path] (bool exists) {
            if (!exists) {
                return make_ready_future<>();
            } else {
                return lister::scan_dir(path, lister::dir_entry_types::of<directory_entry_type::regular>(), [this, ep_key, &shard_manager] (fs::path dir, directory_entry de) {
                    // Put the current end point ID to state.eps_with_pending_hints when we see the second hints file in its directory
                    if (_files_count == 1) {
                        shard_manager.add_ep_with_pending_hints(ep_key);
                    }
                    ++_files_count;
                    return io_check(file_size, (dir / de.name.c_str()).c_str()).then([this] (uint64_t fsize) {
                        _total_size += fsize;
                    });
                });
            }
        });
    });
}

future<> space_watchdog::scan_one_ep_dir(fs::path path, manager& shard_manager,
        std::optional<std::variant<locator::host_id, gms::inet_address>> maybe_ep_key) {
    // It may happen that we get here and the directory has already been deleted in the context of manager::drain_for().
    // In this case simply bail out.
    if (!co_await file_exists(path.native())) {
        co_return;
    }

    co_await lister::scan_dir(path, lister::dir_entry_types::of<directory_entry_type::regular>(),
            coroutine::lambda([this, maybe_ep_key, &shard_manager] (fs::path dir, directory_entry de) -> future<> {
        // Put the current end point ID to state.eps_with_pending_hints when we see the second hints file in its directory
        if (maybe_ep_key && _files_count == 1) {
            shard_manager.add_ep_with_pending_hints(*maybe_ep_key);
        }
        ++_files_count;

        const auto filename = (std::move(dir) / std::move(de.name)).native();
        _total_size += co_await io_check(file_size, filename);
    }));
}
// Called from the context of a seastar::thread.
@@ -146,14 +145,35 @@ void space_watchdog::on_timer() {
            // not hintable).
            // If exists - let's take a file update lock so that files are not changed under our feet. Otherwise, simply
            // continue to enumeration - there is no one to change them.
            const internal::endpoint_id ep{de.name};
            if (shard_manager.have_ep_manager(ep)) {
                return shard_manager.with_file_update_mutex_for(ep, [this, &shard_manager, dir = std::move(dir), ep_name = std::move(de.name)] () mutable {
                    return scan_one_ep_dir(dir / ep_name, shard_manager, ep_key_type(ep_name));
                });
            } else {
                return scan_one_ep_dir(dir / de.name, shard_manager, ep_key_type(de.name));
            }

            auto maybe_variant = std::invoke([&] () -> std::optional<std::variant<locator::host_id, gms::inet_address>> {
                try {
                    const auto hid_or_ep = locator::host_id_or_endpoint{de.name};
                    if (hid_or_ep.has_host_id()) {
                        return std::variant<locator::host_id, gms::inet_address>(hid_or_ep.id());
                    } else {
                        return std::variant<locator::host_id, gms::inet_address>(hid_or_ep.endpoint());
                    }
                } catch (...) {
                    return std::nullopt;
                }
            });

            // Case 1: The directory is managed by an endpoint manager.
            if (maybe_variant && shard_manager.have_ep_manager(*maybe_variant)) {
                const auto variant = *maybe_variant;
                return shard_manager.with_file_update_mutex_for(variant, [this, variant, &shard_manager, dir = std::move(dir), ep_name = std::move(de.name)] () mutable {
                    return scan_one_ep_dir(dir / ep_name, shard_manager, variant);
                });
            }
            // Case 2: The directory isn't managed by an endpoint manager, but it represents either an IP address,
            // or a host ID.
            else if (maybe_variant) {
                return scan_one_ep_dir(dir / de.name, shard_manager, *maybe_variant);
            }
            // Case 3: The directory isn't managed by an endpoint manager, and it represents neither an IP address,
            // nor a host ID.
            else {
                return scan_one_ep_dir(dir / de.name, shard_manager, {});
            }
        }).get();
    }
@@ -20,6 +20,8 @@
#include "utils/small_vector.hh"
#include "utils/updateable_value.hh"
#include "enum_set.hh"
#include "db/hints/internal/common.hh"
#include "gms/inet_address.hh"

// Usually we don't define namespace aliases in our headers
// but this one is already entrenched.

@@ -31,7 +33,6 @@ class storage_proxy;

namespace gms {
class gossiper;
class inet_address;
} // namespace gms

namespace db {

@@ -46,7 +47,8 @@ class manager;
class space_watchdog {
private:
    using ep_key_type = gms::inet_address;
    using endpoint_id = internal::endpoint_id;

    static const std::chrono::seconds _watchdog_period;

    struct manager_hash {
@@ -85,7 +87,7 @@ public:
    void start();
    future<> stop() noexcept;

    seastar::named_semaphore& update_lock() {
    seastar::named_semaphore& update_lock() noexcept {
        return _update_lock;
    }
@@ -110,9 +112,11 @@ private:
    /// value.
    ///
    /// \param path directory to scan
    /// \param ep_name end point ID (as a string)
    /// \param shard_manager the hint manager managing the directory specified by `path`
    /// \param maybe_ep_key endpoint ID corresponding to the scanned directory
    /// \return future that resolves when scanning is complete
    future<> scan_one_ep_dir(fs::path path, manager& shard_manager, ep_key_type ep_key);
    future<> scan_one_ep_dir(fs::path path, manager& shard_manager,
            std::optional<std::variant<locator::host_id, gms::inet_address>> maybe_ep_key);
};
class resource_manager {

@@ -193,6 +197,10 @@ public:
    /// The hints::managers can be added either before or after resource_manager starts.
    /// If resource_manager is already started, the hints manager will also be started.
    future<> register_manager(manager& m);

    seastar::named_semaphore& update_lock() noexcept {
        return _space_watchdog.update_lock();
    }
};

}
@@ -1,14 +1,14 @@
# Hinted Handoff Design

## Abstract
Hinted Handoff is a feature that allows replaying failed writes. The mutation and the destination replica are saved in a log and replayed later according to the feature configuration.

## Hinted Handoff Configuration Parameters
* _hinted_handoff_enabled_: Enables or disables the Hinted Handoff feature completely, or enumerates the DCs for which hints are allowed.
* _max_hint_window_in_ms_: Don't generate hints if the destination node has been down for longer than this value. Hint generation resumes once the node is seen up again.
* _hints_directory_: Directory where Scylla stores hints. By default `$SCYLLA_HOME/hints`.
* _hints_compression_: Compression to apply to hints files. By default, hints files are stored uncompressed.

## Future configuration
* We should define the fairness configuration between regular WRITES and hint WRITES.
  Since we currently have neither a CPU scheduler nor a network scheduler, we can't give any guarantees regarding runtime or network-bandwidth fairness.
@@ -16,7 +16,7 @@ Hinted Handoff is a feature that allows replaying failed writes. The mutation an

## Hints generation
* Once the WRITE mutation fails with a timeout, we create a _hints_queue_ for the target node.
* The queue is specified by a destination node IP.
* The queue is specified by a destination node host ID.

* Each hint is specified by:
  * Mutation.
@@ -31,15 +31,15 @@ As long as hints are appended to the queue the files are closed and flushed to t

We are going to reuse the commitlog infrastructure for writing hints to disk - it provides both the internal buffering and the memory consumption control.

Hints to the specific destination are stored under the _hints_directory_/\<shard ID>/\<node IP> directory.
Hints to the specific destination are stored under the _hints_directory_/\<shard ID>/\<node host ID> directory.
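For example, assuming the default location, hints owned by shard 2 and destined for a node whose host ID is `8e9096a0-4bbe-4f2c-8b9c-2f9d1c6e3a55` (a made-up ID, purely for illustration) would live under:

    $SCYLLA_HOME/hints/2/8e9096a0-4bbe-4f2c-8b9c-2f9d1c6e3a55/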
### When may new hints be dropped?
* A new hint is going to be dropped when there are more than 10MB of "in progress" (yet to be stored) hints per shard and there are already "in progress" hints to the destination the current hint is aimed at.
  * If there are no "in progress" hints to the current destination, the new hint won't be dropped due to the per-shard memory limitation.
* A hint is going to be dropped if the disk space quota (for the whole node, it's 10% of the total disk space of the disk partition where _hints_directory_ is located) has been depleted and there are pending (already stored) hints to the current destination.
  * Disk quota is divided equally between all present shards.
  * If there are no pending hints to the current destination, a new hint won't be dropped due to a disk space limitation.
* If a new hint is dropped, the corresponding metrics counter is increased.
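For example, if the disk partition holding _hints_directory_ has 2TB of total space and the node runs 8 shards, the node-wide quota is 200GB (10%) and each shard may store up to 25GB of hints.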

### Redistribution of hints when a node boots
* When a node boots, all present hint files are redistributed equally between all present shards.
@@ -47,7 +47,7 @@ Hints to the specific destination are stored under the _hints_directory_/\<shard
## Hints sending
* Hints are sent from each shard by each _hints_queue_ independently.
* Each shard sends the hints that it owns (according to the hint file location).
* Hints sending is triggered by the following events:
  * Timer: every X seconds (every 10s).
    * For each queue:
      * Forcefully close the queues.
@@ -61,12 +61,12 @@ Hints to the specific destination are stored under the _hints_directory_/\<shard
* We are going to limit the parallelism during hints sending. A new hint is going to be sent out unless:
  * The total size of in-flight (being sent) hints is greater than or equal to 10% of the total shard memory.
  * The number of in-flight hints is greater than or equal to 128 - this is needed to limit the collateral memory consumption in case of small hints (mutations).
    * If there is a hint that is bigger than the memory limit above, we are going to send it but won't allow any additional in-flight hints while it's being sent.
  * The local node is decommissioned (see "When the current node is decommissioned" below).
## When the current node is decommissioned (in the absence of Hints Streaming)
* Send all pending hints out:
  * If the destination node is not ALIVE or the mutation times out - drop the hint and move on to the next one.
## Hints streaming (optional)
* Streaming is performed using a new HINT_STREAMING verb:

@@ -74,5 +74,25 @@ Hints to the specific destination are stored under the _hints_directory_/\<shard
  * Shard X would send its hints to node[Yx], where Yx = X mod N and N is the number of nodes in the cluster excluding the node that is being decommissioned.
  * Receiver distributes received hints equally among local shards: pushes them to the corresponding _hint_queue_s (see "Hints generation" above).
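For example, with N = 3 remaining nodes, shards 0 through 5 of the decommissioned node would stream their hints to nodes 0, 1, 2, 0, 1 and 2, respectively.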
## Migration to host ID
### Rationale
Scylla is moving away from using IP addresses to identify nodes in its internals; that role is being taken over by host IDs.
Hinted Handoff is no exception, and the module now uses the new type.

However, to support upgrading Scylla from a version where Hinted Handoff still used IP addresses, a migration
process has been introduced. Its purpose is to rename existing hint directories on disk so that their names all
represent valid host IDs.

### Migration process
When the whole cluster starts using a version of Scylla that supports host-ID based Hinted Handoff, the module is
suspended (i.e. no new hints are accepted and no hints are sent) and we start renaming hint directories to host IDs.
Hinted Handoff does NOT work until the migration process has finished.

As a side effect, all sync points that were created up to that point will be canceled, i.e. an exception will be issued
instead of a resolved future.

The migration process can also lead to data loss. If there is no corresponding host ID for a given
IP address in `locator::token_metadata`, or if renaming a directory fails, the directory shall be removed with all of its
contents. In that case, a warning will be issued.

Migration won't be started if a node is being stopped or drained.
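The rename-or-remove rule above can be sketched as follows. This is only an illustration, not Scylla's implementation (which runs on Seastar and consults `locator::token_metadata`); the `ip_to_host_id` map and the crude IP/UUID name classification are stand-ins:

```cpp
#include <filesystem>
#include <iostream>
#include <string>
#include <unordered_map>

namespace fs = std::filesystem;

// Sketch only: migrate one shard's hint directories from IP names to host IDs.
void migrate_shard_hint_dirs(const fs::path& shard_dir,
                             const std::unordered_map<std::string, std::string>& ip_to_host_id) {
    for (const auto& entry : fs::directory_iterator(shard_dir)) {
        const std::string name = entry.path().filename().string();
        // Crude classification for the sketch: directory names are either
        // an IP address (contains '.' or ':') or a host ID (a UUID).
        const bool looks_like_ip = name.find('.') != std::string::npos
                                || name.find(':') != std::string::npos;
        if (!looks_like_ip) {
            continue; // already keyed by host ID; nothing to migrate
        }
        const auto it = ip_to_host_id.find(name);
        std::error_code ec;
        if (it != ip_to_host_id.end()) {
            fs::rename(entry.path(), shard_dir / it->second, ec);
        }
        if (it == ip_to_host_id.end() || ec) {
            // No known host ID for this IP, or the rename failed: the directory
            // is removed with all of its contents and a warning is issued, so
            // those hints are lost.
            std::cerr << "hints for " << name << " could not be migrated; removing\n";
            fs::remove_all(entry.path());
        }
    }
}
```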
@@ -136,6 +136,7 @@ public:
    // revert to the digest method when necessary (if we must perform a schema change during RECOVERY).
    gms::feature group0_schema_versioning { *this, "GROUP0_SCHEMA_VERSIONING"sv };
    gms::feature supports_consistent_topology_changes { *this, "SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES"sv };
    gms::feature host_id_based_hinted_handoff { *this, "HOST_ID_BASED_HINTED_HANDOFF"sv };

    // A feature just for use in tests. It must not be advertised unless
    // the "features_enable_test_feature" injection is enabled.
@@ -11,6 +11,7 @@
#pragma once

#include "gms/inet_address.hh"
#include "locator/host_id.hh"
#include "utils/atomic_vector.hh"

namespace service {
@@ -39,9 +40,10 @@ public:
    /**
     * Called when a node leaves the cluster (decommission or removeToken).
     *
     * @param endpoint the endpoint that is leaving.
     * @param endpoint the IP of the endpoint that is leaving.
     * @param host_id the host ID of the endpoint that is leaving.
     */
    virtual void on_leave_cluster(const gms::inet_address& endpoint) = 0;
    virtual void on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& host_id) = 0;

    /**
     * Called when a node is marked UP.

@@ -66,7 +68,7 @@ public:
    future<> unregister_subscriber(endpoint_lifecycle_subscriber* subscriber) noexcept;

    future<> notify_down(gms::inet_address endpoint);
    future<> notify_left(gms::inet_address endpoint);
    future<> notify_left(gms::inet_address endpoint, locator::host_id host_id);
    future<> notify_up(gms::inet_address endpoint);
    future<> notify_joined(gms::inet_address endpoint);
};
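As a purely illustrative sketch of the widened interface (the `leave_logger` class is made up and assumes Scylla's headers for `service::endpoint_lifecycle_subscriber`, `gms::inet_address` and `locator::host_id`), a subscriber now receives both identifiers:

```cpp
#include <iostream>

// Hypothetical subscriber; only on_leave_cluster() does real work here.
class leave_logger final : public service::endpoint_lifecycle_subscriber {
public:
    virtual void on_join_cluster(const gms::inet_address&) override {}
    virtual void on_up(const gms::inet_address&) override {}
    virtual void on_down(const gms::inet_address&) override {}

    virtual void on_leave_cluster(const gms::inet_address& endpoint,
                                  const locator::host_id& host_id) override {
        // The IP stays useful for operator-facing messages, while host-ID-keyed
        // state (such as hint queues) is cleaned up using the host ID.
        std::cout << "node left: " << endpoint << " / " << host_id << "\n";
    }
};
```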
@@ -557,7 +557,7 @@ future<> service_level_controller::do_remove_service_level(sstring name, bool re

void service_level_controller::on_join_cluster(const gms::inet_address& endpoint) { }

void service_level_controller::on_leave_cluster(const gms::inet_address& endpoint) {
void service_level_controller::on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid) {
    auto my_address = _auth_service.local().query_processor().proxy().local_db().get_token_metadata().get_topology().my_address();
    if (this_shard_id() == global_controller && endpoint == my_address) {
        _global_controller_db->dist_data_update_aborter.request_abort();

@@ -259,7 +259,7 @@ public:
    static sstring default_service_level_name;

    virtual void on_join_cluster(const gms::inet_address& endpoint) override;
    virtual void on_leave_cluster(const gms::inet_address& endpoint) override;
    virtual void on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid) override;
    virtual void on_up(const gms::inet_address& endpoint) override;
    virtual void on_down(const gms::inet_address& endpoint) override;
};
@@ -1016,7 +1016,8 @@ protected:
    schema_ptr _schema;
public:
    virtual ~mutation_holder() {}
    virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) = 0;
    virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, locator::effective_replication_map_ptr ermptr,
            tracing::trace_state_ptr tr_state) = 0;
    virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout,
            tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info,
            fencing_token fence) = 0;
@@ -1055,10 +1056,12 @@ public:
            _mutations.emplace(m.first, std::move(fm));
        }
    }
    virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) override {
    virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, locator::effective_replication_map_ptr ermptr,
            tracing::trace_state_ptr tr_state) override {
        auto m = _mutations[ep];
        if (m) {
            return hm.store_hint(ep, _schema, std::move(m), tr_state);
            const auto hid = ermptr->get_token_metadata().get_host_id(ep);
            return hm.store_hint(hid, ep, _schema, std::move(m), tr_state);
        } else {
            return false;
        }
@@ -1114,8 +1117,10 @@ public:
    }
    explicit shared_mutation(const mutation& m) : shared_mutation(frozen_mutation_and_schema{freeze(m), m.schema()}) {
    }
    virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) override {
        return hm.store_hint(ep, _schema, _mutation, tr_state);
    virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, locator::effective_replication_map_ptr ermptr,
            tracing::trace_state_ptr tr_state) override {
        const auto hid = ermptr->get_token_metadata().get_host_id(ep);
        return hm.store_hint(hid, ep, _schema, _mutation, tr_state);
    }
    virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout,
            tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info,
@@ -1144,7 +1149,8 @@ public:
class hint_mutation : public shared_mutation {
public:
    using shared_mutation::shared_mutation;
    virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) override {
    virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, locator::effective_replication_map_ptr,
            tracing::trace_state_ptr tr_state) override {
        throw std::runtime_error("Attempted to store a hint for a hint");
    }
    virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout,
@@ -1269,8 +1275,9 @@ public:
        _size = _proposal->update.representation().size();
        _schema = std::move(s);
    }
    virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) override {
        return false; // CAS does not save hints yet
    virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, locator::effective_replication_map_ptr,
            tracing::trace_state_ptr tr_state) override {
        return false; // CAS does not save hints yet
    }
    virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout,
            tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info,
@@ -1503,7 +1510,8 @@ public:
        // we are here because either cl was achieved, but targets left in the handler are not
        // responding, so a hint should be written for them, or cl == any in which case
        // hints are counted towards consistency, so we need to write hints and count how much was written
        auto hints = _proxy->hint_to_dead_endpoints(_mutation_holder, get_targets(), _type, get_trace_state());
        auto hints = _proxy->hint_to_dead_endpoints(_mutation_holder, get_targets(), _effective_replication_map_ptr,
                _type, get_trace_state());
        signal(hints);
        if (_cl == db::consistency_level::ANY && hints) {
            slogger.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write");
@@ -1575,8 +1583,9 @@ public:
    const inet_address_vector_topology_change& get_dead_endpoints() const {
        return _dead_endpoints;
    }
    bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) {
        return _mutation_holder->store_hint(hm, ep, tr_state);
    bool store_hint(db::hints::manager& hm, gms::inet_address ep, locator::effective_replication_map_ptr ermptr,
            tracing::trace_state_ptr tr_state) {
        return _mutation_holder->store_hint(hm, ep, std::move(ermptr), tr_state);
    }
    future<> apply_locally(storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) {
        auto op = _proxy->start_write();
@@ -2484,7 +2493,7 @@ storage_proxy_stats::split_stats::split_stats(const sstring& category, const sst
    , _long_description_prefix(long_description_prefix)
    , _category(category)
    , _op_type(op_type)
    , _auto_register_metrics(auto_register_metrics)
    , _sg(current_scheduling_group()) { }

storage_proxy_stats::write_stats::write_stats()
@@ -2992,7 +3001,7 @@ storage_proxy::mutate_locally(std::vector<mutation> mutations, tracing::trace_st
    });
}

future<>
storage_proxy::mutate_locally(std::vector<mutation> mutation, tracing::trace_state_ptr tr_state, clock_type::time_point timeout, db::per_partition_rate_limit::info rate_limit_info) {
    return mutate_locally(std::move(mutation), tr_state, timeout, _write_smp_service_group, rate_limit_info);
}
@@ -3117,10 +3126,24 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok
    pending_endpoints.erase(itend, pending_endpoints.end());

    auto all = boost::range::join(natural_endpoints, pending_endpoints);
    auto all_hids = all | boost::adaptors::transformed([&erm] (const gms::inet_address& ep) {
        const auto& tm = erm->get_token_metadata();
        const auto maybe_host_id = tm.get_host_id_if_known(ep);
        if (maybe_host_id) {
            return *maybe_host_id;
        }
        // We need this additional check because even after removing the mapping IP-host ID corresponding
        // to this node from `locator::token_metadata` while decommissioning, we still perform mutations
        // targeting the local node.
        if (tm.get_topology().is_me(ep)) {
            return tm.get_topology().my_host_id();
        }
        on_internal_error(slogger, seastar::format("No mapping for {} in the passed effective replication map", ep));
    });

    // If the manager hasn't started yet, no mutation will be performed to another node.
    // No hint will need to be stored.
    if (cannot_hint(all, type)) {
    if (cannot_hint(all_hids, type)) {
        get_stats().writes_failed_due_to_too_many_in_flight_hints++;
        // avoid OOMing due to excess hints. we need to do this check even for "live" nodes, since we can
        // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead.
@@ -3232,8 +3255,8 @@ void storage_proxy::register_cdc_operation_result_tracker(const storage_proxy::u
void
storage_proxy::hint_to_dead_endpoints(response_id_type id, db::consistency_level cl) {
    auto& h = *get_write_response_handler(id);

    size_t hints = hint_to_dead_endpoints(h._mutation_holder, h.get_dead_endpoints(), h._type, h.get_trace_state());
    size_t hints = hint_to_dead_endpoints(h._mutation_holder, h.get_dead_endpoints(), h._effective_replication_map_ptr,
            h._type, h.get_trace_state());

    if (cl == db::consistency_level::ANY) {
        // for cl==ANY hints are counted towards consistency
@@ -4124,12 +4147,13 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo

// returns number of hints stored
template<typename Range>
size_t storage_proxy::hint_to_dead_endpoints(std::unique_ptr<mutation_holder>& mh, const Range& targets, db::write_type type, tracing::trace_state_ptr tr_state) noexcept
size_t storage_proxy::hint_to_dead_endpoints(std::unique_ptr<mutation_holder>& mh, const Range& targets,
        locator::effective_replication_map_ptr ermptr, db::write_type type, tracing::trace_state_ptr tr_state) noexcept
{
    if (hints_enabled(type)) {
        db::hints::manager& hints_manager = hints_manager_for(type);
        return boost::count_if(targets, [&mh, tr_state = std::move(tr_state), &hints_manager] (gms::inet_address target) mutable -> bool {
            return mh->store_hint(hints_manager, target, tr_state);
        return boost::count_if(targets, [&mh, ermptr, tr_state = std::move(tr_state), &hints_manager] (gms::inet_address target) mutable -> bool {
            return mh->store_hint(hints_manager, target, ermptr, tr_state);
        });
    } else {
        return 0;
@@ -6557,10 +6581,10 @@ future<> storage_proxy::wait_for_hint_sync_point(const db::hints::sync_point spo

void storage_proxy::on_join_cluster(const gms::inet_address& endpoint) {};

void storage_proxy::on_leave_cluster(const gms::inet_address& endpoint) {
void storage_proxy::on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid) {
    // Discarding these futures is safe. They're awaited by db::hints::manager::stop().
    (void) _hints_manager.drain_for(endpoint);
    (void) _hints_for_views_manager.drain_for(endpoint);
    (void) _hints_manager.drain_for(hid);
    (void) _hints_for_views_manager.drain_for(hid);
}

void storage_proxy::on_up(const gms::inet_address& endpoint) {};
@@ -58,7 +58,7 @@ class one_or_two_partition_ranges;
}

namespace cdc {
class cdc_service;
}

namespace gms {
@@ -332,7 +332,8 @@ private:
    void register_cdc_operation_result_tracker(const storage_proxy::unique_response_handler_vector& ids, lw_shared_ptr<cdc::operation_result_tracker> tracker);
    void send_to_live_endpoints(response_id_type response_id, clock_type::time_point timeout);
    template<typename Range>
    size_t hint_to_dead_endpoints(std::unique_ptr<mutation_holder>& mh, const Range& targets, db::write_type type, tracing::trace_state_ptr tr_state) noexcept;
    size_t hint_to_dead_endpoints(std::unique_ptr<mutation_holder>& mh, const Range& targets,
            locator::effective_replication_map_ptr ermptr, db::write_type type, tracing::trace_state_ptr tr_state) noexcept;
    void hint_to_dead_endpoints(response_id_type, db::consistency_level);
    template<typename Range>
    bool cannot_hint(const Range& targets, db::write_type type) const;
@@ -724,7 +725,7 @@ public:
    }

    virtual void on_join_cluster(const gms::inet_address& endpoint) override;
    virtual void on_leave_cluster(const gms::inet_address& endpoint) override;
    virtual void on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid) override;
    virtual void on_up(const gms::inet_address& endpoint) override;
    virtual void on_down(const gms::inet_address& endpoint) override;
@@ -402,30 +402,31 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm

    std::vector<future<>> sys_ks_futures;

    auto remove_ip = [&](inet_address ip, bool notify) -> future<> {
    auto remove_ip = [&](inet_address ip, locator::host_id host_id, bool notify) -> future<> {
        sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(ip));

        if (_gossiper.get_endpoint_state_ptr(ip) && !get_used_ips().contains(ip)) {
            co_await _gossiper.force_remove_endpoint(ip, gms::null_permit_id);
            if (notify) {
                co_await notify_left(ip);
                co_await notify_left(ip, host_id);
            }
        }
    };

    auto process_left_node = [&] (raft::server_id id) -> future<> {
        locator::host_id host_id{id.uuid()};

        if (const auto ip = am.find(id)) {
            co_await remove_ip(*ip, true);
            co_await remove_ip(*ip, host_id, true);
        }

        locator::host_id host_id{id.uuid()};
        if (t.left_nodes_rs.find(id) != t.left_nodes_rs.end()) {
            update_topology(host_id, std::nullopt, t.left_nodes_rs.at(id));
        }

        _group0->modifiable_address_map().set_expiring(id);
        // However if we do that, we need to also implement unbanning a node and do it if `removenode` is aborted.
        co_await _messaging.local().ban_host(locator::host_id{id.uuid()});
        co_await _messaging.local().ban_host(host_id);
    };

    auto process_normal_node = [&] (raft::server_id id, const replica_state& rs) -> future<> {
@@ -474,7 +475,7 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm
                _exit(1);
            });
            // IP change is not expected to emit REMOVED_NODE notifications
            co_await remove_ip(it->second, false);
            co_await remove_ip(it->second, host_id, false);
        }
    }
    update_topology(host_id, ip, rs);
@@ -1709,7 +1710,7 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
            return local_proxy.start_hints_manager(gossiper.local().shared_from_this());
        });
    }

    if (!raft_topology_change_enabled()) {
        co_await _feature_service.enable_features_on_join(_gossiper, _sys_ks.local(), *this);
    }
@@ -2454,7 +2455,7 @@ future<> storage_service::handle_state_left(inet_address endpoint, std::vector<s
        slogger.warn("handle_state_left: Get tokens from token_metadata, node={}/{}, tokens={}", endpoint, host_id, tokens_from_tm);
        tokens = std::unordered_set<dht::token>(tokens_from_tm.begin(), tokens_from_tm.end());
    }
    co_await excise(tokens, endpoint, extract_expire_time(pieces), pid);
    co_await excise(tokens, endpoint, host_id, extract_expire_time(pieces), pid);
}

future<> storage_service::handle_state_removed(inet_address endpoint, std::vector<sstring> pieces, gms::permit_id pid) {
@@ -2475,7 +2476,7 @@ future<> storage_service::handle_state_removed(inet_address endpoint, std::vecto
        auto state = pieces[0];
        auto remove_tokens = get_token_metadata().get_tokens(host_id);
        std::unordered_set<token> tmp(remove_tokens.begin(), remove_tokens.end());
        co_await excise(std::move(tmp), endpoint, extract_expire_time(pieces), pid);
        co_await excise(std::move(tmp), endpoint, host_id, extract_expire_time(pieces), pid);
    } else { // now that the gossiper has told us about this nonexistent member, notify the gossiper to remove it
        add_expire_time_if_found(endpoint, extract_expire_time(pieces));
        co_await remove_endpoint(endpoint, pid);
@@ -3971,7 +3972,7 @@ future<> storage_service::removenode(locator::host_id host_id, std::list<locator
            const auto& pid = permit.id();
            ss._gossiper.advertise_token_removed(endpoint, host_id, pid).get();
            std::unordered_set<token> tmp(tokens.begin(), tokens.end());
            ss.excise(std::move(tmp), endpoint, pid).get();
            ss.excise(std::move(tmp), endpoint, host_id, pid).get();
            removed_from_token_ring = true;
            slogger.info("removenode[{}]: Finished removing the node from the ring", uuid);
        } catch (...) {
@@ -4797,27 +4798,27 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node,
    });
}

future<> storage_service::excise(std::unordered_set<token> tokens, inet_address endpoint, gms::permit_id pid) {
    slogger.info("Removing tokens {} for {}", tokens, endpoint);
future<> storage_service::excise(std::unordered_set<token> tokens, inet_address endpoint_ip,
        locator::host_id endpoint_hid, gms::permit_id pid) {
    slogger.info("Removing tokens {} for {}", tokens, endpoint_ip);
    // FIXME: HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint);
    co_await remove_endpoint(endpoint, pid);
    co_await remove_endpoint(endpoint_ip, pid);
    auto tmlock = std::make_optional(co_await get_token_metadata_lock());
    auto tmptr = co_await get_mutable_token_metadata_ptr();
    if (const auto host_id = tmptr->get_host_id_if_known(endpoint); host_id) {
        tmptr->remove_endpoint(*host_id);
    }
    tmptr->remove_endpoint(endpoint_hid);
    tmptr->remove_bootstrap_tokens(tokens);

    co_await update_topology_change_info(tmptr, ::format("excise {}", endpoint));
    co_await update_topology_change_info(tmptr, ::format("excise {}", endpoint_ip));
    co_await replicate_to_all_cores(std::move(tmptr));
    tmlock.reset();

    co_await notify_left(endpoint);
    co_await notify_left(endpoint_ip, endpoint_hid);
}

future<> storage_service::excise(std::unordered_set<token> tokens, inet_address endpoint, int64_t expire_time, gms::permit_id pid) {
    add_expire_time_if_found(endpoint, expire_time);
    return excise(tokens, endpoint, pid);
future<> storage_service::excise(std::unordered_set<token> tokens, inet_address endpoint_ip,
        locator::host_id endpoint_hid, int64_t expire_time, gms::permit_id pid) {
    add_expire_time_if_found(endpoint_ip, expire_time);
    return excise(tokens, endpoint_ip, endpoint_hid, pid);
}

future<> storage_service::leave_ring() {
@@ -6618,7 +6619,7 @@ future<> storage_service::force_remove_completion() {
                const auto& pid = permit.id();
                co_await ss._gossiper.advertise_token_removed(*endpoint, host_id, pid);
                std::unordered_set<token> tokens_set(tokens.begin(), tokens.end());
                co_await ss.excise(tokens_set, *endpoint, pid);
                co_await ss.excise(tokens_set, *endpoint, host_id, pid);

                slogger.info("force_remove_completion: removing endpoint {} from group 0", *endpoint);
                assert(ss._group0);
@@ -6758,11 +6759,11 @@ future<> storage_service::notify_down(inet_address endpoint) {
    slogger.debug("Notify node {} has been down", endpoint);
}

future<> endpoint_lifecycle_notifier::notify_left(gms::inet_address endpoint) {
    return seastar::async([this, endpoint] {
        _subscribers.thread_for_each([endpoint] (endpoint_lifecycle_subscriber* subscriber) {
future<> endpoint_lifecycle_notifier::notify_left(gms::inet_address endpoint, locator::host_id hid) {
    return seastar::async([this, endpoint, hid] {
        _subscribers.thread_for_each([endpoint, hid] (endpoint_lifecycle_subscriber* subscriber) {
            try {
                subscriber->on_leave_cluster(endpoint);
                subscriber->on_leave_cluster(endpoint, hid);
            } catch (...) {
                slogger.warn("Leave cluster notification failed {}: {}", endpoint, std::current_exception());
            }

@@ -6770,9 +6771,9 @@ future<> endpoint_lifecycle_notifier::notify_left(gms::inet_address endpoint) {
    });
}

future<> storage_service::notify_left(inet_address endpoint) {
    co_await container().invoke_on_all([endpoint] (auto&& ss) {
        return ss._lifecycle_notifier.notify_left(endpoint);
future<> storage_service::notify_left(inet_address endpoint, locator::host_id hid) {
    co_await container().invoke_on_all([endpoint, hid] (auto&& ss) {
        return ss._lifecycle_notifier.notify_left(endpoint, hid);
    });
    slogger.debug("Notify node {} has left the cluster", endpoint);
}
@@ -546,8 +546,10 @@ private:
    future<> handle_state_removed(inet_address endpoint, std::vector<sstring> pieces, gms::permit_id);

private:
    future<> excise(std::unordered_set<token> tokens, inet_address endpoint, gms::permit_id);
    future<> excise(std::unordered_set<token> tokens, inet_address endpoint, long expire_time, gms::permit_id);
    future<> excise(std::unordered_set<token> tokens, inet_address endpoint_ip, locator::host_id endpoint_hid,
            gms::permit_id);
    future<> excise(std::unordered_set<token> tokens, inet_address endpoint_ip, locator::host_id endpoint_hid,
            long expire_time, gms::permit_id);

    /** unlike excise we just need this endpoint gone without going through any notifications **/
    future<> remove_endpoint(inet_address endpoint, gms::permit_id pid);
@@ -739,7 +741,7 @@ private:
    future<> isolate();

    future<> notify_down(inet_address endpoint);
    future<> notify_left(inet_address endpoint);
    future<> notify_left(inet_address endpoint, locator::host_id hid);
    future<> notify_up(inet_address endpoint);
    future<> notify_joined(inet_address endpoint);
    future<> notify_cql_change(inet_address endpoint, bool ready);
@@ -2305,7 +2305,7 @@ public:
    future<> run();

    virtual void on_join_cluster(const gms::inet_address& endpoint) {}
    virtual void on_leave_cluster(const gms::inet_address& endpoint) {};
    virtual void on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid) {};
    virtual void on_up(const gms::inet_address& endpoint) {};
    virtual void on_down(const gms::inet_address& endpoint) { _topo_sm.event.broadcast(); };
};
@@ -2506,7 +2506,7 @@ future<> topology_coordinator::build_coordinator_state(group0_guard guard) {
    auto get_application_state = [&] (locator::host_id host_id, gms::inet_address ep, const gms::application_state_map& epmap, gms::application_state app_state) -> sstring {
        const auto it = epmap.find(app_state);
        if (it == epmap.end()) {
            throw std::runtime_error(format("failed to build initial raft topology state from gossip for node {}/{}: application state {} is missing in gossip",
                host_id, ep, app_state));
        }
        // it's versioned_value::value(), not std::optional::value() - it does not throw
@@ -2579,7 +2579,7 @@ future<> topology_coordinator::build_coordinator_state(group0_guard guard) {
        .set("cleanup_status", cleanup_status::clean)
        .set("request_id", utils::UUID())
        .set("supported_features", supported_features);

    rtlogger.debug("node {} will contain the following parameters: "
        "datacenter={}, rack={}, tokens={}, shard_count={}, ignore_msb={}, supported_features={}",
        host_id, datacenter, rack, tokens, shard_count, ignore_msb, supported_features);
@@ -236,7 +236,7 @@ void cql_server::event_notifier::send_join_cluster(const gms::inet_address& endp
    }
}

void cql_server::event_notifier::on_leave_cluster(const gms::inet_address& endpoint)
void cql_server::event_notifier::on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid)
{
    for (auto&& conn : _topology_change_listeners) {
        using namespace cql_transport;
@@ -329,7 +329,7 @@ public:
    virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) override;

    virtual void on_join_cluster(const gms::inet_address& endpoint) override;
    virtual void on_leave_cluster(const gms::inet_address& endpoint) override;
    virtual void on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid) override;
    virtual void on_up(const gms::inet_address& endpoint) override;
    virtual void on_down(const gms::inet_address& endpoint) override;
};