diff --git a/repair/repair.cc b/repair/repair.cc index 75e41b7ba3..662ffca756 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -186,6 +186,10 @@ static std::vector get_neighbors(database& db, #endif } +struct failed_range { + sstring cf; + ::range range; +}; // The repair_tracker tracks ongoing repair operations and their progress. // A repair which has already finished successfully is dropped from this @@ -465,7 +469,8 @@ static future<> repair_cf_range(seastar::sharded& db, sstring keyspace, sstring cf, ::range range, std::vector& neighbors, streaming::stream_plan& sp_in, - streaming::stream_plan& sp_out) { + streaming::stream_plan& sp_out, + std::vector& failed_ranges) { if (neighbors.empty()) { // Nothing to do in this case... return make_ready_future<>(); @@ -507,12 +512,12 @@ static future<> repair_cf_range(seastar::sharded& db, } return do_with(seastar::gate(), true, std::move(keyspace), std::move(cf), std::move(ranges), - [&db, &neighbors, &sp_in, &sp_out] (auto& completion, auto& success, const auto& keyspace, const auto& cf, const auto& ranges) { - return do_for_each(ranges, [&completion, &success, &db, &neighbors, &keyspace, &cf, &sp_in, &sp_out] + [&db, &neighbors, &sp_in, &sp_out, &failed_ranges] (auto& completion, auto& success, const auto& keyspace, const auto& cf, const auto& ranges) { + return do_for_each(ranges, [&completion, &success, &db, &neighbors, &keyspace, &cf, &sp_in, &sp_out, &failed_ranges] (const auto& range) { check_in_shutdown(); - return parallelism_semaphore.wait(1).then([&completion, &success, &db, &neighbors, &keyspace, &cf, &range, &sp_in, &sp_out] { + return parallelism_semaphore.wait(1).then([&completion, &success, &db, &neighbors, &keyspace, &cf, &range, &sp_in, &sp_out, &failed_ranges] { auto checksum_type = service::get_local_storage_service().cluster_supports_large_partitions() ? 
repair_checksum::streamed : repair_checksum::legacy; @@ -530,7 +535,7 @@ static future<> repair_cf_range(seastar::sharded& db, completion.enter(); when_all(checksums.begin(), checksums.end()).then( - [&db, &keyspace, &cf, &range, &neighbors, &success, &sp_in, &sp_out] + [&db, &keyspace, &cf, &range, &neighbors, &success, &sp_in, &sp_out, &failed_ranges] (std::vector> checksums) { // If only some of the replicas of this range are alive, // we set success=false so repair will fail, but we can @@ -545,6 +550,7 @@ static future<> repair_cf_range(seastar::sharded& db, utils::fb_utilities::get_broadcast_address()), checksums[i].get_exception()); success = false; + failed_ranges.push_back(failed_range{cf, range}); // Do not break out of the loop here, so we can log // (and discard) all the exceptions. } else if (i > 0) { @@ -565,13 +571,14 @@ static future<> repair_cf_range(seastar::sharded& db, } } return make_ready_future<>(); - }).handle_exception([&success, &range] (std::exception_ptr eptr) { + }).handle_exception([&success, &cf, &range, &failed_ranges] (std::exception_ptr eptr) { // Something above (e.g., sync_range) failed. We could // stop the repair immediately, or let it continue with // other ranges (at the moment, we do the latter). But in // any case, we need to remember that the repair failed to // tell the caller. success = false; + failed_ranges.push_back(failed_range{cf, range}); logger.warn("Failed sync of range {}: {}", range, eptr); }).finally([&completion] { parallelism_semaphore.signal(1); @@ -580,8 +587,14 @@ static future<> repair_cf_range(seastar::sharded& db, }); }).finally([&success, &completion] { return completion.close().then([&success] { - return success ? make_ready_future<>() : - make_exception_future<>(std::runtime_error("Checksum or sync of partial range failed")); + if (!success) { + logger.warn("Checksum or sync of partial range failed"); + } + // We probably want the repair to continue even if some + ranges fail to do the checksum. 
We need to set the + // per-repair success flag to false and report after the + // streaming is done. + return make_ready_future<>(); }); }); }); @@ -594,14 +607,15 @@ static future<> repair_range(seastar::sharded& db, sstring keyspace, const std::vector& data_centers, const std::vector& hosts, streaming::stream_plan& sp_in, - streaming::stream_plan& sp_out) { + streaming::stream_plan& sp_out, + std::vector& failed_ranges) { auto id = utils::UUID_gen::get_time_UUID(); return do_with(get_neighbors(db.local(), keyspace, range, data_centers, hosts), - [&sp_in, &sp_out, &db, &cfs, keyspace, id, range] (auto& neighbors) { + [&sp_in, &sp_out, &failed_ranges, &db, &cfs, keyspace, id, range] (auto& neighbors) { logger.debug("[repair #{}] new session: will sync {} on range {} for {}.{}", id, neighbors, range, keyspace, cfs); return do_for_each(cfs.begin(), cfs.end(), - [&db, keyspace, &neighbors, id, range, &sp_in, &sp_out] (auto&& cf) { - return repair_cf_range(db, keyspace, cf, range, neighbors, sp_in, sp_out); + [&db, keyspace, &neighbors, id, range, &sp_in, &sp_out, &failed_ranges] (auto&& cf) { + return repair_cf_range(db, keyspace, cf, range, neighbors, sp_in, sp_out, failed_ranges); }); }); } @@ -801,28 +815,37 @@ static future<> repair_ranges(seastar::sharded& db, sstring keyspace, std::vector data_centers, std::vector hosts) { return do_with(streaming::stream_plan("repair-in"), streaming::stream_plan("repair-out"), + std::vector(), std::move(ranges), std::move(keyspace), std::move(cfs), std::move(data_centers), std::move(hosts), - [&db, id] (auto& sp_in, auto& sp_out, auto& ranges, auto& keyspace, auto& cfs, auto& data_centers, auto& hosts) { + [&db, id] (auto& sp_in, auto& sp_out, auto& failed_ranges, auto& ranges, auto& keyspace, auto& cfs, auto& data_centers, auto& hosts) { #if 1 // repair all the ranges in parallel - return parallel_for_each(ranges.begin(), ranges.end(), [&sp_in, &sp_out, &db, keyspace, &cfs, &data_centers, &hosts, id] (auto&& range) { + return 
parallel_for_each(ranges.begin(), ranges.end(), [&sp_in, &sp_out, &failed_ranges, &db, keyspace, &cfs, &data_centers, &hosts, id] (auto&& range) { #else // repair all the ranges in sequence - return do_for_each(ranges.begin(), ranges.end(), [&sp_in, &sp_out, &db, keyspace, &cfs, &data_centers, &hosts, id] (auto&& range) { + return do_for_each(ranges.begin(), ranges.end(), [&sp_in, &sp_out, &failed_ranges, &db, keyspace, &cfs, &data_centers, &hosts, id] (auto&& range) { #endif check_in_shutdown(); - return repair_range(db, keyspace, range, cfs, data_centers, hosts, sp_in, sp_out); - }).then([&sp_in, &sp_out] { + return repair_range(db, keyspace, range, cfs, data_centers, hosts, sp_in, sp_out, failed_ranges); + }).then([&sp_in, &sp_out, &failed_ranges] { return sp_in.execute().discard_result().then([&sp_out] { return sp_out.execute().discard_result(); }).handle_exception([] (auto ep) { logger.warn("repair's stream failed: {}", ep); return make_exception_future(ep); }); - }).then([id] { - logger.info("repair {} completed sucessfully", id); - repair_tracker.done(id, true); + }).then([id, &failed_ranges] { + if (failed_ranges.empty()) { + logger.info("repair {} completed sucessfully", id); + repair_tracker.done(id, true); + } else { + for (auto& frange: failed_ranges) { + logger.debug("repair cf {} range {} failed", frange.cf, frange.range); + } + logger.info("repair {} failed - {} ranges failed", id, failed_ranges.size()); + repair_tracker.done(id, false); + } }).handle_exception([id] (std::exception_ptr eptr) { logger.info("repair {} failed - {}", id, eptr); repair_tracker.done(id, false);