Files
scylladb/db/view/view_consumer.cc
Michał Jadwiszczak a59624c604 db/view: extract common view building functionalities
Extract common methods of view builder consumer to an abstract class
and `flush_base()` and `make_partition_slice()` functions,
so they can be used in view builder (vnode-based views) and view
building consumer (tablet-based views; introduced in the next commit).
2025-08-27 08:55:48 +02:00

151 lines
5.4 KiB
C++

/*
* Copyright 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "db/view/view_consumer.hh"
#include "db/view/view_builder.hh"
#include "readers/from_fragments.hh"
#include "utils/exponential_backoff_retry.hh"
using namespace std::chrono_literals;
static logging::logger vc_logger("view_consumer");
static inline void inject_failure(std::string_view operation) {
utils::get_local_injector().inject(operation,
[operation] { throw std::runtime_error(std::string(operation)); });
}
namespace db {
namespace view {
view_consumer::view_consumer(shared_ptr<view_update_generator> gen, gc_clock::time_point now, abort_source& as)
: _gen(std::move(gen))
, _now(now)
, _as(as) {}
void view_consumer::add_fragment(auto&& fragment) {
_fragments_memory_usage += fragment.memory_usage(*reader().schema());
_fragments.emplace_back(*reader().schema(), permit(), std::move(fragment));
if (_fragments_memory_usage > view_builder::batch_memory_max) {
// Although we have not yet completed the batch of base rows that
// compact_for_query<> planned for us (view_builder::batchsize),
// we've still collected enough rows to reach sizeable memory use,
// so let's flush these rows now.
flush_fragments();
}
}
void view_consumer::flush_fragments() {
inject_failure("view_builder_flush_fragments");
_as.check();
if (!_fragments.empty()) {
_fragments.emplace_front(*reader().schema(), permit(), partition_start(get_current_key(), tombstone()));
auto base_schema = base()->schema();
auto fragments_reader = make_mutation_reader_from_fragments(reader().schema(), permit(), std::move(_fragments));
auto close_reader = defer([&fragments_reader] { fragments_reader.close().get(); });
fragments_reader.upgrade_schema(base_schema);
_gen->populate_views(
*base(),
_views_to_build,
get_current_key().token(),
std::move(fragments_reader),
_now).get();
close_reader.cancel();
_fragments.clear();
_fragments_memory_usage = 0;
}
}
stop_iteration view_consumer::consume_new_partition(const dht::decorated_key& dk) {
inject_failure("view_builder_consume_new_partition");
if (dk.key().is_empty()) {
on_internal_error(vc_logger, format("Trying to consume empty partition key {}", dk));
}
set_current_key(std::move(dk));
check_for_built_views();
_views_to_build.clear();
load_views_to_build();
return stop_iteration(_views_to_build.empty());
}
stop_iteration view_consumer::consume(tombstone) {
inject_failure("view_builder_consume_tombstone");
return stop_iteration::no;
}
stop_iteration view_consumer::consume(static_row&& sr, tombstone, bool) {
inject_failure("view_builder_consume_static_row");
if (_views_to_build.empty() || _as.abort_requested()) {
return stop_iteration::yes;
}
add_fragment(std::move(sr));
return stop_iteration::no;
}
stop_iteration view_consumer::consume(clustering_row&& cr, row_tombstone, bool is_live) {
inject_failure("view_builder_consume_clustering_row");
if (!is_live) {
return stop_iteration::no;
}
if (_views_to_build.empty() || _as.abort_requested()) {
return stop_iteration::yes;
}
add_fragment(std::move(cr));
return stop_iteration::no;
}
stop_iteration view_consumer::consume(range_tombstone_change&&) {
inject_failure("view_builder_consume_range_tombstone");
return stop_iteration::no;
}
stop_iteration view_consumer::consume_end_of_partition() {
inject_failure("view_builder_consume_end_of_partition");
utils::get_local_injector().inject("view_builder_consume_end_of_partition_delay", utils::wait_for_message(std::chrono::seconds(60))).get();
flush_fragments();
return stop_iteration(should_stop_consuming_end_of_partition());
}
future<> flush_base(lw_shared_ptr<replica::column_family> base, abort_source& as) {
struct empty_state { };
return exponential_backoff_retry::do_until_value(1s, 1min, as, [base = std::move(base)] {
return base->flush().then_wrapped([base] (future<> f) -> std::optional<empty_state> {
if (f.failed()) {
vc_logger.error("Error flushing base table {}.{}: {}; retrying", base->schema()->ks_name(), base->schema()->cf_name(), f.get_exception());
return { };
}
return { empty_state{} };
});
}).discard_result();
}
query::partition_slice make_partition_slice(const schema& s) {
query::partition_slice::option_set opts;
opts.set(query::partition_slice::option::send_partition_key);
opts.set(query::partition_slice::option::send_clustering_key);
opts.set(query::partition_slice::option::send_timestamp);
opts.set(query::partition_slice::option::send_ttl);
opts.set(query::partition_slice::option::always_return_static_content);
return query::partition_slice(
{query::full_clustering_range},
s.static_columns()
| std::views::transform(std::mem_fn(&column_definition::id))
| std::ranges::to<query::column_id_vector>(),
s.regular_columns()
| std::views::transform(std::mem_fn(&column_definition::id))
| std::ranges::to<query::column_id_vector>(),
std::move(opts));
}
}
}