Files
scylladb/db/virtual_tables.cc
Vlad Zolotarov 28cbaef110 service/client_state and alternator/server: use cached values for driver_name and driver_version fields
Optimize memory usage by changing the types of driver_name and driver_version
to be references to a cached value instead of sstrings.

These fields very often have the same value among different connections hence
it makes sense to cache these values and use references to them instead of duplicating
such strings in each connection state.

Signed-off-by: Vlad Zolotarov <vladz@scylladb.com>
2025-12-20 12:26:22 -05:00

1489 lines
65 KiB
C++

/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <algorithm>
#include <boost/algorithm/string.hpp>
#include <boost/lexical_cast.hpp>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/json/json_elements.hh>
#include <seastar/core/reactor.hh>
#include "cdc/generation_service.hh"
#include "cdc/log.hh"
#include "cdc/metadata.hh"
#include "db/config.hh"
#include "db/system_keyspace.hh"
#include "db/virtual_table.hh"
#include "partition_slice_builder.hh"
#include "db/virtual_tables.hh"
#include "db/size_estimates_virtual_reader.hh"
#include "db/view/build_progress_virtual_reader.hh"
#include "index/built_indexes_virtual_reader.hh"
#include "gms/gossiper.hh"
#include "mutation/frozen_mutation.hh"
#include "transport/protocol_server.hh"
#include "release.hh"
#include "replica/database.hh"
#include "schema/schema_builder.hh"
#include "service/raft/raft_group_registry.hh"
#include "service/storage_service.hh"
#include "service/tablet_allocator.hh"
#include "locator/load_sketch.hh"
#include "types/list.hh"
#include "types/types.hh"
#include "utils/build_id.hh"
#include "utils/log.hh"
#include "replica/exceptions.hh"
#include "service/paxos/paxos_state.hh"
#include "idl/storage_proxy.dist.hh"
using namespace locator;
namespace db {
namespace {
logging::logger vtlog("virtual_tables");
// Virtual table `system.cluster_status`: one partition per gossiped endpoint,
// reporting liveness, gossip/topology status, load, DC, token count, host id
// and ownership fraction.
class cluster_status_table : public memtable_filling_virtual_table {
private:
    sharded<service::storage_service>& _dist_ss;
    sharded<gms::gossiper>& _dist_gossiper;
public:
    cluster_status_table(sharded<service::storage_service>& ss, sharded<gms::gossiper>& g)
            : memtable_filling_virtual_table(build_schema())
            , _dist_ss(ss), _dist_gossiper(g) {}

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "cluster_status");
        return schema_builder(system_keyspace::NAME, "cluster_status", std::make_optional(id))
            .with_column("peer", inet_addr_type, column_kind::partition_key)
            .with_column("dc", utf8_type)
            .with_column("up", boolean_type)
            .with_column("status", utf8_type)
            .with_column("load", utf8_type)
            .with_column("tokens", int32_type)
            .with_column("owns", float_type)
            .with_column("host_id", uuid_type)
            .with_hash_version()
            .build();
    }

    future<> execute(std::function<void(mutation)> mutation_sink) override {
        // Gossiper/ownership state lives on shard 0: build frozen mutations
        // there, then unfreeze them back on the calling shard.
        auto muts = co_await _dist_ss.invoke_on(0, [this] (service::storage_service& ss) -> future<utils::chunked_vector<frozen_mutation>> {
            auto& gossiper = _dist_gossiper.local();
            auto ownership = co_await ss.get_ownership();
            const locator::token_metadata& tm = ss.get_token_metadata();
            utils::chunked_vector<frozen_mutation> muts;
            muts.reserve(gossiper.num_endpoints());
            gossiper.for_each_endpoint_state([&] (const gms::endpoint_state& eps) {
                // Use a shard-local schema copy: `this` belongs to the invoking
                // shard, so its schema_ptr must not be copied or dereferenced
                // from shard 0 (lw_shared_ptr is not cross-shard safe).
                static thread_local auto s = build_schema();
                auto endpoint = eps.get_host_id();
                mutation m(s, partition_key::from_single_value(*s, data_value(eps.get_ip()).serialize_nonnull()));
                // BUGFIX: use the shard-local `s` here instead of the
                // cross-shard `schema()` accessor.
                row& cr = m.partition().clustered_row(*s, clustering_key::make_empty()).cells();
                auto hostid = eps.get_host_id();
                set_cell(cr, "up", gossiper.is_alive(hostid));
                // With raft topology, status comes from the topology state
                // machine unless the node announced shutdown via gossip.
                if (!ss.raft_topology_change_enabled() || gossiper.is_shutdown(endpoint)) {
                    set_cell(cr, "status", gossiper.get_gossip_status(endpoint));
                }
                set_cell(cr, "load", gossiper.get_application_state_value(endpoint, gms::application_state::LOAD));
                if (ss.raft_topology_change_enabled() && !gossiper.is_shutdown(endpoint)) {
                    set_cell(cr, "status", boost::to_upper_copy<std::string>(fmt::format("{}", ss.get_node_state(hostid))));
                }
                set_cell(cr, "host_id", hostid.uuid());
                if (tm.get_topology().has_node(hostid)) {
                    // Not all entries in gossiper are present in the topology
                    sstring dc = tm.get_topology().get_location(hostid).dc;
                    set_cell(cr, "dc", dc);
                }
                if (ownership.contains(eps.get_ip())) {
                    set_cell(cr, "owns", ownership[eps.get_ip()]);
                }
                set_cell(cr, "tokens", int32_t(tm.get_tokens(hostid).size()));
                muts.push_back(freeze(std::move(m)));
            });
            co_return muts;
        });
        for (auto& m : muts) {
            mutation_sink(m.unfreeze(schema()));
        }
    }
};
// Virtual table `system.token_ring`: describes token-range ownership per
// keyspace. For tablet-based keyspaces the ring is emitted per table; for
// vnode-based keyspaces a single "<ALL>" table entry covers the keyspace.
class token_ring_table : public streaming_virtual_table {
private:
    replica::database& _db;
    service::storage_service& _ss;
public:
    token_ring_table(replica::database& db, service::storage_service& ss)
            : streaming_virtual_table(build_schema())
            , _db(db)
            , _ss(ss)
    {
        // Each shard emits only the partitions (keyspaces) it owns.
        _shard_aware = true;
    }

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "token_ring");
        return schema_builder(system_keyspace::NAME, "token_ring", std::make_optional(id))
            .with_column("keyspace_name", utf8_type, column_kind::partition_key)
            .with_column("table_name", utf8_type, column_kind::clustering_key)
            .with_column("start_token", utf8_type, column_kind::clustering_key)
            .with_column("endpoint", inet_addr_type, column_kind::clustering_key)
            .with_column("end_token", utf8_type)
            .with_column("dc", utf8_type)
            .with_column("rack", utf8_type)
            .with_hash_version()
            .build();
    }

    // Partition key is the keyspace name.
    dht::decorated_key make_partition_key(const sstring& name) {
        return dht::decorate_key(*_s, partition_key::from_single_value(*_s, data_value(name).serialize_nonnull()));
    }

    // Clustering key: (table_name, start_token, endpoint).
    clustering_key make_clustering_key(const sstring& table_name, sstring start_token, gms::inet_address host) {
        return clustering_key::from_exploded(*_s, {
            data_value(table_name).serialize_nonnull(),
            data_value(start_token).serialize_nonnull(),
            data_value(host).serialize_nonnull()
        });
    }

    // Emits one partition describing `ranges`, sorted so rows come out in
    // clustering order (start_token, then endpoint).
    future<> emit_ring(result_collector& result, const dht::decorated_key& dk, const sstring& table_name, utils::chunked_vector<dht::token_range_endpoints> ranges) {
        co_await result.emit_partition_start(dk);
        std::ranges::sort(ranges, std::ranges::less(), std::mem_fn(&dht::token_range_endpoints::_start_token));
        for (dht::token_range_endpoints& range : ranges) {
            std::ranges::sort(range._endpoint_details, endpoint_details_cmp());
            for (const dht::endpoint_details& detail : range._endpoint_details) {
                clustering_row cr(make_clustering_key(table_name, range._start_token, detail._host));
                set_cell(cr.cells(), "end_token", sstring(range._end_token));
                set_cell(cr.cells(), "dc", sstring(detail._datacenter));
                set_cell(cr.cells(), "rack", sstring(detail._rack));
                co_await result.emit_row(std::move(cr));
            }
        }
        co_await result.emit_partition_end();
    }

    // Orders endpoints by the serialized inet_addr value — the same order the
    // clustering key column uses.
    struct endpoint_details_cmp {
        bool operator()(const dht::endpoint_details& l, const dht::endpoint_details& r) const {
            return inet_addr_type->less(
                data_value(l._host).serialize_nonnull(),
                data_value(r._host).serialize_nonnull());
        }
    };

    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        struct decorated_keyspace_name {
            sstring name;
            dht::decorated_key key;
        };
        auto keyspace_names = _db.get_non_local_strategy_keyspaces()
            | std::views::transform([this] (auto&& ks) {
                return decorated_keyspace_name{ks, make_partition_key(ks)};
            })
            | std::ranges::to<std::vector>();
        // Streaming tables must produce partitions in token order.
        std::ranges::sort(keyspace_names, dht::ring_position_less_comparator(*_s), std::mem_fn(&decorated_keyspace_name::key));
        for (const decorated_keyspace_name& e : keyspace_names) {
            auto&& dk = e.key;
            // Skip keyspaces this shard doesn't own, that fall outside the
            // queried range, or that were dropped since we took the list.
            if (!this_shard_owns(dk) || !contains_key(qr.partition_range(), dk) || !_db.has_keyspace(e.name)) {
                continue;
            }
            if (_db.find_keyspace(e.name).get_replication_strategy().uses_tablets()) {
                // Tablet keyspaces: the ring differs per table.
                co_await _db.get_tables_metadata().for_each_table_gently([&, this] (table_id, lw_shared_ptr<replica::table> table) -> future<> {
                    if (table->schema()->ks_name() != e.name) {
                        co_return;
                    }
                    const auto& table_name = table->schema()->cf_name();
                    utils::chunked_vector<dht::token_range_endpoints> ranges = co_await _ss.describe_ring_for_table(e.name, table_name);
                    co_await emit_ring(result, e.key, table_name, std::move(ranges));
                });
            } else {
                // Vnode keyspaces: one ring for all tables.
                utils::chunked_vector<dht::token_range_endpoints> ranges = co_await _ss.describe_ring(e.name);
                co_await emit_ring(result, e.key, "<ALL>", std::move(ranges));
            }
        }
    }
};
// Virtual table `system.snapshots`: lists every snapshot with its live and
// total sizes, grouped by (keyspace, table, snapshot name). Dropped tables
// are not included (the listing is built from the live database snapshots).
class snapshots_table : public streaming_virtual_table {
    sharded<replica::database>& _db;
public:
    explicit snapshots_table(sharded<replica::database>& db)
            : streaming_virtual_table(build_schema())
            , _db(db)
    {
        // Each shard emits only the keyspace partitions it owns.
        _shard_aware = true;
    }

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "snapshots");
        return schema_builder(system_keyspace::NAME, "snapshots", std::make_optional(id))
            .with_column("keyspace_name", utf8_type, column_kind::partition_key)
            .with_column("table_name", utf8_type, column_kind::clustering_key)
            .with_column("snapshot_name", utf8_type, column_kind::clustering_key)
            .with_column("live", long_type)
            .with_column("total", long_type)
            .set_comment("Lists all the snapshots along with their size, dropped tables are not part of the listing.")
            .with_hash_version()
            .build();
    }

    // Partition key is the keyspace name.
    dht::decorated_key make_partition_key(const sstring& name) {
        return dht::decorate_key(*_s, partition_key::from_single_value(*_s, data_value(name).serialize_nonnull()));
    }

    // Clustering key: (table_name, snapshot_name).
    clustering_key make_clustering_key(sstring table_name, sstring snapshot_name) {
        return clustering_key::from_exploded(*_s, {
            data_value(std::move(table_name)).serialize_nonnull(),
            data_value(std::move(snapshot_name)).serialize_nonnull()
        });
    }

    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        // Keyspace partition key plus the schema needed to compare it; the
        // ordered map below yields partitions in token order, as streaming
        // virtual tables require.
        struct decorated_keyspace_name {
            schema_ptr s;
            dht::decorated_key key;
            auto operator<=>(const decorated_keyspace_name& o) const {
                return key.tri_compare(*o.s, o.key);
            }
        };
        // keyspace -> table -> snapshot name -> details. Inner maps are
        // ordered by name, matching the clustering order of the rows.
        using snapshots_by_tables_map = std::map<sstring, std::map<sstring, replica::table::snapshot_details>>;
        using snapshots_by_keyspace_map = std::map<decorated_keyspace_name, snapshots_by_tables_map>;
        snapshots_by_keyspace_map keyspace_snapshots;
        auto snapshot_details = co_await _db.local().get_snapshot_details();
        // Regroup the per-snapshot listing into the per-keyspace structure,
        // dropping entries this shard doesn't own or that are out of range.
        for (const auto& [snapshot_name, details] : snapshot_details) {
            for (const auto& d : details) {
                auto dk = make_partition_key(d.ks);
                if (!this_shard_owns(dk) || !contains_key(qr.partition_range(), dk)) {
                    continue;
                }
                keyspace_snapshots[decorated_keyspace_name(_s, dk)][d.cf][snapshot_name] = d.details;
            }
        }
        for (const auto& [ks_data, snapshots_by_tables] : keyspace_snapshots) {
            co_await result.emit_partition_start(ks_data.key);
            for (const auto& [table_name, snapshots] : snapshots_by_tables) {
                for (auto& [snapshot_name, details] : snapshots) {
                    clustering_row cr(make_clustering_key(table_name, snapshot_name));
                    set_cell(cr.cells(), "live", details.live);
                    set_cell(cr.cells(), "total", details.total);
                    co_await result.emit_row(std::move(cr));
                }
            }
            co_await result.emit_partition_end();
        }
    }
};
// Virtual table `system.protocol_servers`: lists the client protocol servers
// (CQL, Thrift, alternator, ...) with their protocol version and listen
// addresses.
class protocol_servers_table : public memtable_filling_virtual_table {
private:
    service::storage_service& _ss;
    // Plain-value snapshot of a protocol_server's metadata, safe to move
    // across shards (the server objects themselves live on shard 0 only).
    struct protocol_server_info {
        sstring name;
        sstring protocol;
        sstring protocol_version;
        std::vector<sstring> listen_addresses;
        explicit protocol_server_info(protocol_server& s)
            : name(s.name())
            , protocol(s.protocol())
            , protocol_version(s.protocol_version()) {
            for (const auto& addr : s.listen_addresses()) {
                listen_addresses.push_back(format("{}:{}", addr.addr(), addr.port()));
            }
        }
    };
public:
    explicit protocol_servers_table(service::storage_service& ss)
        : memtable_filling_virtual_table(build_schema())
        , _ss(ss) {
        _shard_aware = true;
    }

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "protocol_servers");
        return schema_builder(system_keyspace::NAME, "protocol_servers", std::make_optional(id))
            .with_column("name", utf8_type, column_kind::partition_key)
            .with_column("protocol", utf8_type)
            .with_column("protocol_version", utf8_type)
            .with_column("listen_addresses", list_type_impl::get_instance(utf8_type, false))
            .set_comment("Lists all client protocol servers and their status.")
            .with_hash_version()
            .build();
    }

    future<> execute(std::function<void(mutation)> mutation_sink) override {
        // Servers are registered on shard 0 only
        const auto server_infos = co_await smp::submit_to(0ul, [&ss = _ss.container()] {
            return ss.local().protocol_servers()
                | std::views::transform([] (protocol_server* s) { return protocol_server_info(*s); })
                | std::ranges::to<std::vector>();
        });
        // Iterate by const reference: a by-value loop variable would copy
        // four strings per server on every iteration.
        for (const auto& server : server_infos) {
            auto dk = dht::decorate_key(*_s, partition_key::from_single_value(*schema(), data_value(server.name).serialize_nonnull()));
            if (!this_shard_owns(dk)) {
                continue;
            }
            mutation m(schema(), std::move(dk));
            row& cr = m.partition().clustered_row(*schema(), clustering_key::make_empty()).cells();
            set_cell(cr, "protocol", server.protocol);
            set_cell(cr, "protocol_version", server.protocol_version);
            std::vector<data_value> addresses(server.listen_addresses.begin(), server.listen_addresses.end());
            set_cell(cr, "listen_addresses", make_list_value(schema()->get_column_definition("listen_addresses")->type, std::move(addresses)));
            mutation_sink(std::move(m));
        }
    }
};
// Virtual table `system.runtime_info`: miscellaneous runtime statistics
// (load, memory, memtable/cache occupancy, uptime, ...), grouped into
// partitions by category with one clustering row per item.
class runtime_info_table : public memtable_filling_virtual_table {
private:
    sharded<replica::database>& _db;
    service::storage_service& _ss;
    // Precomputed key for single-row "generic" partitions; disengaged when
    // this shard doesn't own it, in which case those rows are skipped.
    std::optional<dht::decorated_key> _generic_key;
private:
    // Returns the decorated key for `key` if this shard owns it, nullopt
    // otherwise.
    std::optional<dht::decorated_key> maybe_make_key(sstring key) {
        auto dk = dht::decorate_key(*_s, partition_key::from_single_value(*schema(), data_value(std::move(key)).serialize_nonnull()));
        if (this_shard_owns(dk)) {
            return dk;
        }
        return std::nullopt;
    }

    // Builds one mutation for `key` with a clustering row per (item, value)
    // pair and feeds it to the sink.
    void do_add_partition(std::function<void(mutation)>& mutation_sink, dht::decorated_key key, std::vector<std::pair<sstring, sstring>> rows) {
        mutation m(schema(), std::move(key));
        for (auto&& [ckey, cvalue] : rows) {
            row& cr = m.partition().clustered_row(*schema(), clustering_key::from_single_value(*schema(), data_value(std::move(ckey)).serialize_nonnull())).cells();
            set_cell(cr, "value", std::move(cvalue));
        }
        mutation_sink(std::move(m));
    }

    // Single-row partition under the shared "generic" key.
    void add_partition(std::function<void(mutation)>& mutation_sink, sstring key, sstring value) {
        if (_generic_key) {
            do_add_partition(mutation_sink, *_generic_key, {{key, std::move(value)}});
        }
    }

    // Multi-row partition under its own key.
    void add_partition(std::function<void(mutation)>& mutation_sink, sstring key, std::initializer_list<std::pair<sstring, sstring>> rows) {
        auto dk = maybe_make_key(std::move(key));
        if (dk) {
            do_add_partition(mutation_sink, std::move(*dk), std::move(rows));
        }
    }

    // Async single-value variant; the producer runs only if the row will
    // actually be emitted.
    future<> add_partition(std::function<void(mutation)>& mutation_sink, sstring key, std::function<future<sstring>()> value_producer) {
        if (_generic_key) {
            do_add_partition(mutation_sink, *_generic_key, {{key, co_await value_producer()}});
        }
    }

    // Async multi-row variant.
    future<> add_partition(std::function<void(mutation)>& mutation_sink, sstring key, std::function<future<std::vector<std::pair<sstring, sstring>>>()> value_producer) {
        auto dk = maybe_make_key(std::move(key));
        if (dk) {
            do_add_partition(mutation_sink, std::move(*dk), co_await value_producer());
        }
    }

    // Maps every table on every shard and folds the results with `reduce`.
    template <typename T>
    future<T> map_reduce_tables(std::function<T(replica::table&)> map, std::function<T(T, T)> reduce = std::plus<T>{}) {
        class shard_reducer {
            T _v{};
            std::function<T(T, T)> _reduce;
        public:
            shard_reducer(std::function<T(T, T)> reduce) : _reduce(std::move(reduce)) { }
            future<> operator()(T v) {
                // BUGFIX: store the folded result into the accumulator.
                // Previously this assigned to the parameter `v`, so `_v`
                // stayed default-constructed and get() always returned T{}.
                _v = _reduce(_v, v);
                return make_ready_future<>();
            }
            T get() && { return std::move(_v); }
        };
        co_return co_await _db.map_reduce(shard_reducer(reduce), [map, reduce] (replica::database& db) {
            T val = {};
            db.get_tables_metadata().for_each_table([&] (table_id, lw_shared_ptr<replica::table> table) {
                val = reduce(val, map(*table));
            });
            return val;
        });
    }

    // Runs `map` on every shard and folds the results with `reduce`.
    template <typename T>
    future<T> map_reduce_shards(std::function<T()> map, std::function<T(T, T)> reduce = std::plus<T>{}, T initial = {}) {
        co_return co_await map_reduce(
            std::views::iota(0u, smp::count),
            [map] (shard_id shard) {
                return smp::submit_to(shard, [map] {
                    return map();
                });
            },
            initial,
            reduce);
    }
public:
    explicit runtime_info_table(sharded<replica::database>& db, service::storage_service& ss)
        : memtable_filling_virtual_table(build_schema())
        , _db(db)
        , _ss(ss) {
        _shard_aware = true;
        _generic_key = maybe_make_key("generic");
    }

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "runtime_info");
        return schema_builder(system_keyspace::NAME, "runtime_info", std::make_optional(id))
            .with_column("group", utf8_type, column_kind::partition_key)
            .with_column("item", utf8_type, column_kind::clustering_key)
            .with_column("value", utf8_type)
            .set_comment("Runtime system information.")
            .with_hash_version()
            .build();
    }

    future<> execute(std::function<void(mutation)> mutation_sink) override {
        co_await add_partition(mutation_sink, "gossip_active", [this] () -> future<sstring> {
            return _ss.is_gossip_running().then([] (bool running){
                return format("{}", running);
            });
        });
        co_await add_partition(mutation_sink, "load", [this] () -> future<sstring> {
            return map_reduce_tables<int64_t>([] (replica::table& tbl) {
                return tbl.get_stats().live_disk_space_used.on_disk;
            }).then([] (int64_t load) {
                return format("{}", load);
            });
        });
        add_partition(mutation_sink, "uptime", format("{} seconds", std::chrono::duration_cast<std::chrono::seconds>(engine().uptime()).count()));
        add_partition(mutation_sink, "trace_probability", format("{:.2}", tracing::tracing::get_local_tracing_instance().get_trace_probability()));
        co_await add_partition(mutation_sink, "memory", [this] () {
            struct stats {
                // take the pre-reserved memory into account, as seastar only returns
                // the stats of memory managed by the seastar allocator, but we instruct
                // it to reserve addition memory for non-seastar allocator on per-shard
                // basis.
                uint64_t total = 0;
                uint64_t free = 0;
                static stats reduce(stats a, stats b) {
                    return stats{
                        a.total + b.total + db::config::wasm_udf_reserved_memory,
                        a.free + b.free};
                }
            };
            return map_reduce_shards<stats>([] () {
                const auto& s = memory::stats();
                return stats{s.total_memory(), s.free_memory()};
            }, stats::reduce, stats{}).then([] (stats s) {
                return std::vector<std::pair<sstring, sstring>>{
                    {"total", format("{}", s.total)},
                    {"used", format("{}", s.total - s.free)},
                    {"free", format("{}", s.free)}};
            });
        });
        co_await add_partition(mutation_sink, "memtable", [this] () {
            struct stats {
                uint64_t total = 0;
                uint64_t free = 0;
                uint64_t entries = 0;
                static stats reduce(stats a, stats b) { return stats{a.total + b.total, a.free + b.free, a.entries + b.entries}; }
            };
            return map_reduce_tables<stats>([] (replica::table& t) {
                logalloc::occupancy_stats s;
                uint64_t partition_count = 0;
                t.for_each_active_memtable([&] (replica::memtable& active_memtable) {
                    s += active_memtable.region().occupancy();
                    partition_count += active_memtable.partition_count();
                });
                return stats{s.total_space(), s.free_space(), partition_count};
            }, stats::reduce).then([] (stats s) {
                return std::vector<std::pair<sstring, sstring>>{
                    {"memory_total", format("{}", s.total)},
                    {"memory_used", format("{}", s.total - s.free)},
                    {"memory_free", format("{}", s.free)},
                    {"entries", format("{}", s.entries)}};
            });
        });
        co_await add_partition(mutation_sink, "cache", [this] () {
            struct stats {
                uint64_t total = 0;
                uint64_t free = 0;
                uint64_t entries = 0;
                uint64_t hits = 0;
                uint64_t misses = 0;
                utils::rate_moving_average hits_moving_average;
                utils::rate_moving_average requests_moving_average;
                static stats reduce(stats a, stats b) {
                    return stats{
                        a.total + b.total,
                        a.free + b.free,
                        a.entries + b.entries,
                        a.hits + b.hits,
                        a.misses + b.misses,
                        a.hits_moving_average + b.hits_moving_average,
                        a.requests_moving_average + b.requests_moving_average};
                }
            };
            return _db.map_reduce0([] (replica::database& db) {
                stats res{};
                auto occupancy = db.row_cache_tracker().region().occupancy();
                res.total = occupancy.total_space();
                res.free = occupancy.free_space();
                res.entries = db.row_cache_tracker().partitions();
                db.get_tables_metadata().for_each_table([&] (table_id id, lw_shared_ptr<replica::table> t) {
                    auto& cache_stats = t->get_row_cache().stats();
                    res.hits += cache_stats.hits.count();
                    res.misses += cache_stats.misses.count();
                    res.hits_moving_average += cache_stats.hits.rate();
                    res.requests_moving_average += (cache_stats.hits.rate() + cache_stats.misses.rate());
                });
                return res;
            }, stats{}, stats::reduce).then([] (stats s) {
                return std::vector<std::pair<sstring, sstring>>{
                    {"memory_total", format("{}", s.total)},
                    {"memory_used", format("{}", s.total - s.free)},
                    {"memory_free", format("{}", s.free)},
                    {"entries", format("{}", s.entries)},
                    {"hits", format("{}", s.hits)},
                    {"misses", format("{}", s.misses)},
                    {"hit_rate_total", format("{:.2}", static_cast<double>(s.hits) / static_cast<double>(s.hits + s.misses))},
                    {"hit_rate_recent", format("{:.2}", s.hits_moving_average.mean_rate)},
                    {"requests_total", format("{}", s.hits + s.misses)},
                    {"requests_recent", format("{}", static_cast<uint64_t>(s.requests_moving_average.mean_rate))}};
            });
        });
        co_await add_partition(mutation_sink, "incremental_backup_enabled", [this] () {
            return _db.map_reduce0([] (replica::database& db) {
                return std::ranges::any_of(db.get_keyspaces(), [] (const auto& id_and_ks) {
                    return id_and_ks.second.incremental_backups_enabled();
                });
            }, false, std::logical_or{}).then([] (bool res) -> sstring {
                return res ? "true" : "false";
            });
        });
    }
};
// Virtual table `system.versions`: a single "local" partition exposing the
// Scylla version string, build mode and build id of this node.
class versions_table : public memtable_filling_virtual_table {
public:
    explicit versions_table()
        : memtable_filling_virtual_table(build_schema()) {
        // Version data is node-global, not token-partitioned.
        _shard_aware = false;
    }

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "versions");
        return schema_builder(system_keyspace::NAME, "versions", std::make_optional(id))
            .with_column("key", utf8_type, column_kind::partition_key)
            .with_column("version", utf8_type)
            .with_column("build_mode", utf8_type)
            .with_column("build_id", utf8_type)
            .set_comment("Version information.")
            .with_hash_version()
            .build();
    }

    future<> execute(std::function<void(mutation)> mutation_sink) override {
        auto s = schema();
        auto pk = partition_key::from_single_value(*s, data_value("local").serialize_nonnull());
        mutation m(s, std::move(pk));
        auto& cells = m.partition().clustered_row(*s, clustering_key::make_empty()).cells();
        set_cell(cells, "version", scylla_version());
        set_cell(cells, "build_mode", scylla_build_mode());
        set_cell(cells, "build_id", get_build_id());
        mutation_sink(std::move(m));
        return make_ready_future<>();
    }
};
// Virtual table `system.config`: one partition per configuration option with
// its type, source and current value. Supports updates via CQL (apply())
// when `enable_cql_config_updates` is on.
class db_config_table final : public streaming_virtual_table {
    db::config& _cfg;

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "config");
        return schema_builder(system_keyspace::NAME, "config", std::make_optional(id))
            .with_column("name", utf8_type, column_kind::partition_key)
            .with_column("type", utf8_type)
            .with_column("source", utf8_type)
            .with_column("value", utf8_type)
            .with_hash_version()
            .build();
    }

    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        struct config_entry {
            dht::decorated_key key;
            std::string_view type;
            sstring source;
            sstring value;
        };
        std::vector<config_entry> cfg;
        for (auto&& cfg_ref : _cfg.values()) {
            auto&& c = cfg_ref.get();
            dht::decorated_key dk = dht::decorate_key(*_s, partition_key::from_single_value(*_s, data_value(c.name()).serialize_nonnull()));
            if (this_shard_owns(dk)) {
                cfg.emplace_back(config_entry{ std::move(dk), c.type_name(), c.source_name(), c.value_as_json()._res });
            }
        }
        // Streaming tables must emit partitions in token order.
        std::ranges::sort(cfg, dht::ring_position_less_comparator(*_s), std::mem_fn(&config_entry::key));
        for (auto&& c : cfg) {
            co_await result.emit_partition_start(c.key);
            // Note: the row is emitted through the result collector directly;
            // no intermediate mutation object is needed here.
            clustering_row cr(clustering_key::make_empty());
            set_cell(cr.cells(), "type", c.type);
            set_cell(cr.cells(), "source", c.source);
            set_cell(cr.cells(), "value", c.value);
            co_await result.emit_row(std::move(cr));
            co_await result.emit_partition_end();
        }
    }

    // Handles `UPDATE system.config SET value = ... WHERE name = ...`.
    // Only the `value` column is writable; updates run on shard 0 and are
    // propagated to all shards by set_value_on_all_shards().
    virtual future<> apply(const frozen_mutation& fm) override {
        const mutation m = fm.unfreeze(_s);
        query::result_set rs(m);
        auto name = rs.row(0).get<sstring>("name");
        auto value = rs.row(0).get<sstring>("value");
        if (!_cfg.enable_cql_config_updates()) {
            return virtual_table::apply(fm); // will return back exceptional future
        }
        if (!name) {
            return make_exception_future<>(virtual_table_update_exception("option name is required"));
        }
        if (!value) {
            return make_exception_future<>(virtual_table_update_exception("option value is required"));
        }
        if (rs.row(0).cells().contains("type")) {
            return make_exception_future<>(virtual_table_update_exception("option type is immutable"));
        }
        if (rs.row(0).cells().contains("source")) {
            return make_exception_future<>(virtual_table_update_exception("option source is not updateable"));
        }
        return smp::submit_to(0, [&cfg = _cfg, name = std::move(*name), value = std::move(*value)] () mutable -> future<> {
            for (auto& c_ref : cfg.values()) {
                auto& c = c_ref.get();
                if (c.name() == name) {
                    std::exception_ptr ex;
                    try {
                        if (co_await c.set_value_on_all_shards(value, utils::config_file::config_source::CQL)) {
                            co_return;
                        } else {
                            ex = std::make_exception_ptr(virtual_table_update_exception("option is not live-updateable"));
                        }
                    } catch (boost::bad_lexical_cast&) {
                        ex = std::make_exception_ptr(virtual_table_update_exception("cannot parse option value"));
                    }
                    co_await coroutine::return_exception_ptr(std::move(ex));
                }
            }
            co_await coroutine::return_exception(virtual_table_update_exception("no such option"));
        });
    }
public:
    explicit db_config_table(db::config& cfg)
        : streaming_virtual_table(build_schema())
        , _cfg(cfg)
    {
        _shard_aware = true;
    }
};
// Virtual table `system.clients`: one partition per connected client IP,
// with a clustering row per (port, client type) connection carrying driver,
// TLS, auth and scheduling details gathered from all protocol servers on
// all shards.
class clients_table : public streaming_virtual_table {
    service::storage_service& _ss;

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "clients");
        return schema_builder(system_keyspace::NAME, "clients", std::make_optional(id))
            .with_column("address", inet_addr_type, column_kind::partition_key)
            .with_column("port", int32_type, column_kind::clustering_key)
            .with_column("client_type", utf8_type, column_kind::clustering_key)
            .with_column("shard_id", int32_type)
            .with_column("connection_stage", utf8_type)
            .with_column("driver_name", utf8_type)
            .with_column("driver_version", utf8_type)
            .with_column("hostname", utf8_type)
            .with_column("protocol_version", int32_type)
            .with_column("ssl_cipher_suite", utf8_type)
            .with_column("ssl_enabled", boolean_type)
            .with_column("ssl_protocol", utf8_type)
            .with_column("username", utf8_type)
            .with_column("scheduling_group", utf8_type)
            .with_column("client_options", map_type_impl::get_instance(utf8_type, utf8_type, false))
            .with_hash_version()
            .build();
    }

    // Partition key is the client IP address.
    dht::decorated_key make_partition_key(net::inet_address ip) {
        return dht::decorate_key(*_s, partition_key::from_single_value(*_s, data_value(ip).serialize_nonnull()));
    }

    // Clustering key: (port, client_type).
    clustering_key make_clustering_key(int32_t port, sstring clt) {
        return clustering_key::from_exploded(*_s, {
            data_value(port).serialize_nonnull(),
            data_value(clt).serialize_nonnull()
        });
    }

    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        // Collect
        using client_data_vec = utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>;
        using shard_client_data = std::vector<client_data_vec>;
        std::vector<foreign_ptr<std::unique_ptr<shard_client_data>>> cd_vec;
        cd_vec.resize(smp::count);
        auto servers = co_await _ss.container().invoke_on(0, [] (auto& ss) { return ss.protocol_servers(); });
        co_await smp::invoke_on_all([&cd_vec_ = cd_vec, &servers_ = servers] () -> future<> {
            auto& cd_vec = cd_vec_;
            auto& servers = servers_;
            auto scd = std::make_unique<shard_client_data>();
            for (const auto& ps : servers) {
                client_data_vec cds = co_await ps->get_client_data();
                if (cds.size() != 0) {
                    scd->emplace_back(std::move(cds));
                }
            }
            cd_vec[this_shard_id()] = make_foreign(std::move(scd));
        });
        // Partition: group the per-shard client data by IP, keeping only
        // IPs this shard owns and that fall within the queried range. The
        // set keeps partitions in token order for streaming.
        struct decorated_ip {
            dht::decorated_key key;
            net::inet_address ip;
            struct compare {
                dht::ring_position_less_comparator less;
                explicit compare(const class schema& s) : less(s) {}
                bool operator()(const decorated_ip& a, const decorated_ip& b) const {
                    return less(a.key, b.key);
                }
            };
        };
        decorated_ip::compare cmp(*_s);
        std::set<decorated_ip, decorated_ip::compare> ips(cmp);
        std::unordered_map<net::inet_address, client_data_vec> cd_map;
        for (unsigned i = 0; i < smp::count; i++) {
            for (auto&& ps_cdc : *cd_vec[i]) {
                for (auto&& cd : ps_cdc) {
                    if (cd_map.contains(cd->ip)) {
                        cd_map[cd->ip].emplace_back(std::move(cd));
                    } else {
                        dht::decorated_key key = make_partition_key(cd->ip);
                        if (this_shard_owns(key) && contains_key(qr.partition_range(), key)) {
                            ips.insert(decorated_ip{std::move(key), cd->ip});
                            cd_map[cd->ip].emplace_back(std::move(cd));
                        }
                    }
                    co_await coroutine::maybe_yield();
                }
            }
        }
        // Emit
        for (const auto& dip : ips) {
            co_await result.emit_partition_start(dip.key);
            auto& clients = cd_map[dip.ip];
            // Sort rows in clustering order (port, then client type).
            // BUGFIX: the previous `a->port < b->port || type(a) < type(b)`
            // comparator was not a strict weak ordering, which is undefined
            // behavior for std::ranges::sort; compare port first and only
            // fall back to the client type on ties.
            std::ranges::sort(clients, [] (const foreign_ptr<std::unique_ptr<client_data>>& a, const foreign_ptr<std::unique_ptr<client_data>>& b) {
                if (a->port != b->port) {
                    return a->port < b->port;
                }
                return a->client_type_str() < b->client_type_str();
            });
            for (const auto& cd : clients) {
                clustering_row cr(make_clustering_key(cd->port, cd->client_type_str()));
                set_cell(cr.cells(), "shard_id", cd->shard_id);
                set_cell(cr.cells(), "connection_stage", cd->stage_str());
                // driver_name/driver_version are references into a shared
                // cache; key() yields the cached string.
                if (cd->driver_name) {
                    set_cell(cr.cells(), "driver_name", cd->driver_name->key());
                }
                if (cd->driver_version) {
                    set_cell(cr.cells(), "driver_version", cd->driver_version->key());
                }
                if (cd->hostname) {
                    set_cell(cr.cells(), "hostname", *cd->hostname);
                }
                if (cd->protocol_version) {
                    set_cell(cr.cells(), "protocol_version", *cd->protocol_version);
                }
                if (cd->ssl_cipher_suite) {
                    set_cell(cr.cells(), "ssl_cipher_suite", *cd->ssl_cipher_suite);
                }
                if (cd->ssl_enabled) {
                    set_cell(cr.cells(), "ssl_enabled", *cd->ssl_enabled);
                }
                if (cd->ssl_protocol) {
                    set_cell(cr.cells(), "ssl_protocol", *cd->ssl_protocol);
                }
                set_cell(cr.cells(), "username", cd->username ? *cd->username : sstring("anonymous"));
                if (cd->scheduling_group_name) {
                    set_cell(cr.cells(), "scheduling_group", *cd->scheduling_group_name);
                }
                auto map_type = map_type_impl::get_instance(
                    utf8_type,
                    utf8_type,
                    false
                );
                auto prepare_client_options = [] (const auto& client_options) {
                    map_type_impl::native_type tmp;
                    for (auto& co: client_options) {
                        auto map_element = std::make_pair(data_value(co.key.key()), data_value(co.value.key()));
                        tmp.push_back(std::move(map_element));
                    }
                    return tmp;
                };
                set_cell(cr.cells(), "client_options",
                    make_map_value(map_type, prepare_client_options(cd->client_options)));
                co_await result.emit_row(std::move(cr));
            }
            co_await result.emit_partition_end();
        }
    }
public:
    clients_table(service::storage_service& ss)
        : streaming_virtual_table(build_schema())
        , _ss(ss)
    {
        _shard_aware = true;
    }
};
// Shows the current state of each Raft group.
// Currently it shows only the configuration.
// In the future we plan to add additional columns with more information.
class raft_state_table : public streaming_virtual_table {
private:
    sharded<service::raft_group_registry>& _raft_gr;
public:
    raft_state_table(sharded<service::raft_group_registry>& raft_gr)
        : streaming_virtual_table(build_schema())
        , _raft_gr(raft_gr) {
    }

    // Emits one partition per Raft group, with one row per member of the
    // group's current and previous configurations.
    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        // A group id paired with its partition key and the shard which
        // hosts the group's raft server.
        struct decorated_gid {
            raft::group_id gid;
            dht::decorated_key key;
            unsigned shard;
        };

        // Collect all known group ids from every shard.
        auto groups_and_shards = co_await _raft_gr.map([] (service::raft_group_registry& raft_gr) {
            return std::pair{raft_gr.all_groups(), this_shard_id()};
        });

        std::vector<decorated_gid> decorated_gids;
        for (auto& [groups, shard]: groups_and_shards) {
            for (auto& gid: groups) {
                decorated_gids.push_back(decorated_gid{gid, make_partition_key(gid), shard});
            }
        }

        // Must return partitions in token order. Sorted with a key
        // projection, consistently with the other virtual tables in this
        // file (e.g. cdc_timestamps_table).
        std::ranges::sort(decorated_gids, dht::ring_position_less_comparator(*_s), std::mem_fn(&decorated_gid::key));

        for (auto& [gid, dk, shard]: decorated_gids) {
            if (!contains_key(qr.partition_range(), dk)) {
                continue;
            }

            // Query the configuration on the shard owning the group.
            auto cfg_opt = co_await _raft_gr.invoke_on(shard,
                    [gid=gid] (service::raft_group_registry& raft_gr) -> std::optional<raft::configuration> {
                // Be ready for a group to disappear while we're querying.
                auto* srv = raft_gr.find_server(gid);
                if (!srv) {
                    return std::nullopt;
                }
                // FIXME: the configuration returned here is obtained from raft::fsm, it may not be
                // persisted yet, so this is not 100% correct. It may happen that we crash after
                // a config entry is appended in-memory in fsm but before it's persisted. It would be
                // incorrect to return the configuration observed during this window - after restart
                // the configuration would revert to the previous one. Perhaps this is unlikely to
                // happen in practice, but for correctness we should add a way of querying the
                // latest persisted configuration.
                return srv->get_configuration();
            });

            if (!cfg_opt) {
                continue;
            }

            co_await result.emit_partition_start(dk);
            // List current config first, because 'C' < 'P' and the disposition
            // (ascii_type, 'CURRENT' vs 'PREVIOUS') is the first column in the clustering key.
            co_await emit_member_set(result, "CURRENT", cfg_opt->current);
            co_await emit_member_set(result, "PREVIOUS", cfg_opt->previous);
            co_await result.emit_partition_end();
        }
    }

private:
    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "raft_state");
        return schema_builder(system_keyspace::NAME, "raft_state", std::make_optional(id))
            .with_column("group_id", timeuuid_type, column_kind::partition_key)
            .with_column("disposition", ascii_type, column_kind::clustering_key) // can be 'CURRENT` or `PREVIOUS'
            .with_column("server_id", uuid_type, column_kind::clustering_key)
            .with_column("can_vote", boolean_type)
            .set_comment("Currently operating RAFT configuration")
            .with_hash_version()
            .build();
    }

    dht::decorated_key make_partition_key(raft::group_id gid) {
        // Make sure to use timeuuid_native_type so comparisons are done correctly
        // (we must emit partitions in the correct token order).
        return dht::decorate_key(*_s, partition_key::from_single_value(
                *_s, data_value(timeuuid_native_type{gid.uuid()}).serialize_nonnull()));
    }

    clustering_key make_clustering_key(std::string_view disposition, raft::server_id id) {
        return clustering_key::from_exploded(*_s, {
            data_value(disposition).serialize_nonnull(),
            data_value(id.uuid()).serialize_nonnull()
        });
    }

    // Emits one row per member of `set` under the given disposition.
    future<> emit_member_set(result_collector& result, std::string_view disposition,
                             const raft::config_member_set& set) {
        // Must sort servers in clustering order (i.e. according to their IDs).
        // This is how `config_member::operator<` works so no need for custom comparator.
        std::vector<raft::config_member> members{set.begin(), set.end()};
        std::ranges::sort(members);
        for (auto& member: members) {
            clustering_row cr{make_clustering_key(disposition, member.addr.id)};
            set_cell(cr.cells(), "can_vote", member.can_vote);
            co_await result.emit_row(std::move(cr));
        }
    }
};
/// Base class for virtual tables which are supposed to serve data from group0 leader.
/// If queried on a different node, read will be redirected to the leader.
/// Implementations need to override execute_on_leader(), which will be executed
/// on shard0 of current group0 leader.
///
/// Current implementation is suitable for tables which are relatively small as all
/// data is materialized in memory and queried in one page.
class group0_virtual_table : public memtable_filling_virtual_table {
private:
sharded<service::raft_group_registry>& _raft_gr;
sharded<netw::messaging_service>& _ms;
public:
group0_virtual_table(schema_ptr s,
sharded<service::raft_group_registry>& raft_gr,
sharded<netw::messaging_service>& ms)
: memtable_filling_virtual_table(s)
, _raft_gr(raft_gr)
, _ms(ms) {
}
// Returns the id of the current group0 leader. While no leader is known,
// issues a read barrier (bounded by the permit's timeout via
// abort_on_expiry) and retries; awaits_guard marks the permit as waiting
// so the read doesn't count as active while blocked.
future<raft::server_id> get_leader(const reader_permit& permit) const {
while (!_raft_gr.local().group0().current_leader()) {
abort_on_expiry aoe(permit.timeout());
reader_permit::awaits_guard ag(permit);
co_await _raft_gr.local().group0().read_barrier(&aoe.abort_source());
}
co_return _raft_gr.local().group0().current_leader();
}
// True iff this node is the current group0 leader.
future<bool> is_leader(const reader_permit& permit) const {
auto leader = co_await get_leader(permit);
co_return leader == _raft_gr.local().get_my_raft_id();
}
// Forwards the read to the current leader via a read_mutation_data RPC
// over the full partition range, then feeds the returned frozen mutations
// into mutation_sink. The whole result is fetched in one page (see class
// comment), capped by a 256 MiB sanity limit.
future<> redirect_to_leader(std::function<void(mutation)> mutation_sink, reader_permit permit) {
auto leader = co_await get_leader(permit);
auto cmd = query::read_command(_s->id(),
_s->version(),
partition_slice_builder(*_s).build(),
query::max_result_size(256*1024*1024), // Sanity limit
query::tombstone_limit::max,
query::row_limit(query::max_rows),
query::partition_limit(query::max_partitions));
reader_permit::awaits_guard ag(permit);
auto&& [result, hit_rate, opt_exception] = co_await ser::storage_proxy_rpc_verbs::send_read_mutation_data(&_ms.local(),
locator::host_id(leader.uuid()),
permit.timeout(),
cmd,
query::full_partition_range,
{});
// The RPC may carry a remote exception instead of data; rethrow it here.
if (opt_exception.has_value() && *opt_exception) {
co_await coroutine::return_exception_ptr((*opt_exception).into_exception_ptr());
}
for (auto&& p : result.partitions()) {
mutation_sink(p.mut().unfreeze(_s));
co_await coroutine::maybe_yield();
}
co_return;
}
// This function is executed on the node which was a group0 leader at some point since the query started.
// The node may not be the leader anymore by the time it is executed.
// Only executed on shard 0.
virtual future<> execute_on_leader(std::function<void(mutation)> mutation_sink, reader_permit) = 0;
// Entry point: produces data locally when this node is the leader,
// otherwise redirects the read to the leader. Non-zero shards emit
// nothing (derived schemas are shard0-only, see with_sharder(1, 0)).
future<> execute(std::function<void(mutation)> mutation_sink, reader_permit permit) override {
if (this_shard_id() != 0) {
co_return;
}
if (co_await is_leader(permit)) {
co_await execute_on_leader(std::move(mutation_sink), std::move(permit));
co_return;
}
co_await redirect_to_leader(std::move(mutation_sink), std::move(permit));
}
};
// Virtual table with one partition per node, describing its tablet count
// and storage load. Served from the group0 leader, where the topology
// coordinator refreshes the load statistics.
class load_per_node : public group0_virtual_table {
private:
sharded<service::tablet_allocator>& _talloc;
sharded<replica::database>& _db;
sharded<gms::gossiper>& _gossiper;
public:
load_per_node(sharded<service::tablet_allocator>& talloc,
sharded<replica::database>& db,
sharded<service::raft_group_registry>& raft_gr,
sharded<netw::messaging_service>& ms,
sharded<gms::gossiper>& gossiper)
: group0_virtual_table(build_schema(), raft_gr, ms)
, _talloc(talloc)
, _db(db)
, _gossiper(gossiper)
{ }
// Runs on shard 0 of the (presumed) leader; emits one mutation per node
// known to the token metadata topology.
future<> execute_on_leader(std::function<void(mutation)> mutation_sink, reader_permit permit) override {
auto stats = _talloc.local().get_load_stats();
while (!stats) {
// Wait for stats to be refreshed by topology coordinator
{
abort_on_expiry aoe(permit.timeout());
reader_permit::awaits_guard ag(permit);
co_await seastar::sleep_abortable(std::chrono::milliseconds(200), aoe.abort_source());
}
// Leadership may have moved while we slept - if so, forward the
// read to the new leader instead of polling here forever.
if (!co_await is_leader(permit)) {
co_await redirect_to_leader(std::move(mutation_sink), std::move(permit));
co_return;
}
stats = _talloc.local().get_load_stats();
}
auto tm = _db.local().get_token_metadata_ptr();
auto target_tablet_size = _db.local().get_config().target_tablet_size_in_bytes();
// Per-node tablet counts are obtained from a load sketch built over the
// current token metadata.
locator::load_sketch load(tm);
co_await load.populate();
tm->get_topology().for_each_node([&] (const auto& node) {
auto host = node.host_id();
// Single-row partition: the clustering key is empty.
mutation m(schema(), make_partition_key(host));
auto& r = m.partition().clustered_row(*schema(), clustering_key::make_empty());
set_cell(r.cells(), "dc", node.dc());
set_cell(r.cells(), "rack", node.rack());
set_cell(r.cells(), "up", _gossiper.local().is_alive(host));
// The IP may be unknown (e.g. node not yet in the address map);
// leave the cell unset in that case.
if (auto ip = _gossiper.local().get_address_map().find(host)) {
set_cell(r.cells(), "ip", data_value(inet_address(*ip)));
}
set_cell(r.cells(), "tablets_allocated", load.get_load(host));
set_cell(r.cells(), "tablets_allocated_per_shard", data_value(double(load.get_real_avg_shard_load(host))));
// Allocated load estimated as tablet count times the configured
// target tablet size.
set_cell(r.cells(), "storage_allocated_load", data_value(int64_t(load.get_load(host) * target_tablet_size)));
// Capacity (and utilization derived from it) is only known for
// hosts present in the coordinator-provided stats.
if (stats && stats->capacity.contains(host)) {
auto capacity = stats->capacity.at(host);
set_cell(r.cells(), "storage_capacity", data_value(int64_t(capacity)));
auto utilization = load.get_allocated_utilization(host, *stats, target_tablet_size);
if (utilization) {
set_cell(r.cells(), "storage_allocated_utilization", data_value(double(*utilization)));
}
}
mutation_sink(m);
});
}
private:
static schema_ptr build_schema() {
auto id = generate_legacy_id(system_keyspace::NAME, "load_per_node");
return schema_builder(system_keyspace::NAME, "load_per_node", std::make_optional(id))
.with_column("node", uuid_type, column_kind::partition_key)
.with_column("dc", utf8_type)
.with_column("rack", utf8_type)
.with_column("ip", inet_addr_type)
.with_column("up", boolean_type)
.with_column("tablets_allocated", long_type)
.with_column("tablets_allocated_per_shard", double_type)
.with_column("storage_capacity", long_type)
.with_column("storage_allocated_load", long_type)
.with_column("storage_allocated_utilization", double_type)
.with_sharder(1, 0) // shard0-only
.with_hash_version()
.build();
}
// Partition key is the node's host id (uuid).
dht::decorated_key make_partition_key(host_id host) {
return dht::decorate_key(*_s, partition_key::from_single_value(
*_s, data_value(host.uuid()).serialize_nonnull()));
}
};
// Virtual table with one partition per tablets-based table, one row per
// tablet, listing per-replica tablet sizes as reported by the load stats.
// Served from the group0 leader.
class tablet_sizes : public group0_virtual_table {
private:
sharded<service::tablet_allocator>& _talloc;
sharded<replica::database>& _db;
public:
tablet_sizes(sharded<service::tablet_allocator>& talloc,
sharded<replica::database>& db,
sharded<service::raft_group_registry>& raft_gr,
sharded<netw::messaging_service>& ms)
: group0_virtual_table(build_schema(), raft_gr, ms)
, _talloc(talloc)
, _db(db)
{ }
// Runs on shard 0 of the (presumed) leader; see group0_virtual_table.
future<> execute_on_leader(std::function<void(mutation)> mutation_sink, reader_permit permit) override {
auto stats = _talloc.local().get_load_stats();
while (!stats) {
// Wait for stats to be refreshed by topology coordinator
{
abort_on_expiry aoe(permit.timeout());
reader_permit::awaits_guard ag(permit);
co_await seastar::sleep_abortable(std::chrono::milliseconds(200), aoe.abort_source());
}
// Leadership may have moved while we slept - if so, forward the
// read to the new leader instead of polling here forever.
if (!co_await is_leader(permit)) {
co_await redirect_to_leader(std::move(mutation_sink), std::move(permit));
co_return;
}
stats = _talloc.local().get_load_stats();
}
auto tm = _db.local().get_token_metadata_ptr();
// Converts {host -> size} into the native representation of the
// map<uuid, bigint> "replicas" column.
auto prepare_replica_sizes = [] (const std::unordered_map<host_id, uint64_t>& replica_sizes) {
map_type_impl::native_type tmp;
for (auto& r: replica_sizes) {
auto replica = r.first.uuid();
int64_t tablet_size = int64_t(r.second);
auto map_element = std::make_pair<data_value, data_value>(data_value(replica), data_value(tablet_size));
tmp.push_back(std::move(map_element));
}
return tmp;
};
// Converts the set of hosts without a size sample into the native
// representation of the set<uuid> "missing_replicas" column.
auto prepare_missing_replica = [] (const std::unordered_set<host_id>& missing_replicas) {
set_type_impl::native_type tmp;
for (auto& r: missing_replicas) {
tmp.push_back(data_value(r.uuid()));
}
return tmp;
};
auto map_type = map_type_impl::get_instance(uuid_type, long_type, false);
auto set_type = set_type_impl::get_instance(uuid_type, false);
for (auto&& [table, tmap] : tm->tablets().all_tables_ungrouped()) {
mutation m(schema(), make_partition_key(table));
co_await tmap->for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& tinfo) -> future<> {
auto trange = tmap->get_token_range(tid);
// NOTE(review): assumes the tablet's token range always has an
// end bound - confirm against tablet_map::get_token_range.
int64_t last_token = trange.end()->value().raw();
auto& r = m.partition().clustered_row(*schema(), clustering_key::from_single_value(*schema(), data_value(last_token).serialize_nonnull()));
const range_based_tablet_id rb_tid {table, trange};
// Split the tablet's replicas into those with a known size in
// the stats and those without one.
std::unordered_map<host_id, uint64_t> replica_sizes;
std::unordered_set<host_id> missing_replicas;
for (auto& replica : tinfo.replicas) {
auto tablet_size_opt = stats->get_tablet_size(replica.host, rb_tid);
if (tablet_size_opt) {
replica_sizes[replica.host] = *tablet_size_opt;
} else {
missing_replicas.insert(replica.host);
}
}
set_cell(r.cells(), "replicas", make_map_value(map_type, prepare_replica_sizes(replica_sizes)));
set_cell(r.cells(), "missing_replicas", make_set_value(set_type, prepare_missing_replica(missing_replicas)));
return make_ready_future<>();
});
mutation_sink(m);
}
}
private:
static schema_ptr build_schema() {
auto id = generate_legacy_id(system_keyspace::NAME, "tablet_sizes");
return schema_builder(system_keyspace::NAME, "tablet_sizes", std::make_optional(id))
.with_column("table_id", uuid_type, column_kind::partition_key)
.with_column("last_token", long_type, column_kind::clustering_key)
.with_column("replicas", map_type_impl::get_instance(uuid_type, long_type, false))
.with_column("missing_replicas", set_type_impl::get_instance(uuid_type, false))
.with_sharder(1, 0) // shard0-only
.with_hash_version()
.build();
}
// Partition key is the table's id (uuid).
dht::decorated_key make_partition_key(table_id table) {
return dht::decorate_key(*_s, partition_key::from_single_value(
*_s, data_value(table.uuid()).serialize_nonnull()));
}
};
// Virtual table exposing CDC generation timestamps of tablets-based tables
// (see the "CDC generations for tablets-based tables" schema comment).
//
// Partition key: (keyspace_name, table_name) of the CDC base table.
// Clustering key: generation timestamp, reversed so newest come first.
class cdc_timestamps_table : public streaming_virtual_table {
private:
    replica::database& _db;
    service::storage_service& _ss;

public:
    cdc_timestamps_table(replica::database& db, service::storage_service& ss)
        : streaming_virtual_table(build_schema())
        , _db(db)
        , _ss(ss)
    {
        _shard_aware = true;
    }

    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        // A CDC-enabled table together with the partition key derived from
        // its base table's keyspace/table name.
        struct table_entry {
            table_id id;
            dht::decorated_key dk;
        };

        // Gather the tables whose partitions belong to this shard and fall
        // into the queried partition range.
        std::vector<table_entry> entries;
        for (auto id : _ss.get_tables_with_cdc_tablet_streams()) {
            auto& cf = _db.find_column_family(id);
            auto base_schema = cdc::get_base_table(_db, *cf.schema());
            auto dk = make_partition_key(base_schema->ks_name(), base_schema->cf_name());
            if (this_shard_owns(dk) && contains_key(qr.partition_range(), dk)) {
                entries.push_back(table_entry{id, std::move(dk)});
            }
        }

        // Partitions must be produced in token order.
        std::ranges::sort(entries, [cmp = dht::ring_position_less_comparator(*_s)]
                (const table_entry& a, const table_entry& b) { return cmp(a.dk, b.dk); });

        for (const auto& [id, dk] : entries) {
            co_await result.emit_partition_start(dk);
            auto write_ts = api::new_timestamp();
            co_await _ss.query_cdc_timestamps(id, false, [&] (db_clock::time_point generation_ts) -> future<> {
                // Rows carry no regular columns - only a row marker.
                clustering_row cr(make_clustering_key(generation_ts));
                cr.apply(row_marker(write_ts));
                return result.emit_row(std::move(cr));
            });
            co_await result.emit_partition_end();
        }
    }

private:
    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "cdc_timestamps");
        return schema_builder(system_keyspace::NAME, "cdc_timestamps", std::make_optional(id))
                .with_column("keyspace_name", utf8_type, column_kind::partition_key)
                .with_column("table_name", utf8_type, column_kind::partition_key)
                .with_column("timestamp", reversed_type_impl::get_instance(timestamp_type), column_kind::clustering_key)
                .set_comment("CDC generations for tablets-based tables")
                .with_hash_version()
                .build();
    }

    dht::decorated_key make_partition_key(const std::string& ks_name, const std::string& cf_name) {
        auto pk = partition_key::from_exploded(*_s, {
            data_value(ks_name).serialize_nonnull(),
            data_value(cf_name).serialize_nonnull()
        });
        return dht::decorate_key(*_s, pk);
    }

    clustering_key make_clustering_key(db_clock::time_point ts) {
        return clustering_key::from_single_value(*_s, timestamp_type->decompose(ts));
    }
};
// Virtual table listing CDC stream ids per generation for tablets-based
// tables. For each generation timestamp it emits the current streams plus
// the diff (closed/opened) relative to the previous generation.
class cdc_streams_table : public streaming_virtual_table {
private:
replica::database& _db;
service::storage_service& _ss;
public:
cdc_streams_table(replica::database& db, service::storage_service& ss)
: streaming_virtual_table(build_schema())
, _db(db)
, _ss(ss)
{
_shard_aware = true;
}
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
// A CDC-enabled table paired with the partition key derived from its
// base table's keyspace/table name.
struct decorated_table_name {
table_id table;
dht::decorated_key key;
};
// Collect tables whose partitions belong to this shard and fall into
// the queried partition range.
std::vector<decorated_table_name> tables;
for (auto table : _ss.get_tables_with_cdc_tablet_streams()) {
auto& cf = _db.find_column_family(table);
auto base_schema = cdc::get_base_table(_db, *cf.schema());
auto dk = make_partition_key(base_schema->ks_name(), base_schema->cf_name());
if (!this_shard_owns(dk) || !contains_key(qr.partition_range(), dk)) {
continue;
}
tables.emplace_back(decorated_table_name{table, std::move(dk)});
}
// Partitions must be produced in token order.
std::ranges::sort(tables, dht::ring_position_less_comparator(*_s), std::mem_fn(&decorated_table_name::key));
for (const auto& [table, dk] : tables) {
co_await result.emit_partition_start(dk);
auto created_at = api::new_timestamp();
// Emits one row (marker only, no regular columns) per stream id,
// with the given state as part of the clustering key.
auto emit_stream_set = [&] (db_clock::time_point ts, cdc::stream_state kind, const auto& sids) -> future<> {
for (const auto& sid : sids) {
clustering_row cr(make_clustering_key(ts, kind, sid));
cr.apply(row_marker(created_at));
co_await result.emit_row(std::move(cr));
}
};
// emit in clustering order
// (the stream_state enum's underlying values are part of the
// clustering key, so the emission order below must follow them)
static_assert(int(cdc::stream_state::current) < int(cdc::stream_state::closed));
static_assert(int(cdc::stream_state::closed) < int(cdc::stream_state::opened));
co_await _ss.query_cdc_streams(table, [&] (db_clock::time_point ts, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff diff) -> future<> {
co_await emit_stream_set(ts, cdc::stream_state::current, current);
co_await emit_stream_set(ts, cdc::stream_state::closed, diff.closed_streams);
co_await emit_stream_set(ts, cdc::stream_state::opened, diff.opened_streams);
});
co_await result.emit_partition_end();
}
}
private:
static schema_ptr build_schema() {
auto id = generate_legacy_id(system_keyspace::NAME, "cdc_streams");
return schema_builder(system_keyspace::NAME, "cdc_streams", std::make_optional(id))
.with_column("keyspace_name", utf8_type, column_kind::partition_key)
.with_column("table_name", utf8_type, column_kind::partition_key)
.with_column("timestamp", timestamp_type, column_kind::clustering_key)
.with_column("stream_state", byte_type, column_kind::clustering_key)
.with_column("stream_id", bytes_type, column_kind::clustering_key)
.set_comment("CDC streams for each generation for tablets-based tables")
.with_hash_version()
.build();
}
// Partition key is (keyspace_name, table_name) of the base table.
dht::decorated_key make_partition_key(const std::string& ks_name, const std::string& cf_name) {
return dht::decorate_key(*_s, partition_key::from_exploded(*_s, {
data_value(ks_name).serialize_nonnull(),
data_value(cf_name).serialize_nonnull()
}));
}
// Clustering key is (timestamp, stream_state, stream_id).
clustering_key make_clustering_key(db_clock::time_point ts, cdc::stream_state kind, const cdc::stream_id& sid) {
return clustering_key::from_exploded(*_s, {
timestamp_type->decompose(ts),
byte_type->decompose(std::to_underlying(kind)),
sid.to_bytes()
});
}
};
}
// Creates all built-in virtual tables, stores them in the system keyspace's
// virtual tables registry, and wires each one's reader/writer into the
// corresponding column family. Also hooks up the legacy virtual readers for
// size_estimates, views_builds_in_progress and built_indexes.
future<> initialize_virtual_tables(
sharded<replica::database>& dist_db, sharded<service::storage_service>& dist_ss,
sharded<gms::gossiper>& dist_gossiper, sharded<service::raft_group_registry>& dist_raft_gr,
sharded<db::system_keyspace>& sys_ks,
sharded<service::tablet_allocator>& tablet_allocator,
sharded<netw::messaging_service>& ms,
db::config& cfg) {
auto& virtual_tables_registry = sys_ks.local().get_virtual_tables_registry();
auto& virtual_tables = *virtual_tables_registry;
auto& db = dist_db.local();
auto& ss = dist_ss.local();
// Registers one virtual table: stores it in the registry (keyed by schema
// id, which keeps it alive), creates the backing local system table, and
// installs the virtual reader/writer capturing a reference into the registry.
auto add_table = [&] (std::unique_ptr<virtual_table>&& tbl) -> future<> {
auto schema = tbl->schema();
virtual_tables[schema->id()] = std::move(tbl);
co_await db.create_local_system_table(schema, false, ss.get_erm_factory());
auto& cf = db.find_column_family(schema);
cf.mark_ready_for_writes(nullptr);
auto& vt = virtual_tables[schema->id()];
cf.set_virtual_reader(vt->as_mutation_source());
cf.set_virtual_writer([&vt = *vt] (const frozen_mutation& m) { return vt.apply(m); });
};
// Add built-in virtual tables here.
co_await add_table(std::make_unique<cluster_status_table>(dist_ss, dist_gossiper));
co_await add_table(std::make_unique<token_ring_table>(db, ss));
co_await add_table(std::make_unique<snapshots_table>(dist_db));
co_await add_table(std::make_unique<protocol_servers_table>(ss));
co_await add_table(std::make_unique<runtime_info_table>(dist_db, ss));
co_await add_table(std::make_unique<versions_table>());
co_await add_table(std::make_unique<db_config_table>(cfg));
co_await add_table(std::make_unique<clients_table>(ss));
co_await add_table(std::make_unique<raft_state_table>(dist_raft_gr));
co_await add_table(std::make_unique<load_per_node>(tablet_allocator, dist_db, dist_raft_gr, ms, dist_gossiper));
co_await add_table(std::make_unique<tablet_sizes>(tablet_allocator, dist_db, dist_raft_gr, ms));
co_await add_table(std::make_unique<cdc_timestamps_table>(db, ss));
co_await add_table(std::make_unique<cdc_streams_table>(db, ss));
// These pre-existing system tables get a virtual reader only (no writer).
db.find_column_family(system_keyspace::size_estimates()).set_virtual_reader(mutation_source(db::size_estimates::virtual_reader(db, sys_ks.local())));
db.find_column_family(system_keyspace::v3::views_builds_in_progress()).set_virtual_reader(mutation_source(db::view::build_progress_virtual_reader(db)));
db.find_column_family(system_keyspace::built_indexes()).set_virtual_reader(mutation_source(db::index::built_indexes_virtual_reader(db)));
}
// Pimpl-style wrapper around virtual_tables_registry_impl. Constructor and
// destructor are defined out-of-line here, presumably where the impl type
// is complete - TODO confirm impl is declared earlier in this file.
virtual_tables_registry::virtual_tables_registry() : unique_ptr(std::make_unique<virtual_tables_registry_impl>()) {
}
virtual_tables_registry::~virtual_tables_registry() = default;
} // namespace db