mutation: add for_each_split_mutation
Allows processing of the split mutations one at a time. This can reduce memory footprint as the caller won't have to store a vector of the split mutations and then convert it (e.g. freeze the mutations or convert them to canonical mutations). Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -212,20 +212,20 @@ class mutation_by_size_splitter {
|
||||
}
|
||||
};
|
||||
const schema_ptr _schema;
|
||||
utils::chunked_vector<mutation>& _target;
|
||||
const size_t _max_size;
|
||||
std::function<void(mutation)> _process_mutation;
|
||||
std::optional<partition_state> _state;
|
||||
template <typename T>
|
||||
stop_iteration consume_fragment(T&& fragment) {
|
||||
const auto fragment_size = fragment.memory_usage(*_schema);
|
||||
if (_state->size && _state->size + _state->empty_partition_size + fragment_size > _max_size) {
|
||||
_target.emplace_back(_state->builder.flush());
|
||||
// We could end up with an empty mutation if we consumed a range_tombstone_change
|
||||
// and the next fragment exceeds the limit. The tombstone range may not have been
|
||||
// closed yet and range_tombstone will not be created.
|
||||
// This should be a rare case though, so just pop such mutation.
|
||||
if (_target.back().partition().empty()) {
|
||||
_target.pop_back();
|
||||
auto m = _state->builder.flush();
|
||||
if (!m.partition().empty()) {
|
||||
_process_mutation(std::move(m));
|
||||
}
|
||||
_state->size = 0;
|
||||
}
|
||||
@@ -234,10 +234,10 @@ class mutation_by_size_splitter {
|
||||
return stop_iteration::no;
|
||||
}
|
||||
public:
|
||||
mutation_by_size_splitter(schema_ptr schema, utils::chunked_vector<mutation>& target, size_t max_size)
|
||||
mutation_by_size_splitter(schema_ptr schema, size_t max_size, std::function<void(mutation)> process_mutation)
|
||||
: _schema(std::move(schema))
|
||||
, _target(target)
|
||||
, _max_size(max_size)
|
||||
, _process_mutation(process_mutation)
|
||||
{
|
||||
}
|
||||
void consume_new_partition(const dht::decorated_key& dk) {
|
||||
@@ -264,7 +264,7 @@ public:
|
||||
// was already emitted in the previous mutation (because the previous mutation was flushed
|
||||
// after consuming a clustering_row at that position).
|
||||
if (!mut_opt->partition().empty()) {
|
||||
_target.emplace_back(std::move(*mut_opt));
|
||||
_process_mutation(std::move(*mut_opt));
|
||||
}
|
||||
} else {
|
||||
on_internal_error(mlog, "consume_end_of_stream didn't return a mutation");
|
||||
@@ -278,7 +278,7 @@ public:
|
||||
};
|
||||
}
|
||||
|
||||
future<> split_mutation(mutation source, utils::chunked_vector<mutation>& target, size_t max_size) {
|
||||
future<> for_each_split_mutation(mutation source, size_t max_size, std::function<void(mutation)> process_mutation) {
|
||||
reader_concurrency_semaphore sem(reader_concurrency_semaphore::no_limits{}, "split_mutation",
|
||||
reader_concurrency_semaphore::register_metrics::no);
|
||||
{
|
||||
@@ -287,12 +287,18 @@ future<> split_mutation(mutation source, utils::chunked_vector<mutation>& target
|
||||
sem.make_tracking_only_permit(s, "split_mutation", db::no_timeout, {}),
|
||||
std::move(source));
|
||||
co_await with_closeable(std::move(reader), [&] (mutation_reader& reader) {
|
||||
return reader.consume(mutation_by_size_splitter(s, target, max_size));
|
||||
return reader.consume(mutation_by_size_splitter(s, max_size, std::move(process_mutation)));
|
||||
});
|
||||
}
|
||||
co_await sem.stop();
|
||||
}
|
||||
|
||||
future<> split_mutation(mutation source, utils::chunked_vector<mutation>& target, size_t max_size) {
|
||||
return for_each_split_mutation(std::move(source), max_size, [&target] (mutation m) {
|
||||
target.emplace_back(std::move(m));
|
||||
});
|
||||
}
|
||||
|
||||
auto fmt::formatter<mutation>::format(const mutation& m, fmt::format_context& ctx) const
|
||||
-> decltype(ctx.out()) {
|
||||
const ::schema& s = *m.schema();
|
||||
|
||||
@@ -475,3 +475,5 @@ template <> struct fmt::formatter<mutation> : fmt::formatter<string_view> {
|
||||
// to pass half of the required value as max_size; such a margin should ensure
|
||||
// that the condition is met.
|
||||
future<> split_mutation(mutation source, utils::chunked_vector<mutation>& target, size_t max_size);
|
||||
|
||||
future<> for_each_split_mutation(mutation source, size_t max_size, std::function<void(mutation)> process_mutation);
|
||||
|
||||
@@ -924,11 +924,9 @@ future<> storage_service::merge_topology_snapshot(raft_snapshot snp) {
|
||||
if (m.representation().size() <= max_size) {
|
||||
frozen_muts_to_apply.push_back(co_await freeze_gently(mut));
|
||||
} else {
|
||||
utils::chunked_vector<mutation> split_muts;
|
||||
co_await split_mutation(std::move(mut), split_muts, max_size);
|
||||
for (auto& mut : split_muts) {
|
||||
co_await for_each_split_mutation(std::move(mut), max_size, [&] (mutation m) -> future<> {
|
||||
frozen_muts_to_apply.push_back(co_await freeze_gently(mut));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user