tablets: load_balancer: Export metrics

This commit is contained in:
Tomasz Grabiec
2023-07-31 21:03:41 +02:00
parent 67c7aadded
commit d653cbae53

View File

@@ -25,6 +25,80 @@ namespace service {
seastar::logger lblogger("load_balancer");
struct load_balancer_dc_stats {
uint64_t calls = 0;
uint64_t migrations_produced = 0;
uint64_t migrations_skipped = 0;
uint64_t tablets_skipped_node = 0;
uint64_t tablets_skipped_rack = 0;
uint64_t stop_balance = 0;
uint64_t stop_load_inversion = 0;
uint64_t stop_no_candidates = 0;
uint64_t stop_skip_limit = 0;
uint64_t stop_batch_size = 0;
};
struct load_balancer_node_stats {
double load = 0;
};
using dc_name = sstring;
class load_balancer_stats_manager {
std::unordered_map<dc_name, std::unique_ptr<load_balancer_dc_stats>> _dc_stats;
std::unordered_map<host_id, std::unique_ptr<load_balancer_node_stats>> _node_stats;
seastar::metrics::label dc_label{"target_dc"};
seastar::metrics::label node_label{"target_node"};
seastar::metrics::metric_groups _metrics;
void setup_metrics(const dc_name& dc, load_balancer_dc_stats& stats) {
namespace sm = seastar::metrics;
auto dc_lb = dc_label(dc);
_metrics.add_group("load_balancer", {
sm::make_counter("calls", sm::description("number of calls to the load balancer"),
stats.calls)(dc_lb),
sm::make_counter("migrations_produced", sm::description("number of migrations produced by the load balancer"),
stats.migrations_produced)(dc_lb),
sm::make_counter("migrations_skipped", sm::description("number of migrations skipped by the load balancer due to load limits"),
stats.migrations_skipped)(dc_lb),
});
}
void setup_metrics(const dc_name& dc, host_id node, load_balancer_node_stats& stats) {
namespace sm = seastar::metrics;
auto dc_lb = dc_label(dc);
auto node_lb = node_label(node);
_metrics.add_group("load_balancer", {
sm::make_gauge("load", sm::description("node load during last load balancing"),
stats.load)(dc_lb)(node_lb)
});
}
public:
load_balancer_dc_stats& for_dc(const dc_name& dc) {
auto it = _dc_stats.find(dc);
if (it == _dc_stats.end()) {
auto stats = std::make_unique<load_balancer_dc_stats>();
setup_metrics(dc, *stats);
it = _dc_stats.emplace(dc, std::move(stats)).first;
}
return *it->second;
}
load_balancer_node_stats& for_node(const dc_name& dc, host_id node) {
auto it = _node_stats.find(node);
if (it == _node_stats.end()) {
auto stats = std::make_unique<load_balancer_node_stats>();
setup_metrics(dc, node, *stats);
it = _node_stats.emplace(node, std::move(stats)).first;
}
return *it->second;
}
void unregister() {
_metrics.clear();
}
};
/// The algorithm aims to equalize tablet count on each shard.
/// This goal is based on the assumption that every shard has similar processing power and space capacity,
/// and that each tablet has equal consumption of those resources. So by equalizing tablet count per shard we
@@ -164,6 +238,7 @@ class load_balancer {
const size_t max_read_streaming_load = 4;
token_metadata_ptr _tm;
load_balancer_stats_manager& _stats;
private:
tablet_replica_set get_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const {
// We reflect migrations in the load as if they already happened,
@@ -195,9 +270,10 @@ private:
}
public:
load_balancer(token_metadata_ptr tm)
: _tm(std::move(tm)) {
}
load_balancer(token_metadata_ptr tm, load_balancer_stats_manager& stats)
: _tm(std::move(tm))
, _stats(stats)
{ }
future<migration_plan> make_plan() {
const locator::topology& topo = _tm->get_topology();
@@ -214,7 +290,8 @@ public:
co_return std::move(plan);
}
future<migration_plan> make_plan(sstring dc) {
future<migration_plan> make_plan(dc_name dc) {
_stats.for_dc(dc).calls++;
lblogger.info("Examining DC {}", dc);
// Causes load balancer to move some tablet even though load is balanced.
@@ -275,11 +352,13 @@ public:
if (load.avg_load > max_load) {
max_load = load.avg_load;
}
_stats.for_node(dc, host).load = load.avg_load;
}
if (!shuffle && max_load == min_load) {
// load is balanced.
// TODO: Evaluate and fix intra-node balance.
_stats.for_dc(dc).stop_balance++;
co_return migration_plan();
}
@@ -374,9 +453,15 @@ public:
auto& target_info = nodes[target];
const size_t max_skipped_migrations = target_info.shards.size() * 2;
size_t skipped_migrations = 0;
while (plan.size() < batch_size && !nodes_by_load.empty()) {
while (plan.size() < batch_size) {
co_await coroutine::maybe_yield();
if (nodes_by_load.empty()) {
lblogger.debug("No more candidate nodes");
_stats.for_dc(dc).stop_no_candidates++;
break;
}
std::pop_heap(nodes_by_load.begin(), nodes_by_load.end(), nodes_cmp);
auto src_host = nodes_by_load.back();
auto& src_node_info = nodes[src_host];
@@ -393,6 +478,7 @@ public:
// to handle the case of off-candidates being empty. In that case, max_off_candidate_load is 0.
if (std::max(max_off_candidate_load, src_node_info.avg_load) == target_info.avg_load) {
lblogger.debug("Balance achieved.");
_stats.for_dc(dc).stop_balance++;
break;
}
@@ -401,6 +487,7 @@ public:
if (src_node_info.avg_load <= target_info.avg_load) {
lblogger.debug("No more candidate nodes. Next candidate is {} with avg_load={}, target's avg_load={}",
src_host, src_node_info.avg_load, target_info.avg_load);
_stats.for_dc(dc).stop_no_candidates++;
break;
}
@@ -410,6 +497,7 @@ public:
lblogger.debug("No more candidate nodes, load would be inverted. Next candidate is {} with "
"avg_load={}, target's avg_load={}",
src_host, src_node_info.avg_load, target_info.avg_load);
_stats.for_dc(dc).stop_load_inversion++;
break;
}
}
@@ -461,6 +549,7 @@ public:
}
if (has_replica_on_target) {
_stats.for_dc(dc).tablets_skipped_node++;
lblogger.debug("candidate tablet {} skipped because it has a replica on target node", source_tablet);
continue;
}
@@ -473,6 +562,7 @@ public:
if (new_rack_load > max_rack_load) {
lblogger.debug("candidate tablet {} skipped because it would increase load on rack {} to {}, max={}",
source_tablet, target_node->dc_rack().rack, new_rack_load, max_rack_load);
_stats.for_dc(dc).tablets_skipped_rack++;
continue;
}
}
@@ -485,6 +575,7 @@ public:
target_info.shards[dst.shard].streaming_write_load += 1;
src_node_info.shards[src_shard].streaming_read_load += 1;
lblogger.debug("Adding migration: {}", mig);
_stats.for_dc(dc).migrations_produced++;
plan.push_back(std::move(mig));
} else {
// Shards are overloaded with streaming. Do not include the migration in the plan, but
@@ -497,8 +588,10 @@ public:
src_node_info.shards[src_shard].streaming_read_load,
target_info.shards[dst.shard].streaming_write_load);
skipped_migrations++;
_stats.for_dc(dc).migrations_skipped++;
if (skipped_migrations >= max_skipped_migrations) {
lblogger.debug("Too many migrations skipped, aborting balancing");
_stats.for_dc(dc).stop_skip_limit++;
break;
}
}
@@ -520,6 +613,10 @@ public:
}
}
if (plan.size() == batch_size) {
_stats.for_dc(dc).stop_batch_size++;
}
if (plan.empty()) {
// Due to replica collocation constraints, it may not be possible to balance the cluster evenly.
// For example, if nodes have different number of shards. Nodes which have more shards will be
@@ -546,6 +643,7 @@ class tablet_allocator_impl : public tablet_allocator::impl
, public service::migration_listener::empty_listener {
service::migration_notifier& _migration_notifier;
replica::database& _db;
load_balancer_stats_manager _load_balancer_stats;
bool _stopped = false;
public:
tablet_allocator_impl(service::migration_notifier& mn, replica::database& db)
@@ -568,7 +666,7 @@ public:
}
future<migration_plan> balance_tablets(token_metadata_ptr tm) {
load_balancer lb(tm);
load_balancer lb(tm, _load_balancer_stats);
co_return co_await lb.make_plan();
}