repair: Turn repair_range a repair_info method
This routine uses global migration_manager pointer. Next patches will keep the reference on a manager on repair_info and it will be possible to use this->migration_manager reference. Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -730,29 +730,29 @@ repair_neighbors repair_info::get_repair_neighbors(const dht::token_range& range
|
||||
|
||||
// Repair a single local range, multiple column families.
|
||||
// Comparable to RepairSession in Origin
|
||||
static future<> repair_range(repair_info& ri, const dht::token_range& range) {
|
||||
future<> repair_info::repair_range(const dht::token_range& range) {
|
||||
auto id = utils::UUID_gen::get_time_UUID();
|
||||
repair_neighbors neighbors = ri.get_repair_neighbors(range);
|
||||
return do_with(std::move(neighbors.all), std::move(neighbors.mandatory), [&ri, range, id] (auto& neighbors, auto& mandatory_neighbors) {
|
||||
repair_neighbors neighbors = get_repair_neighbors(range);
|
||||
return do_with(std::move(neighbors.all), std::move(neighbors.mandatory), [this, range, id] (auto& neighbors, auto& mandatory_neighbors) {
|
||||
auto live_neighbors = boost::copy_range<std::vector<gms::inet_address>>(neighbors |
|
||||
boost::adaptors::filtered([] (const gms::inet_address& node) { return gms::get_local_gossiper().is_alive(node); }));
|
||||
for (auto& node : mandatory_neighbors) {
|
||||
auto it = std::find(live_neighbors.begin(), live_neighbors.end(), node);
|
||||
if (it == live_neighbors.end()) {
|
||||
ri.nr_failed_ranges++;
|
||||
nr_failed_ranges++;
|
||||
auto status = format("failed: mandatory neighbor={} is not alive", node);
|
||||
rlogger.error("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
|
||||
ri.ranges_index, ri.ranges.size(), ri.id, ri.shard, ri.keyspace, ri.table_names(), range, neighbors, live_neighbors, status);
|
||||
ri.abort();
|
||||
ranges_index, ranges.size(), id, shard, keyspace, table_names(), range, neighbors, live_neighbors, status);
|
||||
abort();
|
||||
return make_exception_future<>(std::runtime_error(format("Repair mandatory neighbor={} is not alive, keyspace={}, mandatory_neighbors={}",
|
||||
node, ri.keyspace, mandatory_neighbors)));
|
||||
node, keyspace, mandatory_neighbors)));
|
||||
}
|
||||
}
|
||||
if (live_neighbors.size() != neighbors.size()) {
|
||||
ri.nr_failed_ranges++;
|
||||
nr_failed_ranges++;
|
||||
auto status = live_neighbors.empty() ? "skipped" : "partial";
|
||||
rlogger.warn("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
|
||||
ri.ranges_index, ri.ranges.size(), ri.id, ri.shard, ri.keyspace, ri.table_names(), range, neighbors, live_neighbors, status);
|
||||
ranges_index, ranges.size(), id, shard, keyspace, table_names(), range, neighbors, live_neighbors, status);
|
||||
if (live_neighbors.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -761,27 +761,27 @@ static future<> repair_range(repair_info& ri, const dht::token_range& range) {
|
||||
if (neighbors.empty()) {
|
||||
auto status = "skipped_no_followers";
|
||||
rlogger.warn("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
|
||||
ri.ranges_index, ri.ranges.size(), ri.id, ri.shard, ri.keyspace, ri.table_names(), range, neighbors, live_neighbors, status);
|
||||
ranges_index, ranges.size(), id, shard, keyspace, table_names(), range, neighbors, live_neighbors, status);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return _migration_manager->local().sync_schema(ri.db.local(), neighbors).then([&neighbors, &ri, range, id] {
|
||||
return do_for_each(ri.table_ids.begin(), ri.table_ids.end(), [&ri, &neighbors, range] (utils::UUID table_id) {
|
||||
return _migration_manager->local().sync_schema(db.local(), neighbors).then([this, &neighbors, range, id] {
|
||||
return do_for_each(table_ids.begin(), table_ids.end(), [this, &neighbors, range] (utils::UUID table_id) {
|
||||
sstring cf;
|
||||
try {
|
||||
cf = ri.db.local().find_column_family(table_id).schema()->cf_name();
|
||||
cf = db.local().find_column_family(table_id).schema()->cf_name();
|
||||
} catch (no_such_column_family&) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
ri._sub_ranges_nr++;
|
||||
_sub_ranges_nr++;
|
||||
// Row level repair
|
||||
if (ri.dropped_tables.contains(cf)) {
|
||||
if (dropped_tables.contains(cf)) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return repair_cf_range_row_level(ri, cf, table_id, range, neighbors).handle_exception_type([&ri, cf] (no_such_column_family&) mutable {
|
||||
ri.dropped_tables.insert(cf);
|
||||
return repair_cf_range_row_level(*this, cf, table_id, range, neighbors).handle_exception_type([this, cf] (no_such_column_family&) mutable {
|
||||
dropped_tables.insert(cf);
|
||||
return make_ready_future<>();
|
||||
}).handle_exception([&ri] (std::exception_ptr ep) mutable {
|
||||
ri.nr_failed_ranges++;
|
||||
}).handle_exception([this] (std::exception_ptr ep) mutable {
|
||||
nr_failed_ranges++;
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
});
|
||||
@@ -1068,7 +1068,7 @@ static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
|
||||
ri->ranges_index++;
|
||||
rlogger.info("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}",
|
||||
ri->ranges_index, ri->ranges.size(), ri->id, ri->shard, ri->keyspace, ri->table_names(), range);
|
||||
return repair_range(*ri, range).then([ri] {
|
||||
return ri->repair_range(range).then([ri] {
|
||||
if (ri->reason == streaming::stream_reason::bootstrap) {
|
||||
_node_ops_metrics.bootstrap_finished_ranges++;
|
||||
} else if (ri->reason == streaming::stream_reason::replace) {
|
||||
|
||||
@@ -258,6 +258,8 @@ public:
|
||||
const std::optional<utils::UUID>& ops_uuid() const {
|
||||
return _ops_uuid;
|
||||
};
|
||||
|
||||
future<> repair_range(const dht::token_range& range);
|
||||
};
|
||||
|
||||
// The repair_tracker tracks ongoing repair operations and their progress.
|
||||
|
||||
Reference in New Issue
Block a user