Introduced a new max_tablet_count tablet option that caps the number of tablets a table can have. This feature is designed primarily for backup and restore workflows. During backup, when load balancing is disabled for snapshot consistency, the current tablet count is recorded in the backup manifest. During restore, max_tablet_count is set to this recorded value, ensuring the restored table's tablet count never exceeds that of the original snapshot. This guarantee enables efficient file-based SSTable streaming during restore, as each SSTable remains fully contained within a single tablet's boundary.

Closes scylladb/scylladb#28450

/*
 * Copyright (C) 2023-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#include "cql3/statements/ks_prop_defs.hh"
#include "db/system_keyspace.hh"
#include "locator/tablets.hh"
#include "locator/topology.hh"
#include "replica/tablets.hh"
#include "locator/tablet_replication_strategy.hh"
#include "replica/database.hh"
#include "service/migration_listener.hh"
#include "service/tablet_allocator.hh"
#include "utils/UUID.hh"
#include "utils/assert.hh"
#include "utils/error_injection.hh"
#include "utils/stall_free.hh"
#include "utils/overloaded_functor.hh"
#include "db/config.hh"
#include "db/tablet_options.hh"
#include "locator/load_sketch.hh"
#include "gms/feature_service.hh"
#include <iterator>
#include <ranges>
#include <utility>
#include <fmt/ranges.h>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/switch_to.hh>
#include <absl/container/flat_hash_map.h>
#include <fmt/format.h>
#include <fmt/chrono.h>

using namespace locator;
using namespace replica;

namespace service {

seastar::logger lblogger("load_balancer");

void load_balancer_stats_manager::setup_metrics(const dc_name& dc, load_balancer_dc_stats& stats) {
    namespace sm = seastar::metrics;
    auto dc_lb = dc_label(dc);
    _metrics.add_group(group_name, {
        sm::make_counter("calls", sm::description("number of calls to the load balancer"),
                stats.calls)(dc_lb),
        sm::make_counter("migrations_produced", sm::description("number of migrations produced by the load balancer"),
                stats.migrations_produced)(dc_lb),
        sm::make_counter("migrations_skipped", sm::description("number of migrations skipped by the load balancer due to load limits"),
                stats.migrations_skipped)(dc_lb),
        sm::make_counter("cross_rack_collocations", sm::description("number of co-locating migrations which move a replica across racks"),
                stats.cross_rack_collocations)(dc_lb),
    });
}

void load_balancer_stats_manager::setup_metrics(const dc_name& dc, host_id node, load_balancer_node_stats& stats) {
    namespace sm = seastar::metrics;
    auto dc_lb = dc_label(dc);
    auto node_lb = node_label(node);
    _metrics.add_group(group_name, {
        sm::make_gauge("load", sm::description("node load during last load balancing"),
                stats.load)(dc_lb)(node_lb)
    });
}

void load_balancer_stats_manager::setup_metrics(load_balancer_cluster_stats& stats) {
    namespace sm = seastar::metrics;
    // FIXME: we can probably improve it by making it per resize type (split, merge or none).
    _metrics.add_group(group_name, {
        sm::make_counter("resizes_emitted", sm::description("number of resizes produced by the load balancer"),
                stats.resizes_emitted),
        sm::make_counter("resizes_revoked", sm::description("number of resizes revoked by the load balancer"),
                stats.resizes_revoked),
        sm::make_counter("resizes_finalized", sm::description("number of resizes finalized by the load balancer"),
                stats.resizes_finalized),
        sm::make_counter("auto_repair_needs_repair_nr", sm::description("number of tablets with auto repair enabled that currently need repair"),
                stats.auto_repair_needs_repair_nr),
        sm::make_counter("auto_repair_enabled_nr", sm::description("number of tablets with auto repair enabled"),
                stats.auto_repair_enabled_nr)
    });
}

load_balancer_stats_manager::load_balancer_stats_manager(sstring group_name):
        group_name(std::move(group_name))
{
    setup_metrics(_cluster_stats);
}

const lw_shared_ptr<load_balancer_dc_stats>& load_balancer_stats_manager::for_dc(const dc_name& dc) {
    auto it = _dc_stats.find(dc);
    if (it == _dc_stats.end()) {
        auto stats = make_lw_shared<load_balancer_dc_stats>();
        setup_metrics(dc, *stats);
        it = _dc_stats.emplace(dc, std::move(stats)).first;
    }
    return it->second;
}

load_balancer_node_stats& load_balancer_stats_manager::for_node(const dc_name& dc, host_id node) {
    auto it = _node_stats.find(node);
    if (it == _node_stats.end()) {
        auto stats = std::make_unique<load_balancer_node_stats>();
        setup_metrics(dc, node, *stats);
        it = _node_stats.emplace(node, std::move(stats)).first;
    }
    return *it->second;
}

load_balancer_cluster_stats& load_balancer_stats_manager::for_cluster() {
    return _cluster_stats;
}

void load_balancer_stats_manager::unregister() {
    _metrics.clear();
}

template <std::ranges::range R>
requires std::convertible_to<std::ranges::range_value_t<R>, db::tablet_options>
db::tablet_options combine_tablet_options(R&& opts) {
    db::tablet_options combined_opts;

    using data_size_type = decltype(db::tablet_options::expected_data_size_in_gb)::value_type;
    data_size_type total_expected_data_size_in_gb = 0;
    size_t total_expected_data_size_in_gb_count = 0;

    for (const auto& opt : opts) {
        if (opt.min_tablet_count) {
            combined_opts.min_tablet_count = std::max(combined_opts.min_tablet_count.value_or(0), *opt.min_tablet_count);
        }
        if (opt.min_per_shard_tablet_count) {
            combined_opts.min_per_shard_tablet_count = std::max(combined_opts.min_per_shard_tablet_count.value_or(0), *opt.min_per_shard_tablet_count);
        }
        if (opt.expected_data_size_in_gb) {
            total_expected_data_size_in_gb += *opt.expected_data_size_in_gb;
            total_expected_data_size_in_gb_count++;
        }
        if (opt.max_tablet_count) {
            if (!combined_opts.max_tablet_count) {
                combined_opts.max_tablet_count = *opt.max_tablet_count;
            } else {
                combined_opts.max_tablet_count = std::min(*combined_opts.max_tablet_count, *opt.max_tablet_count);
            }
        }
    }

    if (total_expected_data_size_in_gb_count) {
        combined_opts.expected_data_size_in_gb = total_expected_data_size_in_gb / total_expected_data_size_in_gb_count;
    }

    return combined_opts;
}
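
// A quick sketch of the combining rules above (illustrative values only;
// all db::tablet_options fields are optional): lower bounds are combined
// with max() so that every table's minimum is honored, max_tablet_count is
// combined with min() so that the strictest cap wins (e.g. the cap recorded
// in a backup manifest), and expected_data_size_in_gb is averaged over the
// tables which set it:
//
//   db::tablet_options a, b;
//   a.min_tablet_count = 64;
//   a.max_tablet_count = 2048;
//   b.min_tablet_count = 128;
//   b.max_tablet_count = 1024;
//   b.expected_data_size_in_gb = 500;
//   auto combined = combine_tablet_options(std::array{a, b});
//   // combined.min_tablet_count == 128  (max of the mins)
//   // combined.max_tablet_count == 1024 (min of the maxes)
//   // combined.expected_data_size_in_gb == 500 (average of one sample)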

static std::unordered_set<locator::tablet_id> split_string_to_tablet_id(std::string_view s, char delimiter) {
    auto tokens_view = s | std::views::split(delimiter)
        | std::views::transform([](auto&& range) {
            return std::string_view(&*range.begin(), std::ranges::distance(range));
        })
        | std::views::transform([](std::string_view sv) {
            return locator::tablet_id(std::stoul(std::string(sv)));
        });
    return std::unordered_set<locator::tablet_id>{tokens_view.begin(), tokens_view.end()};
}
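
// Illustrative usage (the input string is hypothetical; in this file the
// function parses the comma-separated "tablet_repair_skip_sched" error
// injection parameter):
//
//   auto ids = split_string_to_tablet_id("1,5,42", ',');
//   // ids == {tablet_id(1), tablet_id(5), tablet_id(42)}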

struct repair_plan {
    locator::global_tablet_id gid;
    locator::tablet_info tinfo;
    dht::token_range range;
    dht::token last_token;
    db_clock::duration repair_time_diff;
    bool is_user_request;
};

// Used to compare different migration choices in regard to impact on load imbalance.
// There is a total order on migration_badness such that better migrations are ordered before worse ones.
struct migration_badness {
    double src_shard_badness = 0;
    double src_node_badness = 0;
    double dst_shard_badness = 0;
    double dst_node_badness = 0;
    bool bad;

    migration_badness()
        : bad(false)
    {}

    migration_badness(double src_shard_badness, double src_node_badness, double dst_shard_badness, double dst_node_badness)
        : src_shard_badness(src_shard_badness)
        , src_node_badness(src_node_badness)
        , dst_shard_badness(dst_shard_badness)
        , dst_node_badness(dst_node_badness)
        , bad(src_shard_badness > 0 || src_node_badness > 0 || dst_shard_badness > 0 || dst_node_badness > 0)
    {}

    double node_badness() const {
        return std::max(src_node_badness, dst_node_badness);
    }

    double shard_badness() const {
        return std::max(src_shard_badness, dst_shard_badness);
    }

    bool is_bad() const {
        return bad;
    }

    bool operator<(const migration_badness& other) const {
        // Prefer candidates with no across-node badness to those with across-node badness.
        // Then, prefer those with the lowest shard badness.
        // We want to balance nodes first as balancing nodes internally between shards is cheap.
        if (node_badness() == other.node_badness()) {
            return shard_badness() < other.shard_badness();
        }
        if (node_badness() > 0 || other.node_badness() > 0) {
            return node_badness() < other.node_badness();
        }
        return shard_badness() < other.shard_badness();
    }

    bool operator==(const migration_badness& other) const = default;
};
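
// Ordering sketch (hypothetical values): a migration which only makes a
// destination shard worse compares as better (less) than one which also makes
// the destination node worse, since cross-node imbalance is costlier to undo:
//
//   migration_badness shard_only{0, 0, 0.2, 0};   // dst shard slightly worse
//   migration_badness node_wide{0, 0, 0.1, 0.1};  // dst node made worse too
//   // shard_only < node_wide, despite the larger shard badness,
//   // because node badness dominates the comparison.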

struct colocated_tablets {
    global_tablet_id left_tablet;
    global_tablet_id right_tablet;

    auto operator<=>(const colocated_tablets&) const = default;
};

// Represents either a single tablet replica or co-located replicas of sibling
// tablets. The migration tablet set is logically treated by the balancer as a single
// candidate. When the candidate represents co-located replicas, it means that
// the balancer will work to preserve the co-location by migrating those replicas
// to the same destination.
struct migration_tablet_set {
    std::variant<global_tablet_id, colocated_tablets> tablet_s;
    uint64_t tablet_set_disk_size = 0;

    table_id table() const {
        return std::visit(
            overloaded_functor{
                [](global_tablet_id t) { return t.table; },
                [](colocated_tablets t) { return t.left_tablet.table; },
            },
            tablet_s);
    }

    using tablet_small_vector = utils::small_vector<global_tablet_id, 2>;

    tablet_small_vector tablets() const {
        return std::visit(
            overloaded_functor{
                [](global_tablet_id t) {
                    return tablet_small_vector{t};
                },
                [](colocated_tablets t) {
                    return tablet_small_vector{t.left_tablet, t.right_tablet};
                },
            },
            tablet_s);
    }

    bool colocated() const {
        return std::holds_alternative<colocated_tablets>(tablet_s);
    }

    bool operator==(const migration_tablet_set& rhs) const {
        return tablet_s == rhs.tablet_s;
    }
};
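
// The two shapes a balancing candidate can take (identifiers are hypothetical):
//
//   // A single tablet replica:
//   migration_tablet_set single{.tablet_s = global_tablet_id{table, tablet_id(7)}};
//   // single.colocated() == false; single.tablets() yields {7}
//
//   // Co-located replicas of sibling tablets, migrated together so that the
//   // co-location required to finalize a merge is preserved:
//   migration_tablet_set pair{.tablet_s = colocated_tablets{
//       global_tablet_id{table, tablet_id(6)},
//       global_tablet_id{table, tablet_id(7)}}};
//   // pair.colocated() == true; pair.tablets() yields {6, 7}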

struct migration_candidate {
    migration_tablet_set tablets;
    tablet_replica src;
    tablet_replica dst;
    migration_badness badness;
};

struct colocation_source {
    locator::global_tablet_id gid;
    locator::tablet_replica replica;
};

using colocation_source_set = utils::chunked_vector<colocation_source>;
using colocation_sources_by_destination_rack = std::unordered_map<endpoint_dc_rack, colocation_source_set>;

struct rack_list_colocation_state {
    colocation_sources_by_destination_rack dst_dc_rack_to_tablets;
    std::unordered_map<endpoint_dc_rack, std::unordered_set<utils::UUID>> dst_to_requests;
    utils::UUID request_to_resume;

    void maybe_set_request_to_resume(const utils::UUID& id) {
        if (!request_to_resume) {
            request_to_resume = id;
        }
    }
};

/// Formattable wrapper for migration_plan, whose formatter prints a short summary of the plan.
struct plan_summary {
    migration_plan& plan;
    explicit plan_summary(migration_plan& plan) : plan(plan) {}
};

future<rack_list_colocation_state> find_required_rack_list_colocations(
        replica::database& db,
        token_metadata_ptr tmptr,
        db::system_keyspace* sys_ks,
        const std::unordered_set<utils::UUID>& paused_rf_change_requests,
        const std::unordered_set<locator::global_tablet_id>& already_planned_migrations) {
    rack_list_colocation_state state;

    auto get_node = [&] (locator::host_id host) -> const locator::node& {
        auto* node = tmptr->get_topology().find_node(host);
        if (!node) {
            on_internal_error(lblogger, format("Node {} not found in topology", host));
        }
        return *node;
    };
    for (const auto& request_id : paused_rf_change_requests) {
        auto req_entry = co_await sys_ks->get_topology_request_entry(request_id);
        sstring ks_name = *req_entry.new_keyspace_rf_change_ks_name;

        if (!db.has_keyspace(ks_name)) {
            state.maybe_set_request_to_resume(request_id);
            continue;
        }
        auto& ks = db.find_keyspace(ks_name);
        std::unordered_map<sstring, sstring> saved_ks_props = *req_entry.new_keyspace_rf_change_data;
        cql3::statements::ks_prop_defs new_ks_props{std::map<sstring, sstring>{saved_ks_props.begin(), saved_ks_props.end()}};
        new_ks_props.validate();
        auto ks_md = new_ks_props.as_ks_metadata_update(ks.metadata(), *tmptr, db.features(), db.get_config());

        auto tables_with_mvs = ks.metadata()->tables();
        auto views = ks.metadata()->views();
        tables_with_mvs.insert(tables_with_mvs.end(), views.begin(), views.end());
        if (tables_with_mvs.empty()) {
            state.maybe_set_request_to_resume(request_id);
            continue;
        }
        bool no_changes_needed = true;
        for (const auto& table_or_mv : tables_with_mvs) {
            if (!tmptr->tablets().is_base_table(table_or_mv->id())) {
                continue;
            }
            const auto& tmap = tmptr->tablets().get_tablet_map(table_or_mv->id());
            const auto& new_replication_strategy_config = ks_md->strategy_options();
            for (auto& [dc, rf_value] : new_replication_strategy_config) {
                if (!std::holds_alternative<rack_list>(rf_value)) {
                    continue;
                }

                auto racks = std::get<rack_list>(rf_value) | std::ranges::to<std::unordered_set<sstring>>();
                co_await tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& ti) -> future<> {
                    auto gid = locator::global_tablet_id{table_or_mv->id(), tid};

                    // Current replicas in this DC. There might be multiple replicas in the same rack.
                    auto dc_replicas = ti.replicas | std::views::filter([&] (const tablet_replica& r) {
                        return get_node(r.host).dc_rack().dc == dc;
                    }) | std::ranges::to<std::vector<tablet_replica>>();

                    if (dc_replicas.empty()) {
                        return make_ready_future<>();
                    }

                    // Find replicas that are not in the desired racks (src_replicas)
                    // and racks that do not have replicas yet (dst_racks).
                    auto dst_racks = racks;
                    std::vector<tablet_replica> src_replicas;
                    for (const auto& r : dc_replicas) {
                        auto rack = get_node(r.host).dc_rack().rack;
                        if (dst_racks.find(rack) != dst_racks.end()) {
                            // There is already a replica in this rack.
                            dst_racks.erase(rack);
                        } else {
                            // This replica is not in one of the desired racks, so it needs to be moved.
                            src_replicas.push_back(r);
                        }
                    }

                    auto zipped = std::views::zip(src_replicas, dst_racks);
                    if (!std::ranges::empty(zipped)) {
                        no_changes_needed = false;
                    }

                    // Skip a tablet that is already in transition.
                    auto* tti = tmap.get_tablet_transition_info(tid);
                    if (tti) {
                        lblogger.debug("Skipped colocation for tablet={} which is already in transition={}", gid, tti->transition);
                        return make_ready_future<>();
                    }

                    // Skip a tablet that is about to be in transition.
                    if (already_planned_migrations.contains(gid)) {
                        return make_ready_future<>();
                    }

                    for (auto src_dst : zipped) {
                        auto src = std::get<0>(src_dst);
                        auto dst = std::get<1>(src_dst);
                        auto endpoint = locator::endpoint_dc_rack{dc, dst};

                        state.dst_dc_rack_to_tablets[endpoint].emplace_back(colocation_source{{table_or_mv->id(), tid}, src});
                        state.dst_to_requests[endpoint].insert(request_id);
                    }
                    return make_ready_future<>();
                });
            }
        }
        if (no_changes_needed) {
            state.maybe_set_request_to_resume(request_id);
        }
    }
    co_return state;
}

future<bool> requires_rack_list_colocation(
        replica::database& db,
        locator::token_metadata_ptr tmptr,
        db::system_keyspace* sys_ks,
        utils::UUID request_id) {
    auto res = co_await find_required_rack_list_colocations(db, tmptr, sys_ks, {request_id}, {});
    co_return res.request_to_resume != request_id;
}

}

template<>
struct fmt::formatter<service::migration_badness> : fmt::formatter<std::string_view> {
    template <typename FormatContext>
    auto format(const service::migration_badness& badness, FormatContext& ctx) const {
        return fmt::format_to(ctx.out(), "{{s: {:.4f}, n: {:.4f}}}", badness.shard_badness(), badness.node_badness());
    }
};

template<>
struct fmt::formatter<service::migration_tablet_set> : fmt::formatter<std::string_view> {
    template <typename FormatContext>
    auto format(const service::migration_tablet_set& tablet_set, FormatContext& ctx) const {
        if (tablet_set.colocated()) {
            return fmt::format_to(ctx.out(), "{{colocated: {}}}", tablet_set.tablets());
        }
        return fmt::format_to(ctx.out(), "{}", tablet_set.tablets().front());
    }
};

template<>
struct fmt::formatter<service::migration_candidate> : fmt::formatter<std::string_view> {
    template <typename FormatContext>
    auto format(const service::migration_candidate& candidate, FormatContext& ctx) const {
        fmt::format_to(ctx.out(), "{{tablet: {}, {} -> {}, badness: {}", candidate.tablets, candidate.src,
                       candidate.dst, candidate.badness);
        if (candidate.badness.is_bad()) {
            fmt::format_to(ctx.out(), " (bad!)");
        }
        fmt::format_to(ctx.out(), "}}");
        return ctx.out();
    }
};

template<>
struct fmt::formatter<service::repair_plan> : fmt::formatter<std::string_view> {
    template <typename FormatContext>
    auto format(const service::repair_plan& p, FormatContext& ctx) const {
        auto diff_seconds = std::chrono::duration<float>(p.repair_time_diff).count();
        fmt::format_to(ctx.out(), "{{tablet={} last_token={} is_user_req={} diff_seconds={}}}", p.gid, p.last_token, p.is_user_request, diff_seconds);
        return ctx.out();
    }
};

template<>
struct fmt::formatter<service::plan_summary> : fmt::formatter<std::string_view> {
    template <typename FormatContext>
    auto format(const service::plan_summary& p, FormatContext& ctx) const {
        auto& plan = p.plan;
        std::string_view delim = "";
        auto get_delim = [&] { return std::exchange(delim, ", "); };
        if (plan.migrations().size()) {
            fmt::format_to(ctx.out(), "{}migrations: {}", get_delim(), plan.migrations().size());
        }
        if (plan.repair_plan().repairs().size()) {
            fmt::format_to(ctx.out(), "{}repairs: {}", get_delim(), plan.repair_plan().repairs().size());
        }
        if (plan.resize_plan().resize.size()) {
            fmt::format_to(ctx.out(), "{}resize: {}", get_delim(), plan.resize_plan().resize.size());
        }
        if (plan.resize_plan().finalize_resize.size()) {
            fmt::format_to(ctx.out(), "{}resize-ready: {}", get_delim(), plan.resize_plan().finalize_resize.size());
        }
        if (plan.rack_list_colocation_plan().size()) {
            fmt::format_to(ctx.out(), "{}rack-list colocation ready: {}", get_delim(), plan.rack_list_colocation_plan().request_to_resume());
        }
        if (delim.empty()) {
            fmt::format_to(ctx.out(), "empty");
        }
        return ctx.out();
    }
};

namespace std {

using namespace service;

template <>
struct hash<colocated_tablets> {
    size_t operator()(const colocated_tablets& id) const {
        return utils::hash_combine(std::hash<global_tablet_id>()(id.left_tablet),
                                   std::hash<global_tablet_id>()(id.right_tablet));
    }
};

template <>
struct hash<migration_tablet_set> {
    size_t operator()(const migration_tablet_set& tablet_set) const {
        return std::hash<decltype(migration_tablet_set::tablet_s)>()(tablet_set.tablet_s);
    }
};

}

namespace service {

/// The algorithm aims to equalize tablet count on each shard.
/// This goal is based on the assumption that every shard has similar processing power and space capacity,
/// and that each tablet has equal consumption of those resources. So by equalizing tablet count per shard we
/// equalize resource utilization.
///
/// The algorithm produces a migration plan which is a set of instructions about which tablets to move
/// where. The plan is a small increment, not a complete plan. To achieve balance, the algorithm should
/// be invoked iteratively until an empty plan is returned.
///
/// The algorithm keeps track of load at two levels, per node and per shard. The reason for this is that
/// we want to equalize the per-node score first, by moving tablets across nodes. Tablets are moved away
/// from the most loaded node first. We also track load per shard, so that we move tablets from the most
/// loaded shard on a given node first.
///
/// The metric for node load is (number of tablets / shard count) which is the average
/// per-shard load. If we achieve balance according to this metric, and then rebalance the nodes internally,
/// we will achieve global balance on all shards in the cluster.
///
/// The reason why we focus on nodes first before rebalancing them internally is that this results
/// in fewer tablet movements than looking at shards only.
///
/// It would still be beneficial to rebalance tablet-receiving nodes internally before moving tablets
/// to them so that we can distribute load equally without overloading shards which are out of balance,
/// but this is not implemented yet.
///
/// The outline of the inter-node balancing algorithm is as follows:
///
/// 1. Determine the set of nodes whose load should be balanced.
/// 2. Divide the nodes into two sets, sources and destinations.
///    Tablets are only moved from sources to destinations.
///    When nodes are drained (e.g. on decommission), the drained nodes are sources and all other
///    nodes are destinations.
///    During free load balancing, we pick a single destination node which is the least loaded node
///    and all other nodes are sources.
/// 3. Move tablets from sources to destinations until the load order between nodes would get inverted after the movement:
///    3.1. Pick the most-loaded source node (src.host)
///         3.1.1. Pick the most-loaded shard (src.shard) on src.host
///    3.2. Pick the least-loaded destination node (dst.host)
///    3.3. Pick the least-loaded shard (dst.shard) on dst.host
///    3.4. If a candidate is not chosen yet, pick the best candidate tablet on src to move to dst.
///    3.5. If the movement impact is bad:
///         3.5.1. Consider moving from other shards on src.host and to other destination hosts and shards.
///                Pick the best candidate according to the impact of the movement on load imbalance.
///    3.6. Evaluate co-location constraints for tablet replicas
///         3.6.1. If met, schedule the migration
///         3.6.2. If not, add the tablet to the list of skipped tablets on src.host
///
///
/// Even though the algorithm focuses on a single target, the fact that the produced plan is just an increment
/// means that many under-loaded nodes can be driven forward to balance concurrently because the load balancer
/// will alternate between them across make_plan() calls.
///
/// The algorithm behaves differently when there are decommissioning nodes which have tablet replicas.
/// In this case, we move those tablets away first. The balancing works in the opposite direction.
/// Rather than picking a single least-loaded target and moving tablets into it from many sources,
/// we have a single source and move tablets to multiple targets. This process necessarily disregards
/// convergence checks, and the stop condition is that the source is drained. We still take target
/// load into consideration and pick least-loaded targets first. When draining is not possible
/// because there is no viable new replica for a tablet, load balancing will throw an exception.
///
/// After scheduling inter-node migrations, the algorithm schedules intra-node migrations.
/// This means that across-node migrations can proceed in parallel with intra-node migrations
/// if there is free capacity to carry them out, but across-node migrations have higher priority.
///
/// Intra-node migrations are scheduled for each node independently with the aim to equalize
/// per-shard tablet count on each node.
///
/// If the algorithm is called with active tablet migrations in tablet metadata, those are treated
/// by the load balancer as if they were already completed. This allows the algorithm to incrementally
/// make decisions which, when executed together with active migrations, will produce the desired result.
/// Overload of shards which still contain migrated-away tablets is limited by the fact
/// that the algorithm tracks streaming concurrency on both source and target shards of active
/// migrations and takes the concurrency limit into account when producing new migrations.
///
/// The cost of make_plan() is relatively heavy in terms of preparing data structures, so the current
/// implementation is not efficient if the scheduler would like to call make_plan() multiple times
/// to parallelize execution. This will be addressed in the future by keeping the data structures
/// valid across calls and only recalculating them when starting a new round with a new token metadata version.
///
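/// For intuition (hypothetical numbers): a node with 8 shards and 240 tablet
/// replicas scores 240 / 8 = 30 average tablets per shard, while a 16-shard
/// node holding the same 240 replicas scores 15, so tablets flow from the
/// former to the latter even though their absolute tablet counts are equal.
///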
class load_balancer {
    using global_shard_id = tablet_replica;
    using shard_id = seastar::shard_id;

    // Represents the metric for load which we want to equalize between shards or nodes.
    // The load balancer equalizes storage utilization.
    // In case force_capacity_based_balancing is true, it is assumed that each tablet has equal size and that
    // shards and nodes can have different capacity. If force_capacity_based_balancing is false,
    // tablet sizes are fetched from load_stats.
    // So we equalize: sum of tablet_sizes / capacity_in_bytes.
    using load_type = double;

    using table_candidates_map = std::unordered_map<table_id, std::unordered_set<migration_tablet_set>>;

    struct shard_load {
        size_t tablet_count = 0;
        std::optional<disk_usage> dusage;

        absl::flat_hash_map<table_id, size_t> tablet_count_per_table;
        absl::flat_hash_map<table_id, uint64_t> tablet_sizes_per_table;

        // Number of tablets which are streamed from this shard.
        size_t streaming_read_load = 0;

        // Number of tablets which are streamed to this shard.
        size_t streaming_write_load = 0;

        // Tablets which still have a replica on this shard and are candidates for migrating away from this shard.
        // Grouped by table. Used when _use_table_aware_balancing == true.
        // The set of candidates per table may be empty.
        table_candidates_map candidates;
        // For all tables. Used when _use_table_aware_balancing == false.
        std::unordered_set<migration_tablet_set> candidates_all_tables;

        future<> clear_gently() {
            co_await utils::clear_gently(candidates);
            co_await utils::clear_gently(candidates_all_tables);
        }

        bool has_candidates() const {
            for (const auto& [table, tablets] : candidates) {
                if (!tablets.empty()) {
                    return true;
                }
            }
            return !candidates_all_tables.empty();
        }

        size_t candidate_count() const {
            size_t result = 0;
            for (const auto& [table, tablets] : candidates) {
                result += tablets.size();
            }
            return result + candidates_all_tables.size();
        }
    };

    struct skipped_candidate {
        tablet_replica replica;
        migration_tablet_set tablets;
        std::unordered_set<host_id> viable_targets;
    };

    struct node_load {
        host_id id;
        uint64_t shard_count = 0;
        uint64_t tablet_count = 0;
        std::optional<disk_usage> dusage; // Invariant: bool(dusage) || drained.
        bool drained = false;
        bool excluded = false;

        // Engaged if and only if drained == true.
        // Determines whether the action is to migrate (leave request) or rebuild (remove request).
        // Looking at is_excluded() is not sufficient because a node may be marked as excluded during decommission,
        // and we don't want to silently upgrade it to a remove operation, which accepts a replica loss.
        std::optional<topology_request> req;

        const locator::node* node; // never nullptr

        // The average shard load on this node.
        // Valid only when "dusage" is set.
        load_type avg_load = 0;

        absl::flat_hash_map<table_id, size_t> tablet_count_per_table;
        absl::flat_hash_map<table_id, uint64_t> tablet_sizes_per_table;

        // Heap which tracks most-loaded shards using shards_by_load_cmp().
        // Valid during intra-node plan-making for nodes which are in the source node set.
        std::vector<shard_id> shards_by_load;

        std::vector<shard_load> shards; // Indexed by shard_id to which a given shard_load corresponds.

        utils::chunked_vector<skipped_candidate> skipped_candidates;

        std::optional<double> capacity_per_shard() const {
            return dusage.transform([&] (auto du) {
                return load_type(du.capacity) / shard_count;
            });
        }

        const sstring& dc() const {
            return node->dc_rack().dc;
        }

        const sstring& rack() const {
            return node->dc_rack().rack;
        }

        locator::node::state state() const {
            return node->get_state();
        }

        // Call when tablet_count or capacity changes.
        void update() {
            if (auto load = get_avg_load()) {
                avg_load = *load;
            }
        }

        // Result engaged when !drained.
        std::optional<load_type> get_avg_load(uint64_t used_size_delta = 0) const {
            return dusage.transform([&] (auto du) {
                du.used += used_size_delta;
                return du.get_load();
            });
        }

        double tablets_per_shard(uint64_t tablets) const {
            return double(tablets) / shard_count;
        }

        double tablets_per_shard() const {
            return tablets_per_shard(tablet_count);
        }

        // Result engaged for !drained nodes.
        std::optional<load_type> shard_load(shard_id shard, int64_t used_size_delta = 0) const {
            return shards[shard].dusage.transform([&] (auto du) {
                du.used += used_size_delta;
                return du.get_load();
            });
        }

        auto shards_by_load_cmp() {
            return [this] (const auto& a, const auto& b) {
                if (dusage) {
                    return shards[a].dusage->get_load() < shards[b].dusage->get_load();
                } else {
                    return shards[a].tablet_count < shards[b].tablet_count;
                }
            };
        }

        future<> clear_gently() {
            co_await utils::clear_gently(shards);
            co_await utils::clear_gently(skipped_candidates);
        }
    };

    // Data structure used for making load-balancing decisions over a set of nodes.
    using node_load_map = std::unordered_map<host_id, node_load>;

    // Less-comparator which orders nodes by load.
    struct nodes_by_load_cmp {
        node_load_map& nodes;

        bool operator()(host_id a, host_id b) const {
            return nodes[a].avg_load < nodes[b].avg_load;
        }
    };

    // We have split and merge thresholds, which work respectively as (target) upper and lower
    // bounds for the average size of tablets.
    //
    // The merge threshold is 50% of the target tablet size (a midpoint between split and merge),
    // such that after a merge, the average size is equally far from split and merge.
    // The same applies to split. It's 100% of the target size, so after a split, the average is
    // close to the target size (assuming small variations during the operation).
    //
    // It might happen that during a resize decision, the average size changes drastically, and
    // a split or merge might get cancelled. E.g. after deleting a large partition or lots of
    // data suddenly becoming expired.
    // If we're splitting, we will only cancel it if the average size dropped below the
    // target size. That's because a merge would be required right after the split completes,
    // due to the average size dropping below the merge threshold, as the tablet count doubles.
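    //
    // Worked example (illustrative; assume a target tablet size of, say, 5 GiB,
    // see target_max_tablet_size()/target_min_tablet_size() below): the split
    // threshold is then 2 * 5 GiB = 10 GiB of average tablet size and the merge
    // threshold is 5 GiB / 2 = 2.5 GiB. A table whose average grows past 10 GiB
    // is split; doubling the tablet count brings the average back to about
    // 5 GiB, halfway between the two thresholds.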
    const uint64_t _target_tablet_size = default_target_tablet_size;

    const unsigned _tablets_per_shard_goal;

    uint64_t target_max_tablet_size(uint64_t target_tablet_size) const noexcept {
        return target_tablet_size * 2;
    }

    uint64_t target_min_tablet_size(uint64_t target_tablet_size) const noexcept {
        return target_tablet_size / 2;
    }

    struct table_size_desc {
        uint64_t target_tablet_size;
        uint64_t avg_tablet_size;
        locator::resize_decision resize_decision;
        locator::resize_decision new_resize_decision;
        size_t tablet_count;
        size_t shard_count;
        sstring reason; // reason for target_tablet_count
    };

    struct cluster_resize_load {
        using table_id_and_size_desc = std::pair<table_id, table_size_desc>;
        std::vector<table_id_and_size_desc> tables_need_resize;
        std::vector<table_id_and_size_desc> tables_being_resized;

        static locator::resize_decision to_resize_decision(const table_size_desc& d) {
            return d.new_resize_decision;
        }

        bool table_needs_resize(const table_size_desc& d) const {
            return to_resize_decision(d).split_or_merge();
        }

        // Resize cancellation will account for possible oscillations caused by compaction, etc.
        // We shouldn't rush into cancelling an ongoing resize. That will only happen if the
        // average size is past the point it would be at if either the split or merge had completed.
        // If we cancel a split, that's because the average size dropped so much that a merge would be
        // required post completion, and vice-versa.
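        //
        // Example (hypothetical): a table is mid-split because its average
        // tablet size exceeded the split threshold, but a mass deletion drops
        // the average below the merge threshold, so the freshly computed
        // decision is a merge. new_resize_decision.way then differs from
        // resize_decision.way, and the split is cancelled instead of being
        // finished and immediately undone by a merge.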
        bool table_needs_resize_cancellation(const table_size_desc& d) const {
            if (utils::get_local_injector().enter("force_resize_cancellation")) {
                return true;
            }
            return d.resize_decision.split_or_merge() && to_resize_decision(d).way != d.resize_decision.way;
        }

        void update(table_id id, table_size_desc d) {
            bool table_undergoing_resize = d.resize_decision.split_or_merge();

            // Resizing tables that no longer need resize will have the resize decision revoked,
            // therefore they must be listed as being resized.
            if (!table_needs_resize(d) && !table_undergoing_resize) {
                return;
            }

            auto entry = std::make_pair(id, std::move(d));
            if (table_undergoing_resize) {
                tables_being_resized.push_back(entry);
            } else {
                tables_need_resize.push_back(entry);
            }
        }

        // Comparator that measures the weight of the need for resizing.
        auto resize_urgency_cmp() const {
            return [] (const table_id_and_size_desc& a, const table_id_and_size_desc& b) {
                auto urgency = [] (const table_size_desc& d) -> double {
                    // FIXME: only takes into account split today.
                    return double(d.avg_tablet_size);
                };
                return urgency(a.second) < urgency(b.second);
            };
        }

        // Resize decisions can be revoked with an empty (none) decision, so replicas
        // will know they're no longer required to prepare storage for the execution of
        // topology changes.
        static locator::resize_decision revoke_resize_decision() {
            return locator::resize_decision{};
        }
    };

    // Per-shard limits for active tablet streaming sessions.
    //
    // There is no hard reason for these values being what they are other than
    // the guidelines below.
    //
    // We want to limit the concurrency of active streaming for several reasons.
    // One is that we want to prevent over-utilization of the memory required to carry out streaming,
    // as that may lead to OOM or excessive cache eviction.
    //
    // There is no network scheduler yet, so we want to avoid over-utilization of network bandwidth.
    // Limiting per-shard concurrency is a lame way to achieve that, but it's better than nothing.
    //
    // Scheduling groups should limit the impact of streaming on other kinds of processes on the same node,
    // so this aspect is not the reason for limiting concurrency.
    //
    // We don't want too much parallelism because it means that we have plenty of migrations
    // which progress slowly. It's better to have fewer which complete faster because
    // fewer user requests suffer from double-quorum overhead, and under-loaded nodes can take
    // the load sooner. At the same time, we want to have enough concurrency to fully utilize resources.
    //
    // Streaming speed is supposed to be I/O bound, and writes are more expensive in terms of IO than reads,
    // so we allow more read concurrency.
    //
    // We allow at least two sessions per shard so that there is less chance of idling until the load balancer
    // makes the next decision after streaming is finished.
    size_t max_write_streaming_load;
    size_t max_read_streaming_load;

    replica::database& _db;
    token_metadata_ptr _tm;
    service::topology* _topology;
    db::system_keyspace* _sys_ks;
    std::optional<locator::load_sketch> _load_sketch;
    // Holds the set of tablets already scheduled for transition during plan-making.
    std::unordered_set<global_tablet_id> _scheduled_tablets;
    // Holds tablet replica count per table in the balanced node set (within a single DC).
    absl::flat_hash_map<table_id, size_t> _tablet_count_per_table;
    // Holds total used storage per table in the DC.
    absl::flat_hash_map<table_id, uint64_t> _disk_used_per_table;
    dc_name _dc;
    std::optional<sstring> _rack; // Set when plan making is limited to a single rack.
    sstring _location; // Name of the current scope of plan making. DC or DC+rack.
    lw_shared_ptr<load_balancer_dc_stats> _current_stats; // Stats for the current scope of plan making.
    size_t _total_capacity_shards; // Total number of non-drained shards in the balanced node set.
    size_t _total_capacity_nodes; // Total number of non-drained nodes in the balanced node set.
    uint64_t _total_capacity_storage; // Total storage of non-drained nodes in the balanced node set.
    size_t _migrating_candidates; // Number of candidate replicas skipped because the tablet is migrating.
    locator::load_stats_ptr _table_load_stats;
    load_balancer_stats_manager& _stats;
    std::unordered_set<host_id> _skiplist;
    bool _use_table_aware_balancing = true;
    double _initial_scale = 1;

    // This is the maximum load delta between the most and least loaded nodes,
    // below which the balancer considers the DC balanced.
    double _size_based_balance_threshold = 0.01;

    // When this is set to true, the balancer assumes all tablets
    // have the same size: _target_tablet_size.
    bool _force_capacity_based_balancing = false;

    // The minimal tablet size the balancer will compute load with. For any tablet smaller than this,
    // the balancer will use this size instead of the actual tablet size.
    uint64_t _minimal_tablet_size = service::default_target_tablet_size / 100;

private:
    tablet_replica_set get_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const {
        // We reflect migrations in the load as if they already happened,
        // optimistically assuming that they will succeed.
        return trinfo ? trinfo->next : ti.replicas;
    }

    tablet_replica_set sorted_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const {
        auto set = get_replicas_for_tablet_load(ti, trinfo);
        std::ranges::sort(set, std::less<tablet_replica>());
        return set;
    }

    // Whether to count the tablet as putting streaming load on the system.
    // Tablets which are streaming or are yet-to-stream are counted.
    bool is_streaming(const tablet_transition_info* trinfo) {
        if (!trinfo) {
            return false;
        }
        switch (trinfo->stage) {
        case tablet_transition_stage::allow_write_both_read_old:
            return true;
        case tablet_transition_stage::write_both_read_old:
            return true;
        case tablet_transition_stage::write_both_read_old_fallback_cleanup:
            return false;
        case tablet_transition_stage::streaming:
            return true;
        case tablet_transition_stage::rebuild_repair:
            return true;
        case tablet_transition_stage::repair:
            return true;
        case tablet_transition_stage::end_repair:
            return false;
        case tablet_transition_stage::write_both_read_new:
            return false;
        case tablet_transition_stage::use_new:
            return false;
        case tablet_transition_stage::cleanup:
            return false;
        case tablet_transition_stage::cleanup_target:
            return false;
        case tablet_transition_stage::revert_migration:
            return false;
        case tablet_transition_stage::end_migration:
            return false;
        }
        on_internal_error(lblogger, format("Invalid transition stage: {}", static_cast<int>(trinfo->stage)));
    }

    using migration_vector = migration_plan::migrations_vector;
    static migration_vector
    get_migration_info(const migration_tablet_set& tablet_set, tablet_transition_kind kind, tablet_replica src, tablet_replica dst) {
        migration_vector infos;
        for (auto tablet : tablet_set.tablets()) {
            infos.push_back(tablet_migration_info{kind, tablet, src, dst});
        }
        return infos;
    }

    using migration_streaming_info_vector = utils::small_vector<tablet_migration_streaming_info, 2>;
    static migration_streaming_info_vector
    get_migration_streaming_infos(const locator::topology& topology, const tablet_map& tmap, const migration_vector& infos) {
        migration_streaming_info_vector streaming_infos;
        for (auto& info : infos) {
            auto& ti = tmap.get_tablet_info(info.tablet.tablet);
            streaming_infos.push_back(get_migration_streaming_info(topology, ti, info));
        }
        return streaming_infos;
    }
public:
    load_balancer(replica::database& db, token_metadata_ptr tm,
            service::topology* topology,
            db::system_keyspace* sys_ks,
            locator::load_stats_ptr table_load_stats,
            load_balancer_stats_manager& stats,
            uint64_t target_tablet_size,
            unsigned tablets_per_shard_goal,
            std::unordered_set<host_id> skiplist)
        : _target_tablet_size(target_tablet_size)
        , _tablets_per_shard_goal(tablets_per_shard_goal)
        , _db(db)
        , _tm(std::move(tm))
        , _topology(topology)
        , _sys_ks(sys_ks)
        , _table_load_stats(std::move(table_load_stats))
        , _stats(stats)
        , _skiplist(std::move(skiplist))
        , _size_based_balance_threshold(db.get_config().size_based_balance_threshold_percentage() / 100.0)
        , _force_capacity_based_balancing(db.get_config().force_capacity_based_balancing())
        , _minimal_tablet_size(db.get_config().minimal_tablet_size_for_balancing()) {

        // Force capacity-based balancing until all the nodes have been upgraded.
        if (!_db.features().size_based_load_balancing && !_force_capacity_based_balancing) {
            lblogger.info("Size based load balancing cluster feature disabled; forcing capacity based balancing");
            _force_capacity_based_balancing = true;
        }
        max_read_streaming_load = db.get_config().tablet_streaming_read_concurrency_per_shard();
        max_write_streaming_load = db.get_config().tablet_streaming_write_concurrency_per_shard();
    }

    bool ongoing_rack_list_colocation() const {
        return _topology != nullptr && _sys_ks != nullptr && !_topology->paused_rf_change_requests.empty();
    }

    future<migration_plan> make_plan() {
        const locator::topology& topo = _tm->get_topology();
        migration_plan plan;

        auto rack_list_colocation = ongoing_rack_list_colocation();

        // Prepare plans for each DC separately and combine them to be executed in parallel.
        for (auto&& dc : topo.get_datacenters()) {
            if (_db.get_config().rf_rack_valid_keyspaces() || _db.get_config().enforce_rack_list() || rack_list_colocation) {
                for (auto rack : topo.get_datacenter_racks().at(dc) | std::views::keys) {
                    auto rack_plan = co_await make_plan(dc, rack);
                    auto level = rack_plan.empty() ? seastar::log_level::debug : seastar::log_level::info;
                    lblogger.log(level, "Plan for {}/{}: {}", dc, rack, plan_summary(rack_plan));
                    plan.merge(std::move(rack_plan));
                }
            } else {
                auto dc_plan = co_await make_plan(dc);
                auto level = dc_plan.empty() ? seastar::log_level::debug : seastar::log_level::info;
                lblogger.log(level, "Plan for {}: {}", dc, plan_summary(dc_plan));
                plan.merge(std::move(dc_plan));
            }
        }

        if (rack_list_colocation) {
            plan.merge(co_await make_rack_list_colocation_plan(plan));
        }

        // Merge table-wide resize decisions; this may emit new decisions, and revoke or finalize ongoing ones.
        // Note: resize plans should be generated before repair plans to avoid scheduling repairs when there is a pending resize finalization.
        plan.merge_resize_plan(co_await make_resize_plan(plan));

        // Skip making repair plans if resize finalizations are pending, since repairs could delay finalization.
        if (plan.resize_plan().finalize_resize.empty()) {
            plan.set_repair_plan(co_await make_repair_plan(plan));
        }

        auto level = plan.empty() ? seastar::log_level::debug : seastar::log_level::info;
        lblogger.log(level, "Prepared plan: {}", plan_summary(plan));
        co_return std::move(plan);
    }

    void set_use_table_aware_balancing(bool use_table_aware_balancing) {
        _use_table_aware_balancing = use_table_aware_balancing;
    }

    void set_initial_scale(double initial_scale) {
        _initial_scale = initial_scale;
    }

    const locator::table_load_stats* load_stats_for_table(table_id id) const {
        if (!_table_load_stats) {
            return nullptr;
        }
        auto it = _table_load_stats->tables.find(id);
        return (it != _table_load_stats->tables.end()) ? &it->second : nullptr;
    }

    std::optional<uint64_t> get_tablet_size(host_id host, const range_based_tablet_id& rb_tid, const tablet_info& ti, const tablet_transition_info* trinfo) const {
        if (_table_load_stats) {
            return _table_load_stats->get_tablet_size_in_transition(host, rb_tid, ti, trinfo);
        }
        return std::nullopt;
    }

    bool is_auto_repair_enabled(const std::optional<locator::repair_scheduler_config>& config) {
        // Only check the yaml config for now.
        return _db.get_config().auto_repair_enabled_default();
    }

    future<bool> needs_auto_repair(const locator::global_tablet_id& gid, const locator::tablet_info& info,
            const std::optional<locator::repair_scheduler_config>& config, const db_clock::time_point& now,
            db_clock::duration& diff, service::auto_repair_stats& stats) {
        if (utils::get_local_injector().enter("tablet_keep_repairing")) {
            lblogger.info("Forced auto-repair for tablet={}", gid);
            co_return true;
        }
        if (!is_auto_repair_enabled(config)) {
            co_return false;
        }
        auto size = info.replicas.size();
        if (size <= 1) {
            lblogger.debug("Skipped auto repair for tablet={} replicas={}", gid, size);
            co_return false;
        }
        auto threshold = _db.get_config().auto_repair_threshold_default_in_seconds();
        auto repair_time_threshold = std::chrono::seconds(threshold);
        auto& last_repair_time = info.repair_time;
        diff = now - last_repair_time;
        lblogger.trace("Check gid={} diff={} last_repair_time={} repair_time_threshold={}",
                gid, diff, info.repair_time, repair_time_threshold);
        if (diff < repair_time_threshold) {
            co_return false;
        }
        stats.needs_repair_nr++;
        co_return true;
    }

    void ensure_node(node_load_map& nodes, host_id host) {
        if (nodes.contains(host)) {
            return;
        }
        const locator::topology& topo = _tm->get_topology();
        auto* node = topo.find_node(host);
        if (!node) {
            on_internal_error(lblogger, format("Node {} not found in topology", host));
        }
        node_load& load = nodes[host];
        load.id = host;
        load.node = node;
        load.shard_count = node->get_shard_count();
        load.excluded = node->is_excluded();
        if (!load.shard_count) {
            throw std::runtime_error(format("Shard count of {} not found in topology", host));
        }
        if (!_db.features().tablet_load_stats_v2) {
            // This way the load calculation will hold the tablet count.
            load.dusage = disk_usage{_target_tablet_size * load.shard_count, 0};
        } else if (_table_load_stats) {
            if (_table_load_stats->tablet_stats.contains(host) && !_force_capacity_based_balancing) {
                load.dusage = disk_usage{_table_load_stats->tablet_stats.at(host).effective_capacity, 0};
            } else if (_table_load_stats->capacity.contains(host)) {
                load.dusage = disk_usage{_table_load_stats->capacity.at(host), 0};
            }
        }

        load.shards.resize(load.shard_count);
        if (load.dusage) {
            for (auto& sload : load.shards) {
                sload.dusage = disk_usage{load.dusage->capacity / load.shard_count, 0};
            }
        }
    }

    future<> consider_scheduled_load(node_load_map& nodes) {
        const locator::topology& topo = _tm->get_topology();
        for (auto&& [table, tables] : _tm->tablets().all_table_groups()) {
            const auto& tmap = _tm->tablets().get_tablet_map(table);
            for (auto&& [tid, trinfo] : tmap.transitions()) {
                co_await coroutine::maybe_yield();
                if (is_streaming(&trinfo)) {
                    auto& tinfo = tmap.get_tablet_info(tid);
                    apply_load(nodes, get_migration_streaming_info(topo, tinfo, trinfo));
                }
            }
        }
    }

    future<> consider_planned_load(node_load_map& nodes, const migration_plan& mplan) {
        const locator::topology& topo = _tm->get_topology();
        auto& tablet_meta = _tm->tablets();

        for (const tablet_migration_info& tmi : mplan.migrations()) {
            co_await coroutine::maybe_yield();
            auto& tmap = tablet_meta.get_tablet_map(tmi.tablet.table);
            auto& tinfo = tmap.get_tablet_info(tmi.tablet.tablet);
            auto streaming_info = get_migration_streaming_info(topo, tinfo, tmi);
            apply_load(nodes, streaming_info);
        }
    }

    future<tablet_repair_plan> make_repair_plan(const migration_plan& mplan) {
        lblogger.debug("In make_repair_plan");

        auto ret = tablet_repair_plan();

        if (!_db.features().tablet_repair_scheduler) {
            lblogger.debug("make_repair_plan: The TABLET_REPAIR_SCHEDULER feature is not enabled");
            co_return ret;
        }

        const locator::topology& topo = _tm->get_topology();

        // Populate the load of the migrations that are already in the plan.
        node_load_map nodes;
        // TODO: share code with make_plan()
        topo.for_each_node([&] (const locator::node& node) {
            bool is_drained = node.get_state() == locator::node::state::being_decommissioned
                    || node.get_state() == locator::node::state::being_removed;
            if (node.get_state() == locator::node::state::normal || is_drained) {
                ensure_node(nodes, node.host_id());
            }
        });

        // Consider load that is already scheduled.
        co_await consider_scheduled_load(nodes);

        // Consider load that is about to be scheduled.
        co_await consider_planned_load(nodes, mplan);

        service::auto_repair_stats auto_repair_stats;

        utils::chunked_vector<repair_plan> plans;
        auto migration_tablet_ids = co_await mplan.get_migration_tablet_ids();
        for (auto&& [table, tables] : _tm->tablets().all_table_groups()) {
            const auto& tmap = _tm->tablets().get_tablet_map(table);
            co_await coroutine::maybe_yield();
            auto config = tmap.get_repair_scheduler_config();
            auto auto_repair_enabled = is_auto_repair_enabled(config);
            auto now = db_clock::now();
            auto skip = utils::get_local_injector().inject_parameter<std::string_view>("tablet_repair_skip_sched");
            auto skip_tablets = skip ? split_string_to_tablet_id(*skip, ',') : std::unordered_set<locator::tablet_id>();
            co_await tmap.for_each_tablet([&] (locator::tablet_id id, const locator::tablet_info& info) -> future<> {
                auto gid = locator::global_tablet_id{table, id};
                if (auto_repair_enabled) {
                    auto_repair_stats.enabled_nr++;
                }
                // Skip a tablet that is already in transition.
                auto* tti = tmap.get_tablet_transition_info(id);
                if (tti) {
                    lblogger.debug("Skipped tablet repair for tablet={} which is already in transition={}", gid, tti->transition);
                    co_return;
                }

                // Skip a tablet that is about to be in transition.
                if (migration_tablet_ids.contains(gid)) {
                    co_return;
                }

                // Skip a tablet that has an excluded replica node.
                auto& tinfo = tmap.get_tablet_info(id);
                if (tablet_has_excluded_node(topo, tinfo)) {
                    co_return;
                }

                if (skip_tablets.contains(id)) {
                    lblogger.debug("Skipped tablet repair for tablet={} by error injector", gid);
                    co_return;
                }

                // Avoid rescheduling a failed tablet repair in a loop.
                // TODO: Allow the user to configure this.
                const auto min_reschedule_time = std::chrono::seconds(5);
                if (now - info.repair_task_info.sched_time < min_reschedule_time) {
                    lblogger.debug("Skipped tablet repair for tablet={} which is scheduled too frequently", gid);
                    co_return;
                }

                db_clock::duration diff;
                auto is_user_request = info.repair_task_info.is_user_repair_request();
                if (is_user_request) {
                    // This means the user has issued a repair request manually. Select it for repair scheduling.
                } else {
                    auto auto_repair = co_await needs_auto_repair(gid, info, config, now, diff, auto_repair_stats);
                    if (!auto_repair) {
                        co_return;
                    }
                }
                auto range = tmap.get_token_range(id);
                auto last_token = tmap.get_last_token(id);
                plans.push_back(repair_plan{gid, info, range, last_token, diff, is_user_request});
            });
        }

        _stats.for_cluster().auto_repair_needs_repair_nr = auto_repair_stats.needs_repair_nr;
        _stats.for_cluster().auto_repair_enabled_nr = auto_repair_stats.enabled_nr;

        // TODO: we could add other factors in addition to the repair time when
        // picking which tablet to repair, e.g., a higher repair priority
        // specified by the user, or a tablet with a higher purgeable tombstone ratio.
        std::sort(plans.begin(), plans.end(), [] (const repair_plan& x, const repair_plan& y) {
            if (x.is_user_request != y.is_user_request) {
                return x.is_user_request > y.is_user_request;
            }
            return x.repair_time_diff > y.repair_time_diff;
        });


        if (utils::get_local_injector().enter("tablet_dump_repair_plan")) {
            lblogger.info("dump_repair_plans=[{}]", fmt::join(plans, ","));
        }

        auto trinfo = tablet_transition_info(locator::tablet_transition_stage::repair,
                locator::tablet_transition_kind::repair, tablet_replica_set(), {}, service::session_id());
        for (auto& plan : plans) {
            co_await coroutine::maybe_yield();
            tablet_migration_streaming_info tmsi;
            tmsi = get_migration_streaming_info(topo, plan.tinfo, trinfo);
            if (can_accept_load(nodes, tmsi)) {
                apply_load(nodes, tmsi);
                ret.add(plan.gid);
            }
        }

        if (utils::get_local_injector().enter("tablet_skip_repair_plan")) {
            lblogger.info("Skip repair plan due to error injection=tablet_skip_repair_plan");
            co_return tablet_repair_plan();
        }

        co_return ret;
    }
|
|
|
|
future<migration_plan> make_rack_list_colocation_plan(const migration_plan& mplan) {
|
|
lblogger.debug("In make_rack_list_colocation_plan");
|
|
|
|
migration_plan plan;
|
|
tablet_rack_list_colocation_plan rack_list_plan;
|
|
if (!ongoing_rack_list_colocation() || utils::get_local_injector().enter("wait_with_rack_list_colocation")) {
|
|
co_return plan;
|
|
}
|
|
|
|
const locator::topology& topo = _tm->get_topology();
|
|
|
|
auto migration_tablet_ids = co_await mplan.get_migration_tablet_ids();
|
|
auto colocation_state = co_await find_required_rack_list_colocations(_db, _tm, _sys_ks,
|
|
_topology->paused_rf_change_requests, std::move(migration_tablet_ids));
|
|
|
|
node_load_map nodes;
|
|
topo.for_each_node([&] (const locator::node& node) {
|
|
if (node.get_state() == locator::node::state::normal && !node.is_excluded()) {
|
|
ensure_node(nodes, node.host_id());
|
|
}
|
|
});
|
|
|
|
// Consider load that is already scheduled.
|
|
co_await consider_scheduled_load(nodes);
|
|
|
|
// Consider load that is about to be scheduled.
|
|
co_await consider_planned_load(nodes, mplan);
|
|
|
|
    std::unordered_set<global_tablet_id> colocation_tablet_ids;
    for (auto& [dc_rack, colocation_sources] : colocation_state.dst_dc_rack_to_tablets) {
        auto nodes_by_load_dst = nodes | std::views::filter([&] (const auto& host_load) {
            auto& [host, load] = host_load;
            auto& node = *load.node;
            return node.dc_rack() == dc_rack;
        }) | std::views::keys | std::ranges::to<std::vector<host_id>>();

        if (nodes_by_load_dst.empty()) {
            lblogger.warn("No target nodes available for RF change colocation plan in dc {}, rack {}", dc_rack.dc, dc_rack.rack);
            if (auto it = colocation_state.dst_to_requests.find(dc_rack); it != colocation_state.dst_to_requests.end()) {
                rack_list_plan.maybe_add_request_to_resume(*it->second.begin());
            }
            continue;
        }

        auto nodes_cmp = nodes_by_load_cmp(nodes);
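        // Invert the comparison so the heap below yields the least loaded
        // node first when popped.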
        auto nodes_dst_cmp = [&] (const host_id& a, const host_id& b) {
            return nodes_cmp(b, a);
        };

        // Ascending load heap of candidate target nodes.
        std::make_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp);

        const tablet_metadata& tmeta = _tm->tablets();
        for (colocation_source& source : colocation_sources) {
            if (colocation_tablet_ids.contains(source.gid)) {
                lblogger.debug("Skipped colocation of replica {} of tablet={}, another replica of which is about to be colocated", source.replica, source.gid);
                continue;
            }

            // Pick the least loaded node as target.
            std::pop_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp);
            auto target = nodes_by_load_dst.back();
            auto& target_info = nodes[target];
            auto push_back_target_node = seastar::defer([&] {
                std::push_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp);
            });

            lblogger.debug("target node: {}, avg_load={}", target, target_info.avg_load);

            auto dst = global_shard_id {target, _load_sketch->get_least_loaded_shard(target)};

            lblogger.trace("target shard: {}, tablets={}, load={}", dst.shard,
                    target_info.shards[dst.shard].tablet_count,
                    target_info.shard_load(dst.shard, _target_tablet_size));

            tablet_transition_kind kind = tablet_transition_kind::migration;
            migration_tablet_set source_tablets {
                .tablet_s = source.gid, // Ignore the merge co-location.
            };
            auto src = source.replica;
            auto mig = get_migration_info(source_tablets, kind, src, dst);
            auto& tmap = tmeta.get_tablet_map(source_tablets.table());
            auto mig_streaming_info = get_migration_streaming_infos(topo, tmap, mig);
            pick(*_load_sketch, dst.host, dst.shard, source_tablets);
            if (can_accept_load(nodes, mig_streaming_info)) {
                apply_load(nodes, mig_streaming_info);
                lblogger.debug("Adding migration: {}", mig);
                mark_as_scheduled(mig);
                for (auto& m : mig) {
                    colocation_tablet_ids.insert(m.tablet);
                    plan.add(std::move(m));
                }
            }
            update_node_load_on_migration(nodes, src, dst, source_tablets);
        }
    }
    if (colocation_state.request_to_resume) {
        rack_list_plan.maybe_add_request_to_resume(colocation_state.request_to_resume);
    }
    plan.set_rack_list_colocation_plan(std::move(rack_list_plan));
    co_return std::move(plan);
}

// Returns true if a table has replicas of all its sibling tablets co-located.
// This is used for determining whether merge can be finalized, since co-location
// is a strict requirement for sibling tablets to be merged.
future<bool> all_sibling_tablet_replicas_colocated(table_id table, const tablet_map& tmap) {
    bool all_colocated = true;
    co_await tmap.for_each_sibling_tablets([&] (tablet_desc t1, std::optional<tablet_desc> t2_opt) -> future<> {
        // FIXME: introduce variant of for_each_sibling_tablets() that accepts stop_iteration.
        if (!all_colocated) {
            return make_ready_future<>();
        }

        if (!t2_opt) {
            on_internal_error(lblogger, format("Unable to find sibling tablet during co-location check for table {}", table));
        }
        auto t2 = *t2_opt;

        // Sibling tablets cannot be considered co-located if their tablet info is temporarily unmergeable.
        // This can happen, for example, when either tablet has an active repair task.
        all_colocated &= bool(merge_tablet_info(*t1.info, *t2.info));
        return make_ready_future<>();
    });
    if (all_colocated) {
        lblogger.info("All sibling tablets are co-located for table {}", table);
    }
    co_return all_colocated;
}

future<migration_plan> make_merge_colocation_plan(node_load_map& nodes) {
    migration_plan plan;
    table_resize_plan resize_plan;

    for (auto&& [table, tables] : _tm->tablets().all_table_groups()) {
        const auto& tmap = _tm->tablets().get_tablet_map(table);
        if (!tmap.needs_merge()) {
            continue;
        }

        // Also filter out replicas that don't belong to the DC being worked on.
        auto get_replicas = [this, &nodes] (const tablet_desc& t) {
            auto ret = sorted_replicas_for_tablet_load(*t.info, t.transition);
            const auto [first, last] = std::ranges::remove_if(ret, [&] (tablet_replica r) { return !nodes.contains(r.host); });
            ret.erase(first, last);
            return ret;
        };

        auto migrating = [this, table] (const tablet_desc& t) {
            return bool(t.transition) || _scheduled_tablets.contains(global_tablet_id{table, t.tid});
        };
        auto rack_of = [&topo = _tm->get_topology()] (tablet_replica tr) -> const sstring& {
            return topo.get_rack(tr.host);
        };
        auto cross_rack_migration = [&] (tablet_replica src, tablet_replica dst) {
            return rack_of(src) != rack_of(dst);
        };

        auto first_non_matching_replicas = [&] (tablet_replica_set r1, tablet_replica_set r2) -> std::optional<std::pair<tablet_replica, tablet_replica>> {
            assert(r1.size() == r2.size());
            // Subtract intersecting (co-located) elements from the replica sets of sibling tablets.
            // Think for example that tablets 0 and 1 have replicas [n2, n4] and [n1, n2] respectively.
            // After subtraction, the replica of tablet 1 on n1 will be a candidate for co-location with
            // the replica of tablet 0 on n4.
            std::unordered_set<tablet_replica> intersection;
            std::ranges::set_intersection(r1, r2, std::inserter(intersection, intersection.begin()));
            const auto [r1_first, r1_last] = std::ranges::remove_if(r1, [&] (tablet_replica r) { return intersection.contains(r); });
            r1.erase(r1_first, r1_last);
            const auto [r2_first, r2_last] = std::ranges::remove_if(r2, [&] (tablet_replica r) { return intersection.contains(r); });
            r2.erase(r2_first, r2_last);

            // Favor replicas of different tablets that belong to the same node. For example:
            // tablet 0 replicas: [n2:s1, n3:s0]
            // tablet 1 replicas: [n1:s0, n2:s0]
            // The replica on n1:s0 cannot follow the sibling replica on n2:s1. Otherwise, the RF invariant is broken.
            // Instead, tablet 1 on n2:s0 will be co-located with tablet 0 on n2:s1.
            std::unordered_map<host_id, tablet_replica> r1_map;
            std::ranges::transform(r1, std::inserter(r1_map, r1_map.begin()), [] (tablet_replica r) {
                return std::make_pair(r.host, r);
            });
            for (unsigned i = 0; i < r2.size(); i++) {
                auto r1_it = r1_map.find(r2[i].host);
                if (r1_it != r1_map.end()) {
                    return std::make_pair(r1_it->second, r2[i]);
                }
            }

            // Favor replicas which belong to the same rack. For example:
            //
            // tablet 0: [n1:rack3, n2:rack1]
            // tablet 1: [n3:rack2, n4:rack3]
            //
            // We want to move tablet 1's n4:rack3 to n1:rack3 first (within rack3), for the following reasons:
            // 1) Minimize cross-rack migrations (they have higher cost).
            //    In particular, this ensures that when RF=#racks, there will be no cross-rack migrations.
            // 2) Minimize breaking of pairing: the view replica is determined by rack, and cross-rack migration breaks it.
            //    In particular, this ensures that when RF=#racks, no pairing will be broken.
            // 3) Avoid overloading racks temporarily, which is an availability risk in case the rack goes down.
            //    Otherwise, n3:rack2 would be migrated to n1:rack3, and tablet 1 would have two replicas in rack3.
            //
            std::unordered_map<sstring, tablet_replica> r1_rack_map;
            for (auto&& r : r1) {
                auto&& rack = rack_of(r);
                auto i = r1_rack_map.find(rack);
                if (i == r1_rack_map.end()) {
                    r1_rack_map[rack] = r;
                }
            }
            for (auto&& r : r2) {
                auto&& rack = rack_of(r);
                auto i = r1_rack_map.find(rack);
                if (i != r1_rack_map.end()) {
                    return std::make_pair(i->second, r);
                }
            }

            // r1 and r2 don't share replicas, hosts, or racks.
            if (r1.size() > 0) {
                return std::make_pair(r1[0], r2[0]);
            }

            return std::nullopt;
        };

        auto create_migration_info = [] (global_tablet_id gid, tablet_replica src, tablet_replica dst) {
            auto kind = (src.host != dst.host) ? tablet_transition_kind::migration : tablet_transition_kind::intranode_migration;
            return tablet_migration_info{kind, gid, src, dst};
        };

        co_await tmap.for_each_sibling_tablets([&] (tablet_desc t1, std::optional<tablet_desc> t2_opt) -> future<> {
            // Be optimistic about migrating tablets, as if they succeeded.
            // Merge finalization will have to recheck that all sibling tablets are co-located.

            if (!t2_opt) {
                on_internal_error(lblogger, format("Unable to find sibling tablet during co-location, with tablet count {}, for table {}",
                        tmap.tablet_count(), table));
            }
            auto t2 = *t2_opt;

            auto r1 = get_replicas(t1);
            auto r2 = get_replicas(t2);
            if (r1 == r2) {
                return make_ready_future<>();
            }
            auto t1_id = global_tablet_id{table, t1.tid};
            auto t2_id = global_tablet_id{table, t2.tid};

            if (migrating(t1) || migrating(t2)) {
                return make_ready_future<>();
            }
            // During RF change, tablets may have replicas incrementally allocated or deallocated.
            // Temporarily delay their co-location until their replica sets have the same size.
            if (r1.size() != r2.size()) {
                lblogger.warn("Replica sets of tablets to be co-located differ in size: ({}: {}), ({}: {})",
                        t1_id, r1, t2_id, r2);
                return make_ready_future<>();
            }

            // Returns true if moving the candidate into dst would violate the replication constraint.
            const auto r2_hosts = r2
                    | std::views::transform(std::mem_fn(&locator::tablet_replica::host))
                    | std::ranges::to<std::unordered_set<host_id>>();
            auto check_constraints = [r2_hosts = std::move(r2_hosts)] (tablet_replica src, tablet_replica dst) {
                // Handles intra-node migration.
                if (src.host == dst.host && src.shard != dst.shard) {
                    return false;
                }
                return r2_hosts.contains(dst.host);
            };

            lblogger.debug("Replica sets of tablets being co-located: ({}: {}), ({}: {})", t1_id, r1, t2_id, r2);

            auto ret = first_non_matching_replicas(r1, r2);
            if (!ret) {
                // This shouldn't happen in practice: the above call should always produce a pair of
                // replicas to co-locate, given we only get here when the sibling tablets aren't fully co-located.
                on_internal_error(lblogger, format("Unable to find replicas to co-locate for sibling tablets ({}: {}) and ({}: {})",
                        t1_id, r1, t2_id, r2));
            }

            // Emits migration for the replica of t2 to co-habit the same shard as the replica of t1.
            auto src = ret->second;
            auto dst = ret->first;

            if (cross_rack_migration(src, dst)) {
                // FIXME: This is illegal if the table has views, as it breaks base-view pairing.
                // Can happen when RF != #racks.
                _current_stats->cross_rack_collocations++;
                lblogger.debug("Cross-rack co-location migration for {}@{} (rack: {}) to co-habit {}@{} (rack: {})",
                        t2_id, src, rack_of(src), t1_id, dst, rack_of(dst));
                utils::get_local_injector().inject("forbid_cross_rack_migration_attempt", [&] {
                    on_fatal_internal_error(lblogger, "Cross rack colocation is not allowed, killing the node");
                });
            }

            // A node which is draining is either being decommissioned or removed.
            // If an involved node is excluded, the co-locating migration will surely fail, so it's pointless;
            // we should wait until the node is removed.
            // It could also fail the removenode request, as failure of this migration is interpreted as
            // a draining failure.
            // In case of decommission, draining is more important than co-location, so postponing is good.
            if (nodes.at(dst.host).drained || nodes.at(src.host).drained) {
                lblogger.debug("Co-locating migration ({}, {}) -> ({}, {}) involves draining nodes, postponing",
                        t2_id, src, t1_id, dst);
                return make_ready_future<>();
            }

            // If the migration would violate the replication constraint, skip to the next pair of replicas of sibling tablets.
            auto skip = check_constraints(src, dst);
            if (skip) {
                lblogger.debug("Replication constraint check failed, unable to emit migration for replica ({}, {}) to co-habit the replica ({}, {})",
                        t2_id, src, t1_id, dst);
                return make_ready_future<>();
            }

            auto mig = create_migration_info(t2_id, src, dst);
            auto mig_streaming_info = get_migration_streaming_info(_tm->get_topology(), *t2.info, mig);
            if (!can_accept_load(nodes, mig_streaming_info)) {
                // FIXME: we can try another pair of non-colocated replicas of the same sibling tablets.
                lblogger.debug("Load limit reached, unable to emit migration for replica ({}, {}) to co-habit the replica ({}, {})",
                        t2_id, src, t1_id, dst);
                return make_ready_future<>();
            }
            apply_load(nodes, mig_streaming_info);

            lblogger.info("Created migration for replica ({}, {}) to co-habit same shard as ({}, {})", t2_id, src, t1_id, dst);
            mark_as_scheduled(mig);
            plan.add(std::move(mig));
            return make_ready_future<>();
        });
    }
    plan.merge_resize_plan(std::move(resize_plan));

    co_return std::move(plan);
}

std::tuple<schema_ptr, const tablet_aware_replication_strategy*> get_schema_and_rs(table_id table) {
    auto t = _db.get_tables_metadata().get_table_if_exists(table);
    if (!t) {
        on_internal_error(lblogger, format("Table {} does not exist", table));
    }

    auto s = t->schema();
    auto erm = t->get_effective_replication_map();
    auto rs = erm->get_replication_strategy().maybe_as_tablet_aware();
    if (!rs) {
        auto msg = format("Table {}.{} has no tablet_aware_replication_strategy: uses_tablets={}",
                s->ks_name(), s->cf_name(), erm->get_replication_strategy().uses_tablets());
        on_internal_error(lblogger, msg);
    }

    return {s, rs};
}

const tablet_aware_replication_strategy* get_rs(table_id id) {
    auto [s, rs] = get_schema_and_rs(id);
    return rs;
}

struct table_sizing {
    size_t current_tablet_count;             // Tablet count in group0.
    size_t target_tablet_count;              // Tablet count wanted by the scheduler.
    sstring target_tablet_count_reason;      // Winning rule for the target_tablet_count value.
    std::optional<uint64_t> avg_tablet_size; // nullopt when stats are not yet available.

    size_t target_tablet_count_aligned;      // target_tablet_count aligned to a power of 2.
    resize_decision::way_type resize_decision; // Decision which should be emitted to achieve target_tablet_count_aligned.
};

struct sizing_plan {
    std::unordered_map<table_id, table_sizing> tables;
};

struct tablet_count_and_reason {
    size_t tablet_count = 0;
    sstring reason;
};

tablet_count_and_reason tablet_count_from_min_per_shard_tablet_count(const schema& s,
        const std::unordered_map<sstring, unsigned>& shards_per_dc,
        const std::unordered_map<endpoint_dc_rack, unsigned>& shards_per_rack,
        const tablet_aware_replication_strategy& rs,
        double min_per_shard_tablet_count)
{
    // Try to use as many tablets as needed so that all shards in the current topology
    // are covered with at least `min_per_shard_tablet_count` tablets on average.

    size_t tablet_count = 0;
    const sstring* winning_dc = nullptr;
    sstring winning_rack;

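    // With a numeric RF, each tablet contributes RF replicas spread over the
    // DC's shards, so covering every shard with `min` replicas on average needs
    // ceil(min * shards_in_dc / RF) tablets. With a rack-list RF, each listed
    // rack holds exactly one replica per tablet, so a rack needs
    // ceil(min * shards_in_rack) tablets.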
    for (auto&& [dc, shards_in_dc] : shards_per_dc) {
        auto rf_in_dc = rs.get_replication_factor_data(dc);
        if (!rf_in_dc) {
            continue;
        }
        if (rf_in_dc->is_numeric()) {
            size_t tablets_in_dc = std::ceil((double) (min_per_shard_tablet_count * shards_in_dc) / rf_in_dc->count());
            lblogger.debug("Estimated {} tablets due to min_per_shard_tablet_count={:.3f} for table={}.{} in DC {} ({} shards)",
                    tablets_in_dc, min_per_shard_tablet_count, s.ks_name(), s.cf_name(), dc, shards_in_dc);
            if (tablets_in_dc > tablet_count) {
                tablet_count = tablets_in_dc;
                winning_dc = &dc;
                winning_rack = sstring();
            }
        } else {
            for (auto rack : rf_in_dc->get_rack_list()) {
                size_t shards = 0;
                auto dc_rack = endpoint_dc_rack{dc, rack};
                if (!shards_per_rack.contains(dc_rack)) {
                    lblogger.warn("No shards for rack {}, but table {}.{} replicates there", rack, s.ks_name(), s.cf_name());
                } else {
                    shards = shards_per_rack.at(dc_rack);
                }
                size_t tablets_in_rack = std::ceil(min_per_shard_tablet_count * shards);
                lblogger.debug("Estimated {} tablets due to min_per_shard_tablet_count={:.3f} for table={}.{} in rack {} ({} shards) in DC {}",
                        tablets_in_rack, min_per_shard_tablet_count, s.ks_name(), s.cf_name(), rack, shards, dc);
                if (tablets_in_rack > tablet_count) {
                    tablet_count = tablets_in_rack;
                    winning_dc = &dc;
                    winning_rack = rack;
                }
            }
        }
    }

    if (!winning_dc) {
        return {};
    }

    if (!winning_rack.empty()) {
        return {tablet_count, format("min_per_shard_tablet_count={:.3f} in DC {} rack {}", min_per_shard_tablet_count, *winning_dc, winning_rack)};
    }

    return {tablet_count, format("min_per_shard_tablet_count={:.3f} in DC {}", min_per_shard_tablet_count, *winning_dc)};
}

future<sizing_plan> make_sizing_plan(schema_ptr new_table = nullptr, const tablet_aware_replication_strategy* new_rs = nullptr) {
    std::unordered_map<table_id, const tablet_aware_replication_strategy*> rs_by_table;
    sizing_plan plan;

    std::unordered_map<sstring, unsigned> shards_per_dc;
    std::unordered_map<endpoint_dc_rack, unsigned> shards_per_rack;
    std::unordered_map<sstring, std::unordered_set<sstring>> racks_per_dc;
    _tm->for_each_token_owner([&] (const node& n) {
        if (n.is_normal() && !n.is_draining()) {
            shards_per_dc[n.dc_rack().dc] += n.get_shard_count();
            shards_per_rack[n.dc_rack()] += n.get_shard_count();
            racks_per_dc[n.dc_rack().dc].insert(n.dc_rack().rack);
        }
    });

    auto process_table = [&] (table_id table, const locator::table_group_set& tables, schema_ptr s, db::tablet_options tablet_options, const tablet_aware_replication_strategy* rs, size_t tablet_count) {
        table_sizing& table_plan = plan.tables[table];
        table_plan.current_tablet_count = tablet_count;
        rs_by_table[table] = rs;

        // For a group of co-located tablets of size g with average tablet size t, the migration unit
        // size is g*t. In order to keep the migration unit size reasonable, we set a lower target
        // tablet size as the group size increases.
        auto target_tablet_size = _target_tablet_size / tables.size();

        tablet_count_and_reason target_tablet_count = {1, ""};
        auto maybe_apply = [&] (tablet_count_and_reason candidate, bool force = false) {
            lblogger.debug("Table {} ({}.{}) wants {} tablets due to {}", table, s->ks_name(), s->cf_name(),
                    candidate.tablet_count, candidate.reason);
            if (candidate.tablet_count > target_tablet_count.tablet_count || force) {
                target_tablet_count = candidate;
            }
        };

        maybe_apply({rs->get_initial_tablets(), "initial"});

        if (tablet_options.min_tablet_count) {
            maybe_apply({tablet_options.min_tablet_count.value(), "min_tablet_count"});
        }

        if (tablet_options.expected_data_size_in_gb) {
            maybe_apply({(tablet_options.expected_data_size_in_gb.value() << 30) / target_tablet_size,
                    format("expected_data_size_in_gb={}", tablet_options.expected_data_size_in_gb.value())});
        }

        auto min_per_shard_tablet_count = tablet_options.min_per_shard_tablet_count.value_or(
                // If min_tablet_count is set, initial_scale should not be effective, for
                // compatibility with the deprecated "initial" tablet count.
                (rs->get_initial_tablets() || tablet_options.min_tablet_count) ? 0 : _initial_scale);
        if (min_per_shard_tablet_count) {
            maybe_apply(tablet_count_from_min_per_shard_tablet_count(*s, shards_per_dc, shards_per_rack, *rs, min_per_shard_tablet_count));
        }

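        // The average tablet size is computed over the whole co-location group:
        // the total size of all tables in the group divided by the group's
        // total tablet count.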
        auto total_size_opt = std::invoke([&] -> std::optional<size_t> {
            size_t total_size = 0;
            for (auto table : tables) {
                const auto* table_stats = load_stats_for_table(table);
                if (!table_stats) {
                    return std::nullopt;
                }
                total_size += table_stats->size_in_bytes;
            }
            return total_size;
        });

        if (total_size_opt) {
            auto total_size = *total_size_opt;

            auto cur_decision = _tm->tablets().get_tablet_map(table).resize_decision();
            auto avg_tablet_size = total_size / std::max<size_t>(table_plan.current_tablet_count * tables.size(), 1);
            auto tablet_count_from_size = table_plan.current_tablet_count;

            // Split based on avg_tablet_size, or if the current resize_decision is split, apply hysteresis,
            // so it only gets cancelled when crossing back over the half-way point.
            if (avg_tablet_size > target_max_tablet_size(target_tablet_size) ||
                    (cur_decision.is_split() && avg_tablet_size >= target_tablet_size)) {
                // TODO: extend to n-way split when needed
                tablet_count_from_size *= 2;
            } else {
                // Consider merge. If the current resize_decision is merge, apply hysteresis,
                // so it only gets cancelled when crossing back over the half-way point.
                if (avg_tablet_size < target_min_tablet_size(target_tablet_size) ||
                        (cur_decision.is_merge() && avg_tablet_size <= target_tablet_size)) {
                    tablet_count_from_size /= 2;
                }
            }

            table_plan.avg_tablet_size = avg_tablet_size;
            maybe_apply({tablet_count_from_size, format("avg_tablet_size={}", avg_tablet_size)});
        } else {
            // When we don't have tablet size info, allow the tablet count to increase but not to decrease.
            // Increasing always brings us closer to the true target count, since tablet_count_from_size
            // can only increase the count above it, but decreasing may go against the true target count
            // if tablet_count_from_size would demand more tablets.
            maybe_apply({table_plan.current_tablet_count, "current count"});
        }

        // Apply the max_tablet_count cap after all other factors have been considered.
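        // Note the forced maybe_apply(): the cap must win even over a larger
        // current count, so a table over the cap is also shrunk back under it.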
        if (tablet_options.max_tablet_count) {
            if (target_tablet_count.tablet_count > static_cast<size_t>(*tablet_options.max_tablet_count)) {
                maybe_apply({static_cast<size_t>(*tablet_options.max_tablet_count), "max_tablet_count"}, true);
            }
        }

        if (utils::get_local_injector().enter("tablet_force_tablet_count_increase")) {
            target_tablet_count = {tablet_count * 2, "force_tablet_count_increase"};
        } else if (utils::get_local_injector().enter("tablet_force_tablet_count_decrease")) {
            auto size = std::max(size_t(1), tablet_count / 2);
            target_tablet_count = {size, "force_tablet_count_decrease"};
        }

        table_plan.target_tablet_count = target_tablet_count.tablet_count;
        table_plan.target_tablet_count_reason = target_tablet_count.reason;

        lblogger.debug("Table {} ({}.{}) target_tablet_count: {} ({})", table, s->ks_name(), s->cf_name(),
                table_plan.target_tablet_count, table_plan.target_tablet_count_reason);
    };

    for (const auto& [table, tables] : _tm->tablets().all_table_groups()) {
        const auto& tmap = _tm->tablets().get_tablet_map(table);
        auto [s, rs] = get_schema_and_rs(table);

        auto tablet_options = combine_tablet_options(
                tables | std::views::transform([&] (table_id table) { return _db.get_tables_metadata().get_table_if_exists(table); })
                       | std::views::filter([] (auto t) { return t != nullptr; })
                       | std::views::transform([] (auto t) { return t->schema()->tablet_options(); })
        );

        process_table(table, tables, s, tablet_options, rs, tmap.tablet_count());
        co_await coroutine::maybe_yield();
    }

    if (new_table) {
        process_table(new_table->id(), {new_table->id()}, new_table, new_table->tablet_options(), new_rs, 0);
    }

    // The section below ensures we respect the _tablets_per_shard_goal.
    //
    // It scales down target_tablet_count for all tables so that
    // the average number of tablets per shard in each DC or rack does not exceed _tablets_per_shard_goal.
    //
    // The impact of a table's tablet count on the average per-shard tablet replica count
    // is different in each rack, because replication factors differ per DC and rack.
    // A numeric RF impacts all racks in a DC. A rack-list RF impacts the particular racks listed.
    //
    // The algorithm works like this:
    // Compute the average tablet replica count per shard in each rack,
    // determine if the per-shard goal is exceeded in that rack, and
    // compute the scale factor by which the tablet count should be multiplied so that the goal
    // is not exceeded in that rack.
    // Take the smallest scale factor among all racks, which ensures that no rack is overloaded.
    //
    // We align tablet counts to the nearest power of 2 post-scaling, which
    // means that scaling may not be fully effective, and in the worst case we may overshoot the goal by
    // a factor of 2. This is acceptable since the goal is a soft limit and not a hard constraint.
    // Scaling post-alignment would be problematic. If we scale down all tables fairly, we undershoot the goal
    // by a factor of 2 in the worst case. If we choose a subset of tables to scale down by a factor of 2, then
    // we have the problem of making sure that the choice is stable across scheduler invocations, to avoid
    // oscillations of decisions.
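    //
    // Example: with a goal of 100 and a new average of 150, the factor is
    // 100/150 ~= 0.667; a table wanting 1024 tablets is scaled down to 682,
    // which the power-of-2 alignment below rounds back up to 1024, illustrating
    // the worst-case overshoot described above.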

    struct scale_info {
        double factor;
        endpoint_dc_rack source;
    };
    std::unordered_map<table_id, scale_info> table_scaling;

    for (auto&& [rack, shard_count] : shards_per_rack) {
        double cur_avg_tablets_per_shard = 0;
        double new_avg_tablets_per_shard = 0;

        for (auto&& [table, table_plan] : plan.tables) {
            auto* rs = rs_by_table[table];
            auto rf = rs->get_replication_factor_data(rack.dc);

            auto get_avg_tablets_per_shard = [&] (size_t tablet_count) -> double {
                if (!rf) {
                    return 0;
                }
                if (rf->is_numeric()) {
                    auto racks_in_dc = racks_per_dc.at(rack.dc).size();
                    return double(tablet_count) * rf->count() / shard_count / racks_in_dc;
                }
                if (std::ranges::contains(rf->get_rack_list(), rack.rack)) {
                    return double(tablet_count) / shard_count;
                }
                return 0;
            };

            auto cur_tablets_per_shard = get_avg_tablets_per_shard(table_plan.current_tablet_count);
            cur_avg_tablets_per_shard += cur_tablets_per_shard;
            lblogger.debug("cur_avg_tablets_per_shard [dc={}, rack={}, table={}]: {:.3f}", rack.dc, rack.rack, table, cur_tablets_per_shard);

            auto new_tablets_per_shard = get_avg_tablets_per_shard(table_plan.target_tablet_count);
            new_avg_tablets_per_shard += new_tablets_per_shard;
            lblogger.debug("new_avg_tablets_per_shard [dc={}, rack={}, table={}]: {:.3f}", rack.dc, rack.rack, table, new_tablets_per_shard);
        }

        {
            bool overloaded = cur_avg_tablets_per_shard > _tablets_per_shard_goal;
            lblogger.debug("cur_avg_tablets_per_shard[dc={},rack={}]: {:.3f}{}", rack.dc, rack.rack, cur_avg_tablets_per_shard,
                    overloaded ? " (overloaded!)" : "");
        }

        bool overloaded = new_avg_tablets_per_shard > _tablets_per_shard_goal;
        lblogger.debug("new_avg_tablets_per_shard[dc={},rack={}]: {:.3f}{}", rack.dc, rack.rack, new_avg_tablets_per_shard,
                overloaded ? " (overloaded!)" : "");

        if (overloaded) {
            auto scale = scale_info{_tablets_per_shard_goal / new_avg_tablets_per_shard, rack};

            for (auto&& [table, table_plan] : plan.tables) {
                auto* rs = rs_by_table[table];
                auto rf = rs->get_replication_factor_data(rack.dc);

                // If the table has no replicas in this rack, scaling it won't help and would harm its
                // distribution in other DCs or racks.
                if (rf && (rf->is_numeric() || std::ranges::contains(rf->get_rack_list(), rack.rack))) {
                    auto [i, inserted] = table_scaling.try_emplace(table, scale);
                    if (!inserted) {
                        if (scale.factor < i->second.factor) {
                            i->second = scale;
                        }
                    }
                }
            }
        }
    }

    for (auto&& [table, scale] : table_scaling) {
        auto& table_plan = plan.tables[table];
        auto new_count = std::max<size_t>(1, table_plan.target_tablet_count * scale.factor);
        lblogger.debug("Scaling down table {} by a factor of {:.3f} due to {}.{}: {} => {}", table, scale.factor,
                scale.source.dc, scale.source.rack, table_plan.target_tablet_count, new_count);
        table_plan.target_tablet_count = new_count;
        table_plan.target_tablet_count_reason = format("{} scaled by {:.3f} due to {}.{}", table_plan.target_tablet_count_reason,
                scale.factor, scale.source.dc, scale.source.rack);
    }

    // Generate:
    //   table_plan.target_tablet_count_aligned
    //   table_plan.resize_decision

    for (auto&& [table, table_plan] : plan.tables) {
        table_plan.target_tablet_count_aligned = 1u << log2ceil(table_plan.target_tablet_count);

        if (table_plan.target_tablet_count_aligned > table_plan.current_tablet_count) {
            table_plan.resize_decision = locator::resize_decision::split();
        } else if (table_plan.target_tablet_count_aligned < table_plan.current_tablet_count) {
            table_plan.resize_decision = locator::resize_decision::merge();
        }

        lblogger.debug("Table {}, {} => {} ({}: {}), resize: {}", table,
                table_plan.current_tablet_count,
                table_plan.target_tablet_count_aligned,
                table_plan.target_tablet_count,
                table_plan.target_tablet_count_reason,
                table_plan.resize_decision);
    }

    co_return std::move(plan);
}

future<table_resize_plan> make_resize_plan(const migration_plan& plan) {
    table_resize_plan resize_plan;

    if (!_tm->tablets().balancing_enabled()) {
        co_return std::move(resize_plan);
    }

    auto table_sizing_plan = co_await make_sizing_plan();

    cluster_resize_load resize_load;

    for (auto&& [table, table_plan] : table_sizing_plan.tables) {
        auto& tmap = _tm->tablets().get_tablet_map(table);

        if (!table_plan.avg_tablet_size) {
            continue;
        }

        // Shard presence of the table across the cluster.
        size_t shard_count = std::accumulate(tmap.tablets().begin(), tmap.tablets().end(), size_t(0),
                [] (size_t shard_count, const locator::tablet_info& info) {
                    return shard_count + info.replicas.size();
                });

        resize_decision new_resize_decision;
        new_resize_decision.way = table_plan.resize_decision;

        table_size_desc size_desc {
            .avg_tablet_size = *table_plan.avg_tablet_size,
            .resize_decision = tmap.resize_decision(),
            .new_resize_decision = new_resize_decision,
            .tablet_count = table_plan.current_tablet_count,
            .shard_count = shard_count,
            .reason = table_plan.target_tablet_count_reason,
        };

        resize_load.update(table, std::move(size_desc));
        lblogger.debug("Table {} with tablet_count={} has an average tablet size of {}", table, tmap.tablet_count(),
                *table_plan.avg_tablet_size);
        co_await coroutine::maybe_yield();
    }

    // Emit new resize decisions.

    // The limit on resize requests is determined by the shard presence (count) of the tables involved.
    // If tables still have a low tablet count, the concurrency must be high in order to saturate the cluster.
    // If a table covers the entire cluster and needs a split, concurrency is reduced to 1.

    size_t total_shard_count = std::invoke([this] {
        size_t shard_count = 0;
        _tm->for_each_token_owner([&] (const locator::node& node) {
            shard_count += node.get_shard_count();
        });
        return shard_count;
    });
    size_t resizing_shard_count = std::accumulate(resize_load.tables_being_resized.begin(), resize_load.tables_being_resized.end(), size_t(0),
            [] (size_t shard_count, const auto& table_desc) {
                return shard_count + table_desc.second.shard_count;
            });
    // Limits the amount of new resize requests generated in a single round, as each one is a mutation to group0.
    constexpr size_t max_new_resize_requests = 10;

    auto available_shards = std::max(ssize_t(total_shard_count) - ssize_t(resizing_shard_count), ssize_t(0));

    std::make_heap(resize_load.tables_need_resize.begin(), resize_load.tables_need_resize.end(), resize_load.resize_urgency_cmp());
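    // Admit requests by urgency. Note the resize_plan.size() > 0 guard below:
    // the first request is always admitted even if it alone exceeds the
    // available shard budget, so a table spanning the whole cluster can still
    // start resizing.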
    while (resize_load.tables_need_resize.size() && resize_plan.size() < max_new_resize_requests) {
        const auto& [table, size_desc] = resize_load.tables_need_resize.front();

        if (resize_plan.size() > 0 && std::cmp_less(available_shards, size_desc.shard_count)) {
            break;
        }

        auto resize_decision = cluster_resize_load::to_resize_decision(size_desc);
        lblogger.info("Emitting resize decision of type {} for table {}, avg_tablet_size={} reason={}",
                resize_decision.type_name(), table, size_desc.avg_tablet_size, size_desc.reason);
        resize_plan.resize[table] = std::move(resize_decision);
        _stats.for_cluster().resizes_emitted++;
        available_shards -= size_desc.shard_count;

        std::pop_heap(resize_load.tables_need_resize.begin(), resize_load.tables_need_resize.end(), resize_load.resize_urgency_cmp());
        resize_load.tables_need_resize.pop_back();
    }

    // Revoke the resize decision for any table that no longer needs it.
    // Also notify the coordinator if any table is ready to finalize resizing.

    for (const auto& [table, size_desc] : resize_load.tables_being_resized) {
        if (resize_load.table_needs_resize_cancellation(size_desc)) {
            resize_plan.resize[table] = cluster_resize_load::revoke_resize_decision();
            _stats.for_cluster().resizes_revoked++;
            lblogger.info("Revoking resize decision for table {}, avg_tablet_size={} reason={}",
                    table, size_desc.avg_tablet_size, size_desc.reason);
            continue;
        }

        auto& tmap = _tm->tablets().get_tablet_map(table);
        const auto& table_groups = _tm->tablets().all_table_groups();

        auto finalize_decision = [&] {
            if (utils::get_local_injector().enter("tablet_resize_finalization_postpone")) {
                return;
            }

            _stats.for_cluster().resizes_finalized++;
            resize_plan.finalize_resize.insert(table);
        };

        // If all replicas have completed split work for the current sequence number,
        // the load balancer can emit the finalize decision for the split to be completed.
        if (tmap.needs_split()) {
            bool all_tables_ready = std::ranges::all_of(table_groups.at(table), [&, seq_num = tmap.resize_decision().sequence_number] (table_id table) {
                const auto* table_stats = load_stats_for_table(table);
                return table_stats && table_stats->split_ready_seq_number == seq_num;
            });
            if (all_tables_ready) {
                finalize_decision();
                lblogger.info("Finalizing resize decision for table {} as all replicas agree on sequence number {}",
                        table, tmap.resize_decision().sequence_number);
            }
        // If all sibling tablets are co-located across all DCs, then the merge can be finalized.
        } else if (tmap.needs_merge() && co_await all_sibling_tablet_replicas_colocated(table, tmap) && !bypass_merge_completion()) {
            finalize_decision();
            lblogger.info("Finalizing resize decision for table {} as all replicas are co-located", table);
        }
    }

    co_return std::move(resize_plan);
}

void apply_load(node_load_map& nodes, const tablet_migration_streaming_info& info) {
    for (auto&& replica : info.read_from) {
        if (nodes.contains(replica.host)) {
            nodes[replica.host].shards[replica.shard].streaming_read_load += info.stream_weight;
        }
    }
    for (auto&& replica : info.written_to) {
        if (nodes.contains(replica.host)) {
            nodes[replica.host].shards[replica.shard].streaming_write_load += info.stream_weight;
        }
    }
}

void apply_load(node_load_map& nodes, const migration_streaming_info_vector& infos) {
    for (auto& info : infos) {
        apply_load(nodes, info);
    }
}

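// A shard with zero streaming load always accepts at least one migration, even
// if that migration's weight alone exceeds the limit; otherwise heavy-weight
// migrations could never be scheduled anywhere.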
bool can_accept_load(node_load_map& nodes, const tablet_migration_streaming_info& info) {
    for (auto r : info.read_from) {
        if (!nodes.contains(r.host)) {
            continue;
        }
        auto load = nodes[r.host].shards[r.shard].streaming_read_load;
        if (load > 0 && load + info.stream_weight > max_read_streaming_load) {
            lblogger.debug("Migration skipped because of read load limit on {} ({})", r, load);
            return false;
        }
    }
    for (auto r : info.written_to) {
        if (!nodes.contains(r.host)) {
            continue;
        }
        auto load = nodes[r.host].shards[r.shard].streaming_write_load;
        if (load > 0 && load + info.stream_weight > max_write_streaming_load) {
            lblogger.debug("Migration skipped because of write load limit on {} ({})", r, load);
            return false;
        }
    }
    return true;
}

// Precondition: all migration streaming infos have the same source and destination.
// FIXME: remove the precondition; that's not easy without copying node_load_map.
bool can_accept_load(node_load_map& nodes, const migration_streaming_info_vector& infos) {
    // Since all migration infos have the same source and destination, the load check can be done
    // by using the number of migrations as the stream weight.
    auto info = infos[0];
    info.stream_weight = infos.size();
    return can_accept_load(nodes, info);
}

bool in_shuffle_mode() const {
    return utils::get_local_injector().enter("tablet_allocator_shuffle");
}

bool is_balanced(load_type min_load, load_type max_load) const {
    if (_force_capacity_based_balancing) {
        return min_load == max_load;
    }

    if (max_load == 0) {
        return true;
    }

    const load_type load_delta = max_load - min_load;
    return (load_delta / max_load) < _size_based_balance_threshold;
}

// If the cluster cannot agree on the tablet merge feature, then merge will not be finalized, since
// not all nodes in the cluster can handle the finalization step.
bool bypass_merge_completion() const {
    return !_db.features().tablet_merge || utils::get_local_injector().enter("tablet_merge_completion_bypass");
}

size_t rand_int() const {
    static thread_local std::default_random_engine re{std::random_device{}()};
    static thread_local std::uniform_int_distribution<size_t> dist;
    return dist(re);
}

shard_id rand_shard(shard_id shard_count) const {
    return rand_int() % shard_count;
}

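// Weighted random choice: a table is picked with probability roughly
// proportional to its number of candidate tablets, so tables with more
// movable tablets are balanced proportionally more often.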
table_id pick_table(const table_candidates_map& candidates) {
    if (!_use_table_aware_balancing) {
        on_internal_error(lblogger, "pick_table() called when table-aware balancing is disabled");
    }
    size_t total = 0;
    for (auto&& [table, tablets] : candidates) {
        total += tablets.size();
    }
    ssize_t candidate_index = rand_int() % total;
    for (auto&& [table, tablets] : candidates) {
        candidate_index -= tablets.size();
        if (candidate_index <= 0 && !tablets.empty()) {
            return table;
        }
    }
    on_internal_error(lblogger, "No candidate table");
}

migration_tablet_set peek_candidate(shard_load& shard_info) {
    if (_use_table_aware_balancing) {
        auto table = pick_table(shard_info.candidates);
        return *shard_info.candidates[table].begin();
    }

    return *shard_info.candidates_all_tables.begin();
}

// Evaluates impact on load balance of migrating a tablet set of a given table to dst.
migration_badness evaluate_dst_badness(node_load_map& nodes, table_id table, tablet_replica dst, uint64_t tablet_set_disk_size) {
    _current_stats->candidates_evaluated++;

    auto& node_info = nodes[dst.host];

    // Size of all tablet replicas of the table in bytes.
    uint64_t table_size = _disk_used_per_table[table];

    if (node_info.drained) {
        // Moving a tablet to a drained node is always bad.
        // We may not have capacity information for the drained node, so we can't evaluate exact badness.
        return migration_badness{0, 0, table_size, table_size};
    }

    double ideal_table_load = double(table_size) / _total_capacity_storage;

    auto compute_load_and_dst_badness = [&] (uint64_t capacity, uint64_t new_used) {
        double new_load = double(new_used) / capacity;
        // Divide badness by table_size to take into account that moving a tablet of a small table has
        // greater impact on balance of that table than moving a tablet of the same size of a larger table.
        return std::make_pair(new_load, (new_load - ideal_table_load) / table_size);
    };

    uint64_t capacity = node_info.shards[dst.shard].dusage->capacity;
    uint64_t new_used = node_info.shards[dst.shard].tablet_sizes_per_table[table] + tablet_set_disk_size;
    auto [new_shard_load, dst_shard_badness] = compute_load_and_dst_badness(capacity, new_used);
    lblogger.trace("Table {} @{} shard balance threshold: {}, dst: {} ({:.4f})", table, dst,
            ideal_table_load, new_shard_load, dst_shard_badness);

    capacity = node_info.dusage->capacity;
    new_used = node_info.tablet_sizes_per_table[table] + tablet_set_disk_size;
    auto [new_node_load, dst_node_badness] = compute_load_and_dst_badness(capacity, new_used);
    lblogger.trace("Table {} @{} node balance threshold: {}, dst: {} ({:.4f})", table, dst,
            ideal_table_load, new_node_load, dst_node_badness);

    return migration_badness{0, 0, dst_shard_badness, dst_node_badness};
}

// Evaluates impact on load balance of migrating a tablet set of a given table from src.
migration_badness evaluate_src_badness(node_load_map& nodes, table_id table, tablet_replica src, uint64_t tablet_set_disk_size) {
    _current_stats->candidates_evaluated++;

    auto& node_info = nodes[src.host];

    // Size of all tablet replicas of the table in bytes.
    uint64_t table_size = _disk_used_per_table[table];

    if (node_info.drained) {
        // Moving a tablet away from a drained node is always good.
        return migration_badness{-1, -1, 0, 0};
    }

    double ideal_table_load = double(table_size) / _total_capacity_storage;

    auto compute_load_and_src_badness = [&] (uint64_t capacity, uint64_t new_used) {
        // Divide badness by table_size to take into account that moving a tablet of a small table has
        // greater impact on balance of that table than moving a tablet of the same size of a larger table.
        double new_load = double(new_used) / capacity;
        return std::make_pair(new_load, (ideal_table_load - new_load) / table_size);
    };

    uint64_t capacity = node_info.shards[src.shard].dusage->capacity;
    uint64_t new_used = node_info.shards[src.shard].tablet_sizes_per_table[table] - tablet_set_disk_size;
    auto [new_shard_load, src_shard_badness] = compute_load_and_src_badness(capacity, new_used);
    lblogger.trace("Table {} @{} shard balance threshold: {}, src: {} ({:.4f})", table, src,
            ideal_table_load, new_shard_load, src_shard_badness);

    capacity = node_info.dusage->capacity;
    new_used = node_info.tablet_sizes_per_table[table] - tablet_set_disk_size;
    auto [new_node_load, src_node_badness] = compute_load_and_src_badness(capacity, new_used);
    lblogger.trace("Table {} @{} node balance threshold: {}, src: {} ({:.4f})", table, src,
            ideal_table_load, new_node_load, src_node_badness);

    return migration_badness{src_shard_badness, src_node_badness, 0, 0};
}

// Evaluates impact on load balance of migrating a single tablet of a given table from src to dst.
migration_badness evaluate_candidate(node_load_map& nodes, table_id table, tablet_replica src, tablet_replica dst, uint64_t tablet_set_disk_size) {
    auto src_badness = evaluate_src_badness(nodes, table, src, tablet_set_disk_size);
    auto dst_badness = evaluate_dst_badness(nodes, table, dst, tablet_set_disk_size);

    if (src.host == dst.host) {
        src_badness.src_node_badness = 0;
        dst_badness.dst_node_badness = 0;
    }

    return {
        src_badness.src_shard_badness,
        src_badness.src_node_badness,
        dst_badness.dst_shard_badness,
        dst_badness.dst_node_badness
    };
}

future<migration_candidate> peek_candidate(node_load_map& nodes, shard_load& shard_info, tablet_replica src, tablet_replica dst) {
    if (!_use_table_aware_balancing) {
        co_return migration_candidate{peek_candidate(shard_info), src, dst, migration_badness{}};
    }

    if (shard_info.candidates.empty()) {
        on_internal_error(lblogger, format("No candidates for migration on {}", src));
    }

    std::optional<migration_candidate> best_candidate;

    for (auto&& [table, tablets] : shard_info.candidates) {
        if (!tablets.empty()) {
            auto badness = evaluate_candidate(nodes, table, src, dst, tablets.begin()->tablet_set_disk_size);
            auto candidate = migration_candidate{*tablets.begin(), src, dst, badness};
            lblogger.trace("Candidate: {}", candidate);
            if (!best_candidate || candidate.badness < best_candidate->badness) {
                best_candidate = candidate;
            }
        }
    }

    if (!best_candidate) {
        on_internal_error(lblogger, format("No candidates for migration on {}", src));
    }

    lblogger.trace("Best candidate: {}", *best_candidate);
    co_return *best_candidate;
}

void erase_candidate(shard_load& shard_info, migration_tablet_set tablets) {
    if (_use_table_aware_balancing) {
        auto table = tablets.table();
        shard_info.candidates[table].erase(tablets);
        if (shard_info.candidates[table].empty()) {
            shard_info.candidates.erase(table);
        }
    } else {
        shard_info.candidates_all_tables.erase(tablets);
    }
}

void maybe_erase_colocated_candidate(shard_load& shard_info, const tablet_map& tmap, global_tablet_id tablet) {
    if (!tmap.needs_merge()) {
        return;
    }
    auto siblings = tmap.sibling_tablets(tablet.tablet);
    if (!siblings) {
        on_internal_error(lblogger, format("Unable to find sibling tablet of {} during merge", tablet));
    }
    auto left_sibling = global_tablet_id{tablet.table, siblings->first};
    auto right_sibling = global_tablet_id{tablet.table, siblings->second};
    erase_candidate(shard_info, migration_tablet_set{colocated_tablets{left_sibling, right_sibling}});
}

void erase_candidates(node_load_map& nodes, const tablet_map& tmap, const migration_tablet_set& tablets) {
    for (auto tablet : tablets.tablets()) {
        auto& src_tinfo = tmap.get_tablet_info(tablet.tablet);
        for (auto&& r : src_tinfo.replicas) {
            if (nodes.contains(r.host)) {
                lblogger.trace("Erasing tablet {} from {}", tablet, r);
                // Not necessarily all replicas of sibling tablets are co-located, and so we need to
                // remove them from the candidate list using global_tablet_id.
                erase_candidate(nodes[r.host].shards[r.shard], migration_tablet_set{tablet});
                maybe_erase_colocated_candidate(nodes[r.host].shards[r.shard], tmap, tablet);
            }
        }
    }
}

void add_candidate(shard_load& shard_info, migration_tablet_set tablets) {
    if (_use_table_aware_balancing) {
        shard_info.candidates[tablets.table()].insert(tablets);
    } else {
        shard_info.candidates_all_tables.insert(tablets);
    }
}

// Checks whether moving a tablet from src_info to target_info would go against convergence.
// Returns false if the tablet should not be moved, and true if it may be moved.
//
// Moving tablets only when this method returns true ensures that balancing nodes will reach convergence.
// Otherwise, oscillations of tablet load between nodes across different plan making rounds could happen,
// where tablets are moved back and forth between nodes and convergence is never reached.
//
// The assumption is that the algorithm moves tablets from more loaded nodes to less loaded nodes,
// so convergence is reached when the node picked as the source has a load lower than or equal to the
// load the node picked as the destination will have post-movement.
bool check_convergence(node_load& src_info, node_load& dst_info, uint64_t tablet_sizes) {
    if (src_info.drained) {
        return true;
    }
    if (dst_info.drained) {
        return false;
    }

    // Allow migrating only from candidate nodes which have higher load than the target.
    if (src_info.avg_load <= dst_info.avg_load) {
        lblogger.trace("Load inversion: src={} (avg_load={}), dst={} (avg_load={})",
                src_info.id, src_info.avg_load, dst_info.id, dst_info.avg_load);
        return false;
    }

    // Prevent load inversion post-movement, which can lead to oscillations.
    if (src_info.avg_load <= *dst_info.get_avg_load(tablet_sizes)) {
        lblogger.trace("Load inversion post-movement: src={} (avg_load={}), dst={} (avg_load={}) tablet_sizes={}",
                src_info.id, src_info.avg_load, dst_info.id, dst_info.avg_load, tablet_sizes);
        return false;
    }

    return true;
}

bool check_convergence(node_load& src_info, node_load& dst_info, const migration_tablet_set& tablet_set) {
    return check_convergence(src_info, dst_info, tablet_set.tablet_set_disk_size);
}

// Checks whether moving a tablet from shard A to B (intra-node) would go against convergence.
// Returns false if the tablet should not be moved, and true if it may be moved.
// Can be called when node_info.drained.
bool check_intranode_convergence(const node_load& node_info, shard_id src_shard, shard_id dst_shard,
        uint64_t used_size_delta) {
    return node_info.shard_load(src_shard) > node_info.shard_load(dst_shard, int64_t(used_size_delta));
}

// Can be called when node_info.drained.
bool check_intranode_convergence(const node_load& node_info, shard_id src_shard, shard_id dst_shard,
        const migration_tablet_set& tablet_set) {
    return check_intranode_convergence(node_info, src_shard, dst_shard, tablet_set.tablet_set_disk_size);
}

// Adjusts the load of the source and destination shards in the host where intra-node migration happens.
void update_node_load_on_migration(node_load& node_load, host_id host, shard_id src, shard_id dst, const migration_tablet_set& tablet_set) {
    auto tablet_count = tablet_set.tablets().size();
    auto tablet_sizes = tablet_set.tablet_set_disk_size;
    auto table = tablet_set.tablets().front().table;

    auto& dst_shard = node_load.shards[dst];
    dst_shard.tablet_count += tablet_count;
    dst_shard.tablet_count_per_table[table] += tablet_count;
    dst_shard.tablet_sizes_per_table[table] += tablet_sizes;
    dst_shard.dusage->used += tablet_sizes;

    auto& src_shard = node_load.shards[src];
    src_shard.tablet_count -= tablet_count;
    src_shard.tablet_count_per_table[table] -= tablet_count;
    src_shard.tablet_sizes_per_table[table] -= tablet_sizes;
    src_shard.dusage->used -= tablet_sizes;
}

// Adjusts the load of the source and destination (host:shard) that were picked for the migration.
void update_node_load_on_migration(node_load_map& nodes, tablet_replica src, tablet_replica dst, const migration_tablet_set& tablet_set) {
    auto tablet_count = tablet_set.tablets().size();
    auto tablet_sizes = tablet_set.tablet_set_disk_size;
    auto table = tablet_set.tablets().front().table;

    auto& dst_node = nodes[dst.host];
    auto& dst_shard = dst_node.shards[dst.shard];
    dst_shard.tablet_count += tablet_count;
    dst_shard.tablet_count_per_table[table] += tablet_count;
    dst_shard.tablet_sizes_per_table[table] += tablet_sizes;
    dst_shard.dusage->used += tablet_sizes;
    dst_node.tablet_count_per_table[table] += tablet_count;
    dst_node.tablet_sizes_per_table[table] += tablet_sizes;
    dst_node.tablet_count += tablet_count;
    dst_node.dusage->used += tablet_sizes;
    dst_node.update();

    auto& src_node = nodes[src.host];
    auto& src_shard = src_node.shards[src.shard];
    src_shard.tablet_count -= tablet_count;
    src_shard.tablet_count_per_table[table] -= tablet_count;
    src_shard.tablet_sizes_per_table[table] -= tablet_sizes;
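    // A drained source may lack disk-usage info (see evaluate_src_badness),
    // hence the null checks on the source side only.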
    if (src_shard.dusage) {
        src_shard.dusage->used -= tablet_sizes;
    }
    src_node.tablet_count_per_table[table] -= tablet_count;
    src_node.tablet_sizes_per_table[table] -= tablet_sizes;
    src_node.tablet_count -= tablet_count;
    if (src_node.dusage) {
        src_node.dusage->used -= tablet_sizes;
    }
    src_node.update();
}

static void unload(locator::load_sketch& sketch, host_id host, shard_id shard, const migration_tablet_set& tablet_set) {
    sketch.unload(host, shard, tablet_set.tablets().size(), tablet_set.tablet_set_disk_size);
}

static void pick(locator::load_sketch& sketch, host_id host, shard_id shard, const migration_tablet_set& tablet_set) {
    sketch.pick(host, shard, tablet_set.tablets().size(), tablet_set.tablet_set_disk_size);
}

void mark_as_scheduled(const tablet_migration_info& mig) {
    _scheduled_tablets.insert(mig.tablet);
}

void mark_as_scheduled(const migration_plan::migrations_vector& migs) {
    for (auto&& mig : migs) {
        mark_as_scheduled(mig);
    }
}

future<migration_plan> make_node_plan(node_load_map& nodes, host_id host, node_load& node_load) {
    migration_plan plan;
    const tablet_metadata& tmeta = _tm->tablets();
    bool shuffle = in_shuffle_mode();

    if (node_load.shard_count <= 1) {
        lblogger.debug("Node {} is balanced", host);
        co_return plan;
    }

    auto& sketch = *_load_sketch;

    // Keeps candidate source shards in a heap which yields the highest-loaded shard first.
    std::vector<shard_id> src_shards;
    src_shards.reserve(node_load.shard_count);
    for (shard_id shard = 0; shard < node_load.shard_count; shard++) {
        src_shards.push_back(shard);
    }
    std::make_heap(src_shards.begin(), src_shards.end(), node_load.shards_by_load_cmp());

    load_type max_load = 0; // Tracks max load among shards which ran out of candidates.

    while (true) {
        co_await coroutine::maybe_yield();

        if (src_shards.empty()) {
            lblogger.debug("Unable to balance node {}: ran out of candidates, max load: {}, avg load: {}",
                    host, max_load, node_load.avg_load);
            break;
        }

        shard_id src, dst;

        // Post-conditions:
        // 1) src and dst are chosen.
        // 2) src_shards.back() == src.
        if (shuffle) {
            auto src_index = rand_shard(src_shards.size());
            std::swap(src_shards.back(), src_shards[src_index]);
            src = src_shards.back();
            do {
                dst = rand_shard(node_load.shard_count);
            } while (src == dst); // There are at least two shards here so this converges.
        } else {
            std::pop_heap(src_shards.begin(), src_shards.end(), node_load.shards_by_load_cmp());
            src = src_shards.back();
            dst = sketch.get_least_loaded_shard(host);
        }

        auto push_back = seastar::defer([&] {
            // When shuffling, src_shards is not a heap.
            if (!shuffle) {
                std::push_heap(src_shards.begin(), src_shards.end(), node_load.shards_by_load_cmp());
            }
        });

        auto& src_info = node_load.shards[src];

        // Convergence check

        // When in shuffle mode, exit condition is guaranteed by running out of candidates or by load limit.
        if (!shuffle && src == dst) {
            lblogger.debug("Node {} is balanced", host);
            break;
        }

        if (!src_info.has_candidates()) {
            lblogger.debug("No more candidates on shard {} of {}", src, host);
            if (src_info.dusage) {
                max_load = std::max(max_load, src_info.dusage->get_load());
            }
            src_shards.pop_back();
            push_back.cancel();
            continue;
        }

        auto candidate = co_await peek_candidate(nodes, src_info, tablet_replica{host, src}, tablet_replica{host, dst});
        auto tablets = candidate.tablets;

        if (!shuffle && (src == dst || !check_intranode_convergence(node_load, src, dst, tablets))) {
            lblogger.debug("Node {} is balanced", host);
            break;
        }

        // Emit migration.

        auto mig = get_migration_info(tablets, tablet_transition_kind::intranode_migration,
                tablet_replica{host, src}, tablet_replica{host, dst});
        auto& tmap = tmeta.get_tablet_map(tablets.table());
        auto mig_streaming_info = get_migration_streaming_infos(_tm->get_topology(), tmap, mig);

        if (!can_accept_load(nodes, mig_streaming_info)) {
            _current_stats->migrations_skipped++;
            lblogger.debug("Unable to balance {}: load limit reached", host);
            break;
        }

        apply_load(nodes, mig_streaming_info);
        lblogger.debug("Adding migration: {} size: {}", mig, tablets.tablet_set_disk_size);
        _current_stats->migrations_produced++;
        _current_stats->intranode_migrations_produced++;
        mark_as_scheduled(mig);
        plan.add(std::move(mig));

        erase_candidates(nodes, tmap, tablets);

        update_node_load_on_migration(node_load, host, src, dst, tablets);
        pick(sketch, host, dst, tablets);
        unload(sketch, host, src, tablets);
    }

    co_return plan;
}

future<migration_plan> make_intranode_plan(node_load_map& nodes, const std::unordered_set<host_id>& skip_nodes) {
|
|
migration_plan plan;
|
|
|
|
for (auto&& [host, node_load] : nodes) {
|
|
if (skip_nodes.contains(host)) {
|
|
lblogger.debug("Skipped balancing of node {}", host);
|
|
continue;
|
|
}
|
|
|
|
plan.merge(co_await make_node_plan(nodes, host, node_load));
|
|
}
|
|
|
|
co_return plan;
|
|
}
|
|
|
|
struct skip_info {
|
|
std::unordered_set<host_id> viable_targets;
|
|
};
|
|
|
|
// Verifies if moving a given tablet from src_info.id to dst_info.id would not violate
|
|
// replication constraints (no increase in replica co-location on nodes, racks).
|
|
// Returns std::nullopt if it does not and the movement is allowed.
|
|
std::optional<skip_info> check_constraints(node_load_map& nodes,
|
|
const locator::tablet_map& tmap,
|
|
node_load& src_info,
|
|
node_load& dst_info,
|
|
global_tablet_id tablet,
|
|
bool need_viable_targets)
|
|
{
|
|
int max_rack_load;
|
|
std::unordered_map<sstring, int> rack_load;
|
|
|
|
auto rs = get_rs(tablet.table);
|
|
|
|
auto get_viable_targets = [&] () {
|
|
std::unordered_set<host_id> viable_targets;
|
|
|
|
for (auto&& [id, node] : nodes) {
|
|
if (node.dc() != src_info.dc() || node.drained) {
|
|
continue;
|
|
}
|
|
if (rs->is_rack_based(_dc) && node.rack() != src_info.rack()) {
|
|
continue;
|
|
}
|
|
viable_targets.emplace(id);
|
|
}
|
|
|
|
for (auto&& r : tmap.get_tablet_info(tablet.tablet).replicas) {
|
|
viable_targets.erase(r.host);
|
|
}
|
|
|
|
if (_rack || rs->is_rack_based(_dc)) {
|
|
// If _rack is set, "nodes" contains only nodes from a single rack, and so does viable_targets.
|
|
// Therefore, rack overload constraints cannot possibly exclude any target.
|
|
// Also, if replication factor is rack based, we only move tablets within the rack, and
|
|
// viable targets belong to the same rack as source, and overload also cannot happen.
|
|
return viable_targets;
|
|
}
|
|
|
|
for (auto&& r : tmap.get_tablet_info(tablet.tablet).replicas) {
|
|
auto* node = _tm->get_topology().find_node(r.host);
|
|
if (!node) {
|
|
on_internal_error(lblogger, format("Node {} not found in topology", r.host));
|
|
}
|
|
if (node->dc() == src_info.dc()) {
|
|
rack_load[node->rack()] += 1;
|
|
}
|
|
}
|
|
|
|
// Drop targets which would increase max rack load.
|
|
|
|
max_rack_load = std::max_element(rack_load.begin(), rack_load.end(),
|
|
[] (auto& a, auto& b) { return a.second < b.second; })->second;
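
            // Example: if the tablet's replicas occupy racks {r1: 2, r2: 1}, then
            // max_rack_load == 2. A cross-rack move into r2 (load becomes 2) or into
            // an unused rack (becomes 1) stays viable, while a move into r1 (would
            // become 3) is dropped below. Moves within the source's own rack are
            // exempt, since they don't change that rack's replica count.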
            for (auto i = viable_targets.begin(); i != viable_targets.end(); ) {
                auto target = *i;
                auto& t_info = nodes[target];
                auto old_i = i++;
                if (src_info.rack() != t_info.rack()) {
                    auto new_rack_load = rack_load[t_info.rack()] + 1;
                    if (new_rack_load > max_rack_load) {
                        viable_targets.erase(old_i);
                    }
                }
            }

            return viable_targets;
        };

        if (!_rack && dst_info.rack() != src_info.rack()) {
            auto targets = get_viable_targets();
            if (rs->is_rack_based(_dc)) {
                lblogger.debug("candidate tablet {} skipped because RF is rack-based and it's in a different rack", tablet);
                _current_stats->tablets_skipped_rack++;
                return skip_info{std::move(targets)};
            }
            if (!targets.contains(dst_info.id)) {
                auto new_rack_load = rack_load[dst_info.rack()] + 1;
                lblogger.debug("candidate tablet {} skipped because it would increase load on rack {} to {}, max={}",
                        tablet, dst_info.rack(), new_rack_load, max_rack_load);
                _current_stats->tablets_skipped_rack++;
                return skip_info{std::move(targets)};
            }
        }

        for (auto&& r : tmap.get_tablet_info(tablet.tablet).replicas) {
            if (r.host == dst_info.id) {
                _current_stats->tablets_skipped_node++;
                lblogger.debug("candidate tablet {} skipped because it has a replica on target node", tablet);
                if (need_viable_targets) {
                    return skip_info{get_viable_targets()};
                }
                return skip_info{};
            }
        }

        return std::nullopt;
    }

    // Verifies that moving a given tablet set from src_info.id to dst_info.id would not violate
    // replication constraints (no increase in replica co-location on nodes or racks).
    // Returns std::nullopt if the constraints are not violated and the movement is allowed.
    //
    // The constraints might not be the same for two sibling tablets that have co-located
    // replicas.
    // Example:
    //   nodes = {A, B, C, D}
    //   tablet1 = {A, B, C}
    //   tablet2 = {A, B, D}
    //   viable target for {tablet1, B} is D.
    //   viable target for {tablet2, B} is C.
    //
    // When co-located replicas share a viable target, a single migration can be emitted to
    // preserve co-location.
    // To allow decommission when co-located replicas don't share a viable target, a skip
    // info will be returned for each tablet, even though that means breaking this
    // co-location. Decommission has higher priority.
    using skip_info_vector = std::vector<std::pair<skip_info, migration_tablet_set>>;
    std::optional<skip_info_vector>
    check_constraints(node_load_map& nodes,
                      const locator::tablet_map& tmap,
                      node_load& src_info,
                      node_load& dst_info,
                      migration_tablet_set tablet_set,
                      bool need_viable_targets) {
        std::unordered_map<host_id, unsigned> viable_targets_count;
        std::unordered_map<global_tablet_id, skip_info> skip_info_per_tablet;
        std::unordered_set<host_id> shared_viable_targets;
        const size_t tablet_count = tablet_set.tablets().size();

        for (auto tablet : tablet_set.tablets()) {
            auto skip = check_constraints(nodes, tmap, src_info, dst_info, tablet, need_viable_targets);
            if (!skip) {
                continue;
            }
            for (const auto& target : skip->viable_targets) {
                // A viable target is considered shared if all candidates share that same viable target.
                if (++viable_targets_count[target] == tablet_count) {
                    shared_viable_targets.insert(target);
                }
            }
            skip_info_per_tablet.emplace(std::make_pair(tablet, std::move(*skip)));
        }
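
        // Example: with siblings {t1, t2} where both are skipped, t1's viable targets
        // being {C, D} and t2's being {C}, only C is counted tablet_count times and
        // becomes shared, so a single co-location-preserving entry is produced for
        // the whole set below.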
        if (skip_info_per_tablet.empty()) {
            return std::nullopt;
        }
        if (!shared_viable_targets.empty()) {
            return skip_info_vector{std::make_pair(skip_info{std::move(shared_viable_targets)}, std::move(tablet_set))};
        }

        skip_info_vector skip_infos;
        skip_infos.reserve(skip_info_per_tablet.size());
        for (auto&& [tablet, info] : skip_info_per_tablet) {
            skip_infos.push_back(std::make_pair(std::move(info), migration_tablet_set{std::move(tablet)}));
        }
        return skip_infos;
    }

    // Picks the best tablet replica to move and its new destination.
    // The destination host is picked among nodes_by_load_dst, with dst being the preferred destination.
    //
    // If drain_skipped is false, the replica is picked among tablets on src.host,
    // with src.shard as the preferred source shard.
    //
    // If drain_skipped is true, the chosen replica is src_node_info.skipped_candidates.back()
    // and src must match its location.
    //
    // Pre-conditions:
    //
    //   src_node_info.id == src.host
    //   target_info.id == dst.host
    //   nodes_by_load_dst.back() == dst.host
    //
    //   if drain_skipped == true:
    //      src_node_info.skipped_candidates.back().replica == src
    //
    //   if drain_skipped == false:
    //      src_node_info.shards_by_load.back() == src.shard
    //
    // Invariants:
    //
    //   nodes_by_load_dst[:-1] is a valid heap
    //   src_node_info.shards_by_load[:-1] is a valid heap
    //
    // Post-conditions:
    //
    //   src_node_info.shards_by_load.back() == result.src.shard
    //   nodes_by_load_dst.back() == result.dst.host
    //   result.tablets is removed from candidate lists in src_node_info.
    //
    future<migration_candidate> pick_candidate(node_load_map& nodes,
                                               node_load& src_node_info,
                                               node_load& target_info,
                                               tablet_replica src,
                                               tablet_replica dst,
                                               std::vector<host_id>& nodes_by_load_dst,
                                               bool drain_skipped)
    {
        auto get_candidate = [this, drain_skipped, &nodes, &src_node_info] (tablet_replica src, tablet_replica dst)
                -> future<migration_candidate> {
            if (drain_skipped) {
                auto source_tablets = src_node_info.skipped_candidates.back().tablets;
                auto badness = evaluate_candidate(nodes, source_tablets.table(), src, dst, source_tablets.tablet_set_disk_size);
                co_return migration_candidate{source_tablets, src, dst, badness};
            } else {
                auto&& src_shard_info = src_node_info.shards[src.shard];
                co_return co_await peek_candidate(nodes, src_shard_info, src, dst);
            }
        };

        migration_candidate min_candidate = co_await get_candidate(src, dst);

        // Given src as the source replica, evaluate all destinations.
        // Updates min_candidate with the best candidate, if a better one is found.
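        // The search is two-phase: first pick the target nodes with minimal
        // node-level badness, then scan only those nodes' shards for the minimal
        // shard-level badness, breaking ties randomly so load spreads evenly.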
        auto evaluate_targets = [&] (migration_tablet_set tablets, tablet_replica src, migration_badness src_badness) -> future<> {
            migration_badness min_dst_badness;
            std::optional<host_id> min_dst_host;
            std::vector<host_id> best_hosts;

            // First, find the best target nodes in terms of node badness.
            for (auto& new_target : nodes_by_load_dst) {
                co_await coroutine::maybe_yield();
                auto& new_target_info = nodes[new_target];

                // Skip movements which may harm convergence.
                if (!check_convergence(src_node_info, new_target_info, tablets)) {
                    continue;
                }

                auto badness = evaluate_dst_badness(nodes, tablets.table(), tablet_replica{new_target, 0}, tablets.tablet_set_disk_size);
                if (!min_dst_host || badness.dst_node_badness < min_dst_badness.dst_node_badness) {
                    min_dst_badness = badness;
                    min_dst_host = new_target;
                    best_hosts.clear();
                }
                if (badness.dst_node_badness == min_dst_badness.dst_node_badness) {
                    best_hosts.push_back(new_target);
                }
            }

            if (!min_dst_host) {
                lblogger.debug("No viable targets for src node {}", src.host);
                co_return;
            }

            std::optional<tablet_replica> min_dst;

            // Find the best shards on best targets.

            std::vector<tablet_replica> best_dsts;
            for (auto host : best_hosts) {
                for (shard_id new_dst_shard = 0; new_dst_shard < nodes[host].shard_count; new_dst_shard++) {
                    co_await coroutine::maybe_yield();
                    auto new_dst = tablet_replica{host, new_dst_shard};

                    auto badness = evaluate_dst_badness(nodes, tablets.table(), new_dst, tablets.tablet_set_disk_size);
                    if (!min_dst || badness < min_dst_badness) {
                        min_dst_badness = badness;
                        min_dst = new_dst;
                        best_dsts.clear();
                    }
                    if (badness.dst_shard_badness == min_dst_badness.dst_shard_badness) {
                        best_dsts.push_back(new_dst);
                    }
                }
                if (min_dst && !min_dst_badness.is_bad()) {
                    break;
                }
            }
            if (best_dsts.size() > 1) {
                min_dst = best_dsts[rand_int() % best_dsts.size()];
            }

            if (!min_dst) {
                on_internal_error(lblogger, fmt::format("No destination shards on {}", best_hosts));
            }

            auto candidate = migration_candidate{
                tablets, src, *min_dst,
                migration_badness{src_badness.src_shard_badness,
                                  src_badness.src_node_badness,
                                  min_dst_badness.dst_shard_badness,
                                  min_dst_badness.dst_node_badness}
            };

            lblogger.trace("candidate: {}", candidate);

            if (candidate.badness < min_candidate.badness) {
                min_candidate = candidate;
            }
        };

        if (min_candidate.badness.is_bad() && _use_table_aware_balancing) {
            _current_stats->bad_first_candidates++;

            // Consider better alternatives.
            if (drain_skipped) {
                auto tablets = src_node_info.skipped_candidates.back().tablets;
                auto badness = evaluate_src_badness(nodes, tablets.table(), src, tablets.tablet_set_disk_size);
                co_await evaluate_targets(tablets, src, badness);
            } else {
                // Find a better candidate.
                // Consider different tables. For each table, first find the best source shard.
                // Then find the best target node. Then find the best shard on the target node.
                for (auto [table, tablet_count] : src_node_info.tablet_count_per_table) {
                    if (tablet_count == 0) {
                        lblogger.trace("No src candidates for table {} on node {}", table, src.host);
                        continue;
                    }

                    migration_badness min_src_badness;
                    std::optional<tablet_replica> min_src;
                    std::optional<migration_tablet_set> min_tablet_set;
                    auto check_candidate = [&] (const tablet_replica& new_src, const migration_tablet_set& tablet_set) {
                        auto badness = evaluate_src_badness(nodes, table, new_src, tablet_set.tablet_set_disk_size);
                        if (!min_src || badness < min_src_badness) {
                            min_src_badness = badness;
                            min_src = new_src;
                            min_tablet_set = tablet_set;
                        }
                    };
                    for (auto new_src_shard : src_node_info.shards_by_load) {
                        auto new_src = tablet_replica{src.host, new_src_shard};
                        if (src_node_info.shards[new_src_shard].candidates[table].empty()) {
                            lblogger.trace("No src candidates for table {} on shard {}", table, new_src);
                            continue;
                        }
                        co_await coroutine::maybe_yield();
                        if (_force_capacity_based_balancing) {
                            check_candidate(new_src, *src_node_info.shards[new_src_shard].candidates[table].begin());
                        } else {
                            for (const auto& tablet_set : src_node_info.shards[new_src_shard].candidates[table]) {
                                check_candidate(new_src, tablet_set);
                            }
                        }
                    }

                    if (!min_src) {
                        lblogger.debug("No candidates for table {} on {}", table, src.host);
                        continue;
                    }

                    co_await evaluate_targets(*min_tablet_set, *min_src, min_src_badness);
                    if (!min_candidate.badness.is_bad()) {
                        break;
                    }
                }
            }
        }

        lblogger.trace("best candidate: {}", min_candidate);

        if (drain_skipped) {
            src_node_info.skipped_candidates.pop_back();
        } else {
            erase_candidate(src_node_info.shards[min_candidate.src.shard], min_candidate.tablets);
        }

        // Restore invariants.
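        //
        // The chosen source shard and destination host must end up parked at the
        // back of their respective vectors (see the post-conditions above), so if
        // the search switched either one, swap the new choice to the back and
        // re-heapify the remaining prefix.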

        if (min_candidate.dst != dst) {
            lblogger.trace("dst changed.");

            if (min_candidate.dst.host != dst.host) {
                auto i = std::find(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), min_candidate.dst.host);
                std::swap(*i, nodes_by_load_dst.back());

                auto nodes_dst_cmp = [cmp = nodes_by_load_cmp(nodes)] (const host_id& a, const host_id& b) {
                    return cmp(b, a);
                };
                std::make_heap(nodes_by_load_dst.begin(), std::prev(nodes_by_load_dst.end()), nodes_dst_cmp);
            }

            if (min_candidate.src.shard != src.shard) {
                lblogger.trace("src changed.");
                auto i = std::find(src_node_info.shards_by_load.begin(), src_node_info.shards_by_load.end(), min_candidate.src.shard);
                std::swap(src_node_info.shards_by_load.back(), *i);
                std::make_heap(src_node_info.shards_by_load.begin(), std::prev(src_node_info.shards_by_load.end()),
                        src_node_info.shards_by_load_cmp());
            }
        }

        co_return min_candidate;
    }

    future<> log_table_load(node_load_map& nodes, table_id table) {
        load_type total_load = 0;
        size_t shard_count = 0;
        load_type max_shard_load = 0;

        for (auto&& [host, node] : nodes) {
            if (node.drained) {
                continue;
            }
            shard_count += node.shard_count;
            load_type this_node_max_shard_load = 0;
            load_type node_load = 0;
            for (shard_id shard = 0; shard < node.shard_count; shard++) {
                co_await coroutine::maybe_yield();
                load_type load = double(node.shards[shard].tablet_count_per_table[table]) * _target_tablet_size
                        / *node.capacity_per_shard();
                total_load += load;
                node_load += load;
                max_shard_load = std::max(max_shard_load, load);
                this_node_max_shard_load = std::max(this_node_max_shard_load, load);
            }
            node_load /= node.shard_count;
            lblogger.debug("Load on host {} for table {}: avg={}, max={}", host, table, node_load, this_node_max_shard_load);
        }
        auto avg_load = double(total_load) / shard_count;
        auto overcommit = max_shard_load / avg_load;
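        // overcommit == 1.0 means the table's tablets are spread perfectly evenly.
        // E.g. with shard loads {4, 2, 2} the average is 8/3 and overcommit is
        // 4 / (8/3) = 1.5, i.e. the hottest shard carries 50% more than its fair share.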
        lblogger.debug("Table {} shard overcommit: {}", table, overcommit);
    }

    future<migration_plan> make_internode_plan(node_load_map& nodes,
                                               const std::unordered_set<host_id>& nodes_to_drain,
                                               host_id target) {
        migration_plan plan;

        // Prepare candidate nodes and shards for heap-based balancing.

        // Any given node is either in nodes_by_load or in nodes_by_load_dst, but not both.
        // This means that only one of the heaps needs to be updated when a node's load changes, not both.

        // Heap which tracks the most-loaded nodes in terms of avg_load.
        // It is used to find source tablet candidates.
        std::vector<host_id> nodes_by_load;
        nodes_by_load.reserve(nodes.size());

        // Heap which tracks the least-loaded nodes in terms of avg_load.
        // Used to find candidates for target nodes.
        std::vector<host_id> nodes_by_load_dst;
        nodes_by_load_dst.reserve(nodes.size());

        auto nodes_cmp = nodes_by_load_cmp(nodes);
        auto nodes_dst_cmp = [&] (const host_id& a, const host_id& b) {
            return nodes_cmp(b, a);
        };
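
        // nodes_dst_cmp simply reverses nodes_cmp: the same comparator that keeps the
        // most-loaded node on top of nodes_by_load keeps the least-loaded node on top
        // of nodes_by_load_dst, so both heaps share one load-ordering definition.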

        for (auto&& [host, node_load] : nodes) {
            if (lblogger.is_enabled(seastar::log_level::debug)) {
                shard_id shard = 0;
                for (auto&& shard_load : node_load.shards) {
                    lblogger.debug("shard {}: load: {}, tablets: {}, candidates: {}, tables: {}", tablet_replica{host, shard},
                            node_load.shard_load(shard), shard_load.tablet_count,
                            shard_load.candidate_count(), shard_load.tablet_count_per_table);
                    shard++;
                }
            }

            if (host != target && (nodes_to_drain.empty() || node_load.drained)) {
                nodes_by_load.push_back(host);
                std::make_heap(node_load.shards_by_load.begin(), node_load.shards_by_load.end(),
                        node_load.shards_by_load_cmp());
            } else {
                nodes_by_load_dst.push_back(host);
            }
        }

        std::make_heap(nodes_by_load.begin(), nodes_by_load.end(), nodes_cmp);
        std::make_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp);

        const tablet_metadata& tmeta = _tm->tablets();
        const locator::topology& topo = _tm->get_topology();
        load_type max_off_candidate_load = 0; // max load among nodes which ran out of candidates.
        auto batch_size = nodes[target].shard_count;
        const size_t max_skipped_migrations = nodes[target].shards.size() * 2;
        size_t skipped_migrations = 0;
        auto shuffle = in_shuffle_mode();
        while (plan.size() < batch_size) {
            co_await coroutine::maybe_yield();

            if (nodes_by_load.empty()) {
                lblogger.debug("No more candidate nodes");
                _current_stats->stop_no_candidates++;
                break;
            }

            // Pick source node.

            std::pop_heap(nodes_by_load.begin(), nodes_by_load.end(), nodes_cmp);
            auto src_host = nodes_by_load.back();
            auto& src_node_info = nodes[src_host];
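
            // A drained node is first emptied through its regular per-shard candidates;
            // once those run out, the candidates previously parked on its skip list
            // (each with a precomputed set of viable targets) are drained one by one.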
            bool drain_skipped = src_node_info.shards_by_load.empty() && src_node_info.drained
                    && !src_node_info.skipped_candidates.empty();

            lblogger.debug("source node: {}, avg_load={:.2f}, skipped={}, drain_skipped={}", src_host,
                    src_node_info.avg_load, src_node_info.skipped_candidates.size(), drain_skipped);

            if (src_node_info.shards_by_load.empty() && !drain_skipped) {
                lblogger.debug("candidate node {} ran out of candidate shards with {} tablets remaining",
                        src_host, src_node_info.tablet_count);
                max_off_candidate_load = std::max(max_off_candidate_load, src_node_info.avg_load);
                nodes_by_load.pop_back();
                continue;
            }

            auto push_back_node_candidate = seastar::defer([&] {
                std::push_heap(nodes_by_load.begin(), nodes_by_load.end(), nodes_cmp);
            });

            tablet_replica src;

            auto push_back_shard_candidate = seastar::defer([&] {
                std::push_heap(src_node_info.shards_by_load.begin(), src_node_info.shards_by_load.end(), src_node_info.shards_by_load_cmp());
            });

            if (drain_skipped) {
                push_back_shard_candidate.cancel();
                auto& candidate = src_node_info.skipped_candidates.back();
                src = candidate.replica;
                lblogger.debug("Skipped candidate: tablet={}, replica={}, targets={}", candidate.tablets, src, candidate.viable_targets);

                // When draining, we need to narrow down targets to viable targets before choosing the best target.
                nodes_by_load_dst.clear();
                for (auto&& h : candidate.viable_targets) {
                    nodes_by_load_dst.push_back(h);
                }
                std::make_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp);
            } else {
                // Pick best source shard.

                std::pop_heap(src_node_info.shards_by_load.begin(), src_node_info.shards_by_load.end(),
                        src_node_info.shards_by_load_cmp());
                auto src_shard = src_node_info.shards_by_load.back();
                src = tablet_replica{src_host, src_shard};
                auto&& src_shard_info = src_node_info.shards[src_shard];
                if (!src_shard_info.has_candidates()) {
                    lblogger.debug("shard {} ran out of candidates with {} tablets remaining.", src,
                            src_shard_info.tablet_count);
                    src_node_info.shards_by_load.pop_back();
                    push_back_shard_candidate.cancel();
                    if (src_node_info.shards_by_load.empty()) {
                        lblogger.debug("candidate node {} ran out of candidate shards with {} tablets remaining, {} skipped.",
                                src_host, src_node_info.tablet_count, src_node_info.skipped_candidates.size());
                    }
                    continue;
                }
            }

            // Pick best target node.

            if (nodes_by_load_dst.empty()) {
                lblogger.debug("No more target nodes");
                _current_stats->stop_no_candidates++;
                break;
            }

            std::pop_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp);
            target = nodes_by_load_dst.back();
            auto& target_info = nodes[target];
            auto push_back_target_node = seastar::defer([&] {
                std::push_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp);
            });

            lblogger.debug("target node: {}, avg_load={}", target, target_info.avg_load);

            // Check convergence conditions.

            // When draining nodes, disable convergence checks so that all tablets are migrated away.
            bool can_check_convergence = !shuffle && nodes_to_drain.empty();
            if (can_check_convergence) {
                // Check if all nodes reached the same avg_load. There are three sets of nodes: target, candidates (nodes_by_load)
                // and off-candidates (removed from nodes_by_load). At any time, the avg_load for target is not greater than
                // that of any candidate, and avg_load of any candidate is not greater than that of any in the off-candidates set.
                // This is ensured by the fact that we remove candidates in the order of avg_load from the heap, and
                // because we prevent load inversion between candidate and target in the next check.
                // So the max avg_load of candidates is that of the current src_node_info, and max avg_load of off-candidates
                // is tracked in max_off_candidate_load. If max_off_candidate_load is equal to target's avg_load,
                // it means that all nodes have equal avg_load. We take the maximum with the current candidate in src_node_info
                // to handle the case of off-candidates being empty. In that case, max_off_candidate_load is 0.
                const load_type max_load = std::max(max_off_candidate_load, src_node_info.avg_load);
                if (is_balanced(target_info.avg_load, max_load)) {
                    lblogger.debug("Balance achieved.");
                    _current_stats->stop_balance++;
                    break;
                }
            }

            // Pick best target shard.

            auto dst = global_shard_id{target, _load_sketch->get_least_loaded_shard(target)};
            lblogger.trace("target shard: {}, tablets={}, load={}", dst.shard,
                    target_info.shards[dst.shard].tablet_count,
                    target_info.shard_load(dst.shard));

            if (lblogger.is_enabled(seastar::log_level::trace)) {
                shard_id shard = 0;
                for (auto&& shard_load : target_info.shards) {
                    lblogger.trace("shard {}: load: {}, tablets: {}, candidates: {}, tables: {}", tablet_replica{dst.host, shard},
                            target_info.shard_load(shard), shard_load.tablet_count,
                            shard_load.candidate_count(), shard_load.tablet_count_per_table);
                    shard++;
                }
            }

            // Pick tablet movement.

            // May choose a different source shard than src.shard, or a different destination host/shard than dst.
            auto candidate = co_await pick_candidate(nodes, src_node_info, target_info, src, dst, nodes_by_load_dst,
                    drain_skipped);
            auto source_tablets = candidate.tablets;
            src = candidate.src;
            dst = candidate.dst;

            auto& tmap = tmeta.get_tablet_map(source_tablets.table());
            if (can_check_convergence && !check_convergence(src_node_info, target_info, source_tablets)) {
                lblogger.debug("No more candidates. Load would be inverted.");
                _current_stats->stop_load_inversion++;
                break;
            }

            // Check replication strategy constraints.

            // When drain_skipped is true, we already picked a movement to a viable target.
            if (!drain_skipped) {
                auto process_skip_info = [&] (migration_tablet_set tablets, skip_info skip) {
                    if (src_node_info.drained && skip.viable_targets.empty()) {
                        auto tablet = tablets.tablets().front();
                        auto replicas = tmap.get_tablet_info(tablet.tablet).replicas;
                        auto reason = fmt::format("Unable to find new replica for tablet {} on {} when draining {}. Consider adding new nodes or reducing replication factor. (nodes {}, replicas {})",
                                tablet, src, nodes_to_drain, nodes_by_load_dst, replicas);
                        lblogger.warn("{}", reason);
                        plan.add(drain_failure(src_node_info.id, reason));
                        return;
                    }
                    lblogger.debug("Adding replica {} of candidate {} to skipped list with the viable targets {}", src, candidate, skip.viable_targets);
                    src_node_info.skipped_candidates.emplace_back(src, tablets, std::move(skip.viable_targets));
                };

                auto skip = check_constraints(nodes, tmap, src_node_info, nodes[dst.host], source_tablets, src_node_info.drained);
                if (skip) {
                    for (auto&& [skip_info, tablets] : *skip) {
                        process_skip_info(tablets, skip_info);
                    }
                    if (!plan.drain_failures().empty()) {
                        break;
                    }
                    continue;
                }
            }
            if (candidate.badness.is_bad()) {
                _current_stats->bad_migrations++;
            }

            if (drain_skipped) {
                _current_stats->migrations_from_skiplist++;
            }

            if (src_node_info.req && *src_node_info.req == topology_request::leave && src_node_info.excluded) {
                plan.add(drain_failure(src_node_info.id, "Node was marked as excluded"));
                break;
            }

            tablet_transition_kind kind = (src_node_info.state() == locator::node::state::being_removed
                    || src_node_info.state() == locator::node::state::left
                    || (src_node_info.req && *src_node_info.req == topology_request::remove))
                    ? locator::choose_rebuild_transition_kind(_db.features()) : tablet_transition_kind::migration;
            auto mig = get_migration_info(source_tablets, kind, src, dst);
            auto mig_streaming_info = get_migration_streaming_infos(topo, tmap, mig);

            pick(*_load_sketch, dst.host, dst.shard, source_tablets);

            if (can_accept_load(nodes, mig_streaming_info)) {
                apply_load(nodes, mig_streaming_info);
                lblogger.debug("Adding migration: {} size: {}", mig, source_tablets.tablet_set_disk_size);
                _current_stats->migrations_produced++;
                mark_as_scheduled(mig);
                plan.add(std::move(mig));
            } else {
                // Shards are overloaded with streaming. Do not include the migration in the plan, but
                // continue as if it were, in the hope that we will find a migration which can be executed without
                // violating the load. The next make_plan() invocation will notice that the migration was not executed.
                // We should not just stop here because that can lead to underutilization of the cluster.
                // Just because the next migration is blocked doesn't mean we cannot proceed with migrations
                // for other shards which are produced by the planner subsequently.
                skipped_migrations++;
                _current_stats->migrations_skipped++;
                if (skipped_migrations >= max_skipped_migrations) {
                    lblogger.debug("Too many migrations skipped, aborting balancing");
                    _current_stats->stop_skip_limit++;
                    break;
                }
            }

            erase_candidates(nodes, tmap, source_tablets);

            update_node_load_on_migration(nodes, src, dst, source_tablets);
            if (src_node_info.tablet_count == 0) {
                push_back_node_candidate.cancel();
                nodes_by_load.pop_back();
            }

            if (lblogger.is_enabled(seastar::log_level::debug)) {
                co_await log_table_load(nodes, source_tablets.table());
            }
        }

        if (plan.size() == batch_size) {
            _current_stats->stop_batch_size++;
        }

        if (plan.empty()) {
            // Due to replica co-location constraints, it may not be possible to balance the cluster evenly.
            // For example, if nodes have different numbers of shards: nodes which have more shards will be
            // replicas for more tablets, which rules out more candidates on other nodes with a higher per-shard load.
            //
            // Example:
            //
            //   node1: 1 shard
            //   node2: 1 shard
            //   node3: 7 shards
            //
            // If there are 7 tablets and RF=3, each node must hold a replica of every tablet.
            // So node3 will have an average shard load of 1, while node1 and node2 will have
            // an average shard load of 7.

            // Report this only when it is the final plan with no active migrations left to execute;
            // otherwise it may just be a temporary situation due to lack of candidates.
            if (_migrating_candidates == 0) {
                lblogger.info("Not possible to achieve balance in {}", _location);
                print_node_stats(nodes, only_active::no);
            }
        }

        co_return std::move(plan);
    }

    class sibling_tablets_replicas_processor {
        const tablet_desc _t1;
        const std::optional<tablet_desc> _t2;
        tablet_replica_set _t1_replicas;
        tablet_replica_set _t2_replicas;
        tablet_replica_set::iterator _current_t1;
        tablet_replica_set::iterator _current_t2;
    public:
        sibling_tablets_replicas_processor(const tablet_desc t1, const std::optional<tablet_desc> t2,
                                           tablet_replica_set t1_replicas, tablet_replica_set t2_replicas)
            : _t1(std::move(t1))
            , _t2(std::move(t2))
            , _t1_replicas(std::move(t1_replicas))
            , _t2_replicas(std::move(t2_replicas))
            , _current_t1(_t1_replicas.begin())
            , _current_t2(_t2_replicas.begin()) {
        }

        using tablet_ids = utils::small_vector<tablet_id, 2>;

        // Produces the next replica from the sets of sibling tablets. If a given replica has
        // the sibling tablets co-located on it, the ids of both tablets will be returned
        // for that replica.
        // Given replica sets of sibling tablets:
        //   t1 {A, B, C},
        //   t2 {A, C, D},
        // it will yield
        //   {A, {t1, t2}}, {B, {t1}}, {C, {t1, t2}}, {D, {t2}}
        // Invariant: if the return value is engaged, the size of tablet_ids will be 1 or 2.
        std::optional<std::pair<tablet_replica, tablet_ids>> next_replica() {
            if (_current_t1 == _t1_replicas.end() && _current_t2 == _t2_replicas.end()) {
                return std::nullopt;
            }
            if (_current_t1 == _t1_replicas.end()) {
                return std::make_pair(*_current_t2++, tablet_ids{_t2->tid});
            }
            if (_current_t2 == _t2_replicas.end()) {
                return std::make_pair(*_current_t1++, tablet_ids{_t1.tid});
            }
            // Detect co-located replicas of sibling tablets.
            if (*_current_t1 == *_current_t2) {
                _current_t1++;
                return std::make_pair(*_current_t2++, tablet_ids{_t1.tid, _t2->tid});
            }
            if (*_current_t1 < *_current_t2) {
                return std::make_pair(*_current_t1++, tablet_ids{_t1.tid});
            }
            return std::make_pair(*_current_t2++, tablet_ids{_t2->tid});
        }
    };
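
    // Typical use (as in make_plan() below): construct the processor from the sorted
    // replica sets of a sibling pair and pull merged entries until exhaustion:
    //
    //   while (auto next = processor.next_replica()) {
    //       auto& [replica, tids] = *next;
    //       ... // tids.size() == 2 means both siblings are co-located on 'replica'.
    //   }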

    using only_active = bool_class<struct only_active_tag>;

    void print_node_stats(node_load_map& nodes, only_active only_active_) {
        for (auto&& [host, load] : nodes) {
            size_t read = 0;
            size_t write = 0;
            for (auto& shard_load : load.shards) {
                read += shard_load.streaming_read_load;
                write += shard_load.streaming_write_load;
            }
            auto level = !only_active_ || (read + write) > 0 ? seastar::log_level::info : seastar::log_level::debug;
            lblogger.log(level, "Node {}: {}/{} load={:.6f} tablets={} shards={} tablets/shard={:.3f} state={} cap={}"
                    " rd={} wr={}",
                    host, load.dc(), load.rack(), load.avg_load, load.tablet_count, load.shard_count,
                    load.tablets_per_shard(), load.state(), load.dusage->capacity, read, write);
        }
    }

    future<migration_plan> make_plan(dc_name dc, std::optional<sstring> rack = std::nullopt) {
        migration_plan plan;

        if (utils::get_local_injector().enter("tablet_migration_bypass")) {
            co_return std::move(plan);
        }

        _dc = dc;
        _rack = rack;
        _location = fmt::format("{}{}", dc, rack ? fmt::format("/{}", *rack) : "");
        _current_stats = _stats.for_dc(dc);
        auto _ = seastar::defer([&] { _current_stats = nullptr; });
        _migrating_candidates = 0;

        auto node_filter = [&] (const locator::node& node) {
            return node.dc_rack().dc == dc && (!rack || node.dc_rack().rack == *rack);
        };

        // Causes the load balancer to move some tablets even when the load is balanced.
        auto shuffle = in_shuffle_mode();

        _current_stats->calls++;
        lblogger.debug("Examining DC {} rack {} (shuffle={}, balancing={}, tablets_per_shard_goal={}, force_capacity_based_balancing={})",
                dc, rack, shuffle, _tm->tablets().balancing_enabled(), _tablets_per_shard_goal, _force_capacity_based_balancing);

        const locator::topology& topo = _tm->get_topology();

        // Select the subset of nodes to balance.

        node_load_map nodes;
        std::unordered_set<host_id> nodes_to_drain;

        _tm->for_each_token_owner([&] (const locator::node& node) {
            if (!node_filter(node)) {
                return;
            }

            auto drain_node = [&] (topology_request req) {
                lblogger.info("Will drain node {} ({}) from DC {} due to {} request", node.host_id(), node.get_state(), dc, req);
                ensure_node(nodes, node.host_id());
                nodes_to_drain.emplace(node.host_id());
                auto& n = nodes[node.host_id()];
                n.req = req;
                n.drained = true;
            };

            auto req = _topology ? _topology->get_request(raft::server_id(node.host_id().uuid())) : std::nullopt;

            if (node.get_state() == locator::node::state::being_decommissioned) {
                drain_node(topology_request::leave);
            } else if (node.get_state() == locator::node::state::being_removed) {
                drain_node(topology_request::remove);
            } else if (req && (*req == topology_request::leave || *req == topology_request::remove)) {
                drain_node(*req);
            } else if (node.get_state() == locator::node::state::normal) {
                if (node.is_excluded()) {
                    // Excluded nodes should not be chosen as targets for migration.
                    lblogger.debug("Ignoring excluded node {}: state={}", node.host_id(), node.get_state());
                } else {
                    ensure_node(nodes, node.host_id());
                }
            }
        });
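
        // At this point, "nodes" holds every balanceable node in this DC (or rack),
        // and "nodes_to_drain" every node whose tablet replicas must be moved away,
        // whether due to decommission, removal, or a pending topology request.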

        // Apply the skiplist only when not draining.
        // When draining, it's unsafe to move tablets only to non-skipped nodes, as this can lead to node overload.
        if (nodes_to_drain.empty()) {
            for (auto host_to_skip : _skiplist) {
                if (auto handle = nodes.extract(host_to_skip)) {
                    auto& node = handle.mapped();
                    lblogger.debug("Ignoring dead node {}: state={}", node.id, node.node->get_state());
                }
            }
        }

        // Compute tablet load on nodes.

        for (auto&& [table, tables] : _tm->tablets().all_table_groups()) {
            const auto& tmap = _tm->tablets().get_tablet_map(table);

            co_await tmap.for_each_tablet([&, table = table] (tablet_id tid, const tablet_info& ti) -> future<> {
                auto trinfo = tmap.get_tablet_transition_info(tid);

                // Check if any replica is on a node which has left.
                // When a node is replaced we don't rebuild as part of the topology request.
                for (auto&& r : ti.replicas) {
                    auto* node = topo.find_node(r.host);
                    if (!node) {
                        on_internal_error(lblogger, format("Replica {} of tablet {} not found in topology",
                                r, global_tablet_id{table, tid}));
                    }
                    if (node->left() && node_filter(*node)) {
                        ensure_node(nodes, r.host);
                        nodes_to_drain.insert(r.host);
                        nodes[r.host].drained = true;
                    }
                }

                // We reflect migrations in the load as if they had already happened,
                // optimistically assuming that they will succeed.
                for (auto&& replica : get_replicas_for_tablet_load(ti, trinfo)) {
                    if (nodes.contains(replica.host)) {
                        nodes[replica.host].tablet_count += 1;
                        // This invariant is assumed later.
                        if (replica.shard >= nodes[replica.host].shard_count) {
                            auto gtid = global_tablet_id{table, tid};
                            on_internal_error(lblogger, format("Tablet {} replica {} targets non-existent shard", gtid, replica));
                        }
                    }
                }

                return make_ready_future<>();
            });
        }

        if (nodes.empty()) {
            lblogger.debug("No nodes to balance.");
            _current_stats->stop_balance++;
            co_return plan;
        }

        // Detect finished drain.

        for (auto i = nodes_to_drain.begin(); i != nodes_to_drain.end();) {
            if (nodes[*i].tablet_count == 0) {
                lblogger.info("Node {} is already drained, ignoring", *i);
                nodes.erase(*i);
                i = nodes_to_drain.erase(i);
            } else {
                ++i;
            }
        }

        _load_sketch = locator::load_sketch(_tm, _table_load_stats, _force_capacity_based_balancing ? _target_tablet_size : 0);
        _load_sketch->set_minimal_tablet_size(_minimal_tablet_size);
        _load_sketch->set_force_capacity_based_load(_force_capacity_based_balancing);
        co_await _load_sketch->populate_dc(dc);

        // If we don't have nodes to drain, remove nodes which don't have complete tablet sizes.
        if (nodes_to_drain.empty()) {
            std::optional<host_id> incomplete_host;
            size_t incomplete_count = 0;

            for (auto nodes_i = nodes.begin(); nodes_i != nodes.end();) {
                host_id host = nodes_i->first;
                if (!_load_sketch->has_complete_data(host)) {
                    incomplete_host.emplace(host);
                    incomplete_count++;
                    nodes_i = nodes.erase(nodes_i);
                } else {
                    ++nodes_i;
                }
            }

            if (incomplete_host) {
                lblogger.info("Ignoring {} node(s) with incomplete tablet stats, e.g. {}", incomplete_count, *incomplete_host);
            }
        }

        plan.set_has_nodes_to_drain(!nodes_to_drain.empty());

        // Invariant: node.dusage || node.drained
        for (auto& [host, node] : nodes) {
            if (node.drained) {
                continue;
            }
            if (!node.dusage) {
                lblogger.info("Cannot balance because capacity of node {} (or more) is unknown", host);
                co_return plan;
            }
        }

        // For size-based balancing, only excluded nodes are allowed to have incomplete tablet stats.
        for (auto& [host, node] : nodes) {
            if (!_load_sketch->has_complete_data(host)) {
                if (!_force_capacity_based_balancing && node.drained && node.node->is_excluded()) {
                    _load_sketch->ignore_incomplete_data(host);
                } else {
                    lblogger.info("Cannot balance because node {} (or more) has incomplete tablet stats", host);
                    co_return plan;
                }
            }
        }

        // Check if we have destination nodes.
        const bool has_dest_nodes = std::ranges::any_of(std::views::values(nodes), [&] (const auto& load) {
            return !load.drained;
        });
        if (!has_dest_nodes) {
            for (auto host : nodes_to_drain) {
                plan.add(drain_failure(host, format("No candidate nodes in {} to drain {}."
                        " Consider adding new nodes or reducing replication factor.", _location, host)));
            }
            lblogger.debug("No candidate nodes");
            _current_stats->stop_no_candidates++;
            co_return plan;
        }

        // We want to saturate the target node, so we migrate several tablets in parallel, one for each shard
        // on the target node. This assumes that the target node is well-balanced and that tablet migrations
        // complete at the same time. Neither assumption is generally true in practice, which we currently ignore.
        // But they will typically hold, because we fill shards starting from the least-loaded shards,
        // so we naturally strive towards balance between shards.
        //
        // If the target node is not balanced across shards, we will overload some shards. Streaming concurrency
        // will suffer because more loaded shards will not participate, which will under-utilize the node.
        // FIXME: To handle the above, we should rebalance the target node before migrating tablets from other nodes.

        // Compute per-shard load and candidate tablets.

        _tablet_count_per_table.clear();
        _disk_used_per_table.clear();

        for (auto&& [table, tables] : _tm->tablets().all_table_groups()) {
            const auto& tmap = _tm->tablets().get_tablet_map(table);
            uint64_t total_tablet_count = 0;
            uint64_t total_tablet_sizes = 0;

            auto get_replicas = [this] (std::optional<tablet_desc> t) -> tablet_replica_set {
                return t ? sorted_replicas_for_tablet_load(*t->info, t->transition) : tablet_replica_set{};
            };
            auto migrating = [&] (std::optional<tablet_desc> t) {
                return t && (bool(t->transition) || _scheduled_tablets.contains(global_tablet_id{table, t->tid}));
            };
            auto maybe_apply_load = [&] (std::optional<tablet_desc> t) {
                if (t && is_streaming(t->transition)) {
                    apply_load(nodes, get_migration_streaming_info(topo, *t->info, *t->transition));
                }
            };

            // If a table is undergoing merge, co-located replicas of sibling tablets will be treated as a single migration candidate,
            // even though each tablet replica will be migrated independently. The next invocation of the load balancer is able to exclude
            // both siblings if either hasn't finished migrating yet. That's to prevent the load balancer from incorrectly considering that
            // they're not co-located if only one of them completed migration.
            co_await tmap.for_each_sibling_tablets([&, table = table] (tablet_desc t1, std::optional<tablet_desc> t2) -> future<> {
                maybe_apply_load(t1);
                maybe_apply_load(t2);

                auto t1_replicas = get_replicas(t1);
                // If t2 is disengaged (when tablet_count == 1), t2_replicas is empty and so has no effect
                // when adding t1's replicas as candidates.
                auto t2_replicas = get_replicas(t2);

                sibling_tablets_replicas_processor processor(t1, t2, std::move(t1_replicas), std::move(t2_replicas));

                auto get_table_desc = [&] (tablet_id tid) {
                    return tid == t1.tid ? t1 : t2;
                };

                while (auto next = processor.next_replica()) {
                    auto& [replica, tids] = *next;
                    if (!nodes.contains(replica.host)) {
                        continue;
                    }
                    utils::small_vector<uint64_t, 2> tablet_sizes;
                    uint64_t tablet_sizes_sum = 0;
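
                    // Sizing: under capacity-based balancing every tablet is assumed to
                    // weigh _target_tablet_size. Otherwise the size of each tablet is the
                    // sum of its measured sizes across all tables co-located in the group,
                    // where an unknown size defaults to _target_tablet_size and each
                    // per-table size is clamped from below by _minimal_tablet_size.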
                    for (auto tid : tids) {
                        if (_force_capacity_based_balancing) {
                            tablet_sizes_sum += _target_tablet_size;
                            tablet_sizes.push_back(_target_tablet_size);
                        } else {
                            uint64_t tablet_group_size = 0;
                            auto token_range = tmap.get_token_range(tid);
                            for (auto group_member : tables) {
                                const range_based_tablet_id rb_tid{group_member, token_range};
                                auto& member_tmap = _tm->tablets().get_tablet_map(group_member);
                                auto& ti = member_tmap.get_tablet_info(tid);
                                auto trinfo = member_tmap.get_tablet_transition_info(tid);
                                auto tablet_size_opt = get_tablet_size(replica.host, rb_tid, ti, trinfo);
                                const uint64_t tablet_size = std::max(tablet_size_opt.value_or(_target_tablet_size), _minimal_tablet_size);
                                tablet_group_size += tablet_size;
                                tablet_sizes_sum += tablet_size;
                            }
                            tablet_sizes.push_back(tablet_group_size);
                        }
                    }
                    auto& node_load_info = nodes[replica.host];
                    shard_load& shard_load_info = node_load_info.shards[replica.shard];
                    if (shard_load_info.tablet_count == 0) {
                        node_load_info.shards_by_load.push_back(replica.shard);
                    }
                    shard_load_info.tablet_count += tids.size();
                    if (shard_load_info.dusage) {
                        shard_load_info.dusage->used += tablet_sizes_sum;
                    }
                    shard_load_info.tablet_count_per_table[table] += tids.size();
                    shard_load_info.tablet_sizes_per_table[table] += tablet_sizes_sum;
                    node_load_info.tablet_count_per_table[table] += tids.size();
                    node_load_info.tablet_sizes_per_table[table] += tablet_sizes_sum;
                    if (node_load_info.dusage) {
                        node_load_info.dusage->used += tablet_sizes_sum;
                    }
                    total_tablet_count += tids.size();
                    total_tablet_sizes += tablet_sizes_sum;
                    if (tmap.needs_merge() && tids.size() == 2) {
                        // Exclude both sibling tablets if either hasn't finished migrating yet. That's to prevent
                        // the balancer from undoing the co-location.
                        if (!migrating(t1) && !migrating(t2)) {
                            auto candidate = colocated_tablets{global_tablet_id{table, t1.tid}, global_tablet_id{table, t2->tid}};
                            add_candidate(shard_load_info, migration_tablet_set{std::move(candidate), tablet_sizes_sum});
                        } else {
                            _migrating_candidates++;
                        }
                    } else {
                        if (tids.size() != tablet_sizes.size()) {
                            on_internal_error(lblogger, "Number of co-located tablets and their sizes don't match.");
                        }
                        for (size_t i = 0; i < tids.size(); i++) {
                            if (!migrating(get_table_desc(tids[i]))) { // migrating tablets are not candidates
                                add_candidate(shard_load_info, migration_tablet_set{global_tablet_id{table, tids[i]}, tablet_sizes[i]});
                            } else {
                                _migrating_candidates++;
                            }
                        }
                    }
                }

                return make_ready_future<>();
            });
            _disk_used_per_table[table] = total_tablet_sizes;
            _tablet_count_per_table[table] = total_tablet_count;
        }

        // Compute load imbalance.

        _total_capacity_shards = 0;
        _total_capacity_nodes = 0;
        _total_capacity_storage = 0;
        load_type max_load = 0;
        load_type min_load = 0;
        std::optional<host_id> min_load_node = std::nullopt;
        for (auto&& [host, load] : nodes) {
            load.update();
            _stats.for_node(dc, host).load = load.avg_load;

            if (!load.drained) {
                if (!min_load_node || load.avg_load < min_load) {
                    min_load = load.avg_load;
                    min_load_node = host;
                }
                if (load.avg_load > max_load) {
                    max_load = load.avg_load;
                }
                _total_capacity_shards += load.shard_count;
                _total_capacity_nodes++;
                _total_capacity_storage += load.dusage->capacity;
            }
        }

        print_node_stats(nodes, only_active::yes);

        if (!nodes_to_drain.empty() || (_tm->tablets().balancing_enabled() && (shuffle || !is_balanced(min_load, max_load)))) {
            host_id target = *min_load_node;
            lblogger.info("target node: {}, avg_load: {}, max: {}", target, min_load, max_load);
            plan.merge(co_await make_internode_plan(nodes, nodes_to_drain, target));
        } else {
            _current_stats->stop_balance++;
        }

        if (_tm->tablets().balancing_enabled()) {
            plan.merge(co_await make_intranode_plan(nodes, nodes_to_drain));
        }

        if (_tm->tablets().balancing_enabled() && plan.empty() && !ongoing_rack_list_colocation()) {
            auto dc_merge_plan = co_await make_merge_colocation_plan(nodes);
            auto level = dc_merge_plan.tablet_migration_count() > 0 ? seastar::log_level::info : seastar::log_level::debug;
            lblogger.log(level, "Prepared {} migrations for co-locating sibling tablets in {}", dc_merge_plan.tablet_migration_count(), _location);
            plan.merge(std::move(dc_merge_plan));
        }

        co_await utils::clear_gently(nodes);
        co_return std::move(plan);
    }
};

class tablet_allocator_impl : public tablet_allocator::impl
                            , public service::migration_listener::empty_listener {
    service::migration_notifier& _migration_notifier;
    replica::database& _db;
    load_balancer_stats_manager _load_balancer_stats;
    scheduling_group _background;
    bool _stopped = false;
    bool _use_tablet_aware_balancing = true;
    locator::load_stats_ptr _load_stats;
private:
    load_balancer make_load_balancer(token_metadata_ptr tm,
                                     service::topology* topology,
                                     db::system_keyspace* sys_ks,
                                     locator::load_stats_ptr table_load_stats,
                                     std::unordered_set<host_id> skiplist) {
        load_balancer lb(_db, tm, topology, sys_ks, std::move(table_load_stats), _load_balancer_stats,
                _db.get_config().target_tablet_size_in_bytes(),
                _db.get_config().tablets_per_shard_goal(),
                std::move(skiplist));
        lb.set_use_table_aware_balancing(_use_tablet_aware_balancing);
        lb.set_initial_scale(_db.get_config().tablets_initial_scale_factor());
        return lb;
    }
public:
    tablet_allocator_impl(tablet_allocator::config cfg, service::migration_notifier& mn, replica::database& db)
            : _migration_notifier(mn)
            , _db(db)
            , _load_balancer_stats("load_balancer")
            , _background(cfg.background_sg)
    {
        _migration_notifier.register_listener(this);
    }

    tablet_allocator_impl(tablet_allocator_impl&&) = delete; // "this" captured.

    ~tablet_allocator_impl() {
        SCYLLA_ASSERT(_stopped);
    }

    future<> stop() {
        co_await _migration_notifier.unregister_listener(this);
        _stopped = true;
    }

    future<migration_plan> balance_tablets(token_metadata_ptr tm, service::topology* topology, db::system_keyspace* sys_ks, locator::load_stats_ptr table_load_stats, std::unordered_set<host_id> skiplist) {
        auto lb = make_load_balancer(tm, topology, sys_ks, table_load_stats ? table_load_stats : _load_stats, std::move(skiplist));
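        // Plan computation is CPU-intensive, so hop to the dedicated background
        // scheduling group before running the planner, presumably to keep foreground
        // request processing responsive while the plan is being built.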
        co_await coroutine::switch_to(_background);
        co_return co_await lb.make_plan();
    }

    void set_load_stats(locator::load_stats_ptr load_stats) {
        _load_stats = std::move(load_stats);
    }

    locator::load_stats_ptr get_load_stats() {
        return _load_stats;
    }

    void set_use_tablet_aware_balancing(bool use_tablet_aware_balancing) {
        _use_tablet_aware_balancing = use_tablet_aware_balancing;
    }

    // Allocates new tablets for a table which is not co-located with another table.
    tablet_map allocate_tablets_for_new_base_table(const tablet_aware_replication_strategy* tablet_rs, const schema& s) {
        auto tm = _db.get_shared_token_metadata().get();
        auto lb = make_load_balancer(tm, nullptr, nullptr, nullptr, {});
        auto plan = lb.make_sizing_plan(s.shared_from_this(), tablet_rs).get();
        auto& table_plan = plan.tables[s.id()];
        if (table_plan.target_tablet_count_aligned != table_plan.target_tablet_count) {
            lblogger.info("Rounding up tablet count from {} to {} for table {}.{}", table_plan.target_tablet_count,
                    table_plan.target_tablet_count_aligned, s.ks_name(), s.cf_name());
        }
        auto tablet_count = table_plan.target_tablet_count_aligned;
        auto map = tablet_rs->allocate_tablets_for_new_table(s.shared_from_this(), tm, tablet_count).get();
        return map;
    }

    // Allocate tablets for multiple new tables, which may be co-located with each other, or co-located with an existing base table.
    void allocate_tablets_for_new_tables(const keyspace_metadata& ksm, const std::vector<schema_ptr>& cfms, utils::chunked_vector<mutation>& muts, api::timestamp_type ts) {
        utils::get_local_injector().inject("pause_in_allocate_tablets_for_new_table", utils::wait_for_message(std::chrono::minutes(5))).get();
        locator::replication_strategy_params params(ksm.strategy_options(), ksm.initial_tablets(), ksm.consistency_option());
        auto tm = _db.get_shared_token_metadata().get();
        auto rs = abstract_replication_strategy::create_replication_strategy(ksm.strategy_name(), params, tm->get_topology());
        if (auto&& tablet_rs = rs->maybe_as_tablet_aware()) {
            std::unordered_map<table_id, schema_ptr> new_cfms_map;
            for (auto s : cfms) {
                new_cfms_map[s->id()] = s;
            }

            // Group the new tables by co-location groups.
            // The key is the base table id, which may be a new table or an existing table.
            const bool colocated_tablets_enabled = _db.features().colocated_tablets;
            std::unordered_map<table_id, std::vector<schema_ptr>> table_groups;
            std::unordered_set<table_id> colocated_tables;
            for (auto s : cfms) {
                std::optional<table_id> base_id;
                if (colocated_tablets_enabled) {
                    base_id = _db.get_base_table_for_tablet_colocation(*s, new_cfms_map);
                    if (base_id) {
                        if (colocated_tables.contains(*base_id) || tm->tablets().get_base_table(*base_id) != *base_id) {
                            on_internal_error(lblogger,
                                    format("Trying to set co-located table {} with base table {} but it's not a base table.",
                                            s->id(), *base_id));
                        }
                        colocated_tables.insert(s->id());
                    }
                }
                table_groups[base_id.value_or(s->id())].push_back(s);
            }
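
            // After this loop, each entry of table_groups maps a base table id to the
            // schemas that must share its tablet map: the base itself (when it is one
            // of the new tables) plus any new tables co-located with it.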

            // Allocate tablets for each co-location group.
            // If the base is a new table, allocate new tablets for it.
            // For the other tables in the group, create a co-located tablet map.
            for (const auto& [base_id, group_schemas] : table_groups) {

                auto create_colocated_tablet_maps = [&] (const tablet_map& base_map) {
                    for (auto sp : group_schemas) {
                        const auto& s = *sp;
                        if (s.id() != base_id) {
                            lblogger.debug("Creating tablets for {}.{} id={} with base={}", s.ks_name(), s.cf_name(), s.id(), base_id);
                            muts.emplace_back(colocated_tablet_map_to_mutation(s.id(), s.ks_name(), s.cf_name(), base_id, ts));
                            _db.get_notifier().before_allocate_tablet_map_in_notification(base_map, s, muts, ts);
                        }
                    }
                };

                if (tm->tablets().has_tablet_map(base_id)) {
                    const auto& base_map = tm->tablets().get_tablet_map(base_id);
                    create_colocated_tablet_maps(base_map);
                } else {
                    const auto& s = *new_cfms_map[base_id];
                    lblogger.debug("Creating tablets for {}.{} id={}", s.ks_name(), s.cf_name(), s.id());
                    auto base_map = allocate_tablets_for_new_base_table(tablet_rs, s);
                    tablet_map_to_mutations(base_map, s.id(), s.ks_name(), s.cf_name(), ts, _db.features(), [&] (mutation m) {
                        muts.emplace_back(std::move(m));
                        return make_ready_future<>();
                    }).get();
                    _db.get_notifier().before_allocate_tablet_map_in_notification(base_map, s, muts, ts);

                    create_colocated_tablet_maps(base_map);
                }
            }
        }
    }

    void on_before_create_column_families(const keyspace_metadata& ksm, const std::vector<schema_ptr>& cfms, utils::chunked_vector<mutation>& muts, api::timestamp_type ts) override {
        allocate_tablets_for_new_tables(ksm, cfms, muts, ts);
    }

    void on_before_create_column_family(const keyspace_metadata& ksm, const schema& s, utils::chunked_vector<mutation>& muts, api::timestamp_type ts) override {
        allocate_tablets_for_new_tables(ksm, {s.shared_from_this()}, muts, ts);
    }

    void on_before_drop_column_family(const schema& s, utils::chunked_vector<mutation>& muts, api::timestamp_type ts) override {
        keyspace& ks = _db.find_keyspace(s.ks_name());
        auto&& rs = ks.get_replication_strategy();
        if (rs.uses_tablets()) {
            auto tm = _db.get_shared_token_metadata().get();
            lblogger.debug("Dropping tablets for {}.{} id={}", s.ks_name(), s.cf_name(), s.id());
            muts.emplace_back(make_drop_tablet_map_mutation(s.id(), ts));
        }
    }

    void on_before_drop_keyspace(const sstring& keyspace_name, utils::chunked_vector<mutation>& muts, api::timestamp_type ts) override {
        keyspace& ks = _db.find_keyspace(keyspace_name);
        auto&& rs = ks.get_replication_strategy();
        if (rs.uses_tablets()) {
            lblogger.debug("Dropping tablets for keyspace {}", keyspace_name);
            auto tm = _db.get_shared_token_metadata().get();
            for (auto&& [name, s] : ks.metadata()->cf_meta_data()) {
                muts.emplace_back(make_drop_tablet_map_mutation(s->id(), ts));
            }
        }
    }

    void on_leadership_lost() {
        _load_balancer_stats.unregister();
        _load_stats = {};
    }

    load_balancer_stats_manager& stats() {
        return _load_balancer_stats;
    }
private:
    // The splitting of tablets today is based entirely on the power-of-two constraint.
    // A tablet of id x is split into 2 new tablets, whose new ids are (x << 1) and
    // (x << 1) + 1.
    // So a tablet of id 0 is remapped into ids 0 and 1. Another of id 1 is remapped
    // into ids 2 and 3, and so on.
    future<tablet_map> split_tablets(token_metadata_ptr tm, table_id table) {
        auto& tablets = tm->tablets().get_tablet_map(table);

        tablet_map new_tablets(tablets.tablet_count() * 2);

        for (tablet_id tid : tablets.tablet_ids()) {
            co_await coroutine::maybe_yield();

            tablet_id new_left_tid = tablet_id(tid.value() << 1);
            tablet_id new_right_tid = tablet_id(new_left_tid.value() + 1);

            auto& tablet_info = tablets.get_tablet_info(tid);

            new_tablets.set_tablet(new_left_tid, tablet_info);
            new_tablets.set_tablet(new_right_tid, tablet_info);
        }
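
            // Both children inherit the parent's tablet_info (and thus its replica
            // set); since the tablet count doubles, each child covers half of the
            // parent's token range.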
|
|
}
|
|
|
|
lblogger.info("Split tablets for table {}, increasing tablet count from {} to {}",
|
|
table, tablets.tablet_count(), new_tablets.tablet_count());
|
|
co_return std::move(new_tablets);
|
|
}
|
|
|
|
// The merging of tablet is completely based on the power-of-two constraint.
|
|
// Tablet of ids X and X+1 are merged into new tablet id (X >> 1).
|
|
    future<tablet_map> merge_tablets(token_metadata_ptr tm, table_id table) {
        auto& tablets = tm->tablets().get_tablet_map(table);

        tablet_map new_tablets(tablets.tablet_count() / 2);

        for (tablet_id tid : new_tablets.tablet_ids()) {
            co_await coroutine::maybe_yield();

            tablet_id old_left_tid = tablet_id(tid.value() << 1);
            tablet_id old_right_tid = tablet_id(old_left_tid.value() + 1);

            auto& left_tablet_info = tablets.get_tablet_info(old_left_tid);
            auto& right_tablet_info = tablets.get_tablet_info(old_right_tid);

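            // Replica sets carry no canonical order, so sort both sides before
            // comparing: colocation is a set-equality check, not a list-equality one.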
            auto sorted = [] (tablet_replica_set set) {
                std::ranges::sort(set, std::less<tablet_replica>());
                return set;
            };
            auto left_tablet_replicas = sorted(left_tablet_info.replicas);
            auto right_tablet_replicas = sorted(right_tablet_info.replicas);
            if (left_tablet_replicas != right_tablet_replicas) {
                throw std::runtime_error(format("Sibling tablets {} (r: {}) and {} (r: {}) are not colocated.",
                        old_left_tid, left_tablet_replicas, old_right_tid, right_tablet_replicas));
            }
            auto merged_tablet_info = locator::merge_tablet_info(left_tablet_info, right_tablet_info);
            if (!merged_tablet_info) {
                throw std::runtime_error(format("Unable to merge tablet info of sibling tablets {} (r: {}) and {} (r: {}).",
                        old_left_tid, left_tablet_replicas, old_right_tid, right_tablet_replicas));
            }
            lblogger.debug("Got merged_tablet_info with sstables_repaired_at={}", merged_tablet_info->sstables_repaired_at);

            new_tablets.set_tablet(tid, *merged_tablet_info);
        }

        lblogger.info("Merge tablets for table {}, decreasing tablet count from {} to {}",
                table, tablets.tablet_count(), new_tablets.tablet_count());
        co_return std::move(new_tablets);
    }
public:
    future<tablet_map> resize_tablets(token_metadata_ptr tm, table_id table) {
        auto& tmap = tm->tablets().get_tablet_map(table);
        if (tmap.needs_split()) {
            return split_tablets(std::move(tm), table);
        } else if (tmap.needs_merge()) {
            return merge_tablets(std::move(tm), table);
        }
        throw std::logic_error(format("Table {} cannot be resized", table));
    }

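    // A minimal usage sketch (hypothetical caller; the names below are illustrative):
    //
    //   auto tm = shared_token_metadata.get();                   // topology snapshot
    //   tablet_map new_map = co_await resize_tablets(tm, table);
    //   // ...then persist new_map, e.g. via tablet_map_to_mutations(), as the
    //   // table-creation path above does.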
    // FIXME: Handle materialized views.
};

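// Collects the ids of every tablet touched by this plan: first the planned
// migrations, then the planned tablet repairs. Yields between iterations so
// that large plans do not stall the reactor.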
future<std::unordered_set<locator::global_tablet_id>> migration_plan::get_migration_tablet_ids() const {
    std::unordered_set<locator::global_tablet_id> tablets;
    for (auto& m : _migrations) {
        co_await coroutine::maybe_yield();
        tablets.insert(m.tablet);
    }
    for (auto& gid : _repair_plan._repairs) {
        co_await coroutine::maybe_yield();
        tablets.insert(gid);
    }
    co_return tablets;
}

tablet_allocator::tablet_allocator(config cfg, service::migration_notifier& mn, replica::database& db)
    : _impl(std::make_unique<tablet_allocator_impl>(std::move(cfg), mn, db)) {
}

future<> tablet_allocator::stop() {
    return impl().stop();
}

future<migration_plan> tablet_allocator::balance_tablets(locator::token_metadata_ptr tm, service::topology* topology, db::system_keyspace* sys_ks, locator::load_stats_ptr load_stats, std::unordered_set<host_id> skiplist) {
    return impl().balance_tablets(std::move(tm), topology, sys_ks, std::move(load_stats), std::move(skiplist));
}

void tablet_allocator::set_load_stats(locator::load_stats_ptr load_stats) {
    impl().set_load_stats(std::move(load_stats));
}

locator::load_stats_ptr tablet_allocator::get_load_stats() {
    return impl().get_load_stats();
}

void tablet_allocator::set_use_table_aware_balancing(bool use_tablet_aware_balancing) {
    impl().set_use_tablet_aware_balancing(use_tablet_aware_balancing);
}

future<locator::tablet_map> tablet_allocator::resize_tablets(locator::token_metadata_ptr tm, table_id table) {
    return impl().resize_tablets(std::move(tm), table);
}

tablet_allocator_impl& tablet_allocator::impl() {
    return static_cast<tablet_allocator_impl&>(*_impl);
}

void tablet_allocator::on_leadership_lost() {
    impl().on_leadership_lost();
}

load_balancer_stats_manager& tablet_allocator::stats() {
    return impl().stats();
}

} // namespace service

auto fmt::formatter<service::tablet_migration_info>::format(const service::tablet_migration_info& mig, fmt::format_context& ctx) const
        -> decltype(ctx.out()) {
    return fmt::format_to(ctx.out(), "{{tablet: {}, src: {}, dst: {}}}", mig.tablet, mig.src, mig.dst);
}