Files
scylladb/sstables/index_reader.hh
Raphael S. Carvalho cabb7fbd3b sstables: Close promoted index readers when advancing to next summary index
Problem fixed on master since 5ed559c. So branch-4.5 and up aren't affected.

Index reader fails to close input streams of promoted index readers when advancing
to next summary entry, so Scylla can abort as a result of a stream being destroyed
while there were reads in progress. This problem was seen when row cache issued
a fast forward, so index reader was asked to advance to next summary entry while
the previous one still had reads in progress.
By closing the list of index readers when there's only one owner holding it,
the problem is safely fixed, because it cannot happen that an index_bound like
_lower_bound or _upper_bound will be left with a list that's already closed.

Fixes #9049.

test: mode(dev, debug).

No observable perf regression:

BEFORE:

   read    skip      time (s)   iterations     frags     frag/s    mad f/s    max f/s    min f/s    avg aio    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
-> 1       0         8.168640            4    100000      12242        108      12262      11982    50032.2  50049    6403116   20707       0        0        8        8        0        0        0  83.3%
-> 1       1        22.257916            4     50000       2246          3       2249       2238   150025.0 150025    6454272  100001       0    49999   100000   149999        0        0        0  54.7%
-> 1       8         9.384961            4     11112       1184          5       1184       1178    77781.2  77781    1439328   66618   11111        1    33334    44444        0        0        0  44.0%
-> 1       16        4.976144            4      5883       1182          6       1184       1173    41180.0  41180     762053   35264    5882        0    17648    23530        0        0        0  44.1%
-> 1       32        2.582744            4      3031       1174          4       1175       1167    21216.0  21216     392619   18176    3031        0     9092    12122        0        0        0  43.8%
-> 1       64        1.308410            4      1539       1176          2       1178       1173    10772.0  10772     199353    9233    1539        0     4616     6154        0        0        0  44.0%
-> 1       256       0.331037            4       390       1178         12       1190       1165     2729.0   2729      50519    2338     390        0     1169     1558        0        0        0  44.0%
-> 1       1024      0.085108            4        98       1151          7       1155       1141      685.0    685      12694     587      98        0      293      390        0        0        0  42.9%
-> 1       4096      0.024393            6        25       1025          5       1029       1020      174.0    174       3238     149      25        0       74       98        0        0        0  37.4%
-> 64      1         8.765446            4     98462      11233         16      11236      11182    54642.0  54648    6405470   23632       1     1538     4615     4615        0        0        0  79.3%
-> 64      8         8.456430            4     88896      10512         48      10582      10464    55578.0  55578    6405971   24031    4166        0     5553     5553        0        0        0  77.3%
-> 64      16        7.798197            4     80000      10259        108      10299      10077    51248.0  51248    5922500   22160    4996        0     4998     4998        0        0        0  74.8%
-> 64      32        6.605148            4     66688      10096         64      10168      10033    42715.0  42715    4936359   18796    4164        0     4165     4165        0        0        0  75.5%
-> 64      64        4.933287            4     50016      10138         28      10189      10111    32039.0  32039    3702428   14106    3124        0     3125     3125        0        0        0  75.3%
-> 64      256       1.971701            4     20032      10160         57      10347      10103    12831.0  12831    1482993    5731    1252        0     1250     1250        0        0        0  74.1%
-> 64      1024      0.587026            4      5888      10030         84      10277       9946     3770.0   3770     435895    1635     368        0      366      366        0        0        0  74.6%
-> 64      4096      0.157401            4      1600      10165         69      10202       9698     1023.0   1023     118449     455     100        0       98       98        0        0        0  73.9%

AFTER:

   read    skip      time (s)   iterations     frags     frag/s    mad f/s    max f/s    min f/s    avg aio    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
-> 1       0         8.191639            4    100000      12208         46      12279      12161    50031.2  50025    6403108   20243       0        0        0        0        0        0        0  87.0%
-> 1       1        22.933121            4     50000       2180         36       2198       2115   150025.0 150025    6454272  100001       0    49999   100000   149999        0        0        0  54.9%
-> 1       8         9.471735            4     11112       1173          5       1178       1168    77781.2  77781    1439328   66663   11111        0    33334    44445        0        0        0  44.6%
-> 1       16        5.001569            4      5883       1176          2       1176       1170    41180.0  41180     762053   35296    5882        1    17648    23529        0        0        0  44.6%
-> 1       32        2.587069            4      3031       1172          1       1173       1164    21216.0  21216     392619   18185    3031        1     9092    12121        0        0        0  44.8%
-> 1       64        1.310747            4      1539       1174          3       1177       1171    10772.0  10772     199353    9233    1539        0     4616     6154        0        0        0  44.9%
-> 1       256       0.335490            4       390       1162          2       1167       1161     2729.0   2729      50519    2338     390        0     1169     1558        0        0        0  45.7%
-> 1       1024      0.081944            4        98       1196         21       1210       1162      685.0    685      12694     585      98        0      293      390        0        0        0  46.2%
-> 1       4096      0.022266            6        25       1123          3       1125       1105      174.0    174       3238     149      24        0       74       98        0        0        0  41.9%
-> 64      1         8.731741            4     98462      11276         45      11417      11231    54642.0  54640    6405470   23686       0     1538     4615     4615        0        0        0  80.2%
-> 64      8         8.396247            4     88896      10588         19      10596      10560    55578.0  55578    6405971   24275    4166        0     5553     5553        0        0        0  77.6%
-> 64      16        7.700995            4     80000      10388         88      10405      10221    51248.0  51248    5922500   22100    5000        0     4998     4998        0        0        0  76.4%
-> 64      32        6.517276            4     66688      10232         31      10342      10201    42715.0  42715    4936359   19013    4164        0     4165     4165        0        0        0  75.3%
-> 64      64        4.898669            4     50016      10210         60      10291      10150    32039.0  32039    3702428   14110    3124        0     3125     3125        0        0        0  74.4%
-> 64      256       1.969972            4     20032      10169         22      10173      10091    12831.0  12831    1482993    5660    1252        0     1250     1250        0        0        0  74.3%
-> 64      1024      0.575180            4      5888      10237         84      10316      10028     3770.0   3770     435895    1656     368        0      366      366        0        0        0  74.6%
-> 64      4096      0.158503            4      1600      10094         81      10195      10014     1023.0   1023     118449     460     100        0       98       98        0        0        0  73.5%

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20210722180302.64675-1-raphaelsc@scylladb.com>
(cherry picked from commit 9dce1e4b2b)
2021-07-25 17:24:09 +03:00

862 lines
38 KiB
C++

/*
* Copyright (C) 2015 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "sstables.hh"
#include "consumer.hh"
#include "downsampling.hh"
#include "sstables/shared_index_lists.hh"
#include <seastar/util/bool_class.hh>
#include "utils/buffer_input_stream.hh"
#include "sstables/prepended_input_stream.hh"
#include "tracing/traced_file.hh"
#include "sstables/scanning_clustered_index_cursor.hh"
#include "sstables/mx/bsearch_clustered_cursor.hh"
namespace sstables {
extern seastar::logger sstlog;
extern thread_local cached_file::metrics index_page_cache_metrics;
extern thread_local mc::cached_promoted_index::metrics promoted_index_cache_metrics;
class index_consumer {
uint64_t max_quantity;
public:
index_list indexes;
index_consumer(uint64_t q) : max_quantity(q) {
indexes.reserve(q);
}
void consume_entry(index_entry&& ie, uint64_t offset) {
indexes.push_back(std::move(ie));
}
void reset() {
indexes.clear();
}
};
// See #2993
class trust_promoted_index_tag;
using trust_promoted_index = bool_class<trust_promoted_index_tag>;
// IndexConsumer is a concept that implements:
//
// bool should_continue();
// void consume_entry(index_entry&& ie, uint64_t offset);
//
// TODO: make it templated on SSTables version since the exact format can be passed in at compile time
template <class IndexConsumer>
class index_consume_entry_context : public data_consumer::continuous_data_consumer<index_consume_entry_context<IndexConsumer>> {
using proceed = data_consumer::proceed;
using processing_result = data_consumer::processing_result;
using continuous_data_consumer = data_consumer::continuous_data_consumer<index_consume_entry_context<IndexConsumer>>;
using read_status = typename continuous_data_consumer::read_status;
private:
IndexConsumer& _consumer;
sstring _file_name;
file _index_file;
file_input_stream_options _options;
uint64_t _entry_offset;
enum class state {
START,
KEY_SIZE,
KEY_BYTES,
POSITION,
PROMOTED_SIZE,
PARTITION_HEADER_LENGTH_1,
PARTITION_HEADER_LENGTH_2,
LOCAL_DELETION_TIME,
MARKED_FOR_DELETE_AT,
NUM_PROMOTED_INDEX_BLOCKS,
CONSUME_ENTRY,
} _state = state::START;
friend std::ostream& operator<<(std::ostream& out, const state& s) {
switch (s) {
case state::START: return out << "START";
case state::KEY_SIZE: return out << "KEY_SIZE";
case state::KEY_BYTES: return out << "KEY_BYTES";
case state::POSITION: return out << "POSITION";
case state::PROMOTED_SIZE: return out << "PROMOTED_SIZE";
case state::PARTITION_HEADER_LENGTH_1: return out << "PARTITION_HEADER_LENGTH_1";
case state::PARTITION_HEADER_LENGTH_2: return out << "PARTITION_HEADER_LENGTH_2";
case state::LOCAL_DELETION_TIME: return out << "LOCAL_DELETION_TIME";
case state::MARKED_FOR_DELETE_AT: return out << "MARKED_FOR_DELETE_AT";
case state::NUM_PROMOTED_INDEX_BLOCKS: return out << "NUM_PROMOTED_INDEX_BLOCKS";
case state::CONSUME_ENTRY: return out << "CONSUME_ENTRY";
}
abort();
}
temporary_buffer<char> _key;
uint64_t _promoted_index_end;
uint64_t _position;
uint64_t _partition_header_length = 0;
std::optional<deletion_time> _deletion_time;
uint32_t _num_pi_blocks = 0;
trust_promoted_index _trust_pi;
const schema& _s;
std::optional<column_values_fixed_lengths> _ck_values_fixed_lengths;
bool _use_binary_search;
tracing::trace_state_ptr _trace_state;
inline bool is_mc_format() const { return static_cast<bool>(_ck_values_fixed_lengths); }
public:
void verify_end_state() const {
if (this->_remain > 0) {
throw std::runtime_error("index_consume_entry_context - no more data but parsing is incomplete");
}
}
bool non_consuming() const {
return ((_state == state::CONSUME_ENTRY) || (_state == state::START));
}
processing_result process_state(temporary_buffer<char>& data) {
auto current_pos = [&] { return this->position() - data.size(); };
auto read_vint_or_uint64 = [this] (temporary_buffer<char>& data) {
return is_mc_format() ? this->read_unsigned_vint(data) : this->read_64(data);
};
auto read_vint_or_uint32 = [this] (temporary_buffer<char>& data) {
return is_mc_format() ? this->read_unsigned_vint(data) : this->read_32(data);
};
auto get_uint32 = [this] {
return is_mc_format() ? static_cast<uint32_t>(this->_u64) : this->_u32;
};
switch (_state) {
// START comes first, to make the handling of the 0-quantity case simpler
case state::START:
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::START);
_state = state::KEY_SIZE;
break;
case state::KEY_SIZE:
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::KEY_SIZE);
_entry_offset = current_pos();
if (this->read_16(data) != continuous_data_consumer::read_status::ready) {
_state = state::KEY_BYTES;
break;
}
case state::KEY_BYTES:
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::KEY_BYTES);
if (this->read_bytes(data, this->_u16, _key) != continuous_data_consumer::read_status::ready) {
_state = state::POSITION;
break;
}
case state::POSITION:
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::POSITION);
if (read_vint_or_uint64(data) != continuous_data_consumer::read_status::ready) {
_state = state::PROMOTED_SIZE;
break;
}
case state::PROMOTED_SIZE:
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::PROMOTED_SIZE);
_position = this->_u64;
if (read_vint_or_uint32(data) != continuous_data_consumer::read_status::ready) {
_state = state::PARTITION_HEADER_LENGTH_1;
break;
}
case state::PARTITION_HEADER_LENGTH_1: {
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::PARTITION_HEADER_LENGTH_1);
auto promoted_index_size_with_header = get_uint32();
_promoted_index_end = current_pos() + promoted_index_size_with_header;
if (promoted_index_size_with_header == 0) {
_state = state::CONSUME_ENTRY;
goto state_CONSUME_ENTRY;
}
if (!is_mc_format()) {
// SSTables ka/la don't have a partition_header_length field
_state = state::LOCAL_DELETION_TIME;
goto state_LOCAL_DELETION_TIME;
}
if (this->read_unsigned_vint(data) != continuous_data_consumer::read_status::ready) {
_state = state::PARTITION_HEADER_LENGTH_2;
break;
}
}
case state::PARTITION_HEADER_LENGTH_2:
sstlog.trace("{}: pos {} state {} {}", fmt::ptr(this), current_pos(), state::PARTITION_HEADER_LENGTH_2, this->_u64);
_partition_header_length = this->_u64;
state_LOCAL_DELETION_TIME:
case state::LOCAL_DELETION_TIME:
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::LOCAL_DELETION_TIME);
_deletion_time.emplace();
if (this->read_32(data) != continuous_data_consumer::read_status::ready) {
_state = state::MARKED_FOR_DELETE_AT;
break;
}
case state::MARKED_FOR_DELETE_AT:
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::MARKED_FOR_DELETE_AT);
_deletion_time->local_deletion_time = this->_u32;
if (this->read_64(data) != continuous_data_consumer::read_status::ready) {
_state = state::NUM_PROMOTED_INDEX_BLOCKS;
break;
}
case state::NUM_PROMOTED_INDEX_BLOCKS:
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::NUM_PROMOTED_INDEX_BLOCKS);
_deletion_time->marked_for_delete_at = this->_u64;
if (read_vint_or_uint32(data) != continuous_data_consumer::read_status::ready) {
_state = state::CONSUME_ENTRY;
break;
}
state_CONSUME_ENTRY:
case state::CONSUME_ENTRY: {
auto promoted_index_start = current_pos();
auto promoted_index_size = _promoted_index_end - promoted_index_start;
sstlog.trace("{}: pos {} state {} size {}", fmt::ptr(this), current_pos(), state::CONSUME_ENTRY, promoted_index_size);
if (_deletion_time) {
_num_pi_blocks = get_uint32();
}
auto data_size = data.size();
std::unique_ptr<promoted_index> pi;
if ((_trust_pi == trust_promoted_index::yes) && (promoted_index_size > 0)) {
std::unique_ptr<clustered_index_cursor> cursor;
if (_use_binary_search) {
cached_file f(_index_file, continuous_data_consumer::_permit, index_page_cache_metrics,
promoted_index_start, promoted_index_size, _file_name);
if (promoted_index_size <= data_size) {
f.populate_front(data.share());
} else {
f.populate_front(std::move(data));
}
cursor = std::make_unique<mc::bsearch_clustered_cursor>(_s,
promoted_index_cache_metrics, continuous_data_consumer::_permit,
*_ck_values_fixed_lengths, std::move(f), _options.io_priority_class, _num_pi_blocks, _trace_state);
} else {
input_stream<char> promoted_index_stream = [&] {
if (promoted_index_size <= data_size) {
auto buf = data.share();
buf.trim(promoted_index_size);
return make_buffer_input_stream(std::move(buf));
} else {
return make_prepended_input_stream(std::move(data),
make_file_input_stream(_index_file, this->position(), promoted_index_size - data_size, _options).detach());
}
}();
cursor = std::make_unique<scanning_clustered_index_cursor>(_s, continuous_data_consumer::_permit,
std::move(promoted_index_stream), promoted_index_size, _num_pi_blocks, _ck_values_fixed_lengths);
}
pi = std::make_unique<promoted_index>(_s, *_deletion_time, promoted_index_size, std::move(cursor));
} else {
_num_pi_blocks = 0;
}
_consumer.consume_entry(index_entry{_s, std::move(_key), _position, std::move(pi)}, _entry_offset);
_deletion_time = std::nullopt;
_num_pi_blocks = 0;
_state = state::START;
if (promoted_index_size <= data_size) {
data.trim_front(promoted_index_size);
} else {
data.trim(0);
sstlog.trace("{}: skip {} pos {} state {}", fmt::ptr(this), promoted_index_size - data_size, current_pos(), _state);
return skip_bytes{promoted_index_size - data_size};
}
}
break;
}
sstlog.trace("{}: exit pos {} state {}", fmt::ptr(this), current_pos(), _state);
return proceed::yes;
}
index_consume_entry_context(reader_permit permit, IndexConsumer& consumer, trust_promoted_index trust_pi, const schema& s,
sstring file_name, file index_file, file_input_stream_options options, uint64_t start,
uint64_t maxlen, std::optional<column_values_fixed_lengths> ck_values_fixed_lengths, tracing::trace_state_ptr trace_state = {})
: continuous_data_consumer(std::move(permit), make_file_input_stream(index_file, start, maxlen, options), start, maxlen)
, _consumer(consumer), _file_name(std::move(file_name)), _index_file(index_file), _options(options)
, _entry_offset(start), _trust_pi(trust_pi), _s(s), _ck_values_fixed_lengths(std::move(ck_values_fixed_lengths))
, _use_binary_search(is_mc_format() && use_binary_search_in_promoted_index)
, _trace_state(std::move(trace_state))
{}
void reset(uint64_t offset) {
_state = state::START;
_entry_offset = offset;
_consumer.reset();
}
};
// Less-comparator for lookups in the partition index.
class index_comparator {
dht::ring_position_comparator_for_sstables _tri_cmp;
public:
index_comparator(const schema& s) : _tri_cmp(s) {}
bool operator()(const summary_entry& e, dht::ring_position_view rp) const {
return _tri_cmp(e.get_decorated_key(), rp) < 0;
}
bool operator()(const index_entry& e, dht::ring_position_view rp) const {
return _tri_cmp(e.get_decorated_key(), rp) < 0;
}
bool operator()(dht::ring_position_view rp, const summary_entry& e) const {
return _tri_cmp(e.get_decorated_key(), rp) > 0;
}
bool operator()(dht::ring_position_view rp, const index_entry& e) const {
return _tri_cmp(e.get_decorated_key(), rp) > 0;
}
};
inline static
future<> close_index_list(shared_index_lists::list_ptr& list) {
if (list) {
return parallel_for_each(*list, [](index_entry &ie) {
return ie.close_pi_stream();
}).finally([&list] {
list = {};
});
}
return make_ready_future<>();
}
// Stores information about open end RT marker
// of the lower index bound
struct open_rt_marker {
position_in_partition pos;
tombstone tomb;
};
// Contains information about index_reader position in the index file
struct index_bound {
index_bound() = default;
shared_index_lists::list_ptr current_list;
uint64_t previous_summary_idx = 0;
uint64_t current_summary_idx = 0;
uint64_t current_index_idx = 0;
uint64_t current_pi_idx = 0; // Points to upper bound of the cursor.
uint64_t data_file_position = 0;
indexable_element element = indexable_element::partition;
std::optional<open_rt_marker> end_open_marker;
};
// Provides access to sstable indexes.
//
// Maintains logical cursors to sstable elements (partitions, cells).
// Holds two cursors pointing to the range within sstable (upper cursor may be not set).
// Initially the lower cursor is positioned on the first partition in the sstable.
// Lower cursor can be accessed and advanced from outside.
// Upper cursor can only be advanced along with the lower cursor and not accessed from outside.
//
// If eof() then the lower bound cursor is positioned past all partitions in the sstable.
class index_reader {
shared_sstable _sstable;
reader_permit _permit;
const io_priority_class& _pc;
tracing::trace_state_ptr _trace_state;
shared_index_lists _index_lists;
future<> _background_closes = make_ready_future<>();
struct reader {
index_consumer _consumer;
index_consume_entry_context<index_consumer> _context;
static file get_file(sstable& sst, reader_permit permit, tracing::trace_state_ptr trace_state) {
auto f = make_tracked_file(sst._index_file, std::move(permit));
if (!trace_state) {
return f;
}
return tracing::make_traced_file(std::move(f), std::move(trace_state), format("{}:", sst.filename(component_type::Index)));
}
inline static file_input_stream_options get_file_input_stream_options(shared_sstable sst, const io_priority_class& pc) {
file_input_stream_options options;
options.buffer_size = sst->sstable_buffer_size;
options.read_ahead = 2;
options.io_priority_class = pc;
options.dynamic_adjustments = sst->_index_history;
return options;
}
reader(shared_sstable sst, reader_permit permit, const io_priority_class& pc, tracing::trace_state_ptr trace_state, uint64_t begin, uint64_t end, uint64_t quantity)
: _consumer(quantity)
, _context(permit, _consumer,
trust_promoted_index(sst->has_correct_promoted_index_entries()), *sst->_schema,
trace_state ? sst->filename(component_type::Index) : sstring(),
get_file(*sst, permit, trace_state),
get_file_input_stream_options(sst, pc), begin, end - begin,
(sst->get_version() >= sstable_version_types::mc
? std::make_optional(get_clustering_values_fixed_lengths(sst->get_serialization_header()))
: std::optional<column_values_fixed_lengths>{}),
trace_state)
{ }
};
private:
index_bound _lower_bound;
// Upper bound may remain uninitialized
std::optional<index_bound> _upper_bound;
private:
void advance_to_end(index_bound& bound) {
sstlog.trace("index {}: advance_to_end() bound {}", fmt::ptr(this), fmt::ptr(&bound));
bound.data_file_position = data_file_end();
bound.element = indexable_element::partition;
bound.current_list = {};
bound.end_open_marker.reset();
}
// Must be called for non-decreasing summary_idx.
future<> advance_to_page(index_bound& bound, uint64_t summary_idx) {
sstlog.trace("index {}: advance_to_page({}), bound {}", fmt::ptr(this), summary_idx, fmt::ptr(&bound));
assert(!bound.current_list || bound.current_summary_idx <= summary_idx);
if (bound.current_list && bound.current_summary_idx == summary_idx) {
sstlog.trace("index {}: same page", fmt::ptr(this));
return make_ready_future<>();
}
auto& summary = _sstable->get_summary();
if (summary_idx >= summary.header.size) {
sstlog.trace("index {}: eof", fmt::ptr(this));
advance_to_end(bound);
return make_ready_future<>();
}
auto loader = [this] (uint64_t summary_idx) -> future<index_list> {
auto& summary = _sstable->get_summary();
uint64_t position = summary.entries[summary_idx].position;
uint64_t quantity = downsampling::get_effective_index_interval_after_index(summary_idx, summary.header.sampling_level,
summary.header.min_index_interval);
uint64_t end;
if (summary_idx + 1 >= summary.header.size) {
end = _sstable->index_size();
} else {
end = summary.entries[summary_idx + 1].position;
}
return do_with(std::make_unique<reader>(_sstable, _permit, _pc, _trace_state, position, end, quantity), [this, summary_idx] (auto& entries_reader) {
return entries_reader->_context.consume_input().then_wrapped([this, summary_idx, &entries_reader] (future<> f) {
std::exception_ptr ex;
if (f.failed()) {
ex = f.get_exception();
sstlog.error("failed reading index for {}: {}", _sstable->get_filename(), ex);
}
auto indexes = std::move(entries_reader->_consumer.indexes);
return entries_reader->_context.close().then([indexes = std::move(indexes), ex = std::move(ex)] () mutable {
if (ex) {
return do_with(std::move(indexes), [ex = std::move(ex)] (index_list& indexes) mutable {
return parallel_for_each(indexes, [] (index_entry& ie) mutable {
return ie.close_pi_stream();
}).then_wrapped([ex = std::move(ex)] (future<>&& fut) mutable {
fut.ignore_ready_future();
return make_exception_future<index_list>(std::move(ex));
});
});
}
return make_ready_future<index_list>(std::move(indexes));
});
});
});
};
return _index_lists.get_or_load(summary_idx, loader).then([this, &bound, summary_idx] (shared_index_lists::list_ptr ref) {
// to make sure list is not closed when another bound is still using it, index list will only be closed when there's only one owner holding it
if (bound.current_list && bound.current_list.use_count() == 1) {
// a new background close will only be initiated when previous ones terminate, so as to limit the concurrency.
_background_closes = _background_closes.then_wrapped([current_list = std::move(bound.current_list)] (future<>&& f) mutable {
f.ignore_ready_future();
return do_with(std::move(current_list), [] (shared_index_lists::list_ptr& current_list) mutable {
return close_index_list(current_list);
});
});
}
bound.current_list = std::move(ref);
bound.current_summary_idx = summary_idx;
bound.current_index_idx = 0;
bound.current_pi_idx = 0;
if (bound.current_list->empty()) {
throw malformed_sstable_exception("missing index entry", _sstable->filename(component_type::Index));
}
bound.data_file_position = (*bound.current_list)[0].position();
bound.element = indexable_element::partition;
bound.end_open_marker.reset();
if (sstlog.is_enabled(seastar::log_level::trace)) {
sstlog.trace("index {} bound {}: page:", fmt::ptr(this), fmt::ptr(&bound));
for (const index_entry& e : *bound.current_list) {
auto dk = dht::decorate_key(*_sstable->_schema,
e.get_key().to_partition_key(*_sstable->_schema));
sstlog.trace(" {} -> {}", dk, e.position());
}
}
});
}
future<> advance_lower_to_start(const dht::partition_range &range) {
if (range.start()) {
return advance_to(_lower_bound,
dht::ring_position_view(range.start()->value(),
dht::ring_position_view::after_key(!range.start()->is_inclusive())));
}
return make_ready_future<>();
}
future<> advance_upper_to_end(const dht::partition_range &range) {
if (!_upper_bound) {
_upper_bound.emplace();
}
if (range.end()) {
return advance_to(*_upper_bound,
dht::ring_position_view(range.end()->value(),
dht::ring_position_view::after_key(range.end()->is_inclusive())));
}
advance_to_end(*_upper_bound);
return make_ready_future<>();
}
// Tells whether details about current partition can be accessed.
// If this returns false, you have to call read_partition_data().
//
// Calling read_partition_data() may involve doing I/O. The reason
// why control over this is exposed and not done under the hood is that
// in some cases it only makes sense to access partition details from index
// if it is readily available, and if it is not, we're better off obtaining
// them by continuing reading from sstable.
bool partition_data_ready(const index_bound& bound) const {
return static_cast<bool>(bound.current_list);
}
// Valid if partition_data_ready(bound)
index_entry& current_partition_entry(index_bound& bound) {
assert(bound.current_list);
return (*bound.current_list)[bound.current_index_idx];
}
future<> advance_to_next_partition(index_bound& bound) {
sstlog.trace("index {} bound {}: advance_to_next_partition()", fmt::ptr(&bound), fmt::ptr(this));
if (!partition_data_ready(bound)) {
return advance_to_page(bound, 0).then([this, &bound] {
return advance_to_next_partition(bound);
});
}
if (bound.current_index_idx + 1 < bound.current_list->size()) {
++bound.current_index_idx;
bound.current_pi_idx = 0;
bound.data_file_position = (*bound.current_list)[bound.current_index_idx].position();
bound.element = indexable_element::partition;
bound.end_open_marker.reset();
return make_ready_future<>();
}
auto& summary = _sstable->get_summary();
if (bound.current_summary_idx + 1 < summary.header.size) {
return advance_to_page(bound, bound.current_summary_idx + 1);
}
advance_to_end(bound);
return make_ready_future<>();
}
future<> advance_to(index_bound& bound, dht::ring_position_view pos) {
sstlog.trace("index {} bound {}: advance_to({}), _previous_summary_idx={}, _current_summary_idx={}",
fmt::ptr(this), fmt::ptr(&bound), pos, bound.previous_summary_idx, bound.current_summary_idx);
if (pos.is_min()) {
sstlog.trace("index {}: first entry", fmt::ptr(this));
return make_ready_future<>();
} else if (pos.is_max()) {
advance_to_end(bound);
return make_ready_future<>();
}
auto& summary = _sstable->get_summary();
bound.previous_summary_idx = std::distance(std::begin(summary.entries),
std::lower_bound(summary.entries.begin() + bound.previous_summary_idx, summary.entries.end(), pos, index_comparator(*_sstable->_schema)));
if (bound.previous_summary_idx == 0) {
sstlog.trace("index {}: first entry", fmt::ptr(this));
return make_ready_future<>();
}
auto summary_idx = bound.previous_summary_idx - 1;
sstlog.trace("index {}: summary_idx={}", fmt::ptr(this), summary_idx);
// Despite the requirement that the values of 'pos' in subsequent calls
// are increasing we still may encounter a situation when we try to read
// the previous bucket.
// For example, let's say we have index like this:
// summary: A K ...
// index: A C D F K M N O ...
// Now, we want to get positions for range [G, J]. We start with [G,
// summary look up will tel us to check the first bucket. However, there
// is no G in that bucket so we read the following one to get the
// position (see the advance_to_page() call below). After we've got it, it's time to
// get J] position. Again, summary points us to the first bucket and we
// hit an assert since the reader is already at the second bucket and we
// cannot go backward.
// The solution is this condition above. If our lookup requires reading
// the previous bucket we assume that the entry doesn't exist and return
// the position of the first one in the current index bucket.
if (summary_idx + 1 == bound.current_summary_idx) {
return make_ready_future<>();
}
return advance_to_page(bound, summary_idx).then([this, &bound, pos, summary_idx] {
sstlog.trace("index {}: old page index = {}", fmt::ptr(this), bound.current_index_idx);
auto& entries = *bound.current_list;
auto i = std::lower_bound(std::begin(entries) + bound.current_index_idx, std::end(entries), pos, index_comparator(*_sstable->_schema));
if (i == std::end(entries)) {
sstlog.trace("index {}: not found", fmt::ptr(this));
return advance_to_page(bound, summary_idx + 1);
}
bound.current_index_idx = std::distance(std::begin(entries), i);
bound.current_pi_idx = 0;
bound.data_file_position = i->position();
bound.element = indexable_element::partition;
bound.end_open_marker.reset();
sstlog.trace("index {}: new page index = {}, pos={}", fmt::ptr(this), bound.current_index_idx, bound.data_file_position);
return make_ready_future<>();
});
}
// Forwards the upper bound cursor to a position which is greater than given position in current partition.
//
// Note that the index within partition, unlike the partition index, doesn't cover all keys.
// So this may not forward to the smallest position which is greater than pos.
//
// May advance to the next partition if it's not possible to find a suitable position inside
// current partition.
//
// Must be called only when !eof().
future<> advance_upper_past(position_in_partition_view pos) {
sstlog.trace("index {}: advance_upper_past({})", fmt::ptr(this), pos);
// We advance cursor within the current lower bound partition
// So need to make sure first that it is read
if (!partition_data_ready(_lower_bound)) {
return read_partition_data().then([this, pos] {
assert(partition_data_ready());
return advance_upper_past(pos);
});
}
if (!_upper_bound) {
_upper_bound = _lower_bound;
}
index_entry& e = current_partition_entry(*_upper_bound);
if (!e.get_promoted_index()) {
sstlog.trace("index {}: no promoted index", fmt::ptr(this));
return advance_to_next_partition(*_upper_bound);
}
promoted_index& pi = *e.get_promoted_index();
return pi.cursor().probe_upper_bound(pos).then([this, &e] (std::optional<clustered_index_cursor::offset_in_partition> off) {
if (!off) {
return advance_to_next_partition(*_upper_bound);
}
_upper_bound->data_file_position = e.position() + *off;
_upper_bound->element = indexable_element::cell;
sstlog.trace("index {} upper bound: skipped to cell, _data_file_position={}", fmt::ptr(this), _upper_bound->data_file_position);
return make_ready_future<>();
});
}
// Returns position right after all partitions in the sstable
uint64_t data_file_end() const {
return _sstable->data_size();
}
public:
index_reader(shared_sstable sst, reader_permit permit, const io_priority_class& pc, tracing::trace_state_ptr trace_state)
: _sstable(std::move(sst))
, _permit(std::move(permit))
, _pc(pc)
, _trace_state(std::move(trace_state))
{
sstlog.trace("index {}: index_reader for {}", fmt::ptr(this), _sstable->get_filename());
}
// Ensures that partition_data_ready() returns true.
// Can be called only when !eof()
future<> read_partition_data() {
assert(!eof());
if (partition_data_ready(_lower_bound)) {
return make_ready_future<>();
}
// The only case when _current_list may be missing is when the cursor is at the beginning
assert(_lower_bound.current_summary_idx == 0);
return advance_to_page(_lower_bound, 0);
}
// Advance index_reader bounds to the bounds of the supplied range
future<> advance_to(const dht::partition_range& range) {
return seastar::when_all_succeed(
advance_lower_to_start(range),
advance_upper_to_end(range)).discard_result();
}
// Get current index entry
index_entry& current_partition_entry() {
return current_partition_entry(_lower_bound);
}
// Returns tombstone for the current partition if it was recorded in the sstable.
// It may be unavailable for old sstables for which this information was not generated.
// Can be called only when partition_data_ready().
std::optional<sstables::deletion_time> partition_tombstone() {
return current_partition_entry(_lower_bound).get_deletion_time();
}
// Returns the key for current partition.
// Can be called only when partition_data_ready().
// The result is valid as long as index_reader is valid.
key_view partition_key() {
index_entry& e = current_partition_entry(_lower_bound);
return e.get_key();
}
bool partition_data_ready() const {
return partition_data_ready(_lower_bound);
}
// Forwards the cursor to the given position in the current partition.
//
// Note that the index within partition, unlike the partition index, doesn't cover all keys.
// So this may forward the cursor to some position pos' which precedes pos, even though
// there exist rows with positions in the range [pos', pos].
//
// Must be called for non-decreasing positions.
// Must be called only after advanced to some partition and !eof().
future<> advance_to(position_in_partition_view pos) {
sstlog.trace("index {}: advance_to({}), current data_file_pos={}",
fmt::ptr(this), pos, _lower_bound.data_file_position);
const schema& s = *_sstable->_schema;
if (pos.is_before_all_fragments(s)) {
return make_ready_future<>();
}
if (!partition_data_ready()) {
return read_partition_data().then([this, pos] {
sstlog.trace("index {}: page done", fmt::ptr(this));
assert(partition_data_ready(_lower_bound));
return advance_to(pos);
});
}
index_entry& e = current_partition_entry();
if (!e.get_promoted_index()) {
sstlog.trace("index {}: no promoted index", fmt::ptr(this));
return make_ready_future<>();
}
promoted_index& pi = *e.get_promoted_index();
return pi.cursor().advance_to(pos).then([this, &e] (std::optional<clustered_index_cursor::skip_info> si) {
if (!si) {
sstlog.trace("index {}: position in the same block", fmt::ptr(this));
return;
}
if (!si->active_tombstone) {
// End open marker can be only engaged in SSTables 3.x ('mc' format) and never in ka/la
_lower_bound.end_open_marker.reset();
} else {
_lower_bound.end_open_marker = open_rt_marker{std::move(si->active_tombstone_pos), si->active_tombstone};
}
_lower_bound.data_file_position = e.position() + si->offset;
_lower_bound.element = indexable_element::cell;
sstlog.trace("index {}: skipped to cell, _data_file_position={}", fmt::ptr(this), _lower_bound.data_file_position);
});
}
// Like advance_to(dht::ring_position_view), but returns information whether the key was found
// If upper_bound is provided, the upper bound within position is looked up
future<bool> advance_lower_and_check_if_present(
dht::ring_position_view key, std::optional<position_in_partition_view> pos = {}) {
return advance_to(_lower_bound, key).then([this, key, pos] {
if (eof()) {
return make_ready_future<bool>(false);
}
return read_partition_data().then([this, key, pos] {
index_comparator cmp(*_sstable->_schema);
bool found = cmp(key, current_partition_entry(_lower_bound)) == 0;
if (!found || !pos) {
return make_ready_future<bool>(found);
}
return advance_upper_past(*pos).then([] {
return make_ready_future<bool>(true);
});
});
});
}
// Moves the cursor to the beginning of next partition.
// Can be called only when !eof().
future<> advance_to_next_partition() {
return advance_to_next_partition(_lower_bound);
}
// Positions the cursor on the first partition which is not smaller than pos (like std::lower_bound).
// Must be called for non-decreasing positions.
future<> advance_to(dht::ring_position_view pos) {
return advance_to(_lower_bound, pos);
}
struct data_file_positions_range {
uint64_t start;
std::optional<uint64_t> end;
};
// Returns positions in the data file of the cursor.
// End position may be unset
data_file_positions_range data_file_positions() const {
data_file_positions_range result;
result.start = _lower_bound.data_file_position;
if (_upper_bound) {
result.end = _upper_bound->data_file_position;
}
return result;
}
// Returns the kind of sstable element the cursor is pointing at.
indexable_element element_kind() const {
return _lower_bound.element;
}
std::optional<open_rt_marker> end_open_marker() const {
return _lower_bound.end_open_marker;
}
bool eof() const {
return _lower_bound.data_file_position == data_file_end();
}
future<> close() {
// Need to close consequently as we expect to not have close_current_list_ptr to run in parallel
return close_index_list(_lower_bound.current_list).then([this] {
if (_upper_bound) {
return close_index_list(_upper_bound->current_list);
}
return make_ready_future<>();
}).then([this] () mutable {
return std::move(_background_closes);
});
}
};
}