diff --git a/sstables/liveness_info.hh b/sstables/liveness_info.hh index 8ab7de9e04..87294a5883 100644 --- a/sstables/liveness_info.hh +++ b/sstables/liveness_info.hh @@ -26,6 +26,7 @@ #include "gc_clock.hh" #include "sstables/m_format_read_helpers.hh" #include "sstables/types.hh" +#include "mutation_partition.hh" namespace sstables { @@ -34,6 +35,11 @@ class liveness_info { api::timestamp_type _timestamp; gc_clock::duration _ttl; gc_clock::time_point _local_deletion_time; + bool is_set() const { + return _timestamp != api::missing_timestamp + || _ttl != gc_clock::duration::zero() + || _local_deletion_time != gc_clock::time_point::max(); + } public: explicit liveness_info(const serialization_header& header) : _header(header) @@ -55,6 +61,14 @@ public: api::timestamp_type timestamp() const { return _timestamp; } gc_clock::duration ttl() const { return _ttl; } gc_clock::time_point local_deletion_time() const { return _local_deletion_time; } + row_marker to_row_marker() const { + if (!is_set()) { + return row_marker(); + } + return _ttl != gc_clock::duration::zero() || _local_deletion_time != gc_clock::time_point::max() + ? row_marker(_timestamp, _ttl, _local_deletion_time) + : row_marker(_timestamp); + } }; } diff --git a/sstables/mp_row_consumer.hh b/sstables/mp_row_consumer.hh index c460a060fa..b996c9e1a7 100644 --- a/sstables/mp_row_consumer.hh +++ b/sstables/mp_row_consumer.hh @@ -31,6 +31,7 @@ #include "row.hh" #include "clustering_ranges_walker.hh" #include "utils/data_input.hh" +#include "liveness_info.hh" namespace sstables { @@ -931,6 +932,12 @@ public: return proceed::yes; } + virtual proceed consume_row_marker_and_tombstone(const liveness_info& info, tombstone t) override { + _in_progress_row.apply(t); + _in_progress_row.apply(info.to_row_marker()); + return proceed::yes; + } + virtual proceed consume_static_row_start() override { _inside_static_row = true; _in_progress_static_row = static_row(); diff --git a/sstables/row.hh b/sstables/row.hh index 7d57df664f..c72c3a4c9f 100644 --- a/sstables/row.hh +++ b/sstables/row.hh @@ -41,6 +41,7 @@ #include "column_translation.hh" #include "sstables.hh" +#include "tombstone.hh" // sstables::data_consume_row feeds the contents of a single row into a // row_consumer object: @@ -154,6 +155,8 @@ public: virtual proceed consume_row_start(const std::vector>& ecp) = 0; + virtual proceed consume_row_marker_and_tombstone(const sstables::liveness_info& info, tombstone t) = 0; + virtual proceed consume_static_row_start() = 0; virtual proceed consume_column(stdx::optional column_id, @@ -551,6 +554,7 @@ private: ROW_BODY_DELETION, ROW_BODY_DELETION_2, ROW_BODY_DELETION_3, + ROW_BODY_MARKER, ROW_BODY_MISSING_COLUMNS, ROW_BODY_MISSING_COLUMNS_2, ROW_BODY_MISSING_COLUMNS_READ_COLUMNS, @@ -598,6 +602,8 @@ private: boost::iterator_range>::const_iterator> _ck_column_value_fix_lengths; + tombstone _row_tombstone; + column_flags_m _column_flags{0}; api::timestamp_type _column_timestamp; gc_clock::time_point _column_local_deletion_time; @@ -732,6 +738,7 @@ public: case state::FLAGS: flags_label: _liveness.reset(); + _row_tombstone = {}; if (read_8(data) != read_status::ready) { _state = state::FLAGS_2; break; @@ -874,23 +881,28 @@ public: case state::ROW_BODY_DELETION: row_body_deletion_label: if (!_flags.has_deletion()) { - _state = state::ROW_BODY_MISSING_COLUMNS; - goto row_body_missing_columns_label; + _state = state::ROW_BODY_MARKER; + goto row_body_marker_label; } if (read_unsigned_vint(data) != read_status::ready) { _state = state::ROW_BODY_DELETION_2; break; } case state::ROW_BODY_DELETION_2: - // TODO consume mark_for_deleted_at + _row_tombstone.timestamp = parse_timestamp(_header, _u64); if (read_unsigned_vint(data) != read_status::ready) { _state = state::ROW_BODY_DELETION_3; break; } case state::ROW_BODY_DELETION_3: - // TODO consume local_deletion_time + _row_tombstone.deletion_time = parse_expiry(_header, _u64); + case state::ROW_BODY_MARKER: + row_body_marker_label: + if (_consumer.consume_row_marker_and_tombstone(_liveness, std::move(_row_tombstone)) == consumer_m::proceed::no) { + _state = state::ROW_BODY_MISSING_COLUMNS; + break; + } case state::ROW_BODY_MISSING_COLUMNS: - row_body_missing_columns_label: if (!_flags.has_all_columns()) { if (read_unsigned_vint(data) != read_status::ready) { _state = state::ROW_BODY_MISSING_COLUMNS_2;