Files
scylladb/db/virtual_tables.cc
Patryk Jędrzejczak 67045b5f17 Merge 'raft_topology, tablets: Drain tablets in parallel with other topology operations' from Tomasz Grabiec
Allows other topology operations to execute while tablets are being
drained on decommission. In particular, bootstrap on scale-out. This
is important for elasticity.

Allows multiple decommission/removenode to happen in parallel, which
is important for efficiency.

Flow of decommission/removenode request:
  1) pending and paused, has tablet replicas on target node.
     Tablet scheduler will start draining tablets.
  2) No tablets on target node, request is pending but not paused
  3) Request is scheduled, node is in transition
  4) Request is done

Nodes are considered draining as soon as there is a leave or remove
request on them. If there are tablet replicas present on the target
node, the request is in a paused state and will not be picked by
topology coordinator. The paused state is computed from topology state
automatically on reload.

When request is not paused, its execution starts in
write_both_read_old state. The old tablet_draining state is not
entered (it's deprecated now).

Tablet load balancing will yield the state machine as soon as some
request is no longer paused and ready to be scheduled, based on
standard preemption mechanics.

Fixes #21452

Closes scylladb/scylladb#24129

* https://github.com/scylladb/scylladb:
  docs: Document parallel decommission and removenode and relevant task API
  test: Add tests for parallel decommission/removenode
  test: util: Introduce ensure_group0_leader_on()
  test: tablets: Check that there are no migrations scheduled on draining nodes
  test: lib: topology_builder: Introduce add_draining_request()
  topology_coordinator, tablets: Fail draining operations when tablet migration fails due to critical disk utilization
  tablets: topology_coordinator: Refactor to propagate reason for migration rollback
  tablet_allocator: Skip co-location on draining nodes
  node_ops: task_manager_module: Populate entity field also for active requests
  tasks: node_ops: Put node id in the entity field
  tasks, node_ops: Unify setting of task_stats in get_status() and get_stats()
  topology: Protect against empty cancelation reason
  tasks, topology: Make pending node operations abortable
  doc: topology-over-raft.md: Fix diagram for replacing, tablet_draining is not engaged
  raft_topology, tablets: Drain tablets in parallel with other topology operations
  virtual_tables: Show draining and excluded fields in system.cluster_status and system.load_by_node
  locator: topology: Add "draining" flag to a node
  topology_coordinator: Extract generate_cancel_request_update()
  storage_service: Drop dependency in topology_state_machine.hh in the header
  locator: Extract common code in assert_rf_rack_valid_keyspace()
  topology_coordinator, storage_service: Validate node removal/decommission at request submission time
2026-01-22 13:06:53 +01:00

1510 lines
66 KiB
C++

/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <algorithm>
#include <boost/algorithm/string.hpp>
#include <boost/lexical_cast.hpp>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/json/json_elements.hh>
#include <seastar/core/reactor.hh>
#include "cdc/generation_service.hh"
#include "cdc/log.hh"
#include "cdc/metadata.hh"
#include "db/config.hh"
#include "db/system_keyspace.hh"
#include "db/virtual_table.hh"
#include "partition_slice_builder.hh"
#include "db/virtual_tables.hh"
#include "db/size_estimates_virtual_reader.hh"
#include "db/view/build_progress_virtual_reader.hh"
#include "index/built_indexes_virtual_reader.hh"
#include "gms/gossiper.hh"
#include "mutation/frozen_mutation.hh"
#include "transport/protocol_server.hh"
#include "release.hh"
#include "replica/database.hh"
#include "schema/schema_builder.hh"
#include "service/raft/raft_group_registry.hh"
#include "service/storage_service.hh"
#include "service/tablet_allocator.hh"
#include "locator/load_sketch.hh"
#include "types/list.hh"
#include "types/types.hh"
#include "utils/build_id.hh"
#include "utils/log.hh"
#include "replica/exceptions.hh"
#include "service/paxos/paxos_state.hh"
#include "idl/storage_proxy.dist.hh"
using namespace locator;
namespace db {
namespace {
// Shared logger for all virtual-table implementations in this file.
logging::logger vtlog("virtual_tables");
// Virtual table system.cluster_status: one row per endpoint known to the
// gossiper, with liveness, status, load, DC, draining/excluded flags,
// token count and ownership. Data is collected on shard 0 (where gossiper
// and topology state live) and shipped back as frozen mutations.
class cluster_status_table : public memtable_filling_virtual_table {
private:
    sharded<service::storage_service>& _dist_ss;
    sharded<gms::gossiper>& _dist_gossiper;

public:
    cluster_status_table(sharded<service::storage_service>& ss, sharded<gms::gossiper>& g)
            : memtable_filling_virtual_table(build_schema())
            , _dist_ss(ss), _dist_gossiper(g) {}

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "cluster_status");
        return schema_builder(system_keyspace::NAME, "cluster_status", std::make_optional(id))
                .with_column("peer", inet_addr_type, column_kind::partition_key)
                .with_column("dc", utf8_type)
                .with_column("up", boolean_type)
                .with_column("draining", boolean_type)
                .with_column("excluded", boolean_type)
                .with_column("status", utf8_type)
                .with_column("load", utf8_type)
                .with_column("tokens", int32_type)
                .with_column("owns", float_type)
                .with_column("host_id", uuid_type)
                .with_hash_version()
                .build();
    }

    future<> execute(std::function<void(mutation)> mutation_sink) override {
        // Build the mutations on shard 0 and return them frozen; they are
        // unfrozen against this shard's schema instance below.
        auto muts = co_await _dist_ss.invoke_on(0, [this] (service::storage_service& ss) -> future<utils::chunked_vector<frozen_mutation>> {
            auto& gossiper = _dist_gossiper.local();
            auto ownership = co_await ss.get_ownership();
            const locator::token_metadata& tm = ss.get_token_metadata();
            utils::chunked_vector<frozen_mutation> muts;
            muts.reserve(gossiper.num_endpoints());
            gossiper.for_each_endpoint_state([&] (const gms::endpoint_state& eps) {
                // Use a schema instance owned by this (shard 0) thread.
                // Dereferencing the table's schema_ptr (schema()) here would
                // be a cross-shard access, so every schema use below goes
                // through `s`.
                static thread_local auto s = build_schema();
                auto hostid = eps.get_host_id();
                mutation m(s, partition_key::from_single_value(*s, data_value(eps.get_ip()).serialize_nonnull()));
                row& cr = m.partition().clustered_row(*s, clustering_key::make_empty()).cells();
                set_cell(cr, "up", gossiper.is_alive(hostid));
                // Gossip is authoritative for status when raft topology is
                // disabled or the node announced shutdown; otherwise report
                // the raft topology node state (upper-cased for consistency
                // with gossip status strings).
                if (!ss.raft_topology_change_enabled() || gossiper.is_shutdown(hostid)) {
                    set_cell(cr, "status", gossiper.get_gossip_status(hostid));
                } else {
                    set_cell(cr, "status", boost::to_upper_copy<std::string>(fmt::format("{}", ss.get_node_state(hostid))));
                }
                set_cell(cr, "load", gossiper.get_application_state_value(hostid, gms::application_state::LOAD));
                set_cell(cr, "host_id", hostid.uuid());
                if (tm.get_topology().has_node(hostid)) {
                    // Not all entries in gossiper are present in the topology
                    auto& node = tm.get_topology().get_node(hostid);
                    set_cell(cr, "dc", node.dc_rack().dc);
                    set_cell(cr, "draining", node.is_draining());
                    set_cell(cr, "excluded", node.is_excluded());
                }
                // Single lookup instead of contains() followed by operator[].
                if (auto it = ownership.find(eps.get_ip()); it != ownership.end()) {
                    set_cell(cr, "owns", it->second);
                }
                set_cell(cr, "tokens", int32_t(tm.get_tokens(hostid).size()));
                muts.push_back(freeze(std::move(m)));
            });
            co_return muts;
        });
        for (auto& m : muts) {
            mutation_sink(m.unfreeze(schema()));
        }
    }
};
// Virtual table system.token_ring: exposes the token ring layout per
// keyspace — each partition is a keyspace, each row is one
// (table, start_token, endpoint) with its end token, DC and rack.
// For tablet keyspaces the ring is described per table; for vnode
// keyspaces a single "<ALL>" pseudo-table entry is emitted.
class token_ring_table : public streaming_virtual_table {
private:
replica::database& _db;
service::storage_service& _ss;
public:
token_ring_table(replica::database& db, service::storage_service& ss)
: streaming_virtual_table(build_schema())
, _db(db)
, _ss(ss)
{
// Each shard emits only the partitions (keyspaces) it owns.
_shard_aware = true;
}
static schema_ptr build_schema() {
auto id = generate_legacy_id(system_keyspace::NAME, "token_ring");
return schema_builder(system_keyspace::NAME, "token_ring", std::make_optional(id))
.with_column("keyspace_name", utf8_type, column_kind::partition_key)
.with_column("table_name", utf8_type, column_kind::clustering_key)
.with_column("start_token", utf8_type, column_kind::clustering_key)
.with_column("endpoint", inet_addr_type, column_kind::clustering_key)
.with_column("end_token", utf8_type)
.with_column("dc", utf8_type)
.with_column("rack", utf8_type)
.with_hash_version()
.build();
}
// Partition key is the keyspace name.
dht::decorated_key make_partition_key(const sstring& name) {
return dht::decorate_key(*_s, partition_key::from_single_value(*_s, data_value(name).serialize_nonnull()));
}
// Clustering key is (table_name, start_token, endpoint).
clustering_key make_clustering_key(const sstring& table_name, sstring start_token, gms::inet_address host) {
return clustering_key::from_exploded(*_s, {
data_value(table_name).serialize_nonnull(),
data_value(start_token).serialize_nonnull(),
data_value(host).serialize_nonnull()
});
}
// Emits one partition (keyspace) worth of rows. Ranges and the endpoints
// within each range are sorted first, since the streaming interface
// requires rows in clustering order.
future<> emit_ring(result_collector& result, const dht::decorated_key& dk, const sstring& table_name, utils::chunked_vector<dht::token_range_endpoints> ranges) {
co_await result.emit_partition_start(dk);
std::ranges::sort(ranges, std::ranges::less(), std::mem_fn(&dht::token_range_endpoints::_start_token));
for (dht::token_range_endpoints& range : ranges) {
std::ranges::sort(range._endpoint_details, endpoint_details_cmp());
for (const dht::endpoint_details& detail : range._endpoint_details) {
clustering_row cr(make_clustering_key(table_name, range._start_token, detail._host));
set_cell(cr.cells(), "end_token", sstring(range._end_token));
set_cell(cr.cells(), "dc", sstring(detail._datacenter));
set_cell(cr.cells(), "rack", sstring(detail._rack));
co_await result.emit_row(std::move(cr));
}
}
co_await result.emit_partition_end();
}
// Orders endpoint details by the serialized inet address, matching the
// clustering-key comparison order of the "endpoint" column.
struct endpoint_details_cmp {
bool operator()(const dht::endpoint_details& l, const dht::endpoint_details& r) const {
return inet_addr_type->less(
data_value(l._host).serialize_nonnull(),
data_value(r._host).serialize_nonnull());
}
};
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
struct decorated_keyspace_name {
sstring name;
dht::decorated_key key;
};
auto keyspace_names = _db.get_non_local_strategy_keyspaces()
| std::views::transform([this] (auto&& ks) {
return decorated_keyspace_name{ks, make_partition_key(ks)};
})
| std::ranges::to<std::vector>();
// Partitions must be produced in ring order.
std::ranges::sort(keyspace_names, dht::ring_position_less_comparator(*_s), std::mem_fn(&decorated_keyspace_name::key));
for (const decorated_keyspace_name& e : keyspace_names) {
auto&& dk = e.key;
// Skip keyspaces not owned by this shard, outside the queried range,
// or dropped since the name list was taken.
if (!this_shard_owns(dk) || !contains_key(qr.partition_range(), dk) || !_db.has_keyspace(e.name)) {
continue;
}
if (_db.find_keyspace(e.name).get_replication_strategy().uses_tablets()) {
// Tablet keyspaces: each table has its own ring.
co_await _db.get_tables_metadata().for_each_table_gently([&, this] (table_id, lw_shared_ptr<replica::table> table) -> future<> {
if (table->schema()->ks_name() != e.name) {
co_return;
}
const auto& table_name = table->schema()->cf_name();
utils::chunked_vector<dht::token_range_endpoints> ranges = co_await _ss.describe_ring_for_table(e.name, table_name);
co_await emit_ring(result, e.key, table_name, std::move(ranges));
});
} else {
// Vnode keyspaces share one ring; emitted under the "<ALL>" name.
utils::chunked_vector<dht::token_range_endpoints> ranges = co_await _ss.describe_ring(e.name);
co_await emit_ring(result, e.key, "<ALL>", std::move(ranges));
}
}
}
};
// Virtual table system.snapshots: lists every snapshot present on this node
// with its live and total on-disk size, keyed by keyspace and clustered by
// (table, snapshot name). Dropped tables are not listed.
class snapshots_table : public streaming_virtual_table {
sharded<replica::database>& _db;
public:
explicit snapshots_table(sharded<replica::database>& db)
: streaming_virtual_table(build_schema())
, _db(db)
{
// Each shard emits only the partitions (keyspaces) it owns.
_shard_aware = true;
}
static schema_ptr build_schema() {
auto id = generate_legacy_id(system_keyspace::NAME, "snapshots");
return schema_builder(system_keyspace::NAME, "snapshots", std::make_optional(id))
.with_column("keyspace_name", utf8_type, column_kind::partition_key)
.with_column("table_name", utf8_type, column_kind::clustering_key)
.with_column("snapshot_name", utf8_type, column_kind::clustering_key)
.with_column("live", long_type)
.with_column("total", long_type)
.set_comment("Lists all the snapshots along with their size, dropped tables are not part of the listing.")
.with_hash_version()
.build();
}
// Partition key is the keyspace name.
dht::decorated_key make_partition_key(const sstring& name) {
return dht::decorate_key(*_s, partition_key::from_single_value(*_s, data_value(name).serialize_nonnull()));
}
// Clustering key is (table_name, snapshot_name).
clustering_key make_clustering_key(sstring table_name, sstring snapshot_name) {
return clustering_key::from_exploded(*_s, {
data_value(std::move(table_name)).serialize_nonnull(),
data_value(std::move(snapshot_name)).serialize_nonnull()
});
}
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
// Keyspace key wrapper ordered by ring position, so the outer std::map
// below yields partitions in the order the streaming interface requires.
struct decorated_keyspace_name {
schema_ptr s;
dht::decorated_key key;
auto operator<=>(const decorated_keyspace_name& o) const {
return key.tri_compare(*o.s, o.key);
}
};
// keyspace -> table -> snapshot name -> details. The inner std::maps
// keep rows sorted in clustering order.
using snapshots_by_tables_map = std::map<sstring, std::map<sstring, replica::table::snapshot_details>>;
using snapshots_by_keyspace_map = std::map<decorated_keyspace_name, snapshots_by_tables_map>;
snapshots_by_keyspace_map keyspace_snapshots;
auto snapshot_details = co_await _db.local().get_snapshot_details();
for (const auto& [snapshot_name, details] : snapshot_details) {
for (const auto& d : details) {
auto dk = make_partition_key(d.ks);
// Skip partitions this shard doesn't own or the query doesn't select.
if (!this_shard_owns(dk) || !contains_key(qr.partition_range(), dk)) {
continue;
}
keyspace_snapshots[decorated_keyspace_name(_s, dk)][d.cf][snapshot_name] = d.details;
}
}
for (const auto& [ks_data, snapshots_by_tables] : keyspace_snapshots) {
co_await result.emit_partition_start(ks_data.key);
for (const auto& [table_name, snapshots] : snapshots_by_tables) {
for (auto& [snapshot_name, details] : snapshots) {
clustering_row cr(make_clustering_key(table_name, snapshot_name));
set_cell(cr.cells(), "live", details.live);
set_cell(cr.cells(), "total", details.total);
co_await result.emit_row(std::move(cr));
}
}
co_await result.emit_partition_end();
}
}
};
// Virtual table system.protocol_servers: one row per client protocol server
// (CQL, etc.) with its protocol, version and listen addresses.
class protocol_servers_table : public memtable_filling_virtual_table {
private:
    service::storage_service& _ss;

    // Plain-value snapshot of a protocol_server, safe to move across shards.
    struct protocol_server_info {
        sstring name;
        sstring protocol;
        sstring protocol_version;
        std::vector<sstring> listen_addresses;

        explicit protocol_server_info(protocol_server& s)
                : name(s.name())
                , protocol(s.protocol())
                , protocol_version(s.protocol_version()) {
            for (const auto& addr : s.listen_addresses()) {
                listen_addresses.push_back(format("{}:{}", addr.addr(), addr.port()));
            }
        }
    };

public:
    explicit protocol_servers_table(service::storage_service& ss)
            : memtable_filling_virtual_table(build_schema())
            , _ss(ss) {
        // Each shard emits only the partitions (server names) it owns.
        _shard_aware = true;
    }

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "protocol_servers");
        return schema_builder(system_keyspace::NAME, "protocol_servers", std::make_optional(id))
                .with_column("name", utf8_type, column_kind::partition_key)
                .with_column("protocol", utf8_type)
                .with_column("protocol_version", utf8_type)
                .with_column("listen_addresses", list_type_impl::get_instance(utf8_type, false))
                .set_comment("Lists all client protocol servers and their status.")
                .with_hash_version()
                .build();
    }

    future<> execute(std::function<void(mutation)> mutation_sink) override {
        // Servers are registered on shard 0 only
        const auto server_infos = co_await smp::submit_to(0ul, [&ss = _ss.container()] {
            return ss.local().protocol_servers()
                | std::views::transform([] (protocol_server* s) { return protocol_server_info(*s); })
                | std::ranges::to<std::vector>();
        });
        // Iterate by const reference: each element holds strings and a
        // vector, so iterating by value would copy it every pass.
        for (const auto& server : server_infos) {
            auto dk = dht::decorate_key(*_s, partition_key::from_single_value(*schema(), data_value(server.name).serialize_nonnull()));
            if (!this_shard_owns(dk)) {
                continue;
            }
            mutation m(schema(), std::move(dk));
            row& cr = m.partition().clustered_row(*schema(), clustering_key::make_empty()).cells();
            set_cell(cr, "protocol", server.protocol);
            set_cell(cr, "protocol_version", server.protocol_version);
            std::vector<data_value> addresses(server.listen_addresses.begin(), server.listen_addresses.end());
            set_cell(cr, "listen_addresses", make_list_value(schema()->get_column_definition("listen_addresses")->type, std::move(addresses)));
            mutation_sink(std::move(m));
        }
    }
};
// Virtual table system.runtime_info: runtime statistics (gossip state, disk
// load, uptime, memory, memtable/cache stats, ...) grouped into partitions
// by the "group" column, with one (item, value) row per statistic.
class runtime_info_table : public memtable_filling_virtual_table {
private:
    sharded<replica::database>& _db;
    service::storage_service& _ss;
    // Key of the shared "generic" partition; disengaged when this shard
    // does not own it, in which case generic rows are simply not emitted.
    std::optional<dht::decorated_key> _generic_key;

private:
    // Returns the decorated key for `key` if this shard owns it, nullopt otherwise.
    std::optional<dht::decorated_key> maybe_make_key(sstring key) {
        auto dk = dht::decorate_key(*_s, partition_key::from_single_value(*schema(), data_value(std::move(key)).serialize_nonnull()));
        if (this_shard_owns(dk)) {
            return dk;
        }
        return std::nullopt;
    }

    // Emits one partition containing an (item, value) row per entry in `rows`.
    void do_add_partition(std::function<void(mutation)>& mutation_sink, dht::decorated_key key, std::vector<std::pair<sstring, sstring>> rows) {
        mutation m(schema(), std::move(key));
        for (auto&& [ckey, cvalue] : rows) {
            row& cr = m.partition().clustered_row(*schema(), clustering_key::from_single_value(*schema(), data_value(std::move(ckey)).serialize_nonnull())).cells();
            set_cell(cr, "value", std::move(cvalue));
        }
        mutation_sink(std::move(m));
    }

    // Adds `key`/`value` as a row of the shared "generic" partition,
    // if this shard owns it.
    void add_partition(std::function<void(mutation)>& mutation_sink, sstring key, sstring value) {
        if (_generic_key) {
            do_add_partition(mutation_sink, *_generic_key, {{key, std::move(value)}});
        }
    }

    // Adds a dedicated partition named `key` with the given static rows,
    // if this shard owns it.
    void add_partition(std::function<void(mutation)>& mutation_sink, sstring key, std::initializer_list<std::pair<sstring, sstring>> rows) {
        auto dk = maybe_make_key(std::move(key));
        if (dk) {
            do_add_partition(mutation_sink, std::move(*dk), std::move(rows));
        }
    }

    // Like the generic-partition overload above, but computes the value
    // lazily: the producer only runs when this shard owns the partition.
    future<> add_partition(std::function<void(mutation)>& mutation_sink, sstring key, std::function<future<sstring>()> value_producer) {
        if (_generic_key) {
            do_add_partition(mutation_sink, *_generic_key, {{key, co_await value_producer()}});
        }
    }

    // Adds a dedicated partition named `key` with lazily-produced rows;
    // the producer only runs when this shard owns the partition.
    future<> add_partition(std::function<void(mutation)>& mutation_sink, sstring key, std::function<future<std::vector<std::pair<sstring, sstring>>>()> value_producer) {
        auto dk = maybe_make_key(std::move(key));
        if (dk) {
            do_add_partition(mutation_sink, std::move(*dk), co_await value_producer());
        }
    }

    // Applies `map` to every table on every shard and folds all results with
    // `reduce` (defaults to summation).
    template <typename T>
    future<T> map_reduce_tables(std::function<T(replica::table&)> map, std::function<T(T, T)> reduce = std::plus<T>{}) {
        class shard_reducer {
            T _v{};
            std::function<T(T, T)> _reduce;
        public:
            shard_reducer(std::function<T(T, T)> reduce) : _reduce(std::move(reduce)) { }
            future<> operator()(T v) {
                // Accumulate into the member. (Previously this assigned to
                // the by-value parameter `v`, discarding every per-shard
                // result and making get() return a default-constructed T.)
                _v = _reduce(_v, v);
                return make_ready_future<>();
            }
            T get() && { return std::move(_v); }
        };
        co_return co_await _db.map_reduce(shard_reducer(reduce), [map, reduce] (replica::database& db) {
            T val = {};
            db.get_tables_metadata().for_each_table([&] (table_id, lw_shared_ptr<replica::table> table) {
                val = reduce(val, map(*table));
            });
            return val;
        });
    }

    // Runs `map` on every shard and folds the results with `reduce`,
    // starting from `initial`.
    template <typename T>
    future<T> map_reduce_shards(std::function<T()> map, std::function<T(T, T)> reduce = std::plus<T>{}, T initial = {}) {
        co_return co_await map_reduce(
            std::views::iota(0u, smp::count),
            [map] (shard_id shard) {
                return smp::submit_to(shard, [map] {
                    return map();
                });
            },
            initial,
            reduce);
    }

public:
    explicit runtime_info_table(sharded<replica::database>& db, service::storage_service& ss)
            : memtable_filling_virtual_table(build_schema())
            , _db(db)
            , _ss(ss) {
        _shard_aware = true;
        _generic_key = maybe_make_key("generic");
    }

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "runtime_info");
        return schema_builder(system_keyspace::NAME, "runtime_info", std::make_optional(id))
                .with_column("group", utf8_type, column_kind::partition_key)
                .with_column("item", utf8_type, column_kind::clustering_key)
                .with_column("value", utf8_type)
                .set_comment("Runtime system information.")
                .with_hash_version()
                .build();
    }

    future<> execute(std::function<void(mutation)> mutation_sink) override {
        co_await add_partition(mutation_sink, "gossip_active", [this] () -> future<sstring> {
            return _ss.is_gossip_running().then([] (bool running){
                return format("{}", running);
            });
        });
        // Total on-disk space used by live sstables, summed over all tables
        // and shards.
        co_await add_partition(mutation_sink, "load", [this] () -> future<sstring> {
            return map_reduce_tables<int64_t>([] (replica::table& tbl) {
                return tbl.get_stats().live_disk_space_used.on_disk;
            }).then([] (int64_t load) {
                return format("{}", load);
            });
        });
        add_partition(mutation_sink, "uptime", format("{} seconds", std::chrono::duration_cast<std::chrono::seconds>(engine().uptime()).count()));
        add_partition(mutation_sink, "trace_probability", format("{:.2}", tracing::tracing::get_local_tracing_instance().get_trace_probability()));
        co_await add_partition(mutation_sink, "memory", [this] () {
            struct stats {
                // take the pre-reserved memory into account, as seastar only returns
                // the stats of memory managed by the seastar allocator, but we instruct
                // it to reserve additional memory for non-seastar allocator on per-shard
                // basis.
                uint64_t total = 0;
                uint64_t free = 0;
                static stats reduce(stats a, stats b) {
                    // The reserved amount is added once per reduce step, i.e.
                    // once per shard folded in.
                    return stats{
                        a.total + b.total + db::config::wasm_udf_reserved_memory,
                        a.free + b.free};
                };
            };
            return map_reduce_shards<stats>([] () {
                const auto& s = memory::stats();
                return stats{s.total_memory(), s.free_memory()};
            }, stats::reduce, stats{}).then([] (stats s) {
                return std::vector<std::pair<sstring, sstring>>{
                        {"total", format("{}", s.total)},
                        {"used", format("{}", s.total - s.free)},
                        {"free", format("{}", s.free)}};
            });
        });
        co_await add_partition(mutation_sink, "memtable", [this] () {
            struct stats {
                uint64_t total = 0;
                uint64_t free = 0;
                uint64_t entries = 0;
                static stats reduce(stats a, stats b) { return stats{a.total + b.total, a.free + b.free, a.entries + b.entries}; }
            };
            return map_reduce_tables<stats>([] (replica::table& t) {
                logalloc::occupancy_stats s;
                uint64_t partition_count = 0;
                t.for_each_active_memtable([&] (replica::memtable& active_memtable) {
                    s += active_memtable.region().occupancy();
                    partition_count += active_memtable.partition_count();
                });
                return stats{s.total_space(), s.free_space(), partition_count};
            }, stats::reduce).then([] (stats s) {
                return std::vector<std::pair<sstring, sstring>>{
                        {"memory_total", format("{}", s.total)},
                        {"memory_used", format("{}", s.total - s.free)},
                        {"memory_free", format("{}", s.free)},
                        {"entries", format("{}", s.entries)}};
            });
        });
        co_await add_partition(mutation_sink, "cache", [this] () {
            struct stats {
                uint64_t total = 0;
                uint64_t free = 0;
                uint64_t entries = 0;
                uint64_t hits = 0;
                uint64_t misses = 0;
                utils::rate_moving_average hits_moving_average;
                utils::rate_moving_average requests_moving_average;
                static stats reduce(stats a, stats b) {
                    return stats{
                        a.total + b.total,
                        a.free + b.free,
                        a.entries + b.entries,
                        a.hits + b.hits,
                        a.misses + b.misses,
                        a.hits_moving_average + b.hits_moving_average,
                        a.requests_moving_average + b.requests_moving_average};
                }
            };
            return _db.map_reduce0([] (replica::database& db) {
                stats res{};
                // Row-cache occupancy is tracked per shard, hit/miss counters
                // per table.
                auto occupancy = db.row_cache_tracker().region().occupancy();
                res.total = occupancy.total_space();
                res.free = occupancy.free_space();
                res.entries = db.row_cache_tracker().partitions();
                db.get_tables_metadata().for_each_table([&] (table_id id, lw_shared_ptr<replica::table> t) {
                    auto& cache_stats = t->get_row_cache().stats();
                    res.hits += cache_stats.hits.count();
                    res.misses += cache_stats.misses.count();
                    res.hits_moving_average += cache_stats.hits.rate();
                    res.requests_moving_average += (cache_stats.hits.rate() + cache_stats.misses.rate());
                });
                return res;
            }, stats{}, stats::reduce).then([] (stats s) {
                return std::vector<std::pair<sstring, sstring>>{
                        {"memory_total", format("{}", s.total)},
                        {"memory_used", format("{}", s.total - s.free)},
                        {"memory_free", format("{}", s.free)},
                        {"entries", format("{}", s.entries)},
                        {"hits", format("{}", s.hits)},
                        {"misses", format("{}", s.misses)},
                        {"hit_rate_total", format("{:.2}", static_cast<double>(s.hits) / static_cast<double>(s.hits + s.misses))},
                        {"hit_rate_recent", format("{:.2}", s.hits_moving_average.mean_rate)},
                        {"requests_total", format("{}", s.hits + s.misses)},
                        {"requests_recent", format("{}", static_cast<uint64_t>(s.requests_moving_average.mean_rate))}};
            });
        });
        // True if any keyspace on any shard has incremental backups enabled.
        co_await add_partition(mutation_sink, "incremental_backup_enabled", [this] () {
            return _db.map_reduce0([] (replica::database& db) {
                return std::ranges::any_of(db.get_keyspaces(), [] (const auto& id_and_ks) {
                    return id_and_ks.second.incremental_backups_enabled();
                });
            }, false, std::logical_or{}).then([] (bool res) -> sstring {
                return res ? "true" : "false";
            });
        });
    }
};
// Virtual table system.versions: a single "local" row reporting this node's
// Scylla version, build mode and build id.
class versions_table : public memtable_filling_virtual_table {
public:
    explicit versions_table()
            : memtable_filling_virtual_table(build_schema()) {
        // A single static row; no point in sharding it.
        _shard_aware = false;
    }

    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::VERSIONS);
        return schema_builder(system_keyspace::NAME, system_keyspace::VERSIONS, std::make_optional(id))
                .with_column("key", utf8_type, column_kind::partition_key)
                .with_column("version", utf8_type)
                .with_column("build_mode", utf8_type)
                .with_column("build_id", utf8_type)
                .set_comment("Version information.")
                .with_hash_version()
                .build();
    }

    future<> execute(std::function<void(mutation)> mutation_sink) override {
        auto pk = partition_key::from_single_value(*schema(), data_value("local").serialize_nonnull());
        mutation m(schema(), std::move(pk));
        auto& cells = m.partition().clustered_row(*schema(), clustering_key::make_empty()).cells();
        set_cell(cells, "version", scylla_version());
        set_cell(cells, "build_mode", scylla_build_mode());
        set_cell(cells, "build_id", get_build_id());
        mutation_sink(std::move(m));
        return make_ready_future<>();
    }
};
// Virtual table system.config: exposes every configuration option with its
// type, source and current value. Additionally supports UPDATE on the
// "value" column (live config change) when enable_cql_config_updates is on.
class db_config_table final : public streaming_virtual_table {
db::config& _cfg;
static schema_ptr build_schema() {
auto id = generate_legacy_id(system_keyspace::NAME, "config");
return schema_builder(system_keyspace::NAME, "config", std::make_optional(id))
.with_column("name", utf8_type, column_kind::partition_key)
.with_column("type", utf8_type)
.with_column("source", utf8_type)
.with_column("value", utf8_type)
.with_hash_version()
.build();
}
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
struct config_entry {
dht::decorated_key key;
std::string_view type;
sstring source;
sstring value;
};
// Snapshot the options owned by this shard, then sort by ring position,
// since the streaming interface requires partitions in ring order.
std::vector<config_entry> cfg;
for (auto&& cfg_ref : _cfg.values()) {
auto&& c = cfg_ref.get();
dht::decorated_key dk = dht::decorate_key(*_s, partition_key::from_single_value(*_s, data_value(c.name()).serialize_nonnull()));
if (this_shard_owns(dk)) {
cfg.emplace_back(config_entry{ std::move(dk), c.type_name(), c.source_name(), c.value_as_json()._res });
}
}
std::ranges::sort(cfg, dht::ring_position_less_comparator(*_s), std::mem_fn(&config_entry::key));
// Each option is its own single-row partition.
for (auto&& c : cfg) {
co_await result.emit_partition_start(c.key);
mutation m(schema(), c.key);
clustering_row cr(clustering_key::make_empty());
set_cell(cr.cells(), "type", c.type);
set_cell(cr.cells(), "source", c.source);
set_cell(cr.cells(), "value", c.value);
co_await result.emit_row(std::move(cr));
co_await result.emit_partition_end();
}
}
// Handles "UPDATE system.config SET value = ... WHERE name = ...".
// Only "value" may be written; the update is applied on all shards.
virtual future<> apply(const frozen_mutation& fm) override {
const mutation m = fm.unfreeze(_s);
query::result_set rs(m);
auto name = rs.row(0).get<sstring>("name");
auto value = rs.row(0).get<sstring>("value");
if (!_cfg.enable_cql_config_updates()) {
return virtual_table::apply(fm); // will return back exceptional future
}
if (!name) {
return make_exception_future<>(virtual_table_update_exception("option name is required"));
}
if (!value) {
return make_exception_future<>(virtual_table_update_exception("option value is required"));
}
// "type" and "source" are read-only columns.
if (rs.row(0).cells().contains("type")) {
return make_exception_future<>(virtual_table_update_exception("option type is immutable"));
}
if (rs.row(0).cells().contains("source")) {
return make_exception_future<>(virtual_table_update_exception("option source is not updateable"));
}
// Config is authoritative on shard 0; it propagates to all shards.
return smp::submit_to(0, [&cfg = _cfg, name = std::move(*name), value = std::move(*value)] () mutable -> future<> {
for (auto& c_ref : cfg.values()) {
auto& c = c_ref.get();
if (c.name() == name) {
std::exception_ptr ex;
try {
if (co_await c.set_value_on_all_shards(value, utils::config_file::config_source::CQL)) {
co_return;
} else {
ex = std::make_exception_ptr(virtual_table_update_exception("option is not live-updateable"));
}
} catch (boost::bad_lexical_cast&) {
ex = std::make_exception_ptr(virtual_table_update_exception("cannot parse option value"));
}
co_await coroutine::return_exception_ptr(std::move(ex));
}
}
co_await coroutine::return_exception(virtual_table_update_exception("no such option"));
});
}
public:
explicit db_config_table(db::config& cfg)
: streaming_virtual_table(build_schema())
, _cfg(cfg)
{
// Each shard serves the option-name partitions it owns.
_shard_aware = true;
}
};
class clients_table : public streaming_virtual_table {
service::storage_service& _ss;
static schema_ptr build_schema() {
auto id = generate_legacy_id(system_keyspace::NAME, "clients");
return schema_builder(system_keyspace::NAME, "clients", std::make_optional(id))
.with_column("address", inet_addr_type, column_kind::partition_key)
.with_column("port", int32_type, column_kind::clustering_key)
.with_column("client_type", utf8_type, column_kind::clustering_key)
.with_column("shard_id", int32_type)
.with_column("connection_stage", utf8_type)
.with_column("driver_name", utf8_type)
.with_column("driver_version", utf8_type)
.with_column("hostname", utf8_type)
.with_column("protocol_version", int32_type)
.with_column("ssl_cipher_suite", utf8_type)
.with_column("ssl_enabled", boolean_type)
.with_column("ssl_protocol", utf8_type)
.with_column("username", utf8_type)
.with_column("scheduling_group", utf8_type)
.with_column("client_options", map_type_impl::get_instance(utf8_type, utf8_type, false))
.with_hash_version()
.build();
}
dht::decorated_key make_partition_key(net::inet_address ip) {
return dht::decorate_key(*_s, partition_key::from_single_value(*_s, data_value(ip).serialize_nonnull()));
}
clustering_key make_clustering_key(int32_t port, sstring clt) {
return clustering_key::from_exploded(*_s, {
data_value(port).serialize_nonnull(),
data_value(clt).serialize_nonnull()
});
}
// Streams the contents of the clients virtual table: one partition per
// client IP address, one row per (port, client_type) connection.
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
    // Collect: gather client data from every protocol server on every shard.
    using client_data_vec = utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>;
    using shard_client_data = std::vector<client_data_vec>;
    std::vector<foreign_ptr<std::unique_ptr<shard_client_data>>> cd_vec;
    cd_vec.resize(smp::count);
    auto servers = co_await _ss.container().invoke_on(0, [] (auto& ss) { return ss.protocol_servers(); });
    co_await smp::invoke_on_all([&cd_vec_ = cd_vec, &servers_ = servers] () -> future<> {
        auto& cd_vec = cd_vec_;
        auto& servers = servers_;
        auto scd = std::make_unique<shard_client_data>();
        for (const auto& ps : servers) {
            client_data_vec cds = co_await ps->get_client_data();
            if (cds.size() != 0) {
                scd->emplace_back(std::move(cds));
            }
        }
        cd_vec[this_shard_id()] = make_foreign(std::move(scd));
    });

    // Partition: group client entries by IP. Only IPs whose partition is
    // owned by this shard and requested by the query are kept.
    struct decorated_ip {
        dht::decorated_key key;
        net::inet_address ip;
        struct compare {
            dht::ring_position_less_comparator less;
            explicit compare(const class schema& s) : less(s) {}
            bool operator()(const decorated_ip& a, const decorated_ip& b) const {
                return less(a.key, b.key);
            }
        };
    };
    decorated_ip::compare cmp(*_s);
    // Ordered by token so partitions are emitted in ring order.
    std::set<decorated_ip, decorated_ip::compare> ips(cmp);
    std::unordered_map<net::inet_address, client_data_vec> cd_map;
    for (unsigned i = 0; i < smp::count; i++) {
        for (auto&& ps_cdc : *cd_vec[i]) {
            for (auto&& cd : ps_cdc) {
                if (cd_map.contains(cd->ip)) {
                    // IP already accepted; just append the connection.
                    cd_map[cd->ip].emplace_back(std::move(cd));
                } else {
                    dht::decorated_key key = make_partition_key(cd->ip);
                    if (this_shard_owns(key) && contains_key(qr.partition_range(), key)) {
                        ips.insert(decorated_ip{std::move(key), cd->ip});
                        cd_map[cd->ip].emplace_back(std::move(cd));
                    }
                }
                co_await coroutine::maybe_yield();
            }
        }
    }

    // Loop-invariant helpers for the "client_options" map column, hoisted
    // out of the per-client loop.
    auto map_type = map_type_impl::get_instance(
        utf8_type,
        utf8_type,
        false
    );
    auto prepare_client_options = [] (const auto& client_options) {
        map_type_impl::native_type tmp;
        for (auto& co: client_options) {
            auto map_element = std::make_pair(data_value(co.key.key()), data_value(co.value.key()));
            tmp.push_back(std::move(map_element));
        }
        return tmp;
    };

    // Emit: one partition per IP, rows sorted in clustering order.
    for (const auto& dip : ips) {
        co_await result.emit_partition_start(dip.key);
        auto& clients = cd_map[dip.ip];
        // Sort lexicographically by (port, client_type) to match the
        // clustering key order. Note: a plain '||' of the two '<'
        // comparisons would not be a strict weak ordering, which
        // std::ranges::sort requires.
        std::ranges::sort(clients, [] (const foreign_ptr<std::unique_ptr<client_data>>& a, const foreign_ptr<std::unique_ptr<client_data>>& b) {
            if (a->port != b->port) {
                return a->port < b->port;
            }
            return a->client_type_str() < b->client_type_str();
        });
        for (const auto& cd : clients) {
            clustering_row cr(make_clustering_key(cd->port, cd->client_type_str()));
            set_cell(cr.cells(), "shard_id", cd->shard_id);
            set_cell(cr.cells(), "connection_stage", cd->stage_str());
            // Optional columns are only set when the client reported them.
            if (cd->driver_name) {
                set_cell(cr.cells(), "driver_name", cd->driver_name->key());
            }
            if (cd->driver_version) {
                set_cell(cr.cells(), "driver_version", cd->driver_version->key());
            }
            if (cd->hostname) {
                set_cell(cr.cells(), "hostname", *cd->hostname);
            }
            if (cd->protocol_version) {
                set_cell(cr.cells(), "protocol_version", *cd->protocol_version);
            }
            if (cd->ssl_cipher_suite) {
                set_cell(cr.cells(), "ssl_cipher_suite", *cd->ssl_cipher_suite);
            }
            if (cd->ssl_enabled) {
                set_cell(cr.cells(), "ssl_enabled", *cd->ssl_enabled);
            }
            if (cd->ssl_protocol) {
                set_cell(cr.cells(), "ssl_protocol", *cd->ssl_protocol);
            }
            set_cell(cr.cells(), "username", cd->username ? *cd->username : sstring("anonymous"));
            if (cd->scheduling_group_name) {
                set_cell(cr.cells(), "scheduling_group", *cd->scheduling_group_name);
            }
            set_cell(cr.cells(), "client_options",
                     make_map_value(map_type, prepare_client_options(cd->client_options)));
            co_await result.emit_row(std::move(cr));
        }
        co_await result.emit_partition_end();
    }
}
public:
clients_table(service::storage_service& ss)
    : streaming_virtual_table(build_schema())
    , _ss(ss)
{
    // Each shard emits only the partitions it owns (see the
    // this_shard_owns() checks in execute()).
    _shard_aware = true;
}
};
// Shows the current state of each Raft group.
// Currently it shows only the configuration.
// In the future we plan to add additional columns with more information.
class raft_state_table : public streaming_virtual_table {
private:
    sharded<service::raft_group_registry>& _raft_gr;
public:
    raft_state_table(sharded<service::raft_group_registry>& raft_gr)
        : streaming_virtual_table(build_schema())
        , _raft_gr(raft_gr) {
    }

    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        // Pairs a group id with its partition key (used for token ordering)
        // and the shard on which the group's raft server lives.
        struct decorated_gid {
            raft::group_id gid;
            dht::decorated_key key;
            unsigned shard;
        };

        // Collect all known group ids from every shard, remembering the
        // owning shard so the configuration can be queried there below.
        auto groups_and_shards = co_await _raft_gr.map([] (service::raft_group_registry& raft_gr) {
            return std::pair{raft_gr.all_groups(), this_shard_id()};
        });

        std::vector<decorated_gid> decorated_gids;
        for (auto& [groups, shard]: groups_and_shards) {
            for (auto& gid: groups) {
                decorated_gids.push_back(decorated_gid{gid, make_partition_key(gid), shard});
            }
        }

        // Must return partitions in token order.
        std::sort(decorated_gids.begin(), decorated_gids.end(), [less = dht::ring_position_less_comparator(*_s)]
                (const decorated_gid& l, const decorated_gid& r) { return less(l.key, r.key); });

        for (auto& [gid, dk, shard]: decorated_gids) {
            // Skip partitions outside the queried range.
            if (!contains_key(qr.partition_range(), dk)) {
                continue;
            }

            auto cfg_opt = co_await _raft_gr.invoke_on(shard,
                    [gid=gid] (service::raft_group_registry& raft_gr) -> std::optional<raft::configuration> {
                // Be ready for a group to disappear while we're querying.
                auto* srv = raft_gr.find_server(gid);
                if (!srv) {
                    return std::nullopt;
                }
                // FIXME: the configuration returned here is obtained from raft::fsm, it may not be
                // persisted yet, so this is not 100% correct. It may happen that we crash after
                // a config entry is appended in-memory in fsm but before it's persisted. It would be
                // incorrect to return the configuration observed during this window - after restart
                // the configuration would revert to the previous one. Perhaps this is unlikely to
                // happen in practice, but for correctness we should add a way of querying the
                // latest persisted configuration.
                return srv->get_configuration();
            });

            if (!cfg_opt) {
                continue;
            }

            co_await result.emit_partition_start(dk);
            // List current config first, because 'C' < 'P' and the disposition
            // (ascii_type, 'CURRENT' vs 'PREVIOUS') is the first column in the clustering key.
            co_await emit_member_set(result, "CURRENT", cfg_opt->current);
            co_await emit_member_set(result, "PREVIOUS", cfg_opt->previous);
            co_await result.emit_partition_end();
        }
    }

private:
    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "raft_state");
        return schema_builder(system_keyspace::NAME, "raft_state", std::make_optional(id))
            .with_column("group_id", timeuuid_type, column_kind::partition_key)
            .with_column("disposition", ascii_type, column_kind::clustering_key) // can be 'CURRENT` or `PREVIOUS'
            .with_column("server_id", uuid_type, column_kind::clustering_key)
            .with_column("can_vote", boolean_type)
            .set_comment("Currently operating RAFT configuration")
            .with_hash_version()
            .build();
    }

    dht::decorated_key make_partition_key(raft::group_id gid) {
        // Make sure to use timeuuid_native_type so comparisons are done correctly
        // (we must emit partitions in the correct token order).
        return dht::decorate_key(*_s, partition_key::from_single_value(
            *_s, data_value(timeuuid_native_type{gid.uuid()}).serialize_nonnull()));
    }

    // Clustering key is (disposition, server_id).
    clustering_key make_clustering_key(std::string_view disposition, raft::server_id id) {
        return clustering_key::from_exploded(*_s, {
            data_value(disposition).serialize_nonnull(),
            data_value(id.uuid()).serialize_nonnull()
        });
    }

    // Emits one row per member of the given config member set, in
    // clustering (server id) order.
    future<> emit_member_set(result_collector& result, std::string_view disposition,
                             const raft::config_member_set& set) {
        // Must sort servers in clustering order (i.e. according to their IDs).
        // This is how `config_member::operator<` works so no need for custom comparator.
        std::vector<raft::config_member> members{set.begin(), set.end()};
        std::sort(members.begin(), members.end());
        for (auto& member: members) {
            clustering_row cr{make_clustering_key(disposition, member.addr.id)};
            set_cell(cr.cells(), "can_vote", member.can_vote);
            co_await result.emit_row(std::move(cr));
        }
    }
};
/// Base class for virtual tables which are supposed to serve data from group0 leader.
/// If queried on a different node, read will be redirected to the leader.
/// Implementations need to override execute_on_leader(), which will be executed
/// on shard0 of current group0 leader.
///
/// Current implementation is suitable for tables which are relatively small as all
/// data is materialized in memory and queried in one page.
class group0_virtual_table : public memtable_filling_virtual_table {
private:
    sharded<service::raft_group_registry>& _raft_gr;
    sharded<netw::messaging_service>& _ms;
public:
    group0_virtual_table(schema_ptr s,
            sharded<service::raft_group_registry>& raft_gr,
            sharded<netw::messaging_service>& ms)
        : memtable_filling_virtual_table(s)
        , _raft_gr(raft_gr)
        , _ms(ms) {
    }

    // Waits until group0 has a known leader and returns its raft server id.
    // The read barrier involves contacting the leader, so after it completes
    // a current leader is normally known; retry the barrier until it is.
    future<raft::server_id> get_leader(const reader_permit& permit) const {
        while (!_raft_gr.local().group0().current_leader()) {
            // Bound the wait by the permit's timeout; awaits_guard marks the
            // permit as waiting for the duration of the barrier.
            abort_on_expiry aoe(permit.timeout());
            reader_permit::awaits_guard ag(permit);
            co_await _raft_gr.local().group0().read_barrier(&aoe.abort_source());
        }
        co_return _raft_gr.local().group0().current_leader();
    }

    // Returns true if this node is currently the group0 leader.
    future<bool> is_leader(const reader_permit& permit) const {
        auto leader = co_await get_leader(permit);
        co_return leader == _raft_gr.local().get_my_raft_id();
    }

    // Reads the full content of this virtual table from the current group0
    // leader via the read_mutation_data RPC and feeds the resulting
    // mutations into mutation_sink.
    future<> redirect_to_leader(std::function<void(mutation)> mutation_sink, reader_permit permit) {
        auto leader = co_await get_leader(permit);
        // Full-table scan: no partition/row limits, only a size sanity cap.
        auto cmd = query::read_command(_s->id(),
            _s->version(),
            partition_slice_builder(*_s).build(),
            query::max_result_size(256*1024*1024), // Sanity limit
            query::tombstone_limit::max,
            query::row_limit(query::max_rows),
            query::partition_limit(query::max_partitions));
        reader_permit::awaits_guard ag(permit);
        auto&& [result, hit_rate, opt_exception] = co_await ser::storage_proxy_rpc_verbs::send_read_mutation_data(&_ms.local(),
            locator::host_id(leader.uuid()),
            permit.timeout(),
            cmd,
            query::full_partition_range,
            {});
        // Propagate a remote exception, if any, to the caller.
        if (opt_exception.has_value() && *opt_exception) {
            co_await coroutine::return_exception_ptr((*opt_exception).into_exception_ptr());
        }
        for (auto&& p : result.partitions()) {
            mutation_sink(p.mut().unfreeze(_s));
            co_await coroutine::maybe_yield();
        }
        co_return;
    }

    // This function is executed on the node which was a group0 leader at some point since the query started.
    // The node may not be the leader anymore by the time it is executed.
    // Only executed on shard 0.
    virtual future<> execute_on_leader(std::function<void(mutation)> mutation_sink, reader_permit) = 0;

    future<> execute(std::function<void(mutation)> mutation_sink, reader_permit permit) override {
        // Data is produced on shard 0 only; other shards contribute nothing
        // (implementations use a shard0-only sharder in their schema).
        if (this_shard_id() != 0) {
            co_return;
        }
        if (co_await is_leader(permit)) {
            co_await execute_on_leader(std::move(mutation_sink), std::move(permit));
            co_return;
        }
        co_await redirect_to_leader(std::move(mutation_sink), std::move(permit));
    }
};
// Virtual table with per-node tablet and storage load statistics.
// Served from the group0 leader (via group0_virtual_table), where tablet
// load stats are refreshed by the topology coordinator.
class load_per_node : public group0_virtual_table {
private:
    sharded<service::tablet_allocator>& _talloc;
    sharded<replica::database>& _db;
    sharded<gms::gossiper>& _gossiper;
public:
    load_per_node(sharded<service::tablet_allocator>& talloc,
            sharded<replica::database>& db,
            sharded<service::raft_group_registry>& raft_gr,
            sharded<netw::messaging_service>& ms,
            sharded<gms::gossiper>& gossiper)
        : group0_virtual_table(build_schema(), raft_gr, ms)
        , _talloc(talloc)
        , _db(db)
        , _gossiper(gossiper)
    { }

    future<> execute_on_leader(std::function<void(mutation)> mutation_sink, reader_permit permit) override {
        auto stats = _talloc.local().get_load_stats();
        while (!stats) {
            // Wait for stats to be refreshed by topology coordinator
            {
                abort_on_expiry aoe(permit.timeout());
                reader_permit::awaits_guard ag(permit);
                co_await seastar::sleep_abortable(std::chrono::milliseconds(200), aoe.abort_source());
            }
            // Leadership may have moved while sleeping; if it did, forward
            // the query to the new leader rather than keep waiting here.
            if (!co_await is_leader(permit)) {
                co_await redirect_to_leader(std::move(mutation_sink), std::move(permit));
                co_return;
            }
            stats = _talloc.local().get_load_stats();
        }

        auto tm = _db.local().get_token_metadata_ptr();
        const uint64_t default_tablet_size = _db.local().get_config().target_tablet_size_in_bytes();
        locator::load_sketch load(tm, stats, default_tablet_size);
        co_await load.populate();

        // One partition (keyed by host id) per node in the topology.
        tm->get_topology().for_each_node([&] (const auto& node) {
            auto host = node.host_id();
            mutation m(schema(), make_partition_key(host));
            // Single row per partition: the clustering key is empty.
            auto& r = m.partition().clustered_row(*schema(), clustering_key::make_empty());
            set_cell(r.cells(), "dc", node.dc());
            set_cell(r.cells(), "rack", node.rack());
            set_cell(r.cells(), "up", _gossiper.local().is_alive(host));
            set_cell(r.cells(), "draining", node.is_draining());
            set_cell(r.cells(), "excluded", node.is_excluded());
            // The IP may be absent from the gossiper's address map.
            if (auto ip = _gossiper.local().get_address_map().find(host)) {
                set_cell(r.cells(), "ip", data_value(inet_address(*ip)));
            }
            set_cell(r.cells(), "tablets_allocated", int64_t(load.get_tablet_count(host)));
            set_cell(r.cells(), "tablets_allocated_per_shard", data_value(double(load.get_real_avg_tablet_count(host))));
            set_cell(r.cells(), "storage_allocated_load", data_value(int64_t(load.get_tablet_count(host) * default_tablet_size)));
            // Capacity-derived columns are only set when the stats carry
            // capacity information for this host.
            if (stats && stats->capacity.contains(host)) {
                auto capacity = stats->capacity.at(host);
                set_cell(r.cells(), "storage_capacity", data_value(int64_t(capacity)));
                if (auto ts_iter = stats->tablet_stats.find(host); ts_iter != stats->tablet_stats.end()) {
                    set_cell(r.cells(), "effective_capacity", data_value(int64_t(ts_iter->second.effective_capacity)));
                }
                if (auto utilization = load.get_allocated_utilization(host)) {
                    set_cell(r.cells(), "storage_allocated_utilization", data_value(double(*utilization)));
                }
                // Disk-usage columns require complete data for the host.
                if (load.has_complete_data(host)) {
                    if (auto utilization = load.get_storage_utilization(host)) {
                        set_cell(r.cells(), "storage_utilization", data_value(double(*utilization)));
                    }
                    set_cell(r.cells(), "storage_load", data_value(int64_t(load.get_disk_used(host))));
                }
            }
            mutation_sink(m);
        });
    }
private:
    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "load_per_node");
        return schema_builder(system_keyspace::NAME, "load_per_node", std::make_optional(id))
            .with_column("node", uuid_type, column_kind::partition_key)
            .with_column("dc", utf8_type)
            .with_column("rack", utf8_type)
            .with_column("ip", inet_addr_type)
            .with_column("up", boolean_type)
            .with_column("draining", boolean_type)
            .with_column("excluded", boolean_type)
            .with_column("tablets_allocated", long_type)
            .with_column("tablets_allocated_per_shard", double_type)
            .with_column("storage_capacity", long_type)
            .with_column("effective_capacity", long_type)
            .with_column("storage_allocated_load", long_type)
            .with_column("storage_allocated_utilization", double_type)
            .with_column("storage_load", long_type)
            .with_column("storage_utilization", double_type)
            .with_sharder(1, 0) // shard0-only
            .with_hash_version()
            .build();
    }

    // Partition key is the node's host id (uuid).
    dht::decorated_key make_partition_key(host_id host) {
        return dht::decorate_key(*_s, partition_key::from_single_value(
            *_s, data_value(host.uuid()).serialize_nonnull()));
    }
};
// Virtual table with per-tablet size information: for each tablet (keyed by
// table id and the tablet's last token), the size reported by each replica
// and the set of replicas with no known size. Served from the group0 leader.
class tablet_sizes : public group0_virtual_table {
private:
    sharded<service::tablet_allocator>& _talloc;
    sharded<replica::database>& _db;
public:
    tablet_sizes(sharded<service::tablet_allocator>& talloc,
            sharded<replica::database>& db,
            sharded<service::raft_group_registry>& raft_gr,
            sharded<netw::messaging_service>& ms)
        : group0_virtual_table(build_schema(), raft_gr, ms)
        , _talloc(talloc)
        , _db(db)
    { }

    future<> execute_on_leader(std::function<void(mutation)> mutation_sink, reader_permit permit) override {
        auto stats = _talloc.local().get_load_stats();
        while (!stats) {
            // Wait for stats to be refreshed by topology coordinator
            {
                abort_on_expiry aoe(permit.timeout());
                reader_permit::awaits_guard ag(permit);
                co_await seastar::sleep_abortable(std::chrono::milliseconds(200), aoe.abort_source());
            }
            // Leadership may have moved while sleeping; forward the query
            // to the new leader in that case.
            if (!co_await is_leader(permit)) {
                co_await redirect_to_leader(std::move(mutation_sink), std::move(permit));
                co_return;
            }
            stats = _talloc.local().get_load_stats();
        }

        auto tm = _db.local().get_token_metadata_ptr();

        // Builds the native value for the "replicas" column:
        // replica host uuid -> tablet size on that replica.
        auto prepare_replica_sizes = [] (const std::unordered_map<host_id, uint64_t>& replica_sizes) {
            map_type_impl::native_type tmp;
            for (auto& r: replica_sizes) {
                auto replica = r.first.uuid();
                int64_t tablet_size = int64_t(r.second);
                auto map_element = std::make_pair<data_value, data_value>(data_value(replica), data_value(tablet_size));
                tmp.push_back(std::move(map_element));
            }
            return tmp;
        };
        // Builds the native value for the "missing_replicas" column:
        // replicas for which no size is known.
        auto prepare_missing_replica = [] (const std::unordered_set<host_id>& missing_replicas) {
            set_type_impl::native_type tmp;
            for (auto& r: missing_replicas) {
                tmp.push_back(data_value(r.uuid()));
            }
            return tmp;
        };

        auto map_type = map_type_impl::get_instance(uuid_type, long_type, false);
        auto set_type = set_type_impl::get_instance(uuid_type, false);

        // One partition per table, one row per tablet.
        for (auto&& [table, tmap] : tm->tablets().all_tables_ungrouped()) {
            mutation m(schema(), make_partition_key(table));
            co_await tmap->for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& tinfo) -> future<> {
                auto trange = tmap->get_token_range(tid);
                // Clustering key is the tablet's last token, i.e. the upper
                // bound of its token range (assumed always bounded here).
                int64_t last_token = trange.end()->value().raw();
                auto& r = m.partition().clustered_row(*schema(), clustering_key::from_single_value(*schema(), data_value(last_token).serialize_nonnull()));
                const range_based_tablet_id rb_tid {table, trange};
                std::unordered_map<host_id, uint64_t> replica_sizes;
                std::unordered_set<host_id> missing_replicas;
                for (auto& replica : tinfo.replicas) {
                    auto tablet_size_opt = stats->get_tablet_size(replica.host, rb_tid);
                    if (tablet_size_opt) {
                        replica_sizes[replica.host] = *tablet_size_opt;
                    } else {
                        missing_replicas.insert(replica.host);
                    }
                }
                set_cell(r.cells(), "replicas", make_map_value(map_type, prepare_replica_sizes(replica_sizes)));
                set_cell(r.cells(), "missing_replicas", make_set_value(set_type, prepare_missing_replica(missing_replicas)));
                return make_ready_future<>();
            });
            mutation_sink(m);
        }
    }
private:
    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "tablet_sizes");
        return schema_builder(system_keyspace::NAME, "tablet_sizes", std::make_optional(id))
            .with_column("table_id", uuid_type, column_kind::partition_key)
            .with_column("last_token", long_type, column_kind::clustering_key)
            .with_column("replicas", map_type_impl::get_instance(uuid_type, long_type, false))
            .with_column("missing_replicas", set_type_impl::get_instance(uuid_type, false))
            .with_sharder(1, 0) // shard0-only
            .with_hash_version()
            .build();
    }

    // Partition key is the table's id (uuid).
    dht::decorated_key make_partition_key(table_id table) {
        return dht::decorate_key(*_s, partition_key::from_single_value(
            *_s, data_value(table.uuid()).serialize_nonnull()));
    }
};
// Virtual table listing CDC generation timestamps for tablets-based tables:
// one partition per (keyspace, table), one row per generation timestamp.
class cdc_timestamps_table : public streaming_virtual_table {
private:
    replica::database& _db;
    service::storage_service& _ss;
public:
    cdc_timestamps_table(replica::database& db, service::storage_service& ss)
        : streaming_virtual_table(build_schema())
        , _db(db)
        , _ss(ss)
    {
        // Each shard serves only the partitions it owns.
        _shard_aware = true;
    }

    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        // A CDC-enabled table together with the partition key derived from
        // its base table's keyspace/table name.
        struct table_entry {
            table_id id;
            dht::decorated_key dk;
        };

        // Gather the tables whose partitions belong to this shard and fall
        // inside the queried partition range.
        std::vector<table_entry> entries;
        for (auto id : _ss.get_tables_with_cdc_tablet_streams()) {
            auto& cf = _db.find_column_family(id);
            auto base_schema = cdc::get_base_table(_db, *cf.schema());
            auto dk = make_partition_key(base_schema->ks_name(), base_schema->cf_name());
            if (this_shard_owns(dk) && contains_key(qr.partition_range(), dk)) {
                entries.push_back(table_entry{id, std::move(dk)});
            }
        }

        // Partitions must be produced in token order.
        std::sort(entries.begin(), entries.end(), [less = dht::ring_position_less_comparator(*_s)]
                (const table_entry& a, const table_entry& b) { return less(a.dk, b.dk); });

        for (const auto& entry : entries) {
            co_await result.emit_partition_start(entry.dk);
            const auto write_ts = api::new_timestamp();
            co_await _ss.query_cdc_timestamps(entry.id, false, [&] (db_clock::time_point ts) -> future<> {
                clustering_row row(make_clustering_key(ts));
                row.apply(row_marker(write_ts));
                return result.emit_row(std::move(row));
            });
            co_await result.emit_partition_end();
        }
    }
private:
    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "cdc_timestamps");
        return schema_builder(system_keyspace::NAME, "cdc_timestamps", std::make_optional(id))
            .with_column("keyspace_name", utf8_type, column_kind::partition_key)
            .with_column("table_name", utf8_type, column_kind::partition_key)
            .with_column("timestamp", reversed_type_impl::get_instance(timestamp_type), column_kind::clustering_key)
            .set_comment("CDC generations for tablets-based tables")
            .with_hash_version()
            .build();
    }

    // Partition key is (keyspace_name, table_name) of the CDC base table.
    dht::decorated_key make_partition_key(const std::string& ks_name, const std::string& cf_name) {
        std::vector<bytes> components;
        components.push_back(data_value(ks_name).serialize_nonnull());
        components.push_back(data_value(cf_name).serialize_nonnull());
        return dht::decorate_key(*_s, partition_key::from_exploded(*_s, std::move(components)));
    }

    // Clustering key is the generation timestamp (reversed in the schema).
    clustering_key make_clustering_key(db_clock::time_point ts) {
        return clustering_key::from_single_value(*_s, timestamp_type->decompose(ts));
    }
};
class cdc_streams_table : public streaming_virtual_table {
private:
    replica::database& _db;
    service::storage_service& _ss;
public:
    cdc_streams_table(replica::database& db, service::storage_service& ss)
        : streaming_virtual_table(build_schema())
        , _db(db)
        , _ss(ss)
    {
        // Each shard serves only the partitions it owns.
        _shard_aware = true;
    }

    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        // A CDC-enabled table id paired with its partition key, derived
        // from the base table's (keyspace, table) name.
        struct decorated_table_name {
            table_id table;
            dht::decorated_key key;
        };
        // Keep only the tables whose partition belongs to this shard and is
        // included in the queried partition range.
        std::vector<decorated_table_name> tables;
        for (auto table : _ss.get_tables_with_cdc_tablet_streams()) {
            auto& cf = _db.find_column_family(table);
            auto base_schema = cdc::get_base_table(_db, *cf.schema());
            auto dk = make_partition_key(base_schema->ks_name(), base_schema->cf_name());
            if (!this_shard_owns(dk) || !contains_key(qr.partition_range(), dk)) {
                continue;
            }
            tables.emplace_back(decorated_table_name{table, std::move(dk)});
        }
        // Partitions must be emitted in token order.
        std::ranges::sort(tables, dht::ring_position_less_comparator(*_s), std::mem_fn(&decorated_table_name::key));
        for (const auto& [table, dk] : tables) {
            co_await result.emit_partition_start(dk);
            auto created_at = api::new_timestamp();
            // Emits one row per stream id with the given timestamp/state.
            auto emit_stream_set = [&] (db_clock::time_point ts, cdc::stream_state kind, const auto& sids) -> future<> {
                for (const auto& sid : sids) {
                    clustering_row cr(make_clustering_key(ts, kind, sid));
                    cr.apply(row_marker(created_at));
                    co_await result.emit_row(std::move(cr));
                }
            };
            // emit in clustering order
            static_assert(int(cdc::stream_state::current) < int(cdc::stream_state::closed));
            static_assert(int(cdc::stream_state::closed) < int(cdc::stream_state::opened));
            co_await _ss.query_cdc_streams(table, [&] (db_clock::time_point ts, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff diff) -> future<> {
                co_await emit_stream_set(ts, cdc::stream_state::current, current);
                co_await emit_stream_set(ts, cdc::stream_state::closed, diff.closed_streams);
                co_await emit_stream_set(ts, cdc::stream_state::opened, diff.opened_streams);
            });
            co_await result.emit_partition_end();
        }
    }
private:
    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, "cdc_streams");
        return schema_builder(system_keyspace::NAME, "cdc_streams", std::make_optional(id))
            .with_column("keyspace_name", utf8_type, column_kind::partition_key)
            .with_column("table_name", utf8_type, column_kind::partition_key)
            .with_column("timestamp", timestamp_type, column_kind::clustering_key)
            .with_column("stream_state", byte_type, column_kind::clustering_key)
            .with_column("stream_id", bytes_type, column_kind::clustering_key)
            .set_comment("CDC streams for each generation for tablets-based tables")
            .with_hash_version()
            .build();
    }

    // Partition key is (keyspace_name, table_name) of the CDC base table.
    dht::decorated_key make_partition_key(const std::string& ks_name, const std::string& cf_name) {
        return dht::decorate_key(*_s, partition_key::from_exploded(*_s, {
            data_value(ks_name).serialize_nonnull(),
            data_value(cf_name).serialize_nonnull()
        }));
    }

    // Clustering key is (timestamp, stream_state, stream_id).
    clustering_key make_clustering_key(db_clock::time_point ts, cdc::stream_state kind, const cdc::stream_id& sid) {
        return clustering_key::from_exploded(*_s, {
            timestamp_type->decompose(ts),
            byte_type->decompose(std::to_underlying(kind)),
            sid.to_bytes()
        });
    }
};
}
// Registers all built-in virtual tables: stores each in the registry owned
// by system_keyspace, creates its backing local system column family, and
// plugs in its virtual reader/writer. Also attaches the legacy virtual
// readers for size_estimates, views_builds_in_progress and built_indexes.
future<> initialize_virtual_tables(
        sharded<replica::database>& dist_db, sharded<service::storage_service>& dist_ss,
        sharded<gms::gossiper>& dist_gossiper, sharded<service::raft_group_registry>& dist_raft_gr,
        sharded<db::system_keyspace>& sys_ks,
        sharded<service::tablet_allocator>& tablet_allocator,
        sharded<netw::messaging_service>& ms,
        db::config& cfg) {
    auto& virtual_tables_registry = sys_ks.local().get_virtual_tables_registry();
    auto& virtual_tables = *virtual_tables_registry;
    auto& db = dist_db.local();
    auto& ss = dist_ss.local();

    // Registers a single virtual table: put it in the registry, create the
    // column family, then hook up the virtual reader and writer.
    auto add_table = [&] (std::unique_ptr<virtual_table>&& tbl) -> future<> {
        auto schema = tbl->schema();
        virtual_tables[schema->id()] = std::move(tbl);
        co_await db.create_local_system_table(schema, false, ss.get_erm_factory());
        auto& cf = db.find_column_family(schema);
        cf.mark_ready_for_writes(nullptr);
        auto& vt = virtual_tables[schema->id()];
        cf.set_virtual_reader(vt->as_mutation_source());
        cf.set_virtual_writer([&vt = *vt] (const frozen_mutation& m) { return vt.apply(m); });
    };

    // Add built-in virtual tables here.
    co_await add_table(std::make_unique<cluster_status_table>(dist_ss, dist_gossiper));
    co_await add_table(std::make_unique<token_ring_table>(db, ss));
    co_await add_table(std::make_unique<snapshots_table>(dist_db));
    co_await add_table(std::make_unique<protocol_servers_table>(ss));
    co_await add_table(std::make_unique<runtime_info_table>(dist_db, ss));
    co_await add_table(std::make_unique<versions_table>());
    co_await add_table(std::make_unique<db_config_table>(cfg));
    co_await add_table(std::make_unique<clients_table>(ss));
    co_await add_table(std::make_unique<raft_state_table>(dist_raft_gr));
    co_await add_table(std::make_unique<load_per_node>(tablet_allocator, dist_db, dist_raft_gr, ms, dist_gossiper));
    co_await add_table(std::make_unique<tablet_sizes>(tablet_allocator, dist_db, dist_raft_gr, ms));
    co_await add_table(std::make_unique<cdc_timestamps_table>(db, ss));
    co_await add_table(std::make_unique<cdc_streams_table>(db, ss));

    // These pre-existing system tables get virtual readers but are not
    // virtual_table instances.
    db.find_column_family(system_keyspace::size_estimates()).set_virtual_reader(mutation_source(db::size_estimates::virtual_reader(db, sys_ks.local())));
    db.find_column_family(system_keyspace::views_builds_in_progress()).set_virtual_reader(mutation_source(db::view::build_progress_virtual_reader(db)));
    db.find_column_family(system_keyspace::built_indexes()).set_virtual_reader(mutation_source(db::index::built_indexes_virtual_reader(db)));
}
// The registry is a thin wrapper over a unique_ptr to the impl object,
// constructed here where the impl type is complete.
virtual_tables_registry::virtual_tables_registry() : unique_ptr(std::make_unique<virtual_tables_registry_impl>()) {
}
virtual_tables_registry::~virtual_tables_registry() = default;
} // namespace db