mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-01 12:36:56 +00:00
Merge 'Deprecate HOST_ID_BASED_HINTED_HANDOFF feature and drop migration code' from Gleb Natapov
The feature was included in 2024.2 and present on all supported versions. No upgrade from a version that does not have it is possible to the HEAD. It means that the feature can be deprecated features list and all the migration code can be dropped. No need to backport since the is code removal. Closes scylladb/scylladb#30087 * github.com:scylladb/scylladb: hints: remove hint_directory_manager and IP-based hint directory infrastructure hints: remove migration infrastructure hints: deprecate HOST_ID_BASED_HINTED_HANDOFF feature
This commit is contained in:
@@ -272,76 +272,5 @@ future<> rebalance_hints(fs::path hint_directory) {
|
||||
co_await remove_irrelevant_shards_directories(hint_directory);
|
||||
}
|
||||
|
||||
std::pair<locator::host_id, gms::inet_address> hint_directory_manager::insert_mapping(const locator::host_id& host_id,
|
||||
const gms::inet_address& ip)
|
||||
{
|
||||
const auto maybe_mapping = get_mapping(host_id, ip);
|
||||
if (maybe_mapping) {
|
||||
return *maybe_mapping;
|
||||
}
|
||||
|
||||
|
||||
_mappings.emplace(host_id, ip);
|
||||
return std::make_pair(host_id, ip);
|
||||
}
|
||||
|
||||
std::optional<gms::inet_address> hint_directory_manager::get_mapping(const locator::host_id& host_id) const noexcept {
|
||||
auto it = _mappings.find(host_id);
|
||||
if (it != _mappings.end()) {
|
||||
return it->second;
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
std::optional<locator::host_id> hint_directory_manager::get_mapping(const gms::inet_address& ip) const noexcept {
|
||||
for (const auto& [host_id, ep] : _mappings) {
|
||||
if (ep == ip) {
|
||||
return host_id;
|
||||
}
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
std::optional<std::pair<locator::host_id, gms::inet_address>> hint_directory_manager::get_mapping(
|
||||
const locator::host_id& host_id, const gms::inet_address& ip) const noexcept
|
||||
{
|
||||
for (const auto& [hid, ep] : _mappings) {
|
||||
if (hid == host_id || ep == ip) {
|
||||
return std::make_pair(hid, ep);
|
||||
}
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
void hint_directory_manager::remove_mapping(const locator::host_id& host_id) noexcept {
|
||||
_mappings.erase(host_id);
|
||||
}
|
||||
|
||||
void hint_directory_manager::remove_mapping(const gms::inet_address& ip) noexcept {
|
||||
for (const auto& [host_id, ep] : _mappings) {
|
||||
if (ep == ip) {
|
||||
_mappings.erase(host_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool hint_directory_manager::has_mapping(const locator::host_id& host_id) const noexcept {
|
||||
return _mappings.contains(host_id);
|
||||
}
|
||||
|
||||
bool hint_directory_manager::has_mapping(const gms::inet_address& ip) const noexcept {
|
||||
for (const auto& [_, ep] : _mappings) {
|
||||
if (ip == ep) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void hint_directory_manager::clear() noexcept {
|
||||
_mappings.clear();
|
||||
}
|
||||
|
||||
} // namespace internal
|
||||
} // namespace db::hints
|
||||
|
||||
@@ -46,41 +46,5 @@ using hint_entry_reader = commitlog_entry_reader;
|
||||
/// \return A future that resolves when the operation is complete.
|
||||
future<> rebalance_hints(std::filesystem::path hint_directory);
|
||||
|
||||
class hint_directory_manager {
|
||||
private:
|
||||
std::map<locator::host_id, gms::inet_address> _mappings;
|
||||
|
||||
public:
|
||||
// Inserts a new mapping and returns it.
|
||||
// If either the host ID or the IP is already in the map, the function inserts nothings
|
||||
// and returns the existing mapping instead.
|
||||
std::pair<locator::host_id, gms::inet_address> insert_mapping(const locator::host_id& host_id,
|
||||
const gms::inet_address& ip);
|
||||
|
||||
// Returns the corresponding IP for a given host ID if a mapping is present in the directory manager.
|
||||
// Otherwise, an empty optional is returned.
|
||||
[[nodiscard]] std::optional<gms::inet_address> get_mapping(const locator::host_id& host_id) const noexcept;
|
||||
|
||||
// Returns the corresponding host ID for a given IP if a mapping is present in the directory manager.
|
||||
// Otherwise, an empty optional is returned.
|
||||
[[nodiscard]] std::optional<locator::host_id> get_mapping(const gms::inet_address& ip) const noexcept;
|
||||
|
||||
// Returns a mapping corresponding to either the passed host ID, or the passed IP if the mapping exists.
|
||||
// Otherwise, an empty optional is returned.
|
||||
[[nodiscard]] std::optional<std::pair<locator::host_id, gms::inet_address>> get_mapping(
|
||||
const locator::host_id& host_id, const gms::inet_address& ip) const noexcept;
|
||||
|
||||
// Removes a mapping corresponding to the passed host ID if the mapping exists.
|
||||
void remove_mapping(const locator::host_id& host_id) noexcept;
|
||||
// Removes a mapping corresponding to the passed IP if the mapping exists.
|
||||
void remove_mapping(const gms::inet_address& ip) noexcept;
|
||||
|
||||
bool has_mapping(const locator::host_id& host_id) const noexcept;
|
||||
bool has_mapping(const gms::inet_address& ip) const noexcept;
|
||||
|
||||
// Removes all of the mappings.
|
||||
void clear() noexcept;
|
||||
};
|
||||
|
||||
} // namespace internal
|
||||
} // namespace db::hints
|
||||
|
||||
@@ -201,21 +201,10 @@ void manager::register_metrics(const sstring& group_name) {
|
||||
future<> manager::start(shared_ptr<const gms::gossiper> gossiper_ptr) {
|
||||
_gossiper_anchor = std::move(gossiper_ptr);
|
||||
|
||||
if (_proxy.features().host_id_based_hinted_handoff) {
|
||||
_uses_host_id = true;
|
||||
co_await migrate_ip_directories();
|
||||
}
|
||||
|
||||
co_await initialize_endpoint_managers();
|
||||
|
||||
co_await compute_hints_dir_device_id();
|
||||
set_started();
|
||||
|
||||
if (!_uses_host_id) {
|
||||
_migration_callback = _proxy.features().host_id_based_hinted_handoff.when_enabled([this] {
|
||||
_migrating_done = perform_migration();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
future<> manager::stop() {
|
||||
@@ -226,24 +215,19 @@ future<> manager::stop() {
|
||||
const auto& node = *_proxy.get_token_metadata_ptr()->get_topology().this_node();
|
||||
const bool leaving = node.is_leaving() || node.left();
|
||||
|
||||
return _migrating_done.finally([this, leaving] {
|
||||
// We want to stop the manager as soon as possible if it's not leaving the cluster.
|
||||
// Because of that, we need to cancel all ongoing drains (since that can take quite a bit of time),
|
||||
// but we also need to ensure that no new drains will be started in the meantime.
|
||||
if (!leaving) {
|
||||
for (auto& [_, ep_man] : _ep_managers) {
|
||||
ep_man.cancel_draining();
|
||||
}
|
||||
// We want to stop the manager as soon as possible if it's not leaving the cluster.
|
||||
// Because of that, we need to cancel all ongoing drains (since that can take quite a bit of time),
|
||||
// but we also need to ensure that no new drains will be started in the meantime.
|
||||
if (!leaving) {
|
||||
for (auto& [_, ep_man] : _ep_managers) {
|
||||
ep_man.cancel_draining();
|
||||
}
|
||||
return _draining_eps_gate.close();
|
||||
// At this point, all endpoint managers that were being previously drained have been deleted from the map.
|
||||
// In other words, the next lambda is safe to run, i.e. we won't call `hint_endpoint_manager::stop()` twice.
|
||||
}).finally([this] {
|
||||
}
|
||||
return _draining_eps_gate.close().finally([this] {
|
||||
return parallel_for_each(_ep_managers | std::views::values, [] (hint_endpoint_manager& ep_man) {
|
||||
return ep_man.stop(drain::no);
|
||||
}).finally([this] {
|
||||
_ep_managers.clear();
|
||||
_hint_directory_manager.clear();
|
||||
manager_logger.info("Shard hint manager has stopped");
|
||||
});
|
||||
});
|
||||
@@ -273,8 +257,7 @@ void manager::forbid_hints() {
|
||||
|
||||
void manager::forbid_hints_for_eps_with_pending_hints() {
|
||||
for (auto& [host_id, ep_man] : _ep_managers) {
|
||||
const auto ip = *_hint_directory_manager.get_mapping(host_id);
|
||||
if (has_ep_with_pending_hints(host_id) || has_ep_with_pending_hints(ip)) {
|
||||
if (has_ep_with_pending_hints(host_id)) {
|
||||
ep_man.forbid_hints();
|
||||
} else {
|
||||
ep_man.allow_hints();
|
||||
@@ -330,9 +313,6 @@ sync_point::shard_rps manager::calculate_current_sync_point(std::span<const loca
|
||||
}
|
||||
|
||||
future<> manager::wait_for_sync_point(abort_source& as, const sync_point::shard_rps& rps) {
|
||||
// Prevent the migration to host-ID-based hinted handoff until this function finishes its execution.
|
||||
const auto shared_lock = co_await get_shared_lock(_migration_mutex);
|
||||
|
||||
abort_source local_as;
|
||||
|
||||
auto sub = as.subscribe([&local_as] () noexcept {
|
||||
@@ -398,24 +378,13 @@ future<> manager::wait_for_sync_point(abort_source& as, const sync_point::shard_
|
||||
}
|
||||
}
|
||||
|
||||
hint_endpoint_manager& manager::get_ep_manager(const endpoint_id& host_id, const gms::inet_address& ip) {
|
||||
// If this is enabled, we can't rely on the information obtained from `_hint_directory_manager`.
|
||||
if (_uses_host_id) {
|
||||
if (auto it = _ep_managers.find(host_id); it != _ep_managers.end()) {
|
||||
return it->second;
|
||||
}
|
||||
} else {
|
||||
if (const auto maybe_mapping = _hint_directory_manager.get_mapping(host_id, ip)) {
|
||||
return _ep_managers.at(maybe_mapping->first);
|
||||
}
|
||||
|
||||
// If there is no mapping in `_hint_directory_manager` corresponding to either `host_id`, or `ip`,
|
||||
// we need to create a new endpoint manager.
|
||||
_hint_directory_manager.insert_mapping(host_id, ip);
|
||||
hint_endpoint_manager& manager::get_ep_manager(const endpoint_id& host_id) {
|
||||
if (auto it = _ep_managers.find(host_id); it != _ep_managers.end()) {
|
||||
return it->second;
|
||||
}
|
||||
|
||||
try {
|
||||
std::filesystem::path hint_directory = hints_dir() / (_uses_host_id ? fmt::to_string(host_id) : fmt::to_string(ip));
|
||||
std::filesystem::path hint_directory = hints_dir() / fmt::to_string(host_id);
|
||||
auto [it, _] = _ep_managers.emplace(host_id, hint_endpoint_manager{host_id, std::move(hint_directory), *this, _hints_sending_sched_group});
|
||||
hint_endpoint_manager& ep_man = it->second;
|
||||
|
||||
@@ -424,8 +393,7 @@ hint_endpoint_manager& manager::get_ep_manager(const endpoint_id& host_id, const
|
||||
|
||||
return ep_man;
|
||||
} catch (...) {
|
||||
manager_logger.warn("Starting a hint endpoint manager {}/{} has failed", host_id, ip);
|
||||
_hint_directory_manager.remove_mapping(host_id);
|
||||
manager_logger.warn("Starting a hint endpoint manager {} has failed", host_id);
|
||||
_ep_managers.erase(host_id);
|
||||
throw;
|
||||
}
|
||||
@@ -439,11 +407,8 @@ uint64_t manager::max_size_of_hints_in_progress() const noexcept {
|
||||
}
|
||||
}
|
||||
|
||||
bool manager::have_ep_manager(const std::variant<locator::host_id, gms::inet_address>& ep) const noexcept {
|
||||
if (std::holds_alternative<locator::host_id>(ep)) {
|
||||
return _ep_managers.contains(std::get<locator::host_id>(ep));
|
||||
}
|
||||
return _hint_directory_manager.has_mapping(std::get<gms::inet_address>(ep));
|
||||
bool manager::have_ep_manager(locator::host_id ep) const noexcept {
|
||||
return _ep_managers.contains(ep);
|
||||
}
|
||||
|
||||
bool manager::store_hint(endpoint_id host_id, schema_ptr s, lw_shared_ptr<const frozen_mutation> fm,
|
||||
@@ -461,13 +426,11 @@ bool manager::store_hint(endpoint_id host_id, schema_ptr s, lw_shared_ptr<const
|
||||
return false;
|
||||
}
|
||||
|
||||
auto ip = _gossiper_anchor->get_address_map().get(host_id);
|
||||
|
||||
try {
|
||||
manager_logger.trace("Going to store a hint to {}", host_id);
|
||||
tracing::trace(tr_state, "Going to store a hint to {}", host_id);
|
||||
|
||||
return get_ep_manager(host_id, ip).store_hint(std::move(s), std::move(fm), tr_state);
|
||||
return get_ep_manager(host_id).store_hint(std::move(s), std::move(fm), tr_state);
|
||||
} catch (...) {
|
||||
manager_logger.trace("Failed to store a hint to {}: {}", host_id, std::current_exception());
|
||||
tracing::trace(tr_state, "Failed to store a hint to {}: {}", host_id, std::current_exception());
|
||||
@@ -487,10 +450,6 @@ bool manager::too_many_in_flight_hints_for(endpoint_id ep) const noexcept {
|
||||
}
|
||||
|
||||
bool manager::can_hint_for(endpoint_id ep) const noexcept {
|
||||
if (_state.contains(state::migrating)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (_proxy.local_db().get_token_metadata().get_topology().is_me(ep)) {
|
||||
return false;
|
||||
}
|
||||
@@ -549,47 +508,32 @@ future<> manager::change_host_filter(host_filter filter) {
|
||||
std::exception_ptr eptr = nullptr;
|
||||
|
||||
try {
|
||||
const auto tmptr = _proxy.get_token_metadata_ptr();
|
||||
|
||||
// Iterate over existing hint directories and see if we can enable an endpoint manager
|
||||
// for some of them
|
||||
co_await lister::scan_dir(_hints_dir, lister::dir_entry_types::of<directory_entry_type::directory>(),
|
||||
[&] (fs::path datadir, directory_entry de) -> future<> {
|
||||
using pair_type = std::pair<locator::host_id, gms::inet_address>;
|
||||
|
||||
const auto maybe_host_id_and_ip = std::invoke([&] () -> std::optional<pair_type> {
|
||||
const auto maybe_host_id = std::invoke([&] () -> std::optional<locator::host_id> {
|
||||
try {
|
||||
locator::host_id_or_endpoint hid_or_ep{de.name};
|
||||
|
||||
// If hinted handoff is host-ID-based, hint directories representing IP addresses must've
|
||||
// been created by mistake and they're invalid. The same for pre-host-ID hinted handoff
|
||||
// -- hint directories representing host IDs are NOT valid.
|
||||
if (hid_or_ep.has_host_id() && _uses_host_id) {
|
||||
return std::make_optional(pair_type{hid_or_ep.id(), hid_or_ep.resolve_endpoint(*_gossiper_anchor)});
|
||||
} else if (hid_or_ep.has_endpoint() && !_uses_host_id) {
|
||||
return std::make_optional(pair_type{hid_or_ep.resolve_id(*_gossiper_anchor), hid_or_ep.endpoint()});
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
return locator::host_id(utils::UUID(de.name));
|
||||
} catch (...) {
|
||||
return std::nullopt;
|
||||
}
|
||||
});
|
||||
|
||||
if (!maybe_host_id_and_ip) {
|
||||
if (!maybe_host_id) {
|
||||
manager_logger.warn("Encountered a hint directory of invalid name while changing the host filter: {}. "
|
||||
"Hints stored in it won't be replayed.", de.name);
|
||||
co_return;
|
||||
}
|
||||
|
||||
const auto& topology = _proxy.get_token_metadata_ptr()->get_topology();
|
||||
const auto& [host_id, ip] = *maybe_host_id_and_ip;
|
||||
const auto& host_id = *maybe_host_id;
|
||||
|
||||
if (_ep_managers.contains(host_id) || !_host_filter.can_hint_for(topology, host_id)) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
co_await get_ep_manager(host_id, ip).populate_segments_to_replay();
|
||||
co_await get_ep_manager(host_id).populate_segments_to_replay();
|
||||
});
|
||||
} catch (...) {
|
||||
// Revert the changes in the filter. The code below will stop the additional managers
|
||||
@@ -609,7 +553,6 @@ future<> manager::change_host_filter(host_filter filter) {
|
||||
|
||||
return ep_man.stop(drain::no).finally([this, ep] {
|
||||
_ep_managers.erase(ep);
|
||||
_hint_directory_manager.remove_mapping(ep);
|
||||
});
|
||||
});
|
||||
} catch (...) {
|
||||
@@ -640,23 +583,20 @@ bool manager::check_dc_for(endpoint_id ep) const noexcept {
|
||||
}
|
||||
}
|
||||
|
||||
future<> manager::drain_for(endpoint_id host_id, gms::inet_address ip) noexcept {
|
||||
future<> manager::drain_for(endpoint_id host_id) noexcept {
|
||||
if (!started() || stopping() || draining_all()) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
if (!replay_allowed()) {
|
||||
auto reason = seastar::format("Precondition violdated while trying to drain {} / {}: "
|
||||
"hint replay is not allowed", host_id, ip);
|
||||
auto reason = seastar::format("Precondition violated while trying to drain {}: "
|
||||
"hint replay is not allowed", host_id);
|
||||
on_internal_error(manager_logger, std::move(reason));
|
||||
}
|
||||
|
||||
manager_logger.info("Draining starts for {}", host_id);
|
||||
|
||||
const auto holder = seastar::gate::holder{_draining_eps_gate};
|
||||
// As long as we hold on to this lock, no migration of hinted handoff to host IDs
|
||||
// can be being performed because `manager::perform_migration()` takes it
|
||||
// at the beginning of its execution too.
|
||||
const auto sem_unit = co_await seastar::get_units(_drain_lock, 1);
|
||||
|
||||
// After an endpoint has been drained, we remove its directory with all of its contents.
|
||||
@@ -698,39 +638,20 @@ future<> manager::drain_for(endpoint_id host_id, gms::inet_address ip) noexcept
|
||||
}
|
||||
|
||||
_ep_managers.clear();
|
||||
_hint_directory_manager.clear();
|
||||
} else {
|
||||
const auto maybe_host_id = std::invoke([&] () -> std::optional<locator::host_id> {
|
||||
if (_uses_host_id) {
|
||||
return host_id;
|
||||
}
|
||||
// Before the whole cluster is migrated to the host-ID-based hinted handoff,
|
||||
// one hint directory may correspond to multiple target nodes. If *any* of them
|
||||
// leaves the cluster, we should drain the hint directory. This is why we need
|
||||
// to rely on this mapping here.
|
||||
const auto maybe_mapping = _hint_directory_manager.get_mapping(host_id, ip);
|
||||
if (maybe_mapping) {
|
||||
return maybe_mapping->first;
|
||||
}
|
||||
return std::nullopt;
|
||||
});
|
||||
auto it = _ep_managers.find(host_id);
|
||||
|
||||
if (maybe_host_id) {
|
||||
auto it = _ep_managers.find(*maybe_host_id);
|
||||
|
||||
if (it != _ep_managers.end()) {
|
||||
try {
|
||||
co_await drain_ep_manager(it->second);
|
||||
} catch (...) {
|
||||
eptr = std::current_exception();
|
||||
}
|
||||
|
||||
// We can't provide the function with `it` here because we co_await above,
|
||||
// so iterators could have been invalidated.
|
||||
// This never throws.
|
||||
_ep_managers.erase(*maybe_host_id);
|
||||
_hint_directory_manager.remove_mapping(*maybe_host_id);
|
||||
if (it != _ep_managers.end()) {
|
||||
try {
|
||||
co_await drain_ep_manager(it->second);
|
||||
} catch (...) {
|
||||
eptr = std::current_exception();
|
||||
}
|
||||
|
||||
// We can't provide the function with `it` here because we co_await above,
|
||||
// so iterators could have been invalidated.
|
||||
// This never throws.
|
||||
_ep_managers.erase(host_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -749,240 +670,34 @@ void manager::update_backlog(size_t backlog, size_t max_backlog) {
|
||||
}
|
||||
}
|
||||
|
||||
future<> manager::with_file_update_mutex_for(const std::variant<locator::host_id, gms::inet_address>& ep,
|
||||
future<> manager::with_file_update_mutex_for(locator::host_id ep,
|
||||
noncopyable_function<future<> ()> func) {
|
||||
const locator::host_id host_id = std::invoke([&] {
|
||||
if (std::holds_alternative<locator::host_id>(ep)) {
|
||||
return std::get<locator::host_id>(ep);
|
||||
}
|
||||
return *_hint_directory_manager.get_mapping(std::get<gms::inet_address>(ep));
|
||||
});
|
||||
return _ep_managers.at(host_id).with_file_update_mutex(std::move(func));
|
||||
return _ep_managers.at(ep).with_file_update_mutex(std::move(func));
|
||||
}
|
||||
|
||||
future<> manager::initialize_endpoint_managers() {
|
||||
auto maybe_create_ep_mgr = [this] (const locator::host_id& host_id, const gms::inet_address& ip) -> future<> {
|
||||
if (!check_dc_for(host_id)) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
co_await get_ep_manager(host_id, ip).populate_segments_to_replay();
|
||||
};
|
||||
|
||||
// We dispatch here to not hold on to the token metadata if hinted handoff is host-ID-based.
|
||||
// In that case, there are no directories that represent IP addresses, so we won't need to use it.
|
||||
// We want to avoid a situation when topology changes are prevented while we hold on to this pointer.
|
||||
const auto tmptr = _uses_host_id ? nullptr : _proxy.get_token_metadata_ptr();
|
||||
|
||||
co_await lister::scan_dir(_hints_dir, lister::dir_entry_types::of<directory_entry_type::directory>(),
|
||||
[&] (fs::path directory, directory_entry de) -> future<> {
|
||||
auto maybe_host_id_or_ep = std::invoke([&] () -> std::optional<locator::host_id_or_endpoint> {
|
||||
[this] (fs::path directory, directory_entry de) -> future<> {
|
||||
auto maybe_host_id = std::invoke([&] () -> std::optional<locator::host_id> {
|
||||
try {
|
||||
return locator::host_id_or_endpoint{de.name};
|
||||
} catch (...) {
|
||||
// The name represents neither an IP address, nor a host ID.
|
||||
return std::nullopt;
|
||||
}
|
||||
});
|
||||
|
||||
// The directory is invalid, so there's nothing more to do.
|
||||
if (!maybe_host_id_or_ep) {
|
||||
manager_logger.warn("Encountered a hint directory of invalid name while initializing endpoint managers: {}. "
|
||||
"Hints stored in it won't be replayed", de.name);
|
||||
co_return;
|
||||
}
|
||||
|
||||
if (_uses_host_id) {
|
||||
// If hinted handoff is host-ID-based but the directory doesn't represent a host ID,
|
||||
// it's invalid. Ignore it.
|
||||
if (!maybe_host_id_or_ep->has_host_id()) {
|
||||
manager_logger.warn("Encountered a hint directory of invalid name while initializing endpoint managers: {}. "
|
||||
"Hints stored in it won't be replayed", de.name);
|
||||
co_return;
|
||||
}
|
||||
|
||||
// If hinted handoff is host-ID-based, `get_ep_manager` will NOT use the passed IP address,
|
||||
// so we simply pass the default value there.
|
||||
co_return co_await maybe_create_ep_mgr(maybe_host_id_or_ep->id(), gms::inet_address{});
|
||||
}
|
||||
|
||||
// If we have got to this line, hinted handoff is still IP-based and we need to map the IP.
|
||||
|
||||
if (!maybe_host_id_or_ep->has_endpoint()) {
|
||||
// If the directory name doesn't represent an IP, it's invalid. We ignore it.
|
||||
manager_logger.warn("Encountered a hint directory of invalid name while initializing endpoint managers: {}. "
|
||||
"Hints stored in it won't be replayed", de.name);
|
||||
co_return;
|
||||
}
|
||||
|
||||
const auto maybe_host_id = std::invoke([&] () -> std::optional<locator::host_id> {
|
||||
try {
|
||||
return maybe_host_id_or_ep->resolve_id(*_gossiper_anchor);
|
||||
return locator::host_id(utils::UUID(de.name));
|
||||
} catch (...) {
|
||||
return std::nullopt;
|
||||
}
|
||||
});
|
||||
|
||||
if (!maybe_host_id) {
|
||||
manager_logger.warn("Encountered a hint directory of invalid name while initializing endpoint managers: {}. "
|
||||
"Hints stored in it won't be replayed", de.name);
|
||||
co_return;
|
||||
}
|
||||
|
||||
co_await maybe_create_ep_mgr(*maybe_host_id, maybe_host_id_or_ep->endpoint());
|
||||
});
|
||||
}
|
||||
|
||||
// This function assumes that the hint directory is NOT modified as long as this function is being executed.
|
||||
future<> manager::migrate_ip_directories() {
|
||||
std::vector<sstring> hint_directories{};
|
||||
|
||||
// Step 1. Gather the names of the hint directories.
|
||||
co_await lister::scan_dir(_hints_dir, lister::dir_entry_types::of<directory_entry_type::directory>(),
|
||||
[&] (std::filesystem::path, directory_entry de) -> future<> {
|
||||
hint_directories.push_back(std::move(de.name));
|
||||
co_return;
|
||||
});
|
||||
|
||||
struct hint_dir_mapping {
|
||||
sstring current_name;
|
||||
sstring new_name;
|
||||
};
|
||||
|
||||
std::vector<hint_dir_mapping> dirs_to_rename{};
|
||||
std::vector<std::filesystem::path> dirs_to_remove{};
|
||||
|
||||
/* RAII lock for token metadata */ {
|
||||
// We need to keep the topology consistent throughout the loop below to
|
||||
// ensure that, for example, two different IPs won't be mapped to
|
||||
// the same host ID.
|
||||
//
|
||||
// We don't want to hold on to this pointer for longer than necessary.
|
||||
// Topology changes might be postponed otherwise.
|
||||
auto tmptr = _proxy.get_token_metadata_ptr();
|
||||
|
||||
// Step 2. Obtain mappings IP -> host ID for the directories.
|
||||
for (auto& directory : hint_directories) {
|
||||
try {
|
||||
locator::host_id_or_endpoint hid_or_ep{directory};
|
||||
|
||||
// If the directory's name already represents a host ID, there is nothing to do.
|
||||
if (hid_or_ep.has_host_id()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const locator::host_id host_id = hid_or_ep.resolve_id(*_gossiper_anchor);
|
||||
dirs_to_rename.push_back({.current_name = std::move(directory), .new_name = host_id.to_sstring()});
|
||||
} catch (...) {
|
||||
// We cannot map the IP to the corresponding host ID either because
|
||||
// the relevant mapping doesn't exist anymore or an error occurred. Drop it.
|
||||
//
|
||||
// We only care about directories named after IPs during an upgrade,
|
||||
// so we don't want to make this more complex than necessary.
|
||||
manager_logger.warn("No mapping IP-host ID for hint directory {}. It is going to be removed", directory);
|
||||
dirs_to_remove.push_back(_hints_dir / std::move(directory));
|
||||
}
|
||||
if (!check_dc_for(*maybe_host_id)) {
|
||||
co_return;
|
||||
}
|
||||
}
|
||||
|
||||
// We don't need this memory anymore. The only remaining elements are the names of the directories
|
||||
// that already represent valid host IDs. We won't do anything with them. The rest have been moved
|
||||
// to either `dirs_to_rename` or `dirs_to_remove`.
|
||||
hint_directories.clear();
|
||||
|
||||
// Step 3. Try to rename the directories.
|
||||
co_await coroutine::parallel_for_each(dirs_to_rename, [&] (auto& mapping) -> future<> {
|
||||
std::filesystem::path old_name = _hints_dir / std::move(mapping.current_name);
|
||||
std::filesystem::path new_name = _hints_dir / std::move(mapping.new_name);
|
||||
|
||||
try {
|
||||
manager_logger.info("Renaming hint directory {} to {}", old_name.native(), new_name.native());
|
||||
co_await rename_file(old_name.native(), new_name.native());
|
||||
} catch (...) {
|
||||
manager_logger.warn("Renaming directory {} to {} has failed: {}",
|
||||
old_name.native(), new_name.native(), std::current_exception());
|
||||
dirs_to_remove.push_back(std::move(old_name));
|
||||
}
|
||||
co_await get_ep_manager(*maybe_host_id).populate_segments_to_replay();
|
||||
});
|
||||
|
||||
// Step 4. Remove directories that don't represent host IDs.
|
||||
co_await coroutine::parallel_for_each(dirs_to_remove, [] (auto& directory) -> future<> {
|
||||
try {
|
||||
manager_logger.warn("Removing hint directory {}", directory.native());
|
||||
co_await seastar::recursive_remove_directory(directory);
|
||||
} catch (...) {
|
||||
on_internal_error(manager_logger,
|
||||
seastar::format("Removing a hint directory has failed. Reason: {}", std::current_exception()));
|
||||
}
|
||||
});
|
||||
|
||||
co_await io_check(sync_directory, _hints_dir.native());
|
||||
}
|
||||
|
||||
future<> manager::perform_migration() {
|
||||
// This function isn't marked as noexcept, but the only parts of the code that
|
||||
// can throw an exception are:
|
||||
// 1. the call to `migrate_ip_directories()`: if we fail there, the failure is critical.
|
||||
// It doesn't lead to any data corruption, but the node must be stopped;
|
||||
// 2. the re-initialization of the endpoint managers: a failure there is the same failure
|
||||
// that can happen when starting a node. It may be seen as critical, but it should only
|
||||
// boil down to not initializing some of the endpoint managers. No data corruption
|
||||
// is possible.
|
||||
if (_state.contains(state::stopping) || _state.contains(state::draining_all)) {
|
||||
// It's possible the cluster feature is enabled right after the local node decides
|
||||
// to leave the cluster. In that case, the migration callback might still potentially
|
||||
// be called, but we don't want to perform it. We need to stop the node as soon as possible.
|
||||
//
|
||||
// The `state::draining_all` case is more tricky. The semantics of self-draining is not
|
||||
// specified, but based on the description of the state in the header file, it means
|
||||
// the node is leaving the cluster and it works like that indeed, so we apply the same reasoning.
|
||||
co_return;
|
||||
}
|
||||
|
||||
manager_logger.info("Migration of hinted handoff to host ID is starting");
|
||||
// Step 1. Prevent accepting incoming hints.
|
||||
_state.set(state::migrating);
|
||||
|
||||
// Step 2. Make sure during the migration there is no draining process and we don't await any sync points.
|
||||
|
||||
// We're taking this lock for two reasons:
|
||||
// 1. we're waiting for the ongoing drains to finish so that there's no data race,
|
||||
// 2. we suspend new drain requests -- to prevent data races.
|
||||
const auto lock = co_await seastar::get_units(_drain_lock, 1);
|
||||
|
||||
// We're taking this lock because we're about to stop endpoint managers here, whereas
|
||||
// `manager::wait_for_sync_point` browses them and awaits their corresponding sync points.
|
||||
// If we stop them during that process, that function will get exceptions.
|
||||
//
|
||||
// Although in the current implementation there is no danger of race conditions
|
||||
// (or at least race conditions that could be harmful in any way), it's better
|
||||
// to avoid them anyway. Hence this lock.
|
||||
const auto unique_lock = co_await get_unique_lock(_migration_mutex);
|
||||
// Step 3. Stop endpoint managers. We will modify the hint directory contents, so this is necessary.
|
||||
co_await coroutine::parallel_for_each(_ep_managers | std::views::values, [] (auto& ep_manager) -> future<> {
|
||||
return ep_manager.stop(drain::no);
|
||||
});
|
||||
|
||||
// Step 4. Prevent resource manager from scanning the hint directory. Race conditions are unacceptable.
|
||||
auto resource_manager_lock = co_await seastar::get_units(_resource_manager.update_lock(), 1);
|
||||
|
||||
// Once the resource manager cannot scan anything anymore, we can safely get rid of these.
|
||||
_ep_managers.clear();
|
||||
_eps_with_pending_hints.clear();
|
||||
|
||||
// We won't need this anymore.
|
||||
_hint_directory_manager.clear();
|
||||
|
||||
// Step 5. Rename the hint directories so that those that remain all represent valid host IDs.
|
||||
co_await migrate_ip_directories();
|
||||
_uses_host_id = true;
|
||||
|
||||
// Step 6. Make resource manager scan the hint directory again.
|
||||
resource_manager_lock.return_all();
|
||||
// Step 7. Start accepting incoming hints again.
|
||||
_state.remove(state::migrating);
|
||||
// Step 8. Once resource manager is working again, endpoint managers can be safely recreated.
|
||||
// We won't modify the contents of the hint directory anymore.
|
||||
co_await initialize_endpoint_managers();
|
||||
manager_logger.info("Migration of hinted handoff to host ID has finished successfully");
|
||||
}
|
||||
|
||||
// Technical note: This function obviously doesn't need to be a coroutine. However, it's better to impose
|
||||
@@ -992,7 +707,7 @@ future<> manager::drain_left_nodes() {
|
||||
for (const auto& [host_id, ep_man] : _ep_managers) {
|
||||
if (!_proxy.get_token_metadata_ptr()->is_normal_token_owner(host_id)) {
|
||||
// It's safe to discard this future. It's awaited in `manager::stop()`.
|
||||
(void) drain_for(host_id, {});
|
||||
(void) drain_for(host_id);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include <seastar/core/gate.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/core/shared_mutex.hh>
|
||||
#include <seastar/util/noncopyable_function.hh>
|
||||
|
||||
// Scylla includes.
|
||||
@@ -76,13 +75,8 @@ private:
|
||||
using hint_endpoint_manager = internal::hint_endpoint_manager;
|
||||
using node_to_hint_store_factory_type = internal::node_to_hint_store_factory_type;
|
||||
|
||||
using hint_directory_manager = internal::hint_directory_manager;
|
||||
|
||||
enum class state {
|
||||
started, // Hinting is currently allowed (start() has completed).
|
||||
migrating, // The hint manager is being migrated from using IPs to name
|
||||
// hint directories to using host IDs for that purpose. No new
|
||||
// incoming hints will be accepted as long as this is the state.
|
||||
replay_allowed, // Replaying (sending) hints is allowed.
|
||||
draining_all, // Accepting new hints is not allowed. All endpoint managers
|
||||
// are being drained because the node is leaving the cluster.
|
||||
@@ -92,7 +86,6 @@ private:
|
||||
|
||||
using state_set = enum_set<super_enum<state,
|
||||
state::started,
|
||||
state::migrating,
|
||||
state::replay_allowed,
|
||||
state::draining_all,
|
||||
state::stopping>>;
|
||||
@@ -122,38 +115,14 @@ private:
|
||||
|
||||
std::unordered_map<endpoint_id, hint_endpoint_manager> _ep_managers;
|
||||
|
||||
// This is ONLY used when `_uses_host_id` is false. Otherwise, this map should stay EMPTY.
|
||||
//
|
||||
// Invariants:
|
||||
// (1) there is an endpoint manager in `_ep_managers` identified by host ID `H` if an only if
|
||||
// there is a mapping corresponding to `H` in `_hint_directory_manager`,
|
||||
// (2) a hint directory representing an IP address `I` is managed by an endpoint manager
|
||||
// if and only if there is a mapping corresponding to `I` in `_hint_directory_manager`.
|
||||
hint_directory_manager _hint_directory_manager;
|
||||
|
||||
hint_stats _stats;
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
scheduling_group _hints_sending_sched_group;
|
||||
|
||||
// We need to keep a variant here. Before migrating hinted handoff to using host ID, hint directories will
|
||||
// still represent IP addresses. But after the migration, they will start representing host IDs.
|
||||
// We need to handle either case.
|
||||
//
|
||||
// It's especially important when dealing with the scenario when there is an IP directory, but there is
|
||||
// no mapping for in locator::token_metadata. Since we sometimes have to save a directory like that
|
||||
// in this set as well, this variant is necessary.
|
||||
std::unordered_set<std::variant<locator::host_id, gms::inet_address>> _eps_with_pending_hints;
|
||||
std::unordered_set<locator::host_id> _eps_with_pending_hints;
|
||||
|
||||
seastar::named_semaphore _drain_lock = {1, named_semaphore_exception_factory{"drain lock"}};
|
||||
|
||||
bool _uses_host_id = false;
|
||||
std::any _migration_callback = std::nullopt;
|
||||
future<> _migrating_done = make_ready_future();
|
||||
|
||||
// Unique lock if and only if there is an ongoing migration to the host-ID-based hinted handoff.
|
||||
// Shared lock if and only if there is a fiber already executing `manager::wait_for_sync_point`.
|
||||
seastar::shared_mutex _migration_mutex{};
|
||||
|
||||
public:
|
||||
manager(service::storage_proxy& proxy, sstring hints_directory, host_filter filter,
|
||||
int64_t max_hint_window_ms, resource_manager& res_manager, sharded<replica::database>& db, scheduling_group sg);
|
||||
@@ -217,7 +186,7 @@ public:
|
||||
///
|
||||
/// \param ep endpoint whose file update mutex should be locked
|
||||
/// \param func functor to be executed
|
||||
future<> with_file_update_mutex_for(const std::variant<locator::host_id, gms::inet_address>& ep,
|
||||
future<> with_file_update_mutex_for(locator::host_id ep,
|
||||
noncopyable_function<future<> ()> func);
|
||||
|
||||
/// \brief Checks if hints are disabled for all endpoints
|
||||
@@ -242,7 +211,7 @@ public:
|
||||
return it->second.hints_in_progress();
|
||||
}
|
||||
|
||||
void add_ep_with_pending_hints(const std::variant<locator::host_id, gms::inet_address>& key) {
|
||||
void add_ep_with_pending_hints(locator::host_id key) {
|
||||
_eps_with_pending_hints.insert(key);
|
||||
}
|
||||
|
||||
@@ -251,7 +220,7 @@ public:
|
||||
_eps_with_pending_hints.reserve(_ep_managers.size());
|
||||
}
|
||||
|
||||
bool has_ep_with_pending_hints(const std::variant<locator::host_id, gms::inet_address>& key) const {
|
||||
bool has_ep_with_pending_hints(locator::host_id key) const {
|
||||
return _eps_with_pending_hints.contains(key);
|
||||
}
|
||||
|
||||
@@ -303,12 +272,12 @@ private:
|
||||
return _local_db;
|
||||
}
|
||||
|
||||
hint_endpoint_manager& get_ep_manager(const endpoint_id& host_id, const gms::inet_address& ip);
|
||||
hint_endpoint_manager& get_ep_manager(const endpoint_id& host_id);
|
||||
|
||||
uint64_t max_size_of_hints_in_progress() const noexcept;
|
||||
|
||||
public:
|
||||
bool have_ep_manager(const std::variant<locator::host_id, gms::inet_address>& ep) const noexcept;
|
||||
bool have_ep_manager(locator::host_id ep) const noexcept;
|
||||
|
||||
public:
|
||||
/// \brief Initiate the draining when we detect that the node has left the cluster.
|
||||
@@ -324,15 +293,10 @@ public:
|
||||
/// the execution of this function.
|
||||
///
|
||||
/// \param host_id host ID of the node that left the cluster
|
||||
/// \param ip the IP of the node that left the cluster
|
||||
future<> drain_for(endpoint_id host_id, gms::inet_address ip) noexcept;
|
||||
future<> drain_for(endpoint_id host_id) noexcept;
|
||||
|
||||
void update_backlog(size_t backlog, size_t max_backlog);
|
||||
|
||||
bool uses_host_id() const noexcept {
|
||||
return _uses_host_id;
|
||||
}
|
||||
|
||||
private:
|
||||
bool stopping() const noexcept {
|
||||
return _state.contains(state::stopping);
|
||||
@@ -368,26 +332,6 @@ private:
|
||||
/// in locator::topology, creates an endpoint manager.
|
||||
future<> initialize_endpoint_managers();
|
||||
|
||||
/// Renames host directories named after IPs to host IDs.
|
||||
///
|
||||
/// In the past, hosts were identified by their IPs. Now we use host IDs for that purpose,
|
||||
/// but we want to ensure that old hints don't get lost if possible. This function serves
|
||||
/// this purpose. It's only necessary when upgrading Scylla.
|
||||
///
|
||||
/// This function should ONLY be called by `manager::start()` and `manager::perform_migration()`.
|
||||
///
|
||||
/// Calling this function again while the previous call has not yet finished
|
||||
/// is undefined behavior.
|
||||
future<> migrate_ip_directories();
|
||||
|
||||
/// Migrates this hint manager to using host IDs, i.e. when a call to this function ends,
|
||||
/// the names of hint directories will start being represented by host IDs instead of IPs.
|
||||
///
|
||||
/// This function suspends hinted handoff throughout its execution. Among other consequences,
|
||||
/// ALL requested sync points will be canceled, i.e. an exception will be issued
|
||||
/// in the corresponding futures.
|
||||
future<> perform_migration();
|
||||
|
||||
public:
|
||||
/// Performs draining for all nodes that have already left the cluster.
|
||||
/// This should only be called when the hint endpoint managers have been initialized
|
||||
|
||||
@@ -7,8 +7,6 @@
|
||||
*/
|
||||
|
||||
#include "resource_manager.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
#include "manager.hh"
|
||||
#include "utils/log.hh"
|
||||
#include <boost/range/algorithm/for_each.hpp>
|
||||
@@ -94,7 +92,7 @@ future<> space_watchdog::stop() noexcept {
|
||||
|
||||
// Called under the end_point_hints_manager::file_update_mutex() of the corresponding end_point_hints_manager instance.
|
||||
future<> space_watchdog::scan_one_ep_dir(fs::path path, manager& shard_manager,
|
||||
std::optional<std::variant<locator::host_id, gms::inet_address>> maybe_ep_key) {
|
||||
std::optional<locator::host_id> maybe_ep_key) {
|
||||
// It may happen that we get here and the directory has already been deleted in the context of manager::drain_for().
|
||||
// In this case simply bail out.
|
||||
if (!co_await file_exists(path.native())) {
|
||||
@@ -145,39 +143,26 @@ void space_watchdog::on_timer() {
|
||||
// not hintable).
|
||||
// If exists - let's take a file update lock so that files are not changed under our feet. Otherwise, simply
|
||||
// continue to enumeration - there is no one to change them.
|
||||
auto maybe_variant = std::invoke([&] () -> std::optional<std::variant<locator::host_id, gms::inet_address>> {
|
||||
auto maybe_host_id = std::invoke([&] () -> std::optional<locator::host_id> {
|
||||
try {
|
||||
const auto hid_or_ep = locator::host_id_or_endpoint{de.name};
|
||||
|
||||
// If hinted handoff is host-ID-based, hint directories representing IP addresses must've
|
||||
// been created by mistake and they're invalid. The same for pre-host-ID hinted handoff
|
||||
// -- hint directories representing host IDs are NOT valid.
|
||||
if (hid_or_ep.has_host_id() && shard_manager.uses_host_id()) {
|
||||
return std::variant<locator::host_id, gms::inet_address>(hid_or_ep.id());
|
||||
} else if (hid_or_ep.has_endpoint() && !shard_manager.uses_host_id()) {
|
||||
return std::variant<locator::host_id, gms::inet_address>(hid_or_ep.endpoint());
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
return locator::host_id(utils::UUID(de.name));
|
||||
} catch (...) {
|
||||
return std::nullopt;
|
||||
}
|
||||
});
|
||||
|
||||
// Case 1: The directory is managed by an endpoint manager.
|
||||
if (maybe_variant && shard_manager.have_ep_manager(*maybe_variant)) {
|
||||
const auto variant = *maybe_variant;
|
||||
return shard_manager.with_file_update_mutex_for(variant, [this, variant, &shard_manager, dir = std::move(dir), ep_name = std::move(de.name)] () mutable {
|
||||
return scan_one_ep_dir(dir / ep_name, shard_manager, variant);
|
||||
if (maybe_host_id && shard_manager.have_ep_manager(*maybe_host_id)) {
|
||||
const auto host_id = *maybe_host_id;
|
||||
return shard_manager.with_file_update_mutex_for(host_id, [this, host_id, &shard_manager, dir = std::move(dir), ep_name = std::move(de.name)] () mutable {
|
||||
return scan_one_ep_dir(dir / ep_name, shard_manager, host_id);
|
||||
});
|
||||
}
|
||||
// Case 2: The directory isn't managed by an endpoint manager, but it represents either an IP address,
|
||||
// or a host ID.
|
||||
else if (maybe_variant) {
|
||||
return scan_one_ep_dir(dir / de.name, shard_manager, *maybe_variant);
|
||||
// Case 2: The directory isn't managed by an endpoint manager, but it represents a host ID.
|
||||
else if (maybe_host_id) {
|
||||
return scan_one_ep_dir(dir / de.name, shard_manager, *maybe_host_id);
|
||||
}
|
||||
// Case 3: The directory isn't managed by an endpoint manager, and it represents neither an IP address,
|
||||
// nor a host ID.
|
||||
// Case 3: The directory isn't managed by an endpoint manager, and it doesn't represent a host ID.
|
||||
else {
|
||||
// We use trace here to prevent flooding logs with unnecessary information.
|
||||
resource_manager_logger.trace("Encountered a hint directory of invalid name while scanning: {}", de.name);
|
||||
|
||||
@@ -114,7 +114,7 @@ private:
|
||||
/// \param maybe_ep_key endpoint ID corresponding to the scanned directory
|
||||
/// \return future that resolves when scanning is complete
|
||||
future<> scan_one_ep_dir(fs::path path, manager& shard_manager,
|
||||
std::optional<std::variant<locator::host_id, gms::inet_address>> maybe_ep_key);
|
||||
std::optional<locator::host_id> maybe_ep_key);
|
||||
};
|
||||
|
||||
class resource_manager {
|
||||
|
||||
@@ -110,6 +110,7 @@ std::set<std::string_view> feature_service::supported_feature_set() const {
|
||||
"GROUP0_SCHEMA_VERSIONING"sv,
|
||||
"VIEW_BUILD_STATUS_ON_GROUP0"sv,
|
||||
"CDC_GENERATIONS_V2"sv,
|
||||
"HOST_ID_BASED_HINTED_HANDOFF"sv,
|
||||
};
|
||||
|
||||
if (is_test_only_feature_deprecated()) {
|
||||
|
||||
@@ -116,7 +116,6 @@ public:
|
||||
gms::feature tablets { *this, "TABLETS"sv };
|
||||
gms::feature table_digest_insensitive_to_expiry { *this, "TABLE_DIGEST_INSENSITIVE_TO_EXPIRY"sv };
|
||||
gms::feature supports_consistent_topology_changes { *this, "SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES"sv };
|
||||
gms::feature host_id_based_hinted_handoff { *this, "HOST_ID_BASED_HINTED_HANDOFF"sv };
|
||||
gms::feature topology_requests_type_column { *this, "TOPOLOGY_REQUESTS_TYPE_COLUMN"sv };
|
||||
gms::feature native_reverse_queries { *this, "NATIVE_REVERSE_QUERIES"sv };
|
||||
gms::feature zero_token_nodes { *this, "ZERO_TOKEN_NODES"sv };
|
||||
|
||||
@@ -7410,24 +7410,6 @@ future<> storage_proxy::wait_for_hint_sync_point(const db::hints::sync_point spo
|
||||
co_return;
|
||||
}
|
||||
|
||||
void storage_proxy::on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid) {
|
||||
// Discarding these futures is safe. They're awaited by db::hints::manager::stop().
|
||||
//
|
||||
// Hint replay must be allowed throughout the execution of `drain_for`
|
||||
// (it's a precondition of it). Once enabled in `main.cc`, it stays true
|
||||
// throughout the life of the node.
|
||||
//
|
||||
// Note that if we don't perform draining here because hint replay is not
|
||||
// allowed yet, it'll be conducted by a call to `db::hints::manager::drain_left_nodes()`,
|
||||
// which is called by `main.cc` after hint replay is turned on.
|
||||
if (_hints_manager.replay_allowed() && !_hints_manager.uses_host_id()) {
|
||||
(void) _hints_manager.drain_for(hid, endpoint);
|
||||
}
|
||||
if (_hints_for_views_manager.replay_allowed() && !_hints_for_views_manager.uses_host_id()) {
|
||||
(void) _hints_for_views_manager.drain_for(hid, endpoint);
|
||||
}
|
||||
}
|
||||
|
||||
void storage_proxy::on_released(const locator::host_id& hid) {
|
||||
// Discarding these futures is safe. They're awaited by db::hints::manager::stop().
|
||||
//
|
||||
@@ -7438,11 +7420,11 @@ void storage_proxy::on_released(const locator::host_id& hid) {
|
||||
// Note that if we don't perform draining here because hint replay is not
|
||||
// allowed yet, it'll be conducted by a call to `db::hints::manager::drain_left_nodes()`,
|
||||
// which is called by `main.cc` after hint replay is turned on.
|
||||
if (_hints_manager.replay_allowed() && _hints_manager.uses_host_id()) {
|
||||
(void) _hints_manager.drain_for(hid, {});
|
||||
if (_hints_manager.replay_allowed()) {
|
||||
(void) _hints_manager.drain_for(hid);
|
||||
}
|
||||
if (_hints_for_views_manager.replay_allowed() && _hints_for_views_manager.uses_host_id()) {
|
||||
(void) _hints_for_views_manager.drain_for(hid, {});
|
||||
if (_hints_for_views_manager.replay_allowed()) {
|
||||
(void) _hints_for_views_manager.drain_for(hid);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -885,7 +885,6 @@ public:
|
||||
return _stale_pending_writes.get_future();
|
||||
}
|
||||
|
||||
virtual void on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid) override;
|
||||
virtual void on_released(const locator::host_id& hid) override;
|
||||
virtual void on_down(const gms::inet_address& endpoint, locator::host_id hid) override;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user