repair: row_level: row_level_repair::negotiate_sync_boundary()

The function itself is threaded, but the inner lambda is coroutinized.
This commit is contained in:
Avi Kivity
2024-09-04 15:03:00 +03:00
parent 645e39e746
commit d2f1b44487

View File

@@ -2667,14 +2667,15 @@ private:
rlogger.debug("ROUND {}, _last_sync_boundary={}, _current_sync_boundary={}, _skipped_sync_boundary={}",
master.stats().round_nr, master.last_sync_boundary(), master.current_sync_boundary(), _skipped_sync_boundary);
master.stats().round_nr++;
parallel_for_each(master.all_nodes(), [&, this] (repair_node_state& ns) {
parallel_for_each(master.all_nodes(), coroutine::lambda([&] (repair_node_state& ns) -> future<> {
const auto& node = ns.node;
auto dst_cpu_id = ns.shard;
// By calling `get_sync_boundary`, the `_last_sync_boundary`
// is moved to the `_current_sync_boundary` or
// `_skipped_sync_boundary` if it is not std::nullopt.
ns.state = repair_state::get_sync_boundary_started;
return master.get_sync_boundary(node, _skipped_sync_boundary, dst_cpu_id).then([&, this] (get_sync_boundary_response res) {
try {
get_sync_boundary_response res = co_await master.get_sync_boundary(node, _skipped_sync_boundary, dst_cpu_id);
ns.state = repair_state::get_sync_boundary_finished;
master.stats().row_from_disk_bytes[node] += res.new_rows_size;
master.stats().row_from_disk_nr[node] += res.new_rows_nr;
@@ -2689,12 +2690,13 @@ private:
}
rlogger.debug("Called master.get_sync_boundary for node {} sb={}, combined_csum={}, row_size={}, zero_rows={}, skipped_sync_boundary={}",
node, res.boundary, res.row_buf_combined_csum, res.row_buf_size, _zero_rows, _skipped_sync_boundary);
}).handle_exception([this, node] (std::exception_ptr ep) {
} catch (...) {
auto ep = std::current_exception();
rlogger.warn("repair[{}]: get_sync_boundary: got error from node={}, keyspace={}, table={}, range={}, error={}",
_shard_task.global_repair_id.uuid(), node, _shard_task.get_keyspace(), _cf_name, _range, ep);
return make_exception_future<>(std::move(ep));
});
}).get();
std::rethrow_exception(ep);
}
})).get();
rlogger.debug("sync_boundaries nr={}, combined_hashes nr={}",
_sync_boundaries.size(), _combined_hashes.size());
if (!_sync_boundaries.empty()) {