From 21a0e95a061642acc82c470589a42d5bb6faaaae Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 6 Jun 2018 15:44:06 +0200 Subject: [PATCH 01/11] Implement read_unsigned_vint_length_bytes It's a common operation that's used in multiple places so it's best to have it implemented once. Signed-off-by: Piotr Jastrzebski --- sstables/consumer.hh | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/sstables/consumer.hh b/sstables/consumer.hh index 824b1d2244..a41f80f123 100644 --- a/sstables/consumer.hh +++ b/sstables/consumer.hh @@ -73,7 +73,9 @@ protected: READING_BYTES, READING_U16_BYTES, READING_UNSIGNED_VINT, + READING_UNSIGNED_VINT_LENGTH_BYTES, READING_UNSIGNED_VINT_WITH_LEN, + READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN, } _prestate = prestate::NONE; // state for non-NONE prestates @@ -188,6 +190,29 @@ protected: } } } + inline read_status read_unsigned_vint_length_bytes(temporary_buffer& data, temporary_buffer& where) { + if (data.empty()) { + _prestate = prestate::READING_UNSIGNED_VINT_LENGTH_BYTES; + _read_bytes_where = &where; + return read_status::waiting; + } else { + const vint_size_type len = unsigned_vint::serialized_size_from_first_byte(*data.begin()); + if (data.size() >= len) { + _u64 = unsigned_vint::deserialize( + bytes_view(reinterpret_cast(data.get_write()), len)).value; + data.trim_front(len); + return read_bytes(data, static_cast(_u64), where); + } else { + _read_bytes = temporary_buffer(len); + std::copy(data.begin(), data.end(), _read_bytes.get_write()); + _pos = data.size(); + data.trim(0); + _read_bytes_where = &where; + _prestate = prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN; + return read_status::waiting; + } + } + } inline void process_buffer(temporary_buffer& data) { if (__builtin_expect((_prestate != prestate::NONE), 0)) { @@ -205,6 +230,10 @@ private: if (read_unsigned_vint(data) == read_status::ready) { _prestate = prestate::NONE; } + } else if (_prestate == prestate::READING_UNSIGNED_VINT_LENGTH_BYTES) { + if (read_unsigned_vint_length_bytes(data, *_read_bytes_where) == read_status::ready) { + _prestate = prestate::NONE; + } } else if (_prestate == prestate::READING_UNSIGNED_VINT_WITH_LEN) { const auto n = std::min(_read_bytes.size() - _pos, data.size()); std::copy_n(data.begin(), n, _read_bytes.get_write() + _pos); @@ -215,6 +244,18 @@ private: bytes_view(reinterpret_cast(_read_bytes.get_write()), _read_bytes.size())).value; _prestate = prestate::NONE; } + } else if (_prestate == prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN) { + const auto n = std::min(_read_bytes.size() - _pos, data.size()); + std::copy_n(data.begin(), n, _read_bytes.get_write() + _pos); + data.trim_front(n); + _pos += n; + if (_pos == _read_bytes.size()) { + _u64 = unsigned_vint::deserialize( + bytes_view(reinterpret_cast(_read_bytes.get_write()), _read_bytes.size())).value; + if (read_bytes(data, _u64, *_read_bytes_where) == read_status::ready) { + _prestate = prestate::NONE; + } + } } else if (_prestate == prestate::READING_BYTES) { auto n = std::min(_read_bytes.size() - _pos, data.size()); std::copy(data.begin(), data.begin() + n, From 3b8b16505318ecc7dbc0d9f162639d8da2fdc7a1 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 6 Jun 2018 15:44:53 +0200 Subject: [PATCH 02/11] Use read_unsigned_vint_length_bytes for CK_BLOCKS Signed-off-by: Piotr Jastrzebski --- sstables/row.hh | 25 +++++++------------------ 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/sstables/row.hh b/sstables/row.hh index 09ae3bef57..604dec289a 100644 --- a/sstables/row.hh +++ b/sstables/row.hh @@ -529,8 +529,6 @@ private: CK_BLOCK, CK_BLOCK_HEADER, CK_BLOCK2, - CK_BLOCK_VALUE_LENGTH, - CK_BLOCK_VALUE_BYTES, CK_BLOCK_END, CLUSTERING_ROW_CONSUME, ROW_BODY, @@ -655,7 +653,6 @@ public: || _state == state::EXTENDED_FLAGS || _state == state::CLUSTERING_ROW || _state == state::CK_BLOCK_HEADER - || _state == state::CK_BLOCK_VALUE_LENGTH || _state == state::CK_BLOCK_END || _state == state::CLUSTERING_ROW_CONSUME || _state == state::ROW_BODY_TIMESTAMP_DELTIME @@ -767,31 +764,23 @@ public: case state::CK_BLOCK_HEADER: _ck_blocks_header = _u64; case state::CK_BLOCK2: - ck_block2_label: + ck_block2_label: { if (is_block_empty()) { _row_key.push_back({}); move_to_next_ck_block(); goto ck_block_label; } + read_status status = read_status::waiting; if (auto len = get_ck_block_value_length()) { - _column_value_length = *len; - _column_value = temporary_buffer(_column_value_length); - _state = state::CK_BLOCK_VALUE_BYTES; - goto ck_block_value_bytes_label; + status = read_bytes(data, *len, _column_value); + } else { + status = read_unsigned_vint_length_bytes(data, _column_value); } - if (read_unsigned_vint(data) != read_status::ready) { - _state = state::CK_BLOCK_VALUE_LENGTH; - break; - } - case state::CK_BLOCK_VALUE_LENGTH: - _column_value_length = static_cast(_u64); - _column_value = temporary_buffer(_column_value_length); - case state::CK_BLOCK_VALUE_BYTES: - ck_block_value_bytes_label: - if (read_bytes(data, _column_value_length, _column_value) != read_status::ready) { + if (status != read_status::ready) { _state = state::CK_BLOCK_END; break; } + } case state::CK_BLOCK_END: _row_key.push_back(std::move(_column_value)); move_to_next_ck_block(); From f7a1d5a437b6355ac57a4b25f203e4c60114b244 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 6 Jun 2018 15:54:17 +0200 Subject: [PATCH 03/11] Use read_unsigned_vint_length_bytes for COLUMN_VALUE Signed-off-by: Piotr Jastrzebski --- sstables/row.hh | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/sstables/row.hh b/sstables/row.hh index 604dec289a..d0ad25b314 100644 --- a/sstables/row.hh +++ b/sstables/row.hh @@ -555,8 +555,6 @@ private: COLUMN_TTL, COLUMN_TTL_2, COLUMN_VALUE, - COLUMN_VALUE_LENGTH, - COLUMN_VALUE_BYTES, COLUMN_END, RANGE_TOMBSTONE_MARKER, } _state = state::PARTITION_START; @@ -664,7 +662,6 @@ public: || _state == state::COLUMN_TIMESTAMP || _state == state::COLUMN_DELETION_TIME_2 || _state == state::COLUMN_TTL_2 - || _state == state::COLUMN_VALUE_LENGTH || _state == state::COLUMN_END) && (_prestate == prestate::NONE); } @@ -934,30 +931,23 @@ public: _column_ttl = parse_ttl(_header, _u64); case state::COLUMN_VALUE: column_value_label: + { if (!_column_flags.has_value()) { _column_value = temporary_buffer(0); _state = state::COLUMN_END; goto column_end_label; } + read_status status = read_status::waiting; if (auto len = get_column_value_length()) { - _column_value_length = *len; - _column_value = temporary_buffer(_column_value_length); - _state = state::COLUMN_VALUE_BYTES; - goto column_value_bytes_label; + status = read_bytes(data, *len, _column_value); + } else { + status = read_unsigned_vint_length_bytes(data, _column_value); } - if (read_unsigned_vint(data) != read_status::ready) { - _state = state::COLUMN_VALUE_LENGTH; - break; - } - case state::COLUMN_VALUE_LENGTH: - _column_value_length = static_cast(_u64); - _column_value = temporary_buffer(_column_value_length); - case state::COLUMN_VALUE_BYTES: - column_value_bytes_label: - if (read_bytes(data, _column_value_length, _column_value) != read_status::ready) { + if (status != read_status::ready) { _state = state::COLUMN_END; break; } + } case state::COLUMN_END: column_end_label: _state = state::NEXT_COLUMN; From 2b8ff15f9f7af8637b9959cea57b1cc1acacf2f3 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Thu, 7 Jun 2018 22:47:19 +0200 Subject: [PATCH 04/11] column_flags_m: add HAS_COMPLEX_DELETION Signed-off-by: Piotr Jastrzebski --- sstables/types.hh | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sstables/types.hh b/sstables/types.hh index 57377cdc2e..f25aded4f1 100644 --- a/sstables/types.hh +++ b/sstables/types.hh @@ -563,7 +563,8 @@ class unfiltered_flags_m final { static const uint8_t HAS_TIMESTAMP = 0x04u; static const uint8_t HAS_TTL = 0x08u; static const uint8_t HAS_DELETION = 0x10u; - static const uint8_t HAS_ALL_COLUMNS = 0x20; + static const uint8_t HAS_ALL_COLUMNS = 0x20u; + static const uint8_t HAS_COMPLEX_DELETION = 0x40u; static const uint8_t HAS_EXTENDED_FLAGS = 0x80u; uint8_t _flags; bool check_flag(const uint8_t flag) const { @@ -592,6 +593,9 @@ public: bool has_all_columns() const { return check_flag(HAS_ALL_COLUMNS); } + bool has_complex_deletion() const { + return check_flag(HAS_COMPLEX_DELETION); + } }; class unfiltered_extended_flags_m final { From 7bb25a2dd9a16f48fe4fb03761ca710adc8d7b15 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Thu, 7 Jun 2018 22:48:43 +0200 Subject: [PATCH 05/11] column_translation: add *_column_is_collection() Signed-off-by: Piotr Jastrzebski --- sstables/column_translation.hh | 34 ++++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/sstables/column_translation.hh b/sstables/column_translation.hh index 28147c5536..292c27c7e7 100644 --- a/sstables/column_translation.hh +++ b/sstables/column_translation.hh @@ -39,21 +39,25 @@ class column_translation { struct state { - static std::pair>, std::vector>> build( - const schema& s, - const utils::chunked_vector& src, - bool is_static) { + static std::tuple>, + std::vector>, + std::vector> build( + const schema& s, + const utils::chunked_vector& src, + bool is_static) { std::vector> ids; std::vector> lens; + std::vector is_collection; if (s.is_dense()) { if (is_static) { ids.push_back(s.static_begin()->id); lens.push_back(s.static_begin()->type->value_length_if_fixed()); + is_collection.push_back(s.static_begin()->is_multi_cell()); } else { ids.push_back(s.regular_begin()->id); lens.push_back(s.regular_begin()->type->value_length_if_fixed()); + is_collection.push_back(s.regular_begin()->is_multi_cell()); } - } else { ids.reserve(src.size()); lens.reserve(src.size()); @@ -62,13 +66,15 @@ class column_translation { if (def) { ids.push_back(def->id); lens.push_back(def->type->value_length_if_fixed()); + is_collection.push_back(def->is_multi_cell()); } else { ids.push_back(stdx::nullopt); lens.push_back(std::nullopt); + is_collection.push_back(false); } } } - return std::make_pair(std::move(ids), std::move(lens)); + return std::make_tuple(std::move(ids), std::move(lens), std::move(is_collection)); } utils::UUID schema_uuid; @@ -77,6 +83,8 @@ class column_translation { std::vector> regular_column_value_fix_lengths; std::vector> static_column_value_fix_lengths; std::vector> clustering_column_value_fix_lengths; + std::vector static_column_is_collection; + std::vector regular_column_is_collection; state() = default; state(const state&) = delete; @@ -87,9 +95,13 @@ class column_translation { state(const schema& s, const serialization_header& header) : schema_uuid(s.version()) { - std::tie(regular_schema_column_id_from_sstable, regular_column_value_fix_lengths) = + std::tie(regular_schema_column_id_from_sstable, + regular_column_value_fix_lengths, + regular_column_is_collection) = build(s, header.regular_columns.elements, false); - std::tie(static_schema_column_id_from_sstable, static_column_value_fix_lengths) = + std::tie(static_schema_column_id_from_sstable, + static_column_value_fix_lengths, + static_column_is_collection) = build(s, header.static_columns.elements, true); clustering_column_value_fix_lengths.reserve(header.clustering_key_types_names.elements.size()); for (auto&& t : header.clustering_key_types_names.elements) { @@ -124,6 +136,12 @@ public: const std::vector>& clustering_column_value_fix_legths() const { return _state->clustering_column_value_fix_lengths; } + const std::vector& static_column_is_collection() const { + return _state->static_column_is_collection; + } + const std::vector& regular_column_is_collection() const { + return _state->regular_column_is_collection; + } }; }; // namespace sstables From 5e1dd89d4d92a738ff33f96cd0949b835c9a694f Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Thu, 7 Jun 2018 22:50:23 +0200 Subject: [PATCH 06/11] Use column_translation::*_is_collection in reading Signed-off-by: Piotr Jastrzebski --- sstables/row.hh | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/sstables/row.hh b/sstables/row.hh index d0ad25b314..26586a7aba 100644 --- a/sstables/row.hh +++ b/sstables/row.hh @@ -574,6 +574,7 @@ private: boost::iterator_range>::const_iterator> _column_ids; boost::iterator_range>::const_iterator> _column_value_fix_lengths; + boost::iterator_range::const_iterator> _column_is_collection; boost::dynamic_bitset _columns_selector; uint64_t _missing_columns_to_read; @@ -589,9 +590,11 @@ private: uint32_t _ck_blocks_header_offset; void setup_columns(const std::vector>& column_ids, - const std::vector>& column_value_fix_lengths) { + const std::vector>& column_value_fix_lengths, + const std::vector& column_is_collection) { _column_ids = boost::make_iterator_range(column_ids); _column_value_fix_lengths = boost::make_iterator_range(column_value_fix_lengths); + _column_is_collection = boost::make_iterator_range(column_is_collection); } bool is_current_column_present() { return _columns_selector.test(_columns_selector.size() - _column_ids.size()); @@ -603,6 +606,7 @@ private: } _column_ids.advance_begin(pos); _column_value_fix_lengths.advance_begin(pos); + _column_is_collection.advance_begin(pos); } bool no_more_columns() { return _column_ids.empty(); } void move_to_next_column() { @@ -612,8 +616,9 @@ private: : next_pos - current_pos; _column_ids.advance_begin(jump_to_next); _column_value_fix_lengths.advance_begin(jump_to_next); + _column_is_collection.advance_begin(jump_to_next); } - bool is_column_simple() { return true; } + bool is_column_simple() { return !_column_is_collection.front(); } stdx::optional get_column_id() { return _column_ids.front(); } @@ -719,7 +724,8 @@ public: _extended_flags = unfiltered_extended_flags_m(uint8_t{0u}); _state = state::CLUSTERING_ROW; setup_columns(_column_translation.regular_columns(), - _column_translation.regular_column_value_fix_legths()); + _column_translation.regular_column_value_fix_legths(), + _column_translation.regular_column_is_collection()); goto clustering_row_label; } if (read_8(data) != read_status::ready) { @@ -731,7 +737,8 @@ public: if (_extended_flags.is_static()) { if (_is_first_unfiltered) { setup_columns(_column_translation.static_columns(), - _column_translation.static_column_value_fix_legths()); + _column_translation.static_column_value_fix_legths(), + _column_translation.static_column_is_collection()); _is_first_unfiltered = false; _consumer.consume_static_row_start(); goto row_body_label; @@ -740,7 +747,8 @@ public: } } setup_columns(_column_translation.regular_columns(), - _column_translation.regular_column_value_fix_legths()); + _column_translation.regular_column_value_fix_legths(), + _column_translation.regular_column_is_collection()); case state::CLUSTERING_ROW: clustering_row_label: _is_first_unfiltered = false; From ffb6b9ed24f86e9e1c085fa25443baa85bb69639 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Thu, 7 Jun 2018 23:30:40 +0200 Subject: [PATCH 07/11] data_consume_rows_context_m: introduce cell_path Signed-off-by: Piotr Jastrzebski --- sstables/row.hh | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/sstables/row.hh b/sstables/row.hh index 26586a7aba..e7c4cfba76 100644 --- a/sstables/row.hh +++ b/sstables/row.hh @@ -554,6 +554,8 @@ private: COLUMN_DELETION_TIME_2, COLUMN_TTL, COLUMN_TTL_2, + COLUMN_CELL_PATH, + CELL_PATH_SIZE, COLUMN_VALUE, COLUMN_END, RANGE_TOMBSTONE_MARKER, @@ -586,6 +588,7 @@ private: gc_clock::duration _column_ttl; uint32_t _column_value_length; temporary_buffer _column_value; + temporary_buffer _cell_path; uint64_t _ck_blocks_header; uint32_t _ck_blocks_header_offset; @@ -925,11 +928,11 @@ public: if (_column_flags.use_row_timestamp()) { _column_ttl = _liveness.ttl(); _state = state::COLUMN_VALUE; - goto column_value_label; + goto column_cell_path_label; } else if (!_column_flags.is_expiring()) { _column_ttl = gc_clock::duration::zero(); _state = state::COLUMN_VALUE; - goto column_value_label; + goto column_cell_path_label; } if (read_unsigned_vint(data) != read_status::ready) { _state = state::COLUMN_TTL_2; @@ -937,8 +940,17 @@ public: } case state::COLUMN_TTL_2: _column_ttl = parse_ttl(_header, _u64); + case state::COLUMN_CELL_PATH: + column_cell_path_label: + if (!is_column_simple()) { + if (read_unsigned_vint_length_bytes(data, _cell_path) != read_status::ready) { + _state = state::COLUMN_VALUE; + break; + } + } else { + _cell_path = temporary_buffer(0); + } case state::COLUMN_VALUE: - column_value_label: { if (!_column_flags.has_value()) { _column_value = temporary_buffer(0); From fd89f42b09ce97e8264f7543875142e905e92108 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Thu, 7 Jun 2018 23:35:12 +0200 Subject: [PATCH 08/11] mp_row_consumer_m: Add support for collections Signed-off-by: Piotr Jastrzebski --- sstables/mp_row_consumer.hh | 44 +++++++++++++++++++++++++++++++++---- sstables/row.hh | 7 ++++++ 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/sstables/mp_row_consumer.hh b/sstables/mp_row_consumer.hh index 92cd374bba..6d190153a4 100644 --- a/sstables/mp_row_consumer.hh +++ b/sstables/mp_row_consumer.hh @@ -838,6 +838,7 @@ class mp_row_consumer_m : public consumer_m { atomic_cell_or_collection val; }; std::vector _cells; + collection_type_impl::mutation _cm; void set_up_ck_ranges(const partition_key& pk) { _ck_ranges = query::clustering_key_filter_ranges::get_ranges(*_schema, _slice, pk); @@ -846,6 +847,10 @@ class mp_row_consumer_m : public consumer_m { _out_of_range = false; } + const column_definition& get_column_definition(stdx::optional column_id) { + auto column_type = _inside_static_row ? column_kind::static_column : column_kind::regular_column; + return _schema->column_at(column_type, *column_id); + } public: mp_row_consumer_m(mp_row_consumer_reader* reader, const schema_ptr schema, @@ -959,6 +964,7 @@ public: } virtual proceed consume_column(stdx::optional column_id, + bytes_view cell_path, bytes_view value, api::timestamp_type timestamp, gc_clock::duration ttl, @@ -966,13 +972,43 @@ public: if (!column_id) { return proceed::yes; } - auto column_type = _inside_static_row ? column_kind::static_column : column_kind::regular_column; - const column_definition& column_def = _schema->column_at(column_type, *column_id); + const column_definition& column_def = get_column_definition(column_id); if (timestamp <= column_def.dropped_at()) { return proceed::yes; } - auto ac = make_atomic_cell(*column_def.type, timestamp, value, ttl, local_deletion_time, atomic_cell::collection_member::no); - _cells.push_back({*column_id, atomic_cell_or_collection(std::move(ac))}); + if (column_def.is_multi_cell()) { + auto ctype = static_pointer_cast(column_def.type); + auto ac = make_atomic_cell(*ctype->value_comparator(), + timestamp, + value, + ttl, + local_deletion_time, + atomic_cell::collection_member::yes); + _cm.cells.emplace_back(to_bytes(cell_path), std::move(ac)); + } else { + auto ac = make_atomic_cell(*column_def.type, timestamp, value, ttl, local_deletion_time, + atomic_cell::collection_member::no); + _cells.push_back({*column_id, atomic_cell_or_collection(std::move(ac))}); + } + return proceed::yes; + } + + virtual proceed consume_complex_column_start(stdx::optional column_id, + tombstone tomb) override { + _cm.tomb = tomb; + _cm.cells.clear(); + return proceed::yes; + } + + virtual proceed consume_complex_column_end(stdx::optional column_id) override { + if (column_id) { + const column_definition& column_def = get_column_definition(column_id); + auto ctype = static_pointer_cast(column_def.type); + auto ac = atomic_cell_or_collection::from_collection_mutation(ctype->serialize_mutation_form(_cm)); + _cells.push_back({column_def.id, atomic_cell_or_collection(std::move(ac))}); + _cm.tomb = {}; + _cm.cells.clear(); + } return proceed::yes; } diff --git a/sstables/row.hh b/sstables/row.hh index e7c4cfba76..0dc40507fe 100644 --- a/sstables/row.hh +++ b/sstables/row.hh @@ -155,11 +155,17 @@ public: virtual proceed consume_static_row_start() = 0; virtual proceed consume_column(stdx::optional column_id, + bytes_view cell_path, bytes_view value, api::timestamp_type timestamp, gc_clock::duration ttl, gc_clock::time_point local_deletion_time) = 0; + virtual proceed consume_complex_column_start(stdx::optional column_id, + tombstone tomb) = 0; + + virtual proceed consume_complex_column_end(stdx::optional column_id) = 0; + virtual proceed consume_row_end(const sstables::liveness_info&) = 0; // Called when the reader is fast forwarded to given element. @@ -972,6 +978,7 @@ public: column_end_label: _state = state::NEXT_COLUMN; if (_consumer.consume_column(get_column_id(), + to_bytes_view(_cell_path), to_bytes_view(_column_value), _column_timestamp, _column_ttl, From f9c62b81880eb97ce35544a32b695bea95266653 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Thu, 7 Jun 2018 23:39:07 +0200 Subject: [PATCH 09/11] data_consume_rows_context_m: add support for collections Signed-off-by: Piotr Jastrzebski --- sstables/row.hh | 75 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 63 insertions(+), 12 deletions(-) diff --git a/sstables/row.hh b/sstables/row.hh index 0dc40507fe..5948445121 100644 --- a/sstables/row.hh +++ b/sstables/row.hh @@ -553,6 +553,11 @@ private: COLUMN, SIMPLE_COLUMN, COMPLEX_COLUMN, + COMPLEX_COLUMN_MARKED_FOR_DELETE, + COMPLEX_COLUMN_LOCAL_DELETION_TIME, + COMPLEX_COLUMN_2, + COMPLEX_COLUMN_SIZE, + COMPLEX_COLUMN_SIZE_2, NEXT_COLUMN, COLUMN_FLAGS, COLUMN_TIMESTAMP, @@ -597,6 +602,9 @@ private: temporary_buffer _cell_path; uint64_t _ck_blocks_header; uint32_t _ck_blocks_header_offset; + uint64_t _subcolumns_to_read = 0; + api::timestamp_type _complex_column_marked_for_delete; + tombstone _complex_column_tombstone; void setup_columns(const std::vector>& column_ids, const std::vector>& column_value_fix_lengths, @@ -882,16 +890,19 @@ public: } case state::COLUMN: column_label: - if (no_more_columns()) { - _state = state::FLAGS; - if (_consumer.consume_row_end(_liveness) == consumer_m::proceed::no) { - return consumer_m::proceed::no; + if (_subcolumns_to_read == 0) { + if (no_more_columns()) { + _state = state::FLAGS; + if (_consumer.consume_row_end(_liveness) == consumer_m::proceed::no) { + return consumer_m::proceed::no; + } + goto flags_label; } - goto flags_label; - } - if (!is_column_simple()) { - _state = state::COMPLEX_COLUMN; - goto complex_column_label; + if (!is_column_simple()) { + _state = state::COMPLEX_COLUMN; + goto complex_column_label; + } + _subcolumns_to_read = 0; } case state::SIMPLE_COLUMN: if (read_8(data) != read_status::ready) { @@ -986,8 +997,19 @@ public: return consumer_m::proceed::no; } case state::NEXT_COLUMN: - move_to_next_column(); - _state = state::COLUMN; + if (!is_column_simple()) { + --_subcolumns_to_read; + if (_subcolumns_to_read == 0) { + auto id = get_column_id(); + move_to_next_column(); + if (_consumer.consume_complex_column_end(std::move(id)) != consumer_m::proceed::yes) { + _state = state::COLUMN; + return consumer_m::proceed::no; + } + } + } else { + move_to_next_column(); + } goto column_label; case state::ROW_BODY_MISSING_COLUMNS_2: row_body_missing_columns_2_label: { @@ -1026,7 +1048,36 @@ public: goto row_body_missing_columns_read_columns_label; case state::COMPLEX_COLUMN: complex_column_label: - throw malformed_sstable_exception("unimplemented state: complex columns not supported"); + if (!_flags.has_complex_deletion()) { + _complex_column_tombstone = {}; + goto complex_column_2_label; + } + if (read_unsigned_vint(data) != read_status::ready) { + _state = state::COMPLEX_COLUMN_MARKED_FOR_DELETE; + break; + } + case state::COMPLEX_COLUMN_MARKED_FOR_DELETE: + _complex_column_marked_for_delete = parse_timestamp(_header, _u64); + if (read_unsigned_vint(data) != read_status::ready) { + _state = state::COMPLEX_COLUMN_LOCAL_DELETION_TIME; + break; + } + case state::COMPLEX_COLUMN_LOCAL_DELETION_TIME: + _complex_column_tombstone = {_complex_column_marked_for_delete, parse_expiry(_header, _u64)}; + case state::COMPLEX_COLUMN_2: + complex_column_2_label: + if (_consumer.consume_complex_column_start(get_column_id(), _complex_column_tombstone) == consumer_m::proceed::no) { + _state = state::COMPLEX_COLUMN_SIZE; + return consumer_m::proceed::no; + } + case state::COMPLEX_COLUMN_SIZE: + if (read_unsigned_vint(data) != read_status::ready) { + _state = state::COMPLEX_COLUMN_SIZE_2; + break; + } + case state::COMPLEX_COLUMN_SIZE_2: + _subcolumns_to_read = _u64; + goto column_label; case state::RANGE_TOMBSTONE_MARKER: range_tombstone_marker_label: throw malformed_sstable_exception("unimplemented state"); From 176305c2f2f94cc9c52558d049e0177708ae169d Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Thu, 7 Jun 2018 23:39:51 +0200 Subject: [PATCH 10/11] flat_mutation_reader_assertions: add more flexible asserts Signed-off-by: Piotr Jastrzebski --- tests/flat_mutation_reader_assertions.hh | 36 +++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/tests/flat_mutation_reader_assertions.hh b/tests/flat_mutation_reader_assertions.hh index 0f3dc63628..81d9f5c21a 100644 --- a/tests/flat_mutation_reader_assertions.hh +++ b/tests/flat_mutation_reader_assertions.hh @@ -25,6 +25,7 @@ #include #include "flat_mutation_reader.hh" #include "mutation_assertions.hh" +#include "schema.hh" // Intended to be called in a seastar thread class flat_reader_assertions { @@ -150,17 +151,50 @@ public: BOOST_FAIL(sprint("Expected row with column %s, but it is not present", columns[i].name)); } auto& cdef = _reader.schema()->regular_column_at(columns[i].id); + assert (!cdef.is_multi_cell()); auto cmp = compare_unsigned(columns[i].value, cell->as_atomic_cell(cdef).value().linearize()); if (cmp != 0) { BOOST_FAIL(sprint("Expected row with column %s having value %s, but it has value %s", columns[i].name, columns[i].value, - cell->as_atomic_cell(cdef).value())); + cell->as_atomic_cell(cdef).value().linearize())); } } return *this; } + using assert_function = noncopyable_function; + + flat_reader_assertions& produces_row(const clustering_key& ck, + const std::vector& column_ids, + const std::vector& column_assert) { + BOOST_TEST_MESSAGE(sprint("Expect %s", ck)); + auto mfopt = read_next(); + if (!mfopt) { + BOOST_FAIL(sprint("Expected row with key %s, but got end of stream", ck)); + } + if (!mfopt->is_clustering_row()) { + BOOST_FAIL(sprint("Expected row with key %s, but got %s", ck, *mfopt)); + } + auto& actual = mfopt->as_clustering_row().key(); + if (!actual.equal(*_reader.schema(), ck)) { + BOOST_FAIL(sprint("Expected row with key %s, but key is %s", ck, actual)); + } + auto& cells = mfopt->as_clustering_row().cells(); + if (cells.size() != column_ids.size()) { + BOOST_FAIL(sprint("Expected row with %s columns, but has %s", column_ids.size(), cells.size())); + } + for (size_t i = 0; i < column_ids.size(); ++i) { + const atomic_cell_or_collection* cell = cells.find_cell(column_ids[i]); + if (!cell) { + BOOST_FAIL(sprint("Expected row with column %d, but it is not present", column_ids[i])); + } + auto& cdef = _reader.schema()->regular_column_at(column_ids[i]); + column_assert[i](cdef, cell); + } + return *this; + } + // If ck_ranges is passed, verifies only that information relevant for ck_ranges matches. flat_reader_assertions& produces_range_tombstone(const range_tombstone& rt, const query::clustering_row_ranges& ck_ranges = {}) { BOOST_TEST_MESSAGE(sprint("Expect %s", rt)); From 7d3abb0668fcc43c370726e3b9268ac5fb0859cc Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Thu, 7 Jun 2018 23:40:10 +0200 Subject: [PATCH 11/11] sstables 3: Add tests for reading collections Signed-off-by: Piotr Jastrzebski --- tests/sstable_3_x_test.cc | 143 ++++++++++++++++++ .../uncompressed/collections/mc-1-big-CRC.db | Bin 0 -> 8 bytes .../uncompressed/collections/mc-1-big-Data.db | Bin 0 -> 885 bytes .../collections/mc-1-big-Digest.crc32 | 1 + .../collections/mc-1-big-Filter.db | Bin 0 -> 24 bytes .../collections/mc-1-big-Index.db | Bin 0 -> 44 bytes .../collections/mc-1-big-Statistics.db | Bin 0 -> 4926 bytes .../collections/mc-1-big-Summary.db | Bin 0 -> 56 bytes .../uncompressed/collections/mc-1-big-TOC.txt | 8 + 9 files changed, 152 insertions(+) create mode 100644 tests/sstables/3.x/uncompressed/collections/mc-1-big-CRC.db create mode 100644 tests/sstables/3.x/uncompressed/collections/mc-1-big-Data.db create mode 100644 tests/sstables/3.x/uncompressed/collections/mc-1-big-Digest.crc32 create mode 100644 tests/sstables/3.x/uncompressed/collections/mc-1-big-Filter.db create mode 100644 tests/sstables/3.x/uncompressed/collections/mc-1-big-Index.db create mode 100644 tests/sstables/3.x/uncompressed/collections/mc-1-big-Statistics.db create mode 100644 tests/sstables/3.x/uncompressed/collections/mc-1-big-Summary.db create mode 100644 tests/sstables/3.x/uncompressed/collections/mc-1-big-TOC.txt diff --git a/tests/sstable_3_x_test.cc b/tests/sstable_3_x_test.cc index df5811326f..f5e7960c3a 100644 --- a/tests/sstable_3_x_test.cc +++ b/tests/sstable_3_x_test.cc @@ -1306,6 +1306,149 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_compound_ck_read) { .produces_end_of_stream(); } +// Following tests run on files in tests/sstables/3.x/uncompressed/collections +// They were created using following CQL statements: +// +// CREATE KEYSPACE test_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; +// +// CREATE TABLE test_ks.test_table ( pk INT, set_val set, list_val list, map_val map, PRIMARY KEY(pk)) +// WITH compression = { 'enabled' : false }; +// +// INSERT INTO test_ks.test_table(pk, set_val, list_val, map_val) +// VALUES(1, {1, 2, 3}, ['Text 1', 'Text 2', 'Text 3'], {1 : 'Text 1', 2 : 'Text 2', 3 : 'Text 3'}); +// INSERT INTO test_ks.test_table(pk, set_val, list_val, map_val) +// VALUES(2, {4, 5, 6}, ['Text 4', 'Text 5', 'Text 6'], {4 : 'Text 4', 5 : 'Text 5', 6 : 'Text 6'}); +// INSERT INTO test_ks.test_table(pk, set_val, list_val, map_val) +// VALUES(3, {7, 8, 9}, ['Text 7', 'Text 8', 'Text 9'], {7 : 'Text 7', 8 : 'Text 8', 9 : 'Text 9'}); +// INSERT INTO test_ks.test_table(pk, set_val, list_val, map_val) +// VALUES(4, {10, 11, 12}, ['Text 10', 'Text 11', 'Text 12'], {10 : 'Text 10', 11 : 'Text 11', 12 : 'Text 12'}); +// INSERT INTO test_ks.test_table(pk, set_val, list_val, map_val) +// VALUES(5, {13, 14, 15}, ['Text 13', 'Text 14', 'Text 15'], {13 : 'Text 13', 14 : 'Text 14', 15 : 'Text 15'}); + +static thread_local const sstring UNCOMPRESSED_COLLECTIONS_PATH = + "tests/sstables/3.x/uncompressed/collections"; +static thread_local const schema_ptr UNCOMPRESSED_COLLECTIONS_SCHEMA = + schema_builder("test_ks", "test_table") + .with_column("pk", int32_type, column_kind::partition_key) + .with_column("set_val", set_type_impl::get_instance(int32_type, true)) + .with_column("list_val", list_type_impl::get_instance(utf8_type, true)) + .with_column("map_val", map_type_impl::get_instance(int32_type, utf8_type, true)) + .build(); + +SEASTAR_THREAD_TEST_CASE(test_uncompressed_collections_read) { + sstable_assertions sst(UNCOMPRESSED_COLLECTIONS_SCHEMA, UNCOMPRESSED_COLLECTIONS_PATH); + sst.load(); + auto to_key = [] (int key) { + auto bytes = int32_type->decompose(int32_t(key)); + auto pk = partition_key::from_single_value(*UNCOMPRESSED_COLLECTIONS_SCHEMA, bytes); + return dht::global_partitioner().decorate_key(*UNCOMPRESSED_COLLECTIONS_SCHEMA, pk); + }; + + auto set_cdef = UNCOMPRESSED_COLLECTIONS_SCHEMA->get_column_definition(to_bytes("set_val")); + BOOST_REQUIRE(set_cdef); + + auto list_cdef = UNCOMPRESSED_COLLECTIONS_SCHEMA->get_column_definition(to_bytes("list_val")); + BOOST_REQUIRE(list_cdef); + + auto map_cdef = UNCOMPRESSED_COLLECTIONS_SCHEMA->get_column_definition(to_bytes("map_val")); + BOOST_REQUIRE(map_cdef); + + auto generate = [&] (std::vector set_val, std::vector list_val, std::vector> map_val) { + std::vector assertions; + + assertions.push_back([val = std::move(set_val)] (const column_definition& def, + const atomic_cell_or_collection* cell) { + BOOST_REQUIRE(def.is_multi_cell()); + auto ctype = static_pointer_cast(def.type); + int idx = 0; + cell->as_collection_mutation().data.with_linearized([&] (bytes_view c_bv) { + auto m_view = ctype->deserialize_mutation_form(c_bv); + for (auto&& entry : m_view.cells) { + auto cmp = compare_unsigned(int32_type->decompose(int32_t(val[idx])), entry.first); + if (cmp != 0) { + BOOST_FAIL(sprint("Expected row with column %s having value %s, but it has value %s", + def.id, + int32_type->decompose(int32_t(val[idx])), + entry.first)); + } + ++idx; + } + }); + }); + + assertions.push_back([val = std::move(list_val)] (const column_definition& def, + const atomic_cell_or_collection* cell) { + BOOST_REQUIRE(def.is_multi_cell()); + auto ctype = static_pointer_cast(def.type); + int idx = 0; + cell->as_collection_mutation().data.with_linearized([&] (bytes_view c_bv) { + auto m_view = ctype->deserialize_mutation_form(c_bv); + for (auto&& entry : m_view.cells) { + auto cmp = compare_unsigned(utf8_type->decompose(val[idx]), entry.second.value().linearize()); + if (cmp != 0) { + BOOST_FAIL(sprint("Expected row with column %s having value %s, but it has value %s", + def.id, + utf8_type->decompose(val[idx]), + entry.second.value().linearize())); + } + ++idx; + } + }); + }); + + assertions.push_back([val = std::move(map_val)] (const column_definition& def, + const atomic_cell_or_collection* cell) { + BOOST_REQUIRE(def.is_multi_cell()); + auto ctype = static_pointer_cast(def.type); + int idx = 0; + cell->as_collection_mutation().data.with_linearized([&] (bytes_view c_bv) { + auto m_view = ctype->deserialize_mutation_form(c_bv); + for (auto&& entry : m_view.cells) { + auto cmp1 = compare_unsigned(int32_type->decompose(int32_t(val[idx].first)), entry.first); + auto cmp2 = compare_unsigned(utf8_type->decompose(val[idx].second), entry.second.value().linearize()); + if (cmp1 != 0 || cmp2 != 0) { + BOOST_FAIL( + sprint("Expected row with column %s having value (%s, %s), but it has value (%s, %s)", + def.id, + int32_type->decompose(int32_t(val[idx].first)), + utf8_type->decompose(val[idx].second), + entry.first, + entry.second.value().linearize())); + } + ++idx; + } + }); + }); + + return std::move(assertions); + }; + + std::vector ids{set_cdef->id, list_cdef->id, map_cdef->id}; + + assert_that(sst.read_rows_flat()) + .produces_partition_start(to_key(5)) + .produces_row(clustering_key_prefix::make_empty(), ids, + generate({13, 14, 15}, {"Text 13", "Text 14", "Text 15"}, {{13,"Text 13"}, {14,"Text 14"}, {15,"Text 15"}})) + .produces_partition_end() + .produces_partition_start(to_key(1)) + .produces_row(clustering_key_prefix::make_empty(), ids, + generate({1, 2, 3}, {"Text 1", "Text 2", "Text 3"}, {{1,"Text 1"}, {2,"Text 2"}, {3,"Text 3"}})) + .produces_partition_end() + .produces_partition_start(to_key(2)) + .produces_row(clustering_key_prefix::make_empty(), ids, + generate({4, 5, 6}, {"Text 4", "Text 5", "Text 6"}, {{4,"Text 4"}, {5,"Text 5"}, {6, "Text 6"}})) + .produces_partition_end() + .produces_partition_start(to_key(4)) + .produces_row(clustering_key_prefix::make_empty(), ids, + generate({10, 11, 12}, {"Text 10", "Text 11", "Text 12"}, {{10,"Text 10"}, {11,"Text 11"}, {12,"Text 12"}})) + .produces_partition_end() + .produces_partition_start(to_key(3)) + .produces_row(clustering_key_prefix::make_empty(), ids, + generate({7, 8, 9}, {"Text 7", "Text 8", "Text 9"}, {{7,"Text 7"}, {8,"Text 8"}, {9,"Text 9"}})) + .produces_partition_end() + .produces_end_of_stream(); +} + static void compare_files(sstring filename1, sstring filename2) { std::ifstream ifs1(filename1); std::ifstream ifs2(filename2); diff --git a/tests/sstables/3.x/uncompressed/collections/mc-1-big-CRC.db b/tests/sstables/3.x/uncompressed/collections/mc-1-big-CRC.db new file mode 100644 index 0000000000000000000000000000000000000000..5f5cdabf65d1300b9558e7c04c1da9f0ed1147e0 GIT binary patch literal 8 PcmZQzWMDA=Jg*1<1#$vB literal 0 HcmV?d00001 diff --git a/tests/sstables/3.x/uncompressed/collections/mc-1-big-Data.db b/tests/sstables/3.x/uncompressed/collections/mc-1-big-Data.db new file mode 100644 index 0000000000000000000000000000000000000000..c48d7213856375223d2dca23f64f978275ff86cb GIT binary patch literal 885 zcmZvZJ5K^p5QXou3%fi7+(IqU#>zxqiUo=C8;Bi5jV98ov6I5yXlpEpy^l&teukYU zCTQoYmi5k=fv~wta!)v%$(b*Ma+Ff>vmgk%`U`#byFLFsy>hSL-HGkxx1Sd_JA>Je z+vWM2`uS7cZ62H+6^dmfTMp$imQ0!a^hq@gL!BcnzI!BWQDOW zd`l?}oIm`0m|mV>U<8k5P|`tXZeRvwYQvs!1DP0`5)zvYO#;k<#-KJs#=?--{&4g* z((mt>Zrf^nnTzRwt;V+*R1y4`!I~5jpu zcM0Hjb7y+J{86hKtliv=NQy}IB9fAH5_4+OLoO|@2t*wjFrWFy0XbfF$JY5fEJUHfDJ literal 0 HcmV?d00001 diff --git a/tests/sstables/3.x/uncompressed/collections/mc-1-big-Digest.crc32 b/tests/sstables/3.x/uncompressed/collections/mc-1-big-Digest.crc32 new file mode 100644 index 0000000000..b9f7e5a57a --- /dev/null +++ b/tests/sstables/3.x/uncompressed/collections/mc-1-big-Digest.crc32 @@ -0,0 +1 @@ +938712690 \ No newline at end of file diff --git a/tests/sstables/3.x/uncompressed/collections/mc-1-big-Filter.db b/tests/sstables/3.x/uncompressed/collections/mc-1-big-Filter.db new file mode 100644 index 0000000000000000000000000000000000000000..e72c7521e97757f1fe3605b8cc6c164fcecc1c52 GIT binary patch literal 24 fcmZQzU|?lnU|?!c&|qj_(-2c+5^_4hpuhkC8+QWp literal 0 HcmV?d00001 diff --git a/tests/sstables/3.x/uncompressed/collections/mc-1-big-Index.db b/tests/sstables/3.x/uncompressed/collections/mc-1-big-Index.db new file mode 100644 index 0000000000000000000000000000000000000000..aa4b59db2d373d8dd500776ff9a5e02d7fb3b488 GIT binary patch literal 44 icmZQzVPIfj1!54**svYKU}}tkFj$&+Aq?iGBMbmSvjhzQ literal 0 HcmV?d00001 diff --git a/tests/sstables/3.x/uncompressed/collections/mc-1-big-Statistics.db b/tests/sstables/3.x/uncompressed/collections/mc-1-big-Statistics.db new file mode 100644 index 0000000000000000000000000000000000000000..d66ab1e5b621d3412053fc88ee9c3da162fcc2b2 GIT binary patch literal 4926 zcmeI$YfMvD902h9=v!K8L3vFw6{av#3>_kg3TPeFNMxlT4XB$s=B z_kYgqIp?0+b11_wGUjQ@ASad_8_2E)sBsI% znp&^1Y@6G;&21_d+}=8GU1Nj8UFd86q2zSpPEz@qa5y}nO=0C9UcbTL_a9aCUp}iE z>+-S3eL*FOx06Fm;)lXSTWkzWq){kI9gr@{rA*|kpe%kiOezSH*HDh5tfQ=_oJcvD z@*K+ZDQ8p8rM!%CDdm?bZ=t-4@_~q@arH+mje7u>XQk_r%j^;=?xoz1%_ULZgUwU- z=R0s*R*5Y)VJntnE2;jf9^<%X5IZh}J##O%;VAa(SFlruu%FGre(ngiu?jo?5cbM* z*yXdZH>Y5GUczn>usf;wJ7~u7P&zW3{wsEs4f_zDpKNFt#kmUnK64kb`KrsPyiA7Y zzwFv26qkS5imVvxLRM9Tk<}9pbCT}S`!Jq%^4)vTb6+U^7<$2X>z6_2T&+C~ zU9kC$!_ej%i`PS2k2v$7?U|nyL2v&_TmI zFSLEtD)|1hUFB!s`_G2=>)`v(IfA#&g8JOZsTIw@yq68`1D0i6F<79-$9KW!Bmcea z09-$mSqY=XpuDR6qvlV5HS3NIxPjv~wf+M8ue+|l2m7zTR_Zr^^0R~%7hDfgP89YT zK|KAdQ3HHl8TU&pEg-(AGL#AT19_&O26P}^7`WT_I`G<#?o+Tm8$$(oa6UI!&7Eza zysx@va0K}9_czaV0{0*Ppc}R~cp-Zze=^P*-n#T;C9q}dgU5R&&3Gbt{2l!FJs1b- zh*@cGAYGP8naEjA8GR>^*uU+$qE%OKs5JPCbs9o z;}P3)q49|AxpL5W#P(dda6DpruK%*xZ>dfE%;}S7Y#N~^cokfEM{g#L>5S?0kXT~8dRbS`vZrU!?Tc@hEmJ}2#Eu5-g vrR~L~V!Q*ep5-Z@o`GQl;iJ4YM7XsSC-w!!0=UiT^-)9YQd4OxEzUE%CP literal 0 HcmV?d00001 diff --git a/tests/sstables/3.x/uncompressed/collections/mc-1-big-Summary.db b/tests/sstables/3.x/uncompressed/collections/mc-1-big-Summary.db new file mode 100644 index 0000000000000000000000000000000000000000..ab6bd5d065001dab321da058d45d4d71968b5047 GIT binary patch literal 56 mcmZQzU}#`qU|