From 140f373e1576bee4918ea0bf61edbd6d77bbb65c Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 14 Dec 2022 13:09:57 +0300 Subject: [PATCH] proxy: Wrap read repair entries into read_repair_mutation The schedule_repair() operates on a map of endpoint:mutations pairs. Next patch will need to extend this entry and it's going to be easier if the entry is wrapped in a helper structure in advance. This is where the forwardable reference cursor from the previous patch gets its user. The schedule_repair() produces a range of rvalue wrappers, but the create_write_response_handler accepting it is OK, it copies mutations anyway. The printing operator is added to facilitate mutations logging from mutate_internal() method. Signed-off-by: Pavel Emelyanov --- service/storage_proxy.cc | 13 +++++++++++-- service/storage_proxy.hh | 3 ++- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 322a976d2e..8f0e99358c 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -2692,6 +2692,14 @@ inline std::ostream& operator<<(std::ostream& os, const hint_wrapper& h) { return os << "hint_wrapper{" << h.mut << "}"; } +struct read_repair_mutation { + std::unordered_map> value; +}; + +inline std::ostream& operator<<(std::ostream& os, const read_repair_mutation& m) { + return os << m.value; +} + using namespace std::literals::chrono_literals; storage_proxy::~storage_proxy() {} @@ -2920,8 +2928,9 @@ storage_proxy::create_write_response_handler(const hint_wrapper& h, db::consiste } result -storage_proxy::create_write_response_handler(const std::unordered_map>& m, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) { +storage_proxy::create_write_response_handler(const read_repair_mutation& mut, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) { inet_address_vector_replica_set endpoints; + const auto& m = mut.value; endpoints.reserve(m.size()); boost::copy(m | boost::adaptors::map_keys, std::inserter(endpoints, endpoints.begin())); auto mh = std::make_unique(m); @@ -3846,7 +3855,7 @@ future> storage_proxy::schedule_repair(std::unordered_map>(bo::success()); } - return mutate_internal(diffs | boost::adaptors::map_values, cl, false, std::move(trace_state), std::move(permit)); + return mutate_internal(diffs | boost::adaptors::map_values | boost::adaptors::transformed([] (auto& v) { return read_repair_mutation{std::move(v)}; }), cl, false, std::move(trace_state), std::move(permit)); } class abstract_read_resolver { diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index ebf7f0e683..b0fb624179 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -82,6 +82,7 @@ class view_update_write_response_handler; class client_state; class migration_manager; struct hint_wrapper; +struct read_repair_mutation; using replicas_per_token_range = std::unordered_map>; @@ -311,7 +312,7 @@ private: const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info); result create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); result create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); - result create_write_response_handler(const std::unordered_map>&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); + result create_write_response_handler(const read_repair_mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); result create_write_response_handler(const std::tuple, schema_ptr, shared_ptr, dht::token>& proposal, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); result create_write_response_handler(const std::tuple, schema_ptr, dht::token, inet_address_vector_replica_set>& meta,