repair: coroutinize repair_range

This commit is contained in:
Avi Kivity
2022-12-07 18:13:10 +02:00
parent 47a8fad2a2
commit 796ec5996f

View File

@@ -635,8 +635,10 @@ future<> shard_repair_task_impl::repair_range(const dht::token_range& range, ::t
check_in_shutdown();
check_in_abort();
ranges_index++;
repair_neighbors neighbors = get_repair_neighbors(range);
return do_with(std::move(neighbors.all), std::move(neighbors.mandatory), [this, range, table_id] (auto& neighbors, auto& mandatory_neighbors) {
repair_neighbors r_neighbors = get_repair_neighbors(range);
auto neighbors = std::move(r_neighbors.all);
auto mandatory_neighbors = std::move(r_neighbors.mandatory);
{
auto live_neighbors = boost::copy_range<std::vector<gms::inet_address>>(neighbors |
boost::adaptors::filtered([this] (const gms::inet_address& node) { return gossiper.is_alive(node); }));
for (auto& node : mandatory_neighbors) {
@@ -647,7 +649,7 @@ future<> shard_repair_task_impl::repair_range(const dht::token_range& range, ::t
rlogger.error("repair[{}]: Repair {} out of {} ranges, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
id.uuid(), ranges_index, ranges_size(), id.shard(), _status.keyspace, table_names(), range, neighbors, live_neighbors, status);
abort_repair_info();
return make_exception_future<>(std::runtime_error(format("Repair mandatory neighbor={} is not alive, keyspace={}, mandatory_neighbors={}",
co_await coroutine::return_exception(std::runtime_error(format("Repair mandatory neighbor={} is not alive, keyspace={}, mandatory_neighbors={}",
node, _status.keyspace, mandatory_neighbors)));
}
}
@@ -657,7 +659,7 @@ future<> shard_repair_task_impl::repair_range(const dht::token_range& range, ::t
rlogger.warn("repair[{}]: Repair {} out of {} ranges, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
id.uuid(), ranges_index, ranges_size(), id.shard(), _status.keyspace, table_names(), range, neighbors, live_neighbors, status);
if (live_neighbors.empty()) {
return make_ready_future<>();
co_return;
}
neighbors.swap(live_neighbors);
}
@@ -665,30 +667,32 @@ future<> shard_repair_task_impl::repair_range(const dht::token_range& range, ::t
auto status = "skipped_no_followers";
rlogger.warn("repair[{}]: Repair {} out of {} ranges, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
id.uuid(), ranges_index, ranges_size(), id.shard(), _status.keyspace, table_names(), range, neighbors, live_neighbors, status);
return make_ready_future<>();
co_return;
}
rlogger.debug("repair[{}]: Repair {} out of {} ranges, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}",
id.uuid(), ranges_index, ranges_size(), id.shard(), _status.keyspace, table_names(), range, neighbors, live_neighbors);
return mm.sync_schema(db.local(), neighbors).then([this, &neighbors, range, table_id] {
co_await mm.sync_schema(db.local(), neighbors);
{
sstring cf;
try {
cf = db.local().find_column_family(table_id).schema()->cf_name();
} catch (replica::no_such_column_family&) {
return make_ready_future<>();
co_return;
}
// Row level repair
if (dropped_tables.contains(cf)) {
return make_ready_future<>();
co_return;
}
return repair_cf_range_row_level(*this, cf, table_id, range, neighbors).handle_exception_type([this, cf] (replica::no_such_column_family&) mutable {
try {
co_await repair_cf_range_row_level(*this, cf, table_id, range, neighbors);
} catch (replica::no_such_column_family&) {
dropped_tables.insert(cf);
return make_ready_future<>();
}).handle_exception([this] (std::exception_ptr ep) mutable {
} catch (...) {
nr_failed_ranges++;
return make_exception_future<>(std::move(ep));
});
});
});
throw;
}
}
}
}
void repair_stats::add(const repair_stats& o) {