mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-24 02:20:37 +00:00
Auto-exands numeric RF in CREATE/ALTER KEYSPACE statements for new DCs specified in the statement. Doesn't auto-expand existing options, as the rack choice may not be in line with current replica placement. This requires co-locating tablet replicas, and tracking of co-location state, which is not implemented yet. Signed-off-by: Tomasz Grabiec <tgrabiec@scylladb.com>
1747 lines
72 KiB
C++
1747 lines
72 KiB
C++
/*
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include <boost/test/unit_test.hpp>
|
|
#include <fmt/ranges.h>
|
|
#include "db/tablet_options.hh"
|
|
#include "gms/inet_address.hh"
|
|
#include "gms/feature_service.hh"
|
|
#include "inet_address_vectors.hh"
|
|
#include "locator/abstract_replication_strategy.hh"
|
|
#include "locator/host_id.hh"
|
|
#include "locator/types.hh"
|
|
#include "locator/snitch_base.hh"
|
|
#include "utils/assert.hh"
|
|
#include "utils/UUID_gen.hh"
|
|
#include "utils/sequenced_set.hh"
|
|
#include "utils/to_string.hh"
|
|
#include "locator/network_topology_strategy.hh"
|
|
#undef SEASTAR_TESTING_MAIN
|
|
#include <seastar/testing/test_case.hh>
|
|
#include <seastar/testing/thread_test_case.hh>
|
|
#include <seastar/core/sstring.hh>
|
|
#include "utils/log.hh"
|
|
#include "gms/gossiper.hh"
|
|
#include "schema/schema_builder.hh"
|
|
#include <ranges>
|
|
#include <vector>
|
|
#include <string>
|
|
#include <map>
|
|
#include <set>
|
|
#include <iostream>
|
|
#include <sstream>
|
|
#include <compare>
|
|
#include <ranges>
|
|
#include <boost/algorithm/cxx11/iota.hpp>
|
|
#include "test/lib/log.hh"
|
|
#include "test/lib/cql_test_env.hh"
|
|
#include "test/lib/key_utils.hh"
|
|
#include "test/lib/random_utils.hh"
|
|
#include "test/lib/test_utils.hh"
|
|
#include "test/lib/topology_builder.hh"
|
|
#include <seastar/core/coroutine.hh>
|
|
#include "db/schema_tables.hh"
|
|
#include "db/config.hh"
|
|
|
|
using namespace locator;
|
|
|
|
struct ring_point {
|
|
double point;
|
|
inet_address host;
|
|
host_id id = host_id::create_random_id();
|
|
};
|
|
|
|
void print_natural_endpoints(double point, const host_id_vector_replica_set v) {
|
|
testlog.debug("Natural endpoints for a token {}:", point);
|
|
std::string str;
|
|
std::ostringstream strm(str);
|
|
|
|
for (auto& addr : v) {
|
|
fmt::print(strm, "{} ", addr);
|
|
}
|
|
|
|
testlog.debug("{}", strm.str());
|
|
}
|
|
|
|
static void verify_sorted(const dht::token_range_vector& trv) {
|
|
auto not_strictly_before = [] (const dht::token_range a, const dht::token_range b) {
|
|
return !b.start()
|
|
|| !a.end()
|
|
|| a.end()->value() > b.start()->value()
|
|
|| (a.end()->value() == b.start()->value() && a.end()->is_inclusive() && b.start()->is_inclusive());
|
|
};
|
|
BOOST_CHECK(std::ranges::adjacent_find(trv, not_strictly_before) == trv.end());
|
|
}
|
|
|
|
static future<> check_ranges_are_sorted(static_effective_replication_map_ptr ermp, locator::host_id ep) {
|
|
auto* erm = ermp->maybe_as_vnode_effective_replication_map();
|
|
verify_sorted(co_await erm->get_ranges(ep));
|
|
verify_sorted(co_await erm->get_primary_ranges(ep));
|
|
verify_sorted(co_await erm->get_primary_ranges_within_dc(ep));
|
|
}
|
|
|
|
void strategy_sanity_check(
|
|
replication_strategy_ptr ars_ptr,
|
|
const token_metadata_ptr& tm,
|
|
const replication_strategy_config_options& options) {
|
|
|
|
const network_topology_strategy* nts_ptr =
|
|
dynamic_cast<const network_topology_strategy*>(ars_ptr.get());
|
|
|
|
//
|
|
// Check that both total and per-DC RFs in options match the corresponding
|
|
// value in the strategy object.
|
|
//
|
|
size_t total_rf = 0;
|
|
for (auto& val : options) {
|
|
auto rf = locator::get_replication_factor(val.second);
|
|
BOOST_CHECK(nts_ptr->get_replication_factor(val.first) == rf);
|
|
|
|
total_rf += rf;
|
|
}
|
|
|
|
BOOST_CHECK(ars_ptr->get_replication_factor(*tm) == total_rf);
|
|
}
|
|
|
|
void endpoints_check(
|
|
replication_strategy_ptr ars_ptr,
|
|
const token_metadata_ptr& tm,
|
|
const host_id_vector_replica_set& endpoints,
|
|
const locator::topology& topo,
|
|
bool strict_dc_rf = false) {
|
|
|
|
auto&& nodes_per_dc = tm->get_datacenter_token_owners();
|
|
const network_topology_strategy* nts_ptr =
|
|
dynamic_cast<const network_topology_strategy*>(ars_ptr.get());
|
|
|
|
size_t total_rf = 0;
|
|
for (auto&& [dc, nodes] : nodes_per_dc) {
|
|
auto effective_rf = std::min<size_t>(nts_ptr->get_replication_factor(dc), nodes.size());
|
|
total_rf += effective_rf;
|
|
}
|
|
|
|
// Check the total RF
|
|
BOOST_CHECK_EQUAL(endpoints.size(), total_rf);
|
|
BOOST_CHECK_LE(total_rf, ars_ptr->get_replication_factor(*tm));
|
|
|
|
// Check the uniqueness
|
|
std::unordered_set<locator::host_id> ep_set(endpoints.begin(), endpoints.end());
|
|
BOOST_CHECK_EQUAL(endpoints.size(), ep_set.size());
|
|
|
|
// Check the per-DC RF
|
|
std::unordered_map<sstring, std::unordered_set<locator::host_id>> replicas_per_dc;
|
|
for (auto ep : endpoints) {
|
|
const auto* node = topo.find_node(ep);
|
|
auto dc = node->dc_rack().dc;
|
|
|
|
auto inserted = replicas_per_dc[dc].insert(node->host_id()).second;
|
|
// replicas might never be placed on the same node
|
|
BOOST_REQUIRE(inserted);
|
|
}
|
|
|
|
for (auto&& [dc, nodes] : replicas_per_dc) {
|
|
auto effective_rf = strict_dc_rf ? nts_ptr->get_replication_factor(dc) :
|
|
std::min<size_t>(nts_ptr->get_replication_factor(dc), nodes_per_dc.at(dc).size());
|
|
BOOST_CHECK_EQUAL(nodes.size(), effective_rf);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Check the get_natural_endpoints() output for tokens between every two
|
|
* adjacent ring points.
|
|
* @param ring_points ring description
|
|
* @param options strategy options
|
|
* @param ars_ptr strategy object
|
|
*
|
|
* Run in a seastar thread.
|
|
*/
|
|
void full_ring_check(const std::vector<ring_point>& ring_points,
|
|
replication_strategy_config_options& options,
|
|
replication_strategy_ptr ars_ptr,
|
|
locator::token_metadata_ptr tmptr) {
|
|
auto& tm = *tmptr;
|
|
const auto& topo = tm.get_topology();
|
|
strategy_sanity_check(ars_ptr, tmptr, options);
|
|
|
|
auto erm = calculate_vnode_effective_replication_map(ars_ptr, tmptr).get();
|
|
|
|
for (auto& rp : ring_points) {
|
|
double cur_point1 = rp.point - 0.5;
|
|
token t1(tests::d2t(cur_point1 / ring_points.size()));
|
|
auto endpoints1 = erm->get_natural_replicas(t1);
|
|
|
|
endpoints_check(ars_ptr, tmptr, endpoints1, topo);
|
|
|
|
print_natural_endpoints(cur_point1, endpoints1);
|
|
|
|
//
|
|
// Check a different endpoint in the same range as t1 and validate that
|
|
// the endpoints has been taken from the cache and that the output is
|
|
// identical to the one not taken from the cache.
|
|
//
|
|
double cur_point2 = rp.point - 0.2;
|
|
token t2(tests::d2t(cur_point2 / ring_points.size()));
|
|
auto endpoints2 = erm->get_natural_replicas(t2);
|
|
|
|
endpoints_check(ars_ptr, tmptr, endpoints2, topo);
|
|
check_ranges_are_sorted(erm, rp.id).get();
|
|
BOOST_CHECK(endpoints1 == endpoints2);
|
|
}
|
|
}
|
|
|
|
void full_ring_check(const tablet_map& tmap,
|
|
replication_strategy_ptr rs_ptr,
|
|
locator::token_metadata_ptr tmptr) {
|
|
auto& tm = *tmptr;
|
|
const auto& topo = tm.get_topology();
|
|
|
|
auto to_replica_set = [&] (const tablet_replica_set& replicas) {
|
|
host_id_vector_replica_set result;
|
|
result.reserve(replicas.size());
|
|
for (auto&& replica : replicas) {
|
|
result.emplace_back(replica.host);
|
|
}
|
|
return result;
|
|
};
|
|
|
|
for (tablet_id tb : tmap.tablet_ids()) {
|
|
endpoints_check(rs_ptr, tmptr, to_replica_set(tmap.get_tablet_info(tb).replicas), topo, true);
|
|
}
|
|
}
|
|
|
|
void check_tablets_balance(const tablet_map& tmap,
|
|
const network_topology_strategy* nts_ptr,
|
|
const locator::topology& topo) {
|
|
std::unordered_map<sstring, std::unordered_map<sstring, std::unordered_map<host_id, std::unordered_map<unsigned, uint64_t>>>> load_map;
|
|
for (tablet_id tb : tmap.tablet_ids()) {
|
|
for (const auto& r : tmap.get_tablet_info(tb).replicas) {
|
|
const auto& node = topo.get_node(r.host);
|
|
load_map[node.dc_rack().dc][node.dc_rack().rack][r.host][r.shard]++;
|
|
}
|
|
}
|
|
testlog.debug("load_map={}", load_map);
|
|
|
|
for (const auto& [dc, dc_racks] : load_map) {
|
|
std::unordered_map<sstring, double> avg_replicas_per_shard;
|
|
|
|
for (const auto& [rack, rack_nodes] : dc_racks) {
|
|
size_t num_shards = 0;
|
|
for (const auto& [host, shards] : rack_nodes) {
|
|
num_shards += shards.size();
|
|
for (const auto& [shard, n] : shards) {
|
|
avg_replicas_per_shard[rack] += n;
|
|
}
|
|
}
|
|
testlog.debug("dc={} rack={}: replicas={} num_shards={} avg_replicas_per_shard={}", dc, rack, avg_replicas_per_shard[rack], num_shards, avg_replicas_per_shard[rack] / num_shards);
|
|
avg_replicas_per_shard[rack] /= num_shards;
|
|
}
|
|
|
|
for (const auto& [rack, rack_nodes] : dc_racks) {
|
|
for (const auto& [host, shards] : rack_nodes) {
|
|
for (const auto& [shard, n] : shards) {
|
|
BOOST_REQUIRE_GE(n, floor(avg_replicas_per_shard[rack] - 1));
|
|
BOOST_REQUIRE_LE(n, ceil(avg_replicas_per_shard[rack] + 1));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
locator::endpoint_dc_rack make_endpoint_dc_rack(gms::inet_address endpoint) {
|
|
// This resembles rack_inferring_snitch dc/rack generation which is
|
|
// still in use by this test via token_metadata internals
|
|
auto dc = std::to_string(uint8_t(endpoint.bytes()[1]));
|
|
auto rack = std::to_string(uint8_t(endpoint.bytes()[2]));
|
|
return locator::endpoint_dc_rack{dc, rack};
|
|
}
|
|
|
|
// Run in a seastar thread.
|
|
void simple_test() {
|
|
auto my_address = gms::inet_address("localhost");
|
|
|
|
// Create the RackInferringSnitch
|
|
snitch_config cfg;
|
|
cfg.name = "RackInferringSnitch";
|
|
cfg.listen_address = my_address;
|
|
cfg.broadcast_address = my_address;
|
|
sharded<snitch_ptr> snitch;
|
|
snitch.start(cfg).get();
|
|
auto stop_snitch = defer([&snitch] { snitch.stop().get(); });
|
|
snitch.invoke_on_all(&snitch_ptr::start).get();
|
|
|
|
locator::token_metadata::config tm_cfg;
|
|
tm_cfg.topo_cfg.this_endpoint = my_address;
|
|
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
|
|
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
|
|
auto stop_stm = deferred_stop(stm);
|
|
|
|
std::vector<ring_point> ring_points = {
|
|
{ 1.0, inet_address("192.100.10.1") },
|
|
{ 2.0, inet_address("192.101.10.1") },
|
|
{ 3.0, inet_address("192.102.10.1") },
|
|
{ 4.0, inet_address("192.100.20.1") },
|
|
{ 5.0, inet_address("192.101.20.1") },
|
|
{ 6.0, inet_address("192.102.20.1") },
|
|
{ 7.0, inet_address("192.100.30.1") },
|
|
{ 8.0, inet_address("192.101.30.1") },
|
|
{ 9.0, inet_address("192.102.30.1") },
|
|
{ 10.0, inet_address("192.102.40.1") },
|
|
{ 11.0, inet_address("192.102.40.2") }
|
|
};
|
|
|
|
// Initialize the token_metadata
|
|
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
|
auto& topo = tm.get_topology();
|
|
for (const auto& [ring_point, endpoint, id] : ring_points) {
|
|
std::unordered_set<token> tokens;
|
|
tokens.insert(token{tests::d2t(ring_point / ring_points.size())});
|
|
topo.add_node(id, make_endpoint_dc_rack(endpoint), locator::node::state::normal);
|
|
co_await tm.update_normal_tokens(std::move(tokens), id);
|
|
}
|
|
}).get();
|
|
|
|
/////////////////////////////////////
|
|
// Create the replication strategy
|
|
std::map<sstring, replication_strategy_config_option> options323 = {
|
|
{"100", "3"},
|
|
{"101", "2"},
|
|
{"102", "3"}
|
|
};
|
|
locator::replication_strategy_params params323(options323, std::nullopt, std::nullopt);
|
|
|
|
auto ars_ptr = abstract_replication_strategy::create_replication_strategy(
|
|
"NetworkTopologyStrategy", params323, stm.get()->get_topology());
|
|
|
|
full_ring_check(ring_points, options323, ars_ptr, stm.get());
|
|
|
|
///////////////
|
|
// Create the replication strategy
|
|
std::map<sstring, replication_strategy_config_option> options320 = {
|
|
{"100", "3"},
|
|
{"101", "2"},
|
|
{"102", "0"}
|
|
};
|
|
locator::replication_strategy_params params320(options320, std::nullopt, std::nullopt);
|
|
|
|
ars_ptr = abstract_replication_strategy::create_replication_strategy(
|
|
"NetworkTopologyStrategy", params320, stm.get()->get_topology());
|
|
|
|
full_ring_check(ring_points, options320, ars_ptr, stm.get());
|
|
|
|
//
|
|
// Check cache invalidation: invalidate the cache and run a full ring
|
|
// check once again. If cache is not properly invalidated one of the
|
|
// points will be taken from the cache when it shouldn't and the
|
|
// corresponding check will fail.
|
|
//
|
|
stm.mutate_token_metadata([] (token_metadata& tm) {
|
|
tm.invalidate_cached_rings();
|
|
return make_ready_future<>();
|
|
}).get();
|
|
full_ring_check(ring_points, options320, ars_ptr, stm.get());
|
|
}
|
|
|
|
// Run in a seastar thread.
|
|
void heavy_origin_test() {
|
|
auto my_address = gms::inet_address("localhost");
|
|
|
|
// Create the RackInferringSnitch
|
|
snitch_config cfg;
|
|
cfg.name = "RackInferringSnitch";
|
|
cfg.listen_address = my_address;
|
|
cfg.broadcast_address = my_address;
|
|
sharded<snitch_ptr> snitch;
|
|
snitch.start(cfg).get();
|
|
auto stop_snitch = defer([&snitch] { snitch.stop().get(); });
|
|
snitch.invoke_on_all(&snitch_ptr::start).get();
|
|
|
|
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); },
|
|
locator::token_metadata::config{locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }});
|
|
auto stop_stm = deferred_stop(stm);
|
|
|
|
std::vector<int> dc_racks = {2, 4, 8};
|
|
std::vector<int> dc_endpoints = {128, 256, 512};
|
|
std::vector<int> dc_replication = {2, 6, 6};
|
|
|
|
replication_strategy_config_options config_options;
|
|
std::unordered_map<inet_address, std::unordered_set<token>> tokens;
|
|
std::vector<ring_point> ring_points;
|
|
|
|
size_t total_eps = 0;
|
|
for (size_t dc = 0; dc < dc_racks.size(); ++dc) {
|
|
for (int rack = 0; rack < dc_racks[dc]; ++rack) {
|
|
total_eps += dc_endpoints[dc]/dc_racks[dc];
|
|
}
|
|
}
|
|
|
|
auto& random_engine = seastar::testing::local_random_engine;
|
|
std::vector<double> token_points(total_eps, 0.0);
|
|
boost::algorithm::iota(token_points, 1.0);
|
|
std::shuffle(token_points.begin(), token_points.end(), random_engine);
|
|
auto token_point_iterator = token_points.begin();
|
|
for (size_t dc = 0; dc < dc_racks.size(); ++dc) {
|
|
config_options.emplace(to_sstring(dc),
|
|
to_sstring(dc_replication[dc]));
|
|
for (int rack = 0; rack < dc_racks[dc]; ++rack) {
|
|
for (int ep = 1; ep <= dc_endpoints[dc]/dc_racks[dc]; ++ep) {
|
|
double token_point = *token_point_iterator++;
|
|
// 10.dc.rack.ep
|
|
int32_t ip = 0x0a000000 + ((int8_t)dc << 16) +
|
|
((int8_t)rack << 8) + (int8_t)ep;
|
|
inet_address address(ip);
|
|
ring_point rp = {token_point, address};
|
|
|
|
ring_points.emplace_back(rp);
|
|
tokens[address].emplace(token{tests::d2t(token_point / total_eps)});
|
|
|
|
testlog.debug("adding node {} at {}", address, token_point);
|
|
|
|
token_point++;
|
|
}
|
|
}
|
|
}
|
|
|
|
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
|
auto& topo = tm.get_topology();
|
|
for (const auto& [ring_point, endpoint, id] : ring_points) {
|
|
topo.add_node(id, make_endpoint_dc_rack(endpoint), locator::node::state::normal);
|
|
co_await tm.update_normal_tokens(tokens[endpoint], id);
|
|
}
|
|
}).get();
|
|
|
|
locator::replication_strategy_params params(config_options, std::nullopt, std::nullopt);
|
|
auto ars_ptr = abstract_replication_strategy::create_replication_strategy(
|
|
"NetworkTopologyStrategy", params, stm.get()->get_topology());
|
|
|
|
full_ring_check(ring_points, config_options, ars_ptr, stm.get());
|
|
}
|
|
|
|
BOOST_AUTO_TEST_SUITE(network_topology_strategy_test)
|
|
|
|
SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_simple) {
|
|
return simple_test();
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_heavy) {
|
|
return heavy_origin_test();
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablets_test) {
|
|
auto my_address = gms::inet_address("localhost");
|
|
|
|
// Create the RackInferringSnitch
|
|
snitch_config cfg;
|
|
cfg.listen_address = my_address;
|
|
cfg.broadcast_address = my_address;
|
|
cfg.name = "RackInferringSnitch";
|
|
sharded<snitch_ptr> snitch;
|
|
snitch.start(cfg).get();
|
|
auto stop_snitch = defer([&snitch] { snitch.stop().get(); });
|
|
snitch.invoke_on_all(&snitch_ptr::start).get();
|
|
|
|
locator::token_metadata::config tm_cfg;
|
|
tm_cfg.topo_cfg.this_endpoint = my_address;
|
|
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
|
|
|
|
// Generate a random cluster
|
|
auto& random_engine = seastar::testing::local_random_engine;
|
|
for (unsigned shard_count = 1; shard_count <= 4; ++shard_count) {
|
|
std::map<sstring, size_t> node_count_per_dc;
|
|
std::map<sstring, std::map<sstring, size_t>> node_count_per_rack;
|
|
std::vector<ring_point> ring_points;
|
|
|
|
double point = 1;
|
|
size_t num_dcs = 1 + tests::random::get_int(5);
|
|
for (size_t dc = 0; dc < num_dcs; ++dc) {
|
|
sstring dc_name = fmt::format("{}", 100 + dc);
|
|
size_t num_racks = 1 + tests::random::get_int(5);
|
|
for (size_t rack = 0; rack < num_racks; ++rack) {
|
|
sstring rack_name = fmt::format("{}", 10 + rack);
|
|
size_t rack_nodes = 1 + tests::random::get_int(2);
|
|
for (size_t i = 1; i <= rack_nodes; ++i) {
|
|
ring_points.emplace_back(point, inet_address(format("192.{}.{}.{}", dc_name, rack_name, i)));
|
|
node_count_per_dc[dc_name]++;
|
|
node_count_per_rack[dc_name][rack_name]++;
|
|
point++;
|
|
}
|
|
}
|
|
}
|
|
|
|
testlog.debug("node_count_per_rack={}", node_count_per_rack);
|
|
|
|
// Initialize the token_metadata
|
|
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
|
|
auto stop_stm = deferred_stop(stm);
|
|
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
|
auto& topo = tm.get_topology();
|
|
for (const auto& [ring_point, endpoint, id] : ring_points) {
|
|
std::unordered_set<token> tokens;
|
|
tokens.insert(token{tests::d2t(ring_point / ring_points.size())});
|
|
topo.add_node(id, make_endpoint_dc_rack(endpoint), locator::node::state::normal, shard_count);
|
|
co_await tm.update_normal_tokens(std::move(tokens), id);
|
|
}
|
|
}).get();
|
|
|
|
auto tmptr = stm.get();
|
|
const auto& topo = tmptr->get_topology();
|
|
|
|
auto s = schema_builder("ks", "tb")
|
|
.with_column("pk", utf8_type, column_kind::partition_key)
|
|
.with_column("v", utf8_type)
|
|
.build();
|
|
|
|
auto make_random_options = [&] () {
|
|
auto option_dcs = node_count_per_dc | std::views::keys | std::ranges::to<std::vector>();
|
|
std::map<sstring, replication_strategy_config_option> options;
|
|
std::shuffle(option_dcs.begin(), option_dcs.end(), random_engine);
|
|
size_t num_option_dcs = 1 + tests::random::get_int(option_dcs.size() - 1);
|
|
for (size_t i = 0; i < num_option_dcs; ++i) {
|
|
const auto& dc = option_dcs[i];
|
|
size_t node_count = tests::random::get_int(node_count_per_dc[dc]);
|
|
options.emplace(dc, fmt::to_string(node_count));
|
|
}
|
|
return options;
|
|
};
|
|
|
|
// Create the replication strategy
|
|
auto options = make_random_options();
|
|
size_t tablet_count = 1 + tests::random::get_int(99);
|
|
testlog.debug("tablet_count={} rf_options={}", tablet_count, options);
|
|
locator::replication_strategy_params params(options, tablet_count, std::nullopt);
|
|
auto ars_ptr = abstract_replication_strategy::create_replication_strategy(
|
|
"NetworkTopologyStrategy", params, topo);
|
|
auto tab_awr_ptr = ars_ptr->maybe_as_tablet_aware();
|
|
BOOST_REQUIRE(tab_awr_ptr);
|
|
auto tmap = tab_awr_ptr->allocate_tablets_for_new_table(s, stm.get(), tablet_count).get();
|
|
full_ring_check(tmap, ars_ptr, stm.get());
|
|
|
|
// Test reallocate_tablets after randomizing a different set of options
|
|
auto realloc_options = make_random_options();
|
|
locator::replication_strategy_params realloc_params(realloc_options, tablet_count, std::nullopt);
|
|
auto realloc_ars_ptr = abstract_replication_strategy::create_replication_strategy(
|
|
"NetworkTopologyStrategy", params, topo);
|
|
auto realloc_tab_awr_ptr = realloc_ars_ptr->maybe_as_tablet_aware();
|
|
BOOST_REQUIRE(realloc_tab_awr_ptr);
|
|
auto realloc_tmap = tab_awr_ptr->reallocate_tablets(s, stm.get(), std::move(tmap)).get();
|
|
full_ring_check(realloc_tmap, realloc_ars_ptr, stm.get());
|
|
}
|
|
}
|
|
|
|
// Called in a seastar thread
|
|
static void test_random_balancing(sharded<snitch_ptr>& snitch, gms::inet_address my_address) {
|
|
auto seed = tests::random::get_int<int64_t>();
|
|
testlog.info("test_random_balancing: seed={}", seed);
|
|
std::default_random_engine rand(seed);
|
|
|
|
locator::token_metadata::config tm_cfg;
|
|
tm_cfg.topo_cfg.this_endpoint = my_address;
|
|
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
|
|
|
|
// Generate a random cluster
|
|
auto shard_count = tests::random::get_int(1, 5, rand);
|
|
std::vector<ring_point> ring_points;
|
|
std::vector<sstring> dcs;
|
|
|
|
double point = 1;
|
|
size_t num_dcs = 1;
|
|
size_t num_racks = tests::random::get_int(1, 5, rand);
|
|
size_t nodes_per_rack = tests::random::get_int(1, 5, rand);
|
|
size_t nodes_per_dc = num_racks * nodes_per_rack;
|
|
for (size_t dc = 0; dc < num_dcs; ++dc) {
|
|
sstring dc_name = fmt::format("{}", 100 + dc);
|
|
dcs.emplace_back(dc_name);
|
|
for (size_t rack = 0; rack < num_racks; ++rack) {
|
|
sstring rack_name = fmt::format("{}", 10 + rack);
|
|
for (size_t i = 1; i <= nodes_per_rack; ++i) {
|
|
ring_points.emplace_back(point, inet_address(format("192.{}.{}.{}", dc_name, rack_name, i)));
|
|
point++;
|
|
}
|
|
}
|
|
}
|
|
|
|
testlog.debug("num_dcs={} num_racks={} nodes_per_rack={} shards_per_node={} ring_points=[{}]", num_dcs, num_racks, nodes_per_rack, shard_count,
|
|
fmt::join(ring_points | std::views::transform([] (const ring_point& rp) {
|
|
return fmt::format("({}, {})", rp.id, rp.host);
|
|
}), ", "));
|
|
|
|
// Initialize the token_metadata
|
|
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
|
|
auto stop_stm = deferred_stop(stm);
|
|
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
|
auto& topo = tm.get_topology();
|
|
for (const auto& [ring_point, endpoint, id] : ring_points) {
|
|
std::unordered_set<token> tokens;
|
|
tokens.insert(token{tests::d2t(ring_point / ring_points.size())});
|
|
topo.add_node(id, make_endpoint_dc_rack(endpoint), locator::node::state::normal, shard_count);
|
|
co_await tm.update_normal_tokens(std::move(tokens), id);
|
|
}
|
|
}).get();
|
|
|
|
auto tmptr = stm.get();
|
|
const auto& topo = tmptr->get_topology();
|
|
|
|
auto s = schema_builder("ks", "tb")
|
|
.with_column("pk", utf8_type, column_kind::partition_key)
|
|
.with_column("v", utf8_type)
|
|
.build();
|
|
|
|
auto rf_per_dc = tests::random::get_int<size_t>(1, nodes_per_dc, rand);
|
|
|
|
auto make_options = [&] (size_t rf_per_dc) {
|
|
replication_strategy_config_options options;
|
|
for (const auto& dc : dcs) {
|
|
options.emplace(dc, fmt::to_string(rf_per_dc));
|
|
}
|
|
return options;
|
|
};
|
|
|
|
// Create the replication strategy
|
|
auto options = make_options(rf_per_dc);
|
|
size_t tablet_count = 128 * num_dcs * nodes_per_dc * shard_count / rf_per_dc;
|
|
testlog.debug("tablet_count={} options={}", tablet_count, options);
|
|
locator::replication_strategy_params params(options, tablet_count, std::nullopt);
|
|
auto ars_ptr = abstract_replication_strategy::create_replication_strategy(
|
|
"NetworkTopologyStrategy", params, topo);
|
|
auto nts_ptr = dynamic_cast<const network_topology_strategy*>(ars_ptr.get());
|
|
auto tab_awr_ptr = ars_ptr->maybe_as_tablet_aware();
|
|
BOOST_REQUIRE(tab_awr_ptr);
|
|
auto tmap = tab_awr_ptr->allocate_tablets_for_new_table(s, tmptr, tablet_count).get();
|
|
full_ring_check(tmap, ars_ptr, stm.get());
|
|
check_tablets_balance(tmap, nts_ptr, topo);
|
|
|
|
if (rf_per_dc < nodes_per_dc) {
|
|
auto inc_options = make_options(rf_per_dc + 1);
|
|
testlog.debug("Increasing rf_per_dc={}", rf_per_dc);
|
|
locator::replication_strategy_params inc_params(inc_options, tablet_count, std::nullopt);
|
|
auto inc_ars_ptr = abstract_replication_strategy::create_replication_strategy(
|
|
"NetworkTopologyStrategy", params, topo);
|
|
auto inc_nts_ptr = dynamic_cast<const network_topology_strategy*>(inc_ars_ptr.get());
|
|
auto inc_tab_awr_ptr = inc_ars_ptr->maybe_as_tablet_aware();
|
|
BOOST_REQUIRE(inc_tab_awr_ptr);
|
|
auto inc_tmap = inc_tab_awr_ptr->reallocate_tablets(s, tmptr, tmap.clone_gently().get()).get();
|
|
full_ring_check(inc_tmap, ars_ptr, stm.get());
|
|
check_tablets_balance(inc_tmap, inc_nts_ptr, topo);
|
|
}
|
|
|
|
if (rf_per_dc > 1) {
|
|
auto dec_options = make_options(rf_per_dc - 1);
|
|
testlog.debug("Increasing rf_per_dc={}", rf_per_dc);
|
|
locator::replication_strategy_params dec_params(dec_options, tablet_count, std::nullopt);
|
|
auto dec_ars_ptr = abstract_replication_strategy::create_replication_strategy(
|
|
"NetworkTopologyStrategy", params, topo);
|
|
auto dec_nts_ptr = dynamic_cast<const network_topology_strategy*>(dec_ars_ptr.get());
|
|
auto dec_tab_awr_ptr = dec_ars_ptr->maybe_as_tablet_aware();
|
|
BOOST_REQUIRE(dec_tab_awr_ptr);
|
|
auto dec_tmap = dec_tab_awr_ptr->reallocate_tablets(s, tmptr, tmap.clone_gently().get()).get();
|
|
full_ring_check(dec_tmap, ars_ptr, stm.get());
|
|
check_tablets_balance(dec_tmap, dec_nts_ptr, topo);
|
|
}
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablet_allocation_balancing_test) {
|
|
auto my_address = gms::inet_address("localhost");
|
|
|
|
// Create the RackInferringSnitch
|
|
snitch_config cfg;
|
|
cfg.listen_address = my_address;
|
|
cfg.broadcast_address = my_address;
|
|
cfg.name = "RackInferringSnitch";
|
|
sharded<snitch_ptr> snitch;
|
|
snitch.start(cfg).get();
|
|
auto stop_snitch = defer([&snitch] { snitch.stop().get(); });
|
|
snitch.invoke_on_all(&snitch_ptr::start).get();
|
|
|
|
for (auto i = 0; i < 100; ++i) {
|
|
test_random_balancing(snitch, my_address);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* static impl of "old" network topology strategy endpoint calculation.
|
|
*/
|
|
static size_t get_replication_factor(const sstring& dc,
|
|
const std::unordered_map<sstring, size_t>& datacenters) noexcept {
|
|
auto dc_factor = datacenters.find(dc);
|
|
return (dc_factor == datacenters.end()) ? 0 : dc_factor->second;
|
|
}
|
|
|
|
static bool has_sufficient_replicas(const sstring& dc,
|
|
const std::unordered_map<sstring, std::unordered_set<host_id>>& dc_replicas,
|
|
const std::unordered_map<sstring, std::unordered_set<host_id>>& all_endpoints,
|
|
const std::unordered_map<sstring, size_t>& datacenters) noexcept {
|
|
auto dc_replicas_it = dc_replicas.find(dc);
|
|
if (dc_replicas_it == dc_replicas.end()) {
|
|
BOOST_TEST_FAIL(seastar::format("has_sufficient_replicas: dc {} not found in dc_replicas: {}", dc, dc_replicas));
|
|
}
|
|
auto endpoint_it = all_endpoints.find(dc);
|
|
if (endpoint_it == all_endpoints.end()) {
|
|
BOOST_TEST_MESSAGE(seastar::format("has_sufficient_replicas: dc {} not found in all_endpoints: {}", dc, all_endpoints));
|
|
return true;
|
|
}
|
|
return dc_replicas_it->second.size()
|
|
>= std::min(endpoint_it->second.size(),
|
|
get_replication_factor(dc, datacenters));
|
|
}
|
|
|
|
static bool has_sufficient_replicas(
|
|
const std::unordered_map<sstring, std::unordered_set<host_id>>& dc_replicas,
|
|
const std::unordered_map<sstring, std::unordered_set<host_id>>& all_endpoints,
|
|
const std::unordered_map<sstring, size_t>& datacenters) noexcept {
|
|
|
|
for (auto& dc : datacenters | std::views::keys) {
|
|
if (!has_sufficient_replicas(dc, dc_replicas, all_endpoints,
|
|
datacenters)) {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
static locator::host_id_set calculate_natural_endpoints(
|
|
const token& search_token, const token_metadata& tm,
|
|
const locator::topology& topo,
|
|
const std::unordered_map<sstring, size_t>& datacenters) {
|
|
//
|
|
// We want to preserve insertion order so that the first added endpoint
|
|
// becomes primary.
|
|
//
|
|
locator::host_id_set replicas;
|
|
|
|
// replicas we have found in each DC
|
|
std::unordered_map<sstring, std::unordered_set<host_id>> dc_replicas;
|
|
// tracks the racks we have already placed replicas in
|
|
std::unordered_map<sstring, std::unordered_set<sstring>> seen_racks;
|
|
//
|
|
// tracks the endpoints that we skipped over while looking for unique racks
|
|
// when we relax the rack uniqueness we can append this to the current
|
|
// result so we don't have to wind back the iterator
|
|
//
|
|
std::unordered_map<sstring, locator::host_id_set>
|
|
skipped_dc_endpoints;
|
|
|
|
//
|
|
// Populate the temporary data structures.
|
|
//
|
|
for (auto& dc_rep_factor_pair : datacenters) {
|
|
auto& dc_name = dc_rep_factor_pair.first;
|
|
|
|
dc_replicas[dc_name].reserve(dc_rep_factor_pair.second);
|
|
seen_racks[dc_name] = {};
|
|
skipped_dc_endpoints[dc_name] = {};
|
|
}
|
|
|
|
//
|
|
// all token owners in each DC, so we can check when we have exhausted all
|
|
// the token-owning members of a DC
|
|
//
|
|
const std::unordered_map<sstring,
|
|
std::unordered_set<locator::host_id>>
|
|
all_endpoints = tm.get_datacenter_token_owners();
|
|
//
|
|
// all racks (with non-token owners filtered out) in a DC so we can check
|
|
// when we have exhausted all racks in a DC
|
|
//
|
|
const std::unordered_map<sstring,
|
|
std::unordered_map<sstring,
|
|
std::unordered_set<host_id>>>
|
|
racks = tm.get_datacenter_racks_token_owners();
|
|
|
|
// not aware of any cluster members
|
|
SCYLLA_ASSERT(!all_endpoints.empty() && !racks.empty());
|
|
|
|
for (auto& next : tm.ring_range(search_token)) {
|
|
|
|
if (has_sufficient_replicas(dc_replicas, all_endpoints, datacenters)) {
|
|
break;
|
|
}
|
|
|
|
host_id ep = *tm.get_endpoint(next);
|
|
sstring dc = topo.get_location(ep).dc;
|
|
|
|
auto& seen_racks_dc_set = seen_racks[dc];
|
|
auto& racks_dc_map = racks.at(dc);
|
|
auto& skipped_dc_endpoints_set = skipped_dc_endpoints[dc];
|
|
auto& dc_replicas_dc_set = dc_replicas[dc];
|
|
|
|
// have we already found all replicas for this dc?
|
|
if (!datacenters.contains(dc) ||
|
|
has_sufficient_replicas(dc, dc_replicas, all_endpoints, datacenters)) {
|
|
continue;
|
|
}
|
|
|
|
//
|
|
// can we skip checking the rack? - namely, we've seen all racks in this
|
|
// DC already and may add this endpoint right away.
|
|
//
|
|
if (seen_racks_dc_set.size() == racks_dc_map.size()) {
|
|
dc_replicas_dc_set.insert(ep);
|
|
replicas.push_back(ep);
|
|
} else {
|
|
sstring rack = topo.get_location(ep).rack;
|
|
// is this a new rack? - we prefer to replicate on different racks
|
|
if (seen_racks_dc_set.contains(rack)) {
|
|
skipped_dc_endpoints_set.push_back(ep);
|
|
} else { // this IS a new rack
|
|
dc_replicas_dc_set.insert(ep);
|
|
replicas.push_back(ep);
|
|
seen_racks_dc_set.insert(rack);
|
|
//
|
|
// if we've run out of distinct racks, add the hosts we skipped
|
|
// past already (up to RF)
|
|
//
|
|
if (seen_racks_dc_set.size() == racks_dc_map.size())
|
|
{
|
|
auto skipped_it = skipped_dc_endpoints_set.begin();
|
|
while (skipped_it != skipped_dc_endpoints_set.end() &&
|
|
!has_sufficient_replicas(dc, dc_replicas, all_endpoints, datacenters)) {
|
|
host_id skipped = *skipped_it++;
|
|
dc_replicas_dc_set.insert(skipped);
|
|
replicas.push_back(skipped);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return replicas;
|
|
}
|
|
|
|
// Called in a seastar thread.
|
|
static void test_equivalence(const shared_token_metadata& stm, const locator::topology& topo, const std::unordered_map<sstring, size_t>& datacenters) {
|
|
class my_network_topology_strategy : public network_topology_strategy {
|
|
public:
|
|
using network_topology_strategy::network_topology_strategy;
|
|
using network_topology_strategy::calculate_natural_endpoints;
|
|
};
|
|
|
|
my_network_topology_strategy nts(replication_strategy_params(
|
|
datacenters | std::views::transform(
|
|
[](const std::pair<sstring, size_t>& p) {
|
|
return std::make_pair(p.first, to_sstring(p.second));
|
|
})
|
|
| std::ranges::to<std::map<sstring, replication_strategy_config_option>>(),
|
|
std::nullopt, std::nullopt),
|
|
&topo);
|
|
|
|
const token_metadata& tm = *stm.get();
|
|
for (size_t i = 0; i < 1000; ++i) {
|
|
auto token = dht::token::get_random_token();
|
|
auto expected = calculate_natural_endpoints(token, tm, topo, datacenters);
|
|
auto actual = nts.calculate_natural_endpoints(token, *stm.get()).get();
|
|
|
|
// Because the old algorithm does not put the nodes in the correct order in the case where more replicas
|
|
// are required than there are racks in a dc, we accept different order as long as the primary
|
|
// replica is the same.
|
|
|
|
BOOST_REQUIRE_EQUAL(expected[0], actual[0]);
|
|
BOOST_REQUIRE_EQUAL(std::set<host_id>(expected.begin(), expected.end()),
|
|
std::set<host_id>(actual.begin(), actual.end()));
|
|
|
|
}
|
|
}
|
|
|
|
void generate_topology(topology& topo, const std::unordered_map<sstring, size_t> datacenters, const host_id_vector_replica_set& nodes) {
|
|
auto& e1 = seastar::testing::local_random_engine;
|
|
|
|
std::unordered_map<sstring, size_t> racks_per_dc;
|
|
std::vector<std::reference_wrapper<const sstring>> dcs;
|
|
|
|
dcs.reserve(datacenters.size() * 4);
|
|
|
|
using udist = std::uniform_int_distribution<size_t>;
|
|
|
|
auto out = std::back_inserter(dcs);
|
|
|
|
for (auto& p : datacenters) {
|
|
auto& dc = p.first;
|
|
auto rf = p.second;
|
|
auto rc = udist(0, rf * 3 - 1)(e1) + 1;
|
|
racks_per_dc.emplace(dc, rc);
|
|
out = std::fill_n(out, rf, std::cref(dc));
|
|
}
|
|
|
|
for (auto& node : nodes) {
|
|
const sstring& dc = dcs[udist(0, dcs.size() - 1)(e1)];
|
|
auto rc = racks_per_dc.at(dc);
|
|
auto r = udist(0, rc)(e1);
|
|
topo.add_or_update_endpoint(node, endpoint_dc_rack{dc, to_sstring(r)}, locator::node::state::normal);
|
|
}
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) {
|
|
locator::token_metadata::config tm_cfg;
|
|
auto my_address = gms::inet_address("localhost");
|
|
|
|
constexpr size_t NODES = 100;
|
|
constexpr size_t VNODES = 64;
|
|
constexpr size_t RUNS = 10;
|
|
|
|
std::unordered_map<sstring, size_t> datacenters = {
|
|
{ "rf1", 1 },
|
|
{ "rf3", 3 },
|
|
{ "rf5_1", 5 },
|
|
{ "rf5_2", 5 },
|
|
{ "rf5_3", 5 },
|
|
};
|
|
host_id_vector_replica_set nodes;
|
|
nodes.reserve(NODES);
|
|
std::generate_n(std::back_inserter(nodes), NODES, [i = 0u]() mutable {
|
|
return host_id{utils::UUID(0, ++i)};
|
|
});
|
|
|
|
tm_cfg.topo_cfg.this_endpoint = my_address;
|
|
tm_cfg.topo_cfg.this_cql_address = my_address;
|
|
tm_cfg.topo_cfg.this_host_id = nodes[0];
|
|
tm_cfg.topo_cfg.local_dc_rack = locator::endpoint_dc_rack::default_location;
|
|
|
|
for (size_t run = 0; run < RUNS; ++run) {
|
|
semaphore sem(1);
|
|
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg);
|
|
auto stop_stm = deferred_stop(stm);
|
|
|
|
std::unordered_set<dht::token> random_tokens;
|
|
while (random_tokens.size() < nodes.size() * VNODES) {
|
|
random_tokens.insert(dht::token::get_random_token());
|
|
}
|
|
std::unordered_map<host_id, std::unordered_set<token>> endpoint_tokens;
|
|
auto next_token_it = random_tokens.begin();
|
|
for (auto& node : nodes) {
|
|
for (size_t i = 0; i < VNODES; ++i) {
|
|
endpoint_tokens[node].insert(*next_token_it);
|
|
next_token_it++;
|
|
}
|
|
}
|
|
|
|
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
|
generate_topology(tm.get_topology(), datacenters, nodes);
|
|
for (auto&& i : endpoint_tokens) {
|
|
co_await tm.update_normal_tokens(std::move(i.second), i.first);
|
|
}
|
|
}).get();
|
|
test_equivalence(stm, stm.get()->get_topology(), datacenters);
|
|
}
|
|
}
|
|
|
|
SEASTAR_TEST_CASE(test_invalid_dcs) {
|
|
return do_with_cql_env_thread([] (auto& e) {
|
|
for (auto& incorrect : std::vector<std::string>{"3\"", "", "!!!", "abcb", "!3", "-5", "0x123", "999999999999999999999999999999"}) {
|
|
BOOST_REQUIRE_THROW(e.execute_cql("CREATE KEYSPACE abc WITH REPLICATION "
|
|
"= {'class': 'NetworkTopologyStrategy', 'dc1':'" + incorrect + "'}").get(),
|
|
exceptions::configuration_exception);
|
|
BOOST_REQUIRE_THROW(e.execute_cql("CREATE KEYSPACE abc WITH REPLICATION "
|
|
"= {'class': 'NetworkTopologyStrategy', 'replication_factor':'" + incorrect + "'}").get(),
|
|
exceptions::configuration_exception);
|
|
BOOST_REQUIRE_THROW(e.execute_cql("CREATE KEYSPACE abc WITH REPLICATION "
|
|
"= {'class': 'SimpleStrategy', 'replication_factor':'" + incorrect + "'}").get(),
|
|
exceptions::configuration_exception);
|
|
};
|
|
});
|
|
}
|
|
|
|
static
|
|
sstring describe(cql_test_env& e, sstring ks_name) {
|
|
auto& ks = e.local_db().find_keyspace(ks_name);
|
|
cql3::description desc = ks.metadata()->describe(e.local_db(), cql3::with_create_statement::yes);
|
|
auto result = desc.create_statement->linearize();
|
|
testlog.info("DESCRIBE KEYSPACE {}: {}", ks_name, result);
|
|
return result;
|
|
}
|
|
|
|
SEASTAR_TEST_CASE(test_rack_list_rf) {
|
|
auto cfg = cql_test_config();
|
|
cfg.db_config->tablets_mode_for_new_keyspaces(db::tablets_mode_t::mode::enabled);
|
|
return do_with_cql_env_thread([] (auto& e) {
|
|
topology_builder topo(e);
|
|
|
|
unsigned shard_count = 2;
|
|
topo.start_new_dc({"dc1", "rack1a"});
|
|
topo.add_node(service::node_state::normal, shard_count);
|
|
topo.start_new_rack("rack1b");
|
|
topo.add_node(service::node_state::normal, shard_count);
|
|
topo.start_new_dc({"dc2", "rack2a"});
|
|
topo.add_node(service::node_state::normal, shard_count);
|
|
topo.start_new_rack("rack2b");
|
|
topo.add_node(service::node_state::normal, shard_count);
|
|
|
|
// Single rack
|
|
e.execute_cql("CREATE KEYSPACE ks11 WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1a']}").get();
|
|
BOOST_REQUIRE(describe(e, "ks11").contains("'dc1': ['rack1a']"));
|
|
|
|
// Two racks
|
|
e.execute_cql("CREATE KEYSPACE ks12 WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1a', 'rack1b']}").get();
|
|
BOOST_REQUIRE(describe(e, "ks12").contains("'dc1': ['rack1a', 'rack1b']"));
|
|
|
|
// Two DCs, two racks each
|
|
{
|
|
e.execute_cql("CREATE KEYSPACE ks22 WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1a', 'rack1b'], "
|
|
"'dc2': ['rack2a', 'rack2b']} AND tablets = {'enabled':true}").get();
|
|
auto& opts = e.local_db().find_keyspace("ks22").get_replication_strategy().get_config_options();
|
|
BOOST_REQUIRE_EQUAL(replication_factor_data(opts.at("dc1")).get_rack_list(),
|
|
std::vector<sstring>({"rack1a", "rack1b"}));
|
|
BOOST_REQUIRE_EQUAL(replication_factor_data(opts.at("dc2")).get_rack_list(),
|
|
std::vector<sstring>({"rack2a", "rack2b"}));
|
|
BOOST_REQUIRE_EQUAL(replication_factor_data(opts.at("dc1")).count(), 2);
|
|
BOOST_REQUIRE_EQUAL(replication_factor_data(opts.at("dc2")).count(), 2);
|
|
BOOST_REQUIRE(describe(e, "ks22").contains("'dc1': ['rack1a', 'rack1b']"));
|
|
BOOST_REQUIRE(describe(e, "ks22").contains("'dc2': ['rack2a', 'rack2b']"));
|
|
}
|
|
|
|
// Two DCs, one using rack list, one using numeric RF (auto-expanded)
|
|
{
|
|
e.execute_cql("CREATE KEYSPACE ks2n2 WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'dc1': 2, "
|
|
"'dc2': ['rack2a', 'rack2b']} AND tablets = {'enabled':true}").get();
|
|
auto& opts = e.local_db().find_keyspace("ks2n2").get_replication_strategy().get_config_options();
|
|
BOOST_REQUIRE_EQUAL(replication_factor_data(opts.at("dc1")).get_rack_list(),
|
|
std::vector<sstring>({"rack1a", "rack1b"}));
|
|
BOOST_REQUIRE_EQUAL(replication_factor_data(opts.at("dc2")).get_rack_list(),
|
|
std::vector<sstring>({"rack2a", "rack2b"}));
|
|
BOOST_REQUIRE_EQUAL(replication_factor_data(opts.at("dc1")).count(), 2);
|
|
BOOST_REQUIRE_EQUAL(replication_factor_data(opts.at("dc2")).count(), 2);
|
|
BOOST_REQUIRE(describe(e, "ks2n2").contains("'dc1': ['rack1a', 'rack1b']"));
|
|
BOOST_REQUIRE(describe(e, "ks2n2").contains("'dc2': ['rack2a', 'rack2b']"));
|
|
}
|
|
|
|
// Non-existent DC
|
|
BOOST_REQUIRE_THROW(e.execute_cql(
|
|
"CREATE KEYSPACE fail WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'dc3': ['rack2a']}").get(),
|
|
exceptions::configuration_exception);
|
|
|
|
// Rack from the wrong DC
|
|
BOOST_REQUIRE_THROW(e.execute_cql(
|
|
"CREATE KEYSPACE fail WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack2a']}").get(),
|
|
exceptions::configuration_exception);
|
|
|
|
// Duplicated racks
|
|
BOOST_REQUIRE_THROW(e.execute_cql(
|
|
"CREATE KEYSPACE fail WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1a', 'rack1b', 'rack1a']}").get(),
|
|
exceptions::configuration_exception);
|
|
|
|
// Duplicated racks
|
|
BOOST_REQUIRE_THROW(e.execute_cql(
|
|
"CREATE KEYSPACE fail WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1a', 'rack1a']}").get(),
|
|
exceptions::configuration_exception);
|
|
|
|
// Alter to numeric with different count
|
|
BOOST_REQUIRE_THROW(e.execute_cql(
|
|
"ALTER KEYSPACE ks12 WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'dc1': 3}").get(),
|
|
exceptions::configuration_exception);
|
|
BOOST_REQUIRE_THROW(e.execute_cql(
|
|
"ALTER KEYSPACE ks12 WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'dc1': 1}").get(),
|
|
exceptions::configuration_exception);
|
|
}, cfg);
|
|
}
|
|
|
|
SEASTAR_TEST_CASE(test_rack_list_rejected_when_feature_not_enabled) {
|
|
auto cfg = cql_test_config();
|
|
cfg.db_config->tablets_mode_for_new_keyspaces(db::tablets_mode_t::mode::enabled);
|
|
cfg.disabled_features.insert("RACK_LIST_RF");
|
|
return do_with_cql_env_thread([] (auto& e) {
|
|
auto& topo = e.get_shared_token_metadata().local().get()->get_topology();
|
|
auto loc = topo.get_location();
|
|
auto create_stmt = fmt::format("CREATE KEYSPACE test WITH REPLICATION = {{'class': 'NetworkTopologyStrategy',"
|
|
" '{}': ['{}']}}", loc.dc, loc.rack);
|
|
BOOST_REQUIRE_THROW(e.execute_cql(create_stmt).get(), exceptions::configuration_exception);
|
|
|
|
// Auto-expansion to numeric RF should not happen
|
|
e.execute_cql(fmt::format("CREATE KEYSPACE test2 WITH REPLICATION = {{'class': 'NetworkTopologyStrategy', '{}': 1}}", loc.dc)).get();
|
|
auto& opts = e.local_db().find_keyspace("test2").get_replication_strategy().get_config_options();
|
|
BOOST_REQUIRE(replication_factor_data(opts.at(loc.dc)).is_numeric());
|
|
BOOST_REQUIRE_EQUAL(replication_factor_data(opts.at(loc.dc)).count(), 1);
|
|
BOOST_REQUIRE(describe(e, "test2").contains(fmt::format("'{}': '1'", loc.dc)));
|
|
|
|
// When feature is enabled, rack list is accepted.
|
|
e.get_feature_service().local().rack_list_rf.enable();
|
|
e.execute_cql(create_stmt).get();
|
|
|
|
// Altering numeric RF to rack list is not supported yet.
|
|
BOOST_REQUIRE_THROW(e.execute_cql(fmt::format("ALTER KEYSPACE test2 WITH REPLICATION = {{'class': 'NetworkTopologyStrategy',"
|
|
" '{}': ['{}']}}", loc.dc, loc.rack)).get(),
|
|
exceptions::configuration_exception);
|
|
}, cfg);
|
|
}
|
|
|
|
SEASTAR_TEST_CASE(test_altering_to_numeric_forbidden) {
|
|
auto cfg = cql_test_config();
|
|
cfg.db_config->tablets_mode_for_new_keyspaces(db::tablets_mode_t::mode::enabled);
|
|
cfg.disabled_features.insert("RACK_LIST_RF");
|
|
return do_with_cql_env_thread([] (auto& e) {
|
|
topology_builder topo(e);
|
|
|
|
unsigned shard_count = 1;
|
|
topo.start_new_dc({"dc1", "rack1a"});
|
|
topo.add_node(service::node_state::normal, shard_count);
|
|
topo.start_new_rack("rack1b");
|
|
topo.add_node(service::node_state::normal, shard_count);
|
|
|
|
e.execute_cql("CREATE KEYSPACE ks1 WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'dc1': 1}").get();
|
|
e.get_feature_service().local().rack_list_rf.enable();
|
|
|
|
// Only altering to rack list should be allowed once rf_rack_valid is enabled.
|
|
BOOST_REQUIRE_THROW(e.execute_cql(
|
|
"ALTER KEYSPACE ks1 WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'dc1': 2}").get(),
|
|
exceptions::configuration_exception);
|
|
}, cfg);
|
|
}
|
|
|
|
SEASTAR_TEST_CASE(test_rack_list_rejected_when_using_vnodes) {
|
|
auto cfg = cql_test_config();
|
|
return do_with_cql_env_thread([] (auto& e) {
|
|
auto& topo = e.get_shared_token_metadata().local().get()->get_topology();
|
|
auto loc = topo.get_location();
|
|
auto create_stmt = [&] (bool tablets) {
|
|
return fmt::format("CREATE KEYSPACE abc WITH REPLICATION = {{'class': 'NetworkTopologyStrategy',"
|
|
" '{}': ['{}']}} and TABLETS = {{'enabled': {}}}",
|
|
loc.dc, loc.rack, tablets ? "true" : "false");
|
|
};
|
|
|
|
BOOST_REQUIRE_THROW(e.execute_cql(create_stmt(false)).get(), exceptions::configuration_exception);
|
|
e.execute_cql(create_stmt(true)).get();
|
|
}, cfg);
|
|
}
|
|
|
|
} // namespace network_topology_strategy_test
|
|
|
|
namespace locator {
|
|
|
|
std::weak_ordering compare_endpoints(const locator::topology& topo, const locator::host_id& address, const locator::host_id& a1, const locator::host_id& a2) {
|
|
const auto& loc = topo.get_location(address);
|
|
const auto& loc1 = topo.get_location(a1);
|
|
const auto& loc2 = topo.get_location(a2);
|
|
|
|
return topo.distance(address, loc, a1, loc1) <=> topo.distance(address, loc, a2, loc2);
|
|
}
|
|
|
|
void topology::test_compare_endpoints(const locator::host_id& address, const locator::host_id& a1, const locator::host_id& a2) const {
|
|
std::optional<std::partial_ordering> expected;
|
|
const auto& loc = get_location(address);
|
|
const auto& loc1 = get_location(a1);
|
|
const auto& loc2 = get_location(a2);
|
|
if (a1 == a2) {
|
|
expected = std::partial_ordering::equivalent;
|
|
} else {
|
|
if (a1 == address) {
|
|
expected = std::partial_ordering::less;
|
|
} else if (a2 == address) {
|
|
expected = std::partial_ordering::greater;
|
|
} else {
|
|
if (loc1.dc == loc.dc) {
|
|
if (loc2.dc != loc.dc) {
|
|
expected = std::partial_ordering::less;
|
|
} else {
|
|
if (loc1.rack == loc.rack) {
|
|
if (loc2.rack != loc.rack) {
|
|
expected = std::partial_ordering::less;
|
|
}
|
|
} else if (loc2.rack == loc.rack) {
|
|
expected = std::partial_ordering::greater;
|
|
}
|
|
}
|
|
} else if (loc2.dc == loc.dc) {
|
|
expected = std::partial_ordering::greater;
|
|
}
|
|
}
|
|
}
|
|
auto res = compare_endpoints(*this, address, a1, a2);
|
|
testlog.debug("compare_endpoint: address={} [{}/{}] a1={} [{}/{}] a2={} [{}/{}]: res={} expected={} expected_value={}",
|
|
address, loc.dc, loc.rack,
|
|
a1, loc1.dc, loc1.rack,
|
|
a2, loc2.dc, loc2.rack,
|
|
res, bool(expected), expected.value_or(std::partial_ordering::unordered));
|
|
if (expected) {
|
|
BOOST_REQUIRE_EQUAL(res, *expected);
|
|
}
|
|
}
|
|
|
|
void topology::test_sort_by_proximity(const locator::host_id& address, const host_id_vector_replica_set& nodes) const {
|
|
auto sorted_nodes = nodes;
|
|
do_sort_by_proximity(address, sorted_nodes);
|
|
std::unordered_set<locator::host_id> nodes_set(nodes.begin(), nodes.end());
|
|
std::unordered_set<locator::host_id> sorted_nodes_set(sorted_nodes.begin(), sorted_nodes.end());
|
|
// Test that no nodes were lost by sort_by_proximity
|
|
BOOST_REQUIRE_EQUAL(nodes_set, sorted_nodes_set);
|
|
// Verify that the reference address is sorted as first
|
|
// if it is part of the input vector
|
|
if (std::ranges::find(nodes, address) != nodes.end()) {
|
|
BOOST_REQUIRE_EQUAL(sorted_nodes[0], address);
|
|
}
|
|
// Test sort monotonicity
|
|
for (size_t i = 1; i < sorted_nodes.size(); ++i) {
|
|
BOOST_REQUIRE(compare_endpoints(*this, address, sorted_nodes[i-1], sorted_nodes[i]) <= 0);
|
|
}
|
|
}
|
|
|
|
} // namespace locator
|
|
|
|
namespace network_topology_strategy_test {
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_topology_compare_endpoints) {
|
|
locator::token_metadata::config tm_cfg;
|
|
auto my_address = gms::inet_address("localhost");
|
|
tm_cfg.topo_cfg.this_endpoint = my_address;
|
|
tm_cfg.topo_cfg.this_cql_address = my_address;
|
|
|
|
constexpr size_t NODES = 10;
|
|
|
|
std::unordered_map<sstring, size_t> datacenters = {
|
|
{ "rf1", 1 },
|
|
{ "rf2", 2 },
|
|
{ "rf3", 3 },
|
|
};
|
|
host_id_vector_replica_set nodes;
|
|
nodes.reserve(NODES);
|
|
|
|
auto make_address = [] (unsigned i) {
|
|
return host_id{utils::UUID(0, i)};
|
|
};
|
|
|
|
tm_cfg.topo_cfg.this_endpoint = my_address;
|
|
tm_cfg.topo_cfg.this_cql_address = my_address;
|
|
tm_cfg.topo_cfg.this_host_id = nodes[0];
|
|
tm_cfg.topo_cfg.local_dc_rack = locator::endpoint_dc_rack::default_location;
|
|
|
|
std::generate_n(std::back_inserter(nodes), NODES, [&, i = 0u]() mutable {
|
|
return make_address(++i);
|
|
});
|
|
|
|
semaphore sem(1);
|
|
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg);
|
|
auto stop_stm = deferred_stop(stm);
|
|
stm.mutate_token_metadata([&] (token_metadata& tm) {
|
|
auto& topo = tm.get_topology();
|
|
generate_topology(topo, datacenters, nodes);
|
|
|
|
const auto& address = nodes[tests::random::get_int<size_t>(0, NODES-1)];
|
|
const auto& a1 = nodes[tests::random::get_int<size_t>(0, NODES-1)];
|
|
const auto& a2 = nodes[tests::random::get_int<size_t>(0, NODES-1)];
|
|
|
|
topo.test_compare_endpoints(address, address, address);
|
|
topo.test_compare_endpoints(address, address, a1);
|
|
topo.test_compare_endpoints(address, a1, address);
|
|
topo.test_compare_endpoints(address, a1, a1);
|
|
topo.test_compare_endpoints(address, a1, a2);
|
|
topo.test_compare_endpoints(address, a2, a1);
|
|
return make_ready_future<>();
|
|
}).get();
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_topology_sort_by_proximity) {
|
|
using map_type = std::unordered_map<sstring, size_t>;
|
|
map_type datacenters;
|
|
size_t num_dcs = tests::random::get_int<size_t>(1, 3);
|
|
for (size_t i = 0; i < num_dcs; ++i) {
|
|
size_t rf = tests::random::get_int<size_t>(3, 5);
|
|
datacenters.emplace(format("dc{}", i), rf);
|
|
}
|
|
size_t num_nodes = std::ranges::fold_left(datacenters | std::views::transform(std::mem_fn(&map_type::value_type::second)), size_t(0), std::plus{});
|
|
host_id_vector_replica_set nodes;
|
|
auto make_address = [] (unsigned i) {
|
|
return host_id{utils::UUID(0, i)};
|
|
};
|
|
nodes.reserve(num_nodes);
|
|
std::generate_n(std::back_inserter(nodes), num_nodes, [&, i = 0u]() mutable {
|
|
return make_address(++i);
|
|
});
|
|
|
|
locator::token_metadata::config tm_cfg;
|
|
auto my_address = gms::inet_address("localhost");
|
|
tm_cfg.topo_cfg.this_endpoint = my_address;
|
|
tm_cfg.topo_cfg.this_cql_address = my_address;
|
|
tm_cfg.topo_cfg.this_host_id = nodes[0];
|
|
tm_cfg.topo_cfg.local_dc_rack = locator::endpoint_dc_rack::default_location;
|
|
semaphore sem(1);
|
|
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg);
|
|
auto stop_stm = deferred_stop(stm);
|
|
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
|
generate_topology(tm.get_topology(), datacenters, nodes);
|
|
return make_ready_future();
|
|
}).get();
|
|
|
|
auto tmptr = stm.get();
|
|
const auto& topology = stm.get()->get_topology();
|
|
auto it = nodes.begin() + tests::random::get_int<size_t>(0, num_nodes - 1);
|
|
auto address = *it;
|
|
topology.test_sort_by_proximity(address, nodes);
|
|
|
|
// remove the reference node from the nodes list
|
|
nodes.erase(it);
|
|
topology.test_sort_by_proximity(address, nodes);
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_topology_tracks_local_node) {
|
|
inet_address ip1("192.168.0.1");
|
|
inet_address ip2("192.168.0.2");
|
|
inet_address ip3("192.168.0.3");
|
|
|
|
auto host1 = host_id(utils::make_random_uuid());
|
|
auto host2 = host_id(utils::make_random_uuid());
|
|
|
|
auto ip1_dc_rack = endpoint_dc_rack{ "dc1", "rack_ip1" };
|
|
auto ip1_dc_rack_v2 = endpoint_dc_rack{ "dc1", "rack_ip1_v2" };
|
|
|
|
semaphore sem(1);
|
|
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{
|
|
topology::config{
|
|
.this_endpoint = ip1,
|
|
.this_host_id = host1,
|
|
.local_dc_rack = ip1_dc_rack,
|
|
}
|
|
});
|
|
auto stop_stm = deferred_stop(stm);
|
|
|
|
// get_location() should work before any node is added
|
|
|
|
BOOST_REQUIRE(stm.get()->get_topology().get_location() == ip1_dc_rack);
|
|
|
|
stm.mutate_token_metadata([&] (token_metadata& tm) {
|
|
// Need to move to non left or none state in order to be indexed by ip
|
|
tm.update_topology(host1, {}, locator::node::state::normal);
|
|
tm.update_topology(host2, {}, locator::node::state::normal);
|
|
return make_ready_future<>();
|
|
}).get();
|
|
|
|
const node* n1 = stm.get()->get_topology().find_node(host1);
|
|
BOOST_REQUIRE(n1);
|
|
BOOST_REQUIRE(bool(n1->is_this_node()));
|
|
BOOST_REQUIRE_EQUAL(n1->host_id(), host1);
|
|
BOOST_REQUIRE(n1->dc_rack() == ip1_dc_rack);
|
|
BOOST_REQUIRE(stm.get()->get_topology().get_location() == ip1_dc_rack);
|
|
|
|
const node* n2 = stm.get()->get_topology().find_node(host2);
|
|
BOOST_REQUIRE(n2);
|
|
BOOST_REQUIRE(!bool(n2->is_this_node()));
|
|
BOOST_REQUIRE_EQUAL(n2->host_id(), host2);
|
|
BOOST_REQUIRE(n2->dc_rack() == endpoint_dc_rack::default_location);
|
|
|
|
// Local node cannot be removed
|
|
|
|
stm.mutate_token_metadata([&] (token_metadata& tm) {
|
|
tm.remove_endpoint(host1);
|
|
return make_ready_future<>();
|
|
}).get();
|
|
|
|
n1 = stm.get()->get_topology().find_node(host1);
|
|
BOOST_REQUIRE(n1);
|
|
|
|
// Removing node with no local node
|
|
|
|
stm.mutate_token_metadata([&] (token_metadata& tm) {
|
|
tm.remove_endpoint(host2);
|
|
return make_ready_future<>();
|
|
}).get();
|
|
|
|
n2 = stm.get()->get_topology().find_node(host2);
|
|
BOOST_REQUIRE(!n2);
|
|
|
|
// Repopulate after clear_gently()
|
|
|
|
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
|
co_await tm.clear_gently();
|
|
tm.update_topology(host2, std::nullopt, std::nullopt);
|
|
tm.update_topology(host1, std::nullopt, std::nullopt); // this_node added last on purpose
|
|
}).get();
|
|
|
|
n1 = stm.get()->get_topology().find_node(host1);
|
|
BOOST_REQUIRE(n1);
|
|
BOOST_REQUIRE(bool(n1->is_this_node()));
|
|
BOOST_REQUIRE_EQUAL(n1->host_id(), host1);
|
|
BOOST_REQUIRE(n1->dc_rack() == ip1_dc_rack);
|
|
BOOST_REQUIRE(stm.get()->get_topology().get_location() == ip1_dc_rack);
|
|
|
|
n2 = stm.get()->get_topology().find_node(host2);
|
|
BOOST_REQUIRE(n2);
|
|
BOOST_REQUIRE(!bool(n2->is_this_node()));
|
|
BOOST_REQUIRE_EQUAL(n2->host_id(), host2);
|
|
BOOST_REQUIRE(n2->dc_rack() == endpoint_dc_rack::default_location);
|
|
|
|
// get_location() should pick up endpoint_dc_rack from node info
|
|
|
|
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
|
co_await tm.clear_gently();
|
|
tm.get_topology().add_or_update_endpoint(host1, ip1_dc_rack_v2, node::state::being_decommissioned);
|
|
}).get();
|
|
|
|
n1 = stm.get()->get_topology().find_node(host1);
|
|
BOOST_REQUIRE(n1);
|
|
BOOST_REQUIRE(bool(n1->is_this_node()));
|
|
BOOST_REQUIRE_EQUAL(n1->host_id(), host1);
|
|
BOOST_REQUIRE(n1->dc_rack() == ip1_dc_rack_v2);
|
|
BOOST_REQUIRE(stm.get()->get_topology().get_location() == ip1_dc_rack_v2);
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) {
|
|
auto my_address = gms::inet_address("localhost");
|
|
|
|
// Create the RackInferringSnitch
|
|
snitch_config cfg;
|
|
cfg.listen_address = my_address;
|
|
cfg.broadcast_address = my_address;
|
|
cfg.name = "RackInferringSnitch";
|
|
sharded<snitch_ptr> snitch;
|
|
snitch.start(cfg).get();
|
|
auto stop_snitch = defer([&snitch] { snitch.stop().get(); });
|
|
snitch.invoke_on_all(&snitch_ptr::start).get();
|
|
|
|
locator::token_metadata::config tm_cfg;
|
|
tm_cfg.topo_cfg.this_endpoint = my_address;
|
|
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
|
|
|
|
std::map<sstring, size_t> node_count_per_dc;
|
|
std::map<sstring, std::map<sstring, size_t>> node_count_per_rack;
|
|
std::vector<ring_point> ring_points;
|
|
|
|
auto& random_engine = seastar::testing::local_random_engine;
|
|
unsigned shard_count = 2;
|
|
size_t num_dcs = 1 + tests::random::get_int(3);
|
|
|
|
// Generate a random cluster
|
|
double point = 1;
|
|
for (size_t dc = 0; dc < num_dcs; ++dc) {
|
|
sstring dc_name = fmt::format("{}", 100 + dc);
|
|
size_t num_racks = 1 + tests::random::get_int(5);
|
|
for (size_t rack = 0; rack < num_racks; ++rack) {
|
|
sstring rack_name = fmt::format("{}", 10 + rack);
|
|
size_t rack_nodes = 1 + tests::random::get_int(2);
|
|
for (size_t i = 1; i <= rack_nodes; ++i) {
|
|
ring_points.emplace_back(point, inet_address(format("192.{}.{}.{}", dc_name, rack_name, i)));
|
|
node_count_per_dc[dc_name]++;
|
|
node_count_per_rack[dc_name][rack_name]++;
|
|
point++;
|
|
}
|
|
}
|
|
}
|
|
|
|
testlog.debug("node_count_per_rack={}", node_count_per_rack);
|
|
|
|
// Initialize the token_metadata
|
|
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
|
|
auto stop_stm = deferred_stop(stm);
|
|
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
|
auto& topo = tm.get_topology();
|
|
for (const auto& [ring_point, endpoint, id] : ring_points) {
|
|
std::unordered_set<token> tokens;
|
|
tokens.insert(token{tests::d2t(ring_point / ring_points.size())});
|
|
topo.add_node(id, make_endpoint_dc_rack(endpoint), locator::node::state::normal, shard_count);
|
|
co_await tm.update_normal_tokens(std::move(tokens), id);
|
|
}
|
|
}).get();
|
|
|
|
auto base_schema = schema_builder("ks", "base")
|
|
.with_column("k", utf8_type, column_kind::partition_key)
|
|
.with_column("v", utf8_type)
|
|
.build();
|
|
|
|
auto view_schema = schema_builder("ks", "view")
|
|
.with_column("v", utf8_type, column_kind::partition_key)
|
|
.with_column("k", utf8_type)
|
|
.build();
|
|
|
|
auto tmptr = stm.get();
|
|
|
|
// Create the replication strategy
|
|
auto make_random_options = [&] () {
|
|
auto option_dcs = node_count_per_dc | std::views::keys | std::ranges::to<std::vector>();
|
|
std::shuffle(option_dcs.begin(), option_dcs.end(), random_engine);
|
|
std::map<sstring, replication_strategy_config_option> options;
|
|
for (const auto& dc : option_dcs) {
|
|
auto num_racks = node_count_per_rack.at(dc).size();
|
|
auto max_rf_factor = std::ranges::min(std::ranges::views::transform(node_count_per_rack.at(dc), [] (auto& x) { return x.second; }));
|
|
auto rf = num_racks * tests::random::get_int(1UL, max_rf_factor);
|
|
options.emplace(dc, fmt::to_string(rf));
|
|
}
|
|
return options;
|
|
};
|
|
|
|
auto options = make_random_options();
|
|
size_t tablet_count = 1 + tests::random::get_int(99);
|
|
testlog.debug("tablet_count={} rf_options={}", tablet_count, options);
|
|
locator::replication_strategy_params params(options, tablet_count, std::nullopt);
|
|
auto ars_ptr = abstract_replication_strategy::create_replication_strategy(
|
|
"NetworkTopologyStrategy", params, tmptr->get_topology());
|
|
auto tab_awr_ptr = ars_ptr->maybe_as_tablet_aware();
|
|
BOOST_REQUIRE(tab_awr_ptr);
|
|
auto base_tmap = tab_awr_ptr->allocate_tablets_for_new_table(base_schema, tmptr, 1).get();
|
|
auto base_table_id = base_schema->id();
|
|
testlog.debug("base_table_id={}", base_table_id);
|
|
auto view_table_id = view_schema->id();
|
|
auto view_tmap = tab_awr_ptr->allocate_tablets_for_new_table(view_schema, tmptr, 1).get();
|
|
testlog.debug("view_table_id={}", view_table_id);
|
|
|
|
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
|
tm.tablets().set_tablet_map(base_table_id, co_await base_tmap.clone_gently());
|
|
tm.tablets().set_tablet_map(view_table_id, co_await view_tmap.clone_gently());
|
|
}).get();
|
|
|
|
tmptr = stm.get();
|
|
auto base_erm = tab_awr_ptr->make_replication_map(base_table_id, tmptr);
|
|
auto view_erm = tab_awr_ptr->make_replication_map(view_table_id, tmptr);
|
|
|
|
auto& topology = tmptr->get_topology();
|
|
testlog.debug("topology: {}", topology.get_datacenter_racks());
|
|
|
|
// Test tablets rack-aware base-view pairing
|
|
auto base_token = dht::token::get_random_token();
|
|
auto view_token = dht::token::get_random_token();
|
|
bool use_legacy_self_pairing = false;
|
|
bool use_tablets_basic_rack_aware_view_pairing = true;
|
|
const auto& base_replicas = base_tmap.get_tablet_info(base_tmap.get_tablet_id(base_token)).replicas;
|
|
replica::cf_stats cf_stats;
|
|
std::unordered_map<locator::host_id, locator::host_id> base_to_view_pairing;
|
|
std::unordered_map<locator::host_id, locator::host_id> view_to_base_pairing;
|
|
for (const auto& base_replica : base_replicas) {
|
|
auto& base_host = base_replica.host;
|
|
auto view_ep_opt = db::view::get_view_natural_endpoint(
|
|
base_host,
|
|
base_erm,
|
|
view_erm,
|
|
*ars_ptr,
|
|
base_token,
|
|
view_token,
|
|
use_legacy_self_pairing,
|
|
use_tablets_basic_rack_aware_view_pairing,
|
|
cf_stats).natural_endpoint;
|
|
|
|
// view pair must be found
|
|
BOOST_REQUIRE(view_ep_opt);
|
|
auto& view_ep = *view_ep_opt;
|
|
|
|
// Assert pairing uniqueness
|
|
auto [base_it, inserted_base_pair] = base_to_view_pairing.emplace(base_host, view_ep);
|
|
BOOST_REQUIRE(inserted_base_pair);
|
|
auto [view_it, inserted_view_pair] = view_to_base_pairing.emplace(view_ep, base_host);
|
|
BOOST_REQUIRE(inserted_view_pair);
|
|
|
|
auto& base_location = topology.find_node(base_host)->dc_rack();
|
|
auto& view_location = topology.find_node(view_ep)->dc_rack();
|
|
|
|
// Assert dc- and rack- aware pairing
|
|
BOOST_REQUIRE_EQUAL(base_location.dc, view_location.dc);
|
|
BOOST_REQUIRE_EQUAL(base_location.rack, view_location.rack);
|
|
}
|
|
}
|
|
|
|
// Called in a seastar thread
|
|
void test_complex_rack_aware_view_pairing_test(bool more_or_less) {
|
|
auto my_address = gms::inet_address("localhost");
|
|
|
|
// Create the RackInferringSnitch
|
|
snitch_config cfg;
|
|
cfg.listen_address = my_address;
|
|
cfg.broadcast_address = my_address;
|
|
cfg.name = "RackInferringSnitch";
|
|
sharded<snitch_ptr> snitch;
|
|
snitch.start(cfg).get();
|
|
auto stop_snitch = defer([&snitch] { snitch.stop().get(); });
|
|
snitch.invoke_on_all(&snitch_ptr::start).get();
|
|
|
|
locator::token_metadata::config tm_cfg;
|
|
tm_cfg.topo_cfg.this_endpoint = my_address;
|
|
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
|
|
|
|
std::map<sstring, size_t> node_count_per_dc;
|
|
std::map<sstring, std::map<sstring, size_t>> node_count_per_rack;
|
|
std::vector<ring_point> ring_points;
|
|
|
|
auto& random_engine = seastar::testing::local_random_engine;
|
|
unsigned shard_count = 2;
|
|
size_t num_dcs = 1 + tests::random::get_int(3);
|
|
|
|
// Generate a random cluster
|
|
double point = 1;
|
|
for (size_t dc = 0; dc < num_dcs; ++dc) {
|
|
sstring dc_name = fmt::format("{}", 100 + dc);
|
|
size_t num_racks = 2 + tests::random::get_int(4);
|
|
for (size_t rack = 0; rack < num_racks; ++rack) {
|
|
sstring rack_name = fmt::format("{}", 10 + rack);
|
|
size_t rack_nodes = 1 + tests::random::get_int(2);
|
|
for (size_t i = 1; i <= rack_nodes; ++i) {
|
|
ring_points.emplace_back(point, inet_address(format("192.{}.{}.{}", dc_name, rack_name, i)));
|
|
node_count_per_dc[dc_name]++;
|
|
node_count_per_rack[dc_name][rack_name]++;
|
|
point++;
|
|
}
|
|
}
|
|
}
|
|
|
|
testlog.debug("node_count_per_rack={}", node_count_per_rack);
|
|
|
|
// Initialize the token_metadata
|
|
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
|
|
auto stop_stm = deferred_stop(stm);
|
|
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
|
auto& topo = tm.get_topology();
|
|
for (const auto& [ring_point, endpoint, id] : ring_points) {
|
|
std::unordered_set<token> tokens;
|
|
tokens.insert(token{tests::d2t(ring_point / ring_points.size())});
|
|
topo.add_node(id, make_endpoint_dc_rack(endpoint), locator::node::state::normal, shard_count);
|
|
co_await tm.update_normal_tokens(std::move(tokens), id);
|
|
}
|
|
}).get();
|
|
|
|
auto base_schema = schema_builder("ks", "base")
|
|
.with_column("k", utf8_type, column_kind::partition_key)
|
|
.with_column("v", utf8_type)
|
|
.build();
|
|
|
|
auto view_schema = schema_builder("ks", "view")
|
|
.with_column("v", utf8_type, column_kind::partition_key)
|
|
.with_column("k", utf8_type)
|
|
.build();
|
|
|
|
auto tmptr = stm.get();
|
|
|
|
// Create the replication strategy
|
|
auto make_random_options = [&] () {
|
|
auto option_dcs = node_count_per_dc | std::views::keys | std::ranges::to<std::vector>();
|
|
std::shuffle(option_dcs.begin(), option_dcs.end(), random_engine);
|
|
std::map<sstring, replication_strategy_config_option> options;
|
|
for (const auto& dc : option_dcs) {
|
|
auto num_racks = node_count_per_rack.at(dc).size();
|
|
auto rf = more_or_less ?
|
|
tests::random::get_int(num_racks, node_count_per_dc[dc]) :
|
|
tests::random::get_int(1UL, num_racks);
|
|
options.emplace(dc, fmt::to_string(rf));
|
|
}
|
|
return options;
|
|
};
|
|
|
|
auto options = make_random_options();
|
|
size_t tablet_count = 1 + tests::random::get_int(99);
|
|
testlog.debug("tablet_count={} rf_options={}", tablet_count, options);
|
|
locator::replication_strategy_params params(options, tablet_count, std::nullopt);
|
|
auto ars_ptr = abstract_replication_strategy::create_replication_strategy(
|
|
"NetworkTopologyStrategy", params, tmptr->get_topology());
|
|
auto tab_awr_ptr = ars_ptr->maybe_as_tablet_aware();
|
|
BOOST_REQUIRE(tab_awr_ptr);
|
|
auto base_tmap = tab_awr_ptr->allocate_tablets_for_new_table(base_schema, tmptr, 1).get();
|
|
auto base_table_id = base_schema->id();
|
|
testlog.debug("base_table_id={}", base_table_id);
|
|
auto view_table_id = view_schema->id();
|
|
auto view_tmap = tab_awr_ptr->allocate_tablets_for_new_table(view_schema, tmptr, 1).get();
|
|
testlog.debug("view_table_id={}", view_table_id);
|
|
|
|
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
|
|
tm.tablets().set_tablet_map(base_table_id, co_await base_tmap.clone_gently());
|
|
tm.tablets().set_tablet_map(view_table_id, co_await view_tmap.clone_gently());
|
|
}).get();
|
|
|
|
tmptr = stm.get();
|
|
auto base_erm = tab_awr_ptr->make_replication_map(base_table_id, tmptr);
|
|
auto view_erm = tab_awr_ptr->make_replication_map(view_table_id, tmptr);
|
|
|
|
auto& topology = tmptr->get_topology();
|
|
testlog.debug("topology: {}", topology.get_datacenter_racks());
|
|
|
|
// Test tablets rack-aware base-view pairing
|
|
auto base_token = dht::token::get_random_token();
|
|
auto view_token = dht::token::get_random_token();
|
|
bool use_legacy_self_pairing = false;
|
|
bool use_tablets_basic_rack_aware_view_pairing = true;
|
|
const auto& base_replicas = base_tmap.get_tablet_info(base_tmap.get_tablet_id(base_token)).replicas;
|
|
replica::cf_stats cf_stats;
|
|
std::unordered_map<locator::host_id, locator::host_id> base_to_view_pairing;
|
|
std::unordered_map<locator::host_id, locator::host_id> view_to_base_pairing;
|
|
std::unordered_map<sstring, size_t> same_rack_pairs;
|
|
std::unordered_map<sstring, size_t> cross_rack_pairs;
|
|
for (const auto& base_replica : base_replicas) {
|
|
auto& base_host = base_replica.host;
|
|
auto view_ep_opt = db::view::get_view_natural_endpoint(
|
|
base_host,
|
|
base_erm,
|
|
view_erm,
|
|
*ars_ptr,
|
|
base_token,
|
|
view_token,
|
|
use_legacy_self_pairing,
|
|
use_tablets_basic_rack_aware_view_pairing,
|
|
cf_stats).natural_endpoint;
|
|
|
|
// view pair must be found
|
|
if (!view_ep_opt) {
|
|
BOOST_FAIL(format("Could not pair base_host={} base_token={} view_token={}", base_host, base_token, view_token));
|
|
}
|
|
BOOST_REQUIRE(view_ep_opt);
|
|
auto& view_ep = *view_ep_opt;
|
|
|
|
// Assert pairing uniqueness
|
|
auto [base_it, inserted_base_pair] = base_to_view_pairing.emplace(base_host, view_ep);
|
|
BOOST_REQUIRE(inserted_base_pair);
|
|
auto [view_it, inserted_view_pair] = view_to_base_pairing.emplace(view_ep, base_host);
|
|
BOOST_REQUIRE(inserted_view_pair);
|
|
|
|
auto& base_location = topology.find_node(base_host)->dc_rack();
|
|
auto& view_location = topology.find_node(view_ep)->dc_rack();
|
|
|
|
// Assert dc- and rack- aware pairing
|
|
BOOST_REQUIRE_EQUAL(base_location.dc, view_location.dc);
|
|
|
|
if (base_location.rack == view_location.rack) {
|
|
same_rack_pairs[base_location.dc]++;
|
|
} else {
|
|
cross_rack_pairs[base_location.dc]++;
|
|
}
|
|
}
|
|
for (const auto& [dc, rf_opt] : options) {
|
|
auto rf = locator::get_replication_factor(rf_opt);
|
|
BOOST_REQUIRE_EQUAL(same_rack_pairs[dc] + cross_rack_pairs[dc], rf);
|
|
}
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(tablets_complex_rack_aware_view_pairing_test_rf_lt_racks) {
|
|
test_complex_rack_aware_view_pairing_test(false);
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(tablets_complex_rack_aware_view_pairing_test_rf_gt_racks) {
|
|
test_complex_rack_aware_view_pairing_test(true);
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_rack_diff) {
|
|
BOOST_REQUIRE(diff_racks({}, {}).empty());
|
|
|
|
{
|
|
rack_list l1 = {"rack1", "rack2", "rack3"};
|
|
rack_list l2 = {"rack1", "rack2", "rack3"};
|
|
auto diff = diff_racks(l1, l2);
|
|
BOOST_REQUIRE(diff.empty());
|
|
}
|
|
|
|
{
|
|
rack_list l1 = {"rack3", "rack2", "rack1"};
|
|
rack_list l2 = {"rack1", "rack2", "rack3"};
|
|
auto diff = diff_racks(l1, l2);
|
|
BOOST_REQUIRE(diff.empty());
|
|
}
|
|
|
|
{
|
|
rack_list l1 = {"rack1", "rack2"};
|
|
rack_list l2 = {"rack1", "rack2", "rack3"};
|
|
auto diff = diff_racks(l1, l2);
|
|
BOOST_REQUIRE_EQUAL(diff.added, rack_list{"rack3"});
|
|
BOOST_REQUIRE(diff.removed.empty());
|
|
}
|
|
|
|
{
|
|
rack_list l1 = {"rack1", "rack2", "rack3"};
|
|
rack_list l2 = {"rack1", "rack3"};
|
|
auto diff = diff_racks(l1, l2);
|
|
BOOST_REQUIRE_EQUAL(diff.removed, rack_list{"rack2"});
|
|
BOOST_REQUIRE(diff.added.empty());
|
|
}
|
|
|
|
{
|
|
rack_list l1 = {"rack3", "rack2", "rack1"};
|
|
rack_list l2 = {"rack4", "rack2"};
|
|
auto diff = diff_racks(l1, l2);
|
|
BOOST_REQUIRE_EQUAL(diff.added, rack_list{"rack4"});
|
|
BOOST_REQUIRE_EQUAL(diff.removed | std::ranges::to<std::unordered_set<sstring>>(),
|
|
std::unordered_set<sstring>({"rack1", "rack3"}));
|
|
}
|
|
}
|
|
|
|
BOOST_AUTO_TEST_SUITE_END()
|