sstables: Read promoted index stored in SSTables 3.x ('mc') format.
Signed-off-by: Vladimir Krivopalov <vladimir@scylladb.com>
This commit is contained in:
@@ -68,6 +68,7 @@ class promoted_index_block {
|
||||
* we expose it through a discriminated union of views.
|
||||
*/
|
||||
using bound_storage = std::variant<temporary_buffer<char>, position_in_partition>;
|
||||
// The block includes positions in the [_start, _end] range (both bounds inclusive)
|
||||
bound_storage _start;
|
||||
bound_storage _end;
|
||||
uint64_t _offset;
|
||||
@@ -143,73 +144,145 @@ private:
|
||||
size_t _current_pi_idx = 0; // for consume_until mode
|
||||
std::optional<position_in_partition_view> _pos; // for consume_until mode
|
||||
|
||||
enum class state {
|
||||
START_NAME_LENGTH,
|
||||
START_NAME_BYTES,
|
||||
END_NAME_LENGTH,
|
||||
END_NAME_BYTES,
|
||||
OFFSET,
|
||||
WIDTH,
|
||||
ADD_BLOCK,
|
||||
} _state = state::START_NAME_LENGTH;
|
||||
|
||||
temporary_buffer<char> _start;
|
||||
temporary_buffer<char> _end;
|
||||
uint64_t _offset;
|
||||
uint64_t _width;
|
||||
|
||||
promoted_index_blocks _pi_blocks;
|
||||
|
||||
public:
|
||||
void verify_end_state() {
|
||||
if (_num_blocks_left != 0) {
|
||||
throw std::runtime_error("promoted_index_blocks_reader - no more data but parsing is incomplete");
|
||||
struct k_l_parser_context {
|
||||
k_l_parser_context() {};
|
||||
|
||||
temporary_buffer<char> start;
|
||||
temporary_buffer<char> end;
|
||||
uint64_t offset = 0;
|
||||
uint64_t width = 0;
|
||||
|
||||
enum class state {
|
||||
START_NAME_LENGTH,
|
||||
START_NAME_BYTES,
|
||||
END_NAME_LENGTH,
|
||||
END_NAME_BYTES,
|
||||
OFFSET,
|
||||
WIDTH,
|
||||
ADD_BLOCK,
|
||||
} state = state::START_NAME_LENGTH;
|
||||
};
|
||||
|
||||
struct m_parser_context {
|
||||
column_values_fixed_lengths clustering_values_fixed_lengths;
|
||||
bool parsing_start_key = true;
|
||||
boost::iterator_range<column_values_fixed_lengths::const_iterator> ck_range;
|
||||
|
||||
std::vector<temporary_buffer<char>> clustering_key_values;
|
||||
bound_kind_m kind;
|
||||
|
||||
temporary_buffer<char> column_value;
|
||||
uint64_t ck_blocks_header = 0;
|
||||
uint32_t ck_blocks_header_offset = 0;
|
||||
std::optional<position_in_partition> start_pos;
|
||||
std::optional<position_in_partition> end_pos;
|
||||
std::optional<deletion_time> end_open_marker;
|
||||
|
||||
uint64_t offset;
|
||||
uint64_t width;
|
||||
|
||||
enum class state {
|
||||
CLUSTERING_START,
|
||||
CK_KIND,
|
||||
CK_SIZE,
|
||||
CK_BLOCK,
|
||||
CK_BLOCK_HEADER,
|
||||
CK_BLOCK2,
|
||||
CK_BLOCK_END,
|
||||
ADD_CLUSTERING_KEY,
|
||||
OFFSET,
|
||||
WIDTH,
|
||||
END_OPEN_MARKER_FLAG,
|
||||
END_OPEN_MARKER_LOCAL_DELETION_TIME,
|
||||
END_OPEN_MARKER_MARKED_FOR_DELETE_AT_1,
|
||||
END_OPEN_MARKER_MARKED_FOR_DELETE_AT_2,
|
||||
ADD_BLOCK,
|
||||
} state = state::CLUSTERING_START;
|
||||
|
||||
bool is_block_empty() {
|
||||
return (ck_blocks_header & (1u << (2 * ck_blocks_header_offset))) != 0;
|
||||
}
|
||||
}
|
||||
|
||||
bool non_consuming() const {
|
||||
return (_state == state::ADD_BLOCK);
|
||||
}
|
||||
bool is_block_null() {
|
||||
return (ck_blocks_header & (1u << (2 * ck_blocks_header_offset + 1))) != 0;
|
||||
}
|
||||
|
||||
processing_result process_state(temporary_buffer<char>& data) {
|
||||
bool no_more_ck_blocks() { return ck_range.empty(); }
|
||||
|
||||
void move_to_next_ck_block() {
|
||||
ck_range.advance_begin(1);
|
||||
++ck_blocks_header_offset;
|
||||
if (ck_blocks_header_offset == 32u) {
|
||||
ck_blocks_header_offset = 0u;
|
||||
}
|
||||
}
|
||||
|
||||
bool should_read_block_header() {
|
||||
return ck_blocks_header_offset == 0u;
|
||||
}
|
||||
std::optional<uint32_t> get_ck_block_value_length() {
|
||||
return ck_range.front();
|
||||
}
|
||||
|
||||
position_in_partition make_position() {
|
||||
auto key = clustering_key_prefix::from_range(clustering_key_values | boost::adaptors::transformed(
|
||||
[] (const temporary_buffer<char>& b) { return to_bytes_view(b); }));
|
||||
|
||||
if (kind == bound_kind_m::clustering) {
|
||||
return position_in_partition::for_key(std::move(key));
|
||||
}
|
||||
|
||||
bound_kind rt_marker_kind = is_in_bound_kind(kind)
|
||||
? to_bound_kind(kind)
|
||||
:(parsing_start_key ? boundary_to_start_bound : boundary_to_end_bound)(kind);
|
||||
return position_in_partition(position_in_partition::range_tag_t{}, rt_marker_kind, std::move(key));
|
||||
}
|
||||
};
|
||||
|
||||
std::variant<k_l_parser_context, m_parser_context> _ctx;
|
||||
|
||||
void process_state(temporary_buffer<char>& data, k_l_parser_context& ctx) {
|
||||
using state_k_l = typename k_l_parser_context::state;
|
||||
while (true) {
|
||||
switch (_state) {
|
||||
case state::START_NAME_LENGTH:
|
||||
switch (ctx.state) {
|
||||
case state_k_l::START_NAME_LENGTH:
|
||||
if (this->read_16(data) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::START_NAME_BYTES;
|
||||
ctx.state = state_k_l::START_NAME_BYTES;
|
||||
break;
|
||||
}
|
||||
case state::START_NAME_BYTES:
|
||||
if (this->read_bytes(data, this->_u16, _start) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::END_NAME_LENGTH;
|
||||
case state_k_l::START_NAME_BYTES:
|
||||
if (this->read_bytes(data, this->_u16, ctx.start) != continuous_data_consumer::read_status::ready) {
|
||||
ctx.state = state_k_l::END_NAME_LENGTH;
|
||||
break;
|
||||
}
|
||||
case state::END_NAME_LENGTH:
|
||||
case state_k_l::END_NAME_LENGTH:
|
||||
if (this->read_16(data) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::END_NAME_BYTES;
|
||||
ctx.state = state_k_l::END_NAME_BYTES;
|
||||
break;
|
||||
}
|
||||
case state::END_NAME_BYTES:
|
||||
if (this->read_bytes(data, this->_u16, _end) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::OFFSET;
|
||||
case state_k_l::END_NAME_BYTES:
|
||||
if (this->read_bytes(data, this->_u16, ctx.end) != continuous_data_consumer::read_status::ready) {
|
||||
ctx.state = state_k_l::OFFSET;
|
||||
break;
|
||||
}
|
||||
case state::OFFSET:
|
||||
case state_k_l::OFFSET:
|
||||
if (this->read_64(data) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::WIDTH;
|
||||
ctx.state = state_k_l::WIDTH;
|
||||
break;
|
||||
}
|
||||
case state::WIDTH:
|
||||
_offset = this->_u64;
|
||||
case state_k_l::WIDTH:
|
||||
ctx.offset = this->_u64;
|
||||
if (this->read_64(data) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::ADD_BLOCK;
|
||||
ctx.state = state_k_l::ADD_BLOCK;
|
||||
break;
|
||||
}
|
||||
case state::ADD_BLOCK:
|
||||
_width = this->_u64;
|
||||
_state = state::START_NAME_LENGTH;
|
||||
case state_k_l::ADD_BLOCK:
|
||||
ctx.width = this->_u64;
|
||||
ctx.state = state_k_l::START_NAME_LENGTH;
|
||||
--_num_blocks_left;
|
||||
_pi_blocks.emplace_back(std::move(_start), std::move(_end), _offset, _width);
|
||||
_pi_blocks.emplace_back(std::move(ctx.start), std::move(ctx.end), ctx.offset, ctx.width);
|
||||
if (_num_blocks_left == 0) {
|
||||
break;
|
||||
} else {
|
||||
@@ -222,10 +295,177 @@ public:
|
||||
}
|
||||
break;
|
||||
};
|
||||
}
|
||||
|
||||
void process_state(temporary_buffer<char>& data, m_parser_context& ctx) {
|
||||
static constexpr size_t width_base = 65536;
|
||||
using state_m = typename m_parser_context::state;
|
||||
while (true) {
|
||||
switch (ctx.state) {
|
||||
case state_m::CLUSTERING_START:
|
||||
clustering_start_label:
|
||||
ctx.clustering_key_values.clear();
|
||||
ctx.clustering_key_values.reserve(ctx.clustering_values_fixed_lengths.size());
|
||||
ctx.ck_range = boost::make_iterator_range(ctx.clustering_values_fixed_lengths);
|
||||
ctx.ck_blocks_header_offset = 0u;
|
||||
if (read_8(data) != read_status::ready) {
|
||||
ctx.state = state_m::CK_KIND;
|
||||
break;
|
||||
}
|
||||
case state_m::CK_KIND:
|
||||
ctx.kind = bound_kind_m{_u8};
|
||||
if (ctx.kind == bound_kind_m::clustering) {
|
||||
ctx.state = state_m::CK_BLOCK;
|
||||
goto ck_block_label;
|
||||
}
|
||||
if (read_16(data) != read_status::ready) {
|
||||
ctx.state = state_m::CK_SIZE;
|
||||
}
|
||||
case state_m::CK_SIZE:
|
||||
if (_u16 < _s.clustering_key_size()) {
|
||||
ctx.ck_range.drop_back(_s.clustering_key_size() - _u16);
|
||||
}
|
||||
|
||||
case state_m::CK_BLOCK:
|
||||
ck_block_label:
|
||||
if (ctx.no_more_ck_blocks()) {
|
||||
goto add_clustering_key_label;
|
||||
}
|
||||
if (!ctx.should_read_block_header()) {
|
||||
ctx.state = state_m::CK_BLOCK2;
|
||||
goto ck_block2_label;
|
||||
}
|
||||
if (read_unsigned_vint(data) != read_status::ready) {
|
||||
ctx.state = state_m::CK_BLOCK_HEADER;
|
||||
break;
|
||||
}
|
||||
case state_m::CK_BLOCK_HEADER:
|
||||
ctx.ck_blocks_header = _u64;
|
||||
case state_m::CK_BLOCK2:
|
||||
ck_block2_label:
|
||||
{
|
||||
if (ctx.is_block_empty()) {
|
||||
ctx.clustering_key_values.push_back({});
|
||||
ctx.move_to_next_ck_block();
|
||||
goto ck_block_label;
|
||||
}
|
||||
if (ctx.is_block_null()) {
|
||||
ctx.move_to_next_ck_block();
|
||||
goto ck_block_label;
|
||||
}
|
||||
read_status status = read_status::waiting;
|
||||
if (auto len = ctx.get_ck_block_value_length()) {
|
||||
status = read_bytes(data, *len, ctx.column_value);
|
||||
} else {
|
||||
status = read_unsigned_vint_length_bytes(data, ctx.column_value);
|
||||
}
|
||||
if (status != read_status::ready) {
|
||||
ctx.state = state_m::CK_BLOCK_END;
|
||||
break;
|
||||
}
|
||||
}
|
||||
case state_m::CK_BLOCK_END:
|
||||
ctx.clustering_key_values.push_back(std::move(ctx.column_value));
|
||||
ctx.move_to_next_ck_block();
|
||||
ctx.state = state_m::CK_BLOCK;
|
||||
goto ck_block_label;
|
||||
case state_m::ADD_CLUSTERING_KEY:
|
||||
add_clustering_key_label:
|
||||
(ctx.parsing_start_key ? ctx.start_pos : ctx.end_pos) = ctx.make_position();
|
||||
ctx.parsing_start_key = !ctx.parsing_start_key;
|
||||
if (!ctx.end_pos) {
|
||||
ctx.state = state_m::CLUSTERING_START;
|
||||
goto clustering_start_label;
|
||||
}
|
||||
case state_m::OFFSET:
|
||||
if (read_unsigned_vint(data) != continuous_data_consumer::read_status::ready) {
|
||||
ctx.state = state_m::WIDTH;
|
||||
break;
|
||||
}
|
||||
case state_m::WIDTH:
|
||||
ctx.offset = _u64;
|
||||
if (read_signed_vint(data) != continuous_data_consumer::read_status::ready) {
|
||||
ctx.state = state_m::END_OPEN_MARKER_FLAG;
|
||||
break;
|
||||
}
|
||||
case state_m::END_OPEN_MARKER_FLAG:
|
||||
assert(_i64 + width_base > 0);
|
||||
ctx.width = (_i64 + width_base);
|
||||
if (read_8(data) != continuous_data_consumer::read_status::ready) {
|
||||
ctx.state = state_m::END_OPEN_MARKER_LOCAL_DELETION_TIME;
|
||||
break;
|
||||
}
|
||||
case state_m::END_OPEN_MARKER_LOCAL_DELETION_TIME:
|
||||
if (_u8 == 0) {
|
||||
ctx.state = state_m::ADD_BLOCK;
|
||||
goto add_block_label;
|
||||
}
|
||||
ctx.end_open_marker.emplace();
|
||||
if (read_32(data) != continuous_data_consumer::read_status::ready) {
|
||||
ctx.state = state_m::END_OPEN_MARKER_MARKED_FOR_DELETE_AT_1;
|
||||
break;
|
||||
}
|
||||
case state_m::END_OPEN_MARKER_MARKED_FOR_DELETE_AT_1:
|
||||
ctx.end_open_marker->local_deletion_time = _u32;
|
||||
if (read_64(data) != continuous_data_consumer::read_status::ready) {
|
||||
ctx.state = state_m::END_OPEN_MARKER_MARKED_FOR_DELETE_AT_2;
|
||||
break;
|
||||
}
|
||||
case state_m::END_OPEN_MARKER_MARKED_FOR_DELETE_AT_2:
|
||||
ctx.end_open_marker->marked_for_delete_at = _u64;
|
||||
case m_parser_context::state::ADD_BLOCK:
|
||||
add_block_label:
|
||||
|
||||
_pi_blocks.emplace_back(*std::exchange(ctx.start_pos, {}),
|
||||
*std::exchange(ctx.end_pos, {}),
|
||||
ctx.offset,
|
||||
ctx.width,
|
||||
*std::exchange(ctx.end_open_marker, {}));
|
||||
ctx.state = state_m::CLUSTERING_START;
|
||||
--_num_blocks_left;
|
||||
if (_num_blocks_left == 0) {
|
||||
break;
|
||||
} else {
|
||||
// keep running in the loop until we either are out of data
|
||||
// or have consumed all the blocks
|
||||
continue;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
void verify_end_state() {
|
||||
if (_num_blocks_left != 0) {
|
||||
throw std::runtime_error("promoted_index_blocks_reader - no more data but parsing is incomplete");
|
||||
}
|
||||
}
|
||||
|
||||
bool non_consuming(const k_l_parser_context& ctx) const {
|
||||
return ctx.state == k_l_parser_context::state::ADD_BLOCK;
|
||||
}
|
||||
|
||||
bool non_consuming(const m_parser_context& ctx) const {
|
||||
using state_m = typename m_parser_context::state;
|
||||
return ctx.state == state_m::CK_SIZE
|
||||
|| ctx.state == state_m::CK_BLOCK_HEADER
|
||||
|| ctx.state == state_m::CK_BLOCK_END
|
||||
|| ctx.state == state_m::ADD_CLUSTERING_KEY
|
||||
|| ctx.state == state_m::END_OPEN_MARKER_MARKED_FOR_DELETE_AT_2
|
||||
|| ctx.state == state_m::ADD_BLOCK;
|
||||
}
|
||||
|
||||
bool non_consuming() const {
|
||||
return std::visit([this] (const auto& ctx) { return non_consuming(ctx); }, _ctx);
|
||||
}
|
||||
|
||||
processing_result process_state(temporary_buffer<char>& data) {
|
||||
std::visit([this, &data] (auto& ctx) mutable { return process_state(data, ctx); }, _ctx);
|
||||
|
||||
if (_mode == consuming_mode::consume_until) {
|
||||
assert(_pos);
|
||||
auto cmp_with_start = [this, pos_cmp = position_in_partition::composite_less_compare(_s)]
|
||||
auto cmp_with_start = [this, pos_cmp = promoted_index_block_compare(_s)]
|
||||
(position_in_partition_view pos, const promoted_index_block& block) -> bool {
|
||||
return pos_cmp(pos, block.start(_s));
|
||||
};
|
||||
@@ -251,12 +491,25 @@ public:
|
||||
void switch_to_consume_until_mode(position_in_partition_view pos) { _pos = pos; _mode = consuming_mode::consume_until; }
|
||||
promoted_index_blocks& get_pi_blocks() { return _pi_blocks; };
|
||||
|
||||
promoted_index_blocks_reader(input_stream<char>&& promoted_index_stream, uint32_t num_blocks, const schema& s, uint64_t start, uint64_t maxlen)
|
||||
// This constructor is used for ka/la format which does not have information about columns fixed lengths
|
||||
promoted_index_blocks_reader(input_stream<char>&& promoted_index_stream, uint32_t num_blocks,
|
||||
const schema& s, uint64_t start, uint64_t maxlen)
|
||||
: continuous_data_consumer(std::move(promoted_index_stream), start, maxlen)
|
||||
, _total_num_blocks(num_blocks)
|
||||
, _num_blocks_left(num_blocks)
|
||||
, _s(s)
|
||||
{}
|
||||
|
||||
// This constructor is used for mc format which requires information about columns fixed lengths for parsing
|
||||
promoted_index_blocks_reader(input_stream<char>&& promoted_index_stream, uint32_t num_blocks,
|
||||
const schema& s, uint64_t start, uint64_t maxlen,
|
||||
column_values_fixed_lengths&& clustering_values_fixed_lengths)
|
||||
: continuous_data_consumer(std::move(promoted_index_stream), start, maxlen)
|
||||
, _total_num_blocks{num_blocks}
|
||||
, _num_blocks_left{num_blocks}
|
||||
, _s{s}
|
||||
, _ctx{m_parser_context{std::move(clustering_values_fixed_lengths)}}
|
||||
{}
|
||||
};
|
||||
|
||||
class promoted_index {
|
||||
@@ -273,6 +526,13 @@ public:
|
||||
, _reader{std::move(promoted_index_stream), blocks_count, s, 0, promoted_index_size}
|
||||
{}
|
||||
|
||||
promoted_index(const schema& s, deletion_time del_time, input_stream<char>&& promoted_index_stream,
|
||||
uint32_t promoted_index_size, uint32_t blocks_count, column_values_fixed_lengths clustering_values_fixed_lengths)
|
||||
: _del_time{del_time}
|
||||
, _promoted_index_size(promoted_index_size)
|
||||
, _reader{std::move(promoted_index_stream), blocks_count, s, 0, promoted_index_size, std::move(clustering_values_fixed_lengths)}
|
||||
{}
|
||||
|
||||
[[nodiscard]] deletion_time get_deletion_time() const { return _del_time; }
|
||||
[[nodiscard]] uint32_t get_promoted_index_size() const { return _promoted_index_size; }
|
||||
[[nodiscard]] promoted_index_blocks_reader& get_reader() { return _reader; };
|
||||
|
||||
@@ -92,11 +92,9 @@ private:
|
||||
|
||||
trust_promoted_index _trust_pi;
|
||||
const schema& _s;
|
||||
const sstable_version_types _version;
|
||||
std::optional<column_values_fixed_lengths> _ck_values_fixed_lengths;
|
||||
|
||||
bool is_mc_format() const {
|
||||
return _version == sstable_version_types::mc;
|
||||
}
|
||||
inline bool is_mc_format() const { return static_cast<bool>(_ck_values_fixed_lengths); }
|
||||
|
||||
public:
|
||||
void verify_end_state() {
|
||||
@@ -184,10 +182,19 @@ public:
|
||||
}
|
||||
state_CONSUME_ENTRY:
|
||||
case state::CONSUME_ENTRY: {
|
||||
auto len = (_key.size() + _promoted_index_size + 14);
|
||||
auto entry_header_length = is_mc_format()
|
||||
? sizeof(uint16_t) + unsigned_vint::serialized_size(_position) + unsigned_vint::serialized_size(_promoted_index_size)
|
||||
: sizeof(uint16_t) + sizeof(uint64_t) + sizeof(uint32_t);
|
||||
auto len = entry_header_length + _key.size() + _promoted_index_size;
|
||||
size_t delta = 0;
|
||||
if (_deletion_time) {
|
||||
_num_pi_blocks = get_uint32();
|
||||
_promoted_index_size -= 16;
|
||||
delta = is_mc_format()
|
||||
? (unsigned_vint::serialized_size(_partition_header_length) + sizeof(uint32_t)
|
||||
+ sizeof(uint64_t) + unsigned_vint::serialized_size(_num_pi_blocks))
|
||||
: sizeof(uint32_t) + sizeof(uint64_t) + sizeof(uint32_t);
|
||||
|
||||
_promoted_index_size -= delta;
|
||||
}
|
||||
auto data_size = data.size();
|
||||
std::optional<input_stream<char>> promoted_index_stream;
|
||||
@@ -199,7 +206,7 @@ public:
|
||||
} else {
|
||||
promoted_index_stream = make_prepended_input_stream(
|
||||
std::move(data),
|
||||
make_file_input_stream(_index_file, _entry_offset + _key.size() + 30 + data_size,
|
||||
make_file_input_stream(_index_file, _entry_offset + _key.size() + entry_header_length + delta + data_size,
|
||||
_promoted_index_size - data_size, _options).detach());
|
||||
}
|
||||
} else {
|
||||
@@ -207,7 +214,14 @@ public:
|
||||
}
|
||||
std::unique_ptr<promoted_index> index;
|
||||
if (promoted_index_stream) {
|
||||
index = std::make_unique<promoted_index>(_s, *_deletion_time, std::move(*promoted_index_stream), _promoted_index_size, _num_pi_blocks);
|
||||
if (is_mc_format()) {
|
||||
index = std::make_unique<promoted_index>(_s, *_deletion_time, std::move(*promoted_index_stream),
|
||||
_promoted_index_size,
|
||||
_num_pi_blocks, *_ck_values_fixed_lengths);
|
||||
} else {
|
||||
index = std::make_unique<promoted_index>(_s, *_deletion_time, std::move(*promoted_index_stream),
|
||||
_promoted_index_size, _num_pi_blocks);
|
||||
}
|
||||
}
|
||||
_consumer.consume_entry(index_entry{std::move(_key), _position, std::move(index)}, _entry_offset);
|
||||
_entry_offset += len;
|
||||
@@ -230,10 +244,10 @@ public:
|
||||
|
||||
index_consume_entry_context(IndexConsumer& consumer, trust_promoted_index trust_pi, const schema& s,
|
||||
file index_file, file_input_stream_options options, uint64_t start,
|
||||
uint64_t maxlen, sstable_version_types version)
|
||||
uint64_t maxlen, std::optional<column_values_fixed_lengths> ck_values_fixed_lengths)
|
||||
: continuous_data_consumer(make_file_input_stream(index_file, start, maxlen, options), start, maxlen)
|
||||
, _consumer(consumer), _index_file(index_file), _options(options)
|
||||
, _entry_offset(start), _trust_pi(trust_pi), _s(s), _version(version)
|
||||
, _entry_offset(start), _trust_pi(trust_pi), _s(s), _ck_values_fixed_lengths(std::move(ck_values_fixed_lengths))
|
||||
{}
|
||||
|
||||
void reset(uint64_t offset) {
|
||||
@@ -308,7 +322,10 @@ class index_reader {
|
||||
: _consumer(quantity)
|
||||
, _context(_consumer,
|
||||
trust_promoted_index(sst->has_correct_promoted_index_entries()), *sst->_schema, sst->_index_file,
|
||||
get_file_input_stream_options(sst, pc), begin, end - begin, sst->get_version())
|
||||
get_file_input_stream_options(sst, pc), begin, end - begin,
|
||||
(sst->get_version() == sstable_version_types::mc
|
||||
? std::make_optional(get_clustering_values_fixed_lengths(sst->get_serialization_header()))
|
||||
: std::optional<column_values_fixed_lengths>{}))
|
||||
{ }
|
||||
};
|
||||
|
||||
|
||||
@@ -83,6 +83,23 @@ inline bool is_boundary(bound_kind_m kind) {
|
||||
}
|
||||
}
|
||||
|
||||
inline constexpr bool is_in_bound_kind(bound_kind_m kind) {
|
||||
return kind == bound_kind_m::incl_start
|
||||
|| kind == bound_kind_m::excl_start
|
||||
|| kind == bound_kind_m::incl_end
|
||||
|| kind == bound_kind_m::excl_end;
|
||||
}
|
||||
|
||||
inline bound_kind boundary_to_start_bound(bound_kind_m kind) {
|
||||
assert(kind == bound_kind_m::incl_end_excl_start || kind == bound_kind_m::excl_end_incl_start);
|
||||
return (kind == bound_kind_m::incl_end_excl_start) ? bound_kind::excl_start : bound_kind::incl_start;
|
||||
}
|
||||
|
||||
inline bound_kind boundary_to_end_bound(bound_kind_m kind) {
|
||||
assert(kind == bound_kind_m::incl_end_excl_start || kind == bound_kind_m::excl_end_incl_start);
|
||||
return (kind == bound_kind_m::incl_end_excl_start) ? bound_kind::incl_end : bound_kind::excl_end;
|
||||
}
|
||||
|
||||
class random_access_reader;
|
||||
|
||||
// Utilities for reading integral values in variable-length format
|
||||
|
||||
@@ -3597,7 +3597,10 @@ future<> sstable::generate_summary(const io_priority_class& pc) {
|
||||
return do_with(summary_generator(_components->summary),
|
||||
[this, &pc, options = std::move(options), index_file, index_size] (summary_generator& s) mutable {
|
||||
auto ctx = make_lw_shared<index_consume_entry_context<summary_generator>>(
|
||||
s, trust_promoted_index::yes, *_schema, index_file, std::move(options), 0, index_size, _version);
|
||||
s, trust_promoted_index::yes, *_schema, index_file, std::move(options), 0, index_size,
|
||||
(_version == sstable_version_types::mc
|
||||
? std::make_optional(get_clustering_values_fixed_lengths(get_serialization_header()))
|
||||
: std::optional<column_values_fixed_lengths>{}));
|
||||
return ctx->consume_input().finally([ctx] {
|
||||
return ctx->close();
|
||||
}).then([this, ctx, &s] {
|
||||
|
||||
Reference in New Issue
Block a user