repair: Fix shard_begin and shard_end

A range now alternates between different shards: the first part of the
range goes to shard X, the next to shard X+1, but after a while we go
back to shard X. So we can't do a simple loop between shard_begin and
shard_end.

Fix by using the newly introduced dht::split_range_to_shards

Use the cf.make_streaming_reader with ranges to simplify the code a bit.
This commit is contained in:
Asias He
2016-12-07 13:09:48 +08:00
parent 1987264beb
commit 374324e6fb

View File

@@ -426,26 +426,24 @@ std::ostream& operator<<(std::ostream& out, const partition_checksum& c) {
// data is coming in).
static future<partition_checksum> checksum_range_shard(database &db,
const sstring& keyspace_name, const sstring& cf_name,
const ::nonwrapping_range<dht::token>& range, repair_checksum hash_version) {
const std::vector<query::partition_range>& prs, repair_checksum hash_version) {
auto& cf = db.find_column_family(keyspace_name, cf_name);
return do_with(dht::to_partition_range(range), [&cf, hash_version] (const auto& partition_range) {
auto reader = cf.make_streaming_reader(cf.schema(), partition_range);
return do_with(std::move(reader), partition_checksum(),
[hash_version] (auto& reader, auto& checksum) {
return repeat([&reader, &checksum, hash_version] () {
return reader().then([&checksum, hash_version] (auto mopt) {
if (mopt) {
return partition_checksum::compute(std::move(*mopt), hash_version).then([&checksum] (auto pc) {
checksum.add(pc);
return stop_iteration::no;
});
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
});
}).then([&checksum] {
return checksum;
auto reader = cf.make_streaming_reader(cf.schema(), prs);
return do_with(std::move(reader), partition_checksum(),
[hash_version] (auto& reader, auto& checksum) {
return repeat([&reader, &checksum, hash_version] () {
return reader().then([&checksum, hash_version] (auto mopt) {
if (mopt) {
return partition_checksum::compute(std::move(*mopt), hash_version).then([&checksum] (auto pc) {
checksum.add(pc);
return stop_iteration::no;
});
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
});
}).then([&checksum] {
return checksum;
});
});
}
@@ -464,16 +462,16 @@ static future<partition_checksum> checksum_range_shard(database &db,
future<partition_checksum> checksum_range(seastar::sharded<database> &db,
const sstring& keyspace, const sstring& cf,
const ::nonwrapping_range<dht::token>& range, repair_checksum hash_version) {
unsigned shard_begin = range.start() ?
dht::shard_of(range.start()->value()) : 0;
unsigned shard_end = range.end() ?
dht::shard_of(range.end()->value())+1 : smp::count;
return do_with(partition_checksum(), [shard_begin, shard_end, &db, &keyspace, &cf, &range, hash_version] (auto& result) {
return parallel_for_each(boost::counting_iterator<int>(shard_begin),
boost::counting_iterator<int>(shard_end),
[&db, &keyspace, &cf, &range, &result, hash_version] (unsigned shard) {
return db.invoke_on(shard, [&keyspace, &cf, &range, hash_version] (database& db) {
return checksum_range_shard(db, keyspace, cf, range, hash_version);
auto& schema = db.local().find_column_family(keyspace, cf).schema();
auto shard_ranges = dht::split_range_to_shards(dht::to_partition_range(range), *schema);
return do_with(partition_checksum(), std::move(shard_ranges), [&db, &keyspace, &cf, hash_version] (auto& result, auto& shard_ranges) {
return parallel_for_each(shard_ranges, [&db, &keyspace, &cf, &result, hash_version] (auto& shard_range) {
auto& shard = shard_range.first;
auto& prs = shard_range.second;
return db.invoke_on(shard, [keyspace, cf, prs = std::move(prs), hash_version] (database& db) mutable {
return do_with(std::move(keyspace), std::move(cf), std::move(prs), [&db, hash_version] (auto& keyspace, auto& cf, auto& prs) {
return checksum_range_shard(db, keyspace, cf, prs, hash_version);
});
}).then([&result] (partition_checksum sum) {
result.add(sum);
});