range: adjust split_after to allow split_point outside input range

Make split_after() more generic by allowing split_point to be anywhere,
not just within the input range.  If the split_point is before, the entire
range is returned; and if it is after, stdx::nullopt is returned.

"before" and "after" are not well defined for wrap-around ranges, so
but we are phasing them out and soon there will not be
wrapping_range::split_after() users.

This is a prerequisite for converting partition_range and friends to
nonwrapping_range.
Message-Id: <1475765099-10657-1-git-send-email-avi@scylladb.com>
This commit is contained in:
Avi Kivity
2016-10-06 17:44:59 +03:00
committed by Tomasz Grabiec
parent 7ea4513595
commit 9ac441d3b5
3 changed files with 116 additions and 21 deletions

View File

@@ -21,6 +21,7 @@
#pragma once
#include "stdx.hh"
#include <experimental/optional>
#include <iostream>
#include <boost/range/algorithm/copy.hpp>
@@ -332,12 +333,22 @@ public:
wrapping_range right(bound(split_point, false), end());
return std::make_pair(std::move(left), std::move(right));
}
// Create a sub-range including values greater than the split_point. split_point has to be inside the range
// Create a sub-range including values greater than the split_point. Returns stdx::nullopt if
// split_point is after the end (but not included in the range, in case of wraparound ranges)
// Comparator must define a total ordering on T.
template<typename Comparator>
wrapping_range<T> split_after(const T& split_point, Comparator&& cmp) const {
assert(contains(split_point, std::forward<Comparator>(cmp)));
return wrapping_range(bound(split_point, false), end());
stdx::optional<wrapping_range<T>> split_after(const T& split_point, Comparator&& cmp) const {
if (contains(split_point, std::forward<Comparator>(cmp))
&& (!end() || cmp(split_point, end()->value()) != 0)) {
return wrapping_range(bound(split_point, false), end());
} else if (end() && cmp(split_point, end()->value()) >= 0) {
// whether to return stdx::nullopt or the full range is not
// well-defined for wraparound ranges; we return nullopt
// if split_point is after the end.
return stdx::nullopt;
} else {
return *this;
}
}
// Transforms this range into a new range of a different value type
// Supplied transformer should transform value of type T (the old type) into value of type U (the new type).
@@ -512,12 +523,17 @@ public:
nonwrapping_range right(bound(split_point, false), end());
return std::make_pair(std::move(left), std::move(right));
}
// Create a sub-range including values greater than the split_point. split_point has to be inside the range
// Comparator must define a total ordering on T.
// Create a sub-range including values greater than the split_point. If split_point is after
// the end, returns stdx::nullopt.
template<typename Comparator>
nonwrapping_range<T> split_after(const T& split_point, Comparator&& cmp) const {
assert(contains(split_point, std::forward<Comparator>(cmp)));
return nonwrapping_range(bound(split_point, false), end());
stdx::optional<nonwrapping_range> split_after(const T& split_point, Comparator&& cmp) const {
if (end() && cmp(split_point, end()->value()) >= 0) {
return stdx::nullopt;
} else if (start() && cmp(split_point, start()->value()) < 0) {
return *this;
} else {
return nonwrapping_range(range_bound<T>(split_point, false), end());
}
}
// Transforms this range into a new range of a different value type
// Supplied transformer should transform value of type T (the old type) into value of type U (the new type).

View File

@@ -439,9 +439,14 @@ class range_populating_reader final : public mutation_reader::impl {
if (_populate_phase != _cache._populate_phaser.phase()) {
assert(_last_key);
auto cmp = dht::ring_position_comparator(*_schema);
_range = _range.split_after(*_last_key, cmp);
auto&& new_range = _range.split_after(*_last_key, cmp);
_populate_phase = _cache._populate_phaser.phase();
_reader = _underlying(_cache._schema, _range, query::full_slice, _pc, _trace_state);
if (new_range) {
_range = std::move(new_range).value();
_reader = _underlying(_cache._schema, _range, query::full_slice, _pc, _trace_state);
} else {
_reader = make_empty_reader();
}
}
}
@@ -614,11 +619,13 @@ class scanning_and_populating_reader final : public mutation_reader::impl{
}
future<streamed_mutation_opt> switch_to_secondary_only() {
if (_last_key_from_primary.value && !_last_key_from_primary.outside_the_range) {
_range = _range.split_after(*_last_key_from_primary.value, dht::ring_position_comparator(*_schema));
}
if (_range.is_wrap_around(dht::ring_position_comparator(*_schema))) {
switch_to_end();
return make_ready_future<streamed_mutation_opt>();
auto&& new_range = _range.split_after(*_last_key_from_primary.value, dht::ring_position_comparator(*_schema));
if (!new_range) {
switch_to_end();
return make_ready_future<streamed_mutation_opt>();
} else {
_range = std::move(new_range).value();
}
}
mutation_reader secondary = make_mutation_reader<range_populating_reader>(_cache, _schema, _range, _slice, _pc,
_trace_state, _cache._underlying, _last_key_from_primary.value, _last_key_from_primary_populate_phase,
@@ -647,11 +654,13 @@ class scanning_and_populating_reader final : public mutation_reader::impl{
future<streamed_mutation_opt> switch_to_secondary_or_primary(just_cache_scanning_reader::cache_data&& data) {
assert(data.mut);
if (_last_key_from_primary.value && !_last_key_from_primary.outside_the_range) {
_range = _range.split_after(*_last_key_from_primary.value, dht::ring_position_comparator(*_schema));
}
if (_range.is_wrap_around(dht::ring_position_comparator(*_schema))) {
switch_to_end();
return make_ready_future<streamed_mutation_opt>();
auto&& new_range = _range.split_after(*_last_key_from_primary.value, dht::ring_position_comparator(*_schema));
if (!new_range) {
switch_to_end();
return make_ready_future<streamed_mutation_opt>();
} else {
_range = std::move(new_range).value();
}
}
query::partition_range range(_range.start(),
std::move(query::partition_range::bound(data.mut->decorated_key(), false)));

View File

@@ -33,6 +33,16 @@
#include "disk-error-handler.hh"
#include "locator/token_metadata.hh"
// yuck, but what can one do? needed for BOOST_REQUIRE_EQUAL
namespace std {
ostream&
operator<<(ostream& os, const stdx::nullopt_t&) {
return os << "{}";
}
}
thread_local disk_error_signal_type commit_error;
thread_local disk_error_signal_type general_disk_error;
@@ -505,3 +515,63 @@ BOOST_AUTO_TEST_CASE(test_range_interval_map) {
BOOST_REQUIRE(search_item("8") == true);
BOOST_REQUIRE(search_item("9") == false);
}
BOOST_AUTO_TEST_CASE(test_split_after) {
using b = range_bound<unsigned>;
using wr = wrapping_range<unsigned>;
using nwr = nonwrapping_range<unsigned>;
auto cmp = unsigned_comparator();
auto nwr1 = nwr(b(5), b(8));
BOOST_REQUIRE_EQUAL(nwr1.split_after(2, cmp), nwr1);
BOOST_REQUIRE_EQUAL(nwr1.split_after(5, cmp), nwr(b(5, false), b(8)));
BOOST_REQUIRE_EQUAL(nwr1.split_after(6, cmp), nwr(b(6, false), b(8)));
BOOST_REQUIRE_EQUAL(nwr1.split_after(8, cmp), stdx::nullopt);
BOOST_REQUIRE_EQUAL(nwr1.split_after(9, cmp), stdx::nullopt);
auto nwr2 = nwr(b(5, false), b(8, false));
BOOST_REQUIRE_EQUAL(nwr2.split_after(2, cmp), nwr2);
BOOST_REQUIRE_EQUAL(nwr2.split_after(5, cmp), nwr(b(5, false), b(8, false)));
BOOST_REQUIRE_EQUAL(nwr2.split_after(6, cmp), nwr(b(6, false), b(8, false)));
BOOST_REQUIRE_EQUAL(nwr2.split_after(8, cmp), stdx::nullopt);
BOOST_REQUIRE_EQUAL(nwr2.split_after(9, cmp), stdx::nullopt);
auto nwr3 = nwr(b(5, false), stdx::nullopt);
BOOST_REQUIRE_EQUAL(nwr3.split_after(2, cmp), nwr3);
BOOST_REQUIRE_EQUAL(nwr3.split_after(5, cmp), nwr3);
BOOST_REQUIRE_EQUAL(nwr3.split_after(6, cmp), nwr(b(6, false), stdx::nullopt));
auto nwr4 = nwr(stdx::nullopt, b(5, false));
BOOST_REQUIRE_EQUAL(nwr4.split_after(2, cmp), nwr(b(2, false), b(5, false)));
BOOST_REQUIRE_EQUAL(nwr4.split_after(5, cmp), stdx::nullopt);
BOOST_REQUIRE_EQUAL(nwr4.split_after(6, cmp), stdx::nullopt);
auto nwr5 = nwr(stdx::nullopt, stdx::nullopt);
BOOST_REQUIRE_EQUAL(nwr5.split_after(2, cmp), nwr(b(2, false), stdx::nullopt));
auto wr1 = wr(b(5), b(8));
BOOST_REQUIRE_EQUAL(wr1.split_after(2, cmp), wr1);
BOOST_REQUIRE_EQUAL(wr1.split_after(5, cmp), wr(b(5, false), b(8)));
BOOST_REQUIRE_EQUAL(wr1.split_after(6, cmp), wr(b(6, false), b(8)));
BOOST_REQUIRE_EQUAL(wr1.split_after(8, cmp), stdx::nullopt);
BOOST_REQUIRE_EQUAL(wr1.split_after(9, cmp), stdx::nullopt);
auto wr2 = wr(b(5, false), b(8, false));
BOOST_REQUIRE_EQUAL(wr2.split_after(2, cmp), wr2);
BOOST_REQUIRE_EQUAL(wr2.split_after(5, cmp), wr(b(5, false), b(8, false)));
BOOST_REQUIRE_EQUAL(wr2.split_after(6, cmp), wr(b(6, false), b(8, false)));
BOOST_REQUIRE_EQUAL(wr2.split_after(8, cmp), stdx::nullopt);
BOOST_REQUIRE_EQUAL(wr2.split_after(9, cmp), stdx::nullopt);
auto wr3 = wr(b(5, false), stdx::nullopt);
BOOST_REQUIRE_EQUAL(wr3.split_after(2, cmp), wr3);
BOOST_REQUIRE_EQUAL(wr3.split_after(5, cmp), wr3);
BOOST_REQUIRE_EQUAL(wr3.split_after(6, cmp), wr(b(6, false), stdx::nullopt));
auto wr4 = wr(stdx::nullopt, b(5, false));
BOOST_REQUIRE_EQUAL(wr4.split_after(2, cmp), wr(b(2, false), b(5, false)));
BOOST_REQUIRE_EQUAL(wr4.split_after(5, cmp), stdx::nullopt);
BOOST_REQUIRE_EQUAL(wr4.split_after(6, cmp), stdx::nullopt);
auto wr5 = wr(stdx::nullopt, stdx::nullopt);
BOOST_REQUIRE_EQUAL(wr5.split_after(2, cmp), wr(b(2, false), stdx::nullopt));
auto wr6 = wr(b(8), b(5));
BOOST_REQUIRE_EQUAL(wr6.split_after(2, cmp), wr(b(2, false), b(5)));
BOOST_REQUIRE_EQUAL(wr6.split_after(5, cmp), stdx::nullopt);
BOOST_REQUIRE_EQUAL(wr6.split_after(6, cmp), stdx::nullopt);
BOOST_REQUIRE_EQUAL(wr6.split_after(8, cmp), wr(b(8, false), b(5)));
BOOST_REQUIRE_EQUAL(wr6.split_after(9, cmp), wr(b(9, false), b(5)));
}