Files
scylladb/db/view/view_consumer.hh
Michał Jadwiszczak a59624c604 db/view: extract common view building functionalities
Extract common methods of view builder consumer to an abstract class
and `flush_base()` and `make_partition_slice()` functions,
so they can be used in view builder (vnode-based views) and view
building consumer (tablet-based views; introduced in the next commit).
2025-08-27 08:55:48 +02:00

79 lines
2.6 KiB
C++

/*
* Copyright 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "db/view/view_update_generator.hh"
#include "mutation/mutation_fragment_v2.hh"
namespace db {
namespace view {
// Forward declaration of the type held (via shared_ptr) by view_consumer::_gen.
// NOTE(review): presumably redundant, since "db/view/view_update_generator.hh"
// is already included above - confirm before removing.
class view_update_generator;
// Base class for consumers that walk over a base table and populate all of
// its views with the data.
//
// It is abstract so that two similar, yet slightly different, services can
// share it: the view builder (for vnode-based views) and the view building
// worker (for tablet-based views). Each of them supplies the pure virtual
// hooks declared in the protected section.
//
// Must be called in the context of a seastar::thread.
//
// NOTE(review): no destructor is declared, so it is neither virtual nor
// protected; presumably instances are never destroyed through a
// view_consumer pointer - confirm.
class view_consumer {
protected:
    // ---- State shared by every concrete consumer ----
    shared_ptr<view_update_generator> _gen;
    gc_clock::time_point _now;
    std::vector<view_ptr> _views_to_build;
    abort_source& _as;

    // Mutation fragments buffered so far.
    std::deque<mutation_fragment_v2> _fragments;

    // Memory used by _fragments.
    //
    // The compact_for_query<> feeding this consumer is already configured to
    // hand over at most view_builder::batchsize (128) rows rather than a
    // whole partition. Even so, when rows carry large blobs, keeping 128 of
    // them in _fragments may be too much, so the memory usage of _fragments
    // is tracked and the buffer is flushed once it has grown too large.
    //
    // Bounding the size of _fragments also solves issue #4213: a single view
    // mutation can be as large as the base rows used to build it, and its
    // serialized size must not exceed the limit on mutation size
    // (32 MB by default).
    size_t _fragments_memory_usage{0};

    // ---- Hooks implemented by the concrete consumer ----
    // Their definitions are not part of this header; see the implementing
    // classes for the exact contracts.

    // Accessors supplied by the concrete consumer.
    virtual lw_shared_ptr<replica::table> base() = 0;
    virtual mutation_reader& reader() = 0;
    virtual reader_permit& permit() = 0;
    virtual dht::decorated_key& get_current_key() = 0;
    virtual void set_current_key(dht::decorated_key key) = 0;

    // Build-progress hooks supplied by the concrete consumer.
    virtual void load_views_to_build() = 0;
    virtual void check_for_built_views() = 0;
    virtual bool should_stop_consuming_end_of_partition() = 0;

    // ---- Helpers operating on the fragment buffer ----
    // Both are declared here only and must be defined elsewhere.
    void add_fragment(auto&& fragment);
    void flush_fragments();

public:
    view_consumer(shared_ptr<view_update_generator> gen, gc_clock::time_point now, abort_source& as);

    // Consumer callbacks, listed in the order a partition is streamed: start
    // of partition, partition tombstone, static row, clustering rows and
    // range tombstone changes, end of partition. Each returns a
    // stop_iteration.
    stop_iteration consume_new_partition(const dht::decorated_key& dk);
    stop_iteration consume(tombstone);
    stop_iteration consume(static_row&& sr, tombstone, bool);
    stop_iteration consume(clustering_row&& cr, row_tombstone, bool is_live);
    stop_iteration consume(range_tombstone_change&&);
    stop_iteration consume_end_of_partition();
};
// Helper shared by the view builder (vnode-based views) and the view building
// consumer (tablet-based views).
// NOTE(review): the definition is not in this header; presumably it flushes
// the given base table and can be interrupted through `as` - confirm.
// NOTE(review): the table type is spelled `replica::column_family` here but
// `replica::table` in view_consumer::base(); presumably both name the same
// type - confirm and consider unifying the spelling.
future<> flush_base(lw_shared_ptr<replica::column_family> base, abort_source& as);
// Returns a query::partition_slice built for schema `s`. Shared by the view
// builder (vnode-based views) and the view building consumer (tablet-based
// views).
// NOTE(review): what the slice selects is decided by the definition, which is
// not in this header - confirm there.
query::partition_slice make_partition_slice(const schema& s);
}
}