diff --git a/db/view/view.cc b/db/view/view.cc index 5455edb8fd..bc681ff764 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -26,7 +26,6 @@ #include #include #include -#include #include "replica/database.hh" #include "clustering_bounds_comparator.hh" @@ -2466,37 +2465,38 @@ void view_builder::execute(build_step& step, exponential_backoff_retry r) { future<> view_builder::maybe_mark_view_as_built(view_ptr view, dht::token next_token) { _built_views.emplace(view->id()); vlogger.debug("Shard finished building view {}.{}", view->ks_name(), view->cf_name()); - bool built = co_await container().map_reduce0( + return container().map_reduce0( [view_id = view->id()] (view_builder& builder) { return builder._built_views.contains(view_id); }, true, [] (bool result, bool shard_complete) { return result && shard_complete; + }).then([this, view, next_token = std::move(next_token)] (bool built) { + if (built) { + inject_failure("view_builder_mark_view_as_built"); + return container().invoke_on_all([view_id = view->id()] (view_builder& builder) { + if (builder._built_views.erase(view_id) == 0 || this_shard_id() != 0) { + return make_ready_future<>(); + } + auto view = builder._db.find_schema(view_id); + vlogger.info("Finished building view {}.{}", view->ks_name(), view->cf_name()); + return seastar::when_all_succeed( + system_keyspace::mark_view_as_built(view->ks_name(), view->cf_name()), + builder._sys_dist_ks.finish_view_build(view->ks_name(), view->cf_name())).then_unpack([view] { + // The view is built, so shard 0 can remove the entry in the build progress system table on + // behalf of all shards. It is guaranteed to have a higher timestamp than the per-shard entries. + return system_keyspace::remove_view_build_progress_across_all_shards(view->ks_name(), view->cf_name()); + }).then([&builder, view] { + auto it = builder._build_notifiers.find(std::pair(view->ks_name(), view->cf_name())); + if (it != builder._build_notifiers.end()) { + it->second.set_value(); + } + }); }); - if (built) { - inject_failure("view_builder_mark_view_as_built"); - co_await container().invoke_on_all(coroutine::lambda([view_id = view->id()] (view_builder& builder) -> future<> { - if (builder._built_views.erase(view_id) == 0 || this_shard_id() != 0) { - co_return; - } - auto view = builder._db.find_schema(view_id); - vlogger.info("Finished building view {}.{}", view->ks_name(), view->cf_name()); - co_await coroutine::all( - [&] { return system_keyspace::mark_view_as_built(view->ks_name(), view->cf_name()); }, - [&] { return builder._sys_dist_ks.finish_view_build(view->ks_name(), view->cf_name()); } - ); - // The view is built, so shard 0 can remove the entry in the build progress system table on - // behalf of all shards. It is guaranteed to have a higher timestamp than the per-shard entries. - co_await system_keyspace::remove_view_build_progress_across_all_shards(view->ks_name(), view->cf_name()); - - auto it = builder._build_notifiers.find(std::pair(view->ks_name(), view->cf_name())); - if (it != builder._build_notifiers.end()) { - it->second.set_value(); - } - })); - } - co_await system_keyspace::update_view_build_progress(view->ks_name(), view->cf_name(), next_token); + } + return system_keyspace::update_view_build_progress(view->ks_name(), view->cf_name(), next_token); + }); } future<> view_builder::wait_until_built(const sstring& ks_name, const sstring& view_name) {