Introduce consumer_m and data_consume_rows_context_m

Those classes can handle SSTables in MC format.

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
This commit is contained in:
Piotr Jastrzebski
2018-04-16 09:23:37 +02:00
parent b343212073
commit 2ee3d8b87b
2 changed files with 272 additions and 0 deletions

View File

@@ -113,6 +113,29 @@ public:
}
};
class consumer_m {
public:
using proceed = data_consumer::proceed;
virtual ~consumer_m() = default;
// Consume the row's key and deletion_time. The latter determines if the
// row is a tombstone, and if so, when it has been deleted.
// Note that the key is in serialized form, and should be deserialized
// (according to the schema) before use.
// As explained above, the key object is only valid during this call, and
// if the implementation wishes to save it, it must copy the *contents*.
virtual proceed consume_partition_start(sstables::key_view key, sstables::deletion_time deltime) = 0;
// Called at the end of the row, after all cells.
// Returns a flag saying whether the sstable consumer should stop now, or
// proceed consuming more data.
virtual proceed consume_partition_end() = 0;
// Called when the reader is fast forwarded to given element.
virtual void reset(sstables::indexable_element) = 0;
};
namespace sstables {
// data_consume_rows_context remembers the context that an ongoing
@@ -447,4 +470,207 @@ public:
}
};
// data_consume_rows_context_m remembers the context that an ongoing
// data_consume_rows() future is in for SSTable in 3_x format.
class data_consume_rows_context_m : public data_consumer::continuous_data_consumer<data_consume_rows_context_m> {
private:
enum class state {
PARTITION_START,
DELETION_TIME,
DELETION_TIME_2,
DELETION_TIME_3,
FLAGS,
FLAGS_2,
EXTENDED_FLAGS,
STATIC_ROW,
NON_STATIC_ROW,
NON_STATIC_ROW_SIZE,
NON_STATIC_ROW_PREV_SIZE,
NON_STATIC_ROW_TIMESTAMP,
NON_STATIC_ROW_TIMESTAMP_TTL,
NON_STATIC_ROW_TIMESTAMP_DELTIME,
NON_STATIC_ROW_DELETION,
NON_STATIC_ROW_DELETION_2,
NON_STATIC_ROW_DELETION_3,
RANGE_TOMBSTONE_MARKER,
} _state = state::PARTITION_START;
consumer_m& _consumer;
temporary_buffer<char> _pk;
unfiltered_flags_m _flags{0};
unfiltered_extended_flags_m _extended_flags{0};
bool _is_first_unfiltered = true;
public:
bool non_consuming() const {
return (_state == state::DELETION_TIME_3
|| _state == state::FLAGS_2
|| _state == state::EXTENDED_FLAGS
|| _state == state::NON_STATIC_ROW_TIMESTAMP_DELTIME
|| _state == state::NON_STATIC_ROW_DELETION_3) && (_prestate == prestate::NONE);
}
data_consumer::processing_result process_state(temporary_buffer<char>& data) {
switch (_state) {
case state::PARTITION_START:
_is_first_unfiltered = true;
if (read_short_length_bytes(data, _pk) != read_status::ready) {
_state = state::DELETION_TIME;
break;
}
case state::DELETION_TIME:
if (read_32(data) != read_status::ready) {
_state = state::DELETION_TIME_2;
break;
}
case state::DELETION_TIME_2:
if (read_64(data) != read_status::ready) {
_state = state::DELETION_TIME_3;
break;
}
case state::DELETION_TIME_3: {
deletion_time del;
del.local_deletion_time = _u32;
del.marked_for_delete_at = _u64;
auto ret = _consumer.consume_partition_start(key_view(to_bytes_view(_pk)), del);
// after calling the consume function, we can release the
// buffers we held for it.
_pk.release();
_state = state::FLAGS;
if (ret == consumer_m::proceed::no) {
return consumer_m::proceed::no;
}
}
case state::FLAGS:
if (read_8(data) != read_status::ready) {
_state = state::FLAGS_2;
break;
}
case state::FLAGS_2:
_flags = unfiltered_flags_m(_u8);
if (_flags.is_end_of_partition()) {
_state = state::PARTITION_START;
if (_consumer.consume_partition_end() == consumer_m::proceed::no) {
return consumer_m::proceed::no;
}
break;
} else if (_flags.is_range_tombstone()) {
_state = state::RANGE_TOMBSTONE_MARKER;
break;
} else if (!_flags.has_extended_flags()) {
_extended_flags = unfiltered_extended_flags_m(uint8_t{0u});
_state = state::NON_STATIC_ROW;
break;
}
if (read_8(data) != read_status::ready) {
_state = state::EXTENDED_FLAGS;
break;
}
case state::EXTENDED_FLAGS:
_extended_flags = unfiltered_extended_flags_m(_u8);
if (_extended_flags.is_static()) {
if (_is_first_unfiltered) {
_state = state::STATIC_ROW;
break;
} else {
throw malformed_sstable_exception("static row should be a first unfiltered in a partition");
}
}
case state::NON_STATIC_ROW:
_is_first_unfiltered = false;
// Clustering blocks should be read here but serialization header is needed for that.
// Table with just partition key does not have any so it's ok for the first version.
if (read_unsigned_vint(data) != read_status::ready) {
_state = state::NON_STATIC_ROW_SIZE;
break;
}
case state::NON_STATIC_ROW_SIZE:
// Ignore the result
if (read_unsigned_vint(data) != read_status::ready) {
_state = state::NON_STATIC_ROW_PREV_SIZE;
break;
}
case state::NON_STATIC_ROW_PREV_SIZE:
// Ignore the result
if (!_flags.has_timestamp()) {
_state = state::NON_STATIC_ROW_DELETION;
break;
}
if (read_unsigned_vint(data) != read_status::ready) {
_state = state::NON_STATIC_ROW_TIMESTAMP;
break;
}
case state::NON_STATIC_ROW_TIMESTAMP:
// TODO: consume timestamp
if (!_flags.has_ttl()) {
_state = state::NON_STATIC_ROW_DELETION;
break;
}
if (read_unsigned_vint(data) != read_status::ready) {
_state = state::NON_STATIC_ROW_TIMESTAMP_TTL;
break;
}
case state::NON_STATIC_ROW_TIMESTAMP_TTL:
// TODO consume ttl
if (read_unsigned_vint(data) != read_status::ready) {
_state = state::NON_STATIC_ROW_TIMESTAMP_DELTIME;
break;
}
case state::NON_STATIC_ROW_TIMESTAMP_DELTIME:
// TODO consume deltime
case state::NON_STATIC_ROW_DELETION:
if (!_flags.has_deletion()) {
_state = state::FLAGS;
break;
}
if (read_unsigned_vint(data) != read_status::ready) {
_state = state::NON_STATIC_ROW_DELETION_2;
break;
}
case state::NON_STATIC_ROW_DELETION_2:
// TODO consume mark_for_deleted_at
if (read_unsigned_vint(data) != read_status::ready) {
_state = state::NON_STATIC_ROW_DELETION_3;
break;
}
case state::NON_STATIC_ROW_DELETION_3:
// TODO consume local_deletion_time
_state = state::FLAGS;
break;
case state::STATIC_ROW:
throw malformed_sstable_exception("unimplemented state");
case state::RANGE_TOMBSTONE_MARKER:
throw malformed_sstable_exception("unimplemented state");
default:
throw malformed_sstable_exception("unknown state");
}
return row_consumer::proceed::yes;
}
data_consume_rows_context_m(consumer_m& consumer, input_stream<char> && input, uint64_t start, uint64_t maxlen)
: continuous_data_consumer(std::move(input), start, maxlen), _consumer(consumer)
{ }
void verify_end_state() {
if (_state != state::PARTITION_START || _prestate != prestate::NONE) {
throw malformed_sstable_exception("end of input, but not end of partition");
}
}
void reset(indexable_element el) {
switch (el) {
case indexable_element::partition:
_state = state::PARTITION_START;
break;
default:
assert(0);
}
_consumer.reset(el);
}
};
}

View File

@@ -512,5 +512,51 @@ inline column_mask operator&(column_mask m1, column_mask m2) {
inline column_mask operator|(column_mask m1, column_mask m2) {
return column_mask(static_cast<uint8_t>(m1) | static_cast<uint8_t>(m2));
}
class unfiltered_flags_m final {
static const uint8_t END_OF_PARTITION = 0x01u;
static const uint8_t IS_MARKER = 0x02u;
static const uint8_t HAS_TIMESTAMP = 0x04u;
static const uint8_t HAS_TTL = 0x08u;
static const uint8_t HAS_DELETION = 0x10u;
static const uint8_t HAS_EXTENDED_FLAGS = 0x80u;
uint8_t _flags;
bool check_flag(const uint8_t flag) const {
return (_flags & flag) != 0u;
}
public:
explicit unfiltered_flags_m(uint8_t flags) : _flags(flags) { }
bool is_end_of_partition() const {
return check_flag(END_OF_PARTITION);
}
bool is_range_tombstone() const {
return check_flag(IS_MARKER);
}
bool has_extended_flags() const {
return check_flag(HAS_EXTENDED_FLAGS);
}
bool has_timestamp() const {
return check_flag(HAS_TIMESTAMP);
}
bool has_ttl() const {
return check_flag(HAS_TTL);
}
bool has_deletion() const {
return check_flag(HAS_DELETION);
}
};
class unfiltered_extended_flags_m final {
static const uint8_t IS_STATIC = 0x01u;
uint8_t _flags;
bool check_flag(const uint8_t flag) const {
return (_flags & flag) != 0u;
}
public:
explicit unfiltered_extended_flags_m(uint8_t flags) : _flags(flags) { }
bool is_static() const {
return check_flag(IS_STATIC);
}
};
}