sstables: Factor out promoted index into a separate class.

An index entry may or may not have a promoted index. All the optional
fields are better scoped under the same class to avoid lots of separate
optional fields and give better representation.

Signed-off-by: Vladimir Krivopalov <vladimir@scylladb.com>
This commit is contained in:
Vladimir Krivopalov
2018-06-25 13:47:44 -07:00
parent fc629b9ca6
commit 81fba73e9d
2 changed files with 68 additions and 33 deletions

View File

@@ -173,15 +173,40 @@ public:
{}
};
class promoted_index {
deletion_time _del_time;
uint32_t _promoted_index_size;
promoted_index_blocks_reader _reader;
bool _reader_closed = false;
public:
promoted_index(const schema& s, deletion_time del_time, input_stream<char>&& promoted_index_stream,
uint32_t promoted_index_size, uint32_t blocks_count)
: _del_time{del_time}
, _promoted_index_size(promoted_index_size)
, _reader{std::move(promoted_index_stream), blocks_count, s, 0, promoted_index_size}
{}
[[nodiscard]] deletion_time get_deletion_time() const { return _del_time; }
[[nodiscard]] uint32_t get_promoted_index_size() const { return _promoted_index_size; }
[[nodiscard]] promoted_index_blocks_reader& get_reader() { return _reader; };
[[nodiscard]] const promoted_index_blocks_reader& get_reader() const { return _reader; };
future<> close_reader() {
if (!_reader_closed) {
_reader_closed = true;
return _reader.close();
}
return make_ready_future<>();
}
};
class index_entry {
private:
temporary_buffer<char> _key;
mutable std::optional<dht::token> _token;
uint64_t _position;
std::optional<promoted_index_blocks_reader> _reader;
bool _reader_closed = false;
uint32_t _promoted_index_size;
std::optional<deletion_time> _del_time;
std::optional<promoted_index> _index;
public:
@@ -202,21 +227,21 @@ public:
uint64_t position() const { return _position; };
std::optional<deletion_time> get_deletion_time() const { return _del_time; }
uint32_t get_promoted_index_size() const { return _promoted_index_size; }
std::optional<deletion_time> get_deletion_time() const {
if (_index) {
return _index->get_deletion_time();
}
index_entry(temporary_buffer<char>&& key, uint64_t position,
std::optional<input_stream<char>>&& promoted_index_stream, uint32_t promoted_index_size,
std::optional<deletion_time>&& del_time, uint32_t num_pi_blocks, const schema& s)
return {};
}
uint32_t get_promoted_index_size() const { return _index ? _index->get_promoted_index_size() : 0; }
index_entry(temporary_buffer<char>&& key, uint64_t position, std::optional<promoted_index>&& index)
: _key(std::move(key))
, _position(position)
, _promoted_index_size(promoted_index_size)
, _del_time(std::move(del_time))
{
if (promoted_index_stream) {
_reader.emplace(std::move(*promoted_index_stream), num_pi_blocks, s, 0, _promoted_index_size);
}
}
, _index(std::move(index))
{}
index_entry(index_entry&&) = default;
index_entry& operator=(index_entry&&) = default;
@@ -225,39 +250,46 @@ public:
// for a given position.
// Returns the index of the element right before the upper bound one.
future<size_t> get_pi_blocks_until(position_in_partition_view pos) {
if (!_reader) {
if (!_index) {
return make_ready_future<size_t>(0);
}
_reader->switch_to_consume_until_mode(pos);
promoted_index_blocks& blocks = _reader->get_pi_blocks();
auto& reader = _index->get_reader();
reader.switch_to_consume_until_mode(pos);
promoted_index_blocks& blocks = reader.get_pi_blocks();
if (!blocks.empty()) {
erase_all_but_last(blocks);
}
return _reader->consume_input().then([this] {
return make_ready_future<size_t>(_reader->get_current_pi_index());
return reader.consume_input().then([this, &reader] {
return reader.get_current_pi_index();
});
}
// Unconditionally reads the promoted index blocks from the next data buffer
future<> get_next_pi_blocks() {
if (!_reader) {
if (!_index) {
return make_ready_future<>();
}
promoted_index_blocks& blocks = _reader->get_pi_blocks();
auto& reader = _index->get_reader();
promoted_index_blocks& blocks = reader.get_pi_blocks();
blocks = promoted_index_blocks{};
_reader->switch_to_consume_next_mode();
return _reader->consume_input();
reader.switch_to_consume_next_mode();
return reader.consume_input();
}
uint32_t get_total_pi_blocks_count() const { return _reader ? _reader->get_total_num_blocks() : 0; }
uint32_t get_read_pi_blocks_count() const { return _reader ? _reader->get_read_num_blocks() : 0; }
promoted_index_blocks* get_pi_blocks() { return _reader ? &_reader->get_pi_blocks() : nullptr; }
[[nodiscard]] uint32_t get_total_pi_blocks_count() const {
return _index ? _index->get_reader().get_total_num_blocks() : 0;
}
[[nodiscard]] uint32_t get_read_pi_blocks_count() const {
return _index ? _index->get_reader().get_read_num_blocks() : 0;
}
[[nodiscard]] promoted_index_blocks* get_pi_blocks() {
return _index ? &_index->get_reader().get_pi_blocks() : nullptr;
}
future<> close_pi_stream() {
if (_reader && !_reader_closed) {
_reader_closed = true;
return _reader->close();
if (_index) {
return _index->close_reader();
}
return make_ready_future<>();

View File

@@ -171,8 +171,11 @@ public:
} else {
_num_pi_blocks = 0;
}
_consumer.consume_entry(index_entry{std::move(_key), _position, std::move(promoted_index_stream),
_promoted_index_size, std::move(_deletion_time), _num_pi_blocks, _s}, _entry_offset);
std::optional<promoted_index> index;
if (promoted_index_stream) {
index.emplace(_s, *_deletion_time, std::move(*promoted_index_stream), _promoted_index_size, _num_pi_blocks);
}
_consumer.consume_entry(index_entry{std::move(_key), _position, std::move(index)}, _entry_offset);
_entry_offset += len;
_deletion_time = std::nullopt;
_num_pi_blocks = 0;