diff --git a/db/hints/internal/hint_sender.cc b/db/hints/internal/hint_sender.cc index e6c1f9c68c..6d11a0067e 100644 --- a/db/hints/internal/hint_sender.cc +++ b/db/hints/internal/hint_sender.cc @@ -259,23 +259,23 @@ void hint_sender::start() { future<> hint_sender::send_one_mutation(frozen_mutation_and_schema m) { auto ermp = _db.find_column_family(m.s).get_effective_replication_map(); auto token = dht::get_token(*m.s, m.fm.key()); - inet_address_vector_replica_set natural_endpoints = ermp->get_natural_endpoints(std::move(token)); + host_id_vector_replica_set natural_endpoints = ermp->get_natural_replicas(std::move(token)); return futurize_invoke([this, m = std::move(m), ermp = std::move(ermp), &natural_endpoints] () mutable -> future<> { // The fact that we send with CL::ALL in both cases below ensures that new hints are not going // to be generated as a result of hints sending. const auto& tm = ermp->get_token_metadata(); - const auto maybe_addr = tm.get_endpoint_for_host_id_if_known(end_point_key()); + const auto dst = end_point_key(); - if (maybe_addr && boost::range::find(natural_endpoints, *maybe_addr) != natural_endpoints.end() && !tm.is_leaving(end_point_key())) { - manager_logger.trace("Sending directly to {}", end_point_key()); - return _proxy.send_hint_to_endpoint(std::move(m), std::move(ermp), *maybe_addr); + if (std::ranges::contains(natural_endpoints, dst) && !tm.is_leaving(dst)) { + manager_logger.trace("Sending directly to {}", dst); + return _proxy.send_hint_to_endpoint(std::move(m), std::move(ermp), dst); } else { if (manager_logger.is_enabled(log_level::trace)) { if (tm.is_leaving(end_point_key())) { - manager_logger.trace("The original target endpoint {} is leaving. Mutating from scratch...", end_point_key()); + manager_logger.trace("The original target endpoint {} is leaving. Mutating from scratch...", dst); } else { - manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", end_point_key()); + manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", dst); } } return _proxy.send_hint_to_all_replicas(std::move(m)); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 38d10363f3..f845c71324 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -64,6 +64,7 @@ #include #include #include +#include #include "locator/abstract_replication_strategy.hh" #include "service/paxos/cas_request.hh" #include "mutation/mutation_partition_view.hh" @@ -4054,11 +4055,12 @@ bool storage_proxy::cannot_hint(const Range& targets, db::write_type type) const std::ranges::any_of(targets, std::bind(&db::hints::manager::too_many_in_flight_hints_for, &_hints_manager, std::placeholders::_1)); } +template future<> storage_proxy::send_to_endpoint( std::unique_ptr m, locator::effective_replication_map_ptr ermp, - gms::inet_address target, - inet_address_vector_topology_change pending_endpoints, + NodesContainer::value_type target, + NodesContainer pending_endpoints, db::write_type type, tracing::trace_state_ptr tr_state, write_stats& stats, @@ -4075,14 +4077,21 @@ future<> storage_proxy::send_to_endpoint( timeout = clock_type::now() + 5min; } - // FIXME: the function is called from hint manager and view builder - // both need to be amended to pass host ids and then this can be dropped - auto n = ermp->get_topology().find_node(target); - if (!n) { - on_internal_error(slogger, fmt::format("Node {} was not found in topology", target)); + locator::host_id target_id; + host_id_vector_topology_change pending_replicas; + if constexpr (!std::is_same_v) { + // FIXME: the function is called from hint manager and view builder + // host use host ids but views use addresses still + auto n = ermp->get_topology().find_node(target); + if (!n) { + on_internal_error(slogger, fmt::format("Node {} was not found in topology", target)); + } + target_id = n->host_id(); + pending_replicas = addr_vector_to_id(ermp->get_topology(), pending_endpoints); + } else { + target_id = target; + pending_replicas = std::move(pending_endpoints); } - auto target_id = n->host_id(); - auto pending_replicas = addr_vector_to_id(ermp->get_topology(), pending_endpoints); return mutate_prepare(std::array{std::move(m)}, cl, type, /* does view building should hold a real permit */ empty_service_permit(), [this, tr_state, erm = std::move(ermp), target = std::array{target_id}, pending_endpoints = std::move(pending_replicas), &stats, cancellable] ( @@ -4140,12 +4149,12 @@ future<> storage_proxy::send_to_endpoint( cancellable); } -future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, locator::effective_replication_map_ptr ermp, gms::inet_address target) { +future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, locator::effective_replication_map_ptr ermp, locator::host_id target) { return send_to_endpoint( std::make_unique(std::move(fm_a_s)), std::move(ermp), std::move(target), - { }, + host_id_vector_topology_change{}, db::write_type::SIMPLE, tracing::trace_state_ptr(), get_stats(), diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 8d832b5ab0..14bd8dbf17 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -442,11 +442,12 @@ private: future> do_mutate(std::vector mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, bool, db::allow_per_partition_rate_limit allow_limit, lw_shared_ptr cdc_tracker); + template future<> send_to_endpoint( std::unique_ptr m, locator::effective_replication_map_ptr ermp, - gms::inet_address target, - inet_address_vector_topology_change pending_endpoints, + NodesContainer::value_type target, + NodesContainer pending_endpoints, db::write_type type, tracing::trace_state_ptr tr_state, write_stats& stats, @@ -638,7 +639,7 @@ public: // Send a mutation to a specific remote target as a hint. // Unlike regular mutations during write operations, hints are sent on the streaming connection // and use different RPC verb. - future<> send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, locator::effective_replication_map_ptr ermp, gms::inet_address target); + future<> send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, locator::effective_replication_map_ptr ermp, locator::host_id target); /** * Performs the truncate operatoin, which effectively deletes all data from