repair: iterator over subranges instead of list

When starting a repair, we divided the large token ranges (vnodes) into
small subranges of a desired size (around 100 partitions), and built a huge
list of those subranges - to iterate over them later and compare checksums
of those chunks.

However, building this list up front is completely unnecessary and wastes
a lot of memory: in a test with 1 TB of data, as much as 3 gigabytes was
spent on this list. Instead, this patch finds the next chunk on demand,
using a DFS-like splitting algorithm that needs only the token range
midpoint() function (as before). The amount of memory needed for this is
O(log N), instead of the O(N) of the previous implementation.
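
To sketch the idea outside the Scylla tree, here is a minimal, self-contained
C++ program in which a plain integer range and an integer midpoint stand in
for dht::token_range and the partitioner's midpoint(). int_range_splitter and
all other names are illustrative only, not Scylla code: the point is that
only the not-yet-visited right halves of splits live on the stack, one per
level of the implicit split tree.

#include <cassert>
#include <cstdint>
#include <stack>
#include <utility>

// Half-open integer range [begin, end), standing in for dht::token_range.
struct int_range { uint64_t begin, end; };

// DFS-like splitter: at most one deferred right half per split level is
// alive at any time, so memory use is O(log(N/K)).
class int_range_splitter {
    std::stack<std::pair<int_range, double>> _stack; // (range, estimated size)
    uint64_t _desired;
public:
    int_range_splitter(int_range r, uint64_t n, uint64_t k) : _desired(k) {
        _stack.push({r, double(n)});
    }
    bool has_next() const { return !_stack.empty(); }
    int_range next() {
        assert(!_stack.empty());
        auto [range, size] = _stack.top();
        _stack.pop();
        // Halve the front range until its estimated size drops to the
        // target, deferring each right half to a later next() call.
        while (size > _desired && range.end - range.begin > 1) {
            uint64_t mid = range.begin + (range.end - range.begin) / 2;
            _stack.push({int_range{mid, range.end}, size / 2.0});
            range.end = mid;
            size /= 2.0;
        }
        return range;
    }
};

int main() {
    int_range_splitter s({0, 1024}, 128, 16); // N=128 partitions, want K=16
    int count = 0;
    while (s.has_next()) {
        auto r = s.next();
        assert(r.begin < r.end);
        ++count;
    }
    assert(count == 8); // 128/16 = 8 equal leaves after 3 split levels
}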

Refs #2430.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Nadav Har'El
2017-06-06 00:08:19 +03:00
committed by Asias He
parent 0ca1e5cca3
commit b3ff37e67f
3 changed files with 117 additions and 50 deletions

repair/range_split.hh (new file, 76 lines)

@@ -0,0 +1,76 @@
/*
* Copyright (C) 2017 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <stack>
#include "dht/i_partitioner.hh"
// range_splitter(r, N, K) is a helper for splitting a given token_range r of
// estimated size N into many small ranges of roughly size K, and later
// iterating over those small ranges once with the has_next() and next()
// methods. This implementation assumes only the availability of a
// range::midpoint() operation, and as a result creates ranges with sizes
// between K/2 and K. Moreover, its memory requirement is O(log N). With more
// general arithmetic support over tokens, we could get exactly K and O(1)
// memory.
class range_splitter {
    std::stack<std::pair<::dht::token_range, float>> _stack;
    uint64_t _desired;
public:
    range_splitter(::dht::token_range r, uint64_t N, uint64_t K) {
        _stack.push({r, N});
        _desired = K;
    }
    bool has_next() const {
        return !_stack.empty();
    }
    ::dht::token_range next() {
        // If the head range's estimated size is small enough, return it.
        // Otherwise, split it into two halves, push the second half on the
        // stack, and repeat with the first half. We may need to do this more
        // than once (up to log(N/K) times) until we have one range small
        // enough to return.
        assert(!_stack.empty());
        auto range = _stack.top().first;
        auto size = _stack.top().second;
        _stack.pop();
        while (size > _desired) {
            // The use of minimum_token() here twice is not a typo: because
            // wrap-around token ranges are supported by midpoint(), the
            // beyond-maximum token can also be represented by minimum_token().
            auto midpoint = dht::global_partitioner().midpoint(
                    range.start() ? range.start()->value() : dht::minimum_token(),
                    range.end() ? range.end()->value() : dht::minimum_token());
            // This shouldn't happen, but if the range contains just one token,
            // we can't split it further (split() may even fail an assertion).
            if ((range.start() && midpoint == range.start()->value()) ||
                    (range.end() && midpoint == range.end()->value())) {
                return range;
            }
            auto halves = range.split(midpoint, dht::token_comparator());
            _stack.push({halves.second, size / 2.0});
            range = halves.first;
            size /= 2.0;
        }
        return range;
    }
};
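
For reference, this is how the class is meant to be consumed. The actual
caller is the do_until() loop added to repair_cf_range below; the
for_each_subrange and visit names in this condensed sketch are illustrative,
not part of the patch:

// Visit every subrange of a vnode, holding only O(log N) splitter state.
template <typename Func>
void for_each_subrange(::dht::token_range vnode, uint64_t estimated_partitions,
                       uint64_t target_partitions, Func visit) {
    range_splitter ranges(vnode, estimated_partitions, target_partitions);
    while (ranges.has_next()) {
        visit(ranges.next());
    }
}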

repair/repair.cc

@@ -20,6 +20,7 @@
  */
 #include "repair.hh"
+#include "range_split.hh"
 #include "streaming/stream_plan.hh"
 #include "streaming/stream_state.hh"
@@ -478,31 +479,6 @@ future<partition_checksum> checksum_range(seastar::sharded<database> &db,
     });
 }
-static void split_and_add(std::vector<::dht::token_range>& ranges,
-        const dht::token_range& range,
-        uint64_t estimated_partitions, uint64_t target_partitions) {
-    if (estimated_partitions < target_partitions) {
-        // We're done, the range is small enough to not be split further
-        ranges.push_back(range);
-        return;
-    }
-    // The use of minimum_token() here twice is not a typo - because wrap-
-    // around token ranges are supported by midpoint(), the beyond-maximum
-    // token can also be represented by minimum_token().
-    auto midpoint = dht::global_partitioner().midpoint(
-            range.start() ? range.start()->value() : dht::minimum_token(),
-            range.end() ? range.end()->value() : dht::minimum_token());
-    // This shouldn't happen, but if the range included just one token, we
-    // can't split further (split() may actually fail with assertion failure)
-    if ((range.start() && midpoint == range.start()->value()) ||
-            (range.end() && midpoint == range.end()->value())) {
-        ranges.push_back(range);
-        return;
-    }
-    auto halves = range.split(midpoint, dht::token_comparator());
-    ranges.push_back(halves.first);
-    ranges.push_back(halves.second);
-}
 // We don't need to wait for one checksum to finish before we start the
 // next, but doing too many of these operations in parallel also doesn't
 // make sense, so we limit the number of concurrent ongoing checksum
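
The next hunk shows this limiting in action via parallelism_semaphore.wait(1).
In the spirit of that pattern, here is a standalone C++20 sketch using
std::counting_semaphore and std::thread as stand-ins for Seastar's semaphore
and continuations; all names are illustrative only:

#include <semaphore>
#include <thread>
#include <vector>

// Cap the number of concurrently running checksum-like tasks at 16.
std::counting_semaphore<> parallelism{16};

void run_task() {
    parallelism.acquire();  // blocks while 16 tasks are already in flight
    // ... compute one subrange checksum ...
    parallelism.release();
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 100; ++i) {
        threads.emplace_back(run_task);
    }
    for (auto& t : threads) {
        t.join();
    }
}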
@@ -548,33 +524,14 @@ static future<> repair_cf_range(repair_info& ri,
     }
     return estimate_partitions(ri.db, ri.keyspace, cf, range).then([&ri, cf, range, &neighbors] (uint64_t estimated_partitions) {
         // Additionally, we want to break up large ranges so they will have
         // (approximately) a desired number of rows each.
-        std::vector<::dht::token_range> ranges;
-        ranges.push_back(range);
-        // FIXME: we should have an on-the-fly iterator generator here, not
-        // fill a vector in advance.
-        std::vector<::dht::token_range> tosplit;
-        while (estimated_partitions > ri.target_partitions) {
-            tosplit.clear();
-            ranges.swap(tosplit);
-            for (const auto& range : tosplit) {
-                split_and_add(ranges, range, estimated_partitions, ri.target_partitions);
-            }
-            estimated_partitions /= 2;
-            if (ranges.size() >= ri.sub_ranges_max) {
-                break;
-            }
-        }
-        rlogger.debug("target_partitions={}, estimated_partitions={}, ranges.size={}, range={} -> ranges={}",
-                ri.target_partitions, estimated_partitions, ranges.size(), range, ranges);
+        range_splitter ranges(range, estimated_partitions, ri.target_partitions);
         return do_with(seastar::gate(), true, std::move(cf), std::move(ranges),
                 [&ri, &neighbors] (auto& completion, auto& success, const auto& cf, auto& ranges) {
-            return do_for_each(ranges, [&ri, &completion, &success, &neighbors, &cf] (const auto& range) {
+            return do_until([&ranges] () { return !ranges.has_next(); },
+                    [&ranges, &ri, &completion, &success, &neighbors, &cf] () {
+                auto range = ranges.next();
                 check_in_shutdown();
-                return parallelism_semaphore.wait(1).then([&ri, &completion, &success, &neighbors, &cf, &range] {
+                return parallelism_semaphore.wait(1).then([&ri, &completion, &success, &neighbors, &cf, range] {
                     auto checksum_type = service::get_local_storage_service().cluster_supports_large_partitions()
                             ? repair_checksum::streamed : repair_checksum::legacy;
@@ -592,7 +549,7 @@ static future<> repair_cf_range(repair_info& ri,
             completion.enter();
             when_all(checksums.begin(), checksums.end()).then(
-                    [&ri, &cf, &range, &neighbors, &success]
+                    [&ri, &cf, range, &neighbors, &success]
                     (std::vector<future<partition_checksum>> checksums) {
                 // If only some of the replicas of this range are alive,
                 // we set success=false so repair will fail, but we can
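
Note the capture-list changes in the hunks above, from [..., &range] to
[..., range]: with the splitter, range is a local variable produced by
ranges.next() inside the iteration lambda, so any continuation that runs
after the lambda returns must own its own copy. A minimal standard-C++
illustration of the hazard, with std::async standing in for a Seastar
continuation (illustrative only, not Scylla code):

#include <future>
#include <iostream>

int main() {
    std::future<void> f;
    {
        int range = 42; // stand-in for the loop-local subrange
        // Capturing `range` by value keeps it alive for the async work;
        // capturing by reference would dangle once this scope exits.
        f = std::async(std::launch::async, [range] {
            std::cout << "processing subrange " << range << "\n";
        });
    }
    f.get();
}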

tests (file name not shown)

@@ -32,6 +32,7 @@
#include "types.hh"
#include "schema_builder.hh"
#include "utils/div_ceil.hh"
#include "repair/range_split.hh"
#include "disk-error-handler.hh"
#include "simple_schema.hh"
@@ -921,3 +922,36 @@ do_test_split_range_to_single_shard(const dht::i_partitioner& part, const schema
 BOOST_AUTO_TEST_CASE(test_split_range_single_shard) {
     return test_something_with_some_interesting_ranges_and_partitioners(do_test_split_range_to_single_shard);
 }
+
+// Tests for the range_splitter utility class in repair/range_split.hh
+static int test_split(int N, int K) {
+    auto t1 = token_from_long(0x6000'0000'0000'0000);
+    auto t2 = token_from_long(0x9000'0000'0000'0000);
+    dht::token_range r(t1, t2);
+    auto splitter = range_splitter(r, N, K);
+    int c = 0;
+    dht::token_range prev_range;
+    while (splitter.has_next()) {
+        auto range = splitter.next();
+        //std::cerr << range << "\n";
+        if (c == 0) {
+            BOOST_REQUIRE(range.start() == r.start());
+        } else {
+            // Each subrange must begin exactly where the previous one
+            // ended, with the bound's inclusiveness flipped.
+            std::experimental::optional<dht::token_range::bound> e({prev_range.end()->value(), !prev_range.end()->is_inclusive()});
+            BOOST_REQUIRE(range.start() == e);
+        }
+        prev_range = range;
+        c++;
+    }
+    if (c > 0) {
+        BOOST_REQUIRE(prev_range.end() == r.end());
+    }
+    return c;
+}
+
+BOOST_AUTO_TEST_CASE(test_split_1) {
+    BOOST_REQUIRE(test_split(128, 16) == 8);
+    // will make 7 binary splits: 500, 250, 125, 62.5, 31.25, 15.625,
+    // 7.8125, so expect 2^7 = 128 ranges:
+    BOOST_REQUIRE(test_split(1000, 11) == 128);
+}
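
The expected counts follow from the splitting rule: the estimated size is
halved until it is no larger than K, and every halving doubles the number of
leaves, so for N > K the splitter yields 2^ceil(log2(N/K)) subranges. A
standalone check of that formula; expected_splits is a hypothetical helper
written for this illustration, not part of the patch:

#include <cassert>
#include <cmath>
#include <cstdint>

// Expected number of subranges from range_splitter(r, N, K), assuming the
// exact binary halving implemented above: the size is halved
// ceil(log2(N/K)) times, and each halving doubles the number of leaves.
static uint64_t expected_splits(double N, double K) {
    return N <= K ? 1 : uint64_t(1) << uint64_t(std::ceil(std::log2(N / K)));
}

int main() {
    assert(expected_splits(128, 16) == 8);    // 3 halvings: 64, 32, 16
    assert(expected_splits(1000, 11) == 128); // 7 halvings, as in test_split_1
}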