From 05c48ee0cc559fc94f7aa700dc67ceeb0eaceaf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 25 Jan 2022 09:42:21 +0200 Subject: [PATCH] db/view/view_updating_consumer: migrate to v2 Not a completely mechanical transition. The consumer has to generate its mutation via a mutation_rebuilder_v2 as mutation fragment v2 cannot be applied to mutations directly yet. --- db/view/view.cc | 17 ++++++++++++----- db/view/view_update_generator.cc | 2 +- db/view/view_updating_consumer.hh | 27 +++++++++++++++------------ test/boost/view_build_test.cc | 2 +- 4 files changed, 29 insertions(+), 19 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 9a4308988d..4179be9ea2 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -2168,20 +2168,27 @@ void view_updating_consumer::do_flush_buffer() { } _buffer_size = 0; - _m = nullptr; +} + +void view_updating_consumer::flush_builder() { + _mut_builder->consume_end_of_partition(); + if (auto mut_opt = _mut_builder->consume_end_of_stream()) { + _buffer.emplace_back(std::move(*mut_opt)); + } + _mut_builder.reset(); } void view_updating_consumer::maybe_flush_buffer_mid_partition() { if (_buffer_size >= buffer_size_hard_limit) { - auto m = mutation(_schema, _m->decorated_key(), mutation_partition(_schema)); + flush_builder(); + auto dk = _buffer.back().decorated_key(); do_flush_buffer(); - _buffer.emplace_back(std::move(m)); - _m = &_buffer.back(); + consume_new_partition(dk); } } view_updating_consumer::view_updating_consumer(schema_ptr schema, reader_permit permit, replica::table& table, std::vector excluded_sstables, const seastar::abort_source& as, - evictable_reader_handle& staging_reader_handle) + evictable_reader_handle_v2& staging_reader_handle) : view_updating_consumer(std::move(schema), std::move(permit), as, staging_reader_handle, [table = table.shared_from_this(), excluded_sstables = std::move(excluded_sstables)] (mutation m) mutable { auto s = m.schema(); diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index f49b9e4ca5..3d770a086f 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -74,7 +74,7 @@ future<> view_update_generator::start() { mutation_reader::forwarding fwd_mr) { return ssts->make_range_sstable_reader(s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr); }); - auto [staging_sstable_reader, staging_sstable_reader_handle] = make_manually_paused_evictable_reader( + auto [staging_sstable_reader, staging_sstable_reader_handle] = make_manually_paused_evictable_reader_v2( std::move(ms), s, permit, diff --git a/db/view/view_updating_consumer.hh b/db/view/view_updating_consumer.hh index d015b6bbbe..b94eba867c 100644 --- a/db/view/view_updating_consumer.hh +++ b/db/view/view_updating_consumer.hh @@ -16,6 +16,7 @@ #include "db/view/row_locking.hh" #include #include "mutation.hh" +#include "mutation_rebuilder.hh" class evictable_reader_handle; @@ -37,19 +38,20 @@ private: schema_ptr _schema; reader_permit _permit; const seastar::abort_source* _as; - evictable_reader_handle& _staging_reader_handle; + evictable_reader_handle_v2& _staging_reader_handle; circular_buffer _buffer; - mutation* _m{nullptr}; + std::optional _mut_builder; size_t _buffer_size{0}; noncopyable_function(mutation)> _view_update_pusher; private: void do_flush_buffer(); + void flush_builder(); void maybe_flush_buffer_mid_partition(); public: // Push updates with a custom pusher. Mainly for tests. - view_updating_consumer(schema_ptr schema, reader_permit permit, const seastar::abort_source& as, evictable_reader_handle& staging_reader_handle, + view_updating_consumer(schema_ptr schema, reader_permit permit, const seastar::abort_source& as, evictable_reader_handle_v2& staging_reader_handle, noncopyable_function(mutation)> view_update_pusher) : _schema(std::move(schema)) , _permit(std::move(permit)) @@ -59,19 +61,19 @@ public: { } view_updating_consumer(schema_ptr schema, reader_permit permit, replica::table& table, std::vector excluded_sstables, const seastar::abort_source& as, - evictable_reader_handle& staging_reader_handle); + evictable_reader_handle_v2& staging_reader_handle); view_updating_consumer(view_updating_consumer&&) = default; view_updating_consumer& operator=(view_updating_consumer&&) = delete; void consume_new_partition(const dht::decorated_key& dk) { - _buffer.emplace_back(_schema, dk, mutation_partition(_schema)); - _m = &_buffer.back(); + _mut_builder.emplace(_schema); + _mut_builder->consume_new_partition(dk); } void consume(tombstone t) { - _m->partition().apply(std::move(t)); + _mut_builder->consume(t); } stop_iteration consume(static_row&& sr) { @@ -79,7 +81,7 @@ public: return stop_iteration::yes; } _buffer_size += sr.memory_usage(*_schema); - _m->partition().apply(*_schema, mutation_fragment(*_schema, _permit, std::move(sr))); + _mut_builder->consume(std::move(sr)); maybe_flush_buffer_mid_partition(); return stop_iteration::no; } @@ -89,17 +91,17 @@ public: return stop_iteration::yes; } _buffer_size += cr.memory_usage(*_schema); - _m->partition().apply(*_schema, mutation_fragment(*_schema, _permit, std::move(cr))); + _mut_builder->consume(std::move(cr)); maybe_flush_buffer_mid_partition(); return stop_iteration::no; } - stop_iteration consume(range_tombstone&& rt) { + stop_iteration consume(range_tombstone_change&& rtc) { if (_as->abort_requested()) { return stop_iteration::yes; } - _buffer_size += rt.memory_usage(*_schema); - _m->partition().apply(*_schema, mutation_fragment(*_schema, _permit, std::move(rt))); + _buffer_size += rtc.memory_usage(*_schema); + _mut_builder->consume(std::move(rtc)); maybe_flush_buffer_mid_partition(); return stop_iteration::no; } @@ -109,6 +111,7 @@ public: if (_as->abort_requested()) { return stop_iteration::yes; } + flush_builder(); if (_buffer_size >= buffer_size_soft_limit) { do_flush_buffer(); } diff --git a/test/boost/view_build_test.cc b/test/boost/view_build_test.cc index a5dcd11305..295ea09a8c 100644 --- a/test/boost/view_build_test.cc +++ b/test/boost/view_build_test.cc @@ -827,7 +827,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) { mt->apply(mut); } - auto p = make_manually_paused_evictable_reader( + auto p = make_manually_paused_evictable_reader_v2( mt->as_data_source(), schema, permit,