Files
scylladb/test/lib/topology_builder.hh
Benny Halevy 4955ca3ddd test: lib: topology_builder: generate unique rack names
Encode the dc identifier into each rack name so each dc will have its
own unique racks.

Just for easier distinction in logs.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-10-02 19:45:00 +02:00

291 lines
10 KiB
C++

/*
* Copyright (C) 2025-present ScyllaDB
*
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#pragma once
#include "cql_test_env.hh"
#include "locator/topology.hh"
#include "gms/inet_address.hh"
#include "service/topology_mutation.hh"
#include "service/topology_state_machine.hh"
#include "service/raft/raft_group0_client.hh"
#include "service/tablet_allocator_fwd.hh"
#include "locator/host_id.hh"
#include "locator/tablets.hh"
#include "test/lib/log.hh"
#include "version.hh"
#include <atomic>
// Mutable load-statistics holder shared between a test and the code under
// test. Setters mutate the live stats in place; get() hands out a shared
// pointer to a copy taken at call time (presumably a read-only snapshot for
// the consumer -- verify against locator::load_stats_ptr's constness).
struct shared_load_stats {
    locator::load_stats stats;

    // Returns a shared pointer to a copy of the current stats.
    locator::load_stats_ptr get() const {
        return make_lw_shared(stats);
    }

    // Records the total size, in bytes, of the given table.
    void set_size(table_id table, size_t size_in_bytes) {
        auto& table_stats = stats.tables[table];
        table_stats.size_in_bytes = size_in_bytes;
    }

    // Records the split-ready sequence number of the given table.
    void set_split_ready_seq_number(table_id table, size_t seq_number) {
        auto& table_stats = stats.tables[table];
        table_stats.split_ready_seq_number = seq_number;
    }

    // Records the storage capacity, in bytes, of the given host.
    void set_capacity(locator::host_id host, size_t capacity) {
        stats.capacity[host] = capacity;
    }
};
/// Modifies topology inside a given cql_test_env.
/// Local node's membership is not affected, but it belongs to a different DC than those produced by this builder.
///
/// Creating the builder locks the topology state machine so there are no concurrent topology operations
/// and load balancing.
/// The built topology is not removed when the builder is destroyed and the state machine is left locked.
///
/// All methods expect to be run in a seastar thread.
///
/// Example usage:
///
/// topology_builder topo(e);
/// auto host1 = topo.add_node(); // dc1 rack1
/// auto host2 = topo.add_node(); // dc1 rack1
/// topo.start_new_dc();
/// auto host3 = topo.add_node(); // dc2 rack1
/// auto host4 = topo.add_node(); // dc2 rack1
/// topo.start_new_rack();
/// auto host5 = topo.add_node(); // dc2 rack2
/// auto host6 = topo.add_node(); // dc2 rack2
///
class topology_builder {
public:
    using inet_address = locator::inet_address;
    using endpoint_dc_rack = locator::endpoint_dc_rack;
private:
    cql_test_env& _env;
    int _nr_nodes = 0;  // count of nodes added so far; also seeds unique IP generation
    int _dc_id;         // numeric id of the DC currently being built (process-unique)
    int _rack_id;       // per-DC rack counter; reset to 0 by start_new_dc()
    sstring _dc;        // name of the DC currently being built
    sstring _rack;      // name of the rack currently being built
    shared_load_stats _load_stats;
    std::vector<locator::host_id> _hosts;  // all hosts added by this builder, in insertion order
    std::unordered_map<locator::host_id, gms::inet_address> _host_addresses;
private:
    // Maps the n-th node (1-based) to a unique address in 10.0.0.0/8 by
    // encoding n base-256 into the lower three octets.
    // Asserts that 0 < n < 256^3.
    inet_address make_node_address(int n) {
        assert(n > 0);
        int a = n % 256;
        n /= 256;
        int b = n % 256;
        n /= 256;
        assert(n < 256);
        return inet_address(fmt::format("10.{}.{}.{}", n, b, a));
    }
    // Locks the topology to prevent concurrent topology operations and load balancing.
    // Setting transition_state to "lock" blocks background load-balancing which could interfere with the test
    // and prevents errors from load_topology_state() complaining about nodes in transition with no transition state.
    void lock_topology() {
        abort_source as;
        auto& client = _env.get_raft_group0_client();
        service::topology& topo = _env.get_topology_state_machine().local()._topology;
        while (true) {
            // Already locked (e.g. by a previous builder) - nothing to do.
            if (topo.tstate && *topo.tstate == service::topology::transition_state::lock) {
                testlog.info("Topology is locked");
                return;
            }
            auto guard = client.start_operation(as).get();
            // Some other transition is in flight - release the guard, wait for
            // the state machine to go idle, then re-check from the top.
            if (topo.tstate) {
                testlog.info("Waiting for topology state machine to be idle");
                release_guard(std::move(guard));
                _env.get_topology_state_machine().local().await_not_busy().get();
                testlog.info("Woken up");
                continue;
            }
            service::topology_change change({service::topology_mutation_builder(guard.write_timestamp())
                    .set_transition_state(service::topology::transition_state::lock)
                    .build()});
            service::group0_command g0_cmd = client.prepare_command(std::move(change), guard, "locking topology");
            try {
                client.add_entry(std::move(g0_cmd), std::move(guard), as).get();
                // No break here: loop around so the first branch above
                // observes the lock and returns.
            } catch (service::group0_concurrent_modification&) {
                testlog.info("Concurrent modification detected while locking topology, retrying");
            }
        }
    }
public:
    // Starts a fresh, process-unique DC/rack naming sequence and locks the
    // topology state machine (see lock_topology()).
    // Must be run in a seastar thread.
    topology_builder(cql_test_env& e)
        : _env(e)
    {
        start_new_dc();
        lock_topology();
    }
    // Returns a new token from some sequence of unique tokens.
    // Uniqueness is in the scope of the process, not just this object.
    dht::token new_token() {
        static std::atomic<int64_t> next_token = 1;
        return dht::token(next_token.fetch_add(1));
    }
    // Returns the name of the currently built DC.
    const sstring& dc() const {
        return _dc;
    }
    // Returns location of the currently built rack.
    endpoint_dc_rack rack() const {
        return {_dc, _rack};
    }
    // Starts building a new rack in the current DC.
    // Returns location of the new rack.
    endpoint_dc_rack start_new_rack() {
        // Rack names embed the DC id so racks are unique across DCs,
        // e.g. "rack1a", "rack1b", "rack2a"...
        // NOTE(review): after 26 racks in one DC the suffix walks past 'z';
        // presumably tests never get close to that.
        _rack = fmt::format("rack{}{:c}", _dc_id, 'a' + _rack_id++);
        return rack();
    }
    // Starts building a new rack in the current DC.
    // Returns location of the new rack.
    endpoint_dc_rack start_new_rack(sstring rack_name) {
        _rack = std::move(rack_name);
        return rack();
    }
    // Starts building a new DC.
    // DC is named uniquely in the scope of the process, not just this object.
    endpoint_dc_rack start_new_dc() {
        static std::atomic<int> next_id = 1;
        _dc_id = next_id.fetch_add(1);
        _dc = fmt::format("dc{}", _dc_id);
        _rack_id = 0;
        return start_new_rack();
    }
    // Starts building a new DC.
    // Uses the caller-supplied dc/rack names verbatim.
    // NOTE(review): _dc_id and _rack_id are left unchanged, so a later
    // no-arg start_new_rack() generates names based on the previous
    // auto-assigned DC id.
    endpoint_dc_rack start_new_dc(endpoint_dc_rack dc_and_rack) {
        _dc = dc_and_rack.dc;
        _rack = dc_and_rack.rack;
        return rack();
    }
    // Returns a snapshot of the current load stats.
    locator::load_stats_ptr get_load_stats() const {
        return _load_stats.get();
    }
    // Returns the live (mutable) load stats object.
    shared_load_stats& get_shared_load_stats() {
        return _load_stats;
    }
    /// Returns total cluster's storage capacity in bytes.
    /// Sums the capacities of all hosts added by this builder; throws if a
    /// host has no capacity entry (add_node() always sets one).
    uint64_t get_capacity() const {
        uint64_t cap = 0;
        auto stats = get_load_stats();
        for (auto h : _hosts) {
            cap += stats->capacity.at(h);
        }
        return cap;
    }
    // Adds a node with the given state and shard count to the topology,
    // placed in the current rack unless rack_override is given.
    // The node gets a process-unique host id, IP address and a single unique
    // token, plus a default capacity of default_target_tablet_size per shard.
    // Retries the group0 write until it commits.
    // Returns the new node's host id.
    locator::host_id add_node(service::node_state state = service::node_state::normal,
            unsigned shard_count = 1,
            std::optional<endpoint_dc_rack> rack_override = {})
    {
        ++_nr_nodes;
        auto ip = make_node_address(_nr_nodes);
        auto id = locator::host_id(utils::UUID_gen::get_time_UUID());
        auto dc_rack = rack_override.value_or(rack());
        dht::token token = new_token();
        std::unordered_set<dht::token> tokens({token});
        abort_source as;
        auto& client = _env.get_raft_group0_client();
        _load_stats.set_capacity(id, service::default_target_tablet_size * shard_count);
        while (true) {
            auto guard = client.start_operation(as).get();
            service::topology_mutation_builder builder(guard.write_timestamp());
            builder.with_node(raft::server_id(id.uuid()))
                .set("datacenter", dc_rack.dc)
                .set("rack", dc_rack.rack)
                .set("node_state", state)
                .set("shard_count", (uint32_t) shard_count)
                .set("cleanup_status", service::cleanup_status::clean)
                .set("release_version", version::release())
                .set("num_tokens", (uint32_t) 1)
                .set("tokens_string", fmt::format("{}", token))
                .set("tokens", tokens)
                .set("supported_features", std::set<sstring>())
                .set("request_id", utils::UUID())
                .set("ignore_msb", (uint32_t) 0);
            service::topology_change change({builder.build()});
            service::group0_command g0_cmd = client.prepare_command(std::move(change), guard,
                format("adding node {} to topology", id));
            testlog.info("Adding node {}/{} dc={} rack={} to topology", id, ip, dc_rack.dc, dc_rack.rack);
            try {
                client.add_entry(std::move(g0_cmd), std::move(guard), as).get();
                break;
            } catch (service::group0_concurrent_modification&) {
                testlog.warn("Concurrent modification detected, retrying");
            }
        }
        _hosts.push_back(id);
        _host_addresses.emplace(id, ip);
        return id;
    }
    // Adds a normal node with 7 shards and a fixed ~1.875 TB capacity
    // (presumably modeled after an AWS i4i.2xlarge instance - per the name).
    locator::host_id add_i4i_2xlarge(endpoint_dc_rack rack) {
        auto h = add_node(service::node_state::normal, 7, rack);
        get_shared_load_stats().set_capacity(h, 1'875'000'000'000);
        return h;
    }
    // Adds a normal node with 2 shards and a fixed ~468 GB capacity
    // (presumably modeled after an AWS i4i.large instance - per the name).
    locator::host_id add_i4i_large(endpoint_dc_rack rack) {
        auto h = add_node(service::node_state::normal, 2, rack);
        get_shared_load_stats().set_capacity(h, 468'000'000'000);
        return h;
    }
    // Sets the state of an existing node in the topology.
    // Retries the group0 write until it commits.
    void set_node_state(locator::host_id id, service::node_state state) {
        abort_source as;
        auto& client = _env.get_raft_group0_client();
        while (true) {
            auto guard = client.start_operation(as).get();
            service::topology_mutation_builder builder(guard.write_timestamp());
            builder.with_node(raft::server_id(id.uuid()))
                .set("node_state", state);
            service::topology_change change({builder.build()});
            service::group0_command g0_cmd = client.prepare_command(std::move(change), guard,
                format("node {} state={}", id, state));
            testlog.info("Changing node {} state={}", id, state);
            try {
                client.add_entry(std::move(g0_cmd), std::move(guard), as).get();
                break;
            } catch (service::group0_concurrent_modification&) {
                testlog.warn("Concurrent modification detected, retrying");
            }
        }
    }
    // Returns all hosts added by this builder, in insertion order.
    const std::vector<locator::host_id>& hosts() const {
        return _hosts;
    }
    // Returns the host id -> IP address mapping for all added hosts.
    const std::unordered_map<locator::host_id, gms::inet_address>& host_addresses() const {
        return _host_addresses;
    }
};