logstor: logstor stats api

Add an API to get logstor statistics about segments for a table.
This commit is contained in:
Michael Litvak
2026-02-25 08:31:37 +01:00
parent 8bd3bd7e2a
commit b7bdb1010a
8 changed files with 233 additions and 0 deletions

View File

@@ -3268,6 +3268,38 @@
}
]
},
{
"path":"/storage_service/logstor_info",
"operations":[
{
"method":"GET",
"summary":"Logstor segment information for one table",
"type":"table_logstor_info",
"nickname":"logstor_info",
"produces":[
"application/json"
],
"parameters":[
{
"name":"keyspace",
"description":"The keyspace",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"table",
"description":"table name",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
}
]
}
]
},
{
"path":"/storage_service/retrain_dict",
"operations":[
@@ -3676,6 +3708,47 @@
}
}
},
"logstor_hist_bucket":{
"id":"logstor_hist_bucket",
"properties":{
"bucket":{
"type":"long"
},
"count":{
"type":"long"
},
"min_data_size":{
"type":"long"
},
"max_data_size":{
"type":"long"
}
}
},
"table_logstor_info":{
"id":"table_logstor_info",
"description":"Per-table logstor segment distribution",
"properties":{
"keyspace":{
"type":"string"
},
"table":{
"type":"string"
},
"compaction_groups":{
"type":"long"
},
"segments":{
"type":"long"
},
"data_size_histogram":{
"type":"array",
"items":{
"$ref":"logstor_hist_bucket"
}
}
}
},
"tablet_repair_result":{
"id":"tablet_repair_result",
"description":"Tablet repair result",

View File

@@ -1575,6 +1575,76 @@ rest_sstable_info(http_context& ctx, std::unique_ptr<http::request> req) {
});
}
static
future<json::json_return_type>
rest_logstor_info(http_context& ctx, std::unique_ptr<http::request> req) {
    // GET /storage_service/logstor_info?keyspace=<ks>&table=<table>
    // Returns the logstor segment statistics for one table, merged over all
    // shards: compaction group count, segment count, and a histogram of
    // segment net data sizes.
    auto keyspace = api::req_param<sstring>(*req, "keyspace", {}).value;
    auto table = api::req_param<sstring>(*req, "table", {}).value;
    // Fall back to "cf" as an alias for "table" — presumably for parity with
    // other storage_service endpoints; confirm against sibling handlers.
    if (table.empty()) {
        table = api::req_param<sstring>(*req, "cf", {}).value;
    }
    if (keyspace.empty()) {
        throw bad_param_exception("The query parameter 'keyspace' is required");
    }
    if (table.empty()) {
        throw bad_param_exception("The query parameter 'table' is required");
    }
    keyspace = validate_keyspace(ctx, keyspace);
    auto tid = validate_table(ctx.db.local(), keyspace, table);
    auto& cf = ctx.db.local().find_column_family(tid);
    // Only tables backed by logstor (kv storage) have segment statistics.
    if (!cf.uses_kv_storage()) {
        throw bad_param_exception(fmt::format("Table {}.{} does not use logstor", keyspace, table));
    }
    // do_with keeps the accumulator alive for the whole request; each shard's
    // stats are folded into it by the reduce step below.
    return do_with(replica::logstor::table_segment_stats{}, [keyspace = std::move(keyspace), table = std::move(table), tid, &ctx] (replica::logstor::table_segment_stats& merged_stats) {
        // NOTE(review): merged_stats is mutated without synchronization —
        // assumes the reduce step runs only on the calling shard; confirm
        // against seastar's sharded::map_reduce contract.
        return ctx.db.map_reduce([&merged_stats](replica::logstor::table_segment_stats&& shard_stats) {
            merged_stats.compaction_group_count += shard_stats.compaction_group_count;
            merged_stats.segment_count += shard_stats.segment_count;
            // Merge histograms bucket-by-bucket: shards may report different
            // (sparse) bucket sets, so match entries on the bucket index.
            for (auto& bucket : shard_stats.histogram) {
                auto bucket_it = std::find_if(merged_stats.histogram.begin(), merged_stats.histogram.end(), [&bucket] (const replica::logstor::table_segment_histogram_bucket& existing) {
                    return existing.bucket == bucket.bucket;
                });
                if (bucket_it == merged_stats.histogram.end()) {
                    // First time this bucket index is seen — adopt it wholesale.
                    merged_stats.histogram.push_back(std::move(bucket));
                    continue;
                }
                bucket_it->count += bucket.count;
                bucket_it->min_data_size = std::min(bucket_it->min_data_size, bucket.min_data_size);
                bucket_it->max_data_size = std::max(bucket_it->max_data_size, bucket.max_data_size);
            }
        }, [tid](const replica::database& db) {
            // Mapper: runs on each shard, producing that shard's stats.
            return db.get_logstor_table_segment_stats(tid);
        }).then([&merged_stats, keyspace = std::move(keyspace), table = std::move(table)] {
            // Present the merged histogram in ascending bucket order.
            std::ranges::sort(merged_stats.histogram, [] (const replica::logstor::table_segment_histogram_bucket& left, const replica::logstor::table_segment_histogram_bucket& right) {
                return left.bucket < right.bucket;
            });
            // Translate into the swagger-generated JSON model.
            ss::table_logstor_info result;
            result.keyspace = keyspace;
            result.table = table;
            result.compaction_groups = merged_stats.compaction_group_count;
            result.segments = merged_stats.segment_count;
            for (const auto& bucket : merged_stats.histogram) {
                ss::logstor_hist_bucket hist;
                hist.bucket = bucket.bucket;
                hist.count = bucket.count;
                hist.min_data_size = bucket.min_data_size;
                hist.max_data_size = bucket.max_data_size;
                result.data_size_histogram.push(std::move(hist));
            }
            return make_ready_future<json::json_return_type>(stream_object(result));
        });
    });
}
static
future<json::json_return_type>
rest_reload_raft_topology_state(sharded<service::storage_service>& ss, service::raft_group0_client& group0_client, std::unique_ptr<http::request> req) {
@@ -1872,6 +1942,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
ss::retrain_dict.set(r, rest_bind(rest_retrain_dict, ctx, ss, group0_client));
ss::estimate_compression_ratios.set(r, rest_bind(rest_estimate_compression_ratios, ctx, ss));
ss::sstable_info.set(r, rest_bind(rest_sstable_info, ctx));
ss::logstor_info.set(r, rest_bind(rest_logstor_info, ctx));
ss::reload_raft_topology_state.set(r, rest_bind(rest_reload_raft_topology_state, ss, group0_client));
ss::upgrade_to_raft_topology.set(r, rest_bind(rest_upgrade_to_raft_topology, ss));
ss::raft_topology_upgrade_status.set(r, rest_bind(rest_raft_topology_upgrade_status, ss));
@@ -1951,6 +2022,7 @@ void unset_storage_service(http_context& ctx, routes& r) {
ss::get_ownership.unset(r);
ss::get_effective_ownership.unset(r);
ss::sstable_info.unset(r);
ss::logstor_info.unset(r);
ss::reload_raft_topology_state.unset(r);
ss::upgrade_to_raft_topology.unset(r);
ss::raft_topology_upgrade_status.unset(r);

View File

@@ -2880,6 +2880,13 @@ future<> database::trigger_logstor_barrier() {
return make_ready_future<>();
}
// Per-table logstor segment statistics for this shard. When the database has
// no logstor instance, resolves immediately with empty (all-zero) stats.
future<logstor::table_segment_stats> database::get_logstor_table_segment_stats(table_id table) const {
    if (_logstor) {
        return _logstor->get_table_segment_stats(table);
    }
    return make_ready_future<logstor::table_segment_stats>();
}
future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id uuid, sstring tag, db::snapshot_options opts) {
if (!opts.skip_flush) {
co_await flush_table_on_all_shards(sharded_db, uuid);

View File

@@ -2023,6 +2023,7 @@ public:
future<> trigger_logstor_compaction(bool major);
static future<> trigger_logstor_barrier_on_all_shards(sharded<database>& sharded_db);
future<> trigger_logstor_barrier();
future<logstor::table_segment_stats> get_logstor_table_segment_stats(table_id table) const;
static future<db_clock::time_point> get_all_tables_flushed_at(sharded<database>& sharded_db);

View File

@@ -76,6 +76,10 @@ future<> logstor::truncate_table(table_id tid) {
return _segment_manager.truncate_table(tid);
}
// Segment statistics for one table — delegated to the segment manager,
// which owns the per-table segment bookkeeping.
future<table_segment_stats> logstor::get_table_segment_stats(table_id table) const {
    return _segment_manager.get_table_segment_stats(table);
}
future<> logstor::write(const mutation& m, group_id group) {
auto key = calculate_key(*m.schema(), m.decorated_key());

View File

@@ -60,6 +60,8 @@ public:
future<> truncate_table(table_id);
future<table_segment_stats> get_table_segment_stats(table_id) const;
static index_key calculate_key(const schema&, const dht::decorated_key&);
future<> write(const mutation&, group_id);

View File

@@ -683,6 +683,8 @@ public:
const stats& get_stats() const noexcept { return _stats; }
future<table_segment_stats> get_table_segment_stats(table_id table) const;
// compaction group must be set.
void add_segment(segment_descriptor& desc) {
_compaction_groups[*desc.gid].segment_hist.push(desc);
@@ -952,6 +954,10 @@ public:
future<> truncate_table(table_id table);
// Per-table logstor segment statistics, collected by the compaction manager.
future<table_segment_stats> get_table_segment_stats(table_id table) const {
    return _compaction_mgr.get_table_segment_stats(table);
}
private:
struct segment_allocation_guard {
@@ -2085,6 +2091,55 @@ future<> compaction_manager::abort_separator(separator& sep) {
}
}
// Collect per-table logstor segment statistics on this shard: the number of
// compaction groups, the number of segments, and a sparse histogram of
// segment net data sizes keyed by histogram bucket index.
future<table_segment_stats> compaction_manager::get_table_segment_stats(table_id table) const {
    table_segment_stats result;
    // std::map keeps entries ordered by bucket index, so the histogram copied
    // out at the end is already sorted.
    std::map<size_t, table_segment_histogram_bucket> histogram_by_bucket;
    // Hoisted loop invariant: the configured segment size used for net-size math.
    const auto segment_size = _sm._cfg.segment_size;
    // Compaction groups of one table are keyed {table, index}, so they form a
    // contiguous range starting at lower_bound({table, 0}).
    auto it = _compaction_groups.lower_bound(group_id{table, 0});
    while (it != _compaction_groups.end() && it->first.table == table) {
        ++result.compaction_group_count;
        const auto& hist_buckets = it->second.segment_hist.buckets();
        for (const auto& hist_bucket : hist_buckets) {
            // NOTE(review): yielding keeps iterators into _compaction_groups and
            // segment_hist alive across scheduling points — assumes neither is
            // mutated concurrently on this shard; confirm.
            co_await coroutine::maybe_yield();
            if (hist_bucket.empty()) {
                continue;
            }
            for (const auto& desc : hist_bucket) {
                co_await coroutine::maybe_yield();
                auto data_size = desc.net_data_size(segment_size);
                // `bucket_index` (was `bucket`) no longer shadows the outer
                // range-for variable.
                auto bucket_index = segment_descriptor_hist_options.bucket_of(data_size);
                auto [hist_it, inserted] = histogram_by_bucket.try_emplace(bucket_index, table_segment_histogram_bucket{
                    .bucket = bucket_index,
                    .count = 0,
                    .min_data_size = data_size,
                    .max_data_size = data_size,
                });
                auto& hist = hist_it->second;
                ++hist.count;
                hist.min_data_size = std::min(hist.min_data_size, data_size);
                hist.max_data_size = std::max(hist.max_data_size, data_size);
                ++result.segment_count;
            }
        }
        ++it;
    }
    result.histogram.reserve(histogram_by_bucket.size());
    for (const auto& [_, bucket] : histogram_by_bucket) {
        co_await coroutine::maybe_yield();
        result.histogram.push_back(bucket);
    }
    // Plain co_return: a local is implicitly moved; std::move would be noise.
    co_return result;
}
std::chrono::microseconds segment_manager_impl::calculate_separator_delay() const {
size_t min_debt = _separator_flush_threshold * _cfg.segment_size;
size_t debt_target = separator_debt_target * _separator_flush_threshold * _cfg.segment_size;
@@ -2343,6 +2398,10 @@ future<> segment_manager::truncate_table(table_id table) {
return _impl->truncate_table(table);
}
// Public facade: forward the per-table stats request to the shard-local impl.
future<table_segment_stats> segment_manager::get_table_segment_stats(table_id table) const {
    return _impl->get_table_segment_stats(table);
}
}
template<>

View File

@@ -40,6 +40,19 @@ struct segment_manager_config {
size_t max_separator_memory = 1 * 1024 * 1024;
};
// One bucket of the per-table segment-size histogram: `bucket` is the
// histogram bucket index, `count` the number of segments that fell into it,
// and min/max_data_size the smallest/largest net data size observed in it.
// All members are zero-initialized by default so a default-constructed bucket
// never carries indeterminate values.
struct table_segment_histogram_bucket {
    size_t bucket{0};
    size_t count{0};
    size_t min_data_size{0};
    size_t max_data_size{0};
};

// Per-table logstor segment statistics — produced per shard and merged
// across shards by the REST layer.
struct table_segment_stats {
    size_t compaction_group_count{0};
    size_t segment_count{0};
    // Sparse histogram of segment net data sizes, one entry per populated
    // bucket index.
    std::vector<table_segment_histogram_bucket> histogram;
};
class segment_manager_impl;
class log_index;
@@ -85,6 +98,8 @@ public:
future<> truncate_table(table_id);
future<table_segment_stats> get_table_segment_stats(table_id) const;
friend class segment_manager_impl;
};