mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-29 11:10:40 +00:00
Merge "Consume row marker correctly" from Piotr
"
Make sure we properly handle row marker and row tombstone
when reading a row.
Tests: unit {release}
"
* 'haaawk/sstables3/read-liveness-info-v4' of ssh://github.com/scylladb/seastar-dev:
sstable: consume row marker in data_consume_rows_context_m
sstable: Add consumer_m::consume_row_marker_and_tombstone
sstable: add is_set and to_row_marker to liveness_info
This commit is contained in:
@@ -26,6 +26,7 @@
|
||||
#include "gc_clock.hh"
|
||||
#include "sstables/m_format_read_helpers.hh"
|
||||
#include "sstables/types.hh"
|
||||
#include "mutation_partition.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -34,6 +35,11 @@ class liveness_info {
|
||||
api::timestamp_type _timestamp;
|
||||
gc_clock::duration _ttl;
|
||||
gc_clock::time_point _local_deletion_time;
|
||||
bool is_set() const {
|
||||
return _timestamp != api::missing_timestamp
|
||||
|| _ttl != gc_clock::duration::zero()
|
||||
|| _local_deletion_time != gc_clock::time_point::max();
|
||||
}
|
||||
public:
|
||||
explicit liveness_info(const serialization_header& header)
|
||||
: _header(header)
|
||||
@@ -55,6 +61,14 @@ public:
|
||||
api::timestamp_type timestamp() const { return _timestamp; }
|
||||
gc_clock::duration ttl() const { return _ttl; }
|
||||
gc_clock::time_point local_deletion_time() const { return _local_deletion_time; }
|
||||
row_marker to_row_marker() const {
|
||||
if (!is_set()) {
|
||||
return row_marker();
|
||||
}
|
||||
return _ttl != gc_clock::duration::zero() || _local_deletion_time != gc_clock::time_point::max()
|
||||
? row_marker(_timestamp, _ttl, _local_deletion_time)
|
||||
: row_marker(_timestamp);
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@
|
||||
#include "row.hh"
|
||||
#include "clustering_ranges_walker.hh"
|
||||
#include "utils/data_input.hh"
|
||||
#include "liveness_info.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -931,6 +932,12 @@ public:
|
||||
return proceed::yes;
|
||||
}
|
||||
|
||||
virtual proceed consume_row_marker_and_tombstone(const liveness_info& info, tombstone t) override {
|
||||
_in_progress_row.apply(t);
|
||||
_in_progress_row.apply(info.to_row_marker());
|
||||
return proceed::yes;
|
||||
}
|
||||
|
||||
virtual proceed consume_static_row_start() override {
|
||||
_inside_static_row = true;
|
||||
_in_progress_static_row = static_row();
|
||||
|
||||
@@ -41,6 +41,7 @@
|
||||
#include "column_translation.hh"
|
||||
|
||||
#include "sstables.hh"
|
||||
#include "tombstone.hh"
|
||||
|
||||
// sstables::data_consume_row feeds the contents of a single row into a
|
||||
// row_consumer object:
|
||||
@@ -154,6 +155,8 @@ public:
|
||||
|
||||
virtual proceed consume_row_start(const std::vector<temporary_buffer<char>>& ecp) = 0;
|
||||
|
||||
virtual proceed consume_row_marker_and_tombstone(const sstables::liveness_info& info, tombstone t) = 0;
|
||||
|
||||
virtual proceed consume_static_row_start() = 0;
|
||||
|
||||
virtual proceed consume_column(stdx::optional<column_id> column_id,
|
||||
@@ -551,6 +554,7 @@ private:
|
||||
ROW_BODY_DELETION,
|
||||
ROW_BODY_DELETION_2,
|
||||
ROW_BODY_DELETION_3,
|
||||
ROW_BODY_MARKER,
|
||||
ROW_BODY_MISSING_COLUMNS,
|
||||
ROW_BODY_MISSING_COLUMNS_2,
|
||||
ROW_BODY_MISSING_COLUMNS_READ_COLUMNS,
|
||||
@@ -598,6 +602,8 @@ private:
|
||||
|
||||
boost::iterator_range<std::vector<std::optional<uint32_t>>::const_iterator> _ck_column_value_fix_lengths;
|
||||
|
||||
tombstone _row_tombstone;
|
||||
|
||||
column_flags_m _column_flags{0};
|
||||
api::timestamp_type _column_timestamp;
|
||||
gc_clock::time_point _column_local_deletion_time;
|
||||
@@ -732,6 +738,7 @@ public:
|
||||
case state::FLAGS:
|
||||
flags_label:
|
||||
_liveness.reset();
|
||||
_row_tombstone = {};
|
||||
if (read_8(data) != read_status::ready) {
|
||||
_state = state::FLAGS_2;
|
||||
break;
|
||||
@@ -874,23 +881,28 @@ public:
|
||||
case state::ROW_BODY_DELETION:
|
||||
row_body_deletion_label:
|
||||
if (!_flags.has_deletion()) {
|
||||
_state = state::ROW_BODY_MISSING_COLUMNS;
|
||||
goto row_body_missing_columns_label;
|
||||
_state = state::ROW_BODY_MARKER;
|
||||
goto row_body_marker_label;
|
||||
}
|
||||
if (read_unsigned_vint(data) != read_status::ready) {
|
||||
_state = state::ROW_BODY_DELETION_2;
|
||||
break;
|
||||
}
|
||||
case state::ROW_BODY_DELETION_2:
|
||||
// TODO consume mark_for_deleted_at
|
||||
_row_tombstone.timestamp = parse_timestamp(_header, _u64);
|
||||
if (read_unsigned_vint(data) != read_status::ready) {
|
||||
_state = state::ROW_BODY_DELETION_3;
|
||||
break;
|
||||
}
|
||||
case state::ROW_BODY_DELETION_3:
|
||||
// TODO consume local_deletion_time
|
||||
_row_tombstone.deletion_time = parse_expiry(_header, _u64);
|
||||
case state::ROW_BODY_MARKER:
|
||||
row_body_marker_label:
|
||||
if (_consumer.consume_row_marker_and_tombstone(_liveness, std::move(_row_tombstone)) == consumer_m::proceed::no) {
|
||||
_state = state::ROW_BODY_MISSING_COLUMNS;
|
||||
break;
|
||||
}
|
||||
case state::ROW_BODY_MISSING_COLUMNS:
|
||||
row_body_missing_columns_label:
|
||||
if (!_flags.has_all_columns()) {
|
||||
if (read_unsigned_vint(data) != read_status::ready) {
|
||||
_state = state::ROW_BODY_MISSING_COLUMNS_2;
|
||||
|
||||
Reference in New Issue
Block a user