mutation/collection_mutation: add collection_mutation_writer

Create a serialized collection element-by-element, with size not known
up-front. Appended elements are written into the buffer directly.
Allows replacing the current practice of first creating a
vector of cells, then serializing at the end.
This commit is contained in:
Botond Dénes
2026-03-18 12:22:45 +02:00
parent b5a301e78a
commit 11d7128bf6
2 changed files with 81 additions and 0 deletions

View File

@@ -372,6 +372,54 @@ collection_mutation read_from_collection_cell_view(const abstract_type& type, co
return serialize_collection_mutation<serialized_cell_adaptor>(tomb, std::ranges::subrange(cells.begin(), cells.end()));
}
collection_mutation_writer::collection_mutation_writer(::tombstone tomb) : _tomb(tomb) {
auto oit = _out.write_begin();
write<uint8_t>(oit, uint8_t(bool(tomb)));
if (tomb) {
write<int64_t>(oit, tomb.timestamp);
write<int64_t>(oit, tomb.deletion_time.time_since_epoch().count());
}
_size_buffer = _out.write_place_holder<int32_t>().ptr;
}
void collection_mutation_writer::push_back(managed_bytes_view key, atomic_cell_view value) {
{
auto oit = _out.write_begin();
write<int32_t>(oit, key.size());
_out.write(key);
}
{
auto oit = _out.write_begin();
const auto value_bytes = value.serialize();
write<int32_t>(oit, value_bytes.size());
_out.write(value_bytes);
}
++_size;
}
collection_mutation collection_mutation_writer::finish() && {
if (empty()) {
return {};
}
write<int32_t>(_size_buffer, _size);
// Force a copy of the serialized collection, for 2 reasons:
// * Shrink the allocated memory to actually needed size.
// * Ensure we are using the correct allocator: bytes_ostream uses the
// standard allocator, but we may be in an LSA context here.
auto& outer_allocator = current_allocator();
return with_allocator(standard_allocator(), [&] {
// bytes_ostream uses malloc(), so we need to force standard
// allocator context here when the buffer is destroyed.
const auto tmp = std::move(_out).to_managed_bytes();
return with_allocator(outer_allocator, [&] {
return collection_mutation(tmp);
});
});
}
template <typename C>
requires std::is_base_of_v<abstract_type, std::remove_reference_t<C>>
static collection_mutation_view_description

View File

@@ -130,6 +130,39 @@ public:
operator collection_mutation_view() const;
};
class collection_mutation_writer {
public:
using value_type = std::pair<managed_bytes_view, atomic_cell_view>;
private:
bytes_ostream _out;
bytes::value_type* _size_buffer;
tombstone _tomb;
int32_t _size{0};
public:
explicit collection_mutation_writer(tombstone tomb);
bool empty() const {
return !_tomb && _size == 0;
}
tombstone tombstone() const {
return _tomb;
}
void push_back(managed_bytes_view key, atomic_cell_view value);
void push_back(managed_bytes_view key, atomic_cell value) {
push_back(std::move(key), atomic_cell_view(value));
}
void push_back(value_type kv) {
push_back(std::move(kv.first), std::move(kv.second));
}
collection_mutation finish() &&;
};
collection_mutation merge(const abstract_type&, collection_mutation_view, collection_mutation_view);
collection_mutation difference(const abstract_type&, collection_mutation_view, collection_mutation_view);