From d2f1b4448742b60fc91fc2efdd92736c41b899ca Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 4 Sep 2024 15:03:00 +0300 Subject: [PATCH] repair: row_level: row_level_repair::negotiate_sync_boundary() The function itself is threaded, but the inner lambda is coroutinized. --- repair/row_level.cc | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/repair/row_level.cc b/repair/row_level.cc index 46324671a1..f15ec86d13 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -2667,14 +2667,15 @@ private: rlogger.debug("ROUND {}, _last_sync_boundary={}, _current_sync_boundary={}, _skipped_sync_boundary={}", master.stats().round_nr, master.last_sync_boundary(), master.current_sync_boundary(), _skipped_sync_boundary); master.stats().round_nr++; - parallel_for_each(master.all_nodes(), [&, this] (repair_node_state& ns) { + parallel_for_each(master.all_nodes(), coroutine::lambda([&] (repair_node_state& ns) -> future<> { const auto& node = ns.node; auto dst_cpu_id = ns.shard; // By calling `get_sync_boundary`, the `_last_sync_boundary` // is moved to the `_current_sync_boundary` or // `_skipped_sync_boundary` if it is not std::nullopt. ns.state = repair_state::get_sync_boundary_started; - return master.get_sync_boundary(node, _skipped_sync_boundary, dst_cpu_id).then([&, this] (get_sync_boundary_response res) { + try { + get_sync_boundary_response res = co_await master.get_sync_boundary(node, _skipped_sync_boundary, dst_cpu_id); ns.state = repair_state::get_sync_boundary_finished; master.stats().row_from_disk_bytes[node] += res.new_rows_size; master.stats().row_from_disk_nr[node] += res.new_rows_nr; @@ -2689,12 +2690,13 @@ private: } rlogger.debug("Called master.get_sync_boundary for node {} sb={}, combined_csum={}, row_size={}, zero_rows={}, skipped_sync_boundary={}", node, res.boundary, res.row_buf_combined_csum, res.row_buf_size, _zero_rows, _skipped_sync_boundary); - }).handle_exception([this, node] (std::exception_ptr ep) { + } catch (...) { + auto ep = std::current_exception(); rlogger.warn("repair[{}]: get_sync_boundary: got error from node={}, keyspace={}, table={}, range={}, error={}", _shard_task.global_repair_id.uuid(), node, _shard_task.get_keyspace(), _cf_name, _range, ep); - return make_exception_future<>(std::move(ep)); - }); - }).get(); + std::rethrow_exception(ep); + } + })).get(); rlogger.debug("sync_boundaries nr={}, combined_hashes nr={}", _sync_boundaries.size(), _combined_hashes.size()); if (!_sync_boundaries.empty()) {