hints: use host id to send hints

Drop address translation that no longer needed. Templates here are used
temporarily until another user of the function (MV) is converted as
well.
This commit is contained in:
Gleb Natapov
2024-11-24 13:56:36 +02:00
parent 5b9e4c2f07
commit 0ca14ef8b7
3 changed files with 31 additions and 21 deletions

View File

@@ -259,23 +259,23 @@ void hint_sender::start() {
future<> hint_sender::send_one_mutation(frozen_mutation_and_schema m) {
auto ermp = _db.find_column_family(m.s).get_effective_replication_map();
auto token = dht::get_token(*m.s, m.fm.key());
inet_address_vector_replica_set natural_endpoints = ermp->get_natural_endpoints(std::move(token));
host_id_vector_replica_set natural_endpoints = ermp->get_natural_replicas(std::move(token));
return futurize_invoke([this, m = std::move(m), ermp = std::move(ermp), &natural_endpoints] () mutable -> future<> {
// The fact that we send with CL::ALL in both cases below ensures that new hints are not going
// to be generated as a result of hints sending.
const auto& tm = ermp->get_token_metadata();
const auto maybe_addr = tm.get_endpoint_for_host_id_if_known(end_point_key());
const auto dst = end_point_key();
if (maybe_addr && boost::range::find(natural_endpoints, *maybe_addr) != natural_endpoints.end() && !tm.is_leaving(end_point_key())) {
manager_logger.trace("Sending directly to {}", end_point_key());
return _proxy.send_hint_to_endpoint(std::move(m), std::move(ermp), *maybe_addr);
if (std::ranges::contains(natural_endpoints, dst) && !tm.is_leaving(dst)) {
manager_logger.trace("Sending directly to {}", dst);
return _proxy.send_hint_to_endpoint(std::move(m), std::move(ermp), dst);
} else {
if (manager_logger.is_enabled(log_level::trace)) {
if (tm.is_leaving(end_point_key())) {
manager_logger.trace("The original target endpoint {} is leaving. Mutating from scratch...", end_point_key());
manager_logger.trace("The original target endpoint {} is leaving. Mutating from scratch...", dst);
} else {
manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", end_point_key());
manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", dst);
}
}
return _proxy.send_hint_to_all_replicas(std::move(m));

View File

@@ -64,6 +64,7 @@
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/coroutine/all.hh>
#include <type_traits>
#include "locator/abstract_replication_strategy.hh"
#include "service/paxos/cas_request.hh"
#include "mutation/mutation_partition_view.hh"
@@ -4054,11 +4055,12 @@ bool storage_proxy::cannot_hint(const Range& targets, db::write_type type) const
std::ranges::any_of(targets, std::bind(&db::hints::manager::too_many_in_flight_hints_for, &_hints_manager, std::placeholders::_1));
}
template<typename NodesContainer>
future<> storage_proxy::send_to_endpoint(
std::unique_ptr<mutation_holder> m,
locator::effective_replication_map_ptr ermp,
gms::inet_address target,
inet_address_vector_topology_change pending_endpoints,
NodesContainer::value_type target,
NodesContainer pending_endpoints,
db::write_type type,
tracing::trace_state_ptr tr_state,
write_stats& stats,
@@ -4075,14 +4077,21 @@ future<> storage_proxy::send_to_endpoint(
timeout = clock_type::now() + 5min;
}
// FIXME: the function is called from hint manager and view builder
// both need to be amended to pass host ids and then this can be dropped
auto n = ermp->get_topology().find_node(target);
if (!n) {
on_internal_error(slogger, fmt::format("Node {} was not found in topology", target));
locator::host_id target_id;
host_id_vector_topology_change pending_replicas;
if constexpr (!std::is_same_v<NodesContainer, host_id_vector_topology_change>) {
// FIXME: the function is called from hint manager and view builder
// host use host ids but views use addresses still
auto n = ermp->get_topology().find_node(target);
if (!n) {
on_internal_error(slogger, fmt::format("Node {} was not found in topology", target));
}
target_id = n->host_id();
pending_replicas = addr_vector_to_id(ermp->get_topology(), pending_endpoints);
} else {
target_id = target;
pending_replicas = std::move(pending_endpoints);
}
auto target_id = n->host_id();
auto pending_replicas = addr_vector_to_id(ermp->get_topology(), pending_endpoints);
return mutate_prepare(std::array{std::move(m)}, cl, type, /* does view building should hold a real permit */ empty_service_permit(),
[this, tr_state, erm = std::move(ermp), target = std::array{target_id}, pending_endpoints = std::move(pending_replicas), &stats, cancellable] (
@@ -4140,12 +4149,12 @@ future<> storage_proxy::send_to_endpoint(
cancellable);
}
future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, locator::effective_replication_map_ptr ermp, gms::inet_address target) {
future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, locator::effective_replication_map_ptr ermp, locator::host_id target) {
return send_to_endpoint(
std::make_unique<hint_mutation>(std::move(fm_a_s)),
std::move(ermp),
std::move(target),
{ },
host_id_vector_topology_change{},
db::write_type::SIMPLE,
tracing::trace_state_ptr(),
get_stats(),

View File

@@ -442,11 +442,12 @@ private:
future<result<>> do_mutate(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, bool, db::allow_per_partition_rate_limit allow_limit, lw_shared_ptr<cdc::operation_result_tracker> cdc_tracker);
template<typename NodesContainer>
future<> send_to_endpoint(
std::unique_ptr<mutation_holder> m,
locator::effective_replication_map_ptr ermp,
gms::inet_address target,
inet_address_vector_topology_change pending_endpoints,
NodesContainer::value_type target,
NodesContainer pending_endpoints,
db::write_type type,
tracing::trace_state_ptr tr_state,
write_stats& stats,
@@ -638,7 +639,7 @@ public:
// Send a mutation to a specific remote target as a hint.
// Unlike regular mutations during write operations, hints are sent on the streaming connection
// and use different RPC verb.
future<> send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, locator::effective_replication_map_ptr ermp, gms::inet_address target);
future<> send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, locator::effective_replication_map_ptr ermp, locator::host_id target);
/**
* Performs the truncate operatoin, which effectively deletes all data from