mutation: add for_each_split_mutation

Allows processing of the split mutations one at a time.
This can reduce memory footprint as the caller
won't have to store a vector of the split mutations
and then convert it (e.g. freeze the mutations
or convert them to canonical mutations).

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2024-04-04 12:53:49 +03:00
parent d21984d0cc
commit faa0ee9844
3 changed files with 19 additions and 13 deletions

View File

@@ -212,20 +212,20 @@ class mutation_by_size_splitter {
}
};
const schema_ptr _schema;
utils::chunked_vector<mutation>& _target;
const size_t _max_size;
std::function<void(mutation)> _process_mutation;
std::optional<partition_state> _state;
template <typename T>
stop_iteration consume_fragment(T&& fragment) {
const auto fragment_size = fragment.memory_usage(*_schema);
if (_state->size && _state->size + _state->empty_partition_size + fragment_size > _max_size) {
_target.emplace_back(_state->builder.flush());
// We could end up with an empty mutation if we consumed a range_tombstone_change
// and the next fragment exceeds the limit. The tombstone range may not have been
// closed yet and range_tombstone will not be created.
// This should be a rare case though, so just pop such mutation.
if (_target.back().partition().empty()) {
_target.pop_back();
auto m = _state->builder.flush();
if (!m.partition().empty()) {
_process_mutation(std::move(m));
}
_state->size = 0;
}
@@ -234,10 +234,10 @@ class mutation_by_size_splitter {
return stop_iteration::no;
}
public:
mutation_by_size_splitter(schema_ptr schema, utils::chunked_vector<mutation>& target, size_t max_size)
mutation_by_size_splitter(schema_ptr schema, size_t max_size, std::function<void(mutation)> process_mutation)
: _schema(std::move(schema))
, _target(target)
, _max_size(max_size)
, _process_mutation(process_mutation)
{
}
void consume_new_partition(const dht::decorated_key& dk) {
@@ -264,7 +264,7 @@ public:
// was already emitted in the previous mutation (because the previous mutation was flushed
// after consuming a clustering_row at that position).
if (!mut_opt->partition().empty()) {
_target.emplace_back(std::move(*mut_opt));
_process_mutation(std::move(*mut_opt));
}
} else {
on_internal_error(mlog, "consume_end_of_stream didn't return a mutation");
@@ -278,7 +278,7 @@ public:
};
}
future<> split_mutation(mutation source, utils::chunked_vector<mutation>& target, size_t max_size) {
future<> for_each_split_mutation(mutation source, size_t max_size, std::function<void(mutation)> process_mutation) {
reader_concurrency_semaphore sem(reader_concurrency_semaphore::no_limits{}, "split_mutation",
reader_concurrency_semaphore::register_metrics::no);
{
@@ -287,12 +287,18 @@ future<> split_mutation(mutation source, utils::chunked_vector<mutation>& target
sem.make_tracking_only_permit(s, "split_mutation", db::no_timeout, {}),
std::move(source));
co_await with_closeable(std::move(reader), [&] (mutation_reader& reader) {
return reader.consume(mutation_by_size_splitter(s, target, max_size));
return reader.consume(mutation_by_size_splitter(s, max_size, std::move(process_mutation)));
});
}
co_await sem.stop();
}
future<> split_mutation(mutation source, utils::chunked_vector<mutation>& target, size_t max_size) {
return for_each_split_mutation(std::move(source), max_size, [&target] (mutation m) {
target.emplace_back(std::move(m));
});
}
auto fmt::formatter<mutation>::format(const mutation& m, fmt::format_context& ctx) const
-> decltype(ctx.out()) {
const ::schema& s = *m.schema();

View File

@@ -475,3 +475,5 @@ template <> struct fmt::formatter<mutation> : fmt::formatter<string_view> {
// to pass half of the required value as max_size; such a margin should ensure
// that the condition is met.
future<> split_mutation(mutation source, utils::chunked_vector<mutation>& target, size_t max_size);
future<> for_each_split_mutation(mutation source, size_t max_size, std::function<void(mutation)> process_mutation);

View File

@@ -924,11 +924,9 @@ future<> storage_service::merge_topology_snapshot(raft_snapshot snp) {
if (m.representation().size() <= max_size) {
frozen_muts_to_apply.push_back(co_await freeze_gently(mut));
} else {
utils::chunked_vector<mutation> split_muts;
co_await split_mutation(std::move(mut), split_muts, max_size);
for (auto& mut : split_muts) {
co_await for_each_split_mutation(std::move(mut), max_size, [&] (mutation m) -> future<> {
frozen_muts_to_apply.push_back(co_await freeze_gently(mut));
}
});
}
}
}