diff --git a/configure.py b/configure.py
index baad29aed6..f160a1faf4 100755
--- a/configure.py
+++ b/configure.py
@@ -390,6 +390,7 @@ scylla_tests = set([
'test/boost/vint_serialization_test',
'test/boost/virtual_reader_test',
'test/boost/bptree_test',
+ 'test/boost/double_decker_test',
'test/manual/ec2_snitch_test',
'test/manual/gce_snitch_test',
'test/manual/gossip',
diff --git a/test/boost/double_decker_test.cc b/test/boost/double_decker_test.cc
new file mode 100644
index 0000000000..0f18c69dad
--- /dev/null
+++ b/test/boost/double_decker_test.cc
@@ -0,0 +1,397 @@
+
+/*
+ * Copyright (C) 2020 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see .
+ */
+
+#define BOOST_TEST_MODULE double_decker
+
+#include
+#include
+#include
+#include
+
+#include "utils/double-decker.hh"
+#include "test/lib/random_utils.hh"
+
+class compound_key {
+public:
+ int key;
+ std::string sub_key;
+
+ compound_key(int k, std::string sk) noexcept : key(k), sub_key(sk) {}
+
+ compound_key(const compound_key& other) = delete;
+ compound_key(compound_key&& other) noexcept : key(other.key), sub_key(std::move(other.sub_key)) {}
+
+ compound_key& operator=(const compound_key& other) = delete;
+ compound_key& operator=(compound_key&& other) noexcept {
+ key = other.key;
+ sub_key = std::move(other.sub_key);
+ return *this;
+ }
+
+ std::string format() const {
+ return seastar::format("{}.{}", key, sub_key);
+ }
+
+ bool operator==(const compound_key& other) const {
+ return key == other.key && sub_key == other.sub_key;
+ }
+
+ bool operator!=(const compound_key& other) const { return !(*this == other); }
+
+ struct compare {
+ int operator()(const int& a, const int& b) const { return a - b; }
+ int operator()(const int& a, const compound_key& b) const { return a - b.key; }
+ int operator()(const compound_key& a, const int& b) const { return a.key - b; }
+
+ int operator()(const compound_key& a, const compound_key& b) const {
+ if (a.key != b.key) {
+ return this->operator()(a.key, b.key);
+ } else {
+ return a.sub_key.compare(b.sub_key);
+ }
+ }
+ };
+
+ struct less_compare {
+ compare cmp;
+
+ template
+ bool operator()(const A& a, const B& b) const noexcept {
+ return cmp(a, b) < 0;
+ }
+ };
+};
+
+class test_data {
+ compound_key _key;
+ bool _head = false;
+ bool _tail = false;
+ bool _train = false;
+
+ int *_cookie;
+ int *_cookie2;
+public:
+ bool is_head() const noexcept { return _head; }
+ bool is_tail() const noexcept { return _tail; }
+ bool with_train() const noexcept { return _train; }
+ void set_head(bool v) noexcept { _head = v; }
+ void set_tail(bool v) noexcept { _tail = v; }
+ void set_train(bool v) noexcept { _train = v; }
+
+ test_data(int key, std::string sub) : _key(key, sub), _cookie(new int(0)), _cookie2(new int(0)) {}
+
+ test_data(const test_data& other) = delete;
+ test_data(test_data&& other) noexcept : _key(std::move(other._key)),
+ _head(other._head), _tail(other._tail), _train(other._train),
+ _cookie(other._cookie), _cookie2(new int(0)) {
+ other._cookie = nullptr;
+ }
+
+ ~test_data() {
+ if (_cookie != nullptr) {
+ delete _cookie;
+ }
+ delete _cookie2;
+ }
+
+ bool operator==(const compound_key& k) { return _key == k; }
+
+ test_data& operator=(const test_data& other) = delete;
+ test_data& operator=(test_data&& other) = delete;
+
+ std::string format() const { return _key.format(); }
+
+ struct compare {
+ compound_key::compare kcmp;
+ int operator()(const int& a, const int& b) { return kcmp(a, b); }
+ int operator()(const compound_key& a, const int& b) { return kcmp(a.key, b); }
+ int operator()(const int& a, const compound_key& b) { return kcmp(a, b.key); }
+ int operator()(const compound_key& a, const compound_key& b) { return kcmp(a, b); }
+ int operator()(const compound_key& a, const test_data& b) { return kcmp(a, b._key); }
+ int operator()(const test_data& a, const compound_key& b) { return kcmp(a._key, b); }
+ int operator()(const test_data& a, const test_data& b) { return kcmp(a._key, b._key); }
+ };
+};
+
+using collection = double_decker;
+using oracle = std::set;
+
+BOOST_AUTO_TEST_CASE(test_lower_bound) {
+ collection c(compound_key::less_compare{});
+ test_data::compare cmp;
+
+ c.insert(3, test_data(3, "e"), cmp);
+ c.insert(5, test_data(5, "i"), cmp);
+ c.insert(5, test_data(5, "o"), cmp);
+
+ collection::bound_hint h;
+
+ BOOST_REQUIRE(*c.lower_bound(compound_key(2, "a"), cmp, h) == compound_key(3, "e") && !h.key_match);
+ BOOST_REQUIRE(*c.lower_bound(compound_key(3, "a"), cmp, h) == compound_key(3, "e") && h.key_match && !h.key_tail && !h.match);
+ BOOST_REQUIRE(*c.lower_bound(compound_key(3, "e"), cmp, h) == compound_key(3, "e") && h.key_match && !h.key_tail && h.match);
+ BOOST_REQUIRE(*c.lower_bound(compound_key(3, "o"), cmp, h) == compound_key(5, "i") && h.key_match && h.key_tail && !h.match);
+ BOOST_REQUIRE(*c.lower_bound(compound_key(4, "i"), cmp, h) == compound_key(5, "i") && !h.key_match);
+ BOOST_REQUIRE(*c.lower_bound(compound_key(5, "a"), cmp, h) == compound_key(5, "i") && h.key_match && !h.key_tail && !h.match);
+ BOOST_REQUIRE(*c.lower_bound(compound_key(5, "i"), cmp, h) == compound_key(5, "i") && h.key_match && !h.key_tail && h.match);
+ BOOST_REQUIRE(*c.lower_bound(compound_key(5, "l"), cmp, h) == compound_key(5, "o") && h.key_match && !h.key_tail && !h.match);
+ BOOST_REQUIRE(*c.lower_bound(compound_key(5, "o"), cmp, h) == compound_key(5, "o") && h.key_match && !h.key_tail && h.match);
+ BOOST_REQUIRE(c.lower_bound(compound_key(5, "q"), cmp, h) == c.end() && h.key_match && h.key_tail);
+ BOOST_REQUIRE(c.lower_bound(compound_key(6, "q"), cmp, h) == c.end() && !h.key_match);
+
+ c.clear();
+}
+
+BOOST_AUTO_TEST_CASE(test_upper_bound) {
+ collection c(compound_key::less_compare{});
+ test_data::compare cmp;
+
+ c.insert(3, test_data(3, "e"), cmp);
+ c.insert(5, test_data(5, "i"), cmp);
+ c.insert(5, test_data(5, "o"), cmp);
+
+ BOOST_REQUIRE(*c.upper_bound(compound_key(2, "a"), cmp) == compound_key(3, "e"));
+ BOOST_REQUIRE(*c.upper_bound(compound_key(3, "a"), cmp) == compound_key(3, "e"));
+ BOOST_REQUIRE(*c.upper_bound(compound_key(3, "e"), cmp) == compound_key(5, "i"));
+ BOOST_REQUIRE(*c.upper_bound(compound_key(3, "o"), cmp) == compound_key(5, "i"));
+ BOOST_REQUIRE(*c.upper_bound(compound_key(4, "i"), cmp) == compound_key(5, "i"));
+ BOOST_REQUIRE(*c.upper_bound(compound_key(5, "a"), cmp) == compound_key(5, "i"));
+ BOOST_REQUIRE(*c.upper_bound(compound_key(5, "i"), cmp) == compound_key(5, "o"));
+ BOOST_REQUIRE(*c.upper_bound(compound_key(5, "l"), cmp) == compound_key(5, "o"));
+ BOOST_REQUIRE(c.upper_bound(compound_key(5, "o"), cmp) == c.end());
+ BOOST_REQUIRE(c.upper_bound(compound_key(5, "q"), cmp) == c.end());
+ BOOST_REQUIRE(c.upper_bound(compound_key(6, "q"), cmp) == c.end());
+
+ c.clear();
+}
+BOOST_AUTO_TEST_CASE(test_self_iterator) {
+ collection c(compound_key::less_compare{});
+ test_data::compare cmp;
+
+ c.insert(1, std::move(test_data(1, "a")), cmp);
+ c.insert(1, std::move(test_data(1, "b")), cmp);
+ c.insert(2, std::move(test_data(2, "c")), cmp);
+ c.insert(3, std::move(test_data(3, "d")), cmp);
+ c.insert(3, std::move(test_data(3, "e")), cmp);
+
+ auto erase_by_ptr = [&] (int key, std::string sub) {
+ test_data* d = &*c.find(compound_key(key, sub), cmp);
+ collection::iterator di(d);
+ di.erase(compound_key::less_compare{});
+ };
+
+ erase_by_ptr(1, "b");
+ erase_by_ptr(2, "c");
+ erase_by_ptr(3, "d");
+
+ auto i = c.begin();
+ BOOST_REQUIRE(*i++ == compound_key(1, "a"));
+ BOOST_REQUIRE(*i++ == compound_key(3, "e"));
+ BOOST_REQUIRE(i == c.end());
+
+ c.clear();
+}
+
+BOOST_AUTO_TEST_CASE(test_end_iterator) {
+ collection c(compound_key::less_compare{});
+ test_data::compare cmp;
+
+ c.insert(1, std::move(test_data(1, "a")), cmp);
+ auto i = std::prev(c.end());
+ BOOST_REQUIRE(*i == compound_key(1, "a"));
+
+ c.clear();
+}
+
+void validate_sorted(collection& c) {
+ auto i = c.begin();
+ if (i == c.end()) {
+ return;
+ }
+
+ while (1) {
+ auto cur = i;
+ i++;
+ if (i == c.end()) {
+ break;
+ }
+ test_data::compare cmp;
+ BOOST_REQUIRE(cmp(*cur, *i) < 0);
+ }
+}
+
+void compare_with_set(collection& c, oracle& s) {
+ test_data::compare cmp;
+ /* All keys must be findable */
+ for (auto i = s.begin(); i != s.end(); i++) {
+ auto j = c.find(*i, cmp);
+ BOOST_REQUIRE(j != c.end() && *j == *i);
+ }
+
+ /* Both iterators must coinside */
+ auto i = c.begin();
+ auto j = s.begin();
+
+ while (i != c.end()) {
+ BOOST_REQUIRE(*i == *j);
+ i++;
+ j++;
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_insert_via_emplace) {
+ collection c(compound_key::less_compare{});
+ test_data::compare cmp;
+ oracle s;
+ int nr = 0;
+
+ while (nr < 4000) {
+ compound_key k(tests::random::get_int(900), tests::random::get_sstring(4));
+
+ collection::bound_hint h;
+ auto i = c.lower_bound(k, cmp, h);
+
+ if (i == c.end() || !h.match) {
+ auto it = c.emplace_before(i, k.key, h, k.key, k.sub_key);
+ BOOST_REQUIRE(*it == k);
+ s.insert(std::move(k));
+ nr++;
+ }
+ }
+
+ compare_with_set(c, s);
+ c.clear();
+}
+
+BOOST_AUTO_TEST_CASE(test_insert_and_erase) {
+ collection c(compound_key::less_compare{});
+ test_data::compare cmp;
+ int nr = 0;
+
+ while (nr < 500) {
+ compound_key k(tests::random::get_int(100), tests::random::get_sstring(3));
+
+ if (c.find(k, cmp) == c.end()) {
+ auto it = c.insert(k.key, std::move(test_data(k.key, k.sub_key)), cmp);
+ BOOST_REQUIRE(*it == k);
+ nr++;
+ }
+ }
+
+ validate_sorted(c);
+
+ while (nr > 0) {
+ int n = tests::random::get_int() % nr;
+
+ auto i = c.begin();
+ while (n > 0) {
+ i++;
+ n--;
+ }
+
+ i.erase(compound_key::less_compare{});
+ nr--;
+
+ validate_sorted(c);
+ }
+}
+
+BOOST_AUTO_TEST_CASE(test_compaction) {
+ logalloc::region reg;
+ with_allocator(reg.allocator(), [&] {
+ collection c(compound_key::less_compare{});
+ test_data::compare cmp;
+ oracle s;
+
+ {
+ logalloc::reclaim_lock rl(reg);
+
+ int nr = 0;
+
+ while (nr < 1500) {
+ compound_key k(tests::random::get_int(400), tests::random::get_sstring(3));
+
+ if (c.find(k, cmp) == c.end()) {
+ auto it = c.insert(k.key, std::move(test_data(k.key, k.sub_key)), cmp);
+ BOOST_REQUIRE(*it == k);
+ s.insert(std::move(k));
+ nr++;
+ }
+ }
+ }
+
+ reg.full_compaction();
+
+ compare_with_set(c, s);
+ c.clear();
+ });
+}
+
+BOOST_AUTO_TEST_CASE(test_range_erase) {
+ std::vector keys;
+ test_data::compare cmp;
+
+ keys.emplace_back(1, "a");
+ keys.emplace_back(1, "b");
+ keys.emplace_back(1, "c");
+ keys.emplace_back(1, "d");
+ keys.emplace_back(2, "a");
+ keys.emplace_back(2, "b");
+ keys.emplace_back(2, "c");
+ keys.emplace_back(2, "d");
+ keys.emplace_back(2, "e");
+ keys.emplace_back(3, "a");
+ keys.emplace_back(3, "b");
+ keys.emplace_back(3, "c");
+
+ for (size_t f = 0; f < keys.size(); f++) {
+ for (size_t t = f; t <= keys.size(); t++) {
+ collection c(compound_key::less_compare{});
+
+ for (auto&& k : keys) {
+ c.insert(k.key, std::move(test_data(k.key, k.sub_key)), cmp);
+ }
+
+ auto iter_at = [&c] (size_t at) -> collection::iterator {
+ auto it = c.begin();
+ for (size_t i = 0; i < at; i++, it++) ;
+ return it;
+ };
+
+ auto n = c.erase(iter_at(f), iter_at(t));
+
+ auto r = c.begin();
+ for (size_t i = 0; i < keys.size(); i++) {
+ if (!(i >= f && i < t)) {
+ if (i == t) {
+ BOOST_REQUIRE(*n == keys[i]);
+ }
+ BOOST_REQUIRE(*(r++) == keys[i]);
+ }
+ }
+ if (t == keys.size()) {
+ BOOST_REQUIRE(n == c.end());
+ }
+ BOOST_REQUIRE(r == c.end());
+ }
+ }
+}
diff --git a/utils/double-decker.hh b/utils/double-decker.hh
new file mode 100644
index 0000000000..5616d44b4d
--- /dev/null
+++ b/utils/double-decker.hh
@@ -0,0 +1,403 @@
+/*
+ * Copyright (C) 2020 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see .
+ */
+
+#pragma once
+
+#include
+#include
+#include "utils/bptree.hh"
+#include "utils/intrusive-array.hh"
+#include "utils/collection-concepts.hh"
+#include
+
+/*
+ * The double-decker is the ordered keeper of key:value pairs having
+ * the pairs sorted by both key and value (key first).
+ *
+ * The keys collisions are expected to be rare enough to afford holding
+ * the values in a sorted array with the help of linear algorithms.
+ */
+
+template
+SEASTAR_CONCEPT( requires Comparable && std::is_nothrow_move_constructible_v )
+class double_decker {
+public:
+ using inner_array = intrusive_array;
+ using outer_tree = bplus::tree;
+ using outer_iterator = typename outer_tree::iterator;
+ using outer_const_iterator = typename outer_tree::const_iterator;
+
+private:
+ outer_tree _tree;
+
+public:
+ template
+ class iterator_base {
+ friend class double_decker;
+ using outer_iterator = std::conditional_t;
+
+ protected:
+ outer_iterator _bucket;
+ int _idx;
+
+ public:
+ iterator_base() = default;
+ iterator_base(outer_iterator bkt, int idx) noexcept : _bucket(bkt), _idx(idx) {}
+
+ using iterator_category = std::bidirectional_iterator_tag;
+ using difference_type = ssize_t;
+ using value_type = std::conditional_t;
+ using pointer = value_type*;
+ using reference = value_type&;
+
+ reference operator*() const noexcept { return (*_bucket)[_idx]; }
+ pointer operator->() const noexcept { return &((*_bucket)[_idx]); }
+
+ iterator_base& operator++() noexcept {
+ if ((*_bucket)[_idx++].is_tail()) {
+ _bucket++;
+ _idx = 0;
+ }
+
+ return *this;
+ }
+
+ iterator_base operator++(int) noexcept {
+ iterator_base cur = *this;
+ operator++();
+ return cur;
+ }
+
+ iterator_base& operator--() noexcept {
+ if (_idx-- == 0) {
+ _bucket--;
+ _idx = _bucket->index_of(_bucket->end()) - 1;
+ }
+
+ return *this;
+ }
+
+ iterator_base operator--(int) noexcept {
+ iterator_base cur = *this;
+ operator--();
+ return cur;
+ }
+
+ bool operator==(const iterator_base& o) const noexcept { return _bucket == o._bucket && _idx == o._idx; }
+ bool operator!=(const iterator_base& o) const noexcept { return !(*this == o); }
+ };
+
+ using const_iterator = iterator_base;
+
+ class iterator final : public iterator_base {
+ friend class double_decker;
+ using super = iterator_base;
+
+ iterator(const const_iterator&& other) noexcept : super(std::move(other._bucket), other._idx) {}
+
+ public:
+ iterator() noexcept : super() {}
+ iterator(outer_iterator bkt, int idx) noexcept : super(bkt, idx) {}
+
+ iterator(T* ptr) noexcept {
+ inner_array& arr = inner_array::from_element(ptr, super::_idx);
+ super::_bucket = outer_iterator(&arr);
+ }
+
+ template
+ SEASTAR_CONCEPT(requires Disposer)
+ iterator erase_and_dispose(Less less, Func&& disp) noexcept {
+ disp(&**this); // * to deref this, * to call operator*, & to get addr from ref
+
+ if (super::_bucket->is_single_element()) {
+ outer_iterator bkt = super::_bucket.erase(less);
+ return iterator(bkt, 0);
+ }
+
+ bool tail = (*super::_bucket)[super::_idx].is_tail();
+ super::_bucket->erase(super::_idx);
+ if (tail) {
+ super::_bucket++;
+ super::_idx = 0;
+ }
+
+ return *this;
+ }
+
+ iterator erase(Less less) noexcept { return erase_and_dispose(less, bplus::default_dispose); }
+ };
+
+ /*
+ * Structure that shed some more light on how the lower_bound
+ * actually found the bounding elements.
+ */
+ struct bound_hint {
+ /*
+ * Set to true if the element fully matched to the key
+ * according to Compare
+ */
+ bool match;
+ /*
+ * Set to true if the bucket for the given key exists
+ */
+ bool key_match;
+ /*
+ * Set to true if the given key is more than anything
+ * on the bucket and iterator was switched to the next
+ * one (or when the key_match is false)
+ */
+ bool key_tail;
+
+ /*
+ * This helper says whether the emplace will invalidate (some)
+ * iterators or not. Emplacing with !key_match will go and create
+ * new node in B+ which doesn't invalidate iterators. In another
+ * case some existing B+ data node will be reconstructed, so the
+ * iterators on those nodes will become invalid.
+ */
+ bool emplace_keeps_iterators() const noexcept { return !key_match; }
+ };
+
+ iterator begin() noexcept { return iterator(_tree.begin(), 0); }
+ const_iterator begin() const noexcept { return const_iterator(_tree.begin(), 0); }
+ const_iterator cbegin() const noexcept { return const_iterator(_tree.begin(), 0); }
+
+ iterator end() noexcept { return iterator(_tree.end(), 0); }
+ const_iterator end() const noexcept { return const_iterator(_tree.end(), 0); }
+ const_iterator cend() const noexcept { return const_iterator(_tree.end(), 0); }
+
+ explicit double_decker(Less less) noexcept : _tree(less) { }
+
+ double_decker(const double_decker& other) = delete;
+ double_decker(double_decker&& other) noexcept : _tree(std::move(other._tree)) {}
+
+ iterator insert(Key k, T value, Compare cmp) {
+ std::pair oip = _tree.emplace(std::move(k), std::move(value));
+ outer_iterator& bkt = oip.first;
+ int idx = 0;
+
+ if (!oip.second) {
+ /*
+ * Unlikely, but in this case we reconstruct the array. The value
+ * must not have been moved by emplace() above.
+ */
+ idx = bkt->index_of(bkt->lower_bound(value, cmp));
+ size_t new_size = (bkt->size() + 1) * sizeof(T);
+ bkt.reconstruct(new_size, *bkt,
+ typename inner_array::grow_tag{idx}, std::move(value));
+ }
+
+ return iterator(bkt, idx);
+ }
+
+ template
+ iterator emplace_before(iterator i, Key k, const bound_hint& hint, Args&&... args) {
+ assert(!hint.match);
+ outer_iterator& bucket = i._bucket;
+
+ if (!hint.key_match) {
+ /*
+ * The most expected case -- no key conflict, respectively the
+ * bucket is not found, and i points to the next one. Just go
+ * ahead and emplace the new bucket before the i and push the
+ * 0th element into it.
+ */
+ outer_iterator nb = bucket.emplace_before(std::move(k), _tree.less(), std::forward(args)...);
+ return iterator(nb, 0);
+ }
+
+ /*
+ * Key conflict, need to expand some inner vector, but still there
+ * are two cases -- whether the bounding element is on k's bucket
+ * or the bound search overflew and switched to the next one.
+ */
+
+ int idx = i._idx;
+
+ if (hint.key_tail) {
+ /*
+ * The latter case -- i points to the next one. Need to shift
+ * back and append the new element to its tail.
+ */
+ bucket--;
+ idx = bucket->index_of(bucket->end());
+ }
+
+ size_t new_size = (bucket->size() + 1) * sizeof(T);
+ bucket.reconstruct(new_size, *bucket,
+ typename inner_array::grow_tag{idx}, std::forward(args)...);
+ return iterator(bucket, idx);
+ }
+
+ template
+ SEASTAR_CONCEPT( requires Comparable )
+ const_iterator find(const K& key, Compare cmp) const {
+ outer_const_iterator bkt = _tree.find(key);
+ int idx = 0;
+
+ if (bkt != _tree.end()) {
+ bool match = false;
+ idx = bkt->index_of(bkt->lower_bound(key, cmp, match));
+ if (!match) {
+ bkt = _tree.end();
+ idx = 0;
+ }
+ }
+
+ return const_iterator(bkt, idx);
+ }
+
+ template
+ SEASTAR_CONCEPT( requires Comparable )
+ iterator find(const K& k, Compare cmp) {
+ return iterator(const_cast(this)->find(k, std::move(cmp)));
+ }
+
+ template
+ SEASTAR_CONCEPT( requires Comparable )
+ const_iterator lower_bound(const K& key, Compare cmp, bound_hint& hint) const {
+ outer_const_iterator bkt = _tree.lower_bound(key, hint.key_match);
+
+ hint.key_tail = false;
+ hint.match = false;
+
+ if (bkt == _tree.end() || !hint.key_match) {
+ return const_iterator(bkt, 0);
+ }
+
+ int i = bkt->index_of(bkt->lower_bound(key, cmp, hint.match));
+
+ if (i != 0 && (*bkt)[i - 1].is_tail()) {
+ /*
+ * The lower_bound is after the last element -- shift
+ * to the net bucket's 0'th one.
+ */
+ bkt++;
+ i = 0;
+ hint.key_tail = true;
+ }
+
+ return const_iterator(bkt, i);
+ }
+
+ template
+ SEASTAR_CONCEPT( requires Comparable )
+ iterator lower_bound(const K& key, Compare cmp, bound_hint& hint) {
+ return iterator(const_cast(this)->lower_bound(key, std::move(cmp), hint));
+ }
+
+ template
+ SEASTAR_CONCEPT( requires Comparable )
+ const_iterator lower_bound(const K& key, Compare cmp) const {
+ bound_hint hint;
+ return lower_bound(key, cmp, hint);
+ }
+
+ template
+ SEASTAR_CONCEPT( requires Comparable )
+ iterator lower_bound(const K& key, Compare cmp) {
+ return iterator(const_cast(this)->lower_bound(key, std::move(cmp)));
+ }
+
+ template
+ SEASTAR_CONCEPT( requires Comparable )
+ const_iterator upper_bound(const K& key, Compare cmp) const {
+ bool key_match;
+ outer_const_iterator bkt = _tree.lower_bound(key, key_match);
+
+ if (bkt == _tree.end() || !key_match) {
+ return const_iterator(bkt, 0);
+ }
+
+ int i = bkt->index_of(bkt->upper_bound(key, cmp));
+
+ if (i != 0 && (*bkt)[i - 1].is_tail()) {
+ // Beyond the end() boundary
+ bkt++;
+ i = 0;
+ }
+
+ return const_iterator(bkt, i);
+ }
+
+ template
+ SEASTAR_CONCEPT( requires Comparable )
+ iterator upper_bound(const K& key, Compare cmp) {
+ return iterator(const_cast(this)->upper_bound(key, std::move(cmp)));
+ }
+
+ template
+ SEASTAR_CONCEPT(requires Disposer)
+ void clear_and_dispose(Func&& disp) noexcept {
+ _tree.clear_and_dispose([&disp] (inner_array* arr) noexcept {
+ arr->for_each(disp);
+ });
+ }
+
+ void clear() noexcept { clear_and_dispose(bplus::default_dispose); }
+
+ template
+ SEASTAR_CONCEPT(requires Disposer)
+ iterator erase_and_dispose(iterator begin, iterator end, Func&& disp) noexcept {
+ bool same_bucket = begin._bucket == end._bucket;
+
+ // Drop the tail of the starting bucket if it's not fully erased
+ while (begin._idx != 0) {
+ if (same_bucket) {
+ if (begin == end) {
+ return begin;
+ }
+ end._idx--;
+ }
+
+ begin = begin.erase_and_dispose(_tree.less(), disp);
+ }
+
+ // Drop all the buckets in between
+ outer_iterator nb = _tree.erase_and_dispose(begin._bucket, end._bucket, [&disp] (inner_array* arr) noexcept {
+ arr->for_each(disp);
+ });
+
+ assert(nb == end._bucket);
+
+ /*
+ * Drop the head of the ending bucket. Every erased element is the 0th
+ * one, when erased it will shift the rest left and reconstruct the array,
+ * thus we cannot rely on the end to keep neither _bucket not _idx.
+ *
+ * Said that -- just erase the required number of elements. A corner case
+ * when end points to the tree end is handled, _idx is 0 in this case.
+ */
+ iterator next(nb, 0);
+ while (end._idx-- != 0) {
+ next = next.erase_and_dispose(_tree.less(), disp);
+ }
+
+ return next;
+ }
+
+ iterator erase(iterator begin, iterator end) noexcept {
+ return erase_and_dispose(begin, end, bplus::default_dispose);
+ }
+
+ bool empty() const noexcept { return _tree.empty(); }
+};