diff --git a/configure.py b/configure.py index daba9c535d..44d65eaaed 100755 --- a/configure.py +++ b/configure.py @@ -946,6 +946,7 @@ scylla_core = (['message/messaging_service.cc', 'locator/ec2_multi_region_snitch.cc', 'locator/gce_snitch.cc', 'locator/topology.cc', + 'locator/util.cc', 'service/client_state.cc', 'service/storage_service.cc', 'service/misc_services.cc', diff --git a/gms/gossiper.cc b/gms/gossiper.cc index e7df039f06..16952eaa18 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1443,6 +1443,16 @@ int gossiper::compare_endpoint_startup(inet_address addr1, inet_address addr2) { return ep1->get_heart_beat_state().get_generation() - ep2->get_heart_beat_state().get_generation(); } +sstring gossiper::get_rpc_address(const inet_address& endpoint) const { + if (endpoint != get_broadcast_address()) { + auto* v = get_application_state_ptr(endpoint, gms::application_state::RPC_ADDRESS); + if (v) { + return v->value; + } + } + return boost::lexical_cast(endpoint); +} + void gossiper::update_timestamp_for_nodes(const std::map& map) { for (const auto& x : map) { const gms::inet_address& endpoint = x.first; diff --git a/gms/gossiper.hh b/gms/gossiper.hh index ab656ecad2..0fc6b8dd1d 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -405,6 +405,13 @@ public: * determine which endpoint started up earlier */ int compare_endpoint_startup(inet_address addr1, inet_address addr2); + + /** + * Return the rpc address associated with an endpoint as a string. + * @param endpoint The endpoint to get rpc address for + * @return the rpc address + */ + sstring get_rpc_address(const inet_address& endpoint) const; private: void update_timestamp_for_nodes(const std::map& map); diff --git a/locator/util.cc b/locator/util.cc new file mode 100644 index 0000000000..e0022e9431 --- /dev/null +++ b/locator/util.cc @@ -0,0 +1,151 @@ +/* + * Copyright (C) 2022-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ +#include "locator/util.hh" +#include "replica/database.hh" +#include "gms/gossiper.hh" + +namespace locator { + +static future> +construct_range_to_endpoint_map( + locator::effective_replication_map_ptr erm, + const dht::token_range_vector& ranges) { + std::unordered_map res; + res.reserve(ranges.size()); + for (auto r : ranges) { + res[r] = erm->get_natural_endpoints( + r.end() ? r.end()->value() : dht::maximum_token()); + co_await coroutine::maybe_yield(); + } + co_return res; +} + +// Caller is responsible to hold token_metadata valid until the returned future is resolved +static future +get_all_ranges(const std::vector& sorted_tokens) { + if (sorted_tokens.empty()) + co_return dht::token_range_vector(); + int size = sorted_tokens.size(); + dht::token_range_vector ranges; + ranges.reserve(size); + ranges.push_back(dht::token_range::make_ending_with(range_bound(sorted_tokens[0], true))); + co_await coroutine::maybe_yield(); + for (int i = 1; i < size; ++i) { + dht::token_range r(range::bound(sorted_tokens[i - 1], false), range::bound(sorted_tokens[i], true)); + ranges.push_back(r); + co_await coroutine::maybe_yield(); + } + ranges.push_back(dht::token_range::make_starting_with(range_bound(sorted_tokens[size-1], false))); + + co_return ranges; +} + +// Caller is responsible to hold token_metadata valid until the returned future is resolved +static future> +get_range_to_address_map(locator::effective_replication_map_ptr erm, + const std::vector& sorted_tokens) { + co_return co_await construct_range_to_endpoint_map(erm, co_await get_all_ranges(sorted_tokens)); +} + +// Caller is responsible to hold token_metadata valid until the returned future is resolved +static future> +get_tokens_in_local_dc(const locator::token_metadata& tm) { + std::vector filtered_tokens; + auto local_dc_filter = tm.get_topology().get_local_dc_filter(); + for (auto token : tm.sorted_tokens()) { + auto endpoint = tm.get_endpoint(token); + if (local_dc_filter(*endpoint)) + filtered_tokens.push_back(token); + co_await coroutine::maybe_yield(); + } + co_return filtered_tokens; +} + +static future> +get_range_to_address_map_in_local_dc( + locator::effective_replication_map_ptr erm) { + auto tmptr = erm->get_token_metadata_ptr(); + auto orig_map = co_await get_range_to_address_map(erm, co_await get_tokens_in_local_dc(*tmptr)); + std::unordered_map filtered_map; + filtered_map.reserve(orig_map.size()); + auto local_dc_filter = tmptr->get_topology().get_local_dc_filter(); + for (auto entry : orig_map) { + auto& addresses = filtered_map[entry.first]; + addresses.reserve(entry.second.size()); + std::copy_if(entry.second.begin(), entry.second.end(), std::back_inserter(addresses), std::cref(local_dc_filter)); + co_await coroutine::maybe_yield(); + } + + co_return filtered_map; +} + +// static future> +// get_range_to_address_map(const replica::database& db, const sstring& keyspace) { +// return get_range_to_address_map(db.find_keyspace(keyspace).get_effective_replication_map()); +// } + +static future> +get_range_to_address_map(locator::effective_replication_map_ptr erm) { + return get_range_to_address_map(erm, erm->get_token_metadata_ptr()->sorted_tokens()); +} + +future> +describe_ring(const replica::database& db, const gms::gossiper& gossiper, const sstring& keyspace, bool include_only_local_dc) { + std::vector ranges; + //Token.TokenFactory tf = getPartitioner().getTokenFactory(); + + auto erm = db.find_keyspace(keyspace).get_effective_replication_map(); + std::unordered_map range_to_address_map = co_await ( + include_only_local_dc + ? get_range_to_address_map_in_local_dc(erm) + : get_range_to_address_map(erm) + ); + auto tmptr = erm->get_token_metadata_ptr(); + for (auto entry : range_to_address_map) { + const auto& topology = tmptr->get_topology(); + auto range = entry.first; + auto addresses = entry.second; + dht::token_range_endpoints tr; + if (range.start()) { + tr._start_token = range.start()->value().to_sstring(); + } + if (range.end()) { + tr._end_token = range.end()->value().to_sstring(); + } + for (auto endpoint : addresses) { + dht::endpoint_details details; + details._host = endpoint; + details._datacenter = topology.get_datacenter(endpoint); + details._rack = topology.get_rack(endpoint); + tr._rpc_endpoints.push_back(gossiper.get_rpc_address(endpoint)); + tr._endpoints.push_back(boost::lexical_cast(details._host)); + tr._endpoint_details.push_back(details); + } + ranges.push_back(tr); + co_await coroutine::maybe_yield(); + } + // Convert to wrapping ranges + auto left_inf = boost::find_if(ranges, [] (const dht::token_range_endpoints& tr) { + return tr._start_token.empty(); + }); + auto right_inf = boost::find_if(ranges, [] (const dht::token_range_endpoints& tr) { + return tr._end_token.empty(); + }); + using set = std::unordered_set; + if (left_inf != right_inf + && left_inf != ranges.end() + && right_inf != ranges.end() + && (boost::copy_range(left_inf->_endpoints) + == boost::copy_range(right_inf->_endpoints))) { + left_inf->_start_token = std::move(right_inf->_start_token); + ranges.erase(right_inf); + } + co_return ranges; +} + +} \ No newline at end of file diff --git a/locator/util.hh b/locator/util.hh new file mode 100644 index 0000000000..4de6cd0c50 --- /dev/null +++ b/locator/util.hh @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2022-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include "dht/token_range_endpoints.hh" + +namespace replica { + class database; +} + +namespace gms { + class gossiper; +} + +namespace locator { + future> describe_ring(const replica::database& db, const gms::gossiper& gossiper, const sstring& keyspace, bool include_only_local_dc = false); +} \ No newline at end of file diff --git a/service/storage_service.cc b/service/storage_service.cc index d2749baa73..702b337032 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -65,6 +65,7 @@ #include #include "utils/stall_free.hh" #include "utils/error_injection.hh" +#include "locator/util.hh" #include #include @@ -776,38 +777,6 @@ storage_service::get_range_to_address_map(locator::effective_replication_map_ptr return get_range_to_address_map(erm, erm->get_token_metadata_ptr()->sorted_tokens()); } -future> -storage_service::get_range_to_address_map_in_local_dc( - locator::effective_replication_map_ptr erm) const { - auto tmptr = erm->get_token_metadata_ptr(); - auto orig_map = co_await get_range_to_address_map(erm, co_await get_tokens_in_local_dc(*tmptr)); - std::unordered_map filtered_map; - filtered_map.reserve(orig_map.size()); - auto local_dc_filter = tmptr->get_topology().get_local_dc_filter(); - for (auto entry : orig_map) { - auto& addresses = filtered_map[entry.first]; - addresses.reserve(entry.second.size()); - std::copy_if(entry.second.begin(), entry.second.end(), std::back_inserter(addresses), std::cref(local_dc_filter)); - co_await coroutine::maybe_yield(); - } - - co_return filtered_map; -} - -// Caller is responsible to hold token_metadata valid until the returned future is resolved -future> -storage_service::get_tokens_in_local_dc(const locator::token_metadata& tm) const { - std::vector filtered_tokens; - auto local_dc_filter = tm.get_topology().get_local_dc_filter(); - for (auto token : tm.sorted_tokens()) { - auto endpoint = tm.get_endpoint(token); - if (local_dc_filter(*endpoint)) - filtered_tokens.push_back(token); - co_await coroutine::maybe_yield(); - } - co_return filtered_tokens; -} - // Caller is responsible to hold token_metadata valid until the returned future is resolved future> storage_service::get_range_to_address_map(locator::effective_replication_map_ptr erm, @@ -3240,56 +3209,7 @@ future<> storage_service::move(token new_token) { future> storage_service::describe_ring(const sstring& keyspace, bool include_only_local_dc) const { - std::vector ranges; - //Token.TokenFactory tf = getPartitioner().getTokenFactory(); - - auto erm = _db.local().find_keyspace(keyspace).get_effective_replication_map(); - std::unordered_map range_to_address_map = co_await ( - include_only_local_dc - ? get_range_to_address_map_in_local_dc(erm) - : get_range_to_address_map(erm) - ); - auto tmptr = erm->get_token_metadata_ptr(); - for (auto entry : range_to_address_map) { - const auto& topology = tmptr->get_topology(); - auto range = entry.first; - auto addresses = entry.second; - token_range_endpoints tr; - if (range.start()) { - tr._start_token = range.start()->value().to_sstring(); - } - if (range.end()) { - tr._end_token = range.end()->value().to_sstring(); - } - for (auto endpoint : addresses) { - endpoint_details details; - details._host = endpoint; - details._datacenter = topology.get_datacenter(endpoint); - details._rack = topology.get_rack(endpoint); - tr._rpc_endpoints.push_back(get_rpc_address(endpoint)); - tr._endpoints.push_back(boost::lexical_cast(details._host)); - tr._endpoint_details.push_back(details); - } - ranges.push_back(tr); - co_await coroutine::maybe_yield(); - } - // Convert to wrapping ranges - auto left_inf = boost::find_if(ranges, [] (const token_range_endpoints& tr) { - return tr._start_token.empty(); - }); - auto right_inf = boost::find_if(ranges, [] (const token_range_endpoints& tr) { - return tr._end_token.empty(); - }); - using set = std::unordered_set; - if (left_inf != right_inf - && left_inf != ranges.end() - && right_inf != ranges.end() - && (boost::copy_range(left_inf->_endpoints) - == boost::copy_range(right_inf->_endpoints))) { - left_inf->_start_token = std::move(right_inf->_start_token); - ranges.erase(right_inf); - } - co_return ranges; + return locator::describe_ring(_db.local(), _gossiper, keyspace, include_only_local_dc); } future> diff --git a/service/storage_service.hh b/service/storage_service.hh index a385ee051f..4add73c2c5 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -382,11 +382,6 @@ public: future> get_range_to_address_map(const sstring& keyspace) const; future> get_range_to_address_map(locator::effective_replication_map_ptr erm) const; - future> get_range_to_address_map_in_local_dc( - locator::effective_replication_map_ptr erm) const; - - future> get_tokens_in_local_dc(const locator::token_metadata&) const; - future> get_range_to_address_map(locator::effective_replication_map_ptr erm, const std::vector& sorted_tokens) const;