diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc
index 8fc2e16188..138b46ce02 100644
--- a/service/storage_proxy.cc
+++ b/service/storage_proxy.cc
@@ -132,14 +132,6 @@ void validate_read_replicas(const locator::effective_replication_map& erm, const
     }
 }
 
-void remove_non_local_host_ids(HostIdVector auto& host_ids, const locator::effective_replication_map& erm) {
-    const auto my_id = erm.get_topology().my_host_id();
-    auto it = std::ranges::remove_if(host_ids, [&my_id](locator::host_id& id) {
-        return id != my_id;
-    }).begin();
-    host_ids.erase(it, host_ids.end());
-}
-
 } // namespace
 
 namespace storage_proxy_stats {
@@ -3427,8 +3419,20 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok
         db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, is_cancellable cancellable, coordinator_mutate_options options) {
     replica::table& table = _db.local().find_column_family(s->id());
     auto erm = table.get_effective_replication_map();
-    host_id_vector_replica_set natural_endpoints = erm->get_natural_replicas(token);
-    host_id_vector_topology_change pending_endpoints = erm->get_pending_replicas(token);
+
+    host_id_vector_replica_set natural_endpoints;
+    host_id_vector_topology_change pending_endpoints;
+    if (options.node_local_only) [[unlikely]] {
+        if (erm->get_sharder(*s).shard_for_writes(token).empty()) [[unlikely]] {
+            on_internal_error(slogger,
+                format("Local-only write failed on node {}: not a replica for token {} of {}.{}",
+                    my_host_id(*erm), token, s->ks_name(), s->cf_name()));
+        }
+        natural_endpoints = host_id_vector_replica_set{my_host_id(*erm)};
+    } else {
+        natural_endpoints = erm->get_natural_replicas(token);
+        pending_endpoints = erm->get_pending_replicas(token);
+    }
 
     slogger.trace("creating write handler for token: {} natural: {} pending: {}", token, natural_endpoints, pending_endpoints);
     tracing::trace(tr_state, "Creating write handler for token: {} natural: {} pending: {}", token, natural_endpoints ,pending_endpoints);
@@ -3451,11 +3455,6 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok
     }).begin();
     pending_endpoints.erase(itend, pending_endpoints.end());
 
-    if (options.node_local_only) {
-        remove_non_local_host_ids(natural_endpoints, *erm);
-        remove_non_local_host_ids(pending_endpoints, *erm);
-    }
-
     auto all = natural_endpoints;
     std::ranges::copy(pending_endpoints, std::back_inserter(all));
 
@@ -5810,7 +5809,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
     speculative_retry::type retry_type = schema->speculative_retry().get_type();
     std::optional<locator::host_id> extra_replica;
 
-    host_id_vector_replica_set all_replicas = get_endpoints_for_reading(schema->ks_name(), *erm, token, node_local_only);
+    host_id_vector_replica_set all_replicas = get_endpoints_for_reading(*schema, *erm, token, node_local_only);
     // Check for a non-local read before heat-weighted load balancing
     // reordering of endpoints happens. The local endpoint, if
     // present, is always first in the list, as get_endpoints_for_reading()
@@ -6127,7 +6126,7 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
 
     while (i != ranges.end()) {
         dht::partition_range& range = *i;
-        host_id_vector_replica_set live_endpoints = get_endpoints_for_reading(schema->ks_name(), *erm, end_token(range), node_local_only);
+        host_id_vector_replica_set live_endpoints = get_endpoints_for_reading(*schema, *erm, end_token(range), node_local_only);
         host_id_vector_replica_set merged_preferred_replicas = preferred_replicas_for_range(*i);
         host_id_vector_replica_set filtered_endpoints = filter_replicas_for_read(cl, *erm, live_endpoints, merged_preferred_replicas, pcf);
         std::vector<dht::token_range> merged_ranges{to_token_range(range)};
@@ -6143,7 +6142,7 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
         {
            const auto current_range_preferred_replicas = preferred_replicas_for_range(*i);
            dht::partition_range& next_range = *i;
-           host_id_vector_replica_set next_endpoints = get_endpoints_for_reading(schema->ks_name(), *erm, end_token(next_range), node_local_only);
+           host_id_vector_replica_set next_endpoints = get_endpoints_for_reading(*schema, *erm, end_token(next_range), node_local_only);
            host_id_vector_replica_set next_filtered_endpoints = filter_replicas_for_read(cl, *erm, next_endpoints, current_range_preferred_replicas, pcf);
 
            // Origin has this to say here:
@@ -6761,14 +6760,19 @@ void storage_proxy::sort_endpoints_by_proximity(const locator::effective_replica
     }
 }
 
-host_id_vector_replica_set storage_proxy::get_endpoints_for_reading(const sstring& ks_name, const locator::effective_replication_map& erm, const dht::token& token, node_local_only node_local_only) const {
+host_id_vector_replica_set storage_proxy::get_endpoints_for_reading(const schema& s, const locator::effective_replication_map& erm, const dht::token& token, node_local_only node_local_only) const {
+    if (node_local_only) [[unlikely]] {
+        if (!erm.get_sharder(s).try_get_shard_for_reads(token)) [[unlikely]] {
+            on_internal_error(slogger,
+                format("Local-only read failed on node {}: not a replica for token {} of {}.{}",
+                    my_host_id(erm), token, s.ks_name(), s.cf_name()));
+        }
+        return host_id_vector_replica_set{my_host_id(erm)};
+    }
     auto endpoints = erm.get_replicas_for_reading(token);
     validate_read_replicas(erm, endpoints);
     auto it = std::ranges::remove_if(endpoints, std::not_fn(std::bind_front(&storage_proxy::is_alive, this, std::cref(erm)))).begin();
     endpoints.erase(it, endpoints.end());
-    if (node_local_only) {
-        remove_non_local_host_ids(endpoints, erm);
-    }
     sort_endpoints_by_proximity(erm, endpoints);
     return endpoints;
 }
diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh
index 05c3f4fb59..3b75405522 100644
--- a/service/storage_proxy.hh
+++ b/service/storage_proxy.hh
@@ -389,7 +389,7 @@ private:
     bool hints_enabled(db::write_type type) const noexcept;
     db::hints::manager& hints_manager_for(db::write_type type);
     void sort_endpoints_by_proximity(const locator::effective_replication_map& erm, host_id_vector_replica_set& eps) const;
-    host_id_vector_replica_set get_endpoints_for_reading(const sstring& ks_name, const locator::effective_replication_map& erm, const dht::token& token, node_local_only node_local_only) const;
+    host_id_vector_replica_set get_endpoints_for_reading(const schema& s, const locator::effective_replication_map& erm, const dht::token& token, node_local_only node_local_only) const;
     host_id_vector_replica_set filter_replicas_for_read(db::consistency_level, const locator::effective_replication_map&, host_id_vector_replica_set live_endpoints, const host_id_vector_replica_set& preferred_endpoints, db::read_repair_decision, std::optional<locator::host_id>* extra, replica::column_family*) const;
     // As above with read_repair_decision=NONE, extra=nullptr.
     host_id_vector_replica_set filter_replicas_for_read(db::consistency_level, const locator::effective_replication_map&, const host_id_vector_replica_set& live_endpoints, const host_id_vector_replica_set& preferred_endpoints, replica::column_family*) const;
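
Both hunks follow the same pattern: for a node-local-only operation, consult the sharder to verify that this node actually owns the token, and if so return only the local host id instead of filtering the full replica list after the fact. Below is a minimal standalone sketch of that selection logic; the types host_id, token_t, sharder_stub, replica_map_stub and the function endpoints_for_reading are hypothetical stand-ins for illustration only, not the real locator/sharding API.

    // Sketch only: stand-in types below are hypothetical simplifications,
    // not the real Scylla locator/sharding API.
    #include <cstdint>
    #include <optional>
    #include <stdexcept>
    #include <string>
    #include <vector>

    using host_id = std::uint64_t;
    using token_t = std::int64_t;

    struct sharder_stub {
        // Pretend this node owns even tokens only.
        std::optional<unsigned> try_get_shard_for_reads(token_t t) const {
            return (t % 2 == 0) ? std::optional<unsigned>{0u} : std::nullopt;
        }
    };

    struct replica_map_stub {
        host_id local_id;
        sharder_stub sharder;
        std::vector<host_id> replicas_for_reading(token_t) const { return {local_id, 2, 3}; }
    };

    // Mirrors the shape of the patched get_endpoints_for_reading(): for a
    // node-local-only read, verify local ownership of the token and return
    // only the local host id; otherwise fall back to the normal replica lookup.
    std::vector<host_id> endpoints_for_reading(const replica_map_stub& erm, token_t t, bool node_local_only) {
        if (node_local_only) {
            if (!erm.sharder.try_get_shard_for_reads(t)) {
                // The real code calls on_internal_error(); a plain exception stands in here.
                throw std::runtime_error("local-only read on token " + std::to_string(t) + ": this node is not a replica");
            }
            return {erm.local_id};
        }
        return erm.replicas_for_reading(t);
    }

With node_local_only set, the guard turns a mis-routed request into an immediate internal error rather than silently producing an empty endpoint list, which is the behavioral change relative to the removed remove_non_local_host_ids() filtering.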