From fd4b2a33196d6de19cff6d0227abaac06709cfc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 1 Nov 2022 12:10:14 +0200 Subject: [PATCH] db/view/view_update_check: check_needs_view_update_path(): filter out non-member hosts We currently don't clean up the system_distributed.view_build_status table after nodes are removed. This can cause a false-positive check for whether view update generation is needed for streaming. The proper fix is to clean up this table, but that will be more involved, and even when done, it might not be immediate. So until then and to be on the safe side, filter out entries belonging to unknown hosts from said table. Fixes: #11905 Refs: #11836 Closes #11860 (cherry picked from commit 84a69b6adb3df64657956404884c3880102e84b4) --- db/view/view.cc | 18 +++++++++++------- db/view/view_update_checks.hh | 8 ++++++-- replica/distributed_loader.cc | 2 +- streaming/consumer.cc | 2 +- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index f78b4212ef..02a0478616 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -2150,24 +2150,28 @@ update_backlog node_update_backlog::add_fetch(unsigned shard, update_backlog bac return std::max(backlog, _max.load(std::memory_order_relaxed)); } -future check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const sstring& ks_name, const sstring& cf_name) { - return sys_dist_ks.view_status(ks_name, cf_name).then([] (std::unordered_map&& view_statuses) { - return boost::algorithm::any_of(view_statuses | boost::adaptors::map_values, [] (const sstring& view_status) { - return view_status == "STARTED"; +future check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const sstring& ks_name, + const sstring& cf_name) { + using view_statuses_type = std::unordered_map; + return sys_dist_ks.view_status(ks_name, cf_name).then([&tm] (view_statuses_type&& view_statuses) { + return 
boost::algorithm::any_of(view_statuses, [&tm] (const view_statuses_type::value_type& view_status) { + // Only consider status of known hosts. + return view_status.second == "STARTED" && tm.get_endpoint_for_host_id(view_status.first); }); }); } -future check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const replica::table& t, streaming::stream_reason reason) { +future check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const replica::table& t, + streaming::stream_reason reason) { if (is_internal_keyspace(t.schema()->ks_name())) { return make_ready_future(false); } if (reason == streaming::stream_reason::repair && !t.views().empty()) { return make_ready_future(true); } - return do_with(t.views(), [&sys_dist_ks] (auto& views) { + return do_with(t.views(), [&sys_dist_ks, &tm] (auto& views) { return map_reduce(views, - [&sys_dist_ks] (const view_ptr& view) { return check_view_build_ongoing(sys_dist_ks, view->ks_name(), view->cf_name()); }, + [&sys_dist_ks, &tm] (const view_ptr& view) { return check_view_build_ongoing(sys_dist_ks, tm, view->ks_name(), view->cf_name()); }, false, std::logical_or()); }); diff --git a/db/view/view_update_checks.hh b/db/view/view_update_checks.hh index deffeeb034..77b7113c0d 100644 --- a/db/view/view_update_checks.hh +++ b/db/view/view_update_checks.hh @@ -22,9 +22,13 @@ class system_distributed_keyspace; } +namespace locator { +class token_metadata; +} + namespace db::view { -future check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const sstring& ks_name, const sstring& cf_name); -future check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const replica::table& t, streaming::stream_reason reason); +future check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const replica::table& t, + streaming::stream_reason reason); } diff --git a/replica/distributed_loader.cc 
b/replica/distributed_loader.cc index 6794ca4265..9d04d9e8aa 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -361,7 +361,7 @@ distributed_loader::process_upload_dir(distributed& db, distr &error_handler_gen_for_upload_dir); }, sstables::sstable_directory::default_sstable_filter()).get(); - const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), *global_table, streaming::stream_reason::repair).get0(); + const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *global_table, streaming::stream_reason::repair).get0(); auto datadir = upload.parent_path(); if (use_view_update_path) { diff --git a/streaming/consumer.cc b/streaming/consumer.cc index 831432e518..bac57e510e 100644 --- a/streaming/consumer.cc +++ b/streaming/consumer.cc @@ -29,7 +29,7 @@ std::function (flat_mutation_reader)> make_streaming_consumer(sstring o std::exception_ptr ex; try { auto cf = db.local().find_column_family(reader.schema()).shared_from_this(); - auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), *cf, reason); + auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *cf, reason); //FIXME: for better estimations this should be transmitted from remote auto metadata = mutation_source_metadata{}; auto& cs = cf->get_compaction_strategy();