From 11d7128bf6c32ec68b783b56eca1eeeeee5084b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 18 Mar 2026 12:22:45 +0200 Subject: [PATCH] mutation/collection_mutation: add collection_mutation_writer Create a serialized collection element-by-element, with size not known up-front. Appended elements are written into the buffer directly. Allows replacing the current practice of first creating a vector of cells, then serializing at the end. --- mutation/collection_mutation.cc | 48 +++++++++++++++++++++++++++++++++ mutation/collection_mutation.hh | 33 +++++++++++++++++++++++ 2 files changed, 81 insertions(+) diff --git a/mutation/collection_mutation.cc b/mutation/collection_mutation.cc index b297c55b15..6c7dc896a9 100644 --- a/mutation/collection_mutation.cc +++ b/mutation/collection_mutation.cc @@ -372,6 +372,54 @@ collection_mutation read_from_collection_cell_view(const abstract_type& type, co return serialize_collection_mutation(tomb, std::ranges::subrange(cells.begin(), cells.end())); } +collection_mutation_writer::collection_mutation_writer(::tombstone tomb) : _tomb(tomb) { + auto oit = _out.write_begin(); + write(oit, uint8_t(bool(tomb))); + if (tomb) { + write(oit, tomb.timestamp); + write(oit, tomb.deletion_time.time_since_epoch().count()); + } + _size_buffer = _out.write_place_holder().ptr; +} + +void collection_mutation_writer::push_back(managed_bytes_view key, atomic_cell_view value) { + { + auto oit = _out.write_begin(); + write(oit, key.size()); + _out.write(key); + } + + { + auto oit = _out.write_begin(); + const auto value_bytes = value.serialize(); + write(oit, value_bytes.size()); + _out.write(value_bytes); + } + + ++_size; +} + +collection_mutation collection_mutation_writer::finish() && { + if (empty()) { + return {}; + } + + write(_size_buffer, _size); + // Force a copy of the serialized collection, for 2 reasons: + // * Shrink the allocated memory to actually needed size. + // * Ensure we are using the correct allocator: bytes_ostream uses the + // standard allocator, but we may be in an LSA context here. + auto& outer_allocator = current_allocator(); + return with_allocator(standard_allocator(), [&] { + // bytes_ostream uses malloc(), so we need to force standard + // allocator context here when the buffer is destroyed. + const auto tmp = std::move(_out).to_managed_bytes(); + return with_allocator(outer_allocator, [&] { + return collection_mutation(tmp); + }); + }); +} + template requires std::is_base_of_v> static collection_mutation_view_description diff --git a/mutation/collection_mutation.hh b/mutation/collection_mutation.hh index c5343995e0..e0c5f94314 100644 --- a/mutation/collection_mutation.hh +++ b/mutation/collection_mutation.hh @@ -130,6 +130,39 @@ public: operator collection_mutation_view() const; }; +class collection_mutation_writer { +public: + using value_type = std::pair; + +private: + bytes_ostream _out; + bytes::value_type* _size_buffer; + + tombstone _tomb; + int32_t _size{0}; +public: + explicit collection_mutation_writer(tombstone tomb); + + bool empty() const { + return !_tomb && _size == 0; + } + + tombstone tombstone() const { + return _tomb; + } + + void push_back(managed_bytes_view key, atomic_cell_view value); + void push_back(managed_bytes_view key, atomic_cell value) { + push_back(std::move(key), atomic_cell_view(value)); + } + + void push_back(value_type kv) { + push_back(std::move(kv.first), std::move(kv.second)); + } + + collection_mutation finish() &&; +}; + collection_mutation merge(const abstract_type&, collection_mutation_view, collection_mutation_view); collection_mutation difference(const abstract_type&, collection_mutation_view, collection_mutation_view);