db/view/view_update_check: check_needs_view_update_path(): filter out non-member hosts

We currently don't clean up the system_distributed.view_build_status
table after removed nodes. This can cause false-positive check for
whether view update generation is needed for streaming.
The proper fix is to clean up this table, but that will be more
involved, it even when done, it might not be immediate. So until then
and to be on the safe side, filter out entries belonging to unknown
hosts from said table.

Fixes: #11905
Refs: #11836

Closes #11860

(cherry picked from commit 84a69b6adb)
This commit is contained in:
Botond Dénes
2022-11-01 12:10:14 +02:00
parent 416929fb2a
commit fd4b2a3319
4 changed files with 19 additions and 11 deletions

View File

@@ -2150,24 +2150,28 @@ update_backlog node_update_backlog::add_fetch(unsigned shard, update_backlog bac
return std::max(backlog, _max.load(std::memory_order_relaxed));
}
future<bool> check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const sstring& ks_name, const sstring& cf_name) {
return sys_dist_ks.view_status(ks_name, cf_name).then([] (std::unordered_map<utils::UUID, sstring>&& view_statuses) {
return boost::algorithm::any_of(view_statuses | boost::adaptors::map_values, [] (const sstring& view_status) {
return view_status == "STARTED";
future<bool> check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const sstring& ks_name,
const sstring& cf_name) {
using view_statuses_type = std::unordered_map<utils::UUID, sstring>;
return sys_dist_ks.view_status(ks_name, cf_name).then([&tm] (view_statuses_type&& view_statuses) {
return boost::algorithm::any_of(view_statuses, [&tm] (const view_statuses_type::value_type& view_status) {
// Only consider status of known hosts.
return view_status.second == "STARTED" && tm.get_endpoint_for_host_id(view_status.first);
});
});
}
future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const replica::table& t, streaming::stream_reason reason) {
future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const replica::table& t,
streaming::stream_reason reason) {
if (is_internal_keyspace(t.schema()->ks_name())) {
return make_ready_future<bool>(false);
}
if (reason == streaming::stream_reason::repair && !t.views().empty()) {
return make_ready_future<bool>(true);
}
return do_with(t.views(), [&sys_dist_ks] (auto& views) {
return do_with(t.views(), [&sys_dist_ks, &tm] (auto& views) {
return map_reduce(views,
[&sys_dist_ks] (const view_ptr& view) { return check_view_build_ongoing(sys_dist_ks, view->ks_name(), view->cf_name()); },
[&sys_dist_ks, &tm] (const view_ptr& view) { return check_view_build_ongoing(sys_dist_ks, tm, view->ks_name(), view->cf_name()); },
false,
std::logical_or<bool>());
});

View File

@@ -22,9 +22,13 @@ class system_distributed_keyspace;
}
namespace locator {
class token_metadata;
}
namespace db::view {
future<bool> check_view_build_ongoing(db::system_distributed_keyspace& sys_dist_ks, const sstring& ks_name, const sstring& cf_name);
future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const replica::table& t, streaming::stream_reason reason);
future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_dist_ks, const locator::token_metadata& tm, const replica::table& t,
streaming::stream_reason reason);
}

View File

@@ -361,7 +361,7 @@ distributed_loader::process_upload_dir(distributed<replica::database>& db, distr
&error_handler_gen_for_upload_dir);
}, sstables::sstable_directory::default_sstable_filter()).get();
const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), *global_table, streaming::stream_reason::repair).get0();
const bool use_view_update_path = db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *global_table, streaming::stream_reason::repair).get0();
auto datadir = upload.parent_path();
if (use_view_update_path) {

View File

@@ -29,7 +29,7 @@ std::function<future<> (flat_mutation_reader)> make_streaming_consumer(sstring o
std::exception_ptr ex;
try {
auto cf = db.local().find_column_family(reader.schema()).shared_from_this();
auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), *cf, reason);
auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *cf, reason);
//FIXME: for better estimations this should be transmitted from remote
auto metadata = mutation_source_metadata{};
auto& cs = cf->get_compaction_strategy();