diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json index 51d6bb0e7a..b2ab5da8bb 100644 --- a/api/api-doc/storage_service.json +++ b/api/api-doc/storage_service.json @@ -3268,6 +3268,38 @@ } ] }, + { + "path":"/storage_service/logstor_info", + "operations":[ + { + "method":"GET", + "summary":"Logstor segment information for one table", + "type":"table_logstor_info", + "nickname":"logstor_info", + "produces":[ + "application/json" + ], + "parameters":[ + { + "name":"keyspace", + "description":"The keyspace", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"query" + }, + { + "name":"table", + "description":"table name", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"query" + } + ] + } + ] + }, { "path":"/storage_service/retrain_dict", "operations":[ @@ -3676,6 +3708,47 @@ } } }, + "logstor_hist_bucket":{ + "id":"logstor_hist_bucket", + "properties":{ + "bucket":{ + "type":"long" + }, + "count":{ + "type":"long" + }, + "min_data_size":{ + "type":"long" + }, + "max_data_size":{ + "type":"long" + } + } + }, + "table_logstor_info":{ + "id":"table_logstor_info", + "description":"Per-table logstor segment distribution", + "properties":{ + "keyspace":{ + "type":"string" + }, + "table":{ + "type":"string" + }, + "compaction_groups":{ + "type":"long" + }, + "segments":{ + "type":"long" + }, + "data_size_histogram":{ + "type":"array", + "items":{ + "$ref":"logstor_hist_bucket" + } + } + } + }, "tablet_repair_result":{ "id":"tablet_repair_result", "description":"Tablet repair result", diff --git a/api/storage_service.cc b/api/storage_service.cc index 0bd0bd9a92..a7cac68dee 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -1575,6 +1575,76 @@ rest_sstable_info(http_context& ctx, std::unique_ptr req) { }); } +static +future +rest_logstor_info(http_context& ctx, std::unique_ptr req) { + auto keyspace = api::req_param(*req, "keyspace", {}).value; + auto table = api::req_param(*req, "table", {}).value; + if (table.empty()) { + table = api::req_param(*req, "cf", {}).value; + } + + if (keyspace.empty()) { + throw bad_param_exception("The query parameter 'keyspace' is required"); + } + if (table.empty()) { + throw bad_param_exception("The query parameter 'table' is required"); + } + + keyspace = validate_keyspace(ctx, keyspace); + auto tid = validate_table(ctx.db.local(), keyspace, table); + + auto& cf = ctx.db.local().find_column_family(tid); + if (!cf.uses_kv_storage()) { + throw bad_param_exception(fmt::format("Table {}.{} does not use logstor", keyspace, table)); + } + + return do_with(replica::logstor::table_segment_stats{}, [keyspace = std::move(keyspace), table = std::move(table), tid, &ctx] (replica::logstor::table_segment_stats& merged_stats) { + return ctx.db.map_reduce([&merged_stats](replica::logstor::table_segment_stats&& shard_stats) { + merged_stats.compaction_group_count += shard_stats.compaction_group_count; + merged_stats.segment_count += shard_stats.segment_count; + + for (auto& bucket : shard_stats.histogram) { + auto bucket_it = std::find_if(merged_stats.histogram.begin(), merged_stats.histogram.end(), [&bucket] (const replica::logstor::table_segment_histogram_bucket& existing) { + return existing.bucket == bucket.bucket; + }); + + if (bucket_it == merged_stats.histogram.end()) { + merged_stats.histogram.push_back(std::move(bucket)); + continue; + } + + bucket_it->count += bucket.count; + bucket_it->min_data_size = std::min(bucket_it->min_data_size, bucket.min_data_size); + bucket_it->max_data_size = std::max(bucket_it->max_data_size, bucket.max_data_size); + } + }, [tid](const replica::database& db) { + return db.get_logstor_table_segment_stats(tid); + }).then([&merged_stats, keyspace = std::move(keyspace), table = std::move(table)] { + std::ranges::sort(merged_stats.histogram, [] (const replica::logstor::table_segment_histogram_bucket& left, const replica::logstor::table_segment_histogram_bucket& right) { + return left.bucket < right.bucket; + }); + + ss::table_logstor_info result; + result.keyspace = keyspace; + result.table = table; + result.compaction_groups = merged_stats.compaction_group_count; + result.segments = merged_stats.segment_count; + + for (const auto& bucket : merged_stats.histogram) { + ss::logstor_hist_bucket hist; + hist.bucket = bucket.bucket; + hist.count = bucket.count; + hist.min_data_size = bucket.min_data_size; + hist.max_data_size = bucket.max_data_size; + result.data_size_histogram.push(std::move(hist)); + } + + return make_ready_future(stream_object(result)); + }); + }); +} + static future rest_reload_raft_topology_state(sharded& ss, service::raft_group0_client& group0_client, std::unique_ptr req) { @@ -1872,6 +1942,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded database::trigger_logstor_barrier() { return make_ready_future<>(); } +future database::get_logstor_table_segment_stats(table_id table) const { + if (!_logstor) { + return make_ready_future(logstor::table_segment_stats{}); + } + return _logstor->get_table_segment_stats(table); +} + future<> database::snapshot_table_on_all_shards(sharded& sharded_db, table_id uuid, sstring tag, db::snapshot_options opts) { if (!opts.skip_flush) { co_await flush_table_on_all_shards(sharded_db, uuid); diff --git a/replica/database.hh b/replica/database.hh index aab17f3aa2..5e13b96cef 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -2023,6 +2023,7 @@ public: future<> trigger_logstor_compaction(bool major); static future<> trigger_logstor_barrier_on_all_shards(sharded& sharded_db); future<> trigger_logstor_barrier(); + future get_logstor_table_segment_stats(table_id table) const; static future get_all_tables_flushed_at(sharded& sharded_db); diff --git a/replica/logstor/logstor.cc b/replica/logstor/logstor.cc index 5c6744805b..5e2c4def9a 100644 --- a/replica/logstor/logstor.cc +++ b/replica/logstor/logstor.cc @@ -76,6 +76,10 @@ future<> logstor::truncate_table(table_id tid) { return _segment_manager.truncate_table(tid); } +future logstor::get_table_segment_stats(table_id tid) const { + return _segment_manager.get_table_segment_stats(tid); +} + future<> logstor::write(const mutation& m, group_id group) { auto key = calculate_key(*m.schema(), m.decorated_key()); diff --git a/replica/logstor/logstor.hh b/replica/logstor/logstor.hh index 8d9017c9b1..3605d94c2b 100644 --- a/replica/logstor/logstor.hh +++ b/replica/logstor/logstor.hh @@ -60,6 +60,8 @@ public: future<> truncate_table(table_id); + future get_table_segment_stats(table_id) const; + static index_key calculate_key(const schema&, const dht::decorated_key&); future<> write(const mutation&, group_id); diff --git a/replica/logstor/segment_manager.cc b/replica/logstor/segment_manager.cc index f60652ec40..d01ce9b1bb 100644 --- a/replica/logstor/segment_manager.cc +++ b/replica/logstor/segment_manager.cc @@ -683,6 +683,8 @@ public: const stats& get_stats() const noexcept { return _stats; } + future get_table_segment_stats(table_id table) const; + // compaction group must be set. void add_segment(segment_descriptor& desc) { _compaction_groups[*desc.gid].segment_hist.push(desc); @@ -952,6 +954,10 @@ public: future<> truncate_table(table_id table); + future get_table_segment_stats(table_id table) const { + return _compaction_mgr.get_table_segment_stats(table); + } + private: struct segment_allocation_guard { @@ -2085,6 +2091,55 @@ future<> compaction_manager::abort_separator(separator& sep) { } } +future compaction_manager::get_table_segment_stats(table_id table) const { + table_segment_stats result; + std::map histogram_by_bucket; + + auto it = _compaction_groups.lower_bound(group_id{table, 0}); + while (it != _compaction_groups.end() && it->first.table == table) { + ++result.compaction_group_count; + + const auto& buckets = it->second.segment_hist.buckets(); + for (const auto& bucket : buckets) { + co_await coroutine::maybe_yield(); + + if (bucket.empty()) { + continue; + } + + for (const auto& desc : bucket) { + co_await coroutine::maybe_yield(); + + auto data_size = desc.net_data_size(_sm._cfg.segment_size); + auto bucket = segment_descriptor_hist_options.bucket_of(data_size); + + auto [hist_it, inserted] = histogram_by_bucket.try_emplace(bucket, table_segment_histogram_bucket{ + .bucket = bucket, + .count = 0, + .min_data_size = data_size, + .max_data_size = data_size, + }); + + auto& hist = hist_it->second; + ++hist.count; + hist.min_data_size = std::min(hist.min_data_size, data_size); + hist.max_data_size = std::max(hist.max_data_size, data_size); + ++result.segment_count; + } + } + + ++it; + } + + result.histogram.reserve(histogram_by_bucket.size()); + for (const auto& [_, bucket] : histogram_by_bucket) { + co_await coroutine::maybe_yield(); + result.histogram.push_back(bucket); + } + + co_return std::move(result); +} + std::chrono::microseconds segment_manager_impl::calculate_separator_delay() const { size_t min_debt = _separator_flush_threshold * _cfg.segment_size; size_t debt_target = separator_debt_target * _separator_flush_threshold * _cfg.segment_size; @@ -2343,6 +2398,10 @@ future<> segment_manager::truncate_table(table_id table) { return _impl->truncate_table(table); } +future segment_manager::get_table_segment_stats(table_id table) const { + return _impl->get_table_segment_stats(table); +} + } template<> diff --git a/replica/logstor/segment_manager.hh b/replica/logstor/segment_manager.hh index 6313e84579..f9b45fee99 100644 --- a/replica/logstor/segment_manager.hh +++ b/replica/logstor/segment_manager.hh @@ -40,6 +40,19 @@ struct segment_manager_config { size_t max_separator_memory = 1 * 1024 * 1024; }; +struct table_segment_histogram_bucket { + size_t bucket; + size_t count; + size_t min_data_size; + size_t max_data_size; +}; + +struct table_segment_stats { + size_t compaction_group_count{0}; + size_t segment_count{0}; + std::vector histogram; +}; + class segment_manager_impl; class log_index; @@ -85,6 +98,8 @@ public: future<> truncate_table(table_id); + future get_table_segment_stats(table_id) const; + friend class segment_manager_impl; };