diff --git a/sstables/row.hh b/sstables/row.hh
index bcaf547be3..e42f8e514b 100644
--- a/sstables/row.hh
+++ b/sstables/row.hh
@@ -113,6 +113,29 @@ public:
     }
 };
 
+class consumer_m {  // consumer interface driven by data_consume_rows_context_m
+public:
+    using proceed = data_consumer::proceed;
+
+    virtual ~consumer_m() = default;
+
+    // Called at the start of a partition with its key and deletion_time. The
+    // latter tells whether the partition is deleted, and if so, when.
+    // Note that the key is in serialized form, and should be deserialized
+    // (according to the schema) before use.
+    // The key object is only valid during this call: the reader releases its
+    // buffer afterwards, so an implementation that saves it must copy the *contents*.
+    virtual proceed consume_partition_start(sstables::key_view key, sstables::deletion_time deltime) = 0;
+
+    // Called at the end of the partition, once its end-of-partition flag is read.
+    // Returns a flag saying whether the sstable consumer should stop now, or
+    // proceed consuming more data.
+    virtual proceed consume_partition_end() = 0;
+
+    // Called when the reader is fast forwarded to the given element.
+    virtual void reset(sstables::indexable_element) = 0;
+};
+
 namespace sstables {
 
 // data_consume_rows_context remembers the context that an ongoing
@@ -447,4 +470,207 @@ public:
     }
 };
 
+// data_consume_rows_context_m remembers the context that an ongoing
+// data_consume_rows() future is in, for an SSTable in the 3_x format.
+class data_consume_rows_context_m : public data_consumer::continuous_data_consumer {
+private:
+    enum class state {  // resume points of the parser; see process_state()
+        PARTITION_START,
+        DELETION_TIME,
+        DELETION_TIME_2,
+        DELETION_TIME_3,
+        FLAGS,
+        FLAGS_2,
+        EXTENDED_FLAGS,
+        STATIC_ROW,
+        NON_STATIC_ROW,
+        NON_STATIC_ROW_SIZE,
+        NON_STATIC_ROW_PREV_SIZE,
+        NON_STATIC_ROW_TIMESTAMP,
+        NON_STATIC_ROW_TIMESTAMP_TTL,
+        NON_STATIC_ROW_TIMESTAMP_DELTIME,
+        NON_STATIC_ROW_DELETION,
+        NON_STATIC_ROW_DELETION_2,
+        NON_STATIC_ROW_DELETION_3,
+        RANGE_TOMBSTONE_MARKER,
+    } _state = state::PARTITION_START;
+
+    consumer_m& _consumer;  // receives partition start/end and reset notifications
+
+    temporary_buffer _pk;  // partition key bytes; released right after consume_partition_start()
+
+    unfiltered_flags_m _flags{0};  // flags byte of the unfiltered being parsed
+    unfiltered_extended_flags_m _extended_flags{0};  // extended flags byte; 0 when the unfiltered has none
+    bool _is_first_unfiltered = true;  // set at PARTITION_START, cleared when a non-static row starts
+public:
+    bool non_consuming() const {  // NOTE(review): presumably lets the base class run these states without new input - confirm
+        return (_state == state::DELETION_TIME_3
+                || _state == state::FLAGS_2
+                || _state == state::EXTENDED_FLAGS
+                || _state == state::NON_STATIC_ROW_TIMESTAMP_DELTIME
+                || _state == state::NON_STATIC_ROW_DELETION_3) && (_prestate == prestate::NONE);
+    }
+
+    data_consumer::processing_result process_state(temporary_buffer& data) {  // advances the parser over `data`; returns proceed::no when the consumer asks to stop
+        switch (_state) {  // cases fall through on purpose; "_state = X; break" is taken when a read is not ready yet
+        case state::PARTITION_START:  // read the partition key (read_short_length_bytes) into _pk
+            _is_first_unfiltered = true;
+            if (read_short_length_bytes(data, _pk) != read_status::ready) {
+                _state = state::DELETION_TIME;
+                break;
+            }
+        case state::DELETION_TIME:  // read 32 bits, used below as local_deletion_time
+            if (read_32(data) != read_status::ready) {
+                _state = state::DELETION_TIME_2;
+                break;
+            }
+        case state::DELETION_TIME_2:  // read 64 bits, used below as marked_for_delete_at
+            if (read_64(data) != read_status::ready) {
+                _state = state::DELETION_TIME_3;
+                break;
+            }
+        case state::DELETION_TIME_3: {  // key and deletion time are available: notify the consumer
+            deletion_time del;
+            del.local_deletion_time = _u32;
+            del.marked_for_delete_at = _u64;
+            auto ret = _consumer.consume_partition_start(key_view(to_bytes_view(_pk)), del);
+            // After calling the consume function, we can release the
+            // buffers we held for it.
+            _pk.release();
+            _state = state::FLAGS;  // set before the possible early return so parsing resumes at the flags byte
+            if (ret == consumer_m::proceed::no) {
+                return consumer_m::proceed::no;
+            }
+        }
+        case state::FLAGS:  // read the flags byte of the next unfiltered
+            if (read_8(data) != read_status::ready) {
+                _state = state::FLAGS_2;
+                break;
+            }
+        case state::FLAGS_2:  // flags byte is in _u8: dispatch on it
+            _flags = unfiltered_flags_m(_u8);
+
+            if (_flags.is_end_of_partition()) {
+                _state = state::PARTITION_START;  // set before the possible early return below
+                if (_consumer.consume_partition_end() == consumer_m::proceed::no) {
+                    return consumer_m::proceed::no;
+                }
+                break;
+            } else if (_flags.is_range_tombstone()) {
+                _state = state::RANGE_TOMBSTONE_MARKER;  // not implemented yet: that state throws
+                break;
+            } else if (!_flags.has_extended_flags()) {  // no extended flags byte is read in this case
+                _extended_flags = unfiltered_extended_flags_m(uint8_t{0u});
+                _state = state::NON_STATIC_ROW;
+                break;
+            }
+            if (read_8(data) != read_status::ready) {
+                _state = state::EXTENDED_FLAGS;
+                break;
+            }
+        case state::EXTENDED_FLAGS:  // extended flags byte is in _u8
+            _extended_flags = unfiltered_extended_flags_m(_u8);
+            if (_extended_flags.is_static()) {
+                if (_is_first_unfiltered) {
+                    _state = state::STATIC_ROW;  // not implemented yet: that state throws
+                    break;
+                } else {
+                    throw malformed_sstable_exception("static row should be a first unfiltered in a partition");
+                }
+            }
+        case state::NON_STATIC_ROW:  // start of a non-static row
+            _is_first_unfiltered = false;
+            // Clustering blocks should be read here, but the serialization header is needed for that.
+            // A table with just a partition key does not have any, so this is OK for the first version.
+
+            if (read_unsigned_vint(data) != read_status::ready) {
+                _state = state::NON_STATIC_ROW_SIZE;
+                break;
+            }
+        case state::NON_STATIC_ROW_SIZE:
+            // Ignore the result of the read above (presumably the row size, going by the state name)
+            if (read_unsigned_vint(data) != read_status::ready) {
+                _state = state::NON_STATIC_ROW_PREV_SIZE;
+                break;
+            }
+        case state::NON_STATIC_ROW_PREV_SIZE:
+            // Ignore the result of the read above (presumably the previous unfiltered's size)
+            if (!_flags.has_timestamp()) {
+                _state = state::NON_STATIC_ROW_DELETION;
+                break;
+            }
+            if (read_unsigned_vint(data) != read_status::ready) {
+                _state = state::NON_STATIC_ROW_TIMESTAMP;
+                break;
+            }
+        case state::NON_STATIC_ROW_TIMESTAMP:
+            // TODO: consume timestamp
+            if (!_flags.has_ttl()) {
+                _state = state::NON_STATIC_ROW_DELETION;
+                break;
+            }
+            if (read_unsigned_vint(data) != read_status::ready) {
+                _state = state::NON_STATIC_ROW_TIMESTAMP_TTL;
+                break;
+            }
+        case state::NON_STATIC_ROW_TIMESTAMP_TTL:
+            // TODO: consume ttl
+            if (read_unsigned_vint(data) != read_status::ready) {
+                _state = state::NON_STATIC_ROW_TIMESTAMP_DELTIME;
+                break;
+            }
+        case state::NON_STATIC_ROW_TIMESTAMP_DELTIME:
+            // TODO: consume deltime
+        case state::NON_STATIC_ROW_DELETION:  // reads the two row-deletion values only when the flag is set
+            if (!_flags.has_deletion()) {
+                _state = state::FLAGS;  // row done: next comes another flags byte
+                break;
+            }
+            if (read_unsigned_vint(data) != read_status::ready) {
+                _state = state::NON_STATIC_ROW_DELETION_2;
+                break;
+            }
+        case state::NON_STATIC_ROW_DELETION_2:
+            // TODO: consume mark_for_deleted_at
+            if (read_unsigned_vint(data) != read_status::ready) {
+                _state = state::NON_STATIC_ROW_DELETION_3;
+                break;
+            }
+        case state::NON_STATIC_ROW_DELETION_3:
+            // TODO: consume local_deletion_time
+            _state = state::FLAGS;
+            break;
+        case state::STATIC_ROW:
+            throw malformed_sstable_exception("unimplemented state");
+        case state::RANGE_TOMBSTONE_MARKER:
+            throw malformed_sstable_exception("unimplemented state");
+        default:
+            throw malformed_sstable_exception("unknown state");
+        }
+
+        return row_consumer::proceed::yes;  // NOTE(review): the early returns spell this consumer_m::proceed - presumably the same type; confirm
+    }
+
+    data_consume_rows_context_m(consumer_m& consumer, input_stream && input, uint64_t start, uint64_t maxlen)
+                : continuous_data_consumer(std::move(input), start, maxlen), _consumer(consumer)
+    { }
+
+    void verify_end_state() {  // throws unless parsing stopped at PARTITION_START with no pending prestate
+        if (_state != state::PARTITION_START || _prestate != prestate::NONE) {
+            throw malformed_sstable_exception("end of input, but not end of partition");
+        }
+    }
+
+    void reset(indexable_element el) {  // restarts parsing at a partition boundary, then forwards to the consumer
+        switch (el) {
+        case indexable_element::partition:
+            _state = state::PARTITION_START;
+            break;
+        default:
+            assert(0);  // only indexable_element::partition is handled here
+        }
+        _consumer.reset(el);
+    }
+};
+
 }
diff --git a/sstables/types.hh b/sstables/types.hh
index ba9bd21119..f80882db98 100644
--- a/sstables/types.hh
+++ b/sstables/types.hh
@@ -512,5 +512,51 @@ inline column_mask operator&(column_mask m1, column_mask m2) {
 inline column_mask operator|(column_mask m1, column_mask m2) {
     return column_mask(static_cast(m1) | static_cast(m2));
 }
+
+class unfiltered_flags_m final {  // read-only bit accessors over the flags byte of an unfiltered
+    static const uint8_t END_OF_PARTITION = 0x01u;
+    static const uint8_t IS_MARKER = 0x02u;  // reported by is_range_tombstone()
+    static const uint8_t HAS_TIMESTAMP = 0x04u;
+    static const uint8_t HAS_TTL = 0x08u;
+    static const uint8_t HAS_DELETION = 0x10u;
+    static const uint8_t HAS_EXTENDED_FLAGS = 0x80u;
+    uint8_t _flags;
+    bool check_flag(const uint8_t flag) const {  // true when any bit of `flag` is set in _flags
+        return (_flags & flag) != 0u;
+    }
+public:
+    explicit unfiltered_flags_m(uint8_t flags) : _flags(flags) { }
+    bool is_end_of_partition() const {
+        return check_flag(END_OF_PARTITION);
+    }
+    bool is_range_tombstone() const {
+        return check_flag(IS_MARKER);
+    }
+    bool has_extended_flags() const {
+        return check_flag(HAS_EXTENDED_FLAGS);
+    }
+    bool has_timestamp() const {
+        return check_flag(HAS_TIMESTAMP);
+    }
+    bool has_ttl() const {
+        return check_flag(HAS_TTL);
+    }
+    bool has_deletion() const {
+        return check_flag(HAS_DELETION);
+    }
+};
+
+class unfiltered_extended_flags_m final {  // read-only bit accessors over the extended flags byte
+    static const uint8_t IS_STATIC = 0x01u;
+    uint8_t _flags;
+    bool check_flag(const uint8_t flag) const {  // true when any bit of `flag` is set in _flags
+        return (_flags & flag) != 0u;
+    }
+public:
+    explicit unfiltered_extended_flags_m(uint8_t flags) : _flags(flags) { }
+    bool is_static() const {
+        return check_flag(IS_STATIC);
+    }
+};
 
 }