From 374324e6fb8a2bfbd3a18524887eff71208bd3c5 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 7 Dec 2016 13:09:48 +0800 Subject: [PATCH] repair: Fix shard_begin and shard_end A range now alternates between different shards: the first part of the range goes to shard X, the next to shard X+1, but after a while we go back to shard X. So we can't do a simple loop between shard_begin and shard_end. Fix by using the newly introduced dht::split_range_to_shards Use the cf.make_streaming_reader with ranges to simplify the code a bit. --- repair/repair.cc | 54 +++++++++++++++++++++++------------------------- 1 file changed, 26 insertions(+), 28 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index 72e4f3a0e9..07ddb69e47 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -426,26 +426,24 @@ std::ostream& operator<<(std::ostream& out, const partition_checksum& c) { // data is coming in). static future checksum_range_shard(database &db, const sstring& keyspace_name, const sstring& cf_name, - const ::nonwrapping_range& range, repair_checksum hash_version) { + const std::vector& prs, repair_checksum hash_version) { auto& cf = db.find_column_family(keyspace_name, cf_name); - return do_with(dht::to_partition_range(range), [&cf, hash_version] (const auto& partition_range) { - auto reader = cf.make_streaming_reader(cf.schema(), partition_range); - return do_with(std::move(reader), partition_checksum(), - [hash_version] (auto& reader, auto& checksum) { - return repeat([&reader, &checksum, hash_version] () { - return reader().then([&checksum, hash_version] (auto mopt) { - if (mopt) { - return partition_checksum::compute(std::move(*mopt), hash_version).then([&checksum] (auto pc) { - checksum.add(pc); - return stop_iteration::no; - }); - } else { - return make_ready_future(stop_iteration::yes); - } - }); - }).then([&checksum] { - return checksum; + auto reader = cf.make_streaming_reader(cf.schema(), prs); + return do_with(std::move(reader), partition_checksum(), + [hash_version] (auto& reader, auto& checksum) { + return repeat([&reader, &checksum, hash_version] () { + return reader().then([&checksum, hash_version] (auto mopt) { + if (mopt) { + return partition_checksum::compute(std::move(*mopt), hash_version).then([&checksum] (auto pc) { + checksum.add(pc); + return stop_iteration::no; + }); + } else { + return make_ready_future(stop_iteration::yes); + } }); + }).then([&checksum] { + return checksum; }); }); } @@ -464,16 +462,16 @@ static future checksum_range_shard(database &db, future checksum_range(seastar::sharded &db, const sstring& keyspace, const sstring& cf, const ::nonwrapping_range& range, repair_checksum hash_version) { - unsigned shard_begin = range.start() ? - dht::shard_of(range.start()->value()) : 0; - unsigned shard_end = range.end() ? - dht::shard_of(range.end()->value())+1 : smp::count; - return do_with(partition_checksum(), [shard_begin, shard_end, &db, &keyspace, &cf, &range, hash_version] (auto& result) { - return parallel_for_each(boost::counting_iterator(shard_begin), - boost::counting_iterator(shard_end), - [&db, &keyspace, &cf, &range, &result, hash_version] (unsigned shard) { - return db.invoke_on(shard, [&keyspace, &cf, &range, hash_version] (database& db) { - return checksum_range_shard(db, keyspace, cf, range, hash_version); + auto& schema = db.local().find_column_family(keyspace, cf).schema(); + auto shard_ranges = dht::split_range_to_shards(dht::to_partition_range(range), *schema); + return do_with(partition_checksum(), std::move(shard_ranges), [&db, &keyspace, &cf, hash_version] (auto& result, auto& shard_ranges) { + return parallel_for_each(shard_ranges, [&db, &keyspace, &cf, &result, hash_version] (auto& shard_range) { + auto& shard = shard_range.first; + auto& prs = shard_range.second; + return db.invoke_on(shard, [keyspace, cf, prs = std::move(prs), hash_version] (database& db) mutable { + return do_with(std::move(keyspace), std::move(cf), std::move(prs), [&db, hash_version] (auto& keyspace, auto& cf, auto& prs) { + return checksum_range_shard(db, keyspace, cf, prs, hash_version); + }); }).then([&result] (partition_checksum sum) { result.add(sum); });