repair: Turn repair_range into a repair_info method

This routine uses the global migration_manager pointer. Upcoming patches
will keep a reference to the manager on repair_info, which will make it
possible to use the this->migration_manager reference instead.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2021-05-13 13:41:07 +03:00
parent 5c24b0750e
commit d92d404629
2 changed files with 22 additions and 20 deletions

View File

@@ -730,29 +730,29 @@ repair_neighbors repair_info::get_repair_neighbors(const dht::token_range& range
// Repair a single local range, multiple column families.
// Comparable to RepairSession in Origin
static future<> repair_range(repair_info& ri, const dht::token_range& range) {
future<> repair_info::repair_range(const dht::token_range& range) {
auto id = utils::UUID_gen::get_time_UUID();
repair_neighbors neighbors = ri.get_repair_neighbors(range);
return do_with(std::move(neighbors.all), std::move(neighbors.mandatory), [&ri, range, id] (auto& neighbors, auto& mandatory_neighbors) {
repair_neighbors neighbors = get_repair_neighbors(range);
return do_with(std::move(neighbors.all), std::move(neighbors.mandatory), [this, range, id] (auto& neighbors, auto& mandatory_neighbors) {
auto live_neighbors = boost::copy_range<std::vector<gms::inet_address>>(neighbors |
boost::adaptors::filtered([] (const gms::inet_address& node) { return gms::get_local_gossiper().is_alive(node); }));
for (auto& node : mandatory_neighbors) {
auto it = std::find(live_neighbors.begin(), live_neighbors.end(), node);
if (it == live_neighbors.end()) {
ri.nr_failed_ranges++;
nr_failed_ranges++;
auto status = format("failed: mandatory neighbor={} is not alive", node);
rlogger.error("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
ri.ranges_index, ri.ranges.size(), ri.id, ri.shard, ri.keyspace, ri.table_names(), range, neighbors, live_neighbors, status);
ri.abort();
ranges_index, ranges.size(), id, shard, keyspace, table_names(), range, neighbors, live_neighbors, status);
abort();
return make_exception_future<>(std::runtime_error(format("Repair mandatory neighbor={} is not alive, keyspace={}, mandatory_neighbors={}",
node, ri.keyspace, mandatory_neighbors)));
node, keyspace, mandatory_neighbors)));
}
}
if (live_neighbors.size() != neighbors.size()) {
ri.nr_failed_ranges++;
nr_failed_ranges++;
auto status = live_neighbors.empty() ? "skipped" : "partial";
rlogger.warn("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
ri.ranges_index, ri.ranges.size(), ri.id, ri.shard, ri.keyspace, ri.table_names(), range, neighbors, live_neighbors, status);
ranges_index, ranges.size(), id, shard, keyspace, table_names(), range, neighbors, live_neighbors, status);
if (live_neighbors.empty()) {
return make_ready_future<>();
}
@@ -761,27 +761,27 @@ static future<> repair_range(repair_info& ri, const dht::token_range& range) {
if (neighbors.empty()) {
auto status = "skipped_no_followers";
rlogger.warn("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
ri.ranges_index, ri.ranges.size(), ri.id, ri.shard, ri.keyspace, ri.table_names(), range, neighbors, live_neighbors, status);
ranges_index, ranges.size(), id, shard, keyspace, table_names(), range, neighbors, live_neighbors, status);
return make_ready_future<>();
}
return _migration_manager->local().sync_schema(ri.db.local(), neighbors).then([&neighbors, &ri, range, id] {
return do_for_each(ri.table_ids.begin(), ri.table_ids.end(), [&ri, &neighbors, range] (utils::UUID table_id) {
return _migration_manager->local().sync_schema(db.local(), neighbors).then([this, &neighbors, range, id] {
return do_for_each(table_ids.begin(), table_ids.end(), [this, &neighbors, range] (utils::UUID table_id) {
sstring cf;
try {
cf = ri.db.local().find_column_family(table_id).schema()->cf_name();
cf = db.local().find_column_family(table_id).schema()->cf_name();
} catch (no_such_column_family&) {
return make_ready_future<>();
}
ri._sub_ranges_nr++;
_sub_ranges_nr++;
// Row level repair
if (ri.dropped_tables.contains(cf)) {
if (dropped_tables.contains(cf)) {
return make_ready_future<>();
}
return repair_cf_range_row_level(ri, cf, table_id, range, neighbors).handle_exception_type([&ri, cf] (no_such_column_family&) mutable {
ri.dropped_tables.insert(cf);
return repair_cf_range_row_level(*this, cf, table_id, range, neighbors).handle_exception_type([this, cf] (no_such_column_family&) mutable {
dropped_tables.insert(cf);
return make_ready_future<>();
}).handle_exception([&ri] (std::exception_ptr ep) mutable {
ri.nr_failed_ranges++;
}).handle_exception([this] (std::exception_ptr ep) mutable {
nr_failed_ranges++;
return make_exception_future<>(std::move(ep));
});
});
@@ -1068,7 +1068,7 @@ static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
ri->ranges_index++;
rlogger.info("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}",
ri->ranges_index, ri->ranges.size(), ri->id, ri->shard, ri->keyspace, ri->table_names(), range);
return repair_range(*ri, range).then([ri] {
return ri->repair_range(range).then([ri] {
if (ri->reason == streaming::stream_reason::bootstrap) {
_node_ops_metrics.bootstrap_finished_ranges++;
} else if (ri->reason == streaming::stream_reason::replace) {

View File

@@ -258,6 +258,8 @@ public:
// Read-only accessor: returns the _ops_uuid member by const reference.
const std::optional<utils::UUID>& ops_uuid() const {
return _ops_uuid;
};
// Repair a single local range, multiple column families.
// The returned future fails with std::runtime_error when a mandatory
// neighbor of the range is not alive.
future<> repair_range(const dht::token_range& range);
};
// The repair_tracker tracks ongoing repair operations and their progress.