proxy: Wrap read repair entries into read_repair_mutation

The schedule_repair() operates on a map of endpoint:mutations pairs.
Next patch will need to extend this entry and it's going to be easier if
the entry is wrapped in a helper structure in advance.

This is where the forwardable reference cursor from the previous patch
gets its user. The schedule_repair() produces a range of rvalue
wrappers, but the create_write_response_handler accepting it is OK, it
copies mutations anyway.

The printing operator is added to facilitate mutations logging from
mutate_internal() method.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2022-12-14 13:09:57 +03:00
parent 014b563ef1
commit 140f373e15
2 changed files with 13 additions and 3 deletions

View File

@@ -2692,6 +2692,14 @@ inline std::ostream& operator<<(std::ostream& os, const hint_wrapper& h) {
return os << "hint_wrapper{" << h.mut << "}";
}
struct read_repair_mutation {
std::unordered_map<gms::inet_address, std::optional<mutation>> value;
};
inline std::ostream& operator<<(std::ostream& os, const read_repair_mutation& m) {
return os << m.value;
}
using namespace std::literals::chrono_literals;
storage_proxy::~storage_proxy() {}
@@ -2920,8 +2928,9 @@ storage_proxy::create_write_response_handler(const hint_wrapper& h, db::consiste
}
result<storage_proxy::response_id_type>
storage_proxy::create_write_response_handler(const std::unordered_map<gms::inet_address, std::optional<mutation>>& m, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) {
storage_proxy::create_write_response_handler(const read_repair_mutation& mut, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) {
inet_address_vector_replica_set endpoints;
const auto& m = mut.value;
endpoints.reserve(m.size());
boost::copy(m | boost::adaptors::map_keys, std::inserter(endpoints, endpoints.begin()));
auto mh = std::make_unique<per_destination_mutation>(m);
@@ -3846,7 +3855,7 @@ future<result<>> storage_proxy::schedule_repair(std::unordered_map<dht::token, s
if (diffs.empty()) {
return make_ready_future<result<>>(bo::success());
}
return mutate_internal(diffs | boost::adaptors::map_values, cl, false, std::move(trace_state), std::move(permit));
return mutate_internal(diffs | boost::adaptors::map_values | boost::adaptors::transformed([] (auto& v) { return read_repair_mutation{std::move(v)}; }), cl, false, std::move(trace_state), std::move(permit));
}
class abstract_read_resolver {

View File

@@ -82,6 +82,7 @@ class view_update_write_response_handler;
class client_state;
class migration_manager;
struct hint_wrapper;
struct read_repair_mutation;
using replicas_per_token_range = std::unordered_map<dht::token_range, std::vector<locator::host_id>>;
@@ -311,7 +312,7 @@ private:
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info);
result<response_id_type> create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(const std::unordered_map<gms::inet_address, std::optional<mutation>>&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(const read_repair_mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>& proposal,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, dht::token, inet_address_vector_replica_set>& meta,