From 2439d27b6093273fd7cdcbb5ff9befa69dd7a64c Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 27 Jan 2026 14:07:49 +0300 Subject: [PATCH] view_builder: Wake-up step fiber with condition variable View builder runs a background fiber that perform build steps. To kick the fiber it uses serizlized action, but it's an overkill -- nobody waits for the action to finish, but on stop, when it's joined. This patch uses condition variable to kick the fiber, and starts it instantly, in the place where serialized action was first kicked. Signed-off-by: Pavel Emelyanov --- db/view/view.cc | 27 ++++++++++++++++++--------- db/view/view_builder.hh | 8 ++++---- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 5a6ebfa966..9ba8677ad7 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -2240,6 +2240,7 @@ void view_builder::setup_metrics() { } future<> view_builder::start_in_background(service::migration_manager& mm, utils::cross_shard_barrier barrier) { + auto step_fiber = make_ready_future<>(); try { view_builder_init_state vbi; auto fail = defer([&barrier] mutable { barrier.abort(); }); @@ -2273,8 +2274,10 @@ future<> view_builder::start_in_background(service::migration_manager& mm, utils _mnotifier.register_listener(this); co_await calculate_shard_build_step(vbi); _current_step = _base_to_build_step.begin(); - // Waited on indirectly in stop(). - (void)_build_step.trigger(); + + // If preparation above fails, run_in_background() is not invoked, just + // the start_in_background() emits a warning into logs and resolves + step_fiber = run_in_background(); } catch (...) { auto ex = std::current_exception(); auto ll = log_level::error; @@ -2289,10 +2292,12 @@ future<> view_builder::start_in_background(service::migration_manager& mm, utils } vlogger.log(ll, "start aborted: {}", ex); } + + co_await std::move(step_fiber); } future<> view_builder::start(service::migration_manager& mm, utils::cross_shard_barrier barrier) { - _started = start_in_background(mm, std::move(barrier)); + _step_fiber = start_in_background(mm, std::move(barrier)); return make_ready_future<>(); } @@ -2302,12 +2307,12 @@ future<> view_builder::drain() { } vlogger.info("Draining view builder"); _as.request_abort(); - co_await std::move(_started); co_await _mnotifier.unregister_listener(this); co_await _vug.drain(); co_await _sem.wait(); _sem.broken(); - co_await _build_step.join(); + _build_step.broken(); + co_await std::move(_step_fiber); co_await coroutine::parallel_for_each(_base_to_build_step, [] (std::pair& p) { return p.second.reader.close(); }); @@ -2728,8 +2733,7 @@ future<> view_builder::handle_create_view_local(const sstring& ks_name, const ss vlogger.error("Error setting up view for building {}.{}: {}", view->ks_name(), view->cf_name(), std::current_exception()); } - // Waited on indirectly in stop(). - static_cast(_build_step.trigger()); + _build_step.signal(); } void view_builder::on_create_view(const sstring& ks_name, const sstring& view_name) { @@ -2828,14 +2832,19 @@ void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name })); } -future<> view_builder::do_build_step() { +future<> view_builder::run_in_background() { // Run the view building in the streaming scheduling group // so that it doesn't impact other tasks with higher priority. seastar::thread_attributes attr; attr.sched_group = _db.get_streaming_scheduling_group(); return seastar::async(std::move(attr), [this] { exponential_backoff_retry r(1s, 1min); - while (!_base_to_build_step.empty() && !_as.abort_requested()) { + while (!_as.abort_requested()) { + try { + _build_step.wait([this] { return !_base_to_build_step.empty(); }).get(); + } catch (const seastar::broken_condition_variable&) { + return; + } auto units = get_units(_sem, view_builder_semaphore_units).get(); ++_stats.steps_performed; try { diff --git a/db/view/view_builder.hh b/db/view/view_builder.hh index 561109462e..81480e6b3b 100644 --- a/db/view/view_builder.hh +++ b/db/view/view_builder.hh @@ -11,13 +11,13 @@ #include "query/query-request.hh" #include "service/migration_listener.hh" #include "service/raft/raft_group0_client.hh" -#include "utils/serialized_action.hh" #include "utils/cross-shard-barrier.hh" #include "replica/database.hh" #include #include #include +#include #include #include #include @@ -174,7 +174,7 @@ class view_builder final : public service::migration_listener::only_view_notific reader_permit _permit; base_to_build_step_type _base_to_build_step; base_to_build_step_type::iterator _current_step = _base_to_build_step.end(); - serialized_action _build_step{std::bind(&view_builder::do_build_step, this)}; + condition_variable _build_step; static constexpr size_t view_builder_semaphore_units = 1; // Ensures bookkeeping operations are serialized, meaning that while we execute // a build step we don't consider newly added or removed views. This simplifies @@ -191,7 +191,7 @@ class view_builder final : public service::migration_listener::only_view_notific // `on_drop_view`, `on_create_view`, or `on_update_view` events. seastar::named_semaphore _sem{view_builder_semaphore_units, named_semaphore_exception_factory{"view builder"}}; seastar::abort_source _as; - future<> _started = make_ready_future<>(); + future<> _step_fiber = make_ready_future<>(); // Used to coordinate between shards the conclusion of the build process for a particular view. std::unordered_set _built_views; // Used for testing. @@ -278,7 +278,7 @@ private: void setup_shard_build_step(view_builder_init_state& vbi, std::vector, std::vector); future<> calculate_shard_build_step(view_builder_init_state& vbi); future<> add_new_view(view_ptr, build_step&); - future<> do_build_step(); + future<> run_in_background(); void execute(build_step&, exponential_backoff_retry); future<> maybe_mark_view_as_built(view_ptr, dht::token); future<> mark_as_built(view_ptr);