view_builder: Wake-up step fiber with condition variable

View builder runs a background fiber that perform build steps. To kick
the fiber it uses serizlized action, but it's an overkill -- nobody
waits for the action to finish, but on stop, when it's joined.

This patch uses condition variable to kick the fiber, and starts it
instantly, in the place where serialized action was first kicked.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2026-01-27 14:07:49 +03:00
parent 834961c308
commit 2439d27b60
2 changed files with 22 additions and 13 deletions

View File

@@ -2240,6 +2240,7 @@ void view_builder::setup_metrics() {
}
future<> view_builder::start_in_background(service::migration_manager& mm, utils::cross_shard_barrier barrier) {
auto step_fiber = make_ready_future<>();
try {
view_builder_init_state vbi;
auto fail = defer([&barrier] mutable { barrier.abort(); });
@@ -2273,8 +2274,10 @@ future<> view_builder::start_in_background(service::migration_manager& mm, utils
_mnotifier.register_listener(this);
co_await calculate_shard_build_step(vbi);
_current_step = _base_to_build_step.begin();
// Waited on indirectly in stop().
(void)_build_step.trigger();
// If preparation above fails, run_in_background() is not invoked, just
// the start_in_background() emits a warning into logs and resolves
step_fiber = run_in_background();
} catch (...) {
auto ex = std::current_exception();
auto ll = log_level::error;
@@ -2289,10 +2292,12 @@ future<> view_builder::start_in_background(service::migration_manager& mm, utils
}
vlogger.log(ll, "start aborted: {}", ex);
}
co_await std::move(step_fiber);
}
future<> view_builder::start(service::migration_manager& mm, utils::cross_shard_barrier barrier) {
_started = start_in_background(mm, std::move(barrier));
_step_fiber = start_in_background(mm, std::move(barrier));
return make_ready_future<>();
}
@@ -2302,12 +2307,12 @@ future<> view_builder::drain() {
}
vlogger.info("Draining view builder");
_as.request_abort();
co_await std::move(_started);
co_await _mnotifier.unregister_listener(this);
co_await _vug.drain();
co_await _sem.wait();
_sem.broken();
co_await _build_step.join();
_build_step.broken();
co_await std::move(_step_fiber);
co_await coroutine::parallel_for_each(_base_to_build_step, [] (std::pair<const table_id, build_step>& p) {
return p.second.reader.close();
});
@@ -2728,8 +2733,7 @@ future<> view_builder::handle_create_view_local(const sstring& ks_name, const ss
vlogger.error("Error setting up view for building {}.{}: {}", view->ks_name(), view->cf_name(), std::current_exception());
}
// Waited on indirectly in stop().
static_cast<void>(_build_step.trigger());
_build_step.signal();
}
void view_builder::on_create_view(const sstring& ks_name, const sstring& view_name) {
@@ -2828,14 +2832,19 @@ void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name
}));
}
future<> view_builder::do_build_step() {
future<> view_builder::run_in_background() {
// Run the view building in the streaming scheduling group
// so that it doesn't impact other tasks with higher priority.
seastar::thread_attributes attr;
attr.sched_group = _db.get_streaming_scheduling_group();
return seastar::async(std::move(attr), [this] {
exponential_backoff_retry r(1s, 1min);
while (!_base_to_build_step.empty() && !_as.abort_requested()) {
while (!_as.abort_requested()) {
try {
_build_step.wait([this] { return !_base_to_build_step.empty(); }).get();
} catch (const seastar::broken_condition_variable&) {
return;
}
auto units = get_units(_sem, view_builder_semaphore_units).get();
++_stats.steps_performed;
try {

View File

@@ -11,13 +11,13 @@
#include "query/query-request.hh"
#include "service/migration_listener.hh"
#include "service/raft/raft_group0_client.hh"
#include "utils/serialized_action.hh"
#include "utils/cross-shard-barrier.hh"
#include "replica/database.hh"
#include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/condition-variable.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_future.hh>
#include <seastar/core/shared_ptr.hh>
@@ -174,7 +174,7 @@ class view_builder final : public service::migration_listener::only_view_notific
reader_permit _permit;
base_to_build_step_type _base_to_build_step;
base_to_build_step_type::iterator _current_step = _base_to_build_step.end();
serialized_action _build_step{std::bind(&view_builder::do_build_step, this)};
condition_variable _build_step;
static constexpr size_t view_builder_semaphore_units = 1;
// Ensures bookkeeping operations are serialized, meaning that while we execute
// a build step we don't consider newly added or removed views. This simplifies
@@ -191,7 +191,7 @@ class view_builder final : public service::migration_listener::only_view_notific
// `on_drop_view`, `on_create_view`, or `on_update_view` events.
seastar::named_semaphore _sem{view_builder_semaphore_units, named_semaphore_exception_factory{"view builder"}};
seastar::abort_source _as;
future<> _started = make_ready_future<>();
future<> _step_fiber = make_ready_future<>();
// Used to coordinate between shards the conclusion of the build process for a particular view.
std::unordered_set<table_id> _built_views;
// Used for testing.
@@ -278,7 +278,7 @@ private:
void setup_shard_build_step(view_builder_init_state& vbi, std::vector<system_keyspace_view_name>, std::vector<system_keyspace_view_build_progress>);
future<> calculate_shard_build_step(view_builder_init_state& vbi);
future<> add_new_view(view_ptr, build_step&);
future<> do_build_step();
future<> run_in_background();
void execute(build_step&, exponential_backoff_retry);
future<> maybe_mark_view_as_built(view_ptr, dht::token);
future<> mark_as_built(view_ptr);