diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 322a976d2e..8f0e99358c 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -2692,6 +2692,14 @@ inline std::ostream& operator<<(std::ostream& os, const hint_wrapper& h) { return os << "hint_wrapper{" << h.mut << "}"; } +struct read_repair_mutation { + std::unordered_map> value; +}; + +inline std::ostream& operator<<(std::ostream& os, const read_repair_mutation& m) { + return os << m.value; +} + using namespace std::literals::chrono_literals; storage_proxy::~storage_proxy() {} @@ -2920,8 +2928,9 @@ storage_proxy::create_write_response_handler(const hint_wrapper& h, db::consiste } result -storage_proxy::create_write_response_handler(const std::unordered_map>& m, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) { +storage_proxy::create_write_response_handler(const read_repair_mutation& mut, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) { inet_address_vector_replica_set endpoints; + const auto& m = mut.value; endpoints.reserve(m.size()); boost::copy(m | boost::adaptors::map_keys, std::inserter(endpoints, endpoints.begin())); auto mh = std::make_unique(m); @@ -3846,7 +3855,7 @@ future> storage_proxy::schedule_repair(std::unordered_map>(bo::success()); } - return mutate_internal(diffs | boost::adaptors::map_values, cl, false, std::move(trace_state), std::move(permit)); + return mutate_internal(diffs | boost::adaptors::map_values | boost::adaptors::transformed([] (auto& v) { return read_repair_mutation{std::move(v)}; }), cl, false, std::move(trace_state), std::move(permit)); } class abstract_read_resolver { diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index ebf7f0e683..b0fb624179 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -82,6 +82,7 @@ class view_update_write_response_handler; class client_state; class migration_manager; struct hint_wrapper; +struct read_repair_mutation; using replicas_per_token_range = std::unordered_map>; @@ -311,7 +312,7 @@ private: const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info); result create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); result create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); - result create_write_response_handler(const std::unordered_map>&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); + result create_write_response_handler(const read_repair_mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); result create_write_response_handler(const std::tuple, schema_ptr, shared_ptr, dht::token>& proposal, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit); result create_write_response_handler(const std::tuple, schema_ptr, dht::token, inet_address_vector_replica_set>& meta,