diff --git a/db/view/view.cc b/db/view/view.cc index 5a6ebfa966..9ba8677ad7 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -2240,6 +2240,7 @@ void view_builder::setup_metrics() { } future<> view_builder::start_in_background(service::migration_manager& mm, utils::cross_shard_barrier barrier) { + auto step_fiber = make_ready_future<>(); try { view_builder_init_state vbi; auto fail = defer([&barrier] mutable { barrier.abort(); }); @@ -2273,8 +2274,10 @@ future<> view_builder::start_in_background(service::migration_manager& mm, utils _mnotifier.register_listener(this); co_await calculate_shard_build_step(vbi); _current_step = _base_to_build_step.begin(); - // Waited on indirectly in stop(). - (void)_build_step.trigger(); + + // If preparation above fails, run_in_background() is not invoked, just + // the start_in_background() emits a warning into logs and resolves + step_fiber = run_in_background(); } catch (...) { auto ex = std::current_exception(); auto ll = log_level::error; @@ -2289,10 +2292,12 @@ future<> view_builder::start_in_background(service::migration_manager& mm, utils } vlogger.log(ll, "start aborted: {}", ex); } + + co_await std::move(step_fiber); } future<> view_builder::start(service::migration_manager& mm, utils::cross_shard_barrier barrier) { - _started = start_in_background(mm, std::move(barrier)); + _step_fiber = start_in_background(mm, std::move(barrier)); return make_ready_future<>(); } @@ -2302,12 +2307,12 @@ future<> view_builder::drain() { } vlogger.info("Draining view builder"); _as.request_abort(); - co_await std::move(_started); co_await _mnotifier.unregister_listener(this); co_await _vug.drain(); co_await _sem.wait(); _sem.broken(); - co_await _build_step.join(); + _build_step.broken(); + co_await std::move(_step_fiber); co_await coroutine::parallel_for_each(_base_to_build_step, [] (std::pair& p) { return p.second.reader.close(); }); @@ -2728,8 +2733,7 @@ future<> view_builder::handle_create_view_local(const sstring& ks_name, const ss vlogger.error("Error setting up view for building {}.{}: {}", view->ks_name(), view->cf_name(), std::current_exception()); } - // Waited on indirectly in stop(). - static_cast(_build_step.trigger()); + _build_step.signal(); } void view_builder::on_create_view(const sstring& ks_name, const sstring& view_name) { @@ -2828,14 +2832,19 @@ void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name })); } -future<> view_builder::do_build_step() { +future<> view_builder::run_in_background() { // Run the view building in the streaming scheduling group // so that it doesn't impact other tasks with higher priority. seastar::thread_attributes attr; attr.sched_group = _db.get_streaming_scheduling_group(); return seastar::async(std::move(attr), [this] { exponential_backoff_retry r(1s, 1min); - while (!_base_to_build_step.empty() && !_as.abort_requested()) { + while (!_as.abort_requested()) { + try { + _build_step.wait([this] { return !_base_to_build_step.empty(); }).get(); + } catch (const seastar::broken_condition_variable&) { + return; + } auto units = get_units(_sem, view_builder_semaphore_units).get(); ++_stats.steps_performed; try { diff --git a/db/view/view_builder.hh b/db/view/view_builder.hh index 561109462e..81480e6b3b 100644 --- a/db/view/view_builder.hh +++ b/db/view/view_builder.hh @@ -11,13 +11,13 @@ #include "query/query-request.hh" #include "service/migration_listener.hh" #include "service/raft/raft_group0_client.hh" -#include "utils/serialized_action.hh" #include "utils/cross-shard-barrier.hh" #include "replica/database.hh" #include #include #include +#include #include #include #include @@ -174,7 +174,7 @@ class view_builder final : public service::migration_listener::only_view_notific reader_permit _permit; base_to_build_step_type _base_to_build_step; base_to_build_step_type::iterator _current_step = _base_to_build_step.end(); - serialized_action _build_step{std::bind(&view_builder::do_build_step, this)}; + condition_variable _build_step; static constexpr size_t view_builder_semaphore_units = 1; // Ensures bookkeeping operations are serialized, meaning that while we execute // a build step we don't consider newly added or removed views. This simplifies @@ -191,7 +191,7 @@ class view_builder final : public service::migration_listener::only_view_notific // `on_drop_view`, `on_create_view`, or `on_update_view` events. seastar::named_semaphore _sem{view_builder_semaphore_units, named_semaphore_exception_factory{"view builder"}}; seastar::abort_source _as; - future<> _started = make_ready_future<>(); + future<> _step_fiber = make_ready_future<>(); // Used to coordinate between shards the conclusion of the build process for a particular view. std::unordered_set _built_views; // Used for testing. @@ -278,7 +278,7 @@ private: void setup_shard_build_step(view_builder_init_state& vbi, std::vector, std::vector); future<> calculate_shard_build_step(view_builder_init_state& vbi); future<> add_new_view(view_ptr, build_step&); - future<> do_build_step(); + future<> run_in_background(); void execute(build_step&, exponential_backoff_retry); future<> maybe_mark_view_as_built(view_ptr, dht::token); future<> mark_as_built(view_ptr);