From ac4e86d861bba046dc27da96d50fe74cbba7d4c7 Mon Sep 17 00:00:00 2001 From: Nadav Har'El Date: Tue, 5 Jan 2016 15:37:51 +0200 Subject: [PATCH] repair: use repair_checksum_range The existing repair code always streamed the entire content of the database. In this overhaul, we send "repair_checksum_range" messages to the other nodes to verify whether they have exactly the same data as this node, and if they do, we avoid streaming the identical code. We make an attempt to split the token ranges up to contain an estimated 100 keys each, and send these ranges' checksums. Future versions of this code will need to improve this estimation (and make this "100" a parameter) Signed-off-by: Nadav Har'El --- repair/repair.cc | 236 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 178 insertions(+), 58 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index cc66fa6750..bda2849f48 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -24,6 +24,7 @@ #include "streaming/stream_plan.hh" #include "streaming/stream_state.hh" #include "gms/inet_address.hh" +#include "db/config.hh" #include #include @@ -200,47 +201,6 @@ public: } } repair_tracker; -// repair_start() can run on any cpu; It runs on cpu0 the function -// do_repair_start(). The benefit of always running that function on the same -// CPU is that it allows us to keep some state (like a list of ongoing -// repairs). It is fine to always do this on one CPU, because the function -// itself does very little (mainly tell other nodes and CPUs what to do). - -// Repair a single range. Comparable to RepairSession in Origin -// In Origin, this is composed of several "repair jobs", each with one cf, -// but our streaming already works for several cfs. -static future<> repair_range(seastar::sharded& db, sstring keyspace, - query::range range, std::vector cfs) { - auto sp = make_lw_shared("repair"); - auto id = utils::UUID_gen::get_time_UUID(); - - auto neighbors = get_neighbors(db.local(), keyspace, range); - logger.info("[repair #{}] new session: will sync {} on range {} for {}.{}", id, neighbors, range, keyspace, cfs); - for (auto peer : neighbors) { - // FIXME: obviously, we'll need Merkel trees or another alternative - // method to decide which parts of the data we need to stream instead - // of streaming everything like we do now. So this logging is kind of - // silly, and we never log the corresponding "... is consistent with" - // message: see SyncTask.run() in Origin for the original messages. - auto me = utils::fb_utilities::get_broadcast_address(); - for (auto &&cf : cfs) { - logger.info("[repair #{}] Endpoints {} and {} have {} range(s) out of sync for {}", id, me, peer, 1, cf); - } - - // FIXME: think: if we have several neighbors, perhaps we need to - // request ranges from all of them and only later transfer ranges to - // all of them? Otherwise, we won't necessarily fully repair the - // other ndoes, just this one? What does Cassandra do here? - sp->transfer_ranges(peer, keyspace, {range}, cfs); - sp->request_ranges(peer, keyspace, {range}, cfs); - } - return sp->execute().discard_result().then([sp, id] { - logger.info("repair session #{} successful", id); - }).handle_exception([id] (auto ep) { - logger.error("repair session #{} stream failed: {}", id, ep); - return make_exception_future(std::runtime_error("repair_range failed")); - }); -} partition_checksum::partition_checksum(const mutation& m) { auto frozen = freeze(m); @@ -387,6 +347,151 @@ static future<> sync_range(seastar::sharded& db, }); }); } +static void split_and_add(std::vector<::range>& ranges, + const range& range, + uint64_t estimated_partitions, uint64_t target_partitions) { + if (estimated_partitions < target_partitions) { + // We're done, the range is small enough to not be split further + ranges.push_back(range); + return; + } + // The use of minimum_token() here twice is not a typo - because wrap- + // around token ranges are supported by midpoint(), the beyond-maximum + // token can also be represented by minimum_token(). + auto midpoint = dht::global_partitioner().midpoint( + range.start() ? range.start()->value() : dht::minimum_token(), + range.end() ? range.end()->value() : dht::minimum_token()); + auto halves = range.split(midpoint, dht::token_comparator()); + ranges.push_back(halves.first); + ranges.push_back(halves.second); +} + +// Repair a single cf in a single local range. +// Comparable to RepairJob in Origin. +static future<> repair_cf_range(seastar::sharded& db, + sstring keyspace, sstring cf, ::range range, + std::vector& neighbors) { + if (neighbors.empty()) { + // Nothing to do in this case... + return make_ready_future<>(); + } + + // The partition iterating code inside checksum_range_shard does not + // support wrap-around ranges, so we need to break at least wrap- + // around ranges. + std::vector<::range> ranges; + if (range.is_wrap_around(dht::token_comparator())) { + auto unwrapped = range.unwrap(); + ranges.push_back(unwrapped.first); + ranges.push_back(unwrapped.second); + } else { + ranges.push_back(range); + } + // Additionally, we want to break up large ranges so they will have + // (approximately) a desired number of rows each. + // FIXME: column_family should have a method to estimate the number of + // partitions (and of course it should use cardinality estimation bitmaps, + // not trivial sum). We shouldn't have this ugly code here... + auto sstables = db.local().find_column_family(keyspace, cf).get_sstables(); + uint64_t estimated_partitions = 0; + for (auto sst : *sstables) { + estimated_partitions += sst.second->get_estimated_key_count(); + } + // This node contains replicas of rf * vnodes ranges like this one, so + // estimate the number of partitions in just this range: + estimated_partitions /= db.local().get_config().num_tokens(); + estimated_partitions /= db.local().find_keyspace(keyspace).get_replication_strategy().get_replication_factor(); + + // FIXME: we should have an on-the-fly iterator generator here, not + // fill a vector in advance. + std::vector<::range> tosplit; + ranges.swap(tosplit); + for (const auto& range : tosplit) { + // FIXME: this "100" needs to be a parameter. + split_and_add(ranges, range, estimated_partitions, 100); + } + + // We don't need to wait for one checksum to finish before we start the + // next, but doing too many of these operations in parallel also doesn't + // make sense, so we limit the number of concurrent ongoing checksum + // requests with a semaphore. + constexpr int parallelism = 100; + return do_with(semaphore(parallelism), true, std::move(keyspace), std::move(cf), std::move(ranges), + [&db, &neighbors, parallelism] (auto& sem, auto& success, const auto& keyspace, const auto& cf, const auto& ranges) { + return do_for_each(ranges, [&sem, &success, &db, &neighbors, &keyspace, &cf] + (const auto& range) { + return sem.wait(1).then([&sem, &success, &db, &neighbors, &keyspace, &cf, &range] { + // Ask this node, and all neighbors, to calculate checksums in + // this range. When all are done, compare the results, and if + // there are any differences, sync the content of this range. + std::vector> checksums; + checksums.reserve(1 + neighbors.size()); + checksums.push_back(checksum_range(db, keyspace, cf, range)); + for (auto&& neighbor : neighbors) { + checksums.push_back( + net::get_local_messaging_service().send_repair_checksum_range( + net::shard_id{neighbor},keyspace, cf, range)); + } + when_all(checksums.begin(), checksums.end()).then( + [&db, &keyspace, &cf, &range, &neighbors, &success] + (std::vector> checksums) { + for (unsigned i = 0; i < checksums.size(); i++) { + if (checksums[i].failed()) { + logger.warn( + "Checksum of range {} on {} failed: {}", + range, + (i ? neighbors[i-1] : + utils::fb_utilities::get_broadcast_address()), + checksums[i].get_exception()); + success = false; + // Do not break out of the loop here, so we can log + // (and discard) all the exceptions. + } + } + if (!success) { + return make_ready_future<>(); + } + auto checksum0 = checksums[0].get(); + for (unsigned i = 1; i < checksums.size(); i++) { + if (checksum0 != checksums[i].get()) { + logger.info("Found differing range {}", range); + return sync_range(db, keyspace, cf, range, neighbors); + } + } + return make_ready_future<>(); + }).handle_exception([&success, &range] (std::exception_ptr eptr) { + // Something above (e.g., sync_range) failed. We could + // stop the repair immediately, or let it continue with + // other ranges (at the moment, we do the latter). But in + // any case, we need to remember that the repair failed to + // tell the caller. + success = false; + logger.warn("Failed sync of range {}: {}", range, eptr); + }).finally([&sem] { sem.signal(1); }); + }); + }).finally([&sem, &success, parallelism] { + return sem.wait(parallelism).then([&success] { + return success ? make_ready_future<>() : + make_exception_future<>(std::runtime_error("Checksum or sync of partial range failed")); + }); + }); + }); +} + +// Repair a single local range, multiple column families. +// Comparable to RepairSession in Origin +static future<> repair_range(seastar::sharded& db, sstring keyspace, + ::range range, std::vector& cfs) { + auto id = utils::UUID_gen::get_time_UUID(); + return do_with(get_neighbors(db.local(), keyspace, range), [&db, &cfs, keyspace, id, range] (auto& neighbors) { + logger.info("[repair #{}] new session: will sync {} on range {} for {}.{}", id, neighbors, range, keyspace, cfs); + return do_for_each(cfs.begin(), cfs.end(), + [&db, keyspace, &neighbors, id, range] (auto&& cf) { + return repair_cf_range(db, keyspace, cf, range, neighbors); + }); + }); +} + static std::vector> get_ranges_for_endpoint( database& db, sstring keyspace, gms::inet_address ep) { auto& rs = db.find_keyspace(keyspace).get_replication_strategy(); @@ -542,6 +647,37 @@ private: } }; +// repair_ranges repairs a list of token ranges, each assumed to be a token +// range for which this node holds a replica, and, importantly, each range +// is assumed to be a indivisible in the sense that all the tokens in has the +// same nodes as replicas. +static future<> repair_ranges(seastar::sharded& db, sstring keyspace, + std::vector> ranges, std::vector cfs, int id) { + return do_with(std::move(ranges), std::move(keyspace), std::move(cfs), + [&db, id] (auto& ranges, auto& keyspace, auto& cfs) { +#if 1 + // repair all the ranges in parallel + return parallel_for_each(ranges.begin(), ranges.end(), [&db, keyspace, &cfs, id] (auto&& range) { +#else + // repair all the ranges in sequence + return do_for_each(ranges.begin(), ranges.end(), [&db, keyspace, &cfs, id] (auto&& range) { +#endif + return repair_range(db, keyspace, range, cfs); + }).then([id] { + logger.info("repair {} completed sucessfully", id); + repair_tracker.done(id, true); + }).handle_exception([id] (std::exception_ptr eptr) { + logger.info("repair {} failed - {}", id, eptr); + repair_tracker.done(id, false); + }); + }); +} + +// repair_start() can run on any cpu; It runs on cpu0 the function +// do_repair_start(). The benefit of always running that function on the same +// CPU is that it allows us to keep some state (like a list of ongoing +// repairs). It is fine to always do this on one CPU, because the function +// itself does very little (mainly tell other nodes and CPUs what to do). static int do_repair_start(seastar::sharded& db, sstring keyspace, std::unordered_map options_map) { @@ -601,23 +737,7 @@ static int do_repair_start(seastar::sharded& db, sstring keyspace, cfs = list_column_families(db.local(), keyspace); } - do_with(std::move(ranges), [&db, keyspace, cfs, id] (auto& ranges) { -#if 1 - // repair all the ranges in parallel - return parallel_for_each(ranges.begin(), ranges.end(), [&db, keyspace, cfs, id] (auto&& range) { -#else - // repair all the ranges in sequence - return do_for_each(ranges.begin(), ranges.end(), [&db, keyspace, cfs, id] (auto&& range) { -#endif - return repair_range(db, keyspace, range, cfs); - }).then([id] { - logger.info("repair {} completed sucessfully", id); - repair_tracker.done(id, true); - }).handle_exception([id] (std::exception_ptr eptr) { - logger.info("repair {} failed", id); - repair_tracker.done(id, false); - }); - }); + repair_ranges(db, std::move(keyspace), std::move(ranges), std::move(cfs), id); return id; }