diff --git a/sstables/index_entry.hh b/sstables/index_entry.hh index 345410ac9c..ee9842dc27 100644 --- a/sstables/index_entry.hh +++ b/sstables/index_entry.hh @@ -484,60 +484,99 @@ public: void switch_to_consume_until_mode(position_in_partition_view pos) { _pos = pos; _mode = consuming_mode::consume_until; } promoted_index_blocks& get_pi_blocks() { return _pi_blocks; }; - // This constructor is used for ka/la format which does not have information about columns fixed lengths - promoted_index_blocks_reader(reader_permit permit, input_stream&& promoted_index_stream, uint32_t num_blocks, - const schema& s, uint64_t start, uint64_t maxlen) - : continuous_data_consumer(std::move(permit), std::move(promoted_index_stream), start, maxlen) - , _total_num_blocks(num_blocks) - , _num_blocks_left(num_blocks) - , _s(s) - {} - - // This constructor is used for mc format which requires information about columns fixed lengths for parsing + // For the mc format clustering_values_fixed_lengths must be engaged. When not engaged ka/la is assumed. promoted_index_blocks_reader(reader_permit permit, input_stream&& promoted_index_stream, uint32_t num_blocks, const schema& s, uint64_t start, uint64_t maxlen, - column_values_fixed_lengths&& clustering_values_fixed_lengths) - : continuous_data_consumer(std::move(permit), std::move(promoted_index_stream), start, maxlen) + std::optional clustering_values_fixed_lengths) + : continuous_data_consumer(permit, std::move(promoted_index_stream), start, maxlen) , _total_num_blocks{num_blocks} , _num_blocks_left{num_blocks} , _s{s} - , _ctx{m_parser_context{std::move(clustering_values_fixed_lengths)}} - {} + { + if (clustering_values_fixed_lengths) { + _ctx.emplace(m_parser_context{std::move(*clustering_values_fixed_lengths)}); + } + } +}; + +// Cursor over the index for clustered elements of a single partition. 
+//
+// The user is expected to call advance_to() for monotonically increasing positions
+// in order to check if the index has information about more precise location
+// of the fragments relevant for the range starting at given position.
+//
+// The user must serialize all async methods. The next call may start only when the future
+// returned by the previous one has resolved.
+//
+// The user must call close() and wait for it to resolve before destroying.
+//
+class clustered_index_cursor {
+public:
+    // Position of indexed elements in the data file relative to the start of the partition.
+    using offset_in_partition = uint64_t;
+
+    struct skip_info {
+        offset_in_partition offset;
+        tombstone active_tombstone;
+        position_in_partition active_tombstone_pos;
+    };
+
+    struct entry_info {
+        promoted_index_block_position start;
+        promoted_index_block_position end;
+        offset_in_partition offset;
+    };
+
+    virtual ~clustered_index_cursor() {};
+    virtual future<> close() = 0;
+
+    // Advances the cursor to given position. When the cursor has more accurate information about
+    // location of the fragments from the range [pos, +inf) in the data file (since it was last advanced)
+    // it resolves with an engaged optional containing skip_info.
+    //
+    // The index may not be precise, so fragments from the range [pos, +inf) may be located after the
+    // position indicated by skip_info. It is guaranteed that no such fragments are located before the returned position.
+    //
+    // Offsets returned in skip_info are monotonically increasing.
+    //
+    // Must be called for non-decreasing positions.
+    // The caller must ensure that pos remains valid until the future resolves.
+    virtual future<std::optional<skip_info>> advance_to(position_in_partition_view pos) = 0;
+
+    // Determines the data file offset relative to the start of the partition such that fragments
+    // from the range (-inf, pos] are located before that offset.
+    //
+    // If such offset cannot be determined in a cheap way, returns a disengaged optional.
+ // + // Does not advance the cursor. + // + // The caller must ensure that pos remains valid until the future resolves. + virtual future> probe_upper_bound(position_in_partition_view pos) = 0; + + // Returns skip information about the next position after the cursor + // or nullopt if there is no information about further positions. + // + // When entry_info is returned, the cursor was advanced to entry_info::start. + virtual future> next_entry() = 0; }; class promoted_index { deletion_time _del_time; uint32_t _promoted_index_size; - promoted_index_blocks_reader _reader; + std::unique_ptr _cursor; bool _reader_closed = false; - public: - promoted_index(const schema& s, reader_permit permit, deletion_time del_time, input_stream&& promoted_index_stream, - uint32_t promoted_index_size, uint32_t blocks_count) + promoted_index(const schema& s, deletion_time del_time, uint32_t promoted_index_size, std::unique_ptr index) : _del_time{del_time} , _promoted_index_size(promoted_index_size) - , _reader{std::move(permit), std::move(promoted_index_stream), blocks_count, s, 0, promoted_index_size} - {} - - promoted_index(const schema& s, reader_permit permit, deletion_time del_time, input_stream&& promoted_index_stream, - uint32_t promoted_index_size, uint32_t blocks_count, column_values_fixed_lengths clustering_values_fixed_lengths) - : _del_time{del_time} - , _promoted_index_size(promoted_index_size) - , _reader{std::move(permit), std::move(promoted_index_stream), blocks_count, s, 0, promoted_index_size, std::move(clustering_values_fixed_lengths)} - {} + , _cursor(std::move(index)) + { } [[nodiscard]] deletion_time get_deletion_time() const { return _del_time; } [[nodiscard]] uint32_t get_promoted_index_size() const { return _promoted_index_size; } - [[nodiscard]] promoted_index_blocks_reader& get_reader() { return _reader; }; - [[nodiscard]] const promoted_index_blocks_reader& get_reader() const { return _reader; }; - future<> close_reader() { - if (!_reader_closed) { - 
_reader_closed = true; - return _reader.close(); - } - - return make_ready_future<>(); - } + [[nodiscard]] clustered_index_cursor& cursor() { return *_cursor; }; + [[nodiscard]] const clustered_index_cursor& cursor() const { return *_cursor; }; + future<> close_reader() { return _cursor->close(); } }; class index_entry { @@ -575,8 +614,6 @@ public: return {}; } - uint32_t get_promoted_index_size() const { return _index ? _index->get_promoted_index_size() : 0; } - index_entry(const schema& s, temporary_buffer&& key, uint64_t position, std::unique_ptr&& index) : _s(std::cref(s)) , _key(std::move(key)) @@ -587,45 +624,11 @@ public: index_entry(index_entry&&) = default; index_entry& operator=(index_entry&&) = default; - // Reads promoted index blocks from the stream until it finds the upper bound - // for a given position. - // Returns the index of the element right before the upper bound one. - future get_pi_blocks_until(position_in_partition_view pos) { - if (!_index) { - return make_ready_future(0); - } + // Can be nullptr + const std::unique_ptr& get_promoted_index() const { return _index; } + std::unique_ptr& get_promoted_index() { return _index; } + uint32_t get_promoted_index_size() const { return _index ? 
_index->get_promoted_index_size() : 0; } - auto& reader = _index->get_reader(); - reader.switch_to_consume_until_mode(pos); - promoted_index_blocks& blocks = reader.get_pi_blocks(); - erase_all_but_last_two(blocks); - return reader.consume_input().then([this, &reader] { - return reader.get_current_pi_index(); - }); - } - - // Unconditionally reads the promoted index blocks from the next data buffer - future<> get_next_pi_blocks() { - if (!_index) { - return make_ready_future<>(); - } - - auto& reader = _index->get_reader(); - promoted_index_blocks& blocks = reader.get_pi_blocks(); - blocks = promoted_index_blocks{}; - reader.switch_to_consume_next_mode(); - return reader.consume_input(); - } - - [[nodiscard]] uint32_t get_total_pi_blocks_count() const { - return _index ? _index->get_reader().get_total_num_blocks() : 0; - } - [[nodiscard]] uint32_t get_read_pi_blocks_count() const { - return _index ? _index->get_reader().get_read_num_blocks() : 0; - } - [[nodiscard]] promoted_index_blocks* get_pi_blocks() { - return _index ? 
&_index->get_reader().get_pi_blocks() : nullptr; - } future<> close_pi_stream() { if (_index) { return _index->close_reader(); diff --git a/sstables/index_reader.hh b/sstables/index_reader.hh index 85ded1893a..51770696cb 100644 --- a/sstables/index_reader.hh +++ b/sstables/index_reader.hh @@ -28,6 +28,7 @@ #include "utils/buffer_input_stream.hh" #include "sstables/prepended_input_stream.hh" #include "tracing/traced_file.hh" +#include "sstables/scanning_clustered_index_cursor.hh" namespace sstables { @@ -222,7 +223,7 @@ public: _num_pi_blocks = get_uint32(); } auto data_size = data.size(); - std::unique_ptr index; + std::unique_ptr pi; if ((_trust_pi == trust_promoted_index::yes) && (promoted_index_size > 0)) { input_stream promoted_index_stream = [&] { if (promoted_index_size <= data_size) { @@ -234,18 +235,13 @@ public: make_file_input_stream(_index_file, this->position(), promoted_index_size - data_size, _options).detach()); } }(); - if (is_mc_format()) { - index = std::make_unique(_s, continuous_data_consumer::_permit, *_deletion_time, std::move(promoted_index_stream), - promoted_index_size, - _num_pi_blocks, *_ck_values_fixed_lengths); - } else { - index = std::make_unique(_s, continuous_data_consumer::_permit, *_deletion_time, std::move(promoted_index_stream), - promoted_index_size, _num_pi_blocks); - } + auto index = std::make_unique(_s, continuous_data_consumer::_permit, + std::move(promoted_index_stream), promoted_index_size, _num_pi_blocks, _ck_values_fixed_lengths); + pi = std::make_unique(_s, *_deletion_time, promoted_index_size, std::move(index)); } else { _num_pi_blocks = 0; } - _consumer.consume_entry(index_entry{_s, std::move(_key), _position, std::move(index)}, _entry_offset); + _consumer.consume_entry(index_entry{_s, std::move(_key), _position, std::move(pi)}, _entry_offset); _deletion_time = std::nullopt; _num_pi_blocks = 0; _state = state::START; @@ -615,35 +611,22 @@ private: } index_entry& e = current_partition_entry(*_upper_bound); - if 
(e.get_total_pi_blocks_count() == 0) { + + if (!e.get_promoted_index()) { sstlog.trace("index {}: no promoted index", this); return advance_to_next_partition(*_upper_bound); } - if (e.get_read_pi_blocks_count() == 0) { - return e.get_next_pi_blocks().then([this, pos] { - return advance_upper_past(pos); - }); - } - - const schema& s = *_sstable->_schema; - auto cmp_with_start = [pos_cmp = promoted_index_block_compare(s), &s] - (position_in_partition_view pos, const promoted_index_block& info) -> bool { - return pos_cmp(pos, info.start(s)); - }; - promoted_index_blocks* pi_blocks = e.get_pi_blocks(); - assert(pi_blocks); - auto i = std::upper_bound(pi_blocks->begin() + _upper_bound->current_pi_idx, pi_blocks->end(), pos, cmp_with_start); - _upper_bound->current_pi_idx = std::distance(pi_blocks->begin(), i); - if (i == pi_blocks->end()) { - return advance_to_next_partition(*_upper_bound); - } - - _upper_bound->data_file_position = e.position() + i->offset(); - _upper_bound->element = indexable_element::cell; - sstlog.trace("index {} upper bound: skipped to cell, _current_pi_idx={}, _data_file_position={}", - this, _upper_bound->current_pi_idx, _upper_bound->data_file_position); - return make_ready_future<>(); + promoted_index& pi = *e.get_promoted_index(); + return pi.cursor().probe_upper_bound(pos).then([this, &e] (std::optional off) { + if (!off) { + return advance_to_next_partition(*_upper_bound); + } + _upper_bound->data_file_position = e.position() + *off; + _upper_bound->element = indexable_element::cell; + sstlog.trace("index {} upper bound: skipped to cell, _data_file_position={}", this, _upper_bound->data_file_position); + return make_ready_future<>(); + }); } // Returns position right after all partitions in the sstable @@ -651,25 +634,6 @@ private: return _sstable->data_size(); } - void get_info_from_promoted_block(const promoted_index_blocks::const_iterator iter, - const promoted_index_blocks& pi_blocks) { - const index_entry& e = 
current_partition_entry(); - _lower_bound.data_file_position = e.position() + iter->offset(); - _lower_bound.element = indexable_element::cell; - if (iter == pi_blocks.cbegin() || !std::prev(iter)->end_open_marker()) { - _lower_bound.end_open_marker.reset(); - } else { - auto prev = std::prev(iter); - // End open marker can be only engaged in SSTables 3.x ('mc' format) and never in ka/la - auto end_pos = prev->end(*_sstable->get_schema()); - position_in_partition_view* open_rt_pos = std::get_if(&end_pos); - assert(open_rt_pos); - _lower_bound.end_open_marker = open_rt_marker{ - position_in_partition{*open_rt_pos}, - tombstone(*prev->end_open_marker())}; - } - } - public: index_reader(shared_sstable sst, reader_permit permit, const io_priority_class& pc, tracing::trace_state_ptr trace_state) : _sstable(std::move(sst)) @@ -749,52 +713,26 @@ public: } index_entry& e = current_partition_entry(); - if (e.get_total_pi_blocks_count() == 0) { + if (!e.get_promoted_index()) { sstlog.trace("index {}: no promoted index", this); return make_ready_future<>(); } - const promoted_index_blocks* pi_blocks = e.get_pi_blocks(); - assert(pi_blocks); - - if ((e.get_total_pi_blocks_count() == e.get_read_pi_blocks_count()) - && _lower_bound.current_pi_idx >= pi_blocks->size() - 1) { - sstlog.trace("index {}: position in current block (all blocks are read)", this); - return make_ready_future<>(); - } - - auto cmp_with_start = [pos_cmp = promoted_index_block_compare(s), &s] - (position_in_partition_view pos, const promoted_index_block& info) -> bool { - return pos_cmp(pos, info.start(s)); - }; - - if (!pi_blocks->empty() && cmp_with_start(pos, (*pi_blocks)[_lower_bound.current_pi_idx])) { - sstlog.trace("index {}: position in current block (exact match)", this); - return make_ready_future<>(); - } - - auto i = std::upper_bound(pi_blocks->cbegin() + _lower_bound.current_pi_idx, pi_blocks->cend(), pos, cmp_with_start); - _lower_bound.current_pi_idx = std::distance(pi_blocks->cbegin(), i); - 
if ((i != pi_blocks->cend()) || (e.get_read_pi_blocks_count() == e.get_total_pi_blocks_count())) { - if (i != pi_blocks->begin()) { - --i; + promoted_index& pi = *e.get_promoted_index(); + return pi.cursor().advance_to(pos).then([this, &e] (std::optional si) { + if (!si) { + sstlog.trace("index {}: position in the same block", this); + return; } - - get_info_from_promoted_block(i, *pi_blocks); - sstlog.trace("index {}: lower bound skipped to cell, _current_pi_idx={}, _data_file_position={}", - this, _lower_bound.current_pi_idx, _lower_bound.data_file_position); - return make_ready_future<>(); - } - - return e.get_pi_blocks_until(pos).then([this, &s, &e, pi_blocks] (size_t current_pi_idx) { - _lower_bound.current_pi_idx = current_pi_idx; - auto i = std::cbegin(*pi_blocks); - if (_lower_bound.current_pi_idx > 0) { - std::advance(i, _lower_bound.current_pi_idx - 1); + if (!si->active_tombstone) { + // End open marker can be only engaged in SSTables 3.x ('mc' format) and never in ka/la + _lower_bound.end_open_marker.reset(); + } else { + _lower_bound.end_open_marker = open_rt_marker{std::move(si->active_tombstone_pos), si->active_tombstone}; } - get_info_from_promoted_block(i, *pi_blocks); - sstlog.trace("index {}: skipped to cell, _current_pi_idx={}, _data_file_position={}", - this, _lower_bound.current_pi_idx, _lower_bound.data_file_position); + _lower_bound.data_file_position = e.position() + si->offset; + _lower_bound.element = indexable_element::cell; + sstlog.trace("index {}: skipped to cell, _data_file_position={}", this, _lower_bound.data_file_position); }); } diff --git a/sstables/scanning_clustered_index_cursor.hh b/sstables/scanning_clustered_index_cursor.hh new file mode 100644 index 0000000000..b1e5bf3065 --- /dev/null +++ b/sstables/scanning_clustered_index_cursor.hh @@ -0,0 +1,229 @@ +/* + * Copyright (C) 2019 ScyllaDB + */ + +/* + * This file is part of Scylla. 
+ * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include "sstables/index_entry.hh" +#include "schema.hh" + +namespace sstables { + +// Cursor implementation which incrementally consumes promoted index entries +// from an input stream. +// +// Memory complexity: O(1) +// +// Average cost of first lookup: +// +// comparisons: O(N) +// I/O: O(N) +// +// N = number of index entries +// +class scanning_clustered_index_cursor : public clustered_index_cursor { + const schema& _s; + uint64_t _current_pi_idx = 0; // Points to upper bound of the cursor. + promoted_index_blocks_reader _reader; + bool _reader_closed = false; +private: + // Drops blocks which are behind the cursor and are not + // need by get_info_from_promoted_block() when invoked for _current_pi_idx. + // Adjusts _current_pi_idx so that it still points to the same entry. + void trim_blocks() { + promoted_index_blocks& blocks = _reader.get_pi_blocks(); + // Leave one block before _current_pi_idx. + auto target_size = blocks.size() - _current_pi_idx + 1; + while (blocks.size() > target_size) { + blocks.pop_front(); + --_current_pi_idx; + } + } + + // Reads promoted index blocks from the stream until it finds the upper bound + // for a given position. + // Returns the index of the element right before the upper bound one. 
+ future get_pi_blocks_until(position_in_partition_view pos) { + _reader.switch_to_consume_until_mode(pos); + promoted_index_blocks& blocks = _reader.get_pi_blocks(); + erase_all_but_last_two(blocks); + return _reader.consume_input().then([this] { + return _reader.get_current_pi_index(); + }); + } + + // Unconditionally reads the promoted index blocks from the next data buffer + future<> get_next_pi_blocks() { + sstlog.trace("scanning_clustered_index_cursor {}: parsing more blocks", this); + promoted_index_blocks& blocks = _reader.get_pi_blocks(); + blocks = promoted_index_blocks{}; + _reader.switch_to_consume_next_mode(); + return _reader.consume_input(); + } + + [[nodiscard]] uint32_t get_total_pi_blocks_count() const { + return _reader.get_total_num_blocks(); + } + + [[nodiscard]] uint32_t get_read_pi_blocks_count() const { + return _reader.get_read_num_blocks(); + } + + [[nodiscard]] bool all_blocks_read() const { + return _reader.get_read_num_blocks() == _reader.get_total_num_blocks(); + } + + [[nodiscard]] promoted_index_blocks& get_pi_blocks() { + return _reader.get_pi_blocks(); + } + + [[nodiscard]] skip_info get_info_from_promoted_block(const promoted_index_blocks::const_iterator iter, + const promoted_index_blocks& pi_blocks) { + auto offset = iter->offset(); + if (iter == pi_blocks.cbegin() || !std::prev(iter)->end_open_marker()) { + return skip_info{offset, tombstone(), position_in_partition::before_all_clustered_rows()}; + } else { + auto prev = std::prev(iter); + // End open marker can be only engaged in SSTables 3.x ('mc' format) and never in ka/la + auto end_pos = prev->end(_s); + position_in_partition_view* open_rt_pos = std::get_if(&end_pos); + assert(open_rt_pos); + return skip_info{offset, tombstone(*prev->end_open_marker()), position_in_partition{*open_rt_pos}}; + } + } +public: + scanning_clustered_index_cursor(const schema& s, + reader_permit permit, + input_stream&& promoted_index_stream, + uint32_t promoted_index_size, + uint32_t 
blocks_count, + std::optional clustering_values_fixed_lengths) + : _s(s) + , _reader{std::move(permit), std::move(promoted_index_stream), blocks_count, s, 0, promoted_index_size, std::move(clustering_values_fixed_lengths)} + { } + + future> advance_to(position_in_partition_view pos) override { + const promoted_index_blocks* pi_blocks = &get_pi_blocks(); + + if (all_blocks_read() && _current_pi_idx >= pi_blocks->size() - 1) { + sstlog.trace("scanning_clustered_index_cursor {}: position in current block (all blocks are read)", this); + return make_ready_future>(std::nullopt); + } + + auto cmp_with_start = [pos_cmp = promoted_index_block_compare(_s), this] + (position_in_partition_view pos, const promoted_index_block& info) -> bool { + return pos_cmp(pos, info.start(_s)); + }; + + if (!pi_blocks->empty() && cmp_with_start(pos, (*pi_blocks)[_current_pi_idx])) { + sstlog.trace("scanning_clustered_index_cursor {}: position in current block (exact match)", this); + return make_ready_future>(std::nullopt); + } + + auto i = std::upper_bound(pi_blocks->cbegin() + _current_pi_idx, pi_blocks->cend(), pos, cmp_with_start); + _current_pi_idx = std::distance(pi_blocks->cbegin(), i); + if (i != pi_blocks->cend() || all_blocks_read()) { + if (i != pi_blocks->begin()) { + --i; + } + + auto info = get_info_from_promoted_block(i, *pi_blocks); + sstlog.trace("scanning_clustered_index_cursor {}: lower bound skipped to cell, _current_pi_idx={}, offset={}", + this, _current_pi_idx, info.offset); + return make_ready_future>(std::move(info)); + } + + return get_pi_blocks_until(pos).then([this, pi_blocks] (size_t current_pi_idx) { + _current_pi_idx = current_pi_idx; + auto i = std::cbegin(*pi_blocks); + if (_current_pi_idx > 0) { + std::advance(i, _current_pi_idx - 1); + } + auto info = get_info_from_promoted_block(i, *pi_blocks); + sstlog.trace("scanning_clustered_index_cursor {}: skipped to cell, _current_pi_idx={}, offset={}", + this, _current_pi_idx, info.offset); + return 
std::make_optional(std::move(info)); + }); + } + + future> probe_upper_bound(position_in_partition_view pos) override { + if (get_total_pi_blocks_count() == 0) { + sstlog.trace("scanning_clustered_index_cursor {}: no promoted index", this); + return make_ready_future>(std::nullopt); + } + + if (get_read_pi_blocks_count() == 0) { + // It is expected that this will not do any IO, since the front of the input stream + // will be populated from the buffers we got when reading the partition index. + return get_next_pi_blocks().then([this, pos] { + return probe_upper_bound(pos); + }); + } + + auto cmp_with_start = [pos_cmp = promoted_index_block_compare(_s), this] + (position_in_partition_view pos, const promoted_index_block& info) -> bool { + return pos_cmp(pos, info.start(_s)); + }; + + promoted_index_blocks* pi_blocks = &get_pi_blocks(); + auto i = std::upper_bound(pi_blocks->begin(), pi_blocks->end(), pos, cmp_with_start); + if (i == pi_blocks->end()) { + // Give up. We don't want to expend I/O on fetching more entries. probe_upper_bound() + // is used to determine read-ahead boundary. Doing that accurately may be more + // expensive that the potential waste due to read-ahead over-reads. 
+ return make_ready_future>(std::nullopt); + } + + auto pi_index = std::distance(pi_blocks->begin(), i); + sstlog.trace("scanning_clustered_index_cursor {} upper bound: skipped to cell, pi_idx={}, offset={}", this, pi_index, i->offset()); + return make_ready_future>(offset_in_partition(i->offset())); + } + + future> next_entry() override { + promoted_index_blocks& pi_blocks = get_pi_blocks(); + + if (pi_blocks.empty() || _current_pi_idx >= pi_blocks.size()) { + if (all_blocks_read()) { + return make_ready_future>(std::nullopt); + } + return get_next_pi_blocks().then([this] { + return next_entry(); + }); + } + + sstlog.trace("scanning_clustered_index_cursor {}: next_entry(), pi_idx={}", this, _current_pi_idx); + promoted_index_block& block = pi_blocks[_current_pi_idx]; + auto ei = entry_info{block.start(_s), block.end(_s), block.offset()}; + ++_current_pi_idx; + trim_blocks(); + return make_ready_future>(std::move(ei)); + } + + future<> close() override { + if (!_reader_closed) { + _reader_closed = true; + return _reader.close(); + } + + return make_ready_future<>(); + } +}; + +} diff --git a/test/lib/index_reader_assertions.hh b/test/lib/index_reader_assertions.hh index fe33c26799..6a05fbc63d 100644 --- a/test/lib/index_reader_assertions.hh +++ b/test/lib/index_reader_assertions.hh @@ -35,10 +35,7 @@ public: { } index_reader_assertions& has_monotonic_positions(const schema& s) { - auto pos_cmp = [&s] (const sstables::promoted_index_block& lhs, const sstables::promoted_index_block& rhs) { - sstables::promoted_index_block_compare cmp(s); - return cmp(rhs.start(s), lhs.end(s)); - }; + auto pos_cmp = sstables::promoted_index_block_compare(s); auto rp_cmp = dht::ring_position_comparator(s); auto prev = dht::ring_position::min(); _r->read_partition_data().get(); @@ -54,22 +51,14 @@ public: prev = rp; - while (e.get_read_pi_blocks_count() < e.get_total_pi_blocks_count()) { - e.get_next_pi_blocks().get(); - auto* infos = e.get_pi_blocks(); - if (infos->empty()) { - 
continue; - } - auto it = std::adjacent_find(infos->begin(), infos->end(), pos_cmp); - if (it != infos->end()) { - std::cout << "promoted index:\n"; - for (auto& e : *infos) { - std::cout << " " << e.start(s) << "-" << e.end(s) - << ": +" << e.offset() << " len=" << e.width() << std::endl; - } - auto next = std::next(it); - BOOST_FAIL(format("Index blocks are not monotonic: {} >= {}", it->end(s), next->start(s))); + sstables::clustered_index_cursor& cur = e.get_promoted_index()->cursor(); + std::optional prev_end; + while (auto ei_opt = cur.next_entry().get0()) { + sstables::clustered_index_cursor::entry_info& ei = *ei_opt; + if (prev_end && pos_cmp(ei.start, *prev_end)) { + BOOST_FAIL(format("Index blocks are not monotonic: {} > {}", *prev_end, ei.start)); } + prev_end = ei.end; } _r->advance_to_next_partition().get(); } @@ -79,7 +68,8 @@ public: index_reader_assertions& is_empty(const schema& s) { _r->read_partition_data().get(); while (!_r->eof()) { - BOOST_REQUIRE(_r->current_partition_entry().get_total_pi_blocks_count() == 0); + sstables::index_entry& ie = _r->current_partition_entry(); + BOOST_REQUIRE(!ie.get_promoted_index() || ie.get_promoted_index()->get_promoted_index_size() == 0); _r->advance_to_next_partition().get(); } return *this;