diff --git a/range.hh b/range.hh index c25ce40a00..6fc43c6e8b 100644 --- a/range.hh +++ b/range.hh @@ -21,6 +21,7 @@ #pragma once +#include "stdx.hh" #include #include #include @@ -332,12 +333,22 @@ public: wrapping_range right(bound(split_point, false), end()); return std::make_pair(std::move(left), std::move(right)); } - // Create a sub-range including values greater than the split_point. split_point has to be inside the range + // Create a sub-range including values greater than the split_point. Returns stdx::nullopt if + // split_point is after the end (but not included in the range, in case of wraparound ranges) // Comparator must define a total ordering on T. template - wrapping_range split_after(const T& split_point, Comparator&& cmp) const { - assert(contains(split_point, std::forward(cmp))); - return wrapping_range(bound(split_point, false), end()); + stdx::optional> split_after(const T& split_point, Comparator&& cmp) const { + if (contains(split_point, std::forward(cmp)) + && (!end() || cmp(split_point, end()->value()) != 0)) { + return wrapping_range(bound(split_point, false), end()); + } else if (end() && cmp(split_point, end()->value()) >= 0) { + // whether to return stdx::nullopt or the full range is not + // well-defined for wraparound ranges; we return nullopt + // if split_point is after the end. + return stdx::nullopt; + } else { + return *this; + } } // Transforms this range into a new range of a different value type // Supplied transformer should transform value of type T (the old type) into value of type U (the new type). @@ -512,12 +523,17 @@ public: nonwrapping_range right(bound(split_point, false), end()); return std::make_pair(std::move(left), std::move(right)); } - // Create a sub-range including values greater than the split_point. split_point has to be inside the range - // Comparator must define a total ordering on T. + // Create a sub-range including values greater than the split_point. If split_point is after + // the end, returns stdx::nullopt. template - nonwrapping_range split_after(const T& split_point, Comparator&& cmp) const { - assert(contains(split_point, std::forward(cmp))); - return nonwrapping_range(bound(split_point, false), end()); + stdx::optional split_after(const T& split_point, Comparator&& cmp) const { + if (end() && cmp(split_point, end()->value()) >= 0) { + return stdx::nullopt; + } else if (start() && cmp(split_point, start()->value()) < 0) { + return *this; + } else { + return nonwrapping_range(range_bound(split_point, false), end()); + } } // Transforms this range into a new range of a different value type // Supplied transformer should transform value of type T (the old type) into value of type U (the new type). diff --git a/row_cache.cc b/row_cache.cc index cd9d9a64b2..b6df2ab84c 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -439,9 +439,14 @@ class range_populating_reader final : public mutation_reader::impl { if (_populate_phase != _cache._populate_phaser.phase()) { assert(_last_key); auto cmp = dht::ring_position_comparator(*_schema); - _range = _range.split_after(*_last_key, cmp); + auto&& new_range = _range.split_after(*_last_key, cmp); _populate_phase = _cache._populate_phaser.phase(); - _reader = _underlying(_cache._schema, _range, query::full_slice, _pc, _trace_state); + if (new_range) { + _range = std::move(new_range).value(); + _reader = _underlying(_cache._schema, _range, query::full_slice, _pc, _trace_state); + } else { + _reader = make_empty_reader(); + } } } @@ -614,11 +619,13 @@ class scanning_and_populating_reader final : public mutation_reader::impl{ } future switch_to_secondary_only() { if (_last_key_from_primary.value && !_last_key_from_primary.outside_the_range) { - _range = _range.split_after(*_last_key_from_primary.value, dht::ring_position_comparator(*_schema)); - } - if (_range.is_wrap_around(dht::ring_position_comparator(*_schema))) { - switch_to_end(); - return make_ready_future(); + auto&& new_range = _range.split_after(*_last_key_from_primary.value, dht::ring_position_comparator(*_schema)); + if (!new_range) { + switch_to_end(); + return make_ready_future(); + } else { + _range = std::move(new_range).value(); + } } mutation_reader secondary = make_mutation_reader(_cache, _schema, _range, _slice, _pc, _trace_state, _cache._underlying, _last_key_from_primary.value, _last_key_from_primary_populate_phase, @@ -647,11 +654,13 @@ class scanning_and_populating_reader final : public mutation_reader::impl{ future switch_to_secondary_or_primary(just_cache_scanning_reader::cache_data&& data) { assert(data.mut); if (_last_key_from_primary.value && !_last_key_from_primary.outside_the_range) { - _range = _range.split_after(*_last_key_from_primary.value, dht::ring_position_comparator(*_schema)); - } - if (_range.is_wrap_around(dht::ring_position_comparator(*_schema))) { - switch_to_end(); - return make_ready_future(); + auto&& new_range = _range.split_after(*_last_key_from_primary.value, dht::ring_position_comparator(*_schema)); + if (!new_range) { + switch_to_end(); + return make_ready_future(); + } else { + _range = std::move(new_range).value(); + } } query::partition_range range(_range.start(), std::move(query::partition_range::bound(data.mut->decorated_key(), false))); diff --git a/tests/range_test.cc b/tests/range_test.cc index 57ded479e0..7361450420 100644 --- a/tests/range_test.cc +++ b/tests/range_test.cc @@ -33,6 +33,16 @@ #include "disk-error-handler.hh" #include "locator/token_metadata.hh" +// yuck, but what can one do? needed for BOOST_REQUIRE_EQUAL +namespace std { + +ostream& +operator<<(ostream& os, const stdx::nullopt_t&) { + return os << "{}"; +} + +} + thread_local disk_error_signal_type commit_error; thread_local disk_error_signal_type general_disk_error; @@ -505,3 +515,63 @@ BOOST_AUTO_TEST_CASE(test_range_interval_map) { BOOST_REQUIRE(search_item("8") == true); BOOST_REQUIRE(search_item("9") == false); } + +BOOST_AUTO_TEST_CASE(test_split_after) { + using b = range_bound; + using wr = wrapping_range; + using nwr = nonwrapping_range; + auto cmp = unsigned_comparator(); + + auto nwr1 = nwr(b(5), b(8)); + BOOST_REQUIRE_EQUAL(nwr1.split_after(2, cmp), nwr1); + BOOST_REQUIRE_EQUAL(nwr1.split_after(5, cmp), nwr(b(5, false), b(8))); + BOOST_REQUIRE_EQUAL(nwr1.split_after(6, cmp), nwr(b(6, false), b(8))); + BOOST_REQUIRE_EQUAL(nwr1.split_after(8, cmp), stdx::nullopt); + BOOST_REQUIRE_EQUAL(nwr1.split_after(9, cmp), stdx::nullopt); + auto nwr2 = nwr(b(5, false), b(8, false)); + BOOST_REQUIRE_EQUAL(nwr2.split_after(2, cmp), nwr2); + BOOST_REQUIRE_EQUAL(nwr2.split_after(5, cmp), nwr(b(5, false), b(8, false))); + BOOST_REQUIRE_EQUAL(nwr2.split_after(6, cmp), nwr(b(6, false), b(8, false))); + BOOST_REQUIRE_EQUAL(nwr2.split_after(8, cmp), stdx::nullopt); + BOOST_REQUIRE_EQUAL(nwr2.split_after(9, cmp), stdx::nullopt); + auto nwr3 = nwr(b(5, false), stdx::nullopt); + BOOST_REQUIRE_EQUAL(nwr3.split_after(2, cmp), nwr3); + BOOST_REQUIRE_EQUAL(nwr3.split_after(5, cmp), nwr3); + BOOST_REQUIRE_EQUAL(nwr3.split_after(6, cmp), nwr(b(6, false), stdx::nullopt)); + auto nwr4 = nwr(stdx::nullopt, b(5, false)); + BOOST_REQUIRE_EQUAL(nwr4.split_after(2, cmp), nwr(b(2, false), b(5, false))); + BOOST_REQUIRE_EQUAL(nwr4.split_after(5, cmp), stdx::nullopt); + BOOST_REQUIRE_EQUAL(nwr4.split_after(6, cmp), stdx::nullopt); + auto nwr5 = nwr(stdx::nullopt, stdx::nullopt); + BOOST_REQUIRE_EQUAL(nwr5.split_after(2, cmp), nwr(b(2, false), stdx::nullopt)); + + auto wr1 = wr(b(5), b(8)); + BOOST_REQUIRE_EQUAL(wr1.split_after(2, cmp), wr1); + BOOST_REQUIRE_EQUAL(wr1.split_after(5, cmp), wr(b(5, false), b(8))); + BOOST_REQUIRE_EQUAL(wr1.split_after(6, cmp), wr(b(6, false), b(8))); + BOOST_REQUIRE_EQUAL(wr1.split_after(8, cmp), stdx::nullopt); + BOOST_REQUIRE_EQUAL(wr1.split_after(9, cmp), stdx::nullopt); + auto wr2 = wr(b(5, false), b(8, false)); + BOOST_REQUIRE_EQUAL(wr2.split_after(2, cmp), wr2); + BOOST_REQUIRE_EQUAL(wr2.split_after(5, cmp), wr(b(5, false), b(8, false))); + BOOST_REQUIRE_EQUAL(wr2.split_after(6, cmp), wr(b(6, false), b(8, false))); + BOOST_REQUIRE_EQUAL(wr2.split_after(8, cmp), stdx::nullopt); + BOOST_REQUIRE_EQUAL(wr2.split_after(9, cmp), stdx::nullopt); + auto wr3 = wr(b(5, false), stdx::nullopt); + BOOST_REQUIRE_EQUAL(wr3.split_after(2, cmp), wr3); + BOOST_REQUIRE_EQUAL(wr3.split_after(5, cmp), wr3); + BOOST_REQUIRE_EQUAL(wr3.split_after(6, cmp), wr(b(6, false), stdx::nullopt)); + auto wr4 = wr(stdx::nullopt, b(5, false)); + BOOST_REQUIRE_EQUAL(wr4.split_after(2, cmp), wr(b(2, false), b(5, false))); + BOOST_REQUIRE_EQUAL(wr4.split_after(5, cmp), stdx::nullopt); + BOOST_REQUIRE_EQUAL(wr4.split_after(6, cmp), stdx::nullopt); + auto wr5 = wr(stdx::nullopt, stdx::nullopt); + BOOST_REQUIRE_EQUAL(wr5.split_after(2, cmp), wr(b(2, false), stdx::nullopt)); + auto wr6 = wr(b(8), b(5)); + BOOST_REQUIRE_EQUAL(wr6.split_after(2, cmp), wr(b(2, false), b(5))); + BOOST_REQUIRE_EQUAL(wr6.split_after(5, cmp), stdx::nullopt); + BOOST_REQUIRE_EQUAL(wr6.split_after(6, cmp), stdx::nullopt); + BOOST_REQUIRE_EQUAL(wr6.split_after(8, cmp), wr(b(8, false), b(5))); + BOOST_REQUIRE_EQUAL(wr6.split_after(9, cmp), wr(b(9, false), b(5))); + +}