/* * Copyright (C) 2025-present ScyllaDB * */ /* * SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0) */ #pragma once #include "cql_test_env.hh" #include "locator/topology.hh" #include "gms/inet_address.hh" #include "service/topology_mutation.hh" #include "service/topology_state_machine.hh" #include "service/raft/raft_group0_client.hh" #include "service/tablet_allocator_fwd.hh" #include "locator/host_id.hh" #include "locator/tablets.hh" #include "test/lib/log.hh" #include "version.hh" #include struct shared_load_stats { locator::load_stats stats; locator::load_stats_ptr get() const { return make_lw_shared(stats); } void set_size(table_id table, size_t size_in_bytes) { stats.tables[table].size_in_bytes = size_in_bytes; } void set_split_ready_seq_number(table_id table, size_t seq_number) { stats.tables[table].split_ready_seq_number = seq_number; } void set_capacity(locator::host_id host, size_t capacity) { stats.capacity[host] = capacity; } }; /// Modifies topology inside a given cql_test_env. /// Local node's membership is not affected, but it belongs to a different DC than those produced by this builder. /// /// Creating the builder locks the topology state machine so there are no concurrent topology operations /// and load balancing. /// The built topology is not removed when the builder is destroyed and the state machine is left locked. /// /// All methods expect to be run in a seastar thread. /// /// Examples usage: /// /// topology_builder topo(e); /// auto host1 = topo.add_node(); // dc1 rack1 /// auto host2 = topo.add_node(); // dc1 rack1 /// topo.start_new_dc(); /// auto host3 = topo.add_node(); // dc2 rack1 /// auto host4 = topo.add_node(); // dc2 rack1 /// topo.start_new_rack(); /// auto host5 = topo.add_node(); // dc2 rack2 /// auto host6 = topo.add_node(); // dc2 rack2 /// class topology_builder { public: using inet_address = locator::inet_address; using endpoint_dc_rack = locator::endpoint_dc_rack; private: cql_test_env& _env; int _nr_nodes = 0; int _dc_id; int _rack_id; sstring _dc; sstring _rack; shared_load_stats _load_stats; std::vector _hosts; std::unordered_map _host_addresses; private: inet_address make_node_address(int n) { assert(n > 0); int a = n % 256; n /= 256; int b = n % 256; n /= 256; assert(n < 256); return inet_address(fmt::format("10.{}.{}.{}", n, b, a)); } // Locks the topology to prevent concurrent topology operations and load balancing. // Setting transition_state to "lock" blocks background load-balancing which could interfere with the test // and prevents errors from load_topology_state() complaining about nodes in transition with no transition state. void lock_topology() { abort_source as; auto& client = _env.get_raft_group0_client(); service::topology& topo = _env.get_topology_state_machine().local()._topology; while (true) { if (topo.tstate && *topo.tstate == service::topology::transition_state::lock) { testlog.info("Topology is locked"); return; } auto guard = client.start_operation(as).get(); if (topo.tstate) { testlog.info("Waiting for topology state machine to be idle"); release_guard(std::move(guard)); _env.get_topology_state_machine().local().await_not_busy().get(); testlog.info("Woken up"); continue; } service::topology_change change({service::topology_mutation_builder(guard.write_timestamp()) .set_transition_state(service::topology::transition_state::lock) .build()}); service::group0_command g0_cmd = client.prepare_command(std::move(change), guard, "locking topology"); try { client.add_entry(std::move(g0_cmd), std::move(guard), as).get(); } catch (service::group0_concurrent_modification&) { testlog.info("Concurrent modification detected while locking topology, retrying"); } } } public: topology_builder(cql_test_env& e) : _env(e) { start_new_dc(); lock_topology(); } // Returns a new token from some sequence of unique tokens. // Uniqueness is in the scope of the process, not just this object. dht::token new_token() { static std::atomic next_token = 1; return dht::token(next_token.fetch_add(1)); } // Returns the name of the currently built DC. const sstring& dc() const { return _dc; } // Returns location of the currently built rack. endpoint_dc_rack rack() const { return {_dc, _rack}; } // Starts building a new rack in the current DC. // Returns location of the new rack. endpoint_dc_rack start_new_rack() { _rack = fmt::format("rack{}{:c}", _dc_id, 'a' + _rack_id++); return rack(); } // Starts building a new rack in the current DC. // Returns location of the new rack. endpoint_dc_rack start_new_rack(sstring rack_name) { _rack = std::move(rack_name); return rack(); } // Starts building a new DC. // DC is named uniquely in the scope of the process, not just this object. endpoint_dc_rack start_new_dc() { static std::atomic next_id = 1; _dc_id = next_id.fetch_add(1); _dc = fmt::format("dc{}", _dc_id); _rack_id = 0; return start_new_rack(); } // Starts building a new DC. endpoint_dc_rack start_new_dc(endpoint_dc_rack dc_and_rack) { _dc = dc_and_rack.dc; _rack = dc_and_rack.rack; return rack(); } locator::load_stats_ptr get_load_stats() const { return _load_stats.get(); } shared_load_stats& get_shared_load_stats() { return _load_stats; } /// Returns total cluster's storage capacity in bytes. uint64_t get_capacity() const { uint64_t cap = 0; auto stats = get_load_stats(); for (auto h : _hosts) { cap += stats->capacity.at(h); } return cap; } locator::host_id add_node(service::node_state state = service::node_state::normal, unsigned shard_count = 1, std::optional rack_override = {}) { ++_nr_nodes; auto ip = make_node_address(_nr_nodes); auto id = locator::host_id(utils::UUID_gen::get_time_UUID()); auto dc_rack = rack_override.value_or(rack()); dht::token token = new_token(); std::unordered_set tokens({token}); abort_source as; auto& client = _env.get_raft_group0_client(); _load_stats.set_capacity(id, service::default_target_tablet_size * shard_count); while (true) { auto guard = client.start_operation(as).get(); service::topology_mutation_builder builder(guard.write_timestamp()); builder.with_node(raft::server_id(id.uuid())) .set("datacenter", dc_rack.dc) .set("rack", dc_rack.rack) .set("node_state", state) .set("shard_count", (uint32_t) shard_count) .set("cleanup_status", service::cleanup_status::clean) .set("release_version", version::release()) .set("num_tokens", (uint32_t) 1) .set("tokens_string", fmt::format("{}", token)) .set("tokens", tokens) .set("supported_features", std::set()) .set("request_id", utils::UUID()) .set("ignore_msb", (uint32_t) 0); service::topology_change change({builder.build()}); service::group0_command g0_cmd = client.prepare_command(std::move(change), guard, format("adding node {} to topology", id)); testlog.info("Adding node {}/{} dc={} rack={} to topology", id, ip, dc_rack.dc, dc_rack.rack); try { client.add_entry(std::move(g0_cmd), std::move(guard), as).get(); break; } catch (service::group0_concurrent_modification&) { testlog.warn("Concurrent modification detected, retrying"); } } _hosts.push_back(id); _host_addresses.emplace(id, ip); return id; } locator::host_id add_i4i_2xlarge(endpoint_dc_rack rack) { auto h = add_node(service::node_state::normal, 7, rack); get_shared_load_stats().set_capacity(h, 1'875'000'000'000); return h; } locator::host_id add_i4i_large(endpoint_dc_rack rack) { auto h = add_node(service::node_state::normal, 2, rack); get_shared_load_stats().set_capacity(h, 468'000'000'000); return h; } void set_node_state(locator::host_id id, service::node_state state) { abort_source as; auto& client = _env.get_raft_group0_client(); while (true) { auto guard = client.start_operation(as).get(); service::topology_mutation_builder builder(guard.write_timestamp()); builder.with_node(raft::server_id(id.uuid())) .set("node_state", state); service::topology_change change({builder.build()}); service::group0_command g0_cmd = client.prepare_command(std::move(change), guard, format("node {} state={}", id, state)); testlog.info("Changing node {} state={}", id, state); try { client.add_entry(std::move(g0_cmd), std::move(guard), as).get(); break; } catch (service::group0_concurrent_modification&) { testlog.warn("Concurrent modification detected, retrying"); } } } void pause_rf_change_request(utils::UUID new_elem) { abort_source as; auto& client = _env.get_raft_group0_client(); while (true) { auto guard = client.start_operation(as).get(); service::topology_mutation_builder builder(guard.write_timestamp()); builder.pause_rf_change_request(new_elem); service::topology_change change({builder.build()}); service::group0_command g0_cmd = client.prepare_command(std::move(change), guard, "setting ongoing RF change data"); try { client.add_entry(std::move(g0_cmd), std::move(guard), as).get(); break; } catch (service::group0_concurrent_modification&) { testlog.warn("Concurrent modification detected, retrying"); } } } void resume_rf_change_request(const std::unordered_set& current_queue, utils::UUID elem_to_remove) { abort_source as; auto& client = _env.get_raft_group0_client(); while (true) { auto guard = client.start_operation(as).get(); service::topology_mutation_builder builder(guard.write_timestamp()); builder.resume_rf_change_request(current_queue, elem_to_remove); service::topology_change change({builder.build()}); service::group0_command g0_cmd = client.prepare_command(std::move(change), guard, "setting ongoing RF change data"); try { client.add_entry(std::move(g0_cmd), std::move(guard), as).get(); break; } catch (service::group0_concurrent_modification&) { testlog.warn("Concurrent modification detected, retrying"); } } } const std::vector& hosts() const { return _hosts; } const std::unordered_map& host_addresses() const { return _host_addresses; } };