scylladb/test/boost/tablets_test.cc
commit 1256a9faa7 (Tomasz Grabiec): tablets: Fix deadlock in background storage group merge fiber
When it deadlocks, groups stop merging and the compaction group merge
backlog will run away.

Also, graceful shutdown will be blocked on it.

Found by the flaky unit test test_merge_chooses_best_replica_with_odd_count,
which timed out in 1 in 100 runs.

Reason for deadlock:

When storage groups are merged, the main compaction group of the new
storage group takes a compaction lock, which is appended to
_compaction_reenablers_for_merging, and released when the merge
completion fiber is done with the whole batch.

If we accumulate more than one merge cycle for the fiber, a deadlock
occurs. The lock order will be as follows:

Initial state:

 cg0: main
 cg1: main
 cg2: main
 cg3: main

After 1st merge:

 cg0': main [locked], merging_groups=[cg0.main, cg1.main]
 cg1': main [locked], merging_groups=[cg2.main, cg3.main]

After 2nd merge:

 cg0'': main [locked], merging_groups=[cg0'.main [locked], cg0.main, cg1.main, cg1'.main [locked], cg2.main, cg3.main]

The merge completion fiber will try to stop cg0'.main, which will block on
the compaction lock held by the reenabler in
_compaction_reenablers_for_merging, hence the deadlock.
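
For illustration, the cycle can be reproduced with plain std primitives
standing in for the compaction lock and the reenabler (hypothetical names,
not the real API); the program below deadlocks by construction:

  #include <future>
  #include <mutex>
  #include <vector>

  struct group {
      std::mutex compaction_lock;   // stands in for the real compaction lock
      void stop() { std::lock_guard lk(compaction_lock); /* wait for compaction to stop */ }
  };

  int main() {
      group cg0_prime;              // main group produced by the 1st merge
      std::vector<std::unique_lock<std::mutex>> reenablers;
      reenablers.emplace_back(cg0_prime.compaction_lock); // held until the batch is done

      // 2nd merge cycle: the completion fiber's batch now contains cg0_prime,
      // so stop() blocks on the lock held by the reenabler above, which is
      // only released after the whole batch is processed.
      auto fiber = std::async(std::launch::async, [&] { cg0_prime.stop(); });
      fiber.wait();                 // never returns: this is the deadlock
      reenablers.clear();           // would release the lock; never reached
  }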

The fix is to wait for the background merge to finish before we start the
next merge. This is achieved by holding the old erm in the background merge
and issuing a topology barrier from the merge-finalizing transition.

Background merge is supposed to be a relatively quick operation: it stops
compaction groups, so it may wait for active requests, but it shouldn't
prolong the barrier indefinitely.
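
A rough sketch of the idea, with std::shared_ptr standing in for the erm
reference and a polling loop standing in for the topology barrier
(illustrative only, not the real code):

  #include <chrono>
  #include <memory>
  #include <thread>

  struct effective_replication_map {};   // stand-in for the real erm type

  int main() {
      auto old_erm = std::make_shared<effective_replication_map>();
      std::weak_ptr<effective_replication_map> tracker = old_erm;

      // The background merge fiber keeps the old erm alive until it has
      // stopped the merged compaction groups.
      std::thread background_merge([erm = std::move(old_erm)]() mutable {
          std::this_thread::sleep_for(std::chrono::milliseconds(100)); // "stop groups"
          erm.reset();   // background merge done: drop the old erm reference
      });

      // Merge finalization: the topology barrier amounts to waiting until the
      // old erm is no longer referenced, i.e. the background merge finished.
      // Only then may the next merge start, so batches cannot pile up.
      while (!tracker.expired()) {
          std::this_thread::sleep_for(std::chrono::milliseconds(1));
      }
      background_merge.join();
  }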

Tablet boost unit tests which trigger merge need to be adjusted to call the
barrier; otherwise they would be vulnerable to the deadlock.

Two cluster tests were removed because they assumed that merge happens
in the background. Now that it happens as part of merge finalization
and blocks the topology state machine, those tests deadlock because they
are unable to make topology changes (node bootstrap) while the background
merge is blocked.

The test "test_tablets_merge_waits_for_lwt" needed to be adjusted. It
assumed that merge finalization doesn't wait for the erm held by the
LWT operation, and triggered tablet movement afterwards, and assumed
that this migration will issue a barrier which will block on the LWT
operation. After this commit, it's the barrier in merge finalization
which is blocked. The test was adjusted to use an earlier log mark
when waiting for "Got raft_topology_cmd::barrier_and_drain", which
will catch the barrier in merge finalization.

Fixes SCYLLADB-928
2026-03-12 22:45:01 +01:00


/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "utils/UUID.hh"
#include <boost/test/tools/old/interface.hpp>
#include <seastar/core/shard_id.hh>
#include <seastar/coroutine/as_future.hh>
#include <source_location>
#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
#include "test/lib/random_utils.hh"
#include "service/topology_mutation.hh"
#include "service/storage_service.hh"
#include <fmt/ranges.h>
#include <seastar/testing/thread_test_case.hh>
#include <seastar/testing/on_internal_error.hh>
#include "test/lib/cql_test_env.hh"
#include "test/lib/log.hh"
#include "test/lib/simple_schema.hh"
#include "test/lib/key_utils.hh"
#include "test/lib/test_utils.hh"
#include "test/lib/topology_builder.hh"
#include "db/config.hh"
#include "cql3/util.hh"
#include "db/schema_tables.hh"
#include "schema/schema_builder.hh"
#include "replica/tablets.hh"
#include "replica/tablet_mutation_builder.hh"
#include "locator/tablets.hh"
#include "service/tablet_allocator.hh"
#include "locator/tablet_replication_strategy.hh"
#include "locator/tablet_sharder.hh"
#include "locator/load_sketch.hh"
#include "locator/snitch_base.hh"
#include "utils/UUID_gen.hh"
#include "utils/error_injection.hh"
#include "utils/to_string.hh"
#include "service/topology_coordinator.hh"
#include "service/topology_state_machine.hh"
#include "service/migration_manager.hh"
#include <boost/regex.hpp>
#include <atomic>
BOOST_AUTO_TEST_SUITE(tablets_test)
using namespace locator;
using namespace replica;
using namespace service;
static inline
future<mutation> tablet_map_to_mutation(const tablet_map& tablets, table_id id, const sstring& keyspace_name, const sstring& table_name,
api::timestamp_type ts, const gms::feature_service& features) {
std::optional<mutation> ret;
co_await tablet_map_to_mutations(tablets, id, keyspace_name, table_name, ts, features, [&] (mutation m) {
SCYLLA_ASSERT(!ret.has_value());
ret = std::move(m);
return make_ready_future();
});
SCYLLA_ASSERT(ret.has_value());
co_return std::move(*ret);
}
static api::timestamp_type current_timestamp(cql_test_env& e) {
// Mutations in system.tablets got there via group0, so in order for new
// mutations to take effect, their timestamp should be "later" than that
return utils::UUID_gen::micros_timestamp(e.get_system_keyspace().local().get_last_group0_state_id().get()) + 1;
}
static
void verify_tablet_metadata_persistence(cql_test_env& env, const tablet_metadata& tm, api::timestamp_type& ts) {
save_tablet_metadata(env.local_db(), tm, ts++).get();
auto tm2 = read_tablet_metadata(env.local_qp()).get();
BOOST_REQUIRE_EQUAL(tm, tm2);
}
static
void verify_tablet_metadata_update(cql_test_env& env, tablet_metadata& tm, utils::chunked_vector<mutation> muts) {
testlog.trace("verify_tablet_metadata_update(): {}", muts);
auto& db = env.local_db();
db.apply(freeze(muts), db::no_timeout).get();
locator::tablet_metadata_change_hint hint;
for (const auto& mut : muts) {
update_tablet_metadata_change_hint(hint, mut);
}
update_tablet_metadata(db, env.local_qp(), tm, hint).get();
auto tm_reload = read_tablet_metadata(env.local_qp()).get();
BOOST_REQUIRE_EQUAL(tm, tm_reload);
}
static
cql_test_config tablet_cql_test_config(db::tablets_mode_t::mode enable_tablets = db::tablets_mode_t::mode::enabled) {
cql_test_config c;
c.db_config->tablets_mode_for_new_keyspaces(enable_tablets);
if (c.db_config->enable_tablets_by_default()) {
c.initial_tablets = 2;
}
return c;
}
static
future<table_id> add_table(cql_test_env& e, sstring test_ks_name = "", std::map<sstring, sstring> tablet_options = {}) {
auto id = table_id(utils::UUID_gen::get_time_UUID());
co_await e.create_table([&] (std::string_view ks_name) {
if (!test_ks_name.empty()) {
ks_name = test_ks_name;
}
auto builder = schema_builder(ks_name, id.to_sstring(), id)
.with_column("p1", utf8_type, column_kind::partition_key)
.with_column("r1", int32_type);
if (!tablet_options.empty()) {
builder.set_tablet_options(std::move(tablet_options));
}
return *builder.build();
});
co_return id;
}
// Run in a seastar thread
static
sstring do_add_keyspace(cql_test_env& e, std::unordered_map<sstring, std::variant<int, std::vector<sstring>>> dc_rf, int initial_tablets = 0) {
static std::atomic<int> ks_id = 0;
auto ks_name = fmt::format("keyspace{}", ks_id.fetch_add(1));
sstring rf_options;
for (auto& [dc, rf] : dc_rf) {
auto rf_fmt = std::visit(overloaded_functor(
[] (int rf) { return fmt::format("{}", rf); },
[] (const std::vector<sstring>& racks) {
return fmt::format("[{}]", fmt::join(racks | std::views::transform(&cql3::util::single_quote), ", "));
}), rf);
rf_options += fmt::format(", '{}': {}", dc, rf_fmt);
}
testlog.info("Adding keyspace {} with replication factor options: {}", ks_name, rf_options);
e.execute_cql(fmt::format("create keyspace {} with replication = {{'class': 'NetworkTopologyStrategy'{}}}"
" and tablets = {{'enabled': true, 'initial': {}}}",
ks_name, rf_options, initial_tablets)).get();
return ks_name;
}
// Run in a seastar thread
static
sstring add_keyspace(cql_test_env& e, std::unordered_map<sstring, int> dc_rf, int initial_tablets = 0) {
std::unordered_map<sstring, std::variant<int, std::vector<sstring>>> dc_rf_expanded;
for (auto& [dc, rf] : dc_rf) {
dc_rf_expanded[dc] = rf;
}
return do_add_keyspace(e, std::move(dc_rf_expanded), initial_tablets);
}
// Run in a seastar thread
static
sstring add_keyspace_racks(cql_test_env& e, std::unordered_map<sstring, std::vector<sstring>> dc_rf, int initial_tablets = 0) {
std::unordered_map<sstring, std::variant<int, std::vector<sstring>>> dc_rf_expanded;
for (auto& [dc, rf] : dc_rf) {
dc_rf_expanded[dc] = rf;
}
return do_add_keyspace(e, std::move(dc_rf_expanded), initial_tablets);
}
// Run in a seastar thread
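// Applies `mutator` to the tablet metadata held in the shared token metadata
// and persists the result with the guard's write timestamp.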
void mutate_tablets(cql_test_env& e, const group0_guard& guard, seastar::noncopyable_function<future<>(tablet_metadata&)> mutator) {
auto& stm = e.shared_token_metadata().local();
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
return mutator(tm.tablets());
}).get();
save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get();
}
// Run in a seastar thread
void mutate_tablets(cql_test_env& e, seastar::noncopyable_function<future<>(tablet_metadata&)> mutator) {
abort_source as;
auto guard = e.get_raft_group0_client().start_operation(as).get();
mutate_tablets(e, guard, std::move(mutator));
}
SEASTAR_TEST_CASE(test_tablet_metadata_persistence) {
return do_with_cql_env_thread([] (cql_test_env& e) {
auto h1 = host_id(utils::UUID_gen::get_time_UUID());
auto h2 = host_id(utils::UUID_gen::get_time_UUID());
auto h3 = host_id(utils::UUID_gen::get_time_UUID());
auto table1 = add_table(e).get();
auto table2 = add_table(e).get();
auto ts = current_timestamp(e);
{
tablet_metadata tm = read_tablet_metadata(e.local_qp()).get();
// Add table1
{
tablet_map tmap(1);
tmap.set_tablet(tmap.first_tablet(), tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
tablet_replica {h2, 3},
tablet_replica {h3, 1},
},
db_clock::now(),
locator::tablet_task_info::make_auto_repair_request({}, {"dc1", "dc2"}),
locator::tablet_task_info::make_intranode_migration_request(),
0
});
tm.set_tablet_map(table1, std::move(tmap));
}
verify_tablet_metadata_persistence(e, tm, ts);
// Add table2
{
tablet_map tmap(4);
auto tb = tmap.first_tablet();
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
},
{},
{},
locator::tablet_task_info::make_migration_request(),
0
});
tb = *tmap.next_tablet(tb);
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h3, 3},
}
});
tb = *tmap.next_tablet(tb);
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h2, 2},
},
{},
{},
locator::tablet_task_info::make_migration_request(),
0
});
tb = *tmap.next_tablet(tb);
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h1, 1},
}
});
tm.set_tablet_map(table2, std::move(tmap));
}
verify_tablet_metadata_persistence(e, tm, ts);
// Increase RF of table2
tm.mutate_tablet_map_async(table2, [&] (tablet_map& tmap) {
auto tb = tmap.first_tablet();
tb = *tmap.next_tablet(tb);
tmap.set_tablet_transition_info(tb, tablet_transition_info{
tablet_transition_stage::allow_write_both_read_old,
tablet_transition_kind::migration,
tablet_replica_set {
tablet_replica {h3, 3},
tablet_replica {h1, 7},
},
tablet_replica {h1, 7}
});
tb = *tmap.next_tablet(tb);
tmap.set_tablet_transition_info(tb, tablet_transition_info{
tablet_transition_stage::use_new,
tablet_transition_kind::migration,
tablet_replica_set {
tablet_replica {h1, 4},
tablet_replica {h2, 2},
},
tablet_replica {h1, 4},
session_id(utils::UUID_gen::get_time_UUID())
});
return make_ready_future();
}).get();
verify_tablet_metadata_persistence(e, tm, ts);
// Reduce tablet count in table2
{
tablet_map tmap(2);
auto tb = tmap.first_tablet();
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
}
});
tb = *tmap.next_tablet(tb);
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h3, 3},
}
});
tm.set_tablet_map(table2, std::move(tmap));
}
verify_tablet_metadata_persistence(e, tm, ts);
// Reduce RF for table1, increasing tablet count
{
tablet_map tmap(2);
auto tb = tmap.first_tablet();
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h3, 7},
}
});
tb = *tmap.next_tablet(tb);
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h1, 3},
}
});
tm.set_tablet_map(table1, std::move(tmap));
}
verify_tablet_metadata_persistence(e, tm, ts);
// Reduce tablet count for table1
{
tablet_map tmap(1);
auto tb = tmap.first_tablet();
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h1, 3},
}
});
tm.set_tablet_map(table1, std::move(tmap));
}
verify_tablet_metadata_persistence(e, tm, ts);
// Change replica of table1
{
tablet_map tmap(1);
auto tb = tmap.first_tablet();
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h3, 7},
}
});
tm.set_tablet_map(table1, std::move(tmap));
}
verify_tablet_metadata_persistence(e, tm, ts);
// Change resize decision of table1
{
tablet_map tmap(1);
locator::resize_decision decision;
decision.way = locator::resize_decision::split{};
decision.sequence_number = 1;
tmap.set_resize_decision(decision);
tmap.set_resize_task_info(locator::tablet_task_info::make_split_request());
tm.set_tablet_map(table1, std::move(tmap));
}
verify_tablet_metadata_persistence(e, tm, ts);
}
}, tablet_cql_test_config());
}
SEASTAR_THREAD_TEST_CASE(test_invalid_colocated_tables) {
auto cfg = tablet_cql_test_config(db::tablets_mode_t::mode::enforced);
do_with_cql_env_thread([] (auto& e) {
auto& sp = e.get_storage_proxy().local();
auto& mm = e.migration_manager().local();
topology_builder topo(e);
topo.add_node(node_state::normal, 1);
auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 1);
auto ksm = e.local_db().find_keyspace(ks_name).metadata();
const auto t = schema_builder("ks", "t")
.with_column("pk", int32_type, column_kind::partition_key)
.build();
const auto t_paxos = schema_builder("ks", "t$paxos")
.with_column("pk", int32_type, column_kind::partition_key)
.build();
const auto t_paxos_paxos = schema_builder("ks", "t$paxos$paxos")
.with_column("pk", int32_type, column_kind::partition_key)
.build();
// check we can't colocate a new table with another new colocated table
// in one prepare_new_column_families_announcement call
{
utils::chunked_vector<mutation> muts;
seastar::testing::scoped_no_abort_on_internal_error abort_guard{};
BOOST_CHECK_EXCEPTION(service::prepare_new_column_families_announcement(muts,
sp, *ksm, {t, t_paxos, t_paxos_paxos},
utils::UUID_gen::get_time_UUID().timestamp())
.get(),
std::runtime_error,
[&](const std::runtime_error& e) {
const auto expected_message = ::format("Trying to set co-located table {} with base table {} but it's not a base table.",
t_paxos_paxos->id(), t_paxos->id());
return sstring(e.what()).starts_with(expected_message);
});
}
// create a colocated table
{
auto g = mm.start_group0_operation().get();
utils::chunked_vector<mutation> muts;
service::prepare_new_column_families_announcement(muts,
e.get_storage_proxy().local(),
*ksm,
{t, t_paxos},
g.write_timestamp()).get();
mm.announce(std::move(muts), std::move(g), "create test tables").get();
}
// check the same with an already existing colocated table
{
seastar::testing::scoped_no_abort_on_internal_error abort_guard{};
BOOST_CHECK_EXCEPTION(service::prepare_new_column_family_announcement(
e.get_storage_proxy().local(),
t_paxos_paxos,
utils::UUID_gen::get_time_UUID().timestamp())
.get(),
std::runtime_error,
[&](const std::runtime_error& e) {
const auto expected_message = ::format("Trying to set co-located table {} with base table {} but it's not a base table.",
t_paxos_paxos->id(), t_paxos->id());
return sstring(e.what()).starts_with(expected_message);
});
}
}, tablet_cql_test_config())
.get();
}
SEASTAR_TEST_CASE(test_paused_rf_change_requests_persistence) {
return do_with_cql_env_thread([] (cql_test_env& e) {
topology_builder topo(e);
auto topology = e.get_system_keyspace().local().load_topology_state({}).get();
// Check scheduled_rf_change_requests.
std::unordered_set<utils::UUID> current_requests;
auto new_id1 = utils::make_random_uuid();
topo.pause_rf_change_request(new_id1);
current_requests.insert(new_id1);
auto new_id2 = utils::make_random_uuid();
topo.pause_rf_change_request(new_id2);
current_requests.insert(new_id2);
topology = e.get_system_keyspace().local().load_topology_state({}).get();
BOOST_REQUIRE_EQUAL(current_requests.size(), topology.paused_rf_change_requests.size());
for (const auto& request : current_requests) {
BOOST_REQUIRE(topology.paused_rf_change_requests.contains(request));
}
topo.resume_rf_change_request(current_requests, new_id1);
current_requests.erase(new_id1);
topology = e.get_system_keyspace().local().load_topology_state({}).get();
BOOST_REQUIRE_EQUAL(current_requests.size(), topology.paused_rf_change_requests.size());
for (const auto& request : current_requests) {
BOOST_REQUIRE(topology.paused_rf_change_requests.contains(request));
}
}, tablet_cql_test_config());
}
SEASTAR_TEST_CASE(test_tablet_metadata_persistence_with_colocated_tables) {
return do_with_cql_env_thread([] (cql_test_env& e) {
auto h1 = host_id(utils::UUID_gen::get_time_UUID());
auto h2 = host_id(utils::UUID_gen::get_time_UUID());
auto h3 = host_id(utils::UUID_gen::get_time_UUID());
auto table1 = add_table(e).get();
auto table2 = add_table(e).get();
auto ts = current_timestamp(e);
{
tablet_metadata tm = read_tablet_metadata(e.local_qp()).get();
// Add table1
{
tablet_map tmap(1);
tmap.set_tablet(tmap.first_tablet(), tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
tablet_replica {h2, 3},
tablet_replica {h3, 1},
},
db_clock::now(),
locator::tablet_task_info::make_auto_repair_request({}, {"dc1", "dc2"}),
locator::tablet_task_info::make_intranode_migration_request(),
0
});
tm.set_tablet_map(table1, std::move(tmap));
}
// Add table2 as a co-located table of table1
tm.set_colocated_table(table2, table1).get();
const auto& tmap1 = tm.get_tablet_map(table1);
const auto& tmap2 = tm.get_tablet_map(table2);
BOOST_REQUIRE_EQUAL(tmap1, tmap2);
verify_tablet_metadata_persistence(e, tm, ts);
}
}, tablet_cql_test_config());
}
SEASTAR_TEST_CASE(test_read_required_hosts) {
return do_with_cql_env_thread([] (cql_test_env& e) {
auto h1 = host_id(utils::UUID_gen::get_time_UUID());
auto h2 = host_id(utils::UUID_gen::get_time_UUID());
auto h3 = host_id(utils::UUID_gen::get_time_UUID());
tablet_metadata tm = read_tablet_metadata(e.local_qp()).get();
auto ts = current_timestamp(e);
verify_tablet_metadata_persistence(e, tm, ts);
BOOST_REQUIRE_EQUAL(std::unordered_set<locator::host_id>({}),
read_required_hosts(e.local_qp()).get());
// Add table1
auto table1 = add_table(e).get();
{
tablet_map tmap(1);
tmap.set_tablet(tmap.first_tablet(), tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
tablet_replica {h2, 3},
}
});
tm.set_tablet_map(table1, std::move(tmap));
}
ts = current_timestamp(e);
verify_tablet_metadata_persistence(e, tm, ts);
BOOST_REQUIRE_EQUAL(std::unordered_set<locator::host_id>({h1, h2}),
read_required_hosts(e.local_qp()).get());
// Add table2
auto table2 = add_table(e).get();
{
tablet_map tmap(2);
auto tb = tmap.first_tablet();
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
}
});
tb = *tmap.next_tablet(tb);
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h2, 0},
}
});
tmap.set_tablet_transition_info(tb, tablet_transition_info{
tablet_transition_stage::allow_write_both_read_old,
tablet_transition_kind::migration,
tablet_replica_set {
tablet_replica {h3, 0},
},
tablet_replica {h3, 0}
});
tm.set_tablet_map(table2, std::move(tmap));
}
ts = current_timestamp(e);
verify_tablet_metadata_persistence(e, tm, ts);
BOOST_REQUIRE_EQUAL(std::unordered_set<locator::host_id>({h1, h2, h3}),
read_required_hosts(e.local_qp()).get());
}, tablet_cql_test_config());
}
// Check that updating tablet-metadata and reloading only modified parts from
// disk yields the correct metadata.
SEASTAR_TEST_CASE(test_tablet_metadata_update) {
return do_with_cql_env_thread([] (cql_test_env& e) {
auto h1 = host_id(utils::UUID_gen::get_time_UUID());
auto h2 = host_id(utils::UUID_gen::get_time_UUID());
auto h3 = host_id(utils::UUID_gen::get_time_UUID());
auto& db = e.local_db();
auto table1 = add_table(e).get();
auto table1_schema = db.find_schema(table1);
auto table2 = add_table(e).get();
auto table2_schema = db.find_schema(table2);
testlog.trace("table1: {}", table1);
testlog.trace("table2: {}", table2);
tablet_metadata tm = read_tablet_metadata(e.local_qp()).get();
auto ts = current_timestamp(e);
// Add table1
{
testlog.trace("add table1");
tablet_map tmap(1);
tmap.set_tablet(tmap.first_tablet(), tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
tablet_replica {h2, 3},
tablet_replica {h3, 1},
}
});
verify_tablet_metadata_update(e, tm, {
tablet_map_to_mutation(tmap, table1, table1_schema->ks_name(), table1_schema->cf_name(), ++ts, db.features()).get(),
});
}
// Add table2
{
testlog.trace("add table2");
tablet_map tmap(4);
auto tb = tmap.first_tablet();
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
}
});
tb = *tmap.next_tablet(tb);
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h3, 3},
}
});
tb = *tmap.next_tablet(tb);
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h2, 2},
}
});
tb = *tmap.next_tablet(tb);
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h1, 1},
}
});
verify_tablet_metadata_update(e, tm, {
tablet_map_to_mutation(tmap, table2, table2_schema->ks_name(), table2_schema->cf_name(), ++ts, db.features()).get(),
});
}
// Increase RF of table2
{
testlog.trace("increates RF of table2");
const auto& tmap = tm.get_tablet_map(table2);
auto tb = tmap.first_tablet();
replica::tablet_mutation_builder builder(ts++, table2);
tb = *tmap.next_tablet(tb);
builder.set_new_replicas(tmap.get_last_token(tb),
tablet_replica_set {
tablet_replica {h1, 7},
}
);
builder.set_stage(tmap.get_last_token(tb), tablet_transition_stage::allow_write_both_read_old);
builder.set_transition(tmap.get_last_token(tb), tablet_transition_kind::migration);
tb = *tmap.next_tablet(tb);
builder.set_new_replicas(tmap.get_last_token(tb),
tablet_replica_set {
tablet_replica {h1, 4},
}
);
builder.set_stage(tmap.get_last_token(tb), tablet_transition_stage::use_new);
builder.set_transition(tmap.get_last_token(tb), tablet_transition_kind::migration);
verify_tablet_metadata_update(e, tm, {
builder.build(),
});
}
// Reduce RF for table1, increasing tablet count
{
testlog.trace("reduce RF for table1, increasing tablet count");
tablet_map tmap(2);
auto tb = tmap.first_tablet();
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h3, 7},
}
});
tb = *tmap.next_tablet(tb);
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h1, 3},
}
});
verify_tablet_metadata_update(e, tm, {
tablet_map_to_mutation(tmap, table1, table1_schema->ks_name(), table1_schema->cf_name(), ++ts, db.features()).get(),
});
}
// Reduce tablet count for table1
{
testlog.trace("reduce tablet count for table1");
tablet_map tmap(1);
auto tb = tmap.first_tablet();
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h1, 3},
}
});
verify_tablet_metadata_update(e, tm, {
tablet_map_to_mutation(tmap, table1, table1_schema->ks_name(), table1_schema->cf_name(), ++ts, db.features()).get(),
});
}
// Change replica of table1
{
testlog.trace("change replica of table1");
replica::tablet_mutation_builder builder(ts++, table1);
const auto& tmap = tm.get_tablet_map(table1);
auto tb = tmap.first_tablet();
builder.set_replicas(tmap.get_last_token(tb),
tablet_replica_set {
tablet_replica {h3, 7},
}
);
verify_tablet_metadata_update(e, tm, {
builder.build(),
});
}
// Migrate all tablets of table2
{
testlog.trace("stream all tablets of table2");
const auto& tmap = tm.get_tablet_map(table2);
utils::chunked_vector<mutation> muts;
for (std::optional<tablet_id> tb = tmap.first_tablet(); tb; tb = tmap.next_tablet(*tb)) {
replica::tablet_mutation_builder builder(ts++, table2);
const auto token = tmap.get_last_token(*tb);
builder.set_new_replicas(token,
tablet_replica_set {
tablet_replica {h2, 7},
}
);
builder.set_stage(token, tablet_transition_stage::streaming);
builder.set_transition(token, tablet_transition_kind::rebuild);
muts.emplace_back(builder.build());
}
verify_tablet_metadata_update(e, tm, std::move(muts));
}
// Remove transitions from tablets of table2
{
testlog.trace("stream all tablets of table2");
const auto& tmap = tm.get_tablet_map(table2);
utils::chunked_vector<mutation> muts;
for (std::optional<tablet_id> tb = tmap.first_tablet(); tb; tb = tmap.next_tablet(*tb)) {
replica::tablet_mutation_builder builder(ts++, table2);
const auto token = tmap.get_last_token(*tb);
builder.set_replicas(token,
tablet_replica_set {
tablet_replica {h2, 7},
}
);
builder.del_transition(token);
muts.emplace_back(builder.build());
}
verify_tablet_metadata_update(e, tm, std::move(muts));
}
// Drop table2
{
testlog.trace("drop table2");
verify_tablet_metadata_update(e, tm, {
make_drop_tablet_map_mutation(table2, ts++)
});
}
}, tablet_cql_test_config());
}
SEASTAR_TEST_CASE(test_tablet_metadata_hint) {
return do_with_cql_env_thread([] (cql_test_env& e) {
auto h2 = host_id(utils::UUID_gen::get_time_UUID());
auto h3 = host_id(utils::UUID_gen::get_time_UUID());
auto table1 = add_table(e).get();
auto table2 = add_table(e).get();
testlog.trace("table1: {}", table1);
testlog.trace("table2: {}", table2);
tablet_metadata tm = read_tablet_metadata(e.local_qp()).get();
auto ts = current_timestamp(e);
auto check_hint = [&] (locator::tablet_metadata_change_hint& incremental_hint, utils::chunked_vector<canonical_mutation>& muts, mutation new_mut,
const locator::tablet_metadata_change_hint& expected_hint, std::source_location sl = std::source_location::current()) {
testlog.info("check_hint() called from {}:{}", sl.file_name(), sl.line());
replica::update_tablet_metadata_change_hint(incremental_hint, new_mut);
muts.emplace_back(new_mut);
auto full_hint_opt = replica::get_tablet_metadata_change_hint(muts);
if (expected_hint) {
BOOST_REQUIRE(full_hint_opt);
BOOST_REQUIRE_EQUAL(*full_hint_opt, incremental_hint);
} else {
BOOST_REQUIRE(!full_hint_opt);
}
BOOST_REQUIRE_EQUAL(incremental_hint, expected_hint);
};
auto make_hint = [&] (std::initializer_list<std::pair<table_id, std::vector<token>>> tablets) {
locator::tablet_metadata_change_hint hint;
for (const auto& [tid, tokens] : tablets) {
hint.tables.emplace(tid, locator::tablet_metadata_change_hint::table_hint{.table_id = tid, .tokens = tokens});
}
return hint;
};
// Unrelated mutation generates no hint
{
utils::chunked_vector<canonical_mutation> muts;
locator::tablet_metadata_change_hint hint;
simple_schema s;
auto mut = s.new_mutation("pk1");
s.add_row(mut, s.make_ckey(1), "v");
check_hint(hint, muts, std::move(mut), {});
}
// Incremental update of hint
{
utils::chunked_vector<canonical_mutation> muts;
locator::tablet_metadata_change_hint hint;
const auto& tmap = tm.get_tablet_map(table1);
std::vector<token> tokens;
for (std::optional<tablet_id> tid = tmap.first_tablet(); tid; tid = tmap.next_tablet(*tid)) {
const auto token = tmap.get_last_token(*tid);
tokens.push_back(token);
replica::tablet_mutation_builder builder(ts++, table1);
builder.set_replicas(token,
tablet_replica_set {
tablet_replica {h2, 7},
}
);
check_hint(hint, muts, builder.build(), make_hint({{table1, tokens}}));
}
}
tm = read_tablet_metadata(e.local_qp()).get();
// Deletions (and static rows) should generate a partition hint.
// Furthermore, if the partition had any row hints before, those should
// be cleared, to force a full partition reload.
auto check_delete_scenario = [&] (const char* scenario, std::function<void(table_id, mutation&, api::timestamp_type)> apply_delete) {
testlog.info("check_delete_scenario({})", scenario);
utils::chunked_vector<canonical_mutation> muts;
locator::tablet_metadata_change_hint hint;
// Check that a deletion generates only a partition hint
{
const auto delete_ts = ts++;
replica::tablet_mutation_builder builder(delete_ts, table1);
auto mut = builder.build();
apply_delete(table1, mut, delete_ts);
check_hint(hint, muts, std::move(mut), make_hint({{table1, {}}}));
}
// First add a row, to check that the deletion will clear the tokens
// vector -- convert the row hints to a partition hint
{
// Add a row which will add a row hint
{
const auto tokens = tm.get_tablet_map(table2).get_sorted_tokens().get();
replica::tablet_mutation_builder builder(ts++, table2);
builder.set_replicas(tokens.front(),
tablet_replica_set {
tablet_replica {h3, 7},
}
);
check_hint(hint, muts, builder.build(), make_hint({{table1, {}}, {table2, {tokens.front()}}}));
}
// Apply the deletion which should clear the row hint, but leave the partition hint
{
const auto delete_ts = ts++;
replica::tablet_mutation_builder builder(delete_ts, table2);
auto mut = builder.build();
apply_delete(table2, mut, delete_ts);
check_hint(hint, muts, std::move(mut), make_hint({{table1, {}}, {table2, {}}}));
}
}
tm = read_tablet_metadata(e.local_qp()).get();
};
// Not a real deletion, but it should act the same way as a delete.
check_delete_scenario("static row", [&e] (table_id tbl, mutation& mut, api::timestamp_type delete_ts) {
auto tbl_s = e.local_db().find_column_family(tbl).schema();
mut.set_static_cell("keyspace_name", data_value(tbl_s->ks_name()), delete_ts);
});
check_delete_scenario("range tombstone", [&tm] (table_id tbl, mutation& mut, api::timestamp_type delete_ts) {
auto s = db::system_keyspace::tablets();
const auto tokens = tm.get_tablet_map(tbl).get_sorted_tokens().get();
BOOST_REQUIRE_GE(tokens.size(), 2);
const auto ck1 = clustering_key::from_single_value(*s, data_value(dht::token::to_int64(tokens[0])).serialize_nonnull());
const auto ck2 = clustering_key::from_single_value(*s, data_value(dht::token::to_int64(tokens[1])).serialize_nonnull());
mut.partition().apply_delete(*s, range_tombstone(ck1, bound_kind::excl_start, ck2, bound_kind::excl_end, tombstone(delete_ts, gc_clock::now())));
});
check_delete_scenario("row tombstone", [&tm] (table_id tbl, mutation& mut, api::timestamp_type delete_ts) {
auto s = db::system_keyspace::tablets();
const auto tokens = tm.get_tablet_map(tbl).get_sorted_tokens().get();
const auto ck = clustering_key::from_single_value(*s, data_value(dht::token::to_int64(tokens[0])).serialize_nonnull());
mut.partition().apply_delete(*s, ck, tombstone(delete_ts, gc_clock::now()));
});
// This will effectively drop both tables
check_delete_scenario("partition tombstone", [] (table_id tbl, mutation& mut, api::timestamp_type delete_ts) {
auto s = db::system_keyspace::tablets();
mut.partition().apply(tombstone(delete_ts, gc_clock::now()));
});
}, tablet_cql_test_config());
}
SEASTAR_TEST_CASE(test_get_shard) {
return do_with_cql_env_thread([] (cql_test_env& e) {
auto h1 = host_id(utils::UUID_gen::get_time_UUID());
auto h2 = host_id(utils::UUID_gen::get_time_UUID());
auto h3 = host_id(utils::UUID_gen::get_time_UUID());
inet_address ip1("192.168.0.1");
inet_address ip2("192.168.0.2");
inet_address ip3("192.168.0.3");
auto table1 = table_id(utils::UUID_gen::get_time_UUID());
const auto shard_count = 2;
semaphore sem(1);
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{
locator::topology::config{
.this_endpoint = ip1,
.this_host_id = h1,
.local_dc_rack = locator::endpoint_dc_rack::default_location
}
});
auto stop_stm = deferred_stop(stm);
tablet_id tid(0);
tablet_id tid1(0);
stm.mutate_token_metadata([&] (token_metadata& tm) {
tm.update_topology(h1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count);
tm.update_topology(h2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count);
tm.update_topology(h3, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count);
tablet_metadata tmeta;
tablet_map tmap(2);
tid = tmap.first_tablet();
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
tablet_replica {h3, 5},
}
});
tid1 = *tmap.next_tablet(tid);
tmap.set_tablet(tid1, tablet_info {
tablet_replica_set {
tablet_replica {h1, 2},
tablet_replica {h3, 1},
}
});
tmap.set_tablet_transition_info(tid, tablet_transition_info {
tablet_transition_stage::allow_write_both_read_old,
tablet_transition_kind::migration,
tablet_replica_set {
tablet_replica {h1, 0},
tablet_replica {h2, 3},
},
tablet_replica {h2, 3}
});
tmeta.set_tablet_map(table1, std::move(tmap));
tm.set_tablets(std::move(tmeta));
return make_ready_future<>();
}).get();
auto&& tmap = stm.get()->tablets().get_tablet_map(table1);
auto get_shard = [&] (tablet_id tid, host_id host) {
tablet_sharder sharder(*stm.get(), table1, host);
return sharder.shard_for_reads(tmap.get_last_token(tid));
};
BOOST_REQUIRE_EQUAL(get_shard(tid1, h1), std::make_optional(shard_id(2)));
BOOST_REQUIRE(!get_shard(tid1, h2));
BOOST_REQUIRE_EQUAL(get_shard(tid1, h3), std::make_optional(shard_id(1)));
BOOST_REQUIRE_EQUAL(get_shard(tid, h1), std::make_optional(shard_id(0)));
BOOST_REQUIRE_EQUAL(get_shard(tid, h2), std::make_optional(shard_id(3)));
BOOST_REQUIRE_EQUAL(get_shard(tid, h3), std::make_optional(shard_id(5)));
}, tablet_cql_test_config());
}
SEASTAR_TEST_CASE(test_mutation_builder) {
return do_with_cql_env_thread([] (cql_test_env& e) {
auto h1 = host_id(utils::UUID_gen::get_time_UUID());
auto h2 = host_id(utils::UUID_gen::get_time_UUID());
auto h3 = host_id(utils::UUID_gen::get_time_UUID());
auto table1 = add_table(e).get();
auto ts = current_timestamp(e);
tablet_metadata tm;
tablet_id tid(0);
tablet_id tid1(0);
{
tablet_map tmap(2);
tid = tmap.first_tablet();
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
tablet_replica {h3, 5},
}
});
tid1 = *tmap.next_tablet(tid);
tmap.set_tablet(tid1, tablet_info {
tablet_replica_set {
tablet_replica {h1, 2},
tablet_replica {h3, 1},
}
});
tm.set_tablet_map(table1, std::move(tmap));
}
save_tablet_metadata(e.local_db(), tm, ts++).get();
{
tablet_mutation_builder b(ts++, table1);
auto last_token = tm.get_tablet_map(table1).get_last_token(tid1);
b.set_new_replicas(last_token, tablet_replica_set {
tablet_replica {h1, 2},
tablet_replica {h2, 3},
});
b.set_stage(last_token, tablet_transition_stage::write_both_read_new);
b.set_transition(last_token, tablet_transition_kind::migration);
e.local_db().apply({freeze(b.build())}, db::no_timeout).get();
}
{
tablet_map expected_tmap(2);
tid = expected_tmap.first_tablet();
expected_tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
tablet_replica {h3, 5},
}
});
tid1 = *expected_tmap.next_tablet(tid);
expected_tmap.set_tablet(tid1, tablet_info {
tablet_replica_set {
tablet_replica {h1, 2},
tablet_replica {h3, 1},
}
});
expected_tmap.set_tablet_transition_info(tid1, tablet_transition_info {
tablet_transition_stage::write_both_read_new,
tablet_transition_kind::migration,
tablet_replica_set {
tablet_replica {h1, 2},
tablet_replica {h2, 3},
},
tablet_replica {h2, 3}
});
auto tm_from_disk = read_tablet_metadata(e.local_qp()).get();
BOOST_REQUIRE_EQUAL(expected_tmap, tm_from_disk.get_tablet_map(table1));
}
{
tablet_mutation_builder b(ts++, table1);
auto last_token = tm.get_tablet_map(table1).get_last_token(tid1);
b.set_stage(last_token, tablet_transition_stage::use_new);
b.set_transition(last_token, tablet_transition_kind::migration);
e.local_db().apply({freeze(b.build())}, db::no_timeout).get();
}
{
tablet_map expected_tmap(2);
tid = expected_tmap.first_tablet();
expected_tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
tablet_replica {h3, 5},
}
});
tid1 = *expected_tmap.next_tablet(tid);
expected_tmap.set_tablet(tid1, tablet_info {
tablet_replica_set {
tablet_replica {h1, 2},
tablet_replica {h3, 1},
}
});
expected_tmap.set_tablet_transition_info(tid1, tablet_transition_info {
tablet_transition_stage::use_new,
tablet_transition_kind::migration,
tablet_replica_set {
tablet_replica {h1, 2},
tablet_replica {h2, 3},
},
tablet_replica {h2, 3}
});
auto tm_from_disk = read_tablet_metadata(e.local_qp()).get();
BOOST_REQUIRE_EQUAL(expected_tmap, tm_from_disk.get_tablet_map(table1));
}
{
tablet_mutation_builder b(ts++, table1);
auto last_token = tm.get_tablet_map(table1).get_last_token(tid1);
b.set_replicas(last_token, tablet_replica_set {
tablet_replica {h1, 2},
tablet_replica {h2, 3},
});
b.del_transition(last_token);
e.local_db().apply({freeze(b.build())}, db::no_timeout).get();
}
{
tablet_map expected_tmap(2);
tid = expected_tmap.first_tablet();
expected_tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
tablet_replica {h3, 5},
}
});
tid1 = *expected_tmap.next_tablet(tid);
expected_tmap.set_tablet(tid1, tablet_info {
tablet_replica_set {
tablet_replica {h1, 2},
tablet_replica {h2, 3},
}
});
auto tm_from_disk = read_tablet_metadata(e.local_qp()).get();
BOOST_REQUIRE_EQUAL(expected_tmap, tm_from_disk.get_tablet_map(table1));
}
static const auto resize_decision = locator::resize_decision("split", 1);
{
tablet_mutation_builder b(ts++, table1);
auto last_token = tm.get_tablet_map(table1).get_last_token(tid1);
b.set_replicas(last_token, tablet_replica_set {
tablet_replica {h1, 2},
tablet_replica {h2, 3},
});
b.del_transition(last_token);
b.set_resize_decision(resize_decision, e.local_db().features());
e.local_db().apply({freeze(b.build())}, db::no_timeout).get();
}
{
tablet_map expected_tmap(2);
tid = expected_tmap.first_tablet();
expected_tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
tablet_replica {h3, 5},
}
});
tid1 = *expected_tmap.next_tablet(tid);
expected_tmap.set_tablet(tid1, tablet_info {
tablet_replica_set {
tablet_replica {h1, 2},
tablet_replica {h2, 3},
}
});
expected_tmap.set_resize_decision(resize_decision);
auto tm_from_disk = read_tablet_metadata(e.local_qp()).get();
expected_tmap.set_resize_task_info(tm_from_disk.get_tablet_map(table1).resize_task_info());
BOOST_REQUIRE_EQUAL(expected_tmap, tm_from_disk.get_tablet_map(table1));
}
}, tablet_cql_test_config());
}
SEASTAR_TEST_CASE(test_sharder) {
return do_with_cql_env_thread([] (cql_test_env& e) {
auto h1 = host_id(utils::UUID_gen::get_time_UUID());
auto h2 = host_id(utils::UUID_gen::get_time_UUID());
auto h3 = host_id(utils::UUID_gen::get_time_UUID());
auto table1 = table_id(utils::UUID_gen::get_time_UUID());
token_metadata tokm(e.get_shared_token_metadata().local(), token_metadata::config{ .topo_cfg{ .this_host_id = h1, .local_dc_rack = locator::endpoint_dc_rack::default_location } });
tokm.get_topology().add_or_update_endpoint(h1);
std::vector<tablet_id> tablet_ids;
{
tablet_map tmap(8);
auto tid = tmap.first_tablet();
tablet_ids.push_back(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h1, 3},
tablet_replica {h3, 5},
}
});
tid = *tmap.next_tablet(tid);
tablet_ids.push_back(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h2, 3},
tablet_replica {h3, 1},
}
});
tid = *tmap.next_tablet(tid);
tablet_ids.push_back(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h3, 2},
tablet_replica {h1, 1},
}
});
tmap.set_tablet_transition_info(tid, tablet_transition_info {
tablet_transition_stage::use_new,
tablet_transition_kind::migration,
tablet_replica_set {
tablet_replica {h1, 1},
tablet_replica {h2, 3},
},
tablet_replica {h2, 3}
});
tid = *tmap.next_tablet(tid);
tablet_ids.push_back(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h3, 7},
tablet_replica {h2, 3},
}
});
// tablet_ids[4]
// h1 is leaving, h3 is pending
tid = *tmap.next_tablet(tid);
tablet_ids.push_back(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h1, 5},
tablet_replica {h2, 1},
}
});
tmap.set_tablet_transition_info(tid, tablet_transition_info {
tablet_transition_stage::allow_write_both_read_old,
tablet_transition_kind::migration,
tablet_replica_set {
tablet_replica {h3, 7},
tablet_replica {h2, 1},
},
tablet_replica {h3, 7}
});
// tablet_ids[5]
// h1 is leaving, h3 is pending
tid = *tmap.next_tablet(tid);
tablet_ids.push_back(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h1, 5},
tablet_replica {h2, 1},
}
});
tmap.set_tablet_transition_info(tid, tablet_transition_info {
tablet_transition_stage::write_both_read_old,
tablet_transition_kind::migration,
tablet_replica_set {
tablet_replica {h3, 7},
tablet_replica {h2, 1},
},
tablet_replica {h3, 7}
});
// tablet_ids[6]
// h1 is leaving, h3 is pending
tid = *tmap.next_tablet(tid);
tablet_ids.push_back(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h1, 5},
tablet_replica {h2, 1},
}
});
tmap.set_tablet_transition_info(tid, tablet_transition_info {
tablet_transition_stage::write_both_read_new,
tablet_transition_kind::migration,
tablet_replica_set {
tablet_replica {h3, 7},
tablet_replica {h2, 1},
},
tablet_replica {h3, 7}
});
// tablet_ids[7]
// h1 is leaving, h3 is pending
tid = *tmap.next_tablet(tid);
tablet_ids.push_back(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h1, 5},
tablet_replica {h2, 1},
}
});
tmap.set_tablet_transition_info(tid, tablet_transition_info {
tablet_transition_stage::use_new,
tablet_transition_kind::migration,
tablet_replica_set {
tablet_replica {h3, 7},
tablet_replica {h2, 1},
},
tablet_replica {h3, 7}
});
tablet_metadata tm;
tm.set_tablet_map(table1, std::move(tmap));
tokm.set_tablets(std::move(tm));
}
auto& tm = tokm.tablets().get_tablet_map(table1);
tablet_sharder sharder(tokm, table1); // for h1
tablet_sharder sharder_h3(tokm, table1, h3);
BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[0])), 3);
BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[1])), 0); // missing
BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[2])), 1);
BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[3])), 0); // missing
BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[0])), dht::shard_replica_set{3});
BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[1])), dht::shard_replica_set{});
BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[2])), dht::shard_replica_set{1});
BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[3])), dht::shard_replica_set{});
// Shard for read should be stable across stages of migration. The coordinator may route
// requests to the leaving replica even if the stage on the replica side is use_new.
BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[4])), 5);
BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[5])), 5);
BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[6])), 5);
BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[7])), 5);
BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[4])), dht::shard_replica_set{5});
BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[5])), dht::shard_replica_set{5});
BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[6])), dht::shard_replica_set{5});
BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[7])), dht::shard_replica_set{5});
// On pending host
BOOST_REQUIRE_EQUAL(sharder_h3.shard_for_reads(tm.get_last_token(tablet_ids[4])), 7);
BOOST_REQUIRE_EQUAL(sharder_h3.shard_for_reads(tm.get_last_token(tablet_ids[5])), 7);
BOOST_REQUIRE_EQUAL(sharder_h3.shard_for_reads(tm.get_last_token(tablet_ids[6])), 7);
BOOST_REQUIRE_EQUAL(sharder_h3.shard_for_reads(tm.get_last_token(tablet_ids[7])), 7);
BOOST_REQUIRE_EQUAL(sharder_h3.shard_for_writes(tm.get_last_token(tablet_ids[4])), dht::shard_replica_set{7});
BOOST_REQUIRE_EQUAL(sharder_h3.shard_for_writes(tm.get_last_token(tablet_ids[5])), dht::shard_replica_set{7});
BOOST_REQUIRE_EQUAL(sharder_h3.shard_for_writes(tm.get_last_token(tablet_ids[6])), dht::shard_replica_set{7});
BOOST_REQUIRE_EQUAL(sharder_h3.shard_for_writes(tm.get_last_token(tablet_ids[7])), dht::shard_replica_set{7});
BOOST_REQUIRE_EQUAL(sharder.token_for_next_shard_for_reads(tm.get_last_token(tablet_ids[1]), 0), tm.get_first_token(tablet_ids[3]));
BOOST_REQUIRE_EQUAL(sharder.token_for_next_shard_for_reads(tm.get_last_token(tablet_ids[1]), 1), tm.get_first_token(tablet_ids[2]));
BOOST_REQUIRE_EQUAL(sharder.token_for_next_shard_for_reads(tm.get_last_token(tablet_ids[1]), 3), dht::maximum_token());
BOOST_REQUIRE_EQUAL(sharder.token_for_next_shard_for_reads(tm.get_first_token(tablet_ids[1]), 0), tm.get_first_token(tablet_ids[3]));
BOOST_REQUIRE_EQUAL(sharder.token_for_next_shard_for_reads(tm.get_first_token(tablet_ids[1]), 1), tm.get_first_token(tablet_ids[2]));
BOOST_REQUIRE_EQUAL(sharder.token_for_next_shard_for_reads(tm.get_first_token(tablet_ids[1]), 3), dht::maximum_token());
{
auto shard_opt = sharder.next_shard_for_reads(tm.get_last_token(tablet_ids[0]));
BOOST_REQUIRE(shard_opt);
BOOST_REQUIRE_EQUAL(shard_opt->shard, 0);
BOOST_REQUIRE_EQUAL(shard_opt->token, tm.get_first_token(tablet_ids[1]));
}
{
auto shard_opt = sharder.next_shard_for_reads(tm.get_last_token(tablet_ids[1]));
BOOST_REQUIRE(shard_opt);
BOOST_REQUIRE_EQUAL(shard_opt->shard, 1);
BOOST_REQUIRE_EQUAL(shard_opt->token, tm.get_first_token(tablet_ids[2]));
}
{
auto shard_opt = sharder.next_shard_for_reads(tm.get_last_token(tablet_ids[2]));
BOOST_REQUIRE(shard_opt);
BOOST_REQUIRE_EQUAL(shard_opt->shard, 0);
BOOST_REQUIRE_EQUAL(shard_opt->token, tm.get_first_token(tablet_ids[3]));
}
{
auto shard_opt = sharder.next_shard_for_reads(tm.get_last_token(tablet_ids[tablet_ids.size() - 1]));
BOOST_REQUIRE(!shard_opt);
}
}, tablet_cql_test_config());
}
SEASTAR_TEST_CASE(test_intranode_sharding) {
return do_with_cql_env_thread([] (cql_test_env& e) {
auto h1 = host_id(utils::UUID_gen::get_time_UUID());
auto h2 = host_id(utils::UUID_gen::get_time_UUID());
auto table1 = table_id(utils::UUID_gen::get_time_UUID());
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_host_id = h1;
tm_cfg.topo_cfg.local_dc_rack = endpoint_dc_rack::default_location;
semaphore sem(1);
shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
auto tmptr = stm.make_token_metadata_ptr();
auto& tokm = *tmptr;
tokm.get_topology().add_or_update_endpoint(h1);
auto leaving_replica = tablet_replica{h1, 5};
auto pending_replica = tablet_replica{h1, 7};
auto const_replica = tablet_replica{h2, 1};
// Prepare a tablet map with different tablets being in intra-node migration at different stages.
std::vector<tablet_id> tablet_ids;
{
tablet_map tmap(4);
auto tid = tmap.first_tablet();
auto set_tablet = [&] (tablet_id tid, tablet_transition_stage stage) {
tablet_ids.push_back(tid);
tmap.set_tablet(tid, tablet_info{
tablet_replica_set{leaving_replica, const_replica}
});
tmap.set_tablet_transition_info(tid, tablet_transition_info {
stage,
tablet_transition_kind::intranode_migration,
tablet_replica_set{pending_replica, const_replica},
pending_replica
});
};
// tablet_ids[0]
set_tablet(tid, tablet_transition_stage::allow_write_both_read_old);
// tablet_ids[1]
tid = *tmap.next_tablet(tid);
set_tablet(tid, tablet_transition_stage::write_both_read_old);
// tablet_ids[2]
tid = *tmap.next_tablet(tid);
set_tablet(tid, tablet_transition_stage::write_both_read_new);
// tablet_ids[3]
tid = *tmap.next_tablet(tid);
set_tablet(tid, tablet_transition_stage::use_new);
tablet_metadata tm;
tm.set_tablet_map(table1, std::move(tmap));
tokm.set_tablets(std::move(tm));
}
auto& tm = tokm.tablets().get_tablet_map(table1);
tablet_sharder sharder(tokm, table1); // for h1
BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[0])), 5);
BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[1])), 5);
BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[2])), 7);
BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[3])), 7);
BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[0])), dht::shard_replica_set{5});
BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[1])), dht::shard_replica_set({7, 5}));
BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[2])), dht::shard_replica_set({7, 5}));
BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[3])), dht::shard_replica_set{7});
// On const replica
tablet_sharder sharder_h2(tokm, table1, const_replica.host);
for (auto id : tablet_ids) {
BOOST_REQUIRE_EQUAL(sharder_h2.shard_for_reads(tm.get_last_token(id)), const_replica.shard);
BOOST_REQUIRE_EQUAL(sharder_h2.shard_for_writes(tm.get_last_token(id)), dht::shard_replica_set{const_replica.shard});
}
}, tablet_cql_test_config());
}
SEASTAR_TEST_CASE(test_large_tablet_metadata) {
return do_with_cql_env_thread([] (cql_test_env& e) {
tablet_metadata tm;
auto h1 = host_id(utils::UUID_gen::get_time_UUID());
auto h2 = host_id(utils::UUID_gen::get_time_UUID());
auto h3 = host_id(utils::UUID_gen::get_time_UUID());
const int nr_tables = 100;
const int tablets_per_table = 1024;
for (int i = 0; i < nr_tables; ++i) {
tablet_map tmap(tablets_per_table);
for (tablet_id j : tmap.tablet_ids()) {
tmap.set_tablet(j, tablet_info {
tablet_replica_set {{h1, 0}, {h2, 1}, {h3, 2},}
});
}
auto id = add_table(e).get();
tm.set_tablet_map(id, std::move(tmap));
}
auto ts = current_timestamp(e);
verify_tablet_metadata_persistence(e, tm, ts);
}, tablet_cql_test_config());
}
SEASTAR_THREAD_TEST_CASE(test_token_ownership_splitting) {
const auto real_min_token = dht::token::first();
const auto real_max_token = dht::token::last();
for (auto&& tmap : {
tablet_map(1),
tablet_map(2),
tablet_map(4),
tablet_map(16),
tablet_map(1024),
}) {
testlog.debug("tmap: {}", tmap);
BOOST_REQUIRE_EQUAL(real_min_token, tmap.get_first_token(tmap.first_tablet()));
BOOST_REQUIRE_EQUAL(real_max_token, tmap.get_last_token(tmap.last_tablet()));
std::optional<tablet_id> prev_tb;
for (tablet_id tb : tmap.tablet_ids()) {
testlog.debug("first: {}, last: {}", tmap.get_first_token(tb), tmap.get_last_token(tb));
BOOST_REQUIRE_EQUAL(tb, tmap.get_tablet_id(tmap.get_first_token(tb)));
BOOST_REQUIRE_EQUAL(tb, tmap.get_tablet_id(tmap.get_last_token(tb)));
if (prev_tb) {
BOOST_REQUIRE_EQUAL(dht::next_token(tmap.get_last_token(*prev_tb)), tmap.get_first_token(tb));
}
prev_tb = tb;
}
}
}
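// Reflects the plan's resize decisions in the given token metadata, bumping
// each decision's sequence number.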
static
future<> apply_resize_plan(token_metadata& tm, const migration_plan& plan) {
for (auto [table_id, resize_decision] : plan.resize_plan().resize) {
co_await tm.tablets().mutate_tablet_map_async(table_id, [&] (tablet_map& tmap) {
resize_decision.sequence_number = tmap.resize_decision().sequence_number + 1;
tmap.set_resize_decision(resize_decision);
return make_ready_future();
});
}
}
static
future<group0_guard> save_token_metadata(cql_test_env& e, group0_guard guard) {
auto& stm = e.local_db().get_shared_token_metadata();
auto tm = stm.get();
e.get_topology_state_machine().local()._topology.version = tm->get_version();
co_await save_tablet_metadata(e.local_db(), tm->tablets(), guard.write_timestamp());
utils::chunked_vector<frozen_mutation> muts;
muts.push_back(freeze(topology_mutation_builder(guard.write_timestamp())
.set_version(tm->get_version())
.build().to_mutation(db::system_keyspace::topology())));
co_await e.local_db().apply(muts, db::no_timeout);
co_await e.get_storage_service().local().update_tablet_metadata({});
// Need a new guard to make sure later changes use later timestamp.
// Also, so that the table layer processes the changes we persisted, which is important for splits.
// Before we can finalize a split, the storage group needs to process the split by creating split-ready compaction groups.
release_guard(std::move(guard));
abort_source as;
co_return co_await e.get_raft_group0_client().start_operation(as);
}
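// Applies the plan's finalize_resize step: installs the resized tablet maps
// produced by the allocator, persists the new token metadata, reconciles the
// load stats, and runs a local topology barrier.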
static
future<> handle_resize_finalize(cql_test_env& e, group0_guard& guard, const migration_plan& plan, shared_load_stats* load_stats) {
auto& talloc = e.get_tablet_allocator().local();
auto& stm = e.shared_token_metadata().local();
auto old_tm = stm.get();
bool changed = false;
for (auto table_id : plan.resize_plan().finalize_resize) {
auto tm = stm.get();
const auto& old_tmap = tm->tablets().get_tablet_map(table_id);
auto new_tmap = co_await talloc.resize_tablets(tm, table_id);
auto new_resize_decision = locator::resize_decision{};
new_resize_decision.sequence_number = old_tmap.resize_decision().next_sequence_number();
new_tmap.set_resize_decision(std::move(new_resize_decision));
co_await stm.mutate_token_metadata([table_id, &new_tmap, &changed] (token_metadata& tm) {
changed = true;
tm.tablets().set_tablet_map(table_id, std::move(new_tmap));
tm.set_version(tm.get_version() + 1);
return make_ready_future<>();
});
}
if (changed) {
// Need to reload on each resize because table object expects tablet count to change by a factor of 2.
guard = co_await save_token_metadata(e, std::move(guard));
if (load_stats) {
auto new_tm = stm.get();
auto reconciled_stats = load_stats->stats.reconcile_tablets_resize(plan.resize_plan().finalize_resize, *old_tm, *new_tm);
if (reconciled_stats) {
load_stats->stats = *reconciled_stats;
}
}
testlog.debug("Calling local_topology_barrier()");
old_tm = nullptr;
co_await e.get_storage_service().local().local_topology_barrier();
testlog.debug("Finished local_topology_barrier()");
}
}
static future<> apply_repair_transitions(token_metadata& tm, const migration_plan& plan) {
for (const auto& repair : plan.repair_plan().repairs()) {
co_await tm.tablets().mutate_tablet_map_async(repair.table, [&] (tablet_map& tmap) {
auto tablet_info = tmap.get_tablet_info(repair.tablet);
tmap.set_tablet_transition_info(repair.tablet, tablet_transition_info{
tablet_transition_stage::repair,
tablet_transition_kind::repair,
tablet_info.replicas,
std::nullopt,
});
return make_ready_future();
});
}
}
// Reflects the plan in a given token metadata as if the migrations were fully executed.
static
future<> apply_plan(token_metadata& tm, const migration_plan& plan, service::topology& topology, shared_load_stats* load_stats) {
for (auto&& mig : plan.migrations()) {
co_await tm.tablets().mutate_tablet_map_async(mig.tablet.table, [&] (tablet_map& tmap) {
if (load_stats) {
global_tablet_id gid {mig.tablet.table, mig.tablet.tablet};
dht::token_range trange {tmap.get_token_range(mig.tablet.tablet)};
auto new_stats = load_stats->stats.migrate_tablet_size(mig.src.host, mig.dst.host, gid, trange);
if (new_stats) {
load_stats->stats = std::move(*new_stats);
}
}
auto tinfo = tmap.get_tablet_info(mig.tablet.tablet);
testlog.trace("Replacing tablet {} replica from {} to {}", mig.tablet.tablet, mig.src, mig.dst);
tinfo.replicas = replace_replica(tinfo.replicas, mig.src, mig.dst);
tmap.set_tablet(mig.tablet.tablet, tinfo);
return make_ready_future();
});
}
co_await apply_resize_plan(tm, plan);
if (auto request_id = plan.rack_list_colocation_plan().request_to_resume(); request_id) {
topology.paused_rf_change_requests.erase(request_id);
}
co_await apply_repair_transitions(tm, plan);
}
// Reflects the plan in a given token metadata as if the migrations were started but not yet executed.
static
future<> apply_plan_as_in_progress(token_metadata& tm, const migration_plan& plan) {
for (auto&& mig : plan.migrations()) {
co_await tm.tablets().mutate_tablet_map_async(mig.tablet.table, [&] (tablet_map& tmap) {
auto tinfo = tmap.get_tablet_info(mig.tablet.tablet);
tmap.set_tablet_transition_info(mig.tablet.tablet, migration_to_transition_info(tinfo, mig));
return make_ready_future();
});
}
co_await apply_resize_plan(tm, plan);
}
static
size_t get_tablet_count(const tablet_metadata& tm) {
size_t count = 0;
for (const auto& [table, tmap] : tm.all_tables_ungrouped()) {
count += std::accumulate(tmap->tablets().begin(), tmap->tablets().end(), size_t(0),
[] (size_t accumulator, const locator::tablet_info& info) {
return accumulator + info.replicas.size();
});
}
return count;
}
static
void check_tablet_invariants(const tablet_metadata& tmeta);
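// Runs the tablet load balancer in a loop, applying each produced plan to the
// in-memory token metadata and finalizing resize decisions, until the plan is
// empty or the stop() predicate fires. Run in a seastar thread.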
static
void do_rebalance_tablets(cql_test_env& e,
group0_guard& guard,
shared_load_stats* load_stats = nullptr,
std::unordered_set<host_id> skiplist = {},
std::function<bool(const migration_plan&)> stop = nullptr,
bool auto_split = false)
{
auto& talloc = e.get_tablet_allocator().local();
auto& stm = e.shared_token_metadata().local();
auto& sys_ks = e.get_system_keyspace().local();
auto& topology = e.get_topology_state_machine().local()._topology;
// Sanity limit to avoid infinite loops.
// The x10 factor is arbitrary; it's there to account for more complex schedules than direct migration.
auto max_iterations = 1 + get_tablet_count(stm.get()->tablets()) * 10;
for (size_t i = 0; i < max_iterations; ++i) {
auto plan = talloc.balance_tablets(stm.get(), &topology, &sys_ks, load_stats ? load_stats->get() : nullptr, skiplist).get();
if (plan.empty()) {
return;
}
if (stop && stop(plan)) {
return;
}
stm.mutate_token_metadata([&] (token_metadata& tm) {
return apply_plan(tm, plan, e.get_topology_state_machine().local()._topology, load_stats);
}).get();
if (auto_split && load_stats) {
bool reload = false;
auto& tm = *stm.get();
for (const auto& [table, tmap]: tm.tablets().all_tables_ungrouped()) {
if (std::holds_alternative<resize_decision::split>(tmap->resize_decision().way)) {
if (load_stats->stats.tables[table].split_ready_seq_number != tmap->resize_decision().sequence_number) {
testlog.debug("set_split_ready_seq_number({}, {})", table, tmap->resize_decision().sequence_number);
load_stats->set_split_ready_seq_number(table, tmap->resize_decision().sequence_number);
reload = true;
}
}
}
// Split-ack must be ordered before split finalization; storage_group assumes that ordering.
if (reload) {
guard = save_token_metadata(e, std::move(guard)).get();
}
}
handle_resize_finalize(e, guard, plan, load_stats).get();
}
throw std::runtime_error("rebalance_tablets(): convergence not reached within limit");
}
static
void apply_resize_decisions(cql_test_env& e, shared_load_stats& stats) {
testlog.debug("apply_resize_decisions(): start");
abort_source as;
auto guard = e.get_raft_group0_client().start_operation(as).get();
auto& talloc = e.get_tablet_allocator().local();
auto& stm = e.shared_token_metadata().local();
auto plan = talloc.balance_tablets(stm.get(),
&e.get_topology_state_machine().local()._topology,
&e.get_system_keyspace().local(),
stats.get()).get();
stm.mutate_token_metadata([&] (token_metadata& tm) {
return apply_resize_plan(tm, plan);
}).get();
// We should not introduce inconsistency between on-disk state and in-memory state,
// as that may violate invariants and cause failures in later operations,
// leading to test flakiness.
save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get();
e.get_storage_service().local().update_tablet_metadata({}).get();
testlog.debug("apply_resize_decisions(): done");
}
// Invokes the tablet scheduler and executes its plan, continuously until it emits an empty plan.
// Simulates topology coordinator but doesn't perform actual migration,
// only reflects it in the metadata.
// Run in a seastar thread.
void rebalance_tablets(cql_test_env& e,
shared_load_stats* load_stats = nullptr,
std::unordered_set<host_id> skiplist = {},
std::function<bool(const migration_plan&)> stop = nullptr,
bool auto_split = true) {
abort_source as;
testlog.debug("rebalance_tablets(): start");
auto guard = e.get_raft_group0_client().start_operation(as).get();
testlog.debug("rebalance_tablets(): took group0 guard");
shared_load_stats local_stats;
if (!load_stats) {
// Provide default capacity for each node.
e.shared_token_metadata().local().get()->get_topology().for_each_node([&] (const auto& node) {
local_stats.set_capacity(node.host_id(), default_target_tablet_size * node.get_shard_count());
});
load_stats = &local_stats;
}
do_rebalance_tablets(e, guard, load_stats, std::move(skiplist), std::move(stop), auto_split);
testlog.debug("rebalance_tablets(): rebalanced");
// We should not introduce inconsistency between on-disk state and in-memory state,
// as that may violate invariants and cause failures in later operations,
// leading to test flakiness.
auto& stm = e.shared_token_metadata().local();
save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get();
e.get_storage_service().local().update_tablet_metadata({}).get();
testlog.debug("rebalance_tablets(): done");
}
static
void rebalance_tablets_as_in_progress(cql_test_env& env, shared_load_stats& stats,
std::function<bool(const migration_plan&)> stop = nullptr) {
auto& stm = env.local_db().get_shared_token_metadata();
auto& talloc = env.get_tablet_allocator().local();
auto& topology = env.get_topology_state_machine().local()._topology;
auto& sys_ks = env.get_system_keyspace().local();
while (true) {
auto plan = talloc.balance_tablets(stm.get(), &topology, &sys_ks, stats.get()).get();
if (plan.empty() || (stop && stop(plan))) {
break;
}
stm.mutate_token_metadata([&] (token_metadata& tm) {
return apply_plan_as_in_progress(tm, plan);
}).get();
}
}
// Completes any in progress tablet migrations.
static
void execute_transitions(shared_token_metadata& stm) {
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
for (auto&& [table, tables] : tm.tablets().all_table_groups()) {
co_await tm.tablets().mutate_tablet_map_async(table, [&] (tablet_map& tmap) {
for (auto&& [tablet, trinfo]: tmap.transitions()) {
auto ti = tmap.get_tablet_info(tablet);
ti.replicas = trinfo.next;
tmap.set_tablet(tablet, ti);
}
tmap.clear_transitions();
return make_ready_future();
});
}
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) {
do_with_cql_env_thread([] (auto& e) {
// Tests the scenario of bootstrapping a single node
// Verifies that load balancer sees it and moves tablets to that node.
topology_builder topo(e);
unsigned shard_count = 2;
auto host1 = topo.add_node(node_state::normal, shard_count);
auto host2 = topo.add_node(node_state::normal, shard_count);
auto host3 = topo.add_node(node_state::normal, shard_count);
auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4);
auto table1 = add_table(e, ks_name).get();
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(4);
auto tid = tmap.first_tablet();
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
tablet_replica {host2, 1},
}
});
tid = *tmap.next_tablet(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
tablet_replica {host2, 1},
}
});
tid = *tmap.next_tablet(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
tablet_replica {host2, 0},
}
});
tid = *tmap.next_tablet(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 1},
tablet_replica {host2, 0},
}
});
tmeta.set_tablet_map(table1, std::move(tmap));
co_return;
});
auto& stm = e.shared_token_metadata().local();
// Sanity check
{
load_sketch load(stm.get());
load.populate().get();
BOOST_REQUIRE_EQUAL(load.get_load(host1), 4);
BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host1), 2);
BOOST_REQUIRE_EQUAL(load.get_load(host2), 4);
BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host2), 2);
BOOST_REQUIRE_EQUAL(load.get_load(host3), 0);
BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host3), 0);
}
shared_load_stats& load_stats = topo.get_shared_load_stats();
load_stats.set_default_tablet_sizes(stm.get());
rebalance_tablets(e, &load_stats);
{
load_sketch load(stm.get());
load.populate().get();
for (auto h : {host1, host2, host3}) {
testlog.debug("Checking host {}", h);
BOOST_REQUIRE_LE(load.get_load(h), 3);
BOOST_REQUIRE_GT(load.get_load(h), 1);
BOOST_REQUIRE_LE(load.get_avg_tablet_count(h), 2);
BOOST_REQUIRE_GT(load.get_avg_tablet_count(h), 0);
}
}
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_no_conflicting_migrations_in_the_plan) {
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
unsigned shard_count = 1;
auto dc1 = topo.dc();
[[maybe_unused]] auto host1 = topo.add_node(node_state::normal, shard_count);
[[maybe_unused]] auto host2 = topo.add_node(node_state::normal, shard_count);
topo.start_new_rack();
[[maybe_unused]] auto host3 = topo.add_node(node_state::normal, shard_count);
[[maybe_unused]] auto host4 = topo.add_node(node_state::normal, shard_count);
auto dc2 = topo.start_new_dc().dc;
[[maybe_unused]] auto host5 = topo.add_node(node_state::normal, shard_count);
[[maybe_unused]] auto host6 = topo.add_node(node_state::normal, shard_count);
auto ks_name = add_keyspace(e, {{dc1, 2}, {dc2, 1}}, 2);
auto table1 = add_table(e, ks_name).get();
// Create imbalance in dc1::rack1, dc1::rack2, and dc2::rack1
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(2);
auto tid = tmap.first_tablet();
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica{host1, 0},
tablet_replica{host3, 0},
tablet_replica{host5, 0},
}
});
tid = *tmap.next_tablet(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica{host1, 0},
tablet_replica{host3, 0},
tablet_replica{host5, 0},
}
});
tmeta.set_tablet_map(table1, std::move(tmap));
co_return;
});
auto& stm = e.shared_token_metadata().local();
auto& talloc = e.get_tablet_allocator().local();
topo.get_shared_load_stats().set_default_tablet_sizes(stm.get());
talloc.set_load_stats(topo.get_load_stats());
migration_plan plan = talloc.balance_tablets(stm.get(), nullptr, nullptr).get();
BOOST_REQUIRE(!plan.empty());
std::set<global_tablet_id> tablets;
for (auto&& mig : plan.migrations()) {
BOOST_REQUIRE(!tablets.contains(mig.tablet));
tablets.insert(mig.tablet);
}
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_no_conflicting_internode_and_intra_merge_colocation) {
// 1. The cluster has two racks, RackA and RackB, and the plan is per-rack.
// 2. Two sibling tablets, T1 and T2, are marked for merge.
// 3. In RackA, the replicas of T1 and T2 are co-located on an overloaded node,
// making them a candidate for inter-node migration to achieve load balancing.
// 4. In RackB, the replicas of T1 and T2 are on the same node but on different
// shards, making them a candidate for intra-node migration to fix merge co-location.
//
// Verify that the load balancer's plan does not include conflicting migrations.
// If the tablets T1 and T2 were chosen to be migrated between nodes in RackA, the merge
// co-location plan should not generate migrations in RackB for the same tablets.
cql_test_config cfg{};
cfg.db_config->rf_rack_valid_keyspaces.set(true);
do_with_cql_env_thread([] (auto& e) {
logging::logger_registry().set_logger_level("load_balancer", logging::log_level::trace);
topology_builder topo(e);
// RackA: NodeA (overloaded), NodeB (underloaded)
// RackB: NodeC (balanced, with intra-node misalignment for co-location)
auto rackA = topo.rack();
auto hostA = topo.add_node(node_state::normal, 2, rackA);
auto hostB = topo.add_node(node_state::normal, 2, rackA);
auto rackB = topo.start_new_rack();
auto hostC = topo.add_node(node_state::normal, 2, rackB);
// Create a table with 2 tablets that will be marked for merge.
auto ks_name = add_keyspace_racks(e, {{topo.dc(), {rackA.rack, rackB.rack}}}, 2);
auto table1 = add_table(e, ks_name).get();
// Add more tables to create a clear load imbalance that can be resolved.
auto table_for_load_1 = add_table(e, ks_name).get();
auto table_for_load_2 = add_table(e, ks_name).get();
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap_merge(2);
auto t1 = tmap_merge.first_tablet();
auto t2 = *tmap_merge.next_tablet(t1);
tmap_merge.set_tablet(t1, tablet_info {
tablet_replica_set {
tablet_replica{hostA, 0}, // RackA
tablet_replica{hostC, 0}, // RackB
}
});
tmap_merge.set_tablet(t2, tablet_info {
tablet_replica_set {
tablet_replica{hostA, 0}, // RackA
tablet_replica{hostC, 1}, // RackB
}
});
tmeta.set_tablet_map(table1, std::move(tmap_merge));
// Add more tablets to hostA to make it clearly overloaded.
// Total load on hostA will be 4, hostB is 0. Avg is 2.
// Moving the {t1,t2} set (load 2) from A->B makes loads {2, 2}, which is balanced.
tablet_map tmap_load(1);
tmap_load.set_tablet(tmap_load.first_tablet(), tablet_info{tablet_replica_set{tablet_replica{hostA, 0}}});
tablet_map tmap_load_clone = co_await tmap_load.clone_gently();
tmeta.set_tablet_map(table_for_load_1, std::move(tmap_load));
tmeta.set_tablet_map(table_for_load_2, std::move(tmap_load_clone));
co_return;
});
// Mark the tablets for merge to create a co-location plan.
mutate_tablets(e, [&] (tablet_metadata& tmeta) {
return tmeta.mutate_tablet_map_async(table1, [] (tablet_map& tmap) {
locator::resize_decision decision;
decision.way = locator::resize_decision::merge{};
decision.sequence_number = tmap.resize_decision().sequence_number + 1;
tmap.set_resize_decision(std::move(decision));
return make_ready_future<>();
});
});
auto& stm = e.shared_token_metadata().local();
topo.get_shared_load_stats().set_default_tablet_sizes(stm.get());
auto& talloc = e.get_tablet_allocator().local();
talloc.set_load_stats(topo.get_load_stats());
migration_plan plan = talloc.balance_tablets(stm.get(), nullptr, nullptr).get();
// The plan should contain non-conflicting migrations.
BOOST_REQUIRE(!plan.empty());
std::set<global_tablet_id> tablets;
for (auto&& mig : plan.migrations()) {
BOOST_REQUIRE(!tablets.contains(mig.tablet));
tablets.insert(mig.tablet);
}
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_rack_list_conversion) {
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
unsigned shard_count = 1;
auto dc1 = topo.dc();
auto rack1 = topo.rack();
[[maybe_unused]] auto host1 = topo.add_node(node_state::normal, shard_count);
[[maybe_unused]] auto host2 = topo.add_node(node_state::normal, shard_count);
auto rack2 = topo.start_new_rack();
[[maybe_unused]] auto host3 = topo.add_node(node_state::normal, shard_count);
[[maybe_unused]] auto host4 = topo.add_node(node_state::normal, shard_count);
auto rack3 = topo.start_new_rack();
[[maybe_unused]] auto host5 = topo.add_node(node_state::normal, shard_count);
[[maybe_unused]] auto host6 = topo.add_node(node_state::normal, shard_count);
auto dc2 = topo.start_new_dc().dc;
[[maybe_unused]] auto host7 = topo.add_node(node_state::normal, shard_count);
[[maybe_unused]] auto host8 = topo.add_node(node_state::normal, shard_count);
auto ks_name = add_keyspace(e, {{dc1, 2}}, 4);
auto table1 = add_table(e, ks_name).get();
// rack1: host1: A D host2: C
// rack2: host3: A host4: B
// rack3: host5: C host6: B D
tablet_id A{0}, B{0};
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(4);
auto tid = tmap.first_tablet();
A = tid;
tmap.set_tablet(tid, tablet_info { // A
tablet_replica_set {
tablet_replica{host1, 0},
tablet_replica{host3, 0},
}
});
tid = *tmap.next_tablet(tid);
B = tid;
tmap.set_tablet(tid, tablet_info { // B
tablet_replica_set {
tablet_replica{host4, 0},
tablet_replica{host6, 0},
}
});
tid = *tmap.next_tablet(tid);
tmap.set_tablet(tid, tablet_info { // C
tablet_replica_set {
tablet_replica{host2, 0},
tablet_replica{host5, 0},
}
});
tid = *tmap.next_tablet(tid);
tmap.set_tablet(tid, tablet_info { // D
tablet_replica_set {
tablet_replica{host1, 0},
tablet_replica{host6, 0},
}
});
tmeta.set_tablet_map(table1, std::move(tmap));
co_return;
});
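// Simulate a pending, paused keyspace RF-change request that converts dc1 to the rack list
// {rack1, rack3} by inserting it directly into system.topology_requests; the balancer is
// expected to plan the migrations needed for the conversion.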
auto id = utils::UUID_gen::get_time_UUID();
// Build the map literal for CQL
auto rf_change_data_cql = format("{{'replication:class': 'NetworkTopologyStrategy', 'replication:{}:0': '{}', 'replication:{}:1': '{}'}}",
dc1, rack1.rack, dc1, rack3.rack);
e.execute_cql(format("INSERT INTO system.topology_requests (id, request_type, done, new_keyspace_rf_change_ks_name, new_keyspace_rf_change_data) VALUES ({}, 'keyspace_rf_change', False, '{}', {})",
id, ks_name, rf_change_data_cql)).get();
auto& stm = e.shared_token_metadata().local();
auto& talloc = e.get_tablet_allocator().local();
talloc.set_load_stats(topo.get_load_stats());
auto& sys_ks = e.get_system_keyspace().local();
auto& topology = e.get_topology_state_machine().local()._topology;
topology.paused_rf_change_requests.insert(id);
migration_plan plan = talloc.balance_tablets(stm.get(), &topology, &sys_ks).get();
BOOST_REQUIRE(!plan.empty());
// A : host3 -> host5 / host6
// B : host4 -> host1 / host2
for (auto& mig : plan.migrations()) {
testlog.info("Rack list colocation migration: {}", mig);
BOOST_REQUIRE(mig.kind == locator::tablet_transition_kind::migration);
BOOST_REQUIRE(mig.src.host == host3 || mig.src.host == host4);
if (mig.src.host == host3) {
BOOST_REQUIRE(mig.tablet.tablet == A);
BOOST_REQUIRE(mig.dst.host == host5 || mig.dst.host == host6);
} else {
BOOST_REQUIRE(mig.tablet.tablet == B);
BOOST_REQUIRE(mig.dst.host == host1 || mig.dst.host == host2);
}
}
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_colocation_skipped_on_excluded_nodes) {
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto rack1 = topo.rack();
auto rack2 = topo.start_new_rack();
auto host1 = topo.add_node(node_state::normal, 2, rack1);
// host2 has 1 shard so that rack2 doesn't need co-location; if any co-location is needed, it will be on host1
auto host2 = topo.add_node(node_state::normal, 1, rack2);
auto ks_name = add_keyspace_racks(e, {{topo.dc(), {rack1.rack, rack2.rack}}}, 8);
auto table1 = add_table(e, ks_name).get();
topo.get_shared_load_stats().set_size(table1, 0);
auto& stm = e.shared_token_metadata().local();
topo.add_node(node_state::normal, 1, rack1); // So that balancer doesn't exit early due to no candidate nodes.
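// Exclude host1 from planning and request draining; the balancer should then use host1 only as
// a source of rebuild migrations and never as a destination (asserted in the stop callback below).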
e.get_storage_service().local().mark_excluded({host1}).get();
topo.add_draining_request(host1);
// trigger merge
e.execute_cql(fmt::format("alter keyspace {} with tablets = {{'enabled': true, 'initial': 1}}", ks_name)).get();
apply_resize_decisions(e, topo.get_shared_load_stats());
// Sanity check, to verify that co-location was attempted.
BOOST_REQUIRE(stm.get()->tablets().get_tablet_map(table1).needs_merge());
abort_source as;
auto guard = e.get_raft_group0_client().start_operation(as).get();
rebalance_tablets_as_in_progress(e, topo.get_shared_load_stats(), [&] (const migration_plan& plan) {
// Verify that only rebuilding migrations involve the excluded host.
for (auto&& mig : plan.migrations()) {
BOOST_REQUIRE_NE(mig.dst.host, host1);
if (mig.src.host == host1) {
BOOST_REQUIRE(mig.kind == tablet_transition_kind::rebuild_v2);
}
}
return false;
});
// Restore consistency between stm and system tables before releasing group0 guard.
save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get();
}, tablet_cql_test_config()).get();
}
SEASTAR_THREAD_TEST_CASE(test_no_intranode_migration_on_draining_node) {
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto rack1 = topo.rack();
auto rack2 = topo.start_new_rack();
// The host being drained has more shards so that it has spare streaming capacity
// to be used by potential intra-node migrations.
auto host1 = topo.add_node(node_state::normal, 5, rack1);
auto host2 = topo.add_node(node_state::normal, 1, rack1);
auto ks_name = add_keyspace_racks(e, {{topo.dc(), {rack1.rack}}}, 16);
auto table1 = add_table(e, ks_name).get();
topo.get_shared_load_stats().set_size(table1, 0);
topo.add_draining_request(host1);
auto& stm = e.shared_token_metadata().local();
// trigger merge to exercise co-location
e.execute_cql(fmt::format("alter keyspace {} with tablets = {{'enabled': true, 'initial': 1}}", ks_name)).get();
apply_resize_decisions(e, topo.get_shared_load_stats());
// Sanity check, to verify that co-location was attempted.
BOOST_REQUIRE(stm.get()->tablets().get_tablet_map(table1).needs_merge());
abort_source as;
auto guard = e.get_raft_group0_client().start_operation(as).get();
rebalance_tablets_as_in_progress(e, topo.get_shared_load_stats(), [&] (const migration_plan& plan) {
// Verify no intra-node migrations on the draining host.
for (auto&& mig : plan.migrations()) {
if (mig.src.host == host1) {
BOOST_REQUIRE_NE(mig.dst.host, host1);
}
}
return false;
});
// Restore consistency between stm and system tables before releasing group0 guard.
save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get();
}, tablet_cql_test_config()).get();
}
// Throws if any tablet has more than 1 replica in a given rack.
// Run in a seastar thread.
void check_no_rack_overload(const token_metadata& tm) {
auto& topo = tm.get_topology();
for (const auto& [table, tmap_p] : tm.tablets().all_tables_ungrouped()) {
const tablet_map& tmap = *tmap_p;
tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& tinfo) {
std::unordered_map<sstring, std::unordered_set<sstring>> racks_by_dc;
auto replicas = tinfo.replicas;
for (auto& r : tinfo.replicas) {
auto& rack = topo.get_rack(r.host);
auto& racks = racks_by_dc[topo.get_datacenter(r.host)];
if (racks.contains(rack)) {
throw std::runtime_error("rack overloaded");
}
racks.insert(rack);
}
return make_ready_future<>();
}).get();
}
}
// Verifies that all tablets in the tablet_map are replicated to a given set of racks
// and not placed on any of the bad_nodes.
void check_rack_list(const locator::topology& topo, const tablet_map& tmap, sstring dc, rack_list racks, std::set<host_id> bad_nodes = {}) {
std::sort(racks.begin(), racks.end());
tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& tinfo) {
std::unordered_map<sstring, std::unordered_set<sstring>> racks_by_dc;
auto replicas = tinfo.replicas;
rack_list actual_racks;
for (auto& r : tinfo.replicas) {
if (bad_nodes.contains(r.host)) {
throw std::runtime_error(fmt::format("Bad node {} found in tablet {}", r.host, tid));
}
if (topo.get_datacenter(r.host) == dc) {
actual_racks.push_back(topo.get_rack(r.host));
}
}
std::sort(actual_racks.begin(), actual_racks.end());
if (actual_racks != racks) {
throw std::runtime_error(fmt::format("Bad racks for tablet {}: expected {}, got {}", tid, racks, actual_racks));
}
return make_ready_future<>();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_rack_list_conversion_with_two_replicas_in_rack) {
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
unsigned shard_count = 1;
auto dc1 = topo.dc();
auto rack1 = topo.rack();
[[maybe_unused]] auto host1 = topo.add_node(node_state::normal, shard_count);
[[maybe_unused]] auto host2 = topo.add_node(node_state::normal, shard_count);
auto rack2 = topo.start_new_rack();
[[maybe_unused]] auto host3 = topo.add_node(node_state::normal, shard_count);
[[maybe_unused]] auto host4 = topo.add_node(node_state::normal, shard_count);
auto rack3 = topo.start_new_rack();
[[maybe_unused]] auto host5 = topo.add_node(node_state::normal, shard_count);
[[maybe_unused]] auto host6 = topo.add_node(node_state::normal, shard_count);
auto ks_name = add_keyspace(e, {{dc1, 2}}, 2);
auto table1 = add_table(e, ks_name).get();
tablet_id A{0}, B{0};
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(2);
auto tid = tmap.first_tablet();
A = tid;
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica{host1, 0},
tablet_replica{host2, 0},
}
});
tid = *tmap.next_tablet(tid);
B = tid;
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica{host5, 0},
tablet_replica{host6, 0},
}
});
tmeta.set_tablet_map(table1, std::move(tmap));
co_return;
});
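// As in test_rack_list_conversion, simulate a paused keyspace RF-change request converting dc1
// to the rack list {rack1, rack2} via system.topology_requests.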
auto id = utils::UUID_gen::get_time_UUID();
// Build the map literal for CQL
auto rf_change_data_cql = format("{{'replication:class': 'NetworkTopologyStrategy', 'replication:{}:0': '{}', 'replication:{}:1': '{}'}}",
dc1, rack1.rack, dc1, rack2.rack);
e.execute_cql(format("INSERT INTO system.topology_requests (id, request_type, done, new_keyspace_rf_change_ks_name, new_keyspace_rf_change_data) VALUES ({}, 'keyspace_rf_change', False, '{}', {})",
id, ks_name, rf_change_data_cql)).get();
auto& stm = e.shared_token_metadata().local();
auto& topology = e.get_topology_state_machine().local()._topology;
topology.paused_rf_change_requests.insert(id);
rebalance_tablets(e);
check_rack_list(stm.get()->get_topology(), stm.get()->tablets().get_tablet_map(table1), dc1, {rack1.rack, rack2.rack});
}).get();
}
struct alter_result {
tablet_map new_tablet_map;
replication_strategy_config_options opts;
};
// Invokes tablet reallocation which is done on ALTER KEYSPACE.
static
alter_result alter_replication(cql_test_env& e,
const sstring& ks_name,
table_id table,
replication_strategy_config_options alter_options)
{
auto& stm = e.shared_token_metadata().local();
auto tmptr = stm.get();
auto& old_tablets = tmptr->tablets().get_tablet_map(table);
auto& ks = e.local_db().find_keyspace(ks_name);
auto& rs = ks.get_replication_strategy();
alter_options["class"] = sstring("NetworkTopologyStrategy");
cql3::statements::ks_prop_defs new_ks_props;
new_ks_props.add_property("replication", alter_options);
new_ks_props.validate();
BOOST_REQUIRE(new_ks_props.get_replication_strategy_class().has_value());
auto ks_md = new_ks_props.as_ks_metadata_update(ks.metadata(), *tmptr, e.local_db().features(), e.local_db().get_config());
auto new_options = ks_md->strategy_options();
testlog.info("Altering {} from {} using {} to {}", ks_name, rs.get_config_options(), alter_options, new_options);
locator::replication_strategy_params params{new_options, old_tablets.tablet_count(), std::nullopt};
auto new_strategy = locator::abstract_replication_strategy::create_replication_strategy(
"NetworkTopologyStrategy", params, tmptr->get_topology());
auto s = e.local_db().find_schema(table);
auto new_tablet_map = new_strategy->maybe_as_tablet_aware()->reallocate_tablets(s, tmptr, old_tablets.clone_gently().get()).get();
return alter_result{std::move(new_tablet_map), std::move(new_options)};
}
SEASTAR_THREAD_TEST_CASE(test_replica_allocation_with_rack_list_rf) {
cql_test_config cfg{};
cfg.db_config->rf_rack_valid_keyspaces.set(true);
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
std::set<host_id> bad_nodes; // No replicas should be allocated there
// dc1
auto rack1 = topo.rack();
auto rack2 = topo.start_new_rack();
auto rack3 = topo.start_new_rack();
// dc2
auto rack4 = topo.start_new_dc();
auto rack5 = topo.start_new_rack();
auto dc1 = rack1.dc;
auto dc2 = rack4.dc;
// dc1
topo.add_node(node_state::normal, 1, rack1);
topo.add_node(node_state::normal, 1, rack2);
topo.add_node(node_state::normal, 1, rack3);
topo.add_node(node_state::normal, 1, rack3);
bad_nodes.insert(topo.add_node(node_state::left, 1, rack3));
// dc2
topo.add_node(node_state::normal, 1, rack4);
bad_nodes.insert(topo.add_node(node_state::decommissioning, 1, rack4));
topo.add_node(node_state::normal, 1, rack5);
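// Helper: creates a keyspace replicated to the given rack lists in dc1/dc2, rebalances and
// verifies the initial placement, then alters replication with alter_opts and checks that the
// reallocated tablet map matches the expected per-DC rack lists.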
auto test_alter = [&] (rack_list dc1_racks, rack_list dc2_racks,
replication_strategy_config_options alter_opts,
std::unordered_map<sstring, rack_list> expected_rf) {
auto ks1 = add_keyspace_racks(e, {{dc1, dc1_racks}, {dc2, dc2_racks}});
auto table1 = add_table(e, ks1).get();
rebalance_tablets(e);
auto& stm = e.shared_token_metadata().local();
auto tmptr = stm.get();
auto& tm_topo = tmptr->get_topology();
check_rack_list(tm_topo, tmptr->tablets().get_tablet_map(table1), dc1, dc1_racks, bad_nodes);
check_rack_list(tm_topo, tmptr->tablets().get_tablet_map(table1), dc2, dc2_racks, bad_nodes);
auto [new_tablet_map, new_opts] = alter_replication(e, ks1, table1, alter_opts);
for (auto&& [dc, rf] : expected_rf) {
check_rack_list(tm_topo, new_tablet_map, dc, rf, bad_nodes);
}
};
// dc1: 0 -> [rack1, rack2]
{
auto ks1 = add_keyspace(e, {{dc2, 1}});
auto table1 = add_table(e, ks1).get();
rebalance_tablets(e);
auto& stm = e.shared_token_metadata().local();
auto tmptr = stm.get();
auto& tm_topo = tmptr->get_topology();
check_rack_list(tm_topo, tmptr->tablets().get_tablet_map(table1), dc1, rack_list{}, bad_nodes);
auto dc1_new_racks = rack_list{rack1.rack, rack2.rack};
replication_strategy_config_options alter_opts;
alter_opts[dc1] = dc1_new_racks;
alter_opts[dc2] = "1";
auto [new_tablet_map, new_opts] = alter_replication(e, ks1, table1, alter_opts);
check_rack_list(tm_topo, new_tablet_map, dc1, dc1_new_racks, bad_nodes);
}
// dc1: [rack1] -> 0
{
auto ks1 = add_keyspace_racks(e, {{dc1, {rack1.rack}}});
auto table1 = add_table(e, ks1).get();
rebalance_tablets(e);
auto& stm = e.shared_token_metadata().local();
auto tmptr = stm.get();
auto& tm_topo = tmptr->get_topology();
check_rack_list(tm_topo, tmptr->tablets().get_tablet_map(table1), dc1, rack_list{rack1.rack}, bad_nodes);
replication_strategy_config_options alter_opts;
alter_opts[dc1] = sstring("0");
auto [new_tablet_map, new_opts] = alter_replication(e, ks1, table1, alter_opts);
check_rack_list(tm_topo, new_tablet_map, dc1, rack_list{}, bad_nodes);
}
test_alter({rack1.rack, rack2.rack}, {},
{{dc1, rack_list{rack1.rack, rack2.rack, rack3.rack}}},
{{dc1, rack_list{rack1.rack, rack2.rack, rack3.rack}}, {dc2, rack_list{}}});
test_alter({rack1.rack, rack2.rack, rack3.rack}, {},
{{dc1, rack_list{rack1.rack, rack3.rack}}},
{{dc1, rack_list{rack1.rack, rack3.rack}}, {dc2, rack_list{}}});
BOOST_REQUIRE_THROW(test_alter({rack1.rack}, {rack4.rack},
{{dc2, rack_list{}}},
{{dc1, rack_list{rack1.rack}}, {dc2, rack_list{}}}),
exceptions::configuration_exception);
test_alter({rack1.rack, rack2.rack}, {rack4.rack},
{{dc1, rack_list{rack2.rack}}, {dc2, rack_list{rack4.rack}}},
{{dc1, rack_list{rack2.rack}}, {dc2, rack_list{rack4.rack}}});
test_alter({rack2.rack}, {rack4.rack},
{{dc1, rack_list{rack2.rack}}, {dc2, rack_list{rack4.rack}}},
{{dc1, rack_list{rack2.rack}}, {dc2, rack_list{rack4.rack}}});
BOOST_REQUIRE_THROW(test_alter({rack2.rack}, {rack4.rack},
{{dc1, rack_list{rack2.rack}}},
{{dc1, rack_list{rack2.rack}}, {dc2, rack_list{rack4.rack}}}),
exceptions::configuration_exception);
test_alter({rack2.rack}, {rack4.rack},
{{dc1, rack_list{rack2.rack}}, {dc2, rack_list{rack4.rack, rack5.rack}}},
{{dc1, rack_list{rack2.rack}}, {dc2, rack_list{rack4.rack, rack5.rack}}});
test_alter({rack1.rack, rack2.rack, rack3.rack}, {},
{{dc1, rack_list{rack1.rack, rack2.rack, rack3.rack}}, {dc2, rack_list{rack4.rack}}},
{{dc1, rack_list{rack1.rack, rack2.rack, rack3.rack}}, {dc2, rack_list{rack4.rack}}});
BOOST_REQUIRE_THROW(test_alter({rack1.rack, rack2.rack, rack3.rack}, {},
{{dc2, rack_list{rack4.rack}}},
{{dc1, rack_list{rack1.rack, rack2.rack, rack3.rack}}, {dc2, rack_list{rack4.rack}}}),
exceptions::configuration_exception);
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_per_shard_count_respected_with_rack_list) {
cql_test_config cfg{};
cfg.db_config->tablets_initial_scale_factor.set(10);
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
std::set<host_id> bad_nodes; // No replicas should be allocated there
auto rack1 = topo.rack();
auto rack2 = topo.start_new_rack();
auto rack3 = topo.start_new_rack();
auto dc = topo.dc();
auto host1 = topo.add_node(node_state::normal, 1, rack1);
topo.add_node(node_state::normal, 1, rack2);
topo.add_node(node_state::normal, 1, rack3);
topo.add_node(node_state::normal, 1, rack3);
auto ks_name = add_keyspace_racks(e, {{dc, {rack1.rack}}});
auto table = add_table(e, ks_name).get();
rebalance_tablets(e);
auto& stm = e.shared_token_metadata().local();
auto tmptr = stm.get();
auto& tm_topo = tmptr->get_topology();
// Check that we respect the 10 tablets/shard goal when using a subset of racks.
{
load_sketch load(tmptr);
load.populate_dc(dc).get();
auto l = load.get_shard_minmax(host1);
BOOST_REQUIRE_EQUAL(l.min(), 16);
BOOST_REQUIRE_EQUAL(l.max(), 16);
}
check_rack_list(tm_topo, tmptr->tablets().get_tablet_map(table), dc, rack_list{rack1.rack}, bad_nodes);
}, cfg).get();
}
// Reproduces https://github.com/scylladb/scylladb/issues/26768
SEASTAR_THREAD_TEST_CASE(test_replacing_last_node_in_rack_with_rack_list_rf) {
cql_test_config cfg{};
cfg.db_config->tablets_initial_scale_factor.set(10);
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto rack1 = topo.rack();
auto rack2 = topo.start_new_rack();
auto dc = topo.dc();
auto host1 = topo.add_node(node_state::normal, 1, rack1);
auto host2 = topo.add_node(node_state::normal, 1, rack2);
auto ks_name = add_keyspace_racks(e, {{dc, {rack1.rack, rack2.rack}}});
auto table = add_table(e, ks_name).get();
topo.set_node_state(host2, node_state::left);
rebalance_tablets(e);
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_per_shard_goal_shrinks_respecting_rack_allocation) {
cql_test_config cfg{};
cfg.db_config->tablets_per_shard_goal.set(10);
cfg.db_config->tablets_initial_scale_factor.set(8);
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
std::set<host_id> bad_nodes; // No replicas should be allocated there
auto rack1 = topo.rack();
auto rack2 = topo.start_new_rack();
auto rack3 = topo.start_new_rack();
auto dc = topo.dc();
auto host1 = topo.add_node(node_state::normal, 1, rack1);
auto host2 = topo.add_node(node_state::normal, 1, rack2);
auto host3 = topo.add_node(node_state::normal, 1, rack3);
auto& stats = topo.get_shared_load_stats();
auto ks1 = add_keyspace_racks(e, {{dc, {rack1.rack}}});
// We start with 8 tablets per table. per_shard_goal / 5 tables = 2,
// so each table should shrink to 2 tablets.
auto t1_1 = add_table(e, ks1).get();
auto t1_2 = add_table(e, ks1).get();
auto t1_3 = add_table(e, ks1).get();
auto t1_4 = add_table(e, ks1).get();
auto t1_5 = add_table(e, ks1).get();
// This table doesn't violate the per-shard goal in this rack, so it should not be shrunk.
auto ks2 = add_keyspace_racks(e, {{dc, {rack2.rack}}});
auto t2_1 = add_table(e, ks2).get();
// Those tables violate the goal, but due to rounding up, the count won't change.
auto ks3 = add_keyspace_racks(e, {{dc, {rack3.rack}}});
auto t3_1 = add_table(e, ks3).get();
auto t3_2 = add_table(e, ks3).get();
stats.set_size(t1_1, 0);
stats.set_size(t1_2, 0);
stats.set_size(t1_3, 0);
stats.set_size(t1_4, 0);
stats.set_size(t1_5, 0);
stats.set_size(t2_1, 0);
stats.set_size(t3_1, 0);
stats.set_size(t3_2, 0);
rebalance_tablets(e, &stats);
auto& stm = e.shared_token_metadata().local();
auto tmptr = stm.get();
auto& tm_topo = tmptr->get_topology();
auto& tmeta = stm.get()->tablets();
BOOST_REQUIRE_EQUAL(2, tmeta.get_tablet_map(t1_1).tablet_count());
BOOST_REQUIRE_EQUAL(2, tmeta.get_tablet_map(t1_2).tablet_count());
BOOST_REQUIRE_EQUAL(2, tmeta.get_tablet_map(t1_3).tablet_count());
BOOST_REQUIRE_EQUAL(2, tmeta.get_tablet_map(t1_4).tablet_count());
BOOST_REQUIRE_EQUAL(2, tmeta.get_tablet_map(t1_5).tablet_count());
BOOST_REQUIRE_EQUAL(8, tmeta.get_tablet_map(t2_1).tablet_count());
BOOST_REQUIRE_EQUAL(8, tmeta.get_tablet_map(t3_1).tablet_count());
BOOST_REQUIRE_EQUAL(8, tmeta.get_tablet_map(t3_2).tablet_count());
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_merge_does_not_overload_racks) {
cql_test_config cfg{};
// This test relies on the fact that we use an RF strictly smaller than the number of racks.
// Because of that, we cannot enable `rf_rack_valid_keyspaces` in this test because we won't
// be able to create a keyspace.
cfg.db_config->rf_rack_valid_keyspaces.set(false);
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto rack1 = topo.rack();
auto rack2 = topo.start_new_rack();
auto rack3 = topo.start_new_rack();
auto host1 = topo.add_node(node_state::normal, 1, rack3);
auto host2 = topo.add_node(node_state::normal, 1, rack2);
auto host3 = topo.add_node(node_state::normal, 1, rack1);
auto host4 = topo.add_node(node_state::normal, 1, rack3);
auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, 2); // RF=2
auto table1 = add_table(e, ks_name).get();
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(2);
auto tid = tmap.first_tablet();
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica{host1, 0},
tablet_replica{host3, 0},
}
});
tid = *tmap.next_tablet(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica{host2, 0},
tablet_replica{host4, 0},
}
});
tmeta.set_tablet_map(table1, std::move(tmap));
co_return;
});
// Trigger merge
e.execute_cql(fmt::format("alter keyspace {} with tablets = {{'enabled': true, 'initial': 1}}", ks_name)).get();
auto& stm = e.shared_token_metadata().local();
topo.get_shared_load_stats().set_size(table1, 0);
topo.get_shared_load_stats().set_default_tablet_sizes(stm.get());
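// The stop callback is invoked for every non-empty plan before it is applied; use it to assert
// that no rack ever holds more than one replica of a tablet while the merge is driven to completion.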
rebalance_tablets(e, &topo.get_shared_load_stats(), {}, [&] (const migration_plan& plan) {
check_no_rack_overload(*stm.get());
return false;
});
BOOST_REQUIRE_EQUAL(1, stm.get()->tablets().get_tablet_map(table1).tablet_count());
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_sketch_with_load_stats_uses_tablet_sizes) {
auto cfg = tablet_cql_test_config();
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto& stm = e.shared_token_metadata().local();
const uint64_t tablet_count = 16;
const size_t shard_count = 2;
auto host = topo.add_node(node_state::normal, shard_count, topo.rack());
auto ks = add_keyspace(e, {{topo.dc(), 1}}, tablet_count);
auto table = add_table(e, ks).get();
auto& shared_stats = topo.get_shared_load_stats();
shared_stats.stats.tablet_stats[host].effective_capacity = service::default_target_tablet_size * shard_count;
auto& tmap = stm.get()->tablets().get_tablet_map(table);
// Set load on shard 0 to 0 (all tablet sizes are 0),
// and set all tablet sizes on shard 1 to default_target_tablet_size
tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& tinfo) {
dht::token_range trange { tmap.get_token_range(tid) };
size_t tablet_size = 0;
if (tinfo.replicas[0].shard == 1) {
tablet_size = service::default_target_tablet_size;
}
shared_stats.stats.tablet_stats[host].tablet_sizes[table][trange] = tablet_size;
return make_ready_future<>();
}).get();
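// Build a size-aware load sketch from the simulated stats; shard 0 starts with zero load while
// shard 1 carries the equivalent of tablet_count / 2 target-size tablets.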
load_sketch load(stm.get(), shared_stats.get());
load.populate().get();
// Add new tablets to shard 0 until the load is equal
for (size_t cnt = 0; cnt < tablet_count / shard_count; cnt++) {
BOOST_REQUIRE_EQUAL(load.get_least_loaded_shard(host), 0);
BOOST_REQUIRE_EQUAL(load.get_most_loaded_shard(host), 1);
auto shard_minmax = load.get_shard_minmax(host);
BOOST_REQUIRE_EQUAL(shard_minmax.min(), cnt);
BOOST_REQUIRE_EQUAL(shard_minmax.max(), tablet_count / 2);
// Add a tablet of size default_target_tablet_size to the least loaded shard (shard 0)
load.next_shard(host, 1, service::default_target_tablet_size);
}
// Check the load on both shards is equal
auto shard_minmax = load.get_shard_minmax(host);
BOOST_REQUIRE_EQUAL(shard_minmax.min(), shard_minmax.max());
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_sketch_uses_correct_disk_capacity) {
auto cfg = tablet_cql_test_config();
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto& stm = e.shared_token_metadata().local();
locator::host_id local_host = e.shared_token_metadata().local().get()->get_my_id();
locator::host_id host1 = topo.add_node(node_state::normal, 1);
load_stats stats;
// Check that load_sketch throws when it doesn't get the node capacity from load_stats
{
load_sketch load(stm.get(), make_lw_shared(stats));
load.populate().get();
BOOST_REQUIRE_THROW(load.get_load(local_host), std::runtime_error);
}
// Check that load_sketch falls back to the gross capacity when effective_capacity is not present
{
stats.capacity[local_host] = 10;
load_sketch load(stm.get(), make_lw_shared(stats));
load.populate().get();
BOOST_REQUIRE_EQUAL(load.get_capacity(local_host), 10);
}
// Check that load_sketch uses effective_capacity when gross disk capacity is also present
{
stats.capacity[local_host] = 10;
stats.tablet_stats[local_host].effective_capacity = 20;
load_sketch load(stm.get(), make_lw_shared(stats));
load.populate().get();
BOOST_REQUIRE_EQUAL(load.get_capacity(local_host), 20);
}
// Check that load_sketch uses gross disk capacity on all nodes when force_capacity_based_load is set
{
stats.capacity[host1] = 5;
stats.capacity[local_host] = 10;
stats.tablet_stats[local_host].effective_capacity = 20;
load_sketch load(stm.get(), make_lw_shared(stats));
load.set_force_capacity_based_load(true);
load.populate().get();
BOOST_REQUIRE_EQUAL(load.get_capacity(local_host), 10);
BOOST_REQUIRE_EQUAL(load.get_capacity(host1), 5);
}
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_sketch_without_all_tablet_sizes_throws) {
auto cfg = tablet_cql_test_config();
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto& stm = e.shared_token_metadata().local();
const uint64_t tablet_count = 16;
const size_t shard_count = 2;
auto host = topo.add_node(node_state::normal, shard_count, topo.rack());
auto ks = add_keyspace(e, {{topo.dc(), 1}}, tablet_count);
auto table = add_table(e, ks).get();
auto& shared_stats = topo.get_shared_load_stats();
auto& tmap = stm.get()->tablets().get_tablet_map(table);
// Set all tablet sizes except for tablet_id == 0
tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& tinfo) {
if (tid.id != 0) {
dht::token_range trange { tmap.get_token_range(tid) };
shared_stats.stats.tablet_stats[host].tablet_sizes[table][trange] = service::default_target_tablet_size;
}
return make_ready_future<>();
}).get();
load_sketch load(stm.get(), shared_stats.get());
load.populate().get();
BOOST_REQUIRE_THROW(load.get_load(host), std::runtime_error);
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_sketch_minimal_tablet_size) {
auto cfg = tablet_cql_test_config();
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto& stm = e.shared_token_metadata().local();
const uint64_t tablet_count = 16;
const size_t shard_count = 2;
const uint64_t GB = 1024L * 1024L * 1024L;
auto host = topo.add_node(node_state::normal, shard_count, topo.rack());
auto ks = add_keyspace(e, {{topo.dc(), 1}}, tablet_count);
auto table = add_table(e, ks).get();
auto& shared_stats = topo.get_shared_load_stats();
shared_stats.stats.tablet_stats[host].effective_capacity = GB;
auto& tmap = stm.get()->tablets().get_tablet_map(table);
// Set all tablet sizes to half of the node's capacity (GB / 2)
tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& tinfo) {
dht::token_range trange { tmap.get_token_range(tid) };
shared_stats.stats.tablet_stats[host].tablet_sizes[table][trange] = GB / 2;
return make_ready_future<>();
}).get();
// Check that load_sketch computes correct load with reported tablet sizes
{
load_sketch load(stm.get(), shared_stats.get());
load.populate().get();
BOOST_REQUIRE_EQUAL(load.get_load(host), tablet_count / 2);
}
// Check that load_sketch uses the configured minimal tablet size
{
load_sketch load(stm.get(), shared_stats.get());
load.set_minimal_tablet_size(GB);
load.populate().get();
BOOST_REQUIRE_EQUAL(load.get_load(host), tablet_count);
}
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_skiplist) {
do_with_cql_env_thread([] (auto& e) {
// Tests the scenario of balancing a cluster with a DOWN node.
// Verifies that load balancer doesn't move tablets to that node.
unsigned shard_count = 2;
topology_builder topo(e);
auto host1 = topo.add_node(node_state::normal, shard_count);
auto host2 = topo.add_node(node_state::normal, shard_count);
auto host3 = topo.add_node(node_state::normal, shard_count);
auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4);
auto table1 = add_table(e, ks_name).get();
mutate_tablets(e, [&] (tablet_metadata& tmeta) {
tablet_map tmap(4);
auto tid = tmap.first_tablet();
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
tablet_replica {host2, 1},
}
});
tid = *tmap.next_tablet(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
tablet_replica {host2, 1},
}
});
tid = *tmap.next_tablet(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
tablet_replica {host2, 0},
}
});
tid = *tmap.next_tablet(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 1},
tablet_replica {host2, 0},
}
});
tmeta.set_tablet_map(table1, std::move(tmap));
return make_ready_future<>();
});
auto& stm = e.shared_token_metadata().local();
// Sanity check
{
load_sketch load(stm.get());
load.populate().get();
BOOST_REQUIRE_EQUAL(load.get_load(host1), 4);
BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host1), 2);
BOOST_REQUIRE_EQUAL(load.get_load(host2), 4);
BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host2), 2);
BOOST_REQUIRE_EQUAL(load.get_load(host3), 0);
BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host3), 0);
}
rebalance_tablets(e, &topo.get_shared_load_stats(), {host3});
{
load_sketch load(stm.get());
load.populate().get();
BOOST_REQUIRE_EQUAL(load.get_load(host3), 0);
BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host3), 0);
}
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_colocated_tablets) {
do_with_cql_env_thread([] (auto& e) {
// Tests that co-located tablets remain co-located during load balancing.
// table1 and table2 are co-located
// table3 and table4 are co-located
// initially they all start with one tablet on the same host and shard.
// load balancing is expected to move one pair of co-located tablets to the
// other host while maintaining co-location of each pair.
logging::logger_registry().set_logger_level("load_balancer", logging::log_level::trace);
unsigned shard_count = 2;
topology_builder topo(e);
auto host1 = topo.add_node(node_state::normal, shard_count);
auto host2 = topo.add_node(node_state::normal, shard_count);
auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 1);
auto table1 = add_table(e, ks_name).get();
auto table2 = add_table(e, ks_name).get();
auto table3 = add_table(e, ks_name).get();
auto table4 = add_table(e, ks_name).get();
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(1);
auto tid = tmap.first_tablet();
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
}
});
tablet_map tmap1 = co_await tmap.clone_gently();
tmeta.set_tablet_map(table1, std::move(tmap1));
co_await tmeta.set_colocated_table(table2, table1);
tablet_map tmap3 = co_await tmap.clone_gently();
tmeta.set_tablet_map(table3, std::move(tmap3));
co_await tmeta.set_colocated_table(table4, table3);
});
auto& stm = e.shared_token_metadata().local();
// Sanity check
{
load_sketch load(stm.get());
load.populate().get();
BOOST_REQUIRE_EQUAL(load.get_load(host1), 4);
BOOST_REQUIRE_EQUAL(load.get_load(host2), 0);
}
topo.get_shared_load_stats().set_default_tablet_sizes(stm.get());
rebalance_tablets(e, &topo.get_shared_load_stats());
{
load_sketch load(stm.get());
load.populate().get();
BOOST_REQUIRE_EQUAL(load.get_load(host1), 2);
BOOST_REQUIRE_EQUAL(load.get_load(host2), 2);
auto& tmap1 = stm.get()->tablets().get_tablet_map(table1);
auto& tmap2 = stm.get()->tablets().get_tablet_map(table2);
auto& tmap3 = stm.get()->tablets().get_tablet_map(table3);
auto& tmap4 = stm.get()->tablets().get_tablet_map(table4);
BOOST_REQUIRE_EQUAL(tmap1.get_tablet_info(tmap1.first_tablet()).replicas, tmap2.get_tablet_info(tmap2.first_tablet()).replicas);
BOOST_REQUIRE_EQUAL(tmap3.get_tablet_info(tmap3.first_tablet()).replicas, tmap4.get_tablet_info(tmap4.first_tablet()).replicas);
}
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) {
// Verifies that load balancer moves tablets out of the decommissioned node.
// The scenario is such that replication factor of tablets can be satisfied after decommission.
do_with_cql_env_thread([](auto& e) {
unsigned shard_count = 2;
topology_builder topo(e);
auto host1 = topo.add_node(node_state::normal, shard_count);
auto host2 = topo.add_node(node_state::normal, shard_count);
auto host3 = topo.add_node(node_state::decommissioning, shard_count);
auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4);
auto table1 = add_table(e, ks_name).get();
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(4);
auto tid = tmap.first_tablet();
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
tablet_replica {host2, 1},
}
});
tid = *tmap.next_tablet(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
tablet_replica {host2, 1},
}
});
tid = *tmap.next_tablet(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
tablet_replica {host3, 0},
}
});
tid = *tmap.next_tablet(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host2, 1},
tablet_replica {host3, 1},
}
});
tmeta.set_tablet_map(table1, std::move(tmap));
co_return;
});
auto& stm = e.shared_token_metadata().local();
shared_load_stats& load_stats = topo.get_shared_load_stats();
load_stats.set_default_tablet_sizes(stm.get());
rebalance_tablets(e, &load_stats);
{
load_sketch load(stm.get(), load_stats.get());
load.populate().get();
BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host1), 2);
BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host2), 2);
BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host3), 0);
}
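// Once the decommissioning node has been drained of tablets, mark it as left and rebalance
// again; the distribution should stay the same.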
topo.set_node_state(host3, node_state::left);
rebalance_tablets(e, &load_stats);
{
load_sketch load(stm.get(), load_stats.get());
load.populate().get();
BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host1), 2);
BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host2), 2);
BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host3), 0);
}
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_table_creation_during_decommission) {
// Verifies that a new table doesn't get tablets allocated on a node being decommissioned,
// which could leave replicas on nodes absent from the topology after decommission.
// Also verifies that the allocated tablet count doesn't take nodes being decommissioned into account
// when achieving the desired tablet count per shard in a DC.
auto cfg = tablet_cql_test_config();
cfg.db_config->tablets_initial_scale_factor(1);
do_with_cql_env_thread([](auto& e) {
topology_builder topo(e);
topo.add_node(node_state::normal);
topo.add_node(node_state::normal);
auto host3 = topo.add_node(node_state::decommissioning);
auto host4 = topo.add_node(node_state::left);
auto ks_name = add_keyspace(e, {{topo.dc(), 1}});
auto table1 = add_table(e, ks_name).get();
auto s = e.local_db().find_schema(table1);
auto& stm = e.shared_token_metadata().local();
auto& tmap = stm.get()->tablets().get_tablet_map(table1);
// Verify we do not treat leaving nodes as having capacity.
BOOST_REQUIRE_EQUAL(tmap.tablet_count(), 2);
tmap.for_each_tablet([&](auto tid, auto& tinfo) {
for (auto& replica : tinfo.replicas) {
BOOST_REQUIRE_NE(replica.host, host3);
BOOST_REQUIRE_NE(replica.host, host4);
}
return make_ready_future<>();
}).get();
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_table_creation_during_rack_decommission) {
// Reproduces #22625
// The problematic scenario happens when allocating tablets for a new table
// when there is a rack with only non-normal nodes.
do_with_cql_env_thread([](auto& e) {
topology_builder topo(e);
topo.add_node();
topo.add_node();
topo.start_new_rack();
auto host3 = topo.add_node(node_state::decommissioning);
auto host4 = topo.add_node(node_state::left);
auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 8);
auto table1 = add_table(e, ks_name).get();
rebalance_tablets(e);
auto& stm = e.shared_token_metadata().local();
auto& tmap = stm.get()->tablets().get_tablet_map(table1);
tmap.for_each_tablet([&](auto tid, auto& tinfo) {
for (auto& replica : tinfo.replicas) {
BOOST_REQUIRE_NE(replica.host, host3);
BOOST_REQUIRE_NE(replica.host, host4);
}
return make_ready_future<>();
}).get();
}, tablet_cql_test_config()).get();
}
SEASTAR_THREAD_TEST_CASE(test_decommission_two_racks) {
// Verifies that load balancer moves tablets out of the decommissioned node.
// The scenario is such that replication constraints of tablets can be satisfied after decommission.
do_with_cql_env_thread([](auto& e) {
std::vector<endpoint_dc_rack> racks;
topology_builder topo(e);
racks.push_back(topo.rack());
auto host1 = topo.add_node(node_state::normal);
auto host3 = topo.add_node(node_state::normal);
topo.start_new_rack();
racks.push_back(topo.rack());
auto host2 = topo.add_node(node_state::normal);
auto host4 = topo.add_node(node_state::decommissioning);
auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4);
auto table1 = add_table(e, ks_name).get();
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(4);
auto tid = tmap.first_tablet();
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
tablet_replica {host2, 0},
}
});
tid = *tmap.next_tablet(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host2, 0},
tablet_replica {host3, 0},
}
});
tid = *tmap.next_tablet(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host3, 0},
tablet_replica {host4, 0},
}
});
tid = *tmap.next_tablet(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
tablet_replica {host2, 0},
}
});
tmeta.set_tablet_map(table1, std::move(tmap));
co_return;
});
auto& stm = e.shared_token_metadata().local();
shared_load_stats& load_stats = topo.get_shared_load_stats();
load_stats.set_default_tablet_sizes(stm.get());
rebalance_tablets(e, &load_stats);
{
load_sketch load(stm.get(), load_stats.get());
load.populate().get();
BOOST_REQUIRE_GE(load.get_avg_tablet_count(host1), 2);
BOOST_REQUIRE_GE(load.get_avg_tablet_count(host2), 2);
BOOST_REQUIRE_GE(load.get_avg_tablet_count(host3), 2);
BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host4), 0);
}
// Verify replicas are not co-located on the same rack
{
auto tm = stm.get();
auto& tmap = tm->tablets().get_tablet_map(table1);
tmap.for_each_tablet([&](auto tid, auto& tinfo) -> future<> {
auto rack1 = tm->get_topology().get_rack(tinfo.replicas[0].host);
auto rack2 = tm->get_topology().get_rack(tinfo.replicas[1].host);
BOOST_REQUIRE_NE(rack1, rack2);
return make_ready_future<>();
}).get();
}
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_decommission_rack_load_failure) {
// Verifies that rebalancing fails when moving tablets out of the decommissioned node is impossible.
// The scenario is such that it is impossible to distribute replicas without violating rack uniqueness.
do_with_cql_env_thread([](auto& e) {
std::vector<endpoint_dc_rack> racks;
topology_builder topo(e);
racks.push_back(topo.rack());
auto host1 = topo.add_node(node_state::normal);
auto host2 = topo.add_node(node_state::normal);
auto host3 = topo.add_node(node_state::normal);
auto rack2 = topo.start_new_rack();
racks.push_back(topo.rack());
auto host4 = topo.add_node(node_state::normal);
auto ks_name = add_keyspace_racks(e, {{rack2.dc, {rack2.rack}}}, 4);
auto table1 = add_table(e, ks_name).get();
topo.set_node_state(host4, node_state::decommissioning);
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(4);
auto tid = tmap.first_tablet();
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
tablet_replica {host4, 0},
}
});
tid = *tmap.next_tablet(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host2, 0},
tablet_replica {host4, 0},
}
});
tid = *tmap.next_tablet(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host3, 0},
tablet_replica {host4, 0},
}
});
tid = *tmap.next_tablet(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
tablet_replica {host4, 0},
}
});
tmeta.set_tablet_map(table1, std::move(tmap));
co_return;
});
auto& stm = e.shared_token_metadata().local();
topo.get_shared_load_stats().set_default_tablet_sizes(stm.get());
BOOST_REQUIRE_THROW(rebalance_tablets(e, &topo.get_shared_load_stats()), std::runtime_error);
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_decommission_rf_not_met) {
// Verifies that rebalancing fails when the decommissioned node cannot be drained.
// The scenario is such that the replication factor of tablets cannot be satisfied after decommission.
do_with_cql_env_thread([](auto& e) {
topology_builder topo(e);
auto host1 = topo.add_node(node_state::normal, 2);
auto host2 = topo.add_node(node_state::normal, 2);
auto host3 = topo.add_node(node_state::decommissioning, 2);
auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 1);
auto table1 = add_table(e, ks_name).get();
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(1);
auto tid = tmap.first_tablet();
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
tablet_replica {host2, 0},
tablet_replica {host3, 0},
}
});
tmeta.set_tablet_map(table1, std::move(tmap));
co_return;
});
auto& stm = e.shared_token_metadata().local();
topo.get_shared_load_stats().set_default_tablet_sizes(stm.get());
BOOST_REQUIRE_THROW(rebalance_tablets(e, &topo.get_shared_load_stats()), std::runtime_error);
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions) {
do_with_cql_env_thread([] (auto& e) {
// Tests the scenario of bootstrapping a single node.
// Verifies that the load balancer balances tablets on that node
// even though there is already an active migration.
// The test verifies that the load balancer creates a plan
// which when executed will achieve perfect balance,
// which is a proof that it doesn't stop due to active migrations.
topology_builder topo(e);
auto host1 = topo.add_node(node_state::normal, 2);
topo.start_new_rack();
auto host2 = topo.add_node(node_state::normal, 1);
auto host3 = topo.add_node(node_state::normal, 1);
auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, 4);
auto table1 = add_table(e, ks_name).get();
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(4);
std::optional<tablet_id> tid = tmap.first_tablet();
for (int i = 0; i < 4; ++i) {
tmap.set_tablet(*tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
tablet_replica {host2, 0},
}
});
tid = tmap.next_tablet(*tid);
}
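// Pre-create an in-progress migration of the first tablet towards host3, so the balancer
// must plan around an active transition.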
tmap.set_tablet_transition_info(tmap.first_tablet(), tablet_transition_info {
tablet_transition_stage::allow_write_both_read_old,
tablet_transition_kind::migration,
tablet_replica_set {
tablet_replica {host1, 0},
tablet_replica {host3, 0},
},
tablet_replica {host3, 0}
});
tmeta.set_tablet_map(table1, std::move(tmap));
co_return;
});
abort_source as;
auto guard = e.get_raft_group0_client().start_operation(as).get();
auto& stm = e.shared_token_metadata().local();
topo.get_shared_load_stats().set_default_tablet_sizes(stm.get());
rebalance_tablets_as_in_progress(e, topo.get_shared_load_stats());
execute_transitions(stm);
{
load_sketch load(stm.get());
load.populate().get();
for (auto h : {host1, host2, host3}) {
testlog.debug("Checking host {}", h);
BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(h), 2);
}
}
// Restore consistency between stm and system tables before releasing group0 guard.
save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get();
}).get();
}
#ifdef SCYLLA_ENABLE_ERROR_INJECTION
SEASTAR_THREAD_TEST_CASE(test_load_balancer_shuffle_mode) {
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto host1 = topo.add_node(node_state::normal, 1);
topo.start_new_rack();
auto host2 = topo.add_node(node_state::normal, 1);
topo.add_node(node_state::normal, 2);
auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, 4);
auto table1 = add_table(e, ks_name).get();
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(4);
std::optional<tablet_id> tid = tmap.first_tablet();
for (int i = 0; i < 4; ++i) {
tmap.set_tablet(*tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
tablet_replica {host2, 0},
}
});
tid = tmap.next_tablet(*tid);
}
tmeta.set_tablet_map(table1, std::move(tmap));
co_return;
});
auto& stm = e.shared_token_metadata().local();
topo.get_shared_load_stats().set_default_tablet_sizes(stm.get());
rebalance_tablets(e, &topo.get_shared_load_stats());
BOOST_REQUIRE(e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr, topo.get_load_stats()).get().empty());
utils::get_local_injector().enable("tablet_allocator_shuffle");
auto disable_injection = seastar::defer([&] {
utils::get_local_injector().disable("tablet_allocator_shuffle");
});
BOOST_REQUIRE(!e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr, topo.get_load_stats()).get().empty());
}).get();
}
#endif
SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) {
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
const auto shard_count = 2;
auto rack1 = topo.rack();
auto rack2 = topo.start_new_rack();
auto host1 = topo.add_node(node_state::normal, shard_count, rack1);
auto host2 = topo.add_node(node_state::normal, shard_count, rack2);
auto host3 = topo.add_node(node_state::normal, shard_count, rack1);
auto host4 = topo.add_node(node_state::normal, shard_count, rack2);
auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, 16);
auto table1 = add_table(e, ks_name).get();
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(16);
for (auto tid : tmap.tablet_ids()) {
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, tests::random::get_int<shard_id>(0, shard_count - 1)},
tablet_replica {host2, tests::random::get_int<shard_id>(0, shard_count - 1)},
}
});
}
tmeta.set_tablet_map(table1, std::move(tmap));
co_return;
});
auto& stm = e.shared_token_metadata().local();
shared_load_stats& load_stats = topo.get_shared_load_stats();
load_stats.set_default_tablet_sizes(stm.get());
rebalance_tablets(e, &load_stats);
{
load_sketch load(stm.get());
load.populate().get();
for (auto h : {host1, host2, host3, host4}) {
testlog.debug("Checking host {}", h);
BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(h), 4);
BOOST_REQUIRE_LE(load.get_shard_tablet_count_imbalance(h), 1);
}
}
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_asymmetric_node_capacity) {
do_with_cql_env_thread([](auto& e) {
topology_builder topo(e);
auto host1 = topo.add_node(node_state::decommissioning, 8);
auto host2 = topo.add_node(node_state::normal, 1);
auto host3 = topo.add_node(node_state::normal, 7);
auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 16);
auto table1 = add_table(e, ks_name).get();
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(16);
for (auto tid: tmap.tablet_ids()) {
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
}
});
}
tmeta.set_tablet_map(table1, std::move(tmap));
co_return;
});
auto until_nodes_drained = [] (const migration_plan& plan) {
return !plan.has_nodes_to_drain();
};
auto& stm = e.shared_token_metadata().local();
topo.get_shared_load_stats().set_default_tablet_sizes(stm.get());
rebalance_tablets(e, &topo.get_shared_load_stats(), {}, until_nodes_drained);
{
load_sketch load(stm.get());
load.populate().get();
for (auto h: {host2, host3}) {
testlog.debug("Checking host {}", h);
BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(h), 2); // 16 tablets / 8 shards = 2 tablets / shard
BOOST_REQUIRE_EQUAL(load.get_shard_tablet_count_imbalance(h), 0);
}
}
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_balancer_disabling) {
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto host1 = topo.add_node(node_state::normal, 2);
topo.add_node(node_state::normal, 2);
auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 16);
auto table1 = add_table(e, ks_name).get();
abort_source as;
auto guard = e.get_raft_group0_client().start_operation(as).get();
auto& stm = e.shared_token_metadata().local();
// host1 is loaded and host2 is empty, resulting in an imbalance.
// host1's shard 0 is loaded and shard 1 is empty, resulting in intra-node imbalance.
mutate_tablets(e, guard, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(16);
for (auto tid : tmap.tablet_ids()) {
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
}
});
}
tmeta.set_tablet_map(table1, std::move(tmap));
co_return;
});
topo.get_shared_load_stats().set_default_tablet_sizes(stm.get());
{
auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr, topo.get_load_stats()).get();
BOOST_REQUIRE(!plan.empty());
}
// Disable load balancing
stm.mutate_token_metadata([&] (token_metadata& tm) {
tm.tablets().set_balancing_enabled(false);
return make_ready_future<>();
}).get();
{
auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr, topo.get_load_stats()).get();
BOOST_REQUIRE(plan.empty());
}
// Check that cloning preserves the setting
stm.mutate_token_metadata([&] (token_metadata& tm) {
return make_ready_future<>();
}).get();
{
auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr, topo.get_load_stats()).get();
BOOST_REQUIRE(plan.empty());
}
// Enable load balancing back
stm.mutate_token_metadata([&] (token_metadata& tm) {
tm.tablets().set_balancing_enabled(true);
return make_ready_future<>();
}).get();
{
auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr, topo.get_load_stats()).get();
BOOST_REQUIRE(!plan.empty());
}
// Check that cloning preserves the setting
stm.mutate_token_metadata([&] (token_metadata& tm) {
return make_ready_future<>();
}).get();
{
auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr, topo.get_load_stats()).get();
BOOST_REQUIRE(!plan.empty());
}
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_drained_node_is_not_balanced_internally) {
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto host1 = topo.add_node(node_state::removing, 2);
topo.add_node(node_state::normal, 2);
auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 16);
auto table1 = add_table(e, ks_name).get();
abort_source as;
auto guard = e.get_raft_group0_client().start_operation(as).get();
auto& stm = e.shared_token_metadata().local();
mutate_tablets(e, guard, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(16);
for (auto tid : tmap.tablet_ids()) {
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
}
});
}
tmeta.set_tablet_map(table1, std::move(tmap));
co_return;
});
migration_plan plan = e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr, topo.get_load_stats()).get();
BOOST_REQUIRE(plan.has_nodes_to_drain());
for (auto&& mig : plan.migrations()) {
BOOST_REQUIRE(mig.kind != tablet_transition_kind::intranode_migration);
}
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_plan_fails_when_removing_last_replica) {
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto host1 = topo.add_node();
auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 1);
auto table1 = add_table(e, ks_name).get();
topo.set_node_state(host1, node_state::removing);
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(1);
for (auto tid : tmap.tablet_ids()) {
tmap.set_tablet(tid, tablet_info {
tablet_replica_set{tablet_replica{host1, 0}}
});
}
tmeta.set_tablet_map(table1, std::move(tmap));
co_return;
});
std::unordered_set<host_id> skiplist = {host1};
auto& stm = e.shared_token_metadata().local();
topo.get_shared_load_stats().set_default_tablet_sizes(stm.get());
BOOST_REQUIRE_THROW(rebalance_tablets(e, &topo.get_shared_load_stats(), skiplist), std::runtime_error);
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_skiplist_is_ignored_when_draining) {
// When doing normal load balancing, we can ignore DOWN nodes in the node set
// and just balance the UP nodes among themselves because it's ok to equalize
// load in that set.
// It's dangerous to do that when draining because that can lead to overloading of the UP nodes.
// In the worst case, we can have only one non-drained node in the UP set, which would receive
// all the tablets of the drained node, doubling its load.
// It's safer to let the drain fail/stall.
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto host1 = topo.add_node(node_state::removing);
auto host2 = topo.add_node(node_state::normal);
auto host3 = topo.add_node(node_state::normal);
auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 2);
auto table1 = add_table(e, ks_name).get();
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(2);
auto tid = tmap.first_tablet();
tmap.set_tablet(tid, tablet_info {
tablet_replica_set{tablet_replica{host1, 0}}
});
tid = *tmap.next_tablet(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set{tablet_replica{host1, 0}}
});
tmeta.set_tablet_map(table1, std::move(tmap));
co_return;
});
auto& stm = e.shared_token_metadata().local();
std::unordered_set<host_id> skiplist = {host2};
topo.get_shared_load_stats().set_default_tablet_sizes(stm.get());
rebalance_tablets(e, &topo.get_shared_load_stats(), skiplist);
{
load_sketch load(stm.get());
load.populate().get();
for (auto h : {host2, host3}) {
testlog.debug("Checking host {}", h);
BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(h), 1);
}
}
}).get();
}
static
void check_tablet_invariants(const tablet_metadata& tmeta) {
for (const auto& [table, tmap] : tmeta.all_tables_ungrouped()) {
tmap->for_each_tablet([&](auto tid, const tablet_info& tinfo) -> future<> {
std::unordered_set<host_id> hosts;
// Uniqueness of hosts
for (const auto& replica: tinfo.replicas) {
auto ret = hosts.insert(replica.host).second;
if (!ret) {
testlog.error("Failed tablet invariant check for tablet {}: {}", tid, tinfo.replicas);
}
BOOST_REQUIRE(ret);
}
return make_ready_future<>();
}).get();
}
}
static
std::vector<host_id>
allocate_replicas_in_racks(const std::vector<endpoint_dc_rack>& racks, int rf,
const std::unordered_map<sstring, std::vector<host_id>>& hosts_by_rack) {
// Choose replicas randomly while loading racks evenly.
std::vector<host_id> replica_hosts;
for (int i = 0; i < rf; ++i) {
auto rack = racks[i % racks.size()];
auto& rack_hosts = hosts_by_rack.at(rack.rack);
while (true) {
auto candidate_host = rack_hosts[tests::random::get_int<shard_id>(0, rack_hosts.size() - 1)];
if (std::find(replica_hosts.begin(), replica_hosts.end(), candidate_host) == replica_hosts.end()) {
replica_hosts.push_back(candidate_host);
break;
}
}
}
return replica_hosts;
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) {
auto do_test_case = [] (const shard_id rf) {
cql_test_config cfg;
cfg.need_remote_proxy = true;
return do_with_cql_env_thread([rf] (auto& e) {
topology_builder topo(e);
const int n_hosts = 6;
auto shard_count = 2;
// Sanity check just in case someone modifies the caller of this lambda
// and starts providing RF > n_hosts. In that case, we wouldn't be able
// to create an RF-rack-valid keyspace.
assert(rf <= n_hosts);
std::vector<host_id> hosts;
std::unordered_map<sstring, std::vector<host_id>> hosts_by_rack;
std::vector<endpoint_dc_rack> racks{topo.rack()};
for (shard_id i = 1; i < rf; ++i) {
racks.push_back(topo.start_new_rack());
}
for (int i = 0; i < n_hosts; ++i) {
auto rack = racks[(i + 1) % racks.size()];
auto h = topo.add_node(node_state::normal, shard_count, rack);
hosts.push_back(h);
if (i) {
// Leave the first host empty by making it invisible to the allocation algorithm.
hosts_by_rack[rack.rack].push_back(h);
}
}
auto& stm = e.shared_token_metadata().local();
size_t total_tablet_count = 0;
std::vector<sstring> keyspaces;
size_t tablet_count_bits = 8;
for (size_t log2_tablets = 0; log2_tablets < tablet_count_bits; ++log2_tablets) {
if (tests::random::get_bool()) {
continue;
}
auto initial_tablets = 1 << log2_tablets;
keyspaces.push_back(add_keyspace(e, {{topo.dc(), rf}}, initial_tablets));
auto table = add_table(e, keyspaces.back()).get();
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(initial_tablets);
for (auto tid : tmap.tablet_ids()) {
// Choose replicas randomly while loading racks evenly.
std::vector<host_id> replica_hosts = allocate_replicas_in_racks(racks, rf, hosts_by_rack);
tablet_replica_set replicas;
for (auto h : replica_hosts) {
auto shard = tests::random::get_int<shard_id>(0, shard_count - 1);
replicas.push_back(tablet_replica {h, shard});
}
tmap.set_tablet(tid, tablet_info {std::move(replicas)});
}
total_tablet_count += tmap.tablet_count();
tmeta.set_tablet_map(table, std::move(tmap));
return make_ready_future<>();
});
}
testlog.debug("tablet metadata: {}", stm.get()->tablets());
testlog.info("Total tablet count: {}, hosts: {}", total_tablet_count, hosts.size());
check_tablet_invariants(stm.get()->tablets());
rebalance_tablets(e);
check_tablet_invariants(stm.get()->tablets());
{
load_sketch load(stm.get());
load.populate().get();
min_max_tracker<unsigned> min_max_load;
for (auto h: hosts) {
auto l = load.get_avg_tablet_count(h);
testlog.info("Load on host {}: {}", h, l);
min_max_load.update(l);
BOOST_REQUIRE_LE(load.get_shard_tablet_count_imbalance(h), 1);
}
testlog.debug("tablet metadata: {}", stm.get()->tablets());
testlog.debug("Min load: {}, max load: {}", min_max_load.min(), min_max_load.max());
// FIXME: The algorithm cannot achieve balance in all cases yet, so we only check that it stops.
// For example, if we have an overloaded node in one rack and target underloaded node in a different rack,
// we won't be able to reduce the load gap by moving tablets between the two. We have to balance the overloaded
// rack first, which is unconstrained.
// Uncomment the following line when the algorithm is improved.
// BOOST_REQUIRE(min_max_load.max() - min_max_load.min() <= 1);
}
seastar::parallel_for_each(keyspaces, [&] (const sstring& ks) {
return e.execute_cql(fmt::format("DROP KEYSPACE {}", ks)).discard_result();
}).get();
}, std::move(cfg));
};
const int test_case_number = 13;
for (int i = 0; i < test_case_number; ++i) {
const shard_id rf = tests::random::get_int<shard_id>(2, 4);
testlog.info("{}: Starting test case {} for RF={}", std::source_location::current().function_name(), i + 1, rf);
do_test_case(rf).get();
}
}
SEASTAR_THREAD_TEST_CASE(test_balancing_heterogeneous_cluster) {
// 3 racks, RF=3. 1 table with 90% space.
// We start with 1 i4i_2xlarge per rack, then add i4i_large to each rack.
// We want utilization to be balanced.
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
shared_load_stats& load_stats = topo.get_shared_load_stats();
auto rack1 = topo.rack();
auto rack2 = topo.start_new_rack();
auto rack3 = topo.start_new_rack();
topo.add_i4i_2xlarge(rack1);
topo.add_i4i_2xlarge(rack2);
topo.add_i4i_2xlarge(rack3);
auto& stm = e.shared_token_metadata().local();
auto ks_name = add_keyspace(e, {{topo.dc(), 3}});
auto table1 = add_table(e, ks_name).get();
load_stats.set_default_tablet_sizes(stm.get());
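// Size the table at 0.9 * capacity / 3: with RF=3 the data is stored three times,
// so the replicated footprint fills roughly 90% of the cluster's disk capacity.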
load_stats.set_size(table1, 0.9 * topo.get_capacity() / 3);
rebalance_tablets(e, &load_stats);
testlog.info("Initial cluster ready");
std::unordered_map<host_id, double> initial_utilization;
auto& hosts = topo.hosts();
{
load_sketch load(stm.get(), load_stats.get());
load.populate().get();
for (auto h: hosts) {
auto u = load.get_allocated_utilization(h);
BOOST_REQUIRE(u);
initial_utilization[h] = *u;
}
}
topo.add_i4i_large(rack1);
rebalance_tablets(e, &load_stats);
testlog.info("Expanded capacity in rack1");
{
load_sketch load(stm.get(), load_stats.get());
load.populate().get();
auto u0 = *load.get_allocated_utilization(hosts[0]);
BOOST_REQUIRE_LT(u0, initial_utilization[hosts[0]]);
initial_utilization[hosts[0]] = u0;
// rack2 and rack3 are not changed, to keep racks not overloaded (RF=rack_count)
BOOST_REQUIRE_EQUAL(*load.get_allocated_utilization(hosts[1]), initial_utilization[hosts[1]]);
BOOST_REQUIRE_EQUAL(*load.get_allocated_utilization(hosts[2]), initial_utilization[hosts[2]]);
}
topo.add_i4i_large(rack2);
rebalance_tablets(e, &load_stats);
testlog.info("Expanded capacity in rack2");
{
load_sketch load(stm.get(), load_stats.get());
load.populate().get();
BOOST_REQUIRE_EQUAL(*load.get_allocated_utilization(hosts[0]), initial_utilization[hosts[0]]);
auto u1 = *load.get_allocated_utilization(hosts[1]);
BOOST_REQUIRE_LT(u1, initial_utilization[hosts[1]]);
initial_utilization[hosts[1]] = u1;
BOOST_REQUIRE_EQUAL(*load.get_allocated_utilization(hosts[2]), initial_utilization[hosts[2]]);
}
topo.add_i4i_large(rack3);
rebalance_tablets(e, &load_stats);
testlog.info("Expanded capacity in rack3");
{
load_sketch load(stm.get(), load_stats.get());
load.populate().get();
BOOST_REQUIRE_EQUAL(*load.get_allocated_utilization(hosts[0]), initial_utilization[hosts[0]]);
BOOST_REQUIRE_EQUAL(*load.get_allocated_utilization(hosts[1]), initial_utilization[hosts[1]]);
auto u2 = *load.get_allocated_utilization(hosts[2]);
BOOST_REQUIRE_LT(u2, initial_utilization[hosts[2]]);
initial_utilization[hosts[2]] = u2;
// Check that utilization difference is < 1%
min_max_tracker<double> node_utilization;
for (auto h: hosts) {
auto u = load.get_allocated_utilization(h);
BOOST_REQUIRE(u);
node_utilization.update(*u);
}
BOOST_REQUIRE_LT(node_utilization.max() - node_utilization.min(), 0.01);
}
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_imbalance_in_hetero_cluster_with_two_tables) {
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
shared_load_stats& load_stats = topo.get_shared_load_stats();
auto rack1 = topo.rack();
auto rack2 = topo.start_new_rack();
auto rack3 = topo.start_new_rack();
topo.add_i4i_2xlarge(rack1);
topo.add_i4i_2xlarge(rack2);
topo.add_i4i_2xlarge(rack3);
auto& stm = e.shared_token_metadata().local();
auto ks_name = add_keyspace(e, {{topo.dc(), 3}}, 128);
auto table1 = add_table(e, ks_name).get();
load_stats.set_size(table1, 0);
load_stats.set_default_tablet_sizes(stm.get());
testlog.info("Initial cluster ready");
topo.add_i4i_large(rack1);
topo.add_i4i_large(rack2);
topo.add_i4i_large(rack3);
rebalance_tablets(e, &load_stats);
testlog.info("Expanded capacity");
auto ks2_name = add_keyspace(e, {{topo.dc(), 3}}, 128);
auto table2 = add_table(e, ks2_name).get();
auto& hosts = topo.hosts();
{
load_sketch load(stm.get(), load_stats.get());
load.populate(std::nullopt, table2).get();
// Check that utilization difference is < 13%
min_max_tracker<double> node_utilization;
for (auto h: hosts) {
auto u = load.get_allocated_utilization(h);
BOOST_REQUIRE(u);
testlog.info("table2: {}: {}", h, u);
node_utilization.update(*u);
}
// Initial allocation is not capacity-aware so we're still not perfect here.
// See https://github.com/scylladb/scylladb/issues/23378
BOOST_REQUIRE_LT(node_utilization.max() - node_utilization.min(), 0.13);
}
}).get();
}
// Reproduces https://github.com/scylladb/scylladb/issues/23631
SEASTAR_THREAD_TEST_CASE(test_imbalance_in_hetero_cluster_with_two_tables_imbalanced) {
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
shared_load_stats& load_stats = topo.get_shared_load_stats();
auto rack1 = topo.rack();
auto rack2 = topo.start_new_rack();
auto rack3 = topo.start_new_rack();
topo.add_i4i_2xlarge(rack1);
topo.add_i4i_2xlarge(rack2);
topo.add_i4i_2xlarge(rack3);
auto& stm = e.shared_token_metadata().local();
auto ks_name = add_keyspace(e, {{topo.dc(), 3}}, 512);
auto table1 = add_table(e, ks_name).get();
load_stats.set_size(table1, topo.get_capacity() * 0.8 / 3);
testlog.info("Initial cluster ready");
topo.add_i4i_large(rack1);
topo.add_i4i_large(rack2);
topo.add_i4i_large(rack3);
testlog.info("Expanded capacity");
auto ks2_name = add_keyspace(e, {{topo.dc(), 3}});
auto table2 = add_table(e, ks2_name).get();
auto& hosts = topo.hosts();
{
load_sketch load(stm.get(), load_stats.get());
load.populate(std::nullopt, table2).get();
min_max_tracker<double> node_utilization;
for (auto h : hosts) {
auto u = load.get_allocated_utilization(h);
testlog.info("table2: {}: {}", h, u);
node_utilization.update(u.value_or(0));
}
BOOST_REQUIRE_LT(node_utilization.max() - node_utilization.min(), 0.13);
}
}).get();
}
static table_id create_table_and_set_tablet_sizes(cql_test_env& e, topology_builder& topo, sstring ks_name, size_t tablet_count, uint64_t table_size_bytes) {
const uint64_t tablet_size = table_size_bytes / tablet_count;
std::map<sstring, sstring> tablet_options = {{"min_tablet_count", to_sstring(tablet_count)}};
auto table = add_table(e, ks_name, tablet_options).get();
auto& load_stats = topo.get_shared_load_stats();
load_stats.set_size(table, table_size_bytes);
auto& stm = e.shared_token_metadata().local();
auto& tmap = stm.get()->tablets().get_tablet_map(table);
tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& tinfo) {
auto replicas = tinfo.replicas;
for (auto& r : tinfo.replicas) {
locator::range_based_tablet_id rb_tid {table, tmap.get_token_range(tid)};
load_stats.set_tablet_size(r.host, rb_tid, tablet_size);
}
return make_ready_future<>();
}).get();
testlog.info("Created table {} of size {:i} with {} tablets and tablet size of {:i}",
table, utils::pretty_printed_data_size(table_size_bytes), tablet_count, utils::pretty_printed_data_size(tablet_size));
return table;
}
SEASTAR_THREAD_TEST_CASE(test_size_based_load_balancing_table_load) {
// This test validates the table balance in size based load balancing.
// The initial tablet allocation during table creation is non-deterministic because of
// shuffle in network_topology_strategy.cc. This means that the tablet balancer will work on a different
// initial setup on every run, and that the final tablet distribution will also be different.
// With max_imbalance_threshold set to 1.4, running the test 10000 times produced no failures.
// 1.5 was selected as a safety buffer to avoid flakiness.
//
// The following is a table of max_imbalance_threshold and failure rates for 10000 runs:
//
// threshold | # runs | # failures
// ----------+--------+------------
// 1.4 | 10000 | 0
// 1.3 | 10000 | 57
// 1.2 | 10000 | 539
auto cfg = tablet_cql_test_config();
do_with_cql_env_thread([&] (auto& e) {
logging::logger_registry().set_logger_level("load_balancer", logging::log_level::debug);
topology_builder topo(e);
endpoint_dc_rack dc_rack;
const uint64_t shard_capacity = 250UL * 1024UL * 1024UL * 1024UL;
const size_t tablet_count = 512;
const double max_imbalance_threshold = 1.5;
const double min_imbalance_threshold = 1 / max_imbalance_threshold;
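// Each shard's per-table load must stay strictly between ideal/1.5 and ideal*1.5,
// where the ideal load is table_size / total_capacity (checked in check_balance below).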
uint64_t total_capacity = 0;
std::vector<host_id> hosts;
// Add disk capacity for the default node. Add all subsequent nodes to the same DC/rack
e.shared_token_metadata().local().get()->get_topology().for_each_node([&] (const auto& node) {
dc_rack = node.dc_rack();
auto host = node.host_id();
auto num_shards = node.get_shard_count();
auto node_capacity = shard_capacity * num_shards;
topo.get_shared_load_stats().set_capacity(host, node_capacity);
total_capacity += node_capacity;
testlog.info("Default node {} has {} shards and {:i} disk capacity", host, num_shards, utils::pretty_printed_data_size(node_capacity));
hosts.push_back(host);
});
auto create_node = [&] (size_t num_shards) {
auto host = topo.add_node(node_state::normal, num_shards, dc_rack);
auto node_capacity = shard_capacity * num_shards;
topo.get_shared_load_stats().set_capacity(host, node_capacity);
total_capacity += node_capacity;
testlog.info("Added node {} with {} shards and {:i} disk capacity", host, num_shards, utils::pretty_printed_data_size(node_capacity));
hosts.push_back(host);
};
create_node(10);
create_node(8);
auto ks_name = add_keyspace(e, {{dc_rack.dc, 1}});
// Add 3 tables: 0.5 of the current total storage, 0.25 of the total storage and 0.125 of the total storage
std::map<table_id, uint64_t> table_sizes;
uint64_t table_size = total_capacity / 2;
for (int c = 0; c < 3; c++) {
auto table_id = create_table_and_set_tablet_sizes(e, topo, ks_name, tablet_count, table_size);
table_sizes[table_id] = table_size;
table_size /= 2;
}
// Add another table with 1 byte per tablet
table_size = tablet_count;
auto table_id = create_table_and_set_tablet_sizes(e, topo, ks_name, tablet_count, table_size);
table_sizes[table_id] = table_size;
auto& stm = e.shared_token_metadata().local();
auto check_balance = [&] {
for (auto& [table, table_size] : table_sizes) {
load_sketch load(stm.get(), topo.get_shared_load_stats().get());
load.populate(std::nullopt, table).get();
const double ideal_table_load = double(table_size) / total_capacity;
min_max_tracker<double> table_load;
for (auto h : hosts) {
auto shard_minmax_load = load.get_shard_minmax(h);
table_load.update(shard_minmax_load);
testlog.info("Table: {} ideal_load: {} host: {} load: {} min_shard_load: {} max_shard_load: {}",
table, ideal_table_load, h, load.get_load(h), shard_minmax_load.min(), shard_minmax_load.max());
BOOST_REQUIRE_LT(min_imbalance_threshold, shard_minmax_load.min() / ideal_table_load);
BOOST_REQUIRE_GT(max_imbalance_threshold, shard_minmax_load.max() / ideal_table_load);
}
}
};
rebalance_tablets(e, &topo.get_shared_load_stats());
check_balance();
create_node(8);
rebalance_tablets(e, &topo.get_shared_load_stats());
check_balance();
}, std::move(cfg)).get();
}
static future<> run_imbalance_when_creating_plenty_of_tables_test(bool rf_rack_valid_keyspaces) {
cql_test_config cfg{};
cfg.db_config->rf_rack_valid_keyspaces.set(std::move(rf_rack_valid_keyspaces));
return do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto rack1 = topo.rack();
auto rack2 = topo.start_new_rack();
auto rack3 = topo.start_new_rack();
topo.add_i4i_2xlarge(rack1);
topo.add_i4i_2xlarge(rack2);
topo.add_i4i_2xlarge(rack3);
auto& stm = e.shared_token_metadata().local();
auto ks_name = add_keyspace(e, {{topo.dc(), 3}}, 1);
for (int _ : std::views::iota(1, 100)) {
add_table(e, ks_name, {{"min_per_shard_tablet_count", "10.0"}}).get();
}
testlog.info("Initial cluster ready");
{
load_sketch load(stm.get());
load.populate().get();
for (auto h: topo.hosts()) {
auto node_utilization = load.get_shard_minmax_tablet_count(h);
testlog.info("host {}: min={}, max={}", h, node_utilization.min(), node_utilization.max());
BOOST_REQUIRE_LT(node_utilization.max() - node_utilization.min(), 1.1);
}
}
}, std::move(cfg));
}
// Reproduces https://github.com/scylladb/scylladb/issues/27620
SEASTAR_THREAD_TEST_CASE(test_imbalance_when_creating_plenty_of_tables_with_RF_rack_valid_keyspaces_enforced) {
run_imbalance_when_creating_plenty_of_tables_test(true).get();
}
SEASTAR_THREAD_TEST_CASE(test_imbalance_when_creating_plenty_of_tables_with_RF_rack_valid_keyspaces_disabled) {
run_imbalance_when_creating_plenty_of_tables_test(false).get();
}
SEASTAR_THREAD_TEST_CASE(test_per_shard_goal_mixed_dc_rf) {
cql_test_config cfg = tablet_cql_test_config();
do_with_cql_env_thread([] (auto& e) {
auto per_shard_goal = e.local_db().get_config().tablets_per_shard_goal();
topology_builder topo(e);
shared_load_stats& load_stats = topo.get_shared_load_stats();
std::vector<host_id> hosts;
sstring dc1 = topo.dc();
hosts.push_back(topo.add_node(node_state::normal, 2));
topo.start_new_rack();
hosts.push_back(topo.add_node(node_state::normal, 2));
topo.start_new_rack();
hosts.push_back(topo.add_node(node_state::normal, 2));
auto dc2 = topo.start_new_dc().dc;
hosts.push_back(topo.add_node(node_state::normal, 1));
topo.start_new_rack();
hosts.push_back(topo.add_node(node_state::normal, 1));
auto ks_name1 = add_keyspace(e, {{dc1, 3}});
auto ks_name2 = add_keyspace(e, {{dc2, 2}});
// table1 overflows per-shard goal in dc1, should be scaled down.
// wants 400 tablets (3 nodes * 2 shards * 200 tablets/shard / rf=3 = 400 tablets)
// which will be scaled down by a factor of 0.5 to achieve 100 tablets/shard, giving
// 200 tablets, scaled up to the nearest power of 2, which is 256.
e.execute_cql(fmt::format("CREATE TABLE {}.table1 (p1 text, r1 int, PRIMARY KEY (p1)) "
"WITH tablets = {{'min_per_shard_tablet_count': 200}}", ks_name1)).get();
auto table1 = e.local_db().find_schema(ks_name1, "table1")->id();
// table2 has 64 tablets/shard in dc2, should not be scaled down.
e.execute_cql(fmt::format("CREATE TABLE {}.table2 (p1 text, r1 int, PRIMARY KEY (p1)) "
"WITH tablets = {{'min_per_shard_tablet_count': 64}}", ks_name2)).get();
auto table2 = e.local_db().find_schema(ks_name2, "table2")->id();
rebalance_tablets(e);
{
auto& stm = e.shared_token_metadata().local();
auto tm = stm.get();
BOOST_REQUIRE_EQUAL(tm->tablets().get_tablet_map(table1).tablet_count(), 256);
BOOST_REQUIRE_EQUAL(tm->tablets().get_tablet_map(table2).tablet_count(), 64);
load_sketch load(tm, load_stats.get());
load.populate().get();
for (auto h: hosts) {
auto l = load.get_shard_minmax_tablet_count(h);
testlog.info("Tablet count on host {}: min={}, max={}", h, l.min(), l.max());
BOOST_REQUIRE_LE(l.max(), 2 * per_shard_goal);
}
}
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_balancer_ignores_hosts_with_incomplete_stats) {
// This checks that nodes with incomplete stats are not included in load balancing.
do_with_cql_env_thread([] (auto& e) {
logging::logger_registry().set_logger_level("load_balancer", logging::log_level::debug);
topology_builder topo(e);
auto host1 = topo.add_node(node_state::normal, 2);
auto host2 = topo.add_node(node_state::normal, 2);
auto host3 = topo.add_node(node_state::normal, 2);
auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 16);
auto table1 = add_table(e, ks_name).get();
abort_source as;
auto guard = e.get_raft_group0_client().start_operation(as).get();
auto& stm = e.shared_token_metadata().local();
// Move all tablets to shard 0 of host2
mutate_tablets(e, guard, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(16);
for (auto tid : tmap.tablet_ids()) {
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host2, 0},
}
});
}
tmeta.set_tablet_map(table1, std::move(tmap));
co_return;
});
// Set tablet sizes, then erase a tablet size from load_stats for host2
auto& shared_stats = topo.get_shared_load_stats();
shared_stats.set_default_tablet_sizes(stm.get());
auto tablet_size_i = shared_stats.stats.tablet_stats.at(host2).tablet_sizes.at(table1).begin();
shared_stats.stats.tablet_stats.at(host2).tablet_sizes.at(table1).erase(tablet_size_i);
// Balancing should not issue any migrations because host2 will be ignored
// due to incomplete tablet sizes in load_stats
{
auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr, topo.get_load_stats()).get();
BOOST_REQUIRE(plan.empty());
}
// Balancing should issue migrations with host2 having all tablet sizes
shared_stats.set_default_tablet_sizes(stm.get());
{
auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr, topo.get_load_stats()).get();
BOOST_REQUIRE(!plan.empty());
BOOST_REQUIRE(!plan.migrations().empty());
for (auto&& mig : plan.migrations()) {
BOOST_REQUIRE_EQUAL(mig.src.host, host2);
}
}
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_balancer_does_not_balance_with_missing_tablet_sizes) {
// This checks that the balancer will not issue migrations with incomplete tablet sizes
do_with_cql_env_thread([] (auto& e) {
logging::logger_registry().set_logger_level("load_balancer", logging::log_level::debug);
topology_builder topo(e);
auto host1 = topo.add_node(node_state::normal, 2);
auto host2 = topo.add_node(node_state::normal, 2);
auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 64);
auto table1 = add_table(e, ks_name).get();
auto& stm = e.shared_token_metadata().local();
// Decommission host2
topo.set_node_state(host2, node_state::decommissioning);
// Set tablet sizes, then erase a tablet size from load_stats for host1 and host2
auto& shared_stats = topo.get_shared_load_stats();
shared_stats.set_default_tablet_sizes(stm.get());
auto erase_tablet_size = [&] (host_id host) {
auto tablet_size_i = shared_stats.stats.tablet_stats.at(host).tablet_sizes.at(table1).begin();
shared_stats.stats.tablet_stats.at(host).tablet_sizes.at(table1).erase(tablet_size_i);
};
erase_tablet_size(host1);
erase_tablet_size(host2);
// Balancing should not issue migrations due to missing tablet sizes
{
auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr, topo.get_load_stats()).get();
BOOST_REQUIRE(plan.empty());
}
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_split_and_merge_of_colocated_tables) {
do_with_cql_env_thread([] (auto& e) {
logging::logger_registry().set_logger_level("load_balancer", logging::log_level::trace);
topology_builder topo(e);
unsigned shard_count = 2;
auto host1 = topo.add_node(node_state::normal, shard_count);
auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 1);
auto table1 = add_table(e, ks_name).get();
auto table2 = add_table(e, ks_name).get();
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(1);
auto tid = tmap.first_tablet();
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
}
});
tablet_map tmap1 = co_await tmap.clone_gently();
tmeta.set_tablet_map(table1, std::move(tmap1));
co_await tmeta.set_colocated_table(table2, table1);
});
auto& stm = e.shared_token_metadata().local();
BOOST_REQUIRE_EQUAL(1, stm.get()->tablets().get_tablet_map(table1).tablet_count());
BOOST_REQUIRE_EQUAL(1, stm.get()->tablets().get_tablet_map(table2).tablet_count());
// the target tablet size for a group of co-located tablets is the default
// target divided by the group size. see make_sizing_plan
const uint64_t target_tablet_size = service::default_target_tablet_size / 2;
shared_load_stats& load_stats = topo.get_shared_load_stats();
// avg tablet size = 3.5 * target > 2 * target
load_stats.set_size(table1, 3*target_tablet_size);
load_stats.set_size(table2, 4*target_tablet_size);
rebalance_tablets(e, &load_stats);
auto tablet_count_after_split = stm.get()->tablets().get_tablet_map(table1).tablet_count();
BOOST_REQUIRE_EQUAL(tablet_count_after_split, stm.get()->tablets().get_tablet_map(table2).tablet_count());
BOOST_REQUIRE_EQUAL(tablet_count_after_split, 2);
// avg tablet size = (0.6 / 2) * target = 0.3 * target < 0.5 * target
load_stats.set_size(table1, 1.1*target_tablet_size);
load_stats.set_size(table2, 0.1*target_tablet_size);
rebalance_tablets(e, &load_stats);
auto tablet_count_after_merge = stm.get()->tablets().get_tablet_map(table1).tablet_count();
BOOST_REQUIRE_EQUAL(tablet_count_after_merge, stm.get()->tablets().get_tablet_map(table2).tablet_count());
BOOST_REQUIRE_EQUAL(tablet_count_after_merge, 1);
}).get();
}
// This test verifies that per-table tablet count is adjusted
// in reaction to changes of relevant config and schema options.
SEASTAR_THREAD_TEST_CASE(test_tablet_option_and_config_changes) {
auto cfg = tablet_cql_test_config();
cfg.db_config->tablets_initial_scale_factor(10.0);
cfg.need_remote_proxy = true;
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto dc = topo.dc();
// 3 shards. default initial scale wants 30 (32) tablets.
// keyspace 'initial' wants 2 tablets.
topo.add_node(node_state::normal, 3);
auto ks_name1 = add_keyspace(e, {{dc, 1}}, 2);
e.execute_cql(fmt::format("CREATE TABLE {}.table1 (p1 text, r1 int, PRIMARY KEY (p1))", ks_name1)).get();
auto table1 = e.local_db().find_schema(ks_name1, "table1")->id();
auto& stm = e.shared_token_metadata().local();
auto get_tablet_count = [&] {
auto tm = stm.get();
return tm->tablets().get_tablet_map(table1).tablet_count();
};
shared_load_stats& load_stats = topo.get_shared_load_stats();
load_stats.set_size(table1, 0);
rebalance_tablets(e, &load_stats);
BOOST_REQUIRE_EQUAL(get_tablet_count(), 2);
// min_per_shard_tablet_count wants 5 * 3 = 15 (16) tablets
e.execute_cql(fmt::format("ALTER TABLE {}.table1 "
"WITH tablets = {{'min_per_shard_tablet_count': 5}}", ks_name1)).get();
rebalance_tablets(e, &load_stats);
BOOST_REQUIRE_EQUAL(get_tablet_count(), 16);
// Check that hint can be dropped.
e.execute_cql(fmt::format("ALTER TABLE {}.table1 WITH tablets = {{}}", ks_name1)).get();
rebalance_tablets(e, &load_stats);
BOOST_REQUIRE_EQUAL(get_tablet_count(), 2);
// Default kicks in if keyspace setting and hint are missing.
e.execute_cql(format("ALTER KEYSPACE {} with tablets = {{'enabled': true}}", ks_name1, dc)).get();
rebalance_tablets(e, &load_stats);
BOOST_REQUIRE_EQUAL(get_tablet_count(), 32);
// initial scale can be live-updated.
auto& cfg = e.db_config();
cfg.tablets_initial_scale_factor(5);
rebalance_tablets(e, &load_stats);
BOOST_REQUIRE_EQUAL(get_tablet_count(), 16);
// per-shard goal can be live-updated.
// merge
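// 1 tablet/shard goal * 3 shards = 3 tablets, which ends up as 4 (the next power of two).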
cfg.tablets_per_shard_goal(1);
rebalance_tablets(e, &load_stats);
BOOST_REQUIRE_EQUAL(get_tablet_count(), 4);
// split
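// With the per-shard cap lifted, the count returns to the initial-scale target:
// 5 tablets/shard * 3 shards = 15, rounded up to 16.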
cfg.tablets_per_shard_goal(100);
rebalance_tablets(e, &load_stats);
BOOST_REQUIRE_EQUAL(get_tablet_count(), 16);
// initial scale can be smaller than 1.
// 0.5 tablet/shard * 3 shards = 1.5 tablets =~ 2 tablets.
cfg.tablets_initial_scale_factor(0.5);
rebalance_tablets(e, &load_stats);
BOOST_REQUIRE_EQUAL(get_tablet_count(), 2);
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_creating_lots_of_tables_doesnt_overflow_metadata) {
auto cfg = tablet_cql_test_config();
cfg.db_config->tablets_initial_scale_factor(10.0);
cfg.db_config->tablets_per_shard_goal(100);
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto dc = topo.dc();
// 10 tablets/shard (initial_scale) * 16 shards = 160 tablets, rounded up to 256.
// That's 16 tablet replicas per shard per table.
// Creating 100 tables without scaling would give 1'600 tablets per shard,
// which would overshoot the per-shard limit significantly.
// This test verifies that scaling kicks in sooner as more tables are created,
// and we end up with fewer tablets even before tablet merging is executed.
auto host1 = topo.add_node(node_state::normal, 16);
auto ks_name1 = add_keyspace(e, {{dc, 1}});
std::vector<table_id> tables;
shared_load_stats& load_stats = topo.get_shared_load_stats();
const auto nr_tables = 100u;
parallel_for_each(std::views::iota(0u, nr_tables), [&] (auto i) -> future<> {
auto table_name = fmt::format("table_{}", i);
co_await e.execute_cql(fmt::format("CREATE TABLE {}.{} (p1 text, r1 int, PRIMARY KEY (p1))",
ks_name1, table_name));
table_id table = e.local_db().find_schema(ks_name1, table_name)->id();
tables.push_back(table);
load_stats.set_size(table, 0);
}).get();
auto& stm = e.shared_token_metadata().local();
load_stats.set_default_tablet_sizes(stm.get());
{
load_sketch load(stm.get(), load_stats.get());
load.populate().get();
testlog.info("max tablet count: {}", load.get_shard_minmax_tablet_count(host1).max());
// The value 415 was determined empirically. If there was lack of scaling, it would be 1'600.
BOOST_REQUIRE(load.get_shard_minmax_tablet_count(host1).max() <= 415);
}
rebalance_tablets(e, &load_stats);
{
load_sketch load(stm.get(), load_stats.get());
load.populate().get();
testlog.info("max tablet count: {}", load.get_shard_minmax_tablet_count(host1).max());
BOOST_REQUIRE(load.get_shard_minmax_tablet_count(host1).max() <= 200);
}
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_stats_tablet_reconcile) {
auto cfg = tablet_cql_test_config();
// This test checks the correctness of the load_stats reconciliation algorithm.
// We only attempt to reconcile tablet_sizes after a merge or a split.
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto dc = topo.dc();
const size_t tablet_count = 16;
auto host = topo.add_node(node_state::normal, 4);
auto ks_name = add_keyspace(e, {{dc, 1}}, tablet_count);
sstring table_name = "table_1";
e.execute_cql(fmt::format("CREATE TABLE {}.{} (p1 text, r1 int, PRIMARY KEY (p1))", ks_name, table_name)).get();
table_id table = e.local_db().find_schema(ks_name, table_name)->id();
auto& stm = e.shared_token_metadata().local();
token_metadata_ptr old_tmptr = stm.get();
auto& tmap = stm.get()->tablets().get_tablet_map(table);
auto set_tablet_count = [&] (size_t new_tablet_count) {
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map new_tmap(new_tablet_count);
tmeta.set_tablet_map(table, std::move(new_tmap));
return make_ready_future<>();
});
};
// This checks if the tablet sizes have been correctly reconciled after a merge
{
locator::load_stats stats;
locator::tablet_load_stats& tls = stats.tablet_stats[host];
for (size_t i = 0; i < tablet_count; ++i) {
const dht::token_range range {tmap.get_token_range(tablet_id(i))};
tls.tablet_sizes[table][range] = i;
}
size_t tablet_count_after_merge = tablet_count / 2;
set_tablet_count(tablet_count_after_merge);
auto reconciled_stats_ptr = stats.reconcile_tablets_resize({ table }, *old_tmptr, *stm.get());
BOOST_REQUIRE(reconciled_stats_ptr);
locator::tablet_load_stats& reconciled_tls = reconciled_stats_ptr->tablet_stats[host];
BOOST_REQUIRE_EQUAL(reconciled_tls.tablet_sizes.at(table).size(), tablet_count_after_merge);
locator::tablet_map tmap_after_merge(tablet_count_after_merge);
for (size_t i = 0; i < tablet_count_after_merge; ++i) {
dht::token_range trange {tmap_after_merge.get_token_range(locator::tablet_id{i})};
const uint64_t reconciled_tablet_size = reconciled_tls.tablet_sizes.at(table).at(trange);
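// Merged tablet i covers old tablets 2*i and 2*i + 1, whose sizes were set to their
// indexes above, so the reconciled size should be their sum (4*i + 1).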
uint64_t expected_sum = 0;
for (uint64_t i_sum = 0; i_sum < 2; ++i_sum) {
expected_sum += i * 2 + i_sum;
}
BOOST_REQUIRE_EQUAL(reconciled_tablet_size, expected_sum);
}
}
// This checks if the tablet sizes have been correctly reconciled after a split
{
locator::load_stats stats;
locator::tablet_load_stats& tls = stats.tablet_stats[host];
for (size_t i = 0; i < tablet_count; ++i) {
const dht::token_range range {tmap.get_token_range(tablet_id(i))};
tls.tablet_sizes[table][range] = i * 2;
}
size_t tablet_count_after_split = tablet_count * 2;
set_tablet_count(tablet_count_after_split);
auto reconciled_stats_ptr = stats.reconcile_tablets_resize({ table }, *old_tmptr, *stm.get());
BOOST_REQUIRE(reconciled_stats_ptr);
locator::tablet_load_stats& reconciled_tls = reconciled_stats_ptr->tablet_stats[host];
BOOST_REQUIRE_EQUAL(reconciled_tls.tablet_sizes.at(table).size(), tablet_count_after_split);
locator::tablet_map tmap_after_split(tablet_count_after_split);
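// Each child tablet i descends from parent i/2, whose size was set to (i/2) * 2 above;
// the split reconciliation assigns half of that to each child, i.e. i/2.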
for (size_t i = 0; i < tablet_count_after_split; ++i) {
dht::token_range trange {tmap_after_split.get_token_range(locator::tablet_id{i})};
const uint64_t reconciled_tablet_size = reconciled_tls.tablet_sizes.at(table).at(trange);
BOOST_REQUIRE_EQUAL(reconciled_tablet_size, i / 2);
}
}
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_stats_tablet_reconcile_tablet_not_found) {
auto cfg = tablet_cql_test_config();
// This test checks if the reconcile tablet algorithm returns nullptr when it
// can't find all the tablet sizes in load_stats
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto dc = topo.dc();
const size_t tablet_count = 16;
auto host = topo.add_node(node_state::normal, 4);
auto ks_name = add_keyspace(e, {{dc, 1}}, tablet_count);
sstring table_name = "table_1";
e.execute_cql(fmt::format("CREATE TABLE {}.{} (p1 text, r1 int, PRIMARY KEY (p1))", ks_name, table_name)).get();
table_id table = e.local_db().find_schema(ks_name, table_name)->id();
auto& stm = e.shared_token_metadata().local();
auto& tmap = stm.get()->tablets().get_tablet_map(table);
locator::load_stats stats;
locator::tablet_load_stats& tls = stats.tablet_stats[host];
// Add all tablet sizes except the last one. This will cause reconcile to return a nullptr
for (size_t i = 0; i < tablet_count - 1; ++i) {
const dht::token_range range {tmap.get_token_range(tablet_id(i))};
tls.tablet_sizes[table][range] = i;
}
token_metadata_ptr old_tm { stm.get() };
auto set_tablet_count = [&] (size_t new_tablet_count) {
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map new_tmap(new_tablet_count);
tmeta.set_tablet_map(table, std::move(new_tmap));
return make_ready_future<>();
});
};
// Test if merge reconcile detects a missing sibling tablet in load_stats
set_tablet_count(tablet_count / 2);
auto reconciled_stats_ptr = stats.reconcile_tablets_resize({ table }, *old_tm, *stm.get());
BOOST_REQUIRE_EQUAL(reconciled_stats_ptr.get(), nullptr);
// Test if split reconcile detects a missing tablet in load_stats
set_tablet_count(tablet_count * 2);
reconciled_stats_ptr = stats.reconcile_tablets_resize({ table }, *old_tm, *stm.get());
BOOST_REQUIRE_EQUAL(reconciled_stats_ptr.get(), nullptr);
}, cfg).get();
}
SEASTAR_TEST_CASE(test_load_stats_migrate_tablet_size) {
auto table = table_id(utils::UUID_gen::get_time_UUID());
auto host1 = host_id(utils::UUID_gen::get_time_UUID());
auto host2 = host_id(utils::UUID_gen::get_time_UUID());
tablet_map tmap(8);
tablet_id tid(1);
const uint64_t tablet_size = 42;
range_based_tablet_id rb_tid{table, tmap.get_token_range(tid)};
global_tablet_id gid{table, tid};
// Check tablet size is correctly migrated
{
load_stats stats;
stats.tablet_stats[host1].tablet_sizes[table][rb_tid.range] = tablet_size;
stats.tablet_stats[host2] = {};
auto new_load_stats = stats.migrate_tablet_size(host1, host2, gid, rb_tid.range);
BOOST_REQUIRE(new_load_stats);
// Check tablet size is on host2
auto tablet_size_opt = new_load_stats->get_tablet_size(host2, rb_tid);
BOOST_REQUIRE(tablet_size_opt);
BOOST_REQUIRE(*tablet_size_opt == tablet_size);
// Check tablet size is not on host1
tablet_size_opt = new_load_stats->get_tablet_size(host1, rb_tid);
BOOST_REQUIRE(!tablet_size_opt);
// Check the migration removed the entry for the table after removing the last tablet size
BOOST_REQUIRE(!new_load_stats->tablet_stats.at(host1).tablet_sizes.contains(table));
}
// Check migrate_tablet_size() returns nullptr when tablet is not found on leaving replica
{
load_stats stats;
stats.tablet_stats[host1] = {};
stats.tablet_stats[host2] = {};
auto new_load_stats = stats.migrate_tablet_size(host1, host2, gid, rb_tid.range);
BOOST_REQUIRE(!new_load_stats);
}
// Check migrate_tablet_size() returns nullptr when tablet is already on pending replica
{
load_stats stats;
stats.tablet_stats[host1].tablet_sizes[table][rb_tid.range] = tablet_size;
stats.tablet_stats[host2].tablet_sizes[table][rb_tid.range] = tablet_size;
auto new_load_stats = stats.migrate_tablet_size(host1, host2, gid, rb_tid.range);
BOOST_REQUIRE(!new_load_stats);
}
// Check migrate_tablet_size() returns nullptr when leaving and pending replicas are equal
{
load_stats stats;
stats.tablet_stats[host1].tablet_sizes[table][rb_tid.range] = tablet_size;
stats.tablet_stats[host2] = {};
auto new_load_stats = stats.migrate_tablet_size(host1, host1, gid, rb_tid.range);
BOOST_REQUIRE(!new_load_stats);
}
// Check migrate_tablet_size() returns nullptr when pending host is not found in load_stats
{
load_stats stats;
stats.tablet_stats[host1].tablet_sizes[table][rb_tid.range] = tablet_size;
auto new_load_stats = stats.migrate_tablet_size(host1, host2, gid, rb_tid.range);
BOOST_REQUIRE(!new_load_stats);
}
return make_ready_future<>();
}
SEASTAR_TEST_CASE(test_tablet_id_and_range_side) {
static constexpr size_t tablet_count = 128;
locator::tablet_map tmap(tablet_count);
locator::tablet_map tmap_after_splitting(tablet_count * 2);
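// In the doubled map, tablet `id` of the original map corresponds to children
// 2*id (left half of its token range) and 2*id + 1 (right half).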
for (size_t id = 0; id < tablet_count; id++) {
auto left_id = tablet_id(id << 1);
auto right_id = tablet_id(left_id.value() + 1);
auto left_tr = tmap_after_splitting.get_token_range(left_id);
auto right_tr = tmap_after_splitting.get_token_range(right_id);
testlog.debug("id {}, left tr {}, right tr {}", id, left_tr, right_tr);
auto test = [&tmap, id] (dht::token token, tablet_range_side expected_side) {
auto [tid, side] = tmap.get_tablet_id_and_range_side(token);
BOOST_REQUIRE_EQUAL(tid.value(), id);
BOOST_REQUIRE_EQUAL(side, expected_side);
};
auto test_range = [&] (dht::token_range& tr, tablet_range_side expected_side) {
auto lower_token = tr.start()->value() == dht::minimum_token() ? dht::first_token() : tr.start()->value();
auto upper_token = tr.end()->value();
test(next_token(lower_token), expected_side);
test(upper_token, expected_side);
};
// Test the lower and upper bound of tablet's left and right ranges ("compaction groups").
test_range(left_tr, tablet_range_side::left);
test_range(right_tr, tablet_range_side::right);
}
return make_ready_future<>();
}
SEASTAR_THREAD_TEST_CASE(basic_tablet_storage_splitting_test) {
auto cfg = tablet_cql_test_config();
cfg.initial_tablets = std::bit_floor(smp::count);
do_with_cql_env_thread([] (cql_test_env& e) {
e.execute_cql(
"CREATE TABLE cf (pk int, ck int, v int, PRIMARY KEY (pk, ck))").get();
for (unsigned i = 0; i < smp::count * 20; i++) {
e.execute_cql(format("INSERT INTO cf (pk, ck, v) VALUES ({}, 0, 0)", i)).get();
}
e.db().invoke_on_all([] (replica::database& db) {
auto& table = db.find_column_family("ks", "cf");
return table.flush();
}).get();
testlog.info("Splitting sstables...");
e.db().invoke_on_all([] (replica::database& db) {
auto& table = db.find_column_family("ks", "cf");
testlog.info("sstable count: {}", table.sstables_count());
return table.split_all_storage_groups(tasks::task_info{});
}).get();
testlog.info("Verifying sstables are split...");
BOOST_REQUIRE_EQUAL(e.db().map_reduce0([] (replica::database& db) {
auto& table = db.find_column_family("ks", "cf");
return make_ready_future<bool>(table.all_storage_groups_split());
}, bool(false), std::logical_or<bool>()).get(), true);
}, std::move(cfg)).get();
}
using rack_vector = std::vector<endpoint_dc_rack>;
using hosts_by_rack_map = std::unordered_map<sstring, std::vector<host_id>>;
// runs in seastar thread.
static void do_test_load_balancing_merge_colocation(cql_test_env& e, const int n_racks, const int rf, const int n_hosts,
const unsigned shard_count, const unsigned initial_tablets,
std::function<void(token_metadata&, tablet_map&, const rack_vector&, const hosts_by_rack_map&)> set_tablets) {
topology_builder topo(e);
rack_vector racks;
for (int i = 0; i < n_racks; i++) {
racks.push_back(topo.rack());
topo.start_new_rack();
}
testlog.info("merge colocation test - hosts={}, racks={}, rf={}, shard_count={}, initial_tablets={}", n_hosts, racks.size(), rf, shard_count, initial_tablets);
hosts_by_rack_map hosts_by_rack;
for (int i = 0; i < n_hosts; ++i) {
auto rack = racks[i % racks.size()];
auto h = topo.add_node(node_state::normal, shard_count, rack);
hosts_by_rack[rack.rack].push_back(h);
}
auto ks_name = add_keyspace(e, {{topo.dc(), rf}}, initial_tablets);
auto table1 = add_table(e, ks_name).get();
auto& stm = e.shared_token_metadata().local();
{
abort_source as;
auto guard = e.get_raft_group0_client().start_operation(as).get();
stm.mutate_token_metadata([&](token_metadata& tm) -> future<> {
tablet_metadata& tmeta = tm.tablets();
tablet_map tmap(initial_tablets);
locator::resize_decision decision;
// leaves growing mode, allowing for merge decision.
decision.sequence_number = decision.next_sequence_number();
tmap.set_resize_decision(std::move(decision));
set_tablets(tm, tmap, racks, hosts_by_rack);
tmeta.set_tablet_map(table1, std::move(tmap));
tm.set_tablets(std::move(tmeta));
return make_ready_future<>();
}).get();
save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get();
}
// Lower "initial" tablets option, allowing for merge decision.
e.execute_cql(fmt::format("alter keyspace {} with tablets = {{'enabled': true, 'initial': 1}}", ks_name)).get();
auto tablet_count = [&] {
return stm.get()->tablets().get_tablet_map(table1).tablet_count();
};
shared_load_stats& load_stats = topo.get_shared_load_stats();
load_stats.set_default_tablet_sizes(stm.get());
auto do_rebalance_tablets = [&] () {
rebalance_tablets(e, &load_stats);
};
const uint64_t target_tablet_size = service::default_target_tablet_size;
auto merge_threshold = [&] () -> uint64_t {
return (target_tablet_size * 0.5f) * tablet_count();
};
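// Keep the reported table size just below the merge threshold so the average tablet size
// stays under half the target; each balancing pass should then shrink the tablet count
// until a single tablet remains.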
while (tablet_count() > 1) {
load_stats.set_size(table1, merge_threshold() - 1);
auto old_tablet_count = tablet_count();
check_tablet_invariants(stm.get()->tablets());
do_rebalance_tablets();
check_tablet_invariants(stm.get()->tablets());
BOOST_REQUIRE_LT(tablet_count(), old_tablet_count);
}
e.execute_cql(fmt::format("drop keyspace {}", ks_name)).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_random_load) {
cql_test_config cfg;
cfg.need_remote_proxy = true;
do_with_cql_env_thread([] (auto& e) {
auto seed = tests::random::get_int<int32_t>();
std::mt19937 random_engine{seed};
testlog.info("test_load_balancing_merge_colocation - seed {}", seed);
for (auto i = 0; i < 10; i++) {
const int rf = tests::random::get_int<int>(3, 3);
const int n_racks = rf;
const int n_hosts = tests::random::get_int<unsigned>(n_racks * rf, n_racks * rf * 2);
const unsigned shard_count = tests::random::get_int<unsigned>(2, 12);
const unsigned total_shard_count = n_hosts * shard_count;
const unsigned initial_tablets = std::bit_ceil<unsigned>(tests::random::get_int<unsigned>(total_shard_count, total_shard_count * 10));
auto set_tablets = [rf, shard_count] (token_metadata&, tablet_map& tmap, const rack_vector& racks, const hosts_by_rack_map& hosts_by_rack) {
for (auto tid : tmap.tablet_ids()) {
testlog.debug("allocating replica in racks with rf {}", rf);
std::vector<host_id> replica_hosts = allocate_replicas_in_racks(racks, rf, hosts_by_rack);
tablet_replica_set replicas;
replicas.reserve(replica_hosts.size());
for (auto h : replica_hosts) {
replicas.push_back(tablet_replica {h, tests::random::get_int<shard_id>(0, shard_count - 1)});
}
testlog.debug("allocating replicas for tablet {}: {}", tid, replicas);
tmap.set_tablet(tid, tablet_info {std::move(replicas)});
}
};
do_test_load_balancing_merge_colocation(e, n_racks, rf, n_hosts, shard_count, initial_tablets, set_tablets);
}
}, std::move(cfg)).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_single_rack) {
cql_test_config cfg{};
cfg.need_remote_proxy = true;
// This test purposefully uses just one rack, which means that we cannot enable
// the `rf_rack_valid_keyspaces` configuration option because we won't be able to create
// a keyspace with RF > 1.
cfg.db_config->rf_rack_valid_keyspaces.set(false);
do_with_cql_env_thread([] (auto& e) {
const int rf = 2;
const int n_racks = 1;
const int n_hosts = 2;
const unsigned shard_count = 2;
const unsigned initial_tablets = 2;
auto set_tablets = [] (token_metadata&, tablet_map& tmap, const rack_vector& racks, const hosts_by_rack_map& hosts_by_rack) {
auto& hosts = hosts_by_rack.at(racks.front().rack);
auto host1 = hosts[0];
auto host2 = hosts[1];
tmap.set_tablet(tablet_id(0), tablet_info {
tablet_replica_set {
tablet_replica {host1, shard_id(0)},
tablet_replica {host2, shard_id(0)},
}
});
tmap.set_tablet(tablet_id(1), tablet_info {
tablet_replica_set {
tablet_replica {host2, shard_id(0)},
tablet_replica {host1, shard_id(0)},
}
});
};
do_test_load_balancing_merge_colocation(e, n_racks, rf, n_hosts, shard_count, initial_tablets, set_tablets);
}, cfg).get();
}
// Verify merge can proceed with multiple racks and RF=#racks
//
// Given replica sets (not in rack order):
// rack1 { n1, n2 }
// rack2 { n3, n4 }
//
// t0: { n1, n3 }
// t1: { n4, n2 }
//
SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_multiple_racks_and_rf_equals_racks) {
cql_test_config cfg;
cfg.need_remote_proxy = true;
do_with_cql_env_thread([] (auto& e) {
const int rf = 2;
const int n_racks = rf;
const int n_hosts = 4; // 2 nodes in each rack.
const unsigned shard_count = 1;
const unsigned initial_tablets = 2;
auto set_tablets = [] (token_metadata&, tablet_map& tmap, const rack_vector& racks, const hosts_by_rack_map& hosts_by_rack) {
auto& first_rack_hosts = hosts_by_rack.at(racks[0].rack);
auto& second_rack_hosts = hosts_by_rack.at(racks[1].rack);
tmap.set_tablet(tablet_id(0), tablet_info {
tablet_replica_set {
tablet_replica {first_rack_hosts[0], shard_id(0)},
tablet_replica {second_rack_hosts[0], shard_id(0)},
}
});
tmap.set_tablet(tablet_id(1), tablet_info {
tablet_replica_set {
tablet_replica {second_rack_hosts[1], shard_id(0)},
tablet_replica {first_rack_hosts[1], shard_id(0)},
}
});
};
do_test_load_balancing_merge_colocation(e, n_racks, rf, n_hosts, shard_count, initial_tablets, set_tablets);
}, std::move(cfg)).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_decomission) {
cql_test_config cfg{};
cfg.need_remote_proxy = true;
// The scenario this test addresses cannot happen with `rf_rack_valid_keyspaces` set to true.
//
// Among the tablet replicas for a given tablet, there CANNOT be two nodes from the same rack.
// After the decommission of B, both tablets will reside on ALL other nodes, which implies that
// they're on pairwise distinct racks. However, since B was taking part in replication of the
// tablets, it must've been among the replicas of at least one of the tablets and, for the very
// same reason, it must be on a separate rack. Hence, all nodes must reside on pairwise distinct racks.
//
// So, if we want to keep the current number of nodes and RF, we must have 4 racks. But we cannot
// do that until we've implemented scylladb/scylladb#23737. Besides, the test seems to rely on
// using just one rack, which makes it incompatible with `rf_rack_valid_keyspaces: true` anyway.
cfg.db_config->rf_rack_valid_keyspaces.set(false);
do_with_cql_env_thread([] (auto& e) {
const int rf = 3;
const int n_racks = 1;
const int n_hosts = 4;
const unsigned shard_count = 2;
const unsigned initial_tablets = 2;
auto set_tablets = [&] (token_metadata& tm, tablet_map& tmap, const rack_vector& racks, const hosts_by_rack_map& hosts_by_rack) {
auto& rack = racks.front();
auto& hosts = hosts_by_rack.at(rack.rack);
BOOST_REQUIRE(hosts.size() == 4);
auto a = hosts[0];
auto b = hosts[1];
auto c = hosts[2];
auto d = hosts[3];
// nodes = {A, B, C, D}
// tablet1 = {A, B, C}
// tablet2 = {A, B, D}
// viable target for {tablet1, B} is D.
// viable target for {tablet2, B} is C.
//
// Decommission should succeed by migrating away even co-located replicas of sibling tablets that don't share viable targets.
// That should produce:
// tablet1 = {A, D, C}
// tablet2 = {A, C, D}
auto decision = tmap.resize_decision();
decision.way = locator::resize_decision::merge{};
tmap.set_resize_decision(std::move(decision));
tm.update_topology(b, rack, node::state::being_decommissioned, shard_count);
tmap.set_tablet(tablet_id(0), tablet_info {
tablet_replica_set {
tablet_replica {a, shard_id(0)},
tablet_replica {b, shard_id(0)},
tablet_replica {c, shard_id(0)},
}
});
tmap.set_tablet(tablet_id(1), tablet_info {
tablet_replica_set {
tablet_replica {a, shard_id(0)},
tablet_replica {b, shard_id(0)},
tablet_replica {d, shard_id(0)},
}
});
};
do_test_load_balancing_merge_colocation(e, n_racks, rf, n_hosts, shard_count, initial_tablets, set_tablets);
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) {
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
topo.add_node(node_state::normal, 2);
topo.start_new_rack();
topo.add_node(node_state::normal, 2);
const size_t initial_tablets = 2;
auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, initial_tablets);
auto table1 = add_table(e, ks_name).get();
auto& stm = e.shared_token_metadata().local();
auto tablet_count = [&] {
return stm.get()->tablets().get_tablet_map(table1).tablet_count();
};
auto resize_decision = [&] {
return stm.get()->tablets().get_tablet_map(table1).resize_decision();
};
shared_load_stats& load_stats = topo.get_shared_load_stats();
load_stats.set_default_tablet_sizes(stm.get());
auto do_rebalance_tablets = [&] () {
rebalance_tablets(e, &load_stats, {}, nullptr, false); // no auto-split
};
const uint64_t max_tablet_size = service::default_target_tablet_size * 2;
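// Translate a fraction of the per-tablet size threshold into a total table size across all tablets.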
auto to_size_in_bytes = [&] (double max_tablet_size_pctg) -> uint64_t {
return (max_tablet_size * max_tablet_size_pctg) * tablet_count();
};
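// Sentinel value meaning no replica has reported split readiness yet.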
const auto initial_ready_seq_number = std::numeric_limits<locator::resize_decision::seq_number_t>::min();
load_stats.set_split_ready_seq_number(table1, initial_ready_seq_number);
// avg size moved above target size, so merge is cancelled
{
load_stats.set_size(table1, to_size_in_bytes(0.75));
do_rebalance_tablets();
BOOST_REQUIRE_EQUAL(tablet_count(), initial_tablets);
BOOST_REQUIRE(std::holds_alternative<locator::resize_decision::none>(resize_decision().way));
}
// Drop initial tablet count to 1 so merge can happen.
e.execute_cql(fmt::format("alter keyspace {} with tablets = {{'enabled': true, 'initial': 1}}", ks_name)).get();
// avg size hits split threshold, and balancer emits split request
{
load_stats.set_size(table1, to_size_in_bytes(1.1));
do_rebalance_tablets();
BOOST_REQUIRE_EQUAL(tablet_count(), initial_tablets);
BOOST_REQUIRE(std::holds_alternative<locator::resize_decision::split>(resize_decision().way));
BOOST_REQUIRE_GT(resize_decision().sequence_number, 0);
}
// Replicas set their split status as ready, and the load balancer finalizes the split, generating a new
// tablet map twice as large as the previous one.
{
load_stats.set_split_ready_seq_number(table1, resize_decision().sequence_number);
do_rebalance_tablets();
BOOST_REQUIRE_EQUAL(tablet_count(), initial_tablets * 2);
BOOST_REQUIRE(std::holds_alternative<locator::resize_decision::none>(resize_decision().way));
}
// Check that balancer detects table size dropped to 0 and reduces tablet count down to 1 through merges.
{
load_stats.set_size(table1, to_size_in_bytes(0.0));
load_stats.set_split_ready_seq_number(table1, initial_ready_seq_number);
do_rebalance_tablets();
BOOST_REQUIRE_EQUAL(tablet_count(), 1);
}
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_drain_node_without_capacity) {
do_with_cql_env_thread([] (auto& e) {
logging::logger_registry().set_logger_level("load_balancer", logging::log_level::debug);
topology_builder topo(e);
auto host1 = topo.add_node(node_state::normal, 2);
auto host2 = topo.add_node(node_state::normal, 2);
const uint64_t node_capacity = 100UL * 1024UL * 1024UL * 1024UL;
topo.get_shared_load_stats().set_capacity(host1, node_capacity);
auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 16);
auto table = add_table(e, ks_name).get();
topo.set_node_state(host2, node_state::removing);
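// host2 never reported its capacity; draining it should still move all of its tablets to host1.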
auto& stm = e.shared_token_metadata().local();
topo.get_shared_load_stats().set_default_tablet_sizes(stm.get());
rebalance_tablets(e, &topo.get_shared_load_stats());
// check that all tablets have been migrated from host2 to host1
auto& tmap = stm.get()->tablets().get_tablet_map(table);
tmap.for_each_tablet([&](auto tid, auto& tinfo) {
for (auto& replica : tinfo.replicas) {
BOOST_REQUIRE_EQUAL(replica.host, host1);
}
return make_ready_future<>();
}).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_tablet_range_splitter) {
simple_schema ss;
const auto dks = ss.make_pkeys(4);
auto h1 = host_id(utils::UUID_gen::get_time_UUID());
auto h2 = host_id(utils::UUID_gen::get_time_UUID());
auto h3 = host_id(utils::UUID_gen::get_time_UUID());
tablet_map tmap(4);
auto tb = tmap.first_tablet();
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h2, 0},
tablet_replica {h3, 0},
}
});
tb = *tmap.next_tablet(tb);
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h1, 3},
}
});
tb = *tmap.next_tablet(tb);
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h2, 2},
}
});
tb = *tmap.next_tablet(tb);
tmap.set_tablet(tb, tablet_info {
tablet_replica_set {
tablet_replica {h1, 1},
tablet_replica {h2, 1},
}
});
using result = tablet_range_splitter::range_split_result;
using bound = dht::partition_range::bound;
std::vector<result> included_ranges;
std::vector<dht::partition_range> excluded_ranges;
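// Classify each tablet's range: ranges with a replica on h1 go to included_ranges (tagged with the owning shard), the rest to excluded_ranges.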
for (auto tid = std::optional(tmap.first_tablet()); tid; tid = tmap.next_tablet(*tid)) {
const auto& tablet_info = tmap.get_tablet_info(*tid);
auto replica_it = std::ranges::find_if(tablet_info.replicas, [&] (auto&& r) { return r.host == h1; });
auto token_range = tmap.get_token_range(*tid);
auto range = dht::to_partition_range(token_range);
if (replica_it == tablet_info.replicas.end()) {
testlog.info("tablet#{}: {} (no replica on h1)", *tid, token_range);
excluded_ranges.emplace_back(std::move(range));
} else {
testlog.info("tablet#{}: {} (shard {})", *tid, token_range, replica_it->shard);
included_ranges.emplace_back(result{replica_it->shard, std::move(range)});
}
}
dht::ring_position_comparator cmp(*ss.schema());
auto check = [&] (const dht::partition_range_vector& ranges, std::vector<result> expected_result,
std::source_location sl = std::source_location::current()) {
testlog.info("check() @ {}:{} ranges={}", sl.file_name(), sl.line(), ranges);
locator::tablet_range_splitter range_splitter{ss.schema(), tmap, h1, ranges};
auto it = expected_result.begin();
while (auto range_opt = range_splitter()) {
testlog.debug("result: shard={} range={}", range_opt->shard, range_opt->range);
BOOST_REQUIRE(it != expected_result.end());
testlog.debug("expected: shard={} range={}", it->shard, it->range);
BOOST_REQUIRE_EQUAL(it->shard, range_opt->shard);
BOOST_REQUIRE(it->range.equal(range_opt->range, cmp));
++it;
}
if (it != expected_result.end()) {
while (it != expected_result.end()) {
testlog.error("missing expected result: shard={} range={}", it->shard, it->range);
++it;
}
BOOST_FAIL("splitter didn't provide all expected ranges");
}
};
auto check_single = [&] (const dht::partition_range& range, std::vector<result> expected_result,
std::source_location sl = std::source_location::current()) {
dht::partition_range_vector ranges;
ranges.reserve(1);
ranges.push_back(std::move(range));
check(ranges, std::move(expected_result), sl);
};
auto intersect = [&] (const dht::partition_range& range) {
std::vector<result> intersecting_ranges;
for (const auto& included_range : included_ranges) {
if (auto intersection = included_range.range.intersection(range, cmp)) {
intersecting_ranges.push_back({included_range.shard, std::move(*intersection)});
}
}
return intersecting_ranges;
};
auto check_intersection_single = [&] (const dht::partition_range& range,
std::source_location sl = std::source_location::current()) {
check_single(range, intersect(range), sl);
};
auto check_intersection = [&] (const dht::partition_range_vector& ranges,
std::source_location sl = std::source_location::current()) {
std::vector<result> expected_ranges;
for (const auto& range : ranges) {
auto res = intersect(range);
std::move(res.begin(), res.end(), std::back_inserter(expected_ranges));
}
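// Sort the expected results into ring order so they can be compared against the splitter's output.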
std::sort(expected_ranges.begin(), expected_ranges.end(), [&] (const auto& a, const auto& b) {
return !a.range.start() || b.range.before(a.range.start()->value(), cmp);
});
check(ranges, expected_ranges, sl);
};
check_single(dht::partition_range::make_open_ended_both_sides(), included_ranges);
check(included_ranges | std::views::transform([&] (auto& r) { return r.range; }) | std::ranges::to<dht::partition_range_vector>(), included_ranges);
check(excluded_ranges, {});
check_intersection_single({bound{dks[0], true}, bound{dks[1], false}});
check_intersection_single({bound{dks[0], false}, bound{dks[2], true}});
check_intersection_single({bound{dks[2], true}, bound{dks[3], false}});
check_intersection_single({bound{dks[0], false}, bound{dks[3], false}});
check_intersection_single(dht::partition_range::make_starting_with(bound(dks[2], true)));
check_intersection_single(dht::partition_range::make_ending_with(bound(dks[1], false)));
check_intersection_single(dht::partition_range::make_singular(dks[3]));
check_intersection({
dht::partition_range::make_ending_with(bound(dks[0], false)),
{bound{dks[1], true}, bound{dks[2], false}},
dht::partition_range::make_starting_with(bound(dks[3], true))});
check_intersection({
{bound{dks[0], true}, bound{dks[1], false}},
{bound{dks[1], true}, bound{dks[2], false}},
{bound{dks[2], true}, bound{dks[3], false}}});
}
static locator::endpoint_dc_rack make_endpoint_dc_rack(gms::inet_address endpoint) {
// This resembles rack_inferring_snitch dc/rack generation which is
// still in use by this test via token_metadata internals
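// e.g. 192.100.10.1 -> dc "100", rack "10"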
auto dc = std::to_string(uint8_t(endpoint.bytes()[1]));
auto rack = std::to_string(uint8_t(endpoint.bytes()[2]));
return locator::endpoint_dc_rack{dc, rack};
}
struct calculate_tablet_replicas_for_new_rf_config
{
struct ring_point {
double point;
inet_address host;
host_id id = host_id::create_random_id();
};
std::vector<ring_point> ring_points;
replication_strategy_config_options options;
replication_strategy_config_options new_dc_rep_factor;
std::map<sstring, size_t> expected_rep_factor;
};
static void execute_tablet_for_new_rf_test(calculate_tablet_replicas_for_new_rf_config const& test_config)
{
auto my_address = gms::inet_address("localhost");
// Create the RackInferringSnitch
snitch_config cfg;
cfg.listen_address = my_address;
cfg.broadcast_address = my_address;
cfg.name = "RackInferringSnitch";
sharded<snitch_ptr> snitch;
snitch.start(cfg).get();
auto stop_snitch = defer([&snitch] { snitch.stop().get(); });
snitch.invoke_on_all(&snitch_ptr::start).get();
static constexpr size_t tablet_count = 8;
std::vector<unsigned> nodes_shard_count(test_config.ring_points.size(), 3);
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_endpoint = test_config.ring_points[0].host;
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
tm_cfg.topo_cfg.this_host_id = test_config.ring_points[0].id;
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
// Initialize the token_metadata
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
auto& topo = tm.get_topology();
for (const auto& [ring_point, endpoint, id] : test_config.ring_points) {
std::unordered_set<token> tokens;
tokens.insert(dht::token{tests::d2t(ring_point / test_config.ring_points.size())});
topo.add_or_update_endpoint(id, make_endpoint_dc_rack(endpoint), locator::node::state::normal, 1);
co_await tm.update_normal_tokens(std::move(tokens), id);
}
}).get();
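// Build the initial NetworkTopologyStrategy and allocate the tablet map with the original replication options.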
locator::replication_strategy_params params(test_config.options, tablet_count, std::nullopt);
auto ars_ptr = abstract_replication_strategy::create_replication_strategy(
"NetworkTopologyStrategy", params, stm.get()->get_topology());
auto tablet_aware_ptr = ars_ptr->maybe_as_tablet_aware();
BOOST_REQUIRE(tablet_aware_ptr);
auto s = schema_builder("ks", "tb")
.with_column("pk", utf8_type, column_kind::partition_key)
.with_column("v", utf8_type)
.build();
stm.mutate_token_metadata([&] (token_metadata& tm) {
for (size_t i = 0; i < test_config.ring_points.size(); ++i) {
auto& [ring_point, endpoint, id] = test_config.ring_points[i];
tm.update_topology(id, make_endpoint_dc_rack(endpoint), node::state::normal, nodes_shard_count[i]);
}
return make_ready_future<>();
}).get();
auto allocated_map = tablet_aware_ptr->allocate_tablets_for_new_table(s, stm.get(), tablet_count).get();
BOOST_REQUIRE_EQUAL(allocated_map.tablet_count(), tablet_count);
auto host_id_to_dc = [&stm](const locator::host_id& ep) -> std::optional<sstring> {
auto node = stm.get()->get_topology().find_node(ep);
if (node == nullptr) {
return std::nullopt;
}
return node->dc_rack().dc;
};
stm.mutate_token_metadata([&] (token_metadata& tm) {
tablet_metadata tab_meta;
auto table = s->id();
tab_meta.set_tablet_map(table, std::move(allocated_map));
tm.set_tablets(std::move(tab_meta));
return make_ready_future<>();
}).get();
std::map<sstring, size_t> initial_rep_factor;
for (auto const& [dc, shard_count] : test_config.options) {
initial_rep_factor[dc] = locator::get_replication_factor(shard_count);
}
auto tablets = stm.get()->tablets().get_tablet_map(s->id()).clone_gently().get();
BOOST_REQUIRE_EQUAL(tablets.tablet_count(), tablet_count);
for (auto tb : tablets.tablet_ids()) {
const locator::tablet_info& ti = tablets.get_tablet_info(tb);
std::map<sstring, size_t> dc_replicas_count;
for (const auto& r : ti.replicas) {
auto dc = host_id_to_dc(r.host);
if (dc) {
dc_replicas_count[*dc]++;
}
}
BOOST_REQUIRE_EQUAL(dc_replicas_count, initial_rep_factor);
}
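// Reallocate tablets under the new replication factors. A configuration_exception complaining about
// too few token-owning nodes is also acceptable (see the catch below).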
try {
tablet_map old_tablets = stm.get()->tablets().get_tablet_map(s->id()).clone_gently().get();
locator::replication_strategy_params params{test_config.new_dc_rep_factor, old_tablets.tablet_count(), std::nullopt};
auto new_strategy = abstract_replication_strategy::create_replication_strategy("NetworkTopologyStrategy", params, stm.get()->get_topology());
auto tmap = new_strategy->maybe_as_tablet_aware()->reallocate_tablets(s, stm.get(), std::move(old_tablets)).get();
auto const& ts = tmap.tablets();
BOOST_REQUIRE_EQUAL(ts.size(), tablet_count);
for (auto tb : tmap.tablet_ids()) {
const locator::tablet_info& ti = tmap.get_tablet_info(tb);
std::map<sstring, size_t> dc_replicas_count;
for (const auto& r : ti.replicas) {
auto dc = host_id_to_dc(r.host);
if (dc) {
dc_replicas_count[*dc]++;
}
}
BOOST_REQUIRE_EQUAL(dc_replicas_count, test_config.expected_rep_factor);
}
} catch (exceptions::configuration_exception const& e) {
thread_local boost::regex re(
"Datacenter [0-9]+ doesn't have enough token-owning nodes for replication_factor=[0-9]+");
boost::cmatch what;
if (!boost::regex_search(e.what(), what, re)) {
BOOST_FAIL("Unexpected exception: " + std::string(e.what()));
}
} catch (std::exception const& e) {
BOOST_FAIL("Unexpected exception: " + std::string(e.what()));
} catch (...) {
BOOST_FAIL("Unexpected exception");
}
}
SEASTAR_THREAD_TEST_CASE(test_ensure_node_for_load_sketch) {
// This test reproduces the balancer crash when a node is drained and there is more than one
// empty destination node. If one of these destination nodes has a lower capacity than the other,
// and the initial target node selected is the one with lower capacity, pick_candidate() will then
// change the target node to the one with higher capacity. The problem is that
// load_sketch::get_least_loaded_shard() and consequently load_sketch::ensure_node() have not yet
// been called for the new, larger target (only for the initial, smaller one), and load_sketch will
// not have the larger node in its _nodes member hash map. This will cause a std::out_of_range
// exception when load_sketch::pick() is called with the host_id of the larger node.
do_with_cql_env_thread([] (auto& e) {
logging::logger_registry().set_logger_level("load_balancer", logging::log_level::debug);
topology_builder topo(e);
auto host1 = topo.add_node(node_state::normal, 2);
const uint64_t node1_capacity = 50UL * 1024UL * 1024UL * 1024UL;
topo.get_shared_load_stats().set_capacity(host1, node1_capacity);
auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4);
add_table(e, ks_name).get();
auto host2 = topo.add_node(node_state::normal, 2);
const uint64_t node2_capacity = 70UL * 1024UL * 1024UL * 1024UL;
topo.get_shared_load_stats().set_capacity(host2, node2_capacity);
auto host3 = topo.add_node(node_state::normal, 2);
const uint64_t node3_capacity = 60UL * 1024UL * 1024UL * 1024UL;
topo.get_shared_load_stats().set_capacity(host3, node3_capacity);
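// Drain host1. host2 and host3 are both empty destinations, with host2 having the larger capacity (70 GiB vs 60 GiB).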
topo.set_node_state(host1, node_state::removing);
auto& talloc = e.get_tablet_allocator().local();
auto& stm = e.shared_token_metadata().local();
talloc.balance_tablets(stm.get(), nullptr, nullptr, topo.get_shared_load_stats().get()).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_calculate_tablet_replicas_for_new_rf_upsize_one_dc) {
calculate_tablet_replicas_for_new_rf_config config;
config.ring_points = {
{ 1.0, inet_address("192.100.10.1") },
{ 4.0, inet_address("192.100.20.1") },
{ 7.0, inet_address("192.100.30.1") },
};
config.options = {{"100", "2"}};
config.new_dc_rep_factor = {{"100", "3"}};
config.expected_rep_factor = {{"100", 3}};
execute_tablet_for_new_rf_test(config);
}
SEASTAR_THREAD_TEST_CASE(test_calculate_tablet_replicas_for_new_rf_downsize_one_dc) {
calculate_tablet_replicas_for_new_rf_config config;
config.ring_points = {
{ 1.0, inet_address("192.100.10.1") },
{ 4.0, inet_address("192.100.20.1") },
{ 7.0, inet_address("192.100.30.1") },
};
config.options = {{"100", "3"}};
config.new_dc_rep_factor = {{"100", "2"}};
config.expected_rep_factor = {{"100", 2}};
execute_tablet_for_new_rf_test(config);
}
SEASTAR_THREAD_TEST_CASE(test_calculate_tablet_replicas_for_new_rf_no_change_one_dc) {
calculate_tablet_replicas_for_new_rf_config config;
config.ring_points = {
{ 1.0, inet_address("192.100.10.1") },
{ 4.0, inet_address("192.100.20.1") },
{ 7.0, inet_address("192.100.30.1") },
};
config.options = {{"100", "3"}};
config.new_dc_rep_factor = {{"100", "3"}};
config.expected_rep_factor = {{"100", 3}};
execute_tablet_for_new_rf_test(config);
}
SEASTAR_THREAD_TEST_CASE(test_calculate_tablet_replicas_for_new_rf) {
calculate_tablet_replicas_for_new_rf_config config;
config.ring_points = {
{ 1.0, inet_address("192.100.10.1") },
{ 2.0, inet_address("192.101.10.1") },
{ 3.0, inet_address("192.102.10.1") },
{ 4.0, inet_address("192.100.20.1") },
{ 5.0, inet_address("192.101.20.1") },
{ 6.0, inet_address("192.102.20.1") },
{ 7.0, inet_address("192.100.30.1") },
{ 8.0, inet_address("192.101.30.1") },
{ 9.0, inet_address("192.102.30.1") },
{ 10.0, inet_address("192.101.40.1") },
{ 11.0, inet_address("192.102.40.1") },
{ 12.0, inet_address("192.102.40.2") }
};
config.options = {
{"100", "3"},
{"101", "2"},
{"102", "3"}
};
config.new_dc_rep_factor = {
{"100", "3"},
{"101", "4"},
{"102", "2"}
};
config.expected_rep_factor = {
{"100", 3},
{"101", 4},
{"102", 2}
};
execute_tablet_for_new_rf_test(config);
}
SEASTAR_THREAD_TEST_CASE(test_calculate_tablet_replicas_for_new_rf_not_enough_nodes) {
calculate_tablet_replicas_for_new_rf_config config;
config.ring_points = {
{ 1.0, inet_address("192.100.10.1") },
{ 4.0, inet_address("192.100.20.1") },
{ 7.0, inet_address("192.100.30.1") },
};
config.options = {{"100", "3"}};
config.new_dc_rep_factor = {{"100", "5"}};
config.expected_rep_factor = {{"100", 3}};
execute_tablet_for_new_rf_test(config);
}
SEASTAR_THREAD_TEST_CASE(test_calculate_tablet_replicas_for_new_rf_one_dc) {
calculate_tablet_replicas_for_new_rf_config config;
config.ring_points = {
{ 1.0, inet_address("192.100.10.1") },
{ 4.0, inet_address("192.100.20.1") },
{ 7.0, inet_address("192.100.30.1") },
};
config.options = {{"100", "2"}};
config.new_dc_rep_factor = {{"100", "3"}};
config.expected_rep_factor = {{"100", 3}};
execute_tablet_for_new_rf_test(config);
}
SEASTAR_THREAD_TEST_CASE(test_calculate_tablet_replicas_for_new_rf_one_dc_1_to_2) {
calculate_tablet_replicas_for_new_rf_config config;
config.ring_points = {
{ 1.0, inet_address("192.100.10.1") },
{ 4.0, inet_address("192.100.20.1") },
};
config.options = {{"100", "1"}};
config.new_dc_rep_factor = {{"100", "2"}};
config.expected_rep_factor = {{"100", 2}};
execute_tablet_for_new_rf_test(config);
}
SEASTAR_THREAD_TEST_CASE(test_calculate_tablet_replicas_for_new_rf_one_dc_not_enough_nodes) {
calculate_tablet_replicas_for_new_rf_config config;
config.ring_points = {
{ 1.0, inet_address("192.100.10.1") },
{ 4.0, inet_address("192.100.10.2") },
{ 7.0, inet_address("192.100.10.3") },
};
config.options = {{"100", "3"}};
config.new_dc_rep_factor = {{"100", "5"}};
config.expected_rep_factor = {{"100", 3}};
execute_tablet_for_new_rf_test(config);
}
SEASTAR_THREAD_TEST_CASE(test_calculate_tablet_replicas_for_new_rf_default_rf) {
calculate_tablet_replicas_for_new_rf_config config;
config.ring_points = {
{ 1.0, inet_address("192.100.10.1") },
{ 2.0, inet_address("192.101.10.1") },
{ 3.0, inet_address("192.102.10.1") },
{ 4.0, inet_address("192.100.20.1") },
{ 5.0, inet_address("192.101.20.1") },
{ 6.0, inet_address("192.102.20.1") },
{ 7.0, inet_address("192.100.30.1") },
{ 8.0, inet_address("192.101.30.1") },
{ 9.0, inet_address("192.102.30.1") },
{ 10.0, inet_address("192.100.40.1") },
{ 11.0, inet_address("192.101.40.1") },
{ 12.0, inet_address("192.102.40.1") },
{ 13.0, inet_address("192.102.40.2") }
};
config.options = {
{"100", "3"},
{"101", "2"},
{"102", "2"}
};
config.new_dc_rep_factor = {
{"100", "4"},
{"101", "3"},
{"102", "3"},
};
config.expected_rep_factor = {
{"100", 4},
{"101", 3},
{"102", 3},
};
execute_tablet_for_new_rf_test(config);
}
SEASTAR_THREAD_TEST_CASE(test_calculate_tablet_replicas_for_new_rf_default_rf_upsize_by_two) {
calculate_tablet_replicas_for_new_rf_config config;
config.ring_points = {
{ 1.0, inet_address("192.100.10.1") },
{ 2.0, inet_address("192.101.10.1") },
{ 3.0, inet_address("192.102.10.1") },
{ 4.0, inet_address("192.100.20.1") },
{ 5.0, inet_address("192.101.20.1") },
{ 6.0, inet_address("192.102.20.1") },
{ 7.0, inet_address("192.100.30.1") },
{ 8.0, inet_address("192.101.30.1") },
{ 9.0, inet_address("192.102.30.1") },
{ 10.0, inet_address("192.100.40.1") },
{ 11.0, inet_address("192.101.40.1") },
{ 12.0, inet_address("192.102.40.1") },
{ 13.0, inet_address("192.102.40.2") }
};
config.options = {
{"100", "3"},
{"101", "2"},
{"102", "1"}
};
config.new_dc_rep_factor = {
{"100", "4"},
{"101", "3"},
{"102", "3"},
};
config.expected_rep_factor = {
{"100", 4},
{"101", 3},
{"102", 3},
};
execute_tablet_for_new_rf_test(config);
}
SEASTAR_TEST_CASE(test_tablet_count_metric) {
auto cfg = tablet_cql_test_config();
for (unsigned n = 1; n <= smp::count; n *= 2) {
cfg.initial_tablets = n;
}
return do_with_cql_env_thread([cfg] (cql_test_env& e) {
auto tid = add_table(e).get();
auto total = e.db().map_reduce0([&] (replica::database& db) {
auto count = db.find_column_family(tid).get_stats().tablet_count;
testlog.debug("shard table_count={}", count);
return count;
}, int64_t(0), std::plus<int64_t>()).get();
BOOST_REQUIRE_EQUAL(total, cfg.initial_tablets);
}, cfg);
}
SEASTAR_TEST_CASE(test_cleanup_of_deallocated_tablet) {
auto cfg = tablet_cql_test_config();
cfg.initial_tablets = 1;
return do_with_cql_env_thread([](cql_test_env& e) {
// Create a table.
e.execute_cql("create table ks.cf (pk int, ck int, primary key (pk, ck))").get();
size_t all_tablets = 0;
// Double cleanup the tablet.
e.db().invoke_on_all([&] (replica::database& db) -> future<> {
auto& cf = db.find_column_family("ks", "cf");
auto& sys_ks = e.get_system_keyspace().local();
auto tablet_count = cf.get_stats().tablet_count;
all_tablets += tablet_count;
if (tablet_count > 0) {
co_await cf.cleanup_tablet(db, sys_ks, locator::tablet_id(0));
co_await cf.cleanup_tablet(db, sys_ks, locator::tablet_id(0));
}
}).get();
assert(all_tablets);
}, cfg);
}
namespace {
future<> test_create_keyspace(sstring ks_name, std::optional<bool> tablets_opt, const cql_test_config& cfg, uint64_t initial_tablets = 0, sstring replication_strategy = "NetworkTopologyStrategy") {
co_await do_with_cql_env_thread([&] (cql_test_env& e) {
sstring extra;
if (tablets_opt) {
if (*tablets_opt) {
if (initial_tablets) {
extra = format(" and tablets = {{ 'initial' : {} }}", initial_tablets);
} else {
extra = " and tablets = { 'enabled' : true }";
}
} else {
extra = " and tablets = { 'enabled' : false }";
}
}
auto q = format("create keyspace {} with replication = {{ 'class' : '{}', 'replication_factor' : 1 }}{};", ks_name, replication_strategy, extra);
testlog.debug("{}", q);
e.execute_cql(q).get();
BOOST_REQUIRE(e.local_db().has_keyspace(ks_name));
auto tid = add_table(e, ks_name).get();
auto total = e.db().map_reduce0([&] (replica::database& db) {
auto count = db.find_column_family(tid).get_stats().tablet_count;
testlog.debug("shard table_count={}", count);
return count;
}, int64_t(0), std::plus<int64_t>()).get();
if (tablets_opt.value_or(cfg.db_config->enable_tablets_by_default())) {
if (initial_tablets) {
BOOST_REQUIRE_EQUAL(total, initial_tablets);
} else {
BOOST_REQUIRE_GT(total, 0);
}
} else {
BOOST_REQUIRE_EQUAL(total, 0);
}
}, cfg);
}
}
// Test that tablets can be explicitly enabled
// when creating a keyspace when the `tablets_mode_for_new_keyspaces`
// configuration option is set to `disabled`.
SEASTAR_TEST_CASE(test_explicit_tablets_enable) {
auto cfg = tablet_cql_test_config(db::tablets_mode_t::mode::disabled);
// By default tablets are disabled
co_await test_create_keyspace("test_default_settings", std::nullopt, cfg);
// Tablets can be explicitly enabled for a new keyspace
co_await test_create_keyspace("test_explictly_enabled_0", true, cfg, 0);
co_await test_create_keyspace("test_explictly_enabled_128", true, cfg, 128);
// Tablets can also be explicitly disabled for a new keyspace
co_await test_create_keyspace("test_explictly_disabled", false, cfg);
// Replication strategies that do not support tablets cannot be used when tablets are explicitly enabled
for (const auto& [rs_desc, rs_type] : db::replication_strategy_restriction_t::map()) {
if (rs_type != locator::replication_strategy_type::network_topology) {
auto f = co_await coroutine::as_future(test_create_keyspace("test_unsupported_replication_strategy", true, cfg, 0, rs_desc));
BOOST_REQUIRE_THROW(f.get(), exceptions::configuration_exception);
}
}
}
// Test that tablets can be explicitly disabled
// when creating a keyspace when the `tablets_mode_for_new_keyspaces`
// configuration option is set to `enabled`.
SEASTAR_TEST_CASE(test_explicit_tablets_disable) {
auto cfg = tablet_cql_test_config(db::tablets_mode_t::mode::enabled);
// By default tablets are enabled
co_await test_create_keyspace("test_default_settings", std::nullopt, cfg);
// Tablets can be explicitly disabled for a new keyspace
co_await test_create_keyspace("test_explictly_disabled", false, cfg);
// Tablets can also be explicitly enabled for a new keyspace
co_await test_create_keyspace("test_explictly_enabled_0", true, cfg, 0);
co_await test_create_keyspace("test_explictly_enabled_128", true, cfg, 128);
}
// Test that tablets cannot be explicitly disabled
// when creating a keyspace when the `tablets_mode_for_new_keyspaces`
// configuration option is set to `enforced`.
SEASTAR_TEST_CASE(test_enforce_tablets) {
auto cfg = tablet_cql_test_config(db::tablets_mode_t::mode::enforced);
// By default tablets are enabled
co_await test_create_keyspace("test_default_settings", std::nullopt, cfg);
// Tablets cannot be explicitly disabled for a new keyspace
auto f = co_await coroutine::as_future(test_create_keyspace("test_not_explictly_disabled", false, cfg));
BOOST_REQUIRE_THROW(f.get(), exceptions::configuration_exception);
// Replication strategies that do not support tablets cannot be used when tablets are explicitly enabled
for (const auto& [rs_desc, rs_type] : db::replication_strategy_restriction_t::map()) {
if (rs_type != locator::replication_strategy_type::network_topology) {
auto f = co_await coroutine::as_future(test_create_keyspace("test_unsupported_replication_strategy", true, cfg, 0, rs_desc));
BOOST_REQUIRE_THROW(f.get(), exceptions::configuration_exception);
}
}
}
SEASTAR_TEST_CASE(test_recognition_of_deprecated_name_for_resize_transition) {
using transition_state = service::topology::transition_state;
BOOST_REQUIRE_EQUAL(service::transition_state_from_string("tablet split finalization"), transition_state::tablet_split_finalization);
BOOST_REQUIRE_EQUAL(service::transition_state_from_string("tablet resize finalization"), transition_state::tablet_resize_finalization);
return make_ready_future<>();
}
SEASTAR_THREAD_TEST_CASE(test_tablets_describe_ring) {
auto cfg = tablet_cql_test_config(db::tablets_mode_t::mode::enforced);
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto& db = e.local_db();
auto& ss = e.get_storage_service().local();
auto& gossiper = ss.gossiper();
auto& am = gossiper.get_mutable_address_map();
size_t num_racks = 3;
size_t nodes_per_rack = 10;
size_t shards_per_node = 8;
std::vector<endpoint_dc_rack> racks;
auto min_tablet_count = 10240;
auto& cfg = e.db_config();
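// Raise the per-shard tablet goal so that min_tablet_count tablets can actually be allocated on this cluster.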
cfg.tablets_per_shard_goal(2 * min_tablet_count / (nodes_per_rack * shards_per_node));
racks.push_back(topo.rack());
for (size_t i = 1; i < num_racks; ++i) {
racks.push_back(topo.start_new_rack());
}
for (size_t i = 0; i < num_racks; ++i) {
for (size_t j = 0; j < nodes_per_rack; ++j) {
auto id = topo.add_node(node_state::normal, shards_per_node, racks[i]);
auto addr = topo.host_addresses().at(id);
am.add_or_update_entry(id, addr);
}
}
auto ks = add_keyspace(e, {{topo.dc(), num_racks}}, num_racks * nodes_per_rack);
auto table = add_table(e, ks, std::map<sstring, sstring>({{"min_tablet_count", std::to_string(min_tablet_count)}})).get();
auto s = db.find_schema(table);
auto ring = ss.describe_ring_for_table(s->ks_name(), s->cf_name()).get();
BOOST_REQUIRE_GE(ring.size(), min_tablet_count);
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_tablet_auto_repair_rf1) {
cql_test_config cfg_in;
cfg_in.db_config->auto_repair_enabled_default(true);
cfg_in.db_config->auto_repair_threshold_default_in_seconds(1);
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
unsigned shard_count = 1;
auto dc1 = topo.dc();
auto rack1 = topo.rack();
[[maybe_unused]] auto host1 = topo.add_node(node_state::normal, shard_count);
auto rack2 = topo.start_new_rack();
[[maybe_unused]] auto host2 = topo.add_node(node_state::normal, shard_count);
auto ks_name = add_keyspace(e, {{dc1, 1}}, 1);
auto table1 = add_table(e, ks_name).get();
tablet_id tablet{0};
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(1);
auto tid = tmap.first_tablet();
tablet = tid;
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica{host1, 0},
}
});
tmeta.set_tablet_map(table1, std::move(tmap));
co_return;
});
auto& stm = e.shared_token_metadata().local();
bool once = false;
rebalance_tablets(e, nullptr, {}, [&once] (const migration_plan& plan) { return std::exchange(once, true); });
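// With RF=1, the balancer is not expected to schedule any transition (such as repair) for the tablet.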
BOOST_REQUIRE(stm.get()->tablets().get_tablet_map(table1).get_tablet_transition_info(tablet) == nullptr);
}, std::move(cfg_in)).get();
}
void run_tablet_manual_repair_rf1(cql_test_env& e) {
topology_builder topo(e);
unsigned shard_count = 1;
auto dc1 = topo.dc();
auto rack1 = topo.rack();
[[maybe_unused]] auto host1 = topo.add_node(node_state::normal, shard_count);
auto rack2 = topo.start_new_rack();
[[maybe_unused]] auto host2 = topo.add_node(node_state::normal, shard_count);
auto ks_name = add_keyspace(e, {{dc1, 1}}, 1);
auto table1 = add_table(e, ks_name).get();
tablet_id tablet{0};
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(1);
auto tid = tmap.first_tablet();
tablet = tid;
tablet_info ti{
tablet_replica_set {
tablet_replica{host1, 0},
}
};
ti.repair_task_info = ti.repair_task_info.make_user_repair_request();
tmap.set_tablet(tid, std::move(ti));
tmeta.set_tablet_map(table1, std::move(tmap));
co_return;
});
auto& stm = e.shared_token_metadata().local();
bool once = false;
rebalance_tablets(e, nullptr, {}, [&once] (const migration_plan& plan) { return std::exchange(once, true); });
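// A user-requested repair should be scheduled even for an RF=1 tablet.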
BOOST_REQUIRE(stm.get()->tablets().get_tablet_map(table1).get_tablet_transition_info(tablet)->transition == tablet_transition_kind::repair);
}
SEASTAR_THREAD_TEST_CASE(test_tablet_manual_repair_rf1_auto_repair_off) {
cql_test_config cfg_in;
cfg_in.db_config->auto_repair_enabled_default(false);
do_with_cql_env_thread(run_tablet_manual_repair_rf1, std::move(cfg_in)).get();
}
SEASTAR_THREAD_TEST_CASE(test_tablet_manual_repair_rf1_auto_repair_on) {
cql_test_config cfg_in;
cfg_in.db_config->auto_repair_enabled_default(true);
do_with_cql_env_thread(run_tablet_manual_repair_rf1, std::move(cfg_in)).get();
}
// Test for tablet_map::get_secondary_replica() and specifically how it
// relates to get_primary_replica().
// We never officially documented which replica in a list of replicas is to be
// considered the "primary" - it's not simply the first replica in the list,
// but the first in some reshuffling of the list, a reshuffling whose details
// changed in commits like 817fdad and d88036d. So this patch doesn't
// enshrine what get_primary_replica() or get_secondary_replica() should
// return. It just verifies that get_secondary_replica() returns a *different*
// replica than get_primary_replica() if there are 2 or more replicas, or
// throws an error when there's just one replica.
// Reproduces SCYLLADB-777.
SEASTAR_THREAD_TEST_CASE(test_get_secondary_replica) {
auto h1 = host_id(utils::UUID_gen::get_time_UUID());
auto h2 = host_id(utils::UUID_gen::get_time_UUID());
auto h3 = host_id(utils::UUID_gen::get_time_UUID());
locator::topology::config cfg = {
.this_endpoint = inet_address("127.0.0.1"),
.this_host_id = h1,
.local_dc_rack = endpoint_dc_rack::default_location,
};
auto topo = locator::topology(cfg);
topo.add_or_update_endpoint(h1, endpoint_dc_rack::default_location, node::state::normal);
topo.add_or_update_endpoint(h2, endpoint_dc_rack::default_location, node::state::normal);
topo.add_or_update_endpoint(h3, endpoint_dc_rack::default_location, node::state::normal);
// With 1 replica, get_secondary_replica should throw.
{
tablet_map tmap(1);
auto tid = tmap.first_tablet();
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
}
});
BOOST_REQUIRE_THROW(tmap.get_secondary_replica(tid, topo), std::runtime_error);
}
// With 2 replicas, get_secondary_replica should return a different replica
// than get_primary_replica for every tablet.
{
tablet_map tmap(4);
for (auto tid : tmap.tablet_ids()) {
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
tablet_replica {h2, 0},
}
});
}
for (auto tid : tmap.tablet_ids()) {
auto primary = tmap.get_primary_replica(tid, topo);
auto secondary = tmap.get_secondary_replica(tid, topo);
BOOST_REQUIRE(primary != secondary);
}
}
// With 3 replicas, same check.
{
tablet_map tmap(4);
for (auto tid : tmap.tablet_ids()) {
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
tablet_replica {h2, 0},
tablet_replica {h3, 0},
}
});
}
for (auto tid : tmap.tablet_ids()) {
auto primary = tmap.get_primary_replica(tid, topo);
auto secondary = tmap.get_secondary_replica(tid, topo);
BOOST_REQUIRE(primary != secondary);
}
}
topo.clear_gently().get();
}
// The purpose of this test is to emulate a tablet-aware restore process.
// When a snapshot is taken, load balancing is disabled, so we record the tablet count in a manifest for backup.
// During restore, we set both the min_tablet_count and max_tablet_count hints to the same value.
// The test makes sure that during restore the tablet count stays <= the max_tablet_count hint, which
// allows us to leverage file-based streaming of SSTables, ensuring each SSTable is fully contained within a single tablet.
SEASTAR_THREAD_TEST_CASE(test_tablet_count_fixed_by_table_properties) {
auto cfg = tablet_cql_test_config();
cfg.db_config->tablets_per_shard_goal(16);
do_with_cql_env_thread([&cfg] (auto& e) {
topology_builder topo(e);
auto dc = topo.dc();
topo.add_node(node_state::normal, 1);
// The keyspace's 'initial' option requests 8 tablets. We want the initial tablet count to be greater than the max_tablet_count hint
// to ensure that the hint is respected.
auto ks_name1 = add_keyspace(e, {{dc, 1}}, 8);
// Step 1: Create a table
e.execute_cql(fmt::format("CREATE TABLE {}.table1 (p1 text, r1 int, PRIMARY KEY (p1))", ks_name1)).get();
auto table1 = e.local_db().find_schema(ks_name1, "table1")->id();
auto& stm = e.shared_token_metadata().local();
auto get_tablet_count = [&] {
auto tm = stm.get();
return tm->tablets().get_tablet_map(table1).tablet_count();
};
shared_load_stats& load_stats = topo.get_shared_load_stats();
load_stats.set_size(table1, 0);
rebalance_tablets(e, &load_stats);
BOOST_REQUIRE_EQUAL(get_tablet_count(), 8);
// Step 2: Drop the table
e.execute_cql(fmt::format("DROP TABLE {}.table1", ks_name1)).get();
// Step 3: Create the same table with min_tablet_count=4 and max_tablet_count=4
auto force_tablet_count = 4;
e.execute_cql(fmt::format("CREATE TABLE {}.table1 (p1 text, r1 int, PRIMARY KEY (p1)) "
"WITH tablets = {{'min_tablet_count': {}, 'max_tablet_count': {}}}",
ks_name1, force_tablet_count, force_tablet_count)).get();
// We need to fetch the table again as the previous table was dropped and recreated
table1 = e.local_db().find_schema(ks_name1, "table1")->id();
// Initially the table will be empty
load_stats.set_size(table1, 0);
// Step 4: Make sure the tablet count is equal to force_tablet_count
BOOST_REQUIRE_EQUAL(get_tablet_count(), force_tablet_count);
// Step 5: Increase the load and make sure tablet count remains equal to force_tablet_count
load_stats.set_size(table1, default_target_tablet_size * force_tablet_count * 128);
rebalance_tablets(e, &load_stats);
BOOST_REQUIRE_EQUAL(get_tablet_count(), force_tablet_count);
// This should force tablets to be merged; the min_tablet_count hint will no longer be respected.
cfg.db_config->tablets_per_shard_goal(force_tablet_count / 2);
rebalance_tablets(e, &load_stats);
BOOST_REQUIRE_LE(get_tablet_count(), force_tablet_count);
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_tablet_options_min_and_max_tablet_count) {
auto cfg = tablet_cql_test_config();
cfg.db_config->tablets_per_shard_goal(16);
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto dc = topo.dc();
topo.add_node(node_state::normal, 1);
auto ks_name1 = add_keyspace(e, {{dc, 1}}, 8);
// Test valid combinations
{
// min=64, max=128 - both powers of 2, should work
BOOST_CHECK_NO_THROW(e.execute_cql(fmt::format(
"CREATE TABLE {}.table1 (p1 text, r1 int, PRIMARY KEY (p1)) "
"WITH tablets = {{'min_tablet_count': 64, 'max_tablet_count': 128}}",
ks_name1)).get());
}
{
// min=100, max=200 - rounds to min=128, max=128, should work
BOOST_CHECK_NO_THROW(e.execute_cql(fmt::format(
"CREATE TABLE {}.table2 (p1 text, r1 int, PRIMARY KEY (p1)) "
"WITH tablets = {{'min_tablet_count': 100, 'max_tablet_count': 200}}",
ks_name1)).get());
}
// Test invalid combinations that should throw exceptions
{
// min=100, max=100 - rounds to min=128, max=64, invalid
BOOST_CHECK_EXCEPTION(e.execute_cql(fmt::format(
"CREATE TABLE {}.table3 (p1 text, r1 int, PRIMARY KEY (p1)) "
"WITH tablets = {{'min_tablet_count': 100, 'max_tablet_count': 100}}",
ks_name1)).get(),
exceptions::configuration_exception,
[&](const exceptions::configuration_exception& e) {
const auto msg = sstring(e.what());
return msg.contains("Invalid tablet count range");
});
}
{
// min=65, max=127 - rounds to min=128, max=64, invalid
BOOST_CHECK_EXCEPTION(e.execute_cql(fmt::format(
"CREATE TABLE {}.table4 (p1 text, r1 int, PRIMARY KEY (p1)) "
"WITH tablets = {{'min_tablet_count': 65, 'max_tablet_count': 127}}",
ks_name1)).get(),
exceptions::configuration_exception,
[&](const exceptions::configuration_exception& e) {
const auto msg = sstring(e.what());
return msg.contains("Invalid tablet count range");
});
}
{
// min=129, max=128 - even without rounding, min > max is invalid
BOOST_CHECK_EXCEPTION(e.execute_cql(fmt::format(
"CREATE TABLE {}.table6 (p1 text, r1 int, PRIMARY KEY (p1)) "
"WITH tablets = {{'min_tablet_count': 129, 'max_tablet_count': 128}}",
ks_name1)).get(),
exceptions::configuration_exception,
[&](const exceptions::configuration_exception& e) {
const auto msg = sstring(e.what());
return msg.contains("Invalid tablet count range");
});
}
}, cfg).get();
}
BOOST_AUTO_TEST_SUITE_END()