From aa25f0844fe76a2c2f71469d5ed3aa3e14ff666d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Thu, 5 Oct 2017 08:19:35 +0100 Subject: [PATCH] atomic_cell: introduce fragmented buffer value interface As a preparation for the switch to the new cell representation this patch changes the type returned by atomic_cell_view::value() to one that requires explicit linearisation of the cell value. Even though the value is still implicitly linearised (and only when managed by the LSA) the new interface is the same as the target one so that no more changes to its users will be needed. --- atomic_cell.hh | 126 ++++++++++++++++-- atomic_cell_hash.hh | 22 ++- atomic_cell_or_collection.hh | 2 +- converting_mutation_partition_applier.hh | 4 +- counters.cc | 37 +++-- counters.hh | 68 ++++++---- .../restrictions/single_column_restriction.hh | 2 +- cql3/restrictions/statement_restrictions.cc | 56 +++++--- cql3/statements/batch_statement.cc | 8 +- database.cc | 2 +- db/view/view.cc | 10 +- memtable.hh | 4 +- mutation_partition.cc | 10 +- mutation_partition_serializer.cc | 11 +- mutation_partition_view.cc | 2 +- sstables/disk_types.hh | 6 + sstables/sstables.cc | 41 +++++- tests/counter_test.cc | 57 +++++--- tests/cql_test_env.cc | 5 +- tests/flat_mutation_reader_assertions.hh | 4 +- tests/mutation_test.cc | 46 ++++--- tests/simple_schema.hh | 2 +- tests/sstable_datafile_test.cc | 14 +- tests/sstable_test.hh | 10 +- types.cc | 66 ++++++--- types.hh | 13 +- 26 files changed, 461 insertions(+), 167 deletions(-) diff --git a/atomic_cell.hh b/atomic_cell.hh index 1d1ee698e6..dfe12c3acd 100644 --- a/atomic_cell.hh +++ b/atomic_cell.hh @@ -48,6 +48,108 @@ T get_field(const bytes_view& v, unsigned offset) { class atomic_cell_or_collection; +template +class basic_atomic_cell_value_view { +public: + using fragment_type = std::conditional_t; +private: + fragment_type _value; +public: + explicit basic_atomic_cell_value_view(fragment_type value) : _value(value) { } + + class 
iterator { + fragment_type _view; + public: + using iterator_category = std::forward_iterator_tag; + using value_type = fragment_type; + using pointer = const fragment_type*; + using reference = const fragment_type&; + using difference_type = std::ptrdiff_t; + + explicit iterator(fragment_type fv) noexcept + : _view(fv) { } + + const fragment_type& operator*() const { + return _view; + } + const fragment_type* operator->() const { + return &_view; + } + iterator& operator++() { + _view = { }; + return *this; + } + iterator operator++(int) { + auto it = *this; + operator++(); + return it; + } + + bool operator==(const iterator& other) const { + return _view.data() == other._view.data(); + } + bool operator!=(const iterator& other) const { + return !(*this == other); + } + }; + + using const_iterator = iterator; + + auto begin() const { + return iterator(_value); + } + auto end() const { + return iterator(fragment_type()); + } + + bool operator==(const basic_atomic_cell_value_view& other) const noexcept { + return _value == other._value; + } + bool operator==(bytes_view bv) const noexcept { + return _value == bv; + } + + size_t size_bytes() const noexcept { + return _value.size(); + } + + bool empty() const noexcept { + return _value.empty(); + } + + bool is_fragmented() const noexcept { + return false; + } + + fragment_type first_fragment() const noexcept { + return _value; + } + + bytes linearize() const { + return bytes(_value.begin(), _value.end()); + } + + template + decltype(auto) with_linearized(Function&& fn) const { + return fn(_value); + } + + friend std::ostream& operator<<(std::ostream& os, const basic_atomic_cell_value_view& vv) { + return os << vv.first_fragment(); + } + +}; + +using atomic_cell_value_view = basic_atomic_cell_value_view; +using atomic_cell_value_mutable_view = basic_atomic_cell_value_view; + +inline int compare_unsigned(atomic_cell_value_view a, atomic_cell_value_view b) +{ + assert(!a.is_fragmented() && !b.is_fragmented()); + return 
compare_unsigned(a.first_fragment(), b.first_fragment()); +} + /* * Represents atomic cell layout. Works on serialized form. * @@ -102,11 +204,11 @@ private: return cell; } public: - static bytes_view value(bytes_view cell) { - return do_get_value(cell); + static atomic_cell_value_view value(bytes_view cell) { + return atomic_cell_value_view(do_get_value(cell)); } - static bytes_mutable_view value(bytes_mutable_view cell) { - return do_get_value(cell); + static atomic_cell_value_mutable_view value(bytes_mutable_view cell) { + return atomic_cell_value_mutable_view(do_get_value(cell)); } // Can be called on live counter update cells only static int64_t counter_update_value(bytes_view cell) { @@ -232,6 +334,9 @@ public: auto value() const { return atomic_cell_type::value(_data); } + bool is_value_fragmented() const { + return false; + } // Can be called on live counter update cells only int64_t counter_update_value() const { return atomic_cell_type::counter_update_value(_data); @@ -274,6 +379,10 @@ public: friend class atomic_cell; }; +template +using basic_atomic_cell_view = std::conditional_t; + class atomic_cell_ref final : public atomic_cell_base { public: atomic_cell_ref(managed_bytes& buf) : atomic_cell_base(buf) {} @@ -345,19 +454,18 @@ public: class collection_mutation_view { public: - bytes_view data; - bytes_view serialize() const { return data; } - static collection_mutation_view from_bytes(bytes_view v) { return { v }; } + // FIXME: encapsulate properly + atomic_cell_value_view data; }; inline collection_mutation::collection_mutation(collection_mutation_view v) - : data(v.data) { + : data(v.data.linearize()) { } inline collection_mutation::operator collection_mutation_view() const { - return { data }; + return { atomic_cell_value_view(bytes_view(data)) }; } class column_definition; diff --git a/atomic_cell_hash.hh b/atomic_cell_hash.hh index c82730c24a..87dc9356cb 100644 --- a/atomic_cell_hash.hh +++ b/atomic_cell_hash.hh @@ -33,13 +33,15 @@ template<> 
struct appending_hash { template void operator()(Hasher& h, collection_mutation_view cell, const column_definition& cdef) const { + cell.data.with_linearized([&] (bytes_view cell_bv) { auto ctype = static_pointer_cast(cdef.type); - auto m_view = ctype->deserialize_mutation_form(cell); + auto m_view = ctype->deserialize_mutation_form(cell_bv); ::feed_hash(h, m_view.tomb); for (auto&& key_and_value : m_view.cells) { ::feed_hash(h, key_and_value.first); ::feed_hash(h, key_and_value.second, cdef); } + }); } }; @@ -51,7 +53,9 @@ struct appending_hash { feed_hash(h, cell.timestamp()); if (cell.is_live()) { if (cdef.is_counter()) { - ::feed_hash(h, counter_cell_view(cell)); + counter_cell_view::with_linearized(cell, [&] (counter_cell_view ccv) { + ::feed_hash(h, ccv); + }); return; } if (cell.is_live_and_has_ttl()) { @@ -91,4 +95,16 @@ struct appending_hash { feed_hash(h, c.as_collection_mutation(), cdef); } } -}; \ No newline at end of file +}; + +template<> +struct appending_hash { + template + void operator()(Hasher& h, atomic_cell_value_view v) const { + using boost::range::for_each; + feed_hash(h, v.size_bytes()); + for_each(v, [&h] (auto&& chk) { + h.update(reinterpret_cast(chk.data()), chk.size()); + }); + } +}; diff --git a/atomic_cell_or_collection.hh b/atomic_cell_or_collection.hh index 7e0b1effe5..b3b69b123d 100644 --- a/atomic_cell_or_collection.hh +++ b/atomic_cell_or_collection.hh @@ -57,7 +57,7 @@ public: return std::move(data.data); } collection_mutation_view as_collection_mutation() const { - return collection_mutation_view{_data}; + return collection_mutation_view{atomic_cell_value_view(bytes_view(_data))}; } bytes_view serialize() const { return _data; diff --git a/converting_mutation_partition_applier.hh b/converting_mutation_partition_applier.hh index 1a03ade7c7..e5826582a1 100644 --- a/converting_mutation_partition_applier.hh +++ b/converting_mutation_partition_applier.hh @@ -47,8 +47,9 @@ private: if (!is_compatible(new_def, old_type, kind)) { 
return; } + cell.data.with_linearized([&] (bytes_view cell_bv) { auto&& ctype = static_pointer_cast(old_type); - auto old_view = ctype->deserialize_mutation_form(cell); + auto old_view = ctype->deserialize_mutation_form(cell_bv); collection_type_impl::mutation_view new_view; if (old_view.tomb.timestamp > new_def.dropped_at()) { @@ -60,6 +61,7 @@ private: } } dst.apply(new_def, ctype->serialize_mutation_form(std::move(new_view))); + }); } public: converting_mutation_partition_applier( diff --git a/counters.cc b/counters.cc index 0883c78ea9..77d7292d34 100644 --- a/counters.cc +++ b/counters.cc @@ -78,10 +78,10 @@ std::vector counter_cell_view::shards_compatible_with_1_7_4() con return sorted_shards; } -static bool apply_in_place(const column_definition& cdef, atomic_cell_or_collection& dst, atomic_cell_or_collection& src) +static bool apply_in_place(const column_definition& cdef, atomic_cell_mutable_view dst, atomic_cell_mutable_view src) { - auto dst_ccmv = counter_cell_mutable_view(dst.as_mutable_atomic_cell(cdef)); - auto src_ccmv = counter_cell_mutable_view(src.as_mutable_atomic_cell(cdef)); + auto dst_ccmv = counter_cell_mutable_view(dst); + auto src_ccmv = counter_cell_mutable_view(src); auto dst_shards = dst_ccmv.shards(); auto src_shards = src_ccmv.shards(); @@ -143,16 +143,22 @@ void counter_cell_view::apply(const column_definition& cdef, atomic_cell_or_coll assert(!dst_ac.is_counter_update()); assert(!src_ac.is_counter_update()); + with_linearized(dst_ac, [&] (counter_cell_view dst_ccv) { + with_linearized(src_ac, [&] (counter_cell_view src_ccv) { - if (counter_cell_view(dst_ac).shard_count() >= counter_cell_view(src_ac).shard_count() + if (dst_ccv.shard_count() >= src_ccv.shard_count() && dst.can_use_mutable_view() && src.can_use_mutable_view()) { - if (apply_in_place(cdef, dst, src)) { - return; + auto dst_amc = dst.as_mutable_atomic_cell(cdef); + auto src_amc = src.as_mutable_atomic_cell(cdef); + if (!dst_amc.is_value_fragmented() && 
!src_amc.is_value_fragmented()) { + if (apply_in_place(cdef, dst_amc, src_amc)) { + return; + } } } - auto dst_shards = counter_cell_view(dst_ac).shards(); - auto src_shards = counter_cell_view(src_ac).shards(); + auto dst_shards = dst_ccv.shards(); + auto src_shards = src_ccv.shards(); counter_cell_builder result; combine(dst_shards.begin(), dst_shards.end(), src_shards.begin(), src_shards.end(), @@ -161,7 +167,9 @@ void counter_cell_view::apply(const column_definition& cdef, atomic_cell_or_coll }); auto cell = result.build(std::max(dst_ac.timestamp(), src_ac.timestamp())); - src = std::exchange(dst, atomic_cell_or_collection(*counter_type, cell)); + src = std::exchange(dst, atomic_cell_or_collection(std::move(cell))); + }); + }); } stdx::optional counter_cell_view::difference(atomic_cell_view a, atomic_cell_view b) @@ -176,8 +184,10 @@ stdx::optional counter_cell_view::difference(atomic_cell_view a, at return { }; } - auto a_shards = counter_cell_view(a).shards(); - auto b_shards = counter_cell_view(b).shards(); + return with_linearized(a, [&] (counter_cell_view a_ccv) { + return with_linearized(b, [&] (counter_cell_view b_ccv) { + auto a_shards = a_ccv.shards(); + auto b_shards = b_ccv.shards(); auto a_it = a_shards.begin(); auto a_end = a_shards.end(); @@ -202,6 +212,8 @@ stdx::optional counter_cell_view::difference(atomic_cell_view a, at diff = atomic_cell::make_live(*counter_type, a.timestamp(), bytes_view()); } return diff; + }); + }); } @@ -239,12 +251,13 @@ void transform_counter_updates_to_shards(mutation& m, const mutation* current_st if (!acv.is_live()) { return; // continue -- we are in lambda } - counter_cell_view ccv(acv); + counter_cell_view::with_linearized(acv, [&] (counter_cell_view ccv) { auto cs = ccv.local_shard(); if (!cs) { return; // continue } shards.emplace_back(std::make_pair(id, counter_shard(*cs))); + }); }); transformee.for_each_cell([&] (column_id id, atomic_cell_or_collection& ac_o_c) { diff --git a/counters.hh b/counters.hh index 
ab66c4e92e..97cd0f1565 100644 --- a/counters.hh +++ b/counters.hh @@ -79,7 +79,7 @@ static_assert(std::is_pod::value, "counter_id should be a POD type") std::ostream& operator<<(std::ostream& os, const counter_id& id); -template +template class basic_counter_shard_view { enum class offset : unsigned { id = 0u, @@ -88,7 +88,8 @@ class basic_counter_shard_view { total_size = unsigned(logical_clock) + sizeof(int64_t), }; private: - typename View::pointer _base; + using pointer_type = std::conditional_t; + pointer_type _base; private: template T read(offset off) const { @@ -100,7 +101,7 @@ public: static constexpr auto size = size_t(offset::total_size); public: basic_counter_shard_view() = default; - explicit basic_counter_shard_view(typename View::pointer ptr) noexcept + explicit basic_counter_shard_view(pointer_type ptr) noexcept : _base(ptr) { } counter_id id() const { return read(offset::id); } @@ -111,7 +112,7 @@ public: static constexpr size_t off = size_t(offset::value); static constexpr size_t size = size_t(offset::total_size) - off; - typename View::value_type tmp[size]; + signed char tmp[size]; std::copy_n(_base + off, size, tmp); std::copy_n(other._base + off, size, _base + off); std::copy_n(tmp, size, other._base + off); @@ -138,7 +139,7 @@ public: }; }; -using counter_shard_view = basic_counter_shard_view; +using counter_shard_view = basic_counter_shard_view; std::ostream& operator<<(std::ostream& os, counter_shard_view csv); @@ -287,28 +288,32 @@ public: // := // := // := * -template +template class basic_counter_cell_view { protected: - atomic_cell_base _cell; + using linearized_value_view = std::conditional_t; + using pointer_type = typename linearized_value_view::pointer; + basic_atomic_cell_view _cell; + linearized_value_view _value; private: - class shard_iterator : public std::iterator> { - typename View::pointer _current; - basic_counter_shard_view _current_view; + class shard_iterator : public std::iterator> { + pointer_type _current; + 
basic_counter_shard_view _current_view; public: shard_iterator() = default; - shard_iterator(typename View::pointer ptr) noexcept + shard_iterator(pointer_type ptr) noexcept : _current(ptr), _current_view(ptr) { } - basic_counter_shard_view& operator*() noexcept { + basic_counter_shard_view& operator*() noexcept { return _current_view; } - basic_counter_shard_view* operator->() noexcept { + basic_counter_shard_view* operator->() noexcept { return &_current_view; } shard_iterator& operator++() noexcept { _current += counter_shard_view::size; - _current_view = basic_counter_shard_view(_current); + _current_view = basic_counter_shard_view(_current); return *this; } shard_iterator operator++(int) noexcept { @@ -318,7 +323,7 @@ private: } shard_iterator& operator--() noexcept { _current -= counter_shard_view::size; - _current_view = basic_counter_shard_view(_current); + _current_view = basic_counter_shard_view(_current); return *this; } shard_iterator operator--(int) noexcept { @@ -335,22 +340,23 @@ private: }; public: boost::iterator_range shards() const { - auto bv = _cell.value(); - auto begin = shard_iterator(bv.data()); - auto end = shard_iterator(bv.data() + bv.size()); + auto begin = shard_iterator(_value.data()); + auto end = shard_iterator(_value.data() + _value.size()); return boost::make_iterator_range(begin, end); } size_t shard_count() const { - return _cell.value().size() / counter_shard_view::size; + return _cell.value().size_bytes() / counter_shard_view::size; } -public: +protected: // ac must be a live counter cell - explicit basic_counter_cell_view(atomic_cell_base ac) noexcept : _cell(ac) { + explicit basic_counter_cell_view(basic_atomic_cell_view ac, linearized_value_view vv) noexcept + : _cell(ac), _value(vv) + { assert(_cell.is_live()); assert(!_cell.is_counter_update()); } - +public: api::timestamp_type timestamp() const { return _cell.timestamp(); } static data_type total_value_type() { return long_type; } @@ -381,9 +387,17 @@ public: } }; 
-struct counter_cell_view : basic_counter_cell_view { +struct counter_cell_view : basic_counter_cell_view { using basic_counter_cell_view::basic_counter_cell_view; + template + static decltype(auto) with_linearized(basic_atomic_cell_view ac, Function&& fn) { + return ac.value().with_linearized([&] (bytes_view value_view) { + counter_cell_view ccv(ac, value_view); + return fn(ccv); + }); + } + // Returns counter shards in an order that is compatible with Scylla 1.7.4. std::vector shards_compatible_with_1_7_4() const; @@ -397,9 +411,15 @@ struct counter_cell_view : basic_counter_cell_view { friend std::ostream& operator<<(std::ostream& os, counter_cell_view ccv); }; -struct counter_cell_mutable_view : basic_counter_cell_view { +struct counter_cell_mutable_view : basic_counter_cell_view { using basic_counter_cell_view::basic_counter_cell_view; + explicit counter_cell_mutable_view(atomic_cell_mutable_view ac) noexcept + : basic_counter_cell_view(ac, ac.value().first_fragment()) + { + assert(!ac.value().is_fragmented()); + } + void set_timestamp(api::timestamp_type ts) { _cell.set_timestamp(ts); } }; diff --git a/cql3/restrictions/single_column_restriction.hh b/cql3/restrictions/single_column_restriction.hh index 4e47ed3e59..72770f4deb 100644 --- a/cql3/restrictions/single_column_restriction.hh +++ b/cql3/restrictions/single_column_restriction.hh @@ -113,7 +113,7 @@ public: class contains; protected: - bytes_view_opt get_value(const schema& schema, + std::optional get_value(const schema& schema, const partition_key& key, const clustering_key_prefix& ckey, const row& cells, diff --git a/cql3/restrictions/statement_restrictions.cc b/cql3/restrictions/statement_restrictions.cc index c76e955c84..ca947834b8 100644 --- a/cql3/restrictions/statement_restrictions.cc +++ b/cql3/restrictions/statement_restrictions.cc @@ -430,7 +430,7 @@ void statement_restrictions::validate_secondary_index_selections(bool selects_on } } -static bytes_view_opt do_get_value(const schema& schema, 
+static std::optional do_get_value(const schema& schema, const column_definition& cdef, const partition_key& key, const clustering_key_prefix& ckey, @@ -438,21 +438,21 @@ static bytes_view_opt do_get_value(const schema& schema, gc_clock::time_point now) { switch(cdef.kind) { case column_kind::partition_key: - return key.get_component(schema, cdef.component_index()); + return atomic_cell_value_view(key.get_component(schema, cdef.component_index())); case column_kind::clustering_key: - return ckey.get_component(schema, cdef.component_index()); + return atomic_cell_value_view(ckey.get_component(schema, cdef.component_index())); default: auto cell = cells.find_cell(cdef.id); if (!cell) { - return stdx::nullopt; + return std::nullopt; } assert(cdef.is_atomic()); auto c = cell->as_atomic_cell(cdef); - return c.is_dead(now) ? stdx::nullopt : bytes_view_opt(c.value()); + return c.is_dead(now) ? std::nullopt : std::optional(c.value()); } } -bytes_view_opt single_column_restriction::get_value(const schema& schema, +std::optional single_column_restriction::get_value(const schema& schema, const partition_key& key, const clustering_key_prefix& ckey, const row& cells, @@ -472,7 +472,12 @@ bool single_column_restriction::EQ::is_satisfied_by(const schema& schema, auto operand = value(options); if (operand) { auto cell_value = get_value(schema, key, ckey, cells, now); - return cell_value && _column_def.type->compare(*operand, *cell_value) == 0; + if (!cell_value) { + return false; + } + return cell_value->with_linearized([&] (bytes_view cell_value_bv) { + return _column_def.type->compare(*operand, cell_value_bv) == 0; + }); } return false; } @@ -491,9 +496,11 @@ bool single_column_restriction::IN::is_satisfied_by(const schema& schema, return false; } auto operands = values(options); + return cell_value->with_linearized([&] (bytes_view cell_value_bv) { return std::any_of(operands.begin(), operands.end(), [&] (auto&& operand) { - return operand && _column_def.type->compare(*operand, 
*cell_value) == 0; + return operand && _column_def.type->compare(*operand, cell_value_bv) == 0; }); + }); } static query::range to_range(const term_slice& slice, const query_options& options) { @@ -526,7 +533,9 @@ bool single_column_restriction::slice::is_satisfied_by(const schema& schema, if (!cell_value) { return false; } - return to_range(_slice, options).contains(*cell_value, _column_def.type->as_tri_comparator()); + return cell_value->with_linearized([&] (bytes_view cell_value_bv) { + return to_range(_slice, options).contains(cell_value_bv, _column_def.type->as_tri_comparator()); + }); } bool single_column_restriction::contains::is_satisfied_by(const schema& schema, @@ -552,7 +561,8 @@ bool single_column_restriction::contains::is_satisfied_by(const schema& schema, auto&& element_type = col_type->is_set() ? col_type->name_comparator() : col_type->value_comparator(); if (_column_def.type->is_multi_cell()) { auto cell = cells.find_cell(_column_def.id); - auto&& elements = col_type->deserialize_mutation_form(cell->as_collection_mutation()).cells; + return cell->as_collection_mutation().data.with_linearized([&] (bytes_view collection_bv) { + auto&& elements = col_type->deserialize_mutation_form(collection_bv).cells; auto end = std::remove_if(elements.begin(), elements.end(), [now] (auto&& element) { return element.second.is_dead(now); }); @@ -562,7 +572,9 @@ bool single_column_restriction::contains::is_satisfied_by(const schema& schema, continue; } auto found = std::find_if(elements.begin(), end, [&] (auto&& element) { - return element_type->compare(element.second.value(), *val) == 0; + return element.second.value().with_linearized([&] (bytes_view value_bv) { + return element_type->compare(value_bv, *val) == 0; + }); }); if (found == end) { return false; @@ -589,16 +601,26 @@ bool single_column_restriction::contains::is_satisfied_by(const schema& schema, auto found = std::find_if(elements.begin(), end, [&] (auto&& element) { return 
map_key_type->compare(element.first, *map_key) == 0; }); - if (found == end || element_type->compare(found->second.value(), *map_value) != 0) { + if (found == end) { + return false; + } + auto cmp = found->second.value().with_linearized([&] (bytes_view value_bv) { + return element_type->compare(value_bv, *map_value); + }); + if (cmp != 0) { return false; } } + return true; + }); } else { auto cell_value = get_value(schema, key, ckey, cells, now); if (!cell_value) { return false; } - auto deserialized = _column_def.type->deserialize(*cell_value); + auto deserialized = cell_value->with_linearized([&] (bytes_view cell_value_bv) { + return _column_def.type->deserialize(cell_value_bv); + }); for (auto&& value : _values) { auto val = value->bind_and_get(options); if (!val) { @@ -669,7 +691,9 @@ bool token_restriction::EQ::is_satisfied_by(const schema& schema, for (auto&& operand : values(options)) { if (operand) { auto cell_value = do_get_value(schema, **cdef, key, ckey, cells, now); - satisfied = cell_value && (*cdef)->type->compare(*operand, *cell_value) == 0; + satisfied = cell_value && cell_value->with_linearized([&] (bytes_view cell_value_bv) { + return (*cdef)->type->compare(*operand, cell_value_bv) == 0; + }); } if (!satisfied) { break; @@ -691,7 +715,9 @@ bool token_restriction::slice::is_satisfied_by(const schema& schema, if (!cell_value) { return false; } - satisfied = range.contains(*cell_value, cdef->type->as_tri_comparator()); + satisfied = cell_value->with_linearized([&] (bytes_view cell_value_bv) { + return range.contains(cell_value_bv, cdef->type->as_tri_comparator()); + }); if (!satisfied) { break; } diff --git a/cql3/statements/batch_statement.cc b/cql3/statements/batch_statement.cc index 35d0458732..f1f3226fef 100644 --- a/cql3/statements/batch_statement.cc +++ b/cql3/statements/batch_statement.cc @@ -239,18 +239,18 @@ void batch_statement::verify_batch_size(const std::vector& mutations) public: void accept_partition_tombstone(tombstone) override {} 
void accept_static_cell(column_id, atomic_cell_view v) override { - size += v.value().size(); + size += v.value().size_bytes(); } void accept_static_cell(column_id, collection_mutation_view v) override { - size += v.data.size(); + size += v.data.size_bytes(); } void accept_row_tombstone(const range_tombstone&) override {} void accept_row(position_in_partition_view, const row_tombstone&, const row_marker&, is_dummy, is_continuous) override {} void accept_row_cell(column_id, atomic_cell_view v) override { - size += v.value().size(); + size += v.value().size_bytes(); } void accept_row_cell(column_id id, collection_mutation_view v) override { - size += v.data.size(); + size += v.data.size_bytes(); } size_t size = 0; diff --git a/database.cc b/database.cc index fb86378152..7905b4d641 100644 --- a/database.cc +++ b/database.cc @@ -3681,7 +3681,7 @@ std::ostream& operator<<(std::ostream& os, const atomic_cell_view& acv) { if (acv.is_live()) { return fprint(os, "atomic_cell{%s;ts=%d;expiry=%d,ttl=%d}", - to_hex(acv.value()), + to_hex(acv.value().linearize()), acv.timestamp(), acv.is_live_and_has_ttl() ? acv.expiry().time_since_epoch().count() : -1, acv.is_live_and_has_ttl() ? 
acv.ttl().count() : 0); diff --git a/db/view/view.cc b/db/view/view.cc index 69ae601c03..65312dc174 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -318,7 +318,8 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons } deletable_row& view_updates::get_view_row(const partition_key& base_key, const clustering_row& update) { - auto get_value = boost::adaptors::transformed([&, this] (const column_definition& cdef) { + std::vector linearized_values; + auto get_value = boost::adaptors::transformed([&, this] (const column_definition& cdef) -> bytes_view { auto* base_col = _base->get_column_definition(cdef.name()); assert(base_col); switch (base_col->kind) { @@ -328,10 +329,11 @@ deletable_row& view_updates::get_view_row(const partition_key& base_key, const c return update.key().get_component(*_base, base_col->position()); default: auto& c = update.cells().cell_at(base_col->id); - if (base_col->is_atomic()) { - return c.as_atomic_cell(cdef).value(); + auto value_view = base_col->is_atomic() ? 
c.as_atomic_cell(cdef).value() : c.as_collection_mutation().data; + if (value_view.is_fragmented()) { + return linearized_values.emplace_back(value_view.linearize()); } - return c.as_collection_mutation().data; + return value_view.first_fragment(); } }); auto& partition = partition_for(partition_key::from_range(_view->partition_key_columns() | get_value)); diff --git a/memtable.hh b/memtable.hh index 3721400775..1db112f534 100644 --- a/memtable.hh +++ b/memtable.hh @@ -186,11 +186,13 @@ private: update(item.as_atomic_cell(col)); } else { auto ctype = static_pointer_cast(col.type); - auto mview = ctype->deserialize_mutation_form(item.as_collection_mutation()); + item.as_collection_mutation().data.with_linearized([&] (bytes_view bv) { + auto mview = ctype->deserialize_mutation_form(bv); update(mview.tomb); for (auto& entry : mview.cells) { update(entry.second); } + }); } }); } diff --git a/mutation_partition.cc b/mutation_partition.cc index cf2e06e94f..c795f87a30 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -595,7 +595,7 @@ void write_cell(RowWriter& w, const query::partition_slice& slice, ::atomic_cell } else { return std::move(wr).skip_expiry(); } - }().write_value(c.value()); + }().write_fragmented_value(c.value()); [&, wr = std::move(after_value)] () mutable { if (slice.options.contains() && c.is_live_and_has_ttl()) { return std::move(wr).write_ttl(c.ttl()); @@ -621,6 +621,7 @@ void write_cell(RowWriter& w, const query::partition_slice& slice, const data_ty template void write_counter_cell(RowWriter& w, const query::partition_slice& slice, ::atomic_cell_view c) { assert(c.is_live()); + counter_cell_view::with_linearized(c, [&] (counter_cell_view ccv) { auto wr = w.add().write(); [&, wr = std::move(wr)] () mutable { if (slice.options.contains()) { @@ -629,9 +630,10 @@ void write_counter_cell(RowWriter& w, const query::partition_slice& slice, ::ato return std::move(wr).skip_timestamp(); } }().skip_expiry() - 
.write_value(counter_cell_view::total_value_type()->decompose(counter_cell_view(c).total_value())) + .write_value(counter_cell_view::total_value_type()->decompose(ccv.total_value())) .skip_ttl() .end_qr_cell(); + }); } // Used to return the timestamp of the latest update to the row @@ -1624,7 +1626,8 @@ bool row::compact_and_expire( } else { auto&& cell = c.as_collection_mutation(); auto&& ctype = static_pointer_cast(def.type); - auto m_view = ctype->deserialize_mutation_form(cell); + cell.data.with_linearized([&] (bytes_view cell_bv) { + auto m_view = ctype->deserialize_mutation_form(cell_bv); collection_type_impl::mutation m = m_view.materialize(*ctype); any_live |= m.compact_and_expire(tomb, query_time, can_gc, gc_before); if (m.cells.empty() && m.tomb <= tomb.tomb()) { @@ -1632,6 +1635,7 @@ bool row::compact_and_expire( } else { c = ctype->serialize_mutation_form(m); } + }); } return erase; }); diff --git a/mutation_partition_serializer.cc b/mutation_partition_serializer.cc index 97b75bf205..9b809a3a86 100644 --- a/mutation_partition_serializer.cc +++ b/mutation_partition_serializer.cc @@ -44,7 +44,7 @@ template auto write_live_cell(Writer&& writer, atomic_cell_view c) { return std::move(writer).write_created_at(c.timestamp()) - .write_value(c.value()) + .write_fragmented_value(c.value()) .end_live_cell(); } @@ -59,7 +59,7 @@ auto write_counter_cell(Writer&& writer, atomic_cell_view c) .write_delta(delta) .end_counter_cell_update(); } else { - counter_cell_view ccv(c); + return counter_cell_view::with_linearized(c, [&] (counter_cell_view ccv) { auto shards = std::move(value).start_value_counter_cell_full() .start_shards(); if (service::get_local_storage_service().cluster_supports_correct_counter_order()) { @@ -72,6 +72,7 @@ auto write_counter_cell(Writer&& writer, atomic_cell_view c) } } return std::move(shards).end_shards().end_counter_cell_full(); + }); } }().end_counter_cell(); } @@ -83,7 +84,7 @@ auto write_expiring_cell(Writer&& writer, atomic_cell_view c) 
.write_expiry(c.expiry()) .start_c() .write_created_at(c.timestamp()) - .write_value(c.value()) + .write_fragmented_value(c.value()) .end_c() .end_expiring_cell(); } @@ -101,8 +102,9 @@ auto write_dead_cell(Writer&& writer, atomic_cell_view c) template auto write_collection_cell(Writer&& collection_writer, collection_mutation_view cmv, const column_definition& def) { + return cmv.data.with_linearized([&] (bytes_view cmv_bv) { auto&& ctype = static_pointer_cast(def.type); - auto m_view = ctype->deserialize_mutation_form(cmv); + auto m_view = ctype->deserialize_mutation_form(cmv_bv); auto cells_writer = std::move(collection_writer).write_tomb(m_view.tomb).start_elements(); for (auto&& c : m_view.cells) { auto cell_writer = cells_writer.add().write_key(c.first); @@ -115,6 +117,7 @@ auto write_collection_cell(Writer&& collection_writer, collection_mutation_view } } return std::move(cells_writer).end_elements().end_collection_cell(); + }); } template diff --git a/mutation_partition_view.cc b/mutation_partition_view.cc index 6aee34f77f..a195ed7cff 100644 --- a/mutation_partition_view.cc +++ b/mutation_partition_view.cc @@ -108,7 +108,7 @@ collection_mutation read_collection_cell(const collection_type_impl& ctype, ser: for (auto&& e : elements) { mut.cells.emplace_back(e.key(), read_atomic_cell(*ctype.value_comparator(), e.value())); } - return collection_type_impl::serialize_mutation_form(mut); + return ctype.serialize_mutation_form(mut); } template diff --git a/sstables/disk_types.hh b/sstables/disk_types.hh index dc42090b51..7ba7ea76dc 100644 --- a/sstables/disk_types.hh +++ b/sstables/disk_types.hh @@ -29,6 +29,7 @@ #include #include #include +#include "atomic_cell.hh" namespace sstables { @@ -67,6 +68,11 @@ struct disk_string_view { bytes_view value; }; +template +struct disk_data_value_view { + atomic_cell_value_view value; +}; + template struct disk_array { static_assert(std::is_integral::value, "Length type must be convertible to integer"); diff --git 
a/sstables/sstables.cc b/sstables/sstables.cc index de20afec82..001424d5f0 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -378,6 +378,15 @@ inline void write(sstable_version_types v, file_writer& out, const disk_string_v write(v, out, len, s.value); } +template +inline void write(sstable_version_types ver, file_writer& out, const disk_data_value_view& v) { + SizeType length; + check_truncate_and_assign(length, v.value.size_bytes()); + write(ver, out, length); + using boost::range::for_each; + for_each(v.value, [&] (bytes_view fragment) { write(ver, out, fragment); }); +} + // We cannot simply read the whole array at once, because we don't know its // full size. We know the number of elements, but if we are talking about // disk_strings, for instance, we have no idea how much of the stream each @@ -1712,6 +1721,16 @@ void write_cell_value(file_writer& out, const abstract_type& type, bytes_view va } } +void write_cell_value(file_writer& out, const abstract_type& type, atomic_cell_value_view value) { + if (!value.empty()) { + if (!type.value_length_if_fixed()) { + write_vint(out, value.size_bytes()); + } + using boost::range::for_each; + for_each(value, [&] (bytes_view fragment) { write(sstable_version_types::mc, out, fragment); }); + } +} + static inline void update_cell_stats(column_stats& c_stats, api::timestamp_type timestamp) { c_stats.update_timestamp(timestamp); c_stats.cells_count++; @@ -1770,19 +1789,20 @@ void sstable::write_cell(file_writer& out, atomic_cell_view cell, const column_d column_mask mask = column_mask::counter; write(_version, out, mask, int64_t(0), timestamp); - counter_cell_view ccv(cell); + counter_cell_view::with_linearized(cell, [&] (counter_cell_view ccv) { write_counter_value(ccv, out, _version, [v = _version] (file_writer& out, uint32_t value) { return write(v, out, value); }); _c_stats.update_local_deletion_time(std::numeric_limits::max()); + }); } else if (cell.is_live_and_has_ttl()) { // expiring cell column_mask 
mask = column_mask::expiration; uint32_t ttl = cell.ttl().count(); uint32_t expiration = cell.expiry().time_since_epoch().count(); - disk_string_view cell_value { cell.value() }; + disk_data_value_view cell_value { cell.value() }; _c_stats.update_local_deletion_time(expiration); // tombstone histogram is updated with expiration time because if ttl is longer @@ -1795,7 +1815,7 @@ void sstable::write_cell(file_writer& out, atomic_cell_view cell, const column_d // regular cell column_mask mask = column_mask::none; - disk_string_view cell_value { cell.value() }; + disk_data_value_view cell_value { cell.value() }; _c_stats.update_local_deletion_time(std::numeric_limits::max()); @@ -1884,8 +1904,9 @@ void sstable::write_range_tombstone(file_writer& out, } void sstable::write_collection(file_writer& out, const composite& clustering_key, const column_definition& cdef, collection_mutation_view collection) { + collection.data.with_linearized([&] (bytes_view collection_bv) { auto t = static_pointer_cast(cdef.type); - auto mview = t->deserialize_mutation_form(collection); + auto mview = t->deserialize_mutation_form(collection_bv); const bytes& column_name = cdef.name(); if (mview.tomb) { write_range_tombstone(out, clustering_key, composite::eoc::start, clustering_key, composite::eoc::end, { column_name }, mview.tomb); @@ -1894,6 +1915,7 @@ void sstable::write_collection(file_writer& out, const composite& clustering_key index_and_write_column_name(out, clustering_key, { column_name, cp.first }); write_cell(out, cp.second, cdef); } + }); } // This function is about writing a clustered_row to data file according to SSTables format. 
@@ -2904,10 +2926,11 @@ void sstable_writer_m::write_cell(file_writer& writer, atomic_cell_view cell, co if (has_value) { if (cdef.is_counter()) { assert(!cell.is_counter_update()); - counter_cell_view ccv(cell); + counter_cell_view::with_linearized(cell, [&] (counter_cell_view ccv) { write_counter_value(ccv, writer, sstable_version_types::mc, [] (file_writer& out, uint32_t value) { return write_vint(out, value); }); + }); } else { write_cell_value(writer, *cdef.type, cell.value()); } @@ -2954,7 +2977,8 @@ void sstable_writer_m::write_liveness_info(file_writer& writer, const row_marker void sstable_writer_m::write_collection(file_writer& writer, const column_definition& cdef, collection_mutation_view collection, const row_time_properties& properties, bool has_complex_deletion) { auto& ctype = *static_pointer_cast(cdef.type); - auto mview = ctype.deserialize_mutation_form(collection); + collection.data.with_linearized([&] (bytes_view collection_bv) { + auto mview = ctype.deserialize_mutation_form(collection_bv); if (has_complex_deletion) { auto dt = to_deletion_time(mview.tomb); write_delta_deletion_time(writer, dt); @@ -2972,6 +2996,7 @@ void sstable_writer_m::write_collection(file_writer& writer, const column_defini ++_c_stats.cells_count; write_cell(writer, cell, cdef, properties, cell_path); } + }); } void sstable_writer_m::write_cells(file_writer& writer, column_kind kind, const row& row_body, @@ -3071,12 +3096,14 @@ static bool row_has_complex_deletion(const schema& s, const row& r) { return stop_iteration::no; } auto t = static_pointer_cast(cdef.type); - auto mview = t->deserialize_mutation_form(c.as_collection_mutation()); + return c.as_collection_mutation().data.with_linearized([&] (bytes_view c_bv) { + auto mview = t->deserialize_mutation_form(c_bv); if (mview.tomb) { result = true; } return stop_iteration(static_cast(mview.tomb)); }); + }); return result; } diff --git a/tests/counter_test.cc b/tests/counter_test.cc index d59fc2b9cf..2f1880c223 100644 --- 
a/tests/counter_test.cc +++ b/tests/counter_test.cc @@ -67,24 +67,28 @@ SEASTAR_TEST_CASE(test_counter_cell) { b1.add_shard(counter_shard(id[0], 5, 1)); b1.add_shard(counter_shard(id[1], -4, 1)); auto c1 = atomic_cell_or_collection(b1.build(0)); - - auto cv = counter_cell_view(c1.as_atomic_cell(cdef)); + + atomic_cell_or_collection c2; + counter_cell_view::with_linearized(c1.as_atomic_cell(cdef), [&] (counter_cell_view cv) { BOOST_REQUIRE_EQUAL(cv.total_value(), 1); verify_shard_order(cv); counter_cell_builder b2; b2.add_shard(counter_shard(*cv.get_shard(id[0])).update(2, 1)); b2.add_shard(counter_shard(id[2], 1, 1)); - auto c2 = atomic_cell_or_collection(b2.build(0)); + c2 = atomic_cell_or_collection(b2.build(0)); + }); - cv = counter_cell_view(c2.as_atomic_cell(cdef)); + counter_cell_view::with_linearized(c2.as_atomic_cell(cdef), [&] (counter_cell_view cv) { BOOST_REQUIRE_EQUAL(cv.total_value(), 8); verify_shard_order(cv); + }); counter_cell_view::apply(cdef, c1, c2); - cv = counter_cell_view(c1.as_atomic_cell(cdef)); + counter_cell_view::with_linearized(c1.as_atomic_cell(cdef), [&] (counter_cell_view cv) { BOOST_REQUIRE_EQUAL(cv.total_value(), 4); verify_shard_order(cv); + }); }); } @@ -97,9 +101,10 @@ SEASTAR_TEST_CASE(test_apply) { auto src = b.copy(*cdef.type); counter_cell_view::apply(cdef, dst, src); - auto cv = counter_cell_view(dst.as_atomic_cell(cdef)); + counter_cell_view::with_linearized(dst.as_atomic_cell(cdef), [&] (counter_cell_view cv) { BOOST_REQUIRE_EQUAL(cv.total_value(), value); BOOST_REQUIRE_EQUAL(cv.timestamp(), std::max(dst.as_atomic_cell(cdef).timestamp(), src.as_atomic_cell(cdef).timestamp())); + }); }; auto id = generate_ids(5); @@ -235,15 +240,17 @@ SEASTAR_TEST_CASE(test_counter_mutations) { m.apply(m2); auto ac = get_counter_cell(m); BOOST_REQUIRE(ac.is_live()); - counter_cell_view ccv { ac }; + counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) { BOOST_REQUIRE_EQUAL(ccv.total_value(), -102); verify_shard_order(ccv); + 
}); ac = get_static_counter_cell(m); BOOST_REQUIRE(ac.is_live()); - ccv = counter_cell_view(ac); + counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) { BOOST_REQUIRE_EQUAL(ccv.total_value(), 20); verify_shard_order(ccv); + }); m.apply(m3); ac = get_counter_cell(m); @@ -263,28 +270,32 @@ SEASTAR_TEST_CASE(test_counter_mutations) { m = mutation(s, m1.decorated_key(), m1.partition().difference(s, m2.partition())); ac = get_counter_cell(m); BOOST_REQUIRE(ac.is_live()); - ccv = counter_cell_view(ac); + counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) { BOOST_REQUIRE_EQUAL(ccv.total_value(), 2); verify_shard_order(ccv); + }); ac = get_static_counter_cell(m); BOOST_REQUIRE(ac.is_live()); - ccv = counter_cell_view(ac); + counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) { BOOST_REQUIRE_EQUAL(ccv.total_value(), 11); verify_shard_order(ccv); + }); m = mutation(s, m1.decorated_key(), m2.partition().difference(s, m1.partition())); ac = get_counter_cell(m); BOOST_REQUIRE(ac.is_live()); - ccv = counter_cell_view(ac); + counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) { BOOST_REQUIRE_EQUAL(ccv.total_value(), -105); verify_shard_order(ccv); + }); ac = get_static_counter_cell(m); BOOST_REQUIRE(ac.is_live()); - ccv = counter_cell_view(ac); + counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) { BOOST_REQUIRE_EQUAL(ccv.total_value(), 9); verify_shard_order(ccv); + }); m = mutation(s, m1.decorated_key(), m1.partition().difference(s, m3.partition())); BOOST_REQUIRE_EQUAL(m.partition().clustered_rows().calculate_size(), 0); @@ -420,30 +431,34 @@ SEASTAR_TEST_CASE(test_transfer_updates_to_shards) { auto ac = get_counter_cell(m); BOOST_REQUIRE(ac.is_live()); - auto ccv = counter_cell_view(ac); + counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) { BOOST_REQUIRE_EQUAL(ccv.total_value(), 5); verify_shard_order(ccv); + }); ac = get_static_counter_cell(m); BOOST_REQUIRE(ac.is_live()); - ccv = 
counter_cell_view(ac); + counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) { BOOST_REQUIRE_EQUAL(ccv.total_value(), 4); verify_shard_order(ccv); + }); m = m2; transform_counter_updates_to_shards(m, &m0, 0); ac = get_counter_cell(m); BOOST_REQUIRE(ac.is_live()); - ccv = counter_cell_view(ac); + counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) { BOOST_REQUIRE_EQUAL(ccv.total_value(), 14); verify_shard_order(ccv); + }); ac = get_static_counter_cell(m); BOOST_REQUIRE(ac.is_live()); - ccv = counter_cell_view(ac); + counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) { BOOST_REQUIRE_EQUAL(ccv.total_value(), 12); verify_shard_order(ccv); + }); m = m3; transform_counter_updates_to_shards(m, &m0, 0); @@ -502,13 +517,14 @@ SEASTAR_TEST_CASE(test_sanitize_corrupted_cells) { auto c2 = atomic_cell_or_collection(b2.build(0)); // Compare - auto cv1 = counter_cell_view(c1.as_atomic_cell(cdef)); - auto cv2 = counter_cell_view(c2.as_atomic_cell(cdef)); - + counter_cell_view::with_linearized(c1.as_atomic_cell(cdef), [&] (counter_cell_view cv1) { + counter_cell_view::with_linearized(c2.as_atomic_cell(cdef), [&] (counter_cell_view cv2) { BOOST_REQUIRE_EQUAL(cv1, cv2); BOOST_REQUIRE_EQUAL(cv1.total_value(), cv2.total_value()); verify_shard_order(cv1); verify_shard_order(cv2); + }); + }); } }); } @@ -561,7 +577,7 @@ SEASTAR_TEST_CASE(test_shards_compatible_with_1_7_4) { } auto ac = atomic_cell_or_collection(ccb.build(0)); - auto cv = counter_cell_view(ac.as_atomic_cell(cdef)); + counter_cell_view::with_linearized(ac.as_atomic_cell(cdef), [&] (counter_cell_view cv) { verify_shard_order(cv); @@ -573,6 +589,7 @@ SEASTAR_TEST_CASE(test_shards_compatible_with_1_7_4) { } previous = cs.id(); } + }); }); } diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc index 419168ad8d..da2c81956a 100644 --- a/tests/cql_test_env.cc +++ b/tests/cql_test_env.cc @@ -228,12 +228,11 @@ public: if (!col_def->type->is_multi_cell()) { auto c = 
cell->as_atomic_cell(*col_def); assert(c.is_live()); - actual = { c.value().begin(), c.value().end() }; + actual = c.value().linearize(); } else { auto c = cell->as_collection_mutation(); auto type = dynamic_pointer_cast(col_def->type); - actual = type->to_value(type->deserialize_mutation_form(c), - cql_serialization_format::internal()); + actual = type->to_value(c, cql_serialization_format::internal()); } assert(col_def->type->equal(actual, exp)); }); diff --git a/tests/flat_mutation_reader_assertions.hh b/tests/flat_mutation_reader_assertions.hh index 028f36b732..0f3dc63628 100644 --- a/tests/flat_mutation_reader_assertions.hh +++ b/tests/flat_mutation_reader_assertions.hh @@ -116,7 +116,7 @@ public: BOOST_FAIL(sprint("Expected static row with column %s, but it is not present", columns[i].name)); } auto& cdef = _reader.schema()->static_column_at(columns[i].id); - auto cmp = compare_unsigned(columns[i].value, cell->as_atomic_cell(cdef).value()); + auto cmp = compare_unsigned(columns[i].value, cell->as_atomic_cell(cdef).value().linearize()); if (cmp != 0) { BOOST_FAIL(sprint("Expected static row with column %s having value %s, but it has value %s", columns[i].name, @@ -150,7 +150,7 @@ public: BOOST_FAIL(sprint("Expected row with column %s, but it is not present", columns[i].name)); } auto& cdef = _reader.schema()->regular_column_at(columns[i].id); - auto cmp = compare_unsigned(columns[i].value, cell->as_atomic_cell(cdef).value()); + auto cmp = compare_unsigned(columns[i].value, cell->as_atomic_cell(cdef).value().linearize()); if (cmp != 0) { BOOST_FAIL(sprint("Expected row with column %s having value %s, but it has value %s", columns[i].name, diff --git a/tests/mutation_test.cc b/tests/mutation_test.cc index a09a478296..7aef592063 100644 --- a/tests/mutation_test.cc +++ b/tests/mutation_test.cc @@ -75,6 +75,11 @@ static atomic_cell make_atomic_cell(data_type dt, T value) { return atomic_cell::make_live(*dt, 0, dt->decompose(std::move(value))); }; +template +static 
atomic_cell make_collection_member(data_type dt, T value) { + return atomic_cell::make_live(*dt, 0, dt->decompose(std::move(value))); +}; + static mutation_partition get_partition(memtable& mt, const partition_key& key) { auto dk = dht::global_partitioner().decorate_key(*mt.schema(), key); auto reader = mt.make_flat_reader(mt.schema(), dht::partition_range::make_singular(dk)); @@ -119,7 +124,7 @@ SEASTAR_TEST_CASE(test_mutation_is_applied) { BOOST_REQUIRE(i); auto cell = i->as_atomic_cell(r1_col); BOOST_REQUIRE(cell.is_live()); - BOOST_REQUIRE(int32_type->equal(cell.value(), int32_type->decompose(3))); + BOOST_REQUIRE(int32_type->equal(cell.value().linearize(), int32_type->decompose(3))); }); } @@ -207,19 +212,19 @@ SEASTAR_TEST_CASE(test_map_mutations) { auto mt = make_lw_shared(s); auto key = partition_key::from_exploded(*s, {to_bytes("key1")}); auto& column = *s->get_column_definition("s1"); - auto mmut1 = make_collection_mutation({}, int32_type->decompose(101), make_atomic_cell(utf8_type, sstring("101"))); + auto mmut1 = make_collection_mutation({}, int32_type->decompose(101), make_collection_member(utf8_type, sstring("101"))); mutation m1(s, key); m1.set_static_cell(column, my_map_type->serialize_mutation_form(mmut1)); mt->apply(m1); - auto mmut2 = make_collection_mutation({}, int32_type->decompose(102), make_atomic_cell(utf8_type, sstring("102"))); + auto mmut2 = make_collection_mutation({}, int32_type->decompose(102), make_collection_member(utf8_type, sstring("102"))); mutation m2(s, key); m2.set_static_cell(column, my_map_type->serialize_mutation_form(mmut2)); mt->apply(m2); - auto mmut3 = make_collection_mutation({}, int32_type->decompose(103), make_atomic_cell(utf8_type, sstring("103"))); + auto mmut3 = make_collection_mutation({}, int32_type->decompose(103), make_collection_member(utf8_type, sstring("103"))); mutation m3(s, key); m3.set_static_cell(column, my_map_type->serialize_mutation_form(mmut3)); mt->apply(m3); - auto mmut2o = 
make_collection_mutation({}, int32_type->decompose(102), make_atomic_cell(utf8_type, sstring("102 override"))); + auto mmut2o = make_collection_mutation({}, int32_type->decompose(102), make_collection_member(utf8_type, sstring("102 override"))); mutation m2o(s, key); m2o.set_static_cell(column, my_map_type->serialize_mutation_form(mmut2o)); mt->apply(m2o); @@ -229,7 +234,8 @@ SEASTAR_TEST_CASE(test_map_mutations) { auto i = r.find_cell(column.id); BOOST_REQUIRE(i); auto cell = i->as_collection_mutation(); - auto muts = my_map_type->deserialize_mutation_form(cell); + auto cell_b = cell.data.linearize(); + auto muts = my_map_type->deserialize_mutation_form(cell_b); BOOST_REQUIRE(muts.cells.size() == 3); // FIXME: more strict tests }); @@ -265,7 +271,8 @@ SEASTAR_TEST_CASE(test_set_mutations) { auto i = r.find_cell(column.id); BOOST_REQUIRE(i); auto cell = i->as_collection_mutation(); - auto muts = my_set_type->deserialize_mutation_form(cell); + auto cell_b = cell.data.linearize(); + auto muts = my_set_type->deserialize_mutation_form(cell_b); BOOST_REQUIRE(muts.cells.size() == 3); // FIXME: more strict tests }); @@ -280,19 +287,19 @@ SEASTAR_TEST_CASE(test_list_mutations) { auto key = partition_key::from_exploded(*s, {to_bytes("key1")}); auto& column = *s->get_column_definition("s1"); auto make_key = [] { return timeuuid_type->decompose(utils::UUID_gen::get_time_UUID()); }; - auto mmut1 = make_collection_mutation({}, make_key(), make_atomic_cell(int32_type, 101)); + auto mmut1 = make_collection_mutation({}, make_key(), make_collection_member(int32_type, 101)); mutation m1(s, key); m1.set_static_cell(column, my_list_type->serialize_mutation_form(mmut1)); mt->apply(m1); - auto mmut2 = make_collection_mutation({}, make_key(), make_atomic_cell(int32_type, 102)); + auto mmut2 = make_collection_mutation({}, make_key(), make_collection_member(int32_type, 102)); mutation m2(s, key); m2.set_static_cell(column, my_list_type->serialize_mutation_form(mmut2)); mt->apply(m2); - 
auto mmut3 = make_collection_mutation({}, make_key(), make_atomic_cell(int32_type, 103)); + auto mmut3 = make_collection_mutation({}, make_key(), make_collection_member(int32_type, 103)); mutation m3(s, key); m3.set_static_cell(column, my_list_type->serialize_mutation_form(mmut3)); mt->apply(m3); - auto mmut2o = make_collection_mutation({}, make_key(), make_atomic_cell(int32_type, 102)); + auto mmut2o = make_collection_mutation({}, make_key(), make_collection_member(int32_type, 102)); mutation m2o(s, key); m2o.set_static_cell(column, my_list_type->serialize_mutation_form(mmut2o)); mt->apply(m2o); @@ -302,7 +309,8 @@ SEASTAR_TEST_CASE(test_list_mutations) { auto i = r.find_cell(column.id); BOOST_REQUIRE(i); auto cell = i->as_collection_mutation(); - auto muts = my_list_type->deserialize_mutation_form(cell); + auto cell_b = cell.data.linearize(); + auto muts = my_list_type->deserialize_mutation_form(cell_b); BOOST_REQUIRE(muts.cells.size() == 4); // FIXME: more strict tests }); @@ -347,7 +355,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) { BOOST_REQUIRE(i); auto cell = i->as_atomic_cell(r1_col); BOOST_REQUIRE(cell.is_live()); - BOOST_REQUIRE(int32_type->equal(cell.value(), int32_type->decompose(r1))); + BOOST_REQUIRE(int32_type->equal(cell.value().linearize(), int32_type->decompose(r1))); } }; verify_row(1001, 2001); @@ -485,7 +493,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) { auto c1 = value_cast(int32_type->deserialize(re.key().explode(*s)[0])); auto cell = re.row().cells().find_cell(r1_col.id); if (cell) { - result[p1][c1] = value_cast(int32_type->deserialize(cell->as_atomic_cell(r1_col).value())); + result[p1][c1] = value_cast(int32_type->deserialize(cell->as_atomic_cell(r1_col).value().linearize())); } } return true; @@ -899,7 +907,8 @@ SEASTAR_TEST_CASE(test_mutation_diff) { BOOST_REQUIRE(m2_1.find_row(*s, ckey2)); BOOST_REQUIRE(m2_1.find_row(*s, ckey2)->find_cell(2)); auto cmv = m2_1.find_row(*s, 
ckey2)->find_cell(2)->as_collection_mutation(); - auto cm = my_set_type->deserialize_mutation_form(cmv); + auto cmv_b = cmv.data.linearize(); + auto cm = my_set_type->deserialize_mutation_form(cmv_b); BOOST_REQUIRE(cm.cells.size() == 1); BOOST_REQUIRE(cm.cells.front().first == int32_type->decompose(3)); @@ -916,7 +925,8 @@ SEASTAR_TEST_CASE(test_mutation_diff) { BOOST_REQUIRE(!m1_2.find_row(*s, ckey2)->find_cell(0)); BOOST_REQUIRE(!m1_2.find_row(*s, ckey2)->find_cell(1)); cmv = m1_2.find_row(*s, ckey2)->find_cell(2)->as_collection_mutation(); - cm = my_set_type->deserialize_mutation_form(cmv); + cmv_b = cmv.data.linearize(); + cm = my_set_type->deserialize_mutation_form(cmv_b); BOOST_REQUIRE(cm.cells.size() == 1); BOOST_REQUIRE(cm.cells.front().first == int32_type->decompose(2)); @@ -962,7 +972,7 @@ SEASTAR_TEST_CASE(test_large_blobs) { BOOST_REQUIRE(i); auto cell = i->as_atomic_cell(s1_col); BOOST_REQUIRE(cell.is_live()); - BOOST_REQUIRE(bytes_type->equal(cell.value(), bytes_type->decompose(data_value(blob1)))); + BOOST_REQUIRE(bytes_type->equal(cell.value().linearize(), bytes_type->decompose(data_value(blob1)))); // Stress managed_bytes::linearize and scatter by merging a value into the cell mutation m2(s, key); @@ -975,7 +985,7 @@ SEASTAR_TEST_CASE(test_large_blobs) { BOOST_REQUIRE(i2); auto cell2 = i2->as_atomic_cell(s1_col); BOOST_REQUIRE(cell2.is_live()); - BOOST_REQUIRE(bytes_type->equal(cell2.value(), bytes_type->decompose(data_value(blob2)))); + BOOST_REQUIRE(bytes_type->equal(cell2.value().linearize(), bytes_type->decompose(data_value(blob2)))); }); } diff --git a/tests/simple_schema.hh b/tests/simple_schema.hh index 7bbf08b657..698357fa13 100644 --- a/tests/simple_schema.hh +++ b/tests/simple_schema.hh @@ -114,7 +114,7 @@ public: if (!ac.is_live()) { throw std::runtime_error("cell is dead"); } - return std::make_pair(value_cast(utf8_type->deserialize(ac.value())), ac.timestamp()); + return 
std::make_pair(value_cast(utf8_type->deserialize(ac.value().linearize())), ac.timestamp()); } mutation_fragment make_row(const clustering_key& key, sstring v) { diff --git a/tests/sstable_datafile_test.cc b/tests/sstable_datafile_test.cc index 4cefa3e0c3..94ae3b8dd0 100644 --- a/tests/sstable_datafile_test.cc +++ b/tests/sstable_datafile_test.cc @@ -823,7 +823,8 @@ SEASTAR_TEST_CASE(datafile_generation_11) { auto cell = r->find_cell(set_col.id); BOOST_REQUIRE(cell); auto t = static_pointer_cast(set_col.type); - return t->deserialize_mutation_form(cell->as_collection_mutation()); + auto bv = cell->as_collection_mutation().data.linearize(); + return t->deserialize_mutation_form(bv).materialize(*t); }; auto sst = make_sstable(s, tmpdir_path, 11, la, big); @@ -832,7 +833,7 @@ SEASTAR_TEST_CASE(datafile_generation_11) { return do_with(make_dkey(s, "key1"), [sstp, s, verifier, tomb, &static_set_col] (auto& key) { auto rd = make_lw_shared(sstp->read_row_flat(s, key)); return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, verifier, tomb, &static_set_col, rd] (auto mutation) { - auto verify_set = [&tomb] (auto m) { + auto verify_set = [&tomb] (const collection_type_impl::mutation& m) { BOOST_REQUIRE(bool(m.tomb) == true); BOOST_REQUIRE(m.tomb == tomb); BOOST_REQUIRE(m.cells.size() == 3); @@ -849,7 +850,8 @@ SEASTAR_TEST_CASE(datafile_generation_11) { // The static set auto t = static_pointer_cast(static_set_col.type); - auto mut = t->deserialize_mutation_form(scol->as_collection_mutation()); + auto bv = scol->as_collection_mutation().data.linearize(); + auto mut = t->deserialize_mutation_form(bv).materialize(*t); verify_set(mut); // The clustered set @@ -2875,7 +2877,7 @@ SEASTAR_TEST_CASE(test_counter_read) { BOOST_REQUIRE(mfopt->is_clustering_row()); const clustering_row* cr = &mfopt->as_clustering_row(); cr->cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& c) { - counter_cell_view ccv { c.as_atomic_cell(s->regular_column_at(id)) }; + 
counter_cell_view::with_linearized(c.as_atomic_cell(s->regular_column_at(id)), [&] (counter_cell_view ccv) { auto& col = s->column_at(column_kind::regular_column, id); if (col.name_as_text() == "c1") { BOOST_REQUIRE_EQUAL(ccv.total_value(), 13); @@ -2896,6 +2898,7 @@ SEASTAR_TEST_CASE(test_counter_read) { } else { BOOST_FAIL(sprint("Unexpected column \'%s\'", col.name_as_text())); } + }); }); mfopt = reader().get0(); @@ -4373,11 +4376,12 @@ SEASTAR_TEST_CASE(test_wrong_counter_shard_order) { size_t n = 0; row.cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& ac_o_c) { auto acv = ac_o_c.as_atomic_cell(s->regular_column_at(id)); - auto ccv = counter_cell_view(acv); + counter_cell_view::with_linearized(acv, [&] (counter_cell_view ccv) { counter_shard_view::less_compare_by_id cmp; BOOST_REQUIRE_MESSAGE(boost::algorithm::is_sorted(ccv.shards(), cmp), ccv << " is expected to be sorted"); BOOST_REQUIRE_EQUAL(ccv.total_value(), expected_value); n++; + }); }); BOOST_REQUIRE_EQUAL(n, 5); }; diff --git a/tests/sstable_test.hh b/tests/sstable_test.hh index 0d18f631cd..cb67463ce3 100644 --- a/tests/sstable_test.hh +++ b/tests/sstable_test.hh @@ -559,7 +559,9 @@ inline void match(const row& row, const schema& s, bytes col, const data_value& } auto expected = cdef->type->decompose(value); - BOOST_REQUIRE(c.value() == expected); + auto val = c.value().linearize(); + assert(val == expected); + BOOST_REQUIRE(c.value().linearize() == expected); if (timestamp) { BOOST_REQUIRE(c.timestamp() == timestamp); } @@ -592,9 +594,11 @@ match_collection(const row& row, const schema& s, bytes col, const tombstone& t) BOOST_CHECK_NO_THROW(row.cell_at(cdef->id)); auto c = row.cell_at(cdef->id).as_collection_mutation(); auto ctype = static_pointer_cast(cdef->type); - auto&& mut = ctype->deserialize_mutation_form(c); + return c.data.with_linearized([&] (bytes_view c_bv) { + auto&& mut = ctype->deserialize_mutation_form(c_bv); BOOST_REQUIRE(mut.tomb == t); return 
mut.materialize(*ctype); + }); } template @@ -612,7 +616,7 @@ inline void match_collection_element(const std::pair& elemen // the schema for the set type, and is enough for the purposes of this // test. if (expected_serialized_value) { - BOOST_REQUIRE(element.second.value() == *expected_serialized_value); + BOOST_REQUIRE(element.second.value().linearize() == *expected_serialized_value); } } diff --git a/types.cc b/types.cc index bfc7594002..4c951de5a1 100644 --- a/types.cc +++ b/types.cc @@ -2098,7 +2098,9 @@ collection_type_impl::as_cql3_type() const { bytes collection_type_impl::to_value(collection_mutation_view mut, cql_serialization_format sf) const { - return to_value(deserialize_mutation_form(mut), sf); + return mut.data.with_linearized([&] (bytes_view bv) { + return to_value(deserialize_mutation_form(bv), sf); + }); } collection_type_impl::mutation @@ -2422,12 +2424,19 @@ map_type_impl::serialized_values(std::vector cells) const { bytes map_type_impl::to_value(mutation_view mut, cql_serialization_format sf) const { + std::vector linearized; std::vector tmp; tmp.reserve(mut.cells.size() * 2); for (auto&& e : mut.cells) { if (e.second.is_live(mut.tomb, false)) { tmp.emplace_back(e.first); - tmp.emplace_back(e.second.value()); + auto value_view = e.second.value(); + if (value_view.is_fragmented()) { + auto& v = linearized.emplace_back(value_view.linearize()); + tmp.emplace_back(v); + } else { + tmp.emplace_back(value_view.first_fragment()); + } } } return pack(tmp.begin(), tmp.end(), tmp.size() / 2, sf); @@ -2477,8 +2486,7 @@ bool map_type_impl::references_duration() const { return _keys->references_duration() || _values->references_duration(); } -auto collection_type_impl::deserialize_mutation_form(collection_mutation_view cm) const -> mutation_view { - auto&& in = cm.data; +auto collection_type_impl::deserialize_mutation_form(bytes_view in) const -> mutation_view { mutation_view ret; auto has_tomb = read_simple(in); if (has_tomb) { @@ -2502,13 +2510,14 @@ 
auto collection_type_impl::deserialize_mutation_form(collection_mutation_view cm } bool collection_type_impl::is_empty(collection_mutation_view cm) const { - auto&& in = cm.data; + return cm.data.with_linearized([&] (bytes_view in) { // FIXME: we can guarantee that this is in the first fragment auto has_tomb = read_simple(in); return !has_tomb && read_simple(in) == 0; + }); } bool collection_type_impl::is_any_live(collection_mutation_view cm, tombstone tomb, gc_clock::time_point now) const { - auto&& in = cm.data; + return cm.data.with_linearized([&] (bytes_view in) { auto has_tomb = read_simple(in); if (has_tomb) { auto ts = read_simple(in); @@ -2526,10 +2535,11 @@ bool collection_type_impl::is_any_live(collection_mutation_view cm, tombstone to } } return false; + }); } api::timestamp_type collection_type_impl::last_update(collection_mutation_view cm) const { - auto&& in = cm.data; + return cm.data.with_linearized([&] (bytes_view in) { api::timestamp_type max = api::missing_timestamp; auto has_tomb = read_simple(in); if (has_tomb) { @@ -2545,11 +2555,13 @@ api::timestamp_type collection_type_impl::last_update(collection_mutation_view c max = std::max(value.timestamp(), max); } return max; + }); } template collection_mutation do_serialize_mutation_form( + const collection_type_impl& ctype, const tombstone& tomb, boost::iterator_range cells) { auto element_size = [] (size_t c, auto&& e) -> size_t { @@ -2577,9 +2589,10 @@ do_serialize_mutation_form( auto&& k = kv.first; auto&& v = kv.second; writeb(k); + writeb(v.serialize()); } - return collection_mutation{std::move(ret)}; + return collection_mutation(std::move(ret)); } bool collection_type_impl::mutation::compact_and_expire(row_tombstone base_tomb, gc_clock::time_point query_time, @@ -2619,26 +2632,28 @@ bool collection_type_impl::mutation::compact_and_expire(row_tombstone base_tomb, } collection_mutation -collection_type_impl::serialize_mutation_form(const mutation& mut) { - return 
do_serialize_mutation_form(mut.tomb, boost::make_iterator_range(mut.cells.begin(), mut.cells.end())); +collection_type_impl::serialize_mutation_form(const mutation& mut) const { + return do_serialize_mutation_form(*this, mut.tomb, boost::make_iterator_range(mut.cells.begin(), mut.cells.end())); } collection_mutation -collection_type_impl::serialize_mutation_form(mutation_view mut) { - return do_serialize_mutation_form(mut.tomb, boost::make_iterator_range(mut.cells.begin(), mut.cells.end())); +collection_type_impl::serialize_mutation_form(mutation_view mut) const { + return do_serialize_mutation_form(*this, mut.tomb, boost::make_iterator_range(mut.cells.begin(), mut.cells.end())); } collection_mutation -collection_type_impl::serialize_mutation_form_only_live(mutation_view mut, gc_clock::time_point now) { - return do_serialize_mutation_form(mut.tomb, mut.cells | boost::adaptors::filtered([t = mut.tomb, now] (auto&& e) { +collection_type_impl::serialize_mutation_form_only_live(mutation_view mut, gc_clock::time_point now) const { + return do_serialize_mutation_form(*this, mut.tomb, mut.cells | boost::adaptors::filtered([t = mut.tomb, now] (auto&& e) { return e.second.is_live(t, now, false); })); } collection_mutation collection_type_impl::merge(collection_mutation_view a, collection_mutation_view b) const { - auto aa = deserialize_mutation_form(a); - auto bb = deserialize_mutation_form(b); + return a.data.with_linearized([&] (bytes_view a_in) { + return b.data.with_linearized([&] (bytes_view b_in) { + auto aa = deserialize_mutation_form(a_in); + auto bb = deserialize_mutation_form(b_in); mutation_view merged; merged.cells.reserve(aa.cells.size() + bb.cells.size()); using element_type = std::pair; @@ -2672,13 +2687,17 @@ collection_type_impl::merge(collection_mutation_view a, collection_mutation_view merge); merged.tomb = std::max(aa.tomb, bb.tomb); return serialize_mutation_form(merged); + }); + }); } collection_mutation 
collection_type_impl::difference(collection_mutation_view a, collection_mutation_view b) const { - auto aa = deserialize_mutation_form(a); - auto bb = deserialize_mutation_form(b); + return a.data.with_linearized([&] (bytes_view a_in) { + return b.data.with_linearized([&] (bytes_view b_in) { + auto aa = deserialize_mutation_form(a_in); + auto bb = deserialize_mutation_form(b_in); mutation_view diff; diff.cells.reserve(std::max(aa.cells.size(), bb.cells.size())); auto key_type = name_comparator(); @@ -2698,6 +2717,8 @@ collection_type_impl::difference(collection_mutation_view a, collection_mutation diff.tomb = aa.tomb; } return serialize_mutation_form(diff); + }); + }); } bytes_opt @@ -3166,11 +3187,18 @@ list_type_impl::serialized_values(std::vector cells) const { bytes list_type_impl::to_value(mutation_view mut, cql_serialization_format sf) const { + std::vector linearized; std::vector tmp; tmp.reserve(mut.cells.size()); for (auto&& e : mut.cells) { if (e.second.is_live(mut.tomb, false)) { - tmp.emplace_back(e.second.value()); + auto value_view = e.second.value(); + if (value_view.is_fragmented()) { + auto& v = linearized.emplace_back(value_view.linearize()); + tmp.emplace_back(v); + } else { + tmp.emplace_back(value_view.first_fragment()); + } } } return pack(tmp.begin(), tmp.end(), tmp.size(), sf); diff --git a/types.hh b/types.hh index 3d516a6384..45c78561be 100644 --- a/types.hh +++ b/types.hh @@ -818,26 +818,29 @@ public: virtual shared_ptr as_cql3_type() const override; template static bytes pack(BytesViewIterator start, BytesViewIterator finish, int elements, cql_serialization_format sf); - mutation_view deserialize_mutation_form(collection_mutation_view in) const; + // requires linearized collection_mutation_view, lifetime + mutation_view deserialize_mutation_form(bytes_view in) const; bool is_empty(collection_mutation_view in) const; bool is_any_live(collection_mutation_view in, tombstone tomb = tombstone(), gc_clock::time_point now = 
gc_clock::time_point::min()) const; api::timestamp_type last_update(collection_mutation_view in) const; virtual bytes to_value(mutation_view mut, cql_serialization_format sf) const = 0; bytes to_value(collection_mutation_view mut, cql_serialization_format sf) const; // FIXME: use iterators? - static collection_mutation serialize_mutation_form(const mutation& mut); - static collection_mutation serialize_mutation_form(mutation_view mut); - static collection_mutation serialize_mutation_form_only_live(mutation_view mut, gc_clock::time_point now); + collection_mutation serialize_mutation_form(const mutation& mut) const; + collection_mutation serialize_mutation_form(mutation_view mut) const; + collection_mutation serialize_mutation_form_only_live(mutation_view mut, gc_clock::time_point now) const; collection_mutation merge(collection_mutation_view a, collection_mutation_view b) const; collection_mutation difference(collection_mutation_view a, collection_mutation_view b) const; // Calls Func(atomic_cell_view) for each cell in this collection. // noexcept if Func doesn't throw. template void for_each_cell(collection_mutation_view c, Func&& func) const { - auto m_view = deserialize_mutation_form(std::move(c)); + c.data.with_linearized([&] (bytes_view c_bv) { + auto m_view = deserialize_mutation_form(c_bv); for (auto&& c : m_view.cells) { func(std::move(c.second)); } + }); } virtual void serialize(const void* value, bytes::iterator& out, cql_serialization_format sf) const = 0; virtual data_value deserialize(bytes_view v, cql_serialization_format sf) const = 0;