From 6b59279e263030c4236f0ccf37690769ff61145c Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 09:21:18 +0800 Subject: [PATCH 01/45] repair: Add repair_stream_cmd It is used by row level repair to add small protocol on top of the rpc stream interface. --- repair/repair.hh | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/repair/repair.hh b/repair/repair.hh index 1cca3e7a60..9778f83f09 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -305,6 +305,16 @@ public: using repair_row_on_wire = partition_key_and_mutation_fragments; using repair_rows_on_wire = std::list; +enum class repair_stream_cmd : uint8_t { + error, + hash_data, + row_data, + end_of_current_hash_set, + needs_all_rows, + end_of_current_rows, + get_full_row_hashes, + put_rows_done, +}; enum class row_level_diff_detect_algorithm : uint8_t { send_full_set, }; From 9f36d775dcf76da7f7fc1bca5c10c52bd488ff64 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 09:23:08 +0800 Subject: [PATCH 02/45] repair: Add repair_hash_with_cmd It is a wrapper contains both a repair cmd and repair_hash object. --- repair/repair.hh | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/repair/repair.hh b/repair/repair.hh index 9778f83f09..8685f00a74 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -315,6 +315,11 @@ enum class repair_stream_cmd : uint8_t { get_full_row_hashes, put_rows_done, }; + +struct repair_hash_with_cmd { + repair_stream_cmd cmd; + repair_hash hash; +}; enum class row_level_diff_detect_algorithm : uint8_t { send_full_set, }; From 6054a56333d2e3c1512d163f90e99b2c2288535d Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 09:25:48 +0800 Subject: [PATCH 03/45] repair: Add repair_row_on_wire_with_cmd It is used to contain both a repair cmd and repair_row_on_wire object. --- repair/repair.hh | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/repair/repair.hh b/repair/repair.hh index 8685f00a74..249c0c941b 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -320,6 +320,12 @@ struct repair_hash_with_cmd { repair_stream_cmd cmd; repair_hash hash; }; + +struct repair_row_on_wire_with_cmd { + repair_stream_cmd cmd; + repair_row_on_wire row; +}; + enum class row_level_diff_detect_algorithm : uint8_t { send_full_set, }; From 3fc53a6b72a3cda07b1b2fb5f573ebe07e928422 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 09:26:49 +0800 Subject: [PATCH 04/45] repair: Add send_full_set_rpc_stream in row_level_diff_detect_algorithm It is used to negotiate if the master can use the rpc stream interface to transfer data. --- repair/repair.cc | 2 ++ repair/repair.hh | 1 + 2 files changed, 3 insertions(+) diff --git a/repair/repair.cc b/repair/repair.cc index 87b9f821ef..3c65546d31 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -69,6 +69,8 @@ std::ostream& operator<<(std::ostream& out, row_level_diff_detect_algorithm algo switch (algo) { case row_level_diff_detect_algorithm::send_full_set: return out << "send_full_set"; + case row_level_diff_detect_algorithm::send_full_set_rpc_stream: + return out << "send_full_set_rpc_stream"; }; return out << "unknown"; } diff --git a/repair/repair.hh b/repair/repair.hh index 249c0c941b..9dab489139 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -328,6 +328,7 @@ struct repair_row_on_wire_with_cmd { enum class row_level_diff_detect_algorithm : uint8_t { send_full_set, + send_full_set_rpc_stream, }; std::ostream& operator<<(std::ostream& out, row_level_diff_detect_algorithm algo); From fb3f0125ee2f34e7ddbd14e2f74e51812bb2830a Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 09:24:18 +0800 Subject: [PATCH 05/45] repair: Add default construct for partition_key_and_mutation_fragments This is useful when we want to add an empty partition_key_and_mutation_fragments. --- repair/repair.hh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/repair/repair.hh b/repair/repair.hh index 9dab489139..ac463df506 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -291,6 +291,9 @@ class partition_key_and_mutation_fragments { partition_key _key; std::list _mfs; public: + partition_key_and_mutation_fragments() + : _key(std::vector() ) { + } partition_key_and_mutation_fragments(partition_key key, std::list mfs) : _key(std::move(key)) , _mfs(std::move(mfs)) { From 672c24f6b07f8d49bdc5aa64832ddce5ec85d3b0 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 09:18:53 +0800 Subject: [PATCH 06/45] idl: Add send_full_set_rpc_stream for row_level_diff_detect_algorithm --- idl/partition_checksum.idl.hh | 1 + 1 file changed, 1 insertion(+) diff --git a/idl/partition_checksum.idl.hh b/idl/partition_checksum.idl.hh index f3544b6383..c0675928ba 100644 --- a/idl/partition_checksum.idl.hh +++ b/idl/partition_checksum.idl.hh @@ -76,4 +76,5 @@ struct get_combined_row_hash_response { enum class row_level_diff_detect_algorithm : uint8_t { send_full_set, + send_full_set_rpc_stream, }; From 599d40fbe94d926de090bcae7d593f57fc19e144 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 09:19:49 +0800 Subject: [PATCH 07/45] idl: Add repair_stream_cmd --- idl/partition_checksum.idl.hh | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/idl/partition_checksum.idl.hh b/idl/partition_checksum.idl.hh index c0675928ba..bc43c365d0 100644 --- a/idl/partition_checksum.idl.hh +++ b/idl/partition_checksum.idl.hh @@ -78,3 +78,14 @@ enum class row_level_diff_detect_algorithm : uint8_t { send_full_set, send_full_set_rpc_stream, }; + +enum class repair_stream_cmd : uint8_t { + error, + hash_data, + row_data, + end_of_current_hash_set, + needs_all_rows, + end_of_current_rows, + get_full_row_hashes, + put_rows_done, +}; From a90fb24efc18e6a06be4ef5be8ce87d994684352 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 09:20:14 +0800 Subject: [PATCH 08/45] idl: Add repair_hash_with_cmd --- idl/partition_checksum.idl.hh | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/idl/partition_checksum.idl.hh b/idl/partition_checksum.idl.hh index bc43c365d0..8b6da9fb7d 100644 --- a/idl/partition_checksum.idl.hh +++ b/idl/partition_checksum.idl.hh @@ -89,3 +89,8 @@ enum class repair_stream_cmd : uint8_t { get_full_row_hashes, put_rows_done, }; + +struct repair_hash_with_cmd { + repair_stream_cmd cmd; + repair_hash hash; +}; From c93113f3a5ffa30396dd78133d6b5df9881488bb Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 09:20:35 +0800 Subject: [PATCH 09/45] idl: Add repair_row_on_wire_with_cmd --- idl/partition_checksum.idl.hh | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/idl/partition_checksum.idl.hh b/idl/partition_checksum.idl.hh index 8b6da9fb7d..f458469359 100644 --- a/idl/partition_checksum.idl.hh +++ b/idl/partition_checksum.idl.hh @@ -94,3 +94,8 @@ struct repair_hash_with_cmd { repair_stream_cmd cmd; repair_hash hash; }; + +struct repair_row_on_wire_with_cmd { + repair_stream_cmd cmd; + partition_key_and_mutation_fragments row; +}; From bc295a00a656e9e9939e4f810e3eb49e9dee823c Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 09:33:10 +0800 Subject: [PATCH 10/45] messaging_service: Add rpc stream verb for row level repair - REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM Get repair rows from follower nodes - REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM Put repair rows to follower nodes - REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM: Get full hashes from follower nodes --- message/messaging_service.cc | 3 +++ message/messaging_service.hh | 5 ++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 94a5d76d23..7cc8875e08 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -464,6 +464,9 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { case messaging_verb::REPAIR_GET_ESTIMATED_PARTITIONS: case messaging_verb::REPAIR_SET_ESTIMATED_PARTITIONS: case messaging_verb::REPAIR_GET_DIFF_ALGORITHMS: + case messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM: + case messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM: + case messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM: return 2; case messaging_verb::MUTATION_DONE: case messaging_verb::MUTATION_FAILED: diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 32be38e592..415580f992 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -130,7 +130,10 @@ enum class messaging_verb : int32_t { REPAIR_GET_ESTIMATED_PARTITIONS= 33, REPAIR_SET_ESTIMATED_PARTITIONS= 34, REPAIR_GET_DIFF_ALGORITHMS = 35, - LAST = 36, + REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM = 36, + REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM = 37, + REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM = 38, + LAST = 39, }; } // namespace netw From f312c95b74b87951c6fbdbf402a9afb44db05670 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 09:36:06 +0800 Subject: [PATCH 11/45] messaging_service: Add do_make_sink_source helper It is used by the row level repair rpc stream verbs to make sink and source object. --- message/messaging_service.cc | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 7cc8875e08..bdcfa83f2a 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -694,6 +694,18 @@ void messaging_service::register_stream_mutation_fragments(std::function +future, rpc::source> +do_make_sink_source(messaging_verb verb, uint32_t repair_meta_id, shared_ptr rpc_client, std::unique_ptr& rpc) { + return rpc_client->make_stream_sink().then([&rpc, verb, repair_meta_id, rpc_client] (rpc::sink sink) mutable { + auto rpc_handler = rpc->make_client (uint32_t, rpc::sink)>(verb); + return rpc_handler(*rpc_client, repair_meta_id, sink).then_wrapped([sink, rpc_client] (future> source) mutable { + return (source.failed() ? sink.close() : make_ready_future<>()).then([sink = std::move(sink), source = std::move(source)] () mutable { + return make_ready_future, rpc::source>(std::move(sink), std::move(source.get0())); + }); + }); + }); +} // Send a message for verb template auto send_message(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) { From dc92bda93ba8ff0ea690b5ba053a75ac81201672 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 09:37:52 +0800 Subject: [PATCH 12/45] messaging_service: Add REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM support --- message/messaging_service.cc | 17 +++++++++++++++++ message/messaging_service.hh | 5 +++++ 2 files changed, 22 insertions(+) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index bdcfa83f2a..397c9ab3b4 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -706,6 +706,23 @@ do_make_sink_source(messaging_verb verb, uint32_t repair_meta_id, shared_ptr, rpc::source> +messaging_service::make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) { + auto verb = messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM; + auto rpc_client = get_rpc_client(verb, id); + return do_make_sink_source(verb, repair_meta_id, std::move(rpc_client), rpc()); +} + +rpc::sink messaging_service::make_sink_for_repair_get_row_diff_with_rpc_stream(rpc::source& source) { + return source.make_sink(); +} + +void messaging_service::register_repair_get_row_diff_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func) { + register_handler(this, messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM, std::move(func)); +} + // Send a message for verb template auto send_message(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) { diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 415580f992..5c8b5455b8 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -277,6 +277,11 @@ public: rpc::sink make_sink_for_stream_mutation_fragments(rpc::source& source); future, rpc::source> make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id); + // Wrapper for REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM + future, rpc::source> make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id); + rpc::sink make_sink_for_repair_get_row_diff_with_rpc_stream(rpc::source& source); + void register_repair_get_row_diff_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func); + void register_stream_mutation_done(std::function (const rpc::client_info& cinfo, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id)>&& func); future<> send_stream_mutation_done(msg_addr id, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id); From a7c7ba976587d4657e78f23a764bfbd04c570eea Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 09:39:16 +0800 Subject: [PATCH 13/45] messaging_service: Add REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM support It is used by row level repair. --- message/messaging_service.cc | 16 ++++++++++++++++ message/messaging_service.hh | 5 +++++ 2 files changed, 21 insertions(+) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 397c9ab3b4..fd8287ad6d 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -723,6 +723,22 @@ void messaging_service::register_repair_get_row_diff_with_rpc_stream(std::functi register_handler(this, messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM, std::move(func)); } +// Wrapper for REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM +future, rpc::source> +messaging_service::make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) { + auto verb = messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM; + auto rpc_client = get_rpc_client(verb, id); + return do_make_sink_source(verb, repair_meta_id, std::move(rpc_client), rpc()); +} + +rpc::sink messaging_service::make_sink_for_repair_put_row_diff_with_rpc_stream(rpc::source& source) { + return source.make_sink(); +} + +void messaging_service::register_repair_put_row_diff_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func) { + register_handler(this, messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM, std::move(func)); +} + // Send a message for verb template auto send_message(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) { diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 5c8b5455b8..aafbd846eb 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -282,6 +282,11 @@ public: rpc::sink make_sink_for_repair_get_row_diff_with_rpc_stream(rpc::source& source); void register_repair_get_row_diff_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func); + // Wrapper for REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM + future, rpc::source> make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id); + rpc::sink make_sink_for_repair_put_row_diff_with_rpc_stream(rpc::source& source); + void register_repair_put_row_diff_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func); + void register_stream_mutation_done(std::function (const rpc::client_info& cinfo, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id)>&& func); future<> send_stream_mutation_done(msg_addr id, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id); From 37b3de4ea08cdfd26f8eea3453ff87f6f544d604 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 09:40:09 +0800 Subject: [PATCH 14/45] messaging_service: Add REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM support It is used by row level repair. --- message/messaging_service.cc | 16 ++++++++++++++++ message/messaging_service.hh | 5 +++++ 2 files changed, 21 insertions(+) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index fd8287ad6d..64eabab375 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -739,6 +739,22 @@ void messaging_service::register_repair_put_row_diff_with_rpc_stream(std::functi register_handler(this, messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM, std::move(func)); } +// Wrapper for REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM +future, rpc::source> +messaging_service::make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) { + auto verb = messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM; + auto rpc_client = get_rpc_client(verb, id); + return do_make_sink_source(verb, repair_meta_id, std::move(rpc_client), rpc()); +} + +rpc::sink messaging_service::make_sink_for_repair_get_full_row_hashes_with_rpc_stream(rpc::source& source) { + return source.make_sink(); +} + +void messaging_service::register_repair_get_full_row_hashes_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func) { + register_handler(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM, std::move(func)); +} + // Send a message for verb template auto send_message(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) { diff --git a/message/messaging_service.hh b/message/messaging_service.hh index aafbd846eb..62fecf3c44 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -287,6 +287,11 @@ public: rpc::sink make_sink_for_repair_put_row_diff_with_rpc_stream(rpc::source& source); void register_repair_put_row_diff_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func); + // Wrapper for REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM + future, rpc::source> make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(uint32_t repair_meta_id, msg_addr id); + rpc::sink make_sink_for_repair_get_full_row_hashes_with_rpc_stream(rpc::source& source); + void register_repair_get_full_row_hashes_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func); + void register_stream_mutation_done(std::function (const rpc::client_info& cinfo, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id)>&& func); future<> send_stream_mutation_done(msg_addr id, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id); From 8400dafa129fb005200374e57445ca662033af05 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 09:44:48 +0800 Subject: [PATCH 15/45] repair: Add sink_source_for_repair helper class It is used to store the sink and source objects for the rpc stream verbs used by row level repair. --- repair/row_level.cc | 64 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index a974ed3800..b56c5fbb13 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -59,6 +59,70 @@ struct shard_config { distributed* _sys_dist_ks; distributed* _view_update_generator; +// Wraps sink and source objects for repair master or repair follower nodes. +// For repair master, it stores sink and source pair for each of the followers. +// For repair follower, it stores one sink and source pair for repair master. +template +class sink_source_for_repair { + uint32_t _repair_meta_id; + using get_sink_source_fn_type = std::function, rpc::source> (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr)>; + using sink_type = std::reference_wrapper>; + using source_type = std::reference_wrapper>; + // The vectors below store sink and source object for peer nodes. + std::vector>> _sinks; + std::vector>> _sources; + std::vector _sources_closed; + get_sink_source_fn_type _fn; +public: + sink_source_for_repair(uint32_t repair_meta_id, size_t nr_peer_nodes, get_sink_source_fn_type fn) + : _repair_meta_id(repair_meta_id) + , _sinks(nr_peer_nodes) + , _sources(nr_peer_nodes) + , _sources_closed(nr_peer_nodes, false) + , _fn(std::move(fn)) { + } + void mark_source_closed(unsigned node_idx) { + _sources_closed[node_idx] = true; + } + future get_sink_source(gms::inet_address remote_node, unsigned node_idx) { + if (_sinks[node_idx] && _sources[node_idx]) { + return make_ready_future(_sinks[node_idx].value(), _sources[node_idx].value()); + } + if (_sinks[node_idx] || _sources[node_idx]) { + return make_exception_future(std::runtime_error(format("sink or source is missing for node {}", remote_node))); + } + return _fn(_repair_meta_id, netw::messaging_service::msg_addr(remote_node)).then([this, node_idx] (rpc::sink sink, rpc::source source) mutable { + _sinks[node_idx].emplace(std::move(sink)); + _sources[node_idx].emplace(std::move(source)); + return make_ready_future(_sinks[node_idx].value(), _sources[node_idx].value()); + }); + } + future<> close() { + return parallel_for_each(boost::irange(unsigned(0), unsigned(_sources.size())), [this] (unsigned node_idx) mutable { + std::optional>& sink_opt = _sinks[node_idx]; + auto f = sink_opt ? sink_opt->close() : make_ready_future<>(); + return f.finally([this, node_idx] { + std::optional>& source_opt = _sources[node_idx]; + if (source_opt && !_sources_closed[node_idx]) { + return repeat([&source_opt] () mutable { + // Keep reading source until end of stream + return (*source_opt)().then([] (std::optional> opt) mutable { + if (opt) { + return make_ready_future(stop_iteration::no); + } else { + return make_ready_future(stop_iteration::yes); + } + }).handle_exception([] (std::exception_ptr ep) { + return make_ready_future(stop_iteration::yes); + }); + }); + } + return make_ready_future<>(); + }); + }); + } +}; + struct row_level_repair_metrics { seastar::metrics::metric_groups _metrics; uint64_t tx_row_nr{0}; From 0bffd07e7ee93c6886130357096c9ef2e2dfa43d Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 09:47:51 +0800 Subject: [PATCH 16/45] repair: Add sink_source_for_get_full_row_hashes Use the sink_source_for_repair to define sink_source_for_get_full_row_hashes with sink = repair_stream_cmd, source = repair_hash_with_cmd for REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM rpc stream verb. --- repair/row_level.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index b56c5fbb13..8c44de7fee 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -123,6 +123,8 @@ public: } }; +using sink_source_for_get_full_row_hashes = sink_source_for_repair; + struct row_level_repair_metrics { seastar::metrics::metric_groups _metrics; uint64_t tx_row_nr{0}; From 4405f7a6ffa1c9c906f716acbc7b819e29023bee Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 09:49:51 +0800 Subject: [PATCH 17/45] repair: Add sink_source_for_get_row_diff Use sink_source_for_repair to define sink_source_for_get_row_diff with sink = repair_hash_with_cmd, source = repair_row_on_wire_with_cmd for REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM rpc stream verb. --- repair/row_level.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index 8c44de7fee..25a3421157 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -124,6 +124,7 @@ public: }; using sink_source_for_get_full_row_hashes = sink_source_for_repair; +using sink_source_for_get_row_diff = sink_source_for_repair; struct row_level_repair_metrics { seastar::metrics::metric_groups _metrics; From acd40fd5293b124efdb28f9d5d1137d269b128cf Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 09:51:36 +0800 Subject: [PATCH 18/45] repair: Add sink_source_for_put_row_diff Use sink_source_for_repair to define sink_source_for_put_row_diff with sink = repair_row_on_wire_with_cmd, source = repair_stream_cmd for REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM rpc stream verb. --- repair/row_level.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index 25a3421157..98e6b7b421 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -125,6 +125,7 @@ public: using sink_source_for_get_full_row_hashes = sink_source_for_repair; using sink_source_for_get_row_diff = sink_source_for_repair; +using sink_source_for_put_row_diff = sink_source_for_repair; struct row_level_repair_metrics { seastar::metrics::metric_groups _metrics; From b3e729903262d58025b355c748379b14bc87cf66 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 09:54:12 +0800 Subject: [PATCH 19/45] repair: Add sink and source object into repair_meta They will soon be used to sync repair hashes and repair rows bewteen master and follower nodes. --- repair/row_level.cc | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/repair/row_level.cc b/repair/row_level.cc index 98e6b7b421..ad9b369f64 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -583,6 +583,9 @@ private: std::vector> _peer_row_hash_sets; // Gate used to make sure pending operation of meta data is done seastar::gate _gate; + sink_source_for_get_full_row_hashes _sink_source_for_get_full_row_hashes; + sink_source_for_get_row_diff _sink_source_for_get_row_diff; + sink_source_for_put_row_diff _sink_source_for_put_row_diff; public: repair_stats& stats() { return _stats; @@ -642,14 +645,30 @@ public: _seed, repair_reader::is_local_reader(_repair_master || _same_sharding_config) ) - , _repair_writer(_schema, _estimated_partitions, _nr_peer_nodes) { + , _repair_writer(_schema, _estimated_partitions, _nr_peer_nodes) + , _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes, + [] (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr) { + return netw::get_local_messaging_service().make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(repair_meta_id, addr); + }) + , _sink_source_for_get_row_diff(_repair_meta_id, _nr_peer_nodes, + [] (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr) { + return netw::get_local_messaging_service().make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(repair_meta_id, addr); + }) + , _sink_source_for_put_row_diff(_repair_meta_id, _nr_peer_nodes, + [] (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr) { + return netw::get_local_messaging_service().make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(repair_meta_id, addr); + }) + { } public: future<> stop() { auto gate_future = _gate.close(); auto writer_future = _repair_writer.wait_for_writer_done(); - return when_all_succeed(std::move(gate_future), std::move(writer_future)); + auto f1 = _sink_source_for_get_full_row_hashes.close(); + auto f2 = _sink_source_for_get_row_diff.close(); + auto f3 = _sink_source_for_put_row_diff.close(); + return when_all_succeed(std::move(gate_future), std::move(writer_future), std::move(f1), std::move(f2), std::move(f3)); } static std::unordered_map>& repair_meta_map() { From 149c54b00022cff114103bb480d0e0c548c6704f Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 10:09:49 +0800 Subject: [PATCH 20/45] repair: Add get_full_row_hashes_source_op It is a helper that works on the source() of the get_full_row_hashes rpc stream verb. --- repair/row_level.cc | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index ad9b369f64..cebc617479 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1198,6 +1198,36 @@ public: }); } +private: + future<> get_full_row_hashes_source_op( + lw_shared_ptr> current_hashes, + gms::inet_address remote_node, + unsigned node_idx, + rpc::source& source) { + return repeat([this, current_hashes, remote_node, node_idx, &source] () mutable { + return source().then([this, current_hashes, remote_node, node_idx] (std::optional> hash_cmd_opt) mutable { + if (hash_cmd_opt) { + repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value()); + rlogger.trace("get_full_row_hashes: Got repair_hash_with_cmd from peer={}, hash={}, cmd={}", remote_node, hash_cmd.hash, int(hash_cmd.cmd)); + if (hash_cmd.cmd == repair_stream_cmd::hash_data) { + current_hashes->insert(hash_cmd.hash); + return make_ready_future(stop_iteration::no); + } else if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set) { + return make_ready_future(stop_iteration::yes); + } else if (hash_cmd.cmd == repair_stream_cmd::error) { + throw std::runtime_error("get_full_row_hashes: Peer failed to process"); + } else { + throw std::runtime_error("get_full_row_hashes: Got unexpected repair_stream_cmd"); + } + } else { + _sink_source_for_get_full_row_hashes.mark_source_closed(node_idx); + throw std::runtime_error("get_full_row_hashes: Got unexpected end of stream"); + } + }); + }); + } + +public: // RPC handler future> get_full_row_hashes_handler() { From 1e2a598fe7f445a83d9395eb2cdd7e4504745813 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 10:10:42 +0800 Subject: [PATCH 21/45] repair: Add get_full_row_hashes_sink_op It is a helper that works on the sink() of the get_full_row_hashes rpc stream verb. --- repair/row_level.cc | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index cebc617479..04cda5fa8c 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1227,6 +1227,16 @@ private: }); } + future<> get_full_row_hashes_sink_op(rpc::sink& sink) { + return sink(repair_stream_cmd::get_full_row_hashes).then([&sink] { + return sink.flush(); + }).handle_exception([&sink] (std::exception_ptr ep) { + return sink.close().then([ep = std::move(ep)] () mutable { + return make_exception_future<>(std::move(ep)); + }); + }); + } + public: // RPC handler future> From 473bd7599cb9958c84f83ffbb1f29e7f214fae4d Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 10:11:35 +0800 Subject: [PATCH 22/45] repair: Add get_full_row_hashes_with_rpc_stream It is rpc stream version of get_full_row_hashes. It uses rpc stream instead of rpc verb to get the repair hashes data from follower nodes. --- repair/row_level.cc | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index 04cda5fa8c..a7f864d084 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1238,6 +1238,23 @@ private: } public: + future> + get_full_row_hashes_with_rpc_stream(gms::inet_address remote_node, unsigned node_idx) { + if (remote_node == _myip) { + return get_full_row_hashes_handler(); + } + auto current_hashes = make_lw_shared>(); + return _sink_source_for_get_full_row_hashes.get_sink_source(remote_node, node_idx).then( + [this, current_hashes, remote_node, node_idx] + (rpc::sink& sink, rpc::source& source) mutable { + auto source_op = get_full_row_hashes_source_op(current_hashes, remote_node, node_idx, source); + auto sink_op = get_full_row_hashes_sink_op(sink); + return when_all_succeed(std::move(source_op), std::move(sink_op)); + }).then([current_hashes] () mutable { + return std::move(*current_hashes); + }); + } + // RPC handler future> get_full_row_hashes_handler() { From a1e19514f92a2e6cff9aed786b05b4e615751ada Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 10:13:37 +0800 Subject: [PATCH 23/45] repair: Add get_row_diff_source_op It is a helper that works on the source() of the get_row_diff rpc stream verb. --- repair/row_level.cc | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index a7f864d084..0ddaebdcbf 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -56,6 +56,8 @@ struct shard_config { sstring partitioner_name; }; +static bool inject_rpc_stream_error = false; + distributed* _sys_dist_ks; distributed* _view_update_generator; @@ -1426,6 +1428,44 @@ public: return make_ready_future<>(); } +private: + future<> get_row_diff_source_op( + update_peer_row_hash_sets update_hash_set, + gms::inet_address remote_node, + unsigned node_idx, + rpc::sink& sink, + rpc::source& source) { + auto current_rows = make_lw_shared(); + return repeat([this, current_rows, update_hash_set, remote_node, node_idx, &sink, &source] () mutable { + return source().then([this, current_rows, update_hash_set, remote_node, node_idx] (std::optional> row_opt) mutable { + if (row_opt) { + if (inject_rpc_stream_error) { + throw std::runtime_error("get_row_diff: Inject sender error in source loop"); + } + auto row = std::move(std::get<0>(row_opt.value())); + if (row.cmd == repair_stream_cmd::row_data) { + rlogger.trace("get_row_diff: Got repair_row_on_wire with data"); + current_rows->push_back(std::move(row.row)); + return make_ready_future(stop_iteration::no); + } else if (row.cmd == repair_stream_cmd::end_of_current_rows) { + rlogger.trace("get_row_diff: Got repair_row_on_wire with nullopt"); + return apply_rows(std::move(*current_rows), remote_node, update_working_row_buf::yes, update_hash_set, node_idx).then([current_rows] { + current_rows->clear(); + return make_ready_future(stop_iteration::yes); + }); + } else if (row.cmd == repair_stream_cmd::error) { + throw std::runtime_error("get_row_diff: Peer failed to process"); + } else { + throw std::runtime_error("get_row_diff: Got unexpected repair_stream_cmd"); + } + } else { + _sink_source_for_get_row_diff.mark_source_closed(node_idx); + throw std::runtime_error("get_row_diff: Got unexpected end of stream"); + } + }); + }); + } +public: // RPC handler future get_row_diff_handler(const std::unordered_set& set_diff, needs_all_rows_t needs_all_rows) { return with_gate(_gate, [this, &set_diff, needs_all_rows] { From 4cb44baa08a3726660616b912919d7179fe2b0a4 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 10:14:52 +0800 Subject: [PATCH 24/45] repair: Add get_row_diff_sink_op It is a helper that works on the sink() of the get_row_diff rpc stream verb. --- repair/row_level.cc | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index 0ddaebdcbf..ba6aaca7a9 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1465,6 +1465,36 @@ private: }); }); } + + future<> get_row_diff_sink_op( + std::unordered_set set_diff, + needs_all_rows_t needs_all_rows, + rpc::sink& sink, + gms::inet_address remote_node) { + return do_with(std::move(set_diff), [needs_all_rows, remote_node, &sink] (std::unordered_set& set_diff) mutable { + if (inject_rpc_stream_error) { + return make_exception_future<>(std::runtime_error("get_row_diff: Inject sender error in sink loop")); + } + if (needs_all_rows) { + rlogger.trace("get_row_diff: request with repair_stream_cmd::needs_all_rows"); + return sink(repair_hash_with_cmd{repair_stream_cmd::needs_all_rows, repair_hash()}).then([&sink] () mutable { + return sink.flush(); + }); + } + return do_for_each(set_diff, [&sink] (const repair_hash& hash) mutable { + return sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash}); + }).then([&sink] () mutable { + return sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()}); + }).then([&sink] () mutable { + return sink.flush(); + }); + }).handle_exception([&sink] (std::exception_ptr ep) { + return sink.close().then([ep = std::move(ep)] () mutable { + return make_exception_future<>(std::move(ep)); + }); + }); + } + public: // RPC handler future get_row_diff_handler(const std::unordered_set& set_diff, needs_all_rows_t needs_all_rows) { From 72d3563da162357f3c0c3fd6cc121b32252c87a9 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 10:16:03 +0800 Subject: [PATCH 25/45] repair: Add get_row_diff_with_rpc_stream It is rpc stream version of get_row_diff. It uses rpc stream instead of rpc verb to get the repair rows from follower nodes. --- repair/row_level.cc | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index ba6aaca7a9..dcc1360d1e 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1496,6 +1496,34 @@ private: } public: + future<> get_row_diff_with_rpc_stream( + std::unordered_set set_diff, + needs_all_rows_t needs_all_rows, + update_peer_row_hash_sets update_hash_set, + gms::inet_address remote_node, + unsigned node_idx) { + if (needs_all_rows || !set_diff.empty()) { + if (remote_node == _myip) { + return make_ready_future<>(); + } + if (needs_all_rows) { + set_diff.clear(); + } else { + stats().tx_hashes_nr += set_diff.size(); + _metrics.tx_hashes_nr += set_diff.size(); + } + stats().rpc_call_nr++; + return _sink_source_for_get_row_diff.get_sink_source(remote_node, node_idx).then( + [this, set_diff = std::move(set_diff), needs_all_rows, update_hash_set, remote_node, node_idx] + (rpc::sink& sink, rpc::source& source) mutable { + auto source_op = get_row_diff_source_op(update_hash_set, remote_node, node_idx, sink, source); + auto sink_op = get_row_diff_sink_op(std::move(set_diff), needs_all_rows, sink, remote_node); + return when_all_succeed(std::move(source_op), std::move(sink_op)); + }); + } + return make_ready_future<>(); + } + // RPC handler future get_row_diff_handler(const std::unordered_set& set_diff, needs_all_rows_t needs_all_rows) { return with_gate(_gate, [this, &set_diff, needs_all_rows] { From dbe035649b4aea66a93f7f3aa323eaa694aab180 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 10:18:02 +0800 Subject: [PATCH 26/45] repair: Add put_row_diff_source_op It is a helper that works on the source() of the put_row_diff rpc stream verb. --- repair/row_level.cc | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index dcc1360d1e..d6aec5f856 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1554,6 +1554,32 @@ public: return make_ready_future<>(); } +private: + future<> put_row_diff_source_op( + gms::inet_address remote_node, + unsigned node_idx, + rpc::source& source) { + return repeat([this, remote_node, node_idx, &source] () mutable { + return source().then([this, remote_node, node_idx] (std::optional> status_opt) mutable { + if (status_opt) { + repair_stream_cmd status = std::move(std::get<0>(status_opt.value())); + rlogger.trace("put_row_diff: Got status code from follower={} for put_row_diff, status={}", remote_node, int(status)); + if (status == repair_stream_cmd::put_rows_done) { + return make_ready_future(stop_iteration::yes); + } else if (status == repair_stream_cmd::error) { + throw std::runtime_error(format("put_row_diff: Repair follower={} failed in put_row_diff hanlder, status={}", remote_node, int(status))); + } else { + throw std::runtime_error("put_row_diff: Got unexpected repair_stream_cmd"); + } + } else { + _sink_source_for_put_row_diff.mark_source_closed(node_idx); + throw std::runtime_error("put_row_diff: Got unexpected end of stream"); + } + }); + }); + } + +public: // RPC handler future<> put_row_diff_handler(repair_rows_on_wire rows, gms::inet_address from) { return with_gate(_gate, [this, rows = std::move(rows), from] () mutable { From 31b30486a7d3c9210f1978b8befccd71e3843c9a Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 10:19:01 +0800 Subject: [PATCH 27/45] repair: Add put_row_diff_sink_op It is a helper that works on the sink() of the put_row_diff rpc stream verb. --- repair/row_level.cc | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index d6aec5f856..d42b9403c1 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1579,6 +1579,28 @@ private: }); } + future<> put_row_diff_sink_op( + repair_rows_on_wire rows, + rpc::sink& sink, + gms::inet_address remote_node) { + return do_with(std::move(rows), [&sink, remote_node] (repair_rows_on_wire& rows) mutable { + return do_for_each(rows, [&sink] (repair_row_on_wire& row) mutable { + rlogger.trace("put_row_diff: send row"); + return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::row_data, std::move(row)}); + }).then([&sink] () mutable { + rlogger.trace("put_row_diff: send empty row"); + return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()}).then([&sink] () mutable { + rlogger.trace("put_row_diff: send done"); + return sink.flush(); + }); + }); + }).handle_exception([&sink] (std::exception_ptr ep) { + return sink.close().then([ep = std::move(ep)] () mutable { + return make_exception_future<>(std::move(ep)); + }); + }); + } + public: // RPC handler future<> put_row_diff_handler(repair_rows_on_wire rows, gms::inet_address from) { From b1188f299e03c804fb7b120a8a191f6bc6aec451 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 10:20:00 +0800 Subject: [PATCH 28/45] repair: Add put_row_diff_with_rpc_stream It is rpc stream version of put_row_diff. It uses rpc stream instead of rpc verb to put the repair rows to follower nodes. --- repair/row_level.cc | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index d42b9403c1..c7c0c656ef 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1602,6 +1602,35 @@ private: } public: + future<> put_row_diff_with_rpc_stream( + const std::unordered_set& set_diff, + needs_all_rows_t needs_all_rows, + gms::inet_address remote_node, unsigned node_idx) { + if (!set_diff.empty()) { + if (remote_node == _myip) { + return make_ready_future<>(); + } + std::list row_diff = get_row_diff(set_diff, needs_all_rows); + if (row_diff.size() != set_diff.size()) { + throw std::runtime_error("row_diff.size() != set_diff.size()"); + } + stats().tx_row_nr += row_diff.size(); + stats().tx_row_nr_peer[remote_node] += row_diff.size(); + stats().tx_row_bytes += get_repair_rows_size(row_diff); + stats().rpc_call_nr++; + return to_repair_rows_on_wire(std::move(row_diff)).then([this, remote_node, node_idx] (repair_rows_on_wire rows) { + return _sink_source_for_put_row_diff.get_sink_source(remote_node, node_idx).then( + [this, rows = std::move(rows), remote_node, node_idx] + (rpc::sink& sink, rpc::source& source) mutable { + auto source_op = put_row_diff_source_op(remote_node, node_idx, source); + auto sink_op = put_row_diff_sink_op(std::move(rows), sink, remote_node); + return when_all_succeed(std::move(source_op), std::move(sink_op)); + }); + }); + } + return make_ready_future<>(); + } + // RPC handler future<> put_row_diff_handler(repair_rows_on_wire rows, gms::inet_address from) { return with_gate(_gate, [this, rows = std::move(rows), from] () mutable { From 049e793fe5d16e1f2f4b301bf0a072d96641e2f3 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 10:22:20 +0800 Subject: [PATCH 29/45] repair: Add repair_get_row_diff_with_rpc_stream_process_op It is the helper for the get_row_diff rpc stream verb handler. --- repair/row_level.cc | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index c7c0c656ef..ad31bc3596 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1639,6 +1639,49 @@ public: } }; +future repair_get_row_diff_with_rpc_stream_process_op( + gms::inet_address from, + uint32_t src_cpu_id, + uint32_t repair_meta_id, + rpc::sink sink, + rpc::source source, + bool &error, + std::unordered_set& current_set_diff, + std::optional> hash_cmd_opt) { + repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value()); + rlogger.trace("Got repair_hash_with_cmd from peer={}, hash={}, cmd={}", from, hash_cmd.hash, int(hash_cmd.cmd)); + if (hash_cmd.cmd == repair_stream_cmd::hash_data) { + current_set_diff.insert(hash_cmd.hash); + return make_ready_future(stop_iteration::no); + } else if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set || hash_cmd.cmd == repair_stream_cmd::needs_all_rows) { + if (inject_rpc_stream_error) { + return make_exception_future(std::runtime_error("get_row_diff_with_rpc_stream: Inject error in handler loop")); + } + bool needs_all_rows = hash_cmd.cmd == repair_stream_cmd::needs_all_rows; + return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, needs_all_rows, set_diff = std::move(current_set_diff)] { + auto rm = repair_meta::get_repair_meta(from, repair_meta_id); + return rm->get_row_diff_handler(set_diff, repair_meta::needs_all_rows_t(needs_all_rows)); + }).then([sink] (repair_rows_on_wire rows_on_wire) mutable { + if (rows_on_wire.empty()) { + return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()}); + } + return do_with(std::move(rows_on_wire), [sink] (repair_rows_on_wire& rows_on_wire) mutable { + return do_for_each(rows_on_wire, [sink] (repair_row_on_wire& row) mutable { + return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::row_data, std::move(row)}); + }).then([sink] () mutable { + return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()}); + }); + }); + }).then([sink] () mutable { + return sink.flush(); + }).then([sink] { + return make_ready_future(stop_iteration::no); + }); + } else { + return make_exception_future(std::runtime_error("Got unexpected repair_stream_cmd")); + } +} + future<> repair_init_messaging_service_handler(repair_service& rs, distributed& sys_dist_ks, distributed& view_update_generator) { _sys_dist_ks = &sys_dist_ks; _view_update_generator = &view_update_generator; From 39d5a9446ecffa2dfabf9b162ad1428c274e128b Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 10:24:23 +0800 Subject: [PATCH 30/45] repair: Add repair_put_row_diff_with_rpc_stream_process_op It is the helper for the put_row_diff rpc stream verb handler. --- repair/row_level.cc | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index ad31bc3596..4189190c00 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1682,6 +1682,37 @@ future repair_get_row_diff_with_rpc_stream_process_op( } } +future repair_put_row_diff_with_rpc_stream_process_op( + gms::inet_address from, + uint32_t src_cpu_id, + uint32_t repair_meta_id, + rpc::sink sink, + rpc::source source, + bool& error, + repair_rows_on_wire& current_rows, + std::optional> row_opt) { + auto row = std::move(std::get<0>(row_opt.value())); + if (row.cmd == repair_stream_cmd::row_data) { + rlogger.trace("Got repair_rows_on_wire from peer={}, got row_data", from); + current_rows.push_back(std::move(row.row)); + return make_ready_future(stop_iteration::no); + } else if (row.cmd == repair_stream_cmd::end_of_current_rows) { + rlogger.trace("Got repair_rows_on_wire from peer={}, got end_of_current_rows", from); + return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, rows = std::move(current_rows)] () mutable { + auto rm = repair_meta::get_repair_meta(from, repair_meta_id); + return rm->put_row_diff_handler(std::move(rows), from); + }).then([sink] () mutable { + return sink(repair_stream_cmd::put_rows_done); + }).then([sink] () mutable { + return sink.flush(); + }).then([sink] { + return make_ready_future(stop_iteration::no); + }); + } else { + return make_exception_future(std::runtime_error("Got unexpected repair_stream_cmd")); + } +} + future<> repair_init_messaging_service_handler(repair_service& rs, distributed& sys_dist_ks, distributed& view_update_generator) { _sys_dist_ks = &sys_dist_ks; _view_update_generator = &view_update_generator; From 5f25969da33e4cec5aa205d6724ec17be7debcbc Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 10:25:13 +0800 Subject: [PATCH 31/45] repair: Add repair_get_full_row_hashes_with_rpc_stream_process_op It is the helper for the get_full_row_hashes rpc stream verb handler. --- repair/row_level.cc | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index 4189190c00..3326547876 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1713,6 +1713,41 @@ future repair_put_row_diff_with_rpc_stream_process_op( } } +future repair_get_full_row_hashes_with_rpc_stream_process_op( + gms::inet_address from, + uint32_t src_cpu_id, + uint32_t repair_meta_id, + rpc::sink sink, + rpc::source source, + bool &error, + std::optional> status_opt) { + repair_stream_cmd status = std::get<0>(status_opt.value()); + rlogger.trace("Got register_repair_get_full_row_hashes_with_rpc_stream from peer={}, status={}", from, int(status)); + if (status == repair_stream_cmd::get_full_row_hashes) { + return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id] { + auto rm = repair_meta::get_repair_meta(from, repair_meta_id); + return rm->get_full_row_hashes_handler().then([] (std::unordered_set hashes) { + _metrics.tx_hashes_nr += hashes.size(); + return hashes; + }); + }).then([sink] (std::unordered_set hashes) mutable { + return do_with(std::move(hashes), [sink] (std::unordered_set& hashes) mutable { + return do_for_each(hashes, [sink] (const repair_hash& hash) mutable { + return sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash}); + }).then([sink] () mutable { + return sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()}); + }); + }); + }).then([sink] () mutable { + return sink.flush(); + }).then([sink] { + return make_ready_future(stop_iteration::no); + }); + } else { + return make_exception_future(std::runtime_error("Got unexpected repair_stream_cmd")); + } +} + future<> repair_init_messaging_service_handler(repair_service& rs, distributed& sys_dist_ks, distributed& view_update_generator) { _sys_dist_ks = &sys_dist_ks; _view_update_generator = &view_update_generator; From 06ac01426175b501aca3466e52bdb659dc70e90d Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 10:26:47 +0800 Subject: [PATCH 32/45] repair: Add repair_get_row_diff_with_rpc_stream_handler It is the handler for the get_row_diff rpc stream verb on the receiving side. --- repair/row_level.cc | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index 3326547876..a89bd6edd3 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1748,6 +1748,47 @@ future repair_get_full_row_hashes_with_rpc_stream_process_op( } } +future<> repair_get_row_diff_with_rpc_stream_handler( + gms::inet_address from, + uint32_t src_cpu_id, + uint32_t repair_meta_id, + rpc::sink sink, + rpc::source source) { + return do_with(false, std::unordered_set(), [from, src_cpu_id, repair_meta_id, sink, source] (bool& error, std::unordered_set& current_set_diff) mutable { + return repeat([from, src_cpu_id, repair_meta_id, sink, source, &error, ¤t_set_diff] () mutable { + return source().then([from, src_cpu_id, repair_meta_id, sink, source, &error, ¤t_set_diff] (std::optional> hash_cmd_opt) mutable { + if (hash_cmd_opt) { + if (error) { + return make_ready_future(stop_iteration::no); + } + return repair_get_row_diff_with_rpc_stream_process_op(from, + src_cpu_id, + repair_meta_id, + sink, + source, + error, + current_set_diff, + std::move(hash_cmd_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable { + error = true; + return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([sink] () mutable { + return sink.close(); + }).then([sink] { + return make_ready_future(stop_iteration::no); + }); + }); + } else { + if (error) { + return make_ready_future(stop_iteration::yes); + } + return sink.close().then([sink] { + return make_ready_future(stop_iteration::yes); + }); + } + }); + }); + }); +} + future<> repair_init_messaging_service_handler(repair_service& rs, distributed& sys_dist_ks, distributed& view_update_generator) { _sys_dist_ks = &sys_dist_ks; _view_update_generator = &view_update_generator; From e3267ad98c0e200554a0347fe960851518e35820 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 10:28:06 +0800 Subject: [PATCH 33/45] repair: Add repair_put_row_diff_with_rpc_stream_handler It is the handler for the put_row_diff rpc stream verb on the receiving side. --- repair/row_level.cc | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index a89bd6edd3..0a42408954 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1789,6 +1789,47 @@ future<> repair_get_row_diff_with_rpc_stream_handler( }); } +future<> repair_put_row_diff_with_rpc_stream_handler( + gms::inet_address from, + uint32_t src_cpu_id, + uint32_t repair_meta_id, + rpc::sink sink, + rpc::source source) { + return do_with(false, repair_rows_on_wire(), [from, src_cpu_id, repair_meta_id, sink, source] (bool& error, repair_rows_on_wire& current_rows) mutable { + return repeat([from, src_cpu_id, repair_meta_id, sink, source, ¤t_rows, &error] () mutable { + return source().then([from, src_cpu_id, repair_meta_id, sink, source, ¤t_rows, &error] (std::optional> row_opt) mutable { + if (row_opt) { + if (error) { + return make_ready_future(stop_iteration::no); + } + return repair_put_row_diff_with_rpc_stream_process_op(from, + src_cpu_id, + repair_meta_id, + sink, + source, + error, + current_rows, + std::move(row_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable { + error = true; + return sink(repair_stream_cmd::error).then([sink] () mutable { + return sink.close(); + }).then([sink] { + return make_ready_future(stop_iteration::no); + }); + }); + } else { + if (error) { + return make_ready_future(stop_iteration::yes); + } + return sink.close().then([sink] { + return make_ready_future(stop_iteration::yes); + }); + } + }); + }); + }); +} + future<> repair_init_messaging_service_handler(repair_service& rs, distributed& sys_dist_ks, distributed& view_update_generator) { _sys_dist_ks = &sys_dist_ks; _view_update_generator = &view_update_generator; From f255f902bd6669ad93cebab83e6e8531728bdbb6 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 10:28:54 +0800 Subject: [PATCH 34/45] repair: Add repair_get_full_row_hashes_with_rpc_stream_handler It is the handler for the get_full_row_hashes rpc stream verb on the receiving side. --- repair/row_level.cc | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index 0a42408954..e35e037f0a 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1830,6 +1830,46 @@ future<> repair_put_row_diff_with_rpc_stream_handler( }); } +future<> repair_get_full_row_hashes_with_rpc_stream_handler( + gms::inet_address from, + uint32_t src_cpu_id, + uint32_t repair_meta_id, + rpc::sink sink, + rpc::source source) { + return repeat([from, src_cpu_id, repair_meta_id, sink, source] () mutable { + return do_with(false, [from, src_cpu_id, repair_meta_id, sink, source] (bool& error) mutable { + return source().then([from, src_cpu_id, repair_meta_id, sink, source, &error] (std::optional> status_opt) mutable { + if (status_opt) { + if (error) { + return make_ready_future(stop_iteration::no); + } + return repair_get_full_row_hashes_with_rpc_stream_process_op(from, + src_cpu_id, + repair_meta_id, + sink, + source, + error, + std::move(status_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable { + error = true; + return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([sink] () mutable { + return sink.close(); + }).then([sink] { + return make_ready_future(stop_iteration::no); + }); + }); + } else { + if (error) { + return make_ready_future(stop_iteration::yes); + } + return sink.close().then([sink] { + return make_ready_future(stop_iteration::yes); + }); + } + }); + }); + }); +} + future<> repair_init_messaging_service_handler(repair_service& rs, distributed& sys_dist_ks, distributed& view_update_generator) { _sys_dist_ks = &sys_dist_ks; _view_update_generator = &view_update_generator; From 67130031b1f3a2aca31371133bc9bda6dc20b2ae Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 10:30:14 +0800 Subject: [PATCH 35/45] repair: Register repair_get_row_diff_with_rpc_stream Register the get_row_diff rpc stream verb. --- repair/row_level.cc | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index e35e037f0a..c516f40181 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1874,6 +1874,19 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed source) { + auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); + auto from = cinfo.retrieve_auxiliary("baddr"); + return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(), + [&ms, src_cpu_id, from, repair_meta_id, source] () mutable { + auto sink = ms.make_sink_for_repair_get_row_diff_with_rpc_stream(source); + repair_get_row_diff_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception( + [from, repair_meta_id, sink, source] (std::exception_ptr ep) { + rlogger.info("Failed to process get_row_diff_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep); + }); + return make_ready_future>(sink); + }); + }); ms.register_repair_get_full_row_hashes([] (const rpc::client_info& cinfo, uint32_t repair_meta_id) { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto from = cinfo.retrieve_auxiliary("baddr"); From b56cced5b82093a42887195204725bb070de81ea Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 10:31:25 +0800 Subject: [PATCH 36/45] repair: Register repair_put_row_diff_with_rpc_stream Register the put_row_diff rpc stream verb. --- repair/row_level.cc | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index c516f40181..8deebef6fa 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1887,6 +1887,19 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed>(sink); }); }); + ms.register_repair_put_row_diff_with_rpc_stream([&ms] (const rpc::client_info& cinfo, uint64_t repair_meta_id, rpc::source source) { + auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); + auto from = cinfo.retrieve_auxiliary("baddr"); + return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(), + [&ms, src_cpu_id, from, repair_meta_id, source] () mutable { + auto sink = ms.make_sink_for_repair_put_row_diff_with_rpc_stream(source); + repair_put_row_diff_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception( + [from, repair_meta_id, sink, source] (std::exception_ptr ep) { + rlogger.info("Failed to process put_row_diff_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep); + }); + return make_ready_future>(sink); + }); + }); ms.register_repair_get_full_row_hashes([] (const rpc::client_info& cinfo, uint32_t repair_meta_id) { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto from = cinfo.retrieve_auxiliary("baddr"); From c4eb0ee361e6592765f8006d10cbcaf40c6c731b Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 10:32:14 +0800 Subject: [PATCH 37/45] repair: Register repair_get_full_row_hashes_with_rpc_strea Register the get_full_row_hashes rpc stream verb. --- repair/row_level.cc | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index 8deebef6fa..ddb1f03b8e 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1900,6 +1900,19 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed>(sink); }); }); + ms.register_repair_get_full_row_hashes_with_rpc_stream([&ms] (const rpc::client_info& cinfo, uint64_t repair_meta_id, rpc::source source) { + auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); + auto from = cinfo.retrieve_auxiliary("baddr"); + return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(), + [&ms, src_cpu_id, from, repair_meta_id, source] () mutable { + auto sink = ms.make_sink_for_repair_get_full_row_hashes_with_rpc_stream(source); + repair_get_full_row_hashes_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception( + [from, repair_meta_id, sink, source] (std::exception_ptr ep) { + rlogger.info("Failed to process get_full_row_hashes_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep); + }); + return make_ready_future>(sink); + }); + }); ms.register_repair_get_full_row_hashes([] (const rpc::client_info& cinfo, uint32_t repair_meta_id) { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto from = cinfo.retrieve_auxiliary("baddr"); From 6595417567851636b0d7fabec3f03b981244c00a Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 10:07:05 +0800 Subject: [PATCH 38/45] repair: Optimize get_row_diff Move _working_row_buf instead of copy if it is follower node or it is master node with only one follow. In these cases, the _working_row_buf will not be used after this function, so we can move it. --- repair/row_level.cc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/repair/row_level.cc b/repair/row_level.cc index ddb1f03b8e..3acbfae0bd 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1060,7 +1060,11 @@ private: get_row_diff(const std::unordered_set& set_diff, needs_all_rows_t needs_all_rows = needs_all_rows_t::no) { std::list rows; if (needs_all_rows) { - rows = _working_row_buf; + if (!_repair_master || _nr_peer_nodes == 1) { + rows = std::move(_working_row_buf); + } else { + rows = _working_row_buf; + } } else { rows = boost::copy_range>(_working_row_buf | boost::adaptors::filtered([&set_diff] (repair_row& r) { return set_diff.count(r.hash()) > 0; })); From 1c92643f02a2038394511960301b0dc32fc3f7ea Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 11:19:31 +0800 Subject: [PATCH 39/45] repair: Add needs_all_rows flag to put_row_diff So we can avoid copy _working_row_buf in get_row_diff on master node if there is only one follower node and all repair rows are needed by follower node. --- repair/row_level.cc | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/repair/row_level.cc b/repair/row_level.cc index 3acbfae0bd..91fcec0057 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1538,12 +1538,12 @@ public: // RPC API // Send rows in the _working_row_buf with hash within the given sef_diff - future<> put_row_diff(const std::unordered_set& set_diff, gms::inet_address remote_node) { + future<> put_row_diff(const std::unordered_set& set_diff, needs_all_rows_t needs_all_rows, gms::inet_address remote_node) { if (!set_diff.empty()) { if (remote_node == _myip) { return make_ready_future<>(); } - std::list row_diff = get_row_diff(set_diff); + std::list row_diff = get_row_diff(set_diff, needs_all_rows); if (row_diff.size() != set_diff.size()) { throw std::runtime_error("row_diff.size() != set_diff.size()"); } @@ -2246,8 +2246,9 @@ private: std::unordered_set local_row_hash_sets = master.working_row_hashes(); parallel_for_each(boost::irange(size_t(0), _all_live_peer_nodes.size()), [&, this] (size_t idx) { auto set_diff = repair_meta::get_set_diff(local_row_hash_sets, master.peer_row_hash_sets(idx)); - rlogger.trace("Calling master.put_row_diff to node {}, set_diff={}", _all_live_peer_nodes[idx], set_diff.size()); - return master.put_row_diff(set_diff, _all_live_peer_nodes[idx]); + auto needs_all_rows = repair_meta::needs_all_rows_t(master.peer_row_hash_sets(idx).empty()); + rlogger.debug("Calling master.put_row_diff to node {}, set_diff={}, needs_all_rows={}", _all_live_peer_nodes[idx], set_diff.size(), needs_all_rows); + return master.put_row_diff(set_diff, needs_all_rows, _all_live_peer_nodes[idx]); }).get(); master.stats().round_nr_slow_path++; } From 7bf0c646be9d346f014084cdfcb6b30b0485ec08 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 15:23:22 +0800 Subject: [PATCH 40/45] repair: Add is_rpc_stream_supported Given a row_level_diff_detect_algorithm, return if this algo supports rpc stream interface. --- repair/row_level.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index 91fcec0057..b5a5799ab8 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -199,6 +199,11 @@ static row_level_diff_detect_algorithm get_common_diff_detect_algorithm(const st return common_algorithms.back(); } +bool is_rpc_stream_supported(row_level_diff_detect_algorithm algo) { + // send_full_set is the only algorithm that does not support rpc stream + return algo != row_level_diff_detect_algorithm::send_full_set; +} + static uint64_t get_random_seed() { static thread_local std::default_random_engine random_engine{std::random_device{}()}; static thread_local std::uniform_int_distribution random_dist{}; From fccaa0324fe348c8fa87be42a406d7d0fc4458d2 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 15:25:08 +0800 Subject: [PATCH 41/45] repair: Add use_rpc_stream to repair_meta Determine if rpc stream should be used. --- repair/row_level.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/repair/row_level.cc b/repair/row_level.cc index b5a5799ab8..2d2de7ec4a 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -612,6 +612,9 @@ public: const repair_hash& working_row_buf_combined_hash() const { return _working_row_buf_combined_hash; } + bool use_rpc_stream() const { + return is_rpc_stream_supported(_algo); + } public: repair_meta( From 7d08a8d223946d966bf3776ea3f02a45d1ba3382 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 11:29:08 +0800 Subject: [PATCH 42/45] repair: Hook rpc stream version of verbs in row level repair If rpc stream is supported, use the rpc stream version of the get_row_diff, put_row_diff, get_full_row_hashes. --- repair/row_level.cc | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/repair/row_level.cc b/repair/row_level.cc index 2d2de7ec4a..6fa93f8f0f 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -2216,13 +2216,26 @@ private: // Fast path: if local has zero row and remote has rows, request them all. if (master.working_row_buf_combined_hash() == repair_hash() && combined_hashes[node_idx + 1] != repair_hash()) { - master.get_row_diff_and_update_peer_row_hash_sets(node, node_idx).get(); + master.peer_row_hash_sets(node_idx).clear(); + if (master.use_rpc_stream()) { + rlogger.debug("FastPath: get_row_diff with needs_all_rows_t::yes rpc stream"); + master.get_row_diff_with_rpc_stream({}, repair_meta::needs_all_rows_t::yes, repair_meta::update_peer_row_hash_sets::yes, node, node_idx).get(); + } else { + rlogger.debug("FastPath: get_row_diff with needs_all_rows_t::yes rpc verb"); + master.get_row_diff_and_update_peer_row_hash_sets(node, node_idx).get(); + } continue; } + rlogger.debug("Before master.get_full_row_hashes for node {}, hash_sets={}", + node, master.peer_row_hash_sets(node_idx).size()); // Ask the peer to send the full list hashes in the working row buf. - master.peer_row_hash_sets(node_idx) = master.get_full_row_hashes(node).get0(); - rlogger.debug("Calling master.get_full_row_hashes for node {}, hash_sets={}", + if (master.use_rpc_stream()) { + master.peer_row_hash_sets(node_idx) = master.get_full_row_hashes_with_rpc_stream(node, node_idx).get0(); + } else { + master.peer_row_hash_sets(node_idx) = master.get_full_row_hashes(node).get0(); + } + rlogger.debug("After master.get_full_row_hashes for node {}, hash_sets={}", node, master.peer_row_hash_sets(node_idx).size()); // With hashes of rows from peer node, we can figure out @@ -2234,12 +2247,16 @@ private: // between repair master and repair follower 2. std::unordered_set set_diff = repair_meta::get_set_diff(master.peer_row_hash_sets(node_idx), master.working_row_hashes()); // Request missing sets from peer node - rlogger.debug("Calling master.get_row_diff to node {}, local={}, peer={}, set_diff={}", - node, master.working_row_hashes().size(), master.peer_row_hash_sets(node_idx).size(), set_diff); + rlogger.debug("Before get_row_diff to node {}, local={}, peer={}, set_diff={}", + node, master.working_row_hashes().size(), master.peer_row_hash_sets(node_idx).size(), set_diff.size()); // If we need to pull all rows from the peer. We can avoid // sending the row hashes on wire by setting needs_all_rows flag. auto needs_all_rows = repair_meta::needs_all_rows_t(set_diff.size() == master.peer_row_hash_sets(node_idx).size()); - master.get_row_diff(std::move(set_diff), needs_all_rows, node, node_idx).get(); + if (master.use_rpc_stream()) { + master.get_row_diff_with_rpc_stream(std::move(set_diff), needs_all_rows, repair_meta::update_peer_row_hash_sets::no, node, node_idx).get(); + } else { + master.get_row_diff(std::move(set_diff), needs_all_rows, node, node_idx).get(); + } rlogger.debug("After get_row_diff node {}, hash_sets={}", master.myip(), master.working_row_hashes().size()); } return op_status::next_step; @@ -2256,7 +2273,11 @@ private: auto set_diff = repair_meta::get_set_diff(local_row_hash_sets, master.peer_row_hash_sets(idx)); auto needs_all_rows = repair_meta::needs_all_rows_t(master.peer_row_hash_sets(idx).empty()); rlogger.debug("Calling master.put_row_diff to node {}, set_diff={}, needs_all_rows={}", _all_live_peer_nodes[idx], set_diff.size(), needs_all_rows); - return master.put_row_diff(set_diff, needs_all_rows, _all_live_peer_nodes[idx]); + if (master.use_rpc_stream()) { + return master.put_row_diff_with_rpc_stream(set_diff, needs_all_rows, _all_live_peer_nodes[idx], idx); + } else { + return master.put_row_diff(set_diff, needs_all_rows, _all_live_peer_nodes[idx]); + } }).get(); master.stats().round_nr_slow_path++; } From e8c13444baaba1bca4e88490c26da455d96a971f Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 15:26:18 +0800 Subject: [PATCH 43/45] repair: Increase max row buf size If the cluster supports row level repair with rpc stream interface, we can use bigger row buf size to have better repair bandwidth in high latency links. --- repair/row_level.cc | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/repair/row_level.cc b/repair/row_level.cc index 6fa93f8f0f..e53be1dc46 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1325,8 +1325,8 @@ public: return make_exception_future<>(std::runtime_error(format("Node {} is not fully initialized for repair, try again later", utils::fb_utilities::get_broadcast_address()))); } - rlogger.debug(">>> Started Row Level Repair (Follower): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}", - utils::fb_utilities::get_broadcast_address(), from, repair_meta_id, ks_name, cf_name, schema_version, range); + rlogger.debug(">>> Started Row Level Repair (Follower): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}, max_row_buf_siz={}", + utils::fb_utilities::get_broadcast_address(), from, repair_meta_id, ks_name, cf_name, schema_version, range, seed, max_row_buf_size); return insert_repair_meta(from, src_cpu_id, repair_meta_id, std::move(range), algo, max_row_buf_size, seed, std::move(master_node_shard_config), std::move(schema_version)); } @@ -2062,9 +2062,6 @@ class row_level_repair { // A flag indicates any error during the repair bool _failed = false; - // Max buffer size per repair round - static constexpr size_t _max_row_buf_size = 256 * 1024; - // Seed for the repair row hashing. If we ever had a hash conflict for a row // and we are not using stable hash, there is chance we will fix the row in // the next repair. @@ -2091,6 +2088,11 @@ private: all_done, }; + size_t get_max_row_buf_size(row_level_diff_detect_algorithm algo) { + // Max buffer size per repair round + return is_rpc_stream_supported(algo) ? 32 * 1024 * 1024 : 256 * 1024; + } + // Step A: Negotiate sync boundary to use op_status negotiate_sync_boundary(repair_meta& master) { check_in_shutdown(); @@ -2289,6 +2291,7 @@ public: _ri.check_in_abort(); auto repair_meta_id = repair_meta::get_next_repair_meta_id().get0(); auto algorithm = get_common_diff_detect_algorithm(_all_live_peer_nodes); + auto max_row_buf_size = get_max_row_buf_size(algorithm); auto master_node_shard_config = shard_config { engine().cpu_id(), dht::global_partitioner().shard_count(), @@ -2303,7 +2306,7 @@ public: s, _range, algorithm, - _max_row_buf_size, + max_row_buf_size, _seed, repair_meta::repair_master::yes, repair_meta_id, @@ -2313,8 +2316,8 @@ public: // All nodes including the node itself. _all_nodes.insert(_all_nodes.begin(), master.myip()); - rlogger.debug(">>> Started Row Level Repair (Master): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}", - master.myip(), _all_live_peer_nodes, master.repair_meta_id(), _ri.keyspace, _cf_name, schema_version, _range, _seed); + rlogger.debug(">>> Started Row Level Repair (Master): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}, max_row_buf_size={}", + master.myip(), _all_live_peer_nodes, master.repair_meta_id(), _ri.keyspace, _cf_name, schema_version, _range, _seed, max_row_buf_size); try { From 78ae5af2037b9241e61df397813c36c09893079b Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 3 Jul 2019 08:03:31 +0800 Subject: [PATCH 44/45] repair: Mark some of the helper functions static They are used only inside repair/row_level.cc. Make them static. --- repair/row_level.cc | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/repair/row_level.cc b/repair/row_level.cc index e53be1dc46..360b928b98 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -199,7 +199,7 @@ static row_level_diff_detect_algorithm get_common_diff_detect_algorithm(const st return common_algorithms.back(); } -bool is_rpc_stream_supported(row_level_diff_detect_algorithm algo) { +static bool is_rpc_stream_supported(row_level_diff_detect_algorithm algo) { // send_full_set is the only algorithm that does not support rpc stream return algo != row_level_diff_detect_algorithm::send_full_set; } @@ -1651,7 +1651,7 @@ public: } }; -future repair_get_row_diff_with_rpc_stream_process_op( +static future repair_get_row_diff_with_rpc_stream_process_op( gms::inet_address from, uint32_t src_cpu_id, uint32_t repair_meta_id, @@ -1694,7 +1694,7 @@ future repair_get_row_diff_with_rpc_stream_process_op( } } -future repair_put_row_diff_with_rpc_stream_process_op( +static future repair_put_row_diff_with_rpc_stream_process_op( gms::inet_address from, uint32_t src_cpu_id, uint32_t repair_meta_id, @@ -1725,7 +1725,7 @@ future repair_put_row_diff_with_rpc_stream_process_op( } } -future repair_get_full_row_hashes_with_rpc_stream_process_op( +static future repair_get_full_row_hashes_with_rpc_stream_process_op( gms::inet_address from, uint32_t src_cpu_id, uint32_t repair_meta_id, @@ -1760,7 +1760,7 @@ future repair_get_full_row_hashes_with_rpc_stream_process_op( } } -future<> repair_get_row_diff_with_rpc_stream_handler( +static future<> repair_get_row_diff_with_rpc_stream_handler( gms::inet_address from, uint32_t src_cpu_id, uint32_t repair_meta_id, @@ -1801,7 +1801,7 @@ future<> repair_get_row_diff_with_rpc_stream_handler( }); } -future<> repair_put_row_diff_with_rpc_stream_handler( +static future<> repair_put_row_diff_with_rpc_stream_handler( gms::inet_address from, uint32_t src_cpu_id, uint32_t repair_meta_id, @@ -1842,7 +1842,7 @@ future<> repair_put_row_diff_with_rpc_stream_handler( }); } -future<> repair_get_full_row_hashes_with_rpc_stream_handler( +static future<> repair_get_full_row_hashes_with_rpc_stream_handler( gms::inet_address from, uint32_t src_cpu_id, uint32_t repair_meta_id, From f686f0b9d626c5d6cf78c4e937c373a06548518c Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jun 2019 09:17:42 +0800 Subject: [PATCH 45/45] docs: Add RPC stream doc for row level repair This documents RPC stream usage in row level repair. --- docs/row_level_repair.md | 119 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) diff --git a/docs/row_level_repair.md b/docs/row_level_repair.md index 03d35b4afd..bd44158e2e 100644 --- a/docs/row_level_repair.md +++ b/docs/row_level_repair.md @@ -162,3 +162,122 @@ In the result, we saw the rows on wire were as expected. tx_row_nr = 1000505 + 999619 + 1001257 + 998619 (4 shards, the numbers are for each shard) = 4'000'000 rx_row_nr = 500233 + 500235 + 499559 + 499973 (4 shards, the numbers are for each shard) = 2'000'000 + +## RPC stream usage in repair + +Some of the RPC verbs used by row level repair can transmit bulk of data, +namely REPAIR_GET_FULL_ROW_HASHES, REPAIR_GET_ROW_DIFF and REPAIR_PUT_ROW_DIFF. +To have better repair bandwidth with high latency link, we want to increase the +row buff size. It is more efficent to send large amount of data with RPC +stream interface instead of the RPC verb interface. We want to switch to RPC +stream for such RPC verbs. Three functions, get_full_row_hashes(), +get_row_diff() and put_row_diff(), are converted to use the new RPC stream +interface. + + +### 1) get_full_row_hashes() +A new REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM rpc veb is introduced. + +#### The repair master sends: repair_stream_cmd. + +The repair_stream_cmd can be: +- repair_stream_cmd::get_full_row_hashes +Asks the repair follower to send all the hashes in working row buffer. + +#### The repair follower replies: repair_hash_with_cmd. + +``` +struct repair_hash_with_cmd { + repair_stream_cmd cmd; + repair_hash hash; +}; +``` + +The repair_stream_cmd in repair_hash_with_cmd can be: + +- repair_stream_cmd::hash_data +One of the hashes in the working row buffer. The hash is stored in repair_hash_with_cmd.hash. + +- repair_stream_cmd::end_of_current_hash_set +Notifies repair master that repair follower has send all the hashes in the working row buffer + +- repair_stream_cmd::error +Notifies an error has happened on the follower + + +### 2) get_row_diff() + +A new REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM rpc veb is introduced. + +``` +struct repair_hash_with_cmd { + repair_stream_cmd cmd; + repair_hash hash; +}; + +struct repair_row_on_wire_with_cmd { + repair_stream_cmd cmd; + repair_row_on_wire row; +}; +``` + +#### The repair master sends: repair_hash_with_cmd + +The repair_stream_cmd in repair_hash_with_cmd can be: + +- repair_stream_cmd::needs_all_rows +Asks the repair follower to send all the rows in the working row buffer. + +- repair_stream_cmd::hash_data +Contains the hash for the row in the working row buffer that repair master +needs. The hash is stored in repair_hash_with_cmd.hash. + +- repair_stream_cmd::end_of_current_hash_set + +Notifies repair follower that repair master has sent all the rows it needs. + +#### The repair follower replies: repair_row_on_wire_with_cmd + +The repair_stream_cmd in repair_row_on_wire_with_cmd can be: + +- repair_stream_cmd::row_data +This is one of the row repair master requested. The row data is stored in +repair_row_on_wire_with_cmd.row. + +- repair_stream_cmd::end_of_current_rows +Notifes repair follower has sent all the rows repair master requested. + +- repair_stream_cmd::error +Notifies an error has happened on the follower. + + +### 3) put_row_diff() + +A new REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM rpc veb is introduced. + +``` +struct repair_row_on_wire_with_cmd { + repair_stream_cmd cmd; + repair_row_on_wire row; +}; +``` + +#### The repair master sends: repair_row_on_wire_with_cmd + +The repair_stream_cmd in repair_row_on_wire_with_cmd can be: + +- repair_stream_cmd::row_data +Contains one of the rows that repair master wants to send to repair follower. +The row data is stored in repair_row_on_wire_with_cmd.row. + +- repair_stream_cmd::end_of_current_rows +Notifies repair follower that repair master has sent all the rows it wants to send. + +#### The repair follower replies: repair_stream_cmd + +- repair_stream_cmd::put_rows_done +Notifies repair master that repair follower has received and applied all the +rows repair master sent. + +- repair_stream_cmd::error +Notifies an error has happened on the follower.