From 194507dffa5abe6748e52c8bbbbf6791f4706964 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 8 Nov 2023 10:10:57 +0800 Subject: [PATCH] repair: Convert put_row_diff_with_rpc_stream to use coroutine It will be easier to add more logics in this function. --- repair/row_level.cc | 59 +++++++++++++++++++++++---------------------- 1 file changed, 30 insertions(+), 29 deletions(-) diff --git a/repair/row_level.cc b/repair/row_level.cc index 99693014ad..e76ad0e989 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -1871,36 +1871,37 @@ public: repair_hash_set set_diff, needs_all_rows_t needs_all_rows, gms::inet_address remote_node, unsigned node_idx) { - if (!set_diff.empty()) { - if (remote_node == _myip) { - return make_ready_future<>(); - } - size_t sz = set_diff.size(); - return get_row_diff(std::move(set_diff), needs_all_rows).then([this, remote_node, node_idx, sz] (std::list row_diff) { - if (row_diff.size() != sz) { - rlogger.warn("Hash conflict detected, keyspace={}, table={}, range={}, row_diff.size={}, set_diff.size={}. It is recommended to compact the table and rerun repair for the range.", - _schema->ks_name(), _schema->cf_name(), _range, row_diff.size(), sz); - } - return do_with(std::move(row_diff), [this, remote_node, node_idx] (std::list& row_diff) { - return get_repair_rows_size(row_diff).then([this, remote_node, node_idx, &row_diff] (size_t row_bytes) mutable { - stats().tx_row_nr += row_diff.size(); - stats().tx_row_nr_peer[remote_node] += row_diff.size(); - stats().tx_row_bytes += row_bytes; - stats().rpc_call_nr++; - return to_repair_rows_on_wire(std::move(row_diff)).then([this, remote_node, node_idx] (repair_rows_on_wire rows) { - return _sink_source_for_put_row_diff.get_sink_source(remote_node, node_idx).then_unpack( - [this, rows = std::move(rows), remote_node, node_idx] - (rpc::sink& sink, rpc::source& source) mutable { - auto source_op = put_row_diff_source_op(remote_node, node_idx, source); - auto sink_op = put_row_diff_sink_op(std::move(rows), sink, remote_node); - return when_all_succeed(std::move(source_op), std::move(sink_op)).discard_result(); - }); - }); - }); - }); - }); + if (set_diff.empty()) { + co_return; } - return make_ready_future<>(); + if (remote_node == _myip) { + co_return; + } + size_t sz = set_diff.size(); + + std::list row_diff = co_await get_row_diff(std::move(set_diff), needs_all_rows); + if (row_diff.size() != sz) { + rlogger.warn("Hash conflict detected, keyspace={}, table={}, range={}, row_diff.size={}, set_diff.size={}. It is recommended to compact the table and rerun repair for the range.", + _schema->ks_name(), _schema->cf_name(), _range, row_diff.size(), sz); + } + + size_t row_bytes = co_await get_repair_rows_size(row_diff); + + stats().tx_row_nr += row_diff.size(); + stats().tx_row_nr_peer[remote_node] += row_diff.size(); + stats().tx_row_bytes += row_bytes; + stats().rpc_call_nr++; + + repair_rows_on_wire rows = co_await to_repair_rows_on_wire(std::move(row_diff)); + + auto [sink, source] = co_await _sink_source_for_put_row_diff.get_sink_source(remote_node, node_idx); + auto source_op = [&] () mutable -> future<> { + co_await put_row_diff_source_op(remote_node, node_idx, source); + }; + auto sink_op = [&] () mutable -> future<> { + co_await put_row_diff_sink_op(std::move(rows), sink, remote_node); + }; + co_await coroutine::all(source_op, sink_op); } // RPC handler