From 4c261d2e51fdd6c26db022023c71c6ae700ea72f Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Mon, 18 Jun 2018 20:26:39 +0200 Subject: [PATCH 1/3] sstable: add is_set and to_row_marker to liveness_info Signed-off-by: Piotr Jastrzebski --- sstables/liveness_info.hh | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/sstables/liveness_info.hh b/sstables/liveness_info.hh index 8ab7de9e04..9b5b49190a 100644 --- a/sstables/liveness_info.hh +++ b/sstables/liveness_info.hh @@ -26,6 +26,7 @@ #include "gc_clock.hh" #include "sstables/m_format_read_helpers.hh" #include "sstables/types.hh" +#include "mutation_partition.hh" namespace sstables { @@ -55,6 +56,16 @@ public: api::timestamp_type timestamp() const { return _timestamp; } gc_clock::duration ttl() const { return _ttl; } gc_clock::time_point local_deletion_time() const { return _local_deletion_time; } + bool is_set() const { + return _timestamp != api::missing_timestamp + || _ttl != gc_clock::duration::zero() + || _local_deletion_time != gc_clock::time_point::max(); + } + row_marker to_row_marker() const { + return _ttl != gc_clock::duration::zero() || _local_deletion_time != gc_clock::time_point::max() + ? row_marker(_timestamp, _ttl, _local_deletion_time) + : row_marker(_timestamp); + } }; } From cbfc741d70cb685a5a5a289512188c7cca4c29c5 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Mon, 18 Jun 2018 20:28:02 +0200 Subject: [PATCH 2/3] sstable: Add consumer_m::consume_row_marker_and_tombstone Signed-off-by: Piotr Jastrzebski --- sstables/liveness_info.hh | 13 ++++++++----- sstables/mp_row_consumer.hh | 7 +++++++ sstables/row.hh | 3 +++ 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/sstables/liveness_info.hh b/sstables/liveness_info.hh index 9b5b49190a..87294a5883 100644 --- a/sstables/liveness_info.hh +++ b/sstables/liveness_info.hh @@ -35,6 +35,11 @@ class liveness_info { api::timestamp_type _timestamp; gc_clock::duration _ttl; gc_clock::time_point _local_deletion_time; + bool is_set() const { + return _timestamp != api::missing_timestamp + || _ttl != gc_clock::duration::zero() + || _local_deletion_time != gc_clock::time_point::max(); + } public: explicit liveness_info(const serialization_header& header) : _header(header) @@ -56,12 +61,10 @@ public: api::timestamp_type timestamp() const { return _timestamp; } gc_clock::duration ttl() const { return _ttl; } gc_clock::time_point local_deletion_time() const { return _local_deletion_time; } - bool is_set() const { - return _timestamp != api::missing_timestamp - || _ttl != gc_clock::duration::zero() - || _local_deletion_time != gc_clock::time_point::max(); - } row_marker to_row_marker() const { + if (!is_set()) { + return row_marker(); + } return _ttl != gc_clock::duration::zero() || _local_deletion_time != gc_clock::time_point::max() ? row_marker(_timestamp, _ttl, _local_deletion_time) : row_marker(_timestamp); diff --git a/sstables/mp_row_consumer.hh b/sstables/mp_row_consumer.hh index c460a060fa..b996c9e1a7 100644 --- a/sstables/mp_row_consumer.hh +++ b/sstables/mp_row_consumer.hh @@ -31,6 +31,7 @@ #include "row.hh" #include "clustering_ranges_walker.hh" #include "utils/data_input.hh" +#include "liveness_info.hh" namespace sstables { @@ -931,6 +932,12 @@ public: return proceed::yes; } + virtual proceed consume_row_marker_and_tombstone(const liveness_info& info, tombstone t) override { + _in_progress_row.apply(t); + _in_progress_row.apply(info.to_row_marker()); + return proceed::yes; + } + virtual proceed consume_static_row_start() override { _inside_static_row = true; _in_progress_static_row = static_row(); diff --git a/sstables/row.hh b/sstables/row.hh index 7d57df664f..df07a2f4e3 100644 --- a/sstables/row.hh +++ b/sstables/row.hh @@ -41,6 +41,7 @@ #include "column_translation.hh" #include "sstables.hh" +#include "tombstone.hh" // sstables::data_consume_row feeds the contents of a single row into a // row_consumer object: @@ -154,6 +155,8 @@ public: virtual proceed consume_row_start(const std::vector>& ecp) = 0; + virtual proceed consume_row_marker_and_tombstone(const sstables::liveness_info& info, tombstone t) = 0; + virtual proceed consume_static_row_start() = 0; virtual proceed consume_column(stdx::optional column_id, From 75edaff7b6cf64844c49b1d7ee1213966cb5d2d0 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Mon, 18 Jun 2018 20:28:37 +0200 Subject: [PATCH 3/3] sstable: consume row marker in data_consume_rows_context_m Signed-off-by: Piotr Jastrzebski --- sstables/row.hh | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/sstables/row.hh b/sstables/row.hh index df07a2f4e3..c72c3a4c9f 100644 --- a/sstables/row.hh +++ b/sstables/row.hh @@ -554,6 +554,7 @@ private: ROW_BODY_DELETION, ROW_BODY_DELETION_2, ROW_BODY_DELETION_3, + ROW_BODY_MARKER, ROW_BODY_MISSING_COLUMNS, ROW_BODY_MISSING_COLUMNS_2, ROW_BODY_MISSING_COLUMNS_READ_COLUMNS, @@ -601,6 +602,8 @@ private: boost::iterator_range>::const_iterator> _ck_column_value_fix_lengths; + tombstone _row_tombstone; + column_flags_m _column_flags{0}; api::timestamp_type _column_timestamp; gc_clock::time_point _column_local_deletion_time; @@ -735,6 +738,7 @@ public: case state::FLAGS: flags_label: _liveness.reset(); + _row_tombstone = {}; if (read_8(data) != read_status::ready) { _state = state::FLAGS_2; break; @@ -877,23 +881,28 @@ public: case state::ROW_BODY_DELETION: row_body_deletion_label: if (!_flags.has_deletion()) { - _state = state::ROW_BODY_MISSING_COLUMNS; - goto row_body_missing_columns_label; + _state = state::ROW_BODY_MARKER; + goto row_body_marker_label; } if (read_unsigned_vint(data) != read_status::ready) { _state = state::ROW_BODY_DELETION_2; break; } case state::ROW_BODY_DELETION_2: - // TODO consume mark_for_deleted_at + _row_tombstone.timestamp = parse_timestamp(_header, _u64); if (read_unsigned_vint(data) != read_status::ready) { _state = state::ROW_BODY_DELETION_3; break; } case state::ROW_BODY_DELETION_3: - // TODO consume local_deletion_time + _row_tombstone.deletion_time = parse_expiry(_header, _u64); + case state::ROW_BODY_MARKER: + row_body_marker_label: + if (_consumer.consume_row_marker_and_tombstone(_liveness, std::move(_row_tombstone)) == consumer_m::proceed::no) { + _state = state::ROW_BODY_MISSING_COLUMNS; + break; + } case state::ROW_BODY_MISSING_COLUMNS: - row_body_missing_columns_label: if (!_flags.has_all_columns()) { if (read_unsigned_vint(data) != read_status::ready) { _state = state::ROW_BODY_MISSING_COLUMNS_2;