/* * Copyright (C) 2023-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */
// NOTE(review): this file has been mangled by an extraction step that removed the
// contents of every angle-bracketed construct: the eight system #include targets
// below are empty, and template argument lists (e.g. `sharded<T>`, `future<T>`,
// `std::function<Sig>`) are gone throughout the file. Restore them from the
// upstream source before attempting to compile — TODO confirm against upstream.
#include
#include
#include
#include
#include
#include
#include
#include
#include "cdc/generation_service.hh"
#include "cdc/log.hh"
#include "cdc/metadata.hh"
#include "db/config.hh"
#include "db/system_keyspace.hh"
#include "db/virtual_table.hh"
#include "partition_slice_builder.hh"
#include "db/virtual_tables.hh"
#include "db/size_estimates_virtual_reader.hh"
#include "db/view/build_progress_virtual_reader.hh"
#include "index/built_indexes_virtual_reader.hh"
#include "gms/gossiper.hh"
#include "mutation/frozen_mutation.hh"
#include "transport/protocol_server.hh"
#include "release.hh"
#include "replica/database.hh"
#include "schema/schema_builder.hh"
#include "service/raft/raft_group_registry.hh"
#include "service/storage_service.hh"
#include "service/tablet_allocator.hh"
#include "locator/load_sketch.hh"
#include "types/list.hh"
#include "types/types.hh"
#include "utils/build_id.hh"
#include "utils/log.hh"
#include "replica/exceptions.hh"
#include "service/paxos/paxos_state.hh"
#include "idl/storage_proxy.dist.hh"

using namespace locator;

namespace db {

namespace {

logging::logger vtlog("virtual_tables");

// Virtual table system.cluster_status: one partition per gossiped endpoint,
// exposing liveness, node state, DC/rack, load, token count and ownership.
// Populated by filling a memtable with frozen mutations built on shard 0.
class cluster_status_table : public memtable_filling_virtual_table {
private:
    // NOTE(review): template arguments stripped — presumably
    // sharded<service::storage_service> and sharded<gms::gossiper>; verify.
    sharded& _dist_ss;
    sharded& _dist_gossiper;
public:
    cluster_status_table(sharded& ss, sharded& g)
            : memtable_filling_virtual_table(build_schema())
            , _dist_ss(ss), _dist_gossiper(g) {}

    // Schema: PK "peer" (inet), static-row-less single-row partitions with
    // dc/rack/up/draining/excluded/status/load/tokens/owns/host_id columns.
    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "cluster_status");
        return schema_builder(system_keyspace::NAME, "cluster_status", std::make_optional(id))
                .with_column("peer", inet_addr_type, column_kind::partition_key)
                .with_column("dc", utf8_type)
                .with_column("rack", utf8_type)
                .with_column("up", boolean_type)
                .with_column("draining", boolean_type)
                .with_column("excluded", boolean_type)
                .with_column("status", utf8_type)
                .with_column("load", utf8_type)
                .with_column("tokens", int32_type)
                .with_column("owns", float_type)
                .with_column("host_id", uuid_type)
                .with_hash_version()
                .build();
    }

    // Builds all rows on shard 0 (where gossiper/token metadata are queried),
    // ships them back as frozen mutations, then unfreezes into the sink.
    future<> execute(std::function mutation_sink) override {
        auto muts = co_await _dist_ss.invoke_on(0, [this] (service::storage_service& ss) -> future> {
            auto& gossiper = _dist_gossiper.local();
            auto ownership = co_await ss.get_ownership();
            const locator::token_metadata& tm = ss.get_token_metadata();
            utils::chunked_vector muts;
            muts.reserve(gossiper.num_endpoints());
            gossiper.for_each_endpoint_state([&] (const gms::endpoint_state& eps) {
                // thread_local schema: this lambda runs on shard 0, which may
                // differ from the shard that owns this table instance.
                static thread_local auto s = build_schema();
                auto endpoint = eps.get_host_id();
                mutation m(s, partition_key::from_single_value(*s, data_value(eps.get_ip()).serialize_nonnull()));
                row& cr = m.partition().clustered_row(*schema(), clustering_key::make_empty()).cells();
                auto hostid = eps.get_host_id();
                set_cell(cr, "up", gossiper.is_alive(hostid));
                if (gossiper.is_shutdown(endpoint)) {
                    // Shut-down nodes keep their gossip status string.
                    set_cell(cr, "status", gossiper.get_gossip_status(endpoint));
                } else {
                    set_cell(cr, "status", boost::to_upper_copy(fmt::format("{}", ss.get_node_state(hostid))));
                }
                set_cell(cr, "load", gossiper.get_application_state_value(endpoint, gms::application_state::LOAD));
                set_cell(cr, "host_id", hostid.uuid());
                if (tm.get_topology().has_node(hostid)) {
                    // Not all entries in gossiper are present in the topology
                    auto& node = tm.get_topology().get_node(hostid);
                    sstring dc = node.dc_rack().dc;
                    sstring rack = node.dc_rack().rack;
                    set_cell(cr, "dc", dc);
                    set_cell(cr, "rack", rack);
                    set_cell(cr, "draining", node.is_draining());
                    set_cell(cr, "excluded", node.is_excluded());
                }
                if (ownership.contains(eps.get_ip())) {
                    set_cell(cr, "owns", ownership[eps.get_ip()]);
                }
                set_cell(cr, "tokens", int32_t(tm.get_tokens(hostid).size()));
                muts.push_back(freeze(std::move(m)));
            });
            co_return muts;
        });
        // Back on the calling shard: replay the frozen mutations into the sink.
        for (auto& m : muts) {
            mutation_sink(m.unfreeze(schema()));
        }
    }
};

// Virtual table system.token_ring: per-keyspace ring description
// (start/end token, endpoint, dc, rack), with a per-table clustering
// component for tablet-based keyspaces.
class token_ring_table : public streaming_virtual_table {
private:
    replica::database&
    _db;
    service::storage_service& _ss;
public:
    token_ring_table(replica::database& db, service::storage_service& ss)
            : streaming_virtual_table(build_schema())
            , _db(db)
            , _ss(ss) {
        // Each shard emits only the partitions it owns (see this_shard_owns).
        _shard_aware = true;
    }

    // Schema: PK keyspace_name; CK (table_name, start_token, endpoint);
    // regular columns end_token/dc/rack.
    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "token_ring");
        return schema_builder(system_keyspace::NAME, "token_ring", std::make_optional(id))
                .with_column("keyspace_name", utf8_type, column_kind::partition_key)
                .with_column("table_name", utf8_type, column_kind::clustering_key)
                .with_column("start_token", utf8_type, column_kind::clustering_key)
                .with_column("endpoint", inet_addr_type, column_kind::clustering_key)
                .with_column("end_token", utf8_type)
                .with_column("dc", utf8_type)
                .with_column("rack", utf8_type)
                .with_hash_version()
                .build();
    }

    // Decorates a keyspace name into this table's partition key.
    dht::decorated_key make_partition_key(const sstring& name) {
        return dht::decorate_key(*_s, partition_key::from_single_value(*_s, data_value(name).serialize_nonnull()));
    }

    clustering_key make_clustering_key(const sstring& table_name, sstring start_token, gms::inet_address host) {
        return clustering_key::from_exploded(*_s, {
                data_value(table_name).serialize_nonnull(),
                data_value(start_token).serialize_nonnull(),
                data_value(host).serialize_nonnull()});
    }

    // Emits one full partition: ranges sorted by start token, endpoints within
    // a range sorted by address, matching the schema's clustering order.
    // NOTE(review): `utils::chunked_vector ranges` lost its element type —
    // presumably chunked_vector<dht::token_range_endpoints>; verify.
    future<> emit_ring(result_collector& result, const dht::decorated_key& dk, const sstring& table_name, utils::chunked_vector ranges) {
        co_await result.emit_partition_start(dk);
        std::ranges::sort(ranges, std::ranges::less(), std::mem_fn(&dht::token_range_endpoints::_start_token));
        for (dht::token_range_endpoints& range : ranges) {
            std::ranges::sort(range._endpoint_details, endpoint_details_cmp());
            for (const dht::endpoint_details& detail : range._endpoint_details) {
                clustering_row cr(make_clustering_key(table_name, range._start_token, detail._host));
                set_cell(cr.cells(), "end_token", sstring(range._end_token));
                set_cell(cr.cells(), "dc", sstring(detail._datacenter));
                set_cell(cr.cells(), "rack", sstring(detail._rack));
                co_await result.emit_row(std::move(cr));
            }
        }
        co_await result.emit_partition_end();
    }

    // Orders endpoint details by serialized inet address (clustering order).
    struct endpoint_details_cmp {
        bool operator()(const dht::endpoint_details& l, const dht::endpoint_details& r) const {
            return inet_addr_type->less(
                    data_value(l._host).serialize_nonnull(),
                    data_value(r._host).serialize_nonnull());
        }
    };

    // Streams partitions in ring-position order; tablet keyspaces get one
    // ring description per table, vnode keyspaces a single one with an empty
    // table_name.
    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        struct decorated_keyspace_name {
            sstring name;
            dht::decorated_key key;
        };
        // NOTE(review): `std::ranges::to()` lost its container argument —
        // presumably std::ranges::to<std::vector>(); verify.
        auto keyspace_names = _db.get_non_local_strategy_keyspaces()
                | std::views::transform([this] (auto&& ks) {
                    return decorated_keyspace_name{ks, make_partition_key(ks)};
                })
                | std::ranges::to();
        std::ranges::sort(keyspace_names, dht::ring_position_less_comparator(*_s), std::mem_fn(&decorated_keyspace_name::key));
        for (const decorated_keyspace_name& e : keyspace_names) {
            auto&& dk = e.key;
            // Skip partitions this shard doesn't own, outside the query range,
            // or whose keyspace was dropped since the listing above.
            if (!this_shard_owns(dk) || !contains_key(qr.partition_range(), dk) || !_db.has_keyspace(e.name)) {
                continue;
            }
            if (_db.find_keyspace(e.name).get_replication_strategy().uses_tablets()) {
                co_await _db.get_tables_metadata().for_each_table_gently([&, this] (table_id, lw_shared_ptr table) -> future<> {
                    if (table->schema()->ks_name() != e.name) {
                        co_return;
                    }
                    const auto& table_name = table->schema()->cf_name();
                    utils::chunked_vector ranges = co_await _ss.describe_ring_for_table(e.name, table_name);
                    co_await emit_ring(result, e.key, table_name, std::move(ranges));
                });
            } else {
                utils::chunked_vector ranges = co_await _ss.describe_ring(e.name);
                co_await emit_ring(result, e.key, "", std::move(ranges));
            }
        }
    }
};

// Virtual table system.snapshots: lists snapshots with live/total sizes.
// Dropped tables are not listed (see schema comment below).
class snapshots_table : public streaming_virtual_table {
    // NOTE(review): presumably sharded<replica::database>; verify.
    sharded& _db;
public:
    explicit snapshots_table(sharded& db)
            : streaming_virtual_table(build_schema())
            , _db(db) {
        _shard_aware = true;
    }

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "snapshots");
        return schema_builder(system_keyspace::NAME, "snapshots", std::make_optional(id))
                .with_column("keyspace_name",
                        utf8_type, column_kind::partition_key)
                .with_column("table_name", utf8_type, column_kind::clustering_key)
                .with_column("snapshot_name", utf8_type, column_kind::clustering_key)
                .with_column("live", long_type)
                .with_column("total", long_type)
                .set_comment("Lists all the snapshots along with their size, dropped tables are not part of the listing.")
                .with_hash_version()
                .build();
    }

    // Decorates a keyspace name into this table's partition key.
    dht::decorated_key make_partition_key(const sstring& name) {
        return dht::decorate_key(*_s, partition_key::from_single_value(*_s, data_value(name).serialize_nonnull()));
    }

    clustering_key make_clustering_key(sstring table_name, sstring snapshot_name) {
        return clustering_key::from_exploded(*_s, {
                data_value(std::move(table_name)).serialize_nonnull(),
                data_value(std::move(snapshot_name)).serialize_nonnull()});
    }

    // Groups snapshot details keyspace -> table -> snapshot (maps keep the
    // required partition/clustering order), then streams them out.
    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        struct decorated_keyspace_name {
            schema_ptr s;
            dht::decorated_key key;
            // Orders keyspaces by ring position of their partition key.
            auto operator<=>(const decorated_keyspace_name& o) const {
                return key.tri_compare(*o.s, o.key);
            }
        };
        // NOTE(review): both map alias declarations lost their template
        // arguments (`std::map>` / `std::map`); restore from upstream.
        using snapshots_by_tables_map = std::map>;
        using snapshots_by_keyspace_map = std::map;
        snapshots_by_keyspace_map keyspace_snapshots;
        auto snapshot_details = co_await _db.local().get_snapshot_details();
        for (const auto& [snapshot_name, details] : snapshot_details) {
            for (const auto& d : details) {
                auto dk = make_partition_key(d.ks);
                if (!this_shard_owns(dk) || !contains_key(qr.partition_range(), dk)) {
                    continue;
                }
                keyspace_snapshots[decorated_keyspace_name(_s, dk)][d.cf][snapshot_name] = d.details;
            }
        }
        for (const auto& [ks_data, snapshots_by_tables] : keyspace_snapshots) {
            co_await result.emit_partition_start(ks_data.key);
            for (const auto& [table_name, snapshots] : snapshots_by_tables) {
                for (auto& [snapshot_name, details] : snapshots) {
                    clustering_row cr(make_clustering_key(table_name, snapshot_name));
                    set_cell(cr.cells(), "live", details.live);
                    set_cell(cr.cells(), "total", details.total);
                    co_await result.emit_row(std::move(cr));
                }
            }
            co_await result.emit_partition_end();
        }
    }
};

// Virtual table system.protocol_servers: one partition per client protocol
// server (CQL etc.) with its version and listen addresses.
class protocol_servers_table : public memtable_filling_virtual_table {
private:
    service::storage_service& _ss;

    // Plain-value snapshot of a protocol_server, safe to ship across shards.
    struct protocol_server_info {
        sstring name;
        sstring protocol;
        sstring protocol_version;
        // NOTE(review): element type stripped — presumably std::vector<sstring>.
        std::vector listen_addresses;

        explicit protocol_server_info(protocol_server& s)
                : name(s.name())
                , protocol(s.protocol())
                , protocol_version(s.protocol_version()) {
            for (const auto& addr : s.listen_addresses()) {
                listen_addresses.push_back(format("{}:{}", addr.addr(), addr.port()));
            }
        }
    };
public:
    explicit protocol_servers_table(service::storage_service& ss)
            : memtable_filling_virtual_table(build_schema())
            , _ss(ss) {
        _shard_aware = true;
    }

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "protocol_servers");
        return schema_builder(system_keyspace::NAME, "protocol_servers", std::make_optional(id))
                .with_column("name", utf8_type, column_kind::partition_key)
                .with_column("protocol", utf8_type)
                .with_column("protocol_version", utf8_type)
                .with_column("listen_addresses", list_type_impl::get_instance(utf8_type, false))
                .set_comment("Lists all client protocol servers and their status.")
                .with_hash_version()
                .build();
    }

    future<> execute(std::function mutation_sink) override {
        // Servers are registered on shard 0 only
        const auto server_infos = co_await smp::submit_to(0ul, [&ss = _ss.container()] {
            return ss.local().protocol_servers()
                    | std::views::transform([] (protocol_server* s) { return protocol_server_info(*s); })
                    | std::ranges::to();
        });
        for (auto server : server_infos) {
            auto dk = dht::decorate_key(*_s, partition_key::from_single_value(*schema(), data_value(server.name).serialize_nonnull()));
            // Shard-aware: only emit partitions this shard owns.
            if (!this_shard_owns(dk)) {
                continue;
            }
            mutation m(schema(), std::move(dk));
            row& cr = m.partition().clustered_row(*schema(), clustering_key::make_empty()).cells();
            set_cell(cr, "protocol", server.protocol);
            set_cell(cr, "protocol_version", server.protocol_version);
            // Build the listen_addresses list cell from the collected strings.
            std::vector addresses(server.listen_addresses.begin(), server.listen_addresses.end());
            set_cell(cr, "listen_addresses", make_list_value(schema()->get_column_definition("listen_addresses")->type, std::move(addresses)));
            mutation_sink(std::move(m));
        }
    }
};

// Virtual table system.runtime_info: grouped runtime metrics (gossip, load,
// uptime, memory, memtable, cache, ...). Partition key is the metric group;
// clustering key is the item; "value" holds the formatted reading.
class runtime_info_table : public memtable_filling_virtual_table {
private:
    // NOTE(review): template args stripped — presumably sharded<replica::database>
    // and std::optional<dht::decorated_key>; verify.
    sharded& _db;
    service::storage_service& _ss;
    std::optional _generic_key;
private:
    // Returns the decorated key for `key` only if this shard owns it;
    // nullopt otherwise (the partition is then skipped on this shard).
    std::optional maybe_make_key(sstring key) {
        auto dk = dht::decorate_key(*_s, partition_key::from_single_value(*schema(), data_value(std::move(key)).serialize_nonnull()));
        if (this_shard_owns(dk)) {
            return dk;
        }
        return std::nullopt;
    }

    // Emits one partition with one clustered (item, value) row per entry.
    void do_add_partition(std::function& mutation_sink, dht::decorated_key key, std::vector> rows) {
        mutation m(schema(), std::move(key));
        for (auto&& [ckey, cvalue] : rows) {
            row& cr = m.partition().clustered_row(*schema(), clustering_key::from_single_value(*schema(), data_value(std::move(ckey)).serialize_nonnull())).cells();
            set_cell(cr, "value", std::move(cvalue));
        }
        mutation_sink(std::move(m));
    }

    // Single-value partition under the shared "generic" group.
    void add_partition(std::function& mutation_sink, sstring key, sstring value) {
        if (_generic_key) {
            do_add_partition(mutation_sink, *_generic_key, {{key, std::move(value)}});
        }
    }

    // Multi-row partition keyed by `key` itself.
    void add_partition(std::function& mutation_sink, sstring key, std::initializer_list> rows) {
        auto dk = maybe_make_key(std::move(key));
        if (dk) {
            do_add_partition(mutation_sink, std::move(*dk), std::move(rows));
        }
    }

    // Async single-value variant: the producer is awaited lazily.
    future<> add_partition(std::function& mutation_sink, sstring key, std::function()> value_producer) {
        if (_generic_key) {
            do_add_partition(mutation_sink, *_generic_key, {{key, co_await value_producer()}});
        }
    }

    // Async multi-row variant.
    future<> add_partition(std::function& mutation_sink, sstring key, std::function>>()> value_producer) {
        auto dk = maybe_make_key(std::move(key));
        if (dk) {
            do_add_partition(mutation_sink, std::move(*dk), co_await value_producer());
        }
    }

    // Maps every table on every shard and folds the results with `reduce`.
    // NOTE(review): `template` lost its parameter list (presumably
    // `template <typename T>`), and `future`/`std::function` their arguments.
    template
    future map_reduce_tables(std::function map, std::function reduce = std::plus{}) {
        // Accumulates per-shard results as they arrive on the calling shard.
        class shard_reducer {
            T _v{};
            std::function _reduce;
        public:
            shard_reducer(std::function reduce) : _reduce(std::move(reduce)) { }
            future<> operator()(T v) {
                v = _reduce(_v, v);
                return make_ready_future<>();
            }
            T get() && { return std::move(_v); }
        };
        co_return co_await _db.map_reduce(shard_reducer(reduce), [map, reduce] (replica::database& db) {
            T val = {};
            db.get_tables_metadata().for_each_table([&] (table_id, lw_shared_ptr table) {
                val = reduce(val, map(*table));
            });
            return val;
        });
    }

    // Runs `map` on every shard and folds the results with `reduce`.
    template
    future map_reduce_shards(std::function map, std::function reduce = std::plus{}, T initial = {}) {
        co_return co_await map_reduce(
                std::views::iota(0u, smp::count),
                [map] (shard_id shard) {
                    return smp::submit_to(shard, [map] { return map(); });
                },
                initial,
                reduce);
    }
public:
    explicit runtime_info_table(sharded& db, service::storage_service& ss)
            : memtable_filling_virtual_table(build_schema())
            , _db(db)
            , _ss(ss) {
        _shard_aware = true;
        // Pre-compute the shared "generic" partition key for this shard.
        _generic_key = maybe_make_key("generic");
    }

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "runtime_info");
        return schema_builder(system_keyspace::NAME, "runtime_info", std::make_optional(id))
                .with_column("group", utf8_type, column_kind::partition_key)
                .with_column("item", utf8_type, column_kind::clustering_key)
                .with_column("value", utf8_type)
                .set_comment("Runtime system information.")
                .with_hash_version()
                .build();
    }

    future<> execute(std::function mutation_sink) override {
        // "gossip_active": whether the gossip service is currently running.
        co_await add_partition(mutation_sink, "gossip_active", [this] () -> future {
            return _ss.is_gossip_running().then([] (bool running){
                return format("{}", running);
            });
        });
        // "load": total on-disk live space summed across all tables and shards.
        co_await add_partition(mutation_sink, "load", [this] () -> future {
            return map_reduce_tables([] (replica::table& tbl) {
                return tbl.get_stats().live_disk_space_used.on_disk;
            }).then([] (int64_t load) {
                return format("{}", load);
            });
        });
        add_partition(mutation_sink, "uptime", format("{} seconds", std::chrono::duration_cast(engine().uptime()).count()));
        add_partition(mutation_sink,
                "trace_probability", format("{:.2}", tracing::tracing::get_local_tracing_instance().get_trace_probability()));
        // "memory": totals across all shards, including the reserved wasm-UDF slice.
        co_await add_partition(mutation_sink, "memory", [this] () {
            struct stats {
                // take the pre-reserved memory into account, as seastar only returns
                // the stats of memory managed by the seastar allocator, but we instruct
                // it to reserve additional memory for non-seastar allocator on per-shard
                // basis.
                uint64_t total = 0;
                uint64_t free = 0;
                static stats reduce(stats a, stats b) {
                    return stats{
                        a.total + b.total + db::config::wasm_udf_reserved_memory,
                        a.free + b.free};
                };
            };
            return map_reduce_shards([] () {
                const auto& s = memory::stats();
                return stats{s.total_memory(), s.free_memory()};
            }, stats::reduce, stats{}).then([] (stats s) {
                // NOTE(review): vector element type stripped — presumably
                // std::vector<std::pair<sstring, sstring>>; verify.
                return std::vector>{
                    {"total", format("{}", s.total)},
                    {"used", format("{}", s.total - s.free)},
                    {"free", format("{}", s.free)}};
            });
        });
        // "memtable": occupancy and entry counts over every active memtable.
        co_await add_partition(mutation_sink, "memtable", [this] () {
            struct stats {
                uint64_t total = 0;
                uint64_t free = 0;
                uint64_t entries = 0;
                static stats reduce(stats a, stats b) {
                    return stats{a.total + b.total, a.free + b.free, a.entries + b.entries};
                }
            };
            return map_reduce_tables([] (replica::table& t) {
                logalloc::occupancy_stats s;
                uint64_t partition_count = 0;
                t.for_each_active_memtable([&] (replica::memtable& active_memtable) {
                    s += active_memtable.region().occupancy();
                    partition_count += active_memtable.partition_count();
                });
                return stats{s.total_space(), s.free_space(), partition_count};
            }, stats::reduce).then([] (stats s) {
                return std::vector>{
                    {"memory_total", format("{}", s.total)},
                    {"memory_used", format("{}", s.total - s.free)},
                    {"memory_free", format("{}", s.free)},
                    {"entries", format("{}", s.entries)}};
            });
        });
        // "cache": row-cache occupancy plus hit/miss counters and moving rates.
        co_await add_partition(mutation_sink, "cache", [this] () {
            struct stats {
                uint64_t total = 0;
                uint64_t free = 0;
                uint64_t entries = 0;
                uint64_t hits = 0;
                uint64_t misses = 0;
                utils::rate_moving_average hits_moving_average;
                utils::rate_moving_average requests_moving_average;
                static stats reduce(stats a, stats b) {
                    return stats{
                        a.total + b.total,
                        a.free + b.free,
                        a.entries + b.entries,
                        a.hits + b.hits,
                        a.misses + b.misses,
                        a.hits_moving_average + b.hits_moving_average,
                        a.requests_moving_average + b.requests_moving_average};
                }
            };
            return _db.map_reduce0([] (replica::database& db) {
                stats res{};
                // Occupancy and partition count come from the per-shard cache
                // tracker; hit/miss counters are summed over all tables.
                auto occupancy = db.row_cache_tracker().region().occupancy();
                res.total = occupancy.total_space();
                res.free = occupancy.free_space();
                res.entries = db.row_cache_tracker().partitions();
                db.get_tables_metadata().for_each_table([&] (table_id id, lw_shared_ptr t) {
                    auto& cache_stats = t->get_row_cache().stats();
                    res.hits += cache_stats.hits.count();
                    res.misses += cache_stats.misses.count();
                    res.hits_moving_average += cache_stats.hits.rate();
                    res.requests_moving_average += (cache_stats.hits.rate() + cache_stats.misses.rate());
                });
                return res;
            }, stats{}, stats::reduce).then([] (stats s) {
                // NOTE(review): `static_cast(...)` lost its target type —
                // presumably static_cast<float>; verify.
                return std::vector>{
                    {"memory_total", format("{}", s.total)},
                    {"memory_used", format("{}", s.total - s.free)},
                    {"memory_free", format("{}", s.free)},
                    {"entries", format("{}", s.entries)},
                    {"hits", format("{}", s.hits)},
                    {"misses", format("{}", s.misses)},
                    {"hit_rate_total", format("{:.2}", static_cast(s.hits) / static_cast(s.hits + s.misses))},
                    {"hit_rate_recent", format("{:.2}", s.hits_moving_average.mean_rate)},
                    {"requests_total", format("{}", s.hits + s.misses)},
                    {"requests_recent", format("{}", static_cast(s.requests_moving_average.mean_rate))}};
            });
        });
        // "incremental_backup_enabled": true if any keyspace on any shard has it on.
        co_await add_partition(mutation_sink, "incremental_backup_enabled", [this] () {
            return _db.map_reduce0([] (replica::database& db) {
                return std::ranges::any_of(db.get_keyspaces(), [] (const auto& id_and_ks) {
                    return id_and_ks.second.incremental_backups_enabled();
                });
            }, false, std::logical_or{}).then([] (bool res) -> sstring {
                return res ?
                        "true" : "false";
            });
        });
    }
};

// Virtual table system.versions: a single "local" partition with the node's
// version, build mode and build id.
class versions_table : public memtable_filling_virtual_table {
public:
    explicit versions_table()
            : memtable_filling_virtual_table(build_schema()) {
        // Single fixed partition; no point in shard-aware filtering.
        _shard_aware = false;
    }

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::VERSIONS);
        return schema_builder(system_keyspace::NAME, system_keyspace::VERSIONS, std::make_optional(id))
                .with_column("key", utf8_type, column_kind::partition_key)
                .with_column("version", utf8_type)
                .with_column("build_mode", utf8_type)
                .with_column("build_id", utf8_type)
                .set_comment("Version information.")
                .with_hash_version()
                .build();
    }

    future<> execute(std::function mutation_sink) override {
        mutation m(schema(), partition_key::from_single_value(*schema(), data_value("local").serialize_nonnull()));
        row& cr = m.partition().clustered_row(*schema(), clustering_key::make_empty()).cells();
        set_cell(cr, "version", scylla_version());
        set_cell(cr, "build_mode", scylla_build_mode());
        set_cell(cr, "build_id", get_build_id());
        mutation_sink(std::move(m));
        return make_ready_future<>();
    }
};

// Virtual table system.config: read shows every config option with its type,
// source and JSON value; write (apply) live-updates an option when
// enable_cql_config_updates is on.
class db_config_table final : public streaming_virtual_table {
    db::config& _cfg;

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "config");
        return schema_builder(system_keyspace::NAME, "config", std::make_optional(id))
                .with_column("name", utf8_type, column_kind::partition_key)
                .with_column("type", utf8_type)
                .with_column("source", utf8_type)
                .with_column("value", utf8_type)
                .with_hash_version()
                .build();
    }

    // Streams this shard's config entries in ring-position order.
    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        struct config_entry {
            dht::decorated_key key;
            std::string_view type;
            sstring source;
            sstring value;
        };
        // NOTE(review): element type stripped — presumably std::vector<config_entry>.
        std::vector cfg;
        for (auto&& cfg_ref : _cfg.values()) {
            auto&& c = cfg_ref.get();
            dht::decorated_key dk = dht::decorate_key(*_s, partition_key::from_single_value(*_s, data_value(c.name()).serialize_nonnull()));
            if (this_shard_owns(dk)) {
                    coroutine::return_exception(virtual_table_update_exception("no such option"));
        });
    }
public:
    explicit db_config_table(db::config& cfg)
            : streaming_virtual_table(build_schema())
            , _cfg(cfg) {
        _shard_aware = true;
    }
};

// Virtual table system.clients: one partition per connected client address,
// one row per (port, client_type), with driver/TLS/auth details gathered
// from every protocol server on every shard.
class clients_table : public streaming_virtual_table {
    service::storage_service& _ss;

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "clients");
        return schema_builder(system_keyspace::NAME, "clients", std::make_optional(id))
                .with_column("address", inet_addr_type, column_kind::partition_key)
                .with_column("port", int32_type, column_kind::clustering_key)
                .with_column("client_type", utf8_type, column_kind::clustering_key)
                .with_column("shard_id", int32_type)
                .with_column("connection_stage", utf8_type)
                .with_column("driver_name", utf8_type)
                .with_column("driver_version", utf8_type)
                .with_column("hostname", utf8_type)
                .with_column("protocol_version", int32_type)
                .with_column("ssl_cipher_suite", utf8_type)
                .with_column("ssl_enabled", boolean_type)
                .with_column("ssl_protocol", utf8_type)
                .with_column("username", utf8_type)
                .with_column("scheduling_group", utf8_type)
                .with_column("client_options", map_type_impl::get_instance(utf8_type, utf8_type, false))
                .with_hash_version()
                .build();
    }

    dht::decorated_key make_partition_key(net::inet_address ip) {
        return dht::decorate_key(*_s, partition_key::from_single_value(*_s, data_value(ip).serialize_nonnull()));
    }

    clustering_key make_clustering_key(int32_t port, sstring clt) {
        return clustering_key::from_exploded(*_s, {
                data_value(port).serialize_nonnull(),
                data_value(clt).serialize_nonnull()});
    }

    // Three phases: collect per-shard client data, group it by client IP into
    // ring-ordered partitions, then emit rows sorted by (port, client_type).
    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        // Collect
        // NOTE(review): all three alias/vector declarations below lost their
        // template arguments (chunked_vector of foreign client-data pointers,
        // vector of those per shard, and the cross-shard holder); restore
        // from upstream before compiling.
        using client_data_vec = utils::chunked_vector>>;
        using shard_client_data = std::vector;
        std::vector>> cd_vec;
        cd_vec.resize(smp::count);
        // Protocol servers are registered on shard 0.
        auto servers = co_await _ss.container().invoke_on(0, [] (auto& ss) { return ss.protocol_servers(); });
        co_await smp::invoke_on_all([&cd_vec_ = cd_vec, &servers_ = servers] () -> future<> {
            auto& cd_vec = cd_vec_;
            auto& servers = servers_;
            auto scd = std::make_unique();
            for (const auto& ps : servers) {
                client_data_vec cds = co_await ps->get_client_data();
                if (cds.size() != 0) {
                    scd->emplace_back(std::move(cds));
                }
            }
            cd_vec[this_shard_id()] = make_foreign(std::move(scd));
        });
        // Partition
        struct decorated_ip {
            dht::decorated_key key;
            net::inet_address ip;
            // Ring-position order for the std::set of partitions below.
            struct compare {
                dht::ring_position_less_comparator less;
                explicit compare(const class schema& s) : less(s) {}
                bool operator()(const decorated_ip& a, const decorated_ip& b) const {
                    return less(a.key, b.key);
                }
            };
        };
        decorated_ip::compare cmp(*_s);
        std::set ips(cmp);
        std::unordered_map cd_map;
        for (unsigned i = 0; i < smp::count; i++) {
            for (auto&& ps_cdc : *cd_vec[i]) {
                for (auto&& cd : ps_cdc) {
                    if (cd_map.contains(cd->ip)) {
                        cd_map[cd->ip].emplace_back(std::move(cd));
                    } else {
                        // First sighting of this IP: keep it only if this shard
                        // owns the partition and the query range includes it.
                        dht::decorated_key key = make_partition_key(cd->ip);
                        if (this_shard_owns(key) && contains_key(qr.partition_range(), key)) {
                            ips.insert(decorated_ip{std::move(key), cd->ip});
                            cd_map[cd->ip].emplace_back(std::move(cd));
                        }
                    }
                    co_await coroutine::maybe_yield();
                }
            }
        }
        // Emit
        for (const auto& dip : ips) {
            co_await result.emit_partition_start(dip.key);
            auto& clients = cd_map[dip.ip];
            // Rows must come out in clustering order: (port, client_type).
            std::ranges::sort(clients, [] (const foreign_ptr>& a, const foreign_ptr>& b) {
                if (a->port != b->port) {
                    return a->port < b->port;
                }
                return a->client_type_str() < b->client_type_str();
            });
            for (const auto& cd : clients) {
                clustering_row cr(make_clustering_key(cd->port, cd->client_type_str()));
                set_cell(cr.cells(), "shard_id", cd->shard_id);
                set_cell(cr.cells(), "connection_stage", cd->stage_str());
                // Optional fields are only set when the client reported them.
                if (cd->driver_name) {
                    set_cell(cr.cells(), "driver_name", cd->driver_name->key());
                }
                if (cd->driver_version) {
                    set_cell(cr.cells(), "driver_version", cd->driver_version->key());
                }
                if (cd->hostname) {
                    set_cell(cr.cells(), "hostname", *cd->hostname);
                }
                if (cd->protocol_version) {
                    set_cell(cr.cells(), "protocol_version", *cd->protocol_version);
                }
                if (cd->ssl_cipher_suite) {
                    set_cell(cr.cells(), "ssl_cipher_suite", *cd->ssl_cipher_suite);
                }
                if (cd->ssl_enabled) {
                    set_cell(cr.cells(), "ssl_enabled", *cd->ssl_enabled);
                }
                if (cd->ssl_protocol) {
                    set_cell(cr.cells(), "ssl_protocol", *cd->ssl_protocol);
                }
                set_cell(cr.cells(), "username", cd->username ? *cd->username : sstring("anonymous"));
                if (cd->scheduling_group_name) {
                    set_cell(cr.cells(), "scheduling_group", *cd->scheduling_group_name);
                }
                auto map_type = map_type_impl::get_instance( utf8_type, utf8_type, false );
                // Converts the reported client options into map cell elements.
                auto prepare_client_options = [] (const auto& client_options) {
                    map_type_impl::native_type tmp;
                    for (auto& co: client_options) {
                        auto map_element = std::make_pair(data_value(co.key.key()), data_value(co.value.key()));
                        tmp.push_back(std::move(map_element));
                    }
                    return tmp;
                };
                set_cell(cr.cells(), "client_options", make_map_value(map_type, prepare_client_options(cd->client_options)));
                co_await result.emit_row(std::move(cr));
            }
            co_await result.emit_partition_end();
        }
    }
public:
    clients_table(service::storage_service& ss)
            : streaming_virtual_table(build_schema())
            , _ss(ss) {
        _shard_aware = true;
    }
};

// Shows the current state of each Raft group.
// Currently it shows only the configuration.
// In the future we plan to add additional columns with more information.
class raft_state_table : public streaming_virtual_table { private: sharded& _raft_gr; public: raft_state_table(sharded& raft_gr) : streaming_virtual_table(build_schema()) , _raft_gr(raft_gr) { } future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override { struct decorated_gid { raft::group_id gid; dht::decorated_key key; unsigned shard; }; auto groups_and_shards = co_await _raft_gr.map([] (service::raft_group_registry& raft_gr) { return std::pair{raft_gr.all_groups(), this_shard_id()}; }); std::vector decorated_gids; for (auto& [groups, shard]: groups_and_shards) { for (auto& gid: groups) { decorated_gids.push_back(decorated_gid{gid, make_partition_key(gid), shard}); } } // Must return partitions in token order. std::sort(decorated_gids.begin(), decorated_gids.end(), [less = dht::ring_position_less_comparator(*_s)] (const decorated_gid& l, const decorated_gid& r) { return less(l.key, r.key); }); for (auto& [gid, dk, shard]: decorated_gids) { if (!contains_key(qr.partition_range(), dk)) { continue; } auto cfg_opt = co_await _raft_gr.invoke_on(shard, [gid=gid] (service::raft_group_registry& raft_gr) -> std::optional { // Be ready for a group to disappear while we're querying. auto* srv = raft_gr.find_server(gid); if (!srv) { return std::nullopt; } // FIXME: the configuration returned here is obtained from raft::fsm, it may not be // persisted yet, so this is not 100% correct. It may happen that we crash after // a config entry is appended in-memory in fsm but before it's persisted. It would be // incorrect to return the configuration observed during this window - after restart // the configuration would revert to the previous one. Perhaps this is unlikely to // happen in practice, but for correctness we should add a way of querying the // latest persisted configuration. 
return srv->get_configuration(); }); if (!cfg_opt) { continue; } co_await result.emit_partition_start(dk); // List current config first, because 'C' < 'P' and the disposition // (ascii_type, 'CURRENT' vs 'PREVIOUS') is the first column in the clustering key. co_await emit_member_set(result, "CURRENT", cfg_opt->current); co_await emit_member_set(result, "PREVIOUS", cfg_opt->previous); co_await result.emit_partition_end(); } } private: static schema_ptr build_schema() { auto id = generate_legacy_id(system_keyspace::NAME, "raft_state"); return schema_builder(system_keyspace::NAME, "raft_state", std::make_optional(id)) .with_column("group_id", timeuuid_type, column_kind::partition_key) .with_column("disposition", ascii_type, column_kind::clustering_key) // can be 'CURRENT` or `PREVIOUS' .with_column("server_id", uuid_type, column_kind::clustering_key) .with_column("can_vote", boolean_type) .set_comment("Currently operating RAFT configuration") .with_hash_version() .build(); } dht::decorated_key make_partition_key(raft::group_id gid) { // Make sure to use timeuuid_native_type so comparisons are done correctly // (we must emit partitions in the correct token order). return dht::decorate_key(*_s, partition_key::from_single_value( *_s, data_value(timeuuid_native_type{gid.uuid()}).serialize_nonnull())); } clustering_key make_clustering_key(std::string_view disposition, raft::server_id id) { return clustering_key::from_exploded(*_s, { data_value(disposition).serialize_nonnull(), data_value(id.uuid()).serialize_nonnull() }); } future<> emit_member_set(result_collector& result, std::string_view disposition, const raft::config_member_set& set) { // Must sort servers in clustering order (i.e. according to their IDs). // This is how `config_member::operator<` works so no need for custom comparator. 
std::vector members{set.begin(), set.end()}; std::sort(members.begin(), members.end()); for (auto& member: members) { clustering_row cr{make_clustering_key(disposition, member.addr.id)}; set_cell(cr.cells(), "can_vote", member.can_vote); co_await result.emit_row(std::move(cr)); } } }; /// Base class for virtual tables which are supposed to serve data from group0 leader. /// If queried on a different node, read will be redirected to the leader. /// Implementations need to override execute_on_leader(), which will be executed /// on shard0 of current group0 leader. /// /// Current implementation is suitable for tables which are relatively small as all /// data is materialized in memory and queried in one page. class group0_virtual_table : public memtable_filling_virtual_table { private: sharded& _raft_gr; sharded& _ms; public: group0_virtual_table(schema_ptr s, sharded& raft_gr, sharded& ms) : memtable_filling_virtual_table(s) , _raft_gr(raft_gr) , _ms(ms) { } future get_leader(const reader_permit& permit) const { while (!_raft_gr.local().group0().current_leader()) { abort_on_expiry aoe(permit.timeout()); reader_permit::awaits_guard ag(permit); co_await _raft_gr.local().group0().read_barrier(&aoe.abort_source()); } co_return _raft_gr.local().group0().current_leader(); } future is_leader(const reader_permit& permit) const { auto leader = co_await get_leader(permit); co_return leader == _raft_gr.local().get_my_raft_id(); } future<> redirect_to_leader(std::function mutation_sink, reader_permit permit) { auto leader = co_await get_leader(permit); auto cmd = query::read_command(_s->id(), _s->version(), partition_slice_builder(*_s).build(), query::max_result_size(256*1024*1024), // Sanity limit query::tombstone_limit::max, query::row_limit(query::max_rows), query::partition_limit(query::max_partitions)); reader_permit::awaits_guard ag(permit); auto&& [result, hit_rate, opt_exception] = co_await ser::storage_proxy_rpc_verbs::send_read_mutation_data(&_ms.local(), 
locator::host_id(leader.uuid()), permit.timeout(), cmd, query::full_partition_range, {}); if (opt_exception.has_value() && *opt_exception) { co_await coroutine::return_exception_ptr((*opt_exception).into_exception_ptr()); } for (auto&& p : result.partitions()) { mutation_sink(p.mut().unfreeze(_s)); co_await coroutine::maybe_yield(); } co_return; } // This function is executed on the node which was a group0 leader at some point since the query started. // The node may not be the leader anymore by the time it is executed. // Only executed on shard 0. virtual future<> execute_on_leader(std::function mutation_sink, reader_permit) = 0; future<> execute(std::function mutation_sink, reader_permit permit) override { if (this_shard_id() != 0) { co_return; } if (co_await is_leader(permit)) { co_await execute_on_leader(std::move(mutation_sink), std::move(permit)); co_return; } co_await redirect_to_leader(std::move(mutation_sink), std::move(permit)); } }; class load_per_node : public group0_virtual_table { private: sharded& _talloc; sharded& _db; sharded& _gossiper; public: load_per_node(sharded& talloc, sharded& db, sharded& raft_gr, sharded& ms, sharded& gossiper) : group0_virtual_table(build_schema(), raft_gr, ms) , _talloc(talloc) , _db(db) , _gossiper(gossiper) { } future<> execute_on_leader(std::function mutation_sink, reader_permit permit) override { auto stats = _talloc.local().get_load_stats(); while (!stats) { // Wait for stats to be refreshed by topology coordinator { abort_on_expiry aoe(permit.timeout()); reader_permit::awaits_guard ag(permit); co_await seastar::sleep_abortable(std::chrono::milliseconds(200), aoe.abort_source()); } if (!co_await is_leader(permit)) { co_await redirect_to_leader(std::move(mutation_sink), std::move(permit)); co_return; } stats = _talloc.local().get_load_stats(); } auto tm = _db.local().get_token_metadata_ptr(); const uint64_t default_tablet_size = _db.local().get_config().target_tablet_size_in_bytes(); locator::load_sketch load(tm, 
stats, default_tablet_size); co_await load.populate(); tm->get_topology().for_each_node([&] (const auto& node) { auto host = node.host_id(); mutation m(schema(), make_partition_key(host)); auto& r = m.partition().clustered_row(*schema(), clustering_key::make_empty()); set_cell(r.cells(), "dc", node.dc()); set_cell(r.cells(), "rack", node.rack()); set_cell(r.cells(), "up", _gossiper.local().is_alive(host)); set_cell(r.cells(), "draining", node.is_draining()); set_cell(r.cells(), "excluded", node.is_excluded()); if (auto ip = _gossiper.local().get_address_map().find(host)) { set_cell(r.cells(), "ip", data_value(inet_address(*ip))); } set_cell(r.cells(), "tablets_allocated", int64_t(load.get_tablet_count(host))); set_cell(r.cells(), "tablets_allocated_per_shard", data_value(double(load.get_real_avg_tablet_count(host)))); set_cell(r.cells(), "storage_allocated_load", data_value(int64_t(load.get_tablet_count(host) * default_tablet_size))); if (stats && stats->capacity.contains(host)) { auto capacity = stats->capacity.at(host); set_cell(r.cells(), "storage_capacity", data_value(int64_t(capacity))); if (auto ts_iter = stats->tablet_stats.find(host); ts_iter != stats->tablet_stats.end()) { set_cell(r.cells(), "effective_capacity", data_value(int64_t(ts_iter->second.effective_capacity))); } if (auto utilization = load.get_allocated_utilization(host)) { set_cell(r.cells(), "storage_allocated_utilization", data_value(double(*utilization))); } if (load.has_complete_data(host)) { if (auto utilization = load.get_storage_utilization(host)) { set_cell(r.cells(), "storage_utilization", data_value(double(*utilization))); } set_cell(r.cells(), "storage_load", data_value(int64_t(load.get_disk_used(host)))); } } mutation_sink(m); }); } private: static schema_ptr build_schema() { auto id = generate_legacy_id(system_keyspace::NAME, "load_per_node"); return schema_builder(system_keyspace::NAME, "load_per_node", std::make_optional(id)) .with_column("node", uuid_type, 
column_kind::partition_key) .with_column("dc", utf8_type) .with_column("rack", utf8_type) .with_column("ip", inet_addr_type) .with_column("up", boolean_type) .with_column("draining", boolean_type) .with_column("excluded", boolean_type) .with_column("tablets_allocated", long_type) .with_column("tablets_allocated_per_shard", double_type) .with_column("storage_capacity", long_type) .with_column("effective_capacity", long_type) .with_column("storage_allocated_load", long_type) .with_column("storage_allocated_utilization", double_type) .with_column("storage_load", long_type) .with_column("storage_utilization", double_type) .with_sharder(1, 0) // shard0-only .with_hash_version() .build(); } dht::decorated_key make_partition_key(host_id host) { return dht::decorate_key(*_s, partition_key::from_single_value( *_s, data_value(host.uuid()).serialize_nonnull())); } }; class tablet_sizes : public group0_virtual_table { private: sharded& _talloc; sharded& _db; public: tablet_sizes(sharded& talloc, sharded& db, sharded& raft_gr, sharded& ms) : group0_virtual_table(build_schema(), raft_gr, ms) , _talloc(talloc) , _db(db) { } future<> execute_on_leader(std::function mutation_sink, reader_permit permit) override { auto stats = _talloc.local().get_load_stats(); while (!stats) { // Wait for stats to be refreshed by topology coordinator { abort_on_expiry aoe(permit.timeout()); reader_permit::awaits_guard ag(permit); co_await seastar::sleep_abortable(std::chrono::milliseconds(200), aoe.abort_source()); } if (!co_await is_leader(permit)) { co_await redirect_to_leader(std::move(mutation_sink), std::move(permit)); co_return; } stats = _talloc.local().get_load_stats(); } auto tm = _db.local().get_token_metadata_ptr(); auto prepare_replica_sizes = [] (const std::unordered_map& replica_sizes) { map_type_impl::native_type tmp; for (auto& r: replica_sizes) { auto replica = r.first.uuid(); int64_t tablet_size = int64_t(r.second); auto map_element = std::make_pair(data_value(replica), 
data_value(tablet_size)); tmp.push_back(std::move(map_element)); } return tmp; }; auto prepare_missing_replica = [] (const std::unordered_set& missing_replicas) { set_type_impl::native_type tmp; for (auto& r: missing_replicas) { tmp.push_back(data_value(r.uuid())); } return tmp; }; auto map_type = map_type_impl::get_instance(uuid_type, long_type, false); auto set_type = set_type_impl::get_instance(uuid_type, false); for (auto&& [table, tmap] : tm->tablets().all_tables_ungrouped()) { mutation m(schema(), make_partition_key(table)); co_await tmap->for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& tinfo) -> future<> { auto trange = tmap->get_token_range(tid); int64_t last_token = trange.end()->value().raw(); auto& r = m.partition().clustered_row(*schema(), clustering_key::from_single_value(*schema(), data_value(last_token).serialize_nonnull())); const range_based_tablet_id rb_tid {table, trange}; std::unordered_map replica_sizes; std::unordered_set missing_replicas; for (auto& replica : tinfo.replicas) { auto tablet_size_opt = stats->get_tablet_size(replica.host, rb_tid); if (tablet_size_opt) { replica_sizes[replica.host] = *tablet_size_opt; } else { missing_replicas.insert(replica.host); } } set_cell(r.cells(), "replicas", make_map_value(map_type, prepare_replica_sizes(replica_sizes))); set_cell(r.cells(), "missing_replicas", make_set_value(set_type, prepare_missing_replica(missing_replicas))); return make_ready_future<>(); }); mutation_sink(m); } } private: static schema_ptr build_schema() { auto id = generate_legacy_id(system_keyspace::NAME, "tablet_sizes"); return schema_builder(system_keyspace::NAME, "tablet_sizes", std::make_optional(id)) .with_column("table_id", uuid_type, column_kind::partition_key) .with_column("last_token", long_type, column_kind::clustering_key) .with_column("replicas", map_type_impl::get_instance(uuid_type, long_type, false)) .with_column("missing_replicas", set_type_impl::get_instance(uuid_type, false)) 
.with_sharder(1, 0) // shard0-only .with_hash_version() .build(); } dht::decorated_key make_partition_key(table_id table) { return dht::decorate_key(*_s, partition_key::from_single_value( *_s, data_value(table.uuid()).serialize_nonnull())); } }; class cdc_timestamps_table : public streaming_virtual_table { private: replica::database& _db; service::storage_service& _ss; public: cdc_timestamps_table(replica::database& db, service::storage_service& ss) : streaming_virtual_table(build_schema()) , _db(db) , _ss(ss) { _shard_aware = true; } future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override { struct decorated_table_name { table_id table; dht::decorated_key key; }; std::vector tables; for (auto table : _ss.get_tables_with_cdc_tablet_streams()) { auto& cf = _db.find_column_family(table); auto base_schema = cdc::get_base_table(_db, *cf.schema()); auto dk = make_partition_key(base_schema->ks_name(), base_schema->cf_name()); if (!this_shard_owns(dk) || !contains_key(qr.partition_range(), dk)) { continue; } tables.emplace_back(decorated_table_name{table, std::move(dk)}); } std::ranges::sort(tables, dht::ring_position_less_comparator(*_s), std::mem_fn(&decorated_table_name::key)); for (const auto& [table, dk] : tables) { co_await result.emit_partition_start(dk); auto created_at = api::new_timestamp(); co_await _ss.query_cdc_timestamps(table, false, [&] (db_clock::time_point ts) -> future<> { clustering_row cr(make_clustering_key(ts)); cr.apply(row_marker(created_at)); return result.emit_row(std::move(cr)); }); co_await result.emit_partition_end(); } } private: static schema_ptr build_schema() { auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::CDC_TIMESTAMPS); return schema_builder(system_keyspace::NAME, system_keyspace::CDC_TIMESTAMPS, std::make_optional(id)) .with_column("keyspace_name", utf8_type, column_kind::partition_key) .with_column("table_name", utf8_type, column_kind::partition_key) 
.with_column("timestamp", reversed_type_impl::get_instance(timestamp_type), column_kind::clustering_key) .set_comment("CDC generations for tablets-based tables") .with_hash_version() .build(); } dht::decorated_key make_partition_key(const std::string& ks_name, const std::string& cf_name) { return dht::decorate_key(*_s, partition_key::from_exploded(*_s, { data_value(ks_name).serialize_nonnull(), data_value(cf_name).serialize_nonnull() })); } clustering_key make_clustering_key(db_clock::time_point ts) { return clustering_key::from_single_value(*_s, timestamp_type->decompose(ts) ); } }; class cdc_streams_table : public streaming_virtual_table { private: replica::database& _db; service::storage_service& _ss; public: cdc_streams_table(replica::database& db, service::storage_service& ss) : streaming_virtual_table(build_schema()) , _db(db) , _ss(ss) { _shard_aware = true; } future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override { struct decorated_table_name { table_id table; dht::decorated_key key; }; std::vector tables; for (auto table : _ss.get_tables_with_cdc_tablet_streams()) { auto& cf = _db.find_column_family(table); auto base_schema = cdc::get_base_table(_db, *cf.schema()); auto dk = make_partition_key(base_schema->ks_name(), base_schema->cf_name()); if (!this_shard_owns(dk) || !contains_key(qr.partition_range(), dk)) { continue; } tables.emplace_back(decorated_table_name{table, std::move(dk)}); } std::ranges::sort(tables, dht::ring_position_less_comparator(*_s), std::mem_fn(&decorated_table_name::key)); for (const auto& [table, dk] : tables) { co_await result.emit_partition_start(dk); auto created_at = api::new_timestamp(); auto emit_stream_set = [&] (db_clock::time_point ts, cdc::stream_state kind, const auto& sids) -> future<> { for (const auto& sid : sids) { clustering_row cr(make_clustering_key(ts, kind, sid)); cr.apply(row_marker(created_at)); co_await result.emit_row(std::move(cr)); } }; // emit in clustering 
order static_assert(int(cdc::stream_state::current) < int(cdc::stream_state::closed)); static_assert(int(cdc::stream_state::closed) < int(cdc::stream_state::opened)); co_await _ss.query_cdc_streams(table, [&] (db_clock::time_point ts, const utils::chunked_vector& current, cdc::cdc_stream_diff diff) -> future<> { co_await emit_stream_set(ts, cdc::stream_state::current, current); co_await emit_stream_set(ts, cdc::stream_state::closed, diff.closed_streams); co_await emit_stream_set(ts, cdc::stream_state::opened, diff.opened_streams); }); co_await result.emit_partition_end(); } } private: static schema_ptr build_schema() { auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::CDC_STREAMS); return schema_builder(system_keyspace::NAME, system_keyspace::CDC_STREAMS, std::make_optional(id)) .with_column("keyspace_name", utf8_type, column_kind::partition_key) .with_column("table_name", utf8_type, column_kind::partition_key) .with_column("timestamp", timestamp_type, column_kind::clustering_key) .with_column("stream_state", byte_type, column_kind::clustering_key) .with_column("stream_id", bytes_type, column_kind::clustering_key) .set_comment("CDC streams for each generation for tablets-based tables") .with_hash_version() .build(); } dht::decorated_key make_partition_key(const std::string& ks_name, const std::string& cf_name) { return dht::decorate_key(*_s, partition_key::from_exploded(*_s, { data_value(ks_name).serialize_nonnull(), data_value(cf_name).serialize_nonnull() })); } clustering_key make_clustering_key(db_clock::time_point ts, cdc::stream_state kind, const cdc::stream_id& sid) { return clustering_key::from_exploded(*_s, { timestamp_type->decompose(ts), byte_type->decompose(std::to_underlying(kind)), sid.to_bytes() }); } }; } future<> initialize_virtual_tables( sharded& dist_db, sharded& dist_ss, sharded& dist_gossiper, sharded& dist_raft_gr, sharded& sys_ks, sharded& tablet_allocator, sharded& ms, db::config& cfg) { auto& virtual_tables_registry = 
sys_ks.local().get_virtual_tables_registry(); auto& virtual_tables = *virtual_tables_registry; auto& db = dist_db.local(); auto& ss = dist_ss.local(); auto add_table = [&] (std::unique_ptr&& tbl) -> future<> { auto schema = tbl->schema(); virtual_tables[schema->id()] = std::move(tbl); co_await db.create_local_system_table(schema, false, ss.get_erm_factory()); auto& cf = db.find_column_family(schema); cf.mark_ready_for_writes(nullptr); auto& vt = virtual_tables[schema->id()]; cf.set_virtual_reader(vt->as_mutation_source()); cf.set_virtual_writer([&vt = *vt] (const frozen_mutation& m) { return vt.apply(m); }); }; // Add built-in virtual tables here. co_await add_table(std::make_unique(dist_ss, dist_gossiper)); co_await add_table(std::make_unique(db, ss)); co_await add_table(std::make_unique(dist_db)); co_await add_table(std::make_unique(ss)); co_await add_table(std::make_unique(dist_db, ss)); co_await add_table(std::make_unique()); co_await add_table(std::make_unique(cfg)); co_await add_table(std::make_unique(ss)); co_await add_table(std::make_unique(dist_raft_gr)); co_await add_table(std::make_unique(tablet_allocator, dist_db, dist_raft_gr, ms, dist_gossiper)); co_await add_table(std::make_unique(tablet_allocator, dist_db, dist_raft_gr, ms)); co_await add_table(std::make_unique(db, ss)); co_await add_table(std::make_unique(db, ss)); db.find_column_family(system_keyspace::size_estimates()).set_virtual_reader(mutation_source(db::size_estimates::virtual_reader(db, sys_ks.local()))); db.find_column_family(system_keyspace::views_builds_in_progress()).set_virtual_reader(mutation_source(db::view::build_progress_virtual_reader(db))); db.find_column_family(system_keyspace::built_indexes()).set_virtual_reader(mutation_source(db::index::built_indexes_virtual_reader(db))); } virtual_tables_registry::virtual_tables_registry() : unique_ptr(std::make_unique()) { } virtual_tables_registry::~virtual_tables_registry() = default; } // namespace db