db/system_keyspace: add snapshots virtual table

Lists the equivalent of the `nodetool listsnapshots` command.
This commit is contained in:
Botond Dénes
2021-10-21 13:16:27 +03:00
parent f0281eaa98
commit 64f658aea4
2 changed files with 132 additions and 0 deletions

View File

@@ -1946,6 +1946,118 @@ public:
}
};
class snapshots_table : public streaming_virtual_table {
distributed<database>& _db;
public:
explicit snapshots_table(distributed<database>& db)
: streaming_virtual_table(build_schema())
, _db(db)
{
_shard_aware = true;
}
static schema_ptr build_schema() {
auto id = generate_legacy_id(system_keyspace::NAME, "snapshots");
return schema_builder(system_keyspace::NAME, "snapshots", std::make_optional(id))
.with_column("keyspace_name", utf8_type, column_kind::partition_key)
.with_column("table_name", utf8_type, column_kind::clustering_key)
.with_column("snapshot_name", utf8_type, column_kind::clustering_key)
.with_column("live", long_type)
.with_column("total", long_type)
.set_comment("Lists all the snapshots along with their size, dropped tables are not part of the listing.")
.with_version(system_keyspace::generate_schema_version(id))
.build();
}
dht::decorated_key make_partition_key(const sstring& name) {
return dht::decorate_key(*_s, partition_key::from_single_value(*_s, data_value(name).serialize_nonnull()));
}
clustering_key make_clustering_key(sstring table_name, sstring snapshot_name) {
return clustering_key::from_exploded(*_s, {
data_value(std::move(table_name)).serialize_nonnull(),
data_value(std::move(snapshot_name)).serialize_nonnull()
});
}
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
struct decorated_keyspace_name {
sstring name;
dht::decorated_key key;
};
std::vector<decorated_keyspace_name> keyspace_names;
for (const auto& [name, _] : _db.local().get_keyspaces()) {
auto dk = make_partition_key(name);
if (!this_shard_owns(dk) || !contains_key(qr.partition_range(), dk)) {
continue;
}
keyspace_names.push_back({std::move(name), std::move(dk)});
}
boost::sort(keyspace_names, [less = dht::ring_position_less_comparator(*_s)]
(const decorated_keyspace_name& l, const decorated_keyspace_name& r) {
return less(l.key, r.key);
});
using snapshots_by_tables_map = std::map<sstring, std::map<sstring, table::snapshot_details>>;
class snapshot_reducer {
private:
snapshots_by_tables_map _result;
public:
future<> operator()(const snapshots_by_tables_map& value) {
for (auto& [table_name, snapshots] : value) {
if (auto [_, added] = _result.try_emplace(table_name, std::move(snapshots)); added) {
continue;
}
auto& rp = _result.at(table_name);
for (auto&& [snapshot_name, snapshot_detail]: snapshots) {
if (auto [_, added] = rp.try_emplace(snapshot_name, std::move(snapshot_detail)); added) {
continue;
}
auto& detail = rp.at(snapshot_name);
detail.live += snapshot_detail.live;
detail.total += snapshot_detail.total;
}
}
return make_ready_future<>();
}
snapshots_by_tables_map get() && {
return std::move(_result);
}
};
for (auto& ks_data : keyspace_names) {
co_await result.emit_partition_start(ks_data.key);
const auto snapshots_by_tables = co_await _db.map_reduce(snapshot_reducer(), [ks_name = ks_data.name] (database& db) -> future<snapshots_by_tables_map> {
snapshots_by_tables_map snapshots_by_tables;
for (auto& [_, table] : db.get_column_families()) {
if (table->schema()->ks_name() != ks_name) {
continue;
}
const auto unordered_snapshots = co_await table->get_snapshot_details();
snapshots_by_tables.emplace(table->schema()->cf_name(), std::map<sstring, table::snapshot_details>(unordered_snapshots.begin(), unordered_snapshots.end()));
}
co_return snapshots_by_tables;
});
for (const auto& [table_name, snapshots] : snapshots_by_tables) {
for (auto& [snapshot_name, details] : snapshots) {
clustering_row cr(make_clustering_key(table_name, snapshot_name));
set_cell(cr.cells(), "live", details.live);
set_cell(cr.cells(), "total", details.total);
co_await result.emit_row(std::move(cr));
}
}
co_await result.emit_partition_end();
}
}
};
// Map from table's schema ID to table itself. Helps avoiding accidental duplication.
static thread_local std::map<utils::UUID, std::unique_ptr<virtual_table>> virtual_tables;
@@ -1960,6 +2072,7 @@ void register_virtual_tables(distributed<database>& dist_db, distributed<service
// Add built-in virtual tables here.
add_table(std::make_unique<cluster_status_table>(ss));
add_table(std::make_unique<token_ring_table>(db, ss));
add_table(std::make_unique<snapshots_table>(dist_db));
}
std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {

View File

@@ -194,6 +194,25 @@ CREATE TABLE system.size_estimates (
Implemented by `size_estimates_mutation_reader` in `db/size_estimates_virtual_reader.{hh,cc}`.
## system.snapshots
The list of snapshots on the node.
Equivalent to the `nodetool listsnapshots` command.
Schema:
```CQL
CREATE TABLE system.snapshots (
keyspace_name text,
table_name text,
snapshot_name text,
live bigint,
total bigint,
PRIMARY KEY (keyspace_name, table_name, snapshot_name)
)
```
Implemented by `snapshots_table` in `db/system_keyspace.cc`.
## system.token_ring
The ring description for each keyspace.