From 796ec5996ff7df24da4fc8530dfb5bc578db316a Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 7 Dec 2022 18:13:10 +0200 Subject: [PATCH] repair: coroutinize repair_range --- repair/repair.cc | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index c19de33da2..e3fcb1665e 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -635,8 +635,10 @@ future<> shard_repair_task_impl::repair_range(const dht::token_range& range, ::t check_in_shutdown(); check_in_abort(); ranges_index++; - repair_neighbors neighbors = get_repair_neighbors(range); - return do_with(std::move(neighbors.all), std::move(neighbors.mandatory), [this, range, table_id] (auto& neighbors, auto& mandatory_neighbors) { + repair_neighbors r_neighbors = get_repair_neighbors(range); + auto neighbors = std::move(r_neighbors.all); + auto mandatory_neighbors = std::move(r_neighbors.mandatory); + { auto live_neighbors = boost::copy_range>(neighbors | boost::adaptors::filtered([this] (const gms::inet_address& node) { return gossiper.is_alive(node); })); for (auto& node : mandatory_neighbors) { @@ -647,7 +649,7 @@ future<> shard_repair_task_impl::repair_range(const dht::token_range& range, ::t rlogger.error("repair[{}]: Repair {} out of {} ranges, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}", id.uuid(), ranges_index, ranges_size(), id.shard(), _status.keyspace, table_names(), range, neighbors, live_neighbors, status); abort_repair_info(); - return make_exception_future<>(std::runtime_error(format("Repair mandatory neighbor={} is not alive, keyspace={}, mandatory_neighbors={}", + co_await coroutine::return_exception(std::runtime_error(format("Repair mandatory neighbor={} is not alive, keyspace={}, mandatory_neighbors={}", node, _status.keyspace, mandatory_neighbors))); } } @@ -657,7 +659,7 @@ future<> shard_repair_task_impl::repair_range(const dht::token_range& range, ::t rlogger.warn("repair[{}]: Repair {} out of {} ranges, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}", id.uuid(), ranges_index, ranges_size(), id.shard(), _status.keyspace, table_names(), range, neighbors, live_neighbors, status); if (live_neighbors.empty()) { - return make_ready_future<>(); + co_return; } neighbors.swap(live_neighbors); } @@ -665,30 +667,32 @@ future<> shard_repair_task_impl::repair_range(const dht::token_range& range, ::t auto status = "skipped_no_followers"; rlogger.warn("repair[{}]: Repair {} out of {} ranges, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}", id.uuid(), ranges_index, ranges_size(), id.shard(), _status.keyspace, table_names(), range, neighbors, live_neighbors, status); - return make_ready_future<>(); + co_return; } rlogger.debug("repair[{}]: Repair {} out of {} ranges, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}", id.uuid(), ranges_index, ranges_size(), id.shard(), _status.keyspace, table_names(), range, neighbors, live_neighbors); - return mm.sync_schema(db.local(), neighbors).then([this, &neighbors, range, table_id] { + co_await mm.sync_schema(db.local(), neighbors); + { sstring cf; try { cf = db.local().find_column_family(table_id).schema()->cf_name(); } catch (replica::no_such_column_family&) { - return make_ready_future<>(); + co_return; } // Row level repair if (dropped_tables.contains(cf)) { - return make_ready_future<>(); + co_return; } - return repair_cf_range_row_level(*this, cf, table_id, range, neighbors).handle_exception_type([this, cf] (replica::no_such_column_family&) mutable { + try { + co_await repair_cf_range_row_level(*this, cf, table_id, range, neighbors); + } catch (replica::no_such_column_family&) { dropped_tables.insert(cf); - return make_ready_future<>(); - }).handle_exception([this] (std::exception_ptr ep) mutable { + } catch (...) { nr_failed_ranges++; - return make_exception_future<>(std::move(ep)); - }); - }); - }); + throw; + } + } + } } void repair_stats::add(const repair_stats& o) {