/*
 * Copyright (C) 2023-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#include "utils/UUID.hh"

#include
#include
#include
#include

#undef SEASTAR_TESTING_MAIN
#include
#include "test/lib/random_utils.hh"
#include "service/topology_mutation.hh"
#include "service/storage_service.hh"
#include
#include
#include
#include "test/lib/cql_test_env.hh"
#include "test/lib/log.hh"
#include "test/lib/simple_schema.hh"
#include "test/lib/key_utils.hh"
#include "test/lib/test_utils.hh"
#include "test/lib/topology_builder.hh"
#include "db/config.hh"
#include "cql3/util.hh"
#include "db/schema_tables.hh"
#include "schema/schema_builder.hh"
#include "replica/tablets.hh"
#include "replica/tablet_mutation_builder.hh"
#include "locator/tablets.hh"
#include "service/tablet_allocator.hh"
#include "locator/tablet_replication_strategy.hh"
#include "locator/tablet_sharder.hh"
#include "locator/load_sketch.hh"
#include "locator/snitch_base.hh"
#include "utils/UUID_gen.hh"
#include "utils/error_injection.hh"
#include "utils/to_string.hh"
#include "service/topology_coordinator.hh"
#include "service/topology_state_machine.hh"
#include "service/migration_manager.hh"
#include
#include

BOOST_AUTO_TEST_SUITE(tablets_test)

using namespace locator;
using namespace replica;
using namespace service;

// Collects the single mutation emitted by tablet_map_to_mutations() for the given table.
static inline future<mutation> tablet_map_to_mutation(const tablet_map& tablets, table_id id,
        const sstring& keyspace_name, const sstring& table_name,
        api::timestamp_type ts, const gms::feature_service& features) {
    std::optional<mutation> ret;
    co_await tablet_map_to_mutations(tablets, id, keyspace_name, table_name, ts, features, [&] (mutation m) {
        SCYLLA_ASSERT(!ret.has_value());
        ret = std::move(m);
        return make_ready_future();
    });
    SCYLLA_ASSERT(ret.has_value());
    co_return std::move(*ret);
}

static api::timestamp_type current_timestamp(cql_test_env& e) {
    // Mutations in system.tablets got there via group0, so in order for new
    // mutations to take effect, their timestamp should be "later" than that
    return utils::UUID_gen::micros_timestamp(e.get_system_keyspace().local().get_last_group0_state_id().get()) + 1;
}

// Persists the given tablet metadata and verifies that it reads back equal.
static void verify_tablet_metadata_persistence(cql_test_env& env, const tablet_metadata& tm, api::timestamp_type& ts) {
    save_tablet_metadata(env.local_db(), tm, ts++).get();

    auto tm2 = read_tablet_metadata(env.local_qp()).get();
    BOOST_REQUIRE_EQUAL(tm, tm2);
}

// Applies the given mutations, updates the in-memory metadata incrementally via
// change hints, and verifies the result against a full reload from disk.
static void verify_tablet_metadata_update(cql_test_env& env, tablet_metadata& tm, utils::chunked_vector<mutation> muts) {
    testlog.trace("verify_tablet_metadata_update(): {}", muts);

    auto& db = env.local_db();
    db.apply(freeze(muts), db::no_timeout).get();

    locator::tablet_metadata_change_hint hint;
    for (const auto& mut : muts) {
        update_tablet_metadata_change_hint(hint, mut);
    }

    update_tablet_metadata(db, env.local_qp(), tm, hint).get();

    auto tm_reload = read_tablet_metadata(env.local_qp()).get();
    BOOST_REQUIRE_EQUAL(tm, tm_reload);
}

static cql_test_config tablet_cql_test_config(db::tablets_mode_t::mode enable_tablets = db::tablets_mode_t::mode::enabled) {
    cql_test_config c;
    c.db_config->tablets_mode_for_new_keyspaces(enable_tablets);
    if (c.db_config->enable_tablets_by_default()) {
        c.initial_tablets = 2;
    }
    return c;
}

static future<table_id> add_table(cql_test_env& e, sstring test_ks_name = "", std::map<sstring, sstring> tablet_options = {}) {
    auto id = table_id(utils::UUID_gen::get_time_UUID());
    co_await e.create_table([&] (std::string_view ks_name) {
        if (!test_ks_name.empty()) {
            ks_name = test_ks_name;
        }
        auto builder = schema_builder(ks_name, id.to_sstring(), id)
.with_column("p1", utf8_type, column_kind::partition_key) .with_column("r1", int32_type); if (!tablet_options.empty()) { builder.set_tablet_options(std::move(tablet_options)); } return *builder.build(); }); co_return id; } // Run in a seastar thread static sstring do_add_keyspace(cql_test_env& e, std::unordered_map>> dc_rf, int initial_tablets = 0) { static std::atomic ks_id = 0; auto ks_name = fmt::format("keyspace{}", ks_id.fetch_add(1)); sstring rf_options; for (auto& [dc, rf] : dc_rf) { auto rf_fmt = std::visit(overloaded_functor( [] (int rf) { return fmt::format("{}", rf); }, [] (const std::vector& racks) { return fmt::format("[{}]", fmt::join(racks | std::views::transform(&cql3::util::single_quote), ", ")); }), rf); rf_options += fmt::format(", '{}': {}", dc, rf_fmt); } testlog.info("Adding keyspace {} with replication factor options: {}", ks_name, rf_options); e.execute_cql(fmt::format("create keyspace {} with replication = {{'class': 'NetworkTopologyStrategy'{}}}" " and tablets = {{'enabled': true, 'initial': {}}}", ks_name, rf_options, initial_tablets)).get(); return ks_name; } // Run in a seastar thread static sstring add_keyspace(cql_test_env& e, std::unordered_map dc_rf, int initial_tablets = 0) { std::unordered_map>> dc_rf_expanded; for (auto& [dc, rf] : dc_rf) { dc_rf_expanded[dc] = rf; } return do_add_keyspace(e, std::move(dc_rf_expanded), initial_tablets); } // Run in a seastar thread static sstring add_keyspace_racks(cql_test_env& e, std::unordered_map> dc_rf, int initial_tablets = 0) { std::unordered_map>> dc_rf_expanded; for (auto& [dc, rf] : dc_rf) { dc_rf_expanded[dc] = rf; } return do_add_keyspace(e, std::move(dc_rf_expanded), initial_tablets); } // Run in a seastar thread void mutate_tablets(cql_test_env& e, const group0_guard& guard, seastar::noncopyable_function(tablet_metadata&)> mutator) { auto& stm = e.shared_token_metadata().local(); stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { return mutator(tm.tablets()); }).get(); save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get(); } // Run in a seastar thread void mutate_tablets(cql_test_env& e, seastar::noncopyable_function(tablet_metadata&)> mutator) { abort_source as; auto guard = e.get_raft_group0_client().start_operation(as).get(); mutate_tablets(e, guard, std::move(mutator)); } SEASTAR_TEST_CASE(test_tablet_metadata_persistence) { return do_with_cql_env_thread([] (cql_test_env& e) { auto h1 = host_id(utils::UUID_gen::get_time_UUID()); auto h2 = host_id(utils::UUID_gen::get_time_UUID()); auto h3 = host_id(utils::UUID_gen::get_time_UUID()); auto table1 = add_table(e).get(); auto table2 = add_table(e).get(); auto ts = current_timestamp(e); { tablet_metadata tm = read_tablet_metadata(e.local_qp()).get(); // Add table1 { tablet_map tmap(1); tmap.set_tablet(tmap.first_tablet(), tablet_info { tablet_replica_set { tablet_replica {h1, 0}, tablet_replica {h2, 3}, tablet_replica {h3, 1}, }, db_clock::now(), locator::tablet_task_info::make_auto_repair_request({}, {"dc1", "dc2"}), locator::tablet_task_info::make_intranode_migration_request(), 0 }); tm.set_tablet_map(table1, std::move(tmap)); } verify_tablet_metadata_persistence(e, tm, ts); // Add table2 { tablet_map tmap(4); auto tb = tmap.first_tablet(); tmap.set_tablet(tb, tablet_info { tablet_replica_set { tablet_replica {h1, 0}, }, {}, {}, locator::tablet_task_info::make_migration_request(), 0 }); tb = *tmap.next_tablet(tb); tmap.set_tablet(tb, tablet_info { tablet_replica_set { tablet_replica {h3, 3}, } }); tb = 
*tmap.next_tablet(tb); tmap.set_tablet(tb, tablet_info { tablet_replica_set { tablet_replica {h2, 2}, }, {}, {}, locator::tablet_task_info::make_migration_request(), 0 }); tb = *tmap.next_tablet(tb); tmap.set_tablet(tb, tablet_info { tablet_replica_set { tablet_replica {h1, 1}, } }); tm.set_tablet_map(table2, std::move(tmap)); } verify_tablet_metadata_persistence(e, tm, ts); // Increase RF of table2 tm.mutate_tablet_map_async(table2, [&] (tablet_map& tmap) { auto tb = tmap.first_tablet(); tb = *tmap.next_tablet(tb); tmap.set_tablet_transition_info(tb, tablet_transition_info{ tablet_transition_stage::allow_write_both_read_old, tablet_transition_kind::migration, tablet_replica_set { tablet_replica {h3, 3}, tablet_replica {h1, 7}, }, tablet_replica {h1, 7} }); tb = *tmap.next_tablet(tb); tmap.set_tablet_transition_info(tb, tablet_transition_info{ tablet_transition_stage::use_new, tablet_transition_kind::migration, tablet_replica_set { tablet_replica {h1, 4}, tablet_replica {h2, 2}, }, tablet_replica {h1, 4}, session_id(utils::UUID_gen::get_time_UUID()) }); return make_ready_future(); }).get(); verify_tablet_metadata_persistence(e, tm, ts); // Reduce tablet count in table2 { tablet_map tmap(2); auto tb = tmap.first_tablet(); tmap.set_tablet(tb, tablet_info { tablet_replica_set { tablet_replica {h1, 0}, } }); tb = *tmap.next_tablet(tb); tmap.set_tablet(tb, tablet_info { tablet_replica_set { tablet_replica {h3, 3}, } }); tm.set_tablet_map(table2, std::move(tmap)); } verify_tablet_metadata_persistence(e, tm, ts); // Reduce RF for table1, increasing tablet count { tablet_map tmap(2); auto tb = tmap.first_tablet(); tmap.set_tablet(tb, tablet_info { tablet_replica_set { tablet_replica {h3, 7}, } }); tb = *tmap.next_tablet(tb); tmap.set_tablet(tb, tablet_info { tablet_replica_set { tablet_replica {h1, 3}, } }); tm.set_tablet_map(table1, std::move(tmap)); } verify_tablet_metadata_persistence(e, tm, ts); // Reduce tablet count for table1 { tablet_map tmap(1); auto tb = tmap.first_tablet(); tmap.set_tablet(tb, tablet_info { tablet_replica_set { tablet_replica {h1, 3}, } }); tm.set_tablet_map(table1, std::move(tmap)); } verify_tablet_metadata_persistence(e, tm, ts); // Change replica of table1 { tablet_map tmap(1); auto tb = tmap.first_tablet(); tmap.set_tablet(tb, tablet_info { tablet_replica_set { tablet_replica {h3, 7}, } }); tm.set_tablet_map(table1, std::move(tmap)); } verify_tablet_metadata_persistence(e, tm, ts); // Change resize decision of table1 { tablet_map tmap(1); locator::resize_decision decision; decision.way = locator::resize_decision::split{}, decision.sequence_number = 1; tmap.set_resize_decision(decision); tmap.set_resize_task_info(locator::tablet_task_info::make_split_request()); tm.set_tablet_map(table1, std::move(tmap)); } verify_tablet_metadata_persistence(e, tm, ts); } }, tablet_cql_test_config()); } SEASTAR_THREAD_TEST_CASE(test_invalid_colocated_tables) { auto cfg = tablet_cql_test_config(db::tablets_mode_t::mode::enforced); do_with_cql_env_thread([] (auto& e) { auto& sp = e.get_storage_proxy().local(); auto& mm = e.migration_manager().local(); topology_builder topo(e); topo.add_node(node_state::normal, 1); auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 1); auto ksm = e.local_db().find_keyspace(ks_name).metadata(); const auto t = schema_builder("ks", "t") .with_column("pk", int32_type, column_kind::partition_key) .build(); const auto t_paxos = schema_builder("ks", "t$paxos") .with_column("pk", int32_type, column_kind::partition_key) .build(); const auto t_paxos_paxos = 
schema_builder("ks", "t$paxos$paxos") .with_column("pk", int32_type, column_kind::partition_key) .build(); // check we can't colocate a new table with another new colocated table // in one prepare_new_column_families_announcement call { utils::chunked_vector muts; seastar::testing::scoped_no_abort_on_internal_error abort_guard{}; BOOST_CHECK_EXCEPTION(service::prepare_new_column_families_announcement(muts, sp, *ksm, {t, t_paxos, t_paxos_paxos}, utils::UUID_gen::get_time_UUID().timestamp()) .get(), std::runtime_error, [&](const std::runtime_error& e) { const auto expected_message = ::format("Trying to set co-located table {} with base table {} but it's not a base table.", t_paxos_paxos->id(), t_paxos->id()); return sstring(e.what()).starts_with(expected_message); }); } // create a colocated table { auto g = mm.start_group0_operation().get(); utils::chunked_vector muts; service::prepare_new_column_families_announcement(muts, e.get_storage_proxy().local(), *ksm, {t, t_paxos}, g.write_timestamp()).get(); mm.announce(std::move(muts), std::move(g), "create test tables").get(); } // check the same with an already existing colocated table { seastar::testing::scoped_no_abort_on_internal_error abort_guard{}; BOOST_CHECK_EXCEPTION(service::prepare_new_column_family_announcement( e.get_storage_proxy().local(), t_paxos_paxos, utils::UUID_gen::get_time_UUID().timestamp()) .get(), std::runtime_error, [&](const std::runtime_error& e) { const auto expected_message = ::format("Trying to set co-located table {} with base table {} but it's not a base table.", t_paxos_paxos->id(), t_paxos->id()); return sstring(e.what()).starts_with(expected_message); }); } }, tablet_cql_test_config()) .get(); } SEASTAR_TEST_CASE(test_paused_rf_change_requests_persistence) { return do_with_cql_env_thread([] (cql_test_env& e) { topology_builder topo(e); auto topology = e.get_system_keyspace().local().load_topology_state({}).get(); // Check scheduled_rf_change_requests. 
std::unordered_set current_requests; auto new_id1 = utils::make_random_uuid(); topo.pause_rf_change_request(new_id1); current_requests.insert(new_id1); auto new_id2 = utils::make_random_uuid(); topo.pause_rf_change_request(new_id2); current_requests.insert(new_id2); topology = e.get_system_keyspace().local().load_topology_state({}).get(); BOOST_REQUIRE_EQUAL(current_requests.size(), topology.paused_rf_change_requests.size()); for (const auto& request : current_requests) { BOOST_REQUIRE(topology.paused_rf_change_requests.contains(request)); } topo.resume_rf_change_request(current_requests, new_id1); current_requests.erase(new_id1); topology = e.get_system_keyspace().local().load_topology_state({}).get(); BOOST_REQUIRE_EQUAL(current_requests.size(), topology.paused_rf_change_requests.size()); for (const auto& request : current_requests) { BOOST_REQUIRE(topology.paused_rf_change_requests.contains(request)); } }, tablet_cql_test_config()); } SEASTAR_TEST_CASE(test_tablet_metadata_persistence_with_colocated_tables) { return do_with_cql_env_thread([] (cql_test_env& e) { auto h1 = host_id(utils::UUID_gen::get_time_UUID()); auto h2 = host_id(utils::UUID_gen::get_time_UUID()); auto h3 = host_id(utils::UUID_gen::get_time_UUID()); auto table1 = add_table(e).get(); auto table2 = add_table(e).get(); auto ts = current_timestamp(e); { tablet_metadata tm = read_tablet_metadata(e.local_qp()).get(); // Add table1 { tablet_map tmap(1); tmap.set_tablet(tmap.first_tablet(), tablet_info { tablet_replica_set { tablet_replica {h1, 0}, tablet_replica {h2, 3}, tablet_replica {h3, 1}, }, db_clock::now(), locator::tablet_task_info::make_auto_repair_request({}, {"dc1", "dc2"}), locator::tablet_task_info::make_intranode_migration_request(), 0 }); tm.set_tablet_map(table1, std::move(tmap)); } // Add table2 as a co-located table of table1 tm.set_colocated_table(table2, table1).get(); const auto& tmap1 = tm.get_tablet_map(table1); const auto& tmap2 = tm.get_tablet_map(table2); BOOST_REQUIRE_EQUAL(tmap1, tmap2); verify_tablet_metadata_persistence(e, tm, ts); } }, tablet_cql_test_config()); } SEASTAR_TEST_CASE(test_read_required_hosts) { return do_with_cql_env_thread([] (cql_test_env& e) { auto h1 = host_id(utils::UUID_gen::get_time_UUID()); auto h2 = host_id(utils::UUID_gen::get_time_UUID()); auto h3 = host_id(utils::UUID_gen::get_time_UUID()); tablet_metadata tm = read_tablet_metadata(e.local_qp()).get(); auto ts = current_timestamp(e); verify_tablet_metadata_persistence(e, tm, ts); BOOST_REQUIRE_EQUAL(std::unordered_set({}), read_required_hosts(e.local_qp()).get()); // Add table1 auto table1 = add_table(e).get(); { tablet_map tmap(1); tmap.set_tablet(tmap.first_tablet(), tablet_info { tablet_replica_set { tablet_replica {h1, 0}, tablet_replica {h2, 3}, } }); tm.set_tablet_map(table1, std::move(tmap)); } ts = current_timestamp(e); verify_tablet_metadata_persistence(e, tm, ts); BOOST_REQUIRE_EQUAL(std::unordered_set({h1, h2}), read_required_hosts(e.local_qp()).get()); // Add table2 auto table2 = add_table(e).get(); { tablet_map tmap(2); auto tb = tmap.first_tablet(); tmap.set_tablet(tb, tablet_info { tablet_replica_set { tablet_replica {h1, 0}, } }); tb = *tmap.next_tablet(tb); tmap.set_tablet(tb, tablet_info { tablet_replica_set { tablet_replica {h2, 0}, } }); tmap.set_tablet_transition_info(tb, tablet_transition_info{ tablet_transition_stage::allow_write_both_read_old, tablet_transition_kind::migration, tablet_replica_set { tablet_replica {h3, 0}, }, tablet_replica {h3, 0} }); tm.set_tablet_map(table2, std::move(tmap)); 
        }

        ts = current_timestamp(e);
        verify_tablet_metadata_persistence(e, tm, ts);
        BOOST_REQUIRE_EQUAL(std::unordered_set<host_id>({h1, h2, h3}), read_required_hosts(e.local_qp()).get());
    }, tablet_cql_test_config());
}

// Check that updating tablet-metadata and reloading only modified parts from
// disk yields the correct metadata.
SEASTAR_TEST_CASE(test_tablet_metadata_update) {
    return do_with_cql_env_thread([] (cql_test_env& e) {
        auto h1 = host_id(utils::UUID_gen::get_time_UUID());
        auto h2 = host_id(utils::UUID_gen::get_time_UUID());
        auto h3 = host_id(utils::UUID_gen::get_time_UUID());

        auto& db = e.local_db();

        auto table1 = add_table(e).get();
        auto table1_schema = db.find_schema(table1);
        auto table2 = add_table(e).get();
        auto table2_schema = db.find_schema(table2);

        testlog.trace("table1: {}", table1);
        testlog.trace("table2: {}", table2);

        tablet_metadata tm = read_tablet_metadata(e.local_qp()).get();
        auto ts = current_timestamp(e);

        // Add table1
        {
            testlog.trace("add table1");
            tablet_map tmap(1);
            tmap.set_tablet(tmap.first_tablet(), tablet_info {
                tablet_replica_set {
                    tablet_replica {h1, 0},
                    tablet_replica {h2, 3},
                    tablet_replica {h3, 1},
                }
            });
            verify_tablet_metadata_update(e, tm, {
                tablet_map_to_mutation(tmap, table1, table1_schema->ks_name(), table1_schema->cf_name(), ++ts, db.features()).get(),
            });
        }

        // Add table2
        {
            testlog.trace("add table2");
            tablet_map tmap(4);
            auto tb = tmap.first_tablet();
            tmap.set_tablet(tb, tablet_info {
                tablet_replica_set {
                    tablet_replica {h1, 0},
                }
            });
            tb = *tmap.next_tablet(tb);
            tmap.set_tablet(tb, tablet_info {
                tablet_replica_set {
                    tablet_replica {h3, 3},
                }
            });
            tb = *tmap.next_tablet(tb);
            tmap.set_tablet(tb, tablet_info {
                tablet_replica_set {
                    tablet_replica {h2, 2},
                }
            });
            tb = *tmap.next_tablet(tb);
            tmap.set_tablet(tb, tablet_info {
                tablet_replica_set {
                    tablet_replica {h1, 1},
                }
            });
            verify_tablet_metadata_update(e, tm, {
                tablet_map_to_mutation(tmap, table2, table2_schema->ks_name(), table2_schema->cf_name(), ++ts, db.features()).get(),
            });
        }

        // Increase RF of table2
        {
            testlog.trace("increase RF of table2");
            const auto& tmap = tm.get_tablet_map(table2);
            auto tb = tmap.first_tablet();
            replica::tablet_mutation_builder builder(ts++, table2);

            tb = *tmap.next_tablet(tb);
            builder.set_new_replicas(tmap.get_last_token(tb), tablet_replica_set {
                tablet_replica {h1, 7},
            });
            builder.set_stage(tmap.get_last_token(tb), tablet_transition_stage::allow_write_both_read_old);
            builder.set_transition(tmap.get_last_token(tb), tablet_transition_kind::migration);

            tb = *tmap.next_tablet(tb);
            builder.set_new_replicas(tmap.get_last_token(tb), tablet_replica_set {
                tablet_replica {h1, 4},
            });
            builder.set_stage(tmap.get_last_token(tb), tablet_transition_stage::use_new);
            builder.set_transition(tmap.get_last_token(tb), tablet_transition_kind::migration);

            verify_tablet_metadata_update(e, tm, {
                builder.build(),
            });
        }

        // Reduce RF for table1, increasing tablet count
        {
            testlog.trace("reduce RF for table1, increasing tablet count");
            tablet_map tmap(2);
            auto tb = tmap.first_tablet();
            tmap.set_tablet(tb, tablet_info {
                tablet_replica_set {
                    tablet_replica {h3, 7},
                }
            });
            tb = *tmap.next_tablet(tb);
            tmap.set_tablet(tb, tablet_info {
                tablet_replica_set {
                    tablet_replica {h1, 3},
                }
            });
            verify_tablet_metadata_update(e, tm, {
                tablet_map_to_mutation(tmap, table1, table1_schema->ks_name(), table1_schema->cf_name(), ++ts, db.features()).get(),
            });
        }

        // Reduce tablet count for table1
        {
            testlog.trace("reduce tablet count for table1");
            tablet_map tmap(1);
            auto tb = tmap.first_tablet();
            tmap.set_tablet(tb, tablet_info {
tablet_replica_set { tablet_replica {h1, 3}, } }); verify_tablet_metadata_update(e, tm, { tablet_map_to_mutation(tmap, table1, table1_schema->ks_name(), table1_schema->cf_name(), ++ts, db.features()).get(), }); } // Change replica of table1 { testlog.trace("change replica of table1"); replica::tablet_mutation_builder builder(ts++, table1); const auto& tmap = tm.get_tablet_map(table1); auto tb = tmap.first_tablet(); builder.set_replicas(tmap.get_last_token(tb), tablet_replica_set { tablet_replica {h3, 7}, } ); verify_tablet_metadata_update(e, tm, { builder.build(), }); } // Migrate all tablets of table2 { testlog.trace("stream all tablets of table2"); const auto& tmap = tm.get_tablet_map(table2); utils::chunked_vector muts; for (std::optional tb = tmap.first_tablet(); tb; tb = tmap.next_tablet(*tb)) { replica::tablet_mutation_builder builder(ts++, table2); const auto token = tmap.get_last_token(*tb); builder.set_new_replicas(token, tablet_replica_set { tablet_replica {h2, 7}, } ); builder.set_stage(token, tablet_transition_stage::streaming); builder.set_transition(token, tablet_transition_kind::rebuild); muts.emplace_back(builder.build()); } verify_tablet_metadata_update(e, tm, std::move(muts)); } // Remove transitions from tablets of table2 { testlog.trace("stream all tablets of table2"); const auto& tmap = tm.get_tablet_map(table2); utils::chunked_vector muts; for (std::optional tb = tmap.first_tablet(); tb; tb = tmap.next_tablet(*tb)) { replica::tablet_mutation_builder builder(ts++, table2); const auto token = tmap.get_last_token(*tb); builder.set_replicas(token, tablet_replica_set { tablet_replica {h2, 7}, } ); builder.del_transition(token); muts.emplace_back(builder.build()); } verify_tablet_metadata_update(e, tm, std::move(muts)); } // Drop table2 { testlog.trace("drop table2"); verify_tablet_metadata_update(e, tm, { make_drop_tablet_map_mutation(table2, ts++) }); } }, tablet_cql_test_config()); } SEASTAR_TEST_CASE(test_tablet_metadata_hint) { return do_with_cql_env_thread([] (cql_test_env& e) { auto h2 = host_id(utils::UUID_gen::get_time_UUID()); auto h3 = host_id(utils::UUID_gen::get_time_UUID()); auto table1 = add_table(e).get(); auto table2 = add_table(e).get(); testlog.trace("table1: {}", table1); testlog.trace("table2: {}", table2); tablet_metadata tm = read_tablet_metadata(e.local_qp()).get(); auto ts = current_timestamp(e); auto check_hint = [&] (locator::tablet_metadata_change_hint& incremental_hint, utils::chunked_vector& muts, mutation new_mut, const locator::tablet_metadata_change_hint& expected_hint, std::source_location sl = std::source_location::current()) { testlog.info("check_hint() called from {}:{}", sl.file_name(), sl.line()); replica::update_tablet_metadata_change_hint(incremental_hint, new_mut); muts.emplace_back(new_mut); auto full_hint_opt = replica::get_tablet_metadata_change_hint(muts); if (expected_hint) { BOOST_REQUIRE(full_hint_opt); BOOST_REQUIRE_EQUAL(*full_hint_opt, incremental_hint); } else { BOOST_REQUIRE(!full_hint_opt); } BOOST_REQUIRE_EQUAL(incremental_hint, expected_hint); }; auto make_hint = [&] (std::initializer_list>> tablets) { locator::tablet_metadata_change_hint hint; for (const auto& [tid, tokens] : tablets) { hint.tables.emplace(tid, locator::tablet_metadata_change_hint::table_hint{.table_id = tid, .tokens = tokens}); } return hint; }; // Unrelated mutation generates no hint { utils::chunked_vector muts; locator::tablet_metadata_change_hint hint; simple_schema s; auto mut = s.new_mutation("pk1"); s.add_row(mut, s.make_ckey(1), "v"); 
            check_hint(hint, muts, std::move(mut), {});
        }

        // Incremental update of hint
        {
            utils::chunked_vector<mutation> muts;
            locator::tablet_metadata_change_hint hint;

            const auto& tmap = tm.get_tablet_map(table1);
            std::vector<dht::token> tokens;
            for (std::optional<tablet_id> tid = tmap.first_tablet(); tid; tid = tmap.next_tablet(*tid)) {
                const auto token = tmap.get_last_token(*tid);
                tokens.push_back(token);
                replica::tablet_mutation_builder builder(ts++, table1);
                builder.set_replicas(token, tablet_replica_set {
                    tablet_replica {h2, 7},
                });
                check_hint(hint, muts, builder.build(), make_hint({{table1, tokens}}));
            }
        }

        tm = read_tablet_metadata(e.local_qp()).get();

        // Deletions (and static rows) should generate a partition hint.
        // Furthermore, if the partition had any row hints before, those should
        // be cleared, to force a full partition reload.
        auto check_delete_scenario = [&] (const char* scenario, std::function<void(table_id, mutation&, api::timestamp_type)> apply_delete) {
            testlog.info("check_delete_scenario({})", scenario);

            utils::chunked_vector<mutation> muts;
            locator::tablet_metadata_change_hint hint;

            // Check that a deletion generates only a partition hint
            {
                const auto delete_ts = ts++;
                replica::tablet_mutation_builder builder(delete_ts, table1);
                auto mut = builder.build();
                apply_delete(table1, mut, delete_ts);
                check_hint(hint, muts, std::move(mut), make_hint({{table1, {}}}));
            }

            // First add a row, to check that the deletion will clear the tokens
            // vector -- convert the row hints to a partition hint
            {
                // Add a row which will add a row hint
                {
                    const auto tokens = tm.get_tablet_map(table2).get_sorted_tokens().get();
                    replica::tablet_mutation_builder builder(ts++, table2);
                    builder.set_replicas(tokens.front(), tablet_replica_set {
                        tablet_replica {h3, 7},
                    });
                    check_hint(hint, muts, builder.build(), make_hint({{table1, {}}, {table2, {tokens.front()}}}));
                }

                // Apply the deletion which should clear the row hint, but leave the partition hint
                {
                    const auto delete_ts = ts++;
                    replica::tablet_mutation_builder builder(delete_ts, table2);
                    auto mut = builder.build();
                    apply_delete(table2, mut, delete_ts);
                    check_hint(hint, muts, std::move(mut), make_hint({{table1, {}}, {table2, {}}}));
                }
            }

            tm = read_tablet_metadata(e.local_qp()).get();
        };

        // Not a real deletion, but it should act the same way as a delete.
check_delete_scenario("static row", [&e] (table_id tbl, mutation& mut, api::timestamp_type delete_ts) { auto tbl_s = e.local_db().find_column_family(tbl).schema(); mut.set_static_cell("keyspace_name", data_value(tbl_s->ks_name()), delete_ts); }); check_delete_scenario("range tombstone", [&tm] (table_id tbl, mutation& mut, api::timestamp_type delete_ts) { auto s = db::system_keyspace::tablets(); const auto tokens = tm.get_tablet_map(tbl).get_sorted_tokens().get(); BOOST_REQUIRE_GE(tokens.size(), 2); const auto ck1 = clustering_key::from_single_value(*s, data_value(dht::token::to_int64(tokens[0])).serialize_nonnull()); const auto ck2 = clustering_key::from_single_value(*s, data_value(dht::token::to_int64(tokens[1])).serialize_nonnull()); mut.partition().apply_delete(*s, range_tombstone(ck1, bound_kind::excl_start, ck2, bound_kind::excl_end, tombstone(delete_ts, gc_clock::now()))); }); check_delete_scenario("row tombstone", [&tm] (table_id tbl, mutation& mut, api::timestamp_type delete_ts) { auto s = db::system_keyspace::tablets(); const auto tokens = tm.get_tablet_map(tbl).get_sorted_tokens().get(); const auto ck = clustering_key::from_single_value(*s, data_value(dht::token::to_int64(tokens[0])).serialize_nonnull()); mut.partition().apply_delete(*s, ck, tombstone(delete_ts, gc_clock::now())); }); // This will effectively drop both tables check_delete_scenario("partition tombstone", [] (table_id tbl, mutation& mut, api::timestamp_type delete_ts) { auto s = db::system_keyspace::tablets(); mut.partition().apply(tombstone(delete_ts, gc_clock::now())); }); }, tablet_cql_test_config()); } SEASTAR_TEST_CASE(test_get_shard) { return do_with_cql_env_thread([] (cql_test_env& e) { auto h1 = host_id(utils::UUID_gen::get_time_UUID()); auto h2 = host_id(utils::UUID_gen::get_time_UUID()); auto h3 = host_id(utils::UUID_gen::get_time_UUID()); inet_address ip1("192.168.0.1"); inet_address ip2("192.168.0.2"); inet_address ip3("192.168.0.3"); auto table1 = table_id(utils::UUID_gen::get_time_UUID()); const auto shard_count = 2; semaphore sem(1); shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ locator::topology::config{ .this_endpoint = ip1, .this_host_id = h1, .local_dc_rack = locator::endpoint_dc_rack::default_location } }); auto stop_stm = deferred_stop(stm); tablet_id tid(0); tablet_id tid1(0); stm.mutate_token_metadata([&] (token_metadata& tm) { tm.update_topology(h1, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); tm.update_topology(h2, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); tm.update_topology(h3, locator::endpoint_dc_rack::default_location, node::state::normal, shard_count); tablet_metadata tmeta; tablet_map tmap(2); tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {h1, 0}, tablet_replica {h3, 5}, } }); tid1 = *tmap.next_tablet(tid); tmap.set_tablet(tid1, tablet_info { tablet_replica_set { tablet_replica {h1, 2}, tablet_replica {h3, 1}, } }); tmap.set_tablet_transition_info(tid, tablet_transition_info { tablet_transition_stage::allow_write_both_read_old, tablet_transition_kind::migration, tablet_replica_set { tablet_replica {h1, 0}, tablet_replica {h2, 3}, }, tablet_replica {h2, 3} }); tmeta.set_tablet_map(table1, std::move(tmap)); tm.set_tablets(std::move(tmeta)); return make_ready_future<>(); }).get(); auto&& tmap = stm.get()->tablets().get_tablet_map(table1); auto get_shard = [&] (tablet_id tid, host_id host) { 
tablet_sharder sharder(*stm.get(), table1, host); return sharder.shard_for_reads(tmap.get_last_token(tid)); }; BOOST_REQUIRE_EQUAL(get_shard(tid1, h1), std::make_optional(shard_id(2))); BOOST_REQUIRE(!get_shard(tid1, h2)); BOOST_REQUIRE_EQUAL(get_shard(tid1, h3), std::make_optional(shard_id(1))); BOOST_REQUIRE_EQUAL(get_shard(tid, h1), std::make_optional(shard_id(0))); BOOST_REQUIRE_EQUAL(get_shard(tid, h2), std::make_optional(shard_id(3))); BOOST_REQUIRE_EQUAL(get_shard(tid, h3), std::make_optional(shard_id(5))); }, tablet_cql_test_config()); } SEASTAR_TEST_CASE(test_mutation_builder) { return do_with_cql_env_thread([] (cql_test_env& e) { auto h1 = host_id(utils::UUID_gen::get_time_UUID()); auto h2 = host_id(utils::UUID_gen::get_time_UUID()); auto h3 = host_id(utils::UUID_gen::get_time_UUID()); auto table1 = add_table(e).get(); auto ts = current_timestamp(e); tablet_metadata tm; tablet_id tid(0); tablet_id tid1(0); { tablet_map tmap(2); tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {h1, 0}, tablet_replica {h3, 5}, } }); tid1 = *tmap.next_tablet(tid); tmap.set_tablet(tid1, tablet_info { tablet_replica_set { tablet_replica {h1, 2}, tablet_replica {h3, 1}, } }); tm.set_tablet_map(table1, std::move(tmap)); } save_tablet_metadata(e.local_db(), tm, ts++).get(); { tablet_mutation_builder b(ts++, table1); auto last_token = tm.get_tablet_map(table1).get_last_token(tid1); b.set_new_replicas(last_token, tablet_replica_set { tablet_replica {h1, 2}, tablet_replica {h2, 3}, }); b.set_stage(last_token, tablet_transition_stage::write_both_read_new); b.set_transition(last_token, tablet_transition_kind::migration); e.local_db().apply({freeze(b.build())}, db::no_timeout).get(); } { tablet_map expected_tmap(2); tid = expected_tmap.first_tablet(); expected_tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {h1, 0}, tablet_replica {h3, 5}, } }); tid1 = *expected_tmap.next_tablet(tid); expected_tmap.set_tablet(tid1, tablet_info { tablet_replica_set { tablet_replica {h1, 2}, tablet_replica {h3, 1}, } }); expected_tmap.set_tablet_transition_info(tid1, tablet_transition_info { tablet_transition_stage::write_both_read_new, tablet_transition_kind::migration, tablet_replica_set { tablet_replica {h1, 2}, tablet_replica {h2, 3}, }, tablet_replica {h2, 3} }); auto tm_from_disk = read_tablet_metadata(e.local_qp()).get(); BOOST_REQUIRE_EQUAL(expected_tmap, tm_from_disk.get_tablet_map(table1)); } { tablet_mutation_builder b(ts++, table1); auto last_token = tm.get_tablet_map(table1).get_last_token(tid1); b.set_stage(last_token, tablet_transition_stage::use_new); b.set_transition(last_token, tablet_transition_kind::migration); e.local_db().apply({freeze(b.build())}, db::no_timeout).get(); } { tablet_map expected_tmap(2); tid = expected_tmap.first_tablet(); expected_tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {h1, 0}, tablet_replica {h3, 5}, } }); tid1 = *expected_tmap.next_tablet(tid); expected_tmap.set_tablet(tid1, tablet_info { tablet_replica_set { tablet_replica {h1, 2}, tablet_replica {h3, 1}, } }); expected_tmap.set_tablet_transition_info(tid1, tablet_transition_info { tablet_transition_stage::use_new, tablet_transition_kind::migration, tablet_replica_set { tablet_replica {h1, 2}, tablet_replica {h2, 3}, }, tablet_replica {h2, 3} }); auto tm_from_disk = read_tablet_metadata(e.local_qp()).get(); BOOST_REQUIRE_EQUAL(expected_tmap, tm_from_disk.get_tablet_map(table1)); } { tablet_mutation_builder b(ts++, table1); auto 
last_token = tm.get_tablet_map(table1).get_last_token(tid1); b.set_replicas(last_token, tablet_replica_set { tablet_replica {h1, 2}, tablet_replica {h2, 3}, }); b.del_transition(last_token); e.local_db().apply({freeze(b.build())}, db::no_timeout).get(); } { tablet_map expected_tmap(2); tid = expected_tmap.first_tablet(); expected_tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {h1, 0}, tablet_replica {h3, 5}, } }); tid1 = *expected_tmap.next_tablet(tid); expected_tmap.set_tablet(tid1, tablet_info { tablet_replica_set { tablet_replica {h1, 2}, tablet_replica {h2, 3}, } }); auto tm_from_disk = read_tablet_metadata(e.local_qp()).get(); BOOST_REQUIRE_EQUAL(expected_tmap, tm_from_disk.get_tablet_map(table1)); } static const auto resize_decision = locator::resize_decision("split", 1); { tablet_mutation_builder b(ts++, table1); auto last_token = tm.get_tablet_map(table1).get_last_token(tid1); b.set_replicas(last_token, tablet_replica_set { tablet_replica {h1, 2}, tablet_replica {h2, 3}, }); b.del_transition(last_token); b.set_resize_decision(resize_decision, e.local_db().features()); e.local_db().apply({freeze(b.build())}, db::no_timeout).get(); } { tablet_map expected_tmap(2); tid = expected_tmap.first_tablet(); expected_tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {h1, 0}, tablet_replica {h3, 5}, } }); tid1 = *expected_tmap.next_tablet(tid); expected_tmap.set_tablet(tid1, tablet_info { tablet_replica_set { tablet_replica {h1, 2}, tablet_replica {h2, 3}, } }); expected_tmap.set_resize_decision(resize_decision); auto tm_from_disk = read_tablet_metadata(e.local_qp()).get(); expected_tmap.set_resize_task_info(tm_from_disk.get_tablet_map(table1).resize_task_info()); BOOST_REQUIRE_EQUAL(expected_tmap, tm_from_disk.get_tablet_map(table1)); } }, tablet_cql_test_config()); } SEASTAR_TEST_CASE(test_sharder) { return do_with_cql_env_thread([] (cql_test_env& e) { auto h1 = host_id(utils::UUID_gen::get_time_UUID()); auto h2 = host_id(utils::UUID_gen::get_time_UUID()); auto h3 = host_id(utils::UUID_gen::get_time_UUID()); auto table1 = table_id(utils::UUID_gen::get_time_UUID()); token_metadata tokm(e.get_shared_token_metadata().local(), token_metadata::config{ .topo_cfg{ .this_host_id = h1, .local_dc_rack = locator::endpoint_dc_rack::default_location } }); tokm.get_topology().add_or_update_endpoint(h1); std::vector tablet_ids; { tablet_map tmap(8); auto tid = tmap.first_tablet(); tablet_ids.push_back(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {h1, 3}, tablet_replica {h3, 5}, } }); tid = *tmap.next_tablet(tid); tablet_ids.push_back(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {h2, 3}, tablet_replica {h3, 1}, } }); tid = *tmap.next_tablet(tid); tablet_ids.push_back(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {h3, 2}, tablet_replica {h1, 1}, } }); tmap.set_tablet_transition_info(tid, tablet_transition_info { tablet_transition_stage::use_new, tablet_transition_kind::migration, tablet_replica_set { tablet_replica {h1, 1}, tablet_replica {h2, 3}, }, tablet_replica {h2, 3} }); tid = *tmap.next_tablet(tid); tablet_ids.push_back(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {h3, 7}, tablet_replica {h2, 3}, } }); // tablet_ids[4] // h1 is leaving, h3 is pending tid = *tmap.next_tablet(tid); tablet_ids.push_back(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {h1, 5}, tablet_replica {h2, 1}, } }); 
tmap.set_tablet_transition_info(tid, tablet_transition_info { tablet_transition_stage::allow_write_both_read_old, tablet_transition_kind::migration, tablet_replica_set { tablet_replica {h3, 7}, tablet_replica {h2, 1}, }, tablet_replica {h3, 7} }); // tablet_ids[5] // h1 is leaving, h3 is pending tid = *tmap.next_tablet(tid); tablet_ids.push_back(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {h1, 5}, tablet_replica {h2, 1}, } }); tmap.set_tablet_transition_info(tid, tablet_transition_info { tablet_transition_stage::write_both_read_old, tablet_transition_kind::migration, tablet_replica_set { tablet_replica {h3, 7}, tablet_replica {h2, 1}, }, tablet_replica {h3, 7} }); // tablet_ids[6] // h1 is leaving, h3 is pending tid = *tmap.next_tablet(tid); tablet_ids.push_back(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {h1, 5}, tablet_replica {h2, 1}, } }); tmap.set_tablet_transition_info(tid, tablet_transition_info { tablet_transition_stage::write_both_read_new, tablet_transition_kind::migration, tablet_replica_set { tablet_replica {h3, 7}, tablet_replica {h2, 1}, }, tablet_replica {h3, 7} }); // tablet_ids[7] // h1 is leaving, h3 is pending tid = *tmap.next_tablet(tid); tablet_ids.push_back(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {h1, 5}, tablet_replica {h2, 1}, } }); tmap.set_tablet_transition_info(tid, tablet_transition_info { tablet_transition_stage::use_new, tablet_transition_kind::migration, tablet_replica_set { tablet_replica {h3, 7}, tablet_replica {h2, 1}, }, tablet_replica {h3, 7} }); tablet_metadata tm; tm.set_tablet_map(table1, std::move(tmap)); tokm.set_tablets(std::move(tm)); } auto& tm = tokm.tablets().get_tablet_map(table1); tablet_sharder sharder(tokm, table1); // for h1 tablet_sharder sharder_h3(tokm, table1, h3); BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[0])), 3); BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[1])), 0); // missing BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[2])), 1); BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[3])), 0); // missing BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[0])), dht::shard_replica_set{3}); BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[1])), dht::shard_replica_set{}); BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[2])), dht::shard_replica_set{1}); BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[3])), dht::shard_replica_set{}); // Shard for read should be stable across stages of migration. The coordinator may route // requests to the leaving replica even if the stage on the replica side is use_new. 
BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[4])), 5); BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[5])), 5); BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[6])), 5); BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[7])), 5); BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[4])), dht::shard_replica_set{5}); BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[5])), dht::shard_replica_set{5}); BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[6])), dht::shard_replica_set{5}); BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[7])), dht::shard_replica_set{5}); // On pending host BOOST_REQUIRE_EQUAL(sharder_h3.shard_for_reads(tm.get_last_token(tablet_ids[4])), 7); BOOST_REQUIRE_EQUAL(sharder_h3.shard_for_reads(tm.get_last_token(tablet_ids[5])), 7); BOOST_REQUIRE_EQUAL(sharder_h3.shard_for_reads(tm.get_last_token(tablet_ids[6])), 7); BOOST_REQUIRE_EQUAL(sharder_h3.shard_for_reads(tm.get_last_token(tablet_ids[7])), 7); BOOST_REQUIRE_EQUAL(sharder_h3.shard_for_writes(tm.get_last_token(tablet_ids[4])), dht::shard_replica_set{7}); BOOST_REQUIRE_EQUAL(sharder_h3.shard_for_writes(tm.get_last_token(tablet_ids[5])), dht::shard_replica_set{7}); BOOST_REQUIRE_EQUAL(sharder_h3.shard_for_writes(tm.get_last_token(tablet_ids[6])), dht::shard_replica_set{7}); BOOST_REQUIRE_EQUAL(sharder_h3.shard_for_writes(tm.get_last_token(tablet_ids[7])), dht::shard_replica_set{7}); BOOST_REQUIRE_EQUAL(sharder.token_for_next_shard_for_reads(tm.get_last_token(tablet_ids[1]), 0), tm.get_first_token(tablet_ids[3])); BOOST_REQUIRE_EQUAL(sharder.token_for_next_shard_for_reads(tm.get_last_token(tablet_ids[1]), 1), tm.get_first_token(tablet_ids[2])); BOOST_REQUIRE_EQUAL(sharder.token_for_next_shard_for_reads(tm.get_last_token(tablet_ids[1]), 3), dht::maximum_token()); BOOST_REQUIRE_EQUAL(sharder.token_for_next_shard_for_reads(tm.get_first_token(tablet_ids[1]), 0), tm.get_first_token(tablet_ids[3])); BOOST_REQUIRE_EQUAL(sharder.token_for_next_shard_for_reads(tm.get_first_token(tablet_ids[1]), 1), tm.get_first_token(tablet_ids[2])); BOOST_REQUIRE_EQUAL(sharder.token_for_next_shard_for_reads(tm.get_first_token(tablet_ids[1]), 3), dht::maximum_token()); { auto shard_opt = sharder.next_shard_for_reads(tm.get_last_token(tablet_ids[0])); BOOST_REQUIRE(shard_opt); BOOST_REQUIRE_EQUAL(shard_opt->shard, 0); BOOST_REQUIRE_EQUAL(shard_opt->token, tm.get_first_token(tablet_ids[1])); } { auto shard_opt = sharder.next_shard_for_reads(tm.get_last_token(tablet_ids[1])); BOOST_REQUIRE(shard_opt); BOOST_REQUIRE_EQUAL(shard_opt->shard, 1); BOOST_REQUIRE_EQUAL(shard_opt->token, tm.get_first_token(tablet_ids[2])); } { auto shard_opt = sharder.next_shard_for_reads(tm.get_last_token(tablet_ids[2])); BOOST_REQUIRE(shard_opt); BOOST_REQUIRE_EQUAL(shard_opt->shard, 0); BOOST_REQUIRE_EQUAL(shard_opt->token, tm.get_first_token(tablet_ids[3])); } { auto shard_opt = sharder.next_shard_for_reads(tm.get_last_token(tablet_ids[tablet_ids.size() - 1])); BOOST_REQUIRE(!shard_opt); } }, tablet_cql_test_config()); } SEASTAR_TEST_CASE(test_intranode_sharding) { return do_with_cql_env_thread([] (cql_test_env& e) { auto h1 = host_id(utils::UUID_gen::get_time_UUID()); auto h2 = host_id(utils::UUID_gen::get_time_UUID()); auto table1 = table_id(utils::UUID_gen::get_time_UUID()); locator::token_metadata::config tm_cfg; tm_cfg.topo_cfg.this_host_id = h1; 
tm_cfg.topo_cfg.local_dc_rack = endpoint_dc_rack::default_location; semaphore sem(1); shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); auto stop_stm = deferred_stop(stm); auto tmptr = stm.make_token_metadata_ptr(); auto& tokm = *tmptr; tokm.get_topology().add_or_update_endpoint(h1); auto leaving_replica = tablet_replica{h1, 5}; auto pending_replica = tablet_replica{h1, 7}; auto const_replica = tablet_replica{h2, 1}; // Prepare a tablet map with different tablets being in intra-node migration at different stages. std::vector tablet_ids; { tablet_map tmap(4); auto tid = tmap.first_tablet(); auto set_tablet = [&] (tablet_id tid, tablet_transition_stage stage) { tablet_ids.push_back(tid); tmap.set_tablet(tid, tablet_info{ tablet_replica_set{leaving_replica, const_replica} }); tmap.set_tablet_transition_info(tid, tablet_transition_info { stage, tablet_transition_kind::intranode_migration, tablet_replica_set{pending_replica, const_replica}, pending_replica }); }; // tablet_ids[0] set_tablet(tid, tablet_transition_stage::allow_write_both_read_old); // tablet_ids[1] tid = *tmap.next_tablet(tid); set_tablet(tid, tablet_transition_stage::write_both_read_old); // tablet_ids[2] tid = *tmap.next_tablet(tid); set_tablet(tid, tablet_transition_stage::write_both_read_new); // tablet_ids[3] tid = *tmap.next_tablet(tid); set_tablet(tid, tablet_transition_stage::use_new); tablet_metadata tm; tm.set_tablet_map(table1, std::move(tmap)); tokm.set_tablets(std::move(tm)); } auto& tm = tokm.tablets().get_tablet_map(table1); tablet_sharder sharder(tokm, table1); // for h1 BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[0])), 5); BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[1])), 5); BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[2])), 7); BOOST_REQUIRE_EQUAL(sharder.shard_for_reads(tm.get_last_token(tablet_ids[3])), 7); BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[0])), dht::shard_replica_set{5}); BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[1])), dht::shard_replica_set({7, 5})); BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[2])), dht::shard_replica_set({7, 5})); BOOST_REQUIRE_EQUAL(sharder.shard_for_writes(tm.get_last_token(tablet_ids[3])), dht::shard_replica_set{7}); // On const replica tablet_sharder sharder_h2(tokm, table1, const_replica.host); for (auto id : tablet_ids) { BOOST_REQUIRE_EQUAL(sharder_h2.shard_for_reads(tm.get_last_token(id)), const_replica.shard); BOOST_REQUIRE_EQUAL(sharder_h2.shard_for_writes(tm.get_last_token(id)), dht::shard_replica_set{const_replica.shard}); } }, tablet_cql_test_config()); } SEASTAR_TEST_CASE(test_large_tablet_metadata) { return do_with_cql_env_thread([] (cql_test_env& e) { tablet_metadata tm; auto h1 = host_id(utils::UUID_gen::get_time_UUID()); auto h2 = host_id(utils::UUID_gen::get_time_UUID()); auto h3 = host_id(utils::UUID_gen::get_time_UUID()); const int nr_tables = 1'00; const int tablets_per_table = 1024; for (int i = 0; i < nr_tables; ++i) { tablet_map tmap(tablets_per_table); for (tablet_id j : tmap.tablet_ids()) { tmap.set_tablet(j, tablet_info { tablet_replica_set {{h1, 0}, {h2, 1}, {h3, 2},} }); } auto id = add_table(e).get(); tm.set_tablet_map(id, std::move(tmap)); } auto ts = current_timestamp(e); verify_tablet_metadata_persistence(e, tm, ts); }, tablet_cql_test_config()); } SEASTAR_THREAD_TEST_CASE(test_token_ownership_splitting) { const auto real_min_token = 
dht::token::first(); const auto real_max_token = dht::token::last(); for (auto&& tmap : { tablet_map(1), tablet_map(2), tablet_map(4), tablet_map(16), tablet_map(1024), }) { testlog.debug("tmap: {}", tmap); BOOST_REQUIRE_EQUAL(real_min_token, tmap.get_first_token(tmap.first_tablet())); BOOST_REQUIRE_EQUAL(real_max_token, tmap.get_last_token(tmap.last_tablet())); std::optional prev_tb; for (tablet_id tb : tmap.tablet_ids()) { testlog.debug("first: {}, last: {}", tmap.get_first_token(tb), tmap.get_last_token(tb)); BOOST_REQUIRE_EQUAL(tb, tmap.get_tablet_id(tmap.get_first_token(tb))); BOOST_REQUIRE_EQUAL(tb, tmap.get_tablet_id(tmap.get_last_token(tb))); if (prev_tb) { BOOST_REQUIRE_EQUAL(dht::next_token(tmap.get_last_token(*prev_tb)), tmap.get_first_token(tb)); } prev_tb = tb; } } } static future<> apply_resize_plan(token_metadata& tm, const migration_plan& plan) { for (auto [table_id, resize_decision] : plan.resize_plan().resize) { co_await tm.tablets().mutate_tablet_map_async(table_id, [&] (tablet_map& tmap) { resize_decision.sequence_number = tmap.resize_decision().sequence_number + 1; tmap.set_resize_decision(resize_decision); return make_ready_future(); }); } } static future save_token_metadata(cql_test_env& e, group0_guard guard) { auto& stm = e.local_db().get_shared_token_metadata(); auto tm = stm.get(); e.get_topology_state_machine().local()._topology.version = tm->get_version(); co_await save_tablet_metadata(e.local_db(), tm->tablets(), guard.write_timestamp()); utils::chunked_vector muts; muts.push_back(freeze(topology_mutation_builder(guard.write_timestamp()) .set_version(tm->get_version()) .build().to_mutation(db::system_keyspace::topology()))); co_await e.local_db().apply(muts, db::no_timeout); co_await e.get_storage_service().local().update_tablet_metadata({}); // Need a new guard to make sure later changes use later timestamp. // Also, so that the table layer processes the changes we persisted, which is important for splits. // Before we can finalize a split, the storage group needs to process the split by creating split-ready compaction groups. release_guard(std::move(guard)); abort_source as; co_return co_await e.get_raft_group0_client().start_operation(as); } static future<> handle_resize_finalize(cql_test_env& e, group0_guard& guard, const migration_plan& plan, shared_load_stats* load_stats) { auto& talloc = e.get_tablet_allocator().local(); auto& stm = e.shared_token_metadata().local(); auto old_tm = stm.get(); bool changed = false; for (auto table_id : plan.resize_plan().finalize_resize) { auto tm = stm.get(); const auto& old_tmap = tm->tablets().get_tablet_map(table_id); auto new_tmap = co_await talloc.resize_tablets(tm, table_id); auto new_resize_decision = locator::resize_decision{}; new_resize_decision.sequence_number = old_tmap.resize_decision().next_sequence_number(); new_tmap.set_resize_decision(std::move(new_resize_decision)); co_await stm.mutate_token_metadata([table_id, &new_tmap, &changed] (token_metadata& tm) { changed = true; tm.tablets().set_tablet_map(table_id, std::move(new_tmap)); tm.set_version(tm.get_version() + 1); return make_ready_future<>(); }); } if (changed) { // Need to reload on each resize because table object expects tablet count to change by a factor of 2. 
guard = co_await save_token_metadata(e, std::move(guard)); if (load_stats) { auto new_tm = stm.get(); auto reconciled_stats = load_stats->stats.reconcile_tablets_resize(plan.resize_plan().finalize_resize, *old_tm, *new_tm); if (reconciled_stats) { load_stats->stats = *reconciled_stats; } } testlog.debug("Calling local_topology_barrier()"); old_tm = nullptr; co_await e.get_storage_service().local().local_topology_barrier(); testlog.debug("Finished local_topology_barrier()"); } } static future<> apply_repair_transitions(token_metadata& tm, const migration_plan& plan) { for (const auto& repair : plan.repair_plan().repairs()) { co_await tm.tablets().mutate_tablet_map_async(repair.table, [&] (tablet_map& tmap) { auto tablet_info = tmap.get_tablet_info(repair.tablet); tmap.set_tablet_transition_info(repair.tablet, tablet_transition_info{ tablet_transition_stage::repair, tablet_transition_kind::repair, tablet_info.replicas, std::nullopt, }); return make_ready_future(); }); } } // Reflects the plan in a given token metadata as if the migrations were fully executed. static future<> apply_plan(token_metadata& tm, const migration_plan& plan, service::topology& topology, shared_load_stats* load_stats) { for (auto&& mig : plan.migrations()) { co_await tm.tablets().mutate_tablet_map_async(mig.tablet.table, [&] (tablet_map& tmap) { if (load_stats) { global_tablet_id gid {mig.tablet.table, mig.tablet.tablet}; dht::token_range trange {tmap.get_token_range(mig.tablet.tablet)}; auto new_stats = load_stats->stats.migrate_tablet_size(mig.src.host, mig.dst.host, gid, trange); if (new_stats) { load_stats->stats = std::move(*new_stats); } } auto tinfo = tmap.get_tablet_info(mig.tablet.tablet); testlog.trace("Replacing tablet {} replica from {} to {}", mig.tablet.tablet, mig.src, mig.dst); tinfo.replicas = replace_replica(tinfo.replicas, mig.src, mig.dst); tmap.set_tablet(mig.tablet.tablet, tinfo); return make_ready_future(); }); } co_await apply_resize_plan(tm, plan); if (auto request_id = plan.rack_list_colocation_plan().request_to_resume(); request_id) { topology.paused_rf_change_requests.erase(request_id); } co_await apply_repair_transitions(tm, plan); } // Reflects the plan in a given token metadata as if the migrations were started but not yet executed. 
static future<> apply_plan_as_in_progress(token_metadata& tm, const migration_plan& plan) { for (auto&& mig : plan.migrations()) { co_await tm.tablets().mutate_tablet_map_async(mig.tablet.table, [&] (tablet_map& tmap) { auto tinfo = tmap.get_tablet_info(mig.tablet.tablet); tmap.set_tablet_transition_info(mig.tablet.tablet, migration_to_transition_info(tinfo, mig)); return make_ready_future(); }); } co_await apply_resize_plan(tm, plan); } static size_t get_tablet_count(const tablet_metadata& tm) { size_t count = 0; for (const auto& [table, tmap] : tm.all_tables_ungrouped()) { count += std::accumulate(tmap->tablets().begin(), tmap->tablets().end(), size_t(0), [] (size_t accumulator, const locator::tablet_info& info) { return accumulator + info.replicas.size(); }); } return count; } static void check_tablet_invariants(const tablet_metadata& tmeta); static void do_rebalance_tablets(cql_test_env& e, group0_guard& guard, shared_load_stats* load_stats = nullptr, std::unordered_set skiplist = {}, std::function stop = nullptr, bool auto_split = false) { auto& talloc = e.get_tablet_allocator().local(); auto& stm = e.shared_token_metadata().local(); auto& sys_ks = e.get_system_keyspace().local(); auto& topology = e.get_topology_state_machine().local()._topology; // Sanity limit to avoid infinite loops. // The x10 factor is arbitrary, it's there to account for more complex schedules than direct migration. auto max_iterations = 1 + get_tablet_count(stm.get()->tablets()) * 10; for (size_t i = 0; i < max_iterations; ++i) { auto plan = talloc.balance_tablets(stm.get(), &topology, &sys_ks, load_stats ? load_stats->get() : nullptr, skiplist).get(); if (plan.empty()) { return; } if (stop && stop(plan)) { return; } stm.mutate_token_metadata([&] (token_metadata& tm) { return apply_plan(tm, plan, e.get_topology_state_machine().local()._topology, load_stats); }).get(); if (auto_split && load_stats) { bool reload = false; auto& tm = *stm.get(); for (const auto& [table, tmap]: tm.tablets().all_tables_ungrouped()) { if (std::holds_alternative(tmap->resize_decision().way)) { if (load_stats->stats.tables[table].split_ready_seq_number != tmap->resize_decision().sequence_number) { testlog.debug("set_split_ready_seq_number({}, {})", table, tmap->resize_decision().sequence_number); load_stats->set_split_ready_seq_number(table, tmap->resize_decision().sequence_number); reload = true; } } } // Need to order split-ack before split finalization, storage_group assumes that. if (reload) { guard = save_token_metadata(e, std::move(guard)).get(); } } handle_resize_finalize(e, guard, plan, load_stats).get(); } throw std::runtime_error("rebalance_tablets(): convergence not reached within limit"); } static void apply_resize_decisions(cql_test_env& e, shared_load_stats& stats) { testlog.debug("apply_resize_decisions(): start"); abort_source as; auto guard = e.get_raft_group0_client().start_operation(as).get(); auto& talloc = e.get_tablet_allocator().local(); auto& stm = e.shared_token_metadata().local(); auto plan = talloc.balance_tablets(stm.get(), &e.get_topology_state_machine().local()._topology, &e.get_system_keyspace().local(), stats.get()).get(); stm.mutate_token_metadata([&] (token_metadata& tm) { return apply_resize_plan(tm, plan); }).get(); // We should not introduce inconsistency between on-disk state and in-memory state // as that may violate invariants and cause failures in later operations // causing test flakiness. 
save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get(); e.get_storage_service().local().update_tablet_metadata({}).get(); testlog.debug("apply_resize_decisions(): done"); } // Invokes the tablet scheduler and executes its plan, continuously until it emits an empty plan. // Simulates topology coordinator but doesn't perform actual migration, // only reflects it in the metadata. // Run in a seastar thread. void rebalance_tablets(cql_test_env& e, shared_load_stats* load_stats = nullptr, std::unordered_set skiplist = {}, std::function stop = nullptr, bool auto_split = true) { abort_source as; testlog.debug("rebalance_tablets(): start"); auto guard = e.get_raft_group0_client().start_operation(as).get(); testlog.debug("rebalance_tablets(): took group0 guard"); shared_load_stats local_stats; if (!load_stats) { // Provide default capacity for each node. e.shared_token_metadata().local().get()->get_topology().for_each_node([&] (const auto& node) { local_stats.set_capacity(node.host_id(), default_target_tablet_size * node.get_shard_count()); }); load_stats = &local_stats; } do_rebalance_tablets(e, guard, load_stats, std::move(skiplist), std::move(stop), auto_split); testlog.debug("rebalance_tablets(): rebalanced"); // We should not introduce inconsistency between on-disk state and in-memory state // as that may violate invariants and cause failures in later operations // causing test flakiness. auto& stm = e.shared_token_metadata().local(); save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get(); e.get_storage_service().local().update_tablet_metadata({}).get(); testlog.debug("rebalance_tablets(): done"); } static void rebalance_tablets_as_in_progress(cql_test_env& env, shared_load_stats& stats, std::function stop = nullptr) { auto& stm = env.local_db().get_shared_token_metadata(); auto& talloc = env.get_tablet_allocator().local(); auto& topology = env.get_topology_state_machine().local()._topology; auto& sys_ks = env.get_system_keyspace().local(); while (true) { auto plan = talloc.balance_tablets(stm.get(), &topology, &sys_ks, stats.get()).get(); if (plan.empty() || (stop && stop(plan))) { break; } stm.mutate_token_metadata([&] (token_metadata& tm) { return apply_plan_as_in_progress(tm, plan); }).get(); } } // Completes any in progress tablet migrations. static void execute_transitions(shared_token_metadata& stm) { stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { for (auto&& [table, tables] : tm.tablets().all_table_groups()) { co_await tm.tablets().mutate_tablet_map_async(table, [&] (tablet_map& tmap) { for (auto&& [tablet, trinfo]: tmap.transitions()) { auto ti = tmap.get_tablet_info(tablet); ti.replicas = trinfo.next; tmap.set_tablet(tablet, ti); } tmap.clear_transitions(); return make_ready_future(); }); } }).get(); } SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) { do_with_cql_env_thread([] (auto& e) { // Tests the scenario of bootstrapping a single node // Verifies that load balancer sees it and moves tablets to that node. 
topology_builder topo(e); unsigned shard_count = 2; auto host1 = topo.add_node(node_state::normal, shard_count); auto host2 = topo.add_node(node_state::normal, shard_count); auto host3 = topo.add_node(node_state::normal, shard_count); auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4); auto table1 = add_table(e, ks_name).get(); mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host1, 0}, tablet_replica {host2, 1}, } }); tid = *tmap.next_tablet(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host1, 0}, tablet_replica {host2, 1}, } }); tid = *tmap.next_tablet(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host1, 0}, tablet_replica {host2, 0}, } }); tid = *tmap.next_tablet(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host1, 1}, tablet_replica {host2, 0}, } }); tmeta.set_tablet_map(table1, std::move(tmap)); co_return; }); auto& stm = e.shared_token_metadata().local(); // Sanity check { load_sketch load(stm.get()); load.populate().get(); BOOST_REQUIRE_EQUAL(load.get_load(host1), 4); BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host1), 2); BOOST_REQUIRE_EQUAL(load.get_load(host2), 4); BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host2), 2); BOOST_REQUIRE_EQUAL(load.get_load(host3), 0); BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host3), 0); } shared_load_stats& load_stats = topo.get_shared_load_stats(); load_stats.set_default_tablet_sizes(stm.get()); rebalance_tablets(e, &load_stats); { load_sketch load(stm.get()); load.populate().get(); for (auto h : {host1, host2, host3}) { testlog.debug("Checking host {}", h); BOOST_REQUIRE_LE(load.get_load(h), 3); BOOST_REQUIRE_GT(load.get_load(h), 1); BOOST_REQUIRE_LE(load.get_avg_tablet_count(h), 2); BOOST_REQUIRE_GT(load.get_avg_tablet_count(h), 0); } } }).get(); } SEASTAR_THREAD_TEST_CASE(test_no_conflicting_migrations_in_the_plan) { do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); unsigned shard_count = 1; auto dc1 = topo.dc(); [[maybe_unused]] auto host1 = topo.add_node(node_state::normal, shard_count); [[maybe_unused]] auto host2 = topo.add_node(node_state::normal, shard_count); topo.start_new_rack(); [[maybe_unused]] auto host3 = topo.add_node(node_state::normal, shard_count); [[maybe_unused]] auto host4 = topo.add_node(node_state::normal, shard_count); auto dc2 = topo.start_new_dc().dc; [[maybe_unused]] auto host5 = topo.add_node(node_state::normal, shard_count); [[maybe_unused]] auto host6 = topo.add_node(node_state::normal, shard_count); auto ks_name = add_keyspace(e, {{dc1, 2}, {dc2, 1}}, 2); auto table1 = add_table(e, ks_name).get(); // Create imbalance in dc1::rack1, dc1::rack2, and dc2::rack1 mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(2); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica{host1, 0}, tablet_replica{host3, 0}, tablet_replica{host5, 0}, } }); tid = *tmap.next_tablet(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica{host1, 0}, tablet_replica{host3, 0}, tablet_replica{host5, 0}, } }); tmeta.set_tablet_map(table1, std::move(tmap)); co_return; }); auto& stm = e.shared_token_metadata().local(); auto& talloc = e.get_tablet_allocator().local(); topo.get_shared_load_stats().set_default_tablet_sizes(stm.get()); talloc.set_load_stats(topo.get_load_stats()); migration_plan plan = 
talloc.balance_tablets(stm.get(), nullptr, nullptr).get(); BOOST_REQUIRE(!plan.empty()); std::set tablets; for (auto&& mig : plan.migrations()) { BOOST_REQUIRE(!tablets.contains(mig.tablet)); tablets.insert(mig.tablet); } }).get(); } SEASTAR_THREAD_TEST_CASE(test_no_conflicting_internode_and_intra_merge_colocation) { // 1. The cluster has two racks, RackA and RackB, and the plan is per-rack. // 2. Two sibling tablets, T1 and T2, are marked for merge. // 3. In RackA, the replicas of T1 and T2 are co-located on an overloaded node, // making them a candidate for inter-node migration to achieve load balancing. // 4. In RackB, the replicas of T1 and T2 are on the same node but on different // shards, making them a candidate for intra-node migration to fix merge co-location. // // Verify that the load balancer's plan does not include conflicting migrations. // If the tablets T1 and T2 were chosen to be migrated between node in RackA, the merge // co-location plan should not generate migrations in RackB for the same tablets. cql_test_config cfg{}; cfg.db_config->rf_rack_valid_keyspaces.set(true); do_with_cql_env_thread([] (auto& e) { logging::logger_registry().set_logger_level("load_balancer", logging::log_level::trace); topology_builder topo(e); // RackA: NodeA (overloaded), NodeB (underloaded) // RackB: NodeC (balanced, with intra-node misalignment for co-location) auto rackA = topo.rack(); auto hostA = topo.add_node(node_state::normal, 2, rackA); auto hostB = topo.add_node(node_state::normal, 2, rackA); auto rackB = topo.start_new_rack(); auto hostC = topo.add_node(node_state::normal, 2, rackB); // Create a table with 2 tablets that will be marked for merge. auto ks_name = add_keyspace_racks(e, {{topo.dc(), {rackA.rack, rackB.rack}}}, 2); auto table1 = add_table(e, ks_name).get(); // Add more tables to create a clear load imbalance that can be resolved. auto table_for_load_1 = add_table(e, ks_name).get(); auto table_for_load_2 = add_table(e, ks_name).get(); mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap_merge(2); auto t1 = tmap_merge.first_tablet(); auto t2 = *tmap_merge.next_tablet(t1); tmap_merge.set_tablet(t1, tablet_info { tablet_replica_set { tablet_replica{hostA, 0}, // RackA tablet_replica{hostC, 0}, // RackB } }); tmap_merge.set_tablet(t2, tablet_info { tablet_replica_set { tablet_replica{hostA, 0}, // RackA tablet_replica{hostC, 1}, // RackB } }); tmeta.set_tablet_map(table1, std::move(tmap_merge)); // Add more tablets to hostA to make it clearly overloaded. // Total load on hostA will be 4, hostB is 0. Avg is 2. // Moving the {t1,t2} set (load 2) from A->B makes loads {2, 2}, which is balanced. tablet_map tmap_load(1); tmap_load.set_tablet(tmap_load.first_tablet(), tablet_info{tablet_replica_set{tablet_replica{hostA, 0}}}); tablet_map tmap_load_clone = co_await tmap_load.clone_gently(); tmeta.set_tablet_map(table_for_load_1, std::move(tmap_load)); tmeta.set_tablet_map(table_for_load_2, std::move(tmap_load_clone)); co_return; }); // Mark the tablets for merge to create a co-location plan. 
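        // The merge resize decision (with a bumped sequence number) is what makes the balancer
        // consider intra-node migrations that co-locate the sibling tablets T1 and T2.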
mutate_tablets(e, [&] (tablet_metadata& tmeta) { return tmeta.mutate_tablet_map_async(table1, [] (tablet_map& tmap) { locator::resize_decision decision; decision.way = locator::resize_decision::merge{}; decision.sequence_number = tmap.resize_decision().sequence_number + 1; tmap.set_resize_decision(std::move(decision)); return make_ready_future<>(); }); }); auto& stm = e.shared_token_metadata().local(); topo.get_shared_load_stats().set_default_tablet_sizes(stm.get()); auto& talloc = e.get_tablet_allocator().local(); talloc.set_load_stats(topo.get_load_stats()); migration_plan plan = talloc.balance_tablets(stm.get(), nullptr, nullptr).get(); // The plan should contain non-conflicting migrations. BOOST_REQUIRE(!plan.empty()); std::set tablets; for (auto&& mig : plan.migrations()) { BOOST_REQUIRE(!tablets.contains(mig.tablet)); tablets.insert(mig.tablet); } }, cfg).get(); } SEASTAR_THREAD_TEST_CASE(test_rack_list_conversion) { do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); unsigned shard_count = 1; auto dc1 = topo.dc(); auto rack1 = topo.rack(); [[maybe_unused]] auto host1 = topo.add_node(node_state::normal, shard_count); [[maybe_unused]] auto host2 = topo.add_node(node_state::normal, shard_count); auto rack2 = topo.start_new_rack(); [[maybe_unused]] auto host3 = topo.add_node(node_state::normal, shard_count); [[maybe_unused]] auto host4 = topo.add_node(node_state::normal, shard_count); auto rack3 = topo.start_new_rack(); [[maybe_unused]] auto host5 = topo.add_node(node_state::normal, shard_count); [[maybe_unused]] auto host6 = topo.add_node(node_state::normal, shard_count); auto dc2 = topo.start_new_dc().dc; [[maybe_unused]] auto host7 = topo.add_node(node_state::normal, shard_count); [[maybe_unused]] auto host8 = topo.add_node(node_state::normal, shard_count); auto ks_name = add_keyspace(e, {{dc1, 2}}, 4); auto table1 = add_table(e, ks_name).get(); // rack1: host1: A D host2: C // rack2: host3: A host4: B // rack3: host5: C host6: B D tablet_id A{0}, B{0}; mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); auto tid = tmap.first_tablet(); A = tid; tmap.set_tablet(tid, tablet_info { // A tablet_replica_set { tablet_replica{host1, 0}, tablet_replica{host3, 0}, } }); tid = *tmap.next_tablet(tid); B = tid; tmap.set_tablet(tid, tablet_info { // B tablet_replica_set { tablet_replica{host4, 0}, tablet_replica{host6, 0}, } }); tid = *tmap.next_tablet(tid); tmap.set_tablet(tid, tablet_info { // C tablet_replica_set { tablet_replica{host2, 0}, tablet_replica{host5, 0}, } }); tid = *tmap.next_tablet(tid); tmap.set_tablet(tid, tablet_info { // D tablet_replica_set { tablet_replica{host1, 0}, tablet_replica{host6, 0}, } }); tmeta.set_tablet_map(table1, std::move(tmap)); co_return; }); auto id = utils::UUID_gen::get_time_UUID(); // Build the map literal for CQL auto rf_change_data_cql = format("{{'replication:class': 'NetworkTopologyStrategy', 'replication:{}:0': '{}', 'replication:{}:1': '{}'}}", dc1, rack1.rack, dc1, rack3.rack); e.execute_cql(format("INSERT INTO system.topology_requests (id, request_type, done, new_keyspace_rf_change_ks_name, new_keyspace_rf_change_data) VALUES ({}, 'keyspace_rf_change', False, '{}', {})", id, ks_name, rf_change_data_cql)).get(); auto& stm = e.shared_token_metadata().local(); auto& talloc = e.get_tablet_allocator().local(); talloc.set_load_stats(topo.get_load_stats()); auto& sys_ks = e.get_system_keyspace().local(); auto& topology = e.get_topology_state_machine().local()._topology; 
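        // Registering the request id as paused keeps the RF change pending, so the row inserted
        // into system.topology_requests above is what balance_tablets sees when planning the
        // conversion towards the requested rack list.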
topology.paused_rf_change_requests.insert(id); migration_plan plan = talloc.balance_tablets(stm.get(), &topology, &sys_ks).get(); BOOST_REQUIRE(!plan.empty()); // A : host3 -> host5 / host6 // B : host4 -> host1 / host2 for (auto& mig : plan.migrations()) { testlog.info("Rack list colocation migration: {}", mig); BOOST_REQUIRE(mig.kind == locator::tablet_transition_kind::migration); BOOST_REQUIRE(mig.src.host == host3 || mig.src.host == host4); if (mig.src.host == host3) { BOOST_REQUIRE(mig.tablet.tablet == A); BOOST_REQUIRE(mig.dst.host == host5 || mig.dst.host == host6); } else { BOOST_REQUIRE(mig.tablet.tablet == B); BOOST_REQUIRE(mig.dst.host == host1 || mig.dst.host == host2); } } }).get(); } SEASTAR_THREAD_TEST_CASE(test_colocation_skipped_on_excluded_nodes) { do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); auto rack1 = topo.rack(); auto rack2 = topo.start_new_rack(); auto host1 = topo.add_node(node_state::normal, 2, rack1); // host2 has 1 shard so that rack2 doesn't need co-location and if any, it will be on host1 auto host2 = topo.add_node(node_state::normal, 1, rack2); auto ks_name = add_keyspace_racks(e, {{topo.dc(), {rack1.rack, rack2.rack}}}, 8); auto table1 = add_table(e, ks_name).get(); topo.get_shared_load_stats().set_size(table1, 0); auto& stm = e.shared_token_metadata().local(); topo.add_node(node_state::normal, 1, rack1); // So that balancer doesn't exit early due to no candidate nodes. e.get_storage_service().local().mark_excluded({host1}).get(); topo.add_draining_request(host1); // trigger merge e.execute_cql(fmt::format("alter keyspace {} with tablets = {{'enabled': true, 'initial': 1}}", ks_name)).get(); apply_resize_decisions(e, topo.get_shared_load_stats()); // Sanity check, to verify that co-location was attempted. BOOST_REQUIRE(stm.get()->tablets().get_tablet_map(table1).needs_merge()); abort_source as; auto guard = e.get_raft_group0_client().start_operation(as).get(); rebalance_tablets_as_in_progress(e, topo.get_shared_load_stats(), [&] (const migration_plan& plan) { // Verify that only rebuilding migrations involve the excluded host. for (auto&& mig : plan.migrations()) { BOOST_REQUIRE_NE(mig.dst.host, host1); if (mig.src.host == host1) { BOOST_REQUIRE(mig.kind == tablet_transition_kind::rebuild_v2); } } return false; }); // Restore consistency between stm and system tables before releasing group0 guard. save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get(); }, tablet_cql_test_config()).get(); } SEASTAR_THREAD_TEST_CASE(test_no_intranode_migration_on_draining_node) { do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); auto rack1 = topo.rack(); auto rack2 = topo.start_new_rack(); // host which is decommissioned has more shards so that it has spare streaming capacity // to be used by potential intra-node migration. auto host1 = topo.add_node(node_state::normal, 5, rack1); auto host2 = topo.add_node(node_state::normal, 1, rack1); auto ks_name = add_keyspace_racks(e, {{topo.dc(), {rack1.rack}}}, 16); auto table1 = add_table(e, ks_name).get(); topo.get_shared_load_stats().set_size(table1, 0); topo.add_draining_request(host1); auto& stm = e.shared_token_metadata().local(); // trigger merge to exercise co-location e.execute_cql(fmt::format("alter keyspace {} with tablets = {{'enabled': true, 'initial': 1}}", ks_name)).get(); apply_resize_decisions(e, topo.get_shared_load_stats()); // Sanity check, to verify that co-location was attempted. 
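        // needs_merge() confirms that the ALTER to 'initial': 1 actually produced a merge
        // resize decision before migration planning is exercised.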
BOOST_REQUIRE(stm.get()->tablets().get_tablet_map(table1).needs_merge()); abort_source as; auto guard = e.get_raft_group0_client().start_operation(as).get(); rebalance_tablets_as_in_progress(e, topo.get_shared_load_stats(), [&] (const migration_plan& plan) { // Verify no intra-node migrations on the draining host. for (auto&& mig : plan.migrations()) { if (mig.src.host == host1) { BOOST_REQUIRE_NE(mig.dst.host, host1); } } return false; }); // Restore consistency between stm and system tables before releasing group0 guard. save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get(); }, tablet_cql_test_config()).get(); } // Throws if tablets have more than 1 replica in a given rack. // Run in seastar thread. void check_no_rack_overload(const token_metadata& tm) { auto& topo = tm.get_topology(); for (const auto& [table, tmap_p] : tm.tablets().all_tables_ungrouped()) { const tablet_map& tmap = *tmap_p; tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& tinfo) { std::unordered_map> racks_by_dc; auto replicas = tinfo.replicas; for (auto& r : tinfo.replicas) { auto& rack = topo.get_rack(r.host); auto& racks = racks_by_dc[topo.get_datacenter(r.host)]; if (racks.contains(rack)) { throw std::runtime_error("rack overloaded"); } racks.insert(rack); } return make_ready_future<>(); }).get(); } } // Verifies that all tablets in the tablet_map are replicated to a given set of racks // and not placed on any of the bad_nodes. void check_rack_list(const locator::topology& topo, const tablet_map& tmap, sstring dc, rack_list racks, std::set bad_nodes = {}) { std::sort(racks.begin(), racks.end()); tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& tinfo) { std::unordered_map> racks_by_dc; auto replicas = tinfo.replicas; rack_list actual_racks; for (auto& r : tinfo.replicas) { if (bad_nodes.contains(r.host)) { throw std::runtime_error(fmt::format("Bad node {} found in tablet {}", r.host, tid)); } if (topo.get_datacenter(r.host) == dc) { actual_racks.push_back(topo.get_rack(r.host)); } } std::sort(actual_racks.begin(), actual_racks.end()); if (actual_racks != racks) { throw std::runtime_error(fmt::format("Bad racks for tablet {}: expected {}, got {}", tid, racks, actual_racks)); } return make_ready_future<>(); }).get(); } SEASTAR_THREAD_TEST_CASE(test_rack_list_conversion_with_two_replicas_in_rack) { do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); unsigned shard_count = 1; auto dc1 = topo.dc(); auto rack1 = topo.rack(); [[maybe_unused]] auto host1 = topo.add_node(node_state::normal, shard_count); [[maybe_unused]] auto host2 = topo.add_node(node_state::normal, shard_count); auto rack2 = topo.start_new_rack(); [[maybe_unused]] auto host3 = topo.add_node(node_state::normal, shard_count); [[maybe_unused]] auto host4 = topo.add_node(node_state::normal, shard_count); auto rack3 = topo.start_new_rack(); [[maybe_unused]] auto host5 = topo.add_node(node_state::normal, shard_count); [[maybe_unused]] auto host6 = topo.add_node(node_state::normal, shard_count); auto ks_name = add_keyspace(e, {{dc1, 2}}, 2); auto table1 = add_table(e, ks_name).get(); tablet_id A{0}, B{0}; mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(2); auto tid = tmap.first_tablet(); A = tid; tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica{host1, 0}, tablet_replica{host2, 0}, } }); tid = *tmap.next_tablet(tid); B = tid; tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica{host5, 0}, tablet_replica{host6, 0}, } }); 
tmeta.set_tablet_map(table1, std::move(tmap)); co_return; }); auto id = utils::UUID_gen::get_time_UUID(); // Build the map literal for CQL auto rf_change_data_cql = format("{{'replication:class': 'NetworkTopologyStrategy', 'replication:{}:0': '{}', 'replication:{}:1': '{}'}}", dc1, rack1.rack, dc1, rack2.rack); e.execute_cql(format("INSERT INTO system.topology_requests (id, request_type, done, new_keyspace_rf_change_ks_name, new_keyspace_rf_change_data) VALUES ({}, 'keyspace_rf_change', False, '{}', {})", id, ks_name, rf_change_data_cql)).get(); auto& stm = e.shared_token_metadata().local(); auto& topology = e.get_topology_state_machine().local()._topology; topology.paused_rf_change_requests.insert(id); rebalance_tablets(e); check_rack_list(stm.get()->get_topology(), stm.get()->tablets().get_tablet_map(table1), dc1, {rack1.rack, rack2.rack}); }).get(); } struct alter_result { tablet_map new_tablet_map; replication_strategy_config_options opts; }; // Invokes tablet reallocation which is done on ALTER KEYSPACE. static alter_result alter_replication(cql_test_env& e, const sstring& ks_name, table_id table, replication_strategy_config_options alter_options) { auto& stm = e.shared_token_metadata().local(); auto tmptr = stm.get(); auto& old_tablets = tmptr->tablets().get_tablet_map(table); auto& ks = e.local_db().find_keyspace(ks_name); auto& rs = ks.get_replication_strategy(); alter_options["class"] = sstring("NetworkTopologyStrategy"); cql3::statements::ks_prop_defs new_ks_props; new_ks_props.add_property("replication", alter_options); new_ks_props.validate(); BOOST_REQUIRE(new_ks_props.get_replication_strategy_class().has_value()); auto ks_md = new_ks_props.as_ks_metadata_update(ks.metadata(), *tmptr, e.local_db().features(), e.local_db().get_config()); auto new_options = ks_md->strategy_options(); testlog.info("Altering {} from {} using {} to {}", ks_name, rs.get_config_options(), alter_options, new_options); locator::replication_strategy_params params{new_options, old_tablets.tablet_count(), std::nullopt}; auto new_strategy = locator::abstract_replication_strategy::create_replication_strategy( "NetworkTopologyStrategy", params, tmptr->get_topology()); auto s = e.local_db().find_schema(table); auto new_tablet_map = new_strategy->maybe_as_tablet_aware()->reallocate_tablets(s, tmptr, old_tablets.clone_gently().get()).get(); return alter_result{std::move(new_tablet_map), std::move(new_options)}; } SEASTAR_THREAD_TEST_CASE(test_replica_allocation_with_rack_list_rf) { cql_test_config cfg{}; cfg.db_config->rf_rack_valid_keyspaces.set(true); do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); std::set bad_nodes; // No replicas should be allocated there // dc1 auto rack1 = topo.rack(); auto rack2 = topo.start_new_rack(); auto rack3 = topo.start_new_rack(); // dc2 auto rack4 = topo.start_new_dc(); auto rack5 = topo.start_new_rack(); auto dc1 = rack1.dc; auto dc2 = rack4.dc; // dc1 topo.add_node(node_state::normal, 1, rack1); topo.add_node(node_state::normal, 1, rack2); topo.add_node(node_state::normal, 1, rack3); topo.add_node(node_state::normal, 1, rack3); bad_nodes.insert(topo.add_node(node_state::left, 1, rack3)); // dc2 topo.add_node(node_state::normal, 1, rack4); bad_nodes.insert(topo.add_node(node_state::decommissioning, 1, rack4)); topo.add_node(node_state::normal, 1, rack5); auto test_alter = [&] (rack_list dc1_racks, rack_list dc2_racks, replication_strategy_config_options alter_opts, std::unordered_map expected_rf) { auto ks1 = add_keyspace_racks(e, {{dc1, dc1_racks}, {dc2, 
dc2_racks}}); auto table1 = add_table(e, ks1).get(); rebalance_tablets(e); auto& stm = e.shared_token_metadata().local(); auto tmptr = stm.get(); auto& tm_topo = tmptr->get_topology(); check_rack_list(tm_topo, tmptr->tablets().get_tablet_map(table1), dc1, dc1_racks, bad_nodes); check_rack_list(tm_topo, tmptr->tablets().get_tablet_map(table1), dc2, dc2_racks, bad_nodes); auto [new_tablet_map, new_opts] = alter_replication(e, ks1, table1, alter_opts); for (auto&& [dc, rf] : expected_rf) { check_rack_list(tm_topo, new_tablet_map, dc, rf, bad_nodes); } }; // dc1: 0 -> [rack1, rack2] { auto ks1 = add_keyspace(e, {{dc2, 1}}); auto table1 = add_table(e, ks1).get(); rebalance_tablets(e); auto& stm = e.shared_token_metadata().local(); auto tmptr = stm.get(); auto& tm_topo = tmptr->get_topology(); check_rack_list(tm_topo, tmptr->tablets().get_tablet_map(table1), dc1, rack_list{}, bad_nodes); auto dc1_new_racks = rack_list{rack1.rack, rack2.rack}; replication_strategy_config_options alter_opts; alter_opts[dc1] = dc1_new_racks; alter_opts[dc2] = "1"; auto [new_tablet_map, new_opts] = alter_replication(e, ks1, table1, alter_opts); check_rack_list(tm_topo, new_tablet_map, dc1, dc1_new_racks, bad_nodes); } // dc1: [rack1] -> 0 { auto ks1 = add_keyspace_racks(e, {{dc1, {rack1.rack}}}); auto table1 = add_table(e, ks1).get(); rebalance_tablets(e); auto& stm = e.shared_token_metadata().local(); auto tmptr = stm.get(); auto& tm_topo = tmptr->get_topology(); check_rack_list(tm_topo, tmptr->tablets().get_tablet_map(table1), dc1, rack_list{rack1.rack}, bad_nodes); replication_strategy_config_options alter_opts; alter_opts[dc1] = sstring("0"); auto [new_tablet_map, new_opts] = alter_replication(e, ks1, table1, alter_opts); check_rack_list(tm_topo, new_tablet_map, dc1, rack_list{}, bad_nodes); } test_alter({rack1.rack, rack2.rack}, {}, {{dc1, rack_list{rack1.rack, rack2.rack, rack3.rack}}}, {{dc1, rack_list{rack1.rack, rack2.rack, rack3.rack}}, {dc2, rack_list{}}}); test_alter({rack1.rack, rack2.rack, rack3.rack}, {}, {{dc1, rack_list{rack1.rack, rack3.rack}}}, {{dc1, rack_list{rack1.rack, rack3.rack}}, {dc2, rack_list{}}}); BOOST_REQUIRE_THROW(test_alter({rack1.rack}, {rack4.rack}, {{dc2, rack_list{}}}, {{dc1, rack_list{rack1.rack}}, {dc2, rack_list{}}}), exceptions::configuration_exception); test_alter({rack1.rack, rack2.rack}, {rack4.rack}, {{dc1, rack_list{rack2.rack}}, {dc2, rack_list{rack4.rack}}}, {{dc1, rack_list{rack2.rack}}, {dc2, rack_list{rack4.rack}}}); test_alter({rack2.rack}, {rack4.rack}, {{dc1, rack_list{rack2.rack}}, {dc2, rack_list{rack4.rack}}}, {{dc1, rack_list{rack2.rack}}, {dc2, rack_list{rack4.rack}}}); BOOST_REQUIRE_THROW(test_alter({rack2.rack}, {rack4.rack}, {{dc1, rack_list{rack2.rack}}}, {{dc1, rack_list{rack2.rack}}, {dc2, rack_list{rack4.rack}}}), exceptions::configuration_exception); test_alter({rack2.rack}, {rack4.rack}, {{dc1, rack_list{rack2.rack}}, {dc2, rack_list{rack4.rack, rack5.rack}}}, {{dc1, rack_list{rack2.rack}}, {dc2, rack_list{rack4.rack, rack5.rack}}}); test_alter({rack1.rack, rack2.rack, rack3.rack}, {}, {{dc1, rack_list{rack1.rack, rack2.rack, rack3.rack}}, {dc2, rack_list{rack4.rack}}}, {{dc1, rack_list{rack1.rack, rack2.rack, rack3.rack}}, {dc2, rack_list{rack4.rack}}}); BOOST_REQUIRE_THROW(test_alter({rack1.rack, rack2.rack, rack3.rack}, {}, {{dc2, rack_list{rack4.rack}}}, {{dc1, rack_list{rack1.rack, rack2.rack, rack3.rack}}, {dc2, rack_list{rack4.rack}}}), exceptions::configuration_exception); }, cfg).get(); } 
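// The test below creates a keyspace restricted to a single one-shard rack with
// tablets_initial_scale_factor = 10; the asserted per-shard count of 16 assumes the
// initial tablet count is rounded up to the next power of two.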
SEASTAR_THREAD_TEST_CASE(test_per_shard_count_respected_with_rack_list) { cql_test_config cfg{}; cfg.db_config->tablets_initial_scale_factor.set(10); do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); std::set bad_nodes; // No replicas should be allocated there auto rack1 = topo.rack(); auto rack2 = topo.start_new_rack(); auto rack3 = topo.start_new_rack(); auto dc = topo.dc(); auto host1 = topo.add_node(node_state::normal, 1, rack1); topo.add_node(node_state::normal, 1, rack2); topo.add_node(node_state::normal, 1, rack3); topo.add_node(node_state::normal, 1, rack3); auto ks_name = add_keyspace_racks(e, {{dc, {rack1.rack}}}); auto table = add_table(e, ks_name).get(); rebalance_tablets(e); auto& stm = e.shared_token_metadata().local(); auto tmptr = stm.get(); auto& tm_topo = tmptr->get_topology(); // Check that we respect the 10 tablets/shard goal when using a subset of racks. { load_sketch load(tmptr); load.populate_dc(dc).get(); auto l = load.get_shard_minmax(host1); BOOST_REQUIRE_EQUAL(l.min(), 16); BOOST_REQUIRE_EQUAL(l.max(), 16); } check_rack_list(tm_topo, tmptr->tablets().get_tablet_map(table), dc, rack_list{rack1.rack}, bad_nodes); }, cfg).get(); } // Reproduces https://github.com/scylladb/scylladb/issues/26768 SEASTAR_THREAD_TEST_CASE(test_replacing_last_node_in_rack_with_rack_list_rf) { cql_test_config cfg{}; cfg.db_config->tablets_initial_scale_factor.set(10); do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); auto rack1 = topo.rack(); auto rack2 = topo.start_new_rack(); auto dc = topo.dc(); auto host1 = topo.add_node(node_state::normal, 1, rack1); auto host2 = topo.add_node(node_state::normal, 1, rack2); auto ks_name = add_keyspace_racks(e, {{dc, {rack1.rack, rack2.rack}}}); auto table = add_table(e, ks_name).get(); topo.set_node_state(host2, node_state::left); rebalance_tablets(e); }, cfg).get(); } SEASTAR_THREAD_TEST_CASE(test_per_shard_goal_shrinks_respecting_rack_allocation) { cql_test_config cfg{}; cfg.db_config->tablets_per_shard_goal.set(10); cfg.db_config->tablets_initial_scale_factor.set(8); do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); std::set bad_nodes; // No replicas should be allocated there auto rack1 = topo.rack(); auto rack2 = topo.start_new_rack(); auto rack3 = topo.start_new_rack(); auto dc = topo.dc(); auto host1 = topo.add_node(node_state::normal, 1, rack1); auto host2 = topo.add_node(node_state::normal, 1, rack2); auto host3 = topo.add_node(node_state::normal, 1, rack3); auto& stats = topo.get_shared_load_stats(); auto ks1 = add_keyspace_racks(e, {{dc, {rack1.rack}}}); // We start with 8 tablets per table. per_shard_goal / 5 = 2 // Should shrink to 2 tablets per table. auto t1_1 = add_table(e, ks1).get(); auto t1_2 = add_table(e, ks1).get(); auto t1_3 = add_table(e, ks1).get(); auto t1_4 = add_table(e, ks1).get(); auto t1_5 = add_table(e, ks1).get(); // This table doesn't violate the per shard goal in this rack, should not be shrunk. auto ks2 = add_keyspace_racks(e, {{dc, {rack2.rack}}}); auto t2_1 = add_table(e, ks2).get(); // Those tables violate the goal, but due to rounding up, the count won't change. 
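        // The expected outcome is asserted below: the ks1 tables shrink to 2 tablets each,
        // while the ks2 and ks3 tables keep their initial 8 tablets.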
auto ks3 = add_keyspace_racks(e, {{dc, {rack3.rack}}}); auto t3_1 = add_table(e, ks3).get(); auto t3_2 = add_table(e, ks3).get(); stats.set_size(t1_1, 0); stats.set_size(t1_2, 0); stats.set_size(t1_3, 0); stats.set_size(t1_4, 0); stats.set_size(t1_5, 0); stats.set_size(t2_1, 0); stats.set_size(t3_1, 0); stats.set_size(t3_2, 0); rebalance_tablets(e, &stats); auto& stm = e.shared_token_metadata().local(); auto tmptr = stm.get(); auto& tm_topo = tmptr->get_topology(); auto& tmeta = stm.get()->tablets(); BOOST_REQUIRE_EQUAL(2, tmeta.get_tablet_map(t1_1).tablet_count()); BOOST_REQUIRE_EQUAL(2, tmeta.get_tablet_map(t1_2).tablet_count()); BOOST_REQUIRE_EQUAL(2, tmeta.get_tablet_map(t1_3).tablet_count()); BOOST_REQUIRE_EQUAL(2, tmeta.get_tablet_map(t1_4).tablet_count()); BOOST_REQUIRE_EQUAL(2, tmeta.get_tablet_map(t1_5).tablet_count()); BOOST_REQUIRE_EQUAL(8, tmeta.get_tablet_map(t2_1).tablet_count()); BOOST_REQUIRE_EQUAL(8, tmeta.get_tablet_map(t3_1).tablet_count()); BOOST_REQUIRE_EQUAL(8, tmeta.get_tablet_map(t3_2).tablet_count()); }, cfg).get(); } SEASTAR_THREAD_TEST_CASE(test_merge_does_not_overload_racks) { cql_test_config cfg{}; // This test relies on the fact that we use an RF strictly smaller than the number of racks. // Because of that, we cannot enable `rf_rack_valid_keyspaces` in this test because we won't // be able to create a keyspace. cfg.db_config->rf_rack_valid_keyspaces.set(false); do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); auto rack1 = topo.rack(); auto rack2 = topo.start_new_rack(); auto rack3 = topo.start_new_rack(); auto host1 = topo.add_node(node_state::normal, 1, rack3); auto host2 = topo.add_node(node_state::normal, 1, rack2); auto host3 = topo.add_node(node_state::normal, 1, rack1); auto host4 = topo.add_node(node_state::normal, 1, rack3); auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, 2); // RF=2 auto table1 = add_table(e, ks_name).get(); mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(2); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica{host1, 0}, tablet_replica{host3, 0}, } }); tid = *tmap.next_tablet(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica{host2, 0}, tablet_replica{host4, 0}, } }); tmeta.set_tablet_map(table1, std::move(tmap)); co_return; }); // Trigger merge e.execute_cql(fmt::format("alter keyspace {} with tablets = {{'enabled': true, 'initial': 1}}", ks_name)).get(); auto& stm = e.shared_token_metadata().local(); topo.get_shared_load_stats().set_size(table1, 0); topo.get_shared_load_stats().set_default_tablet_sizes(stm.get()); rebalance_tablets(e, &topo.get_shared_load_stats(), {}, [&] (const migration_plan& plan) { check_no_rack_overload(*stm.get()); return false; }); BOOST_REQUIRE_EQUAL(1, stm.get()->tablets().get_tablet_map(table1).tablet_count()); }, cfg).get(); } SEASTAR_THREAD_TEST_CASE(test_load_sketch_with_load_stats_uses_tablet_sizes) { auto cfg = tablet_cql_test_config(); do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); auto& stm = e.shared_token_metadata().local(); const uint64_t tablet_count = 16; const size_t shard_count = 2; auto host = topo.add_node(node_state::normal, shard_count, topo.rack()); auto ks = add_keyspace(e, {{topo.dc(), 1}}, tablet_count); auto table = add_table(e, ks).get(); auto& shared_stats = topo.get_shared_load_stats(); shared_stats.stats.tablet_stats[host].effective_capacity = service::default_target_tablet_size * shard_count; auto& tmap = 
stm.get()->tablets().get_tablet_map(table); // Set load on shard 0 to 0 (all tablet sizes are 0), // and set all tablet sizes on shard 1 to default_target_tablet_size tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& tinfo) { dht::token_range trange { tmap.get_token_range(tid) }; size_t tablet_size = 0; if (tinfo.replicas[0].shard == 1) { tablet_size = service::default_target_tablet_size; } shared_stats.stats.tablet_stats[host].tablet_sizes[table][trange] = tablet_size; return make_ready_future<>(); }).get(); load_sketch load(stm.get(), shared_stats.get()); load.populate().get(); // Add new tablets to shard 0 until the load is equal for (size_t cnt = 0; cnt < tablet_count / shard_count; cnt++) { BOOST_REQUIRE_EQUAL(load.get_least_loaded_shard(host), 0); BOOST_REQUIRE_EQUAL(load.get_most_loaded_shard(host), 1); auto shard_minmax = load.get_shard_minmax(host); BOOST_REQUIRE_EQUAL(shard_minmax.min(), cnt); BOOST_REQUIRE_EQUAL(shard_minmax.max(), tablet_count / 2); // Add a tablet of size default_target_tablet_size to the least loaded shard (shard 0) load.next_shard(host, 1, service::default_target_tablet_size); } // Check the load on both shards is equal auto shard_minmax = load.get_shard_minmax(host); BOOST_REQUIRE_EQUAL(shard_minmax.min(), shard_minmax.max()); }, cfg).get(); } SEASTAR_THREAD_TEST_CASE(test_load_sketch_uses_correct_disk_capacity) { auto cfg = tablet_cql_test_config(); do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); auto& stm = e.shared_token_metadata().local(); locator::host_id local_host = e.shared_token_metadata().local().get()->get_my_id(); locator::host_id host1 = topo.add_node(node_state::normal, 1); load_stats stats; // Check that load_sketch throws when it didn't get the node capacity from load_stats { load_sketch load(stm.get(), make_lw_shared(stats)); load.populate().get(); BOOST_REQUIRE_THROW(load.get_load(local_host), std::runtime_error); } // Check that load_sketch falls back to the gross capacity when effective_capacity is not present { stats.capacity[local_host] = 10; load_sketch load(stm.get(), make_lw_shared(stats)); load.populate().get(); BOOST_REQUIRE_EQUAL(load.get_capacity(local_host), 10); } // Check that load_sketch uses effective_capacity when gross disk capacity is also present { stats.capacity[local_host] = 10; stats.tablet_stats[local_host].effective_capacity = 20; load_sketch load(stm.get(), make_lw_shared(stats)); load.populate().get(); BOOST_REQUIRE_EQUAL(load.get_capacity(local_host), 20); } // Check that load_sketch uses gross disk capacity on all nodes when force_capacity_based_load is set { stats.capacity[host1] = 5; stats.capacity[local_host] = 10; stats.tablet_stats[local_host].effective_capacity = 20; load_sketch load(stm.get(), make_lw_shared(stats)); load.set_force_capacity_based_load(true); load.populate().get(); BOOST_REQUIRE_EQUAL(load.get_capacity(local_host), 10); BOOST_REQUIRE_EQUAL(load.get_capacity(host1), 5); } }, cfg).get(); } SEASTAR_THREAD_TEST_CASE(test_load_sketch_without_all_tablet_sizes_throws) { auto cfg = tablet_cql_test_config(); do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); auto& stm = e.shared_token_metadata().local(); const uint64_t tablet_count = 16; const size_t shard_count = 2; auto host = topo.add_node(node_state::normal, shard_count, topo.rack()); auto ks = add_keyspace(e, {{topo.dc(), 1}}, tablet_count); auto table = add_table(e, ks).get(); auto& shared_stats = topo.get_shared_load_stats(); auto& tmap = stm.get()->tablets().get_tablet_map(table); // Set all tablet 
        // sizes except for tablet_id == 0
        tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& tinfo) {
            if (tid.id != 0) {
                dht::token_range trange { tmap.get_token_range(tid) };
                shared_stats.stats.tablet_stats[host].tablet_sizes[table][trange] = service::default_target_tablet_size;
            }
            return make_ready_future<>();
        }).get();

        load_sketch load(stm.get(), shared_stats.get());
        load.populate().get();
        BOOST_REQUIRE_THROW(load.get_load(host), std::runtime_error);
    }, cfg).get();
}

SEASTAR_THREAD_TEST_CASE(test_load_sketch_minimal_tablet_size) {
    auto cfg = tablet_cql_test_config();
    do_with_cql_env_thread([] (auto& e) {
        topology_builder topo(e);
        auto& stm = e.shared_token_metadata().local();
        const uint64_t tablet_count = 16;
        const size_t shard_count = 2;
        const uint64_t GB = 1024L * 1024L * 1024L;
        auto host = topo.add_node(node_state::normal, shard_count, topo.rack());
        auto ks = add_keyspace(e, {{topo.dc(), 1}}, tablet_count);
        auto table = add_table(e, ks).get();
        auto& shared_stats = topo.get_shared_load_stats();
        shared_stats.stats.tablet_stats[host].effective_capacity = GB;
        auto& tmap = stm.get()->tablets().get_tablet_map(table);
        // Set all tablet sizes to GB/2 (half of the node's effective capacity)
        tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& tinfo) {
            dht::token_range trange { tmap.get_token_range(tid) };
            shared_stats.stats.tablet_stats[host].tablet_sizes[table][trange] = GB / 2;
            return make_ready_future<>();
        }).get();
        // Check that load_sketch computes correct load with reported tablet sizes
        {
            load_sketch load(stm.get(), shared_stats.get());
            load.populate().get();
            BOOST_REQUIRE_EQUAL(load.get_load(host), tablet_count / 2);
        }
        // Check that load_sketch applies the configured minimal tablet size
        {
            load_sketch load(stm.get(), shared_stats.get());
            load.set_minimal_tablet_size(GB);
            load.populate().get();
            BOOST_REQUIRE_EQUAL(load.get_load(host), tablet_count);
        }
    }, cfg).get();
}

SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_skiplist) {
    do_with_cql_env_thread([] (auto& e) {
        // Tests the scenario of balancing a cluster with a DOWN node.
        // Verifies that the load balancer doesn't move tablets to that node.
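        // host3 is placed on the skiplist (simulating a DOWN node), so the balancer
        // must not move any tablets onto it.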
unsigned shard_count = 2; topology_builder topo(e); auto host1 = topo.add_node(node_state::normal, shard_count); auto host2 = topo.add_node(node_state::normal, shard_count); auto host3 = topo.add_node(node_state::normal, shard_count); auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4); auto table1 = add_table(e, ks_name).get(); mutate_tablets(e, [&] (tablet_metadata& tmeta) { tablet_map tmap(4); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host1, 0}, tablet_replica {host2, 1}, } }); tid = *tmap.next_tablet(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host1, 0}, tablet_replica {host2, 1}, } }); tid = *tmap.next_tablet(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host1, 0}, tablet_replica {host2, 0}, } }); tid = *tmap.next_tablet(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host1, 1}, tablet_replica {host2, 0}, } }); tmeta.set_tablet_map(table1, std::move(tmap)); return make_ready_future<>(); }); auto& stm = e.shared_token_metadata().local(); // Sanity check { load_sketch load(stm.get()); load.populate().get(); BOOST_REQUIRE_EQUAL(load.get_load(host1), 4); BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host1), 2); BOOST_REQUIRE_EQUAL(load.get_load(host2), 4); BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host2), 2); BOOST_REQUIRE_EQUAL(load.get_load(host3), 0); BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host3), 0); } rebalance_tablets(e, &topo.get_shared_load_stats(), {host3}); { load_sketch load(stm.get()); load.populate().get(); BOOST_REQUIRE_EQUAL(load.get_load(host3), 0); BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host3), 0); } }).get(); } SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_colocated_tablets) { do_with_cql_env_thread([] (auto& e) { // Tests that co-located tablets remain co-located during load balancing. // table1 and table2 are co-located // table3 and table4 are co-located // initially they all start with one tablet on the same host and shard. // load balancing is expected to move one pair of co-located tablets to the // other host while maintaining co-location of each pair. 
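        // Co-location is asserted at the end by requiring identical replica sets for
        // each pair: table1/table2 and table3/table4.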
logging::logger_registry().set_logger_level("load_balancer", logging::log_level::trace); unsigned shard_count = 2; topology_builder topo(e); auto host1 = topo.add_node(node_state::normal, shard_count); auto host2 = topo.add_node(node_state::normal, shard_count); auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 1); auto table1 = add_table(e, ks_name).get(); auto table2 = add_table(e, ks_name).get(); auto table3 = add_table(e, ks_name).get(); auto table4 = add_table(e, ks_name).get(); mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(1); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host1, 0}, } }); tablet_map tmap1 = co_await tmap.clone_gently(); tmeta.set_tablet_map(table1, std::move(tmap1)); co_await tmeta.set_colocated_table(table2, table1); tablet_map tmap3 = co_await tmap.clone_gently(); tmeta.set_tablet_map(table3, std::move(tmap3)); co_await tmeta.set_colocated_table(table4, table3); }); auto& stm = e.shared_token_metadata().local(); // Sanity check { load_sketch load(stm.get()); load.populate().get(); BOOST_REQUIRE_EQUAL(load.get_load(host1), 4); BOOST_REQUIRE_EQUAL(load.get_load(host2), 0); } topo.get_shared_load_stats().set_default_tablet_sizes(stm.get()); rebalance_tablets(e, &topo.get_shared_load_stats()); { load_sketch load(stm.get()); load.populate().get(); BOOST_REQUIRE_EQUAL(load.get_load(host1), 2); BOOST_REQUIRE_EQUAL(load.get_load(host2), 2); auto& tmap1 = stm.get()->tablets().get_tablet_map(table1); auto& tmap2 = stm.get()->tablets().get_tablet_map(table2); auto& tmap3 = stm.get()->tablets().get_tablet_map(table3); auto& tmap4 = stm.get()->tablets().get_tablet_map(table4); BOOST_REQUIRE_EQUAL(tmap1.get_tablet_info(tmap1.first_tablet()).replicas, tmap2.get_tablet_info(tmap2.first_tablet()).replicas); BOOST_REQUIRE_EQUAL(tmap3.get_tablet_info(tmap3.first_tablet()).replicas, tmap4.get_tablet_info(tmap4.first_tablet()).replicas); } }).get(); } SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) { // Verifies that load balancer moves tablets out of the decommissioned node. // The scenario is such that replication factor of tablets can be satisfied after decommission. 
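    // host3 is draining, so its tablet replicas must move to host1/host2; after host3 is
    // marked 'left', the remaining nodes should stay at two tablets each.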
do_with_cql_env_thread([](auto& e) { unsigned shard_count = 2; topology_builder topo(e); auto host1 = topo.add_node(node_state::normal, shard_count); auto host2 = topo.add_node(node_state::normal, shard_count); auto host3 = topo.add_node(node_state::decommissioning, shard_count); auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4); auto table1 = add_table(e, ks_name).get(); mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host1, 0}, tablet_replica {host2, 1}, } }); tid = *tmap.next_tablet(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host1, 0}, tablet_replica {host2, 1}, } }); tid = *tmap.next_tablet(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host1, 0}, tablet_replica {host3, 0}, } }); tid = *tmap.next_tablet(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host2, 1}, tablet_replica {host3, 1}, } }); tmeta.set_tablet_map(table1, std::move(tmap)); co_return; }); auto& stm = e.shared_token_metadata().local(); shared_load_stats& load_stats = topo.get_shared_load_stats(); load_stats.set_default_tablet_sizes(stm.get()); rebalance_tablets(e, &load_stats); { load_sketch load(stm.get(), load_stats.get()); load.populate().get(); BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host1), 2); BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host2), 2); BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host3), 0); } topo.set_node_state(host3, node_state::left); rebalance_tablets(e, &load_stats); { load_sketch load(stm.get(), load_stats.get()); load.populate().get(); BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host1), 2); BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host2), 2); BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host3), 0); } }).get(); } SEASTAR_THREAD_TEST_CASE(test_table_creation_during_decommission) { // Verifies that new table doesn't get tablets allocated on a node being decommissioned // which may leave them on replicas absent in topology post decommission. // Also verifies that the allocated tablet count doesn't take into account nodes being decommissioned // to achieve the desired tablet count per shard in a DC. auto cfg = tablet_cql_test_config(); cfg.db_config->tablets_initial_scale_factor(1); do_with_cql_env_thread([](auto& e) { topology_builder topo(e); topo.add_node(node_state::normal); topo.add_node(node_state::normal); auto host3 = topo.add_node(node_state::decommissioning); auto host4 = topo.add_node(node_state::left); auto ks_name = add_keyspace(e, {{topo.dc(), 1}}); auto table1 = add_table(e, ks_name).get(); auto s = e.local_db().find_schema(table1); auto& stm = e.shared_token_metadata().local(); auto& tmap = stm.get()->tablets().get_tablet_map(table1); // Verify we do not treat leaving nodes as having capacity. BOOST_REQUIRE_EQUAL(tmap.tablet_count(), 2); tmap.for_each_tablet([&](auto tid, auto& tinfo) { for (auto& replica : tinfo.replicas) { BOOST_REQUIRE_NE(replica.host, host3); BOOST_REQUIRE_NE(replica.host, host4); } return make_ready_future<>(); }).get(); }, cfg).get(); } SEASTAR_THREAD_TEST_CASE(test_table_creation_during_rack_decommission) { // Reproduces #22625 // The problematic scenario happens when allocating tablets for a new table // when there is a rack with only non-normal nodes. 
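    // The second rack below contains only a decommissioning node and a 'left' node;
    // allocation for the new table must not place any replicas there.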
do_with_cql_env_thread([](auto& e) { topology_builder topo(e); topo.add_node(); topo.add_node(); topo.start_new_rack(); auto host3 = topo.add_node(node_state::decommissioning); auto host4 = topo.add_node(node_state::left); auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 8); auto table1 = add_table(e, ks_name).get(); rebalance_tablets(e); auto& stm = e.shared_token_metadata().local(); auto& tmap = stm.get()->tablets().get_tablet_map(table1); tmap.for_each_tablet([&](auto tid, auto& tinfo) { for (auto& replica : tinfo.replicas) { BOOST_REQUIRE_NE(replica.host, host3); BOOST_REQUIRE_NE(replica.host, host4); } return make_ready_future<>(); }).get(); }, tablet_cql_test_config()).get(); } SEASTAR_THREAD_TEST_CASE(test_decommission_two_racks) { // Verifies that load balancer moves tablets out of the decommissioned node. // The scenario is such that replication constraints of tablets can be satisfied after decommission. do_with_cql_env_thread([](auto& e) { std::vector racks; topology_builder topo(e); racks.push_back(topo.rack()); auto host1 = topo.add_node(node_state::normal); auto host3 = topo.add_node(node_state::normal); topo.start_new_rack(); racks.push_back(topo.rack()); auto host2 = topo.add_node(node_state::normal); auto host4 = topo.add_node(node_state::decommissioning); auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4); auto table1 = add_table(e, ks_name).get(); mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host1, 0}, tablet_replica {host2, 0}, } }); tid = *tmap.next_tablet(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host2, 0}, tablet_replica {host3, 0}, } }); tid = *tmap.next_tablet(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host3, 0}, tablet_replica {host4, 0}, } }); tid = *tmap.next_tablet(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host1, 0}, tablet_replica {host2, 0}, } }); tmeta.set_tablet_map(table1, std::move(tmap)); co_return; }); auto& stm = e.shared_token_metadata().local(); shared_load_stats& load_stats = topo.get_shared_load_stats(); load_stats.set_default_tablet_sizes(stm.get()); rebalance_tablets(e, &load_stats); { load_sketch load(stm.get(), load_stats.get()); load.populate().get(); BOOST_REQUIRE_GE(load.get_avg_tablet_count(host1), 2); BOOST_REQUIRE_GE(load.get_avg_tablet_count(host2), 2); BOOST_REQUIRE_GE(load.get_avg_tablet_count(host3), 2); BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(host4), 0); } // Verify replicas are not collocated on racks { auto tm = stm.get(); auto& tmap = tm->tablets().get_tablet_map(table1); tmap.for_each_tablet([&](auto tid, auto& tinfo) -> future<> { auto rack1 = tm->get_topology().get_rack(tinfo.replicas[0].host); auto rack2 = tm->get_topology().get_rack(tinfo.replicas[1].host); BOOST_REQUIRE_NE(rack1, rack2); return make_ready_future<>(); }).get(); } }).get(); } SEASTAR_THREAD_TEST_CASE(test_decommission_rack_load_failure) { // Verifies that load balancer moves tablets out of the decommissioned node. // The scenario is such that it is impossible to distribute replicas without violating rack uniqueness. 
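    // The keyspace below is pinned to rack2, whose only node (host4) is being decommissioned,
    // so there is nowhere to move its replicas and rebalancing is expected to throw.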
    do_with_cql_env_thread([](auto& e) {
        std::vector racks;
        topology_builder topo(e);
        racks.push_back(topo.rack());
        auto host1 = topo.add_node(node_state::normal);
        auto host2 = topo.add_node(node_state::normal);
        auto host3 = topo.add_node(node_state::normal);
        auto rack2 = topo.start_new_rack();
        racks.push_back(topo.rack());
        auto host4 = topo.add_node(node_state::normal);

        auto ks_name = add_keyspace_racks(e, {{rack2.dc, {rack2.rack}}}, 4);
        auto table1 = add_table(e, ks_name).get();

        topo.set_node_state(host4, node_state::decommissioning);

        mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
            tablet_map tmap(4);
            auto tid = tmap.first_tablet();
            tmap.set_tablet(tid, tablet_info {
                tablet_replica_set {
                    tablet_replica {host1, 0},
                    tablet_replica {host4, 0},
                }
            });
            tid = *tmap.next_tablet(tid);
            tmap.set_tablet(tid, tablet_info {
                tablet_replica_set {
                    tablet_replica {host2, 0},
                    tablet_replica {host4, 0},
                }
            });
            tid = *tmap.next_tablet(tid);
            tmap.set_tablet(tid, tablet_info {
                tablet_replica_set {
                    tablet_replica {host3, 0},
                    tablet_replica {host4, 0},
                }
            });
            tid = *tmap.next_tablet(tid);
            tmap.set_tablet(tid, tablet_info {
                tablet_replica_set {
                    tablet_replica {host1, 0},
                    tablet_replica {host4, 0},
                }
            });
            tmeta.set_tablet_map(table1, std::move(tmap));
            co_return;
        });

        auto& stm = e.shared_token_metadata().local();
        topo.get_shared_load_stats().set_default_tablet_sizes(stm.get());
        BOOST_REQUIRE_THROW(rebalance_tablets(e, &topo.get_shared_load_stats()), std::runtime_error);
    }).get();
}

SEASTAR_THREAD_TEST_CASE(test_decommission_rf_not_met) {
    // Verifies that draining fails when the replication factor of tablets
    // cannot be satisfied after the node is decommissioned.
    do_with_cql_env_thread([](auto& e) {
        topology_builder topo(e);
        auto host1 = topo.add_node(node_state::normal, 2);
        auto host2 = topo.add_node(node_state::normal, 2);
        auto host3 = topo.add_node(node_state::decommissioning, 2);

        auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 1);
        auto table1 = add_table(e, ks_name).get();

        mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
            tablet_map tmap(1);
            auto tid = tmap.first_tablet();
            tmap.set_tablet(tid, tablet_info {
                tablet_replica_set {
                    tablet_replica {host1, 0},
                    tablet_replica {host2, 0},
                    tablet_replica {host3, 0},
                }
            });
            tmeta.set_tablet_map(table1, std::move(tmap));
            co_return;
        });

        auto& stm = e.shared_token_metadata().local();
        topo.get_shared_load_stats().set_default_tablet_sizes(stm.get());
        BOOST_REQUIRE_THROW(rebalance_tablets(e, &topo.get_shared_load_stats()), std::runtime_error);
    }).get();
}

SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions) {
    do_with_cql_env_thread([] (auto& e) {
        // Tests the scenario of bootstrapping a single node.
        // Verifies that the load balancer balances tablets on that node
        // even though there is already an active migration.
        // The test verifies that the load balancer creates a plan
        // which, when executed, will achieve perfect balance,
        // which proves that it doesn't stop due to active migrations.
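        // Setup: four tablets replicated on host1/host2, with one migration to host3 already
        // in the allow_write_both_read_old stage; the balancer must plan around it and still
        // reach two tablets per node.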
topology_builder topo(e); auto host1 = topo.add_node(node_state::normal, 2); topo.start_new_rack(); auto host2 = topo.add_node(node_state::normal, 1); auto host3 = topo.add_node(node_state::normal, 1); auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, 4); auto table1 = add_table(e, ks_name).get(); mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); std::optional tid = tmap.first_tablet(); for (int i = 0; i < 4; ++i) { tmap.set_tablet(*tid, tablet_info { tablet_replica_set { tablet_replica {host1, 0}, tablet_replica {host2, 0}, } }); tid = tmap.next_tablet(*tid); } tmap.set_tablet_transition_info(tmap.first_tablet(), tablet_transition_info { tablet_transition_stage::allow_write_both_read_old, tablet_transition_kind::migration, tablet_replica_set { tablet_replica {host1, 0}, tablet_replica {host3, 0}, }, tablet_replica {host3, 0} }); tmeta.set_tablet_map(table1, std::move(tmap)); co_return; }); abort_source as; auto guard = e.get_raft_group0_client().start_operation(as).get(); auto& stm = e.shared_token_metadata().local(); topo.get_shared_load_stats().set_default_tablet_sizes(stm.get()); rebalance_tablets_as_in_progress(e, topo.get_shared_load_stats()); execute_transitions(stm); { load_sketch load(stm.get()); load.populate().get(); for (auto h : {host1, host2, host3}) { testlog.debug("Checking host {}", h); BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(h), 2); } } // Restore consistency between stm and system tables before releasing group0 guard. save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get(); }).get(); } #ifdef SCYLLA_ENABLE_ERROR_INJECTION SEASTAR_THREAD_TEST_CASE(test_load_balancer_shuffle_mode) { do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); auto host1 = topo.add_node(node_state::normal, 1); topo.start_new_rack(); auto host2 = topo.add_node(node_state::normal, 1); topo.add_node(node_state::normal, 2); auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, 4); auto table1 = add_table(e, ks_name).get(); mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(4); std::optional tid = tmap.first_tablet(); for (int i = 0; i < 4; ++i) { tmap.set_tablet(*tid, tablet_info { tablet_replica_set { tablet_replica {host1, 0}, tablet_replica {host2, 0}, } }); tid = tmap.next_tablet(*tid); } tmeta.set_tablet_map(table1, std::move(tmap)); co_return; }); auto& stm = e.shared_token_metadata().local(); topo.get_shared_load_stats().set_default_tablet_sizes(stm.get()); rebalance_tablets(e, &topo.get_shared_load_stats()); BOOST_REQUIRE(e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr, topo.get_load_stats()).get().empty()); utils::get_local_injector().enable("tablet_allocator_shuffle"); auto disable_injection = seastar::defer([&] { utils::get_local_injector().disable("tablet_allocator_shuffle"); }); BOOST_REQUIRE(!e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr,topo.get_load_stats()).get().empty()); }).get(); } #endif SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) { do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); const auto shard_count = 2; auto rack1 = topo.rack(); auto rack2 = topo.start_new_rack(); auto host1 = topo.add_node(node_state::normal, shard_count, rack1); auto host2 = topo.add_node(node_state::normal, shard_count, rack2); auto host3 = topo.add_node(node_state::normal, shard_count, rack1); auto host4 = topo.add_node(node_state::normal, shard_count, rack2); auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, 
16); auto table1 = add_table(e, ks_name).get(); mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(16); for (auto tid : tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host1, tests::random::get_int(0, shard_count - 1)}, tablet_replica {host2, tests::random::get_int(0, shard_count - 1)}, } }); } tmeta.set_tablet_map(table1, std::move(tmap)); co_return; }); auto& stm = e.shared_token_metadata().local(); shared_load_stats& load_stats = topo.get_shared_load_stats(); load_stats.set_default_tablet_sizes(stm.get()); rebalance_tablets(e, &load_stats); { load_sketch load(stm.get()); load.populate().get(); for (auto h : {host1, host2, host3, host4}) { testlog.debug("Checking host {}", h); BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(h), 4); BOOST_REQUIRE_LE(load.get_shard_tablet_count_imbalance(h), 1); } } }).get(); } SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_asymmetric_node_capacity) { do_with_cql_env_thread([](auto& e) { topology_builder topo(e); auto host1 = topo.add_node(node_state::decommissioning, 8); auto host2 = topo.add_node(node_state::normal, 1); auto host3 = topo.add_node(node_state::normal, 7); auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 16); auto table1 = add_table(e, ks_name).get(); mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(16); for (auto tid: tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host1, 0}, } }); } tmeta.set_tablet_map(table1, std::move(tmap)); co_return; }); auto until_nodes_drained = [] (const migration_plan& plan) { return !plan.has_nodes_to_drain(); }; auto& stm = e.shared_token_metadata().local(); topo.get_shared_load_stats().set_default_tablet_sizes(stm.get()); rebalance_tablets(e, &topo.get_shared_load_stats(), {}, until_nodes_drained); { load_sketch load(stm.get()); load.populate().get(); for (auto h: {host2, host3}) { testlog.debug("Checking host {}", h); BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(h), 2); // 16 tablets / 8 shards = 2 tablets / shard BOOST_REQUIRE_EQUAL(load.get_shard_tablet_count_imbalance(h), 0); } } }).get(); } SEASTAR_THREAD_TEST_CASE(test_load_balancer_disabling) { do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); auto host1 = topo.add_node(node_state::normal, 2); topo.add_node(node_state::normal, 2); auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 16); auto table1 = add_table(e, ks_name).get(); abort_source as; auto guard = e.get_raft_group0_client().start_operation(as).get(); auto& stm = e.shared_token_metadata().local(); // host1 is loaded and host2 is empty, resulting in an imbalance. // host1's shard 0 is loaded and shard 1 is empty, resulting in intra-node imbalance. 
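        // The plan must be non-empty while balancing is enabled, empty after
        // set_balancing_enabled(false), and the flag must survive token_metadata cloning
        // (exercised via a no-op mutate_token_metadata call).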
mutate_tablets(e, guard, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(16); for (auto tid : tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host1, 0}, } }); } tmeta.set_tablet_map(table1, std::move(tmap)); co_return; }); topo.get_shared_load_stats().set_default_tablet_sizes(stm.get()); { auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr, topo.get_load_stats()).get(); BOOST_REQUIRE(!plan.empty()); } // Disable load balancing stm.mutate_token_metadata([&] (token_metadata& tm) { tm.tablets().set_balancing_enabled(false); return make_ready_future<>(); }).get(); { auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr, topo.get_load_stats()).get(); BOOST_REQUIRE(plan.empty()); } // Check that cloning preserves the setting stm.mutate_token_metadata([&] (token_metadata& tm) { return make_ready_future<>(); }).get(); { auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr, topo.get_load_stats()).get(); BOOST_REQUIRE(plan.empty()); } // Enable load balancing back stm.mutate_token_metadata([&] (token_metadata& tm) { tm.tablets().set_balancing_enabled(true); return make_ready_future<>(); }).get(); { auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr, topo.get_load_stats()).get(); BOOST_REQUIRE(!plan.empty()); } // Check that cloning preserves the setting stm.mutate_token_metadata([&] (token_metadata& tm) { return make_ready_future<>(); }).get(); { auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr, topo.get_load_stats()).get(); BOOST_REQUIRE(!plan.empty()); } }).get(); } SEASTAR_THREAD_TEST_CASE(test_drained_node_is_not_balanced_internally) { do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); auto host1 = topo.add_node(node_state::removing, 2); topo.add_node(node_state::normal, 2); auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 16); auto table1 = add_table(e, ks_name).get(); abort_source as; auto guard = e.get_raft_group0_client().start_operation(as).get(); auto& stm = e.shared_token_metadata().local(); mutate_tablets(e, guard, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(16); for (auto tid : tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host1, 0}, } }); } tmeta.set_tablet_map(table1, std::move(tmap)); co_return; }); migration_plan plan = e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr, topo.get_load_stats()).get(); BOOST_REQUIRE(plan.has_nodes_to_drain()); for (auto&& mig : plan.migrations()) { BOOST_REQUIRE(mig.kind != tablet_transition_kind::intranode_migration); } }).get(); } SEASTAR_THREAD_TEST_CASE(test_plan_fails_when_removing_last_replica) { do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); auto host1 = topo.add_node(); auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 1); auto table1 = add_table(e, ks_name).get(); topo.set_node_state(host1, node_state::removing); mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(1); for (auto tid : tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { tablet_replica_set{tablet_replica{host1, 0}} }); } tmeta.set_tablet_map(table1, std::move(tmap)); co_return; }); std::unordered_set skiplist = {host1}; auto& stm = e.shared_token_metadata().local(); topo.get_shared_load_stats().set_default_tablet_sizes(stm.get()); BOOST_REQUIRE_THROW(rebalance_tablets(e, 
&topo.get_shared_load_stats(), skiplist), std::runtime_error); }).get(); } SEASTAR_THREAD_TEST_CASE(test_skiplist_is_ignored_when_draining) { // When doing normal load balancing, we can ignore DOWN nodes in the node set // and just balance the UP nodes among themselves because it's ok to equalize // load in that set. // It's dangerous to do that when draining because that can lead to overloading of the UP nodes. // In the worst case, we can have only one non-drained node in the UP set, which would receive // all the tablets of the drained node, doubling its load. // It's safer to let the drain fail/stall. do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); auto host1 = topo.add_node(node_state::removing); auto host2 = topo.add_node(node_state::normal); auto host3 = topo.add_node(node_state::normal); auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 2); auto table1 = add_table(e, ks_name).get(); mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(2); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { tablet_replica_set{tablet_replica{host1, 0}} }); tid = *tmap.next_tablet(tid); tmap.set_tablet(tid, tablet_info { tablet_replica_set{tablet_replica{host1, 0}} }); tmeta.set_tablet_map(table1, std::move(tmap)); co_return; }); auto& stm = e.shared_token_metadata().local(); std::unordered_set skiplist = {host2}; topo.get_shared_load_stats().set_default_tablet_sizes(stm.get()); rebalance_tablets(e, &topo.get_shared_load_stats(), skiplist); { load_sketch load(stm.get()); load.populate().get(); for (auto h : {host2, host3}) { testlog.debug("Checking host {}", h); BOOST_REQUIRE_EQUAL(load.get_avg_tablet_count(h), 1); } } }).get(); } static void check_tablet_invariants(const tablet_metadata& tmeta) { for (const auto& [table, tmap] : tmeta.all_tables_ungrouped()) { tmap->for_each_tablet([&](auto tid, const tablet_info& tinfo) -> future<> { std::unordered_set hosts; // Uniqueness of hosts for (const auto& replica: tinfo.replicas) { auto ret = hosts.insert(replica.host).second; if (!ret) { testlog.error("Failed tablet invariant check for tablet {}: {}", tid, tinfo.replicas); } BOOST_REQUIRE(ret); } return make_ready_future<>(); }).get(); } } static std::vector allocate_replicas_in_racks(const std::vector& racks, int rf, const std::unordered_map>& hosts_by_rack) { // Choose replicas randomly while loading racks evenly. std::vector replica_hosts; for (int i = 0; i < rf; ++i) { auto rack = racks[i % racks.size()]; auto& rack_hosts = hosts_by_rack.at(rack.rack); while (true) { auto candidate_host = rack_hosts[tests::random::get_int(0, rack_hosts.size() - 1)]; if (std::find(replica_hosts.begin(), replica_hosts.end(), candidate_host) == replica_hosts.end()) { replica_hosts.push_back(candidate_host); break; } } } return replica_hosts; } SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) { auto do_test_case = [] (const shard_id rf) { cql_test_config cfg; cfg.need_remote_proxy = true; return do_with_cql_env_thread([rf] (auto& e) { topology_builder topo(e); const int n_hosts = 6; auto shard_count = 2; // Sanity check just in case someone modifies the caller of this lambda // and starts providing RF > n_hosts. In that case, we wouldn't be able // to create an RF-rack-valid keyspace. 
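// (The setup below creates one rack per replica, rf racks in total, and spreads
// the n_hosts hosts across them round-robin, so rf > n_hosts would leave at
// least one rack without a host.)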
assert(rf <= n_hosts); std::vector hosts; std::unordered_map> hosts_by_rack; std::vector racks{topo.rack()}; for (shard_id i = 1; i < rf; ++i) { racks.push_back(topo.start_new_rack()); } for (int i = 0; i < n_hosts; ++i) { auto rack = racks[(i + 1) % racks.size()]; auto h = topo.add_node(node_state::normal, shard_count, rack); if (i) { // Leave the first host empty by making it invisible to allocation algorithm. hosts_by_rack[rack.rack].push_back(h); } } auto& stm = e.shared_token_metadata().local(); size_t total_tablet_count = 0; std::vector keyspaces; size_t tablet_count_bits = 8; for (size_t log2_tablets = 0; log2_tablets < tablet_count_bits; ++log2_tablets) { if (tests::random::get_bool()) { continue; } auto initial_tablets = 1 << log2_tablets; keyspaces.push_back(add_keyspace(e, {{topo.dc(), rf}}, initial_tablets)); auto table = add_table(e, keyspaces.back()).get(); mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(initial_tablets); for (auto tid : tmap.tablet_ids()) { // Choose replicas randomly while loading racks evenly. std::vector replica_hosts = allocate_replicas_in_racks(racks, rf, hosts_by_rack); tablet_replica_set replicas; for (auto h : replica_hosts) { auto shard = tests::random::get_int(0, shard_count - 1); replicas.push_back(tablet_replica {h, shard}); } tmap.set_tablet(tid, tablet_info {std::move(replicas)}); } total_tablet_count += tmap.tablet_count(); tmeta.set_tablet_map(table, std::move(tmap)); return make_ready_future<>(); }); } testlog.debug("tablet metadata: {}", stm.get()->tablets()); testlog.info("Total tablet count: {}, hosts: {}", total_tablet_count, hosts.size()); check_tablet_invariants(stm.get()->tablets()); rebalance_tablets(e); check_tablet_invariants(stm.get()->tablets()); { load_sketch load(stm.get()); load.populate().get(); min_max_tracker min_max_load; for (auto h: hosts) { auto l = load.get_avg_tablet_count(h); testlog.info("Load on host {}: {}", h, l); min_max_load.update(l); BOOST_REQUIRE_LE(load.get_shard_tablet_count_imbalance(h), 1); } testlog.debug("tablet metadata: {}", stm.get()->tablets()); testlog.debug("Min load: {}, max load: {}", min_max_load.min(), min_max_load.max()); // FIXME: The algorithm cannot achieve balance in all cases yet, so we only check that it stops. // For example, if we have an overloaded node in one rack and target underloaded node in a different rack, // we won't be able to reduce the load gap by moving tablets between the two. We have to balance the overloaded // rack first, which is unconstrained. // Uncomment the following line when the algorithm is improved. // BOOST_REQUIRE(min_max_load.max() - min_max_load.min() <= 1); } seastar::parallel_for_each(keyspaces, [&] (const sstring& ks) { return e.execute_cql(fmt::format("DROP KEYSPACE {}", ks)).discard_result(); }).get(); }, std::move(cfg)); }; const int test_case_number = 13; for (int i = 0; i < test_case_number; ++i) { const shard_id rf = tests::random::get_int(2, 4); testlog.info("{}: Starting test case {} for RF={}", std::source_location::current().function_name(), i + 1, rf); do_test_case(rf).get(); } } SEASTAR_THREAD_TEST_CASE(test_balancing_heterogeneous_cluster) { // 3 racks, RF=3. 1 table with 90% space. // We start with 1 i4i_2xlarge per rack, then add i4i_large to each rack. // We want utilization to be balanced. 
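// Since RF equals the rack count, each rack holds exactly one replica of every
// tablet, so adding capacity to a rack should only shift load within that rack.
// The checks below verify that the other racks' utilization stays unchanged
// after each expansion step and that the final spread across nodes is below 1%.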
do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); shared_load_stats& load_stats = topo.get_shared_load_stats(); auto rack1 = topo.rack(); auto rack2 = topo.start_new_rack(); auto rack3 = topo.start_new_rack(); topo.add_i4i_2xlarge(rack1); topo.add_i4i_2xlarge(rack2); topo.add_i4i_2xlarge(rack3); auto& stm = e.shared_token_metadata().local(); auto ks_name = add_keyspace(e, {{topo.dc(), 3}}); auto table1 = add_table(e, ks_name).get(); load_stats.set_default_tablet_sizes(stm.get()); load_stats.set_size(table1, 0.9 * topo.get_capacity() / 3); rebalance_tablets(e, &load_stats); testlog.info("Initial cluster ready"); std::unordered_map initial_utilization; auto& hosts = topo.hosts(); { load_sketch load(stm.get(), load_stats.get()); load.populate().get(); for (auto h: hosts) { auto u = load.get_allocated_utilization(h); BOOST_REQUIRE(u); initial_utilization[h] = *u; } } topo.add_i4i_large(rack1); rebalance_tablets(e, &load_stats); testlog.info("Expanded capacity in rack1"); { load_sketch load(stm.get(), load_stats.get()); load.populate().get(); auto u0 = *load.get_allocated_utilization(hosts[0]); BOOST_REQUIRE_LT(u0, initial_utilization[hosts[0]]); initial_utilization[hosts[0]] = u0; // rack2 and rack3 are not changed, to keep racks not overloaded (RF=rack_count) BOOST_REQUIRE_EQUAL(*load.get_allocated_utilization(hosts[1]), initial_utilization[hosts[1]]); BOOST_REQUIRE_EQUAL(*load.get_allocated_utilization(hosts[2]), initial_utilization[hosts[2]]); } topo.add_i4i_large(rack2); rebalance_tablets(e, &load_stats); testlog.info("Expanded capacity in rack2"); { load_sketch load(stm.get(), load_stats.get()); load.populate().get(); BOOST_REQUIRE_EQUAL(*load.get_allocated_utilization(hosts[0]), initial_utilization[hosts[0]]); auto u1 = *load.get_allocated_utilization(hosts[1]); BOOST_REQUIRE_LT(u1, initial_utilization[hosts[1]]); initial_utilization[hosts[1]] = u1; BOOST_REQUIRE_EQUAL(*load.get_allocated_utilization(hosts[2]), initial_utilization[hosts[2]]); } topo.add_i4i_large(rack3); rebalance_tablets(e, &load_stats); testlog.info("Expanded capacity in rack3"); { load_sketch load(stm.get(), load_stats.get()); load.populate().get(); BOOST_REQUIRE_EQUAL(*load.get_allocated_utilization(hosts[0]), initial_utilization[hosts[0]]); BOOST_REQUIRE_EQUAL(*load.get_allocated_utilization(hosts[1]), initial_utilization[hosts[1]]); auto u2 = *load.get_allocated_utilization(hosts[2]); BOOST_REQUIRE_LT(u2, initial_utilization[hosts[2]]); initial_utilization[hosts[2]] = u2; // Check that utilization difference is < 1% min_max_tracker node_utilization; for (auto h: hosts) { auto u = load.get_allocated_utilization(h); BOOST_REQUIRE(u); node_utilization.update(*u); } BOOST_REQUIRE_LT(node_utilization.max() - node_utilization.min(), 0.01); } }).get(); } SEASTAR_THREAD_TEST_CASE(test_imbalance_in_hetero_cluster_with_two_tables) { do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); shared_load_stats& load_stats = topo.get_shared_load_stats(); auto rack1 = topo.rack(); auto rack2 = topo.start_new_rack(); auto rack3 = topo.start_new_rack(); topo.add_i4i_2xlarge(rack1); topo.add_i4i_2xlarge(rack2); topo.add_i4i_2xlarge(rack3); auto& stm = e.shared_token_metadata().local(); auto ks_name = add_keyspace(e, {{topo.dc(), 3}}, 128); auto table1 = add_table(e, ks_name).get(); load_stats.set_size(table1, 0); load_stats.set_default_tablet_sizes(stm.get()); testlog.info("Initial cluster ready"); topo.add_i4i_large(rack1); topo.add_i4i_large(rack2); topo.add_i4i_large(rack3); rebalance_tablets(e, &load_stats); 
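// The rebalance above runs after the capacity expansion and before table2
// exists. The check below therefore measures only the initial allocation of
// the newly created table2; no rebalance runs after its creation.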
testlog.info("Expanded capacity"); auto ks2_name = add_keyspace(e, {{topo.dc(), 3}}, 128); auto table2 = add_table(e, ks2_name).get(); auto& hosts = topo.hosts(); { load_sketch load(stm.get(), load_stats.get()); load.populate(std::nullopt, table2).get(); // Check that utilization difference is < 13% min_max_tracker node_utilization; for (auto h: hosts) { auto u = load.get_allocated_utilization(h); BOOST_REQUIRE(u); testlog.info("table2: {}: {}", h, u); node_utilization.update(*u); } // Initial allocation is not capacity-aware so we're still not perfect here. // See https://github.com/scylladb/scylladb/issues/23378 BOOST_REQUIRE_LT(node_utilization.max() - node_utilization.min(), 0.13); } }).get(); } // Reproduces https://github.com/scylladb/scylladb/issues/23631 SEASTAR_THREAD_TEST_CASE(test_imbalance_in_hetero_cluster_with_two_tables_imbalanced) { do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); shared_load_stats& load_stats = topo.get_shared_load_stats(); auto rack1 = topo.rack(); auto rack2 = topo.start_new_rack(); auto rack3 = topo.start_new_rack(); topo.add_i4i_2xlarge(rack1); topo.add_i4i_2xlarge(rack2); topo.add_i4i_2xlarge(rack3); auto& stm = e.shared_token_metadata().local(); auto ks_name = add_keyspace(e, {{topo.dc(), 3}}, 512); auto table1 = add_table(e, ks_name).get(); load_stats.set_size(table1, topo.get_capacity() * 0.8 / 3); testlog.info("Initial cluster ready"); topo.add_i4i_large(rack1); topo.add_i4i_large(rack2); topo.add_i4i_large(rack3); testlog.info("Expanded capacity"); auto ks2_name = add_keyspace(e, {{topo.dc(), 3}}); auto table2 = add_table(e, ks2_name).get(); auto& hosts = topo.hosts(); { load_sketch load(stm.get(), load_stats.get()); load.populate(std::nullopt, table2).get(); min_max_tracker node_utilization; for (auto h : hosts) { auto u = load.get_allocated_utilization(h); testlog.info("table2: {}: {}", h, u); node_utilization.update(u.value_or(0)); } BOOST_REQUIRE_LT(node_utilization.max() - node_utilization.min(), 0.13); } }).get(); } static table_id create_table_and_set_tablet_sizes(cql_test_env& e, topology_builder& topo, sstring ks_name, size_t tablet_count, uint64_t table_size_bytes) { const uint64_t tablet_size = table_size_bytes / tablet_count; std::map tablet_options = {{"min_tablet_count", to_sstring(tablet_count)}}; auto table = add_table(e, ks_name, tablet_options).get(); auto& load_stats = topo.get_shared_load_stats(); load_stats.set_size(table, table_size_bytes); auto& stm = e.shared_token_metadata().local(); auto& tmap = stm.get()->tablets().get_tablet_map(table); tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& tinfo) { auto replicas = tinfo.replicas; for (auto& r : tinfo.replicas) { locator::range_based_tablet_id rb_tid {table, tmap.get_token_range(tid)}; load_stats.set_tablet_size(r.host, rb_tid, tablet_size); } return make_ready_future<>(); }).get(); testlog.info("Created table {} of size {:i} with {} tablets and tablet size of {:i}", table, utils::pretty_printed_data_size(table_size_bytes), tablet_count, utils::pretty_printed_data_size(tablet_size)); return table; } SEASTAR_THREAD_TEST_CASE(test_size_based_load_balancing_table_load) { // This test validates the table balance in size-based load balancing. // The initial tablet allocation during table creation is non-deterministic because of // shuffle in network_topology_strategy.cc. This means that the tablet balancer will work on a different // initial setup on every run, and that the final tablet distribution will also be different.
// With max_imbalance_threshold set to 1.4 and running the test 10000 times there were no failures. // 1.5 was selected as a safety buffer to avoid flakiness. // // The following is a table of max_imbalance_threshold and failure rates for 10000 runs: // // threshold | # runs | # failures // ----------+--------+------------ // 1.4 | 10000 | 0 // 1.3 | 10000 | 57 // 1.2 | 10000 | 539 auto cfg = tablet_cql_test_config(); do_with_cql_env_thread([&] (auto& e) { logging::logger_registry().set_logger_level("load_balancer", logging::log_level::debug); topology_builder topo(e); endpoint_dc_rack dc_rack; const uint64_t shard_capacity = 250UL * 1024UL * 1024UL * 1024UL; const size_t tablet_count = 512; const double max_imbalance_threshold = 1.5; const double min_imbalance_threshold = 1 / max_imbalance_threshold; uint64_t total_capacity = 0; std::vector hosts; // Add disk capacity for the default node. Add all subsequent nodes to the same DC/rack e.shared_token_metadata().local().get()->get_topology().for_each_node([&] (const auto& node) { dc_rack = node.dc_rack(); auto host = node.host_id(); auto num_shards = node.get_shard_count(); auto node_capacity = shard_capacity * num_shards; topo.get_shared_load_stats().set_capacity(host, node_capacity); total_capacity += node_capacity; testlog.info("Default node {} has {} shards and {:i} disk capacity", host, num_shards, utils::pretty_printed_data_size(node_capacity)); hosts.push_back(host); }); auto create_node = [&] (size_t num_shards) { auto host = topo.add_node(node_state::normal, num_shards, dc_rack); auto node_capacity = shard_capacity * num_shards; topo.get_shared_load_stats().set_capacity(host, node_capacity); total_capacity += node_capacity; testlog.info("Added node {} with {} shards and {:i} disk capacity", host, num_shards, utils::pretty_printed_data_size(node_capacity)); hosts.push_back(host); }; create_node(10); create_node(8); auto ks_name = add_keyspace(e, {{dc_rack.dc, 1}}); // Add 3 tables: 0.5 of the current total storage, 0.25 of the total storage and 0.125 of the total storage std::map table_sizes; uint64_t table_size = total_capacity / 2; for (int c = 0; c < 3; c++) { auto table_id = create_table_and_set_tablet_sizes(e, topo, ks_name, tablet_count, table_size); table_sizes[table_id] = table_size; table_size /= 2; } // Add another table with 1 byte per tablet table_size = tablet_count; auto table_id = create_table_and_set_tablet_sizes(e, topo, ks_name, tablet_count, table_size); table_sizes[table_id] = table_size; auto& stm = e.shared_token_metadata().local(); auto check_balance = [&] { for (auto& [table, table_size] : table_sizes) { load_sketch load(stm.get(), topo.get_shared_load_stats().get()); load.populate(std::nullopt, table).get(); const double ideal_table_load = double(table_size) / total_capacity; min_max_tracker table_load; for (auto h : hosts) { auto shard_minmax_load = load.get_shard_minmax(h); table_load.update(shard_minmax_load); testlog.info("Table: {} ideal_load: {} host: {} load: {} min_shard_load: {} max_shard_load: {}", table, ideal_table_load, h, load.get_load(h), shard_minmax_load.min(), shard_minmax_load.max()); BOOST_REQUIRE_LT(min_imbalance_threshold, shard_minmax_load.min() / ideal_table_load); BOOST_REQUIRE_GT(max_imbalance_threshold, shard_minmax_load.max() / ideal_table_load); } } }; rebalance_tablets(e, &topo.get_shared_load_stats()); check_balance(); create_node(8); rebalance_tablets(e, &topo.get_shared_load_stats()); check_balance(); }, std::move(cfg)).get(); } static future<>
run_imbalance_when_creating_plenty_of_tables_test(bool rf_rack_valid_keyspaces) { cql_test_config cfg{}; cfg.db_config->rf_rack_valid_keyspaces.set(std::move(rf_rack_valid_keyspaces)); return do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); auto rack1 = topo.rack(); auto rack2 = topo.start_new_rack(); auto rack3 = topo.start_new_rack(); topo.add_i4i_2xlarge(rack1); topo.add_i4i_2xlarge(rack2); topo.add_i4i_2xlarge(rack3); auto& stm = e.shared_token_metadata().local(); auto ks_name = add_keyspace(e, {{topo.dc(), 3}}, 1); for (int _ : std::views::iota(1, 100)) { add_table(e, ks_name, {{"min_per_shard_tablet_count", "10.0"}}).get(); } testlog.info("Initial cluster ready"); { load_sketch load(stm.get()); load.populate().get(); for (auto h: topo.hosts()) { auto node_utilization = load.get_shard_minmax_tablet_count(h); testlog.info("host {}: min={}, max={}", h, node_utilization.min(), node_utilization.max()); BOOST_REQUIRE_LT(node_utilization.max() - node_utilization.min(), 1.1); } } }, std::move(cfg)); } // Reproduces https://github.com/scylladb/scylladb/issues/27620 SEASTAR_THREAD_TEST_CASE(test_imbalance_when_creating_plenty_of_tables_with_RF_rack_valid_keyspaces_enforced) { run_imbalance_when_creating_plenty_of_tables_test(true).get(); } SEASTAR_THREAD_TEST_CASE(test_imbalance_when_creating_plenty_of_tables_with_RF_rack_valid_keyspaces_disabled) { run_imbalance_when_creating_plenty_of_tables_test(false).get(); } SEASTAR_THREAD_TEST_CASE(test_per_shard_goal_mixed_dc_rf) { cql_test_config cfg = tablet_cql_test_config(); do_with_cql_env_thread([] (auto& e) { auto per_shard_goal = e.local_db().get_config().tablets_per_shard_goal(); topology_builder topo(e); shared_load_stats& load_stats = topo.get_shared_load_stats(); std::vector hosts; sstring dc1 = topo.dc(); hosts.push_back(topo.add_node(node_state::normal, 2)); topo.start_new_rack(); hosts.push_back(topo.add_node(node_state::normal, 2)); topo.start_new_rack(); hosts.push_back(topo.add_node(node_state::normal, 2)); auto dc2 = topo.start_new_dc().dc; hosts.push_back(topo.add_node(node_state::normal, 1)); topo.start_new_rack(); hosts.push_back(topo.add_node(node_state::normal, 1)); auto ks_name1 = add_keyspace(e, {{dc1, 3}}); auto ks_name2 = add_keyspace(e, {{dc2, 2}}); // table1 overflows per-shard goal in dc1, should be scaled down. // wants 400 tablets (3 nodes * 2 shards * 200 tablets/shard / rf=3 = 400 tablets) // which will be scaled down by a factor of 0.5 to achieve 100 tablets/shard, giving // 200 tablets, scaled up to the nearest power of 2, which is 256. e.execute_cql(fmt::format("CREATE TABLE {}.table1 (p1 text, r1 int, PRIMARY KEY (p1)) " "WITH tablets = {{'min_per_shard_tablet_count': 200}}", ks_name1)).get(); auto table1 = e.local_db().find_schema(ks_name1, "table1")->id(); // table2 has 64 tablets/shard in dc2, should not be scaled down. 
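// Mirroring the table1 arithmetic above, for dc2: 2 nodes * 1 shard * 64
// tablets/shard / rf=2 = 64 tablets, which is already within the per-shard
// goal, so the request is expected to be honored without scaling (verified below).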
e.execute_cql(fmt::format("CREATE TABLE {}.table2 (p1 text, r1 int, PRIMARY KEY (p1)) " "WITH tablets = {{'min_per_shard_tablet_count': 64}}", ks_name2)).get(); auto table2 = e.local_db().find_schema(ks_name2, "table2")->id(); rebalance_tablets(e); { auto& stm = e.shared_token_metadata().local(); auto tm = stm.get(); BOOST_REQUIRE_EQUAL(tm->tablets().get_tablet_map(table1).tablet_count(), 256); BOOST_REQUIRE_EQUAL(tm->tablets().get_tablet_map(table2).tablet_count(), 64); load_sketch load(tm, load_stats.get()); load.populate().get(); for (auto h: hosts) { auto l = load.get_shard_minmax_tablet_count(h); testlog.info("Tablet count on host {}: min={}, max={}", h, l.min(), l.max()); BOOST_REQUIRE_LE(l.max(), 2 * per_shard_goal); } } }, cfg).get(); } SEASTAR_THREAD_TEST_CASE(test_load_balancer_ignores_hosts_with_incomplete_stats) { // This checks that nodes with incomplete stats are not included in load balancing. do_with_cql_env_thread([] (auto& e) { logging::logger_registry().set_logger_level("load_balancer", logging::log_level::debug); topology_builder topo(e); auto host1 = topo.add_node(node_state::normal, 2); auto host2 = topo.add_node(node_state::normal, 2); auto host3 = topo.add_node(node_state::normal, 2); auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 16); auto table1 = add_table(e, ks_name).get(); abort_source as; auto guard = e.get_raft_group0_client().start_operation(as).get(); auto& stm = e.shared_token_metadata().local(); // Move all tablets to shard 0 of host2 mutate_tablets(e, guard, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(16); for (auto tid : tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host2, 0}, } }); } tmeta.set_tablet_map(table1, std::move(tmap)); co_return; }); // Set tablet sizes, then erase a tablet size from load_stats for host2 auto& shared_stats = topo.get_shared_load_stats(); shared_stats.set_default_tablet_sizes(stm.get()); auto tablet_size_i = shared_stats.stats.tablet_stats.at(host2).tablet_sizes.at(table1).begin(); shared_stats.stats.tablet_stats.at(host2).tablet_sizes.at(table1).erase(tablet_size_i); // Balancing should not issue any migrations because host2 will be ignored // due to incomplete tablet sizes in load_stats { auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr, topo.get_load_stats()).get(); BOOST_REQUIRE(plan.empty()); } // Balancing should issue migrations with host2 having all tablet sizes shared_stats.set_default_tablet_sizes(stm.get()); { auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr, topo.get_load_stats()).get(); BOOST_REQUIRE(!plan.empty()); BOOST_REQUIRE(!plan.migrations().empty()); for (auto&& mig : plan.migrations()) { BOOST_REQUIRE_EQUAL(mig.src.host, host2); } } }).get(); } SEASTAR_THREAD_TEST_CASE(test_load_balancer_does_not_balance_with_missing_tablet_sizes) { // This checks that the balancer will not issue migrations with incomplete tablet sizes do_with_cql_env_thread([] (auto& e) { logging::logger_registry().set_logger_level("load_balancer", logging::log_level::debug); topology_builder topo(e); auto host1 = topo.add_node(node_state::normal, 2); auto host2 = topo.add_node(node_state::normal, 2); auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 64); auto table1 = add_table(e, ks_name).get(); auto& stm = e.shared_token_metadata().local(); // Decommission host2 topo.set_node_state(host2, node_state::decommissioning); // Set tablet sizes, then erase a tablet size from load_stats for host1 and 
host2 auto& shared_stats = topo.get_shared_load_stats(); shared_stats.set_default_tablet_sizes(stm.get()); auto erase_tablet_size = [&] (host_id host) { auto tablet_size_i = shared_stats.stats.tablet_stats.at(host).tablet_sizes.at(table1).begin(); shared_stats.stats.tablet_stats.at(host).tablet_sizes.at(table1).erase(tablet_size_i); }; erase_tablet_size(host1); erase_tablet_size(host2); // Balancing should not issue migrations due to missing tablet sizes { auto plan = e.get_tablet_allocator().local().balance_tablets(stm.get(), nullptr, nullptr, topo.get_load_stats()).get(); BOOST_REQUIRE(plan.empty()); } }).get(); } SEASTAR_THREAD_TEST_CASE(test_split_and_merge_of_colocated_tables) { do_with_cql_env_thread([] (auto& e) { logging::logger_registry().set_logger_level("load_balancer", logging::log_level::trace); topology_builder topo(e); unsigned shard_count = 2; auto host1 = topo.add_node(node_state::normal, shard_count); auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 1); auto table1 = add_table(e, ks_name).get(); auto table2 = add_table(e, ks_name).get(); mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(1); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {host1, 0}, } }); tablet_map tmap1 = co_await tmap.clone_gently(); tmeta.set_tablet_map(table1, std::move(tmap1)); co_await tmeta.set_colocated_table(table2, table1); }); auto& stm = e.shared_token_metadata().local(); BOOST_REQUIRE_EQUAL(1, stm.get()->tablets().get_tablet_map(table1).tablet_count()); BOOST_REQUIRE_EQUAL(1, stm.get()->tablets().get_tablet_map(table2).tablet_count()); // the target tablet size for a group of co-located tablets is the default // target divided by the group size. see make_sizing_plan const uint64_t target_tablet_size = service::default_target_tablet_size / 2; shared_load_stats& load_stats = topo.get_shared_load_stats(); // avg tablet size = 3.5 * target > 2 * target load_stats.set_size(table1, 3*target_tablet_size); load_stats.set_size(table2, 4*target_tablet_size); rebalance_tablets(e, &load_stats); auto tablet_count_after_split = stm.get()->tablets().get_tablet_map(table1).tablet_count(); BOOST_REQUIRE_EQUAL(tablet_count_after_split, stm.get()->tablets().get_tablet_map(table2).tablet_count()); BOOST_REQUIRE_EQUAL(tablet_count_after_split, 2); // avg tablet size = (0.6 / 2) * target = 0.3 * target < 0.5 * target load_stats.set_size(table1, 1.1*target_tablet_size); load_stats.set_size(table2, 0.1*target_tablet_size); rebalance_tablets(e, &load_stats); auto tablet_count_after_merge = stm.get()->tablets().get_tablet_map(table1).tablet_count(); BOOST_REQUIRE_EQUAL(tablet_count_after_merge, stm.get()->tablets().get_tablet_map(table2).tablet_count()); BOOST_REQUIRE_EQUAL(tablet_count_after_merge, 1); }).get(); } // This test verifies that per-table tablet count is adjusted // in reaction to changes of relevant config and schema options. SEASTAR_THREAD_TEST_CASE(test_tablet_option_and_config_changes) { auto cfg = tablet_cql_test_config(); cfg.db_config->tablets_initial_scale_factor(10.0); cfg.need_remote_proxy = true; do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); auto dc = topo.dc(); // 3 shards. default initial scale wants 30 (32) tablets. // keyspace 'initial' wants 2 tablets. 
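// The steps below walk through the precedence of the sizing knobs: the keyspace
// 'initial' option, the per-table 'min_per_shard_tablet_count' hint, and the
// live-updatable tablets_initial_scale_factor / tablets_per_shard_goal config
// values, checking the resulting tablet count after a rebalance at each step.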
topo.add_node(node_state::normal, 3); auto ks_name1 = add_keyspace(e, {{dc, 1}}, 2); e.execute_cql(fmt::format("CREATE TABLE {}.table1 (p1 text, r1 int, PRIMARY KEY (p1))", ks_name1)).get(); auto table1 = e.local_db().find_schema(ks_name1, "table1")->id(); auto& stm = e.shared_token_metadata().local(); auto get_tablet_count = [&] { auto tm = stm.get(); return tm->tablets().get_tablet_map(table1).tablet_count(); }; shared_load_stats& load_stats = topo.get_shared_load_stats(); load_stats.set_size(table1, 0); rebalance_tablets(e, &load_stats); BOOST_REQUIRE_EQUAL(get_tablet_count(), 2); // min_per_shard_tablet_count wants 5 * 3 = 15 (16) tablets e.execute_cql(fmt::format("ALTER TABLE {}.table1 " "WITH tablets = {{'min_per_shard_tablet_count': 5}}", ks_name1)).get(); rebalance_tablets(e, &load_stats); BOOST_REQUIRE_EQUAL(get_tablet_count(), 16); // Check that hint can be dropped. e.execute_cql(fmt::format("ALTER TABLE {}.table1 WITH tablets = {{}}", ks_name1)).get(); rebalance_tablets(e, &load_stats); BOOST_REQUIRE_EQUAL(get_tablet_count(), 2); // Default kicks in if keyspace setting and hint are missing. e.execute_cql(format("ALTER KEYSPACE {} with tablets = {{'enabled': true}}", ks_name1, dc)).get(); rebalance_tablets(e, &load_stats); BOOST_REQUIRE_EQUAL(get_tablet_count(), 32); // initial scale can be live-updated. auto& cfg = e.db_config(); cfg.tablets_initial_scale_factor(5); rebalance_tablets(e, &load_stats); BOOST_REQUIRE_EQUAL(get_tablet_count(), 16); // per-shard goal can be live-updated. // merge cfg.tablets_per_shard_goal(1); rebalance_tablets(e, &load_stats); BOOST_REQUIRE_EQUAL(get_tablet_count(), 4); // split cfg.tablets_per_shard_goal(100); rebalance_tablets(e, &load_stats); BOOST_REQUIRE_EQUAL(get_tablet_count(), 16); // initial scale can be smaller than 1. // 0.5 tablet/shard * 3 shards = 1.5 tablets =~ 2 tablets. cfg.tablets_initial_scale_factor(0.5); rebalance_tablets(e, &load_stats); BOOST_REQUIRE_EQUAL(get_tablet_count(), 2); }, cfg).get(); } SEASTAR_THREAD_TEST_CASE(test_creating_lots_of_tables_doesnt_overflow_metadata) { auto cfg = tablet_cql_test_config(); cfg.db_config->tablets_initial_scale_factor(10.0); cfg.db_config->tablets_per_shard_goal(100); do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); auto dc = topo.dc(); // 10 tablets/shard (initial_scale) * 16 shards = 160 tablets, rounded up to 256. // That's 16 tablet replicas per shard per table. // Creating 100 tables without scaling would give 1'600 tablets per shard, // which would overshoot the per-shard limit significantly. // This test verifies that scaling kicks in sooner as more tables are created, // and we end up with fewer tablets even before tablet merging is executed. 
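// All tables are created concurrently and reported as empty (size 0). The
// thresholds asserted below are empirical upper bounds (see the note on 415
// below) rather than exact expected counts.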
auto host1 = topo.add_node(node_state::normal, 16); auto ks_name1 = add_keyspace(e, {{dc, 1}}); std::vector tables; shared_load_stats& load_stats = topo.get_shared_load_stats(); const auto nr_tables = 100u; parallel_for_each(std::views::iota(0u, nr_tables), [&] (auto i) -> future<> { auto table_name = fmt::format("table_{}", i); co_await e.execute_cql(fmt::format("CREATE TABLE {}.{} (p1 text, r1 int, PRIMARY KEY (p1))", ks_name1, table_name)); table_id table = e.local_db().find_schema(ks_name1, table_name)->id(); tables.push_back(table); load_stats.set_size(table, 0); }).get(); auto& stm = e.shared_token_metadata().local(); load_stats.set_default_tablet_sizes(stm.get()); { load_sketch load(stm.get(), load_stats.get()); load.populate().get(); testlog.info("max tablet count: {}", load.get_shard_minmax_tablet_count(host1).max()); // The value 415 was determined empirically. If there was lack of scaling, it would be 1'600. BOOST_REQUIRE(load.get_shard_minmax_tablet_count(host1).max() <= 415); } rebalance_tablets(e, &load_stats); { load_sketch load(stm.get(), load_stats.get()); load.populate().get(); testlog.info("max tablet count: {}", load.get_shard_minmax_tablet_count(host1).max()); BOOST_REQUIRE(load.get_shard_minmax_tablet_count(host1).max() <= 200); } }, cfg).get(); } SEASTAR_THREAD_TEST_CASE(test_load_stats_tablet_reconcile) { auto cfg = tablet_cql_test_config(); // This test checks the correctness of the load_stats reconciliation algorithm. // We only attempt to reconcile tablet_sizes after a merge or a split. do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); auto dc = topo.dc(); const size_t tablet_count = 16; auto host = topo.add_node(node_state::normal, 4); auto ks_name = add_keyspace(e, {{dc, 1}}, tablet_count); sstring table_name = "table_1"; e.execute_cql(fmt::format("CREATE TABLE {}.{} (p1 text, r1 int, PRIMARY KEY (p1))", ks_name, table_name)).get(); table_id table = e.local_db().find_schema(ks_name, table_name)->id(); auto& stm = e.shared_token_metadata().local(); token_metadata_ptr old_tmptr = stm.get(); auto& tmap = stm.get()->tablets().get_tablet_map(table); auto set_tablet_count = [&] (size_t new_tablet_count) { mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map new_tmap(new_tablet_count); tmeta.set_tablet_map(table, std::move(new_tmap)); return make_ready_future<>(); }); }; // This checks if the tablet sizes have been correctly reconciled after a merge { locator::load_stats stats; locator::tablet_load_stats& tls = stats.tablet_stats[host]; for (size_t i = 0; i < tablet_count; ++i) { const dht::token_range range {tmap.get_token_range(tablet_id(i))}; tls.tablet_sizes[table][range] = i; } size_t tablet_count_after_merge = tablet_count / 2; set_tablet_count(tablet_count_after_merge); auto reconciled_stats_ptr = stats.reconcile_tablets_resize({ table }, *old_tmptr, *stm.get()); BOOST_REQUIRE(reconciled_stats_ptr); locator::tablet_load_stats& reconciled_tls = reconciled_stats_ptr->tablet_stats[host]; BOOST_REQUIRE_EQUAL(reconciled_tls.tablet_sizes.at(table).size(), tablet_count_after_merge); locator::tablet_map tmap_after_merge(tablet_count_after_merge); for (size_t i = 0; i < tablet_count_after_merge; ++i) { dht::token_range trange {tmap_after_merge.get_token_range(locator::tablet_id{i})}; const uint64_t reconciled_tablet_size = reconciled_tls.tablet_sizes.at(table).at(trange); uint64_t expected_sum = 0; for (uint64_t i_sum = 0; i_sum < 2; ++i_sum) { expected_sum += i * 2 + i_sum; } BOOST_REQUIRE_EQUAL(reconciled_tablet_size, expected_sum); } 
} // This checks if the tablet sizes have been correctly reconciled after a split { locator::load_stats stats; locator::tablet_load_stats& tls = stats.tablet_stats[host]; for (size_t i = 0; i < tablet_count; ++i) { const dht::token_range range {tmap.get_token_range(tablet_id(i))}; tls.tablet_sizes[table][range] = i * 2; } size_t tablet_count_after_split = tablet_count * 2; set_tablet_count(tablet_count_after_split); auto reconciled_stats_ptr = stats.reconcile_tablets_resize({ table }, *old_tmptr, *stm.get()); BOOST_REQUIRE(reconciled_stats_ptr); locator::tablet_load_stats& reconciled_tls = reconciled_stats_ptr->tablet_stats[host]; BOOST_REQUIRE_EQUAL(reconciled_tls.tablet_sizes.at(table).size(), tablet_count_after_split); locator::tablet_map tmap_after_split(tablet_count_after_split); for (size_t i = 0; i < tablet_count_after_split; ++i) { dht::token_range trange {tmap_after_split.get_token_range(locator::tablet_id{i})}; const uint64_t reconciled_tablet_size = reconciled_tls.tablet_sizes.at(table).at(trange); BOOST_REQUIRE_EQUAL(reconciled_tablet_size, i / 2); } } }, cfg).get(); } SEASTAR_THREAD_TEST_CASE(test_load_stats_tablet_reconcile_tablet_not_found) { auto cfg = tablet_cql_test_config(); // This test checks if the reconcile tablet algorithm returns nullptr when it // can't find all the tablet sizes in load_stats do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); auto dc = topo.dc(); const size_t tablet_count = 16; auto host = topo.add_node(node_state::normal, 4); auto ks_name = add_keyspace(e, {{dc, 1}}, tablet_count); sstring table_name = "table_1"; e.execute_cql(fmt::format("CREATE TABLE {}.{} (p1 text, r1 int, PRIMARY KEY (p1))", ks_name, table_name)).get(); table_id table = e.local_db().find_schema(ks_name, table_name)->id(); auto& stm = e.shared_token_metadata().local(); auto& tmap = stm.get()->tablets().get_tablet_map(table); locator::load_stats stats; locator::tablet_load_stats& tls = stats.tablet_stats[host]; // Add all tablet sizes except the last one. 
This will cause reconcile to return a nullptr for (size_t i = 0; i < tablet_count - 1; ++i) { const dht::token_range range {tmap.get_token_range(tablet_id(i))}; tls.tablet_sizes[table][range] = i; } token_metadata_ptr old_tm { stm.get() }; auto set_tablet_count = [&] (size_t new_tablet_count) { mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map new_tmap(new_tablet_count); tmeta.set_tablet_map(table, std::move(new_tmap)); return make_ready_future<>(); }); }; // Test if merge reconcile detects a missing sibling tablet in load_stats set_tablet_count(tablet_count / 2); auto reconciled_stats_ptr = stats.reconcile_tablets_resize({ table }, *old_tm, *stm.get()); BOOST_REQUIRE_EQUAL(reconciled_stats_ptr.get(), nullptr); // Test if split reconcile detects a missing tablet in load_stats set_tablet_count(tablet_count * 2); reconciled_stats_ptr = stats.reconcile_tablets_resize({ table }, *old_tm, *stm.get()); BOOST_REQUIRE_EQUAL(reconciled_stats_ptr.get(), nullptr); }, cfg).get(); } SEASTAR_TEST_CASE(test_load_stats_migrate_tablet_size) { auto table = table_id(utils::UUID_gen::get_time_UUID()); auto host1 = host_id(utils::UUID_gen::get_time_UUID()); auto host2 = host_id(utils::UUID_gen::get_time_UUID()); tablet_map tmap(8); tablet_id tid(1); const uint64_t tablet_size = 42; range_based_tablet_id rb_tid{table, tmap.get_token_range(tid)}; global_tablet_id gid{table, tid}; // Check tablet size is correctly migrated { load_stats stats; stats.tablet_stats[host1].tablet_sizes[table][rb_tid.range] = tablet_size; stats.tablet_stats[host2] = {}; auto new_load_stats = stats.migrate_tablet_size(host1, host2, gid, rb_tid.range); BOOST_REQUIRE(new_load_stats); // Check tablet size is on host2 auto tablet_size_opt = new_load_stats->get_tablet_size(host2, rb_tid); BOOST_REQUIRE(tablet_size_opt); BOOST_REQUIRE(*tablet_size_opt == tablet_size); // Check tablet size is not on host1 tablet_size_opt = new_load_stats->get_tablet_size(host1, rb_tid); BOOST_REQUIRE(!tablet_size_opt); // Check the migration removed the entry for the table after removing the last tablet size BOOST_REQUIRE(!new_load_stats->tablet_stats.at(host1).tablet_sizes.contains(table)); } // Check migrate_tablet_size() returns nullptr when tablet is not found on leaving replica { load_stats stats; stats.tablet_stats[host1] = {}; stats.tablet_stats[host2] = {}; auto new_load_stats = stats.migrate_tablet_size(host1, host2, gid, rb_tid.range); BOOST_REQUIRE(!new_load_stats); } // Check migrate_tablet_size() returns nullptr when tablet is already on pending replica { load_stats stats; stats.tablet_stats[host1].tablet_sizes[table][rb_tid.range] = tablet_size; stats.tablet_stats[host2].tablet_sizes[table][rb_tid.range] = tablet_size; auto new_load_stats = stats.migrate_tablet_size(host1, host2, gid, rb_tid.range); BOOST_REQUIRE(!new_load_stats); } // Check migrate_tablet_size() returns nullptr when leaving and pending replicas are equal { load_stats stats; stats.tablet_stats[host1].tablet_sizes[table][rb_tid.range] = tablet_size; stats.tablet_stats[host2] = {}; auto new_load_stats = stats.migrate_tablet_size(host1, host1, gid, rb_tid.range); BOOST_REQUIRE(!new_load_stats); } // Check migrate_tablet_size() returns nullptr when pending host is not found in load_stats { load_stats stats; stats.tablet_stats[host1].tablet_sizes[table][rb_tid.range] = tablet_size; auto new_load_stats = stats.migrate_tablet_size(host1, host2, gid, rb_tid.range); BOOST_REQUIRE(!new_load_stats); } return make_ready_future<>(); } 
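// Verifies tablet_map::get_tablet_id_and_range_side(): a token resolves to its
// owning tablet in the pre-split map plus a left/right flag saying which half
// of that tablet's range (i.e. which post-split "compaction group") the token
// falls into, checked here against a map with twice the tablet count.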
SEASTAR_TEST_CASE(test_tablet_id_and_range_side) { static constexpr size_t tablet_count = 128; locator::tablet_map tmap(tablet_count); locator::tablet_map tmap_after_splitting(tablet_count * 2); for (size_t id = 0; id < tablet_count; id++) { auto left_id = tablet_id(id << 1); auto right_id = tablet_id(left_id.value() + 1); auto left_tr = tmap_after_splitting.get_token_range(left_id); auto right_tr = tmap_after_splitting.get_token_range(right_id); testlog.debug("id {}, left tr {}, right tr {}", id, left_tr, right_tr); auto test = [&tmap, id] (dht::token token, tablet_range_side expected_side) { auto [tid, side] = tmap.get_tablet_id_and_range_side(token); BOOST_REQUIRE_EQUAL(tid.value(), id); BOOST_REQUIRE_EQUAL(side, expected_side); }; auto test_range = [&] (dht::token_range& tr, tablet_range_side expected_side) { auto lower_token = tr.start()->value() == dht::minimum_token() ? dht::first_token() : tr.start()->value(); auto upper_token = tr.end()->value(); test(next_token(lower_token), expected_side); test(upper_token, expected_side); }; // Test the lower and upper bound of tablet's left and right ranges ("compaction groups"). test_range(left_tr, tablet_range_side::left); test_range(right_tr, tablet_range_side::right); } return make_ready_future<>(); } SEASTAR_THREAD_TEST_CASE(basic_tablet_storage_splitting_test) { auto cfg = tablet_cql_test_config(); cfg.initial_tablets = std::bit_floor(smp::count); do_with_cql_env_thread([] (cql_test_env& e) { e.execute_cql( "CREATE TABLE cf (pk int, ck int, v int, PRIMARY KEY (pk, ck))").get(); for (unsigned i = 0; i < smp::count * 20; i++) { e.execute_cql(format("INSERT INTO cf (pk, ck, v) VALUES ({}, 0, 0)", i)).get(); } e.db().invoke_on_all([] (replica::database& db) { auto& table = db.find_column_family("ks", "cf"); return table.flush(); }).get(); testlog.info("Splitting sstables..."); e.db().invoke_on_all([] (replica::database& db) { auto& table = db.find_column_family("ks", "cf"); testlog.info("sstable count: {}", table.sstables_count()); return table.split_all_storage_groups(tasks::task_info{}); }).get(); testlog.info("Verifying sstables are split..."); BOOST_REQUIRE_EQUAL(e.db().map_reduce0([] (replica::database& db) { auto& table = db.find_column_family("ks", "cf"); return make_ready_future(table.all_storage_groups_split()); }, bool(false), std::logical_or()).get(), true); }, std::move(cfg)).get(); } using rack_vector = std::vector; using hosts_by_rack_map = std::unordered_map>; // runs in seastar thread. 
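// Shared driver for the merge-colocation tests below: it builds the requested
// topology, installs the caller-provided tablet layout, lowers the keyspace
// 'initial' option to 1, and then keeps the reported table size just below the
// merge threshold so that every rebalance round must reduce the tablet count
// (with replica invariants checked before and after) until one tablet remains.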
static void do_test_load_balancing_merge_colocation(cql_test_env& e, const int n_racks, const int rf, const int n_hosts, const unsigned shard_count, const unsigned initial_tablets, std::function set_tablets) { topology_builder topo(e); rack_vector racks; for (int i = 0; i < n_racks; i++) { racks.push_back(topo.rack()); topo.start_new_rack(); } testlog.info("merge colocation test - hosts={}, racks={}, rf={}, shard_count={}, initial_tablets={}", n_hosts, racks.size(), rf, shard_count, initial_tablets); hosts_by_rack_map hosts_by_rack; for (int i = 0; i < n_hosts; ++i) { auto rack = racks[i % racks.size()]; auto h = topo.add_node(node_state::normal, shard_count, rack); hosts_by_rack[rack.rack].push_back(h); } auto ks_name = add_keyspace(e, {{topo.dc(), rf}}, initial_tablets); auto table1 = add_table(e, ks_name).get(); auto& stm = e.shared_token_metadata().local(); { abort_source as; auto guard = e.get_raft_group0_client().start_operation(as).get(); stm.mutate_token_metadata([&](token_metadata& tm) -> future<> { tablet_metadata& tmeta = tm.tablets(); tablet_map tmap(initial_tablets); locator::resize_decision decision; // leaves growing mode, allowing for merge decision. decision.sequence_number = decision.next_sequence_number(); tmap.set_resize_decision(std::move(decision)); set_tablets(tm, tmap, racks, hosts_by_rack); tmeta.set_tablet_map(table1, std::move(tmap)); tm.set_tablets(std::move(tmeta)); return make_ready_future < > (); }).get(); save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get(); } // Lower "initial" tablets option, allowing for merge decision. e.execute_cql(fmt::format("alter keyspace {} with tablets = {{'enabled': true, 'initial': 1}}", ks_name)).get(); auto tablet_count = [&] { return stm.get()->tablets().get_tablet_map(table1).tablet_count(); }; shared_load_stats& load_stats = topo.get_shared_load_stats(); load_stats.set_default_tablet_sizes(stm.get()); auto do_rebalance_tablets = [&] () { rebalance_tablets(e, &load_stats); }; const uint64_t target_tablet_size = service::default_target_tablet_size; auto merge_threshold = [&] () -> uint64_t { return (target_tablet_size * 0.5f) * tablet_count(); }; while (tablet_count() > 1) { load_stats.set_size(table1, merge_threshold() - 1); auto old_tablet_count = tablet_count(); check_tablet_invariants(stm.get()->tablets()); do_rebalance_tablets(); check_tablet_invariants(stm.get()->tablets()); BOOST_REQUIRE_LT(tablet_count(), old_tablet_count); } e.execute_cql(fmt::format("drop keyspace {}", ks_name)).get(); } SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_random_load) { cql_test_config cfg; cfg.need_remote_proxy = true; do_with_cql_env_thread([] (auto& e) { auto seed = tests::random::get_int(); std::mt19937 random_engine{seed}; testlog.info("test_load_balancing_merge_colocation - seed {}", seed); for (auto i = 0; i < 10; i++) { const int rf = tests::random::get_int(3, 3); const int n_racks = rf; const int n_hosts = tests::random::get_int(n_racks * rf, n_racks * rf * 2); const unsigned shard_count = tests::random::get_int(2, 12); const unsigned total_shard_count = n_hosts * shard_count; const unsigned initial_tablets = std::bit_ceil(tests::random::get_int(total_shard_count, total_shard_count * 10)); auto set_tablets = [rf, shard_count] (token_metadata&, tablet_map& tmap, const rack_vector& racks, const hosts_by_rack_map& hosts_by_rack) { for (auto tid : tmap.tablet_ids()) { testlog.debug("allocating replica in racks with rf {}", rf); std::vector replica_hosts = 
allocate_replicas_in_racks(racks, rf, hosts_by_rack); tablet_replica_set replicas; replicas.reserve(replica_hosts.size()); for (auto h : replica_hosts) { replicas.push_back(tablet_replica {h, tests::random::get_int(0, shard_count - 1)}); } testlog.debug("allocating replicas for tablet {}: {}", tid, replicas); tmap.set_tablet(tid, tablet_info {std::move(replicas)}); } }; do_test_load_balancing_merge_colocation(e, n_racks, rf, n_hosts, shard_count, initial_tablets, set_tablets); } }, std::move(cfg)).get(); } SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_single_rack) { cql_test_config cfg{}; cfg.need_remote_proxy = true; // This test purposefully uses just one rack, which means that we cannot enable // the `rf_rack_valid_keyspaces` configuration option because we won't be able to create // a keyspace with RF > 1. cfg.db_config->rf_rack_valid_keyspaces.set(false); do_with_cql_env_thread([] (auto& e) { const int rf = 2; const int n_racks = 1; const int n_hosts = 2; const unsigned shard_count = 2; const unsigned initial_tablets = 2; auto set_tablets = [] (token_metadata&, tablet_map& tmap, const rack_vector& racks, const hosts_by_rack_map& hosts_by_rack) { auto& hosts = hosts_by_rack.at(racks.front().rack); auto host1 = hosts[0]; auto host2 = hosts[1]; tmap.set_tablet(tablet_id(0), tablet_info { tablet_replica_set { tablet_replica {host1, shard_id(0)}, tablet_replica {host2, shard_id(0)}, } }); tmap.set_tablet(tablet_id(1), tablet_info { tablet_replica_set { tablet_replica {host2, shard_id(0)}, tablet_replica {host1, shard_id(0)}, } }); }; do_test_load_balancing_merge_colocation(e, n_racks, rf, n_hosts, shard_count, initial_tablets, set_tablets); }, cfg).get(); } // Verify merge can proceed with multiple racks and RF=#racks // // Given replica sets (not in rack order): // rack1 { n1, n2 } // rack2 { n3, n4 } // // t0: { n1, n3 } // t1: { n4, n2 } // SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_multiple_racks_and_rf_equals_racks) { cql_test_config cfg; cfg.need_remote_proxy = true; do_with_cql_env_thread([] (auto& e) { const int rf = 2; const int n_racks = rf; const int n_hosts = 4; // 2 nodes in each rack. const unsigned shard_count = 1; const unsigned initial_tablets = 2; auto set_tablets = [] (token_metadata&, tablet_map& tmap, const rack_vector& racks, const hosts_by_rack_map& hosts_by_rack) { auto& first_rack_hosts = hosts_by_rack.at(racks[0].rack); auto& second_rack_hosts = hosts_by_rack.at(racks[1].rack); tmap.set_tablet(tablet_id(0), tablet_info { tablet_replica_set { tablet_replica {first_rack_hosts[0], shard_id(0)}, tablet_replica {second_rack_hosts[0], shard_id(0)}, } }); tmap.set_tablet(tablet_id(1), tablet_info { tablet_replica_set { tablet_replica {second_rack_hosts[1], shard_id(0)}, tablet_replica {first_rack_hosts[1], shard_id(0)}, } }); }; do_test_load_balancing_merge_colocation(e, n_racks, rf, n_hosts, shard_count, initial_tablets, set_tablets); }, std::move(cfg)).get(); } SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_decomission) { cql_test_config cfg{}; cfg.need_remote_proxy = true; // The scenario this test addresses cannot happen with `rf_rack_valid_keyspaces` set to true. // // Among the tablet replicas for a given tablet, there CANNOT be two nodes from the same rack. // After the decommission of B, both tablets will reside on ALL other nodes, which implies that // they're on pairwise distinct racks. 
However, since B was taking part in replication of the // tablets, it must've been among the replicas of at least one of the tablets and, for the very // same reason, it must be on a separate rack. Hence, all nodes must reside on pairwise distinct racks. // // So, if we want to keep the current number of nodes and RF, we must have 4 racks. But we cannot // do that until we've implemented scylladb/scylladb#23737. Besides, the test seems to rely on // using just one rack, which makes it incompatible with `rf_rack_valid_keyspaces: true` anyway. cfg.db_config->rf_rack_valid_keyspaces.set(false); do_with_cql_env_thread([] (auto& e) { const int rf = 3; const int n_racks = 1; const int n_hosts = 4; const unsigned shard_count = 2; const unsigned initial_tablets = 2; auto set_tablets = [&] (token_metadata& tm, tablet_map& tmap, const rack_vector& racks, const hosts_by_rack_map& hosts_by_rack) { auto& rack = racks.front(); auto& hosts = hosts_by_rack.at(rack.rack); BOOST_REQUIRE(hosts.size() == 4); auto a = hosts[0]; auto b = hosts[1]; auto c = hosts[2]; auto d = hosts[3]; // nodes = {A, B, C, D} // tablet1 = {A, B, C} // tablet2 = {A, B, D} // viable target for {tablet1, B} is D. // viable target for {tablet2, B} is C. // // Decommission should succeed by migrating away even co-located replicas of sibling tablets that don't share viable targets. // That should produce: // tablet1 = {A, D, C} // tablet2 = {A, C, D} auto decision = tmap.resize_decision(); decision.way = locator::resize_decision::merge{}; tmap.set_resize_decision(std::move(decision)); tm.update_topology(b, rack, node::state::being_decommissioned, shard_count); tmap.set_tablet(tablet_id(0), tablet_info { tablet_replica_set { tablet_replica {a, shard_id(0)}, tablet_replica {b, shard_id(0)}, tablet_replica {c, shard_id(0)}, } }); tmap.set_tablet(tablet_id(1), tablet_info { tablet_replica_set { tablet_replica {a, shard_id(0)}, tablet_replica {b, shard_id(0)}, tablet_replica {d, shard_id(0)}, } }); }; do_test_load_balancing_merge_colocation(e, n_racks, rf, n_hosts, shard_count, initial_tablets, set_tablets); }, cfg).get(); } SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) { do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); topo.add_node(node_state::normal, 2); topo.start_new_rack(); topo.add_node(node_state::normal, 2); const size_t initial_tablets = 2; auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, initial_tablets); auto table1 = add_table(e, ks_name).get(); auto& stm = e.shared_token_metadata().local(); auto tablet_count = [&] { return stm.get()->tablets().get_tablet_map(table1).tablet_count(); }; auto resize_decision = [&] { return stm.get()->tablets().get_tablet_map(table1).resize_decision(); }; shared_load_stats& load_stats = topo.get_shared_load_stats(); load_stats.set_default_tablet_sizes(stm.get()); auto do_rebalance_tablets = [&] () { rebalance_tablets(e, &load_stats, {}, nullptr, false); // no auto-split }; const uint64_t max_tablet_size = service::default_target_tablet_size * 2; auto to_size_in_bytes = [&] (double max_tablet_size_pctg) -> uint64_t { return (max_tablet_size * max_tablet_size_pctg) * tablet_count(); }; const auto initial_ready_seq_number = std::numeric_limits::min(); load_stats.set_split_ready_seq_number(table1, initial_ready_seq_number); // avg size moved above target size, so merge is cancelled { load_stats.set_size(table1, to_size_in_bytes(0.75)); do_rebalance_tablets(); BOOST_REQUIRE_EQUAL(tablet_count(), initial_tablets);
BOOST_REQUIRE(std::holds_alternative(resize_decision().way)); } // Drop initial tablet count to 1 so merge can happen. e.execute_cql(fmt::format("alter keyspace {} with tablets = {{'enabled': true, 'initial': 1}}", ks_name)).get(); // avg size hits split threshold, and balancer emits split request { load_stats.set_size(table1, to_size_in_bytes(1.1)); do_rebalance_tablets(); BOOST_REQUIRE_EQUAL(tablet_count(), initial_tablets); BOOST_REQUIRE(std::holds_alternative(resize_decision().way)); BOOST_REQUIRE_GT(resize_decision().sequence_number, 0); } // replicas set their split status as ready, and load balancer finalizes split generating a new // tablet map, twice as large as the previous one. { load_stats.set_split_ready_seq_number(table1, resize_decision().sequence_number); do_rebalance_tablets(); BOOST_REQUIRE_EQUAL(tablet_count(), initial_tablets * 2); BOOST_REQUIRE(std::holds_alternative(resize_decision().way)); } // Check that balancer detects table size dropped to 0 and reduces tablet count down to 1 through merges. { load_stats.set_size(table1, to_size_in_bytes(0.0)); load_stats.set_split_ready_seq_number(table1, initial_ready_seq_number); do_rebalance_tablets(); BOOST_REQUIRE_EQUAL(tablet_count(), 1); } }).get(); } SEASTAR_THREAD_TEST_CASE(test_drain_node_without_capacity) { do_with_cql_env_thread([] (auto& e) { logging::logger_registry().set_logger_level("load_balancer", logging::log_level::debug); topology_builder topo(e); auto host1 = topo.add_node(node_state::normal, 2); auto host2 = topo.add_node(node_state::normal, 2); const uint64_t node_capacity = 100UL * 1024UL * 1024UL * 1024UL; topo.get_shared_load_stats().set_capacity(host1, node_capacity); auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 16); auto table = add_table(e, ks_name).get(); topo.set_node_state(host2, node_state::removing); auto& stm = e.shared_token_metadata().local(); topo.get_shared_load_stats().set_default_tablet_sizes(stm.get()); rebalance_tablets(e, &topo.get_shared_load_stats()); // check that all tablets have been migrated from host2 to host1 auto& tmap = stm.get()->tablets().get_tablet_map(table); tmap.for_each_tablet([&](auto tid, auto& tinfo) { for (auto& replica : tinfo.replicas) { BOOST_REQUIRE_EQUAL(replica.host, host1); } return make_ready_future<>(); }).get(); }).get(); } SEASTAR_THREAD_TEST_CASE(test_tablet_range_splitter) { simple_schema ss; const auto dks = ss.make_pkeys(4); auto h1 = host_id(utils::UUID_gen::get_time_UUID()); auto h2 = host_id(utils::UUID_gen::get_time_UUID()); auto h3 = host_id(utils::UUID_gen::get_time_UUID()); tablet_map tmap(4); auto tb = tmap.first_tablet(); tmap.set_tablet(tb, tablet_info { tablet_replica_set { tablet_replica {h2, 0}, tablet_replica {h3, 0}, } }); tb = *tmap.next_tablet(tb); tmap.set_tablet(tb, tablet_info { tablet_replica_set { tablet_replica {h1, 3}, } }); tb = *tmap.next_tablet(tb); tmap.set_tablet(tb, tablet_info { tablet_replica_set { tablet_replica {h2, 2}, } }); tb = *tmap.next_tablet(tb); tmap.set_tablet(tb, tablet_info { tablet_replica_set { tablet_replica {h1, 1}, tablet_replica {h2, 1}, } }); using result = tablet_range_splitter::range_split_result; using bound = dht::partition_range::bound; std::vector included_ranges; std::vector excluded_ranges; for (auto tid = std::optional(tmap.first_tablet()); tid; tid = tmap.next_tablet(*tid)) { const auto& tablet_info = tmap.get_tablet_info(*tid); auto replica_it = std::ranges::find_if(tablet_info.replicas, [&] (auto&& r) { return r.host == h1; }); auto token_range = tmap.get_token_range(*tid); auto 
range = dht::to_partition_range(token_range); if (replica_it == tablet_info.replicas.end()) { testlog.info("tablet#{}: {} (no replica on h1)", *tid, token_range); excluded_ranges.emplace_back(std::move(range)); } else { testlog.info("tablet#{}: {} (shard {})", *tid, token_range, replica_it->shard); included_ranges.emplace_back(result{replica_it->shard, std::move(range)}); } } dht::ring_position_comparator cmp(*ss.schema()); auto check = [&] (const dht::partition_range_vector& ranges, std::vector expected_result, std::source_location sl = std::source_location::current()) { testlog.info("check() @ {}:{} ranges={}", sl.file_name(), sl.line(), ranges); locator::tablet_range_splitter range_splitter{ss.schema(), tmap, h1, ranges}; auto it = expected_result.begin(); while (auto range_opt = range_splitter()) { testlog.debug("result: shard={} range={}", range_opt->shard, range_opt->range); BOOST_REQUIRE(it != expected_result.end()); testlog.debug("expected: shard={} range={}", it->shard, it->range); BOOST_REQUIRE_EQUAL(it->shard, range_opt->shard); BOOST_REQUIRE(it->range.equal(range_opt->range, cmp)); ++it; } if (it != expected_result.end()) { while (it != expected_result.end()) { testlog.error("missing expected result: shard={} range={}", it->shard, it->range); ++it; } BOOST_FAIL("splitter didn't provide all expected ranges"); } }; auto check_single = [&] (const dht::partition_range& range, std::vector expected_result, std::source_location sl = std::source_location::current()) { dht::partition_range_vector ranges; ranges.reserve(1); ranges.push_back(std::move(range)); check(ranges, std::move(expected_result), sl); }; auto intersect = [&] (const dht::partition_range& range) { std::vector intersecting_ranges; for (const auto& included_range : included_ranges) { if (auto intersection = included_range.range.intersection(range, cmp)) { intersecting_ranges.push_back({included_range.shard, std::move(*intersection)}); } } return intersecting_ranges; }; auto check_intersection_single = [&] (const dht::partition_range& range, std::source_location sl = std::source_location::current()) { check_single(range, intersect(range), sl); }; auto check_intersection = [&] (const dht::partition_range_vector& ranges, std::source_location sl = std::source_location::current()) { std::vector expected_ranges; for (const auto& range : ranges) { auto res = intersect(range); std::move(res.begin(), res.end(), std::back_inserter(expected_ranges)); } std::sort(expected_ranges.begin(), expected_ranges.end(), [&] (const auto& a, const auto& b) { return !a.range.start() || b.range.before(a.range.start()->value(), cmp); }); check(ranges, expected_ranges, sl); }; check_single(dht::partition_range::make_open_ended_both_sides(), included_ranges); check(included_ranges | std::views::transform([&] (auto& r) { return r.range; }) | std::ranges::to(), included_ranges); check(excluded_ranges, {}); check_intersection_single({bound{dks[0], true}, bound{dks[1], false}}); check_intersection_single({bound{dks[0], false}, bound{dks[2], true}}); check_intersection_single({bound{dks[2], true}, bound{dks[3], false}}); check_intersection_single({bound{dks[0], false}, bound{dks[3], false}}); check_intersection_single(dht::partition_range::make_starting_with(bound(dks[2], true))); check_intersection_single(dht::partition_range::make_ending_with(bound(dks[1], false))); check_intersection_single(dht::partition_range::make_singular(dks[3])); check_intersection({ dht::partition_range::make_ending_with(bound(dks[0], false)), {bound{dks[1], true}, 
bound{dks[2], false}}, dht::partition_range::make_starting_with(bound(dks[3], true))}); check_intersection({ {bound{dks[0], true}, bound{dks[1], false}}, {bound{dks[1], true}, bound{dks[2], false}}, {bound{dks[2], true}, bound{dks[3], false}}}); } static locator::endpoint_dc_rack make_endpoint_dc_rack(gms::inet_address endpoint) { // This resembles rack_inferring_snitch dc/rack generation which is // still in use by this test via token_metadata internals auto dc = std::to_string(uint8_t(endpoint.bytes()[1])); auto rack = std::to_string(uint8_t(endpoint.bytes()[2])); return locator::endpoint_dc_rack{dc, rack}; } struct calculate_tablet_replicas_for_new_rf_config { struct ring_point { double point; inet_address host; host_id id = host_id::create_random_id(); }; std::vector ring_points; replication_strategy_config_options options; replication_strategy_config_options new_dc_rep_factor; std::map expected_rep_factor; }; static void execute_tablet_for_new_rf_test(calculate_tablet_replicas_for_new_rf_config const& test_config) { auto my_address = gms::inet_address("localhost"); // Create the RackInferringSnitch snitch_config cfg; cfg.listen_address = my_address; cfg.broadcast_address = my_address; cfg.name = "RackInferringSnitch"; sharded snitch; snitch.start(cfg).get(); auto stop_snitch = defer([&snitch] { snitch.stop().get(); }); snitch.invoke_on_all(&snitch_ptr::start).get(); static constexpr size_t tablet_count = 8; std::vector nodes_shard_count(test_config.ring_points.size(), 3); locator::token_metadata::config tm_cfg; tm_cfg.topo_cfg.this_endpoint = test_config.ring_points[0].host; tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() }; tm_cfg.topo_cfg.this_host_id = test_config.ring_points[0].id; locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg); auto stop_stm = deferred_stop(stm); // Initialize the token_metadata stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { auto& topo = tm.get_topology(); for (const auto& [ring_point, endpoint, id] : test_config.ring_points) { std::unordered_set tokens; tokens.insert(dht::token{tests::d2t(ring_point / test_config.ring_points.size())}); topo.add_or_update_endpoint(id, make_endpoint_dc_rack(endpoint), locator::node::state::normal, 1); co_await tm.update_normal_tokens(std::move(tokens), id); } }).get(); locator::replication_strategy_params params(test_config.options, tablet_count, std::nullopt); auto ars_ptr = abstract_replication_strategy::create_replication_strategy( "NetworkTopologyStrategy", params, stm.get()->get_topology()); auto tablet_aware_ptr = ars_ptr->maybe_as_tablet_aware(); BOOST_REQUIRE(tablet_aware_ptr); auto s = schema_builder("ks", "tb") .with_column("pk", utf8_type, column_kind::partition_key) .with_column("v", utf8_type) .build(); stm.mutate_token_metadata([&] (token_metadata& tm) { for (size_t i = 0; i < test_config.ring_points.size(); ++i) { auto& [ring_point, endpoint, id] = test_config.ring_points[i]; tm.update_topology(id, make_endpoint_dc_rack(endpoint), node::state::normal, nodes_shard_count[i]); } return make_ready_future<>(); }).get(); auto allocated_map = tablet_aware_ptr->allocate_tablets_for_new_table(s, stm.get(), tablet_count).get(); BOOST_REQUIRE_EQUAL(allocated_map.tablet_count(), tablet_count); auto host_id_to_dc = [&stm](const locator::host_id& ep) -> std::optional { auto node = stm.get()->get_topology().find_node(ep); if (node == nullptr) { return std::nullopt; } return node->dc_rack().dc; }; 
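// Editor's note (worked example, derived from make_endpoint_dc_rack() and host_id_to_dc() above):
// the rack-inferring scheme takes the dc from the second address byte and the rack from the
// third, so a ring point such as 192.100.10.1 resolves to dc "100", rack "10". This is why the
// test configs below key their replication options by strings like "100", "101" and "102".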
stm.mutate_token_metadata([&] (token_metadata& tm) { tablet_metadata tab_meta; auto table = s->id(); tab_meta.set_tablet_map(table, std::move(allocated_map)); tm.set_tablets(std::move(tab_meta)); return make_ready_future<>(); }).get(); std::map initial_rep_factor; for (auto const& [dc, shard_count] : test_config.options) { initial_rep_factor[dc] = locator::get_replication_factor(shard_count); } auto tablets = stm.get()->tablets().get_tablet_map(s->id()).clone_gently().get(); BOOST_REQUIRE_EQUAL(tablets.tablet_count(), tablet_count); for (auto tb : tablets.tablet_ids()) { const locator::tablet_info& ti = tablets.get_tablet_info(tb); std::map dc_replicas_count; for (const auto& r : ti.replicas) { auto dc = host_id_to_dc(r.host); if (dc) { dc_replicas_count[*dc]++; } } BOOST_REQUIRE_EQUAL(dc_replicas_count, initial_rep_factor); } try { tablet_map old_tablets = stm.get()->tablets().get_tablet_map(s->id()).clone_gently().get(); locator::replication_strategy_params params{test_config.new_dc_rep_factor, old_tablets.tablet_count(), std::nullopt}; auto new_strategy = abstract_replication_strategy::create_replication_strategy("NetworkTopologyStrategy", params, stm.get()->get_topology()); auto tmap = new_strategy->maybe_as_tablet_aware()->reallocate_tablets(s, stm.get(), std::move(old_tablets)).get(); auto const& ts = tmap.tablets(); BOOST_REQUIRE_EQUAL(ts.size(), tablet_count); for (auto tb : tmap.tablet_ids()) { const locator::tablet_info& ti = tmap.get_tablet_info(tb); std::map dc_replicas_count; for (const auto& r : ti.replicas) { auto dc = host_id_to_dc(r.host); if (dc) { dc_replicas_count[*dc]++; } } BOOST_REQUIRE_EQUAL(dc_replicas_count, test_config.expected_rep_factor); } } catch (exceptions::configuration_exception const& e) { thread_local boost::regex re( "Datacenter [0-9]+ doesn't have enough token-owning nodes for replication_factor=[0-9]+"); boost::cmatch what; if (!boost::regex_search(e.what(), what, re)) { BOOST_FAIL("Unexpected exception: " + std::string(e.what())); } } catch (std::exception const& e) { BOOST_FAIL("Unexpected exception: " + std::string(e.what())); } catch (...) { BOOST_FAIL("Unexpected exception"); } } SEASTAR_THREAD_TEST_CASE(test_ensure_node_for_load_sketch) { // This test reproduces the balancer crash when a node is drained and there are more than one // empty destination node. If one of these destination nodes has a lower capacity than the other, // and the initial target node selected is the one with lower capacity, pick_candidate() will then // change the target node to the one with higher capacity. The problem is that // load_sketch::get_least_loaded_shard() and consequently load_sketch::ensure_node() have not yet // been called for the new, larger target (only for the initial, smaller one), and load_sketch will // not have the larger node in its _nodes member hash map. This will cause an std::out_of_range // exception when load_sketch::pick() is called with the host_id of the larger node.
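// Editor's summary of the concrete repro below (names follow the code; the capacities are the
// ones set by the test): host1 (50 GiB) is drained while host2 (70 GiB) and host3 (60 GiB) are
// empty destinations. If the balancer first targets the smaller empty node and pick_candidate()
// later switches to the larger one, load_sketch::pick() is invoked for a host that ensure_node()
// never registered, which is the failure mode this test guards against.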
do_with_cql_env_thread([] (auto& e) { logging::logger_registry().set_logger_level("load_balancer", logging::log_level::debug); topology_builder topo(e); auto host1 = topo.add_node(node_state::normal, 2); const uint64_t node1_capacity = 50UL * 1024UL * 1024UL * 1024UL; topo.get_shared_load_stats().set_capacity(host1, node1_capacity); auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, 4); add_table(e, ks_name).get(); auto host2 = topo.add_node(node_state::normal, 2); const uint64_t node2_capacity = 70UL * 1024UL * 1024UL * 1024UL; topo.get_shared_load_stats().set_capacity(host2, node2_capacity); auto host3 = topo.add_node(node_state::normal, 2); const uint64_t node3_capacity = 60UL * 1024UL * 1024UL * 1024UL; topo.get_shared_load_stats().set_capacity(host3, node3_capacity); topo.set_node_state(host1, node_state::removing); auto& talloc = e.get_tablet_allocator().local(); auto& stm = e.shared_token_metadata().local(); talloc.balance_tablets(stm.get(), nullptr, nullptr, topo.get_shared_load_stats().get()).get(); }).get(); } SEASTAR_THREAD_TEST_CASE(test_calculate_tablet_replicas_for_new_rf_upsize_one_dc) { calculate_tablet_replicas_for_new_rf_config config; config.ring_points = { { 1.0, inet_address("192.100.10.1") }, { 4.0, inet_address("192.100.20.1") }, { 7.0, inet_address("192.100.30.1") }, }; config.options = {{"100", "2"}}; config.new_dc_rep_factor = {{"100", "3"}}; config.expected_rep_factor = {{"100", 3}}; execute_tablet_for_new_rf_test(config); } SEASTAR_THREAD_TEST_CASE(test_calculate_tablet_replicas_for_new_rf_downsize_one_dc) { calculate_tablet_replicas_for_new_rf_config config; config.ring_points = { { 1.0, inet_address("192.100.10.1") }, { 4.0, inet_address("192.100.20.1") }, { 7.0, inet_address("192.100.30.1") }, }; config.options = {{"100", "3"}}; config.new_dc_rep_factor = {{"100", "2"}}; config.expected_rep_factor = {{"100", 2}}; execute_tablet_for_new_rf_test(config); } SEASTAR_THREAD_TEST_CASE(test_calculate_tablet_replicas_for_new_rf_no_change_one_dc) { calculate_tablet_replicas_for_new_rf_config config; config.ring_points = { { 1.0, inet_address("192.100.10.1") }, { 4.0, inet_address("192.100.20.1") }, { 7.0, inet_address("192.100.30.1") }, }; config.options = {{"100", "3"}}; config.new_dc_rep_factor = {{"100", "3"}}; config.expected_rep_factor = {{"100", 3}}; execute_tablet_for_new_rf_test(config); } SEASTAR_THREAD_TEST_CASE(test_calculate_tablet_replicas_for_new_rf) { calculate_tablet_replicas_for_new_rf_config config; config.ring_points = { { 1.0, inet_address("192.100.10.1") }, { 2.0, inet_address("192.101.10.1") }, { 3.0, inet_address("192.102.10.1") }, { 4.0, inet_address("192.100.20.1") }, { 5.0, inet_address("192.101.20.1") }, { 6.0, inet_address("192.102.20.1") }, { 7.0, inet_address("192.100.30.1") }, { 8.0, inet_address("192.101.30.1") }, { 9.0, inet_address("192.102.30.1") }, { 10.0, inet_address("192.101.40.1") }, { 11.0, inet_address("192.102.40.1") }, { 12.0, inet_address("192.102.40.2") } }; config.options = { {"100", "3"}, {"101", "2"}, {"102", "3"} }; config.new_dc_rep_factor = { {"100", "3"}, {"101", "4"}, {"102", "2"} }; config.expected_rep_factor = { {"100", 3}, {"101", 4}, {"102", 2} }; execute_tablet_for_new_rf_test(config); } SEASTAR_THREAD_TEST_CASE(test_calculate_tablet_replicas_for_new_rf_not_enough_nodes) { calculate_tablet_replicas_for_new_rf_config config; config.ring_points = { { 1.0, inet_address("192.100.10.1") }, { 4.0, inet_address("192.100.20.1") }, { 7.0, inet_address("192.100.30.1") }, }; config.options = {{"100", "3"}}; 
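// Editor's note: raising the RF to 5 in a DC that has only 3 token-owning nodes is expected to
// make reallocate_tablets() throw a configuration_exception matching the "doesn't have enough
// token-owning nodes" pattern, which execute_tablet_for_new_rf_test() accepts; the
// expected_rep_factor of 3 below is only checked when no exception is raised.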
config.new_dc_rep_factor = {{"100", "5"}}; config.expected_rep_factor = {{"100", 3}}; execute_tablet_for_new_rf_test(config); } SEASTAR_THREAD_TEST_CASE(test_calculate_tablet_replicas_for_new_rf_one_dc) { calculate_tablet_replicas_for_new_rf_config config; config.ring_points = { { 1.0, inet_address("192.100.10.1") }, { 4.0, inet_address("192.100.20.1") }, { 7.0, inet_address("192.100.30.1") }, }; config.options = {{"100", "2"}}; config.new_dc_rep_factor = {{"100", "3"}}; config.expected_rep_factor = {{"100", 3}}; execute_tablet_for_new_rf_test(config); } SEASTAR_THREAD_TEST_CASE(test_calculate_tablet_replicas_for_new_rf_one_dc_1_to_2) { calculate_tablet_replicas_for_new_rf_config config; config.ring_points = { { 1.0, inet_address("192.100.10.1") }, { 4.0, inet_address("192.100.20.1") }, }; config.options = {{"100", "1"}}; config.new_dc_rep_factor = {{"100", "2"}}; config.expected_rep_factor = {{"100", 2}}; execute_tablet_for_new_rf_test(config); } SEASTAR_THREAD_TEST_CASE(test_calculate_tablet_replicas_for_new_rf_one_dc_not_enough_nodes) { calculate_tablet_replicas_for_new_rf_config config; config.ring_points = { { 1.0, inet_address("192.100.10.1") }, { 4.0, inet_address("192.100.10.2") }, { 7.0, inet_address("192.100.10.3") }, }; config.options = {{"100", "3"}}; config.new_dc_rep_factor = {{"100", "5"}}; config.expected_rep_factor = {{"100", 3}}; execute_tablet_for_new_rf_test(config); } SEASTAR_THREAD_TEST_CASE(test_calculate_tablet_replicas_for_new_rf_default_rf) { calculate_tablet_replicas_for_new_rf_config config; config.ring_points = { { 1.0, inet_address("192.100.10.1") }, { 2.0, inet_address("192.101.10.1") }, { 3.0, inet_address("192.102.10.1") }, { 4.0, inet_address("192.100.20.1") }, { 5.0, inet_address("192.101.20.1") }, { 6.0, inet_address("192.102.20.1") }, { 7.0, inet_address("192.100.30.1") }, { 8.0, inet_address("192.101.30.1") }, { 9.0, inet_address("192.102.30.1") }, { 10.0, inet_address("192.100.40.1") }, { 11.0, inet_address("192.101.40.1") }, { 12.0, inet_address("192.102.40.1") }, { 13.0, inet_address("192.102.40.2") } }; config.options = { {"100", "3"}, {"101", "2"}, {"102", "2"} }; config.new_dc_rep_factor = { {"100", "4"}, {"101", "3"}, {"102", "3"}, }; config.expected_rep_factor = { {"100", 4}, {"101", 3}, {"102", 3}, }; execute_tablet_for_new_rf_test(config); } SEASTAR_THREAD_TEST_CASE(test_calculate_tablet_replicas_for_new_rf_default_rf_upsize_by_two) { calculate_tablet_replicas_for_new_rf_config config; config.ring_points = { { 1.0, inet_address("192.100.10.1") }, { 2.0, inet_address("192.101.10.1") }, { 3.0, inet_address("192.102.10.1") }, { 4.0, inet_address("192.100.20.1") }, { 5.0, inet_address("192.101.20.1") }, { 6.0, inet_address("192.102.20.1") }, { 7.0, inet_address("192.100.30.1") }, { 8.0, inet_address("192.101.30.1") }, { 9.0, inet_address("192.102.30.1") }, { 10.0, inet_address("192.100.40.1") }, { 11.0, inet_address("192.101.40.1") }, { 12.0, inet_address("192.102.40.1") }, { 13.0, inet_address("192.102.40.2") } }; config.options = { {"100", "3"}, {"101", "2"}, {"102", "1"} }; config.new_dc_rep_factor = { {"100", "4"}, {"101", "3"}, {"102", "3"}, }; config.expected_rep_factor = { {"100", 4}, {"101", 3}, {"102", 3}, }; execute_tablet_for_new_rf_test(config); } SEASTAR_TEST_CASE(test_tablet_count_metric) { auto cfg = tablet_cql_test_config(); for (unsigned n = 1; n <= smp::count; n *= 2) { cfg.initial_tablets = n; } return do_with_cql_env_thread([cfg] (cql_test_env& e) { auto tid = add_table(e).get(); auto total = e.db().map_reduce0([&] 
(replica::database& db) { auto count = db.find_column_family(tid).get_stats().tablet_count; testlog.debug("shard table_count={}", count); return count; }, int64_t(0), std::plus()).get(); BOOST_REQUIRE_EQUAL(total, cfg.initial_tablets); }, cfg); } SEASTAR_TEST_CASE(test_cleanup_of_deallocated_tablet) { auto cfg = tablet_cql_test_config(); cfg.initial_tablets = 1; return do_with_cql_env_thread([](cql_test_env& e) { // Create a table. e.execute_cql("create table ks.cf (pk int, ck int, primary key (pk, ck))").get(); size_t all_tablets = 0; // Double cleanup the tablet. e.db().invoke_on_all([&] (replica::database& db) -> future<> { auto& cf = db.find_column_family("ks", "cf"); auto& sys_ks = e.get_system_keyspace().local(); auto tablet_count = cf.get_stats().tablet_count; all_tablets += tablet_count; if (tablet_count > 0) { co_await cf.cleanup_tablet(db, sys_ks, locator::tablet_id(0)); co_await cf.cleanup_tablet(db, sys_ks, locator::tablet_id(0)); } }).get(); assert(all_tablets); }, cfg); } namespace { future<> test_create_keyspace(sstring ks_name, std::optional tablets_opt, const cql_test_config& cfg, uint64_t initial_tablets = 0, sstring replication_strategy = "NetworkTopologyStrategy") { co_await do_with_cql_env_thread([&] (cql_test_env& e) { sstring extra; if (tablets_opt) { if (*tablets_opt) { if (initial_tablets) { extra = format(" and tablets = {{ 'initial' : {} }}", initial_tablets); } else { extra = " and tablets = { 'enabled' : true }"; } } else { extra = " and tablets = { 'enabled' : false }"; } } auto q = format("create keyspace {} with replication = {{ 'class' : '{}', 'replication_factor' : 1 }}{};", ks_name, replication_strategy, extra); testlog.debug("{}", q); e.execute_cql(q).get(); BOOST_REQUIRE(e.local_db().has_keyspace(ks_name)); auto tid = add_table(e, ks_name).get(); auto total = e.db().map_reduce0([&] (replica::database& db) { auto count = db.find_column_family(tid).get_stats().tablet_count; testlog.debug("shard table_count={}", count); return count; }, int64_t(0), std::plus()).get(); if (tablets_opt.value_or(cfg.db_config->enable_tablets_by_default())) { if (initial_tablets) { BOOST_REQUIRE_EQUAL(total, initial_tablets); } else { BOOST_REQUIRE_GT(total, 0); } } else { BOOST_REQUIRE_EQUAL(total, 0); } }, cfg); } } // Test that tablets can be explicitly enabled // when creating a keyspace when the `tablets_mode_for_new_keyspaces` // configuration option is set to `disabled`. 
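// Editor's illustration (a sketch of the CQL that test_create_keyspace() above ends up issuing;
// the keyspace name is just an example):
//   create keyspace ks with replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }
//       and tablets = { 'enabled' : true };    -- explicit enable
//   create keyspace ks with replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }
//       and tablets = { 'initial' : 128 };     -- enable with an initial tablet count
//   create keyspace ks with replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }
//       and tablets = { 'enabled' : false };   -- explicit disable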
SEASTAR_TEST_CASE(test_explicit_tablets_enable) { auto cfg = tablet_cql_test_config(db::tablets_mode_t::mode::disabled); // By default tablets are disabled co_await test_create_keyspace("test_default_settings", std::nullopt, cfg); // Tablets can be explicitly enabled for a new keyspace co_await test_create_keyspace("test_explictly_enabled_0", true, cfg, 0); co_await test_create_keyspace("test_explictly_enabled_128", true, cfg, 128); // Tablets can also be explicitly disabled for a new keyspace co_await test_create_keyspace("test_explictly_disabled", false, cfg); // Replication strategies that do not support tablets cannot be used when tablets are explicitly enabled for (const auto& [rs_desc, rs_type] : db::replication_strategy_restriction_t::map()) { if (rs_type != locator::replication_strategy_type::network_topology) { auto f = co_await coroutine::as_future(test_create_keyspace("test_unsupported_replication_strategy", true, cfg, 0, rs_desc)); BOOST_REQUIRE_THROW(f.get(), exceptions::configuration_exception); } } } // Test that tablets can be explicitly disabled // when creating a keyspace when the `tablets_mode_for_new_keyspaces` // configuration option is set to `enabled`. SEASTAR_TEST_CASE(test_explicit_tablets_disable) { auto cfg = tablet_cql_test_config(db::tablets_mode_t::mode::enabled); // By default tablets are enabled co_await test_create_keyspace("test_default_settings", std::nullopt, cfg); // Tablets can be explicitly disabled for a new keyspace co_await test_create_keyspace("test_explictly_disabled", false, cfg); // Tablets can also be explicitly enabled for a new keyspace co_await test_create_keyspace("test_explictly_enabled_0", true, cfg, 0); co_await test_create_keyspace("test_explictly_enabled_128", true, cfg, 128); } // Test that tablets cannot be explicitly disabled // when creating a keyspace when the `tablets_mode_for_new_keyspaces` // configuration option is set to `enforced`.
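// Editor's summary of the three tablets_mode_for_new_keyspaces values exercised by these tests:
//   disabled - new keyspaces default to vnode-based replication, but tablets may still be opted into per keyspace;
//   enabled  - new keyspaces default to tablets, but they may still be opted out per keyspace;
//   enforced - tablets are mandatory, and 'enabled': false is rejected with a
//              configuration_exception, as the test below verifies.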
SEASTAR_TEST_CASE(test_enforce_tablets) { auto cfg = tablet_cql_test_config(db::tablets_mode_t::mode::enforced); // By default tablets are enabled co_await test_create_keyspace("test_default_settings", std::nullopt, cfg); // Tablets cannot be explicitly disabled for a new keyspace auto f = co_await coroutine::as_future(test_create_keyspace("test_not_explictly_disabled", false, cfg)); BOOST_REQUIRE_THROW(f.get(), exceptions::configuration_exception); // Replication strategies that do not support tablets cannot be used when tablets are explicitly enabled for (const auto& [rs_desc, rs_type] : db::replication_strategy_restriction_t::map()) { if (rs_type != locator::replication_strategy_type::network_topology) { auto f = co_await coroutine::as_future(test_create_keyspace("test_unsupported_replication_strategy", true, cfg, 0, rs_desc)); BOOST_REQUIRE_THROW(f.get(), exceptions::configuration_exception); } } } SEASTAR_TEST_CASE(test_recognition_of_deprecated_name_for_resize_transition) { using transition_state = service::topology::transition_state; BOOST_REQUIRE_EQUAL(service::transition_state_from_string("tablet split finalization"), transition_state::tablet_split_finalization); BOOST_REQUIRE_EQUAL(service::transition_state_from_string("tablet resize finalization"), transition_state::tablet_resize_finalization); return make_ready_future<>(); } SEASTAR_THREAD_TEST_CASE(test_tablets_describe_ring) { auto cfg = tablet_cql_test_config(db::tablets_mode_t::mode::enforced); do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); auto& db = e.local_db(); auto& ss = e.get_storage_service().local(); auto& gossiper = ss.gossiper(); auto& am = gossiper.get_mutable_address_map(); size_t num_racks = 3; size_t nodes_per_rack = 10; size_t shards_per_node = 8; std::vector racks; auto min_tablet_count = 10240; auto& cfg = e.db_config(); cfg.tablets_per_shard_goal(2 * min_tablet_count / (nodes_per_rack * shards_per_node)); racks.push_back(topo.rack()); for (size_t i = 1; i < num_racks; ++i) { racks.push_back(topo.start_new_rack()); } for (size_t i = 0; i < num_racks; ++i) { for (size_t j = 0; j < nodes_per_rack; ++j) { auto id = topo.add_node(node_state::normal, shards_per_node, racks[i]); auto addr = topo.host_addresses().at(id); am.add_or_update_entry(id, addr); } } auto ks = add_keyspace(e, {{topo.dc(), num_racks}}, num_racks * nodes_per_rack); auto table = add_table(e, ks, std::map({{"min_tablet_count", std::to_string(min_tablet_count)}})).get(); auto s = db.find_schema(table); auto ring = ss.describe_ring_for_table(s->ks_name(), s->cf_name()).get(); BOOST_REQUIRE_GE(ring.size(), min_tablet_count); }, cfg).get(); } SEASTAR_THREAD_TEST_CASE(test_tablet_auto_repair_rf1) { cql_test_config cfg_in; cfg_in.db_config->auto_repair_enabled_default(true); cfg_in.db_config->auto_repair_threshold_default_in_seconds(1); do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); unsigned shard_count = 1; auto dc1 = topo.dc(); auto rack1 = topo.rack(); [[maybe_unused]] auto host1 = topo.add_node(node_state::normal, shard_count); auto rack2 = topo.start_new_rack(); [[maybe_unused]] auto host2 = topo.add_node(node_state::normal, shard_count); auto ks_name = add_keyspace(e, {{dc1, 1}}, 1); auto table1 = add_table(e, ks_name).get(); tablet_id tablet{0}; mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(1); auto tid = tmap.first_tablet(); tablet = tid; tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica{host1, 0}, } }); tmeta.set_tablet_map(table1, std::move(tmap)); 
co_return; }); auto& stm = e.shared_token_metadata().local(); bool once = false; rebalance_tablets(e, nullptr, {}, [&once] (const migration_plan& plan) { return std::exchange(once, true); }); BOOST_REQUIRE(stm.get()->tablets().get_tablet_map(table1).get_tablet_transition_info(tablet) == nullptr); }, std::move(cfg_in)).get(); } void run_tablet_manual_repair_rf1(cql_test_env& e) { topology_builder topo(e); unsigned shard_count = 1; auto dc1 = topo.dc(); auto rack1 = topo.rack(); [[maybe_unused]] auto host1 = topo.add_node(node_state::normal, shard_count); auto rack2 = topo.start_new_rack(); [[maybe_unused]] auto host2 = topo.add_node(node_state::normal, shard_count); auto ks_name = add_keyspace(e, {{dc1, 1}}, 1); auto table1 = add_table(e, ks_name).get(); tablet_id tablet{0}; mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> { tablet_map tmap(1); auto tid = tmap.first_tablet(); tablet = tid; tablet_info ti{ tablet_replica_set { tablet_replica{host1, 0}, } }; ti.repair_task_info = ti.repair_task_info.make_user_repair_request(); tmap.set_tablet(tid, std::move(ti)); tmeta.set_tablet_map(table1, std::move(tmap)); co_return; }); auto& stm = e.shared_token_metadata().local(); bool once = false; rebalance_tablets(e, nullptr, {}, [&once] (const migration_plan& plan) { return std::exchange(once, true); }); BOOST_REQUIRE(stm.get()->tablets().get_tablet_map(table1).get_tablet_transition_info(tablet)->transition == tablet_transition_kind::repair); } SEASTAR_THREAD_TEST_CASE(test_tablet_manual_repair_rf1_auto_repair_off) { cql_test_config cfg_in; cfg_in.db_config->auto_repair_enabled_default(false); do_with_cql_env_thread(run_tablet_manual_repair_rf1, std::move(cfg_in)).get(); } SEASTAR_THREAD_TEST_CASE(test_tablet_manual_repair_rf1_auto_repair_on) { cql_test_config cfg_in; cfg_in.db_config->auto_repair_enabled_default(true); do_with_cql_env_thread(run_tablet_manual_repair_rf1, std::move(cfg_in)).get(); } // Test for tablet_map::get_secondary_replica() and specifically how it // relates to get_primary_replica(). // We never officially documented given a list of replicas, which replica // is to be considered the "primary" - it's not simply the first replica in // the list but the first in some reshuffling of the list, reshuffling whose // details changed in commits like 817fdad and d88036d. So this patch doesn't // enshrine what get_primary_replica() or get_secondary_replica() should // return. It just verifies that get_secondary_replica() returns a *different* // replica than get_primary_replica() if there are 2 or more replicas, or // throws an error when there's just one replica. // Reproduces SCYLLADB-777. SEASTAR_THREAD_TEST_CASE(test_get_secondary_replica) { auto h1 = host_id(utils::UUID_gen::get_time_UUID()); auto h2 = host_id(utils::UUID_gen::get_time_UUID()); auto h3 = host_id(utils::UUID_gen::get_time_UUID()); locator::topology::config cfg = { .this_endpoint = inet_address("127.0.0.1"), .this_host_id = h1, .local_dc_rack = endpoint_dc_rack::default_location, }; auto topo = locator::topology(cfg); topo.add_or_update_endpoint(h1, endpoint_dc_rack::default_location, node::state::normal); topo.add_or_update_endpoint(h2, endpoint_dc_rack::default_location, node::state::normal); topo.add_or_update_endpoint(h3, endpoint_dc_rack::default_location, node::state::normal); // With 1 replica, get_secondary_replica should throw. 
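// (Editor's note: with a single replica, any reshuffling of the replica list still yields that
// same replica first, so there is no distinct secondary to return and the call below is expected
// to throw std::runtime_error.)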
{ tablet_map tmap(1); auto tid = tmap.first_tablet(); tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {h1, 0}, } }); BOOST_REQUIRE_THROW(tmap.get_secondary_replica(tid, topo), std::runtime_error); } // With 2 replicas, get_secondary_replica should return a different replica // than get_primary_replica for every tablet. { tablet_map tmap(4); for (auto tid : tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {h1, 0}, tablet_replica {h2, 0}, } }); } for (auto tid : tmap.tablet_ids()) { auto primary = tmap.get_primary_replica(tid, topo); auto secondary = tmap.get_secondary_replica(tid, topo); BOOST_REQUIRE(primary != secondary); } } // With 3 replicas, same check. { tablet_map tmap(4); for (auto tid : tmap.tablet_ids()) { tmap.set_tablet(tid, tablet_info { tablet_replica_set { tablet_replica {h1, 0}, tablet_replica {h2, 0}, tablet_replica {h3, 0}, } }); } for (auto tid : tmap.tablet_ids()) { auto primary = tmap.get_primary_replica(tid, topo); auto secondary = tmap.get_secondary_replica(tid, topo); BOOST_REQUIRE(primary != secondary); } } topo.clear_gently().get(); } // The purpose of this test is to emulate a tablet aware restore process // When a snapshot is taken, load balancing is disabled, so we record the tablet count in a manifest for backup. // During restore, we set both min_tablet_count and max_tablet_count hints to the same value // The test makes sure that during restore the tablet count <= max_tablet_count hint which // allows us to leverage file-based streaming of SSTables, ensuring each SSTable is fully contained within a single tablet. SEASTAR_THREAD_TEST_CASE(test_tablet_count_fixed_by_table_properties) { auto cfg = tablet_cql_test_config(); cfg.db_config->tablets_per_shard_goal(16); do_with_cql_env_thread([&cfg] (auto& e) { topology_builder topo(e); auto dc = topo.dc(); topo.add_node(node_state::normal, 1); // keyspace 'initial' wants 8 tablets. We want to make sure that initial tablet count is greater than max_tablet_count hint // to ensure that the hint is respected. 
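// Editor's illustration of the restore-style pinning exercised below: recreating the table with
//   CREATE TABLE <ks>.table1 (p1 text, r1 int, PRIMARY KEY (p1))
//       WITH tablets = {'min_tablet_count': 4, 'max_tablet_count': 4}
// pins the tablet count to 4, so file-based streaming can assume each SSTable is fully contained
// within a single tablet.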
auto ks_name1 = add_keyspace(e, {{dc, 1}}, 8); // Step 1: Create a table e.execute_cql(fmt::format("CREATE TABLE {}.table1 (p1 text, r1 int, PRIMARY KEY (p1))", ks_name1)).get(); auto table1 = e.local_db().find_schema(ks_name1, "table1")->id(); auto& stm = e.shared_token_metadata().local(); auto get_tablet_count = [&] { auto tm = stm.get(); return tm->tablets().get_tablet_map(table1).tablet_count(); }; shared_load_stats& load_stats = topo.get_shared_load_stats(); load_stats.set_size(table1, 0); rebalance_tablets(e, &load_stats); BOOST_REQUIRE_EQUAL(get_tablet_count(), 8); // Step 2: Drop the table e.execute_cql(fmt::format("DROP TABLE {}.table1", ks_name1)).get(); // Step 3: Create the same table with min_tablet_count=4 and max_tablet_count=4 auto force_tablet_count = 4; e.execute_cql(fmt::format("CREATE TABLE {}.table1 (p1 text, r1 int, PRIMARY KEY (p1)) " "WITH tablets = {{'min_tablet_count': {}, 'max_tablet_count': {}}}", ks_name1, force_tablet_count, force_tablet_count)).get(); // We need to fetch the table again as the previous table was dropped and recreated table1 = e.local_db().find_schema(ks_name1, "table1")->id(); // Initially the table will be empty load_stats.set_size(table1, 0); // Step 4: Make sure the tablet count is equal to force_tablet_count BOOST_REQUIRE_EQUAL(get_tablet_count(), force_tablet_count); // Step 5: Increase the load and make sure tablet count remains equal to force_tablet_count load_stats.set_size(table1, default_target_tablet_size * force_tablet_count * 128); rebalance_tablets(e, &load_stats); BOOST_REQUIRE_EQUAL(get_tablet_count(), force_tablet_count); // This should force tablets to be merged; the max_tablet_count hint will no longer be respected. cfg.db_config->tablets_per_shard_goal(force_tablet_count / 2); rebalance_tablets(e, &load_stats); BOOST_REQUIRE_LE(get_tablet_count(), force_tablet_count); }, cfg).get(); } SEASTAR_THREAD_TEST_CASE(test_tablet_options_min_and_max_tablet_count) { auto cfg = tablet_cql_test_config(); cfg.db_config->tablets_per_shard_goal(16); do_with_cql_env_thread([] (auto& e) { topology_builder topo(e); auto dc = topo.dc(); topo.add_node(node_state::normal, 1); auto ks_name1 = add_keyspace(e, {{dc, 1}}, 8); // Test valid combinations { // min=64, max=128 - both powers of 2, should work BOOST_CHECK_NO_THROW(e.execute_cql(fmt::format( "CREATE TABLE {}.table1 (p1 text, r1 int, PRIMARY KEY (p1)) " "WITH tablets = {{'min_tablet_count': 64, 'max_tablet_count': 128}}", ks_name1)).get()); } { // min=100, max=200 - rounds to min=128, max=128, should work BOOST_CHECK_NO_THROW(e.execute_cql(fmt::format( "CREATE TABLE {}.table2 (p1 text, r1 int, PRIMARY KEY (p1)) " "WITH tablets = {{'min_tablet_count': 100, 'max_tablet_count': 200}}", ks_name1)).get()); } // Test invalid combinations that should throw exceptions { // min=100, max=100 - rounds to min=128, max=64, invalid BOOST_CHECK_EXCEPTION(e.execute_cql(fmt::format( "CREATE TABLE {}.table3 (p1 text, r1 int, PRIMARY KEY (p1)) " "WITH tablets = {{'min_tablet_count': 100, 'max_tablet_count': 100}}", ks_name1)).get(), exceptions::configuration_exception, [&](const exceptions::configuration_exception& e) { const auto msg = sstring(e.what()); return msg.contains("Invalid tablet count range"); }); } { // min=65, max=127 - rounds to min=128, max=64, invalid BOOST_CHECK_EXCEPTION(e.execute_cql(fmt::format( "CREATE TABLE {}.table4 (p1 text, r1 int, PRIMARY KEY (p1)) " "WITH tablets = {{'min_tablet_count': 65, 'max_tablet_count': 127}}", ks_name1)).get(), exceptions::configuration_exception, [&](const
exceptions::configuration_exception& e) { const auto msg = sstring(e.what()); return msg.contains("Invalid tablet count range"); }); } { // min=129, max=128 - even without rounding, min > max is invalid BOOST_CHECK_EXCEPTION(e.execute_cql(fmt::format( "CREATE TABLE {}.table6 (p1 text, r1 int, PRIMARY KEY (p1)) " "WITH tablets = {{'min_tablet_count': 129, 'max_tablet_count': 128}}", ks_name1)).get(), exceptions::configuration_exception, [&](const exceptions::configuration_exception& e) { const auto msg = sstring(e.what()); return msg.contains("Invalid tablet count range"); }); } }, cfg).get(); } BOOST_AUTO_TEST_SUITE_END()