diff --git a/db/view/view.cc b/db/view/view.cc index f78b4212ef..02a0478616 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -2150,24 +2150,28 @@ update_backlog node_update_backlog::add_fetch(unsigned shard, update_backlog bac return std::max(backlog, _max.load(std::memory_order_relaxed)); } -future check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const sstring& ks_name, const sstring& cf_name) { - return sys_dist_ks.view_status(ks_name, cf_name).then([] (std::unordered_map&& view_statuses) { - return boost::algorithm::any_of(view_statuses | boost::adaptors::map_values, [] (const sstring& view_status) { - return view_status == "STARTED"; +future check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const sstring& ks_name, + const sstring& cf_name) { + using view_statuses_type = std::unordered_map; + return sys_dist_ks.view_status(ks_name, cf_name).then([&tm] (view_statuses_type&& view_statuses) { + return boost::algorithm::any_of(view_statuses, [&tm] (const view_statuses_type::value_type& view_status) { + // Only consider status of known hosts. + return view_status.second == "STARTED" && tm.get_endpoint_for_host_id(view_status.first); }); }); } -future check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const replica::table& t, streaming::stream_reason reason) { +future check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const replica::table& t, + streaming::stream_reason reason) { if (is_internal_keyspace(t.schema()->ks_name())) { return make_ready_future(false); } if (reason == streaming::stream_reason::repair && !t.views().empty()) { return make_ready_future(true); } - return do_with(t.views(), [&sys_dist_ks] (auto& views) { + return do_with(t.views(), [&sys_dist_ks, &tm] (auto& views) { return map_reduce(views, - [&sys_dist_ks] (const view_ptr& view) { return check_view_build_ongoing(sys_dist_ks, view->ks_name(), view->cf_name()); }, + [&sys_dist_ks, &tm] (const view_ptr& view) { return check_view_build_ongoing(sys_dist_ks, tm, view->ks_name(), view->cf_name()); }, false, std::logical_or()); }); diff --git a/db/view/view_update_checks.hh b/db/view/view_update_checks.hh index deffeeb034..77b7113c0d 100644 --- a/db/view/view_update_checks.hh +++ b/db/view/view_update_checks.hh @@ -22,9 +22,13 @@ class system_distributed_keyspace; } +namespace locator { +class token_metadata; +} + namespace db::view { -future check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const sstring& ks_name, const sstring& cf_name); -future check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const replica::table& t, streaming::stream_reason reason); +future check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const replica::table& t, + streaming::stream_reason reason); } diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 6794ca4265..9d04d9e8aa 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -361,7 +361,7 @@ distributed_loader::process_upload_dir(distributed& db, distr &error_handler_gen_for_upload_dir); }, sstables::sstable_directory::default_sstable_filter()).get(); - const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), *global_table, streaming::stream_reason::repair).get0(); + const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *global_table, streaming::stream_reason::repair).get0(); auto datadir = upload.parent_path(); if (use_view_update_path) { diff --git a/streaming/consumer.cc b/streaming/consumer.cc index 831432e518..bac57e510e 100644 --- a/streaming/consumer.cc +++ b/streaming/consumer.cc @@ -29,7 +29,7 @@ std::function (flat_mutation_reader)> make_streaming_consumer(sstring o std::exception_ptr ex; try { auto cf = db.local().find_column_family(reader.schema()).shared_from_this(); - auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), *cf, reason); + auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *cf, reason); //FIXME: for better estimations this should be transmitted from remote auto metadata = mutation_source_metadata{}; auto& cs = cf->get_compaction_strategy();