diff --git a/repair/repair.cc b/repair/repair.cc index cc66fa6750..bda2849f48 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -24,6 +24,7 @@ #include "streaming/stream_plan.hh" #include "streaming/stream_state.hh" #include "gms/inet_address.hh" +#include "db/config.hh" #include #include @@ -200,47 +201,6 @@ public: } } repair_tracker; -// repair_start() can run on any cpu; It runs on cpu0 the function -// do_repair_start(). The benefit of always running that function on the same -// CPU is that it allows us to keep some state (like a list of ongoing -// repairs). It is fine to always do this on one CPU, because the function -// itself does very little (mainly tell other nodes and CPUs what to do). - -// Repair a single range. Comparable to RepairSession in Origin -// In Origin, this is composed of several "repair jobs", each with one cf, -// but our streaming already works for several cfs. -static future<> repair_range(seastar::sharded& db, sstring keyspace, - query::range range, std::vector cfs) { - auto sp = make_lw_shared("repair"); - auto id = utils::UUID_gen::get_time_UUID(); - - auto neighbors = get_neighbors(db.local(), keyspace, range); - logger.info("[repair #{}] new session: will sync {} on range {} for {}.{}", id, neighbors, range, keyspace, cfs); - for (auto peer : neighbors) { - // FIXME: obviously, we'll need Merkel trees or another alternative - // method to decide which parts of the data we need to stream instead - // of streaming everything like we do now. So this logging is kind of - // silly, and we never log the corresponding "... is consistent with" - // message: see SyncTask.run() in Origin for the original messages. - auto me = utils::fb_utilities::get_broadcast_address(); - for (auto &&cf : cfs) { - logger.info("[repair #{}] Endpoints {} and {} have {} range(s) out of sync for {}", id, me, peer, 1, cf); - } - - // FIXME: think: if we have several neighbors, perhaps we need to - // request ranges from all of them and only later transfer ranges to - // all of them? Otherwise, we won't necessarily fully repair the - // other ndoes, just this one? What does Cassandra do here? - sp->transfer_ranges(peer, keyspace, {range}, cfs); - sp->request_ranges(peer, keyspace, {range}, cfs); - } - return sp->execute().discard_result().then([sp, id] { - logger.info("repair session #{} successful", id); - }).handle_exception([id] (auto ep) { - logger.error("repair session #{} stream failed: {}", id, ep); - return make_exception_future(std::runtime_error("repair_range failed")); - }); -} partition_checksum::partition_checksum(const mutation& m) { auto frozen = freeze(m); @@ -387,6 +347,151 @@ static future<> sync_range(seastar::sharded& db, }); }); } +static void split_and_add(std::vector<::range>& ranges, + const range& range, + uint64_t estimated_partitions, uint64_t target_partitions) { + if (estimated_partitions < target_partitions) { + // We're done, the range is small enough to not be split further + ranges.push_back(range); + return; + } + // The use of minimum_token() here twice is not a typo - because wrap- + // around token ranges are supported by midpoint(), the beyond-maximum + // token can also be represented by minimum_token(). + auto midpoint = dht::global_partitioner().midpoint( + range.start() ? range.start()->value() : dht::minimum_token(), + range.end() ? range.end()->value() : dht::minimum_token()); + auto halves = range.split(midpoint, dht::token_comparator()); + ranges.push_back(halves.first); + ranges.push_back(halves.second); +} + +// Repair a single cf in a single local range. +// Comparable to RepairJob in Origin. +static future<> repair_cf_range(seastar::sharded& db, + sstring keyspace, sstring cf, ::range range, + std::vector& neighbors) { + if (neighbors.empty()) { + // Nothing to do in this case... + return make_ready_future<>(); + } + + // The partition iterating code inside checksum_range_shard does not + // support wrap-around ranges, so we need to break at least wrap- + // around ranges. + std::vector<::range> ranges; + if (range.is_wrap_around(dht::token_comparator())) { + auto unwrapped = range.unwrap(); + ranges.push_back(unwrapped.first); + ranges.push_back(unwrapped.second); + } else { + ranges.push_back(range); + } + // Additionally, we want to break up large ranges so they will have + // (approximately) a desired number of rows each. + // FIXME: column_family should have a method to estimate the number of + // partitions (and of course it should use cardinality estimation bitmaps, + // not trivial sum). We shouldn't have this ugly code here... + auto sstables = db.local().find_column_family(keyspace, cf).get_sstables(); + uint64_t estimated_partitions = 0; + for (auto sst : *sstables) { + estimated_partitions += sst.second->get_estimated_key_count(); + } + // This node contains replicas of rf * vnodes ranges like this one, so + // estimate the number of partitions in just this range: + estimated_partitions /= db.local().get_config().num_tokens(); + estimated_partitions /= db.local().find_keyspace(keyspace).get_replication_strategy().get_replication_factor(); + + // FIXME: we should have an on-the-fly iterator generator here, not + // fill a vector in advance. + std::vector<::range> tosplit; + ranges.swap(tosplit); + for (const auto& range : tosplit) { + // FIXME: this "100" needs to be a parameter. + split_and_add(ranges, range, estimated_partitions, 100); + } + + // We don't need to wait for one checksum to finish before we start the + // next, but doing too many of these operations in parallel also doesn't + // make sense, so we limit the number of concurrent ongoing checksum + // requests with a semaphore. + constexpr int parallelism = 100; + return do_with(semaphore(parallelism), true, std::move(keyspace), std::move(cf), std::move(ranges), + [&db, &neighbors, parallelism] (auto& sem, auto& success, const auto& keyspace, const auto& cf, const auto& ranges) { + return do_for_each(ranges, [&sem, &success, &db, &neighbors, &keyspace, &cf] + (const auto& range) { + return sem.wait(1).then([&sem, &success, &db, &neighbors, &keyspace, &cf, &range] { + // Ask this node, and all neighbors, to calculate checksums in + // this range. When all are done, compare the results, and if + // there are any differences, sync the content of this range. + std::vector> checksums; + checksums.reserve(1 + neighbors.size()); + checksums.push_back(checksum_range(db, keyspace, cf, range)); + for (auto&& neighbor : neighbors) { + checksums.push_back( + net::get_local_messaging_service().send_repair_checksum_range( + net::shard_id{neighbor},keyspace, cf, range)); + } + when_all(checksums.begin(), checksums.end()).then( + [&db, &keyspace, &cf, &range, &neighbors, &success] + (std::vector> checksums) { + for (unsigned i = 0; i < checksums.size(); i++) { + if (checksums[i].failed()) { + logger.warn( + "Checksum of range {} on {} failed: {}", + range, + (i ? neighbors[i-1] : + utils::fb_utilities::get_broadcast_address()), + checksums[i].get_exception()); + success = false; + // Do not break out of the loop here, so we can log + // (and discard) all the exceptions. + } + } + if (!success) { + return make_ready_future<>(); + } + auto checksum0 = checksums[0].get(); + for (unsigned i = 1; i < checksums.size(); i++) { + if (checksum0 != checksums[i].get()) { + logger.info("Found differing range {}", range); + return sync_range(db, keyspace, cf, range, neighbors); + } + } + return make_ready_future<>(); + }).handle_exception([&success, &range] (std::exception_ptr eptr) { + // Something above (e.g., sync_range) failed. We could + // stop the repair immediately, or let it continue with + // other ranges (at the moment, we do the latter). But in + // any case, we need to remember that the repair failed to + // tell the caller. + success = false; + logger.warn("Failed sync of range {}: {}", range, eptr); + }).finally([&sem] { sem.signal(1); }); + }); + }).finally([&sem, &success, parallelism] { + return sem.wait(parallelism).then([&success] { + return success ? make_ready_future<>() : + make_exception_future<>(std::runtime_error("Checksum or sync of partial range failed")); + }); + }); + }); +} + +// Repair a single local range, multiple column families. +// Comparable to RepairSession in Origin +static future<> repair_range(seastar::sharded& db, sstring keyspace, + ::range range, std::vector& cfs) { + auto id = utils::UUID_gen::get_time_UUID(); + return do_with(get_neighbors(db.local(), keyspace, range), [&db, &cfs, keyspace, id, range] (auto& neighbors) { + logger.info("[repair #{}] new session: will sync {} on range {} for {}.{}", id, neighbors, range, keyspace, cfs); + return do_for_each(cfs.begin(), cfs.end(), + [&db, keyspace, &neighbors, id, range] (auto&& cf) { + return repair_cf_range(db, keyspace, cf, range, neighbors); + }); + }); +} + static std::vector> get_ranges_for_endpoint( database& db, sstring keyspace, gms::inet_address ep) { auto& rs = db.find_keyspace(keyspace).get_replication_strategy(); @@ -542,6 +647,37 @@ private: } }; +// repair_ranges repairs a list of token ranges, each assumed to be a token +// range for which this node holds a replica, and, importantly, each range +// is assumed to be a indivisible in the sense that all the tokens in has the +// same nodes as replicas. +static future<> repair_ranges(seastar::sharded& db, sstring keyspace, + std::vector> ranges, std::vector cfs, int id) { + return do_with(std::move(ranges), std::move(keyspace), std::move(cfs), + [&db, id] (auto& ranges, auto& keyspace, auto& cfs) { +#if 1 + // repair all the ranges in parallel + return parallel_for_each(ranges.begin(), ranges.end(), [&db, keyspace, &cfs, id] (auto&& range) { +#else + // repair all the ranges in sequence + return do_for_each(ranges.begin(), ranges.end(), [&db, keyspace, &cfs, id] (auto&& range) { +#endif + return repair_range(db, keyspace, range, cfs); + }).then([id] { + logger.info("repair {} completed sucessfully", id); + repair_tracker.done(id, true); + }).handle_exception([id] (std::exception_ptr eptr) { + logger.info("repair {} failed - {}", id, eptr); + repair_tracker.done(id, false); + }); + }); +} + +// repair_start() can run on any cpu; It runs on cpu0 the function +// do_repair_start(). The benefit of always running that function on the same +// CPU is that it allows us to keep some state (like a list of ongoing +// repairs). It is fine to always do this on one CPU, because the function +// itself does very little (mainly tell other nodes and CPUs what to do). static int do_repair_start(seastar::sharded& db, sstring keyspace, std::unordered_map options_map) { @@ -601,23 +737,7 @@ static int do_repair_start(seastar::sharded& db, sstring keyspace, cfs = list_column_families(db.local(), keyspace); } - do_with(std::move(ranges), [&db, keyspace, cfs, id] (auto& ranges) { -#if 1 - // repair all the ranges in parallel - return parallel_for_each(ranges.begin(), ranges.end(), [&db, keyspace, cfs, id] (auto&& range) { -#else - // repair all the ranges in sequence - return do_for_each(ranges.begin(), ranges.end(), [&db, keyspace, cfs, id] (auto&& range) { -#endif - return repair_range(db, keyspace, range, cfs); - }).then([id] { - logger.info("repair {} completed sucessfully", id); - repair_tracker.done(id, true); - }).handle_exception([id] (std::exception_ptr eptr) { - logger.info("repair {} failed", id); - repair_tracker.done(id, false); - }); - }); + repair_ranges(db, std::move(keyspace), std::move(ranges), std::move(cfs), id); return id; }