sstables: Read promoted index stored in SSTables 3.x ('mc') format.

Signed-off-by: Vladimir Krivopalov <vladimir@scylladb.com>
This commit is contained in:
Vladimir Krivopalov
2018-07-09 14:30:53 -07:00
parent 86d14f8166
commit 917528c427
4 changed files with 355 additions and 58 deletions

View File

@@ -68,6 +68,7 @@ class promoted_index_block {
* we expose it through a discriminated union of views.
*/
using bound_storage = std::variant<temporary_buffer<char>, position_in_partition>;
// The block includes positions in the [_start, _end] range (both bounds inclusive)
bound_storage _start;
bound_storage _end;
uint64_t _offset;
@@ -143,73 +144,145 @@ private:
size_t _current_pi_idx = 0; // for consume_until mode
std::optional<position_in_partition_view> _pos; // for consume_until mode
enum class state {
START_NAME_LENGTH,
START_NAME_BYTES,
END_NAME_LENGTH,
END_NAME_BYTES,
OFFSET,
WIDTH,
ADD_BLOCK,
} _state = state::START_NAME_LENGTH;
temporary_buffer<char> _start;
temporary_buffer<char> _end;
uint64_t _offset;
uint64_t _width;
promoted_index_blocks _pi_blocks;
public:
void verify_end_state() {
if (_num_blocks_left != 0) {
throw std::runtime_error("promoted_index_blocks_reader - no more data but parsing is incomplete");
// Parsing state for promoted index blocks of the older (ka/la) sstable formats.
// It holds the fields of the block currently being read plus the step of the
// state machine that has to run next.
struct k_l_parser_context {
// NOTE(review): the user-provided empty constructor looks deliberate (the type is
// default-constructed as the first alternative of a std::variant member of the
// enclosing class) - confirm before replacing it with `= default`.
k_l_parser_context() {};
// Raw bytes of the block's start and end names.
temporary_buffer<char> start;
temporary_buffer<char> end;
// 64-bit offset and width values read for the block.
uint64_t offset = 0;
uint64_t width = 0;
// Step of the per-block state machine to execute when parsing (re)starts.
enum class state {
START_NAME_LENGTH,
START_NAME_BYTES,
END_NAME_LENGTH,
END_NAME_BYTES,
OFFSET,
WIDTH,
ADD_BLOCK,
} state = state::START_NAME_LENGTH;
};
struct m_parser_context {
column_values_fixed_lengths clustering_values_fixed_lengths;
bool parsing_start_key = true;
boost::iterator_range<column_values_fixed_lengths::const_iterator> ck_range;
std::vector<temporary_buffer<char>> clustering_key_values;
bound_kind_m kind;
temporary_buffer<char> column_value;
uint64_t ck_blocks_header = 0;
uint32_t ck_blocks_header_offset = 0;
std::optional<position_in_partition> start_pos;
std::optional<position_in_partition> end_pos;
std::optional<deletion_time> end_open_marker;
uint64_t offset;
uint64_t width;
enum class state {
CLUSTERING_START,
CK_KIND,
CK_SIZE,
CK_BLOCK,
CK_BLOCK_HEADER,
CK_BLOCK2,
CK_BLOCK_END,
ADD_CLUSTERING_KEY,
OFFSET,
WIDTH,
END_OPEN_MARKER_FLAG,
END_OPEN_MARKER_LOCAL_DELETION_TIME,
END_OPEN_MARKER_MARKED_FOR_DELETE_AT_1,
END_OPEN_MARKER_MARKED_FOR_DELETE_AT_2,
ADD_BLOCK,
} state = state::CLUSTERING_START;
// Tests the "empty" flag of the current clustering block, i.e. the even bit
// of the pair of header bits selected by ck_blocks_header_offset.
//
// The header is 64 bits wide and the offset goes up to 31, so the shift can be
// as large as 62. The mask therefore has to be built from a 64-bit one:
// shifting a 32-bit `1u` by 32 or more is undefined behaviour and would test
// the wrong bit for the 17th and later blocks covered by a header.
bool is_block_empty() {
    const uint64_t empty_bit = uint64_t{1} << (2 * ck_blocks_header_offset);
    return (ck_blocks_header & empty_bit) != 0;
}
}
bool non_consuming() const {
return (_state == state::ADD_BLOCK);
}
// Tests the "null" flag of the current clustering block, i.e. the odd bit
// of the pair of header bits selected by ck_blocks_header_offset.
//
// The header is 64 bits wide and the offset goes up to 31, so the shift can be
// as large as 63. The mask therefore has to be built from a 64-bit one:
// shifting a 32-bit `1u` by 32 or more is undefined behaviour and would test
// the wrong bit for the 17th and later blocks covered by a header.
bool is_block_null() {
    const uint64_t null_bit = uint64_t{1} << (2 * ck_blocks_header_offset + 1);
    return (ck_blocks_header & null_bit) != 0;
}
processing_result process_state(temporary_buffer<char>& data) {
// True once every clustering column of the key being parsed has been visited.
bool no_more_ck_blocks() {
    const bool exhausted = ck_range.empty();
    return exhausted;
}
// Steps to the next clustering column and to the next pair of header bits.
// After 32 blocks the bit offset wraps to zero, which makes
// should_read_block_header() report that a new header is due.
void move_to_next_ck_block() {
    ck_range.advance_begin(1);
    if (++ck_blocks_header_offset == 32u) {
        ck_blocks_header_offset = 0u;
    }
}
// A block header has to be read whenever the bit offset is back at zero:
// at the start of a clustering key and after each wrap-around.
bool should_read_block_header() {
    const bool at_header_boundary = (ck_blocks_header_offset == 0u);
    return at_header_boundary;
}
// Returns the length entry of the clustering column currently at the front of
// ck_range. The parser reads exactly that many bytes when the optional holds a
// value and a vint-length-prefixed value when it is empty.
// Must only be called while ck_range is non-empty.
std::optional<uint32_t> get_ck_block_value_length() {
return ck_range.front();
}
// Builds a position_in_partition from the clustering values collected so far
// and the bound kind that was read for the key.
//
// A plain clustering kind yields a key position. Any other kind yields a range
// position: simple bounds are converted directly, while boundary kinds are
// mapped to their start or end bound depending on which key of the block
// (start or end) is currently being parsed.
position_in_partition make_position() {
    auto as_bytes_view = [] (const temporary_buffer<char>& b) { return to_bytes_view(b); };
    auto key = clustering_key_prefix::from_range(
            clustering_key_values | boost::adaptors::transformed(as_bytes_view));

    if (kind == bound_kind_m::clustering) {
        return position_in_partition::for_key(std::move(key));
    }

    bound_kind rt_marker_kind;
    if (is_in_bound_kind(kind)) {
        rt_marker_kind = to_bound_kind(kind);
    } else if (parsing_start_key) {
        rt_marker_kind = boundary_to_start_bound(kind);
    } else {
        rt_marker_kind = boundary_to_end_bound(kind);
    }
    return position_in_partition(position_in_partition::range_tag_t{}, rt_marker_kind, std::move(key));
}
};
std::variant<k_l_parser_context, m_parser_context> _ctx;
void process_state(temporary_buffer<char>& data, k_l_parser_context& ctx) {
using state_k_l = typename k_l_parser_context::state;
while (true) {
switch (_state) {
case state::START_NAME_LENGTH:
switch (ctx.state) {
case state_k_l::START_NAME_LENGTH:
if (this->read_16(data) != continuous_data_consumer::read_status::ready) {
_state = state::START_NAME_BYTES;
ctx.state = state_k_l::START_NAME_BYTES;
break;
}
case state::START_NAME_BYTES:
if (this->read_bytes(data, this->_u16, _start) != continuous_data_consumer::read_status::ready) {
_state = state::END_NAME_LENGTH;
case state_k_l::START_NAME_BYTES:
if (this->read_bytes(data, this->_u16, ctx.start) != continuous_data_consumer::read_status::ready) {
ctx.state = state_k_l::END_NAME_LENGTH;
break;
}
case state::END_NAME_LENGTH:
case state_k_l::END_NAME_LENGTH:
if (this->read_16(data) != continuous_data_consumer::read_status::ready) {
_state = state::END_NAME_BYTES;
ctx.state = state_k_l::END_NAME_BYTES;
break;
}
case state::END_NAME_BYTES:
if (this->read_bytes(data, this->_u16, _end) != continuous_data_consumer::read_status::ready) {
_state = state::OFFSET;
case state_k_l::END_NAME_BYTES:
if (this->read_bytes(data, this->_u16, ctx.end) != continuous_data_consumer::read_status::ready) {
ctx.state = state_k_l::OFFSET;
break;
}
case state::OFFSET:
case state_k_l::OFFSET:
if (this->read_64(data) != continuous_data_consumer::read_status::ready) {
_state = state::WIDTH;
ctx.state = state_k_l::WIDTH;
break;
}
case state::WIDTH:
_offset = this->_u64;
case state_k_l::WIDTH:
ctx.offset = this->_u64;
if (this->read_64(data) != continuous_data_consumer::read_status::ready) {
_state = state::ADD_BLOCK;
ctx.state = state_k_l::ADD_BLOCK;
break;
}
case state::ADD_BLOCK:
_width = this->_u64;
_state = state::START_NAME_LENGTH;
case state_k_l::ADD_BLOCK:
ctx.width = this->_u64;
ctx.state = state_k_l::START_NAME_LENGTH;
--_num_blocks_left;
_pi_blocks.emplace_back(std::move(_start), std::move(_end), _offset, _width);
_pi_blocks.emplace_back(std::move(ctx.start), std::move(ctx.end), ctx.offset, ctx.width);
if (_num_blocks_left == 0) {
break;
} else {
@@ -222,10 +295,177 @@ public:
}
break;
};
}
// Parses promoted index blocks stored in the SSTables 3.x ('mc') format.
//
// Each block consists of two clustering positions (start, then end), an
// unsigned vint offset, a signed vint width stored relative to width_base,
// and an optional end-open-marker (a flag byte followed, when the flag is
// non-zero, by a 32-bit local deletion time and a 64-bit marked-for-delete-at).
//
// The function is a resumable state machine. Whenever a read primitive cannot
// complete with the data at hand, ctx.state is set to the step that must run
// once the value is available and the function returns; the next call resumes
// from that step. It keeps looping until all blocks have been consumed or a
// read has to wait for more data.
//
// @param data buffer with the bytes available for parsing
// @param ctx  per-reader parsing context for the 'mc' format
void process_state(temporary_buffer<char>& data, m_parser_context& ctx) {
    static constexpr size_t width_base = 65536;
    using state_m = typename m_parser_context::state;
    while (true) {
        switch (ctx.state) {
        case state_m::CLUSTERING_START:
        clustering_start_label:
            // Reset the per-key state before reading the kind byte.
            ctx.clustering_key_values.clear();
            ctx.clustering_key_values.reserve(ctx.clustering_values_fixed_lengths.size());
            ctx.ck_range = boost::make_iterator_range(ctx.clustering_values_fixed_lengths);
            ctx.ck_blocks_header_offset = 0u;
            if (read_8(data) != read_status::ready) {
                ctx.state = state_m::CK_KIND;
                break;
            }
            // fall through
        case state_m::CK_KIND:
            ctx.kind = bound_kind_m{_u8};
            if (ctx.kind == bound_kind_m::clustering) {
                // Full clustering keys carry no explicit size.
                ctx.state = state_m::CK_BLOCK;
                goto ck_block_label;
            }
            if (read_16(data) != read_status::ready) {
                ctx.state = state_m::CK_SIZE;
                // Must suspend here: _u16 is not valid until the read completes,
                // so CK_SIZE may only run on the next invocation.
                break;
            }
            // fall through
        case state_m::CK_SIZE:
            // Bound prefixes may have fewer components than the full clustering key.
            if (_u16 < _s.clustering_key_size()) {
                ctx.ck_range.drop_back(_s.clustering_key_size() - _u16);
            }
            // fall through
        case state_m::CK_BLOCK:
        ck_block_label:
            if (ctx.no_more_ck_blocks()) {
                goto add_clustering_key_label;
            }
            if (!ctx.should_read_block_header()) {
                ctx.state = state_m::CK_BLOCK2;
                goto ck_block2_label;
            }
            if (read_unsigned_vint(data) != read_status::ready) {
                ctx.state = state_m::CK_BLOCK_HEADER;
                break;
            }
            // fall through
        case state_m::CK_BLOCK_HEADER:
            ctx.ck_blocks_header = _u64;
            // fall through
        case state_m::CK_BLOCK2:
        ck_block2_label:
        {
            if (ctx.is_block_empty()) {
                // Empty values are recorded as empty buffers.
                ctx.clustering_key_values.push_back({});
                ctx.move_to_next_ck_block();
                goto ck_block_label;
            }
            if (ctx.is_block_null()) {
                // Null values are skipped without adding a component.
                ctx.move_to_next_ck_block();
                goto ck_block_label;
            }
            read_status status = read_status::waiting;
            if (auto len = ctx.get_ck_block_value_length()) {
                status = read_bytes(data, *len, ctx.column_value);
            } else {
                status = read_unsigned_vint_length_bytes(data, ctx.column_value);
            }
            if (status != read_status::ready) {
                ctx.state = state_m::CK_BLOCK_END;
                break;
            }
        }
            // fall through
        case state_m::CK_BLOCK_END:
            ctx.clustering_key_values.push_back(std::move(ctx.column_value));
            ctx.move_to_next_ck_block();
            ctx.state = state_m::CK_BLOCK;
            goto ck_block_label;
        case state_m::ADD_CLUSTERING_KEY:
        add_clustering_key_label:
            (ctx.parsing_start_key ? ctx.start_pos : ctx.end_pos) = ctx.make_position();
            ctx.parsing_start_key = !ctx.parsing_start_key;
            if (!ctx.end_pos) {
                // Only the start position has been parsed so far; parse the end one.
                ctx.state = state_m::CLUSTERING_START;
                goto clustering_start_label;
            }
            // fall through
        case state_m::OFFSET:
            if (read_unsigned_vint(data) != read_status::ready) {
                ctx.state = state_m::WIDTH;
                break;
            }
            // fall through
        case state_m::WIDTH:
            ctx.offset = _u64;
            if (read_signed_vint(data) != read_status::ready) {
                ctx.state = state_m::END_OPEN_MARKER_FLAG;
                break;
            }
            // fall through
        case state_m::END_OPEN_MARKER_FLAG:
            // NOTE(review): width_base is unsigned, so this comparison is presumably
            // carried out in unsigned arithmetic and only rejects an exact zero - confirm.
            assert(_i64 + width_base > 0);
            ctx.width = (_i64 + width_base);
            if (read_8(data) != read_status::ready) {
                ctx.state = state_m::END_OPEN_MARKER_LOCAL_DELETION_TIME;
                break;
            }
            // fall through
        case state_m::END_OPEN_MARKER_LOCAL_DELETION_TIME:
            if (_u8 == 0) {
                // No end open marker for this block.
                ctx.state = state_m::ADD_BLOCK;
                goto add_block_label;
            }
            ctx.end_open_marker.emplace();
            if (read_32(data) != read_status::ready) {
                ctx.state = state_m::END_OPEN_MARKER_MARKED_FOR_DELETE_AT_1;
                break;
            }
            // fall through
        case state_m::END_OPEN_MARKER_MARKED_FOR_DELETE_AT_1:
            ctx.end_open_marker->local_deletion_time = _u32;
            if (read_64(data) != read_status::ready) {
                ctx.state = state_m::END_OPEN_MARKER_MARKED_FOR_DELETE_AT_2;
                break;
            }
            // fall through
        case state_m::END_OPEN_MARKER_MARKED_FOR_DELETE_AT_2:
            ctx.end_open_marker->marked_for_delete_at = _u64;
            // fall through
        case m_parser_context::state::ADD_BLOCK:
        add_block_label:
            // The end open marker is legitimately absent when its flag byte was zero,
            // so it is handed over as an optional rather than dereferenced.
            _pi_blocks.emplace_back(*std::exchange(ctx.start_pos, {}),
                                    *std::exchange(ctx.end_pos, {}),
                                    ctx.offset,
                                    ctx.width,
                                    std::exchange(ctx.end_open_marker, {}));
            ctx.state = state_m::CLUSTERING_START;
            --_num_blocks_left;
            if (_num_blocks_left == 0) {
                break;
            } else {
                // keep running in the loop until we either are out of data
                // or have consumed all the blocks
                continue;
            }
        }
        break;
    }
}
public:
// Checks that all announced blocks were parsed.
// Throws std::runtime_error if some blocks are still outstanding.
void verify_end_state() {
    const bool all_blocks_parsed = (_num_blocks_left == 0);
    if (!all_blocks_parsed) {
        throw std::runtime_error("promoted_index_blocks_reader - no more data but parsing is incomplete");
    }
}
bool non_consuming(const k_l_parser_context& ctx) const {
return ctx.state == k_l_parser_context::state::ADD_BLOCK;
}
// mc flavour: reports whether the parser is parked in one of the steps that
// begin by using a value that has already been read instead of issuing a
// new read.
bool non_consuming(const m_parser_context& ctx) const {
    using state_m = typename m_parser_context::state;
    switch (ctx.state) {
    case state_m::CK_SIZE:
    case state_m::CK_BLOCK_HEADER:
    case state_m::CK_BLOCK_END:
    case state_m::ADD_CLUSTERING_KEY:
    case state_m::END_OPEN_MARKER_MARKED_FOR_DELETE_AT_2:
    case state_m::ADD_BLOCK:
        return true;
    default:
        return false;
    }
}
// Dispatches to the non_consuming() overload matching the parser context
// that is currently active.
bool non_consuming() const {
    auto check_active_context = [this] (const auto& ctx) {
        return non_consuming(ctx);
    };
    return std::visit(check_active_context, _ctx);
}
processing_result process_state(temporary_buffer<char>& data) {
std::visit([this, &data] (auto& ctx) mutable { return process_state(data, ctx); }, _ctx);
if (_mode == consuming_mode::consume_until) {
assert(_pos);
auto cmp_with_start = [this, pos_cmp = position_in_partition::composite_less_compare(_s)]
auto cmp_with_start = [this, pos_cmp = promoted_index_block_compare(_s)]
(position_in_partition_view pos, const promoted_index_block& block) -> bool {
return pos_cmp(pos, block.start(_s));
};
@@ -251,12 +491,25 @@ public:
void switch_to_consume_until_mode(position_in_partition_view pos) { _pos = pos; _mode = consuming_mode::consume_until; }
promoted_index_blocks& get_pi_blocks() { return _pi_blocks; };
promoted_index_blocks_reader(input_stream<char>&& promoted_index_stream, uint32_t num_blocks, const schema& s, uint64_t start, uint64_t maxlen)
// This constructor is used for ka/la format which does not have information about columns fixed lengths
// _ctx is not listed in the initializer list, so the variant is default-constructed
// and holds its first alternative, k_l_parser_context.
// start and maxlen are passed on unchanged to continuous_data_consumer.
promoted_index_blocks_reader(input_stream<char>&& promoted_index_stream, uint32_t num_blocks,
const schema& s, uint64_t start, uint64_t maxlen)
: continuous_data_consumer(std::move(promoted_index_stream), start, maxlen)
, _total_num_blocks(num_blocks)
, _num_blocks_left(num_blocks)
, _s(s)
{}
// This constructor is used for mc format which requires information about columns fixed lengths for parsing
// The fixed lengths are moved into an m_parser_context, which becomes the active
// alternative of _ctx and thereby selects the mc flavour of process_state().
// start and maxlen are passed on unchanged to continuous_data_consumer.
promoted_index_blocks_reader(input_stream<char>&& promoted_index_stream, uint32_t num_blocks,
const schema& s, uint64_t start, uint64_t maxlen,
column_values_fixed_lengths&& clustering_values_fixed_lengths)
: continuous_data_consumer(std::move(promoted_index_stream), start, maxlen)
, _total_num_blocks{num_blocks}
, _num_blocks_left{num_blocks}
, _s{s}
, _ctx{m_parser_context{std::move(clustering_values_fixed_lengths)}}
{}
};
class promoted_index {
@@ -273,6 +526,13 @@ public:
, _reader{std::move(promoted_index_stream), blocks_count, s, 0, promoted_index_size}
{}
// Overload taking the fixed lengths of clustering values: they are moved into the
// reader, which then uses its mc-format constructor.
// The reader is set up with a start of 0 and a maximum length of promoted_index_size.
promoted_index(const schema& s, deletion_time del_time, input_stream<char>&& promoted_index_stream,
uint32_t promoted_index_size, uint32_t blocks_count, column_values_fixed_lengths clustering_values_fixed_lengths)
: _del_time{del_time}
, _promoted_index_size(promoted_index_size)
, _reader{std::move(promoted_index_stream), blocks_count, s, 0, promoted_index_size, std::move(clustering_values_fixed_lengths)}
{}
[[nodiscard]] deletion_time get_deletion_time() const { return _del_time; }
[[nodiscard]] uint32_t get_promoted_index_size() const { return _promoted_index_size; }
[[nodiscard]] promoted_index_blocks_reader& get_reader() { return _reader; };

View File

@@ -92,11 +92,9 @@ private:
trust_promoted_index _trust_pi;
const schema& _s;
const sstable_version_types _version;
std::optional<column_values_fixed_lengths> _ck_values_fixed_lengths;
bool is_mc_format() const {
return _version == sstable_version_types::mc;
}
// The index is treated as 'mc' format exactly when the fixed lengths of
// clustering values have been supplied.
inline bool is_mc_format() const {
    return _ck_values_fixed_lengths.has_value();
}
public:
void verify_end_state() {
@@ -184,10 +182,19 @@ public:
}
state_CONSUME_ENTRY:
case state::CONSUME_ENTRY: {
auto len = (_key.size() + _promoted_index_size + 14);
auto entry_header_length = is_mc_format()
? sizeof(uint16_t) + unsigned_vint::serialized_size(_position) + unsigned_vint::serialized_size(_promoted_index_size)
: sizeof(uint16_t) + sizeof(uint64_t) + sizeof(uint32_t);
auto len = entry_header_length + _key.size() + _promoted_index_size;
size_t delta = 0;
if (_deletion_time) {
_num_pi_blocks = get_uint32();
_promoted_index_size -= 16;
delta = is_mc_format()
? (unsigned_vint::serialized_size(_partition_header_length) + sizeof(uint32_t)
+ sizeof(uint64_t) + unsigned_vint::serialized_size(_num_pi_blocks))
: sizeof(uint32_t) + sizeof(uint64_t) + sizeof(uint32_t);
_promoted_index_size -= delta;
}
auto data_size = data.size();
std::optional<input_stream<char>> promoted_index_stream;
@@ -199,7 +206,7 @@ public:
} else {
promoted_index_stream = make_prepended_input_stream(
std::move(data),
make_file_input_stream(_index_file, _entry_offset + _key.size() + 30 + data_size,
make_file_input_stream(_index_file, _entry_offset + _key.size() + entry_header_length + delta + data_size,
_promoted_index_size - data_size, _options).detach());
}
} else {
@@ -207,7 +214,14 @@ public:
}
std::unique_ptr<promoted_index> index;
if (promoted_index_stream) {
index = std::make_unique<promoted_index>(_s, *_deletion_time, std::move(*promoted_index_stream), _promoted_index_size, _num_pi_blocks);
if (is_mc_format()) {
index = std::make_unique<promoted_index>(_s, *_deletion_time, std::move(*promoted_index_stream),
_promoted_index_size,
_num_pi_blocks, *_ck_values_fixed_lengths);
} else {
index = std::make_unique<promoted_index>(_s, *_deletion_time, std::move(*promoted_index_stream),
_promoted_index_size, _num_pi_blocks);
}
}
_consumer.consume_entry(index_entry{std::move(_key), _position, std::move(index)}, _entry_offset);
_entry_offset += len;
@@ -230,10 +244,10 @@ public:
index_consume_entry_context(IndexConsumer& consumer, trust_promoted_index trust_pi, const schema& s,
file index_file, file_input_stream_options options, uint64_t start,
uint64_t maxlen, sstable_version_types version)
uint64_t maxlen, std::optional<column_values_fixed_lengths> ck_values_fixed_lengths)
: continuous_data_consumer(make_file_input_stream(index_file, start, maxlen, options), start, maxlen)
, _consumer(consumer), _index_file(index_file), _options(options)
, _entry_offset(start), _trust_pi(trust_pi), _s(s), _version(version)
, _entry_offset(start), _trust_pi(trust_pi), _s(s), _ck_values_fixed_lengths(std::move(ck_values_fixed_lengths))
{}
void reset(uint64_t offset) {
@@ -308,7 +322,10 @@ class index_reader {
: _consumer(quantity)
, _context(_consumer,
trust_promoted_index(sst->has_correct_promoted_index_entries()), *sst->_schema, sst->_index_file,
get_file_input_stream_options(sst, pc), begin, end - begin, sst->get_version())
get_file_input_stream_options(sst, pc), begin, end - begin,
(sst->get_version() == sstable_version_types::mc
? std::make_optional(get_clustering_values_fixed_lengths(sst->get_serialization_header()))
: std::optional<column_values_fixed_lengths>{}))
{ }
};

View File

@@ -83,6 +83,23 @@ inline bool is_boundary(bound_kind_m kind) {
}
}
// Tells whether the kind is one of the four simple bounds
// (inclusive/exclusive start or end).
inline constexpr bool is_in_bound_kind(bound_kind_m kind) {
    switch (kind) {
    case bound_kind_m::incl_start:
    case bound_kind_m::excl_start:
    case bound_kind_m::incl_end:
    case bound_kind_m::excl_end:
        return true;
    default:
        return false;
    }
}
// Maps a boundary kind to the start bound it contains.
// The argument must be one of the two boundary kinds (checked by assert).
inline bound_kind boundary_to_start_bound(bound_kind_m kind) {
    assert(kind == bound_kind_m::incl_end_excl_start || kind == bound_kind_m::excl_end_incl_start);
    if (kind == bound_kind_m::incl_end_excl_start) {
        return bound_kind::excl_start;
    }
    return bound_kind::incl_start;
}
// Maps a boundary kind to the end bound it contains.
// The argument must be one of the two boundary kinds (checked by assert).
inline bound_kind boundary_to_end_bound(bound_kind_m kind) {
    assert(kind == bound_kind_m::incl_end_excl_start || kind == bound_kind_m::excl_end_incl_start);
    if (kind == bound_kind_m::incl_end_excl_start) {
        return bound_kind::incl_end;
    }
    return bound_kind::excl_end;
}
class random_access_reader;
// Utilities for reading integral values in variable-length format

View File

@@ -3597,7 +3597,10 @@ future<> sstable::generate_summary(const io_priority_class& pc) {
return do_with(summary_generator(_components->summary),
[this, &pc, options = std::move(options), index_file, index_size] (summary_generator& s) mutable {
auto ctx = make_lw_shared<index_consume_entry_context<summary_generator>>(
s, trust_promoted_index::yes, *_schema, index_file, std::move(options), 0, index_size, _version);
s, trust_promoted_index::yes, *_schema, index_file, std::move(options), 0, index_size,
(_version == sstable_version_types::mc
? std::make_optional(get_clustering_values_fixed_lengths(get_serialization_header()))
: std::optional<column_values_fixed_lengths>{}));
return ctx->consume_input().finally([ctx] {
return ctx->close();
}).then([this, ctx, &s] {