sstables: index_reader: Do not store cluster index cursor inside partition indexes
Currently, the partition index page parser will create and store promoted index cursors for each entry. The assumption is that partition index pages are not shared by readers so each promoted index cursor will be used by a single index_reader (the top-level cursor). In order to be able to share partition index entries we must make the entries immutable and thus move the cursor outside. The promoted index cursor is now created and owned by each index_reader. There is at most one such active cursor per index_reader bound (lower/upper).
This commit is contained in:
@@ -30,6 +30,8 @@
|
||||
#include "utils/overloaded_functor.hh"
|
||||
#include "sstables/mx/parsers.hh"
|
||||
|
||||
#include <seastar/core/fstream.hh>
|
||||
|
||||
namespace sstables {
|
||||
|
||||
using promoted_index_block_position_view = std::variant<composite_view, position_in_partition_view>;
|
||||
@@ -398,21 +400,37 @@ public:
|
||||
|
||||
class promoted_index {
|
||||
deletion_time _del_time;
|
||||
file _index_file;
|
||||
uint64_t _promoted_index_start;
|
||||
uint32_t _promoted_index_size;
|
||||
std::unique_ptr<clustered_index_cursor> _cursor;
|
||||
bool _reader_closed = false;
|
||||
uint32_t _num_blocks;
|
||||
temporary_buffer<char> _front; // pre-read front of the promoted index.
|
||||
bool _use_binary_search;
|
||||
public:
|
||||
promoted_index(const schema& s, deletion_time del_time, uint32_t promoted_index_size, std::unique_ptr<clustered_index_cursor> index)
|
||||
promoted_index(const schema& s,
|
||||
deletion_time del_time,
|
||||
file index_file,
|
||||
uint64_t promoted_index_start,
|
||||
uint32_t promoted_index_size,
|
||||
uint32_t num_blocks,
|
||||
temporary_buffer<char> front,
|
||||
bool use_binary_search)
|
||||
: _del_time{del_time}
|
||||
, _index_file(std::move(index_file))
|
||||
, _promoted_index_start(promoted_index_start)
|
||||
, _promoted_index_size(promoted_index_size)
|
||||
, _cursor(std::move(index))
|
||||
, _num_blocks(num_blocks)
|
||||
, _front(std::move(front))
|
||||
, _use_binary_search(use_binary_search)
|
||||
{ }
|
||||
|
||||
[[nodiscard]] deletion_time get_deletion_time() const { return _del_time; }
|
||||
[[nodiscard]] uint32_t get_promoted_index_size() const { return _promoted_index_size; }
|
||||
[[nodiscard]] clustered_index_cursor& cursor() { return *_cursor; };
|
||||
[[nodiscard]] const clustered_index_cursor& cursor() const { return *_cursor; };
|
||||
future<> close_reader() { return _cursor->close(); }
|
||||
|
||||
std::unique_ptr<clustered_index_cursor> make_cursor(shared_sstable,
|
||||
reader_permit,
|
||||
tracing::trace_state_ptr,
|
||||
file_input_stream_options);
|
||||
};
|
||||
|
||||
class index_entry {
|
||||
@@ -466,10 +484,6 @@ public:
|
||||
uint32_t get_promoted_index_size() const { return _index ? _index->get_promoted_index_size() : 0; }
|
||||
|
||||
future<> close_pi_stream() {
|
||||
if (_index) {
|
||||
return _index->close_reader();
|
||||
}
|
||||
|
||||
return make_ready_future<>();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -74,7 +74,6 @@ private:
|
||||
IndexConsumer& _consumer;
|
||||
sstring _file_name;
|
||||
file _index_file;
|
||||
file_input_stream_options _options;
|
||||
uint64_t _entry_offset;
|
||||
|
||||
enum class state {
|
||||
@@ -232,33 +231,17 @@ public:
|
||||
auto data_size = data.size();
|
||||
std::unique_ptr<promoted_index> pi;
|
||||
if ((_trust_pi == trust_promoted_index::yes) && (promoted_index_size > 0)) {
|
||||
std::unique_ptr<clustered_index_cursor> cursor;
|
||||
if (_use_binary_search) {
|
||||
cached_file f(_index_file, continuous_data_consumer::_permit, index_page_cache_metrics,
|
||||
promoted_index_start, promoted_index_size, _file_name);
|
||||
temporary_buffer<char> buf = [&] {
|
||||
if (promoted_index_size <= data_size) {
|
||||
f.populate_front(data.share());
|
||||
auto buf = data.share();
|
||||
buf.trim(promoted_index_size);
|
||||
return buf;
|
||||
} else {
|
||||
f.populate_front(std::move(data));
|
||||
return std::move(data);
|
||||
}
|
||||
cursor = std::make_unique<mc::bsearch_clustered_cursor>(_s,
|
||||
promoted_index_cache_metrics, continuous_data_consumer::_permit,
|
||||
*_ck_values_fixed_lengths, std::move(f), _options.io_priority_class, _num_pi_blocks, _trace_state);
|
||||
} else {
|
||||
input_stream<char> promoted_index_stream = [&] {
|
||||
if (promoted_index_size <= data_size) {
|
||||
auto buf = data.share();
|
||||
buf.trim(promoted_index_size);
|
||||
return make_buffer_input_stream(std::move(buf));
|
||||
} else {
|
||||
return make_prepended_input_stream(std::move(data),
|
||||
make_file_input_stream(_index_file, this->position(), promoted_index_size - data_size, _options).detach());
|
||||
}
|
||||
}();
|
||||
cursor = std::make_unique<scanning_clustered_index_cursor>(_s, continuous_data_consumer::_permit,
|
||||
std::move(promoted_index_stream), promoted_index_size, _num_pi_blocks, _ck_values_fixed_lengths);
|
||||
}
|
||||
pi = std::make_unique<promoted_index>(_s, *_deletion_time, promoted_index_size, std::move(cursor));
|
||||
}();
|
||||
pi = std::make_unique<promoted_index>(_s, *_deletion_time, _index_file,
|
||||
promoted_index_start, promoted_index_size, _num_pi_blocks, std::move(buf), _use_binary_search);
|
||||
} else {
|
||||
_num_pi_blocks = 0;
|
||||
}
|
||||
@@ -281,10 +264,10 @@ public:
|
||||
}
|
||||
|
||||
index_consume_entry_context(reader_permit permit, IndexConsumer& consumer, trust_promoted_index trust_pi, const schema& s,
|
||||
sstring file_name, file index_file, file_input_stream_options options, uint64_t start,
|
||||
file index_file, file_input_stream_options options, uint64_t start,
|
||||
uint64_t maxlen, std::optional<column_values_fixed_lengths> ck_values_fixed_lengths, tracing::trace_state_ptr trace_state = {})
|
||||
: continuous_data_consumer(std::move(permit), make_file_input_stream(index_file, start, maxlen, options), start, maxlen)
|
||||
, _consumer(consumer), _file_name(std::move(file_name)), _index_file(index_file), _options(options)
|
||||
, _consumer(consumer), _index_file(index_file)
|
||||
, _entry_offset(start), _trust_pi(trust_pi), _s(s), _ck_values_fixed_lengths(std::move(ck_values_fixed_lengths))
|
||||
, _use_binary_search(is_mc_format() && use_binary_search_in_promoted_index)
|
||||
, _trace_state(std::move(trace_state))
|
||||
@@ -297,6 +280,48 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
inline
|
||||
std::unique_ptr<clustered_index_cursor> promoted_index::make_cursor(shared_sstable sst,
|
||||
reader_permit permit,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
file_input_stream_options options)
|
||||
{
|
||||
std::optional<column_values_fixed_lengths> ck_values_fixed_lengths;
|
||||
if (sst->get_version() >= sstable_version_types::mc) {
|
||||
ck_values_fixed_lengths = std::make_optional(
|
||||
get_clustering_values_fixed_lengths(sst->get_serialization_header()));
|
||||
}
|
||||
|
||||
if (_use_binary_search) {
|
||||
cached_file f(_index_file, permit,
|
||||
index_page_cache_metrics,
|
||||
_promoted_index_start,
|
||||
_promoted_index_size,
|
||||
trace_state ? sst->filename(component_type::Index) : sstring());
|
||||
|
||||
f.populate_front(_front.share());
|
||||
|
||||
return std::make_unique<mc::bsearch_clustered_cursor>(*sst->get_schema(),
|
||||
promoted_index_cache_metrics, permit,
|
||||
*ck_values_fixed_lengths, std::move(f), options.io_priority_class, _num_blocks, trace_state);
|
||||
}
|
||||
|
||||
input_stream<char> promoted_index_stream = [&] {
|
||||
if (_promoted_index_size <= _front.size()) {
|
||||
return make_buffer_input_stream(_front.share());
|
||||
} else {
|
||||
return make_prepended_input_stream(_front.share(),
|
||||
make_file_input_stream(_index_file,
|
||||
_promoted_index_start + _front.size(),
|
||||
_promoted_index_size - _front.size(),
|
||||
options).detach());
|
||||
}
|
||||
}();
|
||||
|
||||
return std::make_unique<scanning_clustered_index_cursor>(*sst->get_schema(), permit,
|
||||
std::move(promoted_index_stream), _promoted_index_size, _num_blocks, ck_values_fixed_lengths);
|
||||
}
|
||||
|
||||
// Less-comparator for lookups in the partition index.
|
||||
class index_comparator {
|
||||
dht::ring_position_comparator_for_sstables _tri_cmp;
|
||||
@@ -350,6 +375,24 @@ struct index_bound {
|
||||
uint64_t data_file_position = 0;
|
||||
indexable_element element = indexable_element::partition;
|
||||
std::optional<open_rt_marker> end_open_marker;
|
||||
|
||||
// Holds the cursor for the current partition. Lazily initialized.
|
||||
std::unique_ptr<clustered_index_cursor> clustered_cursor;
|
||||
|
||||
// Cannot use default implementation because clustered_cursor is non-copyable.
|
||||
index_bound(const index_bound& other)
|
||||
: current_list(other.current_list)
|
||||
, previous_summary_idx(other.previous_summary_idx)
|
||||
, current_summary_idx(other.current_summary_idx)
|
||||
, current_index_idx(other.current_index_idx)
|
||||
, current_pi_idx(other.current_pi_idx)
|
||||
, data_file_position(other.data_file_position)
|
||||
, element(other.element)
|
||||
, end_open_marker(other.end_open_marker)
|
||||
{ }
|
||||
|
||||
index_bound(index_bound&&) noexcept = default;
|
||||
index_bound& operator=(index_bound&&) noexcept = default;
|
||||
};
|
||||
|
||||
// Provides access to sstable indexes.
|
||||
@@ -393,7 +436,6 @@ class index_reader {
|
||||
: _consumer(quantity)
|
||||
, _context(permit, _consumer,
|
||||
trust_promoted_index(sst->has_correct_promoted_index_entries()), *sst->_schema,
|
||||
trace_state ? sst->filename(component_type::Index) : sstring(),
|
||||
get_file(*sst, permit, trace_state),
|
||||
get_file_input_stream_options(sst, pc), begin, end - begin,
|
||||
(sst->get_version() >= sstable_version_types::mc
|
||||
@@ -409,12 +451,22 @@ private:
|
||||
std::optional<index_bound> _upper_bound;
|
||||
|
||||
private:
|
||||
void advance_to_end(index_bound& bound) {
|
||||
static future<> reset_clustered_cursor(index_bound& bound) {
|
||||
if (bound.clustered_cursor) {
|
||||
return bound.clustered_cursor->close().then([&bound] {
|
||||
bound.clustered_cursor.reset();
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> advance_to_end(index_bound& bound) {
|
||||
sstlog.trace("index {}: advance_to_end() bound {}", fmt::ptr(this), fmt::ptr(&bound));
|
||||
bound.data_file_position = data_file_end();
|
||||
bound.element = indexable_element::partition;
|
||||
bound.current_list = {};
|
||||
bound.end_open_marker.reset();
|
||||
return reset_clustered_cursor(bound);
|
||||
}
|
||||
|
||||
// Must be called for non-decreasing summary_idx.
|
||||
@@ -429,8 +481,7 @@ private:
|
||||
auto& summary = _sstable->get_summary();
|
||||
if (summary_idx >= summary.header.size) {
|
||||
sstlog.trace("index {}: eof", fmt::ptr(this));
|
||||
advance_to_end(bound);
|
||||
return make_ready_future<>();
|
||||
return advance_to_end(bound);
|
||||
}
|
||||
auto loader = [this] (uint64_t summary_idx) -> future<index_list> {
|
||||
auto& summary = _sstable->get_summary();
|
||||
@@ -491,6 +542,8 @@ private:
|
||||
sstlog.trace(" {} -> {}", dk, e.position());
|
||||
}
|
||||
}
|
||||
|
||||
return reset_clustered_cursor(bound);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -512,8 +565,7 @@ private:
|
||||
dht::ring_position_view(range.end()->value(),
|
||||
dht::ring_position_view::after_key(range.end()->is_inclusive())));
|
||||
}
|
||||
advance_to_end(*_upper_bound);
|
||||
return make_ready_future<>();
|
||||
return advance_to_end(*_upper_bound);
|
||||
}
|
||||
|
||||
// Tells whether details about current partition can be accessed.
|
||||
@@ -547,14 +599,13 @@ private:
|
||||
bound.data_file_position = (*bound.current_list)[bound.current_index_idx].position();
|
||||
bound.element = indexable_element::partition;
|
||||
bound.end_open_marker.reset();
|
||||
return make_ready_future<>();
|
||||
return reset_clustered_cursor(bound);
|
||||
}
|
||||
auto& summary = _sstable->get_summary();
|
||||
if (bound.current_summary_idx + 1 < summary.header.size) {
|
||||
return advance_to_page(bound, bound.current_summary_idx + 1);
|
||||
}
|
||||
advance_to_end(bound);
|
||||
return make_ready_future<>();
|
||||
return advance_to_end(bound);
|
||||
}
|
||||
|
||||
future<> advance_to(index_bound& bound, dht::ring_position_view pos) {
|
||||
@@ -565,8 +616,7 @@ private:
|
||||
sstlog.trace("index {}: first entry", fmt::ptr(this));
|
||||
return make_ready_future<>();
|
||||
} else if (pos.is_max()) {
|
||||
advance_to_end(bound);
|
||||
return make_ready_future<>();
|
||||
return advance_to_end(bound);
|
||||
}
|
||||
|
||||
auto& summary = _sstable->get_summary();
|
||||
@@ -616,7 +666,7 @@ private:
|
||||
bound.element = indexable_element::partition;
|
||||
bound.end_open_marker.reset();
|
||||
sstlog.trace("index {}: new page index = {}, pos={}", fmt::ptr(this), bound.current_index_idx, bound.data_file_position);
|
||||
return make_ready_future<>();
|
||||
return reset_clustered_cursor(bound);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -646,14 +696,14 @@ private:
|
||||
}
|
||||
|
||||
index_entry& e = current_partition_entry(*_upper_bound);
|
||||
clustered_index_cursor* cur = current_clustered_cursor(*_upper_bound);
|
||||
|
||||
if (!e.get_promoted_index()) {
|
||||
if (!cur) {
|
||||
sstlog.trace("index {}: no promoted index", fmt::ptr(this));
|
||||
return advance_to_next_partition(*_upper_bound);
|
||||
}
|
||||
|
||||
promoted_index& pi = *e.get_promoted_index();
|
||||
return pi.cursor().probe_upper_bound(pos).then([this, &e] (std::optional<clustered_index_cursor::offset_in_partition> off) {
|
||||
return cur->probe_upper_bound(pos).then([this, &e] (std::optional<clustered_index_cursor::offset_in_partition> off) {
|
||||
if (!off) {
|
||||
return advance_to_next_partition(*_upper_bound);
|
||||
}
|
||||
@@ -669,6 +719,11 @@ private:
|
||||
return _sstable->data_size();
|
||||
}
|
||||
|
||||
future<> close(index_bound& b) {
|
||||
return reset_clustered_cursor(b).then([this, &b] {
|
||||
return close_index_list(b.current_list);
|
||||
});
|
||||
}
|
||||
public:
|
||||
index_reader(shared_sstable sst, reader_permit permit, const io_priority_class& pc, tracing::trace_state_ptr trace_state)
|
||||
: _sstable(std::move(sst))
|
||||
@@ -703,6 +758,31 @@ public:
|
||||
return current_partition_entry(_lower_bound);
|
||||
}
|
||||
|
||||
file_input_stream_options get_file_input_stream_options(const io_priority_class& pc) {
|
||||
file_input_stream_options options;
|
||||
options.buffer_size = _sstable->sstable_buffer_size;
|
||||
options.read_ahead = 2;
|
||||
options.io_priority_class = pc;
|
||||
options.dynamic_adjustments = _sstable->_index_history;
|
||||
return options;
|
||||
}
|
||||
|
||||
// Returns a pointer to the clustered index cursor for the current partition
|
||||
// or nullptr if there is no clustered index in the current partition.
|
||||
// Returns the same instance until we move to a different partition.
|
||||
clustered_index_cursor* current_clustered_cursor(index_bound& bound) {
|
||||
if (!bound.clustered_cursor) {
|
||||
index_entry& e = current_partition_entry(bound);
|
||||
promoted_index* pi = e.get_promoted_index().get();
|
||||
if (!pi) {
|
||||
return nullptr;
|
||||
}
|
||||
bound.clustered_cursor = pi->make_cursor(_sstable, _permit, _trace_state,
|
||||
get_file_input_stream_options(_pc));
|
||||
}
|
||||
return &*bound.clustered_cursor;
|
||||
}
|
||||
|
||||
// Returns tombstone for the current partition if it was recorded in the sstable.
|
||||
// It may be unavailable for old sstables for which this information was not generated.
|
||||
// Can be called only when partition_data_ready().
|
||||
@@ -748,13 +828,14 @@ public:
|
||||
}
|
||||
|
||||
index_entry& e = current_partition_entry();
|
||||
if (!e.get_promoted_index()) {
|
||||
clustered_index_cursor* cur = current_clustered_cursor(_lower_bound);
|
||||
|
||||
if (!cur) {
|
||||
sstlog.trace("index {}: no promoted index", fmt::ptr(this));
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
promoted_index& pi = *e.get_promoted_index();
|
||||
return pi.cursor().advance_to(pos).then([this, &e] (std::optional<clustered_index_cursor::skip_info> si) {
|
||||
return cur->advance_to(pos).then([this, &e] (std::optional<clustered_index_cursor::skip_info> si) {
|
||||
if (!si) {
|
||||
sstlog.trace("index {}: position in the same block", fmt::ptr(this));
|
||||
return;
|
||||
@@ -834,11 +915,13 @@ public:
|
||||
return _lower_bound.data_file_position == data_file_end();
|
||||
}
|
||||
|
||||
const shared_sstable& sstable() const { return _sstable; }
|
||||
|
||||
future<> close() {
|
||||
// Need to close consequently as we expect to not have close_current_list_ptr to run in parallel
|
||||
return close_index_list(_lower_bound.current_list).then([this] {
|
||||
return close(_lower_bound).then([this] {
|
||||
if (_upper_bound) {
|
||||
return close_index_list(_upper_bound->current_list);
|
||||
return close(*_upper_bound);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
|
||||
@@ -1761,7 +1761,7 @@ future<> sstable::generate_summary(const io_priority_class& pc) {
|
||||
[this, &pc, options = std::move(options), index_file, index_size] (summary_generator& s) mutable {
|
||||
auto sem = std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{});
|
||||
auto ctx = make_lw_shared<index_consume_entry_context<summary_generator>>(
|
||||
sem->make_permit(_schema.get(), "generate-summary"), s, trust_promoted_index::yes, *_schema, "", index_file, std::move(options), 0, index_size,
|
||||
sem->make_permit(_schema.get(), "generate-summary"), s, trust_promoted_index::yes, *_schema, index_file, std::move(options), 0, index_size,
|
||||
(_version >= sstable_version_types::mc
|
||||
? std::make_optional(get_clustering_values_fixed_lengths(get_serialization_header()))
|
||||
: std::optional<column_values_fixed_lengths>{}));
|
||||
|
||||
@@ -51,9 +51,10 @@ public:
|
||||
|
||||
prev = rp;
|
||||
|
||||
sstables::clustered_index_cursor& cur = e.get_promoted_index()->cursor();
|
||||
std::unique_ptr<sstables::clustered_index_cursor> cur = e.get_promoted_index()->make_cursor(
|
||||
_r->sstable(), tests::make_permit(), nullptr, {});
|
||||
std::optional<sstables::promoted_index_block_position> prev_end;
|
||||
while (auto ei_opt = cur.next_entry().get0()) {
|
||||
while (auto ei_opt = cur->next_entry().get0()) {
|
||||
sstables::clustered_index_cursor::entry_info& ei = *ei_opt;
|
||||
if (prev_end && pos_cmp(ei.start, sstables::to_view(*prev_end))) {
|
||||
BOOST_FAIL(format("Index blocks are not monotonic: {} > {}", *prev_end, ei.start));
|
||||
|
||||
Reference in New Issue
Block a user