/*
 * Copyright (C) 2023-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#include "cql3/statements/ks_prop_defs.hh"
#include "db/system_keyspace.hh"
#include "locator/tablets.hh"
#include "locator/topology.hh"
#include "replica/tablets.hh"
#include "locator/tablet_replication_strategy.hh"
#include "replica/database.hh"
#include "service/migration_listener.hh"
#include "service/tablet_allocator.hh"
#include "utils/UUID.hh"
#include "utils/assert.hh"
#include "utils/error_injection.hh"
#include "utils/stall_free.hh"
#include "utils/overloaded_functor.hh"
#include "db/config.hh"
#include "db/tablet_options.hh"
#include "locator/load_sketch.hh"
#include "gms/feature_service.hh"

// System and third-party headers inferred from usage in this file
// (metrics, coroutines, defer, absl maps, fmt ranges, std ranges/variant, std::ceil).
#include <cmath>
#include <ranges>
#include <variant>
#include <absl/container/flat_hash_map.h>
#include <fmt/ranges.h>
#include <seastar/core/coroutine.hh>
#include <seastar/core/metrics.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/util/defer.hh>

using namespace locator;
using namespace replica;

namespace service {

seastar::logger lblogger("load_balancer");

void load_balancer_stats_manager::setup_metrics(const dc_name& dc, load_balancer_dc_stats& stats) {
    namespace sm = seastar::metrics;
    auto dc_lb = dc_label(dc);
    _metrics.add_group(group_name, {
        sm::make_counter("calls", sm::description("number of calls to the load balancer"),
                         stats.calls)(dc_lb),
        sm::make_counter("migrations_produced", sm::description("number of migrations produced by the load balancer"),
                         stats.migrations_produced)(dc_lb),
        sm::make_counter("migrations_skipped", sm::description("number of migrations skipped by the load balancer due to load limits"),
                         stats.migrations_skipped)(dc_lb),
        sm::make_counter("cross_rack_collocations", sm::description("number of co-locating migrations which move replica across racks"),
                         stats.cross_rack_collocations)(dc_lb),
    });
}

void load_balancer_stats_manager::setup_metrics(const dc_name& dc, host_id node, load_balancer_node_stats& stats) {
    namespace sm = seastar::metrics;
    auto dc_lb = dc_label(dc);
    auto node_lb = node_label(node);
    _metrics.add_group(group_name, {
        sm::make_gauge("load", sm::description("node load during last load balancing"),
                       stats.load)(dc_lb)(node_lb)
    });
}

void load_balancer_stats_manager::setup_metrics(load_balancer_cluster_stats& stats) {
    namespace sm = seastar::metrics;
    // FIXME: we can probably improve it by making it per resize type (split, merge or none).
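    // A possible shape for that FIXME (illustrative sketch only; `resize_type_label`
    // and the per-type stats fields are hypothetical, not existing APIs):
    //
    //     auto split_lb = resize_type_label("split");
    //     sm::make_counter("resizes_emitted", sm::description("..."), stats.split_resizes_emitted)(split_lb),
    //
    // i.e. one counter instance per resize type, analogous to how dc_label()/node_label()
    // are applied to the per-DC and per-node stats above.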
    _metrics.add_group(group_name, {
        sm::make_counter("resizes_emitted", sm::description("number of resizes produced by the load balancer"),
                         stats.resizes_emitted),
        sm::make_counter("resizes_revoked", sm::description("number of resizes revoked by the load balancer"),
                         stats.resizes_revoked),
        sm::make_counter("resizes_finalized", sm::description("number of resizes finalized by the load balancer"),
                         stats.resizes_finalized),
        sm::make_counter("auto_repair_needs_repair_nr", sm::description("number of tablets with auto repair enabled that currently need repair"),
                         stats.auto_repair_needs_repair_nr),
        sm::make_counter("auto_repair_enabled_nr", sm::description("number of tablets with auto repair enabled"),
                         stats.auto_repair_enabled_nr)
    });
}

load_balancer_stats_manager::load_balancer_stats_manager(sstring group_name):
    group_name(std::move(group_name)) {
    setup_metrics(_cluster_stats);
}

const lw_shared_ptr<load_balancer_dc_stats>& load_balancer_stats_manager::for_dc(const dc_name& dc) {
    auto it = _dc_stats.find(dc);
    if (it == _dc_stats.end()) {
        auto stats = make_lw_shared<load_balancer_dc_stats>();
        setup_metrics(dc, *stats);
        it = _dc_stats.emplace(dc, std::move(stats)).first;
    }
    return it->second;
}

load_balancer_node_stats& load_balancer_stats_manager::for_node(const dc_name& dc, host_id node) {
    auto it = _node_stats.find(node);
    if (it == _node_stats.end()) {
        auto stats = std::make_unique<load_balancer_node_stats>();
        setup_metrics(dc, node, *stats);
        it = _node_stats.emplace(node, std::move(stats)).first;
    }
    return *it->second;
}

load_balancer_cluster_stats& load_balancer_stats_manager::for_cluster() {
    return _cluster_stats;
}

void load_balancer_stats_manager::unregister() {
    _metrics.clear();
}

template <std::ranges::range R>
requires std::convertible_to<std::ranges::range_value_t<R>, db::tablet_options>
db::tablet_options combine_tablet_options(R&& opts) {
    db::tablet_options combined_opts;
    using data_size_type = decltype(db::tablet_options::expected_data_size_in_gb)::value_type;
    data_size_type total_expected_data_size_in_gb = 0;
    size_t total_expected_data_size_in_gb_count = 0;
    for (const auto& opt : opts) {
        if (opt.min_tablet_count) {
            combined_opts.min_tablet_count = std::max(combined_opts.min_tablet_count.value_or(0), *opt.min_tablet_count);
        }
        if (opt.min_per_shard_tablet_count) {
            combined_opts.min_per_shard_tablet_count = std::max(combined_opts.min_per_shard_tablet_count.value_or(0), *opt.min_per_shard_tablet_count);
        }
        if (opt.expected_data_size_in_gb) {
            total_expected_data_size_in_gb += *opt.expected_data_size_in_gb;
            total_expected_data_size_in_gb_count++;
        }
        if (opt.max_tablet_count) {
            if (!combined_opts.max_tablet_count) {
                combined_opts.max_tablet_count = *opt.max_tablet_count;
            } else {
                combined_opts.max_tablet_count = std::min(*combined_opts.max_tablet_count, *opt.max_tablet_count);
            }
        }
    }
    if (total_expected_data_size_in_gb_count) {
        combined_opts.expected_data_size_in_gb = total_expected_data_size_in_gb / total_expected_data_size_in_gb_count;
    }
    return combined_opts;
}

static std::unordered_set<locator::tablet_id> split_string_to_tablet_id(std::string_view s, char delimiter) {
    auto tokens_view = s
        | std::views::split(delimiter)
        | std::views::transform([] (auto&& range) { return std::string_view(&*range.begin(), std::ranges::distance(range)); })
        | std::views::transform([] (std::string_view sv) { return locator::tablet_id(std::stoul(std::string(sv))); });
    return std::unordered_set<locator::tablet_id>{tokens_view.begin(), tokens_view.end()};
}

struct repair_plan {
    locator::global_tablet_id gid;
    locator::tablet_info tinfo;
    dht::token_range range;
    dht::token last_token;
    db_clock::duration repair_time_diff;
    bool is_user_reuqest;
};
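// Illustrative ordering of repair_plan entries (values invented for the example):
//   A{is_user_reuqest=true,  repair_time_diff=10s}
//   B{is_user_reuqest=false, repair_time_diff=3600s}
//   C{is_user_reuqest=false, repair_time_diff=60s}
// make_repair_plan() below sorts these as A, B, C: user-requested repairs come first,
// then auto repairs ordered by how long ago each tablet was last repaired.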
// Used to compare different migration choices with regard to their impact on load imbalance.
// There is a total order on migration_badness such that better migrations are ordered before worse ones.
struct migration_badness {
    double src_shard_badness = 0;
    double src_node_badness = 0;
    double dst_shard_badness = 0;
    double dst_node_badness = 0;
    bool bad;

    migration_badness() : bad(false) {}
    migration_badness(double src_shard_badness, double src_node_badness, double dst_shard_badness, double dst_node_badness)
        : src_shard_badness(src_shard_badness)
        , src_node_badness(src_node_badness)
        , dst_shard_badness(dst_shard_badness)
        , dst_node_badness(dst_node_badness)
        , bad(src_shard_badness > 0 || src_node_badness > 0 || dst_shard_badness > 0 || dst_node_badness > 0)
    { }

    double node_badness() const {
        return std::max(src_node_badness, dst_node_badness);
    }

    double shard_badness() const {
        return std::max(src_shard_badness, dst_shard_badness);
    }

    bool is_bad() const {
        return bad;
    }

    bool operator<(const migration_badness& other) const {
        // Prefer candidates with no across-node badness to those with across-node badness.
        // Then, prefer those with lowest shard badness.
        // We want to balance nodes first as balancing nodes internally between shards is cheap.
        if (node_badness() == other.node_badness()) {
            return shard_badness() < other.shard_badness();
        }
        if (node_badness() > 0 || other.node_badness() > 0) {
            return node_badness() < other.node_badness();
        }
        return shard_badness() < other.shard_badness();
    }

    bool operator==(const migration_badness& other) const = default;
};

struct colocated_tablets {
    global_tablet_id left_tablet;
    global_tablet_id right_tablet;

    auto operator<=>(const colocated_tablets&) const = default;
};

// Represents either a single tablet replica or co-located replicas of sibling
// tablets. The migration tablet set is logically treated by balancer as a single
// candidate. When candidate represents co-located replicas, it means that
// the balancer will work to preserve the co-location by migrating those replicas
// to same destination.
struct migration_tablet_set {
    std::variant<global_tablet_id, colocated_tablets> tablet_s;
    uint64_t tablet_set_disk_size = 0;

    table_id table() const {
        return std::visit(overloaded_functor{
            [] (global_tablet_id t) { return t.table; },
            [] (colocated_tablets t) { return t.left_tablet.table; },
        }, tablet_s);
    }

    using tablet_small_vector = utils::small_vector<global_tablet_id, 2>;

    tablet_small_vector tablets() const {
        return std::visit(overloaded_functor{
            [] (global_tablet_id t) { return tablet_small_vector{t}; },
            [] (colocated_tablets t) { return tablet_small_vector{t.left_tablet, t.right_tablet}; },
        }, tablet_s);
    }

    bool colocated() const {
        return std::holds_alternative<colocated_tablets>(tablet_s);
    }

    bool operator==(const migration_tablet_set& rhs) const {
        return tablet_s == rhs.tablet_s;
    }
};

struct migration_candidate {
    migration_tablet_set tablets;
    tablet_replica src;
    tablet_replica dst;
    migration_badness badness;
};

struct colocation_source {
    locator::global_tablet_id gid;
    locator::tablet_replica replica;
};

using colocation_source_set = utils::chunked_vector<colocation_source>;
using colocation_sources_by_destination_rack = std::unordered_map<locator::endpoint_dc_rack, colocation_source_set>;

struct rack_list_colocation_state {
    colocation_sources_by_destination_rack dst_dc_rack_to_tablets;
    std::unordered_map<locator::endpoint_dc_rack, std::unordered_set<utils::UUID>> dst_to_requests;
    utils::UUID request_to_resume;

    void maybe_set_request_to_resume(const utils::UUID& id) {
        if (!request_to_resume) {
            request_to_resume = id;
        }
    }
};

/// Formattable wrapper for migration_plan, whose formatter prints a short summary of the plan.
struct plan_summary { migration_plan& plan; explicit plan_summary(migration_plan& plan) : plan(plan) {} }; future find_required_rack_list_colocations( replica::database& db, token_metadata_ptr tmptr, db::system_keyspace* sys_ks, const std::unordered_set& paused_rf_change_requests, const std::unordered_set& already_planned_migrations) { rack_list_colocation_state state; auto get_node = [&] (locator::host_id host) -> const locator::node& { auto* node = tmptr->get_topology().find_node(host); if (!node) { on_internal_error(lblogger, format("Node {} not found in topology", host)); } return *node; }; for (const auto& request_id : paused_rf_change_requests) { auto req_entry = co_await sys_ks->get_topology_request_entry(request_id); sstring ks_name = *req_entry.new_keyspace_rf_change_ks_name; if (!db.has_keyspace(ks_name)) { state.maybe_set_request_to_resume(request_id); continue; } auto& ks = db.find_keyspace(ks_name); std::unordered_map saved_ks_props = *req_entry.new_keyspace_rf_change_data; cql3::statements::ks_prop_defs new_ks_props{std::map{saved_ks_props.begin(), saved_ks_props.end()}}; new_ks_props.validate(); auto ks_md = new_ks_props.as_ks_metadata_update(ks.metadata(), *tmptr, db.features(), db.get_config()); auto tables_with_mvs = ks.metadata()->tables(); auto views = ks.metadata()->views(); tables_with_mvs.insert(tables_with_mvs.end(), views.begin(), views.end()); if (tables_with_mvs.empty()) { state.maybe_set_request_to_resume(request_id); continue; } bool no_changes_needed = true; for (const auto& table_or_mv : tables_with_mvs) { if (!tmptr->tablets().is_base_table(table_or_mv->id())) { continue; } const auto& tmap = tmptr->tablets().get_tablet_map(table_or_mv->id()); const auto& new_replication_strategy_config = ks_md->strategy_options(); for (auto& [dc, rf_value] : new_replication_strategy_config) { if (!std::holds_alternative(rf_value)) { continue; } auto racks = std::get(rf_value) | std::ranges::to>(); co_await tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& ti) -> future<> { auto gid = locator::global_tablet_id{table_or_mv->id(), tid}; // Current replicas in this DC. There might be multiple replicas in the same rack. auto dc_replicas = ti.replicas | std::views::filter([&] (const tablet_replica& r) { return get_node(r.host).dc_rack().dc == dc; }) | std::ranges::to>(); if (dc_replicas.empty()) { return make_ready_future<>(); } // Find replicas that are not in the desired racks (src_replicas) // and racks that do not have replicas yet (dst_racks). auto dst_racks = racks; std::vector src_replicas; for (const auto& r : dc_replicas) { auto rack = get_node(r.host).dc_rack().rack; if (dst_racks.find(rack) != dst_racks.end()) { // There is already a replica in this rack. dst_racks.erase(rack); } else { // There is a replica in this rack, but it needs to be moved. src_replicas.push_back(r); } } auto zipped = std::views::zip(src_replicas, dst_racks); if (!std::ranges::empty(zipped)) { no_changes_needed = false; } // Skip tablet that is in transitions. auto* tti = tmap.get_tablet_transition_info(tid); if (tti) { lblogger.debug("Skipped colocation for tablet={} which is already in transition={}", gid, tti->transition); return make_ready_future<>(); } // Skip tablet that is about to be in transition. 
if (already_planned_migrations.contains(gid)) { return make_ready_future<>(); } for (auto src_dst : zipped) { auto src = std::get<0>(src_dst); auto dst = std::get<1>(src_dst); auto endpoint = locator::endpoint_dc_rack{dc, dst}; state.dst_dc_rack_to_tablets[endpoint].emplace_back(colocation_source{{table_or_mv->id(), tid}, src}); state.dst_to_requests[endpoint].insert(request_id); } return make_ready_future<>(); }); } } if (no_changes_needed) { state.maybe_set_request_to_resume(request_id); } } co_return state; } future requires_rack_list_colocation( replica::database& db, locator::token_metadata_ptr tmptr, db::system_keyspace* sys_ks, utils::UUID request_id) { auto res = co_await find_required_rack_list_colocations(db, tmptr, sys_ks, {request_id}, {}); co_return res.request_to_resume != request_id; } } template<> struct fmt::formatter : fmt::formatter { template auto format(const service::migration_badness& badness, FormatContext& ctx) const { return fmt::format_to(ctx.out(), "{{s: {:.4f}, n: {:.4f}}}", badness.shard_badness(), badness.node_badness()); } }; template<> struct fmt::formatter : fmt::formatter { template auto format(const service::migration_tablet_set& tablet_set, FormatContext& ctx) const { if (tablet_set.colocated()) { return fmt::format_to(ctx.out(), "{{colocated: {}}}", tablet_set.tablets()); } return fmt::format_to(ctx.out(), "{}", tablet_set.tablets().front()); } }; template<> struct fmt::formatter : fmt::formatter { template auto format(const service::migration_candidate& candidate, FormatContext& ctx) const { fmt::format_to(ctx.out(), "{{tablet: {}, {} -> {}, badness: {}", candidate.tablets, candidate.src, candidate.dst, candidate.badness); if (candidate.badness.is_bad()) { fmt::format_to(ctx.out(), " (bad!)"); } fmt::format_to(ctx.out(), "}}"); return ctx.out(); } }; template<> struct fmt::formatter : fmt::formatter { template auto format(const service::repair_plan& p, FormatContext& ctx) const { auto diff_seconds = std::chrono::duration(p.repair_time_diff).count(); fmt::format_to(ctx.out(), "{{tablet={} last_token={} is_user_req={} diff_seconds={}}}", p.gid, p.last_token, p.is_user_reuqest, diff_seconds); return ctx.out(); } }; template<> struct fmt::formatter : fmt::formatter { template auto format(const service::plan_summary& p, FormatContext& ctx) const { auto& plan = p.plan; std::string_view delim = ""; auto get_delim = [&] { return std::exchange(delim, ", "); }; if (plan.migrations().size()) { fmt::format_to(ctx.out(), "{}migrations: {}", get_delim(), plan.migrations().size()); } if (plan.repair_plan().repairs().size()) { fmt::format_to(ctx.out(), "{}repairs: {}", get_delim(), plan.repair_plan().repairs().size()); } if (plan.resize_plan().resize.size()) { fmt::format_to(ctx.out(), "{}resize: {}", get_delim(), plan.resize_plan().resize.size()); } if (plan.resize_plan().finalize_resize.size()) { fmt::format_to(ctx.out(), "{}resize-ready: {}", get_delim(), plan.resize_plan().finalize_resize.size()); } if (plan.rack_list_colocation_plan().size()) { fmt::format_to(ctx.out(), "{}rack-list colocation ready: {}", get_delim(), plan.rack_list_colocation_plan().request_to_resume()); } if (delim.empty()) { fmt::format_to(ctx.out(), "empty"); } return ctx.out(); } }; namespace std { using namespace service; template <> struct hash { size_t operator()(const colocated_tablets& id) const { return utils::hash_combine(std::hash()(id.left_tablet), std::hash()(id.right_tablet)); } }; template <> struct hash { size_t operator()(const migration_tablet_set& tablet_set) const { return 
std::hash()(tablet_set.tablet_s); } }; } namespace service { /// The algorithm aims to equalize tablet count on each shard. /// This goal is based on the assumption that every shard has similar processing power and space capacity, /// and that each tablet has equal consumption of those resources. So by equalizing tablet count per shard we /// equalize resource utilization. /// /// The algorithm produces a migration plan which is a set of instructions about which tablets to move /// where. The plan is a small increment, not a complete plan. To achieve balance, the algorithm should /// be invoked iteratively until an empty plan is returned. /// /// The algorithm keeps track of load at two levels, per node and per shard. The reason for this is that /// we want to equalize the per-node score first, by moving tablets across nodes. Tablets are moved away /// from the most loaded node first. We also track load per shard, so that we move tablets from the most /// loaded shard on a given node first. /// /// The metric for node load is (number of tablets / shard count) which is the average /// per-shard load. If we achieve balance according to this metric, and then rebalance the nodes internally, /// we will achieve global balance on all shards in the cluster. /// /// The reason why we focus on nodes first before rebalancing them internally is that this results /// in less tablet movements than looking at shards only. /// /// It would be still beneficial to rebalance tablet-receiving nodes internally before moving tablets /// to them so that we can distribute load equally without overloading shards which are out of balance, /// but this is not implemented yet. /// /// The outline of the inter-node balancing algorithm is as follows: /// /// 1. Determine the set of nodes whose load should be balanced. /// 2. Divide the nodes into two sets, sources and destinations. /// Tablets are only moved from sources to destinations. /// When nodes are drained (e.g. on decommission), the drained nodes are sources and all other /// nodes are destinations. /// During free load balancing, we pick a single destination node which is the least loaded node /// and all other nodes are sources. /// 3. Move tablets from sources to destinations until load order between nodes would get inverted after the movement: /// 3.1. Pick the most-loaded source node (src.host) /// 3.1.1 Pick the most-loaded shard (src.shard) on src.host /// 3.2. Pick the least-loaded destination node (dst.host) /// 3.3. Pick the least-loaded shard (dst.shard) on dst.host /// 3.4. If candidate is not chosen, pick the best candidate tablet on src to move to dst. /// 3.5. If movement impact is bad: /// 3.5.1. Consider moving from other shards on src.host and to other destination hosts and shards. /// Picks the best candidate according to the impact of the movement on load imbalance. /// 3.6. Evaluate collocation constraints for tablet replicas /// 3.6.1. If met, schedule migration /// 3.6.2. If not, add the tablet to the list of skipped tablets on src.host /// /// /// Even though the algorithm focuses on a single target, the fact the the produced plan is just an increment /// means that many under-loaded nodes can be driven forward to balance concurrently because the load balancer /// will alternate between them across make_plan() calls. /// /// The algorithm behaves differently when there are decommissioning nodes which have tablet replicas. /// In this case, we move those tablets away first. The balancing works in the opposite direction. 
/// Rather than picking a single least-loaded target and moving tablets into it from many sources, /// we have a single source and move tablets to multiple targets. This process necessarily disregards /// convergence checks, and the stop condition is that the source is drained. We still take target /// load into consideration and pick least-loaded targets first. When draining is not possible /// because there is no viable new replica for a tablet, load balancing will throw an exception. /// /// After scheduling inter-node migrations, the algorithm schedules intra-node migrations. /// This means that across-node migrations can proceed in parallel with intra-node migrations /// if there is free capacity to carry them out, but across-node migrations have higher priority. /// /// Intra-node migrations are scheduled for each node independently with the aim to equalize /// per-shard tablet count on each node. /// /// If the algorithm is called with active tablet migrations in tablet metadata, those are treated /// by load balancer as if they were already completed. This allows the algorithm to incrementally /// make decision which when executed with active migrations will produce the desired result. /// Overload of shards which still contain migrated-away tablets is limited by the fact /// that the algorithm tracks streaming concurrency on both source and target shards of active /// migrations and takes concurrency limit into account when producing new migrations. /// /// The cost of make_plan() is relatively heavy in terms of preparing data structures, so the current /// implementation is not efficient if the scheduler would like to call make_plan() multiple times /// to parallelize execution. This will be addressed in the future by keeping the data structures /// valid across calls and only recalculating them when starting a new round with a new token metadata version. /// class load_balancer { using global_shard_id = tablet_replica; using shard_id = seastar::shard_id; // Represents metric for load which we want to equalize between shards or nodes. // Load balancer equalizes storage utilization. // In case force_capacity_based_balancing is true, it is assumed that each tablet has equal size and that // shards and nodes can have different capacity. If force_capacity_based_balancing is false, // tablet sizes are fetched from load_stats. // So we equalize: sum of tablet_sizes / capacity_in_bytes. using load_type = double; using table_candidates_map = std::unordered_map>; struct shard_load { size_t tablet_count = 0; std::optional dusage; absl::flat_hash_map tablet_count_per_table; absl::flat_hash_map tablet_sizes_per_table; // Number of tablets which are streamed from this shard. size_t streaming_read_load = 0; // Number of tablets which are streamed to this shard. size_t streaming_write_load = 0; // Tablets which still have a replica on this shard which are candidates for migrating away from this shard. // Grouped by table. Used when _use_table_aware_balancing == true. // The set of candidates per table may be empty. table_candidates_map candidates; // For all tables. Used when _use_table_aware_balancing == false. 
std::unordered_set candidates_all_tables; future<> clear_gently() { co_await utils::clear_gently(candidates); co_await utils::clear_gently(candidates_all_tables); } bool has_candidates() const { for (const auto& [table, tablets] : candidates) { if (!tablets.empty()) { return true; } } return !candidates_all_tables.empty(); } size_t candidate_count() const { size_t result = 0; for (const auto& [table, tablets] : candidates) { result += tablets.size(); } return result + candidates_all_tables.size(); } }; struct skipped_candidate { tablet_replica replica; migration_tablet_set tablets; std::unordered_set viable_targets; }; struct node_load { host_id id; uint64_t shard_count = 0; uint64_t tablet_count = 0; std::optional dusage; // Invariant: bool(dusage) || drained. bool drained = false; bool excluded = false; // Engaged if and only if drained == true. // Determines whether the action is to migrate (leave request) or rebuild (remove request). // Looking at is_excluded() is not sufficient because a node may be marked as excluded during decommission, // and we don't want to silently upgrade it to a remove operation, which accepts a replica loss. std::optional req; const locator::node* node; // never nullptr // The average shard load on this node. // Valid only when "dusage" is set. load_type avg_load = 0; absl::flat_hash_map tablet_count_per_table; absl::flat_hash_map tablet_sizes_per_table; // heap which tracks most-loaded shards using shards_by_load_cmp(). // Valid during intra-node plan-making for nodes which are in the source node set. std::vector shards_by_load; std::vector shards; // Indexed by shard_id to which a given shard_load corresponds. utils::chunked_vector skipped_candidates; std::optional capacity_per_shard() const { return dusage.transform([&] (auto du) { return load_type(du.capacity) / shard_count; }); } const sstring& dc() const { return node->dc_rack().dc; } const sstring& rack() const { return node->dc_rack().rack; } locator::node::state state() const { return node->get_state(); } // Call when tablet_count or capacity changes. void update() { if (auto load = get_avg_load()) { avg_load = *load; } } // Result engaged when !drained. std::optional get_avg_load(uint64_t used_size_delta = 0) const { return dusage.transform([&] (auto du) { du.used += used_size_delta; return du.get_load(); }); } double tablets_per_shard(uint64_t tablets) const { return double(tablets) / shard_count; } double tablets_per_shard() const { return tablets_per_shard(tablet_count); } // Result engaged for !drained nodes. std::optional shard_load(shard_id shard, int64_t used_size_delta = 0) const { return shards[shard].dusage.transform([&] (auto du) { du.used += used_size_delta; return du.get_load(); }); } auto shards_by_load_cmp() { return [this] (const auto& a, const auto& b) { if (dusage) { return shards[a].dusage->get_load() < shards[b].dusage->get_load(); } else { return shards[a].tablet_count < shards[b].tablet_count; } }; } future<> clear_gently() { co_await utils::clear_gently(shards); co_await utils::clear_gently(skipped_candidates); } }; // Data structure used for making load-balancing decisions over a set of nodes. using node_load_map = std::unordered_map; // Less-comparator which orders nodes by load. struct nodes_by_load_cmp { node_load_map& nodes; bool operator()(host_id a, host_id b) const { return nodes[a].avg_load < nodes[b].avg_load; } }; // We have split and merge thresholds, which work respectively as (target) upper and lower // bound for average size of tablets. 
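    // For example, with a target tablet size of 5 GiB (illustrative value), target_max_tablet_size()
    // below yields a 10 GiB split threshold and target_min_tablet_size() a 2.5 GiB merge threshold,
    // so completing a split or a merge brings the average tablet size back to roughly 5 GiB.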
// // The merge threshold is 50% of target tablet size (a midpoint between split and merge), // such that after a merge, the average size is equally far from split and merge. // The same applies to split. It's 100% of target size, so after split, the average is // close to the target size (assuming small variations during the operation). // // It might happen that during a resize decision, average size changes drastically, and // split or merge might get cancelled. E.g. after deleting a large partition or lots of // data becoming suddenly expired. // If we're splitting, we will only cancel it, if the average size dropped below the // target size. That's because a merge would be required right after split completes, // due to the average size dropping below the merge threshold, as tablet count doubles. const uint64_t _target_tablet_size = default_target_tablet_size; const unsigned _tablets_per_shard_goal; uint64_t target_max_tablet_size(uint64_t target_tablet_size) const noexcept { return target_tablet_size * 2; } uint64_t target_min_tablet_size(uint64_t target_tablet_size) const noexcept { return target_tablet_size / 2; } struct table_size_desc { uint64_t target_tablet_size; uint64_t avg_tablet_size; locator::resize_decision resize_decision; locator::resize_decision new_resize_decision; size_t tablet_count; size_t shard_count; sstring reason; // reason for target_tablet_count }; struct cluster_resize_load { using table_id_and_size_desc = std::pair; std::vector tables_need_resize; std::vector tables_being_resized; static locator::resize_decision to_resize_decision(const table_size_desc& d) { return d.new_resize_decision; } bool table_needs_resize(const table_size_desc& d) const { return to_resize_decision(d).split_or_merge(); } // Resize cancellation will account for possible oscillations caused by compaction, etc. // We shouldn't rush into cancelling an ongoing resize. That will only happen if the // average size is past the point it would be if either split or merge had completed. // If we cancel a split, that's because average size dropped so much a merge would be // required post completion, and vice-versa. bool table_needs_resize_cancellation(const table_size_desc& d) const { if (utils::get_local_injector().enter("force_resize_cancellation")) { return true; } return d.resize_decision.split_or_merge() && to_resize_decision(d).way != d.resize_decision.way; } void update(table_id id, table_size_desc d) { bool table_undergoing_resize = d.resize_decision.split_or_merge(); // Resizing tables that no longer need resize will have the resize decision revoked, // therefore they must be listed as being resized. if (!table_needs_resize(d) && !table_undergoing_resize) { return; } auto entry = std::make_pair(id, std::move(d)); if (table_undergoing_resize) { tables_being_resized.push_back(entry); } else { tables_need_resize.push_back(entry); } } // Comparator that measures the weight of the need for resizing. auto resize_urgency_cmp() const { return [] (const table_id_and_size_desc& a, const table_id_and_size_desc& b) { auto urgency = [] (const table_size_desc& d) -> double { // FIXME: only takes into account split today. return double(d.avg_tablet_size); }; return urgency(a.second) < urgency(b.second); }; } // Resize decisions can be revoked with an empty (none) decision, so replicas // will know they're no longer required to prepare storage for the execution of // topology changes. 
static locator::resize_decision revoke_resize_decision() { return locator::resize_decision{}; } }; // Per-shard limits for active tablet streaming sessions. // // There is no hard reason for these values being what they are other than // the guidelines below. // // We want to limit concurrency of active streaming for several reasons. // One is that we want to prevent over-utilization of memory required to carry out streaming, // as that may lead to OOM or excessive cache eviction. // // There is no network scheduler yet, so we want to avoid over-utilization of network bandwidth. // Limiting per-shard concurrency is a lame way to achieve that, but it's better than nothing. // // Scheduling groups should limit impact of streaming on other kinds of processes on the same node, // so this aspect is not the reason for limiting concurrency. // // We don't want too much parallelism because it means that we have plenty of migrations // which progress slowly. It's better to have fewer which complete faster because // less user requests suffer from double-quorum overhead, and under-loaded nodes can take // the load sooner. At the same time, we want to have enough concurrency to fully utilize resources. // // Streaming speed is supposed to be I/O bound and writes are more expensive in terms of IO than reads, // so we allow more read concurrency. // // We allow at least two sessions per shard so that there is less chance for idling until load balancer // makes the next decision after streaming is finished. size_t max_write_streaming_load; size_t max_read_streaming_load; replica::database& _db; token_metadata_ptr _tm; service::topology* _topology; db::system_keyspace* _sys_ks; std::optional _load_sketch; // Holds the set of tablets already scheduled for transition during plan-making. std::unordered_set _scheduled_tablets; // Holds tablet replica count per table in the balanced node set (within a single DC). absl::flat_hash_map _tablet_count_per_table; // Holds total used storage per table in the DC absl::flat_hash_map _disk_used_per_table; dc_name _dc; std::optional _rack; // Set when plan making is limited to a single rack. sstring _location; // Name of the current scope of plan making. DC or DC+rack. lw_shared_ptr _current_stats; // Stats for current scope of plan making. size_t _total_capacity_shards; // Total number of non-drained shards in the balanced node set. size_t _total_capacity_nodes; // Total number of non-drained nodes in the balanced node set. uint64_t _total_capacity_storage; // Total storage of non-drained nodes in the balanced node set. size_t _migrating_candidates; // Number of candidate replicas skipped because tablet is migrating. locator::load_stats_ptr _table_load_stats; load_balancer_stats_manager& _stats; std::unordered_set _skiplist; bool _use_table_aware_balancing = true; double _initial_scale = 1; // This is the maximum load delta between the most and least loaded nodes, // below which the balancer considers the DC balanced double _size_based_balance_threshold = 0.01; // When this is set to true, the balancer assumes all tablets // have the same size: _target_tablet_size bool _force_capacity_based_balancing = false; // The minimal tablet size the balancer will compute load with. For any tablet smaller than this, // the balancer will use this size instead of the actual tablet size. 
uint64_t _minimal_tablet_size = service::default_target_tablet_size / 100; private: tablet_replica_set get_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const { // We reflect migrations in the load as if they already happened, // optimistically assuming that they will succeed. return trinfo ? trinfo->next : ti.replicas; } tablet_replica_set sorted_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const { auto set = get_replicas_for_tablet_load(ti, trinfo); std::ranges::sort(set, std::less()); return set; } // Whether to count the tablet as putting streaming load on the system. // Tablets which are streaming or are yet-to-stream are counted. bool is_streaming(const tablet_transition_info* trinfo) { if (!trinfo) { return false; } switch (trinfo->stage) { case tablet_transition_stage::allow_write_both_read_old: return true; case tablet_transition_stage::write_both_read_old: return true; case tablet_transition_stage::write_both_read_old_fallback_cleanup: return false; case tablet_transition_stage::streaming: return true; case tablet_transition_stage::rebuild_repair: return true; case tablet_transition_stage::repair: return true; case tablet_transition_stage::end_repair: return false; case tablet_transition_stage::write_both_read_new: return false; case tablet_transition_stage::use_new: return false; case tablet_transition_stage::cleanup: return false; case tablet_transition_stage::cleanup_target: return false; case tablet_transition_stage::revert_migration: return false; case tablet_transition_stage::end_migration: return false; } on_internal_error(lblogger, format("Invalid transition stage: {}", static_cast(trinfo->stage))); } using migration_vector = migration_plan::migrations_vector; static migration_vector get_migration_info(const migration_tablet_set& tablet_set, tablet_transition_kind kind, tablet_replica src, tablet_replica dst) { migration_vector infos; for (auto tablet : tablet_set.tablets()) { infos.push_back(tablet_migration_info{kind, tablet, src, dst}); } return infos; } using migration_streaming_info_vector = utils::small_vector; static migration_streaming_info_vector get_migration_streaming_infos(const locator::topology& topology, const tablet_map& tmap, const migration_vector& infos) { migration_streaming_info_vector streaming_infos; for (auto& info : infos) { auto& ti = tmap.get_tablet_info(info.tablet.tablet); streaming_infos.push_back(get_migration_streaming_info(topology, ti, info)); } return streaming_infos; } public: load_balancer(replica::database& db, token_metadata_ptr tm, service::topology* topology, db::system_keyspace* sys_ks, locator::load_stats_ptr table_load_stats, load_balancer_stats_manager& stats, uint64_t target_tablet_size, unsigned tablets_per_shard_goal, std::unordered_set skiplist) : _target_tablet_size(target_tablet_size) , _tablets_per_shard_goal(tablets_per_shard_goal) , _db(db) , _tm(std::move(tm)) , _topology(topology) , _sys_ks(sys_ks) , _table_load_stats(std::move(table_load_stats)) , _stats(stats) , _skiplist(std::move(skiplist)) , _size_based_balance_threshold(db.get_config().size_based_balance_threshold_percentage() / 100.0) , _force_capacity_based_balancing(db.get_config().force_capacity_based_balancing()) , _minimal_tablet_size(db.get_config().minimal_tablet_size_for_balancing()) { // Force capacity based balancing until all the nodes have been upgraded if (!_db.features().size_based_load_balancing && !_force_capacity_based_balancing) { lblogger.info("Size based load 
balancing cluster feature disabled; forcing capacity based balancing"); _force_capacity_based_balancing = true; } max_read_streaming_load = db.get_config().tablet_streaming_read_concurrency_per_shard(); max_write_streaming_load = db.get_config().tablet_streaming_write_concurrency_per_shard(); } bool ongoing_rack_list_colocation() const { return _topology != nullptr && _sys_ks != nullptr && !_topology->paused_rf_change_requests.empty(); } future make_plan() { const locator::topology& topo = _tm->get_topology(); migration_plan plan; auto rack_list_colocation = ongoing_rack_list_colocation(); // Prepare plans for each DC separately and combine them to be executed in parallel. for (auto&& dc : topo.get_datacenters()) { if (_db.get_config().rf_rack_valid_keyspaces() || _db.get_config().enforce_rack_list() || rack_list_colocation) { for (auto rack : topo.get_datacenter_racks().at(dc) | std::views::keys) { auto rack_plan = co_await make_plan(dc, rack); auto level = rack_plan.empty() ? seastar::log_level::debug : seastar::log_level::info; lblogger.log(level, "Plan for {}/{}: {}", dc, rack, plan_summary(rack_plan)); plan.merge(std::move(rack_plan)); } } else { auto dc_plan = co_await make_plan(dc); auto level = dc_plan.empty() ? seastar::log_level::debug : seastar::log_level::info; lblogger.log(level, "Plan for {}: {}", dc, plan_summary(dc_plan)); plan.merge(std::move(dc_plan)); } } if (rack_list_colocation) { plan.merge(co_await make_rack_list_colocation_plan(plan)); } // Merge table-wide resize decisions, may emit new decisions, revoke or finalize ongoing ones. // Note : Resize plans should be generated before repair plans to avoid scheduling repairs when there is pending resize finalization plan.merge_resize_plan(co_await make_resize_plan(plan)); // Skip making repair plans if resize finalizations are pending, since repairs could delay finalization. if (plan.resize_plan().finalize_resize.empty()) { plan.set_repair_plan(co_await make_repair_plan(plan)); } auto level = plan.empty() ? seastar::log_level::debug : seastar::log_level::info; lblogger.log(level, "Prepared plan: {}", plan_summary(plan)); co_return std::move(plan); } void set_use_table_aware_balancing(bool use_table_aware_balancing) { _use_table_aware_balancing = use_table_aware_balancing; } void set_initial_scale(double initial_scale) { _initial_scale = initial_scale; } const locator::table_load_stats* load_stats_for_table(table_id id) const { if (!_table_load_stats) { return nullptr; } auto it = _table_load_stats->tables.find(id); return (it != _table_load_stats->tables.end()) ? 
&it->second : nullptr; } std::optional get_tablet_size(host_id host, const range_based_tablet_id& rb_tid, const tablet_info& ti, const tablet_transition_info* trinfo) const { if (_table_load_stats) { return _table_load_stats->get_tablet_size_in_transition(host, rb_tid, ti, trinfo); } return std::nullopt; } bool is_auto_repair_enabled(const std::optional& config) { // Only check the yaml config for now return _db.get_config().auto_repair_enabled_default(); } future needs_auto_repair(const locator::global_tablet_id& gid, const locator::tablet_info& info, const std::optional& config, const db_clock::time_point& now, db_clock::duration& diff, service::auto_repair_stats& stats) { if (utils::get_local_injector().enter("tablet_keep_repairing")) { lblogger.info("Forced auto-repair for tablet={}", gid); co_return true; } if (!is_auto_repair_enabled(config)) { co_return false; } auto size = info.replicas.size(); if (size <= 1) { lblogger.debug("Skipped auto repair for tablet={} replicas={}", gid, size); co_return false; } auto threshold = _db.get_config().auto_repair_threshold_default_in_seconds(); auto repair_time_threshold = std::chrono::seconds(threshold); auto& last_repair_time = info.repair_time; diff = now - last_repair_time; lblogger.trace("Check gid={} diff={} last_repair_time={} repair_time_threshold={}", gid, diff, info.repair_time, repair_time_threshold); if (diff < repair_time_threshold) { co_return false; } stats.needs_repair_nr++; co_return true; } void ensure_node(node_load_map& nodes, host_id host) { if (nodes.contains(host)) { return; } const locator::topology& topo = _tm->get_topology(); auto* node = topo.find_node(host); if (!node) { on_internal_error(lblogger, format("Node {} not found in topology", host)); } node_load& load = nodes[host]; load.id = host; load.node = node; load.shard_count = node->get_shard_count(); load.excluded = node->is_excluded(); if (!load.shard_count) { throw std::runtime_error(format("Shard count of {} not found in topology", host)); } if (!_db.features().tablet_load_stats_v2) { // This way load calculation will hold tablet count. 
load.dusage = disk_usage{_target_tablet_size * load.shard_count, 0}; } else if (_table_load_stats) { if (_table_load_stats->tablet_stats.contains(host) && !_force_capacity_based_balancing) { load.dusage = disk_usage{_table_load_stats->tablet_stats.at(host).effective_capacity, 0}; } else if (_table_load_stats->capacity.contains(host)) { load.dusage = disk_usage{_table_load_stats->capacity.at(host), 0}; } } load.shards.resize(load.shard_count); if (load.dusage) { for (auto& sload : load.shards) { sload.dusage = disk_usage{ load.dusage->capacity / load.shard_count, 0 }; } } } future<> consider_scheduled_load(node_load_map& nodes) { const locator::topology& topo = _tm->get_topology(); for (auto&& [table, tables] : _tm->tablets().all_table_groups()) { const auto& tmap = _tm->tablets().get_tablet_map(table); for (auto&& [tid, trinfo]: tmap.transitions()) { co_await coroutine::maybe_yield(); if (is_streaming(&trinfo)) { auto& tinfo = tmap.get_tablet_info(tid); apply_load(nodes, get_migration_streaming_info(topo, tinfo, trinfo)); } } } } future<> consider_planned_load(node_load_map& nodes, const migration_plan& mplan) { const locator::topology& topo = _tm->get_topology(); auto& tablet_meta = _tm->tablets(); for (const tablet_migration_info& tmi : mplan.migrations()) { co_await coroutine::maybe_yield(); auto& tmap = tablet_meta.get_tablet_map(tmi.tablet.table); auto& tinfo = tmap.get_tablet_info(tmi.tablet.tablet); auto streaming_info = get_migration_streaming_info(topo, tinfo, tmi); apply_load(nodes, streaming_info); } } future make_repair_plan(const migration_plan& mplan) { lblogger.debug("In make_repair_plan"); auto ret = tablet_repair_plan(); if (!_db.features().tablet_repair_scheduler) { lblogger.debug("make_repair_plan: The TABLET_REPAIR_SCHEDULER feature is not enabled"); co_return ret; } const locator::topology& topo = _tm->get_topology(); // Populate the load of the migration that is already in the plan node_load_map nodes; // TODO: share code with make_plan() topo.for_each_node([&] (const locator::node& node) { bool is_drained = node.get_state() == locator::node::state::being_decommissioned || node.get_state() == locator::node::state::being_removed; if (node.get_state() == locator::node::state::normal || is_drained) { ensure_node(nodes, node.host_id()); } }); // Consider load that is already scheduled co_await consider_scheduled_load(nodes); // Consider load that is about to be scheduled co_await consider_planned_load(nodes, mplan); service::auto_repair_stats auto_repair_stats; utils::chunked_vector plans; auto migration_tablet_ids = co_await mplan.get_migration_tablet_ids(); for (auto&& [table, tables] : _tm->tablets().all_table_groups()) { const auto& tmap = _tm->tablets().get_tablet_map(table); co_await coroutine::maybe_yield(); auto config = tmap.get_repair_scheduler_config(); auto auto_repair_enabled = is_auto_repair_enabled(config); auto now = db_clock::now(); auto skip = utils::get_local_injector().inject_parameter("tablet_repair_skip_sched"); auto skip_tablets = skip ? split_string_to_tablet_id(*skip, ',') : std::unordered_set(); co_await tmap.for_each_tablet([&] (locator::tablet_id id, const locator::tablet_info& info) -> future<> { auto gid = locator::global_tablet_id{table, id}; if (auto_repair_enabled) { auto_repair_stats.enabled_nr++; } // Skip tablet that is in transitions. 
auto* tti = tmap.get_tablet_transition_info(id); if (tti) { lblogger.debug("Skipped tablet repair for tablet={} which is already in transition={}", gid, tti->transition); co_return; } // Skip the tablet that is about to be in transition. if (migration_tablet_ids.contains(gid)) { co_return; } // Skip the tablet that has excluded replica node. auto& tinfo = tmap.get_tablet_info(id); if (tablet_has_excluded_node(topo, tinfo)) { co_return; } if (skip_tablets.contains(id)) { lblogger.debug("Skipped tablet repair for tablet={} by error injector", gid); co_return; } // Avoid rescheduling a failed tablet repair in a loop // TODO: Allow user to config const auto min_reschedule_time = std::chrono::seconds(5); if (now - info.repair_task_info.sched_time < min_reschedule_time) { lblogger.debug("Skipped tablet repair for tablet={} which is scheduled too frequently", gid); co_return; } db_clock::duration diff; auto is_user_reuqest = info.repair_task_info.is_user_repair_request(); if (is_user_reuqest) { // This means the user has issued a repair request manually. Select it for repair scheduling. } else { auto auto_repair = co_await needs_auto_repair(gid, info, config, now, diff, auto_repair_stats); if (!auto_repair) { co_return; } } auto range = tmap.get_token_range(id); auto last_token = tmap.get_last_token(id); plans.push_back(repair_plan{gid, info, range, last_token, diff, is_user_reuqest}); }); } _stats.for_cluster().auto_repair_needs_repair_nr = auto_repair_stats.needs_repair_nr; _stats.for_cluster().auto_repair_enabled_nr = auto_repair_stats.enabled_nr; // TODO: we could add other factors in addition to the repair time when // picking which tablet to repair, e.g., higher repair priority // specified by user, tablet with higher purgeable tombstone ratio. std::sort(plans.begin(), plans.end(), [] (const repair_plan& x, const repair_plan& y) { if (x.is_user_reuqest != y.is_user_reuqest) { return x.is_user_reuqest > y.is_user_reuqest; } return x.repair_time_diff > y.repair_time_diff; }); if (utils::get_local_injector().enter("tablet_dump_repair_plan")) { lblogger.info("dump_repair_plans=[{}]", fmt::join(plans, ",")); } auto trinfo = tablet_transition_info(locator::tablet_transition_stage::repair, locator::tablet_transition_kind::repair, tablet_replica_set(), {}, service::session_id()); for (auto& plan : plans) { co_await coroutine::maybe_yield(); tablet_migration_streaming_info tmsi; tmsi = get_migration_streaming_info(topo, plan.tinfo, trinfo); if (can_accept_load(nodes, tmsi)) { apply_load(nodes, tmsi); ret.add(plan.gid); } } if (utils::get_local_injector().enter("tablet_skip_repair_plan")) { lblogger.info("Skip repair plan due to error injection=tablet_skip_repair_plan"); co_return tablet_repair_plan(); } co_return ret; } future make_rack_list_colocation_plan(const migration_plan& mplan) { lblogger.debug("In make_rack_list_colocation_plan"); migration_plan plan; tablet_rack_list_colocation_plan rack_list_plan; if (!ongoing_rack_list_colocation() || utils::get_local_injector().enter("wait_with_rack_list_colocation")) { co_return plan; } const locator::topology& topo = _tm->get_topology(); auto migration_tablet_ids = co_await mplan.get_migration_tablet_ids(); auto colocation_state = co_await find_required_rack_list_colocations(_db, _tm, _sys_ks, _topology->paused_rf_change_requests, std::move(migration_tablet_ids)); node_load_map nodes; topo.for_each_node([&] (const locator::node& node) { if (node.get_state() == locator::node::state::normal && !node.is_excluded()) { ensure_node(nodes, node.host_id()); } 
}); // Consider load that is already scheduled. co_await consider_scheduled_load(nodes); // Consider load that is about to be scheduled. co_await consider_planned_load(nodes, mplan); std::unordered_set colocation_tablet_ids; for (auto& [dc_rack, colocation_sources] : colocation_state.dst_dc_rack_to_tablets) { auto nodes_by_load_dst = nodes | std::views::filter([&] (const auto& host_load) { auto& [host, load] = host_load; auto& node = *load.node; return node.dc_rack() == dc_rack; }) | std::views::keys | std::ranges::to>(); if (nodes_by_load_dst.empty()) { lblogger.warn("No target nodes available for RF change colocation plan in dc {}, rack {}", dc_rack.dc, dc_rack.rack); if (auto it = colocation_state.dst_to_requests.find(dc_rack); it != colocation_state.dst_to_requests.end()) { rack_list_plan.maybe_add_request_to_resume(*it->second.begin()); } continue; } auto nodes_cmp = nodes_by_load_cmp(nodes); auto nodes_dst_cmp = [&] (const host_id& a, const host_id& b) { return nodes_cmp(b, a); }; // Ascending load heap of candidate target nodes. std::make_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp); const tablet_metadata& tmeta = _tm->tablets(); for (colocation_source& source : colocation_sources) { if (colocation_tablet_ids.contains(source.gid)) { lblogger.debug("Skipped colocation of replica {} of tablet={}, another replica of which is about to be colocated", source.replica, source.gid); continue; } // Pick the least loaded node as target. std::pop_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp); auto target = nodes_by_load_dst.back(); auto& target_info = nodes[target]; auto push_back_target_node = seastar::defer([&] { std::push_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp); }); lblogger.debug("target node: {}, avg_load={}", target, target_info.avg_load); auto dst = global_shard_id {target, _load_sketch->get_least_loaded_shard(target)}; lblogger.trace("target shard: {}, tablets={}, load={}", dst.shard, target_info.shards[dst.shard].tablet_count, target_info.shard_load(dst.shard, _target_tablet_size)); tablet_transition_kind kind = tablet_transition_kind::migration; migration_tablet_set source_tablets { .tablet_s = source.gid, // Ignore the merge co-location. }; auto src = source.replica; auto mig = get_migration_info(source_tablets, kind, src, dst); auto& tmap = tmeta.get_tablet_map(source_tablets.table()); auto mig_streaming_info = get_migration_streaming_infos(topo, tmap, mig); pick(*_load_sketch, dst.host, dst.shard, source_tablets); if (can_accept_load(nodes, mig_streaming_info)) { apply_load(nodes, mig_streaming_info); lblogger.debug("Adding migration: {}", mig); mark_as_scheduled(mig); for (auto& m : mig) { plan.add(std::move(m)); colocation_tablet_ids.insert(m.tablet); } } update_node_load_on_migration(nodes, src, dst, source_tablets); } } if (colocation_state.request_to_resume) { rack_list_plan.maybe_add_request_to_resume(colocation_state.request_to_resume); } plan.set_rack_list_colocation_plan(std::move(rack_list_plan)); co_return std::move(plan); } // Returns true if a table has replicas of all its sibling tablets co-located. // This is used for determining whether merge can be finalized, since co-location // is a strict requirement for sibling tablets to be merged. 
future all_sibling_tablet_replicas_colocated(table_id table, const tablet_map& tmap) { bool all_colocated = true; co_await tmap.for_each_sibling_tablets([&] (tablet_desc t1, std::optional t2_opt) -> future<> { // FIXME: introduce variant of for_each_sibling_tablets() that accepts stop_iteration. if (!all_colocated) { return make_ready_future<>(); } if (!t2_opt) { on_internal_error(lblogger, format("Unable to find sibling tablet during co-location check for table {}", table)); } auto t2 = *t2_opt; // Sibling tablets cannot be considered co-located if their tablet info is temporarily unmergeable. // It can happen either has active repair task for example. all_colocated &= bool(merge_tablet_info(*t1.info, *t2.info)); return make_ready_future<>(); }); if (all_colocated) { lblogger.info("All sibling tablets are co-located for table {}", table); } co_return all_colocated; } future make_merge_colocation_plan(node_load_map& nodes) { migration_plan plan; table_resize_plan resize_plan; for (auto&& [table, tables] : _tm->tablets().all_table_groups()) { const auto& tmap = _tm->tablets().get_tablet_map(table); if (!tmap.needs_merge()) { continue; } // Also filter out replicas that don't belong to the DC being worked on. auto get_replicas = [this, &nodes] (const tablet_desc& t) { auto ret = sorted_replicas_for_tablet_load(*t.info, t.transition); const auto [first, last] = std::ranges::remove_if(ret, [&] (tablet_replica r) { return !nodes.contains(r.host); }); ret.erase(first, last); return ret; }; auto migrating = [this, table] (const tablet_desc& t) { return bool(t.transition) || _scheduled_tablets.contains(global_tablet_id{table, t.tid}); }; auto rack_of = [&topo = _tm->get_topology()] (tablet_replica tr) -> const sstring& { return topo.get_rack(tr.host); }; auto cross_rack_migration = [&] (tablet_replica src, tablet_replica dst) { return rack_of(src) != rack_of(dst); }; auto first_non_matching_replicas = [&] (tablet_replica_set r1, tablet_replica_set r2) -> std::optional> { assert(r1.size() == r2.size()); // Subtract intersecting (co-located) elements from the replicas set of sibling tablets. // Think for example that tablet 0 and 1 have replicas [n2, n4] and [n1, n2] respectively. // After subtraction, replica of tablet 1 in n1 will be a candidate for co-location with // replica of tablet 0 in n4. std::unordered_set intersection; std::ranges::set_intersection(r1, r2, std::inserter(intersection, intersection.begin())); const auto [r1_first, r1_last] = std::ranges::remove_if(r1, [&] (tablet_replica r) { return intersection.contains(r); }); r1.erase(r1_first, r1_last); const auto [r2_first, r2_last] = std::ranges::remove_if(r2, [&] (tablet_replica r) { return intersection.contains(r); }); r2.erase(r2_first, r2_last); // Favor replicas of different tablets that belong to same node. For example: // tablet 0 replicas: [n2:s1, n3:s0] // tablet 1 replicas: [n1:s0, n2:s0] // Replica in n1:s0 cannot follow sibling replica in n2:s1. Otherwise, RF invariant is broken. // Instead, tablet 1 in n2:s0 will be co-located with tablet 0 in n2:s1. std::unordered_map r1_map; std::ranges::transform(r1, std::inserter(r1_map, r1_map.begin()), [] (tablet_replica r) { return std::make_pair(r.host, r); }); for (unsigned i = 0; i < r2.size(); i++) { auto r1_it = r1_map.find(r2[i].host); if (r1_it != r1_map.end()) { return std::make_pair(r1_it->second, r2[i]); } } // Favor replicas which belong to the same rack. 
For example: // // tablet 0: [n1:rack3, n2:rack1] // tablet 1: [n3:rack2, n4:rack3] // // We want to move tablet1's n4:rack3 to n1:rack3 first (within rack3), for the following reasons: // 1) Minimize cross-rack migrations (they have higher cost) // In particular, this ensures that when RF=#racks, there will be no across-rack migrations. // 2) Minimize breaking of pairing: view replica is determined by rack, cross-rack migration breaks it // In particular, this ensures that when RF=#racks, no pairing will be broken. // 3) Avoid overloading racks temporarily, which is an availability risk in case the rack goes down. // Otherwise, n3:rack2 would be migrated to n1:rack3, and tablet 1 would have two replicas in rack3. // std::unordered_map r1_rack_map; for (auto&& r : r1) { auto&& rack = rack_of(r); auto i = r1_rack_map.find(rack); if (i == r1_rack_map.end()) { r1_rack_map[rack] = r; } } for (auto&& r : r2) { auto&& rack = rack_of(r); auto i = r1_rack_map.find(rack); if (i != r1_rack_map.end()) { return std::make_pair(i->second, r); } } // r1 and r2 don't share replicas, hosts, or racks. if (r1.size() > 0) { return std::make_pair(r1[0], r2[0]); } return std::nullopt; }; auto create_migration_info = [] (global_tablet_id gid, tablet_replica src, tablet_replica dst) { auto kind = (src.host != dst.host) ? tablet_transition_kind::migration : tablet_transition_kind::intranode_migration; return tablet_migration_info{kind, gid, src, dst}; }; co_await tmap.for_each_sibling_tablets([&] (tablet_desc t1, std::optional t2_opt) -> future<> { // Be optimistic about migrating tablets, as if they succeeded. // Merge finalization will have to recheck that all sibling tablets are co-located. if (!t2_opt) { on_internal_error(lblogger, format("Unable to find sibling tablet during co-location, with tablet count {}, for table {}", tmap.tablet_count(), table)); } auto t2 = *t2_opt; auto r1 = get_replicas(t1); auto r2 = get_replicas(t2); if (r1 == r2) { return make_ready_future<>(); } auto t1_id = global_tablet_id{table, t1.tid}; auto t2_id = global_tablet_id{table, t2.tid}; if (migrating(t1) || migrating(t2)) { return make_ready_future<>(); } // During RF change, tablets may have incrementally replicas allocated / deallocated to them. // Let's temporarily delay their co-location until their replica sets have the same size. if (r1.size() != r2.size()) { lblogger.warn("Replica sets of tablets to be co-located differ in size: ({}: {}), ({}, {})", t1_id, r1, t2_id, r2); return make_ready_future<>(); } // Returns true if moving candidate into dst will violate replication constraint. const auto r2_hosts = r2 | std::views::transform(std::mem_fn(&locator::tablet_replica::host)) | std::ranges::to>(); auto check_constraints = [r2_hosts = std::move(r2_hosts)] (tablet_replica src, tablet_replica dst) { // handles intra-node migration. if (src.host == dst.host && src.shard != dst.shard) { return false; } return r2_hosts.contains(dst.host); }; lblogger.debug("Replica sets of tablets being co-located: ({}: {}), ({}, {})", t1_id, r1, t2_id, r2); auto ret = first_non_matching_replicas(r1, r2); if (!ret) { // this shouldn't happen in practice, since the above call should always produce a pair of // replicas to co-locate, since we only got here if the sibling tablets aren't fully co-located. on_internal_error(lblogger, format("Unable to find replicas to co-locate for sibling tablets ({}: {}), and ({}, {})", t1_id, r1, t2_id, r2)); } // Emits migration for replica of t2 to co-habit same shard as replica of t1. 
auto src = ret->second; auto dst = ret->first; if (cross_rack_migration(src, dst)) { // FIXME: This is illegal if table has views, as it breaks base-view pairing. // Can happen when RF!=#racks. _current_stats->cross_rack_collocations++; lblogger.debug("Cross-rack co-location migration for {}@{} (rack: {}) to co-habit {}@{} (rack: {})", t2_id, src, rack_of(src), t1_id, dst, rack_of(dst)); utils::get_local_injector().inject("forbid_cross_rack_migration_attempt", [&] { on_fatal_internal_error(lblogger, "Cross rack colocation is not allowed, killing the node"); }); } // Node which is draining is either being decommissioned or removed. // If involved node is excluded, co-locating migration will surely fail, so it's pointless. // We should wait until the node is removed. // Also, it can fail the removenode request, as failure of this migration is interpreted as // draining failure. // In case of decommission, draining is more important than co-location, so postponing is good. if (nodes.at(dst.host).drained || nodes.at(src.host).drained) { lblogger.debug("Co-locating migration ({}, {}) -> ({}, {}) involves draining nodes, postponing", t2_id, src, t1_id, dst); return make_ready_future<>(); } // If migration will violate replication constraint, skip to next pair of replicas of sibling tablets. auto skip = check_constraints(src, dst); if (skip) { lblogger.debug("Replication constraint check failed, unable to emit migration for replica ({}, {}) to co-habit the replica ({}, {})", t2_id, src, t1_id, dst); return make_ready_future<>(); } auto mig = create_migration_info(t2_id, src, dst); auto mig_streaming_info = get_migration_streaming_info(_tm->get_topology(), *t2.info, mig); if (!can_accept_load(nodes, mig_streaming_info)) { // FIXME: we can try another pair of non-colocated replicas of same sibling tablets. lblogger.debug("Load limit reached, unable to emit migration for replica ({}, {}) to co-habit the replica ({}, {})", t2_id, src, t1_id, dst); return make_ready_future<>(); } apply_load(nodes, mig_streaming_info); lblogger.info("Created migration for replica ({}, {}) to co-habit same shard as ({}, {})", t2_id, src, t1_id, dst); mark_as_scheduled(mig); plan.add(std::move(mig)); return make_ready_future<>(); }); } plan.merge_resize_plan(std::move(resize_plan)); co_return std::move(plan); } std::tuple get_schema_and_rs(table_id table) { auto t = _db.get_tables_metadata().get_table_if_exists(table); if (!t) { on_internal_error(lblogger, format("Table {} does not exist", table)); } auto s = t->schema(); auto erm = t->get_effective_replication_map(); auto rs = erm->get_replication_strategy().maybe_as_tablet_aware(); if (!rs) { auto msg = format("Table {}.{} has no tablet_aware_replication_strategy: uses_tablets={}", s->ks_name(), s->cf_name(), erm->get_replication_strategy().uses_tablets()); on_internal_error(lblogger, msg); } return {s, rs}; } const tablet_aware_replication_strategy* get_rs(table_id id) { auto [s, rs] = get_schema_and_rs(id); return rs; } struct table_sizing { size_t current_tablet_count; // Tablet count in group0. size_t target_tablet_count; // Tablet count wanted by scheduler. sstring target_tablet_count_reason; // Winning rule for target_tablet_count value. std::optional avg_tablet_size; // nullopt when stats not yet available. size_t target_tablet_count_aligned; // target_tablet_count aligned to power of 2. resize_decision::way_type resize_decision; // Decision which should be emitted to achieve target_tablet_count_aligned. 
}; struct sizing_plan { std::unordered_map tables; }; struct tablet_count_and_reason { size_t tablet_count = 0; sstring reason; }; tablet_count_and_reason tablet_count_from_min_per_shard_tablet_count(const schema& s, const std::unordered_map& shards_per_dc, const std::unordered_map& shards_per_rack, const tablet_aware_replication_strategy& rs, double min_per_shard_tablet_count) { // Try to use as many tablets so that all shards in the current topology // are covered with at least `min_per_shard_tablet_count` tablets on average. size_t tablet_count = 0; const sstring* winning_dc = nullptr; sstring winning_rack; for (auto&& [dc, shards_in_dc] : shards_per_dc) { auto rf_in_dc = rs.get_replication_factor_data(dc); if (!rf_in_dc) { continue; } if (rf_in_dc->is_numeric()) { size_t tablets_in_dc = std::ceil((double) (min_per_shard_tablet_count * shards_in_dc) / rf_in_dc->count()); lblogger.debug("Estimated {} tablets due to min_per_shard_tablet_count={:.3f} for table={}.{} in DC {} ({} shards)", tablets_in_dc, min_per_shard_tablet_count, s.ks_name(), s.cf_name(), dc, shards_in_dc); if (tablets_in_dc > tablet_count) { tablet_count = tablets_in_dc; winning_dc = &dc; winning_rack = sstring(); } } else { for (auto rack : rf_in_dc->get_rack_list()) { size_t shards = 0; auto dc_rack = endpoint_dc_rack{dc, rack}; if (!shards_per_rack.contains(dc_rack)) { lblogger.warn("No shards for rack {}, but table {}.{} replicates there", rack, s.ks_name(), s.cf_name()); } else { shards = shards_per_rack.at(dc_rack); } size_t tablets_in_rack = std::ceil(min_per_shard_tablet_count * shards); lblogger.debug("Estimated {} tablets due to min_per_shard_tablet_count={:.3f} for table={}.{} in rack {} ({} shards) in DC {}", tablets_in_rack, min_per_shard_tablet_count, s.ks_name(), s.cf_name(), rack, shards, dc); if (tablets_in_rack > tablet_count) { tablet_count = tablets_in_rack; winning_dc = &dc; winning_rack = rack; } } } } if (!winning_dc) { return {}; } if (!winning_rack.empty()) { return {tablet_count, format("min_per_shard_tablet_count={:.3f} in DC {} rack {}", min_per_shard_tablet_count, *winning_dc, winning_rack)}; } return {tablet_count, format("min_per_shard_tablet_count={:.3f} in DC {}", min_per_shard_tablet_count, *winning_dc)}; } future make_sizing_plan(schema_ptr new_table = nullptr, const tablet_aware_replication_strategy* new_rs = nullptr) { std::unordered_map rs_by_table; sizing_plan plan; std::unordered_map shards_per_dc; std::unordered_map shards_per_rack; std::unordered_map> racks_per_dc; _tm->for_each_token_owner([&] (const node& n) { if (n.is_normal() && !n.is_draining()) { shards_per_dc[n.dc_rack().dc] += n.get_shard_count(); shards_per_rack[n.dc_rack()] += n.get_shard_count(); racks_per_dc[n.dc_rack().dc].insert(n.dc_rack().rack); } }); auto process_table = [&] (table_id table, const locator::table_group_set& tables, schema_ptr s, db::tablet_options tablet_options, const tablet_aware_replication_strategy* rs, size_t tablet_count) { table_sizing& table_plan = plan.tables[table]; table_plan.current_tablet_count = tablet_count; rs_by_table[table] = rs; // for a group of co-located tablets of size g with average tablet size t, the migration unit // size is g*t. in order to keep the migration unit size reasonable, we set a lower target tablet size // as the group size increases. 
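// For illustration (hypothetical numbers): with _target_tablet_size of 5 GiB and a group of 4
// co-located tables, each table is sized against a ~1.25 GiB per-tablet target, so the migration
// unit (the 4 co-located tablets that move together) stays around 5 GiB.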
auto target_tablet_size = _target_tablet_size / tables.size(); tablet_count_and_reason target_tablet_count = {1, ""}; auto maybe_apply = [&] (tablet_count_and_reason candidate, bool force = false) { lblogger.debug("Table {} ({}.{}) wants {} tablets due to {}", table, s->ks_name(), s->cf_name(), candidate.tablet_count, candidate.reason); if (candidate.tablet_count > target_tablet_count.tablet_count || force) { target_tablet_count = candidate; } }; maybe_apply({rs->get_initial_tablets(), "initial"}); if (tablet_options.min_tablet_count) { maybe_apply({tablet_options.min_tablet_count.value(), "min_tablet_count"}); } if (tablet_options.expected_data_size_in_gb) { maybe_apply({(tablet_options.expected_data_size_in_gb.value() << 30) / target_tablet_size, format("expected_data_size_in_gb={}", tablet_options.expected_data_size_in_gb.value())}); } auto min_per_shard_tablet_count = tablet_options.min_per_shard_tablet_count.value_or( // If min_tablet_count is set, initial_scale should not be effective for // compatibility with the deprecated "initial" tablet count. (rs->get_initial_tablets() || tablet_options.min_tablet_count) ? 0 : _initial_scale); if (min_per_shard_tablet_count) { maybe_apply(tablet_count_from_min_per_shard_tablet_count(*s, shards_per_dc, shards_per_rack, *rs, min_per_shard_tablet_count)); } auto total_size_opt = std::invoke([&] -> std::optional { size_t total_size = 0; for (auto table : tables) { const auto* table_stats = load_stats_for_table(table); if (!table_stats) { return std::nullopt; } total_size += table_stats->size_in_bytes; } return total_size; }); if (total_size_opt) { auto total_size = *total_size_opt; auto cur_decision = _tm->tablets().get_tablet_map(table).resize_decision(); auto avg_tablet_size = total_size / std::max(table_plan.current_tablet_count * tables.size(), 1); auto tablet_count_from_size = table_plan.current_tablet_count; // Split based on avg_tablet_size, or if the current resize_decision is split, apply hysteresis, // so it would get cancelled only when crossing back the half-way point. if (avg_tablet_size > target_max_tablet_size(target_tablet_size) || (cur_decision.is_split() && avg_tablet_size >= target_tablet_size)) { // TODO: extend to n-way split when needed tablet_count_from_size *= 2; } else { // Consider merge. If the current resize_decision is merge, apply hysteresis, // so it would get cancelled only when crossing back the half-way point. if (avg_tablet_size < target_min_tablet_size(target_tablet_size) || (cur_decision.is_merge() && avg_tablet_size <= target_tablet_size)) { tablet_count_from_size /= 2; } } table_plan.avg_tablet_size = avg_tablet_size; maybe_apply({tablet_count_from_size, format("avg_tablet_size={}", avg_tablet_size)}); } else { // When we don't have tablet size info, allow tablet count to increase but not to decrease. // Increasing will always bring us closer to the true target count, since tablet_count_from_size // can only increase the count above it, but decreasing may go against the true target count // if tablet_count_from_size would demand more tablets. maybe_apply({table_plan.current_tablet_count, "current count"}); } // Apply max_tablet_count cap after all other factors have been considered. 
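// For example (hypothetical numbers): if the factors above produced a target of 2048 tablets but the
// table sets max_tablet_count=1024, the cap below wins (force=true) and the recorded reason becomes
// "max_tablet_count".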
if (tablet_options.max_tablet_count) { if (target_tablet_count.tablet_count > static_cast(*tablet_options.max_tablet_count)) { maybe_apply({static_cast(*tablet_options.max_tablet_count), "max_tablet_count"}, true); } } if (utils::get_local_injector().enter("tablet_force_tablet_count_increase")) { target_tablet_count = {tablet_count * 2, "force_tablet_count_increase"}; } else if (utils::get_local_injector().enter("tablet_force_tablet_count_decrease")) { auto size = std::max(size_t(1), tablet_count / 2); target_tablet_count = {size, "force_tablet_count_decrease"}; } table_plan.target_tablet_count = target_tablet_count.tablet_count; table_plan.target_tablet_count_reason = target_tablet_count.reason; lblogger.debug("Table {} ({}.{}) target_tablet_count: {} ({})", table, s->ks_name(), s->cf_name(), table_plan.target_tablet_count, table_plan.target_tablet_count_reason); }; for (const auto& [table, tables] : _tm->tablets().all_table_groups()) { const auto& tmap = _tm->tablets().get_tablet_map(table); auto [s, rs] = get_schema_and_rs(table); auto tablet_options = combine_tablet_options( tables | std::views::transform([&] (table_id table) { return _db.get_tables_metadata().get_table_if_exists(table); }) | std::views::filter([] (auto t) { return t != nullptr; }) | std::views::transform([] (auto t) { return t->schema()->tablet_options(); }) ); process_table(table, tables, s, tablet_options, rs, tmap.tablet_count()); co_await coroutine::maybe_yield(); } if (new_table) { process_table(new_table->id(), {new_table->id()}, new_table, new_table->tablet_options(), new_rs, 0); } // Below section ensures we respect the _tablets_per_shard_goal. // // It will scale down target_tablet_count for all tables so that // the average number of tablets per shard in each DC or rack does not exceed _tablets_per_shard_goal. // // The impact of table's tablet count on average per-shard tablet replica count // is different in each rack because replication factors are different in each DC/rack. // Numerical RF impacts all racks in a DC. Rack-list RF impacts particular racks. // // The algorithm works like this: // Compute average tablet replica count per-shard in each rack, // determine if per-shard goal is exceeded in that rack, // compute scale factor by which tablet count should be multiplied so that the goal // is not exceeded in that rack. // Take the smallest scale factor among all racks, which ensures that no rack is overloaded. // // We align tablet counts to the nearest power of 2 post-scaling, which // means that scaling may not be effective and in the worst case we may overshoot the goal by // a factor of 2. This is acceptable since the goal is a soft limit and not a hard constraint. // Scaling post-alignment would be problematic. If we scale down all tables fairly, we undershoot the goal // by a factor of 2 in the worst case. If we choose a subset of tables to scale down by a factor of 2 then // we have a problem of making sure that the choice is stable across scheduler invocations to avoid // oscillations of decisions. 
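// Worked example (hypothetical numbers): with _tablets_per_shard_goal=100 and a rack whose shards would
// average 150 tablet replicas per shard under the target counts, the scale factor is 100/150 ~= 0.67.
// A table replicated in that rack with target_tablet_count=1000 is scaled down to ~667, which the
// power-of-2 alignment step further below rounds back up to 1024, illustrating the possible
// factor-of-2 overshoot mentioned above.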
struct scale_info { double factor; endpoint_dc_rack source; }; std::unordered_map table_scaling; for (auto&& [rack, shard_count] : shards_per_rack) { double cur_avg_tablets_per_shard = 0; double new_avg_tablets_per_shard = 0; for (auto&& [table, table_plan] : plan.tables) { auto* rs = rs_by_table[table]; auto rf = rs->get_replication_factor_data(rack.dc); auto get_avg_tablets_per_shard = [&] (size_t tablet_count) -> double { if (!rf) { return 0; } if (rf->is_numeric()) { auto racks_in_dc = racks_per_dc.at(rack.dc).size(); return double(tablet_count) * rf->count() / shard_count / racks_in_dc; } if (std::ranges::contains(rf->get_rack_list(), rack.rack)) { return double(tablet_count) / shard_count; } return 0; }; auto cur_tablets_per_shard = get_avg_tablets_per_shard(table_plan.current_tablet_count); cur_avg_tablets_per_shard += cur_tablets_per_shard; lblogger.debug("cur_avg_tablets_per_shard [dc={}, rack={}, table={}]: {:.3f}", rack.dc, rack.rack, table, cur_tablets_per_shard); auto new_tablets_per_shard = get_avg_tablets_per_shard(table_plan.target_tablet_count); new_avg_tablets_per_shard += new_tablets_per_shard; lblogger.debug("new_avg_tablets_per_shard [dc={}, rack={}, table={}]: {:.3f}", rack.dc, rack.rack, table, new_tablets_per_shard); } { bool overloaded = cur_avg_tablets_per_shard > _tablets_per_shard_goal; lblogger.debug("cur_avg_tablets_per_shard[dc={},rack={}]: {:.3f}{}", rack.dc, rack.rack, cur_avg_tablets_per_shard, overloaded ? " (overloaded!)" : ""); } bool overloaded = new_avg_tablets_per_shard > _tablets_per_shard_goal; lblogger.debug("new_avg_tablets_per_shard[dc={},rack={}]: {:.3f}{}", rack.dc, rack.rack, new_avg_tablets_per_shard, overloaded ? " (overloaded!)" : ""); if (overloaded) { auto scale = scale_info{_tablets_per_shard_goal / new_avg_tablets_per_shard, rack}; for (auto&& [table, table_plan]: plan.tables) { auto* rs = rs_by_table[table]; auto rf = rs->get_replication_factor_data(rack.dc); // If table has no replicas in this rack, scaling it won't help and is harmful to its distribution // in other DCs or racks. 
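// For instance (hypothetical): a table whose replication is rack-list based and confined to dc1.rackA
// is not scaled down when dc1.rackB is the overloaded rack; shrinking it would not relieve rackB and
// would only hurt its distribution where it actually has replicas.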
if (rf && (rf->is_numeric() || std::ranges::contains(rf->get_rack_list(), rack.rack))) { auto [i, inserted] = table_scaling.try_emplace(table, scale); if (!inserted) { if (scale.factor < i->second.factor) { i->second = std::move(scale); } } } } } } for (auto&& [table, scale] : table_scaling) { auto& table_plan = plan.tables[table]; auto new_count = std::max(1, table_plan.target_tablet_count * scale.factor); lblogger.debug("Scaling down table {} by a factor of {:.3f} due to {}.{}: {} => {}", table, scale.factor, scale.source.dc, scale.source.rack, table_plan.target_tablet_count, new_count); table_plan.target_tablet_count = new_count; table_plan.target_tablet_count_reason = format("{} scaled by {:.3f} due to {}.{}", table_plan.target_tablet_count_reason, scale.factor, scale.source.dc, scale.source.rack); } // Generate: // table_plan.target_tablet_count_aligned // table_plan.resize_decision for (auto&& [table, table_plan] : plan.tables) { table_plan.target_tablet_count_aligned = 1u << log2ceil(table_plan.target_tablet_count); if (table_plan.target_tablet_count_aligned > table_plan.current_tablet_count) { table_plan.resize_decision = locator::resize_decision::split(); } else if (table_plan.target_tablet_count_aligned < table_plan.current_tablet_count) { table_plan.resize_decision = locator::resize_decision::merge(); } lblogger.debug("Table {}, {} => {} ({}: {}), resize: {}", table, table_plan.current_tablet_count, table_plan.target_tablet_count_aligned, table_plan.target_tablet_count, table_plan.target_tablet_count_reason, table_plan.resize_decision); } co_return std::move(plan); } future make_resize_plan(const migration_plan& plan) { table_resize_plan resize_plan; if (!_tm->tablets().balancing_enabled()) { co_return std::move(resize_plan); } auto table_sizing_plan = co_await make_sizing_plan(); cluster_resize_load resize_load; for (auto&& [table, table_plan] : table_sizing_plan.tables) { auto& tmap = _tm->tablets().get_tablet_map(table); if (!table_plan.avg_tablet_size) { continue; } // shard presence of a table across the cluster size_t shard_count = std::accumulate(tmap.tablets().begin(), tmap.tablets().end(), size_t(0), [] (size_t shard_count, const locator::tablet_info& info) { return shard_count + info.replicas.size(); }); resize_decision new_resize_decision; new_resize_decision.way = table_plan.resize_decision; table_size_desc size_desc { .avg_tablet_size = *table_plan.avg_tablet_size, .resize_decision = tmap.resize_decision(), .new_resize_decision = new_resize_decision, .tablet_count = table_plan.current_tablet_count, .shard_count = shard_count, .reason = table_plan.target_tablet_count_reason, }; resize_load.update(table, std::move(size_desc)); lblogger.debug("Table {} with tablet_count={} has an average tablet size of {}", table, tmap.tablet_count(), *table_plan.avg_tablet_size); co_await coroutine::maybe_yield(); } // Emit new resize decisions // The limit of resize requests is determined by the shard presence (count) of tables involved. // If tables still have a low tablet count, the concurrency must be high in order to saturate the cluster. // If a table covers the entire cluster, and needs split, concurrency will be reduced to 1. 
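// Illustration (hypothetical numbers): with 96 shards in the cluster and tables already resizing
// covering 60 replica-shards, 36 shards remain available. The first new request in a round is emitted
// regardless of its shard presence; subsequent requests are emitted only while they still fit in the
// remaining budget, and at most max_new_resize_requests (10) new requests are generated per round.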
size_t total_shard_count = std::invoke([this] { size_t shard_count = 0; _tm->for_each_token_owner([&] (const locator::node& node) { shard_count += node.get_shard_count(); }); return shard_count; }); size_t resizing_shard_count = std::accumulate(resize_load.tables_being_resized.begin(), resize_load.tables_being_resized.end(), size_t(0), [] (size_t shard_count, const auto& table_desc) { return shard_count + table_desc.second.shard_count; }); // Limits the amount of new resize requests to be generated in a single round, as each one is a mutation to group0. constexpr size_t max_new_resize_requests = 10; auto available_shards = std::max(ssize_t(total_shard_count) - ssize_t(resizing_shard_count), ssize_t(0)); std::make_heap(resize_load.tables_need_resize.begin(), resize_load.tables_need_resize.end(), resize_load.resize_urgency_cmp()); while (resize_load.tables_need_resize.size() && resize_plan.size() < max_new_resize_requests) { const auto& [table, size_desc] = resize_load.tables_need_resize.front(); if (resize_plan.size() > 0 && std::cmp_less(available_shards, size_desc.shard_count)) { break; } auto resize_decision = cluster_resize_load::to_resize_decision(size_desc); lblogger.info("Emitting resize decision of type {} for table {}, avg_tablet_size={} reason={}", resize_decision.type_name(), table, size_desc.avg_tablet_size, size_desc.reason); resize_plan.resize[table] = std::move(resize_decision); _stats.for_cluster().resizes_emitted++; std::pop_heap(resize_load.tables_need_resize.begin(), resize_load.tables_need_resize.end(), resize_load.resize_urgency_cmp()); resize_load.tables_need_resize.pop_back(); available_shards -= size_desc.shard_count; } // Revoke resize decision if any table no longer needs it // Also communicate coordinator if any table is ready for finalizing resizing for (const auto& [table, size_desc] : resize_load.tables_being_resized) { if (resize_load.table_needs_resize_cancellation(size_desc)) { resize_plan.resize[table] = cluster_resize_load::revoke_resize_decision(); _stats.for_cluster().resizes_revoked++; lblogger.info("Revoking resize decision for table {}, avg_tablet_size={} reason={}", table, size_desc.avg_tablet_size, size_desc.reason); continue; } auto& tmap = _tm->tablets().get_tablet_map(table); const auto& table_groups = _tm->tablets().all_table_groups(); auto finalize_decision = [&] { if (utils::get_local_injector().enter("tablet_resize_finalization_postpone")) { return; } _stats.for_cluster().resizes_finalized++; resize_plan.finalize_resize.insert(table); }; // If all replicas have completed split work for the current sequence number, it means that // load balancer can emit finalize decision, for split to be completed. if (tmap.needs_split()) { bool all_tables_ready = std::ranges::all_of(table_groups.at(table), [&, seq_num = tmap.resize_decision().sequence_number] (table_id table) { const auto* table_stats = load_stats_for_table(table); return table_stats && table_stats->split_ready_seq_number == seq_num; }); if (all_tables_ready) { finalize_decision(); lblogger.info("Finalizing resize decision for table {} as all replicas agree on sequence number {}", table, tmap.resize_decision().sequence_number); } // If all sibling tablets are co-located across all DCs, then merge can be finalized. 
        } else if (tmap.needs_merge() && co_await all_sibling_tablet_replicas_colocated(table, tmap) && !bypass_merge_completion()) {
            finalize_decision();
            lblogger.info("Finalizing resize decision for table {} as all replicas are co-located", table);
        }
    }
    co_return std::move(resize_plan);
}

void apply_load(node_load_map& nodes, const tablet_migration_streaming_info& info) {
    for (auto&& replica : info.read_from) {
        if (nodes.contains(replica.host)) {
            nodes[replica.host].shards[replica.shard].streaming_read_load += info.stream_weight;
        }
    }
    for (auto&& replica : info.written_to) {
        if (nodes.contains(replica.host)) {
            nodes[replica.host].shards[replica.shard].streaming_write_load += info.stream_weight;
        }
    }
}

void apply_load(node_load_map& nodes, const migration_streaming_info_vector& infos) {
    for (auto& info : infos) {
        apply_load(nodes, info);
    }
}

bool can_accept_load(node_load_map& nodes, const tablet_migration_streaming_info& info) {
    for (auto r : info.read_from) {
        if (!nodes.contains(r.host)) {
            continue;
        }
        auto load = nodes[r.host].shards[r.shard].streaming_read_load;
        if (load > 0 && load + info.stream_weight > max_read_streaming_load) {
            lblogger.debug("Migration skipped because of read load limit on {} ({})", r, load);
            return false;
        }
    }
    for (auto r : info.written_to) {
        if (!nodes.contains(r.host)) {
            continue;
        }
        auto load = nodes[r.host].shards[r.shard].streaming_write_load;
        if (load > 0 && load + info.stream_weight > max_write_streaming_load) {
            lblogger.debug("Migration skipped because of write load limit on {} ({})", r, load);
            return false;
        }
    }
    return true;
}

// Precondition: all migration streaming infos have the same source and destination.
// FIXME: remove this precondition, but that's not easy without copying node_load_map.
bool can_accept_load(node_load_map& nodes, const migration_streaming_info_vector& infos) {
    // Since all migration infos have the same source and destination, the load check can be done
    // cheaply by using the number of migrations as the stream weight.
    auto info = infos[0];
    info.stream_weight = infos.size();
    return can_accept_load(nodes, info);
}

bool in_shuffle_mode() const {
    return utils::get_local_injector().enter("tablet_allocator_shuffle");
}

bool is_balanced(load_type min_load, load_type max_load) const {
    if (_force_capacity_based_balancing) {
        return min_load == max_load;
    }
    if (max_load == 0) {
        return true;
    }
    const load_type load_delta = max_load - min_load;
    return (load_delta / max_load) < _size_based_balance_threshold;
}

// If the cluster cannot agree on the tablet merge feature, then merge will not be finalized, since
// not all nodes in the cluster can handle the finalization step.
bool bypass_merge_completion() const { return !_db.features().tablet_merge || utils::get_local_injector().enter("tablet_merge_completion_bypass"); } size_t rand_int() const { static thread_local std::default_random_engine re{std::random_device{}()}; static thread_local std::uniform_int_distribution dist; return dist(re); } shard_id rand_shard(shard_id shard_count) const { return rand_int() % shard_count; } table_id pick_table(const table_candidates_map& candidates) { if (!_use_table_aware_balancing) { on_internal_error(lblogger, "pick_table() called when table-aware balancing is disabled"); } size_t total = 0; for (auto&& [table, tablets] : candidates) { total += tablets.size(); } ssize_t candidate_index = rand_int() % total; for (auto&& [table, tablets] : candidates) { candidate_index -= tablets.size(); if (candidate_index <= 0 && !tablets.empty()) { return table; } } on_internal_error(lblogger, "No candidate table"); } migration_tablet_set peek_candidate(shard_load& shard_info) { if (_use_table_aware_balancing) { auto table = pick_table(shard_info.candidates); return *shard_info.candidates[table].begin(); } return *shard_info.candidates_all_tables.begin(); } // Evaluates impact on load balance of migrating a tablet set of a given table to dst. migration_badness evaluate_dst_badness(node_load_map& nodes, table_id table, tablet_replica dst, uint64_t tablet_set_disk_size) { _current_stats->candidates_evaluated++; auto& node_info = nodes[dst.host]; // Size of all tablet replicas of the table in bytes. uint64_t table_size = _disk_used_per_table[table]; if (node_info.drained) { // Moving a tablet to a drained node is always bad. // We may not have capacity information for the drained node, so we can't evaluate exact badness. return migration_badness{0, 0, table_size, table_size}; } double ideal_table_load = double(table_size) / _total_capacity_storage; auto compute_load_and_dst_badness = [&] (uint64_t capacity, uint64_t new_used) { double new_load = double(new_used) / capacity; // Divide badness by table_size to take into account that moving a tablet of a small table has // greater impact on balance of that table than moving a tablet of the same size of a larger table return std::make_pair(new_load, (new_load - ideal_table_load) / table_size); }; uint64_t capacity = node_info.shards[dst.shard].dusage->capacity; uint64_t new_used = node_info.shards[dst.shard].tablet_sizes_per_table[table] + tablet_set_disk_size; auto [new_shard_load, dst_shard_badness] = compute_load_and_dst_badness(capacity, new_used); lblogger.trace("Table {} @{} shard balance threshold: {}, dst: {} ({:.4f})", table, dst, ideal_table_load, new_shard_load, dst_shard_badness); capacity = node_info.dusage->capacity; new_used = node_info.tablet_sizes_per_table[table] + tablet_set_disk_size; auto [new_node_load, dst_node_badness] = compute_load_and_dst_badness(capacity, new_used); lblogger.trace("Table {} @{} node balance threshold: {}, dst: {} ({:.4f})", table, dst, ideal_table_load, new_node_load, dst_node_badness); return migration_badness{0, 0, dst_shard_badness, dst_node_badness}; } // Evaluates impact on load balance of migrating a tablet set of a given table from src. migration_badness evaluate_src_badness(node_load_map& nodes, table_id table, tablet_replica src, uint64_t tablet_set_disk_size) { _current_stats->candidates_evaluated++; auto& node_info = nodes[src.host]; // Size of all tablet replicas of the table in bytes. 
uint64_t table_size = _disk_used_per_table[table]; if (node_info.drained) { // Moving a tablet away from a drained node is always good. return migration_badness{-1, -1, 0, 0}; } double ideal_table_load = double(table_size) / _total_capacity_storage; auto compute_load_and_src_badness = [&] (uint64_t capacity, uint64_t new_used) { // Divide badness by table_size to take into account that moving a tablet of a small table has // greater impact on balance of that table than moving a tablet of the same size of a larger table double new_load = double(new_used) / capacity; return std::make_pair(new_load, (ideal_table_load - new_load) / table_size); }; uint64_t capacity = node_info.shards[src.shard].dusage->capacity; uint64_t new_used = node_info.shards[src.shard].tablet_sizes_per_table[table] - tablet_set_disk_size; auto [new_shard_load, src_shard_badness] = compute_load_and_src_badness(capacity, new_used); lblogger.trace("Table {} @{} shard balance threshold: {}, src: {} ({:.4f})", table, src, ideal_table_load, new_shard_load, src_shard_badness); capacity = node_info.dusage->capacity; new_used = node_info.tablet_sizes_per_table[table] - tablet_set_disk_size; auto [new_node_load, src_node_badness] = compute_load_and_src_badness(capacity, new_used); lblogger.trace("Table {} @{} node balance threshold: {}, src: {} ({:.4f})", table, src, ideal_table_load, new_node_load, src_node_badness); return migration_badness{src_shard_badness, src_node_badness, 0, 0}; } // Evaluates impact on load balance of migrating a single tablet of a given table from src to dst. migration_badness evaluate_candidate(node_load_map& nodes, table_id table, tablet_replica src, tablet_replica dst, uint64_t tablet_set_disk_size) { auto src_badness = evaluate_src_badness(nodes, table, src, tablet_set_disk_size); auto dst_badness = evaluate_dst_badness(nodes, table, dst, tablet_set_disk_size); if (src.host == dst.host) { src_badness.src_node_badness = 0; dst_badness.dst_node_badness = 0; } return { src_badness.src_shard_badness, src_badness.src_node_badness, dst_badness.dst_shard_badness, dst_badness.dst_node_badness }; } future peek_candidate(node_load_map& nodes, shard_load& shard_info, tablet_replica src, tablet_replica dst) { if (!_use_table_aware_balancing) { co_return migration_candidate{peek_candidate(shard_info), src, dst, migration_badness{}}; } if (shard_info.candidates.empty()) { on_internal_error(lblogger, format("No candidates for migration on {}", src)); } std::optional best_candidate; for (auto&& [table, tablets] : shard_info.candidates) { if (!tablets.empty()) { auto badness = evaluate_candidate(nodes, table, src, dst, tablets.begin()->tablet_set_disk_size); auto candidate = migration_candidate{*tablets.begin(), src, dst, badness}; lblogger.trace("Candidate: {}", candidate); if (!best_candidate || candidate.badness < best_candidate->badness) { best_candidate = candidate; } } } if (!best_candidate) { on_internal_error(lblogger, format("No candidates for migration on {}", src)); } lblogger.trace("Best candidate: {}", *best_candidate); co_return *best_candidate; } void erase_candidate(shard_load& shard_info, migration_tablet_set tablets) { if (_use_table_aware_balancing) { auto table = tablets.table(); shard_info.candidates[table].erase(tablets); if (shard_info.candidates[table].empty()) { shard_info.candidates.erase(table); } } else { shard_info.candidates_all_tables.erase(tablets); } } void maybe_erase_colocated_candidate(shard_load& shard_info, const tablet_map& tmap, global_tablet_id tablet) { if 
(!tmap.needs_merge()) { return; } auto siblings = tmap.sibling_tablets(tablet.tablet); if (!siblings) { on_internal_error(lblogger, format("Unable to find sibling tablet of {} during merge", tablet)); } auto left_sibling = global_tablet_id{tablet.table, siblings->first}; auto right_sibling = global_tablet_id{tablet.table, siblings->second}; erase_candidate(shard_info, migration_tablet_set{colocated_tablets{left_sibling, right_sibling}}); } void erase_candidates(node_load_map& nodes, const tablet_map& tmap, const migration_tablet_set& tablets) { // FIXME: indentation. for (auto tablet : tablets.tablets()) { auto& src_tinfo = tmap.get_tablet_info(tablet.tablet); for (auto&& r : src_tinfo.replicas) { if (nodes.contains(r.host)) { lblogger.trace("Erasing tablet {} from {}", tablet, r); // Not necessarily all replicas of sibling tablets are co-located, and so we need to // remove them from candidate list using global_tablet_id. erase_candidate(nodes[r.host].shards[r.shard], migration_tablet_set{tablet}); maybe_erase_colocated_candidate(nodes[r.host].shards[r.shard], tmap, tablet); } } } } void add_candidate(shard_load& shard_info, migration_tablet_set tablets) { if (_use_table_aware_balancing) { shard_info.candidates[tablets.table()].insert(tablets); } else { shard_info.candidates_all_tables.insert(tablets); } } // Checks whether moving a tablet from src_info to target_info would go against convergence. // Returns false if the tablet should not be moved, and true if it may be moved. // // Moving tablets only when this method returns true ensures that balancing nodes will reach convergence. // Otherwise, oscillations of tablet load between nodes across different plan making rounds could happen, // where tablets are moved back and forth between nodes and convergence is never reached. // // The assumption is that the algorithm moves tablets from more loaded nodes to less loaded nodes, // so convergence is reached where the node we picked as source has lower or equal load, than the node we // picked as the destination will have post-movement. bool check_convergence(node_load& src_info, node_load& dst_info, uint64_t tablet_sizes) { if (src_info.drained) { return true; } if (dst_info.drained) { return false; } // Allow migrating only from candidate nodes which have higher load than the target. if (src_info.avg_load <= dst_info.avg_load) { lblogger.trace("Load inversion: src={} (avg_load={}), dst={} (avg_load={})", src_info.id, src_info.avg_load, dst_info.id, dst_info.avg_load); return false; } // Prevent load inversion post-movement which can lead to oscillations. if (src_info.avg_load <= *dst_info.get_avg_load(tablet_sizes)) { lblogger.trace("Load inversion post-movement: src={} (avg_load={}), dst={} (avg_load={}) tablet_sizes={}", src_info.id, src_info.avg_load, dst_info.id, dst_info.avg_load, tablet_sizes); return false; } return true; } bool check_convergence(node_load& src_info, node_load& dst_info, const migration_tablet_set& tablet_set) { return check_convergence(src_info, dst_info, tablet_set.tablet_set_disk_size); } // Checks whether moving a tablet from shard A to B (intra-node) would go against convergence. // Returns false if the tablet should not be moved, and true if it may be moved. // Can be called when node_info.drained. 
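// For example (hypothetical loads): with the source shard at load 0.80 and the destination shard at
// 0.50, a move whose size would raise the destination to 0.78 is allowed (0.80 > 0.78), while one that
// would raise it to 0.82 is rejected, so the two shards never swap places in the ordering.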
bool check_intranode_convergence(const node_load& node_info, shard_id src_shard, shard_id dst_shard, uint64_t used_size_delta) { return node_info.shard_load(src_shard) > node_info.shard_load(dst_shard, int64_t(used_size_delta)); } // Can be called when node_info.drained. bool check_intranode_convergence(const node_load& node_info, shard_id src_shard, shard_id dst_shard, const migration_tablet_set& tablet_set) { return check_intranode_convergence(node_info, src_shard, dst_shard, tablet_set.tablet_set_disk_size); } // Adjusts the load of the source and destination shards in the host where intra-node migration happens. void update_node_load_on_migration(node_load& node_load, host_id host, shard_id src, shard_id dst, const migration_tablet_set& tablet_set) { auto tablet_count = tablet_set.tablets().size(); auto tablet_sizes = tablet_set.tablet_set_disk_size; auto table = tablet_set.tablets().front().table; auto& dst_shard = node_load.shards[dst]; dst_shard.tablet_count += tablet_count; dst_shard.tablet_count_per_table[table] += tablet_count; dst_shard.tablet_sizes_per_table[table] += tablet_sizes; dst_shard.dusage->used += tablet_sizes; auto& src_shard = node_load.shards[src]; src_shard.tablet_count -= tablet_count; src_shard.tablet_count_per_table[table] -= tablet_count; src_shard.tablet_sizes_per_table[table] -= tablet_sizes; src_shard.dusage->used -= tablet_sizes; } // Adjusts the load of the source and destination (host:shard) that were picked for the migration. void update_node_load_on_migration(node_load_map& nodes, tablet_replica src, tablet_replica dst, const migration_tablet_set& tablet_set) { auto tablet_count = tablet_set.tablets().size(); auto tablet_sizes = tablet_set.tablet_set_disk_size; auto table = tablet_set.tablets().front().table; auto& dst_node = nodes[dst.host]; auto& dst_shard = dst_node.shards[dst.shard]; dst_shard.tablet_count += tablet_count; dst_shard.tablet_count_per_table[table] += tablet_count; dst_shard.tablet_sizes_per_table[table] += tablet_sizes; dst_shard.dusage->used += tablet_sizes; dst_node.tablet_count_per_table[table] += tablet_count; dst_node.tablet_sizes_per_table[table] += tablet_sizes; dst_node.tablet_count += tablet_count; dst_node.dusage->used += tablet_sizes; dst_node.update(); auto& src_node = nodes[src.host]; auto& src_shard = src_node.shards[src.shard]; src_shard.tablet_count -= tablet_count; src_shard.tablet_count_per_table[table] -= tablet_count; src_shard.tablet_sizes_per_table[table] -= tablet_sizes; if (src_shard.dusage) { src_shard.dusage->used -= tablet_sizes; } src_node.tablet_count_per_table[table] -= tablet_count; src_node.tablet_sizes_per_table[table] -= tablet_sizes; src_node.tablet_count -= tablet_count; if (src_node.dusage) { src_node.dusage->used -= tablet_sizes; } src_node.update(); } static void unload(locator::load_sketch& sketch, host_id host, shard_id shard, const migration_tablet_set& tablet_set) { sketch.unload(host, shard, tablet_set.tablets().size(), tablet_set.tablet_set_disk_size); } static void pick(locator::load_sketch& sketch, host_id host, shard_id shard, const migration_tablet_set& tablet_set) { sketch.pick(host, shard, tablet_set.tablets().size(), tablet_set.tablet_set_disk_size); } void mark_as_scheduled(const tablet_migration_info& mig) { _scheduled_tablets.insert(mig.tablet); } void mark_as_scheduled(const migration_plan::migrations_vector& migs) { for (auto&& mig : migs) { mark_as_scheduled(mig); } } future make_node_plan(node_load_map& nodes, host_id host, node_load& node_load) { migration_plan plan; const 
tablet_metadata& tmeta = _tm->tablets(); bool shuffle = in_shuffle_mode(); if (node_load.shard_count <= 1) { lblogger.debug("Node {} is balanced", host); co_return plan; } auto& sketch = *_load_sketch; // Keeps candidate source shards in a heap which yields highest-loaded shard first. std::vector src_shards; src_shards.reserve(node_load.shard_count); for (shard_id shard = 0; shard < node_load.shard_count; shard++) { src_shards.push_back(shard); } std::make_heap(src_shards.begin(), src_shards.end(), node_load.shards_by_load_cmp()); load_type max_load = 0; // Tracks max load among shards which ran out of candidates. while (true) { co_await coroutine::maybe_yield(); if (src_shards.empty()) { lblogger.debug("Unable to balance node {}: ran out of candidates, max load: {}, avg load: {}", host, max_load, node_load.avg_load); break; } shard_id src, dst; // Post-conditions: // 1) src and dst are chosen. // 2) src_shards.back() == src. if (shuffle) { src = src_shards[rand_shard(src_shards.size())]; std::swap(src_shards.back(), src_shards[src]); do { dst = rand_shard(node_load.shard_count); } while (src == dst); // There are at least two shards here so this converges. } else { std::pop_heap(src_shards.begin(), src_shards.end(), node_load.shards_by_load_cmp()); src = src_shards.back(); dst = sketch.get_least_loaded_shard(host); } auto push_back = seastar::defer([&] { // When shuffling, src_shards is not a heap. if (!shuffle) { std::push_heap(src_shards.begin(), src_shards.end(), node_load.shards_by_load_cmp()); } }); auto& src_info = node_load.shards[src]; // Convergence check // When in shuffle mode, exit condition is guaranteed by running out of candidates or by load limit. if (!shuffle && src == dst) { lblogger.debug("Node {} is balanced", host); break; } if (!src_info.has_candidates()) { lblogger.debug("No more candidates on shard {} of {}", src, host); if (src_info.dusage) { max_load = std::max(max_load, src_info.dusage->get_load()); } src_shards.pop_back(); push_back.cancel(); continue; } auto candidate = co_await peek_candidate(nodes, src_info, tablet_replica{host, src}, tablet_replica{host, dst}); auto tablets = candidate.tablets; if (!shuffle && (src == dst || !check_intranode_convergence(node_load, src, dst, tablets))) { lblogger.debug("Node {} is balanced", host); break; } // Emit migration. 
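// Both endpoints picked above are shards of the same host, so the emitted transition is always an
// intra-node migration between src and dst.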
auto mig = get_migration_info(tablets, tablet_transition_kind::intranode_migration, tablet_replica{host, src}, tablet_replica{host, dst}); auto& tmap = tmeta.get_tablet_map(tablets.table()); auto mig_streaming_info = get_migration_streaming_infos(_tm->get_topology(), tmap, mig); if (!can_accept_load(nodes, mig_streaming_info)) { _current_stats->migrations_skipped++; lblogger.debug("Unable to balance {}: load limit reached", host); break; } apply_load(nodes, mig_streaming_info); lblogger.debug("Adding migration: {} size: {}", mig, tablets.tablet_set_disk_size); _current_stats->migrations_produced++; _current_stats->intranode_migrations_produced++; mark_as_scheduled(mig); plan.add(std::move(mig)); erase_candidates(nodes, tmap, tablets); update_node_load_on_migration(node_load, host, src, dst, tablets); pick(sketch, host, dst, tablets); unload(sketch, host, src, tablets); } co_return plan; } future make_intranode_plan(node_load_map& nodes, const std::unordered_set& skip_nodes) { migration_plan plan; for (auto&& [host, node_load] : nodes) { if (skip_nodes.contains(host)) { lblogger.debug("Skipped balancing of node {}", host); continue; } plan.merge(co_await make_node_plan(nodes, host, node_load)); } co_return plan; } struct skip_info { std::unordered_set viable_targets; }; // Verifies if moving a given tablet from src_info.id to dst_info.id would not violate // replication constraints (no increase in replica co-location on nodes, racks). // Returns std::nullopt if it does not and the movement is allowed. std::optional check_constraints(node_load_map& nodes, const locator::tablet_map& tmap, node_load& src_info, node_load& dst_info, global_tablet_id tablet, bool need_viable_targets) { int max_rack_load; std::unordered_map rack_load; auto rs = get_rs(tablet.table); auto get_viable_targets = [&] () { std::unordered_set viable_targets; for (auto&& [id, node] : nodes) { if (node.dc() != src_info.dc() || node.drained) { continue; } if (rs->is_rack_based(_dc) && node.rack() != src_info.rack()) { continue; } viable_targets.emplace(id); } for (auto&& r : tmap.get_tablet_info(tablet.tablet).replicas) { viable_targets.erase(r.host); } if (_rack || rs->is_rack_based(_dc)) { // If _rack is set, "nodes" contains only nodes from a single rack, and so does viable_targets. // Therefore, rack overload constraints cannot possibly exclude any target. // Also, if replication factor is rack based, we only move tablets within the rack, and // viable targets belong to the same rack as source, and overload also cannot happen. return viable_targets; } for (auto&& r : tmap.get_tablet_info(tablet.tablet).replicas) { auto* node = _tm->get_topology().find_node(r.host); if (!node) { on_internal_error(lblogger, format("Node {} not found in topology", r.host)); } if (node->dc() == src_info.dc()) { rack_load[node->rack()] += 1; } } // Drop targets which would increase max rack load. 
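// For instance (hypothetical layout): if the tablet's DC-local replicas sit in rack1 (2 replicas) and
// rack2 (1 replica), max_rack_load is 2. When the source replica is in rack2, targets in rack2 or in an
// empty rack3 stay viable, while a target in rack1 would raise its load to 3 and is dropped below.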
max_rack_load = std::max_element(rack_load.begin(), rack_load.end(), [] (auto& a, auto& b) { return a.second < b.second; })->second; for (auto i = viable_targets.begin(); i != viable_targets.end(); ) { auto target = *i; auto& t_info = nodes[target]; auto old_i = i++; if (src_info.rack() != t_info.rack()) { auto new_rack_load = rack_load[t_info.rack()] + 1; if (new_rack_load > max_rack_load) { viable_targets.erase(old_i); } } } return viable_targets; }; if (!_rack && dst_info.rack() != src_info.rack()) { auto targets = get_viable_targets(); if (rs->is_rack_based(_dc)) { lblogger.debug("candidate tablet {} skipped because RF is rack-based and it's in a different rack", tablet); _current_stats->tablets_skipped_rack++; return skip_info{std::move(targets)}; } if (!targets.contains(dst_info.id)) { auto new_rack_load = rack_load[dst_info.rack()] + 1; lblogger.debug("candidate tablet {} skipped because it would increase load on rack {} to {}, max={}", tablet, dst_info.rack(), new_rack_load, max_rack_load); _current_stats->tablets_skipped_rack++; return skip_info{std::move(targets)}; } } for (auto&& r : tmap.get_tablet_info(tablet.tablet).replicas) { if (r.host == dst_info.id) { _current_stats->tablets_skipped_node++; lblogger.debug("candidate tablet {} skipped because it has a replica on target node", tablet); if (need_viable_targets) { return skip_info{get_viable_targets()}; } return skip_info{}; } } return std::nullopt; } // Verifies if moving a given tablet from src_info.id to dst_info.id would not violate // replication constraints (no increase in replica co-location on nodes, racks). // Returns std::nullopt if it does not and the movement is allowed. // // The constraints might not be the same for two sibling tablets that have co-located // replicas. // Example: // nodes = {A, B, C, D} // tablet1 = {A, B, C} // tablet2 = {A, B, D} // viable target for {tablet1, B} is D. // viable target for {tablet2, B} is C. // // When co-located replicas share a viable target, then a migration can be emitted to // preserve co-location. // To allow decommission when co-located replicas don't share a viable target, a skip // info will be returned for each tablet, even though that means breaking this // co-location. Decommission is higher in priority. using skip_info_vector = std::vector>; std::optional check_constraints(node_load_map& nodes, const locator::tablet_map& tmap, node_load& src_info, node_load& dst_info, migration_tablet_set tablet_set, bool need_viable_targets) { std::unordered_map viable_targets_count; std::unordered_map skip_info_per_tablet; std::unordered_set shared_viable_targets; const size_t tablet_count = tablet_set.tablets().size(); for (auto tablet : tablet_set.tablets()) { auto skip = check_constraints(nodes, tmap, src_info, dst_info, tablet, need_viable_targets); if (!skip) { continue; } for (const auto& target : skip->viable_targets) { // A viable target is considered shared if all candidates share that same viable target. 
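// Extending the example above (hypothetical, ignoring rack constraints): with nodes {A, B, C, D, E},
// tablet1 = {A, B, C} and tablet2 = {A, B, D}, draining B gives viable targets {D, E} for tablet1 and
// {C, E} for tablet2. E is counted for both tablets, so it becomes a shared viable target and a single
// skip_info covering the whole co-located set is returned, preserving co-location.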
if (++viable_targets_count[target] == tablet_count) { shared_viable_targets.insert(target); } } skip_info_per_tablet.emplace(std::make_pair(tablet, std::move(*skip))); } if (skip_info_per_tablet.empty()) { return std::nullopt; } if (!shared_viable_targets.empty()) { return skip_info_vector{std::make_pair(skip_info{std::move(shared_viable_targets)}, std::move(tablet_set))}; } skip_info_vector skip_infos; skip_infos.reserve(skip_info_per_tablet.size()); for (auto&& [tablet, info] : skip_info_per_tablet) { skip_infos.push_back(std::make_pair(std::move(info), migration_tablet_set{std::move(tablet)})); } return skip_infos; } // Picks best tablet replica to move and its new destination. // The destination host is picked among nodes_by_load_dst, with dst being the preferred destination. // // If drain_skipped is false, the replica is picked among tablets on src.host, // with src.shard as the preferred source shard. // // If drain_skipped is true, the chosen replica is src_node_info.skipped_candidates.back() // and src must match its location. // // Pre-conditions: // // src_node_info.id == src.host // target_info.id == dst.host // src_node_info.shard_by_load.back() == src.shard // nodes_by_load_dst.back().id == dst.host // // if drain_skipped == true: // src_node_info.skipped_candidates.back().replica = src // // if drain_skipped == false: // src_node_info.shards_by_load // // Invariants: // // nodes_by_load_dst[:-1] is a valid heap // src_node_info.shard_by_load[:-1] is a valid heap // // Post-conditions: // // src_node_info.shard_by_load.back() == result.src.shard // nodes_by_load_dst.back().id == result.dst.host // result.tablet is removed from candidate lists in src_node_info. // future pick_candidate(node_load_map& nodes, node_load& src_node_info, node_load& target_info, tablet_replica src, tablet_replica dst, std::vector& nodes_by_load_dst, bool drain_skipped) { auto get_candidate = [this, drain_skipped, &nodes, &src_node_info] (tablet_replica src, tablet_replica dst) -> future { if (drain_skipped) { auto source_tablets = src_node_info.skipped_candidates.back().tablets; auto badness = evaluate_candidate(nodes, source_tablets.table(), src, dst, source_tablets.tablet_set_disk_size); co_return migration_candidate{source_tablets, src, dst, badness}; } else { auto&& src_shard_info = src_node_info.shards[src.shard]; co_return co_await peek_candidate(nodes, src_shard_info, src, dst); } }; migration_candidate min_candidate = co_await get_candidate(src, dst); // Given src as the source replica, evaluate all destinations. // Updates min_candidate with the best candidate, if better is found. auto evaluate_targets = [&] (migration_tablet_set tablets, tablet_replica src, migration_badness src_badness) -> future<> { migration_badness min_dst_badness; std::optional min_dst_host; std::vector best_hosts; // First, find the best target nodes in terms of node badness. for (auto& new_target : nodes_by_load_dst) { co_await coroutine::maybe_yield(); auto& new_target_info = nodes[new_target]; // Skip movements which may harm convergence. 
if (!check_convergence(src_node_info, new_target_info, tablets)) { continue; } auto badness = evaluate_dst_badness(nodes, tablets.table(), tablet_replica{new_target, 0}, tablets.tablet_set_disk_size); if (!min_dst_host || badness.dst_node_badness < min_dst_badness.dst_node_badness) { min_dst_badness = badness; min_dst_host = new_target; best_hosts.clear(); } if (badness.dst_node_badness == min_dst_badness.dst_node_badness) { best_hosts.push_back(new_target); } } if (!min_dst_host) { lblogger.debug("No viable targets for src node {}", src.host); co_return; } std::optional min_dst; // Find the best shards on best targets. std::vector best_dsts; for (auto host : best_hosts) { for (shard_id new_dst_shard = 0; new_dst_shard < nodes[host].shard_count; new_dst_shard++) { co_await coroutine::maybe_yield(); auto new_dst = tablet_replica{host, new_dst_shard}; auto badness = evaluate_dst_badness(nodes, tablets.table(), new_dst, tablets.tablet_set_disk_size); if (!min_dst || badness < min_dst_badness) { min_dst_badness = badness; min_dst = new_dst; best_dsts.clear(); } if (badness.dst_shard_badness == min_dst_badness.dst_shard_badness) { best_dsts.push_back(new_dst); } } if (min_dst && !min_dst_badness.is_bad()) { break; } } if (best_dsts.size() > 1) { min_dst = best_dsts[rand_int() % best_dsts.size()]; } if (!min_dst) { on_internal_error(lblogger, fmt::format("No destination shards on {}", best_hosts)); } auto candidate = migration_candidate{ tablets, src, *min_dst, migration_badness{src_badness.src_shard_badness, src_badness.src_node_badness, min_dst_badness.dst_shard_badness, min_dst_badness.dst_node_badness} }; lblogger.trace("candidate: {}", candidate); if (candidate.badness < min_candidate.badness) { min_candidate = candidate; } }; if (min_candidate.badness.is_bad() && _use_table_aware_balancing) { _current_stats->bad_first_candidates++; // Consider better alternatives. if (drain_skipped) { auto tablets = src_node_info.skipped_candidates.back().tablets; auto badness = evaluate_src_badness(nodes, tablets.table(), src, tablets.tablet_set_disk_size); co_await evaluate_targets(tablets, src, badness); } else { // Find a better candidate. // Consider different tables. For each table, first find the best source shard. // Then find the best target node. Then find the best shard on the target node. 
for (auto [table, tablet_count] : src_node_info.tablet_count_per_table) { if (tablet_count == 0) { lblogger.trace("No src candidates for table {} on node {}", table, src.host); continue; } migration_badness min_src_badness; std::optional min_src; std::optional min_tablet_set; auto check_candidate = [&] (const tablet_replica& new_src, const migration_tablet_set& tablet_set) { auto badness = evaluate_src_badness(nodes, table, new_src, tablet_set.tablet_set_disk_size); if (!min_src || badness < min_src_badness) { min_src_badness = badness; min_src = new_src; min_tablet_set = tablet_set; } }; for (auto new_src_shard: src_node_info.shards_by_load) { auto new_src = tablet_replica{src.host, new_src_shard}; if (src_node_info.shards[new_src_shard].candidates[table].empty()) { lblogger.trace("No src candidates for table {} on shard {}", table, new_src); continue; } co_await coroutine::maybe_yield(); if (_force_capacity_based_balancing) { check_candidate(new_src, *src_node_info.shards[new_src_shard].candidates[table].begin()); } else { for (const auto& tablet_set: src_node_info.shards[new_src_shard].candidates[table]) { check_candidate(new_src, tablet_set); } } } if (!min_src) { lblogger.debug("No candidates for table {} on {}", table, src.host); continue; } co_await evaluate_targets(*min_tablet_set, *min_src, min_src_badness); if (!min_candidate.badness.is_bad()) { break; } } } } lblogger.trace("best candidate: {}", min_candidate); if (drain_skipped) { src_node_info.skipped_candidates.pop_back(); } else { erase_candidate(src_node_info.shards[min_candidate.src.shard], min_candidate.tablets); } // Restore invariants. if (min_candidate.dst != dst) { lblogger.trace("dst changed."); if (min_candidate.dst.host != dst.host) { auto i = std::find(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), min_candidate.dst.host); std::swap(*i, nodes_by_load_dst.back()); auto nodes_dst_cmp = [cmp = nodes_by_load_cmp(nodes)] (const host_id& a, const host_id& b) { return cmp(b, a); }; std::make_heap(nodes_by_load_dst.begin(), std::prev(nodes_by_load_dst.end()), nodes_dst_cmp); } if (min_candidate.src.shard != src.shard) { lblogger.trace("src changed."); auto i = std::find(src_node_info.shards_by_load.begin(), src_node_info.shards_by_load.end(), min_candidate.src.shard); std::swap(src_node_info.shards_by_load.back(), *i); std::make_heap(src_node_info.shards_by_load.begin(), std::prev(src_node_info.shards_by_load.end()), src_node_info.shards_by_load_cmp()); } } co_return min_candidate; } future<> log_table_load(node_load_map& nodes, table_id table) { load_type total_load = 0; size_t shard_count = 0; load_type max_shard_load = 0; for (auto&& [host, node] : nodes) { if (node.drained) { continue; } shard_count += node.shard_count; load_type this_node_max_shard_load = 0; load_type node_load = 0; for (shard_id shard = 0; shard < node.shard_count; shard++) { co_await coroutine::maybe_yield(); load_type load = double(node.shards[shard].tablet_count_per_table[table]) * _target_tablet_size / *node.capacity_per_shard(); total_load += load; node_load += load; max_shard_load = std::max(max_shard_load, load); this_node_max_shard_load = std::max(this_node_max_shard_load, load); } node_load /= node.shard_count; lblogger.debug("Load on host {} for table {}: total={}, max={}", host, table, node_load, this_node_max_shard_load); } auto avg_load = double(total_load) / shard_count; auto overcommit = max_shard_load / avg_load; lblogger.debug("Table {} shard overcommit: {}", table, overcommit); } future make_internode_plan(node_load_map& 
nodes, const std::unordered_set& nodes_to_drain, host_id target) { migration_plan plan; // Prepare candidate nodes and shards for heap-based balancing. // Any given node is either in nodes_by_load or nodes_by_load_dst, but not both. // This means that either of the heap needs to be updated when the node's load changes, not both. // heap which tracks most-loaded nodes in terms of avg_load. // It is used to find source tablet candidates. std::vector nodes_by_load; nodes_by_load.reserve(nodes.size()); // heap which tracks least-loaded nodes in terms of avg_load. // Used to find candidates for target nodes. std::vector nodes_by_load_dst; nodes_by_load_dst.reserve(nodes.size()); auto nodes_cmp = nodes_by_load_cmp(nodes); auto nodes_dst_cmp = [&] (const host_id& a, const host_id& b) { return nodes_cmp(b, a); }; for (auto&& [host, node_load] : nodes) { if (lblogger.is_enabled(seastar::log_level::debug)) { shard_id shard = 0; for (auto&& shard_load : node_load.shards) { lblogger.debug("shard {}: load: {}, tablets: {}, candidates: {}, tables: {}", tablet_replica {host, shard}, node_load.shard_load(shard), shard_load.tablet_count, shard_load.candidate_count(), shard_load.tablet_count_per_table); shard++; } } if (host != target && (nodes_to_drain.empty() || node_load.drained)) { nodes_by_load.push_back(host); std::make_heap(node_load.shards_by_load.begin(), node_load.shards_by_load.end(), node_load.shards_by_load_cmp()); } else { nodes_by_load_dst.push_back(host); } } std::make_heap(nodes_by_load.begin(), nodes_by_load.end(), nodes_cmp); std::make_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp); const tablet_metadata& tmeta = _tm->tablets(); const locator::topology& topo = _tm->get_topology(); load_type max_off_candidate_load = 0; // max load among nodes which ran out of candidates. auto batch_size = nodes[target].shard_count; const size_t max_skipped_migrations = nodes[target].shards.size() * 2; size_t skipped_migrations = 0; auto shuffle = in_shuffle_mode(); while (plan.size() < batch_size) { co_await coroutine::maybe_yield(); if (nodes_by_load.empty()) { lblogger.debug("No more candidate nodes"); _current_stats->stop_no_candidates++; break; } // Pick source node. 
std::pop_heap(nodes_by_load.begin(), nodes_by_load.end(), nodes_cmp); auto src_host = nodes_by_load.back(); auto& src_node_info = nodes[src_host]; bool drain_skipped = src_node_info.shards_by_load.empty() && src_node_info.drained && !src_node_info.skipped_candidates.empty(); lblogger.debug("source node: {}, avg_load={:.2f}, skipped={}, drain_skipped={}", src_host, src_node_info.avg_load, src_node_info.skipped_candidates.size(), drain_skipped); if (src_node_info.shards_by_load.empty() && !drain_skipped) { lblogger.debug("candidate node {} ran out of candidate shards with {} tablets remaining", src_host, src_node_info.tablet_count); max_off_candidate_load = std::max(max_off_candidate_load, src_node_info.avg_load); nodes_by_load.pop_back(); continue; } auto push_back_node_candidate = seastar::defer([&] { std::push_heap(nodes_by_load.begin(), nodes_by_load.end(), nodes_cmp); }); tablet_replica src; auto push_back_shard_candidate = seastar::defer([&] { std::push_heap(src_node_info.shards_by_load.begin(), src_node_info.shards_by_load.end(), src_node_info.shards_by_load_cmp()); }); if (drain_skipped) { push_back_shard_candidate.cancel(); auto& candidate = src_node_info.skipped_candidates.back(); src = candidate.replica; lblogger.debug("Skipped candidate: tablet={}, replica={}, targets={}", candidate.tablets, src, candidate.viable_targets); // When draining, need to narrow down targets to viable targets before choosing the best target. nodes_by_load_dst.clear(); for (auto&& h : candidate.viable_targets) { nodes_by_load_dst.push_back(h); } std::make_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp); } else { // Pick best source shard. std::pop_heap(src_node_info.shards_by_load.begin(), src_node_info.shards_by_load.end(), src_node_info.shards_by_load_cmp()); auto src_shard = src_node_info.shards_by_load.back(); src = tablet_replica {src_host, src_shard}; auto&& src_shard_info = src_node_info.shards[src_shard]; if (!src_shard_info.has_candidates()) { lblogger.debug("shard {} ran out of candidates with {} tablets remaining.", src, src_shard_info.tablet_count); src_node_info.shards_by_load.pop_back(); push_back_shard_candidate.cancel(); if (src_node_info.shards_by_load.empty()) { lblogger.debug("candidate node {} ran out of candidate shards with {} tablets remaining, {} skipped.", src_host, src_node_info.tablet_count, src_node_info.skipped_candidates.size()); } continue; } } // Pick best target node. if (nodes_by_load_dst.empty()) { lblogger.debug("No more target nodes"); _current_stats->stop_no_candidates++; break; } std::pop_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp); target = nodes_by_load_dst.back(); auto& target_info = nodes[target]; auto push_back_target_node = seastar::defer([&] { std::push_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp); }); lblogger.debug("target node: {}, avg_load={}", target, target_info.avg_load); // Check convergence conditions. // When draining nodes, disable convergence checks so that all tablets are migrated away. bool can_check_convergence = !shuffle && nodes_to_drain.empty(); if (can_check_convergence) { // Check if all nodes reached the same avg_load. There are three sets of nodes: target, candidates (nodes_by_load) // and off-candidates (removed from nodes_by_load). At any time, the avg_load for target is not greater than // that of any candidate, and avg_load of any candidate is not greater than that of any in the off-candidates set. 
// This is ensured by the fact that we remove candidates in the order of avg_load from the heap, and // because we prevent load inversion between candidate and target in the next check. // So the max avg_load of candidates is that of the current src_node_info, and max avg_load of off-candidates // is tracked in max_off_candidate_load. If max_off_candidate_load is equal to target's avg_load, // it means that all nodes have equal avg_load. We take the maximum with the current candidate in src_node_info // to handle the case of off-candidates being empty. In that case, max_off_candidate_load is 0. const load_type max_load = std::max(max_off_candidate_load, src_node_info.avg_load); if (is_balanced(target_info.avg_load, max_load)) { lblogger.debug("Balance achieved."); _current_stats->stop_balance++; break; } } // Pick best target shard. auto dst = global_shard_id {target, _load_sketch->get_least_loaded_shard(target)}; lblogger.trace("target shard: {}, tablets={}, load={}", dst.shard, target_info.shards[dst.shard].tablet_count, target_info.shard_load(dst.shard)); if (lblogger.is_enabled(seastar::log_level::trace)) { shard_id shard = 0; for (auto&& shard_load : target_info.shards) { lblogger.trace("shard {}: load: {}, tablets: {}, candidates: {}, tables: {}", tablet_replica{dst.host, shard}, target_info.shard_load(shard), shard_load.tablet_count, shard_load.candidate_count(), shard_load.tablet_count_per_table); shard++; } } // Pick tablet movement. // May choose a different source shard than src.shard or different destination host/shard than dst. auto candidate = co_await pick_candidate(nodes, src_node_info, target_info, src, dst, nodes_by_load_dst, drain_skipped); auto source_tablets = candidate.tablets; src = candidate.src; dst = candidate.dst; auto& tmap = tmeta.get_tablet_map(source_tablets.table()); if (can_check_convergence && !check_convergence(src_node_info, target_info, source_tablets)) { lblogger.debug("No more candidates. Load would be inverted."); _current_stats->stop_load_inversion++; break; } // Check replication strategy constraints. // When drain_skipped is true, we already picked movement to a viable target. if (!drain_skipped) { auto process_skip_info = [&] (migration_tablet_set tablets, skip_info skip) { if (src_node_info.drained && skip.viable_targets.empty()) { auto tablet = tablets.tablets().front(); auto replicas = tmap.get_tablet_info(tablet.tablet).replicas; auto reason = fmt::format("Unable to find new replica for tablet {} on {} when draining {}. Consider adding new nodes or reducing replication factor. 
(nodes {}, replicas {})", tablet, src, nodes_to_drain, nodes_by_load_dst, replicas); lblogger.warn("{}", reason); plan.add(drain_failure(src_node_info.id, reason)); return; } lblogger.debug("Adding replica {} of candidate {} to skipped list with the viable targets {}", src, candidate, skip.viable_targets); src_node_info.skipped_candidates.emplace_back(src, tablets, std::move(skip.viable_targets)); }; auto skip = check_constraints(nodes, tmap, src_node_info, nodes[dst.host], source_tablets, src_node_info.drained); if (skip) { for (auto&& [skip_info, tablets] : *skip) { process_skip_info(tablets, skip_info); } if (!plan.drain_failures().empty()) { break; } continue; } } if (candidate.badness.is_bad()) { _current_stats->bad_migrations++; } if (drain_skipped) { _current_stats->migrations_from_skiplist++; } if (src_node_info.req && *src_node_info.req == topology_request::leave && src_node_info.excluded) { plan.add(drain_failure(src_node_info.id, "Node was marked as excluded")); break; } tablet_transition_kind kind = (src_node_info.state() == locator::node::state::being_removed || src_node_info.state() == locator::node::state::left || (src_node_info.req && *src_node_info.req == topology_request::remove)) ? locator::choose_rebuild_transition_kind(_db.features()) : tablet_transition_kind::migration; auto mig = get_migration_info(source_tablets, kind, src, dst); auto mig_streaming_info = get_migration_streaming_infos(topo, tmap, mig); pick(*_load_sketch, dst.host, dst.shard, source_tablets); if (can_accept_load(nodes, mig_streaming_info)) { apply_load(nodes, mig_streaming_info); lblogger.debug("Adding migration: {} size: {}", mig, source_tablets.tablet_set_disk_size); _current_stats->migrations_produced++; mark_as_scheduled(mig); plan.add(std::move(mig)); } else { // Shards are overloaded with streaming. Do not include the migration in the plan, but // continue as if it was in the hope that we will find a migration which can be executed without // violating the load. Next make_plan() invocation will notice that the migration was not executed. // We should not just stop here because that can lead to underutilization of the cluster. // Just because the next migration is blocked doesn't mean we could not proceed with migrations // for other shards which are produced by the planner subsequently. skipped_migrations++; _current_stats->migrations_skipped++; if (skipped_migrations >= max_skipped_migrations) { lblogger.debug("Too many migrations skipped, aborting balancing"); _current_stats->stop_skip_limit++; break; } } erase_candidates(nodes, tmap, source_tablets); update_node_load_on_migration(nodes, src, dst, source_tablets); if (src_node_info.tablet_count == 0) { push_back_node_candidate.cancel(); nodes_by_load.pop_back(); } if (lblogger.is_enabled(seastar::log_level::debug)) { co_await log_table_load(nodes, source_tablets.table()); } } if (plan.size() == batch_size) { _current_stats->stop_batch_size++; } if (plan.empty()) { // Due to replica collocation constraints, it may not be possible to balance the cluster evenly. // For example, if nodes have different number of shards. Nodes which have more shards will be // replicas for more tablets which rules out more candidates on other nodes with a higher per-shard load. // // Example: // // node1: 1 shard // node2: 1 shard // node3: 7 shard // // If there are 7 tablets and RF=3, each node must have 1 tablet replica. // So node3 will have average load of 1, and node1 and node2 will have // average shard load of 7. 
// Show when this is the final plan with no active migrations left to execute, // otherwise it may just be a temporary situation due to lack of candidates. if (_migrating_candidates == 0) { lblogger.info("Not possible to achieve balance in {}", _location); print_node_stats(nodes, only_active::no); } } co_return std::move(plan); } class sibling_tablets_replicas_processor { const tablet_desc _t1; const std::optional _t2; tablet_replica_set _t1_replicas; tablet_replica_set _t2_replicas; tablet_replica_set::iterator _current_t1; tablet_replica_set::iterator _current_t2; public: sibling_tablets_replicas_processor(const tablet_desc t1, const std::optional t2, tablet_replica_set t1_replicas, tablet_replica_set t2_replicas) : _t1(std::move(t1)) , _t2(std::move(t2)) , _t1_replicas(std::move(t1_replicas)) , _t2_replicas(std::move(t2_replicas)) , _current_t1(_t1_replicas.begin()) , _current_t2(_t2_replicas.begin()) { } using tablet_ids = utils::small_vector; // Produces the next replica from sets of sibling tablets. If a given replica has // the sibling tablets co-located in it, the ids of both tablets will be returned // for that replica. // Given replica sets of sibling tablets: // t1 {A, B, C}, // t2 {A, C, D}, // it will yield // {A, {t1, t2}}, {B, {t1}}, {C, {t1, t2}}, {D, {t2}} // Invariant: if return value is engaged, size of tablet_ids will be 1 or 2. std::optional> next_replica() { if (_current_t1 == _t1_replicas.end() && _current_t2 == _t2_replicas.end()) { return std::nullopt; } if (_current_t1 == _t1_replicas.end()) { return std::make_pair(*_current_t2++, tablet_ids{_t2->tid}); } if (_current_t2 == _t2_replicas.end()) { return std::make_pair(*_current_t1++, tablet_ids{_t1.tid}); } // Detect co-located replicas of sibling tablets. if (*_current_t1 == *_current_t2) { _current_t1++; return std::make_pair(*_current_t2++, tablet_ids{_t1.tid, _t2->tid}); } if (*_current_t1 < *_current_t2) { return std::make_pair(*_current_t1++, tablet_ids{_t1.tid}); } return std::make_pair(*_current_t2++, tablet_ids{_t2->tid}); } }; using only_active = bool_class; void print_node_stats(node_load_map& nodes, only_active only_active_) { for (auto&& [host, load] : nodes) { size_t read = 0; size_t write = 0; for (auto& shard_load : load.shards) { read += shard_load.streaming_read_load; write += shard_load.streaming_write_load; } auto level = !only_active_ || (read + write) > 0 ? seastar::log_level::info : seastar::log_level::debug; lblogger.log(level, "Node {}: {}/{} load={:.6f} tablets={} shards={} tablets/shard={:.3f} state={} cap={}" " rd={} wr={}", host, load.dc(), load.rack(), load.avg_load, load.tablet_count, load.shard_count, load.tablets_per_shard(), load.state(), load.dusage->capacity, read, write); } } future make_plan(dc_name dc, std::optional rack = std::nullopt) { migration_plan plan; if (utils::get_local_injector().enter("tablet_migration_bypass")) { co_return std::move(plan); } _dc = dc; _rack = rack; _location = fmt::format("{}{}", dc, rack ? fmt::format("/{}", *rack) : ""); _current_stats = _stats.for_dc(dc); auto _ = seastar::defer([&] { _current_stats = nullptr; }); _migrating_candidates = 0; auto node_filter = [&] (const locator::node& node) { return node.dc_rack().dc == dc && (!rack || node.dc_rack().rack == *rack); }; // Causes load balancer to move some tablet even though load is balanced. 
auto shuffle = in_shuffle_mode(); _current_stats->calls++; lblogger.debug("Examining DC {} rack {} (shuffle={}, balancing={}, tablets_per_shard_goal={}, force_capacity_based_balancing={})", dc, rack, shuffle, _tm->tablets().balancing_enabled(), _tablets_per_shard_goal, _force_capacity_based_balancing); const locator::topology& topo = _tm->get_topology(); // Select subset of nodes to balance. node_load_map nodes; std::unordered_set nodes_to_drain; _tm->for_each_token_owner([&] (const locator::node& node) { if (!node_filter(node)) { return; } auto drain_node = [&] (topology_request req) { lblogger.info("Will drain node {} ({}) from DC {} due to {} request", node.host_id(), node.get_state(), dc, req); ensure_node(nodes, node.host_id()); nodes_to_drain.emplace(node.host_id()); auto& n = nodes[node.host_id()]; n.req = req; n.drained = true; }; auto req = _topology ? _topology->get_request(raft::server_id(node.host_id().uuid())) : std::nullopt; if (node.get_state() == locator::node::state::being_decommissioned) { drain_node(topology_request::leave); } else if (node.get_state() == locator::node::state::being_removed) { drain_node(topology_request::remove); } else if (req && (*req == topology_request::leave || *req == topology_request::remove)) { drain_node(*req); } else if (node.get_state() == locator::node::state::normal) { if (node.is_excluded()) { // Excluded nodes should not be chosen as targets for migration. lblogger.debug("Ignoring excluded node {}: state={}", node.host_id(), node.get_state()); } else { ensure_node(nodes, node.host_id()); } } }); // Apply skiplist only when not draining. // It's unsafe to move tablets to non-skip nodes as this can lead to node overload. if (nodes_to_drain.empty()) { for (auto host_to_skip : _skiplist) { if (auto handle = nodes.extract(host_to_skip)) { auto& node = handle.mapped(); lblogger.debug("Ignoring dead node {}: state={}", node.id, node.node->get_state()); } } } // Compute tablet load on nodes. for (auto&& [table, tables] : _tm->tablets().all_table_groups()) { const auto& tmap = _tm->tablets().get_tablet_map(table); co_await tmap.for_each_tablet([&, table = table] (tablet_id tid, const tablet_info& ti) -> future<> { auto trinfo = tmap.get_tablet_transition_info(tid); // Check if any replica is on a node which has left. // When node is replaced we don't rebuild as part of topology request. for (auto&& r : ti.replicas) { auto* node = topo.find_node(r.host); if (!node) { on_internal_error(lblogger, format("Replica {} of tablet {} not found in topology", r, global_tablet_id{table, tid})); } if (node->left() && node_filter(*node)) { ensure_node(nodes, r.host); nodes_to_drain.insert(r.host); nodes[r.host].drained = true; } } // We reflect migrations in the load as if they already happened, // optimistically assuming that they will succeed. for (auto&& replica : get_replicas_for_tablet_load(ti, trinfo)) { if (nodes.contains(replica.host)) { nodes[replica.host].tablet_count += 1; // This invariant is assumed later. if (replica.shard >= nodes[replica.host].shard_count) { auto gtid = global_tablet_id{table, tid}; on_internal_error(lblogger, format("Tablet {} replica {} targets non-existent shard", gtid, replica)); } } } return make_ready_future<>(); }); } if (nodes.empty()) { lblogger.debug("No nodes to balance."); _current_stats->stop_balance++; co_return plan; } // Detect finished drain. 
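    // A node scheduled for draining that no longer holds any tablet replicas needs no
    // migrations, so it is dropped from both `nodes` and `nodes_to_drain` below. The loop
    // advances via the iterator returned by erase() so that erasing while iterating stays valid.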
for (auto i = nodes_to_drain.begin(); i != nodes_to_drain.end();) { if (nodes[*i].tablet_count == 0) { lblogger.info("Node {} is already drained, ignoring", *i); nodes.erase(*i); i = nodes_to_drain.erase(i); } else { ++i; } } _load_sketch = locator::load_sketch(_tm, _table_load_stats, _force_capacity_based_balancing ? _target_tablet_size : 0); _load_sketch->set_minimal_tablet_size(_minimal_tablet_size); _load_sketch->set_force_capacity_based_load(_force_capacity_based_balancing); co_await _load_sketch->populate_dc(dc); // If we don't have nodes to drain, remove nodes which don't have complete tablet sizes if (nodes_to_drain.empty()) { std::optional incomplete_host; size_t incomplete_count = 0; for (auto nodes_i = nodes.begin(); nodes_i != nodes.end();) { host_id host = nodes_i->first; if (!_load_sketch->has_complete_data(host)) { incomplete_host.emplace(host); incomplete_count++; nodes_i = nodes.erase(nodes_i); } else { ++nodes_i; } } if (incomplete_host) { lblogger.info("Ignoring {} node(s) with incomplete tablet stats, e.g. {}", incomplete_count, *incomplete_host); } } plan.set_has_nodes_to_drain(!nodes_to_drain.empty()); // Invariant: node.dusage || node.drained for (auto& [host, node] : nodes) { if (node.drained) { continue; } if (!node.dusage) { lblogger.info("Cannot balance because capacity of node {} (or more) is unknown", host); co_return plan; } } // For size based balancing, only excluded nodes are allowed to have incomplete tablet stats for (auto& [host, node] : nodes) { if (!_load_sketch->has_complete_data(host)) { if (!_force_capacity_based_balancing && node.drained && node.node->is_excluded()) { _load_sketch->ignore_incomplete_data(host); } else { lblogger.info("Cannot balance because node {} (or more) has incomplete tablet stats", host); co_return plan; } } } // Check if we have destination nodes const bool has_dest_nodes = std::ranges::any_of(std::views::values(nodes), [&] (const auto& load) { return !load.drained; }); if (!has_dest_nodes) { for (auto host : nodes_to_drain) { plan.add(drain_failure(host, format("No candidate nodes in {} to drain {}." " Consider adding new nodes or reducing replication factor.", _location, host))); } lblogger.debug("No candidate nodes"); _current_stats->stop_no_candidates++; co_return plan; } // We want to saturate the target node so we migrate several tablets in parallel, one for each shard // on the target node. This assumes that the target node is well-balanced and that tablet migrations // complete at the same time. Both assumptions are not generally true in practice, which we currently ignore. // But they will be true typically, because we fill shards starting from least-loaded shards, // so we naturally strive towards balance between shards. // // If target node is not balanced across shards, we will overload some shards. Streaming concurrency // will suffer because more loaded shards will not participate, which will under-utilize the node. // FIXME: To handle the above, we should rebalance the target node before migrating tablets from other nodes. // Compute per-shard load and candidate tablets. _tablet_count_per_table.clear(); _disk_used_per_table.clear(); for (auto&& [table, tables] : _tm->tablets().all_table_groups()) { const auto& tmap = _tm->tablets().get_tablet_map(table); uint64_t total_tablet_count = 0; uint64_t total_tablet_sizes = 0; auto get_replicas = [this] (std::optional t) -> tablet_replica_set { return t ? 
sorted_replicas_for_tablet_load(*t->info, t->transition) : tablet_replica_set{}; }; auto migrating = [&] (std::optional t) { return t && (bool(t->transition) || _scheduled_tablets.contains(global_tablet_id{table, t->tid})); }; auto maybe_apply_load = [&] (std::optional t) { if (t && is_streaming(t->transition)) { apply_load(nodes, get_migration_streaming_info(topo, *t->info, *t->transition)); } }; // If a table is undergoing merge, co-located replicas of sibling tablets will be treated as a single migration candidate, // even though each tablet replica will be migrated independently. Next invocation of load balancer is able to exclude both // sibling if either haven't finished migration yet. That's to prevent load balancer from incorrectly considering that // they're not co-located if only one of them completed migration. co_await tmap.for_each_sibling_tablets([&, table = table] (tablet_desc t1, std::optional t2) -> future<> { maybe_apply_load(t1); maybe_apply_load(t2); auto t1_replicas = get_replicas(t1); // If t2 is disengaged, when tablet_count == 1, t2_replicas is empty and so will have no effect // when adding t1 replicas as candidates. auto t2_replicas = get_replicas(t2); sibling_tablets_replicas_processor processor(t1, t2, std::move(t1_replicas), std::move(t2_replicas)); auto get_table_desc = [&] (tablet_id tid) { return tid == t1.tid ? t1 : t2; }; while (auto next = processor.next_replica()) { auto& [replica, tids] = *next; if (!nodes.contains(replica.host)) { continue; } utils::small_vector tablet_sizes; uint64_t tablet_sizes_sum = 0; for (auto tid : tids) { if (_force_capacity_based_balancing) { tablet_sizes_sum += _target_tablet_size; tablet_sizes.push_back(_target_tablet_size); } else { uint64_t tablet_group_size = 0; auto token_range = tmap.get_token_range(tid); for (auto group_member : tables) { const range_based_tablet_id rb_tid {group_member, token_range}; auto& member_tmap = _tm->tablets().get_tablet_map(group_member); auto& ti = member_tmap.get_tablet_info(tid); auto trinfo = member_tmap.get_tablet_transition_info(tid); auto tablet_size_opt = get_tablet_size(replica.host, rb_tid, ti, trinfo); const uint64_t tablet_size = std::max(tablet_size_opt.value_or(_target_tablet_size), _minimal_tablet_size); tablet_group_size += tablet_size; tablet_sizes_sum += tablet_size; } tablet_sizes.push_back(tablet_group_size); } } auto& node_load_info = nodes[replica.host]; shard_load& shard_load_info = node_load_info.shards[replica.shard]; if (shard_load_info.tablet_count == 0) { node_load_info.shards_by_load.push_back(replica.shard); } shard_load_info.tablet_count += tids.size(); if (shard_load_info.dusage) { shard_load_info.dusage->used += tablet_sizes_sum; } shard_load_info.tablet_count_per_table[table] += tids.size(); shard_load_info.tablet_sizes_per_table[table] += tablet_sizes_sum; node_load_info.tablet_count_per_table[table] += tids.size(); node_load_info.tablet_sizes_per_table[table] += tablet_sizes_sum; if (node_load_info.dusage) { node_load_info.dusage->used += tablet_sizes_sum; } total_tablet_count += tids.size(); total_tablet_sizes += tablet_sizes_sum; if (tmap.needs_merge() && tids.size() == 2) { // Exclude both sibling tablets if either haven't finished migration yet. That's to prevent balancer from // un-doing the colocation. 
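                // Worked example: if sibling tablets t1 and t2 both have a replica on this shard
                // and neither is currently migrating, they are registered below as a single
                // colocated candidate whose size is the combined size of both replicas; if either
                // one is still in transition, the pair is skipped and counted in _migrating_candidates.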
if (!migrating(t1) && !migrating(t2)) { auto candidate = colocated_tablets{global_tablet_id{table, t1.tid}, global_tablet_id{table, t2->tid}}; add_candidate(shard_load_info, migration_tablet_set{std::move(candidate), tablet_sizes_sum}); } else { _migrating_candidates++; } } else { if (tids.size() != tablet_sizes.size()) { on_internal_error(lblogger, "Number of co-located tablets and their sizes don't match."); } for (size_t i = 0; i < tids.size(); i++) { if (!migrating(get_table_desc(tids[i]))) { // migrating tablets are not candidates add_candidate(shard_load_info, migration_tablet_set{global_tablet_id{table, tids[i]}, tablet_sizes[i]}); } else { _migrating_candidates++; } } } } return make_ready_future<>(); }); _disk_used_per_table[table] = total_tablet_sizes; _tablet_count_per_table[table] = total_tablet_count; } // Compute load imbalance. _total_capacity_shards = 0; _total_capacity_nodes = 0; _total_capacity_storage = 0; load_type max_load = 0; load_type min_load = 0; std::optional min_load_node = std::nullopt; for (auto&& [host, load] : nodes) { load.update(); _stats.for_node(dc, host).load = load.avg_load; if (!load.drained) { if (!min_load_node || load.avg_load < min_load) { min_load = load.avg_load; min_load_node = host; } if (load.avg_load > max_load) { max_load = load.avg_load; } _total_capacity_shards += load.shard_count; _total_capacity_nodes++; _total_capacity_storage += load.dusage->capacity; } } print_node_stats(nodes, only_active::yes); if (!nodes_to_drain.empty() || (_tm->tablets().balancing_enabled() && (shuffle || !is_balanced(min_load, max_load)))) { host_id target = *min_load_node; lblogger.info("target node: {}, avg_load: {}, max: {}", target, min_load, max_load); plan.merge(co_await make_internode_plan(nodes, nodes_to_drain, target)); } else { _current_stats->stop_balance++; } if (_tm->tablets().balancing_enabled()) { plan.merge(co_await make_intranode_plan(nodes, nodes_to_drain)); } if (_tm->tablets().balancing_enabled() && plan.empty() && !ongoing_rack_list_colocation()) { auto dc_merge_plan = co_await make_merge_colocation_plan(nodes); auto level = dc_merge_plan.tablet_migration_count() > 0 ? 
seastar::log_level::info : seastar::log_level::debug; lblogger.log(level, "Prepared {} migrations for co-locating sibling tablets in {}", dc_merge_plan.tablet_migration_count(), _location); plan.merge(std::move(dc_merge_plan)); } co_await utils::clear_gently(nodes); co_return std::move(plan); } }; class tablet_allocator_impl : public tablet_allocator::impl , public service::migration_listener::empty_listener { service::migration_notifier& _migration_notifier; replica::database& _db; load_balancer_stats_manager _load_balancer_stats; scheduling_group _background; bool _stopped = false; bool _use_tablet_aware_balancing = true; locator::load_stats_ptr _load_stats; private: load_balancer make_load_balancer(token_metadata_ptr tm, service::topology* topology, db::system_keyspace* sys_ks, locator::load_stats_ptr table_load_stats, std::unordered_set skiplist) { load_balancer lb(_db, tm, topology, sys_ks, std::move(table_load_stats), _load_balancer_stats, _db.get_config().target_tablet_size_in_bytes(), _db.get_config().tablets_per_shard_goal(), std::move(skiplist)); lb.set_use_table_aware_balancing(_use_tablet_aware_balancing); lb.set_initial_scale(_db.get_config().tablets_initial_scale_factor()); return lb; } public: tablet_allocator_impl(tablet_allocator::config cfg, service::migration_notifier& mn, replica::database& db) : _migration_notifier(mn) , _db(db) , _load_balancer_stats("load_balancer") , _background(cfg.background_sg) { _migration_notifier.register_listener(this); } tablet_allocator_impl(tablet_allocator_impl&&) = delete; // "this" captured. ~tablet_allocator_impl() { SCYLLA_ASSERT(_stopped); } future<> stop() { co_await _migration_notifier.unregister_listener(this); _stopped = true; } future balance_tablets(token_metadata_ptr tm, service::topology* topology, db::system_keyspace* sys_ks, locator::load_stats_ptr table_load_stats, std::unordered_set skiplist) { auto lb = make_load_balancer(tm, topology, sys_ks, table_load_stats ? table_load_stats : _load_stats, std::move(skiplist)); co_await coroutine::switch_to(_background); co_return co_await lb.make_plan(); } void set_load_stats(locator::load_stats_ptr load_stats) { _load_stats = std::move(load_stats); } locator::load_stats_ptr get_load_stats() { return _load_stats; } void set_use_tablet_aware_balancing(bool use_tablet_aware_balancing) { _use_tablet_aware_balancing = use_tablet_aware_balancing; } // Allocates new tablets for a table which is not co-located with another table. tablet_map allocate_tablets_for_new_base_table(const tablet_aware_replication_strategy* tablet_rs, const schema& s) { auto tm = _db.get_shared_token_metadata().get(); auto lb = make_load_balancer(tm, nullptr, nullptr, nullptr, {}); auto plan = lb.make_sizing_plan(s.shared_from_this(), tablet_rs).get(); auto& table_plan = plan.tables[s.id()]; if (table_plan.target_tablet_count_aligned != table_plan.target_tablet_count) { lblogger.info("Rounding up tablet count from {} to {} for table {}.{}", table_plan.target_tablet_count, table_plan.target_tablet_count_aligned, s.ks_name(), s.cf_name()); } auto tablet_count = table_plan.target_tablet_count_aligned; auto map = tablet_rs->allocate_tablets_for_new_table(s.shared_from_this(), tm, tablet_count).get(); return map; } // Allocate tablets for multiple new tables, which may be co-located with each other, or co-located with an existing base table. 
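    // Example (hypothetical table names): when a single schema change creates tables `base_t`
    // and `view_t`, with `view_t` co-located with `base_t`, both end up in one group keyed by
    // `base_t`'s id. New tablets are allocated only for `base_t`; `view_t` just gets a
    // co-located tablet map mutation referencing `base_t` as its base table.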
void allocate_tablets_for_new_tables(const keyspace_metadata& ksm, const std::vector& cfms, utils::chunked_vector& muts, api::timestamp_type ts) { utils::get_local_injector().inject("pause_in_allocate_tablets_for_new_table", utils::wait_for_message(std::chrono::minutes(5))).get(); locator::replication_strategy_params params(ksm.strategy_options(), ksm.initial_tablets(), ksm.consistency_option()); auto tm = _db.get_shared_token_metadata().get(); auto rs = abstract_replication_strategy::create_replication_strategy(ksm.strategy_name(), params, tm->get_topology()); if (auto&& tablet_rs = rs->maybe_as_tablet_aware()) { std::unordered_map new_cfms_map; for (auto s : cfms) { new_cfms_map[s->id()] = s; } // Group the new tables by co-location groups. // The key is the base table id, which may be a new table or an existing table. const bool colocated_tablets_enabled = _db.features().colocated_tablets; std::unordered_map> table_groups; std::unordered_set colocated_tables; for (auto s : cfms) { std::optional base_id; if (colocated_tablets_enabled) { base_id = _db.get_base_table_for_tablet_colocation(*s, new_cfms_map); if (base_id) { if (colocated_tables.contains(*base_id) || tm->tablets().get_base_table(*base_id) != *base_id) { on_internal_error(lblogger, format("Trying to set co-located table {} with base table {} but it's not a base table.", s->id(), *base_id)); } colocated_tables.insert(s->id()); } } table_groups[base_id.value_or(s->id())].push_back(s); } // allocate tablets for each co-location group. // if the base is a new table, allocate new tablets for it. // for the other tables in the group, create a co-located tablet map. for (const auto& [base_id, group_schemas] : table_groups) { auto create_colocated_tablet_maps = [&] (const tablet_map& base_map) { for (auto sp : group_schemas) { const auto& s = *sp; if (s.id() != base_id) { lblogger.debug("Creating tablets for {}.{} id={} with base={}", s.ks_name(), s.cf_name(), s.id(), base_id); muts.emplace_back(colocated_tablet_map_to_mutation(s.id(), s.ks_name(), s.cf_name(), base_id, ts)); _db.get_notifier().before_allocate_tablet_map_in_notification(base_map, s, muts, ts); } } }; if (tm->tablets().has_tablet_map(base_id)) { const auto& base_map = tm->tablets().get_tablet_map(base_id); create_colocated_tablet_maps(base_map); } else { const auto& s = *new_cfms_map[base_id]; lblogger.debug("Creating tablets for {}.{} id={}", s.ks_name(), s.cf_name(), s.id()); auto base_map = allocate_tablets_for_new_base_table(tablet_rs, s); tablet_map_to_mutations(base_map, s.id(), s.ks_name(), s.cf_name(), ts, _db.features(), [&] (mutation m) { muts.emplace_back(std::move(m)); return make_ready_future<>(); }).get(); _db.get_notifier().before_allocate_tablet_map_in_notification(base_map, s, muts, ts); create_colocated_tablet_maps(base_map); } } } } void on_before_create_column_families(const keyspace_metadata& ksm, const std::vector& cfms, utils::chunked_vector& muts, api::timestamp_type ts) override { allocate_tablets_for_new_tables(ksm, cfms, muts, ts); } void on_before_create_column_family(const keyspace_metadata& ksm, const schema& s, utils::chunked_vector& muts, api::timestamp_type ts) override { allocate_tablets_for_new_tables(ksm, {s.shared_from_this()}, muts, ts); } void on_before_drop_column_family(const schema& s, utils::chunked_vector& muts, api::timestamp_type ts) override { keyspace& ks = _db.find_keyspace(s.ks_name()); auto&& rs = ks.get_replication_strategy(); if (rs.uses_tablets()) { auto tm = _db.get_shared_token_metadata().get(); 
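            // Dropping a table which uses tablets also drops its tablet map: a deletion mutation
            // for the table's tablet map is queued alongside the schema mutations below.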
lblogger.debug("Dropping tablets for {}.{} id={}", s.ks_name(), s.cf_name(), s.id()); muts.emplace_back(make_drop_tablet_map_mutation(s.id(), ts)); } } void on_before_drop_keyspace(const sstring& keyspace_name, utils::chunked_vector& muts, api::timestamp_type ts) override { keyspace& ks = _db.find_keyspace(keyspace_name); auto&& rs = ks.get_replication_strategy(); if (rs.uses_tablets()) { lblogger.debug("Dropping tablets for keyspace {}", keyspace_name); auto tm = _db.get_shared_token_metadata().get(); for (auto&& [name, s] : ks.metadata()->cf_meta_data()) { muts.emplace_back(make_drop_tablet_map_mutation(s->id(), ts)); } } } void on_leadership_lost() { _load_balancer_stats.unregister(); _load_stats = {}; } load_balancer_stats_manager& stats() { return _load_balancer_stats; } private: // The splitting of tablets today is completely based on the power-of-two constraint. // A tablet of id X is split into 2 new tablets, which new ids are (x << 1) and // (x << 1) + 1. // So a tablet of id 0 is remapped into ids 0 and 1. Another of id 1 is remapped // into ids 2 and 3, and so on. future split_tablets(token_metadata_ptr tm, table_id table) { auto& tablets = tm->tablets().get_tablet_map(table); tablet_map new_tablets(tablets.tablet_count() * 2); for (tablet_id tid : tablets.tablet_ids()) { co_await coroutine::maybe_yield(); tablet_id new_left_tid = tablet_id(tid.value() << 1); tablet_id new_right_tid = tablet_id(new_left_tid.value() + 1); auto& tablet_info = tablets.get_tablet_info(tid); new_tablets.set_tablet(new_left_tid, tablet_info); new_tablets.set_tablet(new_right_tid, tablet_info); } lblogger.info("Split tablets for table {}, increasing tablet count from {} to {}", table, tablets.tablet_count(), new_tablets.tablet_count()); co_return std::move(new_tablets); } // The merging of tablet is completely based on the power-of-two constraint. // Tablet of ids X and X+1 are merged into new tablet id (X >> 1). 
    future<tablet_map> merge_tablets(token_metadata_ptr tm, table_id table) {
        auto& tablets = tm->tablets().get_tablet_map(table);

        tablet_map new_tablets(tablets.tablet_count() / 2);

        for (tablet_id tid : new_tablets.tablet_ids()) {
            co_await coroutine::maybe_yield();
            tablet_id old_left_tid = tablet_id(tid.value() << 1);
            tablet_id old_right_tid = tablet_id(old_left_tid.value() + 1);

            auto& left_tablet_info = tablets.get_tablet_info(old_left_tid);
            auto& right_tablet_info = tablets.get_tablet_info(old_right_tid);

            auto sorted = [] (tablet_replica_set set) {
                std::ranges::sort(set, std::less());
                return set;
            };
            auto left_tablet_replicas = sorted(left_tablet_info.replicas);
            auto right_tablet_replicas = sorted(right_tablet_info.replicas);

            if (left_tablet_replicas != right_tablet_replicas) {
                throw std::runtime_error(format("Sibling tablets {} (r: {}) and {} (r: {}) are not colocated.",
                                                old_left_tid, left_tablet_replicas, old_right_tid, right_tablet_replicas));
            }

            auto merged_tablet_info = locator::merge_tablet_info(left_tablet_info, right_tablet_info);
            if (!merged_tablet_info) {
                throw std::runtime_error(format("Unable to merge tablet info of sibling tablets {} (r: {}) and {} (r: {}).",
                                                old_left_tid, left_tablet_replicas, old_right_tid, right_tablet_replicas));
            }
            lblogger.debug("Got merged_tablet_info with sstables_repaired_at={}", merged_tablet_info->sstables_repaired_at);

            new_tablets.set_tablet(tid, *merged_tablet_info);
        }

        lblogger.info("Merge tablets for table {}, decreasing tablet count from {} to {}",
                      table, tablets.tablet_count(), new_tablets.tablet_count());

        co_return std::move(new_tablets);
    }

public:
    future<tablet_map> resize_tablets(token_metadata_ptr tm, table_id table) {
        auto& tmap = tm->tablets().get_tablet_map(table);

        if (tmap.needs_split()) {
            return split_tablets(std::move(tm), table);
        } else if (tmap.needs_merge()) {
            return merge_tablets(std::move(tm), table);
        }

        throw std::logic_error(format("Table {} cannot be resized", table));
    }

    // FIXME: Handle materialized views.
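    // Note on resize_tablets() above: it returns a new tablet_map with double the tablet count
    // when the map needs a split, half the tablet count when it needs a merge, and throws
    // std::logic_error when neither resize decision is pending for the table.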
};

future<std::unordered_set<locator::global_tablet_id>> migration_plan::get_migration_tablet_ids() const {
    std::unordered_set<locator::global_tablet_id> tablets;
    for (auto& m : _migrations) {
        co_await coroutine::maybe_yield();
        tablets.insert(m.tablet);
    }
    for (auto& gid : _repair_plan._repairs) {
        co_await coroutine::maybe_yield();
        tablets.insert(gid);
    }
    co_return tablets;
}

tablet_allocator::tablet_allocator(config cfg, service::migration_notifier& mn, replica::database& db)
    : _impl(std::make_unique<tablet_allocator_impl>(std::move(cfg), mn, db)) {
}

future<> tablet_allocator::stop() {
    return impl().stop();
}

future<migration_plan> tablet_allocator::balance_tablets(locator::token_metadata_ptr tm, service::topology* topology, db::system_keyspace* sys_ks,
        locator::load_stats_ptr load_stats, std::unordered_set<locator::host_id> skiplist) {
    return impl().balance_tablets(std::move(tm), topology, sys_ks, std::move(load_stats), std::move(skiplist));
}

void tablet_allocator::set_load_stats(locator::load_stats_ptr load_stats) {
    impl().set_load_stats(std::move(load_stats));
}

locator::load_stats_ptr tablet_allocator::get_load_stats() {
    return impl().get_load_stats();
}

void tablet_allocator::set_use_table_aware_balancing(bool use_tablet_aware_balancing) {
    impl().set_use_tablet_aware_balancing(use_tablet_aware_balancing);
}

future<locator::tablet_map> tablet_allocator::resize_tablets(locator::token_metadata_ptr tm, table_id table) {
    return impl().resize_tablets(std::move(tm), table);
}

tablet_allocator_impl& tablet_allocator::impl() {
    return static_cast<tablet_allocator_impl&>(*_impl);
}

void tablet_allocator::on_leadership_lost() {
    impl().on_leadership_lost();
}

load_balancer_stats_manager& tablet_allocator::stats() {
    return impl().stats();
}

}

auto fmt::formatter<service::tablet_migration_info>::format(const service::tablet_migration_info& mig, fmt::format_context& ctx) const -> decltype(ctx.out()) {
    return fmt::format_to(ctx.out(), "{{tablet: {}, src: {}, dst: {}}}", mig.tablet, mig.src, mig.dst);
}
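// Usage sketch for the formatter above (illustrative): formatting a tablet_migration_info with
// fmt::format("{}", mig) yields a string of the form "{tablet: <id>, src: <replica>, dst: <replica>}",
// matching the format string in fmt::formatter<service::tablet_migration_info>::format().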