repair: stop relying on ROW_LEVEL_REPAIR feature

The ROW_LEVEL_REPAIR cluster feature is going away because it is over
2 years old, so the code that depended on it becomes unconditional.
This commit is contained in:
Piotr Sarna
2021-03-08 10:54:38 +01:00
parent 115324f71a
commit 80ebedd242
2 changed files with 17 additions and 274 deletions

View File

@@ -826,7 +826,6 @@ repair_info::repair_info(seastar::sharded<database>& db_,
, ignore_nodes(ignore_nodes_)
, reason(reason_)
, nr_ranges_total(ranges.size())
, _row_level_repair(db.local().features().cluster_supports_row_level_repair())
, _ops_uuid(std::move(ops_uuid)) {
}
@@ -934,211 +933,6 @@ repair_neighbors repair_info::get_repair_neighbors(const dht::token_range& range
neighbors[range];
}
// Repair a single cf in a single local range.
// Comparable to RepairJob in Origin.
//
// Checksum-based repair: the range is split into sub-ranges, and for each
// sub-range a checksum is computed locally and requested from every
// neighbor. If the checksums disagree, the sub-range is handed to
// ri.request_transfer_ranges() with the set of nodes to fetch from ("in")
// and the set of nodes to send to ("out").
//
// Parameters:
//   ri        - per-repair state; captured by reference in every
//               continuation below, so it must outlive the returned future.
//   cf        - table (column family) name.
//   range     - the local token range to repair.
//   neighbors - peers to compare against; captured by reference, so the
//               caller must keep it alive until the returned future resolves.
//
// Returns a future that resolves once all sub-ranges have been processed.
// Checksum and sync failures are logged and counted in ri.nr_failed_ranges;
// the handlers below log them rather than rethrowing.
static future<> repair_cf_range(repair_info& ri,
sstring cf, ::dht::token_range range,
const std::vector<gms::inet_address>& neighbors) {
if (neighbors.empty()) {
// Nothing to do in this case...
return make_ready_future<>();
}
// NOTE(review): check_in_abort() is not defined in this chunk; presumably
// it throws when the repair has been aborted - confirm.
ri.check_in_abort();
return estimate_partitions(ri.db, ri.keyspace, cf, range).then([&ri, cf, range, &neighbors] (uint64_t estimated_partitions) {
// Split the range into sub-ranges, driven by the estimated partition count
// and ri.target_partitions.
range_splitter ranges(range, estimated_partitions, ri.target_partitions);
// do_with() owns the gate, the success flag, the table name and the
// splitter; the lambdas below hold references to them.
return do_with(seastar::gate(), true, std::move(cf), std::move(ranges),
[&ri, &neighbors] (auto& completion, auto& success, const auto& cf, auto& ranges) {
return do_until([&ranges] () { return !ranges.has_next(); },
[&ranges, &ri, &completion, &success, &neighbors, &cf] () {
auto range = ranges.next();
check_in_shutdown();
ri.check_in_abort();
// Take one unit of parallelism_semaphore before starting work on this
// sub-range.
return seastar::get_units(parallelism_semaphore, 1).then([&ri, &completion, &success, &neighbors, &cf, range] (auto signal_sem) {
auto checksum_type = repair_checksum::streamed;
// Ask this node, and all neighbors, to calculate checksums in
// this range. When all are done, compare the results, and if
// there are any differences, sync the content of this range.
// Index 0 holds the local checksum; index i > 0 holds the checksum
// of neighbors[i - 1].
std::vector<future<partition_checksum>> checksums;
checksums.reserve(1 + neighbors.size());
checksums.push_back(checksum_range(ri.db, ri.keyspace, cf, range, checksum_type));
for (auto&& neighbor : neighbors) {
checksums.push_back(
ri.messaging.local().send_repair_checksum_range(
netw::msg_addr{neighbor}, ri.keyspace, cf, range, checksum_type));
}
// Enter the gate; the matching leave() is deferred and later moved into
// the exception handler lambda at the end of the background chain.
completion.enter();
auto leave = defer([&completion] { completion.leave(); });
// Do it in the background.
// The resulting future is deliberately discarded via (void).
(void)when_all(checksums.begin(), checksums.end()).then(
[&ri, &cf, range, &neighbors, &success]
(std::vector<future<partition_checksum>> checksums) {
// If only some of the replicas of this range are alive,
// we set success=false so repair will fail, but we can
// still do our best to repair available replicas.
std::vector<gms::inet_address> live_neighbors;
std::vector<partition_checksum> live_neighbors_checksum;
bool local_checksum_failed = false;
for (unsigned i = 0; i < checksums.size(); i++) {
if (checksums[i].failed()) {
local_checksum_failed |= (i == 0);
rlogger.warn(
"Checksum of ks={}, table={}, range={} on {} failed: {}",
ri.keyspace, cf, range,
(i ? neighbors[i-1] :
utils::fb_utilities::get_broadcast_address()),
checksums[i].get_exception());
success = false;
ri.nr_failed_ranges++;
// Do not break out of the loop here, so we can log
// (and discard) all the exceptions.
} else if (i > 0) {
live_neighbors.push_back(neighbors[i - 1]);
live_neighbors_checksum.push_back(checksums[i].get0());
}
}
// Without a local checksum, or without any neighbor checksum, there
// is nothing to compare.
if (local_checksum_failed || live_neighbors.empty()) {
return make_ready_future<>();
}
// If one of the available checksums is different, repair
// all the neighbors which returned a checksum.
auto checksum0 = checksums[0].get0();
// Both lists start as all live neighbors and are pruned below.
std::vector<gms::inet_address> live_neighbors_in(live_neighbors);
std::vector<gms::inet_address> live_neighbors_out(live_neighbors);
// Group the live neighbors by the checksum they returned.
std::unordered_map<partition_checksum, std::vector<gms::inet_address>> checksum_map;
for (size_t idx = 0 ; idx < live_neighbors.size(); idx++) {
checksum_map[live_neighbors_checksum[idx]].emplace_back(live_neighbors[idx]);
}
// Removes from live_neighbors_in_or_out all but nr_nodes_to_keep of the
// nodes in nodes_with_same_checksum. Note that nodes_with_same_checksum
// is reordered and truncated in place when nr_nodes_to_keep == 1.
auto node_reducer = [] (std::vector<gms::inet_address>& live_neighbors_in_or_out,
std::vector<gms::inet_address>& nodes_with_same_checksum, size_t nr_nodes_to_keep) {
// nodes_with_same_checksum contains two types of nodes:
// 1) the nodes we want to remove from live_neighbors_in_or_out.
// 2) the nodes, nr_nodes_to_keep in number, not to remove from
// live_neighbors_in_or_out
auto nr_nodes = nodes_with_same_checksum.size();
if (nr_nodes <= nr_nodes_to_keep) {
return;
}
if (nr_nodes_to_keep == 0) {
// All nodes in nodes_with_same_checksum will be removed from live_neighbors_in_or_out
} else if (nr_nodes_to_keep == 1) {
// Move the nodes for which is_local_dc() is false to the front, then
// drop the last element, so the node that is kept comes from the tail
// of the partitioned list.
auto node_is_remote = [] (gms::inet_address ip) { return !service::get_local_storage_service().is_local_dc(ip); };
boost::partition(nodes_with_same_checksum, node_is_remote);
nodes_with_same_checksum.resize(nr_nodes - nr_nodes_to_keep);
} else {
throw std::runtime_error(format("nr_nodes_to_keep = {}, but it can only be 1 or 0", nr_nodes_to_keep));
}
// Now nodes_with_same_checksum contains only the nodes we want to remove; remove them from live_neighbors_in_or_out
auto it = boost::range::remove_if(live_neighbors_in_or_out, [&nodes_with_same_checksum] (const auto& ip) {
return boost::algorithm::any_of_equal(nodes_with_same_checksum, ip);
});
live_neighbors_in_or_out.erase(it, live_neighbors_in_or_out.end());
};
// Reduce in traffic
for (auto& item : checksum_map) {
auto& sum = item.first;
// Taken by value: node_reducer may reorder and shrink this list.
auto nodes_with_same_checksum = item.second;
// If remote nodes have the same checksum, fetch only from one of them
size_t nr_nodes_to_fetch = 1;
// If remote nodes have zero checksum or have the same
// checksum as local checksum, do not fetch from them at all
if (sum == partition_checksum() || sum == checksum0) {
nr_nodes_to_fetch = 0;
}
// E.g.,
// Local Remote1 Remote2 Remote3
// 5 5 5 5 : IN: 0
// 5 5 5 0 : IN: 0
// 5 5 0 0 : IN: 0
// 5 0 0 0 : IN: 0
// 0 5 5 5 : IN: 1
// 0 5 5 0 : IN: 1
// 0 5 0 0 : IN: 1
// 0 0 0 0 : IN: 0
// 3 5 5 3 : IN: 1
// 3 5 3 3 : IN: 1
// 3 3 3 3 : IN: 0
// 3 5 4 3 : IN: 2
node_reducer(live_neighbors_in, nodes_with_same_checksum, nr_nodes_to_fetch);
}
// Reduce out traffic
if (live_neighbors_in.empty()) {
for (auto& item : checksum_map) {
auto& sum = item.first;
auto nodes_with_same_checksum = item.second;
// Skip sending to the nodes with the same checksum as the local node
// E.g.,
// Local Remote1 Remote2 Remote3
// 5 5 5 5 : IN: 0 OUT: 0 SKIP_OUT: Remote1, Remote2, Remote3
// 5 5 5 0 : IN: 0 OUT: 1 SKIP_OUT: Remote1, Remote2
// 5 5 0 0 : IN: 0 OUT: 2 SKIP_OUT: Remote1
// 5 0 0 0 : IN: 0 OUT: 3 SKIP_OUT: None
// 0 0 0 0 : IN: 0 OUT: 0 SKIP_OUT: Remote1, Remote2, Remote3
if (sum == checksum0) {
size_t nr_nodes_to_send = 0;
node_reducer(live_neighbors_out, nodes_with_same_checksum, nr_nodes_to_send);
}
}
} else if (live_neighbors_in.size() == 1 && checksum0 == partition_checksum()) {
for (auto& item : checksum_map) {
auto& sum = item.first;
auto nodes_with_same_checksum = item.second;
// Skip sending to the nodes with a non-zero checksum
// E.g.,
// Local Remote1 Remote2 Remote3
// 0 5 5 5 : IN: 1 OUT: 0 SKIP_OUT: Remote1, Remote2, Remote3
// 0 5 5 0 : IN: 1 OUT: 1 SKIP_OUT: Remote1, Remote2
// 0 5 0 0 : IN: 1 OUT: 2 SKIP_OUT: Remote1
if (sum != checksum0) {
size_t nr_nodes_to_send = 0;
node_reducer(live_neighbors_out, nodes_with_same_checksum, nr_nodes_to_send);
}
}
}
// Request a transfer only if there is at least one node to fetch from
// or to send to.
if (!(live_neighbors_in.empty() && live_neighbors_out.empty())) {
rlogger.debug("Found differing ks={}, table={}, range={} on nodes={}, in = {}, out = {}",
ri.keyspace, cf, range, live_neighbors, live_neighbors_in, live_neighbors_out);
ri.check_in_abort();
return ri.request_transfer_ranges(cf, range, live_neighbors_in, live_neighbors_out);
}
return make_ready_future<>();
// The handler lambda takes ownership of `leave` and `signal_sem`, so the
// gate stays entered and the semaphore unit stays held for as long as
// this lambda object lives.
}).handle_exception([&ri, &success, &cf, range, leave = std::move(leave),
signal_sem = std::move(signal_sem)] (std::exception_ptr eptr) {
// Something above (e.g., request_transfer_ranges) failed. We could
// stop the repair immediately, or let it continue with
// other ranges (at the moment, we do the latter). But in
// any case, we need to remember that the repair failed to
// tell the caller.
success = false;
ri.nr_failed_ranges++;
rlogger.warn("Failed to sync ks={}, table={}, range={}: {}", ri.keyspace, cf, range, eptr);
});
});
}).finally([&success, &completion, &ri, &cf] {
// Close the gate before the do_with() state is released; the background
// continuations above hold references to that state.
return completion.close().then([&success, &ri, &cf] {
if (!success) {
rlogger.warn("Checksum or sync of partial range failed, ks={}, table={}", ri.keyspace, cf);
}
// We probably want the repair to continue even if some
// ranges fail to do the checksum. We need to set the
// per-repair success flag to false and report after the
// streaming is done.
return make_ready_future<>();
});
});
});
});
}
// Repair a single local range, multiple column families.
// Comparable to RepairSession in Origin
static future<> repair_range(repair_info& ri, const dht::token_range& range) {
@@ -1184,20 +978,17 @@ static future<> repair_range(repair_info& ri, const dht::token_range& range) {
return make_ready_future<>();
}
ri._sub_ranges_nr++;
if (ri.row_level_repair()) {
if (ri.dropped_tables.contains(cf)) {
return make_ready_future<>();
}
return repair_cf_range_row_level(ri, cf, table_id, range, neighbors).handle_exception_type([&ri, cf] (no_such_column_family&) mutable {
ri.dropped_tables.insert(cf);
return make_ready_future<>();
}).handle_exception([&ri] (std::exception_ptr ep) mutable {
ri.nr_failed_ranges++;
return make_exception_future<>(std::move(ep));
});
} else {
return repair_cf_range(ri, cf, range, neighbors);
// Row level repair
if (ri.dropped_tables.contains(cf)) {
return make_ready_future<>();
}
return repair_cf_range_row_level(ri, cf, table_id, range, neighbors).handle_exception_type([&ri, cf] (no_such_column_family&) mutable {
ri.dropped_tables.insert(cf);
return make_ready_future<>();
}).handle_exception([&ri] (std::exception_ptr ep) mutable {
ri.nr_failed_ranges++;
return make_exception_future<>(std::move(ep));
});
});
});
});
@@ -1474,54 +1265,15 @@ private:
static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
if (ri->row_level_repair()) {
// repair all the ranges in limited parallelism
return parallel_for_each(ri->ranges, [ri] (auto&& range) {
return with_semaphore(repair_tracker().range_parallelism_semaphore(), 1, [ri, &range] {
check_in_shutdown();
ri->check_in_abort();
ri->ranges_index++;
rlogger.info("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}",
ri->ranges_index, ri->ranges.size(), ri->id, ri->shard, ri->keyspace, ri->table_names(), range);
return repair_range(*ri, range).then([ri] {
if (ri->reason == streaming::stream_reason::bootstrap) {
_node_ops_metrics.bootstrap_finished_ranges++;
} else if (ri->reason == streaming::stream_reason::replace) {
_node_ops_metrics.replace_finished_ranges++;
} else if (ri->reason == streaming::stream_reason::rebuild) {
_node_ops_metrics.rebuild_finished_ranges++;
} else if (ri->reason == streaming::stream_reason::decommission) {
_node_ops_metrics.decommission_finished_ranges++;
} else if (ri->reason == streaming::stream_reason::removenode) {
_node_ops_metrics.removenode_finished_ranges++;
} else if (ri->reason == streaming::stream_reason::repair) {
_node_ops_metrics.repair_finished_ranges_sum++;
ri->nr_ranges_finished++;
}
});
});
});
} else {
// repair all the ranges in sequence
return do_for_each(ri->ranges, [ri] (auto&& range) {
// repair all the ranges in limited parallelism
return parallel_for_each(ri->ranges, [ri] (auto&& range) {
return with_semaphore(repair_tracker().range_parallelism_semaphore(), 1, [ri, &range] {
check_in_shutdown();
ri->check_in_abort();
ri->ranges_index++;
rlogger.info("Repair {} out of {} ranges, id={}, shard={}, keyspace={}, table={}, range={}",
ri->ranges_index, ri->ranges.size(), ri->id, ri->shard, ri->keyspace, ri->table_names(), range);
return do_with(dht::selective_token_range_sharder(ri->sharder, range, ri->shard), [ri] (auto& sharder) {
return repeat([ri, &sharder] () {
check_in_shutdown();
ri->check_in_abort();
auto range_shard = sharder.next();
if (range_shard) {
return repair_range(*ri, *range_shard).then([] {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
});
}).then([ri] {
return repair_range(*ri, range).then([ri] {
if (ri->reason == streaming::stream_reason::bootstrap) {
_node_ops_metrics.bootstrap_finished_ranges++;
} else if (ri->reason == streaming::stream_reason::replace) {
@@ -1529,7 +1281,7 @@ static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
} else if (ri->reason == streaming::stream_reason::rebuild) {
_node_ops_metrics.rebuild_finished_ranges++;
} else if (ri->reason == streaming::stream_reason::decommission) {
_node_ops_metrics.decommission_finished_ranges++;
_node_ops_metrics.decommission_finished_ranges++;
} else if (ri->reason == streaming::stream_reason::removenode) {
_node_ops_metrics.removenode_finished_ranges++;
} else if (ri->reason == streaming::stream_reason::repair) {
@@ -1537,13 +1289,8 @@ static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
ri->nr_ranges_finished++;
}
});
}).then([ri] {
// Do streaming for the remaining ranges we do not stream in
// repair_cf_range
ri->check_in_abort();
return ri->do_streaming();
});
}
});
}
// repair_ranges repairs a list of token ranges, each assumed to be a token

View File

@@ -253,7 +253,6 @@ public:
lw_shared_ptr<streaming::stream_plan> _sp_in;
lw_shared_ptr<streaming::stream_plan> _sp_out;
repair_stats _stats;
bool _row_level_repair;
uint64_t _sub_ranges_nr = 0;
std::unordered_set<sstring> dropped_tables;
std::optional<utils::UUID> _ops_uuid;
@@ -281,9 +280,6 @@ public:
void update_statistics(const repair_stats& stats) {
_stats.add(stats);
}
bool row_level_repair() {
return _row_level_repair;
}
const std::vector<sstring>& table_names() {
return cfs;
}