repair: Convert put_row_diff_with_rpc_stream to use coroutine

It will be easier to add more logics in this function.
This commit is contained in:
Asias He
2023-11-08 10:10:57 +08:00
parent a3621dbd3e
commit 194507dffa

View File

@@ -1871,36 +1871,37 @@ public:
repair_hash_set set_diff,
needs_all_rows_t needs_all_rows,
gms::inet_address remote_node, unsigned node_idx) {
if (!set_diff.empty()) {
if (remote_node == _myip) {
return make_ready_future<>();
}
size_t sz = set_diff.size();
return get_row_diff(std::move(set_diff), needs_all_rows).then([this, remote_node, node_idx, sz] (std::list<repair_row> row_diff) {
if (row_diff.size() != sz) {
rlogger.warn("Hash conflict detected, keyspace={}, table={}, range={}, row_diff.size={}, set_diff.size={}. It is recommended to compact the table and rerun repair for the range.",
_schema->ks_name(), _schema->cf_name(), _range, row_diff.size(), sz);
}
return do_with(std::move(row_diff), [this, remote_node, node_idx] (std::list<repair_row>& row_diff) {
return get_repair_rows_size(row_diff).then([this, remote_node, node_idx, &row_diff] (size_t row_bytes) mutable {
stats().tx_row_nr += row_diff.size();
stats().tx_row_nr_peer[remote_node] += row_diff.size();
stats().tx_row_bytes += row_bytes;
stats().rpc_call_nr++;
return to_repair_rows_on_wire(std::move(row_diff)).then([this, remote_node, node_idx] (repair_rows_on_wire rows) {
return _sink_source_for_put_row_diff.get_sink_source(remote_node, node_idx).then_unpack(
[this, rows = std::move(rows), remote_node, node_idx]
(rpc::sink<repair_row_on_wire_with_cmd>& sink, rpc::source<repair_stream_cmd>& source) mutable {
auto source_op = put_row_diff_source_op(remote_node, node_idx, source);
auto sink_op = put_row_diff_sink_op(std::move(rows), sink, remote_node);
return when_all_succeed(std::move(source_op), std::move(sink_op)).discard_result();
});
});
});
});
});
if (set_diff.empty()) {
co_return;
}
return make_ready_future<>();
if (remote_node == _myip) {
co_return;
}
size_t sz = set_diff.size();
std::list<repair_row> row_diff = co_await get_row_diff(std::move(set_diff), needs_all_rows);
if (row_diff.size() != sz) {
rlogger.warn("Hash conflict detected, keyspace={}, table={}, range={}, row_diff.size={}, set_diff.size={}. It is recommended to compact the table and rerun repair for the range.",
_schema->ks_name(), _schema->cf_name(), _range, row_diff.size(), sz);
}
size_t row_bytes = co_await get_repair_rows_size(row_diff);
stats().tx_row_nr += row_diff.size();
stats().tx_row_nr_peer[remote_node] += row_diff.size();
stats().tx_row_bytes += row_bytes;
stats().rpc_call_nr++;
repair_rows_on_wire rows = co_await to_repair_rows_on_wire(std::move(row_diff));
auto [sink, source] = co_await _sink_source_for_put_row_diff.get_sink_source(remote_node, node_idx);
auto source_op = [&] () mutable -> future<> {
co_await put_row_diff_source_op(remote_node, node_idx, source);
};
auto sink_op = [&] () mutable -> future<> {
co_await put_row_diff_sink_op(std::move(rows), sink, remote_node);
};
co_await coroutine::all(source_op, sink_op);
}
// RPC handler