db: call initialize_virtual_tables from shard 0 only

Move the smp::invoke_on_all dispatch from the callers into
initialize_virtual_tables() itself, so the function is called
once from shard 0 and internally distributes the per-shard
virtual table setup to all shards.

This simplifies the callers and allows a single place to add
cross-shard coordination logic (e.g. feature-gated table
registration) in future commits.
This commit is contained in:
Benny Halevy
2026-03-26 17:04:48 +02:00
parent 90d4ff34fb
commit cb6004b625
3 changed files with 35 additions and 37 deletions

View File

@@ -1470,40 +1470,42 @@ future<> initialize_virtual_tables(
sharded<service::tablet_allocator>& tablet_allocator,
sharded<netw::messaging_service>& ms,
db::config& cfg) {
auto& virtual_tables_registry = sys_ks.local().get_virtual_tables_registry();
auto& virtual_tables = *virtual_tables_registry;
auto& db = dist_db.local();
auto& ss = dist_ss.local();
co_await smp::invoke_on_all([&] () -> future<> {
auto& virtual_tables_registry = sys_ks.local().get_virtual_tables_registry();
auto& virtual_tables = *virtual_tables_registry;
auto& db = dist_db.local();
auto& ss = dist_ss.local();
auto add_table = [&] (std::unique_ptr<virtual_table>&& tbl) -> future<> {
auto schema = tbl->schema();
virtual_tables[schema->id()] = std::move(tbl);
co_await db.create_local_system_table(schema, false, ss.get_erm_factory());
auto& cf = db.find_column_family(schema);
cf.mark_ready_for_writes(nullptr);
auto& vt = virtual_tables[schema->id()];
cf.set_virtual_reader(vt->as_mutation_source());
cf.set_virtual_writer([&vt = *vt] (const frozen_mutation& m) { return vt.apply(m); });
};
auto add_table = [&] (std::unique_ptr<virtual_table>&& tbl) -> future<> {
auto schema = tbl->schema();
virtual_tables[schema->id()] = std::move(tbl);
co_await db.create_local_system_table(schema, false, ss.get_erm_factory());
auto& cf = db.find_column_family(schema);
cf.mark_ready_for_writes(nullptr);
auto& vt = virtual_tables[schema->id()];
cf.set_virtual_reader(vt->as_mutation_source());
cf.set_virtual_writer([&vt = *vt] (const frozen_mutation& m) { return vt.apply(m); });
};
// Add built-in virtual tables here.
co_await add_table(std::make_unique<cluster_status_table>(dist_ss, dist_gossiper));
co_await add_table(std::make_unique<token_ring_table>(db, ss));
co_await add_table(std::make_unique<snapshots_table>(dist_db));
co_await add_table(std::make_unique<protocol_servers_table>(ss));
co_await add_table(std::make_unique<runtime_info_table>(dist_db, ss));
co_await add_table(std::make_unique<versions_table>());
co_await add_table(std::make_unique<db_config_table>(cfg));
co_await add_table(std::make_unique<clients_table>(ss));
co_await add_table(std::make_unique<raft_state_table>(dist_raft_gr));
co_await add_table(std::make_unique<load_per_node>(tablet_allocator, dist_db, dist_raft_gr, ms, dist_gossiper));
co_await add_table(std::make_unique<tablet_sizes>(tablet_allocator, dist_db, dist_raft_gr, ms));
co_await add_table(std::make_unique<cdc_timestamps_table>(db, ss));
co_await add_table(std::make_unique<cdc_streams_table>(db, ss));
// Add built-in virtual tables here.
co_await add_table(std::make_unique<cluster_status_table>(dist_ss, dist_gossiper));
co_await add_table(std::make_unique<token_ring_table>(db, ss));
co_await add_table(std::make_unique<snapshots_table>(dist_db));
co_await add_table(std::make_unique<protocol_servers_table>(ss));
co_await add_table(std::make_unique<runtime_info_table>(dist_db, ss));
co_await add_table(std::make_unique<versions_table>());
co_await add_table(std::make_unique<db_config_table>(cfg));
co_await add_table(std::make_unique<clients_table>(ss));
co_await add_table(std::make_unique<raft_state_table>(dist_raft_gr));
co_await add_table(std::make_unique<load_per_node>(tablet_allocator, dist_db, dist_raft_gr, ms, dist_gossiper));
co_await add_table(std::make_unique<tablet_sizes>(tablet_allocator, dist_db, dist_raft_gr, ms));
co_await add_table(std::make_unique<cdc_timestamps_table>(db, ss));
co_await add_table(std::make_unique<cdc_streams_table>(db, ss));
db.find_column_family(system_keyspace::size_estimates()).set_virtual_reader(mutation_source(db::size_estimates::virtual_reader(db, sys_ks.local())));
db.find_column_family(system_keyspace::views_builds_in_progress()).set_virtual_reader(mutation_source(db::view::build_progress_virtual_reader(db)));
db.find_column_family(system_keyspace::built_indexes()).set_virtual_reader(mutation_source(db::index::built_indexes_virtual_reader(db)));
db.find_column_family(system_keyspace::size_estimates()).set_virtual_reader(mutation_source(db::size_estimates::virtual_reader(db, sys_ks.local())));
db.find_column_family(system_keyspace::views_builds_in_progress()).set_virtual_reader(mutation_source(db::view::build_progress_virtual_reader(db)));
db.find_column_family(system_keyspace::built_indexes()).set_virtual_reader(mutation_source(db::index::built_indexes_virtual_reader(db)));
});
}
virtual_tables_registry::virtual_tables_registry() : unique_ptr(std::make_unique<virtual_tables_registry_impl>()) {

View File

@@ -1963,9 +1963,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
});
checkpoint(stop_signal, "initializing virtual tables");
smp::invoke_on_all([&] {
return db::initialize_virtual_tables(db, ss, gossiper, raft_gr, sys_ks, tablet_allocator, messaging, *cfg);
}).get();
db::initialize_virtual_tables(db, ss, gossiper, raft_gr, sys_ks, tablet_allocator, messaging, *cfg).get();
// #293 - do not stop anything
// engine().at_exit([&qp] { return qp.stop(); });

View File

@@ -1008,9 +1008,7 @@ private:
_mnotifier.local().unregister_listener(&_ss.local()).get();
});
smp::invoke_on_all([&] {
return db::initialize_virtual_tables(_db, _ss, _gossiper, _group0_registry, _sys_ks, _tablet_allocator, _ms, *cfg);
}).get();
db::initialize_virtual_tables(_db, _ss, _gossiper, _group0_registry, _sys_ks, _tablet_allocator, _ms, *cfg).get();
_qp.invoke_on_all([this, &group0_client] (cql3::query_processor& qp) {
qp.start_remote(_mm.local(), _mapreduce_service.local(), _ss.local(), group0_client,