Files
scylladb/replica/distributed_loader.cc
Taras Veretilnyk 7214f5a0b6 sstables: propagate ignore_component_digest_mismatch config to all load sites
Add ignore_component_digest_mismatch option to db::config (default false).
When set, sstable loading logs a warning instead of throwing on component
digest mismatches, allowing a node to start up despite corrupted non-vital
components or bugs in digest calculation.

Propagate the config to all production sstable load paths:
- distributed_loader (node startup, upload dir processing)
- storage_service (tablet storage cloning)
- sstables_loader (load-and-stream, download tasks, attach)
- stream_blob (tablet streaming)
2026-03-10 19:24:05 +01:00

573 lines
27 KiB
C++

/*
* Copyright (C) 2018-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "db/view/view_building_worker.hh"
#include "sstables/shared_sstable.hh"
#include "utils/assert.hh"
#include <fmt/std.h>
#include <seastar/core/coroutine.hh>
#include <seastar/core/smp.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/closeable.hh>
#include "distributed_loader.hh"
#include "replica/database.hh"
#include "replica/global_table_ptr.hh"
#include "db/config.hh"
#include "db/extensions.hh"
#include "db/system_keyspace.hh"
#include "db/system_distributed_keyspace.hh"
#include "db/schema_tables.hh"
#include "compaction/compaction_manager.hh"
#include "compaction/task_manager_module.hh"
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
#include "sstables/sstable_directory.hh"
#include "auth/common.hh"
#include "tracing/trace_keyspace_helper.hh"
#include "db/view/view_update_checks.hh"
#include "ent/encryption/replicated_key_provider.hh"
#include <unordered_map>
#include "db/view/view_builder.hh"
// Logger used for all diagnostics in this file. Only declared here; the
// definition is not part of this file's visible contents.
extern logging::logger dblog;
// Keyspaces treated as "system" keyspaces by is_system_keyspace(). They are
// populated by init_system_keyspace() and skipped by init_non_system_keyspaces().
static const std::unordered_set<std::string_view> system_keyspaces = {
    db::system_keyspace::NAME,
    db::schema_tables::NAME,
};
// Returns true when `name` is one of the entries of `system_keyspaces`.
bool is_system_keyspace(std::string_view name) {
    const auto match = system_keyspaces.find(name);
    return match != system_keyspaces.end();
}
// Keyspaces reported as "internal" by is_internal_keyspace(). This is a
// superset of `system_keyspaces`: it additionally lists the distributed system
// keyspaces, the legacy auth keyspace, the tracing keyspace and the replicated
// key provider keyspace.
static const std::unordered_set<std::string_view> internal_keyspaces = {
    db::system_distributed_keyspace::NAME,
    db::system_distributed_keyspace::NAME_EVERYWHERE,
    db::system_keyspace::NAME,
    db::schema_tables::NAME,
    auth::meta::legacy::AUTH_KS,
    tracing::trace_keyspace_helper::KEYSPACE_NAME,
    encryption::replicated_key_provider_factory::KSNAME
};
// Returns true when `name` is one of the entries of `internal_keyspaces`.
bool is_internal_keyspace(std::string_view name) {
    const auto match = internal_keyspaces.find(name);
    return match != internal_keyspaces.end();
}
namespace replica {
// Builds the I/O error handler used for sstables loaded from the upload
// directory. The handler deliberately does nothing with the exception it is
// handed: the caller is expected to rethrow the sstable exception itself.
static io_error_handler error_handler_for_upload_dir() {
    auto ignore_error = [] (std::exception_ptr) {
        // do nothing about sstable exception and caller will just rethrow it.
    };
    return ignore_error;
}
// Generator-shaped wrapper around error_handler_for_upload_dir(). The signal
// argument is accepted to match the generator signature but is not used.
io_error_handler error_handler_gen_for_upload_dir(disk_error_signal_type& dummy) {
    auto handler = error_handler_for_upload_dir();
    return handler;
}
// Processes a sharded sstable directory in three sequential phases:
//   1. prepare() on shard 0 only,
//   2. on every shard: process the local directory contents and then hand
//      foreign sstables over through the sharded directory,
//   3. on every shard: commit the directory changes.
future<>
distributed_loader::process_sstable_dir(sharded<sstables::sstable_directory>& dir, sstables::sstable_directory::process_flags flags) {
    co_await dir.invoke_on(0, [flags] (sstables::sstable_directory& shard0_dir) -> future<> {
        co_await shard0_dir.prepare(flags);
    });

    co_await dir.invoke_on_all([&dir, flags] (sstables::sstable_directory& local_dir) -> future<> {
        // Supposed to be called with the node either down or on behalf of maintenance tasks
        // like nodetool refresh
        co_await local_dir.process_sstable_dir(flags);
        co_await local_dir.move_foreign_sstables(dir);
    });

    co_await dir.invoke_on_all([] (sstables::sstable_directory& local_dir) {
        return local_dir.commit_directory_changes();
    });
}
// On every shard, passes the result of the table's write_in_progress() to the
// shard-local sstable directory via store_phaser().
// `table` is captured by reference, so it must outlive the returned future.
future<>
distributed_loader::lock_table(global_table_ptr& table, sharded<sstables::sstable_directory>& dir) {
    auto store_write_phaser = [&table] (sstables::sstable_directory& local_dir) {
        local_dir.store_phaser(table->write_in_progress());
        return make_ready_future<>();
    };
    return dir.invoke_on_all(std::move(store_write_phaser));
}
// Global resharding entry point. Resharding is done in two parts:
// - first, the foreign_sstable_open_info is spread across shards so that all
//   of them reshard about the same amount of data;
// - second, each shard's distributed object reshards the SSTables assigned to it.
// Here the work is delegated to a table_resharding_compaction_task_impl that is
// created and started through the compaction task manager module, and then
// awaited until done.
future<>
distributed_loader::reshard(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr) {
    auto& task_module = db.local().get_compaction_manager().get_task_manager_module();
    auto resharding_task = co_await task_module.make_and_start_task<compaction::table_resharding_compaction_task_impl>(
            {},
            std::move(ks_name),
            std::move(table_name),
            dir,
            db,
            std::move(creator),
            std::move(owned_ranges_ptr));
    co_await resharding_task->done();
}
// Reduces the per-shard sstable_directory::highest_version_seen() values to
// their maximum, starting the reduction from `system_version`.
future<sstables::sstable::version_types>
highest_version_seen(sharded<sstables::sstable_directory>& dir, sstables::sstable_version_types system_version) {
    using version = sstables::sstable_version_types;
    auto shard_version = std::mem_fn(&sstables::sstable_directory::highest_version_seen);
    auto pick_newest = [] (version lhs, version rhs) {
        return std::max(lhs, rhs);
    };
    return dir.map_reduce0(std::move(shard_version), system_version, std::move(pick_newest));
}
// Reshapes the sstables of `ks_name.table_name` found in `dir`. The work is
// delegated to a table_reshaping_compaction_task_impl created and started
// through the compaction task manager module; the call completes when the task
// is done. `filter` and `mode` are forwarded to the task unchanged.
future<>
distributed_loader::reshape(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, compaction::reshape_mode mode,
        sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator,
        std::function<bool (const sstables::shared_sstable&)> filter) {
    auto& task_module = db.local().get_compaction_manager().get_task_manager_module();
    auto reshaping_task = co_await task_module.make_and_start_task<compaction::table_reshaping_compaction_task_impl>(
            {},
            std::move(ks_name),
            std::move(table_name),
            dir,
            db,
            mode,
            std::move(creator),
            std::move(filter));
    co_await reshaping_task->done();
}
// Moves every sstable held by `dir` out of the upload area into the table
// (normal state) or into staging, depending on `needs_view_update`, adds them
// to the table and returns how many were loaded.
//
// Each sstable gets a fresh generation from the table and has its level reset
// to 0. A failure to add the sstables to the table is logged and aborts the
// process. Staging sstables are afterwards registered either with the view
// building worker or directly with the view builder, according to the decision.
future<size_t>
distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sharded<replica::database>& db,
        sharded<db::view::view_builder>& vb, sharded<db::view::view_building_worker>& vbw, db::view::sstable_destination_decision needs_view_update, sstring ks, sstring cf) {
    using destination = db::view::sstable_destination_decision;

    auto& table = db.local().find_column_family(ks, cf);
    const bool goes_to_base = needs_view_update == destination::normal_directory;
    const auto target_state = goes_to_base ? sstables::sstable_state::normal : sstables::sstable_state::staging;
    const char* target_name = goes_to_base ? "base" : "staging";

    std::vector<sstables::shared_sstable> new_sstables;
    co_await dir.do_for_each_sstable([&table, target_state, target_name, &new_sstables] (sstables::shared_sstable sst) -> future<> {
        auto gen = table.calculate_generation_for_new_table();
        dblog.trace("Loading {} into {}, new generation {}", sst->get_filename(), target_name, gen);
        co_await sst->pick_up_from_upload(target_state, gen);
        // When loading an imported sst, set level to 0 because it may overlap with existing ssts on higher levels.
        sst->set_sstable_level(0);
        new_sstables.push_back(std::move(sst));
    });

    // nothing loaded
    if (new_sstables.empty()) {
        co_return 0;
    }

    co_await table.add_sstables_and_update_cache(new_sstables).handle_exception([&table] (std::exception_ptr ep) {
        dblog.error("Failed to load SSTables for {}.{}: {}. Aborting.", table.schema()->ks_name(), table.schema()->cf_name(), ep);
        abort();
    });

    if (needs_view_update == destination::staging_managed_by_vbc) {
        co_await vbw.local().register_staging_sstable_tasks(new_sstables, table.schema()->id());
    } else if (needs_view_update == destination::staging_directly_to_generator) {
        co_await coroutine::parallel_for_each(new_sstables, [&vb, &table] (sstables::shared_sstable sst) -> future<> {
            if (sst->requires_view_building()) {
                co_await vb.local().register_staging_sstable(sst, table.shared_from_this());
            }
        });
    }

    co_return new_sstables.size();
}
// Loads the sstables found in the upload directory of table `ks.cf` into the
// table (the "refresh" path).
//
// Steps, all executed inside a seastar thread:
//   1. start a sharded sstable_directory in the `upload` state and lock the table,
//   2. scan/process the directory,
//   3. reshard (passing owned ranges unless `skip_cleanup` is set),
//   4. reshape in strict mode unless `skip_reshape` is set,
//   5. make the resulting sstables available on every shard and log the total.
//
// Raises an internal error when the keyspace's replication strategy is
// per-table (tablets), which this path does not support.
future<>
distributed_loader::process_upload_dir(sharded<replica::database>& db, sharded<db::view::view_builder>& vb, sharded<db::view::view_building_worker>& vbw, sstring ks, sstring cf, bool skip_cleanup, bool skip_reshape) {
const auto& rs = db.local().find_keyspace(ks).get_replication_strategy();
if (rs.is_per_table()) {
on_internal_error(dblog, "process_upload_dir is not supported with tablets");
}
// db, vb and vbw are captured by reference: they must outlive the returned future.
return seastar::async([&db, &vb, &vbw, ks = std::move(ks), cf = std::move(cf), skip_cleanup, skip_reshape] {
auto global_table = get_table_on_all_shards(db, ks, cf).get();
sharded<sstables::sstable_directory> directory;
directory.start(global_table.as_sharded_parameter(),
sstables::sstable_state::upload, &error_handler_gen_for_upload_dir
).get();
// Stops the sharded directory when this scope is left, including on exceptions.
auto stop_directory = deferred_stop(directory);
lock_table(global_table, directory).get();
// Materialized-view sstables are rejected on this path; the digest-mismatch
// tolerance comes from the node configuration.
sstables::sstable_directory::process_flags flags {
.need_mutate_level = true,
.enable_dangerous_direct_import_of_cassandra_counters = db.local().get_config().enable_dangerous_direct_import_of_cassandra_counters(),
.allow_loading_materialized_view = false,
.sstable_open_config = {
.ignore_component_digest_mismatch = db.local().get_config().ignore_component_digest_mismatch(),
},
};
process_sstable_dir(directory, flags).get();
// Creator handed to reshard/reshape: new sstables are created in the upload
// state with a freshly generated generation. The `shard` argument is unused.
auto make_sstable = [&] (shard_id shard) {
auto& sstm = global_table->get_sstables_manager();
auto& gen = global_table->get_sstable_generation_generator();
auto generation = gen();
return sstm.make_sstable(global_table->schema(), global_table->get_storage_options(),
generation, sstables::sstable_state::upload, sstm.get_preferred_sstable_version(),
sstables::sstable_format_types::big, db_clock::now(), &error_handler_gen_for_upload_dir);
};
// Pass owned_ranges_ptr to reshard to piggy-back cleanup on the resharding compaction.
// Note that needs_cleanup() is inaccurate and may return false positives,
// maybe triggering resharding+cleanup unnecessarily for some sstables.
// But this is resharding on refresh (sstable loading via upload dir),
// which will usually require resharding anyway.
//
// FIXME: take multiple compaction groups into account
// - segregate resharded tables into compaction groups
// - split the keyspace local ranges per compaction_group as done in table::perform_cleanup_compaction
// so that cleanup can be considered per compaction group
const auto& erm = db.local().find_keyspace(ks).get_static_effective_replication_map();
// A null owned-ranges pointer is passed when cleanup is skipped.
auto owned_ranges_ptr = skip_cleanup ? lw_shared_ptr<dht::token_range_vector>(nullptr) : compaction::make_owned_ranges_ptr(db.local().get_keyspace_local_ranges(erm).get());
reshard(directory, db, ks, cf, make_sstable, owned_ranges_ptr).get();
if (!skip_reshape) {
// The filter accepts every sstable.
reshape(directory, db, compaction::reshape_mode::strict, ks, cf, make_sstable,
[] (const sstables::shared_sstable&) { return true; }).get();
}
// Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions.
const auto use_view_update_path = db::view::check_needs_view_update_path(vb.local(), erm->get_token_metadata_ptr(), *global_table, streaming::stream_reason::repair).get();
// Sum the per-shard counts of loaded sstables.
size_t loaded = directory.map_reduce0([&db, ks, cf, use_view_update_path, &vb, &vbw] (sstables::sstable_directory& dir) {
return make_sstables_available(dir, db, vb, vbw, use_view_update_path, ks, cf);
}, size_t(0), std::plus<size_t>()).get();
dblog.info("Loaded {} SSTables", loaded);
});
}
// Collects the sstables found in the upload directory of table `ks.cf`,
// grouped per shard, together with the table id. The actual work is done by
// get_sstables_from(); this overload only supplies the callback that starts
// the sharded directory in the `upload` state.
future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
distributed_loader::get_sstables_from_upload_dir(sharded<replica::database>& db, sstring ks, sstring cf, sstables::sstable_open_config cfg) {
    auto start_upload_dir = [] (auto& global_table, auto& directory) {
        return directory.start(
                global_table.as_sharded_parameter(),
                sstables::sstable_state::upload,
                &error_handler_gen_for_upload_dir);
    };
    return get_sstables_from(db, ks, cf, cfg, std::move(start_upload_dir));
}
// Collects the listed `sstables` of table `ks.cf` from object storage
// (`endpoint`/`type`/`bucket`/`prefix`), grouped per shard, together with the
// table id. The actual work is done by get_sstables_from(); this overload only
// supplies the callback that starts the sharded directory with per-shard
// object-storage options.
//
// `get_abort_src`, when non-empty, is invoked once per shard (from the
// sharded_parameter below) to obtain the abort source passed to the storage
// options; an empty function yields a null abort source.
//
// Lifetime note: the start callback runs later, inside the seastar thread
// created by get_sstables_from(), i.e. after this function has returned.
// `get_abort_src` is a by-value parameter, so it is moved into the callback
// (and copied into the per-shard parameter) instead of being captured by
// reference; a reference would only stay valid as long as the caller happens
// to keep the argument alive until the returned future resolves.
future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
distributed_loader::get_sstables_from_object_store(sharded<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring type, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> get_abort_src) {
    return get_sstables_from(db, ks, cf, cfg, [bucket, endpoint, type, prefix, sstables = std::move(sstables), get_abort_src = std::move(get_abort_src)] (auto& global_table, auto& directory) {
        return directory.start(global_table.as_sharded_parameter(),
            sharded_parameter([bucket, endpoint, type, prefix, get_abort_src] {
                seastar::abort_source* as = get_abort_src ? get_abort_src() : nullptr;
                auto opts = data_dictionary::make_object_storage_options(endpoint, type, bucket, prefix, as);
                return make_lw_shared<const data_dictionary::storage_options>(std::move(opts));
            }),
            sstables,
            &error_handler_gen_for_upload_dir);
    });
}
// Shared implementation of the get_sstables_from_*() entry points.
//
// Runs in a seastar thread: resolves the table on all shards, lets `start_dir`
// start the sharded sstable directory, locks the table, processes the
// directory (materialized-view sstables are rejected and sstables are not
// sorted by owner) and finally gathers each shard's unsorted sstables.
//
// Returns the table id and a vector with one entry per shard holding that
// shard's sstables. `db` is captured by reference and must outlive the
// returned future.
future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
distributed_loader::get_sstables_from(sharded<replica::database>& db, sstring ks, sstring cf, sstables::sstable_open_config cfg,
        noncopyable_function<future<>(global_table_ptr&, sharded<sstables::sstable_directory>&)> start_dir) {
    return seastar::async([&db, ks = std::move(ks), cf = std::move(cf), start_dir = std::move(start_dir), cfg] {
        auto global_table = get_table_on_all_shards(db, ks, cf).get();
        auto table_id = global_table->schema()->id();

        sharded<sstables::sstable_directory> sst_dir;
        start_dir(global_table, sst_dir).get();
        // Stops the sharded directory when this scope is left.
        auto stop_dir = deferred_stop(sst_dir);

        std::vector<std::vector<sstables::shared_sstable>> per_shard_sstables(smp::count);
        lock_table(global_table, sst_dir).get();

        const auto& config = db.local().get_config();
        sstables::sstable_directory::process_flags flags {
            .need_mutate_level = true,
            .enable_dangerous_direct_import_of_cassandra_counters = config.enable_dangerous_direct_import_of_cassandra_counters(),
            .allow_loading_materialized_view = false,
            .sort_sstables_according_to_owner = false,
            .sstable_open_config = cfg,
        };
        process_sstable_dir(sst_dir, flags).get();

        // Every shard fills in its own slot of the result.
        sst_dir.invoke_on_all([&per_shard_sstables] (sstables::sstable_directory& local_dir) {
            per_shard_sstables[this_shard_id()] = local_dir.get_unsorted_sstables();
        }).get();

        return std::make_tuple(table_id, std::move(per_shard_sstables));
    });
}
// Loads the on-storage sstables of one table (`ks.cf`) into the table.
//
// Usage: construct, call start(), and always call stop() afterwards. The
// destructor asserts that every sstable directory collected by start() has
// been stopped and released by stop().
//
// Both `db` and the global table pointer are held by reference and must
// outlive the populator.
class table_populator {
    sharded<replica::database>& _db;
    sstring _ks;
    sstring _cf;
    global_table_ptr& _global_table;
    // One sharded directory per (sstable state, storage location) collected by collect_subdirs().
    std::vector<lw_shared_ptr<sharded<sstables::sstable_directory>>> _sstable_directories;
    // sstable version given to sstables created by reshard/reshape; updated by process_subdir().
    sstables::sstable_version_types _version_for_reshaping = sstables::oldest_writable_sstable_format;

public:
    table_populator(global_table_ptr& table, sharded<replica::database>& db, sstring ks, sstring cf)
        : _db(db), _ks(std::move(ks)), _cf(std::move(cf)), _global_table(table) {}

    ~table_populator() {
        // All directories must have been stopped
        // using table_populator::stop()
        SCYLLA_ASSERT(_sstable_directories.empty());
    }

    // Collects, processes and populates all sstable directories of the table.
    future<> start();
    // Stops and releases every collected sstable directory.
    future<> stop();

private:
    using allow_offstrategy_compaction = bool_class<struct allow_offstrategy_compaction_tag>;

    future<> collect_subdirs();
    future<> collect_subdirs(const data_dictionary::storage_options::local&, sstables::sstable_state state);
    future<> collect_subdirs(const data_dictionary::storage_options::object_storage&, sstables::sstable_state state);
    future<> process_subdir(sharded<sstables::sstable_directory>&);
    future<> populate_subdir(sharded<sstables::sstable_directory>&);
};
// Populates the table; must be called on shard 0.
//
// All directories are collected and processed first, then auto compaction is
// disabled for the table on every shard, and only afterwards are the
// directories populated into the table.
future<> table_populator::start() {
    SCYLLA_ASSERT(this_shard_id() == 0);

    co_await collect_subdirs();

    for (auto dir_ptr : _sstable_directories) {
        co_await process_subdir(*dir_ptr);
    }

    auto disable_auto_compaction = [this] {
        return _global_table->disable_auto_compaction();
    };
    co_await smp::invoke_on_all(std::move(disable_auto_compaction));

    for (auto dir_ptr : _sstable_directories) {
        co_await populate_subdir(*dir_ptr);
    }
}
// Stops the collected sstable directories one at a time, last collected first,
// dropping each from the list once it has been stopped. If stopping a
// directory fails, it stays in the list and the exception propagates.
future<> table_populator::stop() {
    for (; !_sstable_directories.empty(); _sstable_directories.pop_back()) {
        co_await _sstable_directories.back()->stop();
    }
}
// Local-storage variant: starts one sharded sstable_directory per local data
// directory reported by the sstables manager for `so`, all in the given
// `state`, and records each in _sstable_directories once it has started.
// The data directories are handled concurrently.
future<> table_populator::collect_subdirs(const data_dictionary::storage_options::local& so, sstables::sstable_state state) {
    auto datadirs = _global_table->get_sstables_manager().get_local_directories(so);
    co_await coroutine::parallel_for_each(datadirs, [this, state] (const std::filesystem::path& datadir) -> future<> {
        auto sharded_dir = make_lw_shared<sharded<sstables::sstable_directory>>();
        // Each shard builds its own storage options object pointing at `datadir`.
        auto make_shard_options = [datadir] {
            auto opts = data_dictionary::make_local_options(datadir);
            return make_lw_shared<const data_dictionary::storage_options>(std::move(opts));
        };
        co_await sharded_dir->start(
                _global_table.as_sharded_parameter(),
                state,
                sharded_parameter(std::move(make_shard_options)),
                default_io_error_handler_gen());
        _sstable_directories.push_back(std::move(sharded_dir));
    });
}
// Object-storage variant: starts a single sharded sstable_directory in the
// given `state` and records it in _sstable_directories. The storage options
// argument only selects this overload; it is not otherwise used here.
future<> table_populator::collect_subdirs(const data_dictionary::storage_options::object_storage& so, sstables::sstable_state state) {
    auto sharded_dir = make_lw_shared<sharded<sstables::sstable_directory>>();
    co_await sharded_dir->start(
            _global_table.as_sharded_parameter(),
            state,
            default_io_error_handler_gen());
    _sstable_directories.push_back(std::move(sharded_dir));
}
// Collects the sstable directories for every sstable state, dispatching on the
// table's storage options to the matching collect_subdirs() overload.
// Every directory collected here must later be stopped with table_populator::stop().
future<> table_populator::collect_subdirs() {
    // The table base directory (with sstable_state::normal) must be
    // loaded and processed first as it now may contain the shared
    // pending_delete_dir, possibly referring to sstables in sub-directories.
    const sstables::sstable_state load_order[] = {
        sstables::sstable_state::normal,
        sstables::sstable_state::staging,
        sstables::sstable_state::quarantine,
    };
    for (const auto state : load_order) {
        auto collect_for_storage = [this, state] (const auto& so) -> future<> {
            return collect_subdirs(so, state);
        };
        co_await std::visit(collect_for_storage, _global_table->get_storage_options().value);
    }
}
// Locks the table for `directory`, scans/processes the directory with the
// boot-time flags (missing TOC is an error, materialized-view sstables are
// allowed, garbage collection is enabled) and then derives the sstable version
// to use when resharding/reshaping rewrites sstables.
future<> table_populator::process_subdir(sharded<sstables::sstable_directory>& directory) {
    co_await distributed_loader::lock_table(_global_table, directory);

    const auto& config = _db.local().get_config();
    sstables::sstable_directory::process_flags flags {
        .throw_on_missing_toc = true,
        .enable_dangerous_direct_import_of_cassandra_counters = config.enable_dangerous_direct_import_of_cassandra_counters(),
        .allow_loading_materialized_view = true,
        .garbage_collect = true,
        .sstable_open_config = {
            .ignore_component_digest_mismatch = config.ignore_component_digest_mismatch(),
        },
    };
    co_await distributed_loader::process_sstable_dir(directory, flags);

    // If we are resharding system tables before we can read them, we will not
    // know which is the highest format we support: this information is itself stored
    // in the system tables. In that case we'll rely on what we find on disk: we'll
    // at least not downgrade any files. If we already know that we support a higher
    // format than the one we see then we use that.
    const auto highest_on_disk = co_await highest_version_seen(directory, sstables::oldest_writable_sstable_format);
    // NOTE(review): this overwrites the value computed for previously processed
    // directories instead of accumulating a maximum, so the last directory
    // processed decides the version. Confirm that
    // get_safe_sstable_version_for_rewrites() makes this safe.
    _version_for_reshaping = _global_table->get_sstables_manager().get_safe_sstable_version_for_rewrites(highest_on_disk);
}
// Creates a new sstable object for `table` through its sstables manager, using
// the table's schema and storage options, the given generation, state and
// version, and the `big` sstable format.
sstables::shared_sstable make_sstable(replica::table& table, sstables::sstable_state state, sstables::generation_type generation, sstables::sstable_version_types v) {
    auto& manager = table.get_sstables_manager();
    return manager.make_sstable(
            table.schema(),
            table.get_storage_options(),
            generation,
            state,
            v,
            sstables::sstable_format_types::big);
}
// Populates the table from one already-processed sstable directory:
// reshards, reshapes (relaxed mode, skipping repair-origin sstables) and then
// adds every remaining sstable of the directory to the table on each shard.
future<> table_populator::populate_subdir(sharded<sstables::sstable_directory>& directory) {
auto state = directory.local().state();
dblog.debug("Populating {}/{}/{} state={}", _ks, _cf, _global_table->get_storage_options(), state);
// Creator for resharding: the generation is obtained on the target shard.
// NOTE(review): the blocking .get() presumably requires the creator to be
// invoked from a seastar thread - confirm against the resharding task.
co_await distributed_loader::reshard(directory, _db, _ks, _cf, [this, state] (shard_id shard) mutable {
auto gen = smp::submit_to(shard, [this] () {
return _global_table->calculate_generation_for_new_table();
}).get();
return make_sstable(*_global_table, state, gen, _version_for_reshaping);
});
// The node is offline at this point so we are very lenient with what we consider
// offstrategy.
// SSTables created by repair may not conform to compaction strategy layout goal
// because data segregation is only performed by compaction
// Instead of reshaping them on boot, let's add them to maintenance set and allow
// off-strategy compaction to reshape them. This will allow node to become online
// ASAP. Given that SSTables with repair origin are disjoint, they can be efficiently
// read from.
auto eligible_for_reshape_on_boot = [] (const sstables::shared_sstable& sst) {
return sst->get_origin() != sstables::repair_origin;
};
// Creator for reshaping: unlike the resharding creator it takes the generation
// from the table directly; the `shard` argument is unused.
co_await distributed_loader::reshape(directory, _db, compaction::reshape_mode::relaxed, _ks, _cf, [this, state](shard_id shard) {
auto gen = _global_table->calculate_generation_for_new_table();
return make_sstable(*_global_table, state, gen, _version_for_reshaping);
}, eligible_for_reshape_on_boot);
// Off-strategy handling is only allowed for the normal-state directory.
auto do_allow_offstrategy_compaction = allow_offstrategy_compaction(state == sstables::sstable_state::normal);
// eligible_for_reshape_on_boot is captured by reference; it lives in this
// coroutine's frame, which stays alive while the call below is awaited.
co_await directory.invoke_on_all([this, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::sstable_directory& dir) -> future<> {
co_await dir.do_for_each_sstable([this, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::shared_sstable sst) {
// Only sstables that were skipped by reshape-on-boot (repair origin) are
// flagged as requiring off-strategy, and only when off-strategy is allowed.
auto requires_offstrategy = sstables::offstrategy(do_allow_offstrategy_compaction && !eligible_for_reshape_on_boot(sst));
return _global_table->add_sstable_and_update_cache(sst, requires_offstrategy);
});
if (do_allow_offstrategy_compaction) {
_global_table->trigger_offstrategy_compaction();
}
});
}
// Populates every table of keyspace `ks_name` from its sstables and then
// applies the stored truncation times to the keyspace's tables on all shards.
//
// Tables are populated concurrently. A failure while populating a table is
// logged and rethrown as std::runtime_error carrying the logged message,
// except for compaction_stopped_exception, which is swallowed. Tables of
// non-system keyspaces are marked ready for writes once populated.
//
// Throws std::runtime_error if a table of the schema-tables keyspace has a
// truncation record.
future<> distributed_loader::populate_keyspace(sharded<replica::database>& db,
sharded<db::system_keyspace>& sys_ks, keyspace& ks, sstring ks_name)
{
dblog.info("Populating Keyspace {}", ks_name);
co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data() | std::views::values, [&] (schema_ptr s) -> future<> {
auto uuid = s->id();
sstring cfname = s->cf_name();
auto gtable = co_await get_table_on_all_shards(db, ks_name, cfname);
auto& cf = *gtable;
dblog.info("Keyspace {}: Reading CF {} id={} version={} storage={}", ks_name, cfname, uuid, s->version(), cf.get_storage_options());
auto metadata = table_populator(gtable, db, ks_name, cfname);
// The failure, if any, is stashed here and rethrown only after the populator
// has been stopped: co_await cannot be used inside a catch handler, and the
// populator's destructor asserts that stop() has run.
std::exception_ptr ex;
try {
co_await metadata.start();
} catch (...) {
std::exception_ptr eptr = std::current_exception();
std::string msg =
format("Exception while populating keyspace '{}' with column family '{}' from '{}': {}",
ks_name, cfname, cf.get_storage_options(), eptr);
dblog.error("{}", msg);
// Rethrow locally just to classify the exception type.
try {
std::rethrow_exception(eptr);
} catch (compaction::compaction_stopped_exception& e) {
// swallow compaction stopped exception, to allow clean shutdown.
} catch (...) {
ex = std::make_exception_ptr(std::runtime_error(msg.c_str()));
}
}
co_await metadata.stop();
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
}
// system tables are made writable through sys_ks::mark_writable
if (!is_system_keyspace(ks_name)) {
co_await smp::invoke_on_all([&] {
auto s = gtable->schema();
db.local().find_column_family(s).mark_ready_for_writes(db.local().commitlog_for(s));
});
}
});
// here we can be sure that the 'truncated' table is loaded,
// now we load and initialize the truncation times for the tables
{
const auto truncation_times = co_await sys_ks.local().load_truncation_times();
co_await smp::invoke_on_all([&] {
db.local().get_tables_metadata().for_each_table([&](table_id id, lw_shared_ptr<table> table) {
// Only tables of the keyspace being populated are touched.
if (table->schema()->ks_name() != ks_name) {
return;
}
// Tables without a truncation record get time_point::min().
const auto it = truncation_times.find(id);
const auto truncation_time = it == truncation_times.end()
? db_clock::time_point::min()
: it->second;
// The check (and the throw) is performed on shard 0 only.
if (this_shard_id() == 0 && table->schema()->ks_name() == db::schema_tables::NAME &&
truncation_time != db_clock::time_point::min()) {
// replay_position stored in the truncation record may belong to
// the old (default) commitlog domain. It's not safe to interpret
// that replay position in the schema commitlog domain.
// Refuse to boot in this case. We assume no one truncated schema tables.
// We will hit this during rolling upgrade, in which case the user will
// roll back and let us know.
throw std::runtime_error(format("Schema table {}.{} has a truncation record. Booting is not safe.",
table->schema()->ks_name(), table->schema()->cf_name()));
}
table->set_truncation_time(truncation_time);
});
});
}
}
// Initializes the system keyspaces: runs system_keyspace::make() on every
// shard and then populates, one after another, each keyspace listed in
// `system_keyspaces` that exists in the database. Runs in a seastar thread;
// all three arguments are captured by reference and must outlive the
// returned future.
future<> distributed_loader::init_system_keyspace(sharded<db::system_keyspace>& sys_ks, sharded<locator::effective_replication_map_factory>& erm_factory, sharded<replica::database>& db) {
    return seastar::async([&sys_ks, &erm_factory, &db] {
        auto make_local = [&erm_factory, &db] (auto& local_sys_ks) {
            return local_sys_ks.make(erm_factory.local(), db.local());
        };
        sys_ks.invoke_on_all(std::move(make_local)).get();

        for (auto ksname : system_keyspaces) {
            auto& keyspaces = db.local().get_keyspaces();
            auto found = keyspaces.find(ksname);
            if (found == keyspaces.end()) {
                // Keyspace not present in the database: nothing to populate.
                continue;
            }
            distributed_loader::populate_keyspace(db, sys_ks, found->second, sstring(ksname)).get();
        }
    });
}
// Initializes all non-system keyspaces: parses the system tables on every
// shard and then populates the keyspaces in two passes - first those that the
// configured extensions report as internal, then all the others. Within a
// pass the keyspaces are populated concurrently and the pass fails if any of
// them fails. Runs in a seastar thread; the arguments are captured by
// reference and must outlive the returned future.
future<> distributed_loader::init_non_system_keyspaces(sharded<replica::database>& db,
        sharded<service::storage_proxy>& proxy, sharded<db::system_keyspace>& sys_ks) {
    return seastar::async([&db, &proxy, &sys_ks] {
        db.invoke_on_all([&proxy, &sys_ks] (replica::database& local_db) {
            return local_db.parse_system_tables(proxy, sys_ks);
        }).get();

        const auto& cfg = db.local().get_config();
        for (bool prio_only : { true, false}) {
            std::vector<future<>> pending;
            for (auto& [name, keyspace_entry] : db.local().get_keyspaces()) {
                /**
                 * Must process in two phases: Prio and non-prio.
                 * This looks like it is not needed. And it is not
                 * in open-source version. But essential for enterprise.
                 * Do _not_ remove or refactor away.
                 */
                const bool skip = is_system_keyspace(name)
                        || prio_only != cfg.extensions().is_extension_internal_keyspace(name);
                if (skip) {
                    continue;
                }
                pending.emplace_back(distributed_loader::populate_keyspace(db, sys_ks, keyspace_entry, name));
            }
            when_all_succeed(pending.begin(), pending.end()).discard_result().get();
        }
    });
}
}