Files
scylladb/mutation/mutation_partition_serializer.cc
Avi Kivity ecb6fb00f0 streamed_mutation_freezer: use chunked_vector instead of std::deque for clustering rows
The streamed_mutation_freezer class uses a deque to avoid large
allocations, but fails as seen in the referenced issue when the
vector backing the deque grows too large. This may be a problem
in itself, but the issue doesn't provide enough information to tell.

Fix the immediate problem by switching to chunked_vector, which
is better at avoiding large allocations. We do lose some early freeing
of memory in serialize_mutation_fragments(), but since most of the memory
should be in the clustering rows themselves, not in the deque/chunked_vector
holding them, this should not be a problem.

Fixes #28275

Closes scylladb/scylladb#28281
2026-01-21 10:13:44 +02:00

316 lines
14 KiB
C++

/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "mutation_partition_serializer.hh"
#include "mutation_partition.hh"
#include "counters.hh"
#include "idl/mutation.dist.impl.hh"
#include "frozen_mutation.hh"
#include <seastar/coroutine/maybe_yield.hh>
using namespace db;
// File-local helpers that serialize individual cells, a row's cell list and
// row markers through the IDL-generated writer chains. Each helper consumes
// the writer it is given (std::move) and returns the writer state that follows
// the serialized item, so the caller can continue the chain.
namespace {
// Serializes a live atomic cell as its timestamp (created_at) followed by its
// value. Callers in this file route live cells that have a TTL to
// write_expiring_cell() instead.
template<typename Writer>
auto write_live_cell(Writer&& writer, atomic_cell_view c)
{
return std::move(writer).write_created_at(c.timestamp())
.write_fragmented_value(fragment_range(c.value()))
.end_live_cell();
}
// Serializes a counter cell: its timestamp, then either the pending delta
// (when the cell is a counter update) or the full list of counter shards.
template<typename Writer>
auto write_counter_cell(Writer&& writer, atomic_cell_view c)
{
auto value = std::move(writer).write_created_at(c.timestamp());
// Immediately-invoked lambda so that both branches produce one writer value
// on which end_counter_cell() is called. Capturing `c` by reference is safe
// because the lambda runs before this function returns.
return [&c, value = std::move(value)] () mutable {
if (c.is_counter_update()) {
auto delta = c.counter_update_value();
return std::move(value).start_value_counter_cell_update()
.write_delta(delta)
.end_counter_cell_update();
} else {
auto ccv = counter_cell_view(c);
auto shards = std::move(value).start_value_counter_cell_full()
.start_shards();
for (auto csv : ccv.shards()) {
shards.add_shards(counter_shard(csv));
}
return std::move(shards).end_shards().end_counter_cell_full();
}
}().end_counter_cell();
}
// Serializes a live cell that carries a TTL: ttl and expiry first, then a
// nested live-cell payload (timestamp and value) in the `c` field.
template<typename Writer>
auto write_expiring_cell(Writer&& writer, atomic_cell_view c)
{
return std::move(writer).write_ttl(c.ttl())
.write_expiry(c.expiry())
.start_c()
.write_created_at(c.timestamp())
.write_fragmented_value(fragment_range(c.value()))
.end_c()
.end_expiring_cell();
}
// Serializes a dead cell as a tombstone (timestamp + deletion time).
template<typename Writer>
auto write_dead_cell(Writer&& writer, atomic_cell_view c)
{
return std::move(writer).start_tomb()
.write_timestamp(c.timestamp())
.write_deletion_time(c.deletion_time())
.end_tomb()
.end_dead_cell();
}
// Serializes a non-atomic (collection) cell. `cmv` is deserialized using
// `def.type`; the collection-level tombstone is written first, then one element
// per (key, cell) pair, each classified as dead, expiring (live with TTL) or
// plain live. The result is whatever with_deserialized() returns from the
// lambda, i.e. the writer after end_collection_cell().
template<typename Writer>
auto write_collection_cell(Writer&& collection_writer, collection_mutation_view cmv, const column_definition& def)
{
return cmv.with_deserialized(*def.type, [&] (collection_mutation_view_description m_view) {
auto cells_writer = std::move(collection_writer).write_tomb(m_view.tomb).start_elements();
for (auto&& c : m_view.cells) {
// c.first is the element key, c.second the element's atomic cell.
auto cell_writer = cells_writer.add().write_key(c.first);
if (!c.second.is_live()) {
write_dead_cell(std::move(cell_writer).start_value_dead_cell(), c.second).end_collection_element();
} else if (c.second.is_live_and_has_ttl()) {
write_expiring_cell(std::move(cell_writer).start_value_expiring_cell(), c.second).end_collection_element();
} else {
write_live_cell(std::move(cell_writer).start_value_live_cell(), c.second).end_collection_element();
}
}
return std::move(cells_writer).end_elements().end_collection_cell();
});
}
// Serializes every cell of `r` as a list of (column id, value) entries.
// Column definitions are resolved from `s` by `kind` and column id. Atomic
// columns are written as a variant (checked in order: dead, counter, expiring,
// live); all other columns are written as collection cells.
// Returns the writer after end_columns().
template<typename Writer>
auto write_row_cells(Writer&& writer, const row& r, const schema& s, column_kind kind)
{
auto column_writer = std::move(writer).start_columns();
r.for_each_cell([&] (column_id id, const atomic_cell_or_collection& cell) {
auto& def = s.column_at(kind, id);
auto cell_or_collection_writer = column_writer.add().write_id(id);
if (def.is_atomic()) {
auto&& c = cell.as_atomic_cell(def);
auto cell_writer = std::move(cell_or_collection_writer).start_c_variant();
// Dead is checked before counter, so a dead counter cell is
// written as a plain dead cell.
if (!c.is_live()) {
write_dead_cell(std::move(cell_writer).start_variant_dead_cell(), c).end_variant().end_column();
} else if (def.is_counter()) {
write_counter_cell(std::move(cell_writer).start_variant_counter_cell(), c).end_variant().end_column();
} else if (c.is_live_and_has_ttl()) {
write_expiring_cell(std::move(cell_writer).start_variant_expiring_cell(), c).end_variant().end_column();
} else {
write_live_cell(std::move(cell_writer).start_variant_live_cell(), c).end_variant().end_column();
}
} else {
write_collection_cell(std::move(cell_or_collection_writer).start_c_collection_cell(), cell.as_collection_mutation(), def).end_column();
}
});
return std::move(column_writer).end_columns();
}
// Serializes a row marker as one of four variants: no marker (missing),
// dead marker (tombstone: timestamp + deletion time), expiring marker
// (timestamp, ttl, expiry) or live marker (timestamp). All branches end in
// the same writer type, which `auto` return-type deduction requires.
template<typename Writer>
auto write_row_marker(Writer&& writer, const row_marker& marker)
{
if (marker.is_missing()) {
return std::move(writer).start_marker_no_marker().end_no_marker();
} else if (!marker.is_live()) {
return std::move(writer).start_marker_dead_marker()
.start_tomb()
.write_timestamp(marker.timestamp())
.write_deletion_time(marker.deletion_time())
.end_tomb()
.end_dead_marker();
} else if (marker.is_expiring()) {
return std::move(writer).start_marker_expiring_marker()
.start_lm()
.write_created_at(marker.timestamp())
.end_lm()
.write_ttl(marker.ttl())
.write_expiry(marker.expiry())
.end_expiring_marker();
} else {
return std::move(writer).start_marker_live_marker()
.write_created_at(marker.timestamp())
.end_live_marker();
}
}
} // namespace
// Appends one serialized range_tombstone entry to `row_tombstones` for every
// element of `rt_list`, preserving list order.
// `s` is not read by this helper.
template <typename RowTombstones>
static void write_tombstones(const schema& s, RowTombstones& row_tombstones, const range_tombstone_list& rt_list)
{
    for (const auto& entry : rt_list) {
        const auto& range_tomb = entry.tombstone();
        row_tombstones.add()
            .write_start(range_tomb.start)
            .write_tomb(range_tomb.tomb)
            .write_start_kind(range_tomb.start_kind)
            .write_end(range_tomb.end)
            .write_end_kind(range_tomb.end_kind)
            .end_range_tombstone();
    }
}
// Writes a tombstone as its timestamp followed by its deletion time, and
// returns the writer state that follows those two fields.
template<typename Writer>
static auto write_tombstone(Writer&& writer, const tombstone& t) {
    auto after_timestamp = std::move(writer).write_timestamp(t.timestamp);
    return std::move(after_timestamp).write_deletion_time(t.deletion_time);
}
// Serializes one clustering row. Fields are written in this order: key,
// row marker, regular row tombstone, regular-column cells, shadowable
// tombstone. Returns the writer positioned after shadowable_deleted_at; the
// caller closes the enclosing structure (end_deletable_row() / end_row()).
template<typename Writer>
static auto write_row(Writer&& writer, const schema& s, const clustering_key_prefix& key, const row& cells, const row_marker& m, const row_tombstone& t) {
    auto after_key = std::move(writer).write_key(key);
    // Marker variant, then open the regular (non-shadowable) tombstone.
    auto regular_tomb_writer = write_row_marker(std::move(after_key), m).start_deleted_at();
    auto cells_writer = write_tombstone(std::move(regular_tomb_writer), t.regular())
            .end_deleted_at()
            .start_cells();
    auto shadowable_tomb_writer = write_row_cells(std::move(cells_writer), cells, s, column_kind::regular_column)
            .end_cells()
            .start_shadowable_deleted_at();
    return write_tombstone(std::move(shadowable_tomb_writer), t.shadowable().tomb()).end_shadowable_deleted_at();
}
// Serializes an entire mutation_partition in a single synchronous pass:
// partition tombstone, static row, range tombstones, then every non-dummy
// clustering row.
template<typename Writer>
void mutation_partition_serializer::write_serialized(Writer&& writer, const schema& s, const mutation_partition& mp)
{
    auto static_row_writer = std::move(writer)
            .write_tomb(mp.partition_tombstone())
            .start_static_row();
    auto rt_writer = write_row_cells(std::move(static_row_writer), mp.static_row().get(), s, column_kind::static_column)
            .end_static_row()
            .start_range_tombstones();
    write_tombstones(s, rt_writer, mp.row_tombstones());

    auto rows_writer = std::move(rt_writer).end_range_tombstones().start_rows();
    for (auto&& entry : mp.non_dummy_rows()) {
        const auto& dr = entry.row();
        write_row(rows_writer.add(), s, entry.key(), dr.cells(), dr.marker(), dr.deleted_at()).end_deletable_row();
    }
    std::move(rows_writer).end_rows().end_mutation_partition();
}
// Coroutine counterpart of write_serialized(). It produces the same output
// but calls coroutine::maybe_yield() before each range tombstone and each
// clustering row, presumably so that large partitions do not cause stalls.
//
// NOTE(review): `writer` is a reference parameter of a coroutine. Its only use
// comes before the first co_await: its state is moved into the local
// `static_row_writer`, which lives in the coroutine frame.
// write_gently(bytes_ostream&) passes a writer that dies when that function
// returns, so this ordering is load-bearing. It assumes the coroutine runs
// eagerly up to its first suspension; confirm before reordering anything.
// `s` and `mp` are read after suspensions, so they must outlive the returned
// future. `mp` is presumably also required to stay unmodified while its
// ranges are being iterated.
template<typename Writer>
future<> mutation_partition_serializer::write_serialized_gently(Writer&& writer, const schema& s, const mutation_partition& mp)
{
    auto static_row_writer = std::move(writer)
            .write_tomb(mp.partition_tombstone())
            .start_static_row();
    auto rt_writer = write_row_cells(std::move(static_row_writer), mp.static_row().get(), s, column_kind::static_column)
            .end_static_row()
            .start_range_tombstones();

    // Written inline rather than through write_tombstones() so that we can
    // yield between entries.
    for (const auto& entry : mp.row_tombstones()) {
        co_await coroutine::maybe_yield();
        const auto& range_tomb = entry.tombstone();
        rt_writer.add()
            .write_start(range_tomb.start)
            .write_tomb(range_tomb.tomb)
            .write_start_kind(range_tomb.start_kind)
            .write_end(range_tomb.end)
            .write_end_kind(range_tomb.end_kind)
            .end_range_tombstone();
    }

    auto rows_writer = std::move(rt_writer).end_range_tombstones().start_rows();
    for (auto&& entry : mp.non_dummy_rows()) {
        co_await coroutine::maybe_yield();
        const auto& dr = entry.row();
        write_row(rows_writer.add(), s, entry.key(), dr.cells(), dr.marker(), dr.deleted_at()).end_deletable_row();
    }
    std::move(rows_writer).end_rows().end_mutation_partition();
}
// Records the schema and partition to be serialized by later write() /
// write_gently() calls.
// NOTE(review): if `_schema` / `_p` are reference members (declared in the
// header, not visible here), both arguments must outlive this object.
mutation_partition_serializer::mutation_partition_serializer(const schema& schema, const mutation_partition& p)
    : _schema(schema)
    , _p(p)
{
}
// Serializes the partition into `out`, wrapping the stream in a
// mutation_partition writer first.
void mutation_partition_serializer::write(bytes_ostream& out) const {
    ser::writer_of_mutation_partition<bytes_ostream> partition_writer(out);
    write(std::move(partition_writer));
}
// Serializes `_p` (interpreted with `_schema`) through a caller-supplied
// mutation_partition writer, consuming it.
void mutation_partition_serializer::write(ser::writer_of_mutation_partition<bytes_ostream>&& wr) const
{
    return write_serialized(std::move(wr), _schema, _p);
}
// Yielding variant of write(bytes_ostream&). The local writer is destroyed
// when this function returns, possibly before the returned future resolves.
// That is only safe because write_serialized_gently() consumes the writer
// before its first suspension point. `out` itself must outlive the future.
future<> mutation_partition_serializer::write_gently(bytes_ostream& out) const {
    ser::writer_of_mutation_partition<bytes_ostream> partition_writer(out);
    return write_gently(std::move(partition_writer));
}
// Yielding serialization of `_p` (interpreted with `_schema`) through a
// caller-supplied writer. The returned future completes once every range
// tombstone and clustering row has been written.
future<> mutation_partition_serializer::write_gently(ser::writer_of_mutation_partition<bytes_ostream>&& wr) const
{
    future<> done = write_serialized_gently(std::move(wr), _schema, _p);
    return done;
}
void serialize_mutation_fragments(const schema& s, tombstone partition_tombstone,
std::optional<static_row> sr, range_tombstone_list rts,
utils::chunked_vector<clustering_row> crs, ser::writer_of_mutation_partition<bytes_ostream>&& wr)
{
auto srow_writer = std::move(wr).write_tomb(partition_tombstone).start_static_row();
auto row_tombstones = [&] {
if (sr) {
return write_row_cells(std::move(srow_writer), sr->cells(), s, column_kind::static_column).end_static_row().start_range_tombstones();
} else {
return std::move(srow_writer).start_columns().end_columns().end_static_row().start_range_tombstones();
}
}();
sr = { };
write_tombstones(s, row_tombstones, rts);
rts.clear();
auto clustering_rows = std::move(row_tombstones).end_range_tombstones().start_rows();
for (auto& cr : crs) {
write_row(clustering_rows.add(), s, cr.key(), cr.cells(), cr.marker(), cr.tomb()).end_deletable_row();
cr = clustering_row(clustering_key_prefix{});
}
std::move(clustering_rows).end_rows().end_mutation_partition();
}
// Serializes a single mutation_fragment into a standalone
// frozen_mutation_fragment. The visitor arm matching the fragment's kind
// writes the corresponding variant, and the fragment record is then closed.
frozen_mutation_fragment freeze(const schema& s, const mutation_fragment& mf)
{
    bytes_ostream buf;
    ser::writer_of_mutation_fragment<bytes_ostream> fragment_writer(buf);
    auto visitor = seastar::make_visitor(
        [&] (const clustering_row& row) {
            return write_row(std::move(fragment_writer).start_fragment_clustering_row().start_row(), s, row.key(), row.cells(), row.marker(), row.tomb())
                    .end_row()
                    .end_clustering_row();
        },
        [&] (const static_row& srow) {
            return write_row_cells(std::move(fragment_writer).start_fragment_static_row().start_cells(), srow.cells(), s, column_kind::static_column)
                    .end_cells()
                    .end_static_row();
        },
        [&] (const range_tombstone& range_tomb) {
            return std::move(fragment_writer).write_fragment_range_tombstone(range_tomb);
        },
        [&] (const partition_start& start) {
            return std::move(fragment_writer).start_fragment_partition_start()
                    .write_key(start.key().key())
                    .write_partition_tombstone(start.partition_tombstone())
                    .end_partition_start();
        },
        [&] (const partition_end& end) {
            return std::move(fragment_writer).write_fragment_partition_end(end);
        });
    mf.visit(std::move(visitor)).end_mutation_fragment();
    return frozen_mutation_fragment(std::move(buf));
}
// Serializes a single mutation_fragment_v2 into a standalone
// frozen_mutation_fragment_v2. Unlike the v1 format, range deletions are
// carried as range_tombstone_change records: position key, bound weight and
// the tombstone in effect from that position.
frozen_mutation_fragment_v2 freeze(const schema& s, const mutation_fragment_v2& mf) {
    bytes_ostream buf;
    ser::writer_of_mutation_fragment_v2<bytes_ostream> fragment_writer(buf);
    auto visitor = seastar::make_visitor(
        [&] (const clustering_row& row) {
            return write_row(std::move(fragment_writer).start_fragment_clustering_row().start_row(), s, row.key(), row.cells(), row.marker(), row.tomb())
                    .end_row()
                    .end_clustering_row();
        },
        [&] (const static_row& srow) {
            return write_row_cells(std::move(fragment_writer).start_fragment_static_row().start_cells(), srow.cells(), s, column_kind::static_column)
                    .end_cells()
                    .end_static_row();
        },
        [&] (const range_tombstone_change& change) {
            const auto& pos = change.position();
            return std::move(fragment_writer).start_fragment_range_tombstone_change()
                    .write_key(pos.key())
                    .write_weight(pos.get_bound_weight())
                    .write_tomb(change.tombstone())
                    .end_range_tombstone_change();
        },
        [&] (const partition_start& start) {
            return std::move(fragment_writer).start_fragment_partition_start()
                    .write_key(start.key().key())
                    .write_partition_tombstone(start.partition_tombstone())
                    .end_partition_start();
        },
        [&] (const partition_end& end) {
            return std::move(fragment_writer).write_fragment_partition_end(end);
        });
    mf.visit(std::move(visitor)).end_mutation_fragment_v2();
    return frozen_mutation_fragment_v2(std::move(buf));
}