diff --git a/atomic_cell.hh b/atomic_cell.hh index 1d1ee698e6..dfe12c3acd 100644 --- a/atomic_cell.hh +++ b/atomic_cell.hh @@ -48,6 +48,108 @@ T get_field(const bytes_view& v, unsigned offset) { class atomic_cell_or_collection; +template +class basic_atomic_cell_value_view { +public: + using fragment_type = std::conditional_t; +private: + fragment_type _value; +public: + explicit basic_atomic_cell_value_view(fragment_type value) : _value(value) { } + + class iterator { + fragment_type _view; + public: + using iterator_category = std::forward_iterator_tag; + using value_type = fragment_type; + using pointer = const fragment_type*; + using reference = const fragment_type&; + using difference_type = std::ptrdiff_t; + + explicit iterator(fragment_type fv) noexcept + : _view(fv) { } + + const fragment_type& operator*() const { + return _view; + } + const fragment_type* operator->() const { + return &_view; + } + iterator& operator++() { + _view = { }; + return *this; + } + iterator operator++(int) { + auto it = *this; + operator++(); + return it; + } + + bool operator==(const iterator& other) const { + return _view.data() == other._view.data(); + } + bool operator!=(const iterator& other) const { + return !(*this == other); + } + }; + + using const_iterator = iterator; + + auto begin() const { + return iterator(_value); + } + auto end() const { + return iterator(fragment_type()); + } + + bool operator==(const basic_atomic_cell_value_view& other) const noexcept { + return _value == other._value; + } + bool operator==(bytes_view bv) const noexcept { + return _value == bv; + } + + size_t size_bytes() const noexcept { + return _value.size(); + } + + bool empty() const noexcept { + return _value.empty(); + } + + bool is_fragmented() const noexcept { + return false; + } + + fragment_type first_fragment() const noexcept { + return _value; + } + + bytes linearize() const { + return bytes(_value.begin(), _value.end()); + } + + template + decltype(auto) with_linearized(Function&& fn) const { + return fn(_value); + } + + friend std::ostream& operator<<(std::ostream& os, const basic_atomic_cell_value_view& vv) { + return os << vv.first_fragment(); + } + +}; + +using atomic_cell_value_view = basic_atomic_cell_value_view; +using atomic_cell_value_mutable_view = basic_atomic_cell_value_view; + +inline int compare_unsigned(atomic_cell_value_view a, atomic_cell_value_view b) +{ + assert(!a.is_fragmented() && !b.is_fragmented()); + return compare_unsigned(a.first_fragment(), b.first_fragment()); +} + /* * Represents atomic cell layout. Works on serialized form. * @@ -102,11 +204,11 @@ private: return cell; } public: - static bytes_view value(bytes_view cell) { - return do_get_value(cell); + static atomic_cell_value_view value(bytes_view cell) { + return atomic_cell_value_view(do_get_value(cell)); } - static bytes_mutable_view value(bytes_mutable_view cell) { - return do_get_value(cell); + static atomic_cell_value_mutable_view value(bytes_mutable_view cell) { + return atomic_cell_value_mutable_view(do_get_value(cell)); } // Can be called on live counter update cells only static int64_t counter_update_value(bytes_view cell) { @@ -232,6 +334,9 @@ public: auto value() const { return atomic_cell_type::value(_data); } + bool is_value_fragmented() const { + return false; + } // Can be called on live counter update cells only int64_t counter_update_value() const { return atomic_cell_type::counter_update_value(_data); @@ -274,6 +379,10 @@ public: friend class atomic_cell; }; +template +using basic_atomic_cell_view = std::conditional_t; + class atomic_cell_ref final : public atomic_cell_base { public: atomic_cell_ref(managed_bytes& buf) : atomic_cell_base(buf) {} @@ -345,19 +454,18 @@ public: class collection_mutation_view { public: - bytes_view data; - bytes_view serialize() const { return data; } - static collection_mutation_view from_bytes(bytes_view v) { return { v }; } + // FIXME: encapsulate properly + atomic_cell_value_view data; }; inline collection_mutation::collection_mutation(collection_mutation_view v) - : data(v.data) { + : data(v.data.linearize()) { } inline collection_mutation::operator collection_mutation_view() const { - return { data }; + return { atomic_cell_value_view(bytes_view(data)) }; } class column_definition; diff --git a/atomic_cell_hash.hh b/atomic_cell_hash.hh index c82730c24a..87dc9356cb 100644 --- a/atomic_cell_hash.hh +++ b/atomic_cell_hash.hh @@ -33,13 +33,15 @@ template<> struct appending_hash { template void operator()(Hasher& h, collection_mutation_view cell, const column_definition& cdef) const { + cell.data.with_linearized([&] (bytes_view cell_bv) { auto ctype = static_pointer_cast(cdef.type); - auto m_view = ctype->deserialize_mutation_form(cell); + auto m_view = ctype->deserialize_mutation_form(cell_bv); ::feed_hash(h, m_view.tomb); for (auto&& key_and_value : m_view.cells) { ::feed_hash(h, key_and_value.first); ::feed_hash(h, key_and_value.second, cdef); } + }); } }; @@ -51,7 +53,9 @@ struct appending_hash { feed_hash(h, cell.timestamp()); if (cell.is_live()) { if (cdef.is_counter()) { - ::feed_hash(h, counter_cell_view(cell)); + counter_cell_view::with_linearized(cell, [&] (counter_cell_view ccv) { + ::feed_hash(h, ccv); + }); return; } if (cell.is_live_and_has_ttl()) { @@ -91,4 +95,16 @@ struct appending_hash { feed_hash(h, c.as_collection_mutation(), cdef); } } -}; \ No newline at end of file +}; + +template<> +struct appending_hash { + template + void operator()(Hasher& h, atomic_cell_value_view v) const { + using boost::range::for_each; + feed_hash(h, v.size_bytes()); + for_each(v, [&h] (auto&& chk) { + h.update(reinterpret_cast(chk.data()), chk.size()); + }); + } +}; diff --git a/atomic_cell_or_collection.hh b/atomic_cell_or_collection.hh index 7e0b1effe5..b3b69b123d 100644 --- a/atomic_cell_or_collection.hh +++ b/atomic_cell_or_collection.hh @@ -57,7 +57,7 @@ public: return std::move(data.data); } collection_mutation_view as_collection_mutation() const { - return collection_mutation_view{_data}; + return collection_mutation_view{atomic_cell_value_view(bytes_view(_data))}; } bytes_view serialize() const { return _data; diff --git a/converting_mutation_partition_applier.hh b/converting_mutation_partition_applier.hh index 1a03ade7c7..e5826582a1 100644 --- a/converting_mutation_partition_applier.hh +++ b/converting_mutation_partition_applier.hh @@ -47,8 +47,9 @@ private: if (!is_compatible(new_def, old_type, kind)) { return; } + cell.data.with_linearized([&] (bytes_view cell_bv) { auto&& ctype = static_pointer_cast(old_type); - auto old_view = ctype->deserialize_mutation_form(cell); + auto old_view = ctype->deserialize_mutation_form(cell_bv); collection_type_impl::mutation_view new_view; if (old_view.tomb.timestamp > new_def.dropped_at()) { @@ -60,6 +61,7 @@ private: } } dst.apply(new_def, ctype->serialize_mutation_form(std::move(new_view))); + }); } public: converting_mutation_partition_applier( diff --git a/counters.cc b/counters.cc index 0883c78ea9..77d7292d34 100644 --- a/counters.cc +++ b/counters.cc @@ -78,10 +78,10 @@ std::vector counter_cell_view::shards_compatible_with_1_7_4() con return sorted_shards; } -static bool apply_in_place(const column_definition& cdef, atomic_cell_or_collection& dst, atomic_cell_or_collection& src) +static bool apply_in_place(const column_definition& cdef, atomic_cell_mutable_view dst, atomic_cell_mutable_view src) { - auto dst_ccmv = counter_cell_mutable_view(dst.as_mutable_atomic_cell(cdef)); - auto src_ccmv = counter_cell_mutable_view(src.as_mutable_atomic_cell(cdef)); + auto dst_ccmv = counter_cell_mutable_view(dst); + auto src_ccmv = counter_cell_mutable_view(src); auto dst_shards = dst_ccmv.shards(); auto src_shards = src_ccmv.shards(); @@ -143,16 +143,22 @@ void counter_cell_view::apply(const column_definition& cdef, atomic_cell_or_coll assert(!dst_ac.is_counter_update()); assert(!src_ac.is_counter_update()); + with_linearized(dst_ac, [&] (counter_cell_view dst_ccv) { + with_linearized(src_ac, [&] (counter_cell_view src_ccv) { - if (counter_cell_view(dst_ac).shard_count() >= counter_cell_view(src_ac).shard_count() + if (dst_ccv.shard_count() >= src_ccv.shard_count() && dst.can_use_mutable_view() && src.can_use_mutable_view()) { - if (apply_in_place(cdef, dst, src)) { - return; + auto dst_amc = dst.as_mutable_atomic_cell(cdef); + auto src_amc = src.as_mutable_atomic_cell(cdef); + if (!dst_amc.is_value_fragmented() && !src_amc.is_value_fragmented()) { + if (apply_in_place(cdef, dst_amc, src_amc)) { + return; + } } } - auto dst_shards = counter_cell_view(dst_ac).shards(); - auto src_shards = counter_cell_view(src_ac).shards(); + auto dst_shards = dst_ccv.shards(); + auto src_shards = src_ccv.shards(); counter_cell_builder result; combine(dst_shards.begin(), dst_shards.end(), src_shards.begin(), src_shards.end(), @@ -161,7 +167,9 @@ void counter_cell_view::apply(const column_definition& cdef, atomic_cell_or_coll }); auto cell = result.build(std::max(dst_ac.timestamp(), src_ac.timestamp())); - src = std::exchange(dst, atomic_cell_or_collection(*counter_type, cell)); + src = std::exchange(dst, atomic_cell_or_collection(std::move(cell))); + }); + }); } stdx::optional counter_cell_view::difference(atomic_cell_view a, atomic_cell_view b) @@ -176,8 +184,10 @@ stdx::optional counter_cell_view::difference(atomic_cell_view a, at return { }; } - auto a_shards = counter_cell_view(a).shards(); - auto b_shards = counter_cell_view(b).shards(); + return with_linearized(a, [&] (counter_cell_view a_ccv) { + return with_linearized(b, [&] (counter_cell_view b_ccv) { + auto a_shards = a_ccv.shards(); + auto b_shards = b_ccv.shards(); auto a_it = a_shards.begin(); auto a_end = a_shards.end(); @@ -202,6 +212,8 @@ stdx::optional counter_cell_view::difference(atomic_cell_view a, at diff = atomic_cell::make_live(*counter_type, a.timestamp(), bytes_view()); } return diff; + }); + }); } @@ -239,12 +251,13 @@ void transform_counter_updates_to_shards(mutation& m, const mutation* current_st if (!acv.is_live()) { return; // continue -- we are in lambda } - counter_cell_view ccv(acv); + counter_cell_view::with_linearized(acv, [&] (counter_cell_view ccv) { auto cs = ccv.local_shard(); if (!cs) { return; // continue } shards.emplace_back(std::make_pair(id, counter_shard(*cs))); + }); }); transformee.for_each_cell([&] (column_id id, atomic_cell_or_collection& ac_o_c) { diff --git a/counters.hh b/counters.hh index ab66c4e92e..97cd0f1565 100644 --- a/counters.hh +++ b/counters.hh @@ -79,7 +79,7 @@ static_assert(std::is_pod::value, "counter_id should be a POD type") std::ostream& operator<<(std::ostream& os, const counter_id& id); -template +template class basic_counter_shard_view { enum class offset : unsigned { id = 0u, @@ -88,7 +88,8 @@ class basic_counter_shard_view { total_size = unsigned(logical_clock) + sizeof(int64_t), }; private: - typename View::pointer _base; + using pointer_type = std::conditional_t; + pointer_type _base; private: template T read(offset off) const { @@ -100,7 +101,7 @@ public: static constexpr auto size = size_t(offset::total_size); public: basic_counter_shard_view() = default; - explicit basic_counter_shard_view(typename View::pointer ptr) noexcept + explicit basic_counter_shard_view(pointer_type ptr) noexcept : _base(ptr) { } counter_id id() const { return read(offset::id); } @@ -111,7 +112,7 @@ public: static constexpr size_t off = size_t(offset::value); static constexpr size_t size = size_t(offset::total_size) - off; - typename View::value_type tmp[size]; + signed char tmp[size]; std::copy_n(_base + off, size, tmp); std::copy_n(other._base + off, size, _base + off); std::copy_n(tmp, size, other._base + off); @@ -138,7 +139,7 @@ public: }; }; -using counter_shard_view = basic_counter_shard_view; +using counter_shard_view = basic_counter_shard_view; std::ostream& operator<<(std::ostream& os, counter_shard_view csv); @@ -287,28 +288,32 @@ public: // := // := // := * -template +template class basic_counter_cell_view { protected: - atomic_cell_base _cell; + using linearized_value_view = std::conditional_t; + using pointer_type = typename linearized_value_view::pointer; + basic_atomic_cell_view _cell; + linearized_value_view _value; private: - class shard_iterator : public std::iterator> { - typename View::pointer _current; - basic_counter_shard_view _current_view; + class shard_iterator : public std::iterator> { + pointer_type _current; + basic_counter_shard_view _current_view; public: shard_iterator() = default; - shard_iterator(typename View::pointer ptr) noexcept + shard_iterator(pointer_type ptr) noexcept : _current(ptr), _current_view(ptr) { } - basic_counter_shard_view& operator*() noexcept { + basic_counter_shard_view& operator*() noexcept { return _current_view; } - basic_counter_shard_view* operator->() noexcept { + basic_counter_shard_view* operator->() noexcept { return &_current_view; } shard_iterator& operator++() noexcept { _current += counter_shard_view::size; - _current_view = basic_counter_shard_view(_current); + _current_view = basic_counter_shard_view(_current); return *this; } shard_iterator operator++(int) noexcept { @@ -318,7 +323,7 @@ private: } shard_iterator& operator--() noexcept { _current -= counter_shard_view::size; - _current_view = basic_counter_shard_view(_current); + _current_view = basic_counter_shard_view(_current); return *this; } shard_iterator operator--(int) noexcept { @@ -335,22 +340,23 @@ private: }; public: boost::iterator_range shards() const { - auto bv = _cell.value(); - auto begin = shard_iterator(bv.data()); - auto end = shard_iterator(bv.data() + bv.size()); + auto begin = shard_iterator(_value.data()); + auto end = shard_iterator(_value.data() + _value.size()); return boost::make_iterator_range(begin, end); } size_t shard_count() const { - return _cell.value().size() / counter_shard_view::size; + return _cell.value().size_bytes() / counter_shard_view::size; } -public: +protected: // ac must be a live counter cell - explicit basic_counter_cell_view(atomic_cell_base ac) noexcept : _cell(ac) { + explicit basic_counter_cell_view(basic_atomic_cell_view ac, linearized_value_view vv) noexcept + : _cell(ac), _value(vv) + { assert(_cell.is_live()); assert(!_cell.is_counter_update()); } - +public: api::timestamp_type timestamp() const { return _cell.timestamp(); } static data_type total_value_type() { return long_type; } @@ -381,9 +387,17 @@ public: } }; -struct counter_cell_view : basic_counter_cell_view { +struct counter_cell_view : basic_counter_cell_view { using basic_counter_cell_view::basic_counter_cell_view; + template + static decltype(auto) with_linearized(basic_atomic_cell_view ac, Function&& fn) { + return ac.value().with_linearized([&] (bytes_view value_view) { + counter_cell_view ccv(ac, value_view); + return fn(ccv); + }); + } + // Returns counter shards in an order that is compatible with Scylla 1.7.4. std::vector shards_compatible_with_1_7_4() const; @@ -397,9 +411,15 @@ struct counter_cell_view : basic_counter_cell_view { friend std::ostream& operator<<(std::ostream& os, counter_cell_view ccv); }; -struct counter_cell_mutable_view : basic_counter_cell_view { +struct counter_cell_mutable_view : basic_counter_cell_view { using basic_counter_cell_view::basic_counter_cell_view; + explicit counter_cell_mutable_view(atomic_cell_mutable_view ac) noexcept + : basic_counter_cell_view(ac, ac.value().first_fragment()) + { + assert(!ac.value().is_fragmented()); + } + void set_timestamp(api::timestamp_type ts) { _cell.set_timestamp(ts); } }; diff --git a/cql3/restrictions/single_column_restriction.hh b/cql3/restrictions/single_column_restriction.hh index 4e47ed3e59..72770f4deb 100644 --- a/cql3/restrictions/single_column_restriction.hh +++ b/cql3/restrictions/single_column_restriction.hh @@ -113,7 +113,7 @@ public: class contains; protected: - bytes_view_opt get_value(const schema& schema, + std::optional get_value(const schema& schema, const partition_key& key, const clustering_key_prefix& ckey, const row& cells, diff --git a/cql3/restrictions/statement_restrictions.cc b/cql3/restrictions/statement_restrictions.cc index c76e955c84..ca947834b8 100644 --- a/cql3/restrictions/statement_restrictions.cc +++ b/cql3/restrictions/statement_restrictions.cc @@ -430,7 +430,7 @@ void statement_restrictions::validate_secondary_index_selections(bool selects_on } } -static bytes_view_opt do_get_value(const schema& schema, +static std::optional do_get_value(const schema& schema, const column_definition& cdef, const partition_key& key, const clustering_key_prefix& ckey, @@ -438,21 +438,21 @@ static bytes_view_opt do_get_value(const schema& schema, gc_clock::time_point now) { switch(cdef.kind) { case column_kind::partition_key: - return key.get_component(schema, cdef.component_index()); + return atomic_cell_value_view(key.get_component(schema, cdef.component_index())); case column_kind::clustering_key: - return ckey.get_component(schema, cdef.component_index()); + return atomic_cell_value_view(ckey.get_component(schema, cdef.component_index())); default: auto cell = cells.find_cell(cdef.id); if (!cell) { - return stdx::nullopt; + return std::nullopt; } assert(cdef.is_atomic()); auto c = cell->as_atomic_cell(cdef); - return c.is_dead(now) ? stdx::nullopt : bytes_view_opt(c.value()); + return c.is_dead(now) ? std::nullopt : std::optional(c.value()); } } -bytes_view_opt single_column_restriction::get_value(const schema& schema, +std::optional single_column_restriction::get_value(const schema& schema, const partition_key& key, const clustering_key_prefix& ckey, const row& cells, @@ -472,7 +472,12 @@ bool single_column_restriction::EQ::is_satisfied_by(const schema& schema, auto operand = value(options); if (operand) { auto cell_value = get_value(schema, key, ckey, cells, now); - return cell_value && _column_def.type->compare(*operand, *cell_value) == 0; + if (!cell_value) { + return false; + } + return cell_value->with_linearized([&] (bytes_view cell_value_bv) { + return _column_def.type->compare(*operand, cell_value_bv) == 0; + }); } return false; } @@ -491,9 +496,11 @@ bool single_column_restriction::IN::is_satisfied_by(const schema& schema, return false; } auto operands = values(options); + return cell_value->with_linearized([&] (bytes_view cell_value_bv) { return std::any_of(operands.begin(), operands.end(), [&] (auto&& operand) { - return operand && _column_def.type->compare(*operand, *cell_value) == 0; + return operand && _column_def.type->compare(*operand, cell_value_bv) == 0; }); + }); } static query::range to_range(const term_slice& slice, const query_options& options) { @@ -526,7 +533,9 @@ bool single_column_restriction::slice::is_satisfied_by(const schema& schema, if (!cell_value) { return false; } - return to_range(_slice, options).contains(*cell_value, _column_def.type->as_tri_comparator()); + return cell_value->with_linearized([&] (bytes_view cell_value_bv) { + return to_range(_slice, options).contains(cell_value_bv, _column_def.type->as_tri_comparator()); + }); } bool single_column_restriction::contains::is_satisfied_by(const schema& schema, @@ -552,7 +561,8 @@ bool single_column_restriction::contains::is_satisfied_by(const schema& schema, auto&& element_type = col_type->is_set() ? col_type->name_comparator() : col_type->value_comparator(); if (_column_def.type->is_multi_cell()) { auto cell = cells.find_cell(_column_def.id); - auto&& elements = col_type->deserialize_mutation_form(cell->as_collection_mutation()).cells; + return cell->as_collection_mutation().data.with_linearized([&] (bytes_view collection_bv) { + auto&& elements = col_type->deserialize_mutation_form(collection_bv).cells; auto end = std::remove_if(elements.begin(), elements.end(), [now] (auto&& element) { return element.second.is_dead(now); }); @@ -562,7 +572,9 @@ bool single_column_restriction::contains::is_satisfied_by(const schema& schema, continue; } auto found = std::find_if(elements.begin(), end, [&] (auto&& element) { - return element_type->compare(element.second.value(), *val) == 0; + return element.second.value().with_linearized([&] (bytes_view value_bv) { + return element_type->compare(value_bv, *val) == 0; + }); }); if (found == end) { return false; @@ -589,16 +601,26 @@ bool single_column_restriction::contains::is_satisfied_by(const schema& schema, auto found = std::find_if(elements.begin(), end, [&] (auto&& element) { return map_key_type->compare(element.first, *map_key) == 0; }); - if (found == end || element_type->compare(found->second.value(), *map_value) != 0) { + if (found == end) { + return false; + } + auto cmp = found->second.value().with_linearized([&] (bytes_view value_bv) { + return element_type->compare(value_bv, *map_value); + }); + if (cmp != 0) { return false; } } + return true; + }); } else { auto cell_value = get_value(schema, key, ckey, cells, now); if (!cell_value) { return false; } - auto deserialized = _column_def.type->deserialize(*cell_value); + auto deserialized = cell_value->with_linearized([&] (bytes_view cell_value_bv) { + return _column_def.type->deserialize(cell_value_bv); + }); for (auto&& value : _values) { auto val = value->bind_and_get(options); if (!val) { @@ -669,7 +691,9 @@ bool token_restriction::EQ::is_satisfied_by(const schema& schema, for (auto&& operand : values(options)) { if (operand) { auto cell_value = do_get_value(schema, **cdef, key, ckey, cells, now); - satisfied = cell_value && (*cdef)->type->compare(*operand, *cell_value) == 0; + satisfied = cell_value && cell_value->with_linearized([&] (bytes_view cell_value_bv) { + return (*cdef)->type->compare(*operand, cell_value_bv) == 0; + }); } if (!satisfied) { break; @@ -691,7 +715,9 @@ bool token_restriction::slice::is_satisfied_by(const schema& schema, if (!cell_value) { return false; } - satisfied = range.contains(*cell_value, cdef->type->as_tri_comparator()); + satisfied = cell_value->with_linearized([&] (bytes_view cell_value_bv) { + return range.contains(cell_value_bv, cdef->type->as_tri_comparator()); + }); if (!satisfied) { break; } diff --git a/cql3/statements/batch_statement.cc b/cql3/statements/batch_statement.cc index 35d0458732..f1f3226fef 100644 --- a/cql3/statements/batch_statement.cc +++ b/cql3/statements/batch_statement.cc @@ -239,18 +239,18 @@ void batch_statement::verify_batch_size(const std::vector& mutations) public: void accept_partition_tombstone(tombstone) override {} void accept_static_cell(column_id, atomic_cell_view v) override { - size += v.value().size(); + size += v.value().size_bytes(); } void accept_static_cell(column_id, collection_mutation_view v) override { - size += v.data.size(); + size += v.data.size_bytes(); } void accept_row_tombstone(const range_tombstone&) override {} void accept_row(position_in_partition_view, const row_tombstone&, const row_marker&, is_dummy, is_continuous) override {} void accept_row_cell(column_id, atomic_cell_view v) override { - size += v.value().size(); + size += v.value().size_bytes(); } void accept_row_cell(column_id id, collection_mutation_view v) override { - size += v.data.size(); + size += v.data.size_bytes(); } size_t size = 0; diff --git a/database.cc b/database.cc index fb86378152..7905b4d641 100644 --- a/database.cc +++ b/database.cc @@ -3681,7 +3681,7 @@ std::ostream& operator<<(std::ostream& os, const atomic_cell_view& acv) { if (acv.is_live()) { return fprint(os, "atomic_cell{%s;ts=%d;expiry=%d,ttl=%d}", - to_hex(acv.value()), + to_hex(acv.value().linearize()), acv.timestamp(), acv.is_live_and_has_ttl() ? acv.expiry().time_since_epoch().count() : -1, acv.is_live_and_has_ttl() ? acv.ttl().count() : 0); diff --git a/db/view/view.cc b/db/view/view.cc index 69ae601c03..65312dc174 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -318,7 +318,8 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons } deletable_row& view_updates::get_view_row(const partition_key& base_key, const clustering_row& update) { - auto get_value = boost::adaptors::transformed([&, this] (const column_definition& cdef) { + std::vector linearized_values; + auto get_value = boost::adaptors::transformed([&, this] (const column_definition& cdef) -> bytes_view { auto* base_col = _base->get_column_definition(cdef.name()); assert(base_col); switch (base_col->kind) { @@ -328,10 +329,11 @@ deletable_row& view_updates::get_view_row(const partition_key& base_key, const c return update.key().get_component(*_base, base_col->position()); default: auto& c = update.cells().cell_at(base_col->id); - if (base_col->is_atomic()) { - return c.as_atomic_cell(cdef).value(); + auto value_view = base_col->is_atomic() ? c.as_atomic_cell(cdef).value() : c.as_collection_mutation().data; + if (value_view.is_fragmented()) { + return linearized_values.emplace_back(value_view.linearize()); } - return c.as_collection_mutation().data; + return value_view.first_fragment(); } }); auto& partition = partition_for(partition_key::from_range(_view->partition_key_columns() | get_value)); diff --git a/memtable.hh b/memtable.hh index 3721400775..1db112f534 100644 --- a/memtable.hh +++ b/memtable.hh @@ -186,11 +186,13 @@ private: update(item.as_atomic_cell(col)); } else { auto ctype = static_pointer_cast(col.type); - auto mview = ctype->deserialize_mutation_form(item.as_collection_mutation()); + item.as_collection_mutation().data.with_linearized([&] (bytes_view bv) { + auto mview = ctype->deserialize_mutation_form(bv); update(mview.tomb); for (auto& entry : mview.cells) { update(entry.second); } + }); } }); } diff --git a/mutation_partition.cc b/mutation_partition.cc index cf2e06e94f..c795f87a30 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -595,7 +595,7 @@ void write_cell(RowWriter& w, const query::partition_slice& slice, ::atomic_cell } else { return std::move(wr).skip_expiry(); } - }().write_value(c.value()); + }().write_fragmented_value(c.value()); [&, wr = std::move(after_value)] () mutable { if (slice.options.contains() && c.is_live_and_has_ttl()) { return std::move(wr).write_ttl(c.ttl()); @@ -621,6 +621,7 @@ void write_cell(RowWriter& w, const query::partition_slice& slice, const data_ty template void write_counter_cell(RowWriter& w, const query::partition_slice& slice, ::atomic_cell_view c) { assert(c.is_live()); + counter_cell_view::with_linearized(c, [&] (counter_cell_view ccv) { auto wr = w.add().write(); [&, wr = std::move(wr)] () mutable { if (slice.options.contains()) { @@ -629,9 +630,10 @@ void write_counter_cell(RowWriter& w, const query::partition_slice& slice, ::ato return std::move(wr).skip_timestamp(); } }().skip_expiry() - .write_value(counter_cell_view::total_value_type()->decompose(counter_cell_view(c).total_value())) + .write_value(counter_cell_view::total_value_type()->decompose(ccv.total_value())) .skip_ttl() .end_qr_cell(); + }); } // Used to return the timestamp of the latest update to the row @@ -1624,7 +1626,8 @@ bool row::compact_and_expire( } else { auto&& cell = c.as_collection_mutation(); auto&& ctype = static_pointer_cast(def.type); - auto m_view = ctype->deserialize_mutation_form(cell); + cell.data.with_linearized([&] (bytes_view cell_bv) { + auto m_view = ctype->deserialize_mutation_form(cell_bv); collection_type_impl::mutation m = m_view.materialize(*ctype); any_live |= m.compact_and_expire(tomb, query_time, can_gc, gc_before); if (m.cells.empty() && m.tomb <= tomb.tomb()) { @@ -1632,6 +1635,7 @@ bool row::compact_and_expire( } else { c = ctype->serialize_mutation_form(m); } + }); } return erase; }); diff --git a/mutation_partition_serializer.cc b/mutation_partition_serializer.cc index 97b75bf205..9b809a3a86 100644 --- a/mutation_partition_serializer.cc +++ b/mutation_partition_serializer.cc @@ -44,7 +44,7 @@ template auto write_live_cell(Writer&& writer, atomic_cell_view c) { return std::move(writer).write_created_at(c.timestamp()) - .write_value(c.value()) + .write_fragmented_value(c.value()) .end_live_cell(); } @@ -59,7 +59,7 @@ auto write_counter_cell(Writer&& writer, atomic_cell_view c) .write_delta(delta) .end_counter_cell_update(); } else { - counter_cell_view ccv(c); + return counter_cell_view::with_linearized(c, [&] (counter_cell_view ccv) { auto shards = std::move(value).start_value_counter_cell_full() .start_shards(); if (service::get_local_storage_service().cluster_supports_correct_counter_order()) { @@ -72,6 +72,7 @@ auto write_counter_cell(Writer&& writer, atomic_cell_view c) } } return std::move(shards).end_shards().end_counter_cell_full(); + }); } }().end_counter_cell(); } @@ -83,7 +84,7 @@ auto write_expiring_cell(Writer&& writer, atomic_cell_view c) .write_expiry(c.expiry()) .start_c() .write_created_at(c.timestamp()) - .write_value(c.value()) + .write_fragmented_value(c.value()) .end_c() .end_expiring_cell(); } @@ -101,8 +102,9 @@ auto write_dead_cell(Writer&& writer, atomic_cell_view c) template auto write_collection_cell(Writer&& collection_writer, collection_mutation_view cmv, const column_definition& def) { + return cmv.data.with_linearized([&] (bytes_view cmv_bv) { auto&& ctype = static_pointer_cast(def.type); - auto m_view = ctype->deserialize_mutation_form(cmv); + auto m_view = ctype->deserialize_mutation_form(cmv_bv); auto cells_writer = std::move(collection_writer).write_tomb(m_view.tomb).start_elements(); for (auto&& c : m_view.cells) { auto cell_writer = cells_writer.add().write_key(c.first); @@ -115,6 +117,7 @@ auto write_collection_cell(Writer&& collection_writer, collection_mutation_view } } return std::move(cells_writer).end_elements().end_collection_cell(); + }); } template diff --git a/mutation_partition_view.cc b/mutation_partition_view.cc index 6aee34f77f..a195ed7cff 100644 --- a/mutation_partition_view.cc +++ b/mutation_partition_view.cc @@ -108,7 +108,7 @@ collection_mutation read_collection_cell(const collection_type_impl& ctype, ser: for (auto&& e : elements) { mut.cells.emplace_back(e.key(), read_atomic_cell(*ctype.value_comparator(), e.value())); } - return collection_type_impl::serialize_mutation_form(mut); + return ctype.serialize_mutation_form(mut); } template diff --git a/sstables/disk_types.hh b/sstables/disk_types.hh index dc42090b51..7ba7ea76dc 100644 --- a/sstables/disk_types.hh +++ b/sstables/disk_types.hh @@ -29,6 +29,7 @@ #include #include #include +#include "atomic_cell.hh" namespace sstables { @@ -67,6 +68,11 @@ struct disk_string_view { bytes_view value; }; +template +struct disk_data_value_view { + atomic_cell_value_view value; +}; + template struct disk_array { static_assert(std::is_integral::value, "Length type must be convertible to integer"); diff --git a/sstables/sstables.cc b/sstables/sstables.cc index de20afec82..001424d5f0 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -378,6 +378,15 @@ inline void write(sstable_version_types v, file_writer& out, const disk_string_v write(v, out, len, s.value); } +template +inline void write(sstable_version_types ver, file_writer& out, const disk_data_value_view& v) { + SizeType length; + check_truncate_and_assign(length, v.value.size_bytes()); + write(ver, out, length); + using boost::range::for_each; + for_each(v.value, [&] (bytes_view fragment) { write(ver, out, fragment); }); +} + // We cannot simply read the whole array at once, because we don't know its // full size. We know the number of elements, but if we are talking about // disk_strings, for instance, we have no idea how much of the stream each @@ -1712,6 +1721,16 @@ void write_cell_value(file_writer& out, const abstract_type& type, bytes_view va } } +void write_cell_value(file_writer& out, const abstract_type& type, atomic_cell_value_view value) { + if (!value.empty()) { + if (!type.value_length_if_fixed()) { + write_vint(out, value.size_bytes()); + } + using boost::range::for_each; + for_each(value, [&] (bytes_view fragment) { write(sstable_version_types::mc, out, fragment); }); + } +} + static inline void update_cell_stats(column_stats& c_stats, api::timestamp_type timestamp) { c_stats.update_timestamp(timestamp); c_stats.cells_count++; @@ -1770,19 +1789,20 @@ void sstable::write_cell(file_writer& out, atomic_cell_view cell, const column_d column_mask mask = column_mask::counter; write(_version, out, mask, int64_t(0), timestamp); - counter_cell_view ccv(cell); + counter_cell_view::with_linearized(cell, [&] (counter_cell_view ccv) { write_counter_value(ccv, out, _version, [v = _version] (file_writer& out, uint32_t value) { return write(v, out, value); }); _c_stats.update_local_deletion_time(std::numeric_limits::max()); + }); } else if (cell.is_live_and_has_ttl()) { // expiring cell column_mask mask = column_mask::expiration; uint32_t ttl = cell.ttl().count(); uint32_t expiration = cell.expiry().time_since_epoch().count(); - disk_string_view cell_value { cell.value() }; + disk_data_value_view cell_value { cell.value() }; _c_stats.update_local_deletion_time(expiration); // tombstone histogram is updated with expiration time because if ttl is longer @@ -1795,7 +1815,7 @@ void sstable::write_cell(file_writer& out, atomic_cell_view cell, const column_d // regular cell column_mask mask = column_mask::none; - disk_string_view cell_value { cell.value() }; + disk_data_value_view cell_value { cell.value() }; _c_stats.update_local_deletion_time(std::numeric_limits::max()); @@ -1884,8 +1904,9 @@ void sstable::write_range_tombstone(file_writer& out, } void sstable::write_collection(file_writer& out, const composite& clustering_key, const column_definition& cdef, collection_mutation_view collection) { + collection.data.with_linearized([&] (bytes_view collection_bv) { auto t = static_pointer_cast(cdef.type); - auto mview = t->deserialize_mutation_form(collection); + auto mview = t->deserialize_mutation_form(collection_bv); const bytes& column_name = cdef.name(); if (mview.tomb) { write_range_tombstone(out, clustering_key, composite::eoc::start, clustering_key, composite::eoc::end, { column_name }, mview.tomb); @@ -1894,6 +1915,7 @@ void sstable::write_collection(file_writer& out, const composite& clustering_key index_and_write_column_name(out, clustering_key, { column_name, cp.first }); write_cell(out, cp.second, cdef); } + }); } // This function is about writing a clustered_row to data file according to SSTables format. @@ -2904,10 +2926,11 @@ void sstable_writer_m::write_cell(file_writer& writer, atomic_cell_view cell, co if (has_value) { if (cdef.is_counter()) { assert(!cell.is_counter_update()); - counter_cell_view ccv(cell); + counter_cell_view::with_linearized(cell, [&] (counter_cell_view ccv) { write_counter_value(ccv, writer, sstable_version_types::mc, [] (file_writer& out, uint32_t value) { return write_vint(out, value); }); + }); } else { write_cell_value(writer, *cdef.type, cell.value()); } @@ -2954,7 +2977,8 @@ void sstable_writer_m::write_liveness_info(file_writer& writer, const row_marker void sstable_writer_m::write_collection(file_writer& writer, const column_definition& cdef, collection_mutation_view collection, const row_time_properties& properties, bool has_complex_deletion) { auto& ctype = *static_pointer_cast(cdef.type); - auto mview = ctype.deserialize_mutation_form(collection); + collection.data.with_linearized([&] (bytes_view collection_bv) { + auto mview = ctype.deserialize_mutation_form(collection_bv); if (has_complex_deletion) { auto dt = to_deletion_time(mview.tomb); write_delta_deletion_time(writer, dt); @@ -2972,6 +2996,7 @@ void sstable_writer_m::write_collection(file_writer& writer, const column_defini ++_c_stats.cells_count; write_cell(writer, cell, cdef, properties, cell_path); } + }); } void sstable_writer_m::write_cells(file_writer& writer, column_kind kind, const row& row_body, @@ -3071,12 +3096,14 @@ static bool row_has_complex_deletion(const schema& s, const row& r) { return stop_iteration::no; } auto t = static_pointer_cast(cdef.type); - auto mview = t->deserialize_mutation_form(c.as_collection_mutation()); + return c.as_collection_mutation().data.with_linearized([&] (bytes_view c_bv) { + auto mview = t->deserialize_mutation_form(c_bv); if (mview.tomb) { result = true; } return stop_iteration(static_cast(mview.tomb)); }); + }); return result; } diff --git a/tests/counter_test.cc b/tests/counter_test.cc index d59fc2b9cf..2f1880c223 100644 --- a/tests/counter_test.cc +++ b/tests/counter_test.cc @@ -67,24 +67,28 @@ SEASTAR_TEST_CASE(test_counter_cell) { b1.add_shard(counter_shard(id[0], 5, 1)); b1.add_shard(counter_shard(id[1], -4, 1)); auto c1 = atomic_cell_or_collection(b1.build(0)); - - auto cv = counter_cell_view(c1.as_atomic_cell(cdef)); + + atomic_cell_or_collection c2; + counter_cell_view::with_linearized(c1.as_atomic_cell(cdef), [&] (counter_cell_view cv) { BOOST_REQUIRE_EQUAL(cv.total_value(), 1); verify_shard_order(cv); counter_cell_builder b2; b2.add_shard(counter_shard(*cv.get_shard(id[0])).update(2, 1)); b2.add_shard(counter_shard(id[2], 1, 1)); - auto c2 = atomic_cell_or_collection(b2.build(0)); + c2 = atomic_cell_or_collection(b2.build(0)); + }); - cv = counter_cell_view(c2.as_atomic_cell(cdef)); + counter_cell_view::with_linearized(c2.as_atomic_cell(cdef), [&] (counter_cell_view cv) { BOOST_REQUIRE_EQUAL(cv.total_value(), 8); verify_shard_order(cv); + }); counter_cell_view::apply(cdef, c1, c2); - cv = counter_cell_view(c1.as_atomic_cell(cdef)); + counter_cell_view::with_linearized(c1.as_atomic_cell(cdef), [&] (counter_cell_view cv) { BOOST_REQUIRE_EQUAL(cv.total_value(), 4); verify_shard_order(cv); + }); }); } @@ -97,9 +101,10 @@ SEASTAR_TEST_CASE(test_apply) { auto src = b.copy(*cdef.type); counter_cell_view::apply(cdef, dst, src); - auto cv = counter_cell_view(dst.as_atomic_cell(cdef)); + counter_cell_view::with_linearized(dst.as_atomic_cell(cdef), [&] (counter_cell_view cv) { BOOST_REQUIRE_EQUAL(cv.total_value(), value); BOOST_REQUIRE_EQUAL(cv.timestamp(), std::max(dst.as_atomic_cell(cdef).timestamp(), src.as_atomic_cell(cdef).timestamp())); + }); }; auto id = generate_ids(5); @@ -235,15 +240,17 @@ SEASTAR_TEST_CASE(test_counter_mutations) { m.apply(m2); auto ac = get_counter_cell(m); BOOST_REQUIRE(ac.is_live()); - counter_cell_view ccv { ac }; + counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) { BOOST_REQUIRE_EQUAL(ccv.total_value(), -102); verify_shard_order(ccv); + }); ac = get_static_counter_cell(m); BOOST_REQUIRE(ac.is_live()); - ccv = counter_cell_view(ac); + counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) { BOOST_REQUIRE_EQUAL(ccv.total_value(), 20); verify_shard_order(ccv); + }); m.apply(m3); ac = get_counter_cell(m); @@ -263,28 +270,32 @@ SEASTAR_TEST_CASE(test_counter_mutations) { m = mutation(s, m1.decorated_key(), m1.partition().difference(s, m2.partition())); ac = get_counter_cell(m); BOOST_REQUIRE(ac.is_live()); - ccv = counter_cell_view(ac); + counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) { BOOST_REQUIRE_EQUAL(ccv.total_value(), 2); verify_shard_order(ccv); + }); ac = get_static_counter_cell(m); BOOST_REQUIRE(ac.is_live()); - ccv = counter_cell_view(ac); + counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) { BOOST_REQUIRE_EQUAL(ccv.total_value(), 11); verify_shard_order(ccv); + }); m = mutation(s, m1.decorated_key(), m2.partition().difference(s, m1.partition())); ac = get_counter_cell(m); BOOST_REQUIRE(ac.is_live()); - ccv = counter_cell_view(ac); + counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) { BOOST_REQUIRE_EQUAL(ccv.total_value(), -105); verify_shard_order(ccv); + }); ac = get_static_counter_cell(m); BOOST_REQUIRE(ac.is_live()); - ccv = counter_cell_view(ac); + counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) { BOOST_REQUIRE_EQUAL(ccv.total_value(), 9); verify_shard_order(ccv); + }); m = mutation(s, m1.decorated_key(), m1.partition().difference(s, m3.partition())); BOOST_REQUIRE_EQUAL(m.partition().clustered_rows().calculate_size(), 0); @@ -420,30 +431,34 @@ SEASTAR_TEST_CASE(test_transfer_updates_to_shards) { auto ac = get_counter_cell(m); BOOST_REQUIRE(ac.is_live()); - auto ccv = counter_cell_view(ac); + counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) { BOOST_REQUIRE_EQUAL(ccv.total_value(), 5); verify_shard_order(ccv); + }); ac = get_static_counter_cell(m); BOOST_REQUIRE(ac.is_live()); - ccv = counter_cell_view(ac); + counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) { BOOST_REQUIRE_EQUAL(ccv.total_value(), 4); verify_shard_order(ccv); + }); m = m2; transform_counter_updates_to_shards(m, &m0, 0); ac = get_counter_cell(m); BOOST_REQUIRE(ac.is_live()); - ccv = counter_cell_view(ac); + counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) { BOOST_REQUIRE_EQUAL(ccv.total_value(), 14); verify_shard_order(ccv); + }); ac = get_static_counter_cell(m); BOOST_REQUIRE(ac.is_live()); - ccv = counter_cell_view(ac); + counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) { BOOST_REQUIRE_EQUAL(ccv.total_value(), 12); verify_shard_order(ccv); + }); m = m3; transform_counter_updates_to_shards(m, &m0, 0); @@ -502,13 +517,14 @@ SEASTAR_TEST_CASE(test_sanitize_corrupted_cells) { auto c2 = atomic_cell_or_collection(b2.build(0)); // Compare - auto cv1 = counter_cell_view(c1.as_atomic_cell(cdef)); - auto cv2 = counter_cell_view(c2.as_atomic_cell(cdef)); - + counter_cell_view::with_linearized(c1.as_atomic_cell(cdef), [&] (counter_cell_view cv1) { + counter_cell_view::with_linearized(c2.as_atomic_cell(cdef), [&] (counter_cell_view cv2) { BOOST_REQUIRE_EQUAL(cv1, cv2); BOOST_REQUIRE_EQUAL(cv1.total_value(), cv2.total_value()); verify_shard_order(cv1); verify_shard_order(cv2); + }); + }); } }); } @@ -561,7 +577,7 @@ SEASTAR_TEST_CASE(test_shards_compatible_with_1_7_4) { } auto ac = atomic_cell_or_collection(ccb.build(0)); - auto cv = counter_cell_view(ac.as_atomic_cell(cdef)); + counter_cell_view::with_linearized(ac.as_atomic_cell(cdef), [&] (counter_cell_view cv) { verify_shard_order(cv); @@ -573,6 +589,7 @@ SEASTAR_TEST_CASE(test_shards_compatible_with_1_7_4) { } previous = cs.id(); } + }); }); } diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc index 419168ad8d..da2c81956a 100644 --- a/tests/cql_test_env.cc +++ b/tests/cql_test_env.cc @@ -228,12 +228,11 @@ public: if (!col_def->type->is_multi_cell()) { auto c = cell->as_atomic_cell(*col_def); assert(c.is_live()); - actual = { c.value().begin(), c.value().end() }; + actual = c.value().linearize(); } else { auto c = cell->as_collection_mutation(); auto type = dynamic_pointer_cast(col_def->type); - actual = type->to_value(type->deserialize_mutation_form(c), - cql_serialization_format::internal()); + actual = type->to_value(c, cql_serialization_format::internal()); } assert(col_def->type->equal(actual, exp)); }); diff --git a/tests/flat_mutation_reader_assertions.hh b/tests/flat_mutation_reader_assertions.hh index 028f36b732..0f3dc63628 100644 --- a/tests/flat_mutation_reader_assertions.hh +++ b/tests/flat_mutation_reader_assertions.hh @@ -116,7 +116,7 @@ public: BOOST_FAIL(sprint("Expected static row with column %s, but it is not present", columns[i].name)); } auto& cdef = _reader.schema()->static_column_at(columns[i].id); - auto cmp = compare_unsigned(columns[i].value, cell->as_atomic_cell(cdef).value()); + auto cmp = compare_unsigned(columns[i].value, cell->as_atomic_cell(cdef).value().linearize()); if (cmp != 0) { BOOST_FAIL(sprint("Expected static row with column %s having value %s, but it has value %s", columns[i].name, @@ -150,7 +150,7 @@ public: BOOST_FAIL(sprint("Expected row with column %s, but it is not present", columns[i].name)); } auto& cdef = _reader.schema()->regular_column_at(columns[i].id); - auto cmp = compare_unsigned(columns[i].value, cell->as_atomic_cell(cdef).value()); + auto cmp = compare_unsigned(columns[i].value, cell->as_atomic_cell(cdef).value().linearize()); if (cmp != 0) { BOOST_FAIL(sprint("Expected row with column %s having value %s, but it has value %s", columns[i].name, diff --git a/tests/mutation_test.cc b/tests/mutation_test.cc index a09a478296..7aef592063 100644 --- a/tests/mutation_test.cc +++ b/tests/mutation_test.cc @@ -75,6 +75,11 @@ static atomic_cell make_atomic_cell(data_type dt, T value) { return atomic_cell::make_live(*dt, 0, dt->decompose(std::move(value))); }; +template +static atomic_cell make_collection_member(data_type dt, T value) { + return atomic_cell::make_live(*dt, 0, dt->decompose(std::move(value))); +}; + static mutation_partition get_partition(memtable& mt, const partition_key& key) { auto dk = dht::global_partitioner().decorate_key(*mt.schema(), key); auto reader = mt.make_flat_reader(mt.schema(), dht::partition_range::make_singular(dk)); @@ -119,7 +124,7 @@ SEASTAR_TEST_CASE(test_mutation_is_applied) { BOOST_REQUIRE(i); auto cell = i->as_atomic_cell(r1_col); BOOST_REQUIRE(cell.is_live()); - BOOST_REQUIRE(int32_type->equal(cell.value(), int32_type->decompose(3))); + BOOST_REQUIRE(int32_type->equal(cell.value().linearize(), int32_type->decompose(3))); }); } @@ -207,19 +212,19 @@ SEASTAR_TEST_CASE(test_map_mutations) { auto mt = make_lw_shared(s); auto key = partition_key::from_exploded(*s, {to_bytes("key1")}); auto& column = *s->get_column_definition("s1"); - auto mmut1 = make_collection_mutation({}, int32_type->decompose(101), make_atomic_cell(utf8_type, sstring("101"))); + auto mmut1 = make_collection_mutation({}, int32_type->decompose(101), make_collection_member(utf8_type, sstring("101"))); mutation m1(s, key); m1.set_static_cell(column, my_map_type->serialize_mutation_form(mmut1)); mt->apply(m1); - auto mmut2 = make_collection_mutation({}, int32_type->decompose(102), make_atomic_cell(utf8_type, sstring("102"))); + auto mmut2 = make_collection_mutation({}, int32_type->decompose(102), make_collection_member(utf8_type, sstring("102"))); mutation m2(s, key); m2.set_static_cell(column, my_map_type->serialize_mutation_form(mmut2)); mt->apply(m2); - auto mmut3 = make_collection_mutation({}, int32_type->decompose(103), make_atomic_cell(utf8_type, sstring("103"))); + auto mmut3 = make_collection_mutation({}, int32_type->decompose(103), make_collection_member(utf8_type, sstring("103"))); mutation m3(s, key); m3.set_static_cell(column, my_map_type->serialize_mutation_form(mmut3)); mt->apply(m3); - auto mmut2o = make_collection_mutation({}, int32_type->decompose(102), make_atomic_cell(utf8_type, sstring("102 override"))); + auto mmut2o = make_collection_mutation({}, int32_type->decompose(102), make_collection_member(utf8_type, sstring("102 override"))); mutation m2o(s, key); m2o.set_static_cell(column, my_map_type->serialize_mutation_form(mmut2o)); mt->apply(m2o); @@ -229,7 +234,8 @@ SEASTAR_TEST_CASE(test_map_mutations) { auto i = r.find_cell(column.id); BOOST_REQUIRE(i); auto cell = i->as_collection_mutation(); - auto muts = my_map_type->deserialize_mutation_form(cell); + auto cell_b = cell.data.linearize(); + auto muts = my_map_type->deserialize_mutation_form(cell_b); BOOST_REQUIRE(muts.cells.size() == 3); // FIXME: more strict tests }); @@ -265,7 +271,8 @@ SEASTAR_TEST_CASE(test_set_mutations) { auto i = r.find_cell(column.id); BOOST_REQUIRE(i); auto cell = i->as_collection_mutation(); - auto muts = my_set_type->deserialize_mutation_form(cell); + auto cell_b = cell.data.linearize(); + auto muts = my_set_type->deserialize_mutation_form(cell_b); BOOST_REQUIRE(muts.cells.size() == 3); // FIXME: more strict tests }); @@ -280,19 +287,19 @@ SEASTAR_TEST_CASE(test_list_mutations) { auto key = partition_key::from_exploded(*s, {to_bytes("key1")}); auto& column = *s->get_column_definition("s1"); auto make_key = [] { return timeuuid_type->decompose(utils::UUID_gen::get_time_UUID()); }; - auto mmut1 = make_collection_mutation({}, make_key(), make_atomic_cell(int32_type, 101)); + auto mmut1 = make_collection_mutation({}, make_key(), make_collection_member(int32_type, 101)); mutation m1(s, key); m1.set_static_cell(column, my_list_type->serialize_mutation_form(mmut1)); mt->apply(m1); - auto mmut2 = make_collection_mutation({}, make_key(), make_atomic_cell(int32_type, 102)); + auto mmut2 = make_collection_mutation({}, make_key(), make_collection_member(int32_type, 102)); mutation m2(s, key); m2.set_static_cell(column, my_list_type->serialize_mutation_form(mmut2)); mt->apply(m2); - auto mmut3 = make_collection_mutation({}, make_key(), make_atomic_cell(int32_type, 103)); + auto mmut3 = make_collection_mutation({}, make_key(), make_collection_member(int32_type, 103)); mutation m3(s, key); m3.set_static_cell(column, my_list_type->serialize_mutation_form(mmut3)); mt->apply(m3); - auto mmut2o = make_collection_mutation({}, make_key(), make_atomic_cell(int32_type, 102)); + auto mmut2o = make_collection_mutation({}, make_key(), make_collection_member(int32_type, 102)); mutation m2o(s, key); m2o.set_static_cell(column, my_list_type->serialize_mutation_form(mmut2o)); mt->apply(m2o); @@ -302,7 +309,8 @@ SEASTAR_TEST_CASE(test_list_mutations) { auto i = r.find_cell(column.id); BOOST_REQUIRE(i); auto cell = i->as_collection_mutation(); - auto muts = my_list_type->deserialize_mutation_form(cell); + auto cell_b = cell.data.linearize(); + auto muts = my_list_type->deserialize_mutation_form(cell_b); BOOST_REQUIRE(muts.cells.size() == 4); // FIXME: more strict tests }); @@ -347,7 +355,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) { BOOST_REQUIRE(i); auto cell = i->as_atomic_cell(r1_col); BOOST_REQUIRE(cell.is_live()); - BOOST_REQUIRE(int32_type->equal(cell.value(), int32_type->decompose(r1))); + BOOST_REQUIRE(int32_type->equal(cell.value().linearize(), int32_type->decompose(r1))); } }; verify_row(1001, 2001); @@ -485,7 +493,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) { auto c1 = value_cast(int32_type->deserialize(re.key().explode(*s)[0])); auto cell = re.row().cells().find_cell(r1_col.id); if (cell) { - result[p1][c1] = value_cast(int32_type->deserialize(cell->as_atomic_cell(r1_col).value())); + result[p1][c1] = value_cast(int32_type->deserialize(cell->as_atomic_cell(r1_col).value().linearize())); } } return true; @@ -899,7 +907,8 @@ SEASTAR_TEST_CASE(test_mutation_diff) { BOOST_REQUIRE(m2_1.find_row(*s, ckey2)); BOOST_REQUIRE(m2_1.find_row(*s, ckey2)->find_cell(2)); auto cmv = m2_1.find_row(*s, ckey2)->find_cell(2)->as_collection_mutation(); - auto cm = my_set_type->deserialize_mutation_form(cmv); + auto cmv_b = cmv.data.linearize(); + auto cm = my_set_type->deserialize_mutation_form(cmv_b); BOOST_REQUIRE(cm.cells.size() == 1); BOOST_REQUIRE(cm.cells.front().first == int32_type->decompose(3)); @@ -916,7 +925,8 @@ SEASTAR_TEST_CASE(test_mutation_diff) { BOOST_REQUIRE(!m1_2.find_row(*s, ckey2)->find_cell(0)); BOOST_REQUIRE(!m1_2.find_row(*s, ckey2)->find_cell(1)); cmv = m1_2.find_row(*s, ckey2)->find_cell(2)->as_collection_mutation(); - cm = my_set_type->deserialize_mutation_form(cmv); + cmv_b = cmv.data.linearize(); + cm = my_set_type->deserialize_mutation_form(cmv_b); BOOST_REQUIRE(cm.cells.size() == 1); BOOST_REQUIRE(cm.cells.front().first == int32_type->decompose(2)); @@ -962,7 +972,7 @@ SEASTAR_TEST_CASE(test_large_blobs) { BOOST_REQUIRE(i); auto cell = i->as_atomic_cell(s1_col); BOOST_REQUIRE(cell.is_live()); - BOOST_REQUIRE(bytes_type->equal(cell.value(), bytes_type->decompose(data_value(blob1)))); + BOOST_REQUIRE(bytes_type->equal(cell.value().linearize(), bytes_type->decompose(data_value(blob1)))); // Stress managed_bytes::linearize and scatter by merging a value into the cell mutation m2(s, key); @@ -975,7 +985,7 @@ SEASTAR_TEST_CASE(test_large_blobs) { BOOST_REQUIRE(i2); auto cell2 = i2->as_atomic_cell(s1_col); BOOST_REQUIRE(cell2.is_live()); - BOOST_REQUIRE(bytes_type->equal(cell2.value(), bytes_type->decompose(data_value(blob2)))); + BOOST_REQUIRE(bytes_type->equal(cell2.value().linearize(), bytes_type->decompose(data_value(blob2)))); }); } diff --git a/tests/simple_schema.hh b/tests/simple_schema.hh index 7bbf08b657..698357fa13 100644 --- a/tests/simple_schema.hh +++ b/tests/simple_schema.hh @@ -114,7 +114,7 @@ public: if (!ac.is_live()) { throw std::runtime_error("cell is dead"); } - return std::make_pair(value_cast(utf8_type->deserialize(ac.value())), ac.timestamp()); + return std::make_pair(value_cast(utf8_type->deserialize(ac.value().linearize())), ac.timestamp()); } mutation_fragment make_row(const clustering_key& key, sstring v) { diff --git a/tests/sstable_datafile_test.cc b/tests/sstable_datafile_test.cc index 4cefa3e0c3..94ae3b8dd0 100644 --- a/tests/sstable_datafile_test.cc +++ b/tests/sstable_datafile_test.cc @@ -823,7 +823,8 @@ SEASTAR_TEST_CASE(datafile_generation_11) { auto cell = r->find_cell(set_col.id); BOOST_REQUIRE(cell); auto t = static_pointer_cast(set_col.type); - return t->deserialize_mutation_form(cell->as_collection_mutation()); + auto bv = cell->as_collection_mutation().data.linearize(); + return t->deserialize_mutation_form(bv).materialize(*t); }; auto sst = make_sstable(s, tmpdir_path, 11, la, big); @@ -832,7 +833,7 @@ SEASTAR_TEST_CASE(datafile_generation_11) { return do_with(make_dkey(s, "key1"), [sstp, s, verifier, tomb, &static_set_col] (auto& key) { auto rd = make_lw_shared(sstp->read_row_flat(s, key)); return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, verifier, tomb, &static_set_col, rd] (auto mutation) { - auto verify_set = [&tomb] (auto m) { + auto verify_set = [&tomb] (const collection_type_impl::mutation& m) { BOOST_REQUIRE(bool(m.tomb) == true); BOOST_REQUIRE(m.tomb == tomb); BOOST_REQUIRE(m.cells.size() == 3); @@ -849,7 +850,8 @@ SEASTAR_TEST_CASE(datafile_generation_11) { // The static set auto t = static_pointer_cast(static_set_col.type); - auto mut = t->deserialize_mutation_form(scol->as_collection_mutation()); + auto bv = scol->as_collection_mutation().data.linearize(); + auto mut = t->deserialize_mutation_form(bv).materialize(*t); verify_set(mut); // The clustered set @@ -2875,7 +2877,7 @@ SEASTAR_TEST_CASE(test_counter_read) { BOOST_REQUIRE(mfopt->is_clustering_row()); const clustering_row* cr = &mfopt->as_clustering_row(); cr->cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& c) { - counter_cell_view ccv { c.as_atomic_cell(s->regular_column_at(id)) }; + counter_cell_view::with_linearized(c.as_atomic_cell(s->regular_column_at(id)), [&] (counter_cell_view ccv) { auto& col = s->column_at(column_kind::regular_column, id); if (col.name_as_text() == "c1") { BOOST_REQUIRE_EQUAL(ccv.total_value(), 13); @@ -2896,6 +2898,7 @@ SEASTAR_TEST_CASE(test_counter_read) { } else { BOOST_FAIL(sprint("Unexpected column \'%s\'", col.name_as_text())); } + }); }); mfopt = reader().get0(); @@ -4373,11 +4376,12 @@ SEASTAR_TEST_CASE(test_wrong_counter_shard_order) { size_t n = 0; row.cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& ac_o_c) { auto acv = ac_o_c.as_atomic_cell(s->regular_column_at(id)); - auto ccv = counter_cell_view(acv); + counter_cell_view::with_linearized(acv, [&] (counter_cell_view ccv) { counter_shard_view::less_compare_by_id cmp; BOOST_REQUIRE_MESSAGE(boost::algorithm::is_sorted(ccv.shards(), cmp), ccv << " is expected to be sorted"); BOOST_REQUIRE_EQUAL(ccv.total_value(), expected_value); n++; + }); }); BOOST_REQUIRE_EQUAL(n, 5); }; diff --git a/tests/sstable_test.hh b/tests/sstable_test.hh index 0d18f631cd..cb67463ce3 100644 --- a/tests/sstable_test.hh +++ b/tests/sstable_test.hh @@ -559,7 +559,9 @@ inline void match(const row& row, const schema& s, bytes col, const data_value& } auto expected = cdef->type->decompose(value); - BOOST_REQUIRE(c.value() == expected); + auto val = c.value().linearize(); + assert(val == expected); + BOOST_REQUIRE(c.value().linearize() == expected); if (timestamp) { BOOST_REQUIRE(c.timestamp() == timestamp); } @@ -592,9 +594,11 @@ match_collection(const row& row, const schema& s, bytes col, const tombstone& t) BOOST_CHECK_NO_THROW(row.cell_at(cdef->id)); auto c = row.cell_at(cdef->id).as_collection_mutation(); auto ctype = static_pointer_cast(cdef->type); - auto&& mut = ctype->deserialize_mutation_form(c); + return c.data.with_linearized([&] (bytes_view c_bv) { + auto&& mut = ctype->deserialize_mutation_form(c_bv); BOOST_REQUIRE(mut.tomb == t); return mut.materialize(*ctype); + }); } template @@ -612,7 +616,7 @@ inline void match_collection_element(const std::pair& elemen // the schema for the set type, and is enough for the purposes of this // test. if (expected_serialized_value) { - BOOST_REQUIRE(element.second.value() == *expected_serialized_value); + BOOST_REQUIRE(element.second.value().linearize() == *expected_serialized_value); } } diff --git a/types.cc b/types.cc index bfc7594002..4c951de5a1 100644 --- a/types.cc +++ b/types.cc @@ -2098,7 +2098,9 @@ collection_type_impl::as_cql3_type() const { bytes collection_type_impl::to_value(collection_mutation_view mut, cql_serialization_format sf) const { - return to_value(deserialize_mutation_form(mut), sf); + return mut.data.with_linearized([&] (bytes_view bv) { + return to_value(deserialize_mutation_form(bv), sf); + }); } collection_type_impl::mutation @@ -2422,12 +2424,19 @@ map_type_impl::serialized_values(std::vector cells) const { bytes map_type_impl::to_value(mutation_view mut, cql_serialization_format sf) const { + std::vector linearized; std::vector tmp; tmp.reserve(mut.cells.size() * 2); for (auto&& e : mut.cells) { if (e.second.is_live(mut.tomb, false)) { tmp.emplace_back(e.first); - tmp.emplace_back(e.second.value()); + auto value_view = e.second.value(); + if (value_view.is_fragmented()) { + auto& v = linearized.emplace_back(value_view.linearize()); + tmp.emplace_back(v); + } else { + tmp.emplace_back(value_view.first_fragment()); + } } } return pack(tmp.begin(), tmp.end(), tmp.size() / 2, sf); @@ -2477,8 +2486,7 @@ bool map_type_impl::references_duration() const { return _keys->references_duration() || _values->references_duration(); } -auto collection_type_impl::deserialize_mutation_form(collection_mutation_view cm) const -> mutation_view { - auto&& in = cm.data; +auto collection_type_impl::deserialize_mutation_form(bytes_view in) const -> mutation_view { mutation_view ret; auto has_tomb = read_simple(in); if (has_tomb) { @@ -2502,13 +2510,14 @@ auto collection_type_impl::deserialize_mutation_form(collection_mutation_view cm } bool collection_type_impl::is_empty(collection_mutation_view cm) const { - auto&& in = cm.data; + return cm.data.with_linearized([&] (bytes_view in) { // FIXME: we can guarantee that this is in the first fragment auto has_tomb = read_simple(in); return !has_tomb && read_simple(in) == 0; + }); } bool collection_type_impl::is_any_live(collection_mutation_view cm, tombstone tomb, gc_clock::time_point now) const { - auto&& in = cm.data; + return cm.data.with_linearized([&] (bytes_view in) { auto has_tomb = read_simple(in); if (has_tomb) { auto ts = read_simple(in); @@ -2526,10 +2535,11 @@ bool collection_type_impl::is_any_live(collection_mutation_view cm, tombstone to } } return false; + }); } api::timestamp_type collection_type_impl::last_update(collection_mutation_view cm) const { - auto&& in = cm.data; + return cm.data.with_linearized([&] (bytes_view in) { api::timestamp_type max = api::missing_timestamp; auto has_tomb = read_simple(in); if (has_tomb) { @@ -2545,11 +2555,13 @@ api::timestamp_type collection_type_impl::last_update(collection_mutation_view c max = std::max(value.timestamp(), max); } return max; + }); } template collection_mutation do_serialize_mutation_form( + const collection_type_impl& ctype, const tombstone& tomb, boost::iterator_range cells) { auto element_size = [] (size_t c, auto&& e) -> size_t { @@ -2577,9 +2589,10 @@ do_serialize_mutation_form( auto&& k = kv.first; auto&& v = kv.second; writeb(k); + writeb(v.serialize()); } - return collection_mutation{std::move(ret)}; + return collection_mutation(std::move(ret)); } bool collection_type_impl::mutation::compact_and_expire(row_tombstone base_tomb, gc_clock::time_point query_time, @@ -2619,26 +2632,28 @@ bool collection_type_impl::mutation::compact_and_expire(row_tombstone base_tomb, } collection_mutation -collection_type_impl::serialize_mutation_form(const mutation& mut) { - return do_serialize_mutation_form(mut.tomb, boost::make_iterator_range(mut.cells.begin(), mut.cells.end())); +collection_type_impl::serialize_mutation_form(const mutation& mut) const { + return do_serialize_mutation_form(*this, mut.tomb, boost::make_iterator_range(mut.cells.begin(), mut.cells.end())); } collection_mutation -collection_type_impl::serialize_mutation_form(mutation_view mut) { - return do_serialize_mutation_form(mut.tomb, boost::make_iterator_range(mut.cells.begin(), mut.cells.end())); +collection_type_impl::serialize_mutation_form(mutation_view mut) const { + return do_serialize_mutation_form(*this, mut.tomb, boost::make_iterator_range(mut.cells.begin(), mut.cells.end())); } collection_mutation -collection_type_impl::serialize_mutation_form_only_live(mutation_view mut, gc_clock::time_point now) { - return do_serialize_mutation_form(mut.tomb, mut.cells | boost::adaptors::filtered([t = mut.tomb, now] (auto&& e) { +collection_type_impl::serialize_mutation_form_only_live(mutation_view mut, gc_clock::time_point now) const { + return do_serialize_mutation_form(*this, mut.tomb, mut.cells | boost::adaptors::filtered([t = mut.tomb, now] (auto&& e) { return e.second.is_live(t, now, false); })); } collection_mutation collection_type_impl::merge(collection_mutation_view a, collection_mutation_view b) const { - auto aa = deserialize_mutation_form(a); - auto bb = deserialize_mutation_form(b); + return a.data.with_linearized([&] (bytes_view a_in) { + return b.data.with_linearized([&] (bytes_view b_in) { + auto aa = deserialize_mutation_form(a_in); + auto bb = deserialize_mutation_form(b_in); mutation_view merged; merged.cells.reserve(aa.cells.size() + bb.cells.size()); using element_type = std::pair; @@ -2672,13 +2687,17 @@ collection_type_impl::merge(collection_mutation_view a, collection_mutation_view merge); merged.tomb = std::max(aa.tomb, bb.tomb); return serialize_mutation_form(merged); + }); + }); } collection_mutation collection_type_impl::difference(collection_mutation_view a, collection_mutation_view b) const { - auto aa = deserialize_mutation_form(a); - auto bb = deserialize_mutation_form(b); + return a.data.with_linearized([&] (bytes_view a_in) { + return b.data.with_linearized([&] (bytes_view b_in) { + auto aa = deserialize_mutation_form(a_in); + auto bb = deserialize_mutation_form(b_in); mutation_view diff; diff.cells.reserve(std::max(aa.cells.size(), bb.cells.size())); auto key_type = name_comparator(); @@ -2698,6 +2717,8 @@ collection_type_impl::difference(collection_mutation_view a, collection_mutation diff.tomb = aa.tomb; } return serialize_mutation_form(diff); + }); + }); } bytes_opt @@ -3166,11 +3187,18 @@ list_type_impl::serialized_values(std::vector cells) const { bytes list_type_impl::to_value(mutation_view mut, cql_serialization_format sf) const { + std::vector linearized; std::vector tmp; tmp.reserve(mut.cells.size()); for (auto&& e : mut.cells) { if (e.second.is_live(mut.tomb, false)) { - tmp.emplace_back(e.second.value()); + auto value_view = e.second.value(); + if (value_view.is_fragmented()) { + auto& v = linearized.emplace_back(value_view.linearize()); + tmp.emplace_back(v); + } else { + tmp.emplace_back(value_view.first_fragment()); + } } } return pack(tmp.begin(), tmp.end(), tmp.size(), sf); diff --git a/types.hh b/types.hh index 3d516a6384..45c78561be 100644 --- a/types.hh +++ b/types.hh @@ -818,26 +818,29 @@ public: virtual shared_ptr as_cql3_type() const override; template static bytes pack(BytesViewIterator start, BytesViewIterator finish, int elements, cql_serialization_format sf); - mutation_view deserialize_mutation_form(collection_mutation_view in) const; + // requires linearized collection_mutation_view, lifetime + mutation_view deserialize_mutation_form(bytes_view in) const; bool is_empty(collection_mutation_view in) const; bool is_any_live(collection_mutation_view in, tombstone tomb = tombstone(), gc_clock::time_point now = gc_clock::time_point::min()) const; api::timestamp_type last_update(collection_mutation_view in) const; virtual bytes to_value(mutation_view mut, cql_serialization_format sf) const = 0; bytes to_value(collection_mutation_view mut, cql_serialization_format sf) const; // FIXME: use iterators? - static collection_mutation serialize_mutation_form(const mutation& mut); - static collection_mutation serialize_mutation_form(mutation_view mut); - static collection_mutation serialize_mutation_form_only_live(mutation_view mut, gc_clock::time_point now); + collection_mutation serialize_mutation_form(const mutation& mut) const; + collection_mutation serialize_mutation_form(mutation_view mut) const; + collection_mutation serialize_mutation_form_only_live(mutation_view mut, gc_clock::time_point now) const; collection_mutation merge(collection_mutation_view a, collection_mutation_view b) const; collection_mutation difference(collection_mutation_view a, collection_mutation_view b) const; // Calls Func(atomic_cell_view) for each cell in this collection. // noexcept if Func doesn't throw. template void for_each_cell(collection_mutation_view c, Func&& func) const { - auto m_view = deserialize_mutation_form(std::move(c)); + c.data.with_linearized([&] (bytes_view c_bv) { + auto m_view = deserialize_mutation_form(c_bv); for (auto&& c : m_view.cells) { func(std::move(c.second)); } + }); } virtual void serialize(const void* value, bytes::iterator& out, cql_serialization_format sf) const = 0; virtual data_value deserialize(bytes_view v, cql_serialization_format sf) const = 0;