Parse promoted index entries lazily upon request rather than immediately.
The promoted index is now converted into an input_stream and skipped over instead of being consumed immediately and stored as a single buffer. The only part that is read right away is the deletion time: it is likely to be in the already-read buffer, so reading it should be cheap, and it avoids reading the whole promoted index when only the deletion time mark is needed. When accessed, the promoted index is parsed in chunks, buffer by buffer, to limit memory consumption. Fixes #2981 Signed-off-by: Vladimir Krivopalov <vladimir@scylladb.com>
This commit is contained in:
256
sstables/index_entry.hh
Normal file
256
sstables/index_entry.hh
Normal file
@@ -0,0 +1,256 @@
|
||||
/*
|
||||
* Copyright (C) 2017 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
#include "consumer.hh"
|
||||
#include "types.hh"
|
||||
#include <boost/variant.hpp>
|
||||
#include <seastar/util/variant_utils.hh>
|
||||
|
||||
namespace sstables {
|
||||
|
||||
// Drops blocks from the front of pi_blocks until at most one (the most
// recently appended) remains. An empty or single-element buffer is left as is.
inline void erase_all_but_last(promoted_index_blocks& pi_blocks) {
    for (auto remaining = pi_blocks.size(); remaining > 1; --remaining) {
        pi_blocks.pop_front();
    }
}
|
||||
|
||||
// promoted_index_blocks_reader parses the promoted index blocks from the provided stream.
|
||||
// It has two operational modes:
|
||||
// 1. consume_until - in this mode, a position is provided and the reader will read & parse
|
||||
// buffer by buffer until it either finds the upper bound for the given position or exhausts the stream
|
||||
// 2. consume_next - in this mode, the reader unconditionally reads & parses the next buffer and stops
|
||||
//
|
||||
class promoted_index_blocks_reader : public data_consumer::continuous_data_consumer<promoted_index_blocks_reader> {
|
||||
using proceed = data_consumer::proceed;
|
||||
using processing_result = data_consumer::processing_result;
|
||||
using continuous_data_consumer = data_consumer::continuous_data_consumer<promoted_index_blocks_reader>;
|
||||
|
||||
private:
|
||||
enum class consuming_mode {
|
||||
consume_until, // reads/parses buffers until finds an upper bound block for given position
|
||||
consume_next, // reads/parses the next buffer from stream and stops unconditionally
|
||||
};
|
||||
|
||||
uint32_t _total_num_blocks; // the total number of blocks in the stream
|
||||
uint32_t _num_blocks_left; // the number of unread blocks left in the stream
|
||||
const schema& _s;
|
||||
consuming_mode _mode = consuming_mode::consume_next;
|
||||
size_t _current_pi_idx = 0; // for consume_until mode
|
||||
stdx::optional<position_in_partition_view> _pos; // for consume_until mode
|
||||
|
||||
enum class state {
|
||||
START_NAME_LENGTH,
|
||||
START_NAME_BYTES,
|
||||
END_NAME_LENGTH,
|
||||
END_NAME_BYTES,
|
||||
OFFSET,
|
||||
WIDTH,
|
||||
ADD_BLOCK,
|
||||
} _state = state::START_NAME_LENGTH;
|
||||
|
||||
temporary_buffer<char> _start;
|
||||
temporary_buffer<char> _end;
|
||||
uint64_t _offset;
|
||||
uint64_t _width;
|
||||
|
||||
promoted_index_blocks _pi_blocks;
|
||||
|
||||
public:
|
||||
void verify_end_state() {
|
||||
}
|
||||
|
||||
bool non_consuming() const {
|
||||
return (_state == state::ADD_BLOCK);
|
||||
}
|
||||
|
||||
processing_result process_state(temporary_buffer<char>& data) {
|
||||
while (true) {
|
||||
switch (_state) {
|
||||
case state::START_NAME_LENGTH:
|
||||
if (this->read_16(data) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::START_NAME_BYTES;
|
||||
break;
|
||||
}
|
||||
case state::START_NAME_BYTES:
|
||||
if (this->read_bytes(data, this->_u16, _start) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::END_NAME_LENGTH;
|
||||
break;
|
||||
}
|
||||
case state::END_NAME_LENGTH:
|
||||
if (this->read_16(data) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::END_NAME_BYTES;
|
||||
break;
|
||||
}
|
||||
case state::END_NAME_BYTES:
|
||||
if (this->read_bytes(data, this->_u16, _end) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::OFFSET;
|
||||
break;
|
||||
}
|
||||
case state::OFFSET:
|
||||
if (this->read_64(data) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::WIDTH;
|
||||
break;
|
||||
}
|
||||
case state::WIDTH:
|
||||
_offset = this->_u64;
|
||||
if (this->read_64(data) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::ADD_BLOCK;
|
||||
break;
|
||||
}
|
||||
case state::ADD_BLOCK:
|
||||
_width = this->_u64;
|
||||
_state = state::START_NAME_LENGTH;
|
||||
--_num_blocks_left;
|
||||
_pi_blocks.emplace_back(std::move(_start), std::move(_end), _offset, _width);
|
||||
if (_num_blocks_left == 0) {
|
||||
break;
|
||||
} else {
|
||||
// keep running in the loop until we either are out of data
|
||||
// or have consumed all the blocks
|
||||
continue;
|
||||
}
|
||||
default:
|
||||
throw malformed_sstable_exception("unknown state");
|
||||
}
|
||||
break;
|
||||
};
|
||||
|
||||
if (_mode == consuming_mode::consume_until) {
|
||||
assert(_pos);
|
||||
auto cmp_with_start = [this, pos_cmp = position_in_partition::composite_less_compare(_s)]
|
||||
(position_in_partition_view pos, const promoted_index_block& block) -> bool {
|
||||
return pos_cmp(pos, block.start(_s));
|
||||
};
|
||||
auto i = std::upper_bound(std::begin(_pi_blocks), std::end(_pi_blocks), *_pos, cmp_with_start);
|
||||
_current_pi_idx = std::distance(std::begin(_pi_blocks), i);
|
||||
if ((i != std::end(_pi_blocks)) || (_num_blocks_left == 0)) {
|
||||
return proceed::no;
|
||||
} else {
|
||||
// we need to preserve the last block as if the next one we read
|
||||
// appears to be the upper bound, we will take the data file position
|
||||
// from the previous block
|
||||
erase_all_but_last(_pi_blocks);
|
||||
}
|
||||
}
|
||||
|
||||
return (_mode == consuming_mode::consume_next) ? proceed::no : proceed::yes;
|
||||
}
|
||||
|
||||
uint32_t get_total_num_blocks() const { return _total_num_blocks; }
|
||||
uint32_t get_read_num_blocks() const { return _total_num_blocks - _num_blocks_left; }
|
||||
size_t get_current_pi_index() const { return _current_pi_idx; }
|
||||
void switch_to_consume_next_mode() { _mode = consuming_mode::consume_next; }
|
||||
void switch_to_consume_until_mode(position_in_partition_view pos) { _pos = pos; _mode = consuming_mode::consume_until; }
|
||||
promoted_index_blocks& get_pi_blocks() { return _pi_blocks; };
|
||||
|
||||
promoted_index_blocks_reader(input_stream<char>&& promoted_index_stream, uint32_t num_blocks, const schema& s, uint64_t start, uint64_t maxlen)
|
||||
: continuous_data_consumer(std::move(promoted_index_stream), start, maxlen)
|
||||
, _total_num_blocks(num_blocks)
|
||||
, _num_blocks_left(num_blocks)
|
||||
, _s(s)
|
||||
{}
|
||||
};
|
||||
|
||||
class index_entry {
|
||||
private:
|
||||
temporary_buffer<char> _key;
|
||||
mutable stdx::optional<dht::token> _token;
|
||||
uint64_t _position;
|
||||
stdx::optional<promoted_index_blocks_reader> _reader;
|
||||
uint32_t _promoted_index_size;
|
||||
stdx::optional<deletion_time> _del_time;
|
||||
|
||||
public:
|
||||
|
||||
bytes_view get_key_bytes() const {
|
||||
return to_bytes_view(_key);
|
||||
}
|
||||
|
||||
key_view get_key() const {
|
||||
return key_view{get_key_bytes()};
|
||||
}
|
||||
|
||||
decorated_key_view get_decorated_key() const {
|
||||
if (!_token) {
|
||||
_token.emplace(dht::global_partitioner().get_token(get_key()));
|
||||
}
|
||||
return decorated_key_view(*_token, get_key());
|
||||
}
|
||||
|
||||
uint64_t position() const { return _position; };
|
||||
|
||||
stdx::optional<deletion_time> get_deletion_time() const { return _del_time; }
|
||||
uint32_t get_promoted_index_size() const { return _promoted_index_size; }
|
||||
|
||||
index_entry(temporary_buffer<char>&& key, uint64_t position,
|
||||
stdx::optional<input_stream<char>>&& promoted_index_stream, uint32_t promoted_index_size,
|
||||
stdx::optional<deletion_time>&& del_time, uint32_t num_pi_blocks, const schema& s)
|
||||
: _key(std::move(key))
|
||||
, _position(position)
|
||||
, _promoted_index_size(promoted_index_size)
|
||||
, _del_time(std::move(del_time))
|
||||
{
|
||||
if (promoted_index_stream) {
|
||||
_reader.emplace(std::move(*promoted_index_stream), num_pi_blocks, s, 0, _promoted_index_size);
|
||||
}
|
||||
}
|
||||
|
||||
index_entry(index_entry&&) = default;
|
||||
index_entry& operator=(index_entry&&) = default;
|
||||
|
||||
// Reads promoted index blocks from the stream until it finds the upper bound
|
||||
// for a given position.
|
||||
// Returns the index of the element right before the upper bound one.
|
||||
future<size_t> get_pi_blocks_until(position_in_partition_view pos) {
|
||||
if (!_reader) {
|
||||
return make_ready_future<size_t>(0);
|
||||
}
|
||||
|
||||
_reader->switch_to_consume_until_mode(pos);
|
||||
promoted_index_blocks& blocks = _reader->get_pi_blocks();
|
||||
if (!blocks.empty()) {
|
||||
erase_all_but_last(blocks);
|
||||
}
|
||||
return _reader->consume_input(*_reader).then([this] {
|
||||
return make_ready_future<size_t>(_reader->get_current_pi_index());
|
||||
});
|
||||
}
|
||||
|
||||
// Unconditionally reads the promoted index blocks from the next data buffer
|
||||
future<> get_next_pi_blocks() {
|
||||
if (!_reader) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
promoted_index_blocks& blocks = _reader->get_pi_blocks();
|
||||
blocks = promoted_index_blocks{};
|
||||
_reader->switch_to_consume_next_mode();
|
||||
return _reader->consume_input(*_reader);
|
||||
}
|
||||
|
||||
uint32_t get_total_pi_blocks_count() const { return _reader ? _reader->get_total_num_blocks() : 0; }
|
||||
uint32_t get_read_pi_blocks_count() const { return _reader ? _reader->get_read_num_blocks() : 0; }
|
||||
promoted_index_blocks* get_pi_blocks() { return _reader ? &_reader->get_pi_blocks() : nullptr; }
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -25,6 +25,8 @@
|
||||
#include "downsampling.hh"
|
||||
#include "sstables/shared_index_lists.hh"
|
||||
#include <seastar/util/bool_class.hh>
|
||||
#include "utils/buffer_input_stream.hh"
|
||||
#include "sstables/prepended_input_stream.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -56,9 +58,12 @@ using trust_promoted_index = bool_class<trust_promoted_index_tag>;
|
||||
template <class IndexConsumer>
|
||||
class index_consume_entry_context : public data_consumer::continuous_data_consumer<index_consume_entry_context<IndexConsumer>> {
|
||||
using proceed = data_consumer::proceed;
|
||||
using processing_result = data_consumer::processing_result;
|
||||
using continuous_data_consumer = data_consumer::continuous_data_consumer<index_consume_entry_context<IndexConsumer>>;
|
||||
private:
|
||||
IndexConsumer& _consumer;
|
||||
file _index_file;
|
||||
file_input_stream_options _options;
|
||||
uint64_t _entry_offset;
|
||||
|
||||
enum class state {
|
||||
@@ -67,25 +72,30 @@ private:
|
||||
KEY_BYTES,
|
||||
POSITION,
|
||||
PROMOTED_SIZE,
|
||||
PROMOTED_BYTES,
|
||||
LOCAL_DELETION_TIME,
|
||||
MARKED_FOR_DELETE_AT,
|
||||
NUM_PROMOTED_INDEX_BLOCKS,
|
||||
CONSUME_ENTRY,
|
||||
} _state = state::START;
|
||||
|
||||
temporary_buffer<char> _key;
|
||||
temporary_buffer<char> _promoted;
|
||||
uint32_t _promoted_index_size;
|
||||
uint64_t _position;
|
||||
stdx::optional<deletion_time> _deletion_time;
|
||||
uint32_t _num_pi_blocks = 0;
|
||||
|
||||
trust_promoted_index _trust_pi;
|
||||
const schema& _s;
|
||||
|
||||
public:
|
||||
void verify_end_state() {
|
||||
}
|
||||
|
||||
bool non_consuming() const {
|
||||
return ((_state == state::CONSUME_ENTRY) || (_state == state::START) ||
|
||||
((_state == state::PROMOTED_BYTES) && (continuous_data_consumer::_prestate == continuous_data_consumer::prestate::NONE)));
|
||||
return ((_state == state::CONSUME_ENTRY) || (_state == state::START));
|
||||
}
|
||||
|
||||
proceed process_state(temporary_buffer<char>& data) {
|
||||
processing_result process_state(temporary_buffer<char>& data) {
|
||||
switch (_state) {
|
||||
// START comes first, to make the handling of the 0-quantity case simpler
|
||||
case state::START:
|
||||
@@ -107,23 +117,69 @@ public:
|
||||
break;
|
||||
}
|
||||
case state::PROMOTED_SIZE:
|
||||
_position = this->_u64;
|
||||
if (this->read_32(data) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::PROMOTED_BYTES;
|
||||
_state = state::LOCAL_DELETION_TIME;
|
||||
break;
|
||||
}
|
||||
case state::PROMOTED_BYTES:
|
||||
if (this->read_bytes(data, this->_u32, _promoted) != continuous_data_consumer::read_status::ready) {
|
||||
case state::LOCAL_DELETION_TIME:
|
||||
_promoted_index_size = this->_u32;
|
||||
if (_promoted_index_size == 0) {
|
||||
_state = state::CONSUME_ENTRY;
|
||||
goto state_CONSUME_ENTRY;
|
||||
}
|
||||
_deletion_time.emplace();
|
||||
if (this->read_32(data) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::MARKED_FOR_DELETE_AT;
|
||||
break;
|
||||
}
|
||||
case state::MARKED_FOR_DELETE_AT:
|
||||
_deletion_time->local_deletion_time = this->_u32;
|
||||
if (this->read_64(data) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::NUM_PROMOTED_INDEX_BLOCKS;
|
||||
break;
|
||||
}
|
||||
case state::NUM_PROMOTED_INDEX_BLOCKS:
|
||||
_deletion_time->marked_for_delete_at = this->_u64;
|
||||
if (this->read_32(data) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::CONSUME_ENTRY;
|
||||
break;
|
||||
}
|
||||
state_CONSUME_ENTRY:
|
||||
case state::CONSUME_ENTRY: {
|
||||
auto len = (_key.size() + _promoted.size() + 14);
|
||||
if (_trust_pi == trust_promoted_index::no) {
|
||||
_promoted = temporary_buffer<char>();
|
||||
auto len = (_key.size() + _promoted_index_size + 14);
|
||||
if (_deletion_time) {
|
||||
_num_pi_blocks = this->_u32;
|
||||
_promoted_index_size -= 16;
|
||||
}
|
||||
_consumer.consume_entry(index_entry(std::move(_key), this->_u64, std::move(_promoted)), _entry_offset);
|
||||
auto data_size = data.size();
|
||||
stdx::optional<input_stream<char>> promoted_index_stream;
|
||||
if ((_trust_pi == trust_promoted_index::yes) && (_promoted_index_size > 0)) {
|
||||
if (_promoted_index_size <= data_size) {
|
||||
auto buf = data.share();
|
||||
buf.trim(_promoted_index_size);
|
||||
promoted_index_stream = make_buffer_input_stream(std::move(buf));
|
||||
} else {
|
||||
promoted_index_stream = make_prepended_input_stream(
|
||||
std::move(data),
|
||||
make_file_input_stream(_index_file, _entry_offset + _key.size() + 30 + data_size,
|
||||
_promoted_index_size - data_size, _options).detach());
|
||||
}
|
||||
} else {
|
||||
_num_pi_blocks = 0;
|
||||
}
|
||||
_consumer.consume_entry(index_entry{std::move(_key), _position, std::move(promoted_index_stream),
|
||||
_promoted_index_size, std::move(_deletion_time), _num_pi_blocks, _s}, _entry_offset);
|
||||
_entry_offset += len;
|
||||
_deletion_time = stdx::nullopt;
|
||||
_num_pi_blocks = 0;
|
||||
_state = state::START;
|
||||
if (_promoted_index_size <= data_size) {
|
||||
data.trim_front(_promoted_index_size);
|
||||
} else {
|
||||
assert(data.empty());
|
||||
return skip_bytes{_promoted_index_size - data_size};
|
||||
}
|
||||
}
|
||||
break;
|
||||
default:
|
||||
@@ -132,10 +188,11 @@ public:
|
||||
return proceed::yes;
|
||||
}
|
||||
|
||||
index_consume_entry_context(IndexConsumer& consumer, trust_promoted_index trust_pi,
|
||||
input_stream<char>&& input, uint64_t start, uint64_t maxlen)
|
||||
: continuous_data_consumer(std::move(input), start, maxlen)
|
||||
, _consumer(consumer), _entry_offset(start), _trust_pi(trust_pi)
|
||||
index_consume_entry_context(IndexConsumer& consumer, trust_promoted_index trust_pi, const schema& s,
|
||||
file index_file, file_input_stream_options options, uint64_t start, uint64_t maxlen)
|
||||
: continuous_data_consumer(make_file_input_stream(index_file, start, maxlen, options), start, maxlen)
|
||||
, _consumer(consumer), _index_file(index_file), _options(options)
|
||||
, _entry_offset(start), _trust_pi(trust_pi), _s(s)
|
||||
{}
|
||||
|
||||
void reset(uint64_t offset) {
|
||||
@@ -190,19 +247,19 @@ class index_reader {
|
||||
index_consumer _consumer;
|
||||
index_consume_entry_context<index_consumer> _context;
|
||||
|
||||
static auto create_file_input_stream(shared_sstable sst, const io_priority_class& pc, uint64_t begin, uint64_t end) {
|
||||
inline static file_input_stream_options get_file_input_stream_options(shared_sstable sst, const io_priority_class& pc) {
|
||||
file_input_stream_options options;
|
||||
options.buffer_size = sst->sstable_buffer_size;
|
||||
options.read_ahead = 2;
|
||||
options.io_priority_class = pc;
|
||||
return make_file_input_stream(sst->_index_file, begin, end - begin, std::move(options));
|
||||
return options;
|
||||
}
|
||||
|
||||
reader(shared_sstable sst, const io_priority_class& pc, uint64_t begin, uint64_t end, uint64_t quantity)
|
||||
: _consumer(quantity)
|
||||
, _context(_consumer,
|
||||
trust_promoted_index(sst->has_correct_promoted_index_entries()),
|
||||
create_file_input_stream(sst, pc, begin, end), begin, end - begin)
|
||||
trust_promoted_index(sst->has_correct_promoted_index_entries()), *sst->_schema, sst->_index_file,
|
||||
get_file_input_stream_options(sst, pc), begin, end - begin)
|
||||
{ }
|
||||
};
|
||||
|
||||
@@ -336,12 +393,7 @@ public:
|
||||
// It may be unavailable for old sstables for which this information was not generated.
|
||||
// Can be called only when partition_data_ready().
|
||||
stdx::optional<sstables::deletion_time> partition_tombstone() {
|
||||
index_entry& e = current_partition_entry();
|
||||
auto pi = e.get_promoted_index_view();
|
||||
if (!pi) {
|
||||
return stdx::nullopt;
|
||||
}
|
||||
return pi.get_deletion_time();
|
||||
return current_partition_entry().get_deletion_time();
|
||||
}
|
||||
|
||||
// Returns the key for current partition.
|
||||
@@ -395,97 +447,59 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
const schema& s = *_sstable->_schema;
|
||||
index_entry& e = current_partition_entry();
|
||||
promoted_index* pi = nullptr;
|
||||
try {
|
||||
pi = e.get_promoted_index(s);
|
||||
} catch (...) {
|
||||
sstlog.error("Failed to get promoted index for sstable {}, page {}, index {}: {}", _sstable->get_filename(),
|
||||
_current_summary_idx, _current_index_idx, std::current_exception());
|
||||
}
|
||||
if (!pi) {
|
||||
if (e.get_total_pi_blocks_count() == 0) {
|
||||
sstlog.trace("index {}: no promoted index", this);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
if (sstlog.is_enabled(seastar::log_level::trace)) {
|
||||
sstlog.trace("index {}: promoted index:", this);
|
||||
for (auto&& e : pi->entries) {
|
||||
sstlog.trace(" {}-{}: +{} len={}", e.start, e.end, e.offset, e.width);
|
||||
}
|
||||
}
|
||||
|
||||
auto cmp_with_start = [pos_cmp = position_in_partition::composite_less_compare(s)]
|
||||
(position_in_partition_view pos, const promoted_index::entry& e) -> bool {
|
||||
return pos_cmp(pos, e.start);
|
||||
};
|
||||
promoted_index_blocks* pi_blocks = e.get_pi_blocks();
|
||||
assert(pi_blocks);
|
||||
|
||||
// Optimize short skips which typically land in the same block
|
||||
if (_current_pi_idx >= pi->entries.size() || cmp_with_start(pos, pi->entries[_current_pi_idx])) {
|
||||
sstlog.trace("index {}: position in current block", this);
|
||||
if ((e.get_total_pi_blocks_count() == e.get_read_pi_blocks_count()) && _current_pi_idx >= pi_blocks->size() -1) {
|
||||
sstlog.trace("index {}: position in current block (all blocks are read)", this);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto i = std::upper_bound(pi->entries.begin() + _current_pi_idx, pi->entries.end(), pos, cmp_with_start);
|
||||
_current_pi_idx = std::distance(pi->entries.begin(), i);
|
||||
if (i != pi->entries.begin()) {
|
||||
--i;
|
||||
}
|
||||
_data_file_position = e.position() + i->offset;
|
||||
_element = indexable_element::cell;
|
||||
sstlog.trace("index {}: skipped to cell, _current_pi_idx={}, _data_file_position={}", this, _current_pi_idx, _data_file_position);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
// Forwards the cursor to a position which is greater than given position in current partition.
|
||||
//
|
||||
// Note that the index within partition, unlike the partition index, doesn't cover all keys.
|
||||
// So this may not forward to the smallest position which is greater than pos.
|
||||
//
|
||||
// May advance to the next partition if it's not possible to find a suitable position inside
|
||||
// current partition.
|
||||
//
|
||||
// Must be called only when !eof().
|
||||
future<> advance_past(position_in_partition_view pos) {
|
||||
sstlog.trace("index {}: advance_past({}), current data_file_pos={}", this, pos, _data_file_position);
|
||||
|
||||
if (!partition_data_ready()) {
|
||||
return read_partition_data().then([this, pos] {
|
||||
assert(partition_data_ready());
|
||||
return advance_past(pos);
|
||||
});
|
||||
}
|
||||
|
||||
const schema& s = *_sstable->_schema;
|
||||
index_entry& e = current_partition_entry();
|
||||
promoted_index* pi = nullptr;
|
||||
try {
|
||||
pi = e.get_promoted_index(s);
|
||||
} catch (...) {
|
||||
sstlog.error("Failed to get promoted index for sstable {}, page {}, index {}: {}", _sstable->get_filename(),
|
||||
_current_summary_idx, _current_index_idx, std::current_exception());
|
||||
}
|
||||
if (!pi || pi->entries.empty()) {
|
||||
sstlog.trace("index {}: no promoted index", this);
|
||||
return advance_to_next_partition();
|
||||
}
|
||||
|
||||
auto cmp_with_start = [pos_cmp = position_in_partition::composite_less_compare(s)]
|
||||
(position_in_partition_view pos, const promoted_index::entry& e) -> bool {
|
||||
return pos_cmp(pos, e.start);
|
||||
auto cmp_with_start = [pos_cmp = position_in_partition::composite_less_compare(s), s]
|
||||
(position_in_partition_view pos, const promoted_index_block& info) -> bool {
|
||||
return pos_cmp(pos, info.start(s));
|
||||
};
|
||||
|
||||
auto i = std::upper_bound(pi->entries.begin() + _current_pi_idx, pi->entries.end(), pos, cmp_with_start);
|
||||
_current_pi_idx = std::distance(pi->entries.begin(), i);
|
||||
if (i == pi->entries.end()) {
|
||||
return advance_to_next_partition();
|
||||
|
||||
if (!pi_blocks->empty() && cmp_with_start(pos, (*pi_blocks)[_current_pi_idx])) {
|
||||
sstlog.trace("index {}: position in current block (exact match)", this);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
_data_file_position = e.position() + i->offset;
|
||||
_element = indexable_element::cell;
|
||||
sstlog.trace("index {}: skipped to cell, _current_pi_idx={}, _data_file_position={}", this, _current_pi_idx, _data_file_position);
|
||||
return make_ready_future<>();
|
||||
auto i = std::upper_bound(pi_blocks->begin() + _current_pi_idx, pi_blocks->end(), pos, cmp_with_start);
|
||||
_current_pi_idx = std::distance(pi_blocks->begin(), i);
|
||||
if ((i != pi_blocks->end()) || (e.get_read_pi_blocks_count() == e.get_total_pi_blocks_count())) {
|
||||
if (i != pi_blocks->begin()) {
|
||||
--i;
|
||||
}
|
||||
|
||||
_data_file_position = e.position() + i->offset();
|
||||
_element = indexable_element::cell;
|
||||
sstlog.trace("index {}: skipped to cell, _current_pi_idx={}, _data_file_position={}",
|
||||
this, _current_pi_idx, _data_file_position);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
return e.get_pi_blocks_until(pos).then([this, &e, pi_blocks] (size_t current_pi_idx) {
|
||||
_current_pi_idx = current_pi_idx;
|
||||
auto i = std::begin(*pi_blocks);
|
||||
if (_current_pi_idx > 0) {
|
||||
std::advance(i, _current_pi_idx - 1);
|
||||
}
|
||||
_data_file_position = e.position() + i->offset();
|
||||
_element = indexable_element::cell;
|
||||
sstlog.trace("index {}: skipped to cell, _current_pi_idx={}, _data_file_position={}",
|
||||
this, _current_pi_idx, _data_file_position);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
// Like advance_to(dht::ring_position_view), but returns information whether the key was found
|
||||
|
||||
@@ -752,61 +752,6 @@ static inline T read_be(const signed char* p) {
|
||||
return ::read_be<T>(reinterpret_cast<const char*>(p));
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
static inline T consume_be(bytes_view& p) {
|
||||
ensure_len(p, sizeof(T));
|
||||
T i = read_be<T>(p.data());
|
||||
p.remove_prefix(sizeof(T));
|
||||
return i;
|
||||
}
|
||||
|
||||
// Returns a view over the first `len` bytes of `p` and advances `p` past them.
// ensure_len() is called first with `len`.
static inline bytes_view consume_bytes(bytes_view& p, size_t len) {
    ensure_len(p, len);
    const auto* head = p.data();
    p.remove_prefix(len);
    return bytes_view(head, len);
}
|
||||
|
||||
// Decodes the whole serialized promoted index held by this view:
// deletion time (32-bit local_deletion_time, 64-bit marked_for_delete_at),
// a 32-bit block count, then for each block a 16-bit-length-prefixed start
// name, a 16-bit-length-prefixed end name, a 64-bit offset and a 64-bit width.
// The returned entries hold views into the bytes this view refers to.
promoted_index promoted_index_view::parse(const schema& s) const {
    bytes_view in = _bytes;

    sstables::deletion_time dt;
    dt.local_deletion_time = consume_be<uint32_t>(in);
    dt.marked_for_delete_at = consume_be<uint64_t>(in);

    const uint32_t block_count = consume_be<uint32_t>(in);
    std::deque<promoted_index::entry> blocks;
    for (uint32_t n = 0; n < block_count; ++n) {
        uint16_t name_len = consume_be<uint16_t>(in);
        auto first = composite_view(consume_bytes(in, name_len), s.is_compound());
        name_len = consume_be<uint16_t>(in);
        auto last = composite_view(consume_bytes(in, name_len), s.is_compound());
        uint64_t block_offset = consume_be<uint64_t>(in);
        uint64_t block_width = consume_be<uint64_t>(in);
        blocks.emplace_back(promoted_index::entry{first, last, block_offset, block_width});
    }

    return promoted_index{dt, std::move(blocks)};
}
|
||||
|
||||
// Decodes only the leading deletion time of the serialized promoted index:
// a 32-bit local_deletion_time followed by a 64-bit marked_for_delete_at.
sstables::deletion_time promoted_index_view::get_deletion_time() const {
    bytes_view in = _bytes;
    sstables::deletion_time dt;
    dt.local_deletion_time = consume_be<uint32_t>(in);
    dt.marked_for_delete_at = consume_be<uint64_t>(in);
    return dt;
}
|
||||
|
||||
static
|
||||
future<> advance_to_upper_bound(index_reader& ix, const schema& s, const query::partition_slice& slice, dht::ring_position_view key) {
|
||||
auto& ranges = slice.row_ranges(s, *key.key());
|
||||
if (ranges.empty()) {
|
||||
return ix.advance_past(position_in_partition_view::for_static_row());
|
||||
} else {
|
||||
return ix.advance_past(position_in_partition_view::for_range_end(ranges[ranges.size() - 1]));
|
||||
}
|
||||
}
|
||||
|
||||
class sstable_mutation_reader : public flat_mutation_reader::impl {
|
||||
friend class mp_row_consumer;
|
||||
shared_sstable _sst;
|
||||
@@ -894,7 +839,7 @@ public:
|
||||
_sst->get_filter_tracker().add_true_positive();
|
||||
|
||||
_rh_index = std::make_unique<index_reader>(*_lh_index);
|
||||
auto f = advance_to_upper_bound(*_rh_index, *_schema, slice, key);
|
||||
auto f = _rh_index->advance_to_next_partition();
|
||||
return f.then([this, &slice, &pc] () mutable {
|
||||
_read_enabled = _lh_index->data_file_position() != _rh_index->data_file_position();
|
||||
_context = _sst->data_consume_single_partition(_consumer,
|
||||
|
||||
@@ -21,7 +21,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "types.hh"
|
||||
#include "index_entry.hh"
|
||||
#include <vector>
|
||||
#include <seastar/core/future.hh>
|
||||
#include "utils/loading_shared_values.hh"
|
||||
|
||||
@@ -2466,11 +2466,10 @@ future<> sstable::generate_summary(const io_priority_class& pc) {
|
||||
file_input_stream_options options;
|
||||
options.buffer_size = sstable_buffer_size;
|
||||
options.io_priority_class = pc;
|
||||
auto stream = make_file_input_stream(index_file, 0, index_size, std::move(options));
|
||||
return do_with(summary_generator(_components->summary),
|
||||
[this, &pc, stream = std::move(stream), index_size] (summary_generator& s) mutable {
|
||||
[this, &pc, options = std::move(options), index_file, index_size] (summary_generator& s) mutable {
|
||||
auto ctx = make_lw_shared<index_consume_entry_context<summary_generator>>(
|
||||
s, trust_promoted_index::yes, std::move(stream), 0, index_size);
|
||||
s, trust_promoted_index::yes, *_schema, index_file, std::move(options), 0, index_size);
|
||||
return ctx->consume_input(*ctx).finally([ctx] {
|
||||
return ctx->close();
|
||||
}).then([this, ctx, &s] {
|
||||
|
||||
@@ -93,91 +93,43 @@ enum class indexable_element {
|
||||
cell
|
||||
};
|
||||
|
||||
// Exploded view of promoted index.
|
||||
// Contains pointers into external buffer, so that buffer must be kept alive
|
||||
// as long as this is used.
|
||||
// Fully parsed promoted index of one partition: the partition's deletion
// time plus the list of index blocks.
struct promoted_index {
|
||||
// One promoted index block.
struct entry {
|
||||
// Start name of the block, as a view (not an owning copy).
composite_view start;
|
||||
// End name of the block, as a view (not an owning copy).
composite_view end;
|
||||
// Added to the index entry's position() to get a data file position.
uint64_t offset;
|
||||
// Logged as the block's "len" by the index reader trace output.
uint64_t width;
|
||||
};
|
||||
deletion_time del_time;
|
||||
std::deque<entry> entries;
|
||||
};
|
||||
|
||||
class promoted_index_view {
|
||||
bytes_view _bytes;
|
||||
class promoted_index_block {
|
||||
public:
|
||||
explicit promoted_index_view(bytes_view v) : _bytes(v) {}
|
||||
sstables::deletion_time get_deletion_time() const;
|
||||
promoted_index parse(const schema&) const;
|
||||
explicit operator bool() const { return !_bytes.empty(); }
|
||||
promoted_index_block(temporary_buffer<char>&& start, temporary_buffer<char>&& end,
|
||||
uint64_t offset, uint64_t width)
|
||||
: _start(std::move(start)), _end(std::move(end))
|
||||
, _offset(offset), _width(width)
|
||||
{}
|
||||
promoted_index_block(const promoted_index_block& rhs)
|
||||
: _start(rhs._start.get(), rhs._start.size()), _end(rhs._end.get(), rhs._end.size())
|
||||
, _offset(rhs._offset), _width(rhs._width)
|
||||
{}
|
||||
promoted_index_block(promoted_index_block&&) noexcept = default;
|
||||
|
||||
promoted_index_block& operator=(const promoted_index_block& rhs) {
|
||||
if (this != &rhs) {
|
||||
_start = temporary_buffer<char>(rhs._start.get(), rhs._start.size());
|
||||
_end = temporary_buffer<char>(rhs._end.get(), rhs._end.size());
|
||||
_offset = rhs._offset;
|
||||
_width = rhs._width;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
promoted_index_block& operator=(promoted_index_block&&) noexcept = default;
|
||||
|
||||
composite_view start(const schema& s) const { return composite_view(to_bytes_view(_start), s.is_compound());}
|
||||
composite_view end(const schema& s) const { return composite_view(to_bytes_view(_end), s.is_compound());}
|
||||
uint64_t offset() const { return _offset; }
|
||||
uint64_t width() const { return _width; }
|
||||
|
||||
private:
|
||||
temporary_buffer<char> _start;
|
||||
temporary_buffer<char> _end;
|
||||
uint64_t _offset;
|
||||
uint64_t _width;
|
||||
};
|
||||
|
||||
// An index entry that owns its key bytes, a position and the raw promoted
// index bytes. The token and the parsed promoted index are both computed on
// first request and cached in the optional members below.
class index_entry {
|
||||
temporary_buffer<char> _key;
|
||||
// Cache filled by get_decorated_key(); mutable so that the const accessor can populate it.
mutable stdx::optional<dht::token> _token;
|
||||
uint64_t _position;
|
||||
temporary_buffer<char> _promoted_index_bytes;
|
||||
// Parsed form of _promoted_index_bytes; filled by get_promoted_index().
stdx::optional<promoted_index> _promoted_index;
|
||||
public:
|
||||
|
||||
// Returns a view over the key bytes owned by this entry.
bytes_view get_key_bytes() const {
|
||||
return to_bytes_view(_key);
|
||||
}
|
||||
|
||||
// Returns the key bytes wrapped in a key_view.
key_view get_key() const {
|
||||
return key_view{get_key_bytes()};
|
||||
}
|
||||
|
||||
// Returns the key together with its token. The token is obtained from the
// global partitioner on the first call and reused afterwards.
decorated_key_view get_decorated_key() const {
|
||||
if (!_token) {
|
||||
_token.emplace(dht::global_partitioner().get_token(get_key()));
|
||||
}
|
||||
return decorated_key_view(*_token, get_key());
|
||||
}
|
||||
|
||||
// Returns the position value given at construction.
uint64_t position() const {
|
||||
return _position;
|
||||
}
|
||||
|
||||
// Returns a view over the raw promoted index bytes owned by this entry.
bytes_view get_promoted_index_bytes() const {
|
||||
return to_bytes_view(_promoted_index_bytes);
|
||||
}
|
||||
|
||||
// Wraps the raw promoted index bytes in a promoted_index_view.
promoted_index_view get_promoted_index_view() const {
|
||||
return promoted_index_view(get_promoted_index_bytes());
|
||||
}
|
||||
|
||||
// Takes over the key and promoted index buffers (both are moved from) and
// records the position. The token and parsed promoted index start out empty.
index_entry(temporary_buffer<char>&& key, uint64_t position, temporary_buffer<char>&& promoted_index)
|
||||
: _key(std::move(key)), _position(position), _promoted_index_bytes(std::move(promoted_index)) {}
|
||||
|
||||
// Copy constructor. Only the key bytes, the position and the promoted index
// bytes are carried over; _token and _promoted_index are not in the
// initializer list, so the copy starts with both caches empty.
// NOTE(review): presumably deliberate, since a parsed promoted_index holds
// composite_views that would refer to the source entry's buffer -- confirm.
index_entry(const index_entry& o)
|
||||
: _key(o._key.get(), o._key.size())
|
||||
, _position(o._position)
|
||||
, _promoted_index_bytes(o._promoted_index_bytes.get(), o._promoted_index_bytes.size())
|
||||
{ }
|
||||
|
||||
index_entry(index_entry&&) = default;
|
||||
|
||||
// Copy assignment: builds a temporary copy of x and move-assigns it to *this.
index_entry& operator=(const index_entry& x) {
|
||||
return operator=(index_entry(x));
|
||||
}
|
||||
|
||||
index_entry& operator=(index_entry&&) = default;
|
||||
|
||||
// Returns a pointer to the parsed promoted index, parsing it on the first
// call. The view is parsed only when it converts to true; otherwise nothing
// is cached and nullptr is returned, so callers must check the result.
promoted_index* get_promoted_index(const schema& s) {
|
||||
if (!_promoted_index) {
|
||||
auto v = get_promoted_index_view();
|
||||
if (v) {
|
||||
_promoted_index = v.parse(s);
|
||||
}
|
||||
}
|
||||
return _promoted_index ? &*_promoted_index : nullptr;
|
||||
}
|
||||
};
|
||||
// Container type used for a sequence of promoted_index_block objects.
using promoted_index_blocks = seastar::circular_buffer<promoted_index_block>;
|
||||
|
||||
struct summary_entry {
|
||||
dht::token token;
|
||||
|
||||
@@ -38,7 +38,8 @@ public:
|
||||
auto prev = dht::ring_position::min();
|
||||
_r->read_partition_data().get();
|
||||
while (!_r->eof()) {
|
||||
auto k = _r->current_partition_entry().get_decorated_key();
|
||||
auto& e = _r->current_partition_entry();
|
||||
auto k = e.get_decorated_key();
|
||||
auto rp = dht::ring_position(k.token(), k.key().to_partition_key(s));
|
||||
|
||||
if (!rp_cmp(prev, rp)) {
|
||||
@@ -47,17 +48,22 @@ public:
|
||||
|
||||
prev = rp;
|
||||
|
||||
auto* pi = _r->current_partition_entry().get_promoted_index(s);
|
||||
if (!pi->entries.empty()) {
|
||||
auto& prev = pi->entries[0];
|
||||
for (size_t i = 1; i < pi->entries.size(); ++i) {
|
||||
auto& cur = pi->entries[i];
|
||||
if (pos_cmp(cur.start, prev.end)) {
|
||||
while (e.get_read_pi_blocks_count() < e.get_total_pi_blocks_count()) {
|
||||
e.get_next_pi_blocks().get();
|
||||
auto* infos = e.get_pi_blocks();
|
||||
if (infos->empty()) {
|
||||
continue;
|
||||
}
|
||||
auto& prev = (*infos)[0];
|
||||
for (size_t i = 1; i < infos->size(); ++i) {
|
||||
auto& cur = (*infos)[i];
|
||||
if (pos_cmp(cur.start(s), prev.end(s))) {
|
||||
std::cout << "promoted index:\n";
|
||||
for (auto& e : pi->entries) {
|
||||
std::cout << " " << e.start << "-" << e.end << ": +" << e.offset << " len=" << e.width << std::endl;
|
||||
for (auto& e : *infos) {
|
||||
std::cout << " " << e.start(s) << "-" << e.end(s)
|
||||
<< ": +" << e.offset() << " len=" << e.width() << std::endl;
|
||||
}
|
||||
BOOST_FAIL(sprint("Index blocks are not monotonic: %s >= %s", prev.end, cur.start));
|
||||
BOOST_FAIL(sprint("Index blocks are not monotonic: %s >= %s", prev.end(s), cur.start(s)));
|
||||
}
|
||||
cur = prev;
|
||||
}
|
||||
@@ -70,8 +76,7 @@ public:
|
||||
index_reader_assertions& is_empty(const schema& s) {
|
||||
_r->read_partition_data().get();
|
||||
while (!_r->eof()) {
|
||||
auto* pi = _r->current_partition_entry().get_promoted_index(s);
|
||||
BOOST_REQUIRE(pi == nullptr);
|
||||
BOOST_REQUIRE(_r->current_partition_entry().get_total_pi_blocks_count() == 0);
|
||||
_r->advance_to_next_partition().get();
|
||||
}
|
||||
return *this;
|
||||
|
||||
@@ -1027,7 +1027,7 @@ SEASTAR_TEST_CASE(promoted_index_read) {
|
||||
schema_ptr s = large_partition_schema();
|
||||
return sstables::test(sstp).read_indexes().then([sstp] (index_list vec) {
|
||||
BOOST_REQUIRE(vec.size() == 1);
|
||||
BOOST_REQUIRE(vec.front().get_promoted_index_bytes().size() == 468);
|
||||
BOOST_REQUIRE(vec.front().get_promoted_index_size() == 452);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -79,7 +79,7 @@ public:
|
||||
auto l = make_lw_shared<index_list>();
|
||||
return do_with(_sst->get_index_reader(default_priority_class()), [l] (auto& ir) {
|
||||
return ir->read_partition_data().then([&, l] {
|
||||
l->push_back(ir->current_partition_entry());
|
||||
l->push_back(std::move(ir->current_partition_entry()));
|
||||
}).then([&, l] {
|
||||
return repeat([&, l] {
|
||||
return ir->advance_to_next_partition().then([&, l] {
|
||||
@@ -87,13 +87,13 @@ public:
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
|
||||
l->push_back(ir->current_partition_entry());
|
||||
l->push_back(std::move(ir->current_partition_entry()));
|
||||
return stop_iteration::no;
|
||||
});
|
||||
});
|
||||
});
|
||||
}).then([l] {
|
||||
return *l;
|
||||
return std::move(*l);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user