tablets: load_balancer: Recognize that tablets are confined to racks when computing desired tablet count

The old logic assumes that replicas are spread across whole DC when
determining how many tablets we need to have at least 10 tablets per
shard. If replicas are actually confined to a subset of racks, that
will come up with a too high count and overshoot actual per-shard
count in this rack.

Similar problem happens for scaling-down of tablet count, when we try
to keep per-shard tablet count below the goal. It should be tracked
per-rack rather than per-DC, since racks can differ in how loaded they
are by RF if it's a rack-list.
This commit is contained in:
Tomasz Grabiec
2025-10-01 16:01:35 +02:00
parent 6962464be7
commit 9ebdeb261f
2 changed files with 183 additions and 32 deletions

View File

@@ -1228,6 +1228,7 @@ public:
tablet_count_and_reason tablet_count_from_min_per_shard_tablet_count(const schema& s,
const std::unordered_map<sstring, unsigned>& shards_per_dc,
const std::unordered_map<endpoint_dc_rack, unsigned>& shards_per_rack,
const tablet_aware_replication_strategy& rs,
double min_per_shard_tablet_count)
{
@@ -1236,18 +1237,34 @@ public:
size_t tablet_count = 0;
const sstring* winning_dc = nullptr;
sstring winning_rack;
for (auto&& [dc, shards_in_dc] : shards_per_dc) {
auto rf_in_dc = rs.get_replication_factor(dc);
auto rf_in_dc = rs.get_replication_factor_data(dc);
if (!rf_in_dc) {
continue;
}
size_t tablets_in_dc = std::ceil((double)(min_per_shard_tablet_count * shards_in_dc) / rf_in_dc);
lblogger.debug("Estimated {} tablets due to min_per_shard_tablet_count={:.3f} for table={}.{} in DC {}", tablets_in_dc,
min_per_shard_tablet_count, s.ks_name(), s.cf_name(), dc);
if (tablets_in_dc > tablet_count) {
tablet_count = tablets_in_dc;
winning_dc = &dc;
if (rf_in_dc->is_numeric()) {
size_t tablets_in_dc = std::ceil((double) (min_per_shard_tablet_count * shards_in_dc) / rf_in_dc->count());
lblogger.debug("Estimated {} tablets due to min_per_shard_tablet_count={:.3f} for table={}.{} in DC {} ({} shards)",
tablets_in_dc, min_per_shard_tablet_count, s.ks_name(), s.cf_name(), dc, shards_in_dc);
if (tablets_in_dc > tablet_count) {
tablet_count = tablets_in_dc;
winning_dc = &dc;
winning_rack = sstring();
}
} else {
for (auto rack : rf_in_dc->get_rack_list()) {
auto shards = shards_per_rack.at(endpoint_dc_rack{dc, rack});
size_t tablets_in_rack = std::ceil(min_per_shard_tablet_count * shards);
lblogger.debug("Estimated {} tablets due to min_per_shard_tablet_count={:.3f} for table={}.{} in rack {} ({} shards) in DC {}",
tablets_in_rack, min_per_shard_tablet_count, s.ks_name(), s.cf_name(), rack, shards, dc);
if (tablets_in_rack > tablet_count) {
tablet_count = tablets_in_rack;
winning_dc = &dc;
winning_rack = rack;
}
}
}
}
@@ -1255,6 +1272,10 @@ public:
return {};
}
if (!winning_rack.empty()) {
return {tablet_count, format("min_per_shard_tablet_count={:.3f} in DC {} rack {}", min_per_shard_tablet_count, *winning_dc, winning_rack)};
}
return {tablet_count, format("min_per_shard_tablet_count={:.3f} in DC {}", min_per_shard_tablet_count, *winning_dc)};
}
@@ -1263,9 +1284,13 @@ public:
sizing_plan plan;
std::unordered_map<sstring, unsigned> shards_per_dc;
std::unordered_map<endpoint_dc_rack, unsigned> shards_per_rack;
std::unordered_map<sstring, std::unordered_set<sstring>> racks_per_dc;
_tm->for_each_token_owner([&] (const node& n) {
if (n.is_normal()) {
shards_per_dc[n.dc_rack().dc] += n.get_shard_count();
shards_per_rack[n.dc_rack()] += n.get_shard_count();
racks_per_dc[n.dc_rack().dc].insert(n.dc_rack().rack);
}
});
@@ -1304,7 +1329,7 @@ public:
// compatibility with the deprecated "initial" tablet count.
(rs->get_initial_tablets() || tablet_options.min_tablet_count) ? 0 : _initial_scale);
if (min_per_shard_tablet_count) {
maybe_apply(tablet_count_from_min_per_shard_tablet_count(*s, shards_per_dc, *rs, min_per_shard_tablet_count));
maybe_apply(tablet_count_from_min_per_shard_tablet_count(*s, shards_per_dc, shards_per_rack, *rs, min_per_shard_tablet_count));
}
auto total_size_opt = std::invoke([&] -> std::optional<size_t> {
@@ -1386,17 +1411,18 @@ public:
// Below section ensures we respect the _tablets_per_shard_goal.
//
// It will scale down target_tablet_count for all tables so that
// the average number of tablets per shard in each DC does not exceed _tablets_per_shard_goal.
// the average number of tablets per shard in each DC or rack does not exceed _tablets_per_shard_goal.
//
// The impact of table's tablet count on average per-shard tablet replica count
// is different in each DC because replication factors are different in each DC.
// is different in each rack because replication factors are different in each DC/rack.
// Numerical RF impacts all racks in a DC. Rack-list RF impacts particular racks.
//
// The algorithm works like this:
// Compute average tablet replica count per-shard in each DC,
// determine if per-shard goal is exceeded in that DC,
// Compute average tablet replica count per-shard in each rack,
// determine if per-shard goal is exceeded in that rack,
// compute scale factor by which tablet count should be multiplied so that the goal
// is not exceeded in that DC.
// Take the smallest scale factor among all DCs, which ensures that no DC is overloaded.
// is not exceeded in that rack.
// Take the smallest scale factor among all racks, which ensures that no rack is overloaded.
//
// We align tablet counts to the nearest power of 2 post-scaling, which
// means that scaling may not be effective and in the worst case we may overshoot the goal by
@@ -1406,51 +1432,68 @@ public:
// we have a problem of making sure that the choice is stable across scheduler invocations to avoid
// oscillations of decisions.
std::unordered_map<table_id, double> table_scaling;
struct scale_info {
double factor;
endpoint_dc_rack source;
};
std::unordered_map<table_id, scale_info> table_scaling;
for (auto&& [dc, shard_count] : shards_per_dc) {
for (auto&& [rack, shard_count] : shards_per_rack) {
double cur_avg_tablets_per_shard = 0;
double new_avg_tablets_per_shard = 0;
for (auto&& [table, table_plan] : plan.tables) {
auto* rs = rs_by_table[table];
auto rf = rs->get_replication_factor(dc);
auto get_avg_tablets_per_shard = [&] (size_t tablet_count) {
return double(tablet_count) * rf / shard_count;
auto rf = rs->get_replication_factor_data(rack.dc);
auto get_avg_tablets_per_shard = [&] (size_t tablet_count) -> double {
if (!rf) {
return 0;
}
if (rf->is_numeric()) {
auto racks_in_dc = racks_per_dc.at(rack.dc).size();
return double(tablet_count) * rf->count() / shard_count / racks_in_dc;
}
if (std::ranges::contains(rf->get_rack_list(), rack.rack)) {
return double(tablet_count) / shard_count;
}
return 0;
};
auto cur_tablets_per_shard = get_avg_tablets_per_shard(table_plan.current_tablet_count);
cur_avg_tablets_per_shard += cur_tablets_per_shard;
lblogger.debug("cur_avg_tablets_per_shard [dc={}, table={}]: {:.3f}", dc, table, cur_tablets_per_shard);
lblogger.debug("cur_avg_tablets_per_shard [dc={}, rack={}, table={}]: {:.3f}", rack.dc, rack.rack, table, cur_tablets_per_shard);
auto new_tablets_per_shard = get_avg_tablets_per_shard(table_plan.target_tablet_count);
new_avg_tablets_per_shard += new_tablets_per_shard;
lblogger.debug("new_avg_tablets_per_shard [dc={}, table={}]: {:.3f}", dc, table, new_tablets_per_shard);
lblogger.debug("new_avg_tablets_per_shard [dc={}, rack={}, table={}]: {:.3f}", rack.dc, rack.rack, table, new_tablets_per_shard);
}
{
bool overloaded = cur_avg_tablets_per_shard > _tablets_per_shard_goal;
lblogger.debug("cur_avg_tablets_per_shard[dc={}]: {:.3f}{}", dc, cur_avg_tablets_per_shard,
lblogger.debug("cur_avg_tablets_per_shard[dc={},rack={}]: {:.3f}{}", rack.dc, rack.rack, cur_avg_tablets_per_shard,
overloaded ? " (overloaded!)" : "");
}
bool overloaded = new_avg_tablets_per_shard > _tablets_per_shard_goal;
lblogger.debug("new_avg_tablets_per_shard[dc={}]: {:.3f}{}", dc, new_avg_tablets_per_shard,
lblogger.debug("new_avg_tablets_per_shard[dc={},rack={}]: {:.3f}{}", rack.dc, rack.rack, new_avg_tablets_per_shard,
overloaded ? " (overloaded!)" : "");
if (overloaded) {
auto scale = _tablets_per_shard_goal / new_avg_tablets_per_shard;
auto scale = scale_info{_tablets_per_shard_goal / new_avg_tablets_per_shard, rack};
for (auto&& [table, table_plan]: plan.tables) {
auto* rs = rs_by_table[table];
auto rf = rs->get_replication_factor(dc);
auto rf = rs->get_replication_factor_data(rack.dc);
// If table has no replicas in this DC, scaling it won't help and is harmful to its distribution
// in other DCs.
if (rf) {
// If table has no replicas in this rack, scaling it won't help and is harmful to its distribution
// in other DCs or racks.
if (rf && (rf->is_numeric() || std::ranges::contains(rf->get_rack_list(), rack.rack))) {
auto [i, inserted] = table_scaling.try_emplace(table, scale);
if (!inserted) {
i->second = std::min(i->second, scale);
if (scale.factor < i->second.factor) {
i->second = std::move(scale);
}
}
}
}
@@ -1459,10 +1502,12 @@ public:
for (auto&& [table, scale] : table_scaling) {
auto& table_plan = plan.tables[table];
auto new_count = std::max<size_t>(1, table_plan.target_tablet_count * scale);
lblogger.debug("Scaling down table {} by a factor of {:.3f}: {} => {}", table, scale, table_plan.target_tablet_count, new_count);
auto new_count = std::max<size_t>(1, table_plan.target_tablet_count * scale.factor);
lblogger.debug("Scaling down table {} by a factor of {:.3f} due to {}.{}: {} => {}", table, scale.factor,
scale.source.dc, scale.source.rack, table_plan.target_tablet_count, new_count);
table_plan.target_tablet_count = new_count;
table_plan.target_tablet_count_reason = format("{} scaled by {:.3f}", table_plan.target_tablet_count_reason, scale);
table_plan.target_tablet_count_reason = format("{} scaled by {:.3f} due to {}.{}", table_plan.target_tablet_count_reason,
scale.factor, scale.source.dc, scale.source.rack);
}
// Generate:

View File

@@ -2102,6 +2102,112 @@ SEASTAR_THREAD_TEST_CASE(test_replica_allocation_with_rack_list_rf) {
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_per_shard_count_respected_with_rack_list) {
cql_test_config cfg{};
cfg.db_config->tablets_initial_scale_factor.set(10);
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
std::set<host_id> bad_nodes; // No replicas should be allocated there
auto rack1 = topo.rack();
auto rack2 = topo.start_new_rack();
auto rack3 = topo.start_new_rack();
auto dc = topo.dc();
auto host1 = topo.add_node(node_state::normal, 1, rack1);
topo.add_node(node_state::normal, 1, rack2);
topo.add_node(node_state::normal, 1, rack3);
topo.add_node(node_state::normal, 1, rack3);
auto ks_name = add_keyspace_racks(e, {{dc, {rack1.rack}}});
auto table = add_table(e, ks_name).get();
rebalance_tablets(e);
auto& stm = e.shared_token_metadata().local();
auto tmptr = stm.get();
auto& tm_topo = tmptr->get_topology();
// Check that we respect the 10 tablets/shard goal when using a subset of racks.
{
load_sketch load(tmptr);
load.populate_dc(dc).get();
auto l = load.get_shard_minmax(host1);
BOOST_REQUIRE_EQUAL(l.min(), 16);
BOOST_REQUIRE_EQUAL(l.max(), 16);
}
check_rack_list(tm_topo, tmptr->tablets().get_tablet_map(table), dc, rack_list{rack1.rack}, bad_nodes);
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_per_shard_goal_shrinks_respecting_rack_allocation) {
cql_test_config cfg{};
cfg.db_config->tablets_per_shard_goal.set(10);
cfg.db_config->tablets_initial_scale_factor.set(8);
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
std::set<host_id> bad_nodes; // No replicas should be allocated there
auto rack1 = topo.rack();
auto rack2 = topo.start_new_rack();
auto rack3 = topo.start_new_rack();
auto dc = topo.dc();
auto host1 = topo.add_node(node_state::normal, 1, rack1);
auto host2 = topo.add_node(node_state::normal, 1, rack2);
auto host3 = topo.add_node(node_state::normal, 1, rack3);
auto& stats = topo.get_shared_load_stats();
auto ks1 = add_keyspace_racks(e, {{dc, {rack1.rack}}});
// We start with 8 tablets per table. per_shard_goal / 5 = 2
// Should shrink to 2 tablets per table.
auto t1_1 = add_table(e, ks1).get();
auto t1_2 = add_table(e, ks1).get();
auto t1_3 = add_table(e, ks1).get();
auto t1_4 = add_table(e, ks1).get();
auto t1_5 = add_table(e, ks1).get();
// This table doesn't violate the per shard goal in this rack, should not be shrunk.
auto ks2 = add_keyspace_racks(e, {{dc, {rack2.rack}}});
auto t2_1 = add_table(e, ks2).get();
// Those tables violate the goal, but due to rounding up, the count won't change.
auto ks3 = add_keyspace_racks(e, {{dc, {rack3.rack}}});
auto t3_1 = add_table(e, ks3).get();
auto t3_2 = add_table(e, ks3).get();
stats.set_size(t1_1, 0);
stats.set_size(t1_2, 0);
stats.set_size(t1_3, 0);
stats.set_size(t1_4, 0);
stats.set_size(t1_5, 0);
stats.set_size(t2_1, 0);
stats.set_size(t3_1, 0);
stats.set_size(t3_2, 0);
rebalance_tablets(e, &stats);
auto& stm = e.shared_token_metadata().local();
auto tmptr = stm.get();
auto& tm_topo = tmptr->get_topology();
auto& tmeta = stm.get()->tablets();
BOOST_REQUIRE_EQUAL(2, tmeta.get_tablet_map(t1_1).tablet_count());
BOOST_REQUIRE_EQUAL(2, tmeta.get_tablet_map(t1_2).tablet_count());
BOOST_REQUIRE_EQUAL(2, tmeta.get_tablet_map(t1_3).tablet_count());
BOOST_REQUIRE_EQUAL(2, tmeta.get_tablet_map(t1_4).tablet_count());
BOOST_REQUIRE_EQUAL(2, tmeta.get_tablet_map(t1_5).tablet_count());
BOOST_REQUIRE_EQUAL(8, tmeta.get_tablet_map(t2_1).tablet_count());
BOOST_REQUIRE_EQUAL(8, tmeta.get_tablet_map(t3_1).tablet_count());
BOOST_REQUIRE_EQUAL(8, tmeta.get_tablet_map(t3_2).tablet_count());
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_merge_does_not_overload_racks) {
cql_test_config cfg{};
// This test relies on the fact that we use an RF strictly smaller than the number of racks.