storage_proxy: rename create_write_response_handler -> make_write_response_handler

Most of the create_write_response_handler overloads follow the same
signature pattern to satisfy the sp::mutate_prepare call. The one which
doesn't follow it is invoked by others and is responsible for creating
a concrete handler instance. In this refactoring commit we rename
it to make_write_response_handler to reduce confusion.
This commit is contained in:
Petr Gusev
2025-06-26 15:17:18 +02:00
parent db946edd1d
commit b6ccaffd45
2 changed files with 24 additions and 7 deletions

View File

@@ -2683,7 +2683,7 @@ future<result<>> storage_proxy::response_wait(storage_proxy::response_id_type id
return _response_handlers.find(id)->second;
}
result<storage_proxy::response_id_type> storage_proxy::create_write_response_handler(locator::effective_replication_map_ptr ermp,
result<storage_proxy::response_id_type> storage_proxy::make_write_response_handler(locator::effective_replication_map_ptr ermp,
db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m,
host_id_vector_replica_set targets, const host_id_vector_topology_change& pending_endpoints, host_id_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state,
storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, is_cancellable cancellable)
@@ -3456,7 +3456,7 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok
db::assure_sufficient_live_nodes(cl, *erm, live_endpoints, pending_endpoints);
return create_write_response_handler(std::move(erm), cl, type, std::move(mh), std::move(live_endpoints), pending_endpoints,
return make_write_response_handler(std::move(erm), cl, type, std::move(mh), std::move(live_endpoints), pending_endpoints,
std::move(dead_endpoints), std::move(tr_state), get_stats(), std::move(permit), rate_limit_info, cancellable);
}
@@ -3497,7 +3497,7 @@ storage_proxy::create_write_response_handler(const read_repair_mutation& mut, db
tracing::trace(tr_state, "Creating write handler for read repair token: {} endpoint: {}", mh->token(), endpoints);
// No rate limiting for read repair
return create_write_response_handler(std::move(mut.ermp), cl, type, std::move(mh), std::move(endpoints), host_id_vector_topology_change(), host_id_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate(), is_cancellable::no);
return make_write_response_handler(std::move(mut.ermp), cl, type, std::move(mh), std::move(endpoints), host_id_vector_topology_change(), host_id_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate(), is_cancellable::no);
}
result<storage_proxy::response_id_type>
@@ -3526,7 +3526,7 @@ storage_proxy::create_write_response_handler(const std::tuple<lw_shared_ptr<paxo
auto ermp = paxos_handler->get_effective_replication_map();
// No rate limiting for paxos (yet)
return create_write_response_handler(std::move(ermp), cl, db::write_type::CAS, std::make_unique<cas_mutation>(std::move(commit), s, nullptr), std::move(endpoints),
return make_write_response_handler(std::move(ermp), cl, db::write_type::CAS, std::make_unique<cas_mutation>(std::move(commit), s, nullptr), std::move(endpoints),
host_id_vector_topology_change(), host_id_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit), std::monostate(), is_cancellable::no);
}
@@ -4088,7 +4088,7 @@ storage_proxy::mutate_atomically_result(utils::chunked_vector<mutation> mutation
future<result<>> send_batchlog_mutation(mutation m, db::consistency_level cl = db::consistency_level::ONE) {
return _p.mutate_prepare<>(std::array<mutation, 1>{std::move(m)}, [this, cl, permit = _permit] (const mutation& m) {
return _p.create_write_response_handler(_ermp, cl, db::write_type::BATCH_LOG, std::make_unique<shared_mutation>(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, permit, std::monostate(), is_cancellable::no);
return _p.make_write_response_handler(_ermp, cl, db::write_type::BATCH_LOG, std::make_unique<shared_mutation>(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, permit, std::monostate(), is_cancellable::no);
}).then(utils::result_wrap([this, cl] (unique_response_handler_vector ids) {
_p.register_cdc_operation_result_tracker(ids, _cdc_tracker);
return _p.mutate_begin(std::move(ids), cl, _trace_state, _timeout);
@@ -4228,7 +4228,7 @@ future<> storage_proxy::send_to_endpoint(
std::bind_front(&storage_proxy::is_alive, this, std::cref(*erm)));
slogger.trace("Creating write handler with live: {}; dead: {}", targets, dead_endpoints);
db::assure_sufficient_live_nodes(cl, *erm, targets, pending_endpoints);
return create_write_response_handler(
return make_write_response_handler(
std::move(erm),
cl,
type,

View File

@@ -336,10 +336,26 @@ private:
void got_failure_response(response_id_type id, locator::host_id from, size_t count, std::optional<db::view::update_backlog> backlog, error err, std::optional<sstring> msg);
future<result<>> response_wait(response_id_type id, clock_type::time_point timeout);
::shared_ptr<abstract_write_response_handler>& get_write_response_handler(storage_proxy::response_id_type id);
// The `make_write_response_handler` function instantiates a concrete response handler
// for the given set of replicas.
//
// The various `create_write_response_handler` overloads share a similar signature
// to satisfy the requirements of `sp::mutate_prepare`. They differ only in the type
// of the first `mutation` parameter and dispatch accordingly:
//
// - `create_write_response_handler_helper`: selects the appropriate replica set
// based on the token and schema, then delegates to `make_write_response_handler`.
//
// - `make_write_response_handler`: builds the final handler using the resolved replica set.
//
// In summary, the overloads abstract away mutation-type-specific logic while ensuring
// that replica selection and handler instantiation follow a consistent flow.
result<response_id_type> create_write_response_handler_helper(schema_ptr s, const dht::token& token,
std::unique_ptr<mutation_holder> mh, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state,
service_permit permit, db::allow_per_partition_rate_limit allow_limit, is_cancellable);
result<response_id_type> create_write_response_handler(locator::effective_replication_map_ptr ermp, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m, host_id_vector_replica_set targets,
result<response_id_type> make_write_response_handler(locator::effective_replication_map_ptr ermp, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m, host_id_vector_replica_set targets,
const host_id_vector_topology_change& pending_endpoints, host_id_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info, is_cancellable);
result<response_id_type> create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
@@ -349,6 +365,7 @@ private:
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, shared_ptr<paxos_response_handler>, dht::token, host_id_vector_replica_set>& meta,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
void register_cdc_operation_result_tracker(const storage_proxy::unique_response_handler_vector& ids, lw_shared_ptr<cdc::operation_result_tracker> tracker);
template<typename Range>
bool should_reject_due_to_view_backlog(const Range& targets, const schema_ptr& s) const;