diff --git a/mutation/mutation.cc b/mutation/mutation.cc index 6130e526e5..7cdebc3af3 100644 --- a/mutation/mutation.cc +++ b/mutation/mutation.cc @@ -212,20 +212,20 @@ class mutation_by_size_splitter { } }; const schema_ptr _schema; - utils::chunked_vector& _target; const size_t _max_size; + std::function _process_mutation; std::optional _state; template stop_iteration consume_fragment(T&& fragment) { const auto fragment_size = fragment.memory_usage(*_schema); if (_state->size && _state->size + _state->empty_partition_size + fragment_size > _max_size) { - _target.emplace_back(_state->builder.flush()); // We could end up with an empty mutation if we consumed a range_tombstone_change // and the next fragment exceeds the limit. The tombstone range may not have been // closed yet and range_tombstone will not be created. // This should be a rare case though, so just pop such mutation. - if (_target.back().partition().empty()) { - _target.pop_back(); + auto m = _state->builder.flush(); + if (!m.partition().empty()) { + _process_mutation(std::move(m)); } _state->size = 0; } @@ -234,10 +234,10 @@ class mutation_by_size_splitter { return stop_iteration::no; } public: - mutation_by_size_splitter(schema_ptr schema, utils::chunked_vector& target, size_t max_size) + mutation_by_size_splitter(schema_ptr schema, size_t max_size, std::function process_mutation) : _schema(std::move(schema)) - , _target(target) , _max_size(max_size) + , _process_mutation(process_mutation) { } void consume_new_partition(const dht::decorated_key& dk) { @@ -264,7 +264,7 @@ public: // was already emitted in the previous mutation (because the previous mutation was flushed // after consuming a clustering_row at that position). if (!mut_opt->partition().empty()) { - _target.emplace_back(std::move(*mut_opt)); + _process_mutation(std::move(*mut_opt)); } } else { on_internal_error(mlog, "consume_end_of_stream didn't return a mutation"); @@ -278,7 +278,7 @@ public: }; } -future<> split_mutation(mutation source, utils::chunked_vector& target, size_t max_size) { +future<> for_each_split_mutation(mutation source, size_t max_size, std::function process_mutation) { reader_concurrency_semaphore sem(reader_concurrency_semaphore::no_limits{}, "split_mutation", reader_concurrency_semaphore::register_metrics::no); { @@ -287,12 +287,18 @@ future<> split_mutation(mutation source, utils::chunked_vector& target sem.make_tracking_only_permit(s, "split_mutation", db::no_timeout, {}), std::move(source)); co_await with_closeable(std::move(reader), [&] (mutation_reader& reader) { - return reader.consume(mutation_by_size_splitter(s, target, max_size)); + return reader.consume(mutation_by_size_splitter(s, max_size, std::move(process_mutation))); }); } co_await sem.stop(); } +future<> split_mutation(mutation source, utils::chunked_vector& target, size_t max_size) { + return for_each_split_mutation(std::move(source), max_size, [&target] (mutation m) { + target.emplace_back(std::move(m)); + }); +} + auto fmt::formatter::format(const mutation& m, fmt::format_context& ctx) const -> decltype(ctx.out()) { const ::schema& s = *m.schema(); diff --git a/mutation/mutation.hh b/mutation/mutation.hh index bb6fbdd605..5a68b0d712 100644 --- a/mutation/mutation.hh +++ b/mutation/mutation.hh @@ -475,3 +475,5 @@ template <> struct fmt::formatter : fmt::formatter { // to pass half of the required value as max_size; such a margin should ensure // that the condition is met. future<> split_mutation(mutation source, utils::chunked_vector& target, size_t max_size); + +future<> for_each_split_mutation(mutation source, size_t max_size, std::function process_mutation); diff --git a/service/storage_service.cc b/service/storage_service.cc index fa93f89c1f..86f51e9f52 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -924,11 +924,9 @@ future<> storage_service::merge_topology_snapshot(raft_snapshot snp) { if (m.representation().size() <= max_size) { frozen_muts_to_apply.push_back(co_await freeze_gently(mut)); } else { - utils::chunked_vector split_muts; - co_await split_mutation(std::move(mut), split_muts, max_size); - for (auto& mut : split_muts) { + co_await for_each_split_mutation(std::move(mut), max_size, [&] (mutation m) -> future<> { frozen_muts_to_apply.push_back(co_await freeze_gently(mut)); - } + }); } } }