diff --git a/idl/repair.idl.hh b/idl/repair.idl.hh index c3436ad299..468177e891 100644 --- a/idl/repair.idl.hh +++ b/idl/repair.idl.hh @@ -98,3 +98,13 @@ struct repair_flush_hints_batchlog_response { verb [[with_client_info]] repair_update_system_table (repair_update_system_table_request req [[ref]]) -> repair_update_system_table_response; verb [[with_client_info]] repair_flush_hints_batchlog (repair_flush_hints_batchlog_request req [[ref]]) -> repair_flush_hints_batchlog_response; +verb [[with_client_info]] repair_get_full_row_hashes (uint32_t repair_meta_id, shard_id dst_shard_id [[version 5.2]]) -> repair_hash_set; +verb [[with_client_info]] repair_get_combined_row_hash (uint32_t repair_meta_id, std::optional common_sync_boundary, shard_id dst_shard_id [[version 5.2]]) -> get_combined_row_hash_response; +verb [[with_client_info]] repair_get_sync_boundary (uint32_t repair_meta_id, std::optional skipped_sync_boundary, shard_id dst_shard_id [[version 5.2]]) -> get_sync_boundary_response; +verb [[with_client_info]] repair_get_row_diff (uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, shard_id dst_shard_id [[version 5.2]]) -> repair_rows_on_wire; +verb [[with_client_info]] repair_put_row_diff (uint32_t repair_meta_id, repair_rows_on_wire row_diff, shard_id dst_shard_id [[version 5.2]]); +verb [[with_client_info]] repair_row_level_start (uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason [[version 4.1.0]], gc_clock::time_point compaction_time [[version 5.2]], shard_id dst_shard_id [[version 5.2]]) -> repair_row_level_start_response [[version 4.2.0]]; +verb [[with_client_info]] repair_row_level_stop (uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, shard_id dst_shard_id [[version 5.2]]); +verb [[with_client_info]] repair_get_estimated_partitions (uint32_t repair_meta_id, shard_id dst_shard_id [[version 5.2]]) -> uint64_t; +verb [[with_client_info]] repair_set_estimated_partitions (uint32_t repair_meta_id, uint64_t estimated_partitions, shard_id dst_shard_id [[version 5.2]]); +verb [[with_client_info]] repair_get_diff_algorithms () -> std::vector; diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 760d1263a7..3fc78eaf2a 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -1259,115 +1259,6 @@ future<> messaging_service::unregister_complete_message() { return unregister_handler(messaging_verb::COMPLETE_MESSAGE); } -// Wrapper for REPAIR_GET_FULL_ROW_HASHES -void messaging_service::register_repair_get_full_row_hashes(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional dst_shard_id)>&& func) { - register_handler(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(func)); -} -future<> messaging_service::unregister_repair_get_full_row_hashes() { - return unregister_handler(messaging_verb::REPAIR_GET_FULL_ROW_HASHES); -} -future messaging_service::send_repair_get_full_row_hashes(msg_addr id, uint32_t repair_meta_id, shard_id dst_shard_id) { - return send_message>(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(id), repair_meta_id, dst_shard_id); -} - -// Wrapper for REPAIR_GET_COMBINED_ROW_HASH -void messaging_service::register_repair_get_combined_row_hash(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional common_sync_boundary, rpc::optional dst_shard_id)>&& func) { - register_handler(this, messaging_verb::REPAIR_GET_COMBINED_ROW_HASH, std::move(func)); -} -future<> messaging_service::unregister_repair_get_combined_row_hash() { - return unregister_handler(messaging_verb::REPAIR_GET_COMBINED_ROW_HASH); -} -future messaging_service::send_repair_get_combined_row_hash(msg_addr id, uint32_t repair_meta_id, std::optional common_sync_boundary, shard_id dst_shard_id) { - return send_message>(this, messaging_verb::REPAIR_GET_COMBINED_ROW_HASH, std::move(id), repair_meta_id, std::move(common_sync_boundary), dst_shard_id); -} - -void messaging_service::register_repair_get_sync_boundary(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional skipped_sync_boundary, rpc::optional dst_shard_id)>&& func) { - register_handler(this, messaging_verb::REPAIR_GET_SYNC_BOUNDARY, std::move(func)); -} -future<> messaging_service::unregister_repair_get_sync_boundary() { - return unregister_handler(messaging_verb::REPAIR_GET_SYNC_BOUNDARY); -} -future messaging_service::send_repair_get_sync_boundary(msg_addr id, uint32_t repair_meta_id, std::optional skipped_sync_boundary, shard_id dst_shard_id) { - return send_message>(this, messaging_verb::REPAIR_GET_SYNC_BOUNDARY, std::move(id), repair_meta_id, std::move(skipped_sync_boundary), dst_shard_id); -} - -// Wrapper for REPAIR_GET_ROW_DIFF -void messaging_service::register_repair_get_row_diff(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, rpc::optional dst_shard_id)>&& func) { - register_handler(this, messaging_verb::REPAIR_GET_ROW_DIFF, std::move(func)); -} -future<> messaging_service::unregister_repair_get_row_diff() { - return unregister_handler(messaging_verb::REPAIR_GET_ROW_DIFF); -} -future messaging_service::send_repair_get_row_diff(msg_addr id, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, shard_id dst_shard_id) { - return send_message>(this, messaging_verb::REPAIR_GET_ROW_DIFF, std::move(id), repair_meta_id, std::move(set_diff), needs_all_rows, dst_shard_id); -} - -// Wrapper for REPAIR_PUT_ROW_DIFF -void messaging_service::register_repair_put_row_diff(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_rows_on_wire row_diff, rpc::optional dst_shard_id)>&& func) { - register_handler(this, messaging_verb::REPAIR_PUT_ROW_DIFF, std::move(func)); -} -future<> messaging_service::unregister_repair_put_row_diff() { - return unregister_handler(messaging_verb::REPAIR_PUT_ROW_DIFF); -} -future<> messaging_service::send_repair_put_row_diff(msg_addr id, uint32_t repair_meta_id, repair_rows_on_wire row_diff, shard_id dst_shard_id) { - return send_message(this, messaging_verb::REPAIR_PUT_ROW_DIFF, std::move(id), repair_meta_id, std::move(row_diff), dst_shard_id); -} - -// Wrapper for REPAIR_ROW_LEVEL_START -void messaging_service::register_repair_row_level_start(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional reason, rpc::optional compaction_time, rpc::optional dst_shard_id)>&& func) { - register_handler(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(func)); -} -future<> messaging_service::unregister_repair_row_level_start() { - return unregister_handler(messaging_verb::REPAIR_ROW_LEVEL_START); -} -future> messaging_service::send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason, gc_clock::time_point compaction_time, shard_id dst_shard_id) { - return send_message>(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(id), repair_meta_id, std::move(keyspace_name), std::move(cf_name), std::move(range), algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, std::move(remote_partitioner_name), std::move(schema_version), reason, compaction_time, dst_shard_id); -} - -// Wrapper for REPAIR_ROW_LEVEL_STOP -void messaging_service::register_repair_row_level_stop(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, rpc::optional dst_shard_id)>&& func) { - register_handler(this, messaging_verb::REPAIR_ROW_LEVEL_STOP, std::move(func)); -} -future<> messaging_service::unregister_repair_row_level_stop() { - return unregister_handler(messaging_verb::REPAIR_ROW_LEVEL_STOP); -} -future<> messaging_service::send_repair_row_level_stop(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, shard_id dst_shard_id) { - return send_message(this, messaging_verb::REPAIR_ROW_LEVEL_STOP, std::move(id), repair_meta_id, std::move(keyspace_name), std::move(cf_name), std::move(range), dst_shard_id); -} - -// Wrapper for REPAIR_GET_ESTIMATED_PARTITIONS -void messaging_service::register_repair_get_estimated_partitions(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional dst_shard_id)>&& func) { - register_handler(this, messaging_verb::REPAIR_GET_ESTIMATED_PARTITIONS, std::move(func)); -} -future<> messaging_service::unregister_repair_get_estimated_partitions() { - return unregister_handler(messaging_verb::REPAIR_GET_ESTIMATED_PARTITIONS); -} -future messaging_service::send_repair_get_estimated_partitions(msg_addr id, uint32_t repair_meta_id, shard_id dst_shard_id) { - return send_message>(this, messaging_verb::REPAIR_GET_ESTIMATED_PARTITIONS, std::move(id), repair_meta_id, dst_shard_id); -} - -// Wrapper for REPAIR_SET_ESTIMATED_PARTITIONS -void messaging_service::register_repair_set_estimated_partitions(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, uint64_t estimated_partitions, rpc::optional dst_shard_id)>&& func) { - register_handler(this, messaging_verb::REPAIR_SET_ESTIMATED_PARTITIONS, std::move(func)); -} -future<> messaging_service::unregister_repair_set_estimated_partitions() { - return unregister_handler(messaging_verb::REPAIR_SET_ESTIMATED_PARTITIONS); -} -future<> messaging_service::send_repair_set_estimated_partitions(msg_addr id, uint32_t repair_meta_id, uint64_t estimated_partitions, shard_id dst_shard_id) { - return send_message(this, messaging_verb::REPAIR_SET_ESTIMATED_PARTITIONS, std::move(id), repair_meta_id, estimated_partitions, dst_shard_id); -} - -// Wrapper for REPAIR_GET_DIFF_ALGORITHMS -void messaging_service::register_repair_get_diff_algorithms(std::function> (const rpc::client_info& cinfo)>&& func) { - register_handler(this, messaging_verb::REPAIR_GET_DIFF_ALGORITHMS, std::move(func)); -} -future<> messaging_service::unregister_repair_get_diff_algorithms() { - return unregister_handler(messaging_verb::REPAIR_GET_DIFF_ALGORITHMS); -} -future> messaging_service::send_repair_get_diff_algorithms(msg_addr id) { - return send_message>>(this, messaging_verb::REPAIR_GET_DIFF_ALGORITHMS, std::move(id)); -} - // Wrapper for TASKS_CHILDREN_REQUEST void messaging_service::register_tasks_get_children(std::function (const rpc::client_info& cinfo, tasks::get_children_request)>&& func) { register_handler(this, messaging_verb::TASKS_GET_CHILDREN, std::move(func)); diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 32a5fbd3e2..a59983e482 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -438,56 +438,6 @@ public: future<> send_complete_message(msg_addr id, streaming::plan_id plan_id, unsigned dst_cpu_id, bool failed = false); future<> unregister_complete_message(); - // Wrapper for REPAIR_GET_FULL_ROW_HASHES - void register_repair_get_full_row_hashes(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional dst_cpu_id)>&& func); - future<> unregister_repair_get_full_row_hashes(); - future send_repair_get_full_row_hashes(msg_addr id, uint32_t repair_meta_id, shard_id dst_cpu_id); - - // Wrapper for REPAIR_GET_COMBINED_ROW_HASH - void register_repair_get_combined_row_hash(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional common_sync_boundary, rpc::optional dst_cpu_id)>&& func); - future<> unregister_repair_get_combined_row_hash(); - future send_repair_get_combined_row_hash(msg_addr id, uint32_t repair_meta_id, std::optional common_sync_boundary, shard_id dst_cpu_id); - - // Wrapper for REPAIR_GET_SYNC_BOUNDARY - void register_repair_get_sync_boundary(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional skipped_sync_boundary, rpc::optional dst_cpu_id)>&& func); - future<> unregister_repair_get_sync_boundary(); - future send_repair_get_sync_boundary(msg_addr id, uint32_t repair_meta_id, std::optional skipped_sync_boundary, shard_id dst_cpu_id); - - // Wrapper for REPAIR_GET_ROW_DIFF - void register_repair_get_row_diff(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, rpc::optional dst_cpu_id)>&& func); - future<> unregister_repair_get_row_diff(); - future send_repair_get_row_diff(msg_addr id, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, shard_id dst_cpu_id); - - // Wrapper for REPAIR_PUT_ROW_DIFF - void register_repair_put_row_diff(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_rows_on_wire row_diff, rpc::optional dst_cpu_id)>&& func); - future<> unregister_repair_put_row_diff(); - future<> send_repair_put_row_diff(msg_addr id, uint32_t repair_meta_id, repair_rows_on_wire row_diff, shard_id dst_cpu_id); - - // Wrapper for REPAIR_ROW_LEVEL_START - void register_repair_row_level_start(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional reason, rpc::optional compaction_time, rpc::optional dst_cpu_id)>&& func); - future<> unregister_repair_row_level_start(); - future> send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason, gc_clock::time_point compaction_time, shard_id dst_cpu_id); - - // Wrapper for REPAIR_ROW_LEVEL_STOP - void register_repair_row_level_stop(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, rpc::optional dst_cpu_id)>&& func); - future<> unregister_repair_row_level_stop(); - future<> send_repair_row_level_stop(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, shard_id dst_cpu_id); - - // Wrapper for REPAIR_GET_ESTIMATED_PARTITIONS - void register_repair_get_estimated_partitions(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional dst_cpu_id)>&& func); - future<> unregister_repair_get_estimated_partitions(); - future send_repair_get_estimated_partitions(msg_addr id, uint32_t repair_meta_id, shard_id dst_cpu_id); - - // Wrapper for REPAIR_SET_ESTIMATED_PARTITIONS - void register_repair_set_estimated_partitions(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, uint64_t estimated_partitions, rpc::optional dst_cpu_id)>&& func); - future<> unregister_repair_set_estimated_partitions(); - future<> send_repair_set_estimated_partitions(msg_addr id, uint32_t repair_meta_id, uint64_t estimated_partitions, shard_id dst_cpu_id); - - // Wrapper for REPAIR_GET_DIFF_ALGORITHMS - void register_repair_get_diff_algorithms(std::function> (const rpc::client_info& cinfo)>&& func); - future<> unregister_repair_get_diff_algorithms(); - future> send_repair_get_diff_algorithms(msg_addr id); - // Wrapper for TASKS_GET_CHILDREN void register_tasks_get_children(std::function (const rpc::client_info& cinfo, tasks::get_children_request)>&& func); future<> unregister_tasks_get_children(); diff --git a/repair/row_level.cc b/repair/row_level.cc index d262604eff..750c60ff4f 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -224,7 +224,7 @@ static const std::vector& suportted_diff_detect static row_level_diff_detect_algorithm get_common_diff_detect_algorithm(netw::messaging_service& ms, const inet_address_vector_replica_set& nodes) { std::vector> nodes_algorithms(nodes.size()); parallel_for_each(std::views::iota(size_t(0), nodes.size()), coroutine::lambda([&] (size_t idx) -> future<> { - std::vector algorithms = co_await ms.send_repair_get_diff_algorithms(netw::messaging_service::msg_addr(nodes[idx])); + std::vector algorithms = co_await ser::repair_rpc_verbs::send_repair_get_diff_algorithms(&ms, netw::messaging_service::msg_addr(nodes[idx])); std::sort(algorithms.begin(), algorithms.end()); nodes_algorithms[idx] = std::move(algorithms); rlogger.trace("Got node_algorithms={}, from node={}", nodes_algorithms[idx], nodes[idx]); @@ -1431,7 +1431,7 @@ public: if (remote_node == myip()) { co_return co_await get_full_row_hashes_handler(); } - repair_hash_set hashes = co_await _messaging.send_repair_get_full_row_hashes(msg_addr(remote_node), + repair_hash_set hashes = co_await ser::repair_rpc_verbs::send_repair_get_full_row_hashes(&_messaging, msg_addr(remote_node), _repair_meta_id, dst_cpu_id); rlogger.debug("Got full hashes from peer={}, nr_hashes={}", remote_node, hashes.size()); _metrics.rx_hashes_nr += hashes.size(); @@ -1508,7 +1508,7 @@ public: if (remote_node == myip()) { co_return co_await get_combined_row_hash_handler(common_sync_boundary); } - get_combined_row_hash_response resp = co_await _messaging.send_repair_get_combined_row_hash(msg_addr(remote_node), + get_combined_row_hash_response resp = co_await ser::repair_rpc_verbs::send_repair_get_combined_row_hash(&_messaging, msg_addr(remote_node), _repair_meta_id, common_sync_boundary, dst_cpu_id); stats().rpc_call_nr++; stats().rx_hashes_nr++; @@ -1542,7 +1542,7 @@ public: // the time this change is introduced. sstring remote_partitioner_name = "org.apache.cassandra.dht.Murmur3Partitioner"; rpc::optional resp = - co_await _messaging.send_repair_row_level_start(msg_addr(remote_node), + co_await ser::repair_rpc_verbs::send_repair_row_level_start(&_messaging, msg_addr(remote_node), _repair_meta_id, ks_name, cf_name, std::move(range), _algo, _max_row_buf_size, _seed, _master_node_shard_config.shard, _master_node_shard_config.shard_count, _master_node_shard_config.ignore_msb, remote_partitioner_name, std::move(schema_version), reason, compaction_time, dst_cpu_id); @@ -1575,7 +1575,7 @@ public: co_return co_await stop(); } stats().rpc_call_nr++; - co_return co_await _messaging.send_repair_row_level_stop(msg_addr(remote_node), + co_return co_await ser::repair_rpc_verbs::send_repair_row_level_stop(&_messaging, msg_addr(remote_node), _repair_meta_id, std::move(ks_name), std::move(cf_name), std::move(range), dst_cpu_id); } @@ -1596,7 +1596,7 @@ public: co_return co_await get_estimated_partitions(); } stats().rpc_call_nr++; - co_return co_await _messaging.send_repair_get_estimated_partitions(msg_addr(remote_node), _repair_meta_id, dst_cpu_id); + co_return co_await ser::repair_rpc_verbs::send_repair_get_estimated_partitions(&_messaging, msg_addr(remote_node), _repair_meta_id, dst_cpu_id); } @@ -1615,7 +1615,7 @@ public: co_return co_await set_estimated_partitions(estimated_partitions); } stats().rpc_call_nr++; - co_return co_await _messaging.send_repair_set_estimated_partitions(msg_addr(remote_node), _repair_meta_id, estimated_partitions, dst_cpu_id); + co_return co_await ser::repair_rpc_verbs::send_repair_set_estimated_partitions(&_messaging, msg_addr(remote_node), _repair_meta_id, estimated_partitions, dst_cpu_id); } @@ -1635,7 +1635,7 @@ public: co_return co_await get_sync_boundary_handler(skipped_sync_boundary); } stats().rpc_call_nr++; - co_return co_await _messaging.send_repair_get_sync_boundary(msg_addr(remote_node), _repair_meta_id, skipped_sync_boundary, dst_cpu_id); + co_return co_await ser::repair_rpc_verbs::send_repair_get_sync_boundary(&_messaging, msg_addr(remote_node), _repair_meta_id, skipped_sync_boundary, dst_cpu_id); } // RPC handler @@ -1662,7 +1662,7 @@ public: _metrics.tx_hashes_nr += set_diff.size(); } stats().rpc_call_nr++; - repair_rows_on_wire rows = _messaging.send_repair_get_row_diff(msg_addr(remote_node), + repair_rows_on_wire rows = ser::repair_rpc_verbs::send_repair_get_row_diff(&_messaging, msg_addr(remote_node), _repair_meta_id, std::move(set_diff), bool(needs_all_rows), dst_cpu_id).get(); if (!rows.empty()) { apply_rows_on_master_in_thread(std::move(rows), remote_node, update_working_row_buf::yes, update_peer_row_hash_sets::no, node_idx); @@ -1676,7 +1676,7 @@ public: return; } stats().rpc_call_nr++; - repair_rows_on_wire rows = _messaging.send_repair_get_row_diff(msg_addr(remote_node), + repair_rows_on_wire rows = ser::repair_rpc_verbs::send_repair_get_row_diff(&_messaging, msg_addr(remote_node), _repair_meta_id, {}, bool(needs_all_rows_t::yes), dst_cpu_id).get(); if (!rows.empty()) { apply_rows_on_master_in_thread(std::move(rows), remote_node, update_working_row_buf::yes, update_peer_row_hash_sets::yes, node_idx); @@ -1803,7 +1803,7 @@ public: stats().tx_row_bytes += row_bytes; stats().rpc_call_nr++; repair_rows_on_wire rows = co_await to_repair_rows_on_wire(std::move(row_diff)); - co_await _messaging.send_repair_put_row_diff(msg_addr(remote_node), _repair_meta_id, std::move(rows), dst_cpu_id); + co_await ser::repair_rpc_verbs::send_repair_put_row_diff(&_messaging, msg_addr(remote_node), _repair_meta_id, std::move(rows), dst_cpu_id); } } @@ -2425,7 +2425,7 @@ future<> repair_service::init_ms_handlers() { }); return make_ready_future>(sink); }); - ms.register_repair_get_full_row_hashes([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional dst_cpu_id_opt) { + ser::repair_rpc_verbs::register_repair_get_full_row_hashes(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional dst_cpu_id_opt) { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto from = cinfo.retrieve_auxiliary("baddr"); auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt); @@ -2439,7 +2439,7 @@ future<> repair_service::init_ms_handlers() { }); }) ; }); - ms.register_repair_get_combined_row_hash([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, + ser::repair_rpc_verbs::register_repair_get_combined_row_hash(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional common_sync_boundary, rpc::optional dst_cpu_id_opt) { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt); @@ -2455,7 +2455,7 @@ future<> repair_service::init_ms_handlers() { }); }); }); - ms.register_repair_get_sync_boundary([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, + ser::repair_rpc_verbs::register_repair_get_sync_boundary(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional skipped_sync_boundary, rpc::optional dst_cpu_id_opt) { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto from = cinfo.retrieve_auxiliary("baddr"); @@ -2470,7 +2470,7 @@ future<> repair_service::init_ms_handlers() { }); }); }); - ms.register_repair_get_row_diff([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, + ser::repair_rpc_verbs::register_repair_get_row_diff(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, rpc::optional dst_cpu_id_opt) { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt); @@ -2493,7 +2493,7 @@ future<> repair_service::init_ms_handlers() { } }); }); - ms.register_repair_put_row_diff([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, + ser::repair_rpc_verbs::register_repair_put_row_diff(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_rows_on_wire row_diff, rpc::optional dst_cpu_id_opt) { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt); @@ -2513,7 +2513,7 @@ future<> repair_service::init_ms_handlers() { } }); }); - ms.register_repair_row_level_start([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring ks_name, + ser::repair_rpc_verbs::register_repair_row_level_start(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring ks_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional reason, rpc::optional compaction_time, rpc::optional dst_cpu_id_opt) { @@ -2535,7 +2535,7 @@ future<> repair_service::init_ms_handlers() { schema_version, r, ct, _repair_module->abort_source()); }); }); - ms.register_repair_row_level_stop([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, + ser::repair_rpc_verbs::register_repair_row_level_stop(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring ks_name, sstring cf_name, dht::token_range range, rpc::optional dst_cpu_id_opt) { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt); @@ -2545,7 +2545,7 @@ future<> repair_service::init_ms_handlers() { std::move(ks_name), std::move(cf_name), std::move(range)); }); }); - ms.register_repair_get_estimated_partitions([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional dst_cpu_id_opt) { + ser::repair_rpc_verbs::register_repair_get_estimated_partitions(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional dst_cpu_id_opt) { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt); auto from = cinfo.retrieve_auxiliary("baddr"); @@ -2553,7 +2553,7 @@ future<> repair_service::init_ms_handlers() { return repair_meta::repair_get_estimated_partitions_handler(local_repair, from, repair_meta_id); }); }); - ms.register_repair_set_estimated_partitions([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, + ser::repair_rpc_verbs::register_repair_set_estimated_partitions(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, uint64_t estimated_partitions, rpc::optional dst_cpu_id_opt) { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt); @@ -2562,7 +2562,7 @@ future<> repair_service::init_ms_handlers() { return repair_meta::repair_set_estimated_partitions_handler(local_repair, from, repair_meta_id, estimated_partitions); }); }); - ms.register_repair_get_diff_algorithms([] (const rpc::client_info& cinfo) { + ser::repair_rpc_verbs::register_repair_get_diff_algorithms(&ms, [] (const rpc::client_info& cinfo) { return make_ready_future>(suportted_diff_detect_algorithms()); }); ser::repair_rpc_verbs::register_repair_update_system_table(&ms, [this] (const rpc::client_info& cinfo, repair_update_system_table_request req) { @@ -2584,16 +2584,6 @@ future<> repair_service::uninit_ms_handlers() { ms.unregister_repair_get_row_diff_with_rpc_stream(), ms.unregister_repair_put_row_diff_with_rpc_stream(), ms.unregister_repair_get_full_row_hashes_with_rpc_stream(), - ms.unregister_repair_get_full_row_hashes(), - ms.unregister_repair_get_combined_row_hash(), - ms.unregister_repair_get_sync_boundary(), - ms.unregister_repair_get_row_diff(), - ms.unregister_repair_put_row_diff(), - ms.unregister_repair_row_level_start(), - ms.unregister_repair_row_level_stop(), - ms.unregister_repair_get_estimated_partitions(), - ms.unregister_repair_set_estimated_partitions(), - ms.unregister_repair_get_diff_algorithms(), ser::repair_rpc_verbs::unregister(&ms) ).discard_result(); }