db/view/view_updating_consumer: migrate to v2

Not a completely mechanical transition. The consumer has to generate its
mutation via a mutation_rebuilder_v2 as mutation fragment v2 cannot be
applied to mutations directly yet.
This commit is contained in:
Botond Dénes
2022-01-25 09:42:21 +02:00
parent 014a23bf2a
commit 05c48ee0cc
4 changed files with 29 additions and 19 deletions

View File

@@ -2168,20 +2168,27 @@ void view_updating_consumer::do_flush_buffer() {
}
_buffer_size = 0;
_m = nullptr;
}
void view_updating_consumer::flush_builder() {
_mut_builder->consume_end_of_partition();
if (auto mut_opt = _mut_builder->consume_end_of_stream()) {
_buffer.emplace_back(std::move(*mut_opt));
}
_mut_builder.reset();
}
void view_updating_consumer::maybe_flush_buffer_mid_partition() {
if (_buffer_size >= buffer_size_hard_limit) {
auto m = mutation(_schema, _m->decorated_key(), mutation_partition(_schema));
flush_builder();
auto dk = _buffer.back().decorated_key();
do_flush_buffer();
_buffer.emplace_back(std::move(m));
_m = &_buffer.back();
consume_new_partition(dk);
}
}
view_updating_consumer::view_updating_consumer(schema_ptr schema, reader_permit permit, replica::table& table, std::vector<sstables::shared_sstable> excluded_sstables, const seastar::abort_source& as,
evictable_reader_handle& staging_reader_handle)
evictable_reader_handle_v2& staging_reader_handle)
: view_updating_consumer(std::move(schema), std::move(permit), as, staging_reader_handle,
[table = table.shared_from_this(), excluded_sstables = std::move(excluded_sstables)] (mutation m) mutable {
auto s = m.schema();

View File

@@ -74,7 +74,7 @@ future<> view_update_generator::start() {
mutation_reader::forwarding fwd_mr) {
return ssts->make_range_sstable_reader(s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr);
});
auto [staging_sstable_reader, staging_sstable_reader_handle] = make_manually_paused_evictable_reader(
auto [staging_sstable_reader, staging_sstable_reader_handle] = make_manually_paused_evictable_reader_v2(
std::move(ms),
s,
permit,

View File

@@ -16,6 +16,7 @@
#include "db/view/row_locking.hh"
#include <seastar/core/abort_source.hh>
#include "mutation.hh"
#include "mutation_rebuilder.hh"
class evictable_reader_handle;
@@ -37,19 +38,20 @@ private:
schema_ptr _schema;
reader_permit _permit;
const seastar::abort_source* _as;
evictable_reader_handle& _staging_reader_handle;
evictable_reader_handle_v2& _staging_reader_handle;
circular_buffer<mutation> _buffer;
mutation* _m{nullptr};
std::optional<mutation_rebuilder_v2> _mut_builder;
size_t _buffer_size{0};
noncopyable_function<future<row_locker::lock_holder>(mutation)> _view_update_pusher;
private:
void do_flush_buffer();
void flush_builder();
void maybe_flush_buffer_mid_partition();
public:
// Push updates with a custom pusher. Mainly for tests.
view_updating_consumer(schema_ptr schema, reader_permit permit, const seastar::abort_source& as, evictable_reader_handle& staging_reader_handle,
view_updating_consumer(schema_ptr schema, reader_permit permit, const seastar::abort_source& as, evictable_reader_handle_v2& staging_reader_handle,
noncopyable_function<future<row_locker::lock_holder>(mutation)> view_update_pusher)
: _schema(std::move(schema))
, _permit(std::move(permit))
@@ -59,19 +61,19 @@ public:
{ }
view_updating_consumer(schema_ptr schema, reader_permit permit, replica::table& table, std::vector<sstables::shared_sstable> excluded_sstables, const seastar::abort_source& as,
evictable_reader_handle& staging_reader_handle);
evictable_reader_handle_v2& staging_reader_handle);
view_updating_consumer(view_updating_consumer&&) = default;
view_updating_consumer& operator=(view_updating_consumer&&) = delete;
void consume_new_partition(const dht::decorated_key& dk) {
_buffer.emplace_back(_schema, dk, mutation_partition(_schema));
_m = &_buffer.back();
_mut_builder.emplace(_schema);
_mut_builder->consume_new_partition(dk);
}
void consume(tombstone t) {
_m->partition().apply(std::move(t));
_mut_builder->consume(t);
}
stop_iteration consume(static_row&& sr) {
@@ -79,7 +81,7 @@ public:
return stop_iteration::yes;
}
_buffer_size += sr.memory_usage(*_schema);
_m->partition().apply(*_schema, mutation_fragment(*_schema, _permit, std::move(sr)));
_mut_builder->consume(std::move(sr));
maybe_flush_buffer_mid_partition();
return stop_iteration::no;
}
@@ -89,17 +91,17 @@ public:
return stop_iteration::yes;
}
_buffer_size += cr.memory_usage(*_schema);
_m->partition().apply(*_schema, mutation_fragment(*_schema, _permit, std::move(cr)));
_mut_builder->consume(std::move(cr));
maybe_flush_buffer_mid_partition();
return stop_iteration::no;
}
stop_iteration consume(range_tombstone&& rt) {
stop_iteration consume(range_tombstone_change&& rtc) {
if (_as->abort_requested()) {
return stop_iteration::yes;
}
_buffer_size += rt.memory_usage(*_schema);
_m->partition().apply(*_schema, mutation_fragment(*_schema, _permit, std::move(rt)));
_buffer_size += rtc.memory_usage(*_schema);
_mut_builder->consume(std::move(rtc));
maybe_flush_buffer_mid_partition();
return stop_iteration::no;
}
@@ -109,6 +111,7 @@ public:
if (_as->abort_requested()) {
return stop_iteration::yes;
}
flush_builder();
if (_buffer_size >= buffer_size_soft_limit) {
do_flush_buffer();
}

View File

@@ -827,7 +827,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) {
mt->apply(mut);
}
auto p = make_manually_paused_evictable_reader(
auto p = make_manually_paused_evictable_reader_v2(
mt->as_data_source(),
schema,
permit,