diff --git a/repair/range_split.hh b/repair/range_split.hh
new file mode 100644
index 0000000000..f660e80007
--- /dev/null
+++ b/repair/range_split.hh
@@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2017 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <stack>
+
+#include "dht/i_partitioner.hh"
+
+// range_splitter(r, N, K) is a helper for splitting a given token_range r of
+// estimated size N into many small ranges of size K, and later iterating
+// over those small ranges once with the has_next() and next() methods.
+// This implementation assumes only the availability of a range::midpoint()
+// operation, and as result creates ranges with size between K/2 and K.
+// Moreover, it has memory requirement log(N). With more general arithmetic
+// support over tokens, we could get exactly K and O(1) memory.
+class range_splitter {
+    std::stack<std::pair<::dht::token_range, float>> _stack;
+    uint64_t _desired;
+public:
+    range_splitter(::dht::token_range r, uint64_t N, uint64_t K) {
+        _stack.push({r, N});
+        _desired = K;
+    }
+    bool has_next() const {
+        return !_stack.empty();
+    }
+    ::dht::token_range next() {
+        // If the head range's estimated size is small enough, return it.
+        // Otherwise split it to two halves, push the second half on the
+        // stack, and repeat with the first half. May need to do this more
+        // than once (up to log(N/K) times) until we have one range small
+        // enough to return.
+        assert(!_stack.empty());
+        auto range = _stack.top().first;
+        auto size = _stack.top().second;
+        _stack.pop();
+        while (size > _desired) {
+            // The use of minimum_token() here twice is not a typo - because wrap-
+            // around token ranges are supported by midpoint(), the beyond-maximum
+            // token can also be represented by minimum_token().
+            auto midpoint = dht::global_partitioner().midpoint(
+                range.start() ? range.start()->value() : dht::minimum_token(),
+                range.end() ? range.end()->value() : dht::minimum_token());
+            // This shouldn't happen, but if the range included just one token, we
+            // can't split further (split() may actually fail with assertion failure)
+            if ((range.start() && midpoint == range.start()->value()) ||
+                (range.end() && midpoint == range.end()->value())) {
+                return range;
+            }
+            auto halves = range.split(midpoint, dht::token_comparator());
+            _stack.push({halves.second, size / 2.0});
+            range = halves.first;
+            size /= 2.0;
+        }
+        return range;
+    }
+};
diff --git a/repair/repair.cc b/repair/repair.cc
index 987b8770f8..9fb1da9a3f 100644
--- a/repair/repair.cc
+++ b/repair/repair.cc
@@ -20,6 +20,7 @@
  */
 
 #include "repair.hh"
+#include "range_split.hh"
 
 #include "streaming/stream_plan.hh"
 #include "streaming/stream_state.hh"
@@ -478,31 +479,6 @@ future<partition_checksum> checksum_range(seastar::sharded<database> &db,
     });
 }
 
-static void split_and_add(std::vector<::dht::token_range>& ranges,
-        const dht::token_range& range,
-        uint64_t estimated_partitions, uint64_t target_partitions) {
-    if (estimated_partitions < target_partitions) {
-        // We're done, the range is small enough to not be split further
-        ranges.push_back(range);
-        return;
-    }
-    // The use of minimum_token() here twice is not a typo - because wrap-
-    // around token ranges are supported by midpoint(), the beyond-maximum
-    // token can also be represented by minimum_token().
-    auto midpoint = dht::global_partitioner().midpoint(
-        range.start() ? range.start()->value() : dht::minimum_token(),
-        range.end() ? range.end()->value() : dht::minimum_token());
-    // This shouldn't happen, but if the range included just one token, we
-    // can't split further (split() may actually fail with assertion failure)
-    if ((range.start() && midpoint == range.start()->value()) ||
-        (range.end() && midpoint == range.end()->value())) {
-        ranges.push_back(range);
-        return;
-    }
-    auto halves = range.split(midpoint, dht::token_comparator());
-    ranges.push_back(halves.first);
-    ranges.push_back(halves.second);
-}
 // We don't need to wait for one checksum to finish before we start the
 // next, but doing too many of these operations in parallel also doesn't
 // make sense, so we limit the number of concurrent ongoing checksum
@@ -548,33 +524,14 @@ static future<> repair_cf_range(repair_info& ri,
     }
     return estimate_partitions(ri.db, ri.keyspace, cf, range).then([&ri, cf, range, &neighbors] (uint64_t estimated_partitions) {
-        // Additionally, we want to break up large ranges so they will have
-        // (approximately) a desired number of rows each.
-        std::vector<::dht::token_range> ranges;
-        ranges.push_back(range);
-
-        // FIXME: we should have an on-the-fly iterator generator here, not
-        // fill a vector in advance.
-        std::vector<::dht::token_range> tosplit;
-        while (estimated_partitions > ri.target_partitions) {
-            tosplit.clear();
-            ranges.swap(tosplit);
-            for (const auto& range : tosplit) {
-                split_and_add(ranges, range, estimated_partitions, ri.target_partitions);
-            }
-            estimated_partitions /= 2;
-            if (ranges.size() >= ri.sub_ranges_max) {
-                break;
-            }
-        }
-        rlogger.debug("target_partitions={}, estimated_partitions={}, ranges.size={}, range={} -> ranges={}",
-                ri.target_partitions, estimated_partitions, ranges.size(), range, ranges);
-
+        range_splitter ranges(range, estimated_partitions, ri.target_partitions);
         return do_with(seastar::gate(), true, std::move(cf), std::move(ranges),
                 [&ri, &neighbors] (auto& completion, auto& success, const auto& cf, auto& ranges) {
-            return do_for_each(ranges, [&ri, &completion, &success, &neighbors, &cf] (const auto& range) {
+            return do_until([&ranges] () { return !ranges.has_next(); },
+                    [&ranges, &ri, &completion, &success, &neighbors, &cf] () {
+                auto range = ranges.next();
                 check_in_shutdown();
-                return parallelism_semaphore.wait(1).then([&ri, &completion, &success, &neighbors, &cf, &range] {
+                return parallelism_semaphore.wait(1).then([&ri, &completion, &success, &neighbors, &cf, range] {
                     auto checksum_type = service::get_local_storage_service().cluster_supports_large_partitions() ?
                             repair_checksum::streamed : repair_checksum::legacy;
@@ -592,7 +549,7 @@ static future<> repair_cf_range(repair_info& ri,
 
                     completion.enter();
                     when_all(checksums.begin(), checksums.end()).then(
-                            [&ri, &cf, &range, &neighbors, &success]
+                            [&ri, &cf, range, &neighbors, &success]
                             (std::vector<future<partition_checksum>> checksums) {
                         // If only some of the replicas of this range are alive,
                         // we set success=false so repair will fail, but we can
diff --git a/tests/partitioner_test.cc b/tests/partitioner_test.cc
index 7e01659117..54b7321d83 100644
--- a/tests/partitioner_test.cc
+++ b/tests/partitioner_test.cc
@@ -32,6 +32,7 @@
 #include "types.hh"
 #include "schema_builder.hh"
 #include "utils/div_ceil.hh"
+#include "repair/range_split.hh"
 
 #include "disk-error-handler.hh"
 #include "simple_schema.hh"
@@ -921,3 +922,36 @@ do_test_split_range_to_single_shard(const dht::i_partitioner& part, const schema
 BOOST_AUTO_TEST_CASE(test_split_range_single_shard) {
     return test_something_with_some_interesting_ranges_and_partitioners(do_test_split_range_to_single_shard);
 }
+
+// tests for range_split() utility function in repair/range_split.hh
+static int test_split(int N, int K) {
+    auto t1 = token_from_long(0x6000'0000'0000'0000);
+    auto t2 = token_from_long(0x9000'0000'0000'0000);
+    dht::token_range r(t1, t2);
+    auto splitter = range_splitter(r, N, K);
+    int c = 0;
+    dht::token_range prev_range;
+    while (splitter.has_next()) {
+        auto range = splitter.next();
+        //std::cerr << range << "\n";
+        if (c == 0) {
+            BOOST_REQUIRE(range.start() == r.start());
+        } else {
+            std::experimental::optional<dht::token_range::bound> e({prev_range.end()->value(), !prev_range.end()->is_inclusive()});
+            BOOST_REQUIRE(range.start() == e);
+        }
+        prev_range = range;
+        c++;
+    }
+    if (c > 0) {
+        BOOST_REQUIRE(prev_range.end() == r.end());
+    }
+    return c;
+}
+
+BOOST_AUTO_TEST_CASE(test_split_1) {
+    BOOST_REQUIRE(test_split(128, 16) == 8);
+    // will make 7 binary splits: 500, 250, 125.5, 62.5, 31.25, 15.625,
+    // 7.8125, so expect 2^7 = 128 ranges:
+    BOOST_REQUIRE(test_split(1000, 11) == 128);
+}