diff --git a/idl/storage_service.idl.hh b/idl/storage_service.idl.hh index 604264fa1a..78cae22440 100644 --- a/idl/storage_service.idl.hh +++ b/idl/storage_service.idl.hh @@ -24,14 +24,27 @@ struct table_load_stats final { int64_t split_ready_seq_number; }; +struct range_based_tablet_id final { + ::table_id table; + dht::token_range range; +}; + struct load_stats_v1 final { std::unordered_map<::table_id, locator::table_load_stats> tables; }; +struct tablet_load_stats final { + // Sum of all tablet sizes on a node and available disk space. + uint64_t effective_capacity; + + std::unordered_map tablet_sizes; +}; + struct load_stats { std::unordered_map<::table_id, locator::table_load_stats> tables; std::unordered_map capacity; std::unordered_map critical_disk_utilization [[version 2025.3]]; + std::unordered_map tablet_stats [[version 2026.1]]; }; } diff --git a/locator/tablets.cc b/locator/tablets.cc index fd2f8f3d3b..110f2cee8b 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -842,6 +842,15 @@ table_load_stats& table_load_stats::operator+=(const table_load_stats& s) noexce return *this; } +uint64_t tablet_load_stats::add_tablet_sizes(const tablet_load_stats& tls) { + uint64_t table_sizes_sum = 0; + for (auto& [rb_tid, tablet_size] : tls.tablet_sizes) { + tablet_sizes[rb_tid] = tablet_size; + table_sizes_sum += tablet_size; + } + return table_sizes_sum; +} + load_stats load_stats::from_v1(load_stats_v1&& stats) { return { .tables = std::move(stats.tables) }; } @@ -856,10 +865,24 @@ load_stats& load_stats::operator+=(const load_stats& s) { for (auto& [host, cdu] : s.critical_disk_utilization) { critical_disk_utilization[host] = cdu; } - + for (auto& [host, tablet_ls] : s.tablet_stats) { + tablet_stats[host].effective_capacity = tablet_ls.effective_capacity; + tablet_stats[host].add_tablet_sizes(tablet_ls); + } return *this; } +std::optional load_stats::get_tablet_size(host_id host, const range_based_tablet_id& rb_tid) const { + if (auto node_i = tablet_stats.find(host); node_i != tablet_stats.end()) { + const tablet_load_stats& tls = node_i->second; + if (auto ts_i = tls.tablet_sizes.find(rb_tid); ts_i != tls.tablet_sizes.end()) { + return ts_i->second; + } + } + tablet_logger.debug("Unable to find tablet size on host: {} for tablet: {}", host, rb_tid); + return std::nullopt; +} + tablet_range_splitter::tablet_range_splitter(schema_ptr schema, const tablet_map& tablets, host_id host, const dht::partition_range_vector& ranges) : _schema(std::move(schema)) , _ranges(ranges) diff --git a/locator/tablets.hh b/locator/tablets.hh index e70b82775b..d3f5e60b06 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -63,6 +63,13 @@ struct global_tablet_id { auto operator<=>(const global_tablet_id&) const = default; }; +struct range_based_tablet_id { + table_id table; + dht::token_range range; + + bool operator==(const range_based_tablet_id&) const = default; +}; + struct tablet_replica { host_id host; shard_id shard; @@ -101,6 +108,15 @@ struct hash { } }; +template<> +struct hash { + size_t operator()(const locator::range_based_tablet_id& id) const { + return utils::hash_combine( + std::hash()(id.table), + std::hash()(id.range)); + } +}; + } namespace locator { @@ -420,6 +436,25 @@ struct load_stats_v1 { std::unordered_map tables; }; +// This is defined as final in the idl layer to limit the amount of encoded data sent via the RPC +struct tablet_load_stats { + // Sum of all tablet sizes on a node and available disk space. + uint64_t effective_capacity = 0; + + std::unordered_map tablet_sizes; + + // returns the aggregated size of all the tablets added + uint64_t add_tablet_sizes(const tablet_load_stats& tls); +}; + +// Used as a return value for functions returning both table and tablet stats +struct combined_load_stats { + locator::table_load_stats table_ls; + locator::tablet_load_stats tablet_ls; +}; + +using tablet_load_stats_map = std::unordered_map; + struct load_stats { std::unordered_map tables; @@ -429,12 +464,17 @@ struct load_stats { // Critical disk utilization check for each host. std::unordered_map critical_disk_utilization; + // Size-based load balancing data + tablet_load_stats_map tablet_stats; + static load_stats from_v1(load_stats_v1&&); load_stats& operator+=(const load_stats& s); friend load_stats operator+(load_stats a, const load_stats& b) { return a += b; } + + std::optional get_tablet_size(host_id host, const range_based_tablet_id& rb_tid) const; }; using load_stats_v2 = load_stats; @@ -848,6 +888,13 @@ struct fmt::formatter : fmt::formatter { } }; +template <> +struct fmt::formatter : fmt::formatter { + auto format(const locator::range_based_tablet_id& rb_tid, fmt::format_context& ctx) const { + return fmt::format_to(ctx.out(), "{}:{}", rb_tid.table, rb_tid.range); + } +}; + template <> struct fmt::formatter : fmt::formatter { auto format(const locator::tablet_map&, fmt::format_context& ctx) const -> decltype(ctx.out()); diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index 60158038b5..c03f281168 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -428,7 +428,7 @@ public: virtual storage_group& storage_group_for_token(dht::token) const = 0; virtual utils::chunked_vector storage_groups_for_token_range(dht::token_range tr) const = 0; - virtual locator::table_load_stats table_load_stats(std::function tablet_filter) const noexcept = 0; + virtual locator::combined_load_stats table_load_stats(std::function tablet_filter) const noexcept = 0; virtual bool all_storage_groups_split() = 0; virtual future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) = 0; virtual future<> maybe_split_compaction_group_of(size_t idx) = 0; diff --git a/replica/database.hh b/replica/database.hh index 63623939c7..8da00d9fd7 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1126,7 +1126,7 @@ public: // The tablet filter is used to not double account migrating tablets, so it's important that // only one of pending or leaving replica is accounted based on current migration stage. - locator::table_load_stats table_load_stats(std::function tablet_filter) const noexcept; + locator::combined_load_stats table_load_stats(std::function tablet_filter) const noexcept; const db::view::stats& get_view_stats() const { return _view_stats; diff --git a/replica/table.cc b/replica/table.cc index 1b9ea6dfeb..6dc1d1418a 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -705,12 +705,15 @@ public: return *_single_sg; } - locator::table_load_stats table_load_stats(std::function) const noexcept override { - return locator::table_load_stats{ - .size_in_bytes = _single_sg->live_disk_space_used(), - .split_ready_seq_number = std::numeric_limits::min() + locator::combined_load_stats table_load_stats(std::function) const noexcept override { + return locator::combined_load_stats{ + .table_ls = locator::table_load_stats{ + .size_in_bytes = _single_sg->live_disk_space_used(), + .split_ready_seq_number = std::numeric_limits::min()}, + .tablet_ls = locator::tablet_load_stats{} }; } + bool all_storage_groups_split() override { return true; } future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override { return make_ready_future(); } future<> maybe_split_compaction_group_of(size_t idx) override { return make_ready_future(); } @@ -868,7 +871,7 @@ public: return storage_group_for_id(storage_group_of(token).first); } - locator::table_load_stats table_load_stats(std::function tablet_filter) const noexcept override; + locator::combined_load_stats table_load_stats(std::function tablet_filter) const noexcept override; bool all_storage_groups_split() override; future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override; future<> maybe_split_compaction_group_of(size_t idx) override; @@ -2797,20 +2800,28 @@ void table::on_flush_timer() { }); } -locator::table_load_stats tablet_storage_group_manager::table_load_stats(std::function tablet_filter) const noexcept { - locator::table_load_stats stats; - stats.split_ready_seq_number = _split_ready_seq_number; +locator::combined_load_stats tablet_storage_group_manager::table_load_stats(std::function tablet_filter) const noexcept { + locator::table_load_stats table_stats; + table_stats.split_ready_seq_number = _split_ready_seq_number; + + locator::tablet_load_stats tablet_stats; for_each_storage_group([&] (size_t id, storage_group& sg) { locator::global_tablet_id gid { _t.schema()->id(), locator::tablet_id(id) }; if (tablet_filter(*_tablet_map, gid)) { - stats.size_in_bytes += sg.live_disk_space_used(); + const uint64_t tablet_size = sg.live_disk_space_used(); + table_stats.size_in_bytes += tablet_size; + const locator::range_based_tablet_id rb_tid {gid.table, _tablet_map->get_token_range(gid.tablet)}; + tablet_stats.tablet_sizes[rb_tid] = tablet_size; } }); - return stats; + return locator::combined_load_stats{ + .table_ls = std::move(table_stats), + .tablet_ls = std::move(tablet_stats) + }; } -locator::table_load_stats table::table_load_stats(std::function tablet_filter) const noexcept { +locator::combined_load_stats table::table_load_stats(std::function tablet_filter) const noexcept { return _sg_manager->table_load_stats(std::move(tablet_filter)); } diff --git a/service/storage_service.cc b/service/storage_service.cc index c988bd5fe8..b921ca58dd 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -7006,9 +7006,13 @@ future storage_service::load_stats_for_tablet_based_tables( // double accounting (anomaly) in the reported size. auto tmlock = co_await get_token_metadata_lock(); + const locator::host_id this_host = _db.local().get_token_metadata().get_my_id(); + + uint64_t sum_tablet_sizes = 0; + // Each node combines a per-table load map from all of its shards and returns it to the coordinator. // So if there are 1k nodes, there will be 1k RPCs in total. - auto load_stats = co_await _db.map_reduce0([&table_ids] (replica::database& db) -> future { + auto load_stats = co_await _db.map_reduce0([&table_ids, &this_host, &sum_tablet_sizes] (replica::database& db) -> future { locator::load_stats load_stats{}; auto& tables_metadata = db.get_tables_metadata(); @@ -7044,17 +7048,30 @@ future storage_service::load_stats_for_tablet_based_tables( || (is_pending && s == locator::read_replica_set_selector::next); }; - load_stats.tables.emplace(id, table->table_load_stats(tablet_filter)); + locator::combined_load_stats combined_ls { table->table_load_stats(tablet_filter) }; + load_stats.tables.emplace(id, std::move(combined_ls.table_ls)); + sum_tablet_sizes += load_stats.tablet_stats[this_host].add_tablet_sizes(combined_ls.tablet_ls); + co_await coroutine::maybe_yield(); } co_return std::move(load_stats); }, locator::load_stats{}, std::plus()); - auto this_host = _db.local().get_token_metadata().get_my_id(); load_stats.capacity[this_host] = _disk_space_monitor->space().capacity; load_stats.critical_disk_utilization[this_host] = _disk_space_monitor->disk_utilization() > _db.local().get_config().critical_disk_utilization_level(); + const std::filesystem::space_info si = _disk_space_monitor->space(); + load_stats.capacity[this_host] = si.capacity; + + locator::tablet_load_stats& tls = load_stats.tablet_stats[this_host]; + const uint64_t config_capacity = _db.local().get_config().data_file_capacity(); + if (config_capacity != 0) { + tls.effective_capacity = config_capacity; + } else { + tls.effective_capacity = si.available + sum_tablet_sizes; + } + co_return std::move(load_stats); }