repair: Do not abort the repair when one range is failed
failed_ranges is added to track the ranges that fail during repair.
This commit is contained in:
@@ -186,6 +186,10 @@ static std::vector<gms::inet_address> get_neighbors(database& db,
|
||||
#endif
|
||||
}
|
||||
|
||||
struct failed_range {
|
||||
sstring cf;
|
||||
::range<dht::token> range;
|
||||
};
|
||||
|
||||
// The repair_tracker tracks ongoing repair operations and their progress.
|
||||
// A repair which has already finished successfully is dropped from this
|
||||
@@ -465,7 +469,8 @@ static future<> repair_cf_range(seastar::sharded<database>& db,
|
||||
sstring keyspace, sstring cf, ::range<dht::token> range,
|
||||
std::vector<gms::inet_address>& neighbors,
|
||||
streaming::stream_plan& sp_in,
|
||||
streaming::stream_plan& sp_out) {
|
||||
streaming::stream_plan& sp_out,
|
||||
std::vector<failed_range>& failed_ranges) {
|
||||
if (neighbors.empty()) {
|
||||
// Nothing to do in this case...
|
||||
return make_ready_future<>();
|
||||
@@ -507,12 +512,12 @@ static future<> repair_cf_range(seastar::sharded<database>& db,
|
||||
}
|
||||
|
||||
return do_with(seastar::gate(), true, std::move(keyspace), std::move(cf), std::move(ranges),
|
||||
[&db, &neighbors, &sp_in, &sp_out] (auto& completion, auto& success, const auto& keyspace, const auto& cf, const auto& ranges) {
|
||||
return do_for_each(ranges, [&completion, &success, &db, &neighbors, &keyspace, &cf, &sp_in, &sp_out]
|
||||
[&db, &neighbors, &sp_in, &sp_out, &failed_ranges] (auto& completion, auto& success, const auto& keyspace, const auto& cf, const auto& ranges) {
|
||||
return do_for_each(ranges, [&completion, &success, &db, &neighbors, &keyspace, &cf, &sp_in, &sp_out, &failed_ranges]
|
||||
(const auto& range) {
|
||||
|
||||
check_in_shutdown();
|
||||
return parallelism_semaphore.wait(1).then([&completion, &success, &db, &neighbors, &keyspace, &cf, &range, &sp_in, &sp_out] {
|
||||
return parallelism_semaphore.wait(1).then([&completion, &success, &db, &neighbors, &keyspace, &cf, &range, &sp_in, &sp_out, &failed_ranges] {
|
||||
auto checksum_type = service::get_local_storage_service().cluster_supports_large_partitions()
|
||||
? repair_checksum::streamed : repair_checksum::legacy;
|
||||
|
||||
@@ -530,7 +535,7 @@ static future<> repair_cf_range(seastar::sharded<database>& db,
|
||||
|
||||
completion.enter();
|
||||
when_all(checksums.begin(), checksums.end()).then(
|
||||
[&db, &keyspace, &cf, &range, &neighbors, &success, &sp_in, &sp_out]
|
||||
[&db, &keyspace, &cf, &range, &neighbors, &success, &sp_in, &sp_out, &failed_ranges]
|
||||
(std::vector<future<partition_checksum>> checksums) {
|
||||
// If only some of the replicas of this range are alive,
|
||||
// we set success=false so repair will fail, but we can
|
||||
@@ -545,6 +550,7 @@ static future<> repair_cf_range(seastar::sharded<database>& db,
|
||||
utils::fb_utilities::get_broadcast_address()),
|
||||
checksums[i].get_exception());
|
||||
success = false;
|
||||
failed_ranges.push_back(failed_range{cf, range});
|
||||
// Do not break out of the loop here, so we can log
|
||||
// (and discard) all the exceptions.
|
||||
} else if (i > 0) {
|
||||
@@ -565,13 +571,14 @@ static future<> repair_cf_range(seastar::sharded<database>& db,
|
||||
}
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}).handle_exception([&success, &range] (std::exception_ptr eptr) {
|
||||
}).handle_exception([&success, &cf, &range, &failed_ranges] (std::exception_ptr eptr) {
|
||||
// Something above (e.g., sync_range) failed. We could
|
||||
// stop the repair immediately, or let it continue with
|
||||
// other ranges (at the moment, we do the latter). But in
|
||||
// any case, we need to remember that the repair failed to
|
||||
// tell the caller.
|
||||
success = false;
|
||||
failed_ranges.push_back(failed_range{cf, range});
|
||||
logger.warn("Failed sync of range {}: {}", range, eptr);
|
||||
}).finally([&completion] {
|
||||
parallelism_semaphore.signal(1);
|
||||
@@ -580,8 +587,14 @@ static future<> repair_cf_range(seastar::sharded<database>& db,
|
||||
});
|
||||
}).finally([&success, &completion] {
|
||||
return completion.close().then([&success] {
|
||||
return success ? make_ready_future<>() :
|
||||
make_exception_future<>(std::runtime_error("Checksum or sync of partial range failed"));
|
||||
if (!success) {
|
||||
logger.warn("Checksum or sync of partial range failed");
|
||||
}
|
||||
// We probably want the repair contiunes even if some
|
||||
// ranges fail to do the checksum. We need to set the
|
||||
// per-repair success flag to false and report after the
|
||||
// streaming is done.
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -594,14 +607,15 @@ static future<> repair_range(seastar::sharded<database>& db, sstring keyspace,
|
||||
const std::vector<sstring>& data_centers,
|
||||
const std::vector<sstring>& hosts,
|
||||
streaming::stream_plan& sp_in,
|
||||
streaming::stream_plan& sp_out) {
|
||||
streaming::stream_plan& sp_out,
|
||||
std::vector<failed_range>& failed_ranges) {
|
||||
auto id = utils::UUID_gen::get_time_UUID();
|
||||
return do_with(get_neighbors(db.local(), keyspace, range, data_centers, hosts),
|
||||
[&sp_in, &sp_out, &db, &cfs, keyspace, id, range] (auto& neighbors) {
|
||||
[&sp_in, &sp_out, &failed_ranges, &db, &cfs, keyspace, id, range] (auto& neighbors) {
|
||||
logger.debug("[repair #{}] new session: will sync {} on range {} for {}.{}", id, neighbors, range, keyspace, cfs);
|
||||
return do_for_each(cfs.begin(), cfs.end(),
|
||||
[&db, keyspace, &neighbors, id, range, &sp_in, &sp_out] (auto&& cf) {
|
||||
return repair_cf_range(db, keyspace, cf, range, neighbors, sp_in, sp_out);
|
||||
[&db, keyspace, &neighbors, id, range, &sp_in, &sp_out, &failed_ranges] (auto&& cf) {
|
||||
return repair_cf_range(db, keyspace, cf, range, neighbors, sp_in, sp_out, failed_ranges);
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -801,28 +815,37 @@ static future<> repair_ranges(seastar::sharded<database>& db, sstring keyspace,
|
||||
std::vector<sstring> data_centers, std::vector<sstring> hosts) {
|
||||
return do_with(streaming::stream_plan("repair-in"),
|
||||
streaming::stream_plan("repair-out"),
|
||||
std::vector<failed_range>(),
|
||||
std::move(ranges), std::move(keyspace), std::move(cfs),
|
||||
std::move(data_centers), std::move(hosts),
|
||||
[&db, id] (auto& sp_in, auto& sp_out, auto& ranges, auto& keyspace, auto& cfs, auto& data_centers, auto& hosts) {
|
||||
[&db, id] (auto& sp_in, auto& sp_out, auto& failed_ranges, auto& ranges, auto& keyspace, auto& cfs, auto& data_centers, auto& hosts) {
|
||||
#if 1
|
||||
// repair all the ranges in parallel
|
||||
return parallel_for_each(ranges.begin(), ranges.end(), [&sp_in, &sp_out, &db, keyspace, &cfs, &data_centers, &hosts, id] (auto&& range) {
|
||||
return parallel_for_each(ranges.begin(), ranges.end(), [&sp_in, &sp_out, &failed_ranges, &db, keyspace, &cfs, &data_centers, &hosts, id] (auto&& range) {
|
||||
#else
|
||||
// repair all the ranges in sequence
|
||||
return do_for_each(ranges.begin(), ranges.end(), [&sp_in, &sp_out, &db, keyspace, &cfs, &data_centers, &hosts, id] (auto&& range) {
|
||||
return do_for_each(ranges.begin(), ranges.end(), [&sp_in, &sp_out, &failed_ranges, &db, keyspace, &cfs, &data_centers, &hosts, id] (auto&& range) {
|
||||
#endif
|
||||
check_in_shutdown();
|
||||
return repair_range(db, keyspace, range, cfs, data_centers, hosts, sp_in, sp_out);
|
||||
}).then([&sp_in, &sp_out] {
|
||||
return repair_range(db, keyspace, range, cfs, data_centers, hosts, sp_in, sp_out, failed_ranges);
|
||||
}).then([&sp_in, &sp_out, &failed_ranges] {
|
||||
return sp_in.execute().discard_result().then([&sp_out] {
|
||||
return sp_out.execute().discard_result();
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("repair's stream failed: {}", ep);
|
||||
return make_exception_future(ep);
|
||||
});
|
||||
}).then([id] {
|
||||
logger.info("repair {} completed sucessfully", id);
|
||||
repair_tracker.done(id, true);
|
||||
}).then([id, &failed_ranges] {
|
||||
if (failed_ranges.empty()) {
|
||||
logger.info("repair {} completed sucessfully", id);
|
||||
repair_tracker.done(id, true);
|
||||
} else {
|
||||
for (auto& frange: failed_ranges) {
|
||||
logger.debug("repair cf {} range {} failed", frange.cf, frange.range);
|
||||
}
|
||||
logger.info("repair {} failed - {} ranges failed", id, failed_ranges.size());
|
||||
repair_tracker.done(id, false);
|
||||
}
|
||||
}).handle_exception([id] (std::exception_ptr eptr) {
|
||||
logger.info("repair {} failed - {}", id, eptr);
|
||||
repair_tracker.done(id, false);
|
||||
|
||||
Reference in New Issue
Block a user