Files
scylladb/mutation_rebuilder.hh
Nadav Har'El e34c62c567 Merge 'view_updating_consumer: account empty partitions memory usage' from Botond Dénes
Te view updating consumer uses `_buffer_size` to decide when to flush the accumulated mutations, passing them to the actual view building code. This `_buffer_size` is incremented every time a mutation fragment is consumed. This is not exact, as e.g. range tombstones are represented differently in the mutation object, than in the fragment, but it is good enough. There is one flaw however: `_buffer_size` is not incremented when consuming a partition-start fragment. This is when the mutation object is created in the mutation rebuilder. This is not a big problem when partition have many rows, but if the partitions are tiny, the error in accounting quickly becomes significant. If the partitions are empty, `_buffer_size` is not bumped at all for empty partitions, and any number of these can accumulate in the buffer. We have recently seen this causing stalls and OOM as the buffer got to immense size, only containing empty and tiny partitions.
This PR fixes this by accounting the size of the freshly created `mutation` object in `_buffer_size`, after the partition-start fragment is consumed.

Fixes: #14819

Closes #14821

* github.com:scylladb/scylladb:
  test/boost/view_build_test: add test_view_update_generator_buffering_with_empty_mutations
  db/view/view_updating_consumer: account for the size of mutations
  mutation/mutation_rebuilder*: return const mutation& from consume_new_partition()
  mutation/mutation: add memory_usage()

(cherry picked from commit 056d04954c)
2023-07-31 03:43:44 -04:00

153 lines
4.7 KiB
C++

/*
* Copyright (C) 2018-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "mutation.hh"
#include "range_tombstone_assembler.hh"
class mutation_rebuilder {
schema_ptr _s;
mutation_opt _m;
public:
explicit mutation_rebuilder(schema_ptr s) : _s(std::move(s)) { }
const mutation& consume_new_partition(const dht::decorated_key& dk) {
assert(!_m);
_m = mutation(_s, std::move(dk));
return *_m;
}
stop_iteration consume(tombstone t) {
assert(_m);
_m->partition().apply(t);
return stop_iteration::no;
}
stop_iteration consume(range_tombstone&& rt) {
assert(_m);
_m->partition().apply_row_tombstone(*_s, std::move(rt));
return stop_iteration::no;
}
stop_iteration consume(static_row&& sr) {
assert(_m);
_m->partition().static_row().apply(*_s, column_kind::static_column, std::move(sr.cells()));
return stop_iteration::no;
}
stop_iteration consume(clustering_row&& cr) {
assert(_m);
auto& dr = _m->partition().clustered_row(*_s, std::move(cr.key()));
dr.apply(cr.tomb());
dr.apply(cr.marker());
dr.cells().apply(*_s, column_kind::regular_column, std::move(cr.cells()));
return stop_iteration::no;
}
stop_iteration consume_end_of_partition() {
assert(_m);
return stop_iteration::yes;
}
// Might only be called between consume_new_partition()
// and consume_end_of_partition().
//
// Returns (and forgets) the partition contents consumed so far.
// Can be used to split the processing of a large mutation into
// multiple smaller `mutation` objects (which add up to the full mutation).
mutation flush() {
assert(_m);
return std::exchange(*_m, mutation(_s, _m->decorated_key()));
}
mutation_opt consume_end_of_stream() {
return std::move(_m);
}
};
// Builds the mutation corresponding to the next partition in the mutation fragment stream.
// Implements FlattenedConsumerV2, MutationFragmentConsumerV2 and FlatMutationReaderConsumerV2.
// Does not work with streams in streamed_mutation::forwarding::yes mode.
class mutation_rebuilder_v2 {
schema_ptr _s;
mutation_rebuilder _builder;
range_tombstone_assembler _rt_assembler;
position_in_partition _pos = position_in_partition::before_all_clustered_rows();
public:
mutation_rebuilder_v2(schema_ptr s) : _s(std::move(s)), _builder(_s) { }
public:
stop_iteration consume(partition_start mf) {
consume_new_partition(mf.key());
return consume(mf.partition_tombstone());
}
stop_iteration consume(partition_end) {
return consume_end_of_partition();
}
stop_iteration consume(mutation_fragment_v2&& mf) {
return std::move(mf).consume(*this);
}
public:
const mutation& consume_new_partition(const dht::decorated_key& dk) {
return _builder.consume_new_partition(dk);
}
stop_iteration consume(tombstone t) {
_builder.consume(t);
return stop_iteration::no;
}
stop_iteration consume(range_tombstone_change&& rt) {
_pos = rt.position();
if (auto rt_opt = _rt_assembler.consume(*_s, std::move(rt))) {
_builder.consume(std::move(*rt_opt));
}
return stop_iteration::no;
}
stop_iteration consume(static_row&& sr) {
_builder.consume(std::move(sr));
return stop_iteration::no;
}
stop_iteration consume(clustering_row&& cr) {
_pos = position_in_partition::after_key(*_s, cr.position());
_builder.consume(std::move(cr));
return stop_iteration::no;
}
stop_iteration consume_end_of_partition() {
_rt_assembler.on_end_of_stream();
return stop_iteration::yes;
}
mutation_opt consume_end_of_stream() {
_rt_assembler.on_end_of_stream();
return _builder.consume_end_of_stream();
}
// Might only be called between consume_new_partition()
// and consume_end_of_partition().
//
// Returns (and forgets) the partition contents consumed so far.
// Can be used to split the processing of a large mutation into
// multiple smaller `mutation` objects (which add up to the full mutation).
//
// The active range tombstone (if present) is flushed with end bound
// just after the last seen clustered position, but the range tombstone
// remains active, and the next mutation will see it restarted at the
// position it was flushed at.
mutation flush() {
if (auto rt_opt = _rt_assembler.flush(*_s, _pos)) {
_builder.consume(std::move(*rt_opt));
}
return _builder.flush();
}
};