Files
scylladb/db/virtual_tables.cc
Piotr Smaron a31cb18324 db: fix UB in system.clients row sorting
The comparator used to sort per-IP client rows was not a strict-weak-ordering (it could return true in both directions for some pairs), which makes `std::ranges::sort` behavior undefined. A concrete pair that breaks it (and is realistic in system.clients):
a = (port=9042, client_type="cql")
b = (port=10000, client_type="alternator")
With the current comparator:
cmp(a,b) = (9042 < 10000) || ("cql" < "alternator") = true || false = true
cmp(b,a) = (10000 < 9042) || ("alternator" < "cql") = false || true = true
So both directions are true, meaning there is no valid ordering that sort can achieve.

The fix is to sort lexicographically by (port, client_type) to match the table's clustering key and ensure deterministic ordering.

Closes scylladb/scylladb#28844
2026-03-04 14:10:49 +03:00

1515 lines
66 KiB
C++

/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <algorithm>
#include <boost/algorithm/string.hpp>
#include <boost/lexical_cast.hpp>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/json/json_elements.hh>
#include <seastar/core/reactor.hh>
#include "cdc/generation_service.hh"
#include "cdc/log.hh"
#include "cdc/metadata.hh"
#include "db/config.hh"
#include "db/system_keyspace.hh"
#include "db/virtual_table.hh"
#include "partition_slice_builder.hh"
#include "db/virtual_tables.hh"
#include "db/size_estimates_virtual_reader.hh"
#include "db/view/build_progress_virtual_reader.hh"
#include "index/built_indexes_virtual_reader.hh"
#include "gms/gossiper.hh"
#include "mutation/frozen_mutation.hh"
#include "transport/protocol_server.hh"
#include "release.hh"
#include "replica/database.hh"
#include "schema/schema_builder.hh"
#include "service/raft/raft_group_registry.hh"
#include "service/storage_service.hh"
#include "service/tablet_allocator.hh"
#include "locator/load_sketch.hh"
#include "types/list.hh"
#include "types/types.hh"
#include "utils/build_id.hh"
#include "utils/log.hh"
#include "replica/exceptions.hh"
#include "service/paxos/paxos_state.hh"
#include "idl/storage_proxy.dist.hh"
using namespace locator;
namespace db {
namespace {
logging::logger vtlog("virtual_tables");
// Virtual table behind system.cluster_status.
//
// One partition per endpoint known to the gossiper, holding liveness,
// topology (dc/rack/draining/excluded), gossiped load, ownership and
// token count information.
class cluster_status_table : public memtable_filling_virtual_table {
private:
    sharded<service::storage_service>& _dist_ss;
    sharded<gms::gossiper>& _dist_gossiper;
public:
    cluster_status_table(sharded<service::storage_service>& ss, sharded<gms::gossiper>& g)
            : memtable_filling_virtual_table(build_schema())
            , _dist_ss(ss), _dist_gossiper(g) {}

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "cluster_status");
        return schema_builder(system_keyspace::NAME, "cluster_status", std::make_optional(id))
            .with_column("peer", inet_addr_type, column_kind::partition_key)
            .with_column("dc", utf8_type)
            .with_column("rack", utf8_type)
            .with_column("up", boolean_type)
            .with_column("draining", boolean_type)
            .with_column("excluded", boolean_type)
            .with_column("status", utf8_type)
            .with_column("load", utf8_type)
            .with_column("tokens", int32_type)
            .with_column("owns", float_type)
            .with_column("host_id", uuid_type)
            .with_hash_version()
            .build();
    }

    // Builds one row per gossiped endpoint. The gossiper/storage_service
    // state lives on shard 0, so the rows are built there as frozen
    // mutations, shipped back, and unfrozen on the calling shard before
    // being handed to the sink.
    future<> execute(std::function<void(mutation)> mutation_sink) override {
        auto muts = co_await _dist_ss.invoke_on(0, [this] (service::storage_service& ss) -> future<utils::chunked_vector<frozen_mutation>> {
            auto& gossiper = _dist_gossiper.local();
            auto ownership = co_await ss.get_ownership();
            const locator::token_metadata& tm = ss.get_token_metadata();
            utils::chunked_vector<frozen_mutation> muts;
            muts.reserve(gossiper.num_endpoints());
            gossiper.for_each_endpoint_state([&] (const gms::endpoint_state& eps) {
                // Shard-local copy of the schema: this lambda runs on shard 0
                // while the table object may belong to a different shard.
                static thread_local auto s = build_schema();
                auto endpoint = eps.get_host_id();
                mutation m(s, partition_key::from_single_value(*s, data_value(eps.get_ip()).serialize_nonnull()));
                row& cr = m.partition().clustered_row(*schema(), clustering_key::make_empty()).cells();
                auto hostid = eps.get_host_id();
                set_cell(cr, "up", gossiper.is_alive(hostid));
                // A shut-down node keeps its gossip status string; otherwise
                // report the (upper-cased) node state from storage_service.
                if (gossiper.is_shutdown(endpoint)) {
                    set_cell(cr, "status", gossiper.get_gossip_status(endpoint));
                } else {
                    set_cell(cr, "status", boost::to_upper_copy<std::string>(fmt::format("{}", ss.get_node_state(hostid))));
                }
                set_cell(cr, "load", gossiper.get_application_state_value(endpoint, gms::application_state::LOAD));
                set_cell(cr, "host_id", hostid.uuid());
                if (tm.get_topology().has_node(hostid)) {
                    // Not all entries in gossiper are present in the topology
                    auto& node = tm.get_topology().get_node(hostid);
                    sstring dc = node.dc_rack().dc;
                    sstring rack = node.dc_rack().rack;
                    set_cell(cr, "dc", dc);
                    set_cell(cr, "rack", rack);
                    set_cell(cr, "draining", node.is_draining());
                    set_cell(cr, "excluded", node.is_excluded());
                }
                // "owns" is only set for endpoints present in the ownership map.
                if (ownership.contains(eps.get_ip())) {
                    set_cell(cr, "owns", ownership[eps.get_ip()]);
                }
                set_cell(cr, "tokens", int32_t(tm.get_tokens(hostid).size()));
                muts.push_back(freeze(std::move(m)));
            });
            co_return muts;
        });
        for (auto& m : muts) {
            mutation_sink(m.unfreeze(schema()));
        }
    }
};
// Virtual table behind system.token_ring.
//
// Streams the token ring layout: for each non-local-strategy keyspace
// (one partition per keyspace), one row per (table, range start token,
// replica endpoint). For tablet-based keyspaces the ring is described per
// table; for vnode-based keyspaces all tables share one ring, emitted
// under the pseudo table name "<ALL>".
class token_ring_table : public streaming_virtual_table {
private:
    replica::database& _db;
    service::storage_service& _ss;
public:
    token_ring_table(replica::database& db, service::storage_service& ss)
            : streaming_virtual_table(build_schema())
            , _db(db)
            , _ss(ss)
    {
        _shard_aware = true;
    }

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "token_ring");
        return schema_builder(system_keyspace::NAME, "token_ring", std::make_optional(id))
            .with_column("keyspace_name", utf8_type, column_kind::partition_key)
            .with_column("table_name", utf8_type, column_kind::clustering_key)
            .with_column("start_token", utf8_type, column_kind::clustering_key)
            .with_column("endpoint", inet_addr_type, column_kind::clustering_key)
            .with_column("end_token", utf8_type)
            .with_column("dc", utf8_type)
            .with_column("rack", utf8_type)
            .with_hash_version()
            .build();
    }

    dht::decorated_key make_partition_key(const sstring& name) {
        return dht::decorate_key(*_s, partition_key::from_single_value(*_s, data_value(name).serialize_nonnull()));
    }

    clustering_key make_clustering_key(const sstring& table_name, sstring start_token, gms::inet_address host) {
        return clustering_key::from_exploded(*_s, {
            data_value(table_name).serialize_nonnull(),
            data_value(start_token).serialize_nonnull(),
            data_value(host).serialize_nonnull()
        });
    }

    // Emits all rows of one keyspace partition. Ranges are sorted by start
    // token and, within each range, endpoint details are sorted by address,
    // so rows are produced in clustering order as the reader requires.
    future<> emit_ring(result_collector& result, const dht::decorated_key& dk, const sstring& table_name, utils::chunked_vector<dht::token_range_endpoints> ranges) {
        co_await result.emit_partition_start(dk);
        std::ranges::sort(ranges, std::ranges::less(), std::mem_fn(&dht::token_range_endpoints::_start_token));
        for (dht::token_range_endpoints& range : ranges) {
            std::ranges::sort(range._endpoint_details, endpoint_details_cmp());
            for (const dht::endpoint_details& detail : range._endpoint_details) {
                clustering_row cr(make_clustering_key(table_name, range._start_token, detail._host));
                set_cell(cr.cells(), "end_token", sstring(range._end_token));
                set_cell(cr.cells(), "dc", sstring(detail._datacenter));
                set_cell(cr.cells(), "rack", sstring(detail._rack));
                co_await result.emit_row(std::move(cr));
            }
        }
        co_await result.emit_partition_end();
    }

    // Orders endpoint_details by their serialized address using the same
    // comparison inet_addr_type applies to the "endpoint" clustering column.
    struct endpoint_details_cmp {
        bool operator()(const dht::endpoint_details& l, const dht::endpoint_details& r) const {
            return inet_addr_type->less(
                data_value(l._host).serialize_nonnull(),
                data_value(r._host).serialize_nonnull());
        }
    };

    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        struct decorated_keyspace_name {
            sstring name;
            dht::decorated_key key;
        };
        auto keyspace_names = _db.get_non_local_strategy_keyspaces()
            | std::views::transform([this] (auto&& ks) {
                return decorated_keyspace_name{ks, make_partition_key(ks)};
            })
            | std::ranges::to<std::vector>();
        // Partitions must be emitted in ring (token) order.
        std::ranges::sort(keyspace_names, dht::ring_position_less_comparator(*_s), std::mem_fn(&decorated_keyspace_name::key));
        for (const decorated_keyspace_name& e : keyspace_names) {
            auto&& dk = e.key;
            // Skip partitions this shard does not own, that are outside the
            // queried range, or whose keyspace was dropped meanwhile.
            if (!this_shard_owns(dk) || !contains_key(qr.partition_range(), dk) || !_db.has_keyspace(e.name)) {
                continue;
            }
            if (_db.find_keyspace(e.name).get_replication_strategy().uses_tablets()) {
                // Tablet keyspaces have a per-table ring.
                co_await _db.get_tables_metadata().for_each_table_gently([&, this] (table_id, lw_shared_ptr<replica::table> table) -> future<> {
                    if (table->schema()->ks_name() != e.name) {
                        co_return;
                    }
                    const auto& table_name = table->schema()->cf_name();
                    utils::chunked_vector<dht::token_range_endpoints> ranges = co_await _ss.describe_ring_for_table(e.name, table_name);
                    co_await emit_ring(result, e.key, table_name, std::move(ranges));
                });
            } else {
                utils::chunked_vector<dht::token_range_endpoints> ranges = co_await _ss.describe_ring(e.name);
                co_await emit_ring(result, e.key, "<ALL>", std::move(ranges));
            }
        }
    }
};
// Virtual table behind system.snapshots.
//
// One partition per keyspace, one row per (table, snapshot name), with the
// snapshot's live and total sizes. Only snapshots of existing tables are
// listed (see the schema comment).
class snapshots_table : public streaming_virtual_table {
    sharded<replica::database>& _db;
public:
    explicit snapshots_table(sharded<replica::database>& db)
            : streaming_virtual_table(build_schema())
            , _db(db)
    {
        _shard_aware = true;
    }

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "snapshots");
        return schema_builder(system_keyspace::NAME, "snapshots", std::make_optional(id))
            .with_column("keyspace_name", utf8_type, column_kind::partition_key)
            .with_column("table_name", utf8_type, column_kind::clustering_key)
            .with_column("snapshot_name", utf8_type, column_kind::clustering_key)
            .with_column("live", long_type)
            .with_column("total", long_type)
            .set_comment("Lists all the snapshots along with their size, dropped tables are not part of the listing.")
            .with_hash_version()
            .build();
    }

    dht::decorated_key make_partition_key(const sstring& name) {
        return dht::decorate_key(*_s, partition_key::from_single_value(*_s, data_value(name).serialize_nonnull()));
    }

    clustering_key make_clustering_key(sstring table_name, sstring snapshot_name) {
        return clustering_key::from_exploded(*_s, {
            data_value(std::move(table_name)).serialize_nonnull(),
            data_value(std::move(snapshot_name)).serialize_nonnull()
        });
    }

    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        struct decorated_keyspace_name {
            schema_ptr s;
            dht::decorated_key key;
            // Ordered by decorated key so the outer map iterates keyspace
            // partitions in ring (token) order, as required when emitting.
            auto operator<=>(const decorated_keyspace_name& o) const {
                return key.tri_compare(*o.s, o.key);
            }
        };
        // keyspace -> table -> snapshot name -> details; both inner maps are
        // std::map so tables and snapshots come out sorted (clustering order).
        using snapshots_by_tables_map = std::map<sstring, std::map<sstring, replica::table::snapshot_details>>;
        using snapshots_by_keyspace_map = std::map<decorated_keyspace_name, snapshots_by_tables_map>;
        snapshots_by_keyspace_map keyspace_snapshots;
        auto snapshot_details = co_await _db.local().get_snapshot_details();
        for (const auto& [snapshot_name, details] : snapshot_details) {
            for (const auto& d : details) {
                auto dk = make_partition_key(d.ks);
                // Only emit partitions owned by this shard and inside the
                // queried range.
                if (!this_shard_owns(dk) || !contains_key(qr.partition_range(), dk)) {
                    continue;
                }
                keyspace_snapshots[decorated_keyspace_name(_s, dk)][d.cf][snapshot_name] = d.details;
            }
        }
        for (const auto& [ks_data, snapshots_by_tables] : keyspace_snapshots) {
            co_await result.emit_partition_start(ks_data.key);
            for (const auto& [table_name, snapshots] : snapshots_by_tables) {
                for (auto& [snapshot_name, details] : snapshots) {
                    clustering_row cr(make_clustering_key(table_name, snapshot_name));
                    set_cell(cr.cells(), "live", details.live);
                    set_cell(cr.cells(), "total", details.total);
                    co_await result.emit_row(std::move(cr));
                }
            }
            co_await result.emit_partition_end();
        }
    }
};
// Virtual table behind system.protocol_servers.
//
// One partition per client protocol server (CQL, alternator, ...) with its
// protocol name, version and listen addresses.
class protocol_servers_table : public memtable_filling_virtual_table {
private:
    service::storage_service& _ss;
    // Plain-data snapshot of a protocol_server's state, safe to copy across
    // shards (the protocol_server objects themselves live on shard 0).
    struct protocol_server_info {
        sstring name;
        sstring protocol;
        sstring protocol_version;
        std::vector<sstring> listen_addresses;
        explicit protocol_server_info(protocol_server& s)
            : name(s.name())
            , protocol(s.protocol())
            , protocol_version(s.protocol_version()) {
            for (const auto& addr : s.listen_addresses()) {
                listen_addresses.push_back(format("{}:{}", addr.addr(), addr.port()));
            }
        }
    };
public:
    explicit protocol_servers_table(service::storage_service& ss)
            : memtable_filling_virtual_table(build_schema())
            , _ss(ss) {
        _shard_aware = true;
    }

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "protocol_servers");
        return schema_builder(system_keyspace::NAME, "protocol_servers", std::make_optional(id))
            .with_column("name", utf8_type, column_kind::partition_key)
            .with_column("protocol", utf8_type)
            .with_column("protocol_version", utf8_type)
            .with_column("listen_addresses", list_type_impl::get_instance(utf8_type, false))
            .set_comment("Lists all client protocol servers and their status.")
            .with_hash_version()
            .build();
    }

    future<> execute(std::function<void(mutation)> mutation_sink) override {
        // Servers are registered on shard 0 only
        const auto server_infos = co_await smp::submit_to(0ul, [&ss = _ss.container()] {
            return ss.local().protocol_servers()
                | std::views::transform([] (protocol_server* s) { return protocol_server_info(*s); })
                | std::ranges::to<std::vector>();
        });
        // Iterate by const reference: each info holds several sstrings plus a
        // vector of sstrings, so iterating by value copied it all per server.
        for (const auto& server : server_infos) {
            auto dk = dht::decorate_key(*_s, partition_key::from_single_value(*schema(), data_value(server.name).serialize_nonnull()));
            if (!this_shard_owns(dk)) {
                continue;
            }
            mutation m(schema(), std::move(dk));
            row& cr = m.partition().clustered_row(*schema(), clustering_key::make_empty()).cells();
            set_cell(cr, "protocol", server.protocol);
            set_cell(cr, "protocol_version", server.protocol_version);
            std::vector<data_value> addresses(server.listen_addresses.begin(), server.listen_addresses.end());
            set_cell(cr, "listen_addresses", make_list_value(schema()->get_column_definition("listen_addresses")->type, std::move(addresses)));
            mutation_sink(std::move(m));
        }
    }
};
// Virtual table behind system.runtime_info.
//
// Exposes runtime metrics (gossip state, load, uptime, memory, memtable and
// cache occupancy, ...) as (group, item) -> value rows. Partitions are
// distributed across shards by the token of the group name; each shard only
// emits the partitions it owns, and single-row groups share one pre-computed
// "generic" partition key.
class runtime_info_table : public memtable_filling_virtual_table {
private:
    sharded<replica::database>& _db;
    service::storage_service& _ss;
    // Key of the shared "generic" partition; engaged only on the shard
    // owning it (computed once in the constructor).
    std::optional<dht::decorated_key> _generic_key;
private:
    // Decorates `key` and returns it if this shard owns it, nullopt otherwise.
    std::optional<dht::decorated_key> maybe_make_key(sstring key) {
        auto dk = dht::decorate_key(*_s, partition_key::from_single_value(*schema(), data_value(std::move(key)).serialize_nonnull()));
        if (this_shard_owns(dk)) {
            return dk;
        }
        return std::nullopt;
    }
    // Sinks one mutation with partition `key` containing one clustering row
    // per (item, value) pair in `rows`.
    void do_add_partition(std::function<void(mutation)>& mutation_sink, dht::decorated_key key, std::vector<std::pair<sstring, sstring>> rows) {
        mutation m(schema(), std::move(key));
        for (auto&& [ckey, cvalue] : rows) {
            row& cr = m.partition().clustered_row(*schema(), clustering_key::from_single_value(*schema(), data_value(std::move(ckey)).serialize_nonnull())).cells();
            set_cell(cr, "value", std::move(cvalue));
        }
        mutation_sink(std::move(m));
    }
    // Adds a single-row entry under the shared "generic" partition; no-op on
    // shards that don't own it.
    void add_partition(std::function<void(mutation)>& mutation_sink, sstring key, sstring value) {
        if (_generic_key) {
            do_add_partition(mutation_sink, *_generic_key, {{key, std::move(value)}});
        }
    }
    void add_partition(std::function<void(mutation)>& mutation_sink, sstring key, std::initializer_list<std::pair<sstring, sstring>> rows) {
        auto dk = maybe_make_key(std::move(key));
        if (dk) {
            do_add_partition(mutation_sink, std::move(*dk), std::move(rows));
        }
    }
    // Async variants: the value producer is only invoked when the target
    // partition belongs to this shard, avoiding redundant cluster-wide work.
    future<> add_partition(std::function<void(mutation)>& mutation_sink, sstring key, std::function<future<sstring>()> value_producer) {
        if (_generic_key) {
            do_add_partition(mutation_sink, *_generic_key, {{key, co_await value_producer()}});
        }
    }
    future<> add_partition(std::function<void(mutation)>& mutation_sink, sstring key, std::function<future<std::vector<std::pair<sstring, sstring>>>()> value_producer) {
        auto dk = maybe_make_key(std::move(key));
        if (dk) {
            do_add_partition(mutation_sink, std::move(*dk), co_await value_producer());
        }
    }
    // Applies `map` to every table on every shard and folds all results with
    // `reduce` (defaults to summation).
    template <typename T>
    future<T> map_reduce_tables(std::function<T(replica::table&)> map, std::function<T(T, T)> reduce = std::plus<T>{}) {
        class shard_reducer {
            T _v{};
            std::function<T(T, T)> _reduce;
        public:
            shard_reducer(std::function<T(T, T)> reduce) : _reduce(std::move(reduce)) { }
            future<> operator()(T v) {
                // Fold the per-shard value into the accumulator. (This used to
                // assign the result back to the parameter `v`, which discarded
                // every shard's contribution and made the reduction always
                // yield a default-constructed T.)
                _v = _reduce(_v, v);
                return make_ready_future<>();
            }
            T get() && { return std::move(_v); }
        };
        co_return co_await _db.map_reduce(shard_reducer(reduce), [map, reduce] (replica::database& db) {
            T val = {};
            db.get_tables_metadata().for_each_table([&] (table_id, lw_shared_ptr<replica::table> table) {
                val = reduce(val, map(*table));
            });
            return val;
        });
    }
    // Runs `map` on every shard and folds the results with `reduce`.
    template <typename T>
    future<T> map_reduce_shards(std::function<T()> map, std::function<T(T, T)> reduce = std::plus<T>{}, T initial = {}) {
        co_return co_await map_reduce(
            std::views::iota(0u, smp::count),
            [map] (shard_id shard) {
                return smp::submit_to(shard, [map] {
                    return map();
                });
            },
            initial,
            reduce);
    }
public:
    explicit runtime_info_table(sharded<replica::database>& db, service::storage_service& ss)
            : memtable_filling_virtual_table(build_schema())
            , _db(db)
            , _ss(ss) {
        _shard_aware = true;
        _generic_key = maybe_make_key("generic");
    }

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "runtime_info");
        return schema_builder(system_keyspace::NAME, "runtime_info", std::make_optional(id))
            .with_column("group", utf8_type, column_kind::partition_key)
            .with_column("item", utf8_type, column_kind::clustering_key)
            .with_column("value", utf8_type)
            .set_comment("Runtime system information.")
            .with_hash_version()
            .build();
    }

    future<> execute(std::function<void(mutation)> mutation_sink) override {
        co_await add_partition(mutation_sink, "gossip_active", [this] () -> future<sstring> {
            return _ss.is_gossip_running().then([] (bool running){
                return format("{}", running);
            });
        });
        co_await add_partition(mutation_sink, "load", [this] () -> future<sstring> {
            return map_reduce_tables<int64_t>([] (replica::table& tbl) {
                return tbl.get_stats().live_disk_space_used.on_disk;
            }).then([] (int64_t load) {
                return format("{}", load);
            });
        });
        add_partition(mutation_sink, "uptime", format("{} seconds", std::chrono::duration_cast<std::chrono::seconds>(engine().uptime()).count()));
        add_partition(mutation_sink, "trace_probability", format("{:.2}", tracing::tracing::get_local_tracing_instance().get_trace_probability()));
        co_await add_partition(mutation_sink, "memory", [this] () {
            struct stats {
                // take the pre-reserved memory into account, as seastar only returns
                // the stats of memory managed by the seastar allocator, but we instruct
                // it to reserve additional memory for the non-seastar allocator on a
                // per-shard basis.
                uint64_t total = 0;
                uint64_t free = 0;
                static stats reduce(stats a, stats b) {
                    return stats{
                        a.total + b.total + db::config::wasm_udf_reserved_memory,
                        a.free + b.free};
                };
            };
            return map_reduce_shards<stats>([] () {
                const auto& s = memory::stats();
                return stats{s.total_memory(), s.free_memory()};
            }, stats::reduce, stats{}).then([] (stats s) {
                return std::vector<std::pair<sstring, sstring>>{
                    {"total", format("{}", s.total)},
                    {"used", format("{}", s.total - s.free)},
                    {"free", format("{}", s.free)}};
            });
        });
        co_await add_partition(mutation_sink, "memtable", [this] () {
            struct stats {
                uint64_t total = 0;
                uint64_t free = 0;
                uint64_t entries = 0;
                static stats reduce(stats a, stats b) { return stats{a.total + b.total, a.free + b.free, a.entries + b.entries}; }
            };
            return map_reduce_tables<stats>([] (replica::table& t) {
                logalloc::occupancy_stats s;
                uint64_t partition_count = 0;
                t.for_each_active_memtable([&] (replica::memtable& active_memtable) {
                    s += active_memtable.region().occupancy();
                    partition_count += active_memtable.partition_count();
                });
                return stats{s.total_space(), s.free_space(), partition_count};
            }, stats::reduce).then([] (stats s) {
                return std::vector<std::pair<sstring, sstring>>{
                    {"memory_total", format("{}", s.total)},
                    {"memory_used", format("{}", s.total - s.free)},
                    {"memory_free", format("{}", s.free)},
                    {"entries", format("{}", s.entries)}};
            });
        });
        co_await add_partition(mutation_sink, "cache", [this] () {
            struct stats {
                uint64_t total = 0;
                uint64_t free = 0;
                uint64_t entries = 0;
                uint64_t hits = 0;
                uint64_t misses = 0;
                utils::rate_moving_average hits_moving_average;
                utils::rate_moving_average requests_moving_average;
                static stats reduce(stats a, stats b) {
                    return stats{
                        a.total + b.total,
                        a.free + b.free,
                        a.entries + b.entries,
                        a.hits + b.hits,
                        a.misses + b.misses,
                        a.hits_moving_average + b.hits_moving_average,
                        a.requests_moving_average + b.requests_moving_average};
                }
            };
            return _db.map_reduce0([] (replica::database& db) {
                stats res{};
                auto occupancy = db.row_cache_tracker().region().occupancy();
                res.total = occupancy.total_space();
                res.free = occupancy.free_space();
                res.entries = db.row_cache_tracker().partitions();
                db.get_tables_metadata().for_each_table([&] (table_id id, lw_shared_ptr<replica::table> t) {
                    auto& cache_stats = t->get_row_cache().stats();
                    res.hits += cache_stats.hits.count();
                    res.misses += cache_stats.misses.count();
                    res.hits_moving_average += cache_stats.hits.rate();
                    res.requests_moving_average += (cache_stats.hits.rate() + cache_stats.misses.rate());
                });
                return res;
            }, stats{}, stats::reduce).then([] (stats s) {
                return std::vector<std::pair<sstring, sstring>>{
                    {"memory_total", format("{}", s.total)},
                    {"memory_used", format("{}", s.total - s.free)},
                    {"memory_free", format("{}", s.free)},
                    {"entries", format("{}", s.entries)},
                    {"hits", format("{}", s.hits)},
                    {"misses", format("{}", s.misses)},
                    {"hit_rate_total", format("{:.2}", static_cast<double>(s.hits) / static_cast<double>(s.hits + s.misses))},
                    {"hit_rate_recent", format("{:.2}", s.hits_moving_average.mean_rate)},
                    {"requests_total", format("{}", s.hits + s.misses)},
                    {"requests_recent", format("{}", static_cast<uint64_t>(s.requests_moving_average.mean_rate))}};
            });
        });
        co_await add_partition(mutation_sink, "incremental_backup_enabled", [this] () {
            return _db.map_reduce0([] (replica::database& db) {
                return std::ranges::any_of(db.get_keyspaces(), [] (const auto& id_and_ks) {
                    return id_and_ks.second.incremental_backups_enabled();
                });
            }, false, std::logical_or{}).then([] (bool res) -> sstring {
                return res ? "true" : "false";
            });
        });
    }
};
// Virtual table behind system.versions: a single "local" partition holding
// the node's version string, build mode and build id.
class versions_table : public memtable_filling_virtual_table {
public:
    explicit versions_table()
            : memtable_filling_virtual_table(build_schema()) {
        // Version data is identical on every shard, no sharding needed.
        _shard_aware = false;
    }

    static schema_ptr build_schema() {
        auto table_id = generate_legacy_id(system_keyspace::NAME, system_keyspace::VERSIONS);
        return schema_builder(system_keyspace::NAME, system_keyspace::VERSIONS, std::make_optional(table_id))
            .with_column("key", utf8_type, column_kind::partition_key)
            .with_column("version", utf8_type)
            .with_column("build_mode", utf8_type)
            .with_column("build_id", utf8_type)
            .set_comment("Version information.")
            .with_hash_version()
            .build();
    }

    // Sinks exactly one mutation: the static row of the "local" partition.
    future<> execute(std::function<void(mutation)> mutation_sink) override {
        auto table_schema = schema();
        auto pk = partition_key::from_single_value(*table_schema, data_value("local").serialize_nonnull());
        mutation version_mut(table_schema, std::move(pk));
        row& cells = version_mut.partition().clustered_row(*table_schema, clustering_key::make_empty()).cells();
        set_cell(cells, "version", scylla_version());
        set_cell(cells, "build_mode", scylla_build_mode());
        set_cell(cells, "build_id", get_build_id());
        mutation_sink(std::move(version_mut));
        return make_ready_future<>();
    }
};
// Virtual table behind system.config.
//
// Read path: one partition per configuration option with its type, source
// and JSON-encoded value. Write path (UPDATE): live-updates an option's
// value when cql config updates are enabled.
class db_config_table final : public streaming_virtual_table {
    db::config& _cfg;

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "config");
        return schema_builder(system_keyspace::NAME, "config", std::make_optional(id))
            .with_column("name", utf8_type, column_kind::partition_key)
            .with_column("type", utf8_type)
            .with_column("source", utf8_type)
            .with_column("value", utf8_type)
            .with_hash_version()
            .build();
    }

    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        struct config_entry {
            dht::decorated_key key;
            std::string_view type;
            sstring source;
            sstring value;
        };
        std::vector<config_entry> cfg;
        for (auto&& cfg_ref : _cfg.values()) {
            auto&& c = cfg_ref.get();
            dht::decorated_key dk = dht::decorate_key(*_s, partition_key::from_single_value(*_s, data_value(c.name()).serialize_nonnull()));
            // Each shard emits only the options whose partition it owns.
            if (this_shard_owns(dk)) {
                cfg.emplace_back(config_entry{ std::move(dk), c.type_name(), c.source_name(), c.value_as_json()._res });
            }
        }
        // Partitions must be emitted in ring (token) order.
        std::ranges::sort(cfg, dht::ring_position_less_comparator(*_s), std::mem_fn(&config_entry::key));
        for (auto&& c : cfg) {
            co_await result.emit_partition_start(c.key);
            mutation m(schema(), c.key);
            clustering_row cr(clustering_key::make_empty());
            set_cell(cr.cells(), "type", c.type);
            set_cell(cr.cells(), "source", c.source);
            set_cell(cr.cells(), "value", c.value);
            co_await result.emit_row(std::move(cr));
            co_await result.emit_partition_end();
        }
    }

    // Handles UPDATEs against system.config: only the "value" column may be
    // set, and only when live cql config updates are enabled. The actual
    // update is performed on shard 0 and propagated to all shards.
    virtual future<> apply(const frozen_mutation& fm) override {
        const mutation m = fm.unfreeze(_s);
        query::result_set rs(m);
        auto name = rs.row(0).get<sstring>("name");
        auto value = rs.row(0).get<sstring>("value");
        if (!_cfg.enable_cql_config_updates()) {
            return virtual_table::apply(fm); // will return back exceptional future
        }
        if (!name) {
            return make_exception_future<>(virtual_table_update_exception("option name is required"));
        }
        if (!value) {
            return make_exception_future<>(virtual_table_update_exception("option value is required"));
        }
        if (rs.row(0).cells().contains("type")) {
            return make_exception_future<>(virtual_table_update_exception("option type is immutable"));
        }
        if (rs.row(0).cells().contains("source")) {
            return make_exception_future<>(virtual_table_update_exception("option source is not updateable"));
        }
        return smp::submit_to(0, [&cfg = _cfg, name = std::move(*name), value = std::move(*value)] () mutable -> future<> {
            for (auto& c_ref : cfg.values()) {
                auto& c = c_ref.get();
                if (c.name() == name) {
                    std::exception_ptr ex;
                    try {
                        // set_value_on_all_shards returns false for options
                        // that cannot be changed at runtime.
                        if (co_await c.set_value_on_all_shards(value, utils::config_file::config_source::CQL)) {
                            co_return;
                        } else {
                            ex = std::make_exception_ptr(virtual_table_update_exception("option is not live-updateable"));
                        }
                    } catch (boost::bad_lexical_cast&) {
                        ex = std::make_exception_ptr(virtual_table_update_exception("cannot parse option value"));
                    }
                    co_await coroutine::return_exception_ptr(std::move(ex));
                }
            }
            co_await coroutine::return_exception(virtual_table_update_exception("no such option"));
        });
    }

public:
    explicit db_config_table(db::config& cfg)
            : streaming_virtual_table(build_schema())
            , _cfg(cfg)
    {
        _shard_aware = true;
    }
};
// Virtual table behind system.clients.
//
// One partition per connected client IP address, one row per
// (port, client_type) connection. Client data is collected from every
// protocol server on every shard.
class clients_table : public streaming_virtual_table {
    service::storage_service& _ss;

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "clients");
        return schema_builder(system_keyspace::NAME, "clients", std::make_optional(id))
            .with_column("address", inet_addr_type, column_kind::partition_key)
            .with_column("port", int32_type, column_kind::clustering_key)
            .with_column("client_type", utf8_type, column_kind::clustering_key)
            .with_column("shard_id", int32_type)
            .with_column("connection_stage", utf8_type)
            .with_column("driver_name", utf8_type)
            .with_column("driver_version", utf8_type)
            .with_column("hostname", utf8_type)
            .with_column("protocol_version", int32_type)
            .with_column("ssl_cipher_suite", utf8_type)
            .with_column("ssl_enabled", boolean_type)
            .with_column("ssl_protocol", utf8_type)
            .with_column("username", utf8_type)
            .with_column("scheduling_group", utf8_type)
            .with_column("client_options", map_type_impl::get_instance(utf8_type, utf8_type, false))
            .with_hash_version()
            .build();
    }

    dht::decorated_key make_partition_key(net::inet_address ip) {
        return dht::decorate_key(*_s, partition_key::from_single_value(*_s, data_value(ip).serialize_nonnull()));
    }

    clustering_key make_clustering_key(int32_t port, sstring clt) {
        return clustering_key::from_exploded(*_s, {
            data_value(port).serialize_nonnull(),
            data_value(clt).serialize_nonnull()
        });
    }

    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        // Collect
        // Gather per-shard client data from every protocol server. The data
        // stays on its home shard; we only hold foreign_ptr references here.
        using client_data_vec = utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>;
        using shard_client_data = std::vector<client_data_vec>;
        std::vector<foreign_ptr<std::unique_ptr<shard_client_data>>> cd_vec;
        cd_vec.resize(smp::count);
        auto servers = co_await _ss.container().invoke_on(0, [] (auto& ss) { return ss.protocol_servers(); });
        co_await smp::invoke_on_all([&cd_vec_ = cd_vec, &servers_ = servers] () -> future<> {
            auto& cd_vec = cd_vec_;
            auto& servers = servers_;
            auto scd = std::make_unique<shard_client_data>();
            for (const auto& ps : servers) {
                client_data_vec cds = co_await ps->get_client_data();
                if (cds.size() != 0) {
                    scd->emplace_back(std::move(cds));
                }
            }
            cd_vec[this_shard_id()] = make_foreign(std::move(scd));
        });
        // Partition
        // Group connections by client IP, keeping only IPs whose partition
        // this shard owns and which fall inside the queried range. `ips` is
        // ordered by decorated key so partitions are emitted in ring (token)
        // order, as the reader contract requires.
        struct decorated_ip {
            dht::decorated_key key;
            net::inet_address ip;
            struct compare {
                dht::ring_position_less_comparator less;
                explicit compare(const class schema& s) : less(s) {}
                bool operator()(const decorated_ip& a, const decorated_ip& b) const {
                    return less(a.key, b.key);
                }
            };
        };
        decorated_ip::compare cmp(*_s);
        std::set<decorated_ip, decorated_ip::compare> ips(cmp);
        std::unordered_map<net::inet_address, client_data_vec> cd_map;
        for (unsigned i = 0; i < smp::count; i++) {
            for (auto&& ps_cdc : *cd_vec[i]) {
                for (auto&& cd : ps_cdc) {
                    // An IP already in cd_map has passed the ownership/range
                    // filter; only new IPs need the checks.
                    if (cd_map.contains(cd->ip)) {
                        cd_map[cd->ip].emplace_back(std::move(cd));
                    } else {
                        dht::decorated_key key = make_partition_key(cd->ip);
                        if (this_shard_owns(key) && contains_key(qr.partition_range(), key)) {
                            ips.insert(decorated_ip{std::move(key), cd->ip});
                            cd_map[cd->ip].emplace_back(std::move(cd));
                        }
                    }
                    co_await coroutine::maybe_yield();
                }
            }
        }
        // Emit
        for (const auto& dip : ips) {
            co_await result.emit_partition_start(dip.key);
            auto& clients = cd_map[dip.ip];
            // Sort rows lexicographically by (port, client_type) to match the
            // clustering key order. Comparing port first and falling back to
            // client_type only on ties keeps this a strict weak ordering, as
            // std::ranges::sort requires.
            std::ranges::sort(clients, [] (const foreign_ptr<std::unique_ptr<client_data>>& a, const foreign_ptr<std::unique_ptr<client_data>>& b) {
                if (a->port != b->port) {
                    return a->port < b->port;
                }
                return a->client_type_str() < b->client_type_str();
            });
            for (const auto& cd : clients) {
                clustering_row cr(make_clustering_key(cd->port, cd->client_type_str()));
                set_cell(cr.cells(), "shard_id", cd->shard_id);
                set_cell(cr.cells(), "connection_stage", cd->stage_str());
                // Optional fields are only materialized when present.
                if (cd->driver_name) {
                    set_cell(cr.cells(), "driver_name", cd->driver_name->key());
                }
                if (cd->driver_version) {
                    set_cell(cr.cells(), "driver_version", cd->driver_version->key());
                }
                if (cd->hostname) {
                    set_cell(cr.cells(), "hostname", *cd->hostname);
                }
                if (cd->protocol_version) {
                    set_cell(cr.cells(), "protocol_version", *cd->protocol_version);
                }
                if (cd->ssl_cipher_suite) {
                    set_cell(cr.cells(), "ssl_cipher_suite", *cd->ssl_cipher_suite);
                }
                if (cd->ssl_enabled) {
                    set_cell(cr.cells(), "ssl_enabled", *cd->ssl_enabled);
                }
                if (cd->ssl_protocol) {
                    set_cell(cr.cells(), "ssl_protocol", *cd->ssl_protocol);
                }
                set_cell(cr.cells(), "username", cd->username ? *cd->username : sstring("anonymous"));
                if (cd->scheduling_group_name) {
                    set_cell(cr.cells(), "scheduling_group", *cd->scheduling_group_name);
                }
                auto map_type = map_type_impl::get_instance(
                    utf8_type,
                    utf8_type,
                    false
                );
                // Convert the client's key/value options into the native
                // representation of a CQL map<text, text>.
                auto prepare_client_options = [] (const auto& client_options) {
                    map_type_impl::native_type tmp;
                    for (auto& co: client_options) {
                        auto map_element = std::make_pair(data_value(co.key.key()), data_value(co.value.key()));
                        tmp.push_back(std::move(map_element));
                    }
                    return tmp;
                };
                set_cell(cr.cells(), "client_options",
                    make_map_value(map_type, prepare_client_options(cd->client_options)));
                co_await result.emit_row(std::move(cr));
            }
            co_await result.emit_partition_end();
        }
    }

public:
    clients_table(service::storage_service& ss)
            : streaming_virtual_table(build_schema())
            , _ss(ss)
    {
        _shard_aware = true;
    }
};
// Shows the current state of each Raft group.
// Currently it shows only the configuration.
// In the future we plan to add additional columns with more information.
class raft_state_table : public streaming_virtual_table {
private:
sharded<service::raft_group_registry>& _raft_gr;
public:
raft_state_table(sharded<service::raft_group_registry>& raft_gr)
: streaming_virtual_table(build_schema())
, _raft_gr(raft_gr) {
}
// Streams one partition per Raft group known to any shard's registry, in
// token order, with the group's current (and previous, if present)
// configuration member sets as rows.
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
    struct decorated_gid {
        raft::group_id gid;
        dht::decorated_key key;
        unsigned shard; // shard hosting this group's raft server
    };
    // Collect all group ids together with the shard they live on.
    auto groups_and_shards = co_await _raft_gr.map([] (service::raft_group_registry& raft_gr) {
        return std::pair{raft_gr.all_groups(), this_shard_id()};
    });
    std::vector<decorated_gid> decorated_gids;
    for (auto& [groups, shard]: groups_and_shards) {
        for (auto& gid: groups) {
            decorated_gids.push_back(decorated_gid{gid, make_partition_key(gid), shard});
        }
    }
    // Must return partitions in token order.
    std::sort(decorated_gids.begin(), decorated_gids.end(), [less = dht::ring_position_less_comparator(*_s)]
            (const decorated_gid& l, const decorated_gid& r) { return less(l.key, r.key); });
    for (auto& [gid, dk, shard]: decorated_gids) {
        if (!contains_key(qr.partition_range(), dk)) {
            continue;
        }
        auto cfg_opt = co_await _raft_gr.invoke_on(shard,
                [gid=gid] (service::raft_group_registry& raft_gr) -> std::optional<raft::configuration> {
            // Be ready for a group to disappear while we're querying.
            auto* srv = raft_gr.find_server(gid);
            if (!srv) {
                return std::nullopt;
            }
            // FIXME: the configuration returned here is obtained from raft::fsm, it may not be
            // persisted yet, so this is not 100% correct. It may happen that we crash after
            // a config entry is appended in-memory in fsm but before it's persisted. It would be
            // incorrect to return the configuration observed during this window - after restart
            // the configuration would revert to the previous one. Perhaps this is unlikely to
            // happen in practice, but for correctness we should add a way of querying the
            // latest persisted configuration.
            return srv->get_configuration();
        });
        if (!cfg_opt) {
            continue;
        }
        co_await result.emit_partition_start(dk);
        // List current config first, because 'C' < 'P' and the disposition
        // (ascii_type, 'CURRENT' vs 'PREVIOUS') is the first column in the clustering key.
        co_await emit_member_set(result, "CURRENT", cfg_opt->current);
        co_await emit_member_set(result, "PREVIOUS", cfg_opt->previous);
        co_await result.emit_partition_end();
    }
}
private:
static schema_ptr build_schema() {
auto id = generate_legacy_id(system_keyspace::NAME, "raft_state");
return schema_builder(system_keyspace::NAME, "raft_state", std::make_optional(id))
.with_column("group_id", timeuuid_type, column_kind::partition_key)
.with_column("disposition", ascii_type, column_kind::clustering_key) // can be 'CURRENT` or `PREVIOUS'
.with_column("server_id", uuid_type, column_kind::clustering_key)
.with_column("can_vote", boolean_type)
.set_comment("Currently operating RAFT configuration")
.with_hash_version()
.build();
}
dht::decorated_key make_partition_key(raft::group_id gid) {
// Make sure to use timeuuid_native_type so comparisons are done correctly
// (we must emit partitions in the correct token order).
return dht::decorate_key(*_s, partition_key::from_single_value(
*_s, data_value(timeuuid_native_type{gid.uuid()}).serialize_nonnull()));
}
clustering_key make_clustering_key(std::string_view disposition, raft::server_id id) {
return clustering_key::from_exploded(*_s, {
data_value(disposition).serialize_nonnull(),
data_value(id.uuid()).serialize_nonnull()
});
}
future<> emit_member_set(result_collector& result, std::string_view disposition,
const raft::config_member_set& set) {
// Must sort servers in clustering order (i.e. according to their IDs).
// This is how `config_member::operator<` works so no need for custom comparator.
std::vector<raft::config_member> members{set.begin(), set.end()};
std::sort(members.begin(), members.end());
for (auto& member: members) {
clustering_row cr{make_clustering_key(disposition, member.addr.id)};
set_cell(cr.cells(), "can_vote", member.can_vote);
co_await result.emit_row(std::move(cr));
}
}
};
/// Base class for virtual tables which are supposed to serve data from group0 leader.
/// If queried on a different node, read will be redirected to the leader.
/// Implementations need to override execute_on_leader(), which will be executed
/// on shard0 of current group0 leader.
///
/// Current implementation is suitable for tables which are relatively small as all
/// data is materialized in memory and queried in one page.
class group0_virtual_table : public memtable_filling_virtual_table {
private:
    sharded<service::raft_group_registry>& _raft_gr;
    sharded<netw::messaging_service>& _ms;

public:
    group0_virtual_table(schema_ptr s,
            sharded<service::raft_group_registry>& raft_gr,
            sharded<netw::messaging_service>& ms)
        : memtable_filling_virtual_table(s)
        , _raft_gr(raft_gr)
        , _ms(ms) {
    }

    // Returns the raft id of the current group0 leader. If no leader is
    // currently known, issues read barriers until one is; each wait is
    // bounded by the permit's timeout via abort_on_expiry.
    future<raft::server_id> get_leader(const reader_permit& permit) const {
        while (!_raft_gr.local().group0().current_leader()) {
            abort_on_expiry aoe(permit.timeout());
            // Mark the permit as waiting so the read doesn't count as
            // actively consuming resources while blocked on the barrier.
            reader_permit::awaits_guard ag(permit);
            co_await _raft_gr.local().group0().read_barrier(&aoe.abort_source());
        }
        co_return _raft_gr.local().group0().current_leader();
    }

    // True iff this node is the current group0 leader.
    future<bool> is_leader(const reader_permit& permit) const {
        auto leader = co_await get_leader(permit);
        co_return leader == _raft_gr.local().get_my_raft_id();
    }

    // Forwards the read to the current leader as a full-table
    // read_mutation_data RPC and feeds the returned partitions, one
    // mutation at a time, into mutation_sink.
    future<> redirect_to_leader(std::function<void(mutation)> mutation_sink, reader_permit permit) {
        auto leader = co_await get_leader(permit);
        auto cmd = query::read_command(_s->id(),
                _s->version(),
                partition_slice_builder(*_s).build(),
                query::max_result_size(256*1024*1024), // Sanity limit
                query::tombstone_limit::max,
                query::row_limit(query::max_rows),
                query::partition_limit(query::max_partitions));
        reader_permit::awaits_guard ag(permit);
        auto&& [result, hit_rate, opt_exception] = co_await ser::storage_proxy_rpc_verbs::send_read_mutation_data(&_ms.local(),
                locator::host_id(leader.uuid()),
                permit.timeout(),
                cmd,
                query::full_partition_range,
                {});
        // Propagate a remote failure as an exception on this coroutine.
        if (opt_exception.has_value() && *opt_exception) {
            co_await coroutine::return_exception_ptr((*opt_exception).into_exception_ptr());
        }
        for (auto&& p : result.partitions()) {
            mutation_sink(p.mut().unfreeze(_s));
            co_await coroutine::maybe_yield();
        }
        co_return;
    }

    // This function is executed on the node which was a group0 leader at some point since the query started.
    // The node may not be the leader anymore by the time it is executed.
    // Only executed on shard 0.
    virtual future<> execute_on_leader(std::function<void(mutation)> mutation_sink, reader_permit) = 0;

    // Entry point: produce locally when we are the leader, otherwise
    // redirect the read to the leader over RPC.
    future<> execute(std::function<void(mutation)> mutation_sink, reader_permit permit) override {
        // Derived tables in this file declare with_sharder(1, 0), so only
        // shard 0 has any data to produce.
        if (this_shard_id() != 0) {
            co_return;
        }
        if (co_await is_leader(permit)) {
            co_await execute_on_leader(std::move(mutation_sink), std::move(permit));
            co_return;
        }
        co_await redirect_to_leader(std::move(mutation_sink), std::move(permit));
    }
};
// Virtual table exposing per-node tablet load: allocated tablet counts,
// storage capacity and utilization, as seen by the tablet load balancer.
// The statistics come from the topology coordinator, which runs on the
// group0 leader, so reads on other nodes are redirected there
// (see group0_virtual_table).
class load_per_node : public group0_virtual_table {
private:
    sharded<service::tablet_allocator>& _talloc;
    sharded<replica::database>& _db;
    sharded<gms::gossiper>& _gossiper;

public:
    load_per_node(sharded<service::tablet_allocator>& talloc,
            sharded<replica::database>& db,
            sharded<service::raft_group_registry>& raft_gr,
            sharded<netw::messaging_service>& ms,
            sharded<gms::gossiper>& gossiper)
        : group0_virtual_table(build_schema(), raft_gr, ms)
        , _talloc(talloc)
        , _db(db)
        , _gossiper(gossiper)
    { }

    // Emits one partition per node in the topology, filled from the
    // balancer's load stats and a populated load_sketch.
    future<> execute_on_leader(std::function<void(mutation)> mutation_sink, reader_permit permit) override {
        auto stats = _talloc.local().get_load_stats();
        while (!stats) {
            // Wait for stats to be refreshed by topology coordinator
            {
                abort_on_expiry aoe(permit.timeout());
                reader_permit::awaits_guard ag(permit);
                co_await seastar::sleep_abortable(std::chrono::milliseconds(200), aoe.abort_source());
            }
            // Leadership may have moved while we slept; if so, hand the
            // query to the new leader instead of waiting here forever.
            if (!co_await is_leader(permit)) {
                co_await redirect_to_leader(std::move(mutation_sink), std::move(permit));
                co_return;
            }
            stats = _talloc.local().get_load_stats();
        }
        auto tm = _db.local().get_token_metadata_ptr();
        const uint64_t default_tablet_size = _db.local().get_config().target_tablet_size_in_bytes();
        locator::load_sketch load(tm, stats, default_tablet_size);
        co_await load.populate();
        tm->get_topology().for_each_node([&] (const auto& node) {
            auto host = node.host_id();
            mutation m(schema(), make_partition_key(host));
            auto& r = m.partition().clustered_row(*schema(), clustering_key::make_empty());
            set_cell(r.cells(), "dc", node.dc());
            set_cell(r.cells(), "rack", node.rack());
            set_cell(r.cells(), "up", _gossiper.local().is_alive(host));
            set_cell(r.cells(), "draining", node.is_draining());
            set_cell(r.cells(), "excluded", node.is_excluded());
            // The IP may be unknown (e.g. node not yet in the address map).
            if (auto ip = _gossiper.local().get_address_map().find(host)) {
                set_cell(r.cells(), "ip", data_value(inet_address(*ip)));
            }
            set_cell(r.cells(), "tablets_allocated", int64_t(load.get_tablet_count(host)));
            set_cell(r.cells(), "tablets_allocated_per_shard", data_value(double(load.get_real_avg_tablet_count(host))));
            set_cell(r.cells(), "storage_allocated_load", data_value(int64_t(load.get_tablet_count(host) * default_tablet_size)));
            // `stats` is guaranteed engaged here - the wait loop above only
            // exits once it is, and it is never reset - so there is no need
            // to re-check it, only whether this host has capacity data.
            if (stats->capacity.contains(host)) {
                auto capacity = stats->capacity.at(host);
                set_cell(r.cells(), "storage_capacity", data_value(int64_t(capacity)));
                if (auto ts_iter = stats->tablet_stats.find(host); ts_iter != stats->tablet_stats.end()) {
                    set_cell(r.cells(), "effective_capacity", data_value(int64_t(ts_iter->second.effective_capacity)));
                }
                if (auto utilization = load.get_allocated_utilization(host)) {
                    set_cell(r.cells(), "storage_allocated_utilization", data_value(double(*utilization)));
                }
                // Disk-usage-derived columns are only emitted when complete
                // data was reported for this host.
                if (load.has_complete_data(host)) {
                    if (auto utilization = load.get_storage_utilization(host)) {
                        set_cell(r.cells(), "storage_utilization", data_value(double(*utilization)));
                    }
                    set_cell(r.cells(), "storage_load", data_value(int64_t(load.get_disk_used(host))));
                }
            }
            mutation_sink(m);
        });
    }

private:
    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "load_per_node");
        return schema_builder(system_keyspace::NAME, "load_per_node", std::make_optional(id))
            .with_column("node", uuid_type, column_kind::partition_key)
            .with_column("dc", utf8_type)
            .with_column("rack", utf8_type)
            .with_column("ip", inet_addr_type)
            .with_column("up", boolean_type)
            .with_column("draining", boolean_type)
            .with_column("excluded", boolean_type)
            .with_column("tablets_allocated", long_type)
            .with_column("tablets_allocated_per_shard", double_type)
            .with_column("storage_capacity", long_type)
            .with_column("effective_capacity", long_type)
            .with_column("storage_allocated_load", long_type)
            .with_column("storage_allocated_utilization", double_type)
            .with_column("storage_load", long_type)
            .with_column("storage_utilization", double_type)
            .with_sharder(1, 0) // shard0-only
            .with_hash_version()
            .build();
    }

    dht::decorated_key make_partition_key(host_id host) {
        return dht::decorate_key(*_s, partition_key::from_single_value(
                *_s, data_value(host.uuid()).serialize_nonnull()));
    }
};
// Virtual table exposing the per-replica on-disk size of every tablet,
// as collected by the load balancer's statistics. Served from the group0
// leader; reads elsewhere are redirected (see group0_virtual_table).
class tablet_sizes : public group0_virtual_table {
private:
    sharded<service::tablet_allocator>& _talloc;
    sharded<replica::database>& _db;
public:
    tablet_sizes(sharded<service::tablet_allocator>& talloc,
            sharded<replica::database>& db,
            sharded<service::raft_group_registry>& raft_gr,
            sharded<netw::messaging_service>& ms)
        : group0_virtual_table(build_schema(), raft_gr, ms)
        , _talloc(talloc)
        , _db(db)
    { }

    // Emits one partition per tablets-based table; each row is one tablet
    // (clustered by the last token of its range), holding a map of
    // replica host -> reported size plus the set of replicas for which
    // no size was reported.
    future<> execute_on_leader(std::function<void(mutation)> mutation_sink, reader_permit permit) override {
        auto stats = _talloc.local().get_load_stats();
        while (!stats) {
            // Wait for stats to be refreshed by topology coordinator
            {
                abort_on_expiry aoe(permit.timeout());
                reader_permit::awaits_guard ag(permit);
                co_await seastar::sleep_abortable(std::chrono::milliseconds(200), aoe.abort_source());
            }
            // Leadership may have moved while we slept; if so, hand the
            // query over to the new leader.
            if (!co_await is_leader(permit)) {
                co_await redirect_to_leader(std::move(mutation_sink), std::move(permit));
                co_return;
            }
            stats = _talloc.local().get_load_stats();
        }
        auto tm = _db.local().get_token_metadata_ptr();
        // Converts {host -> size} into the native representation of a
        // frozen map<uuid, bigint> cell value.
        auto prepare_replica_sizes = [] (const std::unordered_map<host_id, uint64_t>& replica_sizes) {
            map_type_impl::native_type tmp;
            for (auto& r: replica_sizes) {
                auto replica = r.first.uuid();
                int64_t tablet_size = int64_t(r.second);
                auto map_element = std::make_pair<data_value, data_value>(data_value(replica), data_value(tablet_size));
                tmp.push_back(std::move(map_element));
            }
            return tmp;
        };
        // Converts the set of hosts lacking size data into the native
        // representation of a frozen set<uuid> cell value.
        auto prepare_missing_replica = [] (const std::unordered_set<host_id>& missing_replicas) {
            set_type_impl::native_type tmp;
            for (auto& r: missing_replicas) {
                tmp.push_back(data_value(r.uuid()));
            }
            return tmp;
        };
        auto map_type = map_type_impl::get_instance(uuid_type, long_type, false);
        auto set_type = set_type_impl::get_instance(uuid_type, false);
        for (auto&& [table, tmap] : tm->tablets().all_tables_ungrouped()) {
            mutation m(schema(), make_partition_key(table));
            co_await tmap->for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& tinfo) -> future<> {
                auto trange = tmap->get_token_range(tid);
                // NOTE(review): trange.end() is dereferenced unconditionally,
                // assuming tablet token ranges always carry an upper bound -
                // confirm against tablet_map invariants.
                int64_t last_token = trange.end()->value().raw();
                auto& r = m.partition().clustered_row(*schema(), clustering_key::from_single_value(*schema(), data_value(last_token).serialize_nonnull()));
                const range_based_tablet_id rb_tid {table, trange};
                std::unordered_map<host_id, uint64_t> replica_sizes;
                std::unordered_set<host_id> missing_replicas;
                // Partition this tablet's replicas into those with a known
                // size and those the stats have no entry for.
                for (auto& replica : tinfo.replicas) {
                    auto tablet_size_opt = stats->get_tablet_size(replica.host, rb_tid);
                    if (tablet_size_opt) {
                        replica_sizes[replica.host] = *tablet_size_opt;
                    } else {
                        missing_replicas.insert(replica.host);
                    }
                }
                set_cell(r.cells(), "replicas", make_map_value(map_type, prepare_replica_sizes(replica_sizes)));
                set_cell(r.cells(), "missing_replicas", make_set_value(set_type, prepare_missing_replica(missing_replicas)));
                return make_ready_future<>();
            });
            mutation_sink(m);
        }
    }
private:
    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "tablet_sizes");
        return schema_builder(system_keyspace::NAME, "tablet_sizes", std::make_optional(id))
            .with_column("table_id", uuid_type, column_kind::partition_key)
            .with_column("last_token", long_type, column_kind::clustering_key)
            .with_column("replicas", map_type_impl::get_instance(uuid_type, long_type, false))
            .with_column("missing_replicas", set_type_impl::get_instance(uuid_type, false))
            .with_sharder(1, 0) // shard0-only
            .with_hash_version()
            .build();
    }
    dht::decorated_key make_partition_key(table_id table) {
        return dht::decorate_key(*_s, partition_key::from_single_value(
                *_s, data_value(table.uuid()).serialize_nonnull()));
    }
};
// Virtual table listing, for every tablets-based table with CDC streams,
// the timestamps of its CDC stream generations (one partition per base
// table, newest timestamp first per the reversed clustering order).
class cdc_timestamps_table : public streaming_virtual_table {
private:
    replica::database& _db;
    service::storage_service& _ss;
public:
    cdc_timestamps_table(replica::database& db, service::storage_service& ss)
        : streaming_virtual_table(build_schema())
        , _db(db)
        , _ss(ss)
    {
        // Each shard serves the partitions it owns.
        _shard_aware = true;
    }

    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        struct table_entry {
            table_id id;
            dht::decorated_key key;
        };
        // Collect the tables whose partitions this shard owns and the
        // query actually asks for.
        std::vector<table_entry> entries;
        for (auto id : _ss.get_tables_with_cdc_tablet_streams()) {
            auto& cf = _db.find_column_family(id);
            auto base = cdc::get_base_table(_db, *cf.schema());
            auto dk = make_partition_key(base->ks_name(), base->cf_name());
            bool wanted = this_shard_owns(dk) && contains_key(qr.partition_range(), dk);
            if (wanted) {
                entries.push_back(table_entry{id, std::move(dk)});
            }
        }
        // Partitions must be produced in token order.
        std::ranges::sort(entries, [cmp = dht::ring_position_less_comparator(*_s)]
                (const table_entry& a, const table_entry& b) { return cmp(a.key, b.key); });
        for (const auto& [id, dk] : entries) {
            co_await result.emit_partition_start(dk);
            auto now = api::new_timestamp();
            co_await _ss.query_cdc_timestamps(id, false, [&] (db_clock::time_point ts) -> future<> {
                // The row carries no regular columns - a live marker is
                // enough to make it visible.
                clustering_row row(make_clustering_key(ts));
                row.apply(row_marker(now));
                return result.emit_row(std::move(row));
            });
            co_await result.emit_partition_end();
        }
    }

private:
    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::CDC_TIMESTAMPS);
        return schema_builder(system_keyspace::NAME, system_keyspace::CDC_TIMESTAMPS, std::make_optional(id))
            .with_column("keyspace_name", utf8_type, column_kind::partition_key)
            .with_column("table_name", utf8_type, column_kind::partition_key)
            .with_column("timestamp", reversed_type_impl::get_instance(timestamp_type), column_kind::clustering_key)
            .set_comment("CDC generations for tablets-based tables")
            .with_hash_version()
            .build();
    }

    dht::decorated_key make_partition_key(const std::string& ks_name, const std::string& cf_name) {
        auto pk = partition_key::from_exploded(*_s, {
            data_value(ks_name).serialize_nonnull(),
            data_value(cf_name).serialize_nonnull()
        });
        return dht::decorate_key(*_s, std::move(pk));
    }

    clustering_key make_clustering_key(db_clock::time_point ts) {
        return clustering_key::from_single_value(*_s, timestamp_type->decompose(ts));
    }
};
// Virtual table listing, for every tablets-based table with CDC streams,
// the stream ids of each generation together with their state (current /
// closed / opened) relative to the previous generation.
class cdc_streams_table : public streaming_virtual_table {
private:
    replica::database& _db;
    service::storage_service& _ss;
public:
    cdc_streams_table(replica::database& db, service::storage_service& ss)
        : streaming_virtual_table(build_schema())
        , _db(db)
        , _ss(ss)
    {
        // Each shard serves the partitions it owns.
        _shard_aware = true;
    }
    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        struct decorated_table_name {
            table_id table;
            dht::decorated_key key;
        };
        // Collect the tables whose partitions this shard owns and the
        // query actually asks for.
        std::vector<decorated_table_name> tables;
        for (auto table : _ss.get_tables_with_cdc_tablet_streams()) {
            auto& cf = _db.find_column_family(table);
            auto base_schema = cdc::get_base_table(_db, *cf.schema());
            auto dk = make_partition_key(base_schema->ks_name(), base_schema->cf_name());
            if (!this_shard_owns(dk) || !contains_key(qr.partition_range(), dk)) {
                continue;
            }
            tables.emplace_back(decorated_table_name{table, std::move(dk)});
        }
        // Partitions must be produced in token order.
        std::ranges::sort(tables, dht::ring_position_less_comparator(*_s), std::mem_fn(&decorated_table_name::key));
        for (const auto& [table, dk] : tables) {
            co_await result.emit_partition_start(dk);
            auto created_at = api::new_timestamp();
            // Emits one marker-only row per stream id with the given
            // timestamp and state.
            auto emit_stream_set = [&] (db_clock::time_point ts, cdc::stream_state kind, const auto& sids) -> future<> {
                for (const auto& sid : sids) {
                    clustering_row cr(make_clustering_key(ts, kind, sid));
                    cr.apply(row_marker(created_at));
                    co_await result.emit_row(std::move(cr));
                }
            };
            // emit in clustering order
            // (stream_state is part of the clustering key, so the enum's
            // numeric order must match the emission order below).
            static_assert(int(cdc::stream_state::current) < int(cdc::stream_state::closed));
            static_assert(int(cdc::stream_state::closed) < int(cdc::stream_state::opened));
            co_await _ss.query_cdc_streams(table, [&] (db_clock::time_point ts, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff diff) -> future<> {
                co_await emit_stream_set(ts, cdc::stream_state::current, current);
                co_await emit_stream_set(ts, cdc::stream_state::closed, diff.closed_streams);
                co_await emit_stream_set(ts, cdc::stream_state::opened, diff.opened_streams);
            });
            co_await result.emit_partition_end();
        }
    }
private:
    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::CDC_STREAMS);
        return schema_builder(system_keyspace::NAME, system_keyspace::CDC_STREAMS, std::make_optional(id))
            .with_column("keyspace_name", utf8_type, column_kind::partition_key)
            .with_column("table_name", utf8_type, column_kind::partition_key)
            .with_column("timestamp", timestamp_type, column_kind::clustering_key)
            .with_column("stream_state", byte_type, column_kind::clustering_key)
            .with_column("stream_id", bytes_type, column_kind::clustering_key)
            .set_comment("CDC streams for each generation for tablets-based tables")
            .with_hash_version()
            .build();
    }
    dht::decorated_key make_partition_key(const std::string& ks_name, const std::string& cf_name) {
        return dht::decorate_key(*_s, partition_key::from_exploded(*_s, {
            data_value(ks_name).serialize_nonnull(),
            data_value(cf_name).serialize_nonnull()
        }));
    }
    clustering_key make_clustering_key(db_clock::time_point ts, cdc::stream_state kind, const cdc::stream_id& sid) {
        return clustering_key::from_exploded(*_s, {
            timestamp_type->decompose(ts),
            byte_type->decompose(std::to_underlying(kind)),
            sid.to_bytes()
        });
    }
};
}
// Creates all built-in virtual tables, registers them in the system
// keyspace's virtual-tables registry, and wires each one's virtual
// reader/writer into its column family. Also attaches special-purpose
// virtual readers to a few regular system tables at the end.
future<> initialize_virtual_tables(
        sharded<replica::database>& dist_db, sharded<service::storage_service>& dist_ss,
        sharded<gms::gossiper>& dist_gossiper, sharded<service::raft_group_registry>& dist_raft_gr,
        sharded<db::system_keyspace>& sys_ks,
        sharded<service::tablet_allocator>& tablet_allocator,
        sharded<netw::messaging_service>& ms,
        db::config& cfg) {
    auto& virtual_tables_registry = sys_ks.local().get_virtual_tables_registry();
    auto& virtual_tables = *virtual_tables_registry;
    auto& db = dist_db.local();
    auto& ss = dist_ss.local();
    // Registers one virtual table: stores it in the registry, creates a
    // local system table for its schema, and plugs the table's mutation
    // source/writer into the resulting column family.
    auto add_table = [&] (std::unique_ptr<virtual_table>&& tbl) -> future<> {
        auto schema = tbl->schema();
        virtual_tables[schema->id()] = std::move(tbl);
        co_await db.create_local_system_table(schema, false, ss.get_erm_factory());
        auto& cf = db.find_column_family(schema);
        cf.mark_ready_for_writes(nullptr);
        auto& vt = virtual_tables[schema->id()];
        cf.set_virtual_reader(vt->as_mutation_source());
        cf.set_virtual_writer([&vt = *vt] (const frozen_mutation& m) { return vt.apply(m); });
    };
    // Add built-in virtual tables here.
    co_await add_table(std::make_unique<cluster_status_table>(dist_ss, dist_gossiper));
    co_await add_table(std::make_unique<token_ring_table>(db, ss));
    co_await add_table(std::make_unique<snapshots_table>(dist_db));
    co_await add_table(std::make_unique<protocol_servers_table>(ss));
    co_await add_table(std::make_unique<runtime_info_table>(dist_db, ss));
    co_await add_table(std::make_unique<versions_table>());
    co_await add_table(std::make_unique<db_config_table>(cfg));
    co_await add_table(std::make_unique<clients_table>(ss));
    co_await add_table(std::make_unique<raft_state_table>(dist_raft_gr));
    co_await add_table(std::make_unique<load_per_node>(tablet_allocator, dist_db, dist_raft_gr, ms, dist_gossiper));
    co_await add_table(std::make_unique<tablet_sizes>(tablet_allocator, dist_db, dist_raft_gr, ms));
    co_await add_table(std::make_unique<cdc_timestamps_table>(db, ss));
    co_await add_table(std::make_unique<cdc_streams_table>(db, ss));
    // These are regular (non-virtual) system tables whose contents are
    // nevertheless computed on the fly by dedicated readers.
    db.find_column_family(system_keyspace::size_estimates()).set_virtual_reader(mutation_source(db::size_estimates::virtual_reader(db, sys_ks.local())));
    db.find_column_family(system_keyspace::views_builds_in_progress()).set_virtual_reader(mutation_source(db::view::build_progress_virtual_reader(db)));
    db.find_column_family(system_keyspace::built_indexes()).set_virtual_reader(mutation_source(db::index::built_indexes_virtual_reader(db)));
}
// The registry owns all virtual_table instances behind a unique_ptr to an
// impl type, keeping that type out of the header.
virtual_tables_registry::virtual_tables_registry() : unique_ptr(std::make_unique<virtual_tables_registry_impl>()) {
}

// Defined out of line so virtual_tables_registry_impl is a complete type
// at the point of destruction.
virtual_tables_registry::~virtual_tables_registry() = default;
} // namespace db