messaging_service: move repair verbs to IDL
This commit is contained in:
@@ -98,3 +98,13 @@ struct repair_flush_hints_batchlog_response {
|
||||
|
||||
// Repair RPC verbs (moved from hand-written messaging_service wrappers into
// the IDL per this commit). All row-level verbs identify their repair session
// by repair_meta_id; the trailing "shard_id dst_shard_id [[version 5.2]]"
// parameter routes the call to a specific shard on the destination node and
// is optional on the wire for pre-5.2 peers.
verb [[with_client_info]] repair_update_system_table (repair_update_system_table_request req [[ref]]) -> repair_update_system_table_response;
|
||||
verb [[with_client_info]] repair_flush_hints_batchlog (repair_flush_hints_batchlog_request req [[ref]]) -> repair_flush_hints_batchlog_response;
|
||||
verb [[with_client_info]] repair_get_full_row_hashes (uint32_t repair_meta_id, shard_id dst_shard_id [[version 5.2]]) -> repair_hash_set;
|
||||
verb [[with_client_info]] repair_get_combined_row_hash (uint32_t repair_meta_id, std::optional<repair_sync_boundary> common_sync_boundary, shard_id dst_shard_id [[version 5.2]]) -> get_combined_row_hash_response;
|
||||
verb [[with_client_info]] repair_get_sync_boundary (uint32_t repair_meta_id, std::optional<repair_sync_boundary> skipped_sync_boundary, shard_id dst_shard_id [[version 5.2]]) -> get_sync_boundary_response;
|
||||
verb [[with_client_info]] repair_get_row_diff (uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, shard_id dst_shard_id [[version 5.2]]) -> repair_rows_on_wire;
|
||||
verb [[with_client_info]] repair_put_row_diff (uint32_t repair_meta_id, repair_rows_on_wire row_diff, shard_id dst_shard_id [[version 5.2]]);
|
||||
// Opens a row-level repair session on the peer. Several parameters were added
// over time, hence the per-parameter [[version ...]] markers; the response
// itself only exists from 4.2.0 on.
verb [[with_client_info]] repair_row_level_start (uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason [[version 4.1.0]], gc_clock::time_point compaction_time [[version 5.2]], shard_id dst_shard_id [[version 5.2]]) -> repair_row_level_start_response [[version 4.2.0]];
|
||||
verb [[with_client_info]] repair_row_level_stop (uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, shard_id dst_shard_id [[version 5.2]]);
|
||||
verb [[with_client_info]] repair_get_estimated_partitions (uint32_t repair_meta_id, shard_id dst_shard_id [[version 5.2]]) -> uint64_t;
|
||||
verb [[with_client_info]] repair_set_estimated_partitions (uint32_t repair_meta_id, uint64_t estimated_partitions, shard_id dst_shard_id [[version 5.2]]);
|
||||
// Negotiation helper: returns the diff-detection algorithms this node supports.
verb [[with_client_info]] repair_get_diff_algorithms () -> std::vector<row_level_diff_detect_algorithm>;
|
||||
|
||||
@@ -1259,115 +1259,6 @@ future<> messaging_service::unregister_complete_message() {
|
||||
return unregister_handler(messaging_verb::COMPLETE_MESSAGE);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_GET_FULL_ROW_HASHES
|
||||
void messaging_service::register_repair_get_full_row_hashes(std::function<future<repair_hash_set> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional<shard_id> dst_shard_id)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_get_full_row_hashes() {
|
||||
return unregister_handler(messaging_verb::REPAIR_GET_FULL_ROW_HASHES);
|
||||
}
|
||||
future<repair_hash_set> messaging_service::send_repair_get_full_row_hashes(msg_addr id, uint32_t repair_meta_id, shard_id dst_shard_id) {
|
||||
return send_message<future<repair_hash_set>>(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(id), repair_meta_id, dst_shard_id);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_GET_COMBINED_ROW_HASH
|
||||
void messaging_service::register_repair_get_combined_row_hash(std::function<future<get_combined_row_hash_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional<repair_sync_boundary> common_sync_boundary, rpc::optional<shard_id> dst_shard_id)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_GET_COMBINED_ROW_HASH, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_get_combined_row_hash() {
|
||||
return unregister_handler(messaging_verb::REPAIR_GET_COMBINED_ROW_HASH);
|
||||
}
|
||||
future<get_combined_row_hash_response> messaging_service::send_repair_get_combined_row_hash(msg_addr id, uint32_t repair_meta_id, std::optional<repair_sync_boundary> common_sync_boundary, shard_id dst_shard_id) {
|
||||
return send_message<future<get_combined_row_hash_response>>(this, messaging_verb::REPAIR_GET_COMBINED_ROW_HASH, std::move(id), repair_meta_id, std::move(common_sync_boundary), dst_shard_id);
|
||||
}
|
||||
|
||||
void messaging_service::register_repair_get_sync_boundary(std::function<future<get_sync_boundary_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional<repair_sync_boundary> skipped_sync_boundary, rpc::optional<shard_id> dst_shard_id)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_GET_SYNC_BOUNDARY, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_get_sync_boundary() {
|
||||
return unregister_handler(messaging_verb::REPAIR_GET_SYNC_BOUNDARY);
|
||||
}
|
||||
future<get_sync_boundary_response> messaging_service::send_repair_get_sync_boundary(msg_addr id, uint32_t repair_meta_id, std::optional<repair_sync_boundary> skipped_sync_boundary, shard_id dst_shard_id) {
|
||||
return send_message<future<get_sync_boundary_response>>(this, messaging_verb::REPAIR_GET_SYNC_BOUNDARY, std::move(id), repair_meta_id, std::move(skipped_sync_boundary), dst_shard_id);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_GET_ROW_DIFF
|
||||
void messaging_service::register_repair_get_row_diff(std::function<future<repair_rows_on_wire> (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, rpc::optional<shard_id> dst_shard_id)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_GET_ROW_DIFF, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_get_row_diff() {
|
||||
return unregister_handler(messaging_verb::REPAIR_GET_ROW_DIFF);
|
||||
}
|
||||
future<repair_rows_on_wire> messaging_service::send_repair_get_row_diff(msg_addr id, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, shard_id dst_shard_id) {
|
||||
return send_message<future<repair_rows_on_wire>>(this, messaging_verb::REPAIR_GET_ROW_DIFF, std::move(id), repair_meta_id, std::move(set_diff), needs_all_rows, dst_shard_id);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_PUT_ROW_DIFF
|
||||
void messaging_service::register_repair_put_row_diff(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_rows_on_wire row_diff, rpc::optional<shard_id> dst_shard_id)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_PUT_ROW_DIFF, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_put_row_diff() {
|
||||
return unregister_handler(messaging_verb::REPAIR_PUT_ROW_DIFF);
|
||||
}
|
||||
future<> messaging_service::send_repair_put_row_diff(msg_addr id, uint32_t repair_meta_id, repair_rows_on_wire row_diff, shard_id dst_shard_id) {
|
||||
return send_message<void>(this, messaging_verb::REPAIR_PUT_ROW_DIFF, std::move(id), repair_meta_id, std::move(row_diff), dst_shard_id);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_ROW_LEVEL_START
|
||||
void messaging_service::register_repair_row_level_start(std::function<future<repair_row_level_start_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason, rpc::optional<gc_clock::time_point> compaction_time, rpc::optional<shard_id> dst_shard_id)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_row_level_start() {
|
||||
return unregister_handler(messaging_verb::REPAIR_ROW_LEVEL_START);
|
||||
}
|
||||
future<rpc::optional<repair_row_level_start_response>> messaging_service::send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason, gc_clock::time_point compaction_time, shard_id dst_shard_id) {
|
||||
return send_message<rpc::optional<repair_row_level_start_response>>(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(id), repair_meta_id, std::move(keyspace_name), std::move(cf_name), std::move(range), algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, std::move(remote_partitioner_name), std::move(schema_version), reason, compaction_time, dst_shard_id);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_ROW_LEVEL_STOP
|
||||
void messaging_service::register_repair_row_level_stop(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, rpc::optional<shard_id> dst_shard_id)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_ROW_LEVEL_STOP, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_row_level_stop() {
|
||||
return unregister_handler(messaging_verb::REPAIR_ROW_LEVEL_STOP);
|
||||
}
|
||||
future<> messaging_service::send_repair_row_level_stop(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, shard_id dst_shard_id) {
|
||||
return send_message<void>(this, messaging_verb::REPAIR_ROW_LEVEL_STOP, std::move(id), repair_meta_id, std::move(keyspace_name), std::move(cf_name), std::move(range), dst_shard_id);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_GET_ESTIMATED_PARTITIONS
|
||||
void messaging_service::register_repair_get_estimated_partitions(std::function<future<uint64_t> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional<shard_id> dst_shard_id)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_GET_ESTIMATED_PARTITIONS, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_get_estimated_partitions() {
|
||||
return unregister_handler(messaging_verb::REPAIR_GET_ESTIMATED_PARTITIONS);
|
||||
}
|
||||
future<uint64_t> messaging_service::send_repair_get_estimated_partitions(msg_addr id, uint32_t repair_meta_id, shard_id dst_shard_id) {
|
||||
return send_message<future<uint64_t>>(this, messaging_verb::REPAIR_GET_ESTIMATED_PARTITIONS, std::move(id), repair_meta_id, dst_shard_id);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_SET_ESTIMATED_PARTITIONS
|
||||
void messaging_service::register_repair_set_estimated_partitions(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, uint64_t estimated_partitions, rpc::optional<shard_id> dst_shard_id)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_SET_ESTIMATED_PARTITIONS, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_set_estimated_partitions() {
|
||||
return unregister_handler(messaging_verb::REPAIR_SET_ESTIMATED_PARTITIONS);
|
||||
}
|
||||
future<> messaging_service::send_repair_set_estimated_partitions(msg_addr id, uint32_t repair_meta_id, uint64_t estimated_partitions, shard_id dst_shard_id) {
|
||||
return send_message<void>(this, messaging_verb::REPAIR_SET_ESTIMATED_PARTITIONS, std::move(id), repair_meta_id, estimated_partitions, dst_shard_id);
|
||||
}
|
||||
|
||||
// Wrapper for REPAIR_GET_DIFF_ALGORITHMS
|
||||
void messaging_service::register_repair_get_diff_algorithms(std::function<future<std::vector<row_level_diff_detect_algorithm>> (const rpc::client_info& cinfo)>&& func) {
|
||||
register_handler(this, messaging_verb::REPAIR_GET_DIFF_ALGORITHMS, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_repair_get_diff_algorithms() {
|
||||
return unregister_handler(messaging_verb::REPAIR_GET_DIFF_ALGORITHMS);
|
||||
}
|
||||
future<std::vector<row_level_diff_detect_algorithm>> messaging_service::send_repair_get_diff_algorithms(msg_addr id) {
|
||||
return send_message<future<std::vector<row_level_diff_detect_algorithm>>>(this, messaging_verb::REPAIR_GET_DIFF_ALGORITHMS, std::move(id));
|
||||
}
|
||||
|
||||
// Wrapper for TASKS_CHILDREN_REQUEST
|
||||
void messaging_service::register_tasks_get_children(std::function<future<tasks::get_children_response> (const rpc::client_info& cinfo, tasks::get_children_request)>&& func) {
|
||||
register_handler(this, messaging_verb::TASKS_GET_CHILDREN, std::move(func));
|
||||
|
||||
@@ -438,56 +438,6 @@ public:
|
||||
future<> send_complete_message(msg_addr id, streaming::plan_id plan_id, unsigned dst_cpu_id, bool failed = false);
|
||||
future<> unregister_complete_message();
|
||||
|
||||
// Wrapper for REPAIR_GET_FULL_ROW_HASHES
|
||||
void register_repair_get_full_row_hashes(std::function<future<repair_hash_set> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional<shard_id> dst_cpu_id)>&& func);
|
||||
future<> unregister_repair_get_full_row_hashes();
|
||||
future<repair_hash_set> send_repair_get_full_row_hashes(msg_addr id, uint32_t repair_meta_id, shard_id dst_cpu_id);
|
||||
|
||||
// Wrapper for REPAIR_GET_COMBINED_ROW_HASH
|
||||
void register_repair_get_combined_row_hash(std::function<future<get_combined_row_hash_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional<repair_sync_boundary> common_sync_boundary, rpc::optional<shard_id> dst_cpu_id)>&& func);
|
||||
future<> unregister_repair_get_combined_row_hash();
|
||||
future<get_combined_row_hash_response> send_repair_get_combined_row_hash(msg_addr id, uint32_t repair_meta_id, std::optional<repair_sync_boundary> common_sync_boundary, shard_id dst_cpu_id);
|
||||
|
||||
// Wrapper for REPAIR_GET_SYNC_BOUNDARY
|
||||
void register_repair_get_sync_boundary(std::function<future<get_sync_boundary_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional<repair_sync_boundary> skipped_sync_boundary, rpc::optional<shard_id> dst_cpu_id)>&& func);
|
||||
future<> unregister_repair_get_sync_boundary();
|
||||
future<get_sync_boundary_response> send_repair_get_sync_boundary(msg_addr id, uint32_t repair_meta_id, std::optional<repair_sync_boundary> skipped_sync_boundary, shard_id dst_cpu_id);
|
||||
|
||||
// Wrapper for REPAIR_GET_ROW_DIFF
|
||||
void register_repair_get_row_diff(std::function<future<repair_rows_on_wire> (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, rpc::optional<shard_id> dst_cpu_id)>&& func);
|
||||
future<> unregister_repair_get_row_diff();
|
||||
future<repair_rows_on_wire> send_repair_get_row_diff(msg_addr id, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, shard_id dst_cpu_id);
|
||||
|
||||
// Wrapper for REPAIR_PUT_ROW_DIFF
|
||||
void register_repair_put_row_diff(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_rows_on_wire row_diff, rpc::optional<shard_id> dst_cpu_id)>&& func);
|
||||
future<> unregister_repair_put_row_diff();
|
||||
future<> send_repair_put_row_diff(msg_addr id, uint32_t repair_meta_id, repair_rows_on_wire row_diff, shard_id dst_cpu_id);
|
||||
|
||||
// Wrapper for REPAIR_ROW_LEVEL_START
|
||||
void register_repair_row_level_start(std::function<future<repair_row_level_start_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason, rpc::optional<gc_clock::time_point> compaction_time, rpc::optional<shard_id> dst_cpu_id)>&& func);
|
||||
future<> unregister_repair_row_level_start();
|
||||
future<rpc::optional<repair_row_level_start_response>> send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason, gc_clock::time_point compaction_time, shard_id dst_cpu_id);
|
||||
|
||||
// Wrapper for REPAIR_ROW_LEVEL_STOP
|
||||
void register_repair_row_level_stop(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, rpc::optional<shard_id> dst_cpu_id)>&& func);
|
||||
future<> unregister_repair_row_level_stop();
|
||||
future<> send_repair_row_level_stop(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, shard_id dst_cpu_id);
|
||||
|
||||
// Wrapper for REPAIR_GET_ESTIMATED_PARTITIONS
|
||||
void register_repair_get_estimated_partitions(std::function<future<uint64_t> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional<shard_id> dst_cpu_id)>&& func);
|
||||
future<> unregister_repair_get_estimated_partitions();
|
||||
future<uint64_t> send_repair_get_estimated_partitions(msg_addr id, uint32_t repair_meta_id, shard_id dst_cpu_id);
|
||||
|
||||
// Wrapper for REPAIR_SET_ESTIMATED_PARTITIONS
|
||||
void register_repair_set_estimated_partitions(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, uint64_t estimated_partitions, rpc::optional<shard_id> dst_cpu_id)>&& func);
|
||||
future<> unregister_repair_set_estimated_partitions();
|
||||
future<> send_repair_set_estimated_partitions(msg_addr id, uint32_t repair_meta_id, uint64_t estimated_partitions, shard_id dst_cpu_id);
|
||||
|
||||
// Wrapper for REPAIR_GET_DIFF_ALGORITHMS
|
||||
void register_repair_get_diff_algorithms(std::function<future<std::vector<row_level_diff_detect_algorithm>> (const rpc::client_info& cinfo)>&& func);
|
||||
future<> unregister_repair_get_diff_algorithms();
|
||||
future<std::vector<row_level_diff_detect_algorithm>> send_repair_get_diff_algorithms(msg_addr id);
|
||||
|
||||
// Wrapper for TASKS_GET_CHILDREN
|
||||
void register_tasks_get_children(std::function<future<tasks::get_children_response> (const rpc::client_info& cinfo, tasks::get_children_request)>&& func);
|
||||
future<> unregister_tasks_get_children();
|
||||
|
||||
@@ -224,7 +224,7 @@ static const std::vector<row_level_diff_detect_algorithm>& suportted_diff_detect
|
||||
static row_level_diff_detect_algorithm get_common_diff_detect_algorithm(netw::messaging_service& ms, const inet_address_vector_replica_set& nodes) {
|
||||
std::vector<std::vector<row_level_diff_detect_algorithm>> nodes_algorithms(nodes.size());
|
||||
parallel_for_each(std::views::iota(size_t(0), nodes.size()), coroutine::lambda([&] (size_t idx) -> future<> {
|
||||
std::vector<row_level_diff_detect_algorithm> algorithms = co_await ms.send_repair_get_diff_algorithms(netw::messaging_service::msg_addr(nodes[idx]));
|
||||
std::vector<row_level_diff_detect_algorithm> algorithms = co_await ser::repair_rpc_verbs::send_repair_get_diff_algorithms(&ms, netw::messaging_service::msg_addr(nodes[idx]));
|
||||
std::sort(algorithms.begin(), algorithms.end());
|
||||
nodes_algorithms[idx] = std::move(algorithms);
|
||||
rlogger.trace("Got node_algorithms={}, from node={}", nodes_algorithms[idx], nodes[idx]);
|
||||
@@ -1431,7 +1431,7 @@ public:
|
||||
if (remote_node == myip()) {
|
||||
co_return co_await get_full_row_hashes_handler();
|
||||
}
|
||||
repair_hash_set hashes = co_await _messaging.send_repair_get_full_row_hashes(msg_addr(remote_node),
|
||||
repair_hash_set hashes = co_await ser::repair_rpc_verbs::send_repair_get_full_row_hashes(&_messaging, msg_addr(remote_node),
|
||||
_repair_meta_id, dst_cpu_id);
|
||||
rlogger.debug("Got full hashes from peer={}, nr_hashes={}", remote_node, hashes.size());
|
||||
_metrics.rx_hashes_nr += hashes.size();
|
||||
@@ -1508,7 +1508,7 @@ public:
|
||||
if (remote_node == myip()) {
|
||||
co_return co_await get_combined_row_hash_handler(common_sync_boundary);
|
||||
}
|
||||
get_combined_row_hash_response resp = co_await _messaging.send_repair_get_combined_row_hash(msg_addr(remote_node),
|
||||
get_combined_row_hash_response resp = co_await ser::repair_rpc_verbs::send_repair_get_combined_row_hash(&_messaging, msg_addr(remote_node),
|
||||
_repair_meta_id, common_sync_boundary, dst_cpu_id);
|
||||
stats().rpc_call_nr++;
|
||||
stats().rx_hashes_nr++;
|
||||
@@ -1542,7 +1542,7 @@ public:
|
||||
// the time this change is introduced.
|
||||
sstring remote_partitioner_name = "org.apache.cassandra.dht.Murmur3Partitioner";
|
||||
rpc::optional<repair_row_level_start_response> resp =
|
||||
co_await _messaging.send_repair_row_level_start(msg_addr(remote_node),
|
||||
co_await ser::repair_rpc_verbs::send_repair_row_level_start(&_messaging, msg_addr(remote_node),
|
||||
_repair_meta_id, ks_name, cf_name, std::move(range), _algo, _max_row_buf_size, _seed,
|
||||
_master_node_shard_config.shard, _master_node_shard_config.shard_count, _master_node_shard_config.ignore_msb,
|
||||
remote_partitioner_name, std::move(schema_version), reason, compaction_time, dst_cpu_id);
|
||||
@@ -1575,7 +1575,7 @@ public:
|
||||
co_return co_await stop();
|
||||
}
|
||||
stats().rpc_call_nr++;
|
||||
co_return co_await _messaging.send_repair_row_level_stop(msg_addr(remote_node),
|
||||
co_return co_await ser::repair_rpc_verbs::send_repair_row_level_stop(&_messaging, msg_addr(remote_node),
|
||||
_repair_meta_id, std::move(ks_name), std::move(cf_name), std::move(range), dst_cpu_id);
|
||||
}
|
||||
|
||||
@@ -1596,7 +1596,7 @@ public:
|
||||
co_return co_await get_estimated_partitions();
|
||||
}
|
||||
stats().rpc_call_nr++;
|
||||
co_return co_await _messaging.send_repair_get_estimated_partitions(msg_addr(remote_node), _repair_meta_id, dst_cpu_id);
|
||||
co_return co_await ser::repair_rpc_verbs::send_repair_get_estimated_partitions(&_messaging, msg_addr(remote_node), _repair_meta_id, dst_cpu_id);
|
||||
}
|
||||
|
||||
|
||||
@@ -1615,7 +1615,7 @@ public:
|
||||
co_return co_await set_estimated_partitions(estimated_partitions);
|
||||
}
|
||||
stats().rpc_call_nr++;
|
||||
co_return co_await _messaging.send_repair_set_estimated_partitions(msg_addr(remote_node), _repair_meta_id, estimated_partitions, dst_cpu_id);
|
||||
co_return co_await ser::repair_rpc_verbs::send_repair_set_estimated_partitions(&_messaging, msg_addr(remote_node), _repair_meta_id, estimated_partitions, dst_cpu_id);
|
||||
}
|
||||
|
||||
|
||||
@@ -1635,7 +1635,7 @@ public:
|
||||
co_return co_await get_sync_boundary_handler(skipped_sync_boundary);
|
||||
}
|
||||
stats().rpc_call_nr++;
|
||||
co_return co_await _messaging.send_repair_get_sync_boundary(msg_addr(remote_node), _repair_meta_id, skipped_sync_boundary, dst_cpu_id);
|
||||
co_return co_await ser::repair_rpc_verbs::send_repair_get_sync_boundary(&_messaging, msg_addr(remote_node), _repair_meta_id, skipped_sync_boundary, dst_cpu_id);
|
||||
}
|
||||
|
||||
// RPC handler
|
||||
@@ -1662,7 +1662,7 @@ public:
|
||||
_metrics.tx_hashes_nr += set_diff.size();
|
||||
}
|
||||
stats().rpc_call_nr++;
|
||||
repair_rows_on_wire rows = _messaging.send_repair_get_row_diff(msg_addr(remote_node),
|
||||
repair_rows_on_wire rows = ser::repair_rpc_verbs::send_repair_get_row_diff(&_messaging, msg_addr(remote_node),
|
||||
_repair_meta_id, std::move(set_diff), bool(needs_all_rows), dst_cpu_id).get();
|
||||
if (!rows.empty()) {
|
||||
apply_rows_on_master_in_thread(std::move(rows), remote_node, update_working_row_buf::yes, update_peer_row_hash_sets::no, node_idx);
|
||||
@@ -1676,7 +1676,7 @@ public:
|
||||
return;
|
||||
}
|
||||
stats().rpc_call_nr++;
|
||||
repair_rows_on_wire rows = _messaging.send_repair_get_row_diff(msg_addr(remote_node),
|
||||
repair_rows_on_wire rows = ser::repair_rpc_verbs::send_repair_get_row_diff(&_messaging, msg_addr(remote_node),
|
||||
_repair_meta_id, {}, bool(needs_all_rows_t::yes), dst_cpu_id).get();
|
||||
if (!rows.empty()) {
|
||||
apply_rows_on_master_in_thread(std::move(rows), remote_node, update_working_row_buf::yes, update_peer_row_hash_sets::yes, node_idx);
|
||||
@@ -1803,7 +1803,7 @@ public:
|
||||
stats().tx_row_bytes += row_bytes;
|
||||
stats().rpc_call_nr++;
|
||||
repair_rows_on_wire rows = co_await to_repair_rows_on_wire(std::move(row_diff));
|
||||
co_await _messaging.send_repair_put_row_diff(msg_addr(remote_node), _repair_meta_id, std::move(rows), dst_cpu_id);
|
||||
co_await ser::repair_rpc_verbs::send_repair_put_row_diff(&_messaging, msg_addr(remote_node), _repair_meta_id, std::move(rows), dst_cpu_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2425,7 +2425,7 @@ future<> repair_service::init_ms_handlers() {
|
||||
});
|
||||
return make_ready_future<rpc::sink<repair_hash_with_cmd>>(sink);
|
||||
});
|
||||
ms.register_repair_get_full_row_hashes([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
ser::repair_rpc_verbs::register_repair_get_full_row_hashes(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
|
||||
@@ -2439,7 +2439,7 @@ future<> repair_service::init_ms_handlers() {
|
||||
});
|
||||
}) ;
|
||||
});
|
||||
ms.register_repair_get_combined_row_hash([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
ser::repair_rpc_verbs::register_repair_get_combined_row_hash(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
std::optional<repair_sync_boundary> common_sync_boundary, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
|
||||
@@ -2455,7 +2455,7 @@ future<> repair_service::init_ms_handlers() {
|
||||
});
|
||||
});
|
||||
});
|
||||
ms.register_repair_get_sync_boundary([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
ser::repair_rpc_verbs::register_repair_get_sync_boundary(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
std::optional<repair_sync_boundary> skipped_sync_boundary, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
@@ -2470,7 +2470,7 @@ future<> repair_service::init_ms_handlers() {
|
||||
});
|
||||
});
|
||||
});
|
||||
ms.register_repair_get_row_diff([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
ser::repair_rpc_verbs::register_repair_get_row_diff(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
repair_hash_set set_diff, bool needs_all_rows, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
|
||||
@@ -2493,7 +2493,7 @@ future<> repair_service::init_ms_handlers() {
|
||||
}
|
||||
});
|
||||
});
|
||||
ms.register_repair_put_row_diff([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
ser::repair_rpc_verbs::register_repair_put_row_diff(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
repair_rows_on_wire row_diff, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
|
||||
@@ -2513,7 +2513,7 @@ future<> repair_service::init_ms_handlers() {
|
||||
}
|
||||
});
|
||||
});
|
||||
ms.register_repair_row_level_start([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring ks_name,
|
||||
ser::repair_rpc_verbs::register_repair_row_level_start(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring ks_name,
|
||||
sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed,
|
||||
unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version,
|
||||
rpc::optional<streaming::stream_reason> reason, rpc::optional<gc_clock::time_point> compaction_time, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
@@ -2535,7 +2535,7 @@ future<> repair_service::init_ms_handlers() {
|
||||
schema_version, r, ct, _repair_module->abort_source());
|
||||
});
|
||||
});
|
||||
ms.register_repair_row_level_stop([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
ser::repair_rpc_verbs::register_repair_row_level_stop(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
sstring ks_name, sstring cf_name, dht::token_range range, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
|
||||
@@ -2545,7 +2545,7 @@ future<> repair_service::init_ms_handlers() {
|
||||
std::move(ks_name), std::move(cf_name), std::move(range));
|
||||
});
|
||||
});
|
||||
ms.register_repair_get_estimated_partitions([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
ser::repair_rpc_verbs::register_repair_get_estimated_partitions(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
@@ -2553,7 +2553,7 @@ future<> repair_service::init_ms_handlers() {
|
||||
return repair_meta::repair_get_estimated_partitions_handler(local_repair, from, repair_meta_id);
|
||||
});
|
||||
});
|
||||
ms.register_repair_set_estimated_partitions([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
ser::repair_rpc_verbs::register_repair_set_estimated_partitions(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
uint64_t estimated_partitions, rpc::optional<shard_id> dst_cpu_id_opt) {
|
||||
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
|
||||
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
|
||||
@@ -2562,7 +2562,7 @@ future<> repair_service::init_ms_handlers() {
|
||||
return repair_meta::repair_set_estimated_partitions_handler(local_repair, from, repair_meta_id, estimated_partitions);
|
||||
});
|
||||
});
|
||||
ms.register_repair_get_diff_algorithms([] (const rpc::client_info& cinfo) {
|
||||
ser::repair_rpc_verbs::register_repair_get_diff_algorithms(&ms, [] (const rpc::client_info& cinfo) {
|
||||
return make_ready_future<std::vector<row_level_diff_detect_algorithm>>(suportted_diff_detect_algorithms());
|
||||
});
|
||||
ser::repair_rpc_verbs::register_repair_update_system_table(&ms, [this] (const rpc::client_info& cinfo, repair_update_system_table_request req) {
|
||||
@@ -2584,16 +2584,6 @@ future<> repair_service::uninit_ms_handlers() {
|
||||
ms.unregister_repair_get_row_diff_with_rpc_stream(),
|
||||
ms.unregister_repair_put_row_diff_with_rpc_stream(),
|
||||
ms.unregister_repair_get_full_row_hashes_with_rpc_stream(),
|
||||
ms.unregister_repair_get_full_row_hashes(),
|
||||
ms.unregister_repair_get_combined_row_hash(),
|
||||
ms.unregister_repair_get_sync_boundary(),
|
||||
ms.unregister_repair_get_row_diff(),
|
||||
ms.unregister_repair_put_row_diff(),
|
||||
ms.unregister_repair_row_level_start(),
|
||||
ms.unregister_repair_row_level_stop(),
|
||||
ms.unregister_repair_get_estimated_partitions(),
|
||||
ms.unregister_repair_set_estimated_partitions(),
|
||||
ms.unregister_repair_get_diff_algorithms(),
|
||||
ser::repair_rpc_verbs::unregister(&ms)
|
||||
).discard_result();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user