sstables: Abstract the clustering index cursor behavior

In preparation for supporting more than one algorithm for lookups in
the promoted index, extract relevant logic out of the index_reader
(which is a partition index cursor).

The clustered index cursor implementation is now hidden behind an
abstract interface called clustered_index_cursor.

The current implementation is put into the
scanning_clustered_index_cursor. It's mostly code movement with minor
adjustments.

In order to encapsulate iteration over promoted index entries,
clustered_index_cursor::next_entry() was introduced.

No change in behavior intended in this patch.
This commit is contained in:
Tomasz Grabiec
2019-06-26 12:57:55 +02:00
parent a858f87b11
commit d5bf540079
4 changed files with 351 additions and 191 deletions

View File

@@ -484,60 +484,99 @@ public:
void switch_to_consume_until_mode(position_in_partition_view pos) { _pos = pos; _mode = consuming_mode::consume_until; }
promoted_index_blocks& get_pi_blocks() { return _pi_blocks; };
// This constructor is used for ka/la format which does not have information about columns fixed lengths
promoted_index_blocks_reader(reader_permit permit, input_stream<char>&& promoted_index_stream, uint32_t num_blocks,
const schema& s, uint64_t start, uint64_t maxlen)
: continuous_data_consumer(std::move(permit), std::move(promoted_index_stream), start, maxlen)
, _total_num_blocks(num_blocks)
, _num_blocks_left(num_blocks)
, _s(s)
{}
// This constructor is used for mc format which requires information about columns fixed lengths for parsing
// For the mc format clustering_values_fixed_lengths must be engaged. When not engaged ka/la is assumed.
promoted_index_blocks_reader(reader_permit permit, input_stream<char>&& promoted_index_stream, uint32_t num_blocks,
const schema& s, uint64_t start, uint64_t maxlen,
column_values_fixed_lengths&& clustering_values_fixed_lengths)
: continuous_data_consumer(std::move(permit), std::move(promoted_index_stream), start, maxlen)
std::optional<column_values_fixed_lengths> clustering_values_fixed_lengths)
: continuous_data_consumer(permit, std::move(promoted_index_stream), start, maxlen)
, _total_num_blocks{num_blocks}
, _num_blocks_left{num_blocks}
, _s{s}
, _ctx{m_parser_context{std::move(clustering_values_fixed_lengths)}}
{}
{
if (clustering_values_fixed_lengths) {
_ctx.emplace<m_parser_context>(m_parser_context{std::move(*clustering_values_fixed_lengths)});
}
}
};
// Cursor over the index for clustered elements of a single partition.
//
// The user is expected to call advance_to() for monotonically increasing positions
// in order to check if the index has information about more precise location
// of the fragments relevant for the range starting at given position.
//
// The user must serialize all async methods. The next call may start only when the future
// returned by the previous one has resolved.
//
// The user must call close() and wait for it to resolve before destroying.
//
class clustered_index_cursor {
public:
    // Position of indexed elements in the data file relative to the start of the partition.
    using offset_in_partition = uint64_t;

    // Result of advance_to(): where to skip to in the data file, together with
    // the range tombstone (if any) which is still open at that point.
    struct skip_info {
        offset_in_partition offset;
        tombstone active_tombstone;
        position_in_partition active_tombstone_pos;
    };

    // Describes a single promoted index entry: the clustering range it covers
    // and the data file offset of the fragments it points at.
    struct entry_info {
        promoted_index_block_position start;
        promoted_index_block_position end;
        offset_in_partition offset;
    };

    virtual ~clustered_index_cursor() = default;

    // Releases resources held by the cursor. Must be called and waited for
    // before the cursor is destroyed.
    virtual future<> close() = 0;

    // Advances the cursor to given position. When the cursor has more accurate information about
    // location of the fragments from the range [pos, +inf) in the data file (since it was last advanced)
    // it resolves with an engaged optional containing skip_info.
    //
    // The index may not be precise, so fragments from the range [pos, +inf) may be located after the
    // position indicated by skip_info. It is guaranteed that no such fragments are located before the returned position.
    //
    // Offsets returned in skip_info are monotonically increasing.
    //
    // Must be called for non-decreasing positions.
    // The caller must ensure that pos remains valid until the future resolves.
    virtual future<std::optional<skip_info>> advance_to(position_in_partition_view pos) = 0;

    // Determines the data file offset relative to the start of the partition such that fragments
    // from the range (-inf, pos] are located before that offset.
    //
    // If such offset cannot be determined in a cheap way, returns a disengaged optional.
    //
    // Does not advance the cursor.
    //
    // The caller must ensure that pos remains valid until the future resolves.
    virtual future<std::optional<offset_in_partition>> probe_upper_bound(position_in_partition_view pos) = 0;

    // Returns information about the next promoted index entry after the cursor,
    // or nullopt if there is no information about further positions.
    //
    // When entry_info is returned, the cursor was advanced to entry_info::start.
    virtual future<std::optional<entry_info>> next_entry() = 0;
};
class promoted_index {
deletion_time _del_time;
uint32_t _promoted_index_size;
promoted_index_blocks_reader _reader;
std::unique_ptr<clustered_index_cursor> _cursor;
bool _reader_closed = false;
public:
promoted_index(const schema& s, reader_permit permit, deletion_time del_time, input_stream<char>&& promoted_index_stream,
uint32_t promoted_index_size, uint32_t blocks_count)
promoted_index(const schema& s, deletion_time del_time, uint32_t promoted_index_size, std::unique_ptr<clustered_index_cursor> index)
: _del_time{del_time}
, _promoted_index_size(promoted_index_size)
, _reader{std::move(permit), std::move(promoted_index_stream), blocks_count, s, 0, promoted_index_size}
{}
promoted_index(const schema& s, reader_permit permit, deletion_time del_time, input_stream<char>&& promoted_index_stream,
uint32_t promoted_index_size, uint32_t blocks_count, column_values_fixed_lengths clustering_values_fixed_lengths)
: _del_time{del_time}
, _promoted_index_size(promoted_index_size)
, _reader{std::move(permit), std::move(promoted_index_stream), blocks_count, s, 0, promoted_index_size, std::move(clustering_values_fixed_lengths)}
{}
, _cursor(std::move(index))
{ }
[[nodiscard]] deletion_time get_deletion_time() const { return _del_time; }
[[nodiscard]] uint32_t get_promoted_index_size() const { return _promoted_index_size; }
[[nodiscard]] promoted_index_blocks_reader& get_reader() { return _reader; };
[[nodiscard]] const promoted_index_blocks_reader& get_reader() const { return _reader; };
future<> close_reader() {
if (!_reader_closed) {
_reader_closed = true;
return _reader.close();
}
return make_ready_future<>();
}
[[nodiscard]] clustered_index_cursor& cursor() { return *_cursor; };
[[nodiscard]] const clustered_index_cursor& cursor() const { return *_cursor; };
future<> close_reader() { return _cursor->close(); }
};
class index_entry {
@@ -575,8 +614,6 @@ public:
return {};
}
uint32_t get_promoted_index_size() const { return _index ? _index->get_promoted_index_size() : 0; }
index_entry(const schema& s, temporary_buffer<char>&& key, uint64_t position, std::unique_ptr<promoted_index>&& index)
: _s(std::cref(s))
, _key(std::move(key))
@@ -587,45 +624,11 @@ public:
index_entry(index_entry&&) = default;
index_entry& operator=(index_entry&&) = default;
// Reads promoted index blocks from the stream until it finds the upper bound
// for a given position.
// Returns the index of the element right before the upper bound one.
future<size_t> get_pi_blocks_until(position_in_partition_view pos) {
if (!_index) {
return make_ready_future<size_t>(0);
}
// Can be nullptr
const std::unique_ptr<promoted_index>& get_promoted_index() const { return _index; }
std::unique_ptr<promoted_index>& get_promoted_index() { return _index; }
uint32_t get_promoted_index_size() const { return _index ? _index->get_promoted_index_size() : 0; }
auto& reader = _index->get_reader();
reader.switch_to_consume_until_mode(pos);
promoted_index_blocks& blocks = reader.get_pi_blocks();
erase_all_but_last_two(blocks);
return reader.consume_input().then([this, &reader] {
return reader.get_current_pi_index();
});
}
// Unconditionally reads the promoted index blocks from the next data buffer
future<> get_next_pi_blocks() {
if (!_index) {
return make_ready_future<>();
}
auto& reader = _index->get_reader();
promoted_index_blocks& blocks = reader.get_pi_blocks();
blocks = promoted_index_blocks{};
reader.switch_to_consume_next_mode();
return reader.consume_input();
}
[[nodiscard]] uint32_t get_total_pi_blocks_count() const {
return _index ? _index->get_reader().get_total_num_blocks() : 0;
}
[[nodiscard]] uint32_t get_read_pi_blocks_count() const {
return _index ? _index->get_reader().get_read_num_blocks() : 0;
}
[[nodiscard]] promoted_index_blocks* get_pi_blocks() {
return _index ? &_index->get_reader().get_pi_blocks() : nullptr;
}
future<> close_pi_stream() {
if (_index) {
return _index->close_reader();

View File

@@ -28,6 +28,7 @@
#include "utils/buffer_input_stream.hh"
#include "sstables/prepended_input_stream.hh"
#include "tracing/traced_file.hh"
#include "sstables/scanning_clustered_index_cursor.hh"
namespace sstables {
@@ -222,7 +223,7 @@ public:
_num_pi_blocks = get_uint32();
}
auto data_size = data.size();
std::unique_ptr<promoted_index> index;
std::unique_ptr<promoted_index> pi;
if ((_trust_pi == trust_promoted_index::yes) && (promoted_index_size > 0)) {
input_stream<char> promoted_index_stream = [&] {
if (promoted_index_size <= data_size) {
@@ -234,18 +235,13 @@ public:
make_file_input_stream(_index_file, this->position(), promoted_index_size - data_size, _options).detach());
}
}();
if (is_mc_format()) {
index = std::make_unique<promoted_index>(_s, continuous_data_consumer::_permit, *_deletion_time, std::move(promoted_index_stream),
promoted_index_size,
_num_pi_blocks, *_ck_values_fixed_lengths);
} else {
index = std::make_unique<promoted_index>(_s, continuous_data_consumer::_permit, *_deletion_time, std::move(promoted_index_stream),
promoted_index_size, _num_pi_blocks);
}
auto index = std::make_unique<scanning_clustered_index_cursor>(_s, continuous_data_consumer::_permit,
std::move(promoted_index_stream), promoted_index_size, _num_pi_blocks, _ck_values_fixed_lengths);
pi = std::make_unique<promoted_index>(_s, *_deletion_time, promoted_index_size, std::move(index));
} else {
_num_pi_blocks = 0;
}
_consumer.consume_entry(index_entry{_s, std::move(_key), _position, std::move(index)}, _entry_offset);
_consumer.consume_entry(index_entry{_s, std::move(_key), _position, std::move(pi)}, _entry_offset);
_deletion_time = std::nullopt;
_num_pi_blocks = 0;
_state = state::START;
@@ -615,35 +611,22 @@ private:
}
index_entry& e = current_partition_entry(*_upper_bound);
if (e.get_total_pi_blocks_count() == 0) {
if (!e.get_promoted_index()) {
sstlog.trace("index {}: no promoted index", this);
return advance_to_next_partition(*_upper_bound);
}
if (e.get_read_pi_blocks_count() == 0) {
return e.get_next_pi_blocks().then([this, pos] {
return advance_upper_past(pos);
});
}
const schema& s = *_sstable->_schema;
auto cmp_with_start = [pos_cmp = promoted_index_block_compare(s), &s]
(position_in_partition_view pos, const promoted_index_block& info) -> bool {
return pos_cmp(pos, info.start(s));
};
promoted_index_blocks* pi_blocks = e.get_pi_blocks();
assert(pi_blocks);
auto i = std::upper_bound(pi_blocks->begin() + _upper_bound->current_pi_idx, pi_blocks->end(), pos, cmp_with_start);
_upper_bound->current_pi_idx = std::distance(pi_blocks->begin(), i);
if (i == pi_blocks->end()) {
return advance_to_next_partition(*_upper_bound);
}
_upper_bound->data_file_position = e.position() + i->offset();
_upper_bound->element = indexable_element::cell;
sstlog.trace("index {} upper bound: skipped to cell, _current_pi_idx={}, _data_file_position={}",
this, _upper_bound->current_pi_idx, _upper_bound->data_file_position);
return make_ready_future<>();
promoted_index& pi = *e.get_promoted_index();
return pi.cursor().probe_upper_bound(pos).then([this, &e] (std::optional<clustered_index_cursor::offset_in_partition> off) {
if (!off) {
return advance_to_next_partition(*_upper_bound);
}
_upper_bound->data_file_position = e.position() + *off;
_upper_bound->element = indexable_element::cell;
sstlog.trace("index {} upper bound: skipped to cell, _data_file_position={}", this, _upper_bound->data_file_position);
return make_ready_future<>();
});
}
// Returns position right after all partitions in the sstable
@@ -651,25 +634,6 @@ private:
return _sstable->data_size();
}
void get_info_from_promoted_block(const promoted_index_blocks::const_iterator iter,
const promoted_index_blocks& pi_blocks) {
const index_entry& e = current_partition_entry();
_lower_bound.data_file_position = e.position() + iter->offset();
_lower_bound.element = indexable_element::cell;
if (iter == pi_blocks.cbegin() || !std::prev(iter)->end_open_marker()) {
_lower_bound.end_open_marker.reset();
} else {
auto prev = std::prev(iter);
// End open marker can be only engaged in SSTables 3.x ('mc' format) and never in ka/la
auto end_pos = prev->end(*_sstable->get_schema());
position_in_partition_view* open_rt_pos = std::get_if<position_in_partition_view>(&end_pos);
assert(open_rt_pos);
_lower_bound.end_open_marker = open_rt_marker{
position_in_partition{*open_rt_pos},
tombstone(*prev->end_open_marker())};
}
}
public:
index_reader(shared_sstable sst, reader_permit permit, const io_priority_class& pc, tracing::trace_state_ptr trace_state)
: _sstable(std::move(sst))
@@ -749,52 +713,26 @@ public:
}
index_entry& e = current_partition_entry();
if (e.get_total_pi_blocks_count() == 0) {
if (!e.get_promoted_index()) {
sstlog.trace("index {}: no promoted index", this);
return make_ready_future<>();
}
const promoted_index_blocks* pi_blocks = e.get_pi_blocks();
assert(pi_blocks);
if ((e.get_total_pi_blocks_count() == e.get_read_pi_blocks_count())
&& _lower_bound.current_pi_idx >= pi_blocks->size() - 1) {
sstlog.trace("index {}: position in current block (all blocks are read)", this);
return make_ready_future<>();
}
auto cmp_with_start = [pos_cmp = promoted_index_block_compare(s), &s]
(position_in_partition_view pos, const promoted_index_block& info) -> bool {
return pos_cmp(pos, info.start(s));
};
if (!pi_blocks->empty() && cmp_with_start(pos, (*pi_blocks)[_lower_bound.current_pi_idx])) {
sstlog.trace("index {}: position in current block (exact match)", this);
return make_ready_future<>();
}
auto i = std::upper_bound(pi_blocks->cbegin() + _lower_bound.current_pi_idx, pi_blocks->cend(), pos, cmp_with_start);
_lower_bound.current_pi_idx = std::distance(pi_blocks->cbegin(), i);
if ((i != pi_blocks->cend()) || (e.get_read_pi_blocks_count() == e.get_total_pi_blocks_count())) {
if (i != pi_blocks->begin()) {
--i;
promoted_index& pi = *e.get_promoted_index();
return pi.cursor().advance_to(pos).then([this, &e] (std::optional<clustered_index_cursor::skip_info> si) {
if (!si) {
sstlog.trace("index {}: position in the same block", this);
return;
}
get_info_from_promoted_block(i, *pi_blocks);
sstlog.trace("index {}: lower bound skipped to cell, _current_pi_idx={}, _data_file_position={}",
this, _lower_bound.current_pi_idx, _lower_bound.data_file_position);
return make_ready_future<>();
}
return e.get_pi_blocks_until(pos).then([this, &s, &e, pi_blocks] (size_t current_pi_idx) {
_lower_bound.current_pi_idx = current_pi_idx;
auto i = std::cbegin(*pi_blocks);
if (_lower_bound.current_pi_idx > 0) {
std::advance(i, _lower_bound.current_pi_idx - 1);
if (!si->active_tombstone) {
// End open marker can be only engaged in SSTables 3.x ('mc' format) and never in ka/la
_lower_bound.end_open_marker.reset();
} else {
_lower_bound.end_open_marker = open_rt_marker{std::move(si->active_tombstone_pos), si->active_tombstone};
}
get_info_from_promoted_block(i, *pi_blocks);
sstlog.trace("index {}: skipped to cell, _current_pi_idx={}, _data_file_position={}",
this, _lower_bound.current_pi_idx, _lower_bound.data_file_position);
_lower_bound.data_file_position = e.position() + si->offset;
_lower_bound.element = indexable_element::cell;
sstlog.trace("index {}: skipped to cell, _data_file_position={}", this, _lower_bound.data_file_position);
});
}

View File

@@ -0,0 +1,229 @@
/*
* Copyright (C) 2019 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "sstables/index_entry.hh"
#include "schema.hh"
namespace sstables {
// Cursor implementation which incrementally consumes promoted index entries
// from an input stream.
//
// Memory complexity: O(1)
//
// Average cost of first lookup:
//
// comparisons: O(N)
// I/O: O(N)
//
// N = number of index entries
//
class scanning_clustered_index_cursor : public clustered_index_cursor {
const schema& _s;
uint64_t _current_pi_idx = 0; // Points to upper bound of the cursor.
promoted_index_blocks_reader _reader;
bool _reader_closed = false; // Guards against double-closing _reader in close().
private:
// Drops blocks which are behind the cursor and are not
// needed by get_info_from_promoted_block() when invoked for _current_pi_idx.
// Adjusts _current_pi_idx so that it still points to the same entry.
// Keeps one block before _current_pi_idx because
// get_info_from_promoted_block() inspects std::prev(iter) for an open
// range-tombstone marker.
void trim_blocks() {
promoted_index_blocks& blocks = _reader.get_pi_blocks();
// Leave one block before _current_pi_idx.
auto target_size = blocks.size() - _current_pi_idx + 1;
while (blocks.size() > target_size) {
blocks.pop_front();
--_current_pi_idx;
}
}
// Reads promoted index blocks from the stream until it finds the upper bound
// for a given position.
// Returns the index of the element right before the upper bound one.
future<size_t> get_pi_blocks_until(position_in_partition_view pos) {
_reader.switch_to_consume_until_mode(pos);
promoted_index_blocks& blocks = _reader.get_pi_blocks();
// Already-consumed blocks (except the last two, which the consumer may
// still reference) are discarded to keep memory usage bounded.
erase_all_but_last_two(blocks);
return _reader.consume_input().then([this] {
return _reader.get_current_pi_index();
});
}
// Unconditionally reads the promoted index blocks from the next data buffer.
// Replaces the current block list with the freshly parsed one.
future<> get_next_pi_blocks() {
sstlog.trace("scanning_clustered_index_cursor {}: parsing more blocks", this);
promoted_index_blocks& blocks = _reader.get_pi_blocks();
blocks = promoted_index_blocks{};
_reader.switch_to_consume_next_mode();
return _reader.consume_input();
}
// Total number of promoted index blocks in this partition's index.
[[nodiscard]] uint32_t get_total_pi_blocks_count() const {
return _reader.get_total_num_blocks();
}
// Number of blocks parsed from the stream so far.
[[nodiscard]] uint32_t get_read_pi_blocks_count() const {
return _reader.get_read_num_blocks();
}
// True when every block of the promoted index has been parsed.
[[nodiscard]] bool all_blocks_read() const {
return _reader.get_read_num_blocks() == _reader.get_total_num_blocks();
}
[[nodiscard]] promoted_index_blocks& get_pi_blocks() {
return _reader.get_pi_blocks();
}
// Builds skip_info for the block pointed to by iter: the data file offset of
// that block and the range tombstone (if any) left open by the previous block.
[[nodiscard]] skip_info get_info_from_promoted_block(const promoted_index_blocks::const_iterator iter,
const promoted_index_blocks& pi_blocks) {
auto offset = iter->offset();
if (iter == pi_blocks.cbegin() || !std::prev(iter)->end_open_marker()) {
// No open tombstone at this point: return a neutral tombstone/position.
return skip_info{offset, tombstone(), position_in_partition::before_all_clustered_rows()};
} else {
auto prev = std::prev(iter);
// End open marker can be only engaged in SSTables 3.x ('mc' format) and never in ka/la
auto end_pos = prev->end(_s);
// When an end-open-marker is engaged, the block end is expected to hold a
// position_in_partition_view alternative; get_if + assert enforces that.
position_in_partition_view* open_rt_pos = std::get_if<position_in_partition_view>(&end_pos);
assert(open_rt_pos);
return skip_info{offset, tombstone(*prev->end_open_marker()), position_in_partition{*open_rt_pos}};
}
}
public:
// For the mc format clustering_values_fixed_lengths must be engaged.
// When disengaged, ka/la format parsing is assumed (see promoted_index_blocks_reader).
scanning_clustered_index_cursor(const schema& s,
reader_permit permit,
input_stream<char>&& promoted_index_stream,
uint32_t promoted_index_size,
uint32_t blocks_count,
std::optional<column_values_fixed_lengths> clustering_values_fixed_lengths)
: _s(s)
, _reader{std::move(permit), std::move(promoted_index_stream), blocks_count, s, 0, promoted_index_size, std::move(clustering_values_fixed_lengths)}
{ }
// See clustered_index_cursor::advance_to(). Scans blocks already in memory
// first; only parses more of the stream when pos lies beyond them.
future<std::optional<skip_info>> advance_to(position_in_partition_view pos) override {
const promoted_index_blocks* pi_blocks = &get_pi_blocks();
if (all_blocks_read() && _current_pi_idx >= pi_blocks->size() - 1) {
sstlog.trace("scanning_clustered_index_cursor {}: position in current block (all blocks are read)", this);
return make_ready_future<std::optional<skip_info>>(std::nullopt);
}
auto cmp_with_start = [pos_cmp = promoted_index_block_compare(_s), this]
(position_in_partition_view pos, const promoted_index_block& info) -> bool {
return pos_cmp(pos, info.start(_s));
};
if (!pi_blocks->empty() && cmp_with_start(pos, (*pi_blocks)[_current_pi_idx])) {
// pos is still before the start of the current block: no better
// location to report.
sstlog.trace("scanning_clustered_index_cursor {}: position in current block (exact match)", this);
return make_ready_future<std::optional<skip_info>>(std::nullopt);
}
auto i = std::upper_bound(pi_blocks->cbegin() + _current_pi_idx, pi_blocks->cend(), pos, cmp_with_start);
_current_pi_idx = std::distance(pi_blocks->cbegin(), i);
if (i != pi_blocks->cend() || all_blocks_read()) {
// Upper bound found among in-memory blocks (or nothing more to read):
// step back to the block covering pos.
if (i != pi_blocks->begin()) {
--i;
}
auto info = get_info_from_promoted_block(i, *pi_blocks);
sstlog.trace("scanning_clustered_index_cursor {}: lower bound skipped to cell, _current_pi_idx={}, offset={}",
this, _current_pi_idx, info.offset);
return make_ready_future<std::optional<skip_info>>(std::move(info));
}
// pos is past all in-memory blocks and more remain in the stream:
// consume the stream until the upper bound for pos is found.
return get_pi_blocks_until(pos).then([this, pi_blocks] (size_t current_pi_idx) {
_current_pi_idx = current_pi_idx;
auto i = std::cbegin(*pi_blocks);
if (_current_pi_idx > 0) {
std::advance(i, _current_pi_idx - 1);
}
auto info = get_info_from_promoted_block(i, *pi_blocks);
sstlog.trace("scanning_clustered_index_cursor {}: skipped to cell, _current_pi_idx={}, offset={}",
this, _current_pi_idx, info.offset);
return std::make_optional(std::move(info));
});
}
// See clustered_index_cursor::probe_upper_bound(). Only searches blocks that
// are (or can cheaply be made) resident; does not advance the cursor.
future<std::optional<offset_in_partition>> probe_upper_bound(position_in_partition_view pos) override {
if (get_total_pi_blocks_count() == 0) {
sstlog.trace("scanning_clustered_index_cursor {}: no promoted index", this);
return make_ready_future<std::optional<offset_in_partition>>(std::nullopt);
}
if (get_read_pi_blocks_count() == 0) {
// It is expected that this will not do any IO, since the front of the input stream
// will be populated from the buffers we got when reading the partition index.
return get_next_pi_blocks().then([this, pos] {
return probe_upper_bound(pos);
});
}
auto cmp_with_start = [pos_cmp = promoted_index_block_compare(_s), this]
(position_in_partition_view pos, const promoted_index_block& info) -> bool {
return pos_cmp(pos, info.start(_s));
};
promoted_index_blocks* pi_blocks = &get_pi_blocks();
auto i = std::upper_bound(pi_blocks->begin(), pi_blocks->end(), pos, cmp_with_start);
if (i == pi_blocks->end()) {
// Give up. We don't want to expend I/O on fetching more entries. probe_upper_bound()
// is used to determine read-ahead boundary. Doing that accurately may be more
// expensive than the potential waste due to read-ahead over-reads.
return make_ready_future<std::optional<offset_in_partition>>(std::nullopt);
}
auto pi_index = std::distance(pi_blocks->begin(), i);
sstlog.trace("scanning_clustered_index_cursor {} upper bound: skipped to cell, pi_idx={}, offset={}", this, pi_index, i->offset());
return make_ready_future<std::optional<offset_in_partition>>(offset_in_partition(i->offset()));
}
// See clustered_index_cursor::next_entry(). Parses more blocks from the
// stream on demand; trims consumed blocks to keep memory usage O(1).
future<std::optional<entry_info>> next_entry() override {
promoted_index_blocks& pi_blocks = get_pi_blocks();
if (pi_blocks.empty() || _current_pi_idx >= pi_blocks.size()) {
if (all_blocks_read()) {
// Exhausted the promoted index.
return make_ready_future<std::optional<entry_info>>(std::nullopt);
}
return get_next_pi_blocks().then([this] {
return next_entry();
});
}
sstlog.trace("scanning_clustered_index_cursor {}: next_entry(), pi_idx={}", this, _current_pi_idx);
promoted_index_block& block = pi_blocks[_current_pi_idx];
auto ei = entry_info{block.start(_s), block.end(_s), block.offset()};
++_current_pi_idx;
trim_blocks();
return make_ready_future<std::optional<entry_info>>(std::move(ei));
}
// Closes the underlying reader exactly once; subsequent calls are no-ops.
future<> close() override {
if (!_reader_closed) {
_reader_closed = true;
return _reader.close();
}
return make_ready_future<>();
}
};
}

View File

@@ -35,10 +35,7 @@ public:
{ }
index_reader_assertions& has_monotonic_positions(const schema& s) {
auto pos_cmp = [&s] (const sstables::promoted_index_block& lhs, const sstables::promoted_index_block& rhs) {
sstables::promoted_index_block_compare cmp(s);
return cmp(rhs.start(s), lhs.end(s));
};
auto pos_cmp = sstables::promoted_index_block_compare(s);
auto rp_cmp = dht::ring_position_comparator(s);
auto prev = dht::ring_position::min();
_r->read_partition_data().get();
@@ -54,22 +51,14 @@ public:
prev = rp;
while (e.get_read_pi_blocks_count() < e.get_total_pi_blocks_count()) {
e.get_next_pi_blocks().get();
auto* infos = e.get_pi_blocks();
if (infos->empty()) {
continue;
}
auto it = std::adjacent_find(infos->begin(), infos->end(), pos_cmp);
if (it != infos->end()) {
std::cout << "promoted index:\n";
for (auto& e : *infos) {
std::cout << " " << e.start(s) << "-" << e.end(s)
<< ": +" << e.offset() << " len=" << e.width() << std::endl;
}
auto next = std::next(it);
BOOST_FAIL(format("Index blocks are not monotonic: {} >= {}", it->end(s), next->start(s)));
sstables::clustered_index_cursor& cur = e.get_promoted_index()->cursor();
std::optional<sstables::promoted_index_block_position> prev_end;
while (auto ei_opt = cur.next_entry().get0()) {
sstables::clustered_index_cursor::entry_info& ei = *ei_opt;
if (prev_end && pos_cmp(ei.start, *prev_end)) {
BOOST_FAIL(format("Index blocks are not monotonic: {} > {}", *prev_end, ei.start));
}
prev_end = ei.end;
}
_r->advance_to_next_partition().get();
}
@@ -79,7 +68,8 @@ public:
index_reader_assertions& is_empty(const schema& s) {
_r->read_partition_data().get();
while (!_r->eof()) {
BOOST_REQUIRE(_r->current_partition_entry().get_total_pi_blocks_count() == 0);
sstables::index_entry& ie = _r->current_partition_entry();
BOOST_REQUIRE(!ie.get_promoted_index() || ie.get_promoted_index()->get_promoted_index_size() == 0);
_r->advance_to_next_partition().get();
}
return *this;