diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index bb98cf04cb..a5f5c53c04 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -55,7 +55,7 @@ class effective_replication_map_factory; class per_table_replication_strategy; class tablet_aware_replication_strategy; -class abstract_replication_strategy { +class abstract_replication_strategy : public seastar::enable_shared_from_this { friend class vnode_effective_replication_map; friend class per_table_replication_strategy; friend class tablet_aware_replication_strategy; diff --git a/locator/network_topology_strategy.cc b/locator/network_topology_strategy.cc index 439425f68e..a5bbc40859 100644 --- a/locator/network_topology_strategy.cc +++ b/locator/network_topology_strategy.cc @@ -14,6 +14,7 @@ #include #include "locator/network_topology_strategy.hh" +#include "utils/fb_utilities.hh" #include #include "utils/hash.hh" @@ -32,7 +33,10 @@ network_topology_strategy::network_topology_strategy( const replication_strategy_config_options& config_options) : abstract_replication_strategy(config_options, replication_strategy_type::network_topology) { - for (auto& config_pair : config_options) { + auto opts = config_options; + process_tablet_options(*this, opts); + + for (auto& config_pair : opts) { auto& key = config_pair.first; auto& val = config_pair.second; @@ -251,8 +255,13 @@ network_topology_strategy::calculate_natural_endpoints( co_return std::move(tracker.replicas()); } -void network_topology_strategy::validate_options(const gms::feature_service&) const { +void network_topology_strategy::validate_options(const gms::feature_service& fs) const { + validate_tablet_options(fs, _config_options); + auto tablet_opts = recognized_tablet_options(); for (auto& c : _config_options) { + if (tablet_opts.contains(c.first)) { + continue; + } if (c.first == sstring("replication_factor")) { throw exceptions::configuration_exception( 
"replication_factor is an option for simple_strategy, not " @@ -264,7 +273,59 @@ void network_topology_strategy::validate_options(const gms::feature_service&) co std::optional> network_topology_strategy::recognized_options(const topology& topology) const { // We only allow datacenter names as options - return topology.get_datacenters(); + auto opts = topology.get_datacenters(); + opts.merge(recognized_tablet_options()); + return opts; +} + +effective_replication_map_ptr network_topology_strategy::make_replication_map(table_id table, token_metadata_ptr tm) const { + if (!uses_tablets()) { + on_internal_error(rslogger, format("make_replication_map() called for table {} but replication strategy not configured to use tablets", table)); + } + return do_make_replication_map(table, shared_from_this(), std::move(tm), _rep_factor); +} + +future network_topology_strategy::allocate_tablets_for_new_table(schema_ptr s, token_metadata_ptr tm) const { + auto tablet_count = get_initial_tablets(); + auto aligned_tablet_count = 1ul << log2ceil(tablet_count); + if (tablet_count != aligned_tablet_count) { + rslogger.info("Rounding up tablet count from {} to {} for table {}.{}", tablet_count, aligned_tablet_count, s->ks_name(), s->cf_name()); + tablet_count = aligned_tablet_count; + } + + tablet_map tablets(tablet_count); + + // FIXME: Don't use tokens to distribute nodes. + // The following reuses the existing token-based algorithm used by NetworkTopologyStrategy. 
+ assert(!tm->sorted_tokens().empty()); + auto token_range = tm->ring_range(dht::token::get_random_token()); + + for (tablet_id tb : tablets.tablet_ids()) { + natural_endpoints_tracker tracker(*tm, _dc_rep_factor); + + while (true) { + co_await coroutine::maybe_yield(); + if (token_range.begin() == token_range.end()) { + token_range = tm->ring_range(dht::minimum_token()); + } + inet_address ep = *tm->get_endpoint(*token_range.begin()); + token_range.drop_front(); + if (tracker.add_endpoint_and_check_if_done(ep)) { + break; + } + } + + tablet_replica_set replicas; + for (auto&& ep : tracker.replicas()) { + // FIXME: Allocate shard. Currently ignored by the replica. + replicas.emplace_back(tablet_replica{tm->get_host_id(ep), 0}); + } + + tablets.set_tablet(tb, tablet_info{std::move(replicas)}); + } + + tablet_logger.debug("Allocated tablets for {}.{} ({}): {}", s->ks_name(), s->cf_name(), s->id(), tablets); + co_return tablets; } using registry = class_registrator; diff --git a/locator/network_topology_strategy.hh b/locator/network_topology_strategy.hh index 47d92c73a7..3f704c98b8 100644 --- a/locator/network_topology_strategy.hh +++ b/locator/network_topology_strategy.hh @@ -11,13 +11,16 @@ #pragma once #include "locator/abstract_replication_strategy.hh" +#include "locator/tablet_replication_strategy.hh" #include "exceptions/exceptions.hh" #include #include namespace locator { -class network_topology_strategy : public abstract_replication_strategy { + +class network_topology_strategy : public abstract_replication_strategy + , public tablet_aware_replication_strategy { public: network_topology_strategy( const replication_strategy_config_options& config_options); @@ -39,6 +42,9 @@ public: return true; } +public: // tablet_aware_replication_strategy + virtual effective_replication_map_ptr make_replication_map(table_id, token_metadata_ptr) const override; + virtual future allocate_tablets_for_new_table(schema_ptr, token_metadata_ptr) const override; protected: /** * 
calculate endpoints in one pass through the tokens by tracking our diff --git a/test/boost/network_topology_strategy_test.cc b/test/boost/network_topology_strategy_test.cc index ffdf8098c8..0f8b16a13e 100644 --- a/test/boost/network_topology_strategy_test.cc +++ b/test/boost/network_topology_strategy_test.cc @@ -19,6 +19,7 @@ #include #include "log.hh" #include "gms/gossiper.hh" +#include "schema/schema_builder.hh" #include #include #include @@ -96,11 +97,22 @@ void strategy_sanity_check( void endpoints_check( replication_strategy_ptr ars_ptr, const token_metadata& tm, - inet_address_vector_replica_set& endpoints, + const inet_address_vector_replica_set& endpoints, const locator::topology& topo) { + auto&& nodes_per_dc = tm.get_topology().get_datacenter_endpoints(); + const network_topology_strategy* nts_ptr = + dynamic_cast(ars_ptr.get()); + + size_t total_rf = 0; + for (auto&& [dc, nodes] : nodes_per_dc) { + auto effective_rf = std::min(nts_ptr->get_replication_factor(dc), nodes.size()); + total_rf += effective_rf; + } + // Check the total RF - BOOST_CHECK(endpoints.size() == ars_ptr->get_replication_factor(tm)); + BOOST_CHECK(endpoints.size() == total_rf); + BOOST_CHECK(total_rf <= ars_ptr->get_replication_factor(tm)); // Check the uniqueness std::unordered_set ep_set(endpoints.begin(), endpoints.end()); @@ -119,10 +131,9 @@ void endpoints_check( } } - const network_topology_strategy* nts_ptr = - dynamic_cast(ars_ptr.get()); - for (auto& rf : dc_rf) { - BOOST_CHECK(rf.second == nts_ptr->get_replication_factor(rf.first)); + for (auto&& [dc, rf] : dc_rf) { + auto effective_rf = std::min(nts_ptr->get_replication_factor(dc), nodes_per_dc.at(dc).size()); + BOOST_CHECK(rf == effective_rf); } } @@ -177,6 +188,33 @@ void full_ring_check(const std::vector& ring_points, } } +void full_ring_check(const tablet_map& tmap, + const std::map& options, + replication_strategy_ptr rs_ptr, + locator::token_metadata_ptr tmptr) { + auto& tm = *tmptr; + const auto& topo = 
tm.get_topology(); + + auto get_endpoint_for_host_id = [&] (host_id host) { + auto endpoint_opt = tm.get_endpoint_for_host_id(host); + assert(endpoint_opt); + return *endpoint_opt; + }; + + auto to_endpoint_set = [&] (const tablet_replica_set& replicas) { + inet_address_vector_replica_set result; + result.reserve(replicas.size()); + for (auto&& replica : replicas) { + result.emplace_back(get_endpoint_for_host_id(replica.host)); + } + return result; + }; + + for (tablet_id tb : tmap.tablet_ids()) { + endpoints_check(rs_ptr, tm, to_endpoint_set(tmap.get_tablet_info(tb).replicas), topo); + } +} + locator::endpoint_dc_rack make_endpoint_dc_rack(gms::inet_address endpoint) { // This resembles rack_inferring_snitch dc/rack generation which is // still in use by this test via token_metadata internals @@ -348,6 +386,107 @@ SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_heavy) { return heavy_origin_test(); } +SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablets_test) { + utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost")); + utils::fb_utilities::set_broadcast_rpc_address(gms::inet_address("localhost")); + + // Create the RackInferringSnitch + snitch_config cfg; + cfg.name = "RackInferringSnitch"; + sharded snitch; + snitch.start(cfg).get(); + auto stop_snitch = defer([&snitch] { snitch.stop().get(); }); + snitch.invoke_on_all(&snitch_ptr::start).get(); + + locator::token_metadata::config tm_cfg; + tm_cfg.topo_cfg.this_host_id = host_id::create_random_id(); + tm_cfg.topo_cfg.this_endpoint = utils::fb_utilities::get_broadcast_address(); + tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() }; + locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg); + + std::vector ring_points = { + { 1.0, inet_address("192.100.10.1") }, + { 2.0, inet_address("192.101.10.1") }, + { 3.0, inet_address("192.102.10.1") }, + { 4.0, inet_address("192.100.20.1") }, + { 5.0, 
inet_address("192.101.20.1") }, + { 6.0, inet_address("192.102.20.1") }, + { 7.0, inet_address("192.100.30.1") }, + { 8.0, inet_address("192.101.30.1") }, + { 9.0, inet_address("192.102.30.1") }, + { 10.0, inet_address("192.102.40.1") }, + { 11.0, inet_address("192.102.40.2") } + }; + + // Initialize the token_metadata + stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { + auto& topo = tm.get_topology(); + for (const auto& [ring_point, endpoint, id] : ring_points) { + std::unordered_set tokens; + tokens.insert({dht::token::kind::key, d2t(ring_point / ring_points.size())}); + topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint), locator::node::state::normal); + tm.update_host_id(id, endpoint); + co_await tm.update_normal_tokens(std::move(tokens), endpoint); + } + }).get(); + + ///////////////////////////////////// + // Create the replication strategy + std::map options323 = { + {"100", "3"}, + {"101", "2"}, + {"102", "3"}, + {"initial_tablets", "100"} + }; + + auto ars_ptr = abstract_replication_strategy::create_replication_strategy( + "NetworkTopologyStrategy", options323); + + auto tab_awr_ptr = ars_ptr->maybe_as_tablet_aware(); + BOOST_REQUIRE(tab_awr_ptr); + + auto s = schema_builder("ks", "tb") + .with_column("pk", utf8_type, column_kind::partition_key) + .with_column("v", utf8_type) + .build(); + + auto tmap = tab_awr_ptr->allocate_tablets_for_new_table(s, stm.get()).get0(); + full_ring_check(tmap, options323, ars_ptr, stm.get()); + + /////////////// + // Create the replication strategy + std::map options320 = { + {"100", "3"}, + {"101", "2"}, + {"102", "0"}, + {"initial_tablets", "100"} + }; + + ars_ptr = abstract_replication_strategy::create_replication_strategy( + "NetworkTopologyStrategy", options320); + tab_awr_ptr = ars_ptr->maybe_as_tablet_aware(); + BOOST_REQUIRE(tab_awr_ptr); + + tmap = tab_awr_ptr->allocate_tablets_for_new_table(s, stm.get()).get0(); + full_ring_check(tmap, options320, ars_ptr, stm.get()); + + // Test the case of 
not enough nodes to meet RF in DC 101 + std::map options324 = { + {"100", "3"}, + {"101", "4"}, + {"102", "2"}, + {"initial_tablets", "100"} + }; + + ars_ptr = abstract_replication_strategy::create_replication_strategy( + "NetworkTopologyStrategy", options324); + tab_awr_ptr = ars_ptr->maybe_as_tablet_aware(); + BOOST_REQUIRE(tab_awr_ptr); + + tmap = tab_awr_ptr->allocate_tablets_for_new_table(s, stm.get()).get0(); + full_ring_check(tmap, options324, ars_ptr, stm.get()); +} + /** * static impl of "old" network topology strategy endpoint calculation. */