db/hints: Ensure that draining happens
Before hinted handoff is migrated to using host IDs to identify
nodes in the cluster, we keep track of mappings between hint
endpoint managers, identified by host IDs, and the hint directories
they manage, represented by IP addresses. As a consequence, one
hint directory may correspond to multiple nodes -- this is
intentional. See 64ba620 for more details.
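
To make that mapping concrete, here is a minimal sketch of the idea.
This is a simplified illustration only: the names (directory_manager_sketch,
host_id_t, inet_addr) are stand-ins, not the actual db::hints types, and the
either-key lookup in get_mapping is an assumption mirroring how the real
_hint_directory_manager is used in the diff below.

// Illustrative sketch -- NOT the real hint_directory_manager.
#include <map>
#include <optional>
#include <string>
#include <utility>

using host_id_t = std::string;  // stands in for locator::host_id
using inet_addr = std::string;  // stands in for gms::inet_address

class directory_manager_sketch {
    // Host ID of a hint endpoint manager -> IP naming its hint directory.
    // Distinct host IDs may map to the same IP, i.e. share one directory.
    std::map<host_id_t, inet_addr> _mappings;

public:
    void add_mapping(host_id_t id, inet_addr ip) {
        _mappings.emplace(std::move(id), std::move(ip));
    }

    // Finds a mapping matching EITHER key: a leaving node may be known by
    // its host ID, or only by the IP its shared hint directory is named after.
    std::optional<std::pair<host_id_t, inet_addr>>
    get_mapping(const host_id_t& id, const inet_addr& ip) const {
        for (const auto& [hid, dir_ip] : _mappings) {
            if (hid == id || dir_ip == ip) {
                return std::make_pair(hid, dir_ip);
            }
        }
        return std::nullopt;
    }

    void remove_mapping(const host_id_t& id) { _mappings.erase(id); }
};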
Before these changes, we only started draining a hint directory if
the node leaving the cluster corresponded to that directory AND was
identified by the same host ID as the hint endpoint manager managing
it. As a result, draining did not always happen when it was supposed
to.
Draining should start no matter which of the nodes
corresponding to a hint directory is leaving
the cluster. This commit ensures that it happens.
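
Under the simplified model sketched above (again, an illustration rather
than the real code), the fixed behavior can be demonstrated: a node that
shares a hint directory, but is not the one the endpoint manager is keyed
by, still resolves to a manager to drain.

// Continues the directory_manager_sketch example above.
#include <cassert>

int main() {
    directory_manager_sketch dirs;
    // The endpoint manager for host-A manages a directory named after
    // 10.0.0.2 -- an IP shared with node host-B (pre-migration state).
    dirs.add_mapping("host-A", "10.0.0.2");

    // Old behaviour: host-B leaving was ignored, because no endpoint
    // manager is registered under its host ID (the equivalent of
    // `_ep_managers.find(endpoint)` coming up empty).

    // New behaviour: resolve through the mapping first, so host-B's
    // departure still finds the manager for host-A and drains it.
    auto m = dirs.get_mapping("host-B", "10.0.0.2");
    assert(m && m->first == "host-A");
}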
@@ -618,12 +618,12 @@ bool manager::check_dc_for(endpoint_id ep) const noexcept {
     }
 }
 
-future<> manager::drain_for(endpoint_id endpoint) noexcept {
+future<> manager::drain_for(endpoint_id host_id, gms::inet_address ip) noexcept {
     if (!started() || stopping() || draining_all()) {
         co_return;
     }
 
-    manager_logger.trace("on_leave_cluster: {} is removed/decommissioned", endpoint);
+    manager_logger.trace("on_leave_cluster: {} is removed/decommissioned", host_id);
 
     const auto holder = seastar::gate::holder{_draining_eps_gate};
     // As long as we hold on to this lock, no migration of hinted handoff to host IDs
@@ -642,7 +642,7 @@ future<> manager::drain_for(endpoint_id endpoint) noexcept {
 
     std::exception_ptr eptr = nullptr;
 
-    if (_proxy.local_db().get_token_metadata().get_topology().is_me(endpoint)) {
+    if (_proxy.local_db().get_token_metadata().get_topology().is_me(host_id)) {
         set_draining_all();
 
         try {
@@ -657,28 +657,45 @@ future<> manager::drain_for(endpoint_id endpoint) noexcept {
         _ep_managers.clear();
         _hint_directory_manager.clear();
     } else {
-        auto it = _ep_managers.find(endpoint);
-
-        if (it != _ep_managers.end()) {
-            try {
-                co_await drain_ep_manager(it->second);
-            } catch (...) {
-                eptr = std::current_exception();
-            }
-
-            // We can't provide the function with `it` here because we co_await above,
-            // so iterators could have been invalidated.
-            // This never throws.
-            _ep_managers.erase(endpoint);
-            _hint_directory_manager.remove_mapping(endpoint);
-        }
+        const auto maybe_host_id = std::invoke([&] () -> std::optional<locator::host_id> {
+            if (_uses_host_id) {
+                return host_id;
+            }
+            // Before the whole cluster is migrated to the host-ID-based hinted handoff,
+            // one hint directory may correspond to multiple target nodes. If *any* of them
+            // leaves the cluster, we should drain the hint directory. This is why we need
+            // to rely on this mapping here.
+            const auto maybe_mapping = _hint_directory_manager.get_mapping(host_id, ip);
+            if (maybe_mapping) {
+                return maybe_mapping->first;
+            }
+            return std::nullopt;
+        });
+
+        if (maybe_host_id) {
+            auto it = _ep_managers.find(*maybe_host_id);
+
+            if (it != _ep_managers.end()) {
+                try {
+                    co_await drain_ep_manager(it->second);
+                } catch (...) {
+                    eptr = std::current_exception();
+                }
+
+                // We can't provide the function with `it` here because we co_await above,
+                // so iterators could have been invalidated.
+                // This never throws.
+                _ep_managers.erase(*maybe_host_id);
+                _hint_directory_manager.remove_mapping(*maybe_host_id);
+            }
+        }
     }
 
     if (eptr) {
-        manager_logger.error("Exception when draining {}: {}", endpoint, eptr);
+        manager_logger.error("Exception when draining {}: {}", host_id, eptr);
     }
 
-    manager_logger.trace("drain_for: finished draining {}", endpoint);
+    manager_logger.trace("drain_for: finished draining {}", host_id);
 }
 
 void manager::update_backlog(size_t backlog, size_t max_backlog) {
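A side note on the idiom in the hunk above: maybe_host_id is initialized by
immediately invoking a lambda through std::invoke, which lets a const
variable be produced from multi-branch logic. A minimal standalone sketch of
that idiom (values arbitrary, unrelated to the ScyllaDB code):

#include <functional>
#include <optional>

int main() {
    const bool migrated = false;
    // Immediately-invoked lambda: a const variable initialized from
    // branchy logic, as done for `maybe_host_id` in the hunk above.
    const std::optional<int> chosen = std::invoke([&]() -> std::optional<int> {
        if (migrated) {
            return 1;
        }
        return std::nullopt;
    });
    return chosen.value_or(0);
}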
@@ -317,8 +317,9 @@ public:
     /// In both cases - removes the corresponding hints' directories after all hints have been drained and erases the
     /// corresponding hint_endpoint_manager objects.
     ///
-    /// \param endpoint node that left the cluster
-    future<> drain_for(endpoint_id endpoint) noexcept;
+    /// \param host_id host ID of the node that left the cluster
+    /// \param ip the IP of the node that left the cluster
+    future<> drain_for(endpoint_id host_id, gms::inet_address ip) noexcept;
 
     void update_backlog(size_t backlog, size_t max_backlog);
 
@@ -6597,8 +6597,8 @@ void storage_proxy::on_join_cluster(const gms::inet_address& endpoint) {};
 
 void storage_proxy::on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid) {
     // Discarding these futures is safe. They're awaited by db::hints::manager::stop().
-    (void) _hints_manager.drain_for(hid);
-    (void) _hints_for_views_manager.drain_for(hid);
+    (void) _hints_manager.drain_for(hid, endpoint);
+    (void) _hints_for_views_manager.drain_for(hid, endpoint);
 }
 
 void storage_proxy::on_up(const gms::inet_address& endpoint) {};