load balancing: extend locator::load_stats to collect tablet sizes

This commit extend the TABLE_LOAD_STATS RPC with data about the tablet
replica sizes and effective disk capacity.
Effective disk capacity of a node is computed as a sum of the sizes of
all tablet replicas on a node and available disk space.

This is the first change in the size based load balancing series.

Closes scylladb/scylladb#26035
This commit is contained in:
Ferenc Szili
2025-05-28 16:08:13 +02:00
committed by Tomasz Grabiec
parent 37f59cef04
commit 20aeed1607
7 changed files with 128 additions and 17 deletions

View File

@@ -24,14 +24,27 @@ struct table_load_stats final {
int64_t split_ready_seq_number;
};
struct range_based_tablet_id final {
::table_id table;
dht::token_range range;
};
struct load_stats_v1 final {
std::unordered_map<::table_id, locator::table_load_stats> tables;
};
struct tablet_load_stats final {
// Sum of all tablet sizes on a node and available disk space.
uint64_t effective_capacity;
std::unordered_map<locator::range_based_tablet_id, uint64_t> tablet_sizes;
};
struct load_stats {
std::unordered_map<::table_id, locator::table_load_stats> tables;
std::unordered_map<locator::host_id, uint64_t> capacity;
std::unordered_map<locator::host_id, bool> critical_disk_utilization [[version 2025.3]];
std::unordered_map<locator::host_id, locator::tablet_load_stats> tablet_stats [[version 2026.1]];
};
}

View File

@@ -842,6 +842,15 @@ table_load_stats& table_load_stats::operator+=(const table_load_stats& s) noexce
return *this;
}
uint64_t tablet_load_stats::add_tablet_sizes(const tablet_load_stats& tls) {
uint64_t table_sizes_sum = 0;
for (auto& [rb_tid, tablet_size] : tls.tablet_sizes) {
tablet_sizes[rb_tid] = tablet_size;
table_sizes_sum += tablet_size;
}
return table_sizes_sum;
}
load_stats load_stats::from_v1(load_stats_v1&& stats) {
return { .tables = std::move(stats.tables) };
}
@@ -856,10 +865,24 @@ load_stats& load_stats::operator+=(const load_stats& s) {
for (auto& [host, cdu] : s.critical_disk_utilization) {
critical_disk_utilization[host] = cdu;
}
for (auto& [host, tablet_ls] : s.tablet_stats) {
tablet_stats[host].effective_capacity = tablet_ls.effective_capacity;
tablet_stats[host].add_tablet_sizes(tablet_ls);
}
return *this;
}
std::optional<uint64_t> load_stats::get_tablet_size(host_id host, const range_based_tablet_id& rb_tid) const {
if (auto node_i = tablet_stats.find(host); node_i != tablet_stats.end()) {
const tablet_load_stats& tls = node_i->second;
if (auto ts_i = tls.tablet_sizes.find(rb_tid); ts_i != tls.tablet_sizes.end()) {
return ts_i->second;
}
}
tablet_logger.debug("Unable to find tablet size on host: {} for tablet: {}", host, rb_tid);
return std::nullopt;
}
tablet_range_splitter::tablet_range_splitter(schema_ptr schema, const tablet_map& tablets, host_id host, const dht::partition_range_vector& ranges)
: _schema(std::move(schema))
, _ranges(ranges)

View File

@@ -63,6 +63,13 @@ struct global_tablet_id {
auto operator<=>(const global_tablet_id&) const = default;
};
struct range_based_tablet_id {
table_id table;
dht::token_range range;
bool operator==(const range_based_tablet_id&) const = default;
};
struct tablet_replica {
host_id host;
shard_id shard;
@@ -101,6 +108,15 @@ struct hash<locator::global_tablet_id> {
}
};
template<>
struct hash<locator::range_based_tablet_id> {
size_t operator()(const locator::range_based_tablet_id& id) const {
return utils::hash_combine(
std::hash<table_id>()(id.table),
std::hash<dht::token_range>()(id.range));
}
};
}
namespace locator {
@@ -420,6 +436,25 @@ struct load_stats_v1 {
std::unordered_map<table_id, table_load_stats> tables;
};
// This is defined as final in the idl layer to limit the amount of encoded data sent via the RPC
struct tablet_load_stats {
// Sum of all tablet sizes on a node and available disk space.
uint64_t effective_capacity = 0;
std::unordered_map<range_based_tablet_id, uint64_t> tablet_sizes;
// returns the aggregated size of all the tablets added
uint64_t add_tablet_sizes(const tablet_load_stats& tls);
};
// Used as a return value for functions returning both table and tablet stats
struct combined_load_stats {
locator::table_load_stats table_ls;
locator::tablet_load_stats tablet_ls;
};
using tablet_load_stats_map = std::unordered_map<host_id, tablet_load_stats>;
struct load_stats {
std::unordered_map<table_id, table_load_stats> tables;
@@ -429,12 +464,17 @@ struct load_stats {
// Critical disk utilization check for each host.
std::unordered_map<locator::host_id, bool> critical_disk_utilization;
// Size-based load balancing data
tablet_load_stats_map tablet_stats;
static load_stats from_v1(load_stats_v1&&);
load_stats& operator+=(const load_stats& s);
friend load_stats operator+(load_stats a, const load_stats& b) {
return a += b;
}
std::optional<uint64_t> get_tablet_size(host_id host, const range_based_tablet_id& rb_tid) const;
};
using load_stats_v2 = load_stats;
@@ -848,6 +888,13 @@ struct fmt::formatter<locator::tablet_replica> : fmt::formatter<string_view> {
}
};
template <>
struct fmt::formatter<locator::range_based_tablet_id> : fmt::formatter<string_view> {
auto format(const locator::range_based_tablet_id& rb_tid, fmt::format_context& ctx) const {
return fmt::format_to(ctx.out(), "{}:{}", rb_tid.table, rb_tid.range);
}
};
template <>
struct fmt::formatter<locator::tablet_map> : fmt::formatter<string_view> {
auto format(const locator::tablet_map&, fmt::format_context& ctx) const -> decltype(ctx.out());

View File

@@ -428,7 +428,7 @@ public:
virtual storage_group& storage_group_for_token(dht::token) const = 0;
virtual utils::chunked_vector<storage_group_ptr> storage_groups_for_token_range(dht::token_range tr) const = 0;
virtual locator::table_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept = 0;
virtual locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept = 0;
virtual bool all_storage_groups_split() = 0;
virtual future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) = 0;
virtual future<> maybe_split_compaction_group_of(size_t idx) = 0;

View File

@@ -1126,7 +1126,7 @@ public:
// The tablet filter is used to not double account migrating tablets, so it's important that
// only one of pending or leaving replica is accounted based on current migration stage.
locator::table_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept;
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept;
const db::view::stats& get_view_stats() const {
return _view_stats;

View File

@@ -705,12 +705,15 @@ public:
return *_single_sg;
}
locator::table_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)>) const noexcept override {
return locator::table_load_stats{
.size_in_bytes = _single_sg->live_disk_space_used(),
.split_ready_seq_number = std::numeric_limits<locator::resize_decision::seq_number_t>::min()
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)>) const noexcept override {
return locator::combined_load_stats{
.table_ls = locator::table_load_stats{
.size_in_bytes = _single_sg->live_disk_space_used(),
.split_ready_seq_number = std::numeric_limits<locator::resize_decision::seq_number_t>::min()},
.tablet_ls = locator::tablet_load_stats{}
};
}
bool all_storage_groups_split() override { return true; }
future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override { return make_ready_future(); }
future<> maybe_split_compaction_group_of(size_t idx) override { return make_ready_future(); }
@@ -868,7 +871,7 @@ public:
return storage_group_for_id(storage_group_of(token).first);
}
locator::table_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept override;
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept override;
bool all_storage_groups_split() override;
future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override;
future<> maybe_split_compaction_group_of(size_t idx) override;
@@ -2797,20 +2800,28 @@ void table::on_flush_timer() {
});
}
locator::table_load_stats tablet_storage_group_manager::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept {
locator::table_load_stats stats;
stats.split_ready_seq_number = _split_ready_seq_number;
locator::combined_load_stats tablet_storage_group_manager::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept {
locator::table_load_stats table_stats;
table_stats.split_ready_seq_number = _split_ready_seq_number;
locator::tablet_load_stats tablet_stats;
for_each_storage_group([&] (size_t id, storage_group& sg) {
locator::global_tablet_id gid { _t.schema()->id(), locator::tablet_id(id) };
if (tablet_filter(*_tablet_map, gid)) {
stats.size_in_bytes += sg.live_disk_space_used();
const uint64_t tablet_size = sg.live_disk_space_used();
table_stats.size_in_bytes += tablet_size;
const locator::range_based_tablet_id rb_tid {gid.table, _tablet_map->get_token_range(gid.tablet)};
tablet_stats.tablet_sizes[rb_tid] = tablet_size;
}
});
return stats;
return locator::combined_load_stats{
.table_ls = std::move(table_stats),
.tablet_ls = std::move(tablet_stats)
};
}
locator::table_load_stats table::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept {
locator::combined_load_stats table::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept {
return _sg_manager->table_load_stats(std::move(tablet_filter));
}

View File

@@ -7006,9 +7006,13 @@ future<locator::load_stats> storage_service::load_stats_for_tablet_based_tables(
// double accounting (anomaly) in the reported size.
auto tmlock = co_await get_token_metadata_lock();
const locator::host_id this_host = _db.local().get_token_metadata().get_my_id();
uint64_t sum_tablet_sizes = 0;
// Each node combines a per-table load map from all of its shards and returns it to the coordinator.
// So if there are 1k nodes, there will be 1k RPCs in total.
auto load_stats = co_await _db.map_reduce0([&table_ids] (replica::database& db) -> future<locator::load_stats> {
auto load_stats = co_await _db.map_reduce0([&table_ids, &this_host, &sum_tablet_sizes] (replica::database& db) -> future<locator::load_stats> {
locator::load_stats load_stats{};
auto& tables_metadata = db.get_tables_metadata();
@@ -7044,17 +7048,30 @@ future<locator::load_stats> storage_service::load_stats_for_tablet_based_tables(
|| (is_pending && s == locator::read_replica_set_selector::next);
};
load_stats.tables.emplace(id, table->table_load_stats(tablet_filter));
locator::combined_load_stats combined_ls { table->table_load_stats(tablet_filter) };
load_stats.tables.emplace(id, std::move(combined_ls.table_ls));
sum_tablet_sizes += load_stats.tablet_stats[this_host].add_tablet_sizes(combined_ls.tablet_ls);
co_await coroutine::maybe_yield();
}
co_return std::move(load_stats);
}, locator::load_stats{}, std::plus<locator::load_stats>());
auto this_host = _db.local().get_token_metadata().get_my_id();
load_stats.capacity[this_host] = _disk_space_monitor->space().capacity;
load_stats.critical_disk_utilization[this_host] = _disk_space_monitor->disk_utilization() > _db.local().get_config().critical_disk_utilization_level();
const std::filesystem::space_info si = _disk_space_monitor->space();
load_stats.capacity[this_host] = si.capacity;
locator::tablet_load_stats& tls = load_stats.tablet_stats[this_host];
const uint64_t config_capacity = _db.local().get_config().data_file_capacity();
if (config_capacity != 0) {
tls.effective_capacity = config_capacity;
} else {
tls.effective_capacity = si.available + sum_tablet_sizes;
}
co_return std::move(load_stats);
}