From c585444c60e0242eab16a3b18d10cc3b6416d3b8 Mon Sep 17 00:00:00 2001 From: Dawid Medrek Date: Wed, 3 Apr 2024 15:13:11 +0200 Subject: [PATCH 01/18] db/hints: Fix indentation --- db/hints/internal/hint_storage.cc | 4 ++-- db/hints/manager.cc | 26 +++++++++++++------------- db/hints/manager.hh | 4 ++-- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/db/hints/internal/hint_storage.cc b/db/hints/internal/hint_storage.cc index 874977a5f8..ca4311d7f8 100644 --- a/db/hints/internal/hint_storage.cc +++ b/db/hints/internal/hint_storage.cc @@ -90,7 +90,7 @@ future get_current_hints_segments(const fs::path& hint_direc return lister::scan_dir(dir / de.name, lister::dir_entry_types::of(), [¤t_hint_segments, shard_id] (fs::path dir, directory_entry de) { manager_logger.trace("\tIP: {}", de.name); - + // Hint files. return lister::scan_dir(dir / de.name, lister::dir_entry_types::of(), [¤t_hint_segments, shard_id, ep = de.name] (fs::path dir, directory_entry de) { @@ -255,7 +255,7 @@ future<> remove_irrelevant_shards_directories(const fs::path& hint_directory) { lister::show_hidden::yes, [] (fs::path dir, directory_entry de) { return io_check(remove_file, (dir / de.name).native()); }); - + co_await io_check(remove_file, (dir / de.name).native()); } }); diff --git a/db/hints/manager.cc b/db/hints/manager.cc index cda42bc915..7099669fc4 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -72,13 +72,13 @@ public: } const auto units = co_await seastar::get_units(_lock, 1); - + utils::directories::set dir_set; dir_set.add_sharded(_hints_directory); - + manager_logger.debug("Creating and validating hint directories: {}", _hints_directory); co_await _dirs.create_and_verify(std::move(dir_set)); - + _state = state::created_and_validated; } @@ -92,7 +92,7 @@ public: } const auto units = co_await seastar::get_units(_lock, 1); - + manager_logger.debug("Rebalancing hints in {}", _hints_directory); co_await rebalance_hints(fs::path{_hints_directory}); @@ -171,7 
+171,7 @@ void manager::register_metrics(const sstring& group_name) { sm::make_counter("corrupted_files", _stats.corrupted_files, sm::description("Number of hints files that were discarded during sending because the file was corrupted.")), - sm::make_gauge("pending_drains", + sm::make_gauge("pending_drains", sm::description("Number of tasks waiting in the queue for draining hints"), [this] { return _drain_lock.waiters(); }), @@ -195,7 +195,7 @@ future<> manager::start(shared_ptr gossiper_ptr) { return get_ep_manager(ep).populate_segments_to_replay(); }); - + co_await compute_hints_dir_device_id(); set_started(); } @@ -239,7 +239,7 @@ void manager::forbid_hints() { void manager::forbid_hints_for_eps_with_pending_hints() { manager_logger.trace("space_watchdog: Going to block hints to: {}", _eps_with_pending_hints); - + for (auto& [_, ep_man] : _ep_managers) { if (has_ep_with_pending_hints(ep_man.end_point_key())) { ep_man.forbid_hints(); @@ -296,7 +296,7 @@ future<> manager::wait_for_sync_point(abort_source& as, const sync_point::shard_ } return it->second; } (); - + try { co_await ep_man.wait_until_hints_are_replayed_up_to(local_as, rp); } catch (abort_requested_exception&) { @@ -318,7 +318,7 @@ hint_endpoint_manager& manager::get_ep_manager(endpoint_id ep) { if (emplaced) { manager_logger.trace("Created an endpoint manager for {}", ep); - ep_man.start(); + ep_man.start(); } return ep_man; @@ -439,7 +439,7 @@ future<> manager::change_host_filter(host_filter filter) { _host_filter = std::move(filter); eptr = std::current_exception(); } - + try { // Remove endpoint managers which are rejected by the filter. 
co_await coroutine::parallel_for_each(_ep_managers, [this] (auto& pair) { @@ -448,7 +448,7 @@ future<> manager::change_host_filter(host_filter filter) { if (_host_filter.can_hint_for(_proxy.get_token_metadata_ptr()->get_topology(), ep)) { return make_ready_future<>(); } - + return ep_man.stop(drain::no).finally([this, ep] { _ep_managers.erase(ep); }); @@ -496,7 +496,7 @@ future<> manager::drain_for(endpoint_id endpoint) noexcept { if (_proxy.local_db().get_token_metadata().get_topology().is_me(endpoint)) { set_draining_all(); - + try { co_await coroutine::parallel_for_each(_ep_managers | boost::adaptors::map_values, [&drain_ep_manager] (hint_endpoint_manager& ep_man) { @@ -509,7 +509,7 @@ future<> manager::drain_for(endpoint_id endpoint) noexcept { _ep_managers.clear(); } else { auto it = _ep_managers.find(endpoint); - + if (it != _ep_managers.end()) { try { co_await drain_ep_manager(it->second); diff --git a/db/hints/manager.hh b/db/hints/manager.hh index 31d75071c8..8b6c4fbf3c 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -126,13 +126,13 @@ private: public: manager(service::storage_proxy& proxy, sstring hints_directory, host_filter filter, int64_t max_hint_window_ms, resource_manager& res_manager, sharded& db); - + manager(const manager&) = delete; manager& operator=(const manager&) = delete; manager(manager&&) = delete; manager& operator=(manager&&) = delete; - + ~manager() noexcept { assert(_ep_managers.empty()); } From a36387d942638289d4adfccfb8b407e06d8c2b10 Mon Sep 17 00:00:00 2001 From: Dawid Medrek Date: Wed, 3 Apr 2024 15:13:25 +0200 Subject: [PATCH 02/18] service: Fix indentation --- service/storage_proxy.cc | 4 ++-- service/storage_proxy.hh | 2 +- service/storage_service.cc | 2 +- service/topology_coordinator.cc | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 953e9a3d90..19a6c7731a 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ 
-2488,7 +2488,7 @@ storage_proxy_stats::split_stats::split_stats(const sstring& category, const sst , _long_description_prefix(long_description_prefix) , _category(category) , _op_type(op_type) - , _auto_register_metrics(auto_register_metrics) + , _auto_register_metrics(auto_register_metrics) , _sg(current_scheduling_group()) { } storage_proxy_stats::write_stats::write_stats() @@ -2996,7 +2996,7 @@ storage_proxy::mutate_locally(std::vector mutations, tracing::trace_st }); } -future<> +future<> storage_proxy::mutate_locally(std::vector mutation, tracing::trace_state_ptr tr_state, clock_type::time_point timeout, db::per_partition_rate_limit::info rate_limit_info) { return mutate_locally(std::move(mutation), tr_state, timeout, _write_smp_service_group, rate_limit_info); } diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index f57997f74e..4b9746f47e 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -58,7 +58,7 @@ class one_or_two_partition_ranges; } namespace cdc { - class cdc_service; + class cdc_service; } namespace gms { diff --git a/service/storage_service.cc b/service/storage_service.cc index d4fe433772..31dd2e5831 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1709,7 +1709,7 @@ future<> storage_service::join_token_ring(sharded topology_coordinator::build_coordinator_state(group0_guard guard) { auto get_application_state = [&] (locator::host_id host_id, gms::inet_address ep, const gms::application_state_map& epmap, gms::application_state app_state) -> sstring { const auto it = epmap.find(app_state); if (it == epmap.end()) { - throw std::runtime_error(format("failed to build initial raft topology state from gossip for node {}/{}: application state {} is missing in gossip", + throw std::runtime_error(format("failed to build initial raft topology state from gossip for node {}/{}: application state {} is missing in gossip", host_id, ep, app_state)); } // it's versioned_value::value(), not 
std::optional::value() - it does not throw @@ -2531,7 +2531,7 @@ future<> topology_coordinator::build_coordinator_state(group0_guard guard) { .set("cleanup_status", cleanup_status::clean) .set("request_id", utils::UUID()) .set("supported_features", supported_features); - + rtlogger.debug("node {} will contain the following parameters: " "datacenter={}, rack={}, tokens={}, shard_count={}, ignore_msb={}, supported_features={}", host_id, datacenter, rack, tokens, shard_count, ignore_msb, supported_features); From 54ae9797b9bcd1c63f75c11c7894afdefbd7006b Mon Sep 17 00:00:00 2001 From: Dawid Medrek Date: Fri, 5 Jan 2024 18:29:27 +0100 Subject: [PATCH 03/18] service: Add locator::host_id to on_leave_cluster We extend the function endpoint_lifecycle_subscriber::on_leave_cluster by another argument -- locator::host_id. It's more convenient to have a consistent pair of IP and host ID. --- service/endpoint_lifecycle_subscriber.hh | 8 ++-- service/qos/service_level_controller.cc | 2 +- service/qos/service_level_controller.hh | 2 +- service/storage_proxy.cc | 2 +- service/storage_proxy.hh | 2 +- service/storage_service.cc | 57 ++++++++++++------------ service/storage_service.hh | 8 ++-- service/topology_coordinator.cc | 2 +- transport/event_notifier.cc | 2 +- transport/server.hh | 2 +- 10 files changed, 46 insertions(+), 41 deletions(-) diff --git a/service/endpoint_lifecycle_subscriber.hh b/service/endpoint_lifecycle_subscriber.hh index 1f28422bcb..01677b4a54 100644 --- a/service/endpoint_lifecycle_subscriber.hh +++ b/service/endpoint_lifecycle_subscriber.hh @@ -11,6 +11,7 @@ #pragma once #include "gms/inet_address.hh" +#include "locator/host_id.hh" #include "utils/atomic_vector.hh" namespace service { @@ -39,9 +40,10 @@ public: /** * Called when a new node leave the cluster (decommission or removeToken). * - * @param endpoint the endpoint that is leaving. + * @param endpoint the IP of the endpoint that is leaving. 
+ * @param host_id the host ID of the endpoint that is leaving. */ - virtual void on_leave_cluster(const gms::inet_address& endpoint) = 0; + virtual void on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& host_id) = 0; /** * Called when a node is marked UP. @@ -66,7 +68,7 @@ public: future<> unregister_subscriber(endpoint_lifecycle_subscriber* subscriber) noexcept; future<> notify_down(gms::inet_address endpoint); - future<> notify_left(gms::inet_address endpoint); + future<> notify_left(gms::inet_address endpoint, locator::host_id host_id); future<> notify_up(gms::inet_address endpoint); future<> notify_joined(gms::inet_address endpoint); }; diff --git a/service/qos/service_level_controller.cc b/service/qos/service_level_controller.cc index b784d36106..91e6a04402 100644 --- a/service/qos/service_level_controller.cc +++ b/service/qos/service_level_controller.cc @@ -557,7 +557,7 @@ future<> service_level_controller::do_remove_service_level(sstring name, bool re void service_level_controller::on_join_cluster(const gms::inet_address& endpoint) { } -void service_level_controller::on_leave_cluster(const gms::inet_address& endpoint) { +void service_level_controller::on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid) { auto my_address = _auth_service.local().query_processor().proxy().local_db().get_token_metadata().get_topology().my_address(); if (this_shard_id() == global_controller && endpoint == my_address) { _global_controller_db->dist_data_update_aborter.request_abort(); diff --git a/service/qos/service_level_controller.hh b/service/qos/service_level_controller.hh index 259ae43630..41a7e3c365 100644 --- a/service/qos/service_level_controller.hh +++ b/service/qos/service_level_controller.hh @@ -259,7 +259,7 @@ public: static sstring default_service_level_name; virtual void on_join_cluster(const gms::inet_address& endpoint) override; - virtual void on_leave_cluster(const gms::inet_address& endpoint) override; + 
virtual void on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid) override; virtual void on_up(const gms::inet_address& endpoint) override; virtual void on_down(const gms::inet_address& endpoint) override; }; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 19a6c7731a..7cbb6359b9 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -6568,7 +6568,7 @@ future<> storage_proxy::wait_for_hint_sync_point(const db::hints::sync_point spo void storage_proxy::on_join_cluster(const gms::inet_address& endpoint) {}; -void storage_proxy::on_leave_cluster(const gms::inet_address& endpoint) { +void storage_proxy::on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid) { // Discarding these futures is safe. They're awaited by db::hints::manager::stop(). (void) _hints_manager.drain_for(endpoint); (void) _hints_for_views_manager.drain_for(endpoint); diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 4b9746f47e..2f429f3f9a 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -724,7 +724,7 @@ public: } virtual void on_join_cluster(const gms::inet_address& endpoint) override; - virtual void on_leave_cluster(const gms::inet_address& endpoint) override; + virtual void on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid) override; virtual void on_up(const gms::inet_address& endpoint) override; virtual void on_down(const gms::inet_address& endpoint) override; diff --git a/service/storage_service.cc b/service/storage_service.cc index 31dd2e5831..4057cd1e4c 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -402,30 +402,31 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm std::vector> sys_ks_futures; - auto remove_ip = [&](inet_address ip, bool notify) -> future<> { + auto remove_ip = [&](inet_address ip, locator::host_id host_id, bool notify) -> future<> { 
sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(ip)); if (_gossiper.get_endpoint_state_ptr(ip) && !get_used_ips().contains(ip)) { co_await _gossiper.force_remove_endpoint(ip, gms::null_permit_id); if (notify) { - co_await notify_left(ip); + co_await notify_left(ip, host_id); } } }; auto process_left_node = [&] (raft::server_id id) -> future<> { + locator::host_id host_id{id.uuid()}; + if (const auto ip = am.find(id)) { - co_await remove_ip(*ip, true); + co_await remove_ip(*ip, host_id, true); } - locator::host_id host_id{id.uuid()}; if (t.left_nodes_rs.find(id) != t.left_nodes_rs.end()) { update_topology(host_id, std::nullopt, t.left_nodes_rs.at(id)); } _group0->modifiable_address_map().set_expiring(id); // However if we do that, we need to also implement unbanning a node and do it if `removenode` is aborted. - co_await _messaging.local().ban_host(locator::host_id{id.uuid()}); + co_await _messaging.local().ban_host(host_id); }; auto process_normal_node = [&] (raft::server_id id, const replica_state& rs) -> future<> { @@ -474,7 +475,7 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm _exit(1); }); // IP change is not expected to emit REMOVED_NODE notifications - co_await remove_ip(it->second, false); + co_await remove_ip(it->second, host_id, false); } } update_topology(host_id, ip, rs); @@ -2456,7 +2457,7 @@ future<> storage_service::handle_state_left(inet_address endpoint, std::vector(tokens_from_tm.begin(), tokens_from_tm.end()); } - co_await excise(tokens, endpoint, extract_expire_time(pieces), pid); + co_await excise(tokens, endpoint, host_id, extract_expire_time(pieces), pid); } future<> storage_service::handle_state_removed(inet_address endpoint, std::vector pieces, gms::permit_id pid) { @@ -2477,7 +2478,7 @@ future<> storage_service::handle_state_removed(inet_address endpoint, std::vecto auto state = pieces[0]; auto remove_tokens = get_token_metadata().get_tokens(host_id); std::unordered_set 
tmp(remove_tokens.begin(), remove_tokens.end()); - co_await excise(std::move(tmp), endpoint, extract_expire_time(pieces), pid); + co_await excise(std::move(tmp), endpoint, host_id, extract_expire_time(pieces), pid); } else { // now that the gossiper has told us about this nonexistent member, notify the gossiper to remove it add_expire_time_if_found(endpoint, extract_expire_time(pieces)); co_await remove_endpoint(endpoint, pid); @@ -3969,7 +3970,7 @@ future<> storage_service::removenode(locator::host_id host_id, std::list tmp(tokens.begin(), tokens.end()); - ss.excise(std::move(tmp), endpoint, pid).get(); + ss.excise(std::move(tmp), endpoint, host_id, pid).get(); removed_from_token_ring = true; slogger.info("removenode[{}]: Finished removing the node from the ring", uuid); } catch (...) { @@ -4795,27 +4796,27 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node, }); } -future<> storage_service::excise(std::unordered_set tokens, inet_address endpoint, gms::permit_id pid) { - slogger.info("Removing tokens {} for {}", tokens, endpoint); +future<> storage_service::excise(std::unordered_set tokens, inet_address endpoint_ip, + locator::host_id endpoint_hid, gms::permit_id pid) { + slogger.info("Removing tokens {} for {}", tokens, endpoint_ip); // FIXME: HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint); - co_await remove_endpoint(endpoint, pid); + co_await remove_endpoint(endpoint_ip, pid); auto tmlock = std::make_optional(co_await get_token_metadata_lock()); auto tmptr = co_await get_mutable_token_metadata_ptr(); - if (const auto host_id = tmptr->get_host_id_if_known(endpoint); host_id) { - tmptr->remove_endpoint(*host_id); - } + tmptr->remove_endpoint(endpoint_hid); tmptr->remove_bootstrap_tokens(tokens); - co_await update_topology_change_info(tmptr, ::format("excise {}", endpoint)); + co_await update_topology_change_info(tmptr, ::format("excise {}", endpoint_ip)); co_await replicate_to_all_cores(std::move(tmptr)); 
tmlock.reset(); - co_await notify_left(endpoint); + co_await notify_left(endpoint_ip, endpoint_hid); } -future<> storage_service::excise(std::unordered_set tokens, inet_address endpoint, int64_t expire_time, gms::permit_id pid) { - add_expire_time_if_found(endpoint, expire_time); - return excise(tokens, endpoint, pid); +future<> storage_service::excise(std::unordered_set tokens, inet_address endpoint_ip, + locator::host_id endpoint_hid, int64_t expire_time, gms::permit_id pid) { + add_expire_time_if_found(endpoint_ip, expire_time); + return excise(tokens, endpoint_ip, endpoint_hid, pid); } future<> storage_service::leave_ring() { @@ -6619,7 +6620,7 @@ future<> storage_service::force_remove_completion() { const auto& pid = permit.id(); co_await ss._gossiper.advertise_token_removed(*endpoint, host_id, pid); std::unordered_set tokens_set(tokens.begin(), tokens.end()); - co_await ss.excise(tokens_set, *endpoint, pid); + co_await ss.excise(tokens_set, *endpoint, host_id, pid); slogger.info("force_remove_completion: removing endpoint {} from group 0", *endpoint); assert(ss._group0); @@ -6759,11 +6760,11 @@ future<> storage_service::notify_down(inet_address endpoint) { slogger.debug("Notify node {} has been down", endpoint); } -future<> endpoint_lifecycle_notifier::notify_left(gms::inet_address endpoint) { - return seastar::async([this, endpoint] { - _subscribers.thread_for_each([endpoint] (endpoint_lifecycle_subscriber* subscriber) { +future<> endpoint_lifecycle_notifier::notify_left(gms::inet_address endpoint, locator::host_id hid) { + return seastar::async([this, endpoint, hid] { + _subscribers.thread_for_each([endpoint, hid] (endpoint_lifecycle_subscriber* subscriber) { try { - subscriber->on_leave_cluster(endpoint); + subscriber->on_leave_cluster(endpoint, hid); } catch (...) 
{ slogger.warn("Leave cluster notification failed {}: {}", endpoint, std::current_exception()); } @@ -6771,9 +6772,9 @@ future<> endpoint_lifecycle_notifier::notify_left(gms::inet_address endpoint) { }); } -future<> storage_service::notify_left(inet_address endpoint) { - co_await container().invoke_on_all([endpoint] (auto&& ss) { - return ss._lifecycle_notifier.notify_left(endpoint); +future<> storage_service::notify_left(inet_address endpoint, locator::host_id hid) { + co_await container().invoke_on_all([endpoint, hid] (auto&& ss) { + return ss._lifecycle_notifier.notify_left(endpoint, hid); }); slogger.debug("Notify node {} has left the cluster", endpoint); } diff --git a/service/storage_service.hh b/service/storage_service.hh index 86ad633ff7..a347e95add 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -546,8 +546,10 @@ private: future<> handle_state_removed(inet_address endpoint, std::vector pieces, gms::permit_id); private: - future<> excise(std::unordered_set tokens, inet_address endpoint, gms::permit_id); - future<> excise(std::unordered_set tokens, inet_address endpoint, long expire_time, gms::permit_id); + future<> excise(std::unordered_set tokens, inet_address endpoint_ip, locator::host_id endpoint_hid, + gms::permit_id); + future<> excise(std::unordered_set tokens, inet_address endpoint_ip, locator::host_id endpoint_hid, + long expire_time, gms::permit_id); /** unlike excise we just need this endpoint gone without going through any notifications **/ future<> remove_endpoint(inet_address endpoint, gms::permit_id pid); @@ -733,7 +735,7 @@ private: future<> isolate(); future<> notify_down(inet_address endpoint); - future<> notify_left(inet_address endpoint); + future<> notify_left(inet_address endpoint, locator::host_id hid); future<> notify_up(inet_address endpoint); future<> notify_joined(inet_address endpoint); future<> notify_cql_change(inet_address endpoint, bool ready); diff --git a/service/topology_coordinator.cc 
b/service/topology_coordinator.cc index 80e2622be9..e9816ea853 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -2280,7 +2280,7 @@ public: future<> run(); virtual void on_join_cluster(const gms::inet_address& endpoint) {} - virtual void on_leave_cluster(const gms::inet_address& endpoint) {}; + virtual void on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid) {}; virtual void on_up(const gms::inet_address& endpoint) {}; virtual void on_down(const gms::inet_address& endpoint) { _topo_sm.event.broadcast(); }; }; diff --git a/transport/event_notifier.cc b/transport/event_notifier.cc index 25328ea58a..eab99d89e2 100644 --- a/transport/event_notifier.cc +++ b/transport/event_notifier.cc @@ -236,7 +236,7 @@ void cql_server::event_notifier::send_join_cluster(const gms::inet_address& endp } } -void cql_server::event_notifier::on_leave_cluster(const gms::inet_address& endpoint) +void cql_server::event_notifier::on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid) { for (auto&& conn : _topology_change_listeners) { using namespace cql_transport; diff --git a/transport/server.hh b/transport/server.hh index fc414bbb34..ec58e13b40 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -329,7 +329,7 @@ public: virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) override; virtual void on_join_cluster(const gms::inet_address& endpoint) override; - virtual void on_leave_cluster(const gms::inet_address& endpoint) override; + virtual void on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid) override; virtual void on_up(const gms::inet_address& endpoint) override; virtual void on_down(const gms::inet_address& endpoint) override; }; From 1af7fa74e89ed8f92e186a3e330746eed9d7ad7d Mon Sep 17 00:00:00 2001 From: Dawid Medrek Date: Wed, 3 Apr 2024 15:33:32 +0200 Subject: [PATCH 04/18] db/hints: Remove noexcept in 
do_send_one_mutation() While the function is marked as noexcept, the returned future can in fact store an exception. We remove the specifier to reflect the actual behavior of the function. --- db/hints/internal/hint_sender.cc | 2 +- db/hints/internal/hint_sender.hh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/db/hints/internal/hint_sender.cc b/db/hints/internal/hint_sender.cc index 17ffffc757..88170a45f0 100644 --- a/db/hints/internal/hint_sender.cc +++ b/db/hints/internal/hint_sender.cc @@ -76,7 +76,7 @@ future hint_sender::get_last_file_modification(const sstring& fname) { }); } -future<> hint_sender::do_send_one_mutation(frozen_mutation_and_schema m, locator::effective_replication_map_ptr ermp, const inet_address_vector_replica_set& natural_endpoints) noexcept { +future<> hint_sender::do_send_one_mutation(frozen_mutation_and_schema m, locator::effective_replication_map_ptr ermp, const inet_address_vector_replica_set& natural_endpoints) { return futurize_invoke([this, m = std::move(m), ermp = std::move(ermp), &natural_endpoints] () mutable -> future<> { // The fact that we send with CL::ALL in both cases below ensures that new hints are not going // to be generated as a result of hints sending. 
diff --git a/db/hints/internal/hint_sender.hh b/db/hints/internal/hint_sender.hh index da3db82196..0d990cf2d4 100644 --- a/db/hints/internal/hint_sender.hh +++ b/db/hints/internal/hint_sender.hh @@ -242,7 +242,7 @@ private: /// \param ermp points to the effective_replication_map used to obtain \c natural_endpoints /// \param natural_endpoints current replicas for the given mutation /// \return future that resolves when the operation is complete - future<> do_send_one_mutation(frozen_mutation_and_schema m, locator::effective_replication_map_ptr ermp, const inet_address_vector_replica_set& natural_endpoints) noexcept; + future<> do_send_one_mutation(frozen_mutation_and_schema m, locator::effective_replication_map_ptr ermp, const inet_address_vector_replica_set& natural_endpoints); /// \brief Send one mutation out. /// From cfd03fe2730f64e986a1248127be2edcaf928629 Mon Sep 17 00:00:00 2001 From: Dawid Medrek Date: Mon, 8 Jan 2024 13:36:04 +0100 Subject: [PATCH 05/18] db/hints: Migrate to locator::host_id We change the type of node identifiers used within the module and fix compilation. Directories storing hints to specific nodes are now represented by host IDs instead of IPs. 
--- db/hints/host_filter.cc | 2 +- db/hints/host_filter.hh | 5 ++- db/hints/internal/common.hh | 4 +-- db/hints/internal/hint_sender.cc | 30 ++++++++++++---- db/hints/manager.cc | 60 ++++++++++++++++++++++--------- db/hints/manager.hh | 3 +- db/hints/resource_manager.cc | 10 +++--- db/hints/resource_manager.hh | 7 ++-- service/storage_proxy.cc | 62 ++++++++++++++++++++++---------- service/storage_proxy.hh | 3 +- 10 files changed, 130 insertions(+), 56 deletions(-) diff --git a/db/hints/host_filter.cc b/db/hints/host_filter.cc index 1795addb97..83d9fd9fb0 100644 --- a/db/hints/host_filter.cc +++ b/db/hints/host_filter.cc @@ -28,7 +28,7 @@ host_filter::host_filter(std::unordered_set allowed_dcs) , _dcs(std::move(allowed_dcs)) { } -bool host_filter::can_hint_for(const locator::topology& topo, gms::inet_address ep) const { +bool host_filter::can_hint_for(const locator::topology& topo, endpoint_id ep) const { switch (_enabled_kind) { case enabled_kind::enabled_for_all: return true; diff --git a/db/hints/host_filter.hh b/db/hints/host_filter.hh index b90be25d3d..16e98fe96f 100644 --- a/db/hints/host_filter.hh +++ b/db/hints/host_filter.hh @@ -15,6 +15,7 @@ #include #include "seastarx.hh" +#include "db/hints/internal/common.hh" namespace gms { class inet_address; @@ -28,6 +29,8 @@ namespace hints { // host_filter tells hints_manager towards which endpoints it is allowed to generate hints. class host_filter final { private: + using endpoint_id = internal::endpoint_id; + enum class enabled_kind { enabled_for_all, enabled_selectively, @@ -58,7 +61,7 @@ public: // Parses hint filtering configuration from a list of DCs. 
static host_filter parse_from_dc_list(sstring opt); - bool can_hint_for(const locator::topology& topo, gms::inet_address ep) const; + bool can_hint_for(const locator::topology& topo, endpoint_id ep) const; inline const std::unordered_set& get_dcs() const { return _dcs; diff --git a/db/hints/internal/common.hh b/db/hints/internal/common.hh index eef65341be..f344dc9515 100644 --- a/db/hints/internal/common.hh +++ b/db/hints/internal/common.hh @@ -12,7 +12,7 @@ #include // Scylla includes. -#include "gms/inet_address.hh" +#include "locator/host_id.hh" // STD. #include @@ -21,7 +21,7 @@ namespace db::hints { namespace internal { /// Type identifying the host a specific subset of hints should be sent to. -using endpoint_id = gms::inet_address; +using endpoint_id = locator::host_id; /// Tag specifying if hint sending should enter the so-called "drain mode". /// If it should, that means that if a failure while sending a hint occurs, diff --git a/db/hints/internal/hint_sender.cc b/db/hints/internal/hint_sender.cc index 88170a45f0..6bf5eb8840 100644 --- a/db/hints/internal/hint_sender.cc +++ b/db/hints/internal/hint_sender.cc @@ -80,9 +80,12 @@ future<> hint_sender::do_send_one_mutation(frozen_mutation_and_schema m, locator return futurize_invoke([this, m = std::move(m), ermp = std::move(ermp), &natural_endpoints] () mutable -> future<> { // The fact that we send with CL::ALL in both cases below ensures that new hints are not going // to be generated as a result of hints sending. 
- if (boost::range::find(natural_endpoints, end_point_key()) != natural_endpoints.end()) { + const auto& tm = ermp->get_token_metadata(); + const auto maybe_addr = tm.get_endpoint_for_host_id_if_known(end_point_key()); + + if (maybe_addr && boost::range::find(natural_endpoints, *maybe_addr) != natural_endpoints.end()) { manager_logger.trace("Sending directly to {}", end_point_key()); - return _proxy.send_hint_to_endpoint(std::move(m), std::move(ermp), end_point_key()); + return _proxy.send_hint_to_endpoint(std::move(m), std::move(ermp), *maybe_addr); } else { manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", end_point_key()); return _proxy.send_hint_to_all_replicas(std::move(m)); @@ -95,17 +98,30 @@ bool hint_sender::can_send() noexcept { return false; } + const auto tmptr = _shard_manager._proxy.get_token_metadata_ptr(); + const auto maybe_ep = std::invoke([&] () noexcept -> std::optional { + try { + return tmptr->get_endpoint_for_host_id_if_known(_ep_key); + } catch (...) { + return std::nullopt; + } + }); + try { - if (_gossiper.is_alive(end_point_key())) { + // `hint_sender` can never target this node, so if the returned optional is empty, + // that must mean the current locator::token_metadata doesn't store the information + // about the target node. + if (maybe_ep && _gossiper.is_alive(*maybe_ep)) { _state.remove(state::ep_state_left_the_ring); return true; } else { if (!_state.contains(state::ep_state_left_the_ring)) { - const auto& tm = _shard_manager.local_db().get_token_metadata(); - const auto host_id = tm.get_host_id_if_known(end_point_key()); - _state.set_if(!host_id || !tm.is_normal_token_owner(*host_id)); + _state.set_if(!tmptr->is_normal_token_owner(_ep_key)); } - // send the hints out if the destination Node is part of the ring - we will send to all new replicas in this case + // If the node is not part of the ring, we will send hints to all new replicas. 
+ // Note that if the optional -- `maybe_ep` -- is empty, that could mean that `_ep_key` + // is the locator::host_id of THIS node. However, that's impossible because instances + // of `hint_sender` are only created for OTHER nodes, so this logic is correct. return _state.contains(state::ep_state_left_the_ring); } } catch (...) { diff --git a/db/hints/manager.cc b/db/hints/manager.cc index 7099669fc4..b32e69dc1e 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -187,7 +187,7 @@ future<> manager::start(shared_ptr gossiper_ptr) { co_await lister::scan_dir(_hints_dir, lister::dir_entry_types::of(), [this] (fs::path datadir, directory_entry de) { - endpoint_id ep = endpoint_id{de.name}; + endpoint_id ep = endpoint_id{utils::UUID{de.name}}; if (!check_dc_for(ep)) { return make_ready_future<>(); @@ -249,14 +249,21 @@ void manager::forbid_hints_for_eps_with_pending_hints() { } } -sync_point::shard_rps manager::calculate_current_sync_point(std::span target_eps) const { +sync_point::shard_rps manager::calculate_current_sync_point(std::span target_eps) const { sync_point::shard_rps rps; + const auto tmptr = _proxy.get_token_metadata_ptr(); for (auto addr : target_eps) { - auto it = _ep_managers.find(addr); + const auto hid = tmptr->get_host_id_if_known(addr); + // Ignore the IPs that we cannot map. + if (!hid) { + continue; + } + + auto it = _ep_managers.find(*hid); if (it != _ep_managers.end()) { const hint_endpoint_manager& ep_man = it->second; - rps[ep_man.end_point_key()] = ep_man.last_written_replay_position(); + rps[addr] = ep_man.last_written_replay_position(); } } @@ -276,22 +283,34 @@ future<> manager::wait_for_sync_point(abort_source& as, const sync_point::shard_ local_as.request_abort(); } + const auto tmptr = _proxy.get_token_metadata_ptr(); + std::unordered_map hid_rps{}; + hid_rps.reserve(rps.size()); + + for (const auto& [addr, rp] : rps) { + const auto maybe_hid = tmptr->get_host_id_if_known(addr); + // Ignore the IPs we cannot map. 
+ if (maybe_hid) [[likely]] { + hid_rps.emplace(*maybe_hid, rp); + } + } + bool was_aborted = false; co_await coroutine::parallel_for_each(_ep_managers, - coroutine::lambda([&rps, &local_as, &was_aborted] (auto& pair) -> future<> { + coroutine::lambda([&hid_rps, &local_as, &was_aborted] (auto& pair) -> future<> { auto& [ep, ep_man] = pair; - // When `rps` doesn't specify a replay position for a given endpoint, we use + // When `hid_rps` doesn't specify a replay position for a given endpoint, we use // its default value. Normally, it should be equal to returning a ready future here. // However, foreign segments (i.e. segments that were moved from another shard at start-up) // are treated differently from "regular" segments -- we can think of their replay positions // as equal to negative infinity or simply smaller from any other replay position, which // also includes the default value. Because of that, we don't have a choice -- we have to - // pass either rps[ep] or the default replay position to the endpoint manager because + // pass either hid_rps[ep] or the default replay position to the endpoint manager because // some hints MIGHT need to be sent. 
const replay_position rp = [&] { - auto it = rps.find(ep); - if (it == rps.end()) { + auto it = hid_rps.find(ep); + if (it == hid_rps.end()) { return replay_position{}; } return it->second; @@ -354,10 +373,15 @@ bool manager::store_hint(endpoint_id ep, schema_ptr s, lw_shared_ptr MAX_SIZE_OF_HINTS_IN_PROGRESS + if (!(_stats.size_of_hints_in_progress > MAX_SIZE_OF_HINTS_IN_PROGRESS && !_proxy.local_db().get_token_metadata().get_topology().is_me(ep) - && hints_in_progress_for(ep) > 0 - && local_gossiper().get_endpoint_downtime(ep) <= _max_hint_window_us; + && hints_in_progress_for(ep) > 0)) { + return false; + } + + return std::ranges::any_of(local_gossiper().get_nodes_with_host_id(ep), [this] (const auto& ip) { + return local_gossiper().get_endpoint_downtime(ip) <= _max_hint_window_us; + }); } bool manager::can_hint_for(endpoint_id ep) const noexcept { @@ -389,9 +413,13 @@ bool manager::can_hint_for(endpoint_id ep) const noexcept { } // Check if the endpoint has been down for too long. - const auto ep_downtime = local_gossiper().get_endpoint_downtime(ep); - if (ep_downtime > _max_hint_window_us) { - manager_logger.trace("{} has been down for {}, not hinting", ep, ep_downtime); + const auto addrs = local_gossiper().get_nodes_with_host_id(ep); + const bool node_is_alive = std::ranges::any_of(addrs, [this] (const auto& addr) { + return local_gossiper().get_endpoint_downtime(addr) <= _max_hint_window_us; + }); + + if (!node_is_alive) { + manager_logger.trace("{} has been down for too long, not hinting", ep); return false; } @@ -424,7 +452,7 @@ future<> manager::change_host_filter(host_filter filter) { // for some of them co_await lister::scan_dir(_hints_dir, lister::dir_entry_types::of(), [this] (fs::path datadir, directory_entry de) { - const endpoint_id ep = endpoint_id{de.name}; + const endpoint_id ep = endpoint_id{utils::UUID{de.name}}; const auto& topology = _proxy.get_token_metadata_ptr()->get_topology(); if (_ep_managers.contains(ep) || 
!_host_filter.can_hint_for(topology, ep)) { diff --git a/db/hints/manager.hh b/db/hints/manager.hh index 8b6c4fbf3c..94ea6986ba 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -26,6 +26,7 @@ #include "db/hints/resource_manager.hh" #include "db/hints/host_filter.hh" #include "db/hints/sync_point.hh" +#include "gms/inet_address.hh" #include "locator/abstract_replication_strategy.hh" // STD. @@ -243,7 +244,7 @@ public: } /// \brief Returns a set of replay positions for hint queues towards endpoints from the `target_eps`. - sync_point::shard_rps calculate_current_sync_point(std::span target_eps) const; + sync_point::shard_rps calculate_current_sync_point(std::span target_eps) const; /// \brief Waits until hint replay reach replay positions described in `rps`. future<> wait_for_sync_point(abort_source& as, const sync_point::shard_rps& rps); diff --git a/db/hints/resource_manager.cc b/db/hints/resource_manager.cc index 57605997d3..f7bb2bc8c7 100644 --- a/db/hints/resource_manager.cc +++ b/db/hints/resource_manager.cc @@ -91,7 +91,7 @@ future<> space_watchdog::stop() noexcept { } // Called under the end_point_hints_manager::file_update_mutex() of the corresponding end_point_hints_manager instance. -future<> space_watchdog::scan_one_ep_dir(fs::path path, manager& shard_manager, ep_key_type ep_key) { +future<> space_watchdog::scan_one_ep_dir(fs::path path, manager& shard_manager, endpoint_id ep_key) { return do_with(std::move(path), [this, ep_key, &shard_manager] (fs::path& path) { // It may happen that we get here and the directory has already been deleted in the context of manager::drain_for(). // In this case simply bail out. @@ -146,14 +146,14 @@ void space_watchdog::on_timer() { // not hintable). // If exists - let's take a file update lock so that files are not changed under our feet. Otherwise, simply // continue to enumeration - there is no one to change them. 
- const internal::endpoint_id ep{de.name}; + const internal::endpoint_id ep{utils::UUID{de.name}}; if (shard_manager.have_ep_manager(ep)) { - return shard_manager.with_file_update_mutex_for(ep, [this, &shard_manager, dir = std::move(dir), ep_name = std::move(de.name)] () mutable { - return scan_one_ep_dir(dir / ep_name, shard_manager, ep_key_type(ep_name)); + return shard_manager.with_file_update_mutex_for(ep, [this, ep, &shard_manager, dir = std::move(dir), ep_name = std::move(de.name)] () mutable { + return scan_one_ep_dir(dir / ep_name, shard_manager, ep); }); } else { - return scan_one_ep_dir(dir / de.name, shard_manager, ep_key_type(de.name)); + return scan_one_ep_dir(dir / de.name, shard_manager, ep); } }).get(); } diff --git a/db/hints/resource_manager.hh b/db/hints/resource_manager.hh index f67f115eee..8a4b1c3fc9 100644 --- a/db/hints/resource_manager.hh +++ b/db/hints/resource_manager.hh @@ -20,6 +20,7 @@ #include "utils/small_vector.hh" #include "utils/updateable_value.hh" #include "enum_set.hh" +#include "db/hints/internal/common.hh" // Usually we don't define namespace aliases in our headers // but this one is already entrenched. 
@@ -31,7 +32,6 @@ class storage_proxy; namespace gms { class gossiper; - class inet_address; } // namespace gms namespace db { @@ -46,7 +46,8 @@ class manager; class space_watchdog { private: - using ep_key_type = gms::inet_address; + using endpoint_id = internal::endpoint_id; + static const std::chrono::seconds _watchdog_period; struct manager_hash { @@ -112,7 +113,7 @@ private: /// \param path directory to scan /// \param ep_name end point ID (as a string) /// \return future that resolves when scanning is complete - future<> scan_one_ep_dir(fs::path path, manager& shard_manager, ep_key_type ep_key); + future<> scan_one_ep_dir(fs::path path, manager& shard_manager, endpoint_id ep_key); }; class resource_manager { diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 7cbb6359b9..f98b2e7bae 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1015,7 +1015,8 @@ protected: schema_ptr _schema; public: virtual ~mutation_holder() {} - virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) = 0; + virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, locator::effective_replication_map_ptr ermptr, + tracing::trace_state_ptr tr_state) = 0; virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info, fencing_token fence) = 0; @@ -1054,10 +1055,12 @@ public: _mutations.emplace(m.first, std::move(fm)); } } - virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) override { + virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, locator::effective_replication_map_ptr ermptr, + tracing::trace_state_ptr tr_state) override { auto m = _mutations[ep]; if (m) { - return hm.store_hint(ep, _schema, std::move(m), tr_state); + const auto hid = ermptr->get_token_metadata().get_host_id(ep); + return 
hm.store_hint(hid, _schema, std::move(m), tr_state); } else { return false; } @@ -1113,8 +1116,10 @@ public: } explicit shared_mutation(const mutation& m) : shared_mutation(frozen_mutation_and_schema{freeze(m), m.schema()}) { } - virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) override { - return hm.store_hint(ep, _schema, _mutation, tr_state); + virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, locator::effective_replication_map_ptr ermptr, + tracing::trace_state_ptr tr_state) override { + const auto hid = ermptr->get_token_metadata().get_host_id(ep); + return hm.store_hint(hid, _schema, _mutation, tr_state); } virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info, @@ -1143,7 +1148,8 @@ public: class hint_mutation : public shared_mutation { public: using shared_mutation::shared_mutation; - virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) override { + virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, locator::effective_replication_map_ptr, + tracing::trace_state_ptr tr_state) override { throw std::runtime_error("Attempted to store a hint for a hint"); } virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout, @@ -1268,8 +1274,9 @@ public: _size = _proposal->update.representation().size(); _schema = std::move(s); } - virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) override { - return false; // CAS does not save hints yet + virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, locator::effective_replication_map_ptr, + tracing::trace_state_ptr tr_state) override { + return false; // CAS does not save hints yet } virtual future<> apply_locally(storage_proxy& sp, 
storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info, @@ -1502,7 +1509,8 @@ public: // we are here because either cl was achieved, but targets left in the handler are not // responding, so a hint should be written for them, or cl == any in which case // hints are counted towards consistency, so we need to write hints and count how much was written - auto hints = _proxy->hint_to_dead_endpoints(_mutation_holder, get_targets(), _type, get_trace_state()); + auto hints = _proxy->hint_to_dead_endpoints(_mutation_holder, get_targets(), _effective_replication_map_ptr, + _type, get_trace_state()); signal(hints); if (_cl == db::consistency_level::ANY && hints) { slogger.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write"); @@ -1574,8 +1582,9 @@ public: const inet_address_vector_topology_change& get_dead_endpoints() const { return _dead_endpoints; } - bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) { - return _mutation_holder->store_hint(hm, ep, tr_state); + bool store_hint(db::hints::manager& hm, gms::inet_address ep, locator::effective_replication_map_ptr ermptr, + tracing::trace_state_ptr tr_state) { + return _mutation_holder->store_hint(hm, ep, std::move(ermptr), tr_state); } future<> apply_locally(storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) { auto op = _proxy->start_write(); @@ -3113,10 +3122,24 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok pending_endpoints.erase(itend, pending_endpoints.end()); auto all = boost::range::join(natural_endpoints, pending_endpoints); + auto all_hids = all | boost::adaptors::transformed([&erm] (const gms::inet_address& ep) { + const auto& tm = erm->get_token_metadata(); + const auto maybe_host_id = tm.get_host_id_if_known(ep); + if (maybe_host_id) { + return *maybe_host_id; + } + // We need this additional check 
because even after removing the mapping IP-host ID corresponding + // to this node from `locator::token_metadata` while decommissioning, we still perform mutations + // targetting the local node. + if (tm.get_topology().is_me(ep)) { + return tm.get_topology().my_host_id(); + } + on_internal_error(slogger, seastar::format("No mapping for {} in the passed effective replication map", ep)); + }); // If the manager hasn't started yet, no mutation will be performed to another node. // No hint will need to be stored. - if (cannot_hint(all, type)) { + if (cannot_hint(all_hids, type)) { get_stats().writes_failed_due_to_too_many_in_flight_hints++; // avoid OOMing due to excess hints. we need to do this check even for "live" nodes, since we can // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead. @@ -3228,8 +3251,8 @@ void storage_proxy::register_cdc_operation_result_tracker(const storage_proxy::u void storage_proxy::hint_to_dead_endpoints(response_id_type id, db::consistency_level cl) { auto& h = *get_write_response_handler(id); - - size_t hints = hint_to_dead_endpoints(h._mutation_holder, h.get_dead_endpoints(), h._type, h.get_trace_state()); + size_t hints = hint_to_dead_endpoints(h._mutation_holder, h.get_dead_endpoints(), h._effective_replication_map_ptr, + h._type, h.get_trace_state()); if (cl == db::consistency_level::ANY) { // for cl==ANY hints are counted towards consistency @@ -4120,12 +4143,13 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo // returns number of hints stored template -size_t storage_proxy::hint_to_dead_endpoints(std::unique_ptr& mh, const Range& targets, db::write_type type, tracing::trace_state_ptr tr_state) noexcept +size_t storage_proxy::hint_to_dead_endpoints(std::unique_ptr& mh, const Range& targets, + locator::effective_replication_map_ptr ermptr, db::write_type type, tracing::trace_state_ptr tr_state) noexcept { if (hints_enabled(type)) { db::hints::manager& 
hints_manager = hints_manager_for(type); - return boost::count_if(targets, [&mh, tr_state = std::move(tr_state), &hints_manager] (gms::inet_address target) mutable -> bool { - return mh->store_hint(hints_manager, target, tr_state); + return boost::count_if(targets, [&mh, ermptr, tr_state = std::move(tr_state), &hints_manager] (gms::inet_address target) mutable -> bool { + return mh->store_hint(hints_manager, target, ermptr, tr_state); }); } else { return 0; @@ -6570,8 +6594,8 @@ void storage_proxy::on_join_cluster(const gms::inet_address& endpoint) {}; void storage_proxy::on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid) { // Discarding these futures is safe. They're awaited by db::hints::manager::stop(). - (void) _hints_manager.drain_for(endpoint); - (void) _hints_for_views_manager.drain_for(endpoint); + (void) _hints_manager.drain_for(hid); + (void) _hints_for_views_manager.drain_for(hid); } void storage_proxy::on_up(const gms::inet_address& endpoint) {}; diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 2f429f3f9a..df9e9f0766 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -334,7 +334,8 @@ private: void register_cdc_operation_result_tracker(const storage_proxy::unique_response_handler_vector& ids, lw_shared_ptr tracker); void send_to_live_endpoints(response_id_type response_id, clock_type::time_point timeout); template - size_t hint_to_dead_endpoints(std::unique_ptr& mh, const Range& targets, db::write_type type, tracing::trace_state_ptr tr_state) noexcept; + size_t hint_to_dead_endpoints(std::unique_ptr& mh, const Range& targets, + locator::effective_replication_map_ptr ermptr, db::write_type type, tracing::trace_state_ptr tr_state) noexcept; void hint_to_dead_endpoints(response_id_type, db::consistency_level); template bool cannot_hint(const Range& targets, db::write_type type) const; From 063d4d5e9143ab3132055c1fa045389d43ad6f47 Mon Sep 17 00:00:00 2001 From: Dawid Medrek Date: Wed, 
20 Mar 2024 10:20:09 +0100 Subject: [PATCH 06/18] db/hints: Prepare initializing endpoint managers for migrating from IP to host ID We extract the initialization of endpoint managers from the start method of the hint manager to a separate function and make it handle directories that represent either IP addresses, or host IDs; other directories are ignored. It's necessary because before Scylla is upgraded to a version that uses host-ID-based hinted handoff, we need to continue only managing IP directories. When Scylla has been upgraded, we will need to handle host ID directories. It may also happen that after an upgrade (but not before it), Scylla fails while renaming the directories, so we end up with some of them representing IP address, and some representing host IDs. After these changes, the code handles that scenario as well. --- db/hints/internal/hint_endpoint_manager.cc | 4 +- db/hints/internal/hint_endpoint_manager.hh | 2 +- db/hints/manager.cc | 72 +++++++++++++++++----- db/hints/manager.hh | 9 ++- 4 files changed, 69 insertions(+), 18 deletions(-) diff --git a/db/hints/internal/hint_endpoint_manager.cc b/db/hints/internal/hint_endpoint_manager.cc index 67721986b8..bab7a281ea 100644 --- a/db/hints/internal/hint_endpoint_manager.cc +++ b/db/hints/internal/hint_endpoint_manager.cc @@ -119,13 +119,13 @@ future<> hint_endpoint_manager::stop(drain should_drain) noexcept { }); } -hint_endpoint_manager::hint_endpoint_manager(const endpoint_id& key, manager& shard_manager) +hint_endpoint_manager::hint_endpoint_manager(const endpoint_id& key, fs::path hint_directory, manager& shard_manager) : _key(key) , _shard_manager(shard_manager) , _file_update_mutex_ptr(make_lw_shared()) , _file_update_mutex(*_file_update_mutex_ptr) , _state(state_set::of()) - , _hints_dir(_shard_manager.hints_dir() / format("{}", _key).c_str()) + , _hints_dir(std::move(hint_directory)) // Approximate the position of the last written hint by using the same formula as for segment id calculation 
in commitlog // TODO: Should this logic be deduplicated with what is in the commitlog? , _last_written_rp(this_shard_id(), std::chrono::duration_cast(runtime::get_boot_time().time_since_epoch()).count()) diff --git a/db/hints/internal/hint_endpoint_manager.hh b/db/hints/internal/hint_endpoint_manager.hh index 8d2ba641e7..e525eb3df5 100644 --- a/db/hints/internal/hint_endpoint_manager.hh +++ b/db/hints/internal/hint_endpoint_manager.hh @@ -68,7 +68,7 @@ private: hint_sender _sender; public: - hint_endpoint_manager(const endpoint_id& key, manager& shard_manager); + hint_endpoint_manager(const endpoint_id& key, std::filesystem::path hint_directory, manager& shard_manager); hint_endpoint_manager(hint_endpoint_manager&&); ~hint_endpoint_manager(); diff --git a/db/hints/manager.cc b/db/hints/manager.cc index b32e69dc1e..135ffe59df 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -29,6 +29,7 @@ // Scylla includes. #include "db/hints/internal/hint_logger.hh" #include "gms/gossiper.hh" +#include "gms/inet_address.hh" #include "locator/abstract_replication_strategy.hh" #include "replica/database.hh" #include "service/storage_proxy.hh" @@ -184,17 +185,7 @@ void manager::register_metrics(const sstring& group_name) { future<> manager::start(shared_ptr gossiper_ptr) { _gossiper_anchor = std::move(gossiper_ptr); - - co_await lister::scan_dir(_hints_dir, lister::dir_entry_types::of(), - [this] (fs::path datadir, directory_entry de) { - endpoint_id ep = endpoint_id{utils::UUID{de.name}}; - - if (!check_dc_for(ep)) { - return make_ready_future<>(); - } - - return get_ep_manager(ep).populate_segments_to_replay(); - }); + co_await initialize_endpoint_managers(); co_await compute_hints_dir_device_id(); set_started(); @@ -331,12 +322,21 @@ future<> manager::wait_for_sync_point(abort_source& as, const sync_point::shard_ } } -hint_endpoint_manager& manager::get_ep_manager(endpoint_id ep) { - auto [it, emplaced] = _ep_managers.try_emplace(ep, ep, *this); +// For now, because 
`_uses_host_id` is always set to true, we don't use the second argument. +hint_endpoint_manager& manager::get_ep_manager(const endpoint_id& host_id, const gms::inet_address& ip = {}) { + const auto hint_directory = std::invoke([&] () -> std::filesystem::path { + if (_uses_host_id) { + return hints_dir() / host_id.to_sstring(); + } else { + return hints_dir() / ip.to_sstring(); + } + }); + + auto [it, emplaced] = _ep_managers.try_emplace(host_id, host_id, std::move(hint_directory), *this); hint_endpoint_manager& ep_man = it->second; if (emplaced) { - manager_logger.trace("Created an endpoint manager for {}", ep); + manager_logger.trace("Created an endpoint manager for {}", host_id); ep_man.start(); } @@ -571,4 +571,48 @@ future<> manager::with_file_update_mutex_for(endpoint_id ep, noncopyable_functio return _ep_managers.at(ep).with_file_update_mutex(std::move(func)); } +// The function assumes that if `_uses_host_id == true`, then there are no directories that represent IP addresses, +// i.e. every directory is either valid and represents a host ID, or is invalid (so it should be ignored anyway). +future<> manager::initialize_endpoint_managers() { + auto maybe_create_ep_mgr = [this] (const locator::host_id& host_id, const gms::inet_address& ip) -> future<> { + if (!check_dc_for(host_id)) { + co_return; + } + + co_await get_ep_manager(host_id, ip).populate_segments_to_replay(); + }; + + // We dispatch here to not hold on to the token metadata if hinted handoff is host-ID-based. + // In that case, there are no directories that represent IP addresses, so we won't need to use it. + // We want to avoid a situation when topology changes are prevented while we hold on to this pointer. + const auto tmptr = _uses_host_id ? 
nullptr : _proxy.get_token_metadata_ptr(); + + co_await lister::scan_dir(_hints_dir, lister::dir_entry_types::of(), + [&] (fs::path directory, directory_entry de) -> future<> { + auto maybe_host_id_or_ep = std::invoke([&] () -> std::optional { + try { + return locator::host_id_or_endpoint{de.name}; + } catch (...) { + // The name represents neither an IP address, nor a host ID. + return std::nullopt; + } + }); + + // The directory is invalid, so there's nothing more to do. + if (!maybe_host_id_or_ep) { + co_return; + } + + if (_uses_host_id) { + // If hinted handoff is host-ID-based, `get_ep_manager` will NOT use the passed IP address, + // so we simply pass the default value there. + co_return co_await maybe_create_ep_mgr(maybe_host_id_or_ep->id(), gms::inet_address{}); + } + + // If we have got to this line, hinted handoff is still IP-based. + const locator::host_id host_id = maybe_host_id_or_ep->resolve_id(*tmptr); + co_await maybe_create_ep_mgr(host_id, maybe_host_id_or_ep->endpoint()); + }); +} + } // namespace db::hints diff --git a/db/hints/manager.hh b/db/hints/manager.hh index 94ea6986ba..9ca12d67fc 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -124,6 +124,9 @@ private: std::unordered_set _eps_with_pending_hints; seastar::named_semaphore _drain_lock = {1, named_semaphore_exception_factory{"drain lock"}}; + // For now, this never changes. 
+ const bool _uses_host_id = true; + public: manager(service::storage_proxy& proxy, sstring hints_directory, host_filter filter, int64_t max_hint_window_ms, resource_manager& res_manager, sharded& db); @@ -268,7 +271,7 @@ private: return _local_db; } - hint_endpoint_manager& get_ep_manager(endpoint_id ep); + hint_endpoint_manager& get_ep_manager(const endpoint_id& host_id, const gms::inet_address& ip); public: bool have_ep_manager(endpoint_id ep) const noexcept; @@ -317,6 +320,10 @@ private: bool draining_all() noexcept { return _state.contains(state::draining_all); } + + /// Iterates over existing hint directories and for each, if the corresponding endpoint is present + /// in locator::topology, creates an endpoint manager. + future<> initialize_endpoint_managers(); }; } // namespace db::hints From e36f853f9b020928c07619dff18c8671c6acfff4 Mon Sep 17 00:00:00 2001 From: Dawid Medrek Date: Wed, 20 Mar 2024 10:40:59 +0100 Subject: [PATCH 07/18] db/hints: Take both IP and host ID when storing hints The store_hint() method starts taking both an IP and a host ID as its arguments. The rationale for the change is that, depending on the stage of the cluster (before an upgrade to the host-ID-based hinted handoff and after it), we might need to create a directory representing either an IP address, or a host ID. Because locator::topology can change between obtaining the host ID we pass and the moment the function is executed, we need to pass both parameters explicitly to ensure the consistency between them.
--- db/hints/manager.cc | 16 ++++++++-------- db/hints/manager.hh | 3 ++- service/storage_proxy.cc | 4 ++-- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/db/hints/manager.cc b/db/hints/manager.cc index 135ffe59df..c3f2f2099c 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -347,23 +347,23 @@ bool manager::have_ep_manager(endpoint_id ep) const noexcept { return _ep_managers.contains(ep); } -bool manager::store_hint(endpoint_id ep, schema_ptr s, lw_shared_ptr fm, +bool manager::store_hint(endpoint_id host_id, gms::inet_address ip, schema_ptr s, lw_shared_ptr fm, tracing::trace_state_ptr tr_state) noexcept { - if (stopping() || draining_all() || !started() || !can_hint_for(ep)) { - manager_logger.trace("Can't store a hint to {}", ep); + if (stopping() || draining_all() || !started() || !can_hint_for(host_id)) { + manager_logger.trace("Can't store a hint to {}", host_id); ++_stats.dropped; return false; } try { - manager_logger.trace("Going to store a hint to {}", ep); - tracing::trace(tr_state, "Going to store a hint to {}", ep); + manager_logger.trace("Going to store a hint to {}", host_id); + tracing::trace(tr_state, "Going to store a hint to {}", host_id); - return get_ep_manager(ep).store_hint(std::move(s), std::move(fm), tr_state); + return get_ep_manager(host_id, ip).store_hint(std::move(s), std::move(fm), tr_state); } catch (...) 
{ - manager_logger.trace("Failed to store a hint to {}: {}", ep, std::current_exception()); - tracing::trace(tr_state, "Failed to store a hint to {}: {}", ep, std::current_exception()); + manager_logger.trace("Failed to store a hint to {}: {}", host_id, std::current_exception()); + tracing::trace(tr_state, "Failed to store a hint to {}: {}", host_id, std::current_exception()); ++_stats.errors; return false; diff --git a/db/hints/manager.hh b/db/hints/manager.hh index 9ca12d67fc..155ac7f695 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -145,7 +145,8 @@ public: void register_metrics(const sstring& group_name); future<> start(shared_ptr gossiper_ptr); future<> stop(); - bool store_hint(endpoint_id ep, schema_ptr s, lw_shared_ptr fm, tracing::trace_state_ptr tr_state) noexcept; + bool store_hint(endpoint_id host_id, gms::inet_address ip, schema_ptr s, lw_shared_ptr fm, + tracing::trace_state_ptr tr_state) noexcept; /// \brief Changes the host_filter currently used, stopping and starting endpoint_managers relevant to the new host_filter. 
/// \param filter the new host_filter diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index f98b2e7bae..d56de47c96 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1060,7 +1060,7 @@ public: auto m = _mutations[ep]; if (m) { const auto hid = ermptr->get_token_metadata().get_host_id(ep); - return hm.store_hint(hid, _schema, std::move(m), tr_state); + return hm.store_hint(hid, ep, _schema, std::move(m), tr_state); } else { return false; } @@ -1119,7 +1119,7 @@ public: virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, locator::effective_replication_map_ptr ermptr, tracing::trace_state_ptr tr_state) override { const auto hid = ermptr->get_token_metadata().get_host_id(ep); - return hm.store_hint(hid, _schema, _mutation, tr_state); + return hm.store_hint(hid, ep, _schema, _mutation, tr_state); } virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state, db::per_partition_rate_limit::info rate_limit_info, From 934e4bb45e45cbc42333419619a6fe5155e25b82 Mon Sep 17 00:00:00 2001 From: Dawid Medrek Date: Fri, 22 Mar 2024 12:41:37 +0100 Subject: [PATCH 08/18] db/hints: Add function for migrating hint directories to host ID We add a function that will be used while migrating hinted handoff to using host IDs. It iterates over existing hint directories and tries to rename them to the corresponding host IDs. In case of a failure, we remove it so that at the end of its execution the only remaining directories are those that represent host IDs. --- db/hints/manager.cc | 91 +++++++++++++++++++++++++++++++++++++++++++++ db/hints/manager.hh | 12 ++++++ 2 files changed, 103 insertions(+) diff --git a/db/hints/manager.cc b/db/hints/manager.cc index c3f2f2099c..d14d631e82 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -14,8 +14,11 @@ // Seastar features. 
#include #include +#include +#include #include #include +#include #include #include #include @@ -31,9 +34,11 @@ #include "gms/gossiper.hh" #include "gms/inet_address.hh" #include "locator/abstract_replication_strategy.hh" +#include "locator/token_metadata.hh" #include "replica/database.hh" #include "service/storage_proxy.hh" #include "utils/directories.hh" +#include "utils/disk-error-handler.hh" #include "utils/error_injection.hh" #include "utils/lister.hh" #include "seastarx.hh" @@ -615,4 +620,90 @@ future<> manager::initialize_endpoint_managers() { }); } +// This function assumes that the hint directory is NOT modified as long as this function is being executed. +future<> manager::migrate_ip_directories() { + std::vector hint_directories{}; + + // Step 1. Gather the names of the hint directories. + co_await lister::scan_dir(_hints_dir, lister::dir_entry_types::of(), + [&] (std::filesystem::path, directory_entry de) -> future<> { + hint_directories.push_back(std::move(de.name)); + co_return; + }); + + struct hint_dir_mapping { + sstring current_name; + sstring new_name; + }; + + std::vector dirs_to_rename{}; + std::vector dirs_to_remove{}; + + /* RAII lock for token metadata */ { + // We need to keep the topology consistent throughout the loop below to + // ensure that, for example, two different IPs won't be mapped to + // the same host ID. + // + // We don't want to hold on to this pointer for longer than necessary. + // Topology changes might be postponed otherwise. + auto tmptr = _proxy.get_token_metadata_ptr(); + + // Step 2. Obtain mappings IP -> host ID for the directories. + for (auto& directory : hint_directories) { + try { + locator::host_id_or_endpoint hid_or_ep{directory}; + + // If the directory's name already represents a host ID, there is nothing to do. 
+ if (hid_or_ep.has_host_id()) { + continue; + } + + const locator::host_id host_id = hid_or_ep.resolve_id(*tmptr); + dirs_to_rename.push_back({.current_name = std::move(directory), .new_name = host_id.to_sstring()}); + } catch (...) { + // We cannot map the IP to the corresponding host ID either because + // the relevant mapping doesn't exist anymore or an error occurred. Drop it. + // + // We only care about directories named after IPs during an upgrade, + // so we don't want to make this more complex than necessary. + manager_logger.warn("No mapping IP-host ID for hint directory {}. It is going to be removed", directory); + dirs_to_remove.push_back(_hints_dir / std::move(directory)); + } + } + } + + // We don't need this memory anymore. The only remaining elements are the names of the directories + // that already represent valid host IDs. We won't do anything with them. The rest have been moved + // to either `dirs_to_rename` or `dirs_to_remove`. + hint_directories.clear(); + + // Step 3. Try to rename the directories. + co_await coroutine::parallel_for_each(dirs_to_rename, [&] (auto& mapping) -> future<> { + std::filesystem::path old_name = _hints_dir / std::move(mapping.current_name); + std::filesystem::path new_name = _hints_dir / std::move(mapping.new_name); + + try { + manager_logger.info("Renaming hint directory {} to {}", old_name.native(), new_name.native()); + co_await rename_file(old_name.native(), new_name.native()); + } catch (...) { + manager_logger.warn("Renaming directory {} to {} has failed: {}", + old_name.native(), new_name.native(), std::current_exception()); + dirs_to_remove.push_back(std::move(old_name)); + } + }); + + // Step 4. Remove directories that don't represent host IDs. + co_await coroutine::parallel_for_each(dirs_to_remove, [] (auto& directory) -> future<> { + try { + manager_logger.warn("Removing hint directory {}", directory.native()); + co_await lister::rmdir(directory); + } catch (...) 
{ + on_internal_error(manager_logger, + seastar::format("Removing a hint directory has failed. Reason: {}", std::current_exception())); + } + }); + + co_await io_check(sync_directory, _hints_dir.native()); +} + } // namespace db::hints diff --git a/db/hints/manager.hh b/db/hints/manager.hh index 155ac7f695..cc73c73df9 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -325,6 +325,18 @@ private: /// Iterates over existing hint directories and for each, if the corresponding endpoint is present /// in locator::topology, creates an endpoint manager. future<> initialize_endpoint_managers(); + + /// Renames host directories named after IPs to host IDs. + /// + /// In the past, hosts were identified by their IPs. Now we use host IDs for that purpose, + /// but we want to ensure that old hints don't get lost if possible. This function serves + /// this purpose. It's only necessary when upgrading Scylla. + /// + /// This function should ONLY be called by `manager::start()`. + /// + /// Calling this function again while the previous call has not yet finished + /// is undefined behavior. + future<> migrate_ip_directories(); }; } // namespace db::hints From 8fd9c8038749698d4f1a1cee349343744021b45c Mon Sep 17 00:00:00 2001 From: Dawid Medrek Date: Fri, 22 Mar 2024 13:10:42 +0100 Subject: [PATCH 09/18] db/hints: Expose update lock of space watchdog We expose the update lock of space watchdog to be able to prevent it from scanning hint directories. It will be necessary in an upcoming commit when we will be renaming hint directories and possibly removing some of them. Race conditions are unacceptable, so resource manager cannot be able to access the directory during that time. 
--- db/hints/resource_manager.hh | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/db/hints/resource_manager.hh b/db/hints/resource_manager.hh index 8a4b1c3fc9..b40473aa26 100644 --- a/db/hints/resource_manager.hh +++ b/db/hints/resource_manager.hh @@ -86,7 +86,7 @@ public: void start(); future<> stop() noexcept; - seastar::named_semaphore& update_lock() { + seastar::named_semaphore& update_lock() noexcept { return _update_lock; } @@ -194,6 +194,10 @@ public: /// The hints::managers can be added either before or after resource_manager starts. /// If resource_manager is already started, the hints manager will also be started. future<> register_manager(manager& m); + + seastar::named_semaphore& update_lock() noexcept { + return _space_watchdog.update_lock(); + } }; } From 59d49c5219c6dab24a8655506597a73b25926d2a Mon Sep 17 00:00:00 2001 From: Dawid Medrek Date: Fri, 22 Mar 2024 13:18:24 +0100 Subject: [PATCH 10/18] db/hints: Coroutinize space_watchdog::scan_one_ep_dir() --- db/hints/resource_manager.cc | 36 ++++++++++++++++-------------------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/db/hints/resource_manager.cc b/db/hints/resource_manager.cc index f7bb2bc8c7..f9c6cc08c9 100644 --- a/db/hints/resource_manager.cc +++ b/db/hints/resource_manager.cc @@ -92,27 +92,23 @@ future<> space_watchdog::stop() noexcept { // Called under the end_point_hints_manager::file_update_mutex() of the corresponding end_point_hints_manager instance. future<> space_watchdog::scan_one_ep_dir(fs::path path, manager& shard_manager, endpoint_id ep_key) { - return do_with(std::move(path), [this, ep_key, &shard_manager] (fs::path& path) { - // It may happen that we get here and the directory has already been deleted in the context of manager::drain_for(). - // In this case simply bail out. 
- return file_exists(path.native()).then([this, ep_key, &shard_manager, &path] (bool exists) { - if (!exists) { - return make_ready_future<>(); - } else { - return lister::scan_dir(path, lister::dir_entry_types::of(), [this, ep_key, &shard_manager] (fs::path dir, directory_entry de) { - // Put the current end point ID to state.eps_with_pending_hints when we see the second hints file in its directory - if (_files_count == 1) { - shard_manager.add_ep_with_pending_hints(ep_key); - } - ++_files_count; + // It may happen that we get here and the directory has already been deleted in the context of manager::drain_for(). + // In this case simply bail out. + if (!co_await file_exists(path.native())) { + co_return; + } - return io_check(file_size, (dir / de.name.c_str()).c_str()).then([this] (uint64_t fsize) { - _total_size += fsize; - }); - }); - } - }); - }); + co_await lister::scan_dir(path, lister::dir_entry_types::of(), + coroutine::lambda([this, ep_key, &shard_manager] (fs::path dir, directory_entry de) -> future<> { + // Put the current end point ID to state.eps_with_pending_hints when we see the second hints file in its directory + if (_files_count == 1) { + shard_manager.add_ep_with_pending_hints(ep_key); + } + ++_files_count; + + const auto filename = (std::move(dir) / std::move(de.name)).native(); + _total_size += co_await io_check(file_size, filename); + })); } // Called from the context of a seastar::thread. From f9af01852d35d6efd02480d368e363c420fbc6f1 Mon Sep 17 00:00:00 2001 From: Dawid Medrek Date: Fri, 22 Mar 2024 13:19:28 +0100 Subject: [PATCH 11/18] db/hints/resource_manager: Update function description The current description of the function `space_watchdog::scan_one_ep_dir` is not up-to-date with the function's signature. This commit updates it. 
--- db/hints/resource_manager.hh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/db/hints/resource_manager.hh b/db/hints/resource_manager.hh index b40473aa26..f57a3f9407 100644 --- a/db/hints/resource_manager.hh +++ b/db/hints/resource_manager.hh @@ -111,7 +111,8 @@ private: /// value. /// /// \param path directory to scan - /// \param ep_name end point ID (as a string) + /// \param shard_manager the hint manager managing the directory specified by `path` + /// \param ep_key endpoint ID corresponding to the scanned directory /// \return future that resolves when scanning is complete future<> scan_one_ep_dir(fs::path path, manager& shard_manager, endpoint_id ep_key); }; From d0f58736c839b1d9c76672ccac38d301d1dc1be6 Mon Sep 17 00:00:00 2001 From: Dawid Medrek Date: Fri, 22 Mar 2024 14:12:09 +0100 Subject: [PATCH 12/18] db/hints: Introduce hint_directory_manager This commit introduces a new class responsible for keeping track of mappings IP-host ID. Before hinted handoff is migrated to using host IDs, hint directories still have to represent IP addresses. However, since we identify endpoint managers by host IDs already, we need to be able to associate them with the directories they manage. This class serves this purpose. 
--- db/hints/internal/hint_storage.cc | 71 +++++++++++++++++++++++++++++++ db/hints/internal/hint_storage.hh | 36 ++++++++++++++++ 2 files changed, 107 insertions(+) diff --git a/db/hints/internal/hint_storage.cc b/db/hints/internal/hint_storage.cc index ca4311d7f8..07ff70b91c 100644 --- a/db/hints/internal/hint_storage.cc +++ b/db/hints/internal/hint_storage.cc @@ -275,5 +275,76 @@ future<> rebalance_hints(fs::path hint_directory) { co_await remove_irrelevant_shards_directories(hint_directory); } +std::pair hint_directory_manager::insert_mapping(const locator::host_id& host_id, + const gms::inet_address& ip) +{ + const auto maybe_mapping = get_mapping(host_id, ip); + if (maybe_mapping) { + return *maybe_mapping; + } + + + _mappings.emplace(host_id, ip); + return std::make_pair(host_id, ip); +} + +std::optional hint_directory_manager::get_mapping(const locator::host_id& host_id) const noexcept { + auto it = _mappings.find(host_id); + if (it != _mappings.end()) { + return it->second; + } + return {}; +} + +std::optional hint_directory_manager::get_mapping(const gms::inet_address& ip) const noexcept { + for (const auto& [host_id, ep] : _mappings) { + if (ep == ip) { + return host_id; + } + } + return {}; +} + +std::optional> hint_directory_manager::get_mapping( + const locator::host_id& host_id, const gms::inet_address& ip) const noexcept +{ + for (const auto& [hid, ep] : _mappings) { + if (hid == host_id || ep == ip) { + return std::make_pair(hid, ep); + } + } + return {}; +} + +void hint_directory_manager::remove_mapping(const locator::host_id& host_id) noexcept { + _mappings.erase(host_id); +} + +void hint_directory_manager::remove_mapping(const gms::inet_address& ip) noexcept { + for (const auto& [host_id, ep] : _mappings) { + if (ep == ip) { + _mappings.erase(host_id); + break; + } + } +} + +bool hint_directory_manager::has_mapping(const locator::host_id& host_id) const noexcept { + return _mappings.contains(host_id); +} + +bool 
hint_directory_manager::has_mapping(const gms::inet_address& ip) const noexcept { + for (const auto& [_, ep] : _mappings) { + if (ip == ep) { + return true; + } + } + return false; +} + +void hint_directory_manager::clear() noexcept { + _mappings.clear(); +} + } // namespace internal } // namespace db::hints diff --git a/db/hints/internal/hint_storage.hh b/db/hints/internal/hint_storage.hh index 384a7ba565..6e52b48fd1 100644 --- a/db/hints/internal/hint_storage.hh +++ b/db/hints/internal/hint_storage.hh @@ -46,5 +46,41 @@ using hint_entry_reader = commitlog_entry_reader; /// \return A future that resolves when the operation is complete. future<> rebalance_hints(std::filesystem::path hint_directory); +class hint_directory_manager { +private: + std::map _mappings; + +public: + // Inserts a new mapping and returns it. + // If either the host ID or the IP is already in the map, the function inserts nothing + // and returns the existing mapping instead. + std::pair insert_mapping(const locator::host_id& host_id, + const gms::inet_address& ip); + + // Returns the corresponding IP for a given host ID if a mapping is present in the directory manager. + // Otherwise, an empty optional is returned. + [[nodiscard]] std::optional get_mapping(const locator::host_id& host_id) const noexcept; + + // Returns the corresponding host ID for a given IP if a mapping is present in the directory manager. + // Otherwise, an empty optional is returned. + [[nodiscard]] std::optional get_mapping(const gms::inet_address& ip) const noexcept; + + // Returns a mapping corresponding to either the passed host ID, or the passed IP if the mapping exists. + // Otherwise, an empty optional is returned. + [[nodiscard]] std::optional> get_mapping( + const locator::host_id& host_id, const gms::inet_address& ip) const noexcept; + + // Removes a mapping corresponding to the passed host ID if the mapping exists.
+ void remove_mapping(const locator::host_id& host_id) noexcept; + // Removes a mapping corresponding to the passed IP if the mapping exists. + void remove_mapping(const gms::inet_address& ip) noexcept; + + bool has_mapping(const locator::host_id& host_id) const noexcept; + bool has_mapping(const gms::inet_address& ip) const noexcept; + + // Removes all of the mappings. + void clear() noexcept; +}; + } // namespace internal } // namespace db::hints From aa4b06a895362eb9e8fd35f00a8a433fb33b69fe Mon Sep 17 00:00:00 2001 From: Dawid Medrek Date: Fri, 22 Mar 2024 14:38:38 +0100 Subject: [PATCH 13/18] db/hints: Enforce providing IP in get_ep_manager() We drop the default argument in the function's signature. Also, we adjust the code of change_host_filter() to be able to perform calls to get_ep_manager(). --- db/hints/manager.cc | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/db/hints/manager.cc b/db/hints/manager.cc index d14d631e82..81acbe5b67 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -327,8 +327,7 @@ future<> manager::wait_for_sync_point(abort_source& as, const sync_point::shard_ } } -// For now, because `_uses_host_id` is always set to true, we don't use the second argument. 
-hint_endpoint_manager& manager::get_ep_manager(const endpoint_id& host_id, const gms::inet_address& ip = {}) { +hint_endpoint_manager& manager::get_ep_manager(const endpoint_id& host_id, const gms::inet_address& ip) { const auto hint_directory = std::invoke([&] () -> std::filesystem::path { if (_uses_host_id) { return hints_dir() / host_id.to_sstring(); @@ -453,18 +452,39 @@ future<> manager::change_host_filter(host_filter filter) { std::exception_ptr eptr = nullptr; try { + const auto tmptr = _proxy.get_token_metadata_ptr(); + // Iterate over existing hint directories and see if we can enable an endpoint manager // for some of them co_await lister::scan_dir(_hints_dir, lister::dir_entry_types::of(), - [this] (fs::path datadir, directory_entry de) { - const endpoint_id ep = endpoint_id{utils::UUID{de.name}}; + [&] (fs::path datadir, directory_entry de) -> future<> { + using pair_type = std::pair; - const auto& topology = _proxy.get_token_metadata_ptr()->get_topology(); - if (_ep_managers.contains(ep) || !_host_filter.can_hint_for(topology, ep)) { - return make_ready_future(); + const auto maybe_host_id_and_ip = std::invoke([&] () -> std::optional { + try { + locator::host_id_or_endpoint hid_or_ep{de.name}; + if (hid_or_ep.has_host_id()) { + return std::make_optional(pair_type{hid_or_ep.id(), hid_or_ep.resolve_endpoint(*tmptr)}); + } else { + return std::make_optional(pair_type{hid_or_ep.resolve_id(*tmptr), hid_or_ep.endpoint()}); + } + } catch (...) { + return std::nullopt; + } + }); + + if (!maybe_host_id_and_ip) { + co_return; } - return get_ep_manager(ep).populate_segments_to_replay(); + const auto& topology = _proxy.get_token_metadata_ptr()->get_topology(); + const auto& [host_id, ip] = *maybe_host_id_and_ip; + + if (_ep_managers.contains(host_id) || !_host_filter.can_hint_for(topology, host_id)) { + co_return; + } + + co_await get_ep_manager(host_id, ip).populate_segments_to_replay(); }); } catch (...) { // Revert the changes in the filter. 
The code below will stop the additional managers From ee84e810ca6bf68b41d2cd48929069b9e2327e78 Mon Sep 17 00:00:00 2001 From: Dawid Medrek Date: Fri, 22 Mar 2024 14:57:42 +0100 Subject: [PATCH 14/18] db/hints: Start using hint_directory_manager We start keeping track of mappings IP - host ID. The mappings are between endpoint managers (identified by host IDs) and the hint directories managed by them (represented by IP addresses). This is a prelude to handling IP directories by the hint shard manager. The structure should only be used by the hint manager before it's migrated to using host IDs. The reason for that is that we rely on the information obtained from the structure, but it might not make sense later on. When we start creating directories named after host IDs and there are no longer directories representing IP addresses, there is no relation between host IDs and IPs -- just because the structure is supposed to keep track between endpoint managers and hint directories that represent IP addresses. If they represent host IDs, the connection between the two is lost. Still using the data structure could lead to bugs, e.g. if we tried to associate a given endpoint manager's host ID with its corresponding IP address from locator::token_metadata, it could happen that two different host IDs would be bound to the same IP address by the data structure: node A has IP I1, node A changes its IP to I2, node B changes its IP to I1. Though nodes A and B have different host IDs (because they are unique), the code would try to save hints towards node B in node A's hint directory, which should NOT happen. Relying on the data structure is thus only safe before migrating hinted handoff to using host IDs. It may happen that we save a hint in the hint directory of the wrong node indeed, but since migration to using host IDs is a process that only happens once, it's a price we are ready to pay. It's only imperative to prevent it from happening in normal circumstances. 
--- db/hints/manager.cc | 70 ++++++++++++++++++++++++++++++++++----------- db/hints/manager.hh | 17 +++++++++-- 2 files changed, 68 insertions(+), 19 deletions(-) diff --git a/db/hints/manager.cc b/db/hints/manager.cc index 81acbe5b67..445efff8e6 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -34,6 +34,7 @@ #include "gms/gossiper.hh" #include "gms/inet_address.hh" #include "locator/abstract_replication_strategy.hh" +#include "locator/host_id.hh" #include "locator/token_metadata.hh" #include "replica/database.hh" #include "service/storage_proxy.hh" @@ -46,6 +47,7 @@ // STD. #include #include +#include namespace db::hints { @@ -206,6 +208,7 @@ future<> manager::stop() { return ep_man.stop(); }).finally([this] { _ep_managers.clear(); + _hint_directory_manager.clear(); manager_logger.info("Shard hint manager has stopped"); }); }); @@ -328,27 +331,50 @@ future<> manager::wait_for_sync_point(abort_source& as, const sync_point::shard_ } hint_endpoint_manager& manager::get_ep_manager(const endpoint_id& host_id, const gms::inet_address& ip) { - const auto hint_directory = std::invoke([&] () -> std::filesystem::path { - if (_uses_host_id) { - return hints_dir() / host_id.to_sstring(); - } else { - return hints_dir() / ip.to_sstring(); + // If this is enabled, we can't rely on the information obtained from `_hint_directory_manager`. 
+ if (_uses_host_id) { + if (auto it = _ep_managers.find(host_id); it != _ep_managers.end()) { + return it->second; + } + } else { + if (const auto maybe_mapping = _hint_directory_manager.get_mapping(host_id, ip)) { + return _ep_managers.at(maybe_mapping->first); } - }); - auto [it, emplaced] = _ep_managers.try_emplace(host_id, host_id, std::move(hint_directory), *this); - hint_endpoint_manager& ep_man = it->second; - - if (emplaced) { - manager_logger.trace("Created an endpoint manager for {}", host_id); - ep_man.start(); + // If there is no mapping in `_hint_directory_manager` corresponding to either `host_id`, or `ip`, + // we need to create a new endpoint manager. + _hint_directory_manager.insert_mapping(host_id, ip); } - return ep_man; + try { + const auto hint_directory = std::invoke([&] () -> std::filesystem::path { + if (_uses_host_id) { + return hints_dir() / host_id.to_sstring(); + } else { + return hints_dir() / ip.to_sstring(); + } + }); + + auto [it, _] = _ep_managers.emplace(host_id, hint_endpoint_manager{host_id, std::move(hint_directory), *this}); + hint_endpoint_manager& ep_man = it->second; + + manager_logger.trace("Created an endpoint manager for {}", host_id); + ep_man.start(); + + return ep_man; + } catch (...) 
{ + manager_logger.warn("Starting a hint endpoint manager {}/{} has failed", host_id, ip); + _hint_directory_manager.remove_mapping(host_id); + _ep_managers.erase(host_id); + throw; + } } -bool manager::have_ep_manager(endpoint_id ep) const noexcept { - return _ep_managers.contains(ep); +bool manager::have_ep_manager(const std::variant& ep) const noexcept { + if (std::holds_alternative(ep)) { + return _ep_managers.contains(std::get(ep)); + } + return _hint_directory_manager.has_mapping(std::get(ep)); } bool manager::store_hint(endpoint_id host_id, gms::inet_address ip, schema_ptr s, lw_shared_ptr fm, @@ -504,6 +530,7 @@ future<> manager::change_host_filter(host_filter filter) { return ep_man.stop(drain::no).finally([this, ep] { _ep_managers.erase(ep); + _hint_directory_manager.remove_mapping(ep); }); }); } catch (...) { @@ -560,6 +587,7 @@ future<> manager::drain_for(endpoint_id endpoint) noexcept { } _ep_managers.clear(); + _hint_directory_manager.clear(); } else { auto it = _ep_managers.find(endpoint); @@ -574,6 +602,7 @@ future<> manager::drain_for(endpoint_id endpoint) noexcept { // so iterators could have been invalidated. // This never throws. 
_ep_managers.erase(endpoint); + _hint_directory_manager.remove_mapping(endpoint); } } @@ -592,8 +621,15 @@ void manager::update_backlog(size_t backlog, size_t max_backlog) { } } -future<> manager::with_file_update_mutex_for(endpoint_id ep, noncopyable_function ()> func) { - return _ep_managers.at(ep).with_file_update_mutex(std::move(func)); +future<> manager::with_file_update_mutex_for(const std::variant& ep, + noncopyable_function ()> func) { + const locator::host_id host_id = std::invoke([&] { + if (std::holds_alternative(ep)) { + return std::get(ep); + } + return *_hint_directory_manager.get_mapping(std::get(ep)); + }); + return _ep_managers.at(host_id).with_file_update_mutex(std::move(func)); } // The function assumes that if `_uses_host_id == true`, then there are no directories that represent IP addresses, diff --git a/db/hints/manager.hh b/db/hints/manager.hh index cc73c73df9..7740565390 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -79,6 +79,8 @@ private: using hint_endpoint_manager = internal::hint_endpoint_manager; using node_to_hint_store_factory_type = internal::node_to_hint_store_factory_type; + using hint_directory_manager = internal::hint_directory_manager; + enum class state { started, // Hinting is currently allowed (start() has completed). replay_allowed, // Replaying (sending) hints is allowed. @@ -119,6 +121,16 @@ private: resource_manager& _resource_manager; std::unordered_map _ep_managers; + + // This is ONLY used when `_uses_host_id` is false. Otherwise, this map should stay EMPTY. + // + // Invariants: + // (1) there is an endpoint manager in `_ep_managers` identified by host ID `H` if an only if + // there is a mapping corresponding to `H` in `_hint_directory_manager`, + // (2) a hint directory representing an IP address `I` is managed by an endpoint manager + // if and only if there is a mapping corresponding to `I` in `_hint_directory_manager`. 
+ hint_directory_manager _hint_directory_manager; + hint_stats _stats; seastar::metrics::metric_groups _metrics; std::unordered_set _eps_with_pending_hints; @@ -190,7 +202,8 @@ public: /// /// \param ep endpoint whose file update mutex should be locked /// \param func functor to be executed - future<> with_file_update_mutex_for(endpoint_id ep, noncopyable_function ()> func); + future<> with_file_update_mutex_for(const std::variant& ep, + noncopyable_function ()> func); /// \brief Checks if hints are disabled for all endpoints /// \return TRUE if hints are disabled. @@ -275,7 +288,7 @@ private: hint_endpoint_manager& get_ep_manager(const endpoint_id& host_id, const gms::inet_address& ip); public: - bool have_ep_manager(endpoint_id ep) const noexcept; + bool have_ep_manager(const std::variant& ep) const noexcept; public: /// \brief Initiate the draining when we detect that the node has left the cluster. From 58784cd8db92561e467bcedabd8c3468708c3da4 Mon Sep 17 00:00:00 2001 From: Dawid Medrek Date: Fri, 22 Mar 2024 15:30:17 +0100 Subject: [PATCH 15/18] db/hints: Handle arbitrary directories in resource manager Before these changes, resource manager only handled the case when directories it browsed represented valid host IDs. However, since before migrating hinted handoff to using host IDs we still name directories after IP addresses, that would lead to exceptions that shouldn't happen. We make resource manager handle directories of arbitrary names correctly.
--- db/hints/manager.cc | 7 +++--- db/hints/manager.hh | 15 +++++++++--- db/hints/resource_manager.cc | 44 ++++++++++++++++++++++++++++-------- db/hints/resource_manager.hh | 6 +++-- 4 files changed, 53 insertions(+), 19 deletions(-) diff --git a/db/hints/manager.cc b/db/hints/manager.cc index 445efff8e6..62dbe3137a 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -237,10 +237,9 @@ void manager::forbid_hints() { } void manager::forbid_hints_for_eps_with_pending_hints() { - manager_logger.trace("space_watchdog: Going to block hints to: {}", _eps_with_pending_hints); - - for (auto& [_, ep_man] : _ep_managers) { - if (has_ep_with_pending_hints(ep_man.end_point_key())) { + for (auto& [host_id, ep_man] : _ep_managers) { + const auto ip = *_hint_directory_manager.get_mapping(host_id); + if (has_ep_with_pending_hints(host_id) || has_ep_with_pending_hints(ip)) { ep_man.forbid_hints(); } else { ep_man.allow_hints(); diff --git a/db/hints/manager.hh b/db/hints/manager.hh index 7740565390..97827351bf 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -133,7 +133,16 @@ private: hint_stats _stats; seastar::metrics::metric_groups _metrics; - std::unordered_set _eps_with_pending_hints; + + // We need to keep a variant here. Before migrating hinted handoff to using host ID, hint directories will + // still represent IP addresses. But after the migration, they will start representing host IDs. + // We need to handle either case. + // + // It's especially important when dealing with the scenario when there is an IP directory, but there is + // no mapping for in locator::token_metadata. Since we sometimes have to save a directory like that + // in this set as well, this variant is necessary. + std::unordered_set> _eps_with_pending_hints; + seastar::named_semaphore _drain_lock = {1, named_semaphore_exception_factory{"drain lock"}}; // For now, this never changes. 
@@ -227,7 +236,7 @@ public: return it->second.hints_in_progress(); } - void add_ep_with_pending_hints(endpoint_id key) { + void add_ep_with_pending_hints(const std::variant& key) { _eps_with_pending_hints.insert(key); } @@ -236,7 +245,7 @@ public: _eps_with_pending_hints.reserve(_ep_managers.size()); } - bool has_ep_with_pending_hints(endpoint_id key) const { + bool has_ep_with_pending_hints(const std::variant& key) const { return _eps_with_pending_hints.contains(key); } diff --git a/db/hints/resource_manager.cc b/db/hints/resource_manager.cc index f9c6cc08c9..2ef6da8dbe 100644 --- a/db/hints/resource_manager.cc +++ b/db/hints/resource_manager.cc @@ -7,6 +7,8 @@ */ #include "resource_manager.hh" +#include "gms/inet_address.hh" +#include "locator/token_metadata.hh" #include "manager.hh" #include "log.hh" #include @@ -91,7 +93,8 @@ future<> space_watchdog::stop() noexcept { } // Called under the end_point_hints_manager::file_update_mutex() of the corresponding end_point_hints_manager instance. -future<> space_watchdog::scan_one_ep_dir(fs::path path, manager& shard_manager, endpoint_id ep_key) { +future<> space_watchdog::scan_one_ep_dir(fs::path path, manager& shard_manager, + std::optional> maybe_ep_key) { // It may happen that we get here and the directory has already been deleted in the context of manager::drain_for(). // In this case simply bail out. 
if (!co_await file_exists(path.native())) { @@ -99,10 +102,10 @@ future<> space_watchdog::scan_one_ep_dir(fs::path path, manager& shard_manager, } co_await lister::scan_dir(path, lister::dir_entry_types::of(), - coroutine::lambda([this, ep_key, &shard_manager] (fs::path dir, directory_entry de) -> future<> { + coroutine::lambda([this, maybe_ep_key, &shard_manager] (fs::path dir, directory_entry de) -> future<> { // Put the current end point ID to state.eps_with_pending_hints when we see the second hints file in its directory - if (_files_count == 1) { - shard_manager.add_ep_with_pending_hints(ep_key); + if (maybe_ep_key && _files_count == 1) { + shard_manager.add_ep_with_pending_hints(*maybe_ep_key); } ++_files_count; @@ -142,14 +145,35 @@ void space_watchdog::on_timer() { // not hintable). // If exists - let's take a file update lock so that files are not changed under our feet. Otherwise, simply // continue to enumeration - there is no one to change them. - const internal::endpoint_id ep{utils::UUID{de.name}}; + auto maybe_variant = std::invoke([&] () -> std::optional> { + try { + const auto hid_or_ep = locator::host_id_or_endpoint{de.name}; + if (hid_or_ep.has_host_id()) { + return std::variant(hid_or_ep.id()); + } else { + return std::variant(hid_or_ep.endpoint()); + } + } catch (...) { + return std::nullopt; + } + }); - if (shard_manager.have_ep_manager(ep)) { - return shard_manager.with_file_update_mutex_for(ep, [this, ep, &shard_manager, dir = std::move(dir), ep_name = std::move(de.name)] () mutable { - return scan_one_ep_dir(dir / ep_name, shard_manager, ep); + // Case 1: The directory is managed by an endpoint manager. 
+ if (maybe_variant && shard_manager.have_ep_manager(*maybe_variant)) { + const auto variant = *maybe_variant; + return shard_manager.with_file_update_mutex_for(variant, [this, variant, &shard_manager, dir = std::move(dir), ep_name = std::move(de.name)] () mutable { + return scan_one_ep_dir(dir / ep_name, shard_manager, variant); }); - } else { - return scan_one_ep_dir(dir / de.name, shard_manager, ep); + } + // Case 2: The directory isn't managed by an endpoint manager, but it represents either an IP address, + // or a host ID. + else if (maybe_variant) { + return scan_one_ep_dir(dir / de.name, shard_manager, *maybe_variant); + } + // Case 3: The directory isn't managed by an endpoint manager, and it represents neither an IP address, + // nor a host ID. + else { + return scan_one_ep_dir(dir / de.name, shard_manager, {}); } }).get(); } diff --git a/db/hints/resource_manager.hh b/db/hints/resource_manager.hh index f57a3f9407..ec154fec36 100644 --- a/db/hints/resource_manager.hh +++ b/db/hints/resource_manager.hh @@ -21,6 +21,7 @@ #include "utils/updateable_value.hh" #include "enum_set.hh" #include "db/hints/internal/common.hh" +#include "gms/inet_address.hh" // Usually we don't define namespace aliases in our headers // but this one is already entrenched. 
@@ -112,9 +113,10 @@ private: /// /// \param path directory to scan /// \param shard_manager the hint manager managing the directory specified by `path` - /// \param ep_key endpoint ID corresponding to the scanned directory + /// \param maybe_ep_key endpoint ID corresponding to the scanned directory /// \return future that resolves when scanning is complete - future<> scan_one_ep_dir(fs::path path, manager& shard_manager, endpoint_id ep_key); + future<> scan_one_ep_dir(fs::path path, manager& shard_manager, + std::optional> maybe_ep_key); }; class resource_manager { From 0ef8d67d32d939f9c1ce36bcc625e07e1d459336 Mon Sep 17 00:00:00 2001 From: Dawid Medrek Date: Fri, 22 Mar 2024 15:51:09 +0100 Subject: [PATCH 16/18] db/hints: Migrate hinted handoff when cluster feature is enabled These changes migrate hinted handoff to using host ID as soon as the corresponding cluster feature is enabled. When a node starts, it defaults to creating directories naming them after IP addresses. When the whole cluster has upgraded to a version of Scylla that can handle directories representing host IDs, we perform a migration of the IP folders, i.e. we try to rename them to host IDs. Invalid directories, i.e. those that represent neither an IP address, nor a host ID, are removed. During the migration, hinted handoff is disabled. It is necessary because we have to modify the disk's contents, so new hints cannot be saved until the migration finishes. --- db/hints/manager.cc | 116 ++++++++++++++++++++++++++++++++++++++++- db/hints/manager.hh | 24 +++++++-- gms/feature_service.hh | 1 + 3 files changed, 137 insertions(+), 4 deletions(-) diff --git a/db/hints/manager.cc b/db/hints/manager.cc index 62dbe3137a..d62764cff1 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -31,6 +31,7 @@ // Scylla includes. 
#include "db/hints/internal/hint_logger.hh" +#include "gms/feature_service.hh" #include "gms/gossiper.hh" #include "gms/inet_address.hh" #include "locator/abstract_replication_strategy.hh" @@ -47,6 +48,7 @@ // STD. #include #include +#include #include namespace db::hints { @@ -192,10 +194,21 @@ void manager::register_metrics(const sstring& group_name) { future<> manager::start(shared_ptr gossiper_ptr) { _gossiper_anchor = std::move(gossiper_ptr); + if (_proxy.features().host_id_based_hinted_handoff) { + _uses_host_id = true; + co_await migrate_ip_directories(); + } + co_await initialize_endpoint_managers(); co_await compute_hints_dir_device_id(); set_started(); + + if (!_uses_host_id) { + _migration_callback = _proxy.features().host_id_based_hinted_handoff.when_enabled([this] { + _migrating_done = perform_migration(); + }); + } } future<> manager::stop() { @@ -203,7 +216,9 @@ future<> manager::stop() { set_stopping(); - return _draining_eps_gate.close().finally([this] { + return _migrating_done.finally([this] { + return _draining_eps_gate.close(); + }).finally([this] { return parallel_for_each(_ep_managers | boost::adaptors::map_values, [] (hint_endpoint_manager& ep_man) { return ep_man.stop(); }).finally([this] { @@ -269,6 +284,19 @@ sync_point::shard_rps manager::calculate_current_sync_point(std::span manager::wait_for_sync_point(abort_source& as, const sync_point::shard_rps& rps) { + using lock_type = std::shared_lock; + // Prevent the migration to host-ID-based hinted handoff until this function finishes its execution. + const auto shared_lock = co_await std::invoke(coroutine::lambda([&] () -> future { + co_await _migration_mutex.lock_shared(); + + try { + co_return lock_type{_migration_mutex, std::adopt_lock_t{}}; + } catch (...) 
{ + _migration_mutex.unlock_shared(); + throw; + } + })); + abort_source local_as; auto sub = as.subscribe([&local_as] () noexcept { @@ -414,6 +442,10 @@ bool manager::too_many_in_flight_hints_for(endpoint_id ep) const noexcept { } bool manager::can_hint_for(endpoint_id ep) const noexcept { + if (_state.contains(state::migrating)) { + return false; + } + if (_proxy.local_db().get_token_metadata().get_topology().is_me(ep)) { return false; } @@ -560,6 +592,9 @@ future<> manager::drain_for(endpoint_id endpoint) noexcept { manager_logger.trace("on_leave_cluster: {} is removed/decommissioned", endpoint); const auto holder = seastar::gate::holder{_draining_eps_gate}; + // As long as we hold on to this lock, no migration of hinted handoff to host IDs + // can be being performed because `manager::perform_migration()` takes it + // at the beginning of its execution too. const auto sem_unit = co_await seastar::get_units(_drain_lock, 1); // After an endpoint has been drained, we remove its directory with all of its contents. @@ -761,4 +796,83 @@ future<> manager::migrate_ip_directories() { co_await io_check(sync_directory, _hints_dir.native()); } +future<> manager::perform_migration() { + // This function isn't marked as noexcept, but the only parts of the code that + // can throw an exception are: + // 1. the call to `migrate_ip_directories()`: if we fail there, the failure is critical. + // It doesn't lead to any data corruption, but the node must be stopped; + // 2. the re-initialization of the endpoint managers: a failure there is the same failure + // that can happen when starting a node. It may be seen as critical, but it should only + // boil down to not initializing some of the endpoint managers. No data corruption + // is possible. + if (_state.contains(state::stopping) || _state.contains(state::draining_all)) { + // It's possible the cluster feature is enabled right after the local node decides + // to leave the cluster. 
In that case, the migration callback might still potentially + // be called, but we don't want to perform it. We need to stop the node as soon as possible. + // + // The `state::draining_all` case is more tricky. The semantics of self-draining is not + // specified, but based on the description of the state in the header file, it means + // the node is leaving the cluster and it works like that indeed, so we apply the same reasoning. + co_return; + } + + manager_logger.info("Migration of hinted handoff to host ID is starting"); + // Step 1. Prevent accepting incoming hints. + _state.set(state::migrating); + + // Step 2. Make sure during the migration there is no draining process and we don't await any sync points. + + // We're taking this lock for two reasons: + // 1. we're waiting for the ongoing drains to finish so that there's no data race, + // 2. we suspend new drain requests -- to prevent data races. + const auto lock = co_await seastar::get_units(_drain_lock, 1); + + using lock_type = std::unique_lock; + // We're taking this lock because we're about to stop endpoint managers here, whereas + // `manager::wait_for_sync_point` browses them and awaits their corresponding sync points. + // If we stop them during that process, that function will get exceptions. + // + // Although in the current implementation there is no danger of race conditions + // (or at least race conditions that could be harmful in any way), it's better + // to avoid them anyway. Hence this lock. + const auto unique_lock = co_await std::invoke(coroutine::lambda([&] () -> future { + co_await _migration_mutex.lock(); + + try { + co_return lock_type{_migration_mutex, std::adopt_lock_t{}}; + } catch (...) { + _migration_mutex.unlock(); + throw; + } + })); + + // Step 3. Stop endpoint managers. We will modify the hint directory contents, so this is necessary. 
+ co_await coroutine::parallel_for_each(_ep_managers | std::views::values, [] (auto& ep_manager) -> future<> { + return ep_manager.stop(drain::no); + }); + + // Step 4. Prevent resource manager from scanning the hint directory. Race conditions are unacceptable. + auto resource_manager_lock = co_await seastar::get_units(_resource_manager.update_lock(), 1); + + // Once the resource manager cannot scan anything anymore, we can safely get rid of these. + _ep_managers.clear(); + _eps_with_pending_hints.clear(); + + // We won't need this anymore. + _hint_directory_manager.clear(); + + // Step 5. Rename the hint directories so that those that remain all represent valid host IDs. + co_await migrate_ip_directories(); + _uses_host_id = true; + + // Step 6. Make resource manager scan the hint directory again. + resource_manager_lock.return_all(); + // Step 7. Once resource manager is working again, endpoint managers can be safely recreated. + // We won't modify the contents of the hint directory anymore. + co_await initialize_endpoint_managers(); + // Step 8. Start accepting incoming hints again. + _state.remove(state::migrating); + manager_logger.info("Migration of hinted handoff to host ID has finished successfully"); +} + } // namespace db::hints diff --git a/db/hints/manager.hh b/db/hints/manager.hh index 97827351bf..353190d07e 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -83,6 +84,9 @@ private: enum class state { started, // Hinting is currently allowed (start() has completed). + migrating, // The hint manager is being migrated from using IPs to name + // hint directories to using host IDs for that purpose. No new + // incoming hints will be accepted as long as this is the state. replay_allowed, // Replaying (sending) hints is allowed. draining_all, // Accepting new hints is not allowed. All endpoint managers // are being drained because the node is leaving the cluster. 
@@ -92,6 +96,7 @@ private: using state_set = enum_set>; @@ -145,8 +150,13 @@ private: seastar::named_semaphore _drain_lock = {1, named_semaphore_exception_factory{"drain lock"}}; - // For now, this never changes. - const bool _uses_host_id = true; + bool _uses_host_id = false; + std::any _migration_callback = std::nullopt; + future<> _migrating_done = make_ready_future(); + + // Unique lock if and only if there is an ongoing migration to the host-ID-based hinted handoff. + // Shared lock if and only if there is a fiber already executing `manager::wait_for_sync_point`. + seastar::shared_mutex _migration_mutex{}; public: manager(service::storage_proxy& proxy, sstring hints_directory, host_filter filter, @@ -354,11 +364,19 @@ private: /// but we want to ensure that old hints don't get lost if possible. This function serves /// this purpose. It's only necessary when upgrading Scylla. /// - /// This function should ONLY be called by `manager::start()`. + /// This function should ONLY be called by `manager::start()` and `manager::perform_migration()`. /// /// Calling this function again while the previous call has not yet finished /// is undefined behavior. future<> migrate_ip_directories(); + + /// Migrates this hint manager to using host IDs, i.e. when a call to this function ends, + /// the names of hint directories will start being represented by host IDs instead of IPs. + /// + /// This function suspends hinted handoff throughout its execution. Among other consequences, + /// ALL requested sync points will be canceled, i.e. an exception will be issued + /// in the corresponding futures. + future<> perform_migration(); }; } // namespace db::hints diff --git a/gms/feature_service.hh b/gms/feature_service.hh index b9813d8923..3c0bb39e4f 100644 --- a/gms/feature_service.hh +++ b/gms/feature_service.hh @@ -137,6 +137,7 @@ public: // revert to the digest method when necessary (if we must perform a schema change during RECOVERY). 
gms::feature group0_schema_versioning { *this, "GROUP0_SCHEMA_VERSIONING"sv }; gms::feature supports_consistent_topology_changes { *this, "SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES"sv }; +gms::feature host_id_based_hinted_handoff { *this, "HOST_ID_BASED_HINTED_HANDOFF"sv }; // A feature just for use in tests. It must not be advertised unless // the "features_enable_test_feature" injection is enabled. From 46ab22f805edd0b5a10ffb6141404959f739d4dd Mon Sep 17 00:00:00 2001 From: Dawid Medrek Date: Fri, 5 Apr 2024 15:03:11 +0200 Subject: [PATCH 17/18] db/hints: Add endpoint_downtime_not_bigger_than() We add an auxiliary function checking if a node hasn't been down for too long. Although `gms::gossiper` already exposes a function responsible for that, it requires that its argument be an IP address. That's the reason we add a new function. --- db/hints/manager.cc | 62 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 48 insertions(+), 14 deletions(-) diff --git a/db/hints/manager.cc b/db/hints/manager.cc index d62764cff1..b7f21594ff 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -31,6 +31,8 @@ // Scylla includes. #include "db/hints/internal/hint_logger.hh" +#include "gms/application_state.hh" +#include "gms/endpoint_state.hh" #include "gms/feature_service.hh" #include "gms/gossiper.hh" #include "gms/inet_address.hh" @@ -427,18 +429,55 @@ bool manager::store_hint(endpoint_id host_id, gms::inet_address ip, schema_ptr s } } +/// Checks if there is a node corresponding to a given host ID that hasn't been down for longer +/// than a given amount of time. The function relies on information obtained from the passed `gms::gossiper`. +static bool endpoint_downtime_not_bigger_than(const gms::gossiper& gossiper, const locator::host_id& host_id, + uint64_t max_downtime_us) +{ + // We want to enforce small buffer optimization in the call + // to `gms::gossiper::for_each_endpoint_state_until()` below + // to avoid an unnecessary allocation. 
+ // Since we need all these four pieces of information in the lambda, + // the function object passed to the function might be too big. + // That's why we create it locally on the stack and only pass a reference to it. + struct sbo_info { + locator::host_id host_id; + const gms::gossiper& gossiper; + int64_t max_hint_window_us; + bool small_node_downtime; + }; + + sbo_info info { + .host_id = host_id, + .gossiper = gossiper, + .max_hint_window_us = max_downtime_us, + .small_node_downtime = false + }; + + gossiper.for_each_endpoint_state_until( + [&info] (const gms::inet_address& ip, const gms::endpoint_state& state) { + const auto app_state = state.get_application_state_ptr(gms::application_state::HOST_ID); + const auto host_id = locator::host_id{utils::UUID{app_state->value()}}; + if (!app_state || host_id != info.host_id) { + return stop_iteration::no; + } + if (info.gossiper.get_endpoint_downtime(ip) <= info.max_hint_window_us) { + info.small_node_downtime = true; + return stop_iteration::yes; + } + return stop_iteration::no; + }); + + return info.small_node_downtime; +} + bool manager::too_many_in_flight_hints_for(endpoint_id ep) const noexcept { // There is no need to check the DC here because if there is an in-flight hint for this // endpoint, then this means that its DC has already been checked and found to be ok. 
- if (!(_stats.size_of_hints_in_progress > MAX_SIZE_OF_HINTS_IN_PROGRESS + return _stats.size_of_hints_in_progress > MAX_SIZE_OF_HINTS_IN_PROGRESS && !_proxy.local_db().get_token_metadata().get_topology().is_me(ep) - && hints_in_progress_for(ep) > 0)) { - return false; - } - - return std::ranges::any_of(local_gossiper().get_nodes_with_host_id(ep), [this] (const auto& ip) { - return local_gossiper().get_endpoint_downtime(ip) <= _max_hint_window_us; - }); + && hints_in_progress_for(ep) > 0 + && endpoint_downtime_not_bigger_than(local_gossiper(), ep, _max_hint_window_us); } bool manager::can_hint_for(endpoint_id ep) const noexcept { @@ -473,12 +512,7 @@ bool manager::can_hint_for(endpoint_id ep) const noexcept { return false; } - // Check if the endpoint has been down for too long. - const auto addrs = local_gossiper().get_nodes_with_host_id(ep); - const bool node_is_alive = std::ranges::any_of(addrs, [this] (const auto& addr) { - return local_gossiper().get_endpoint_downtime(addr) <= _max_hint_window_us; - }); - + const bool node_is_alive = endpoint_downtime_not_bigger_than(local_gossiper(), ep, _max_hint_window_us); if (!node_is_alive) { manager_logger.trace("{} has been down for too long, not hinting", ep); return false; From bf802e99eb9028f80726042eff534dee824f9dc0 Mon Sep 17 00:00:00 2001 From: Dawid Medrek Date: Fri, 22 Mar 2024 17:43:06 +0100 Subject: [PATCH 18/18] docs: Update Hinted Handoff documentation We briefly explain the process of migration of Hinted Handoff to host IDs, the rationale for it, consequences, and possible side effects. 
--- docs/dev/hinted_handoff_design.md | 42 +++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/docs/dev/hinted_handoff_design.md b/docs/dev/hinted_handoff_design.md index c51d385134..b6674dc910 100644 --- a/docs/dev/hinted_handoff_design.md +++ b/docs/dev/hinted_handoff_design.md @@ -1,14 +1,14 @@ # Hinted Handoff Design ## Abstract -Hinted Handoff is a feature that allows replaying failed writes. The mutation and the destination replica are saved in a log and replayed later according to the feature configuration. +Hinted Handoff is a feature that allows replaying failed writes. The mutation and the destination replica are saved in a log and replayed later according to the feature configuration. -## Hinted Handoff Configuration Parameters +## Hinted Handoff Configuration Parameters * _hinted_handoff_enabled_: Enables or disables the Hinted Handoff feature completely or enumerate DCs for which hints are allowed. * _max_hint_window_in_ms_: Don't generate hints if the destination Node has been down for more than this value. The hints generation should resume once the Node is seen up. * _hints_directory_: Directory where scylla will store hints. By default `$SCYLLA_HOME/hints` * _hints_compression_: Compression to apply to hints files. By default, hints files are stored uncompressed. - + ## Future configuration * We should define the fairness configuration between the regular WRITES and hints WRITES. Since we don't have either CPU scheduler or (and) Network scheduler at the moment we can't give any guarantee regarding the runtime and/or networking bandwidth fairness. @@ -16,7 +16,7 @@ Hinted Handoff is a feature that allows replaying failed writes. The mutation an ## Hints generation * Once the WRITE mutation fails with a timeout we create a _hints_queue_ for the target node. - * The queue is specified by a destination node IP. + * The queue is specified by a destination node host ID. * Each hint is specified by: * Mutation. 
@@ -31,15 +31,15 @@ As long as hints are appended to the queue the files are closed and flushed to t We are going to reuse the commitlog infrastructure for writing hints to disk - it provides both the internal buffering and the memory consumption control. -Hints to the specific destination are stored under the _hints_directory_/\/\ directory. +Hints to the specific destination are stored under the _hints_directory_/\/\ directory. ### When new hints may be dropped? - * A new hint is going to be dropped when there are more than 10MB "in progress" (yet to be stored) hints per-shard and when there are "in progress" hints to the destination the current hint is aimed to. + * A new hint is going to be dropped when there are more than 10MB "in progress" (yet to be stored) hints per-shard and when there are "in progress" hints to the destination the current hint is aimed to. * If there are no "in progress" hints to the current destination the new hint won't be dropped due to the per-shard memory limitation. * A hint is going to be dropped if the disk space quota (to the whole node it's 10% of the total disk space of the disk partition where _hints_directory_ is located) has been depleted and when there are pending (already stored) hints to the current destination. * Disk quota is divided equally between all present shards. - * If there are no pending hints to the current destination a new hint won't be dropped due to a disk space limitation. - * If a new hint is dropped the corresponding metrics counter is increased. + * If there are no pending hints to the current destination a new hint won't be dropped due to a disk space limitation. + * If a new hint is dropped the corresponding metrics counter is increased. ### Redistribution of hints when node boots. * When node boots all present hints files are redistributed equally between all present shards. @@ -47,7 +47,7 @@ Hints to the specific destination are stored under the _hints_directory_/\