diff --git a/docs/row_level_repair.md b/docs/row_level_repair.md index 03d35b4afd..bd44158e2e 100644 --- a/docs/row_level_repair.md +++ b/docs/row_level_repair.md @@ -162,3 +162,122 @@ In the result, we saw the rows on wire were as expected. tx_row_nr = 1000505 + 999619 + 1001257 + 998619 (4 shards, the numbers are for each shard) = 4'000'000 rx_row_nr = 500233 + 500235 + 499559 + 499973 (4 shards, the numbers are for each shard) = 2'000'000 + +## RPC stream usage in repair + +Some of the RPC verbs used by row level repair can transmit bulk of data, +namely REPAIR_GET_FULL_ROW_HASHES, REPAIR_GET_ROW_DIFF and REPAIR_PUT_ROW_DIFF. +To have better repair bandwidth with high latency link, we want to increase the +row buffer size. It is more efficient to send large amount of data with RPC +stream interface instead of the RPC verb interface. We want to switch to RPC +stream for such RPC verbs. Three functions, get_full_row_hashes(), +get_row_diff() and put_row_diff(), are converted to use the new RPC stream +interface. + + +### 1) get_full_row_hashes() +A new REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM rpc verb is introduced. + +#### The repair master sends: repair_stream_cmd. + +The repair_stream_cmd can be: +- repair_stream_cmd::get_full_row_hashes +Asks the repair follower to send all the hashes in working row buffer. + +#### The repair follower replies: repair_hash_with_cmd. + +``` +struct repair_hash_with_cmd { + repair_stream_cmd cmd; + repair_hash hash; +}; +``` + +The repair_stream_cmd in repair_hash_with_cmd can be: + +- repair_stream_cmd::hash_data +One of the hashes in the working row buffer. The hash is stored in repair_hash_with_cmd.hash. + +- repair_stream_cmd::end_of_current_hash_set +Notifies repair master that repair follower has sent all the hashes in the working row buffer + +- repair_stream_cmd::error +Notifies an error has happened on the follower + + +### 2) get_row_diff() + +A new REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM rpc verb is introduced. 
+ +``` +struct repair_hash_with_cmd { + repair_stream_cmd cmd; + repair_hash hash; +}; + +struct repair_row_on_wire_with_cmd { + repair_stream_cmd cmd; + repair_row_on_wire row; +}; +``` + +#### The repair master sends: repair_hash_with_cmd + +The repair_stream_cmd in repair_hash_with_cmd can be: + +- repair_stream_cmd::needs_all_rows +Asks the repair follower to send all the rows in the working row buffer. + +- repair_stream_cmd::hash_data +Contains the hash for the row in the working row buffer that repair master +needs. The hash is stored in repair_hash_with_cmd.hash. + +- repair_stream_cmd::end_of_current_hash_set + +Notifies repair follower that repair master has sent all the hashes of the rows it needs. + +#### The repair follower replies: repair_row_on_wire_with_cmd + +The repair_stream_cmd in repair_row_on_wire_with_cmd can be: + +- repair_stream_cmd::row_data +This is one of the rows repair master requested. The row data is stored in +repair_row_on_wire_with_cmd.row. + +- repair_stream_cmd::end_of_current_rows +Notifies that repair follower has sent all the rows repair master requested. + +- repair_stream_cmd::error +Notifies an error has happened on the follower. + + +### 3) put_row_diff() + +A new REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM rpc verb is introduced. + +``` +struct repair_row_on_wire_with_cmd { + repair_stream_cmd cmd; + repair_row_on_wire row; +}; +``` + +#### The repair master sends: repair_row_on_wire_with_cmd + +The repair_stream_cmd in repair_row_on_wire_with_cmd can be: + +- repair_stream_cmd::row_data +Contains one of the rows that repair master wants to send to repair follower. +The row data is stored in repair_row_on_wire_with_cmd.row. + +- repair_stream_cmd::end_of_current_rows +Notifies repair follower that repair master has sent all the rows it wants to send. + +#### The repair follower replies: repair_stream_cmd + +- repair_stream_cmd::put_rows_done +Notifies repair master that repair follower has received and applied all the +rows repair master sent. 
+ +- repair_stream_cmd::error +Notifies an error has happened on the follower. diff --git a/idl/partition_checksum.idl.hh b/idl/partition_checksum.idl.hh index f3544b6383..f458469359 100644 --- a/idl/partition_checksum.idl.hh +++ b/idl/partition_checksum.idl.hh @@ -76,4 +76,26 @@ struct get_combined_row_hash_response { enum class row_level_diff_detect_algorithm : uint8_t { send_full_set, + send_full_set_rpc_stream, +}; + +enum class repair_stream_cmd : uint8_t { + error, + hash_data, + row_data, + end_of_current_hash_set, + needs_all_rows, + end_of_current_rows, + get_full_row_hashes, + put_rows_done, +}; + +struct repair_hash_with_cmd { + repair_stream_cmd cmd; + repair_hash hash; +}; + +struct repair_row_on_wire_with_cmd { + repair_stream_cmd cmd; + partition_key_and_mutation_fragments row; }; diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 94a5d76d23..64eabab375 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -464,6 +464,9 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { case messaging_verb::REPAIR_GET_ESTIMATED_PARTITIONS: case messaging_verb::REPAIR_SET_ESTIMATED_PARTITIONS: case messaging_verb::REPAIR_GET_DIFF_ALGORITHMS: + case messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM: + case messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM: + case messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM: return 2; case messaging_verb::MUTATION_DONE: case messaging_verb::MUTATION_FAILED: @@ -691,6 +694,67 @@ void messaging_service::register_stream_mutation_fragments(std::function +future, rpc::source> +do_make_sink_source(messaging_verb verb, uint32_t repair_meta_id, shared_ptr rpc_client, std::unique_ptr& rpc) { + return rpc_client->make_stream_sink().then([&rpc, verb, repair_meta_id, rpc_client] (rpc::sink sink) mutable { + auto rpc_handler = rpc->make_client (uint32_t, rpc::sink)>(verb); + return rpc_handler(*rpc_client, repair_meta_id, sink).then_wrapped([sink, 
rpc_client] (future> source) mutable { + return (source.failed() ? sink.close() : make_ready_future<>()).then([sink = std::move(sink), source = std::move(source)] () mutable { + return make_ready_future, rpc::source>(std::move(sink), std::move(source.get0())); + }); + }); + }); +} + +// Wrapper for REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM +future, rpc::source> +messaging_service::make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) { + auto verb = messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM; + auto rpc_client = get_rpc_client(verb, id); + return do_make_sink_source(verb, repair_meta_id, std::move(rpc_client), rpc()); +} + +rpc::sink messaging_service::make_sink_for_repair_get_row_diff_with_rpc_stream(rpc::source& source) { + return source.make_sink(); +} + +void messaging_service::register_repair_get_row_diff_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func) { + register_handler(this, messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM, std::move(func)); +} + +// Wrapper for REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM +future, rpc::source> +messaging_service::make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) { + auto verb = messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM; + auto rpc_client = get_rpc_client(verb, id); + return do_make_sink_source(verb, repair_meta_id, std::move(rpc_client), rpc()); +} + +rpc::sink messaging_service::make_sink_for_repair_put_row_diff_with_rpc_stream(rpc::source& source) { + return source.make_sink(); +} + +void messaging_service::register_repair_put_row_diff_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func) { + register_handler(this, messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM, std::move(func)); +} + +// Wrapper for REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM +future, rpc::source> 
+messaging_service::make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) { + auto verb = messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM; + auto rpc_client = get_rpc_client(verb, id); + return do_make_sink_source(verb, repair_meta_id, std::move(rpc_client), rpc()); +} + +rpc::sink messaging_service::make_sink_for_repair_get_full_row_hashes_with_rpc_stream(rpc::source& source) { + return source.make_sink(); +} + +void messaging_service::register_repair_get_full_row_hashes_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func) { + register_handler(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM, std::move(func)); +} + // Send a message for verb template auto send_message(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) { diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 32be38e592..62fecf3c44 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -130,7 +130,10 @@ enum class messaging_verb : int32_t { REPAIR_GET_ESTIMATED_PARTITIONS= 33, REPAIR_SET_ESTIMATED_PARTITIONS= 34, REPAIR_GET_DIFF_ALGORITHMS = 35, - LAST = 36, + REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM = 36, + REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM = 37, + REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM = 38, + LAST = 39, }; } // namespace netw @@ -274,6 +277,21 @@ public: rpc::sink make_sink_for_stream_mutation_fragments(rpc::source& source); future, rpc::source> make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id); + // Wrapper for REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM + future, rpc::source> make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id); + rpc::sink make_sink_for_repair_get_row_diff_with_rpc_stream(rpc::source& 
source); + void register_repair_get_row_diff_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func); + + // Wrapper for REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM + future, rpc::source> make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id); + rpc::sink make_sink_for_repair_put_row_diff_with_rpc_stream(rpc::source& source); + void register_repair_put_row_diff_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func); + + // Wrapper for REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM + future, rpc::source> make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(uint32_t repair_meta_id, msg_addr id); + rpc::sink make_sink_for_repair_get_full_row_hashes_with_rpc_stream(rpc::source& source); + void register_repair_get_full_row_hashes_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source)>&& func); + void register_stream_mutation_done(std::function (const rpc::client_info& cinfo, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id)>&& func); future<> send_stream_mutation_done(msg_addr id, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id); diff --git a/repair/repair.cc b/repair/repair.cc index a7aeeb2f81..1b80b12f68 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -69,6 +69,8 @@ std::ostream& operator<<(std::ostream& out, row_level_diff_detect_algorithm algo switch (algo) { case row_level_diff_detect_algorithm::send_full_set: return out << "send_full_set"; + case row_level_diff_detect_algorithm::send_full_set_rpc_stream: + return out << "send_full_set_rpc_stream"; }; return out << "unknown"; } diff --git a/repair/repair.hh b/repair/repair.hh index 6a0f49f66e..c1780df2d6 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -332,6 +332,9 @@ class partition_key_and_mutation_fragments { 
partition_key _key; std::list _mfs; public: + partition_key_and_mutation_fragments() + : _key(std::vector() ) { + } partition_key_and_mutation_fragments(partition_key key, std::list mfs) : _key(std::move(key)) , _mfs(std::move(mfs)) { @@ -346,8 +349,30 @@ public: using repair_row_on_wire = partition_key_and_mutation_fragments; using repair_rows_on_wire = std::list; +enum class repair_stream_cmd : uint8_t { + error, + hash_data, + row_data, + end_of_current_hash_set, + needs_all_rows, + end_of_current_rows, + get_full_row_hashes, + put_rows_done, +}; + +struct repair_hash_with_cmd { + repair_stream_cmd cmd; + repair_hash hash; +}; + +struct repair_row_on_wire_with_cmd { + repair_stream_cmd cmd; + repair_row_on_wire row; +}; + enum class row_level_diff_detect_algorithm : uint8_t { send_full_set, + send_full_set_rpc_stream, }; std::ostream& operator<<(std::ostream& out, row_level_diff_detect_algorithm algo); diff --git a/repair/row_level.cc b/repair/row_level.cc index fba3e4c04f..686cd64999 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -57,9 +57,79 @@ struct shard_config { sstring partitioner_name; }; +static bool inject_rpc_stream_error = false; + distributed* _sys_dist_ks; distributed* _view_update_generator; +// Wraps sink and source objects for repair master or repair follower nodes. +// For repair master, it stores sink and source pair for each of the followers. +// For repair follower, it stores one sink and source pair for repair master. +template +class sink_source_for_repair { + uint32_t _repair_meta_id; + using get_sink_source_fn_type = std::function, rpc::source> (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr)>; + using sink_type = std::reference_wrapper>; + using source_type = std::reference_wrapper>; + // The vectors below store sink and source object for peer nodes. 
+ std::vector>> _sinks; + std::vector>> _sources; + std::vector _sources_closed; + get_sink_source_fn_type _fn; +public: + sink_source_for_repair(uint32_t repair_meta_id, size_t nr_peer_nodes, get_sink_source_fn_type fn) + : _repair_meta_id(repair_meta_id) + , _sinks(nr_peer_nodes) + , _sources(nr_peer_nodes) + , _sources_closed(nr_peer_nodes, false) + , _fn(std::move(fn)) { + } + void mark_source_closed(unsigned node_idx) { + _sources_closed[node_idx] = true; + } + future get_sink_source(gms::inet_address remote_node, unsigned node_idx) { + if (_sinks[node_idx] && _sources[node_idx]) { + return make_ready_future(_sinks[node_idx].value(), _sources[node_idx].value()); + } + if (_sinks[node_idx] || _sources[node_idx]) { + return make_exception_future(std::runtime_error(format("sink or source is missing for node {}", remote_node))); + } + return _fn(_repair_meta_id, netw::messaging_service::msg_addr(remote_node)).then([this, node_idx] (rpc::sink sink, rpc::source source) mutable { + _sinks[node_idx].emplace(std::move(sink)); + _sources[node_idx].emplace(std::move(source)); + return make_ready_future(_sinks[node_idx].value(), _sources[node_idx].value()); + }); + } + future<> close() { + return parallel_for_each(boost::irange(unsigned(0), unsigned(_sources.size())), [this] (unsigned node_idx) mutable { + std::optional>& sink_opt = _sinks[node_idx]; + auto f = sink_opt ? 
sink_opt->close() : make_ready_future<>(); + return f.finally([this, node_idx] { + std::optional>& source_opt = _sources[node_idx]; + if (source_opt && !_sources_closed[node_idx]) { + return repeat([&source_opt] () mutable { + // Keep reading source until end of stream + return (*source_opt)().then([] (std::optional> opt) mutable { + if (opt) { + return make_ready_future(stop_iteration::no); + } else { + return make_ready_future(stop_iteration::yes); + } + }).handle_exception([] (std::exception_ptr ep) { + return make_ready_future(stop_iteration::yes); + }); + }); + } + return make_ready_future<>(); + }); + }); + } +}; + +using sink_source_for_get_full_row_hashes = sink_source_for_repair; +using sink_source_for_get_row_diff = sink_source_for_repair; +using sink_source_for_put_row_diff = sink_source_for_repair; + struct row_level_repair_metrics { seastar::metrics::metric_groups _metrics; uint64_t tx_row_nr{0}; @@ -130,6 +200,11 @@ static row_level_diff_detect_algorithm get_common_diff_detect_algorithm(const st return common_algorithms.back(); } +static bool is_rpc_stream_supported(row_level_diff_detect_algorithm algo) { + // send_full_set is the only algorithm that does not support rpc stream + return algo != row_level_diff_detect_algorithm::send_full_set; +} + static uint64_t get_random_seed() { static thread_local std::default_random_engine random_engine{std::random_device{}()}; static thread_local std::uniform_int_distribution random_dist{}; @@ -524,6 +599,9 @@ private: std::vector> _peer_row_hash_sets; // Gate used to make sure pending operation of meta data is done seastar::gate _gate; + sink_source_for_get_full_row_hashes _sink_source_for_get_full_row_hashes; + sink_source_for_get_row_diff _sink_source_for_get_row_diff; + sink_source_for_put_row_diff _sink_source_for_put_row_diff; public: repair_stats& stats() { return _stats; @@ -543,6 +621,9 @@ public: const repair_hash& working_row_buf_combined_hash() const { return _working_row_buf_combined_hash; } + bool 
use_rpc_stream() const { + return is_rpc_stream_supported(_algo); + } public: repair_meta( @@ -583,14 +664,30 @@ public: _seed, repair_reader::is_local_reader(_repair_master || _same_sharding_config) ) - , _repair_writer(_schema, _estimated_partitions, _nr_peer_nodes) { + , _repair_writer(_schema, _estimated_partitions, _nr_peer_nodes) + , _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes, + [] (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr) { + return netw::get_local_messaging_service().make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(repair_meta_id, addr); + }) + , _sink_source_for_get_row_diff(_repair_meta_id, _nr_peer_nodes, + [] (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr) { + return netw::get_local_messaging_service().make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(repair_meta_id, addr); + }) + , _sink_source_for_put_row_diff(_repair_meta_id, _nr_peer_nodes, + [] (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr) { + return netw::get_local_messaging_service().make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(repair_meta_id, addr); + }) + { } public: future<> stop() { auto gate_future = _gate.close(); auto writer_future = _repair_writer.wait_for_writer_done(); - return when_all_succeed(std::move(gate_future), std::move(writer_future)); + auto f1 = _sink_source_for_get_full_row_hashes.close(); + auto f2 = _sink_source_for_get_row_diff.close(); + auto f3 = _sink_source_for_put_row_diff.close(); + return when_all_succeed(std::move(gate_future), std::move(writer_future), std::move(f1), std::move(f2), std::move(f3)); } static std::unordered_map>& repair_meta_map() { @@ -980,7 +1077,11 @@ private: get_row_diff(const std::unordered_set& set_diff, needs_all_rows_t needs_all_rows = needs_all_rows_t::no) { std::list rows; if (needs_all_rows) { - rows = _working_row_buf; + if (!_repair_master || _nr_peer_nodes == 1) { + rows = std::move(_working_row_buf); + 
} else { + rows = _working_row_buf; + } } else { rows = boost::copy_range>(_working_row_buf | boost::adaptors::filtered([&set_diff] (repair_row& r) { return set_diff.count(r.hash()) > 0; })); @@ -1120,6 +1221,63 @@ public: }); } +private: + future<> get_full_row_hashes_source_op( + lw_shared_ptr> current_hashes, + gms::inet_address remote_node, + unsigned node_idx, + rpc::source& source) { + return repeat([this, current_hashes, remote_node, node_idx, &source] () mutable { + return source().then([this, current_hashes, remote_node, node_idx] (std::optional> hash_cmd_opt) mutable { + if (hash_cmd_opt) { + repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value()); + rlogger.trace("get_full_row_hashes: Got repair_hash_with_cmd from peer={}, hash={}, cmd={}", remote_node, hash_cmd.hash, int(hash_cmd.cmd)); + if (hash_cmd.cmd == repair_stream_cmd::hash_data) { + current_hashes->insert(hash_cmd.hash); + return make_ready_future(stop_iteration::no); + } else if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set) { + return make_ready_future(stop_iteration::yes); + } else if (hash_cmd.cmd == repair_stream_cmd::error) { + throw std::runtime_error("get_full_row_hashes: Peer failed to process"); + } else { + throw std::runtime_error("get_full_row_hashes: Got unexpected repair_stream_cmd"); + } + } else { + _sink_source_for_get_full_row_hashes.mark_source_closed(node_idx); + throw std::runtime_error("get_full_row_hashes: Got unexpected end of stream"); + } + }); + }); + } + + future<> get_full_row_hashes_sink_op(rpc::sink& sink) { + return sink(repair_stream_cmd::get_full_row_hashes).then([&sink] { + return sink.flush(); + }).handle_exception([&sink] (std::exception_ptr ep) { + return sink.close().then([ep = std::move(ep)] () mutable { + return make_exception_future<>(std::move(ep)); + }); + }); + } + +public: + future> + get_full_row_hashes_with_rpc_stream(gms::inet_address remote_node, unsigned node_idx) { + if (remote_node == _myip) { + return 
get_full_row_hashes_handler(); + } + auto current_hashes = make_lw_shared>(); + return _sink_source_for_get_full_row_hashes.get_sink_source(remote_node, node_idx).then( + [this, current_hashes, remote_node, node_idx] + (rpc::sink& sink, rpc::source& source) mutable { + auto source_op = get_full_row_hashes_source_op(current_hashes, remote_node, node_idx, source); + auto sink_op = get_full_row_hashes_sink_op(sink); + return when_all_succeed(std::move(source_op), std::move(sink_op)); + }).then([current_hashes] () mutable { + return std::move(*current_hashes); + }); + } + // RPC handler future> get_full_row_hashes_handler() { @@ -1176,8 +1334,8 @@ public: return make_exception_future<>(std::runtime_error(format("Node {} is not fully initialized for repair, try again later", utils::fb_utilities::get_broadcast_address()))); } - rlogger.debug(">>> Started Row Level Repair (Follower): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}", - utils::fb_utilities::get_broadcast_address(), from, repair_meta_id, ks_name, cf_name, schema_version, range); + rlogger.debug(">>> Started Row Level Repair (Follower): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}, max_row_buf_siz={}", + utils::fb_utilities::get_broadcast_address(), from, repair_meta_id, ks_name, cf_name, schema_version, range, seed, max_row_buf_size); return insert_repair_meta(from, src_cpu_id, repair_meta_id, std::move(range), algo, max_row_buf_size, seed, std::move(master_node_shard_config), std::move(schema_version)); } @@ -1291,6 +1449,102 @@ public: return make_ready_future<>(); } +private: + future<> get_row_diff_source_op( + update_peer_row_hash_sets update_hash_set, + gms::inet_address remote_node, + unsigned node_idx, + rpc::sink& sink, + rpc::source& source) { + auto current_rows = make_lw_shared(); + return repeat([this, current_rows, update_hash_set, remote_node, node_idx, &sink, &source] () mutable { + return 
source().then([this, current_rows, update_hash_set, remote_node, node_idx] (std::optional> row_opt) mutable { + if (row_opt) { + if (inject_rpc_stream_error) { + throw std::runtime_error("get_row_diff: Inject sender error in source loop"); + } + auto row = std::move(std::get<0>(row_opt.value())); + if (row.cmd == repair_stream_cmd::row_data) { + rlogger.trace("get_row_diff: Got repair_row_on_wire with data"); + current_rows->push_back(std::move(row.row)); + return make_ready_future(stop_iteration::no); + } else if (row.cmd == repair_stream_cmd::end_of_current_rows) { + rlogger.trace("get_row_diff: Got repair_row_on_wire with nullopt"); + return apply_rows(std::move(*current_rows), remote_node, update_working_row_buf::yes, update_hash_set, node_idx).then([current_rows] { + current_rows->clear(); + return make_ready_future(stop_iteration::yes); + }); + } else if (row.cmd == repair_stream_cmd::error) { + throw std::runtime_error("get_row_diff: Peer failed to process"); + } else { + throw std::runtime_error("get_row_diff: Got unexpected repair_stream_cmd"); + } + } else { + _sink_source_for_get_row_diff.mark_source_closed(node_idx); + throw std::runtime_error("get_row_diff: Got unexpected end of stream"); + } + }); + }); + } + + future<> get_row_diff_sink_op( + std::unordered_set set_diff, + needs_all_rows_t needs_all_rows, + rpc::sink& sink, + gms::inet_address remote_node) { + return do_with(std::move(set_diff), [needs_all_rows, remote_node, &sink] (std::unordered_set& set_diff) mutable { + if (inject_rpc_stream_error) { + return make_exception_future<>(std::runtime_error("get_row_diff: Inject sender error in sink loop")); + } + if (needs_all_rows) { + rlogger.trace("get_row_diff: request with repair_stream_cmd::needs_all_rows"); + return sink(repair_hash_with_cmd{repair_stream_cmd::needs_all_rows, repair_hash()}).then([&sink] () mutable { + return sink.flush(); + }); + } + return do_for_each(set_diff, [&sink] (const repair_hash& hash) mutable { + return 
sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash}); + }).then([&sink] () mutable { + return sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()}); + }).then([&sink] () mutable { + return sink.flush(); + }); + }).handle_exception([&sink] (std::exception_ptr ep) { + return sink.close().then([ep = std::move(ep)] () mutable { + return make_exception_future<>(std::move(ep)); + }); + }); + } + +public: + future<> get_row_diff_with_rpc_stream( + std::unordered_set set_diff, + needs_all_rows_t needs_all_rows, + update_peer_row_hash_sets update_hash_set, + gms::inet_address remote_node, + unsigned node_idx) { + if (needs_all_rows || !set_diff.empty()) { + if (remote_node == _myip) { + return make_ready_future<>(); + } + if (needs_all_rows) { + set_diff.clear(); + } else { + stats().tx_hashes_nr += set_diff.size(); + _metrics.tx_hashes_nr += set_diff.size(); + } + stats().rpc_call_nr++; + return _sink_source_for_get_row_diff.get_sink_source(remote_node, node_idx).then( + [this, set_diff = std::move(set_diff), needs_all_rows, update_hash_set, remote_node, node_idx] + (rpc::sink& sink, rpc::source& source) mutable { + auto source_op = get_row_diff_source_op(update_hash_set, remote_node, node_idx, sink, source); + auto sink_op = get_row_diff_sink_op(std::move(set_diff), needs_all_rows, sink, remote_node); + return when_all_succeed(std::move(source_op), std::move(sink_op)); + }); + } + return make_ready_future<>(); + } + // RPC handler future get_row_diff_handler(const std::unordered_set& set_diff, needs_all_rows_t needs_all_rows) { return with_gate(_gate, [this, &set_diff, needs_all_rows] { @@ -1301,12 +1555,12 @@ public: // RPC API // Send rows in the _working_row_buf with hash within the given sef_diff - future<> put_row_diff(const std::unordered_set& set_diff, gms::inet_address remote_node) { + future<> put_row_diff(const std::unordered_set& set_diff, needs_all_rows_t needs_all_rows, gms::inet_address remote_node) { if 
(!set_diff.empty()) { if (remote_node == _myip) { return make_ready_future<>(); } - std::list row_diff = get_row_diff(set_diff); + std::list row_diff = get_row_diff(set_diff, needs_all_rows); if (row_diff.size() != set_diff.size()) { throw std::runtime_error("row_diff.size() != set_diff.size()"); } @@ -1321,6 +1575,83 @@ public: return make_ready_future<>(); } +private: + future<> put_row_diff_source_op( + gms::inet_address remote_node, + unsigned node_idx, + rpc::source& source) { + return repeat([this, remote_node, node_idx, &source] () mutable { + return source().then([this, remote_node, node_idx] (std::optional> status_opt) mutable { + if (status_opt) { + repair_stream_cmd status = std::move(std::get<0>(status_opt.value())); + rlogger.trace("put_row_diff: Got status code from follower={} for put_row_diff, status={}", remote_node, int(status)); + if (status == repair_stream_cmd::put_rows_done) { + return make_ready_future(stop_iteration::yes); + } else if (status == repair_stream_cmd::error) { + throw std::runtime_error(format("put_row_diff: Repair follower={} failed in put_row_diff hanlder, status={}", remote_node, int(status))); + } else { + throw std::runtime_error("put_row_diff: Got unexpected repair_stream_cmd"); + } + } else { + _sink_source_for_put_row_diff.mark_source_closed(node_idx); + throw std::runtime_error("put_row_diff: Got unexpected end of stream"); + } + }); + }); + } + + future<> put_row_diff_sink_op( + repair_rows_on_wire rows, + rpc::sink& sink, + gms::inet_address remote_node) { + return do_with(std::move(rows), [&sink, remote_node] (repair_rows_on_wire& rows) mutable { + return do_for_each(rows, [&sink] (repair_row_on_wire& row) mutable { + rlogger.trace("put_row_diff: send row"); + return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::row_data, std::move(row)}); + }).then([&sink] () mutable { + rlogger.trace("put_row_diff: send empty row"); + return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, 
repair_row_on_wire()}).then([&sink] () mutable { + rlogger.trace("put_row_diff: send done"); + return sink.flush(); + }); + }); + }).handle_exception([&sink] (std::exception_ptr ep) { + return sink.close().then([ep = std::move(ep)] () mutable { + return make_exception_future<>(std::move(ep)); + }); + }); + } + +public: + future<> put_row_diff_with_rpc_stream( + const std::unordered_set& set_diff, + needs_all_rows_t needs_all_rows, + gms::inet_address remote_node, unsigned node_idx) { + if (!set_diff.empty()) { + if (remote_node == _myip) { + return make_ready_future<>(); + } + std::list row_diff = get_row_diff(set_diff, needs_all_rows); + if (row_diff.size() != set_diff.size()) { + throw std::runtime_error("row_diff.size() != set_diff.size()"); + } + stats().tx_row_nr += row_diff.size(); + stats().tx_row_nr_peer[remote_node] += row_diff.size(); + stats().tx_row_bytes += get_repair_rows_size(row_diff); + stats().rpc_call_nr++; + return to_repair_rows_on_wire(std::move(row_diff)).then([this, remote_node, node_idx] (repair_rows_on_wire rows) { + return _sink_source_for_put_row_diff.get_sink_source(remote_node, node_idx).then( + [this, rows = std::move(rows), remote_node, node_idx] + (rpc::sink& sink, rpc::source& source) mutable { + auto source_op = put_row_diff_source_op(remote_node, node_idx, source); + auto sink_op = put_row_diff_sink_op(std::move(rows), sink, remote_node); + return when_all_succeed(std::move(source_op), std::move(sink_op)); + }); + }); + } + return make_ready_future<>(); + } + // RPC handler future<> put_row_diff_handler(repair_rows_on_wire rows, gms::inet_address from) { return with_gate(_gate, [this, rows = std::move(rows), from] () mutable { @@ -1329,10 +1660,280 @@ public: } }; +static future repair_get_row_diff_with_rpc_stream_process_op( + gms::inet_address from, + uint32_t src_cpu_id, + uint32_t repair_meta_id, + rpc::sink sink, + rpc::source source, + bool &error, + std::unordered_set& current_set_diff, + std::optional> hash_cmd_opt) { + 
repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value()); + rlogger.trace("Got repair_hash_with_cmd from peer={}, hash={}, cmd={}", from, hash_cmd.hash, int(hash_cmd.cmd)); + if (hash_cmd.cmd == repair_stream_cmd::hash_data) { + current_set_diff.insert(hash_cmd.hash); + return make_ready_future(stop_iteration::no); + } else if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set || hash_cmd.cmd == repair_stream_cmd::needs_all_rows) { + if (inject_rpc_stream_error) { + return make_exception_future(std::runtime_error("get_row_diff_with_rpc_stream: Inject error in handler loop")); + } + bool needs_all_rows = hash_cmd.cmd == repair_stream_cmd::needs_all_rows; + return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, needs_all_rows, set_diff = std::move(current_set_diff)] { + auto rm = repair_meta::get_repair_meta(from, repair_meta_id); + return rm->get_row_diff_handler(set_diff, repair_meta::needs_all_rows_t(needs_all_rows)); + }).then([sink] (repair_rows_on_wire rows_on_wire) mutable { + if (rows_on_wire.empty()) { + return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()}); + } + return do_with(std::move(rows_on_wire), [sink] (repair_rows_on_wire& rows_on_wire) mutable { + return do_for_each(rows_on_wire, [sink] (repair_row_on_wire& row) mutable { + return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::row_data, std::move(row)}); + }).then([sink] () mutable { + return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()}); + }); + }); + }).then([sink] () mutable { + return sink.flush(); + }).then([sink] { + return make_ready_future(stop_iteration::no); + }); + } else { + return make_exception_future(std::runtime_error("Got unexpected repair_stream_cmd")); + } +} + +static future repair_put_row_diff_with_rpc_stream_process_op( + gms::inet_address from, + uint32_t src_cpu_id, + uint32_t repair_meta_id, + rpc::sink sink, + rpc::source source, + bool& 
error, + repair_rows_on_wire& current_rows, + std::optional> row_opt) { + auto row = std::move(std::get<0>(row_opt.value())); + if (row.cmd == repair_stream_cmd::row_data) { + rlogger.trace("Got repair_rows_on_wire from peer={}, got row_data", from); + current_rows.push_back(std::move(row.row)); + return make_ready_future(stop_iteration::no); + } else if (row.cmd == repair_stream_cmd::end_of_current_rows) { + rlogger.trace("Got repair_rows_on_wire from peer={}, got end_of_current_rows", from); + return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, rows = std::move(current_rows)] () mutable { + auto rm = repair_meta::get_repair_meta(from, repair_meta_id); + return rm->put_row_diff_handler(std::move(rows), from); + }).then([sink] () mutable { + return sink(repair_stream_cmd::put_rows_done); + }).then([sink] () mutable { + return sink.flush(); + }).then([sink] { + return make_ready_future(stop_iteration::no); + }); + } else { + return make_exception_future(std::runtime_error("Got unexpected repair_stream_cmd")); + } +} + +static future repair_get_full_row_hashes_with_rpc_stream_process_op( + gms::inet_address from, + uint32_t src_cpu_id, + uint32_t repair_meta_id, + rpc::sink sink, + rpc::source source, + bool &error, + std::optional> status_opt) { + repair_stream_cmd status = std::get<0>(status_opt.value()); + rlogger.trace("Got register_repair_get_full_row_hashes_with_rpc_stream from peer={}, status={}", from, int(status)); + if (status == repair_stream_cmd::get_full_row_hashes) { + return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id] { + auto rm = repair_meta::get_repair_meta(from, repair_meta_id); + return rm->get_full_row_hashes_handler().then([] (std::unordered_set hashes) { + _metrics.tx_hashes_nr += hashes.size(); + return hashes; + }); + }).then([sink] (std::unordered_set hashes) mutable { + return do_with(std::move(hashes), [sink] (std::unordered_set& hashes) mutable { + return do_for_each(hashes, [sink] (const repair_hash& 
hash) mutable { + return sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash}); + }).then([sink] () mutable { + return sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()}); + }); + }); + }).then([sink] () mutable { + return sink.flush(); + }).then([sink] { + return make_ready_future(stop_iteration::no); + }); + } else { + return make_exception_future(std::runtime_error("Got unexpected repair_stream_cmd")); + } +} + +static future<> repair_get_row_diff_with_rpc_stream_handler( + gms::inet_address from, + uint32_t src_cpu_id, + uint32_t repair_meta_id, + rpc::sink sink, + rpc::source source) { + return do_with(false, std::unordered_set(), [from, src_cpu_id, repair_meta_id, sink, source] (bool& error, std::unordered_set& current_set_diff) mutable { + return repeat([from, src_cpu_id, repair_meta_id, sink, source, &error, &current_set_diff] () mutable { + return source().then([from, src_cpu_id, repair_meta_id, sink, source, &error, &current_set_diff] (std::optional> hash_cmd_opt) mutable { + if (hash_cmd_opt) { + if (error) { + return make_ready_future(stop_iteration::no); + } + return repair_get_row_diff_with_rpc_stream_process_op(from, + src_cpu_id, + repair_meta_id, + sink, + source, + error, + current_set_diff, + std::move(hash_cmd_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable { + error = true; + return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([sink] () mutable { + return sink.close(); + }).then([sink] { + return make_ready_future(stop_iteration::no); + }); + }); + } else { + if (error) { + return make_ready_future(stop_iteration::yes); + } + return sink.close().then([sink] { + return make_ready_future(stop_iteration::yes); + }); + } + }); + }); + }); +} + +static future<> repair_put_row_diff_with_rpc_stream_handler( + gms::inet_address from, + uint32_t src_cpu_id, + uint32_t repair_meta_id, + rpc::sink sink, + rpc::source source) { + return do_with(false, 
repair_rows_on_wire(), [from, src_cpu_id, repair_meta_id, sink, source] (bool& error, repair_rows_on_wire& current_rows) mutable { + return repeat([from, src_cpu_id, repair_meta_id, sink, source, &current_rows, &error] () mutable { + return source().then([from, src_cpu_id, repair_meta_id, sink, source, &current_rows, &error] (std::optional> row_opt) mutable { + if (row_opt) { + if (error) { + return make_ready_future(stop_iteration::no); + } + return repair_put_row_diff_with_rpc_stream_process_op(from, + src_cpu_id, + repair_meta_id, + sink, + source, + error, + current_rows, + std::move(row_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable { + error = true; + return sink(repair_stream_cmd::error).then([sink] () mutable { + return sink.close(); + }).then([sink] { + return make_ready_future(stop_iteration::no); + }); + }); + } else { + if (error) { + return make_ready_future(stop_iteration::yes); + } + return sink.close().then([sink] { + return make_ready_future(stop_iteration::yes); + }); + } + }); + }); + }); +} + +static future<> repair_get_full_row_hashes_with_rpc_stream_handler( + gms::inet_address from, + uint32_t src_cpu_id, + uint32_t repair_meta_id, + rpc::sink sink, + rpc::source source) { + return repeat([from, src_cpu_id, repair_meta_id, sink, source] () mutable { + return do_with(false, [from, src_cpu_id, repair_meta_id, sink, source] (bool& error) mutable { + return source().then([from, src_cpu_id, repair_meta_id, sink, source, &error] (std::optional> status_opt) mutable { + if (status_opt) { + if (error) { + return make_ready_future(stop_iteration::no); + } + return repair_get_full_row_hashes_with_rpc_stream_process_op(from, + src_cpu_id, + repair_meta_id, + sink, + source, + error, + std::move(status_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable { + error = true; + return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([sink] () mutable { + return sink.close(); + }).then([sink] { + return 
make_ready_future(stop_iteration::no); + }); + }); + } else { + if (error) { + return make_ready_future(stop_iteration::yes); + } + return sink.close().then([sink] { + return make_ready_future(stop_iteration::yes); + }); + } + }); + }); + }); +} + future<> repair_init_messaging_service_handler(repair_service& rs, distributed& sys_dist_ks, distributed& view_update_generator) { _sys_dist_ks = &sys_dist_ks; _view_update_generator = &view_update_generator; return netw::get_messaging_service().invoke_on_all([] (auto& ms) { + ms.register_repair_get_row_diff_with_rpc_stream([&ms] (const rpc::client_info& cinfo, uint64_t repair_meta_id, rpc::source source) { + auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); + auto from = cinfo.retrieve_auxiliary("baddr"); + return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(), + [&ms, src_cpu_id, from, repair_meta_id, source] () mutable { + auto sink = ms.make_sink_for_repair_get_row_diff_with_rpc_stream(source); + repair_get_row_diff_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception( + [from, repair_meta_id, sink, source] (std::exception_ptr ep) { + rlogger.info("Failed to process get_row_diff_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep); + }); + return make_ready_future>(sink); + }); + }); + ms.register_repair_put_row_diff_with_rpc_stream([&ms] (const rpc::client_info& cinfo, uint64_t repair_meta_id, rpc::source source) { + auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); + auto from = cinfo.retrieve_auxiliary("baddr"); + return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(), + [&ms, src_cpu_id, from, repair_meta_id, source] () mutable { + auto sink = ms.make_sink_for_repair_put_row_diff_with_rpc_stream(source); + repair_put_row_diff_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception( + 
[from, repair_meta_id, sink, source] (std::exception_ptr ep) { + rlogger.info("Failed to process put_row_diff_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep); + }); + return make_ready_future>(sink); + }); + }); + ms.register_repair_get_full_row_hashes_with_rpc_stream([&ms] (const rpc::client_info& cinfo, uint64_t repair_meta_id, rpc::source source) { + auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); + auto from = cinfo.retrieve_auxiliary("baddr"); + return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(), + [&ms, src_cpu_id, from, repair_meta_id, source] () mutable { + auto sink = ms.make_sink_for_repair_get_full_row_hashes_with_rpc_stream(source); + repair_get_full_row_hashes_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception( + [from, repair_meta_id, sink, source] (std::exception_ptr ep) { + rlogger.info("Failed to process get_full_row_hashes_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep); + }); + return make_ready_future>(sink); + }); + }); ms.register_repair_get_full_row_hashes([] (const rpc::client_info& cinfo, uint32_t repair_meta_id) { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto from = cinfo.retrieve_auxiliary("baddr"); @@ -1470,9 +2071,6 @@ class row_level_repair { // A flag indicates any error during the repair bool _failed = false; - // Max buffer size per repair round - static constexpr size_t _max_row_buf_size = 256 * 1024; - // Seed for the repair row hashing. If we ever had a hash conflict for a row // and we are not using stable hash, there is chance we will fix the row in // the next repair. @@ -1499,6 +2097,11 @@ private: all_done, }; + size_t get_max_row_buf_size(row_level_diff_detect_algorithm algo) { + // Max buffer size per repair round + return is_rpc_stream_supported(algo) ? 
32 * 1024 * 1024 : 256 * 1024; + } + // Step A: Negotiate sync boundary to use op_status negotiate_sync_boundary(repair_meta& master) { check_in_shutdown(); @@ -1624,13 +2227,26 @@ private: // Fast path: if local has zero row and remote has rows, request them all. if (master.working_row_buf_combined_hash() == repair_hash() && combined_hashes[node_idx + 1] != repair_hash()) { - master.get_row_diff_and_update_peer_row_hash_sets(node, node_idx).get(); + master.peer_row_hash_sets(node_idx).clear(); + if (master.use_rpc_stream()) { + rlogger.debug("FastPath: get_row_diff with needs_all_rows_t::yes rpc stream"); + master.get_row_diff_with_rpc_stream({}, repair_meta::needs_all_rows_t::yes, repair_meta::update_peer_row_hash_sets::yes, node, node_idx).get(); + } else { + rlogger.debug("FastPath: get_row_diff with needs_all_rows_t::yes rpc verb"); + master.get_row_diff_and_update_peer_row_hash_sets(node, node_idx).get(); + } continue; } + rlogger.debug("Before master.get_full_row_hashes for node {}, hash_sets={}", + node, master.peer_row_hash_sets(node_idx).size()); // Ask the peer to send the full list hashes in the working row buf. - master.peer_row_hash_sets(node_idx) = master.get_full_row_hashes(node).get0(); - rlogger.debug("Calling master.get_full_row_hashes for node {}, hash_sets={}", + if (master.use_rpc_stream()) { + master.peer_row_hash_sets(node_idx) = master.get_full_row_hashes_with_rpc_stream(node, node_idx).get0(); + } else { + master.peer_row_hash_sets(node_idx) = master.get_full_row_hashes(node).get0(); + } + rlogger.debug("After master.get_full_row_hashes for node {}, hash_sets={}", node, master.peer_row_hash_sets(node_idx).size()); // With hashes of rows from peer node, we can figure out @@ -1642,12 +2258,16 @@ private: // between repair master and repair follower 2. 
std::unordered_set set_diff = repair_meta::get_set_diff(master.peer_row_hash_sets(node_idx), master.working_row_hashes()); // Request missing sets from peer node - rlogger.debug("Calling master.get_row_diff to node {}, local={}, peer={}, set_diff={}", - node, master.working_row_hashes().size(), master.peer_row_hash_sets(node_idx).size(), set_diff); + rlogger.debug("Before get_row_diff to node {}, local={}, peer={}, set_diff={}", + node, master.working_row_hashes().size(), master.peer_row_hash_sets(node_idx).size(), set_diff.size()); // If we need to pull all rows from the peer. We can avoid // sending the row hashes on wire by setting needs_all_rows flag. auto needs_all_rows = repair_meta::needs_all_rows_t(set_diff.size() == master.peer_row_hash_sets(node_idx).size()); - master.get_row_diff(std::move(set_diff), needs_all_rows, node, node_idx).get(); + if (master.use_rpc_stream()) { + master.get_row_diff_with_rpc_stream(std::move(set_diff), needs_all_rows, repair_meta::update_peer_row_hash_sets::no, node, node_idx).get(); + } else { + master.get_row_diff(std::move(set_diff), needs_all_rows, node, node_idx).get(); + } rlogger.debug("After get_row_diff node {}, hash_sets={}", master.myip(), master.working_row_hashes().size()); } return op_status::next_step; @@ -1662,8 +2282,13 @@ private: std::unordered_set local_row_hash_sets = master.working_row_hashes(); parallel_for_each(boost::irange(size_t(0), _all_live_peer_nodes.size()), [&, this] (size_t idx) { auto set_diff = repair_meta::get_set_diff(local_row_hash_sets, master.peer_row_hash_sets(idx)); - rlogger.trace("Calling master.put_row_diff to node {}, set_diff={}", _all_live_peer_nodes[idx], set_diff.size()); - return master.put_row_diff(set_diff, _all_live_peer_nodes[idx]); + auto needs_all_rows = repair_meta::needs_all_rows_t(master.peer_row_hash_sets(idx).empty()); + rlogger.debug("Calling master.put_row_diff to node {}, set_diff={}, needs_all_rows={}", _all_live_peer_nodes[idx], set_diff.size(), needs_all_rows); 
+ if (master.use_rpc_stream()) { + return master.put_row_diff_with_rpc_stream(set_diff, needs_all_rows, _all_live_peer_nodes[idx], idx); + } else { + return master.put_row_diff(set_diff, needs_all_rows, _all_live_peer_nodes[idx]); + } }).get(); master.stats().round_nr_slow_path++; } @@ -1675,6 +2300,7 @@ public: _ri.check_in_abort(); auto repair_meta_id = repair_meta::get_next_repair_meta_id().get0(); auto algorithm = get_common_diff_detect_algorithm(_all_live_peer_nodes); + auto max_row_buf_size = get_max_row_buf_size(algorithm); auto master_node_shard_config = shard_config { engine().cpu_id(), dht::global_partitioner().shard_count(), @@ -1689,7 +2315,7 @@ public: s, _range, algorithm, - _max_row_buf_size, + max_row_buf_size, _seed, repair_meta::repair_master::yes, repair_meta_id, @@ -1699,8 +2325,8 @@ public: // All nodes including the node itself. _all_nodes.insert(_all_nodes.begin(), master.myip()); - rlogger.debug(">>> Started Row Level Repair (Master): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}", - master.myip(), _all_live_peer_nodes, master.repair_meta_id(), _ri.keyspace, _cf_name, schema_version, _range, _seed); + rlogger.debug(">>> Started Row Level Repair (Master): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}, max_row_buf_size={}", + master.myip(), _all_live_peer_nodes, master.repair_meta_id(), _ri.keyspace, _cf_name, schema_version, _range, _seed, max_row_buf_size); try {