From d653cbae53ae3a8b3ea97cd78f88a4dfc09047ed Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Mon, 31 Jul 2023 21:03:41 +0200 Subject: [PATCH] tablets: load_balancer: Export metrics --- service/tablet_allocator.cc | 110 ++++++++++++++++++++++++++++++++++-- 1 file changed, 104 insertions(+), 6 deletions(-) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index c57b79178c..a95d47d2b4 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -25,6 +25,80 @@ namespace service { seastar::logger lblogger("load_balancer"); +struct load_balancer_dc_stats { + uint64_t calls = 0; + uint64_t migrations_produced = 0; + uint64_t migrations_skipped = 0; + uint64_t tablets_skipped_node = 0; + uint64_t tablets_skipped_rack = 0; + uint64_t stop_balance = 0; + uint64_t stop_load_inversion = 0; + uint64_t stop_no_candidates = 0; + uint64_t stop_skip_limit = 0; + uint64_t stop_batch_size = 0; +}; + +struct load_balancer_node_stats { + double load = 0; +}; + +using dc_name = sstring; + +class load_balancer_stats_manager { + std::unordered_map> _dc_stats; + std::unordered_map> _node_stats; + seastar::metrics::label dc_label{"target_dc"}; + seastar::metrics::label node_label{"target_node"}; + seastar::metrics::metric_groups _metrics; + + void setup_metrics(const dc_name& dc, load_balancer_dc_stats& stats) { + namespace sm = seastar::metrics; + auto dc_lb = dc_label(dc); + _metrics.add_group("load_balancer", { + sm::make_counter("calls", sm::description("number of calls to the load balancer"), + stats.calls)(dc_lb), + sm::make_counter("migrations_produced", sm::description("number of migrations produced by the load balancer"), + stats.migrations_produced)(dc_lb), + sm::make_counter("migrations_skipped", sm::description("number of migrations skipped by the load balancer due to load limits"), + stats.migrations_skipped)(dc_lb), + }); + } + + void setup_metrics(const dc_name& dc, host_id node, load_balancer_node_stats& stats) { + namespace sm = seastar::metrics; + auto dc_lb = dc_label(dc); + auto node_lb = node_label(node); + _metrics.add_group("load_balancer", { + sm::make_gauge("load", sm::description("node load during last load balancing"), + stats.load)(dc_lb)(node_lb) + }); + } +public: + load_balancer_dc_stats& for_dc(const dc_name& dc) { + auto it = _dc_stats.find(dc); + if (it == _dc_stats.end()) { + auto stats = std::make_unique(); + setup_metrics(dc, *stats); + it = _dc_stats.emplace(dc, std::move(stats)).first; + } + return *it->second; + } + + load_balancer_node_stats& for_node(const dc_name& dc, host_id node) { + auto it = _node_stats.find(node); + if (it == _node_stats.end()) { + auto stats = std::make_unique(); + setup_metrics(dc, node, *stats); + it = _node_stats.emplace(node, std::move(stats)).first; + } + return *it->second; + } + + void unregister() { + _metrics.clear(); + } +}; + /// The algorithm aims to equalize tablet count on each shard. /// This goal is based on the assumption that every shard has similar processing power and space capacity, /// and that each tablet has equal consumption of those resources. So by equalizing tablet count per shard we @@ -164,6 +238,7 @@ class load_balancer { const size_t max_read_streaming_load = 4; token_metadata_ptr _tm; + load_balancer_stats_manager& _stats; private: tablet_replica_set get_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const { // We reflect migrations in the load as if they already happened, @@ -195,9 +270,10 @@ private: } public: - load_balancer(token_metadata_ptr tm) - : _tm(std::move(tm)) { - } + load_balancer(token_metadata_ptr tm, load_balancer_stats_manager& stats) + : _tm(std::move(tm)) + , _stats(stats) + { } future make_plan() { const locator::topology& topo = _tm->get_topology(); @@ -214,7 +290,8 @@ public: co_return std::move(plan); } - future make_plan(sstring dc) { + future make_plan(dc_name dc) { + _stats.for_dc(dc).calls++; lblogger.info("Examining DC {}", dc); // Causes load balancer to move some tablet even though load is balanced. @@ -275,11 +352,13 @@ public: if (load.avg_load > max_load) { max_load = load.avg_load; } + _stats.for_node(dc, host).load = load.avg_load; } if (!shuffle && max_load == min_load) { // load is balanced. // TODO: Evaluate and fix intra-node balance. + _stats.for_dc(dc).stop_balance++; co_return migration_plan(); } @@ -374,9 +453,15 @@ public: auto& target_info = nodes[target]; const size_t max_skipped_migrations = target_info.shards.size() * 2; size_t skipped_migrations = 0; - while (plan.size() < batch_size && !nodes_by_load.empty()) { + while (plan.size() < batch_size) { co_await coroutine::maybe_yield(); + if (nodes_by_load.empty()) { + lblogger.debug("No more candidate nodes"); + _stats.for_dc(dc).stop_no_candidates++; + break; + } + std::pop_heap(nodes_by_load.begin(), nodes_by_load.end(), nodes_cmp); auto src_host = nodes_by_load.back(); auto& src_node_info = nodes[src_host]; @@ -393,6 +478,7 @@ public: // to handle the case of off-candidates being empty. In that case, max_off_candidate_load is 0. if (std::max(max_off_candidate_load, src_node_info.avg_load) == target_info.avg_load) { lblogger.debug("Balance achieved."); + _stats.for_dc(dc).stop_balance++; break; } @@ -401,6 +487,7 @@ public: if (src_node_info.avg_load <= target_info.avg_load) { lblogger.debug("No more candidate nodes. Next candidate is {} with avg_load={}, target's avg_load={}", src_host, src_node_info.avg_load, target_info.avg_load); + _stats.for_dc(dc).stop_no_candidates++; break; } @@ -410,6 +497,7 @@ public: lblogger.debug("No more candidate nodes, load would be inverted. Next candidate is {} with " "avg_load={}, target's avg_load={}", src_host, src_node_info.avg_load, target_info.avg_load); + _stats.for_dc(dc).stop_load_inversion++; break; } } @@ -461,6 +549,7 @@ public: } if (has_replica_on_target) { + _stats.for_dc(dc).tablets_skipped_node++; lblogger.debug("candidate tablet {} skipped because it has a replica on target node", source_tablet); continue; } @@ -473,6 +562,7 @@ public: if (new_rack_load > max_rack_load) { lblogger.debug("candidate tablet {} skipped because it would increase load on rack {} to {}, max={}", source_tablet, target_node->dc_rack().rack, new_rack_load, max_rack_load); + _stats.for_dc(dc).tablets_skipped_rack++; continue; } } @@ -485,6 +575,7 @@ public: target_info.shards[dst.shard].streaming_write_load += 1; src_node_info.shards[src_shard].streaming_read_load += 1; lblogger.debug("Adding migration: {}", mig); + _stats.for_dc(dc).migrations_produced++; plan.push_back(std::move(mig)); } else { // Shards are overloaded with streaming. Do not include the migration in the plan, but @@ -497,8 +588,10 @@ public: src_node_info.shards[src_shard].streaming_read_load, target_info.shards[dst.shard].streaming_write_load); skipped_migrations++; + _stats.for_dc(dc).migrations_skipped++; if (skipped_migrations >= max_skipped_migrations) { lblogger.debug("Too many migrations skipped, aborting balancing"); + _stats.for_dc(dc).stop_skip_limit++; break; } } @@ -520,6 +613,10 @@ public: } } + if (plan.size() == batch_size) { + _stats.for_dc(dc).stop_batch_size++; + } + if (plan.empty()) { // Due to replica collocation constraints, it may not be possible to balance the cluster evenly. // For example, if nodes have different number of shards. Nodes which have more shards will be @@ -546,6 +643,7 @@ class tablet_allocator_impl : public tablet_allocator::impl , public service::migration_listener::empty_listener { service::migration_notifier& _migration_notifier; replica::database& _db; + load_balancer_stats_manager _load_balancer_stats; bool _stopped = false; public: tablet_allocator_impl(service::migration_notifier& mn, replica::database& db) @@ -568,7 +666,7 @@ public: } future balance_tablets(token_metadata_ptr tm) { - load_balancer lb(tm); + load_balancer lb(tm, _load_balancer_stats); co_return co_await lb.make_plan(); }