repair: stop relying on ROW_LEVEL_REPAIR feature
The ROW_LEVEL_REPAIR cluster feature flag is being retired: the feature has been available for over two years, so all supported clusters can be assumed to have it, and the code that depended on the flag becomes unconditional.
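In effect, the commit drops the `_row_level_repair` member (initialized from `features().cluster_supports_row_level_repair()`) and turns every `if (ri.row_level_repair())` branch into straight-line code. A minimal sketch of that pattern, with hypothetical stand-in types rather than the real `repair_info` and feature service:

```cpp
#include <iostream>

struct feature_service {
    // Before: each node checked whether the whole cluster advertised the
    // ROW_LEVEL_REPAIR feature before taking the row-level code path.
    bool cluster_supports_row_level_repair() const { return true; }
};

struct repair_info_before {
    bool _row_level_repair;
    explicit repair_info_before(const feature_service& f)
        : _row_level_repair(f.cluster_supports_row_level_repair()) {}
    void repair_range() {
        if (_row_level_repair) {
            std::cout << "row-level repair\n";
        } else {
            std::cout << "legacy checksum repair\n"; // dead once the flag is always on
        }
    }
};

struct repair_info_after {
    // The flag and the branch are gone; row-level repair is unconditional.
    void repair_range() { std::cout << "row-level repair\n"; }
};

int main() {
    feature_service f;
    repair_info_before{f}.repair_range();
    repair_info_after{}.repair_range();
}
```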
repair/repair.cc (287 changed lines)
@@ -826,7 +826,6 @@ repair_info::repair_info(seastar::sharded<database>& db_,
     , ignore_nodes(ignore_nodes_)
     , reason(reason_)
     , nr_ranges_total(ranges.size())
-    , _row_level_repair(db.local().features().cluster_supports_row_level_repair())
     , _ops_uuid(std::move(ops_uuid)) {
 }
 
@@ -934,211 +933,6 @@ repair_neighbors repair_info::get_repair_neighbors(const dht::token_range& range
     neighbors[range];
 }
 
-// Repair a single cf in a single local range.
-// Comparable to RepairJob in Origin.
-static future<> repair_cf_range(repair_info& ri,
-        sstring cf, ::dht::token_range range,
-        const std::vector<gms::inet_address>& neighbors) {
-    if (neighbors.empty()) {
-        // Nothing to do in this case...
-        return make_ready_future<>();
-    }
-
-    ri.check_in_abort();
-    return estimate_partitions(ri.db, ri.keyspace, cf, range).then([&ri, cf, range, &neighbors] (uint64_t estimated_partitions) {
-        range_splitter ranges(range, estimated_partitions, ri.target_partitions);
-        return do_with(seastar::gate(), true, std::move(cf), std::move(ranges),
-                [&ri, &neighbors] (auto& completion, auto& success, const auto& cf, auto& ranges) {
-            return do_until([&ranges] () { return !ranges.has_next(); },
-                    [&ranges, &ri, &completion, &success, &neighbors, &cf] () {
-                auto range = ranges.next();
-                check_in_shutdown();
-                ri.check_in_abort();
-                return seastar::get_units(parallelism_semaphore, 1).then([&ri, &completion, &success, &neighbors, &cf, range] (auto signal_sem) {
-                    auto checksum_type = repair_checksum::streamed;
-
-                    // Ask this node, and all neighbors, to calculate checksums in
-                    // this range. When all are done, compare the results, and if
-                    // there are any differences, sync the content of this range.
-                    std::vector<future<partition_checksum>> checksums;
-                    checksums.reserve(1 + neighbors.size());
-                    checksums.push_back(checksum_range(ri.db, ri.keyspace, cf, range, checksum_type));
-                    for (auto&& neighbor : neighbors) {
-                        checksums.push_back(
-                            ri.messaging.local().send_repair_checksum_range(
-                                netw::msg_addr{neighbor}, ri.keyspace, cf, range, checksum_type));
-                    }
-
-                    completion.enter();
-                    auto leave = defer([&completion] { completion.leave(); });
-
-                    // Do it in the background.
-                    (void)when_all(checksums.begin(), checksums.end()).then(
-                            [&ri, &cf, range, &neighbors, &success]
-                            (std::vector<future<partition_checksum>> checksums) {
-                        // If only some of the replicas of this range are alive,
-                        // we set success=false so repair will fail, but we can
-                        // still do our best to repair available replicas.
-                        std::vector<gms::inet_address> live_neighbors;
-                        std::vector<partition_checksum> live_neighbors_checksum;
-                        bool local_checksum_failed = false;
-                        for (unsigned i = 0; i < checksums.size(); i++) {
-                            if (checksums[i].failed()) {
-                                local_checksum_failed |= (i == 0);
-                                rlogger.warn(
-                                    "Checksum of ks={}, table={}, range={} on {} failed: {}",
-                                    ri.keyspace, cf, range,
-                                    (i ? neighbors[i-1] :
-                                        utils::fb_utilities::get_broadcast_address()),
-                                    checksums[i].get_exception());
-                                success = false;
-                                ri.nr_failed_ranges++;
-                                // Do not break out of the loop here, so we can log
-                                // (and discard) all the exceptions.
-                            } else if (i > 0) {
-                                live_neighbors.push_back(neighbors[i - 1]);
-                                live_neighbors_checksum.push_back(checksums[i].get0());
-                            }
-                        }
-                        if (local_checksum_failed || live_neighbors.empty()) {
-                            return make_ready_future<>();
-                        }
-                        // If one of the available checksums is different, repair
-                        // all the neighbors which returned a checksum.
-                        auto checksum0 = checksums[0].get0();
-                        std::vector<gms::inet_address> live_neighbors_in(live_neighbors);
-                        std::vector<gms::inet_address> live_neighbors_out(live_neighbors);
-
-                        std::unordered_map<partition_checksum, std::vector<gms::inet_address>> checksum_map;
-                        for (size_t idx = 0; idx < live_neighbors.size(); idx++) {
-                            checksum_map[live_neighbors_checksum[idx]].emplace_back(live_neighbors[idx]);
-                        }
-
-                        auto node_reducer = [] (std::vector<gms::inet_address>& live_neighbors_in_or_out,
-                                std::vector<gms::inet_address>& nodes_with_same_checksum, size_t nr_nodes_to_keep) {
-                            // nodes_with_same_checksum contains two types of nodes:
-                            // 1) the nodes we want to remove from live_neighbors_in_or_out.
-                            // 2) the nodes, nr_nodes_to_keep in number, not to remove from
-                            //    live_neighbors_in_or_out
-                            auto nr_nodes = nodes_with_same_checksum.size();
-                            if (nr_nodes <= nr_nodes_to_keep) {
-                                return;
-                            }
-
-                            if (nr_nodes_to_keep == 0) {
-                                // All nodes in nodes_with_same_checksum will be removed from live_neighbors_in_or_out
-                            } else if (nr_nodes_to_keep == 1) {
-                                auto node_is_remote = [] (gms::inet_address ip) { return !service::get_local_storage_service().is_local_dc(ip); };
-                                boost::partition(nodes_with_same_checksum, node_is_remote);
-                                nodes_with_same_checksum.resize(nr_nodes - nr_nodes_to_keep);
-                            } else {
-                                throw std::runtime_error(format("nr_nodes_to_keep = {}, but it can only be 1 or 0", nr_nodes_to_keep));
-                            }
-
-                            // Now, nodes_with_same_checksum contains nodes we want to remove, remove it from live_neighbors_in_or_out
-                            auto it = boost::range::remove_if(live_neighbors_in_or_out, [&nodes_with_same_checksum] (const auto& ip) {
-                                return boost::algorithm::any_of_equal(nodes_with_same_checksum, ip);
-                            });
-                            live_neighbors_in_or_out.erase(it, live_neighbors_in_or_out.end());
-                        };
-
-                        // Reduce in traffic
-                        for (auto& item : checksum_map) {
-                            auto& sum = item.first;
-                            auto nodes_with_same_checksum = item.second;
-                            // If remote nodes have the same checksum, fetch only from one of them
-                            size_t nr_nodes_to_fetch = 1;
-                            // If remote nodes have zero checksum or have the same
-                            // checksum as local checksum, do not fetch from them at all
-                            if (sum == partition_checksum() || sum == checksum0) {
-                                nr_nodes_to_fetch = 0;
-                            }
-                            // E.g.,
-                            // Local  Remote1  Remote2  Remote3
-                            // 5      5        5        5        : IN: 0
-                            // 5      5        5        0        : IN: 0
-                            // 5      5        0        0        : IN: 0
-                            // 5      0        0        0        : IN: 0
-                            // 0      5        5        5        : IN: 1
-                            // 0      5        5        0        : IN: 1
-                            // 0      5        0        0        : IN: 1
-                            // 0      0        0        0        : IN: 0
-                            // 3      5        5        3        : IN: 1
-                            // 3      5        3        3        : IN: 1
-                            // 3      3        3        3        : IN: 0
-                            // 3      5        4        3        : IN: 2
-                            node_reducer(live_neighbors_in, nodes_with_same_checksum, nr_nodes_to_fetch);
-                        }
-
-                        // Reduce out traffic
-                        if (live_neighbors_in.empty()) {
-                            for (auto& item : checksum_map) {
-                                auto& sum = item.first;
-                                auto nodes_with_same_checksum = item.second;
-                                // Skip to send to the nodes with the same checksum as local node
-                                // E.g.,
-                                // Local  Remote1  Remote2  Remote3
-                                // 5      5        5        5        : IN: 0  OUT: 0  SKIP_OUT: Remote1, Remote2, Remote3
-                                // 5      5        5        0        : IN: 0  OUT: 1  SKIP_OUT: Remote1, Remote2
-                                // 5      5        0        0        : IN: 0  OUT: 2  SKIP_OUT: Remote1
-                                // 5      0        0        0        : IN: 0  OUT: 3  SKIP_OUT: None
-                                // 0      0        0        0        : IN: 0  OUT: 0  SKIP_OUT: Remote1, Remote2, Remote3
-                                if (sum == checksum0) {
-                                    size_t nr_nodes_to_send = 0;
-                                    node_reducer(live_neighbors_out, nodes_with_same_checksum, nr_nodes_to_send);
-                                }
-                            }
-                        } else if (live_neighbors_in.size() == 1 && checksum0 == partition_checksum()) {
-                            for (auto& item : checksum_map) {
-                                auto& sum = item.first;
-                                auto nodes_with_same_checksum = item.second;
-                                // Skip to send to the nodes with none zero checksum
-                                // E.g.,
-                                // Local  Remote1  Remote2  Remote3
-                                // 0      5        5        5        : IN: 1  OUT: 0  SKIP_OUT: Remote1, Remote2, Remote3
-                                // 0      5        5        0        : IN: 1  OUT: 1  SKIP_OUT: Remote1, Remote2
-                                // 0      5        0        0        : IN: 1  OUT: 2  SKIP_OUT: Remote1
-                                if (sum != checksum0) {
-                                    size_t nr_nodes_to_send = 0;
-                                    node_reducer(live_neighbors_out, nodes_with_same_checksum, nr_nodes_to_send);
-                                }
-                            }
-                        }
-                        if (!(live_neighbors_in.empty() && live_neighbors_out.empty())) {
-                            rlogger.debug("Found differing ks={}, table={}, range={} on nodes={}, in = {}, out = {}",
-                                ri.keyspace, cf, range, live_neighbors, live_neighbors_in, live_neighbors_out);
-                            ri.check_in_abort();
-                            return ri.request_transfer_ranges(cf, range, live_neighbors_in, live_neighbors_out);
-                        }
-                        return make_ready_future<>();
-                    }).handle_exception([&ri, &success, &cf, range, leave = std::move(leave),
-                            signal_sem = std::move(signal_sem)] (std::exception_ptr eptr) {
-                        // Something above (e.g., request_transfer_ranges) failed. We could
-                        // stop the repair immediately, or let it continue with
-                        // other ranges (at the moment, we do the latter). But in
-                        // any case, we need to remember that the repair failed to
-                        // tell the caller.
-                        success = false;
-                        ri.nr_failed_ranges++;
-                        rlogger.warn("Failed to sync ks={}, table={}, range={}: {}", ri.keyspace, cf, range, eptr);
-                    });
-                });
-            }).finally([&success, &completion, &ri, &cf] {
-                return completion.close().then([&success, &ri, &cf] {
-                    if (!success) {
-                        rlogger.warn("Checksum or sync of partial range failed, ks={}, table={}", ri.keyspace, cf);
-                    }
-                    // We probably want the repair contiunes even if some
-                    // ranges fail to do the checksum. We need to set the
-                    // per-repair success flag to false and report after the
-                    // streaming is done.
-                    return make_ready_future<>();
-                });
-            });
-        });
-    });
-}
-
 // Repair a single local range, multiple column families.
 // Comparable to RepairSession in Origin
 static future<> repair_range(repair_info& ri, const dht::token_range& range) {
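The hunk above deletes the entire legacy checksum-based repair: hash the range locally and on every neighbor, bucket the live neighbors by their checksum, then fetch from at most one representative of each differing bucket (`live_neighbors_in`) and push to the nodes whose data differs from ours (`live_neighbors_out`). A simplified, self-contained model of the "reduce in traffic" decision step is sketched below; the names (`plan_sync`, `sync_plan`) are hypothetical, the real code uses `partition_checksum` and `gms::inet_address`, and its out-traffic reduction has extra cases this sketch omits:

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

using checksum = uint64_t;

struct sync_plan {
    std::vector<std::string> in;   // nodes to fetch the range from
    std::vector<std::string> out;  // nodes to push the range to
};

sync_plan plan_sync(checksum local,
                    const std::vector<std::pair<std::string, checksum>>& neighbors) {
    // Bucket neighbors by checksum, as the deleted code did with checksum_map.
    std::map<checksum, std::vector<std::string>> buckets;
    for (auto& [node, sum] : neighbors) {
        buckets[sum].push_back(node);
    }
    sync_plan plan;
    for (auto& [sum, nodes] : buckets) {
        // Zero checksum means no data; same checksum as local means no diff.
        if (sum != local && sum != checksum{}) {
            plan.in.push_back(nodes.front());  // fetch once per distinct checksum
        }
        if (sum != local) {
            for (auto& n : nodes) {
                plan.out.push_back(n);         // push to every differing node
            }
        }
    }
    return plan;
}

int main() {
    // Mirrors the "3 5 5 3 : IN: 1" row from the deleted comment table.
    auto plan = plan_sync(3, {{"r1", 5}, {"r2", 5}, {"r3", 3}});
    std::cout << "in=" << plan.in.size() << " out=" << plan.out.size() << "\n"; // in=1 out=2
}
```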
@@ -1184,20 +978,17 @@ static future<> repair_range(repair_info& ri, const dht::token_range& range) {
             return make_ready_future<>();
         }
         ri._sub_ranges_nr++;
-        if (ri.row_level_repair()) {
-            if (ri.dropped_tables.contains(cf)) {
-                return make_ready_future<>();
-            }
-            return repair_cf_range_row_level(ri, cf, table_id, range, neighbors).handle_exception_type([&ri, cf] (no_such_column_family&) mutable {
-                ri.dropped_tables.insert(cf);
-                return make_ready_future<>();
-            }).handle_exception([&ri] (std::exception_ptr ep) mutable {
-                ri.nr_failed_ranges++;
-                return make_exception_future<>(std::move(ep));
-            });
-        } else {
-            return repair_cf_range(ri, cf, range, neighbors);
-        }
+        // Row level repair
+        if (ri.dropped_tables.contains(cf)) {
+            return make_ready_future<>();
+        }
+        return repair_cf_range_row_level(ri, cf, table_id, range, neighbors).handle_exception_type([&ri, cf] (no_such_column_family&) mutable {
+            ri.dropped_tables.insert(cf);
+            return make_ready_future<>();
+        }).handle_exception([&ri] (std::exception_ptr ep) mutable {
+            ri.nr_failed_ranges++;
+            return make_exception_future<>(std::move(ep));
+        });
         });
     });
 });
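With the branch gone, `repair_range` always takes the row-level path, and a table dropped mid-repair is tolerated rather than failing the whole job: the first `no_such_column_family` marks the table in `ri.dropped_tables` and every later range skips it. A synchronous stand-in for that control flow, with `no_such_column_family` and `repair_table_range` as hypothetical stand-ins:

```cpp
#include <iostream>
#include <stdexcept>
#include <string>
#include <unordered_set>

struct no_such_column_family : std::runtime_error {
    using std::runtime_error::runtime_error;
};

void repair_table_range(const std::string& table, int range) {
    if (table == "dropped_cf") {
        throw no_such_column_family(table);  // table vanished mid-repair
    }
    std::cout << "repaired " << table << " range " << range << "\n";
}

int main() {
    std::unordered_set<std::string> dropped_tables;
    for (int range = 0; range < 3; range++) {
        for (const std::string table : {"good_cf", "dropped_cf"}) {
            if (dropped_tables.contains(table)) {
                continue;  // same early-out as ri.dropped_tables.contains(cf)
            }
            try {
                repair_table_range(table, range);
            } catch (const no_such_column_family&) {
                dropped_tables.insert(table);  // remember it; don't fail the repair
            }
        }
    }
}
```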
@@ -1474,54 +1265,15 @@ private:
 
 
 static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
-    if (ri->row_level_repair()) {
-        // repair all the ranges in limited parallelism
-        return parallel_for_each(ri->ranges, [ri] (auto&& range) {
-            return with_semaphore(repair_tracker().range_parallelism_semaphore(), 1, [ri, &range] {
-                check_in_shutdown();
-                ri->check_in_abort();
-                ri->ranges_index++;
-                rlogger.info("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}",
-                    ri->ranges_index, ri->ranges.size(), ri->id, ri->shard, ri->keyspace, ri->table_names(), range);
-                return repair_range(*ri, range).then([ri] {
-                    if (ri->reason == streaming::stream_reason::bootstrap) {
-                        _node_ops_metrics.bootstrap_finished_ranges++;
-                    } else if (ri->reason == streaming::stream_reason::replace) {
-                        _node_ops_metrics.replace_finished_ranges++;
-                    } else if (ri->reason == streaming::stream_reason::rebuild) {
-                        _node_ops_metrics.rebuild_finished_ranges++;
-                    } else if (ri->reason == streaming::stream_reason::decommission) {
-                        _node_ops_metrics.decommission_finished_ranges++;
-                    } else if (ri->reason == streaming::stream_reason::removenode) {
-                        _node_ops_metrics.removenode_finished_ranges++;
-                    } else if (ri->reason == streaming::stream_reason::repair) {
-                        _node_ops_metrics.repair_finished_ranges_sum++;
-                        ri->nr_ranges_finished++;
-                    }
-                });
-            });
-        });
-    } else {
-        // repair all the ranges in sequence
-        return do_for_each(ri->ranges, [ri] (auto&& range) {
+    // repair all the ranges in limited parallelism
+    return parallel_for_each(ri->ranges, [ri] (auto&& range) {
+        return with_semaphore(repair_tracker().range_parallelism_semaphore(), 1, [ri, &range] {
+            check_in_shutdown();
+            ri->check_in_abort();
+            ri->ranges_index++;
+            rlogger.info("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}",
+                ri->ranges_index, ri->ranges.size(), ri->id, ri->shard, ri->keyspace, ri->table_names(), range);
-            return do_with(dht::selective_token_range_sharder(ri->sharder, range, ri->shard), [ri] (auto& sharder) {
-                return repeat([ri, &sharder] () {
-                    check_in_shutdown();
-                    ri->check_in_abort();
-                    auto range_shard = sharder.next();
-                    if (range_shard) {
-                        return repair_range(*ri, *range_shard).then([] {
-                            return make_ready_future<stop_iteration>(stop_iteration::no);
-                        });
-                    } else {
-                        return make_ready_future<stop_iteration>(stop_iteration::yes);
-                    }
-                });
-            }).then([ri] {
+            return repair_range(*ri, range).then([ri] {
                 if (ri->reason == streaming::stream_reason::bootstrap) {
                     _node_ops_metrics.bootstrap_finished_ranges++;
                 } else if (ri->reason == streaming::stream_reason::replace) {
@@ -1529,7 +1281,7 @@ static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri)
                 } else if (ri->reason == streaming::stream_reason::rebuild) {
                     _node_ops_metrics.rebuild_finished_ranges++;
                 } else if (ri->reason == streaming::stream_reason::decommission) {
-                        _node_ops_metrics.decommission_finished_ranges++;
+                    _node_ops_metrics.decommission_finished_ranges++;
                 } else if (ri->reason == streaming::stream_reason::removenode) {
                     _node_ops_metrics.removenode_finished_ranges++;
                 } else if (ri->reason == streaming::stream_reason::repair) {
@@ -1537,13 +1289,8 @@ static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri)
                     ri->nr_ranges_finished++;
                 }
             });
-            }).then([ri] {
-                // Do streaming for the remaining ranges we do not stream in
-                // repair_cf_range
-                ri->check_in_abort();
-                return ri->do_streaming();
-            });
-        }
+    });
 }
 
 // repair_ranges repairs a list of token ranges, each assumed to be a token
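`do_repair_ranges` now always uses the bounded-parallelism scheme: every range is launched concurrently, but a semaphore caps how many repairs are in flight at once, and the old sequential `do_for_each` branch (with its trailing `do_streaming()` pass) is gone. A rough analogue using `std::counting_semaphore` in place of Seastar's `with_semaphore`; `range_parallelism` and `repair_one_range` are illustrative stand-ins:

```cpp
#include <iostream>
#include <semaphore>
#include <thread>
#include <vector>

constexpr int range_parallelism = 2;
std::counting_semaphore<range_parallelism> sem(range_parallelism);

void repair_one_range(int range) {
    std::cout << "repairing range " << range << "\n";
}

int main() {
    std::vector<std::jthread> tasks;
    for (int range = 0; range < 8; range++) {
        // Like parallel_for_each + with_semaphore: launch all ranges, but
        // at most range_parallelism of them hold a unit at any moment.
        tasks.emplace_back([range] {
            sem.acquire();
            repair_one_range(range);
            sem.release();
        });
    }
    // jthread joins on destruction, so all ranges finish before main returns.
}
```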
@@ -253,7 +253,6 @@ public:
     lw_shared_ptr<streaming::stream_plan> _sp_in;
     lw_shared_ptr<streaming::stream_plan> _sp_out;
     repair_stats _stats;
-    bool _row_level_repair;
     uint64_t _sub_ranges_nr = 0;
     std::unordered_set<sstring> dropped_tables;
     std::optional<utils::UUID> _ops_uuid;
@@ -281,9 +280,6 @@ public:
     void update_statistics(const repair_stats& stats) {
         _stats.add(stats);
     }
-    bool row_level_repair() {
-        return _row_level_repair;
-    }
     const std::vector<sstring>& table_names() {
         return cfs;
     }