locator: network_topology_strategy: Support tablet replication

This commit is contained in:
Tomasz Grabiec
2023-03-23 14:37:38 +01:00
parent 5e89f2f5ba
commit 819bc86f0f
4 changed files with 217 additions and 11 deletions

View File

@@ -55,7 +55,7 @@ class effective_replication_map_factory;
class per_table_replication_strategy;
class tablet_aware_replication_strategy;
class abstract_replication_strategy {
class abstract_replication_strategy : public seastar::enable_shared_from_this<abstract_replication_strategy> {
friend class vnode_effective_replication_map;
friend class per_table_replication_strategy;
friend class tablet_aware_replication_strategy;

View File

@@ -14,6 +14,7 @@
#include <seastar/coroutine/maybe_yield.hh>
#include "locator/network_topology_strategy.hh"
#include "utils/fb_utilities.hh"
#include <boost/algorithm/string.hpp>
#include "utils/hash.hh"
@@ -32,7 +33,10 @@ network_topology_strategy::network_topology_strategy(
const replication_strategy_config_options& config_options) :
abstract_replication_strategy(config_options,
replication_strategy_type::network_topology) {
for (auto& config_pair : config_options) {
auto opts = config_options;
process_tablet_options(*this, opts);
for (auto& config_pair : opts) {
auto& key = config_pair.first;
auto& val = config_pair.second;
@@ -251,8 +255,13 @@ network_topology_strategy::calculate_natural_endpoints(
co_return std::move(tracker.replicas());
}
void network_topology_strategy::validate_options(const gms::feature_service&) const {
void network_topology_strategy::validate_options(const gms::feature_service& fs) const {
validate_tablet_options(fs, _config_options);
auto tablet_opts = recognized_tablet_options();
for (auto& c : _config_options) {
if (tablet_opts.contains(c.first)) {
continue;
}
if (c.first == sstring("replication_factor")) {
throw exceptions::configuration_exception(
"replication_factor is an option for simple_strategy, not "
@@ -264,7 +273,59 @@ void network_topology_strategy::validate_options(const gms::feature_service&) co
std::optional<std::unordered_set<sstring>> network_topology_strategy::recognized_options(const topology& topology) const {
// We only allow datacenter names as options
return topology.get_datacenters();
auto opts = topology.get_datacenters();
opts.merge(recognized_tablet_options());
return opts;
}
// Builds the per-table effective replication map for a tablet-based table.
// It is an internal error to call this on a strategy that was not
// configured to use tablets.
effective_replication_map_ptr network_topology_strategy::make_replication_map(table_id table, token_metadata_ptr tm) const {
    const bool tablets_enabled = uses_tablets();
    if (!tablets_enabled) {
        on_internal_error(rslogger, format("make_replication_map() called for table {} but replication strategy not configured to use tablets", table));
    }
    auto self = shared_from_this();
    return do_make_replication_map(table, std::move(self), std::move(tm), _rep_factor);
}
// Allocates the initial tablet map for a newly-created table.
//
// The requested tablet count is rounded up to the next power of two
// (tablet_map presumably requires that — TODO confirm), and each tablet's
// replica set is chosen by walking the token ring from a random starting
// token, feeding endpoints into natural_endpoints_tracker until it reports
// the per-DC replication factors are satisfied.
future<tablet_map> network_topology_strategy::allocate_tablets_for_new_table(schema_ptr s, token_metadata_ptr tm) const {
    auto tablet_count = get_initial_tablets();
    // Round up to a power of two; log the adjustment so operators can see
    // the effective count differs from what was requested.
    auto aligned_tablet_count = 1ul << log2ceil(tablet_count);
    if (tablet_count != aligned_tablet_count) {
        rslogger.info("Rounding up tablet count from {} to {} for table {}.{}", tablet_count, aligned_tablet_count, s->ks_name(), s->cf_name());
        tablet_count = aligned_tablet_count;
    }
    tablet_map tablets(tablet_count);
    // FIXME: Don't use tokens to distribute nodes.
    // The following reuses the existing token-based algorithm used by NetworkTopologyStrategy.
    assert(!tm->sorted_tokens().empty());
    // Start the ring walk at a random token so different tables don't all
    // pile their replicas onto the same nodes.
    auto token_range = tm->ring_range(dht::token::get_random_token());
    for (tablet_id tb : tablets.tablet_ids()) {
        natural_endpoints_tracker tracker(*tm, _dc_rep_factor);
        while (true) {
            // Yield periodically: the loop may scan many tokens per tablet.
            co_await coroutine::maybe_yield();
            if (token_range.begin() == token_range.end()) {
                // Reached the end of the ring — wrap around to the beginning.
                token_range = tm->ring_range(dht::minimum_token());
            }
            inet_address ep = *tm->get_endpoint(*token_range.begin());
            token_range.drop_front();
            // The tracker deduplicates endpoints and signals completion once
            // the configured replication factors are met (presumably capped
            // by the available nodes — verify against the tracker).
            if (tracker.add_endpoint_and_check_if_done(ep)) {
                break;
            }
        }
        tablet_replica_set replicas;
        for (auto&& ep : tracker.replicas()) {
            // FIXME: Allocate shard. Currently ignored by the replica.
            replicas.emplace_back(tablet_replica{tm->get_host_id(ep), 0});
        }
        tablets.set_tablet(tb, tablet_info{std::move(replicas)});
    }
    tablet_logger.debug("Allocated tablets for {}.{} ({}): {}", s->ks_name(), s->cf_name(), s->id(), tablets);
    co_return tablets;
}
using registry = class_registrator<abstract_replication_strategy, network_topology_strategy, const replication_strategy_config_options&>;

View File

@@ -11,13 +11,16 @@
#pragma once
#include "locator/abstract_replication_strategy.hh"
#include "locator/tablet_replication_strategy.hh"
#include "exceptions/exceptions.hh"
#include <optional>
#include <set>
namespace locator {
class network_topology_strategy : public abstract_replication_strategy {
class network_topology_strategy : public abstract_replication_strategy
, public tablet_aware_replication_strategy {
public:
network_topology_strategy(
const replication_strategy_config_options& config_options);
@@ -39,6 +42,9 @@ public:
return true;
}
public: // tablet_aware_replication_strategy
virtual effective_replication_map_ptr make_replication_map(table_id, token_metadata_ptr) const override;
virtual future<tablet_map> allocate_tablets_for_new_table(schema_ptr, token_metadata_ptr) const override;
protected:
/**
* calculate endpoints in one pass through the tokens by tracking our

View File

@@ -19,6 +19,7 @@
#include <seastar/core/sstring.hh>
#include "log.hh"
#include "gms/gossiper.hh"
#include "schema/schema_builder.hh"
#include <vector>
#include <string>
#include <map>
@@ -96,11 +97,22 @@ void strategy_sanity_check(
void endpoints_check(
replication_strategy_ptr ars_ptr,
const token_metadata& tm,
inet_address_vector_replica_set& endpoints,
const inet_address_vector_replica_set& endpoints,
const locator::topology& topo) {
auto&& nodes_per_dc = tm.get_topology().get_datacenter_endpoints();
const network_topology_strategy* nts_ptr =
dynamic_cast<const network_topology_strategy*>(ars_ptr.get());
size_t total_rf = 0;
for (auto&& [dc, nodes] : nodes_per_dc) {
auto effective_rf = std::min<size_t>(nts_ptr->get_replication_factor(dc), nodes.size());
total_rf += effective_rf;
}
// Check the total RF
BOOST_CHECK(endpoints.size() == ars_ptr->get_replication_factor(tm));
BOOST_CHECK(endpoints.size() == total_rf);
BOOST_CHECK(total_rf <= ars_ptr->get_replication_factor(tm));
// Check the uniqueness
std::unordered_set<inet_address> ep_set(endpoints.begin(), endpoints.end());
@@ -119,10 +131,9 @@ void endpoints_check(
}
}
const network_topology_strategy* nts_ptr =
dynamic_cast<const network_topology_strategy*>(ars_ptr.get());
for (auto& rf : dc_rf) {
BOOST_CHECK(rf.second == nts_ptr->get_replication_factor(rf.first));
for (auto&& [dc, rf] : dc_rf) {
auto effective_rf = std::min<size_t>(nts_ptr->get_replication_factor(dc), nodes_per_dc.at(dc).size());
BOOST_CHECK(rf == effective_rf);
}
}
@@ -177,6 +188,33 @@ void full_ring_check(const std::vector<ring_point>& ring_points,
}
}
// Runs endpoints_check() over every tablet in tmap, translating each
// tablet's host_id-based replica set into endpoint addresses first.
// `options` is accepted for signature parity with the vnode-based overload.
void full_ring_check(const tablet_map& tmap,
                     const std::map<sstring, sstring>& options,
                     replication_strategy_ptr rs_ptr,
                     locator::token_metadata_ptr tmptr) {
    auto& metadata = *tmptr;
    const auto& topology = metadata.get_topology();
    // Map a tablet replica set (host ids) to the corresponding endpoints.
    auto resolve_replicas = [&metadata] (const tablet_replica_set& replica_set) {
        inet_address_vector_replica_set endpoints;
        endpoints.reserve(replica_set.size());
        for (const auto& replica : replica_set) {
            auto ep = metadata.get_endpoint_for_host_id(replica.host);
            assert(ep);
            endpoints.emplace_back(*ep);
        }
        return endpoints;
    };
    for (tablet_id id : tmap.tablet_ids()) {
        endpoints_check(rs_ptr, metadata, resolve_replicas(tmap.get_tablet_info(id).replicas), topology);
    }
}
locator::endpoint_dc_rack make_endpoint_dc_rack(gms::inet_address endpoint) {
// This resembles rack_inferring_snitch dc/rack generation which is
// still in use by this test via token_metadata internals
@@ -348,6 +386,107 @@ SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_heavy) {
return heavy_origin_test();
}
// Exercises tablet allocation for NetworkTopologyStrategy: builds an
// 11-node topology via RackInferringSnitch (DC/rack derived from the IP
// octets — confirm against the snitch), allocates tablets for a test table
// under several per-DC RF configurations, and validates every tablet's
// replica set with full_ring_check().
SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablets_test) {
    utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost"));
    utils::fb_utilities::set_broadcast_rpc_address(gms::inet_address("localhost"));

    // Create the RackInferringSnitch
    snitch_config cfg;
    cfg.name = "RackInferringSnitch";
    sharded<snitch_ptr> snitch;
    snitch.start(cfg).get();
    auto stop_snitch = defer([&snitch] { snitch.stop().get(); });
    snitch.invoke_on_all(&snitch_ptr::start).get();

    locator::token_metadata::config tm_cfg;
    tm_cfg.topo_cfg.this_host_id = host_id::create_random_id();
    tm_cfg.topo_cfg.this_endpoint = utils::fb_utilities::get_broadcast_address();
    tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
    locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);

    // Nodes spread across DCs 100/101/102 (second IP octet) and racks
    // 10/20/30/40 (third octet), per rack_inferring_snitch conventions.
    std::vector<ring_point> ring_points = {
        { 1.0, inet_address("192.100.10.1") },
        { 2.0, inet_address("192.101.10.1") },
        { 3.0, inet_address("192.102.10.1") },
        { 4.0, inet_address("192.100.20.1") },
        { 5.0, inet_address("192.101.20.1") },
        { 6.0, inet_address("192.102.20.1") },
        { 7.0, inet_address("192.100.30.1") },
        { 8.0, inet_address("192.101.30.1") },
        { 9.0, inet_address("192.102.30.1") },
        { 10.0, inet_address("192.102.40.1") },
        { 11.0, inet_address("192.102.40.2") }
    };

    // Initialize the token_metadata.
    // NOTE(review): the binding below destructures a third `id` member not
    // present in the initializers above — presumably ring_point default-
    // initializes a host id; verify against its definition.
    stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
        auto& topo = tm.get_topology();
        for (const auto& [ring_point, endpoint, id] : ring_points) {
            std::unordered_set<token> tokens;
            // One token per node, evenly spaced over the ring.
            tokens.insert({dht::token::kind::key, d2t(ring_point / ring_points.size())});
            topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint), locator::node::state::normal);
            tm.update_host_id(id, endpoint);
            co_await tm.update_normal_tokens(std::move(tokens), endpoint);
        }
    }).get();

    /////////////////////////////////////
    // Create the replication strategy: RF 3/2/3 across the three DCs.
    std::map<sstring, sstring> options323 = {
        {"100", "3"},
        {"101", "2"},
        {"102", "3"},
        {"initial_tablets", "100"}
    };
    auto ars_ptr = abstract_replication_strategy::create_replication_strategy(
        "NetworkTopologyStrategy", options323);

    auto tab_awr_ptr = ars_ptr->maybe_as_tablet_aware();
    BOOST_REQUIRE(tab_awr_ptr);
    auto s = schema_builder("ks", "tb")
        .with_column("pk", utf8_type, column_kind::partition_key)
        .with_column("v", utf8_type)
        .build();
    auto tmap = tab_awr_ptr->allocate_tablets_for_new_table(s, stm.get()).get0();
    full_ring_check(tmap, options323, ars_ptr, stm.get());

    ///////////////
    // Create the replication strategy: RF 0 in DC 102 (no replicas there).
    std::map<sstring, sstring> options320 = {
        {"100", "3"},
        {"101", "2"},
        {"102", "0"},
        {"initial_tablets", "100"}
    };
    ars_ptr = abstract_replication_strategy::create_replication_strategy(
        "NetworkTopologyStrategy", options320);
    tab_awr_ptr = ars_ptr->maybe_as_tablet_aware();
    BOOST_REQUIRE(tab_awr_ptr);
    tmap = tab_awr_ptr->allocate_tablets_for_new_table(s, stm.get()).get0();
    full_ring_check(tmap, options320, ars_ptr, stm.get());

    // Test the case of not enough nodes to meet RF in DC 102
    std::map<sstring, sstring> options324 = {
        {"100", "3"},
        {"101", "4"},
        {"102", "2"},
        {"initial_tablets", "100"}
    };
    ars_ptr = abstract_replication_strategy::create_replication_strategy(
        "NetworkTopologyStrategy", options324);
    tab_awr_ptr = ars_ptr->maybe_as_tablet_aware();
    BOOST_REQUIRE(tab_awr_ptr);
    tmap = tab_awr_ptr->allocate_tablets_for_new_table(s, stm.get()).get0();
    full_ring_check(tmap, options324, ars_ptr, stm.get());
}
/**
* static impl of "old" network topology strategy endpoint calculation.
*/