From b6ccaffd4516ff6449e0c28c2e9b10c8a0b57781 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Thu, 26 Jun 2025 15:17:18 +0200 Subject: [PATCH] storage_proxy: rename create_write_response_handler -> make_write_response_handler Most of the create_write_response_handler overloads follow the same signature pattern to satisfy the sp::mutate_prepare call. The one which doesn't follow it is invoked by others and is responsible for creating a concrete handler instance. In this refactoring commit we rename it to make_write_response_handler to reduce confusion. --- service/storage_proxy.cc | 12 ++++++------ service/storage_proxy.hh | 19 ++++++++++++++++++- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 3c6b2c45d2..33cc579b5d 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -2683,7 +2683,7 @@ future> storage_proxy::response_wait(storage_proxy::response_id_type id return _response_handlers.find(id)->second; } -result storage_proxy::create_write_response_handler(locator::effective_replication_map_ptr ermp, +result storage_proxy::make_write_response_handler(locator::effective_replication_map_ptr ermp, db::consistency_level cl, db::write_type type, std::unique_ptr m, host_id_vector_replica_set targets, const host_id_vector_topology_change& pending_endpoints, host_id_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, is_cancellable cancellable) @@ -3456,7 +3456,7 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok db::assure_sufficient_live_nodes(cl, *erm, live_endpoints, pending_endpoints); - return create_write_response_handler(std::move(erm), cl, type, std::move(mh), std::move(live_endpoints), pending_endpoints, + return make_write_response_handler(std::move(erm), cl, type, std::move(mh), std::move(live_endpoints), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), get_stats(), std::move(permit), rate_limit_info, cancellable); } @@ -3497,7 +3497,7 @@ storage_proxy::create_write_response_handler(const read_repair_mutation& mut, db tracing::trace(tr_state, "Creating write handler for read repair token: {} endpoint: {}", mh->token(), endpoints); // No rate limiting for read repair - return create_write_response_handler(std::move(mut.ermp), cl, type, std::move(mh), std::move(endpoints), host_id_vector_topology_change(), host_id_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate(), is_cancellable::no); + return make_write_response_handler(std::move(mut.ermp), cl, type, std::move(mh), std::move(endpoints), host_id_vector_topology_change(), host_id_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate(), is_cancellable::no); } result @@ -3526,7 +3526,7 @@ storage_proxy::create_write_response_handler(const std::tupleget_effective_replication_map(); // No rate limiting for paxos (yet) - return create_write_response_handler(std::move(ermp), cl, db::write_type::CAS, std::make_unique(std::move(commit), s, nullptr), std::move(endpoints), + return make_write_response_handler(std::move(ermp), cl, db::write_type::CAS, std::make_unique(std::move(commit), s, nullptr), std::move(endpoints), host_id_vector_topology_change(), host_id_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate(), is_cancellable::no); } @@ -4088,7 +4088,7 @@ storage_proxy::mutate_atomically_result(utils::chunked_vector mutation future> send_batchlog_mutation(mutation m, db::consistency_level cl = db::consistency_level::ONE) { return _p.mutate_prepare<>(std::array{std::move(m)}, [this, cl, permit = _permit] (const mutation& m) { - return _p.create_write_response_handler(_ermp, cl, db::write_type::BATCH_LOG, std::make_unique(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, permit, std::monostate(), is_cancellable::no); + return _p.make_write_response_handler(_ermp, cl, db::write_type::BATCH_LOG, std::make_unique(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, permit, std::monostate(), is_cancellable::no); }).then(utils::result_wrap([this, cl] (unique_response_handler_vector ids) { _p.register_cdc_operation_result_tracker(ids, _cdc_tracker); return _p.mutate_begin(std::move(ids), cl, _trace_state, _timeout); @@ -4228,7 +4228,7 @@ future<> storage_proxy::send_to_endpoint( std::bind_front(&storage_proxy::is_alive, this, std::cref(*erm))); slogger.trace("Creating write handler with live: {}; dead: {}", targets, dead_endpoints); db::assure_sufficient_live_nodes(cl, *erm, targets, pending_endpoints); - return create_write_response_handler( + return make_write_response_handler( std::move(erm), cl, type, diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 9d7c428f70..8a57cc7be2 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -336,10 +336,26 @@ private: void got_failure_response(response_id_type id, locator::host_id from, size_t count, std::optional backlog, error err, std::optional msg); future> response_wait(response_id_type id, clock_type::time_point timeout); ::shared_ptr& get_write_response_handler(storage_proxy::response_id_type id); + + // The `make_write_response_handler` function instantiates a concrete response handler + // for the given set of replicas. + // + // The various `create_write_response_handler` overloads share a similar signature + // to satisfy the requirements of `sp::mutate_prepare`. They differ only in the type + // of the first `mutation` parameter and dispatch accordingly: + // + // - `create_write_response_handler_helper`: selects the appropriate replica set + // based on the token and schema, then delegates to `make_write_response_handler`. + // + // - `make_write_response_handler`: builds the final handler using the resolved replica set. + // + // In summary, the overloads abstract away mutation-type-specific logic while ensuring + // that replica selection and handler instantiation follow a consistent flow. + result create_write_response_handler_helper(schema_ptr s, const dht::token& token, std::unique_ptr mh, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, is_cancellable); - result create_write_response_handler(locator::effective_replication_map_ptr ermp, db::consistency_level cl, db::write_type type, std::unique_ptr m, host_id_vector_replica_set targets, + result make_write_response_handler(locator::effective_replication_map_ptr ermp, db::consistency_level cl, db::write_type type, std::unique_ptr m, host_id_vector_replica_set targets, const host_id_vector_topology_change& pending_endpoints, host_id_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, is_cancellable); result create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); result create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); @@ -349,6 +365,7 @@ private: db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); result create_write_response_handler(const std::tuple, schema_ptr, shared_ptr, dht::token, host_id_vector_replica_set>& meta, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); + void register_cdc_operation_result_tracker(const storage_proxy::unique_response_handler_vector& ids, lw_shared_ptr tracker); template bool should_reject_due_to_view_backlog(const Range& targets, const schema_ptr& s) const;