diff --git a/api/storage_service.cc b/api/storage_service.cc index 38efac84fd..953d5dc6d8 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -415,7 +415,7 @@ void set_storage_service(http_context& ctx, routes& r) { auto keyspace = validate_keyspace(ctx, req->param); std::vector res; return make_ready_future(stream_range_as_array(service::get_local_storage_service().get_range_to_address_map(keyspace), - [](const std::pair>& entry){ + [](const std::pair& entry){ ss::maplist_mapper m; if (entry.first.start()) { m.key.push(entry.first.start().value().value().to_sstring()); diff --git a/db/consistency_level.cc b/db/consistency_level.cc index 1f9df0b5cb..e783f040a7 100644 --- a/db/consistency_level.cc +++ b/db/consistency_level.cc @@ -155,11 +155,11 @@ bool is_local(gms::inet_address endpoint) { snitch_ptr->get_datacenter(endpoint); } -std::vector +inet_address_vector_replica_set filter_for_query(consistency_level cl, keyspace& ks, - std::vector live_endpoints, - const std::vector& preferred_endpoints, + inet_address_vector_replica_set live_endpoints, + const inet_address_vector_replica_set& preferred_endpoints, read_repair_decision read_repair, gms::inet_address* extra, column_family* cf) { @@ -187,7 +187,7 @@ filter_for_query(consistency_level cl, return std::move(live_endpoints); } - std::vector selected_endpoints; + inet_address_vector_replica_set selected_endpoints; // Pre-select endpoints based on client preference. If the endpoints // selected this way aren't enough to satisfy CL requirements select the @@ -201,7 +201,7 @@ filter_for_query(consistency_level cl, if (extra) { *extra = selected == bf ? live_endpoints.front() : *(it + bf); } - return std::vector(it, it + bf); + return inet_address_vector_replica_set(it, it + bf); } else if (selected) { selected_endpoints.reserve(bf); std::move(it, live_endpoints.end(), std::back_inserter(selected_endpoints)); @@ -242,7 +242,7 @@ filter_for_query(consistency_level cl, if (!old_node && ht_max - ht_min > 0.01) { // if there is old node or hit rates are close skip calculations // local node is always first if present (see storage_proxy::get_live_sorted_endpoints) unsigned local_idx = epi[0].first == utils::fb_utilities::get_broadcast_address() ? 0 : epi.size() + 1; - live_endpoints = miss_equalizing_combination(epi, local_idx, remaining_bf, bool(extra)); + live_endpoints = boost::copy_range(miss_equalizing_combination(epi, local_idx, remaining_bf, bool(extra))); } } @@ -255,10 +255,10 @@ filter_for_query(consistency_level cl, return selected_endpoints; } -std::vector filter_for_query(consistency_level cl, +inet_address_vector_replica_set filter_for_query(consistency_level cl, keyspace& ks, - std::vector& live_endpoints, - const std::vector& preferred_endpoints, + inet_address_vector_replica_set& live_endpoints, + const inet_address_vector_replica_set& preferred_endpoints, column_family* cf) { return filter_for_query(cl, ks, live_endpoints, preferred_endpoints, read_repair_decision::NONE, nullptr, cf); } @@ -266,7 +266,7 @@ std::vector filter_for_query(consistency_level cl, bool is_sufficient_live_nodes(consistency_level cl, keyspace& ks, - const std::vector& live_endpoints) { + const inet_address_vector_replica_set& live_endpoints) { using namespace locator; switch (cl) { diff --git a/db/consistency_level.hh b/db/consistency_level.hh index bfc6ba3e9c..5db558c922 100644 --- a/db/consistency_level.hh +++ b/db/consistency_level.hh @@ -47,6 +47,7 @@ #include "exceptions/exceptions.hh" #include "utils/fb_utilities.hh" #include "gms/inet_address.hh" +#include "inet_address_vectors.hh" #include "database.hh" #include @@ -75,19 +76,19 @@ inline size_t count_local_endpoints(const Range& live_endpoints) { return std::count_if(live_endpoints.begin(), live_endpoints.end(), is_local); } -std::vector +inet_address_vector_replica_set filter_for_query(consistency_level cl, keyspace& ks, - std::vector live_endpoints, - const std::vector& preferred_endpoints, + inet_address_vector_replica_set live_endpoints, + const inet_address_vector_replica_set& preferred_endpoints, read_repair_decision read_repair, gms::inet_address* extra, column_family* cf); -std::vector filter_for_query(consistency_level cl, +inet_address_vector_replica_set filter_for_query(consistency_level cl, keyspace& ks, - std::vector& live_endpoints, - const std::vector& preferred_endpoints, + inet_address_vector_replica_set& live_endpoints, + const inet_address_vector_replica_set& preferred_endpoints, column_family* cf); struct dc_node_count { @@ -132,7 +133,7 @@ inline std::unordered_map count_per_dc_endpoints( bool is_sufficient_live_nodes(consistency_level cl, keyspace& ks, - const std::vector& live_endpoints); + const inet_address_vector_replica_set& live_endpoints); template inline bool assure_sufficient_live_nodes_each_quorum( diff --git a/db/hints/manager.cc b/db/hints/manager.cc index 513f10ef39..6ba7cda42b 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -418,7 +418,7 @@ future manager::end_point_hints_manager::sender::get_last_file_modific }); } -future<> manager::end_point_hints_manager::sender::do_send_one_mutation(frozen_mutation_and_schema m, const std::vector& natural_endpoints) noexcept { +future<> manager::end_point_hints_manager::sender::do_send_one_mutation(frozen_mutation_and_schema m, const inet_address_vector_replica_set& natural_endpoints) noexcept { return futurize_invoke([this, m = std::move(m), &natural_endpoints] () mutable -> future<> { // The fact that we send with CL::ALL in both cases below ensures that new hints are not going // to be generated as a result of hints sending. @@ -804,7 +804,7 @@ future<> manager::end_point_hints_manager::sender::send_one_mutation(frozen_muta keyspace& ks = _db.find_keyspace(m.s->ks_name()); auto& rs = ks.get_replication_strategy(); auto token = dht::get_token(*m.s, m.fm.key()); - std::vector natural_endpoints = rs.get_natural_endpoints(std::move(token)); + inet_address_vector_replica_set natural_endpoints = rs.get_natural_endpoints(std::move(token)); return do_send_one_mutation(std::move(m), natural_endpoints); } diff --git a/db/hints/manager.hh b/db/hints/manager.hh index 4d0c641de9..8046422150 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -36,6 +36,7 @@ #include "lister.hh" #include "gms/gossiper.hh" #include "locator/snitch_base.hh" +#include "inet_address_vectors.hh" #include "service/endpoint_lifecycle_subscriber.hh" #include "db/commitlog/commitlog.hh" #include "utils/loading_shared_values.hh" @@ -292,7 +293,7 @@ public: /// \param m mutation to send /// \param natural_endpoints current replicas for the given mutation /// \return future that resolves when the operation is complete - future<> do_send_one_mutation(frozen_mutation_and_schema m, const std::vector& natural_endpoints) noexcept; + future<> do_send_one_mutation(frozen_mutation_and_schema m, const inet_address_vector_replica_set& natural_endpoints) noexcept; /// \brief Send one mutation out. /// diff --git a/db/hints/messages.hh b/db/hints/messages.hh index e94bc3b47b..b30cc3c14e 100644 --- a/db/hints/messages.hh +++ b/db/hints/messages.hh @@ -23,6 +23,7 @@ #include #include +#include "gms/inet_address.hh" #include "utils/UUID.hh" #include "gms/inet_address.hh" diff --git a/db/view/view.cc b/db/view/view.cc index 0593a51fe8..3db6e3dd2f 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1169,7 +1169,7 @@ get_view_natural_endpoint(const sstring& keyspace_name, return view_endpoints[base_it - base_endpoints.begin()]; } -static future<> apply_to_remote_endpoints(gms::inet_address target, std::vector&& pending_endpoints, +static future<> apply_to_remote_endpoints(gms::inet_address target, inet_address_vector_topology_change&& pending_endpoints, frozen_mutation_and_schema& mut, const dht::token& base_token, const dht::token& view_token, service::allow_hints allow_hints, tracing::trace_state_ptr tr_state) { diff --git a/dht/range_streamer.cc b/dht/range_streamer.cc index 8b26316b8d..c0cdd5c84f 100644 --- a/dht/range_streamer.cc +++ b/dht/range_streamer.cc @@ -131,7 +131,7 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, dh } const range& src_range = x.first; if (src_range.contains(desired_range, dht::tri_compare)) { - std::vector& addresses = x.second; + inet_address_vector_replica_set& addresses = x.second; auto preferred = snitch->get_sorted_list_by_proximity(_address, addresses); for (inet_address& p : preferred) { range_sources[desired_range].push_back(p); diff --git a/inet_address_vectors.hh b/inet_address_vectors.hh new file mode 100644 index 0000000000..596a38bcf9 --- /dev/null +++ b/inet_address_vectors.hh @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include "gms/inet_address.hh" +#include "utils/small_vector.hh" + +using inet_address_vector_replica_set = utils::small_vector; + +using inet_address_vector_topology_change = utils::small_vector; diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index aae401ca9c..c023923f02 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -74,11 +74,11 @@ void abstract_replication_strategy::validate_replication_strategy(const sstring& } } -std::vector abstract_replication_strategy::get_natural_endpoints(const token& search_token, can_yield can_yield) { +inet_address_vector_replica_set abstract_replication_strategy::get_natural_endpoints(const token& search_token, can_yield can_yield) { return do_get_natural_endpoints(search_token, *_shared_token_metadata.get(), can_yield); } -std::vector abstract_replication_strategy::do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield can_yield) { +inet_address_vector_replica_set abstract_replication_strategy::do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield can_yield) { const token& key_token = tm.first_token(search_token); auto& cached_endpoints = get_cached_endpoints(tm); auto res = cached_endpoints.find(key_token); @@ -94,9 +94,9 @@ std::vector abstract_replication_strategy::do_get_natural_endpoint return res->second; } -std::vector abstract_replication_strategy::get_natural_endpoints_without_node_being_replaced(const token& search_token, can_yield can_yield) { +inet_address_vector_replica_set abstract_replication_strategy::get_natural_endpoints_without_node_being_replaced(const token& search_token, can_yield can_yield) { token_metadata_ptr tmptr = _shared_token_metadata.get(); - std::vector natural_endpoints = do_get_natural_endpoints(search_token, *tmptr, can_yield); + inet_address_vector_replica_set natural_endpoints = do_get_natural_endpoints(search_token, *tmptr, can_yield); if (tmptr->is_any_node_being_replaced() && allow_remove_node_being_replaced_from_natural_endpoints()) { // When a new node is started to replace an existing dead node, we want @@ -132,7 +132,7 @@ void abstract_replication_strategy::validate_replication_factor(sstring rf) cons } } -inline std::unordered_map>& +inline std::unordered_map& abstract_replication_strategy::get_cached_endpoints(const token_metadata& tm) { auto ring_version = tm.get_ring_version(); if (_last_invalidated_ring_version != ring_version) { @@ -272,9 +272,9 @@ abstract_replication_strategy::get_address_ranges(const token_metadata& tm, inet return ret; } -std::unordered_map> +std::unordered_map abstract_replication_strategy::get_range_addresses(const token_metadata& tm, can_yield can_yield) const { - std::unordered_map> ret; + std::unordered_map ret; for (auto& t : tm.sorted_tokens()) { dht::token_range_vector ranges = tm.get_primary_ranges_for(t); auto eps = calculate_natural_endpoints(t, tm, can_yield); diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index 987be64325..b4881bbda2 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -25,6 +25,7 @@ #include #include #include "gms/inet_address.hh" +#include "locator/snitch_base.hh" #include "dht/i_partitioner.hh" #include "token_metadata.hh" #include "snitch_base.hh" @@ -51,12 +52,12 @@ using can_yield = utils::can_yield; class abstract_replication_strategy { private: long _last_invalidated_ring_version = 0; - std::unordered_map> _cached_endpoints; + std::unordered_map _cached_endpoints; uint64_t _cache_hits_count = 0; static logging::logger logger; - std::unordered_map>& + std::unordered_map& get_cached_endpoints(const token_metadata& tm); protected: sstring _ks_name; @@ -91,15 +92,15 @@ public: snitch_ptr& snitch, const std::map& config_options, replication_strategy_type my_type); - virtual std::vector calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield = can_yield::no) const = 0; + virtual inet_address_vector_replica_set calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield = can_yield::no) const = 0; virtual ~abstract_replication_strategy() {} static std::unique_ptr create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, const shared_token_metadata& stm, const std::map& config_options); static void validate_replication_strategy(const sstring& ks_name, const sstring& strategy_name, const shared_token_metadata& stm, const std::map& config_options); - std::vector get_natural_endpoints(const token& search_token, can_yield = can_yield::no); - std::vector get_natural_endpoints_without_node_being_replaced(const token& search_token, can_yield = can_yield::no); + inet_address_vector_replica_set get_natural_endpoints(const token& search_token, can_yield = can_yield::no); + inet_address_vector_replica_set get_natural_endpoints_without_node_being_replaced(const token& search_token, can_yield = can_yield::no); virtual void validate_options() const = 0; virtual std::optional> recognized_options() const = 0; virtual size_t get_replication_factor() const = 0; @@ -128,7 +129,7 @@ public: private: // Caller must ensure that token_metadata will not change throughout the call if can_yield::yes. dht::token_range_vector do_get_ranges(inet_address ep, const token_metadata_ptr tmptr, can_yield) const; - virtual std::vector do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield); + virtual inet_address_vector_replica_set do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield); public: // get_primary_ranges() returns the list of "primary ranges" for the given @@ -146,7 +147,7 @@ public: std::unordered_multimap get_address_ranges(const token_metadata& tm, can_yield) const; std::unordered_multimap get_address_ranges(const token_metadata& tm, inet_address endpoint, can_yield) const; - std::unordered_map> get_range_addresses(const token_metadata& tm, can_yield) const; + std::unordered_map get_range_addresses(const token_metadata& tm, can_yield) const; dht::token_range_vector get_pending_address_ranges(const token_metadata_ptr tmptr, token pending_token, inet_address pending_address, can_yield) const; diff --git a/locator/everywhere_replication_strategy.cc b/locator/everywhere_replication_strategy.cc index 59a2c9848d..bab2c6b7d2 100644 --- a/locator/everywhere_replication_strategy.cc +++ b/locator/everywhere_replication_strategy.cc @@ -47,13 +47,13 @@ namespace locator { everywhere_replication_strategy::everywhere_replication_strategy(const sstring& keyspace_name, const shared_token_metadata& token_metadata, snitch_ptr& snitch, const std::map& config_options) : abstract_replication_strategy(keyspace_name, token_metadata, snitch, config_options, replication_strategy_type::everywhere_topology) {} -std::vector everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const { - return tm.get_all_endpoints(); +inet_address_vector_replica_set everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const { + return boost::copy_range(tm.get_all_endpoints()); } -std::vector everywhere_replication_strategy::do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield can_yield) { +inet_address_vector_replica_set everywhere_replication_strategy::do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield can_yield) { if (tm.sorted_tokens().empty()) { - return std::vector({utils::fb_utilities::get_broadcast_address()}); + return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()}); } return calculate_natural_endpoints(search_token, tm, can_yield); } diff --git a/locator/everywhere_replication_strategy.hh b/locator/everywhere_replication_strategy.hh index ff447fd174..b87a9cc66d 100644 --- a/locator/everywhere_replication_strategy.hh +++ b/locator/everywhere_replication_strategy.hh @@ -46,8 +46,8 @@ class everywhere_replication_strategy : public abstract_replication_strategy { public: everywhere_replication_strategy(const sstring& keyspace_name, const shared_token_metadata& token_metadata, snitch_ptr& snitch, const std::map& config_options); - virtual std::vector calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override; - std::vector do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) override; + virtual inet_address_vector_replica_set calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override; + inet_address_vector_replica_set do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) override; virtual void validate_options() const override { /* noop */ } diff --git a/locator/local_strategy.cc b/locator/local_strategy.cc index b7280d3c74..954b4b4ed0 100644 --- a/locator/local_strategy.cc +++ b/locator/local_strategy.cc @@ -30,12 +30,12 @@ namespace locator { local_strategy::local_strategy(const sstring& keyspace_name, const shared_token_metadata& token_metadata, snitch_ptr& snitch, const std::map& config_options) : abstract_replication_strategy(keyspace_name, token_metadata, snitch, config_options, replication_strategy_type::local) {} -std::vector local_strategy::do_get_natural_endpoints(const token& t, const token_metadata& tm, can_yield can_yield) { +inet_address_vector_replica_set local_strategy::do_get_natural_endpoints(const token& t, const token_metadata& tm, can_yield can_yield) { return calculate_natural_endpoints(t, tm, can_yield); } -std::vector local_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm, can_yield) const { - return std::vector({utils::fb_utilities::get_broadcast_address()}); +inet_address_vector_replica_set local_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm, can_yield) const { + return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()}); } void local_strategy::validate_options() const { diff --git a/locator/local_strategy.hh b/locator/local_strategy.hh index a8f119380d..3affd43b69 100644 --- a/locator/local_strategy.hh +++ b/locator/local_strategy.hh @@ -36,7 +36,7 @@ using token = dht::token; class local_strategy : public abstract_replication_strategy { protected: - virtual std::vector calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override; + virtual inet_address_vector_replica_set calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override; public: local_strategy(const sstring& keyspace_name, const shared_token_metadata& token_metadata, snitch_ptr& snitch, const std::map& config_options); virtual ~local_strategy() {}; @@ -46,7 +46,7 @@ public: * because the default implementation depends on token calculations but * LocalStrategy may be used before tokens are set up. */ - std::vector do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) override; + inet_address_vector_replica_set do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) override; virtual void validate_options() const override; diff --git a/locator/network_topology_strategy.cc b/locator/network_topology_strategy.cc index 29c62aea90..62d8a053b7 100644 --- a/locator/network_topology_strategy.cc +++ b/locator/network_topology_strategy.cc @@ -102,7 +102,7 @@ network_topology_strategy::network_topology_strategy( } } -std::vector +inet_address_vector_replica_set network_topology_strategy::calculate_natural_endpoints( const token& search_token, const token_metadata& tm, can_yield can_yield) const { @@ -258,7 +258,7 @@ network_topology_strategy::calculate_natural_endpoints( } } - return std::move(replicas.get_vector()); + return boost::copy_range(replicas.get_vector()); } void network_topology_strategy::validate_options() const { diff --git a/locator/network_topology_strategy.hh b/locator/network_topology_strategy.hh index abe22a02f4..50478e94dd 100644 --- a/locator/network_topology_strategy.hh +++ b/locator/network_topology_strategy.hh @@ -75,7 +75,7 @@ protected: * calculate endpoints in one pass through the tokens by tracking our * progress in each DC, rack etc. */ - virtual std::vector calculate_natural_endpoints( + virtual inet_address_vector_replica_set calculate_natural_endpoints( const token& search_token, const token_metadata& tm, can_yield) const override; virtual void validate_options() const override; diff --git a/locator/simple_snitch.hh b/locator/simple_snitch.hh index fa5cf20659..10776818e3 100644 --- a/locator/simple_snitch.hh +++ b/locator/simple_snitch.hh @@ -66,7 +66,7 @@ struct simple_snitch : public snitch_base { } virtual void sort_by_proximity( - inet_address address, std::vector& addresses) override { + inet_address address, inet_address_vector_replica_set& addresses) override { // Optimization to avoid walking the list } diff --git a/locator/simple_strategy.cc b/locator/simple_strategy.cc index 4840c925d8..2b9dd79a2b 100644 --- a/locator/simple_strategy.cc +++ b/locator/simple_strategy.cc @@ -42,11 +42,11 @@ simple_strategy::simple_strategy(const sstring& keyspace_name, const shared_toke } } -std::vector simple_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm, can_yield can_yield) const { +inet_address_vector_replica_set simple_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm, can_yield can_yield) const { const std::vector& tokens = tm.sorted_tokens(); if (tokens.empty()) { - return std::vector(); + return inet_address_vector_replica_set(); } size_t replicas = get_replication_factor(); @@ -66,7 +66,7 @@ std::vector simple_strategy::calculate_natural_endpoints(const tok endpoints.push_back(*ep); } - return std::move(endpoints.get_vector()); + return boost::copy_range(endpoints.get_vector()); } size_t simple_strategy::get_replication_factor() const { diff --git a/locator/simple_strategy.hh b/locator/simple_strategy.hh index 90a8965a97..9082dfba0a 100644 --- a/locator/simple_strategy.hh +++ b/locator/simple_strategy.hh @@ -30,7 +30,7 @@ namespace locator { class simple_strategy : public abstract_replication_strategy { protected: - virtual std::vector calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override; + virtual inet_address_vector_replica_set calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override; public: simple_strategy(const sstring& keyspace_name, const shared_token_metadata& token_metadata, snitch_ptr& snitch, const std::map& config_options); virtual ~simple_strategy() {}; diff --git a/locator/snitch_base.cc b/locator/snitch_base.cc index 2c056a0a5a..f7412726ed 100644 --- a/locator/snitch_base.cc +++ b/locator/snitch_base.cc @@ -49,11 +49,11 @@ snitch_base::get_endpoint_info(inet_address endpoint, return ep_state ? std::optional(ep_state->value) : std::nullopt; } -std::vector snitch_base::get_sorted_list_by_proximity( +inet_address_vector_replica_set snitch_base::get_sorted_list_by_proximity( inet_address address, - std::vector& unsorted_address) { + inet_address_vector_replica_set& unsorted_address) { - std::vector + inet_address_vector_replica_set preferred(unsorted_address.begin(), unsorted_address.end()); sort_by_proximity(address, preferred); @@ -61,7 +61,7 @@ std::vector snitch_base::get_sorted_list_by_proximity( } void snitch_base::sort_by_proximity( - inet_address address, std::vector& addresses) { + inet_address address, inet_address_vector_replica_set& addresses) { std::sort(addresses.begin(), addresses.end(), [this, &address](inet_address& a1, inet_address& a2) @@ -122,9 +122,9 @@ int snitch_base::compare_endpoints( } bool snitch_base::is_worth_merging_for_range_query( - std::vector& merged, - std::vector& l1, - std::vector& l2) { + inet_address_vector_replica_set& merged, + inet_address_vector_replica_set& l1, + inet_address_vector_replica_set& l2) { // // Querying remote DC is likely to be an order of magnitude slower than // querying locally, so 2 queries to local nodes is likely to still be @@ -136,7 +136,7 @@ bool snitch_base::is_worth_merging_for_range_query( : true; } -bool snitch_base::has_remote_node(std::vector& l) { +bool snitch_base::has_remote_node(inet_address_vector_replica_set& l) { for (auto&& ep : l) { if (_my_dc != get_datacenter(ep)) { return true; diff --git a/locator/snitch_base.hh b/locator/snitch_base.hh index e4a5d47be4..423205d18b 100644 --- a/locator/snitch_base.hh +++ b/locator/snitch_base.hh @@ -44,6 +44,7 @@ #include #include "gms/inet_address.hh" +#include "inet_address_vectors.hh" #include "gms/versioned_value.hh" #include #include @@ -94,16 +95,16 @@ public: /** * returns a new List sorted by proximity to the given endpoint */ - virtual std::vector get_sorted_list_by_proximity( + virtual inet_address_vector_replica_set get_sorted_list_by_proximity( inet_address address, - std::vector& unsorted_address) = 0; + inet_address_vector_replica_set& unsorted_address) = 0; /** * This method will sort the List by proximity to the given * address. */ virtual void sort_by_proximity( - inet_address address, std::vector& addresses) = 0; + inet_address address, inet_address_vector_replica_set& addresses) = 0; /** * compares two endpoints in relation to the target endpoint, returning as @@ -127,9 +128,9 @@ public: * against l2. */ virtual bool is_worth_merging_for_range_query( - std::vector& merged, - std::vector& l1, - std::vector& l2) = 0; + inet_address_vector_replica_set& merged, + inet_address_vector_replica_set& l1, + inet_address_vector_replica_set& l2) = 0; virtual ~i_endpoint_snitch() { assert(_state == snitch_state::stopped); }; @@ -426,25 +427,25 @@ public: // virtual sstring get_datacenter(inet_address endpoint) = 0; // - virtual std::vector get_sorted_list_by_proximity( + virtual inet_address_vector_replica_set get_sorted_list_by_proximity( inet_address address, - std::vector& unsorted_address) override; + inet_address_vector_replica_set& unsorted_address) override; virtual void sort_by_proximity( - inet_address address, std::vector& addresses) override; + inet_address address, inet_address_vector_replica_set& addresses) override; virtual int compare_endpoints( inet_address& address, inet_address& a1, inet_address& a2) override; virtual bool is_worth_merging_for_range_query( - std::vector& merged, - std::vector& l1, - std::vector& l2) override; + inet_address_vector_replica_set& merged, + inet_address_vector_replica_set& l1, + inet_address_vector_replica_set& l2) override; virtual future<> gossip_snitch_info(std::list> info) override; private: - bool has_remote_node(std::vector& l); + bool has_remote_node(inet_address_vector_replica_set& l); protected: static std::optional get_endpoint_info(inet_address endpoint, diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index e4491e6afb..200cf180de 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -735,7 +735,7 @@ public: #endif public: // returns empty vector if keyspace_name not found. - std::vector pending_endpoints_for(const token& token, const sstring& keyspace_name) const; + inet_address_vector_topology_change pending_endpoints_for(const token& token, const sstring& keyspace_name) const; #if 0 /** * @deprecated retained for benefit of old tests @@ -1676,7 +1676,7 @@ void token_metadata_impl::del_replacing_endpoint(inet_address existing_node) { _replacing_endpoints.erase(existing_node); } -std::vector token_metadata_impl::pending_endpoints_for(const token& token, const sstring& keyspace_name) const { +inet_address_vector_topology_change token_metadata_impl::pending_endpoints_for(const token& token, const sstring& keyspace_name) const { // Fast path 0: pending ranges not found for this keyspace_name const auto pr_it = _pending_ranges_interval_map.find(keyspace_name); if (pr_it == _pending_ranges_interval_map.end()) { @@ -1690,12 +1690,12 @@ std::vector token_metadata_impl::pending_endpoints_for(const } // Slow path: lookup pending ranges - std::vector endpoints; + inet_address_vector_topology_change endpoints; auto interval = range_to_interval(range(token)); const auto it = ks_map.find(interval); if (it != ks_map.end()) { // interval_map does not work with std::vector, convert to std::vector of ips - endpoints = std::vector(it->second.begin(), it->second.end()); + endpoints = inet_address_vector_topology_change(it->second.begin(), it->second.end()); } return endpoints; } @@ -2050,7 +2050,7 @@ token_metadata::count_normal_token_owners() const { return _impl->count_normal_token_owners(); } -std::vector +inet_address_vector_topology_change token_metadata::pending_endpoints_for(const token& token, const sstring& keyspace_name) const { return _impl->pending_endpoints_for(token, keyspace_name); } diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 6fe5994c4d..feb9ad053a 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -43,6 +43,7 @@ #include #include "gms/inet_address.hh" #include "dht/i_partitioner.hh" +#include "inet_address_vectors.hh" #include "utils/UUID.hh" #include #include @@ -334,7 +335,7 @@ public: size_t count_normal_token_owners() const; // returns empty vector if keyspace_name not found. - std::vector pending_endpoints_for(const token& token, const sstring& keyspace_name) const; + inet_address_vector_topology_change pending_endpoints_for(const token& token, const sstring& keyspace_name) const; /** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */ std::multimap get_endpoint_to_token_map_for_reading() const; diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 9780b086bc..e712b0bc6d 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -1189,7 +1189,7 @@ void messaging_service::register_mutation(std::function messaging_service::unregister_mutation() { return unregister_handler(netw::messaging_verb::MUTATION); } -future<> messaging_service::send_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector forward, +future<> messaging_service::send_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, inet_address_vector_replica_set forward, inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info) { return send_message_oneway_timeout(this, timeout, messaging_verb::MUTATION, std::move(id), fm, std::move(forward), std::move(reply_to), shard, std::move(response_id), std::move(trace_info)); @@ -1462,7 +1462,7 @@ future<> messaging_service::unregister_paxos_learn() { return unregister_handler(netw::messaging_verb::PAXOS_LEARN); } future<> messaging_service::send_paxos_learn(msg_addr id, clock_type::time_point timeout, const service::paxos::proposal& decision, - std::vector forward, inet_address reply_to, unsigned shard, response_id_type response_id, + inet_address_vector_replica_set forward, inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info) { return send_message_oneway_timeout(this, timeout, messaging_verb::PAXOS_LEARN, std::move(id), decision, std::move(forward), std::move(reply_to), shard, std::move(response_id), std::move(trace_info)); @@ -1488,7 +1488,7 @@ void messaging_service::register_hint_mutation(std::function messaging_service::unregister_hint_mutation() { return unregister_handler(netw::messaging_verb::HINT_MUTATION); } -future<> messaging_service::send_hint_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector forward, +future<> messaging_service::send_hint_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, inet_address_vector_replica_set forward, inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info) { return send_message_oneway_timeout(this, timeout, messaging_verb::HINT_MUTATION, std::move(id), fm, std::move(forward), std::move(reply_to), shard, std::move(response_id), std::move(trace_info)); diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 3d374750db..ec7aac3b04 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -27,6 +27,7 @@ #include #include #include "gms/inet_address.hh" +#include "inet_address_vectors.hh" #include #include #include "query-request.hh" @@ -462,7 +463,7 @@ public: void register_mutation(std::function (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector forward, inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional> trace_info)>&& func); future<> unregister_mutation(); - future<> send_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector forward, + future<> send_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, inet_address_vector_replica_set forward, inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info = std::nullopt); // Wrapper for COUNTER_MUTATION @@ -543,7 +544,7 @@ public: future<> unregister_paxos_learn(); future<> send_paxos_learn(msg_addr id, clock_type::time_point timeout, const service::paxos::proposal& decision, - std::vector forward, inet_address reply_to, unsigned shard, response_id_type response_id, + inet_address_vector_replica_set forward, inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info = std::nullopt); void register_paxos_prune(std::function(const rpc::client_info&, rpc::opt_time_point, UUID schema_id, partition_key key, @@ -557,7 +558,7 @@ public: void register_hint_mutation(std::function (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector forward, inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional> trace_info)>&& func); future<> unregister_hint_mutation(); - future<> send_hint_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector forward, + future<> send_hint_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, inet_address_vector_replica_set forward, inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info = std::nullopt); void register_hint_sync_point_create(std::function (db::hints::sync_point_create_request request)>&& func); diff --git a/repair/repair.cc b/repair/repair.cc index d1ec30607d..9406e3394b 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -285,7 +285,7 @@ static std::vector get_neighbors(database& db, ret.erase(it, ret.end()); } - return ret; + return boost::copy_range>(std::move(ret)); #if 0 // Origin's ActiveRepairService.getNeighbors() also verifies that the @@ -1592,7 +1592,7 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded> current_replica_endpoints; + std::unordered_map current_replica_endpoints; // Find (for each range) all nodes that store replicas for these ranges as well for (auto& r : ranges) { auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); @@ -1615,8 +1615,8 @@ static future<> do_decommission_removenode_with_repair(seastar::shardedcheck_abort(); } auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); - const std::vector new_eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp, utils::can_yield::yes); - const std::vector& current_eps = current_replica_endpoints[r]; + const inet_address_vector_replica_set new_eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp, utils::can_yield::yes); + const inet_address_vector_replica_set& current_eps = current_replica_endpoints[r]; std::unordered_set neighbors_set(new_eps.begin(), new_eps.end()); bool skip_this_range = false; auto new_owner = neighbors_set; @@ -1687,7 +1687,7 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded>(new_eps)); } } else { throw std::runtime_error(format("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, wrong nubmer of new owner node={}", diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 5563c4d850..de017a1745 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -171,7 +171,7 @@ public: virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) = 0; virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) = 0; - virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector&& forward, + virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, inet_address_vector_replica_set&& forward, storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) = 0; virtual bool is_shared() = 0; @@ -222,7 +222,7 @@ public: } return make_ready_future<>(); } - virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector&& forward, + virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, inet_address_vector_replica_set&& forward, storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) override { auto m = _mutations[ep]; @@ -270,7 +270,7 @@ public: tracing::trace(tr_state, "Executing a mutation locally"); return sp.mutate_locally(_schema, *_mutation, std::move(tr_state), db::commitlog::force_sync::no, timeout); } - virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector&& forward, + virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, inet_address_vector_replica_set&& forward, storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) override { tracing::trace(tr_state, "Sending a mutation to /{}", ep); @@ -299,7 +299,7 @@ public: // becomes unavailable - this might include the current node return sp.mutate_hint(_schema, *_mutation, std::move(tr_state), timeout); } - virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector&& forward, + virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, inet_address_vector_replica_set&& forward, storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) override { tracing::trace(tr_state, "Sending a hint to /{}", ep); @@ -326,7 +326,7 @@ public: tracing::trace(tr_state, "Executing a learn locally"); return paxos::paxos_state::learn(_schema, *_proposal, timeout, tr_state); } - virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector&& forward, + virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, inet_address_vector_replica_set&& forward, storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) override { tracing::trace(tr_state, "Sending a learn to /{}", ep); @@ -369,7 +369,7 @@ protected: // added dead_endpoints as a memeber here as well. This to be able to carry the info across // calls in helper methods in a convinient way. Since we hope this will be empty most of the time // it should not be a huge burden. (flw) - std::vector _dead_endpoints; + inet_address_vector_topology_change _dead_endpoints; size_t _cl_acks = 0; bool _cl_achieved = false; bool _throttled = false; @@ -394,7 +394,7 @@ protected: public: abstract_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr mh, std::unordered_set targets, tracing::trace_state_ptr trace_state, - storage_proxy::write_stats& stats, service_permit permit, size_t pending_endpoints = 0, std::vector dead_endpoints = {}) + storage_proxy::write_stats& stats, service_permit permit, size_t pending_endpoints = 0, inet_address_vector_topology_change dead_endpoints = {}) : _id(p->get_next_response_id()), _proxy(std::move(p)), _trace_state(trace_state), _cl(cl), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)), _dead_endpoints(std::move(dead_endpoints)), _stats(stats), _expire_timer([this] { timeout_cb(); }), _permit(std::move(permit)) { // original comment from cassandra: @@ -588,7 +588,7 @@ public: const std::unordered_set& get_targets() const { return _targets; } - const std::vector& get_dead_endpoints() const { + const inet_address_vector_topology_change& get_dead_endpoints() const { return _dead_endpoints; } bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) { @@ -597,7 +597,7 @@ public: future<> apply_locally(storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) { return _mutation_holder->apply_locally(*_proxy, timeout, std::move(tr_state)); } - future<> apply_remotely(gms::inet_address ep, std::vector&& forward, + future<> apply_remotely(gms::inet_address ep, inet_address_vector_replica_set&& forward, storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) { return _mutation_holder->apply_remotely(*_proxy, ep, std::move(forward), response_id, timeout, std::move(tr_state)); @@ -631,7 +631,7 @@ class datacenter_write_response_handler : public abstract_write_response_handler public: datacenter_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr mh, std::unordered_set targets, - const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state, + const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) : abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), std::move(targets), std::move(tr_state), stats, std::move(permit), db::count_local_endpoints(pending_endpoints), std::move(dead_endpoints)) { @@ -646,7 +646,7 @@ class write_response_handler : public abstract_write_response_handler { public: write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr mh, std::unordered_set targets, - const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state, + const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) : abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), std::move(targets), std::move(tr_state), stats, std::move(permit), pending_endpoints.size(), std::move(dead_endpoints)) { @@ -658,7 +658,7 @@ class view_update_write_response_handler : public write_response_handler, public public: view_update_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, std::unique_ptr mh, std::unordered_set targets, - const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state, + const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit): write_response_handler(p, ks, cl, db::write_type::VIEW, std::move(mh), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit)) { @@ -737,8 +737,8 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha } public: datacenter_sync_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, - std::unique_ptr mh, std::unordered_set targets, const std::vector& pending_endpoints, - std::vector dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) : + std::unique_ptr mh, std::unordered_set targets, const inet_address_vector_topology_change& pending_endpoints, + inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) : abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), targets, std::move(tr_state), stats, std::move(permit), 0, dead_endpoints) { auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); @@ -1289,9 +1289,9 @@ bool paxos_response_handler::learned(gms::inet_address ep) { return false; } -static std::vector +static inet_address_vector_replica_set replica_ids_to_endpoints(const locator::token_metadata& tm, const std::vector& replica_ids) { - std::vector endpoints; + inet_address_vector_replica_set endpoints; endpoints.reserve(replica_ids.size()); for (const auto& replica_id : replica_ids) { @@ -1304,7 +1304,7 @@ replica_ids_to_endpoints(const locator::token_metadata& tm, const std::vector -endpoints_to_replica_ids(const locator::token_metadata& tm, const std::vector& endpoints) { +endpoints_to_replica_ids(const locator::token_metadata& tm, const inet_address_vector_replica_set& endpoints) { std::vector replica_ids; replica_ids.reserve(endpoints.size()); @@ -1415,7 +1415,7 @@ future<> storage_proxy::response_wait(storage_proxy::response_id_type id, clock_ } storage_proxy::response_id_type storage_proxy::create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr m, - std::unordered_set targets, const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state, + std::unordered_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) { shared_ptr h; @@ -1902,8 +1902,8 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok auto keyspace_name = s->ks_name(); keyspace& ks = _db.local().find_keyspace(keyspace_name); auto& rs = ks.get_replication_strategy(); - std::vector natural_endpoints = rs.get_natural_endpoints_without_node_being_replaced(token); - std::vector pending_endpoints = get_token_metadata_ptr()->pending_endpoints_for(token, keyspace_name); + inet_address_vector_replica_set natural_endpoints = rs.get_natural_endpoints_without_node_being_replaced(token); + inet_address_vector_topology_change pending_endpoints = get_token_metadata_ptr()->pending_endpoints_for(token, keyspace_name); slogger.trace("creating write handler for token: {} natural: {} pending: {}", token, natural_endpoints, pending_endpoints); tracing::trace(tr_state, "Creating write handler for token: {} natural: {} pending: {}", token, natural_endpoints ,pending_endpoints); @@ -1939,7 +1939,7 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok // filter live endpoints from dead ones std::unordered_set live_endpoints; - std::vector dead_endpoints; + inet_address_vector_topology_change dead_endpoints; live_endpoints.reserve(all.size()); dead_endpoints.reserve(all.size()); std::partition_copy(all.begin(), all.end(), std::inserter(live_endpoints, live_endpoints.begin()), @@ -1985,7 +1985,7 @@ storage_proxy::create_write_response_handler(const std::unordered_mapschema()->ks_name(); keyspace& ks = _db.local().find_keyspace(keyspace_name); - return create_write_response_handler(ks, cl, type, std::move(mh), std::move(endpoints), std::vector(), std::vector(), std::move(tr_state), get_stats(), std::move(permit)); + return create_write_response_handler(ks, cl, type, std::move(mh), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit)); } storage_proxy::response_id_type @@ -2009,7 +2009,7 @@ storage_proxy::create_write_response_handler(const std::tuple(std::move(commit), s, nullptr), std::move(endpoints), - std::vector(), std::vector(), std::move(tr_state), get_stats(), std::move(permit)); + inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit)); } void storage_proxy::register_cdc_operation_result_tracker(const std::vector& ids, lw_shared_ptr tracker) { @@ -2145,7 +2145,7 @@ gms::inet_address storage_proxy::find_leader_for_counter_update(const mutation& return my_address; } - const auto local_endpoints = boost::copy_range>(live_endpoints | boost::adaptors::filtered([&] (auto&& ep) { + const auto local_endpoints = boost::copy_range(live_endpoints | boost::adaptors::filtered([&] (auto&& ep) { return db::is_local(ep); })); @@ -2226,8 +2226,8 @@ storage_proxy::paxos_participants storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token &token, db::consistency_level cl_for_paxos) { keyspace& ks = _db.local().find_keyspace(ks_name); auto& rs = ks.get_replication_strategy(); - std::vector natural_endpoints = rs.get_natural_endpoints_without_node_being_replaced(token); - std::vector pending_endpoints = get_token_metadata_ptr()->pending_endpoints_for(token, ks_name); + inet_address_vector_replica_set natural_endpoints = rs.get_natural_endpoints_without_node_being_replaced(token); + inet_address_vector_topology_change pending_endpoints = get_token_metadata_ptr()->pending_endpoints_for(token, ks_name); if (cl_for_paxos == db::consistency_level::LOCAL_SERIAL) { auto itend = boost::range::remove_if(natural_endpoints, std::not_fn(std::cref(db::is_local))); @@ -2247,7 +2247,7 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token & const size_t quorum_size = natural_endpoints.size() / 2 + 1; const size_t required_participants = quorum_size + pending_endpoints.size(); - std::vector live_endpoints; + inet_address_vector_replica_set live_endpoints; live_endpoints.reserve(participants); boost::copy(boost::range::join(natural_endpoints, pending_endpoints) | @@ -2497,7 +2497,7 @@ bool storage_proxy::cannot_hint(const Range& targets, db::write_type type) const future<> storage_proxy::send_to_endpoint( std::unique_ptr m, gms::inet_address target, - std::vector pending_endpoints, + inet_address_vector_topology_change pending_endpoints, db::write_type type, tracing::trace_state_ptr tr_state, write_stats& stats, @@ -2519,7 +2519,7 @@ future<> storage_proxy::send_to_endpoint( db::write_type type, service_permit permit) mutable { std::unordered_set targets; targets.reserve(pending_endpoints.size() + 1); - std::vector dead_endpoints; + inet_address_vector_topology_change dead_endpoints; boost::algorithm::partition_copy( boost::range::join(pending_endpoints, target), std::inserter(targets, targets.begin()), @@ -2549,7 +2549,7 @@ future<> storage_proxy::send_to_endpoint( future<> storage_proxy::send_to_endpoint( frozen_mutation_and_schema fm_a_s, gms::inet_address target, - std::vector pending_endpoints, + inet_address_vector_topology_change pending_endpoints, db::write_type type, tracing::trace_state_ptr tr_state, allow_hints allow_hints) { @@ -2566,7 +2566,7 @@ future<> storage_proxy::send_to_endpoint( future<> storage_proxy::send_to_endpoint( frozen_mutation_and_schema fm_a_s, gms::inet_address target, - std::vector pending_endpoints, + inet_address_vector_topology_change pending_endpoints, db::write_type type, tracing::trace_state_ptr tr_state, write_stats& stats, @@ -2631,8 +2631,8 @@ future<> storage_proxy::send_hint_to_all_replicas(frozen_mutation_and_schema fm_ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type response_id, clock_type::time_point timeout) { // extra-datacenter replicas, grouped by dc - std::unordered_map> dc_groups; - std::vector>> local; + std::unordered_map dc_groups; + std::vector> local; local.reserve(3); auto handler_ptr = get_write_response_handler(response_id); @@ -2644,7 +2644,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo sstring dc = get_dc(dest); // read repair writes do not go through coordinator since mutations are per destination if (handler.read_repair_write() || dc == get_local_dc()) { - local.emplace_back("", std::vector({dest})); + local.emplace_back("", inet_address_vector_replica_set({dest})); } else { dc_groups[dc].push_back(dest); } @@ -2664,7 +2664,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo }; // lambda for applying mutation remotely - auto rmutate = [this, handler_ptr, timeout, response_id, my_address, &global_stats] (gms::inet_address coordinator, std::vector&& forward) { + auto rmutate = [this, handler_ptr, timeout, response_id, my_address, &global_stats] (gms::inet_address coordinator, inet_address_vector_replica_set&& forward) { auto msize = handler_ptr->get_mutation_size(); // can overestimate for repair writes global_stats.queued_write_bytes += msize; @@ -3384,7 +3384,7 @@ public: class abstract_read_executor : public enable_shared_from_this { protected: - using targets_iterator = std::vector::iterator; + using targets_iterator = inet_address_vector_replica_set::iterator; using digest_resolver_ptr = ::shared_ptr; using data_resolver_ptr = ::shared_ptr; using clock_type = storage_proxy::clock_type; @@ -3396,9 +3396,9 @@ protected: dht::partition_range _partition_range; db::consistency_level _cl; size_t _block_for; - std::vector _targets; + inet_address_vector_replica_set _targets; // Targets that were succesfully used for a data or digest request - std::vector _used_targets; + inet_address_vector_replica_set _used_targets; promise>> _result_promise; tracing::trace_state_ptr _trace_state; lw_shared_ptr _cf; @@ -3413,7 +3413,7 @@ private: } public: abstract_read_executor(schema_ptr s, lw_shared_ptr cf, shared_ptr proxy, lw_shared_ptr cmd, dht::partition_range pr, db::consistency_level cl, size_t block_for, - std::vector targets, tracing::trace_state_ptr trace_state, service_permit permit) : + inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, service_permit permit) : _schema(std::move(s)), _proxy(std::move(proxy)), _cmd(std::move(cmd)), _partition_range(std::move(pr)), _cl(cl), _block_for(block_for), _targets(std::move(targets)), _trace_state(std::move(trace_state)), _cf(std::move(cf)), _permit(std::move(permit)) { _proxy->get_stats().reads++; @@ -3428,7 +3428,7 @@ public: /// /// Only filled after the request is finished, call only after /// execute()'s future is ready. - std::vector used_targets() const { + inet_address_vector_replica_set used_targets() const { return _used_targets; } @@ -3716,7 +3716,7 @@ public: class never_speculating_read_executor : public abstract_read_executor { public: - never_speculating_read_executor(schema_ptr s, lw_shared_ptr cf, shared_ptr proxy, lw_shared_ptr cmd, dht::partition_range pr, db::consistency_level cl, std::vector targets, tracing::trace_state_ptr trace_state, + never_speculating_read_executor(schema_ptr s, lw_shared_ptr cf, shared_ptr proxy, lw_shared_ptr cmd, dht::partition_range pr, db::consistency_level cl, inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, service_permit permit) : abstract_read_executor(std::move(s), std::move(cf), std::move(proxy), std::move(cmd), std::move(pr), cl, 0, std::move(targets), std::move(trace_state), std::move(permit)) { _block_for = _targets.size(); @@ -3810,7 +3810,7 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s db::consistency_level cl, db::read_repair_decision repair_decision, tracing::trace_state_ptr trace_state, - const std::vector& preferred_endpoints, + const inet_address_vector_replica_set& preferred_endpoints, bool& is_read_non_local, service_permit permit) { const dht::token& token = pr.start()->value().token(); @@ -3818,7 +3818,7 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s speculative_retry::type retry_type = schema->speculative_retry().get_type(); gms::inet_address extra_replica; - std::vector all_replicas = get_live_sorted_endpoints(ks, token); + inet_address_vector_replica_set all_replicas = get_live_sorted_endpoints(ks, token); // Check for a non-local read before heat-weighted load balancing // reordering of endpoints happens. The local endpoint, if // present, is always first in the list, as get_live_sorted_endpoints() @@ -3826,7 +3826,7 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s is_read_non_local |= !all_replicas.empty() && all_replicas.front() != utils::fb_utilities::get_broadcast_address(); auto cf = _db.local().find_column_family(schema).shared_from_this(); - std::vector target_replicas = db::filter_for_query(cl, ks, all_replicas, preferred_endpoints, repair_decision, + inet_address_vector_replica_set target_replicas = db::filter_for_query(cl, ks, all_replicas, preferred_endpoints, repair_decision, retry_type == speculative_retry::type::NONE ? nullptr : &extra_replica, _db.local().get_config().cache_hit_rate_read_balancing() ? &*cf : nullptr); @@ -3956,7 +3956,7 @@ storage_proxy::query_singular(lw_shared_ptr cmd, auto token_range = dht::token_range::make_singular(pr.start()->value().token()); auto it = query_options.preferred_replicas.find(token_range); const auto replicas = it == query_options.preferred_replicas.end() - ? std::vector{} : replica_ids_to_endpoints(*tmptr, it->second); + ? inet_address_vector_replica_set{} : replica_ids_to_endpoints(*tmptr, it->second); auto read_executor = get_read_executor(cmd, schema, std::move(pr), cl, repair_decision, query_options.trace_state, replicas, is_read_non_local, @@ -4032,7 +4032,7 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t const auto preferred_replicas_for_range = [this, &preferred_replicas, tmptr] (const dht::partition_range& r) { auto it = preferred_replicas.find(r.transform(std::mem_fn(&dht::ring_position::token))); - return it == preferred_replicas.end() ? std::vector{} : replica_ids_to_endpoints(*tmptr, it->second); + return it == preferred_replicas.end() ? inet_address_vector_replica_set{} : replica_ids_to_endpoints(*tmptr, it->second); }; const auto to_token_range = [] (const dht::partition_range& r) { return r.transform(std::mem_fn(&dht::ring_position::token)); }; @@ -4047,9 +4047,9 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t while (i != ranges.end()) { dht::partition_range& range = *i; - std::vector live_endpoints = get_live_sorted_endpoints(ks, end_token(range)); - std::vector merged_preferred_replicas = preferred_replicas_for_range(*i); - std::vector filtered_endpoints = filter_for_query(cl, ks, live_endpoints, merged_preferred_replicas, pcf); + inet_address_vector_replica_set live_endpoints = get_live_sorted_endpoints(ks, end_token(range)); + inet_address_vector_replica_set merged_preferred_replicas = preferred_replicas_for_range(*i); + inet_address_vector_replica_set filtered_endpoints = filter_for_query(cl, ks, live_endpoints, merged_preferred_replicas, pcf); std::vector merged_ranges{to_token_range(range)}; ++i; @@ -4060,8 +4060,8 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t { const auto current_range_preferred_replicas = preferred_replicas_for_range(*i); dht::partition_range& next_range = *i; - std::vector next_endpoints = get_live_sorted_endpoints(ks, end_token(next_range)); - std::vector next_filtered_endpoints = filter_for_query(cl, ks, next_endpoints, current_range_preferred_replicas, pcf); + inet_address_vector_replica_set next_endpoints = get_live_sorted_endpoints(ks, end_token(next_range)); + inet_address_vector_replica_set next_filtered_endpoints = filter_for_query(cl, ks, next_endpoints, current_range_preferred_replicas, pcf); // Origin has this to say here: // * If the current range right is the min token, we should stop merging because CFS.getRangeSlice @@ -4074,22 +4074,22 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t break; } - std::vector merged = intersection(live_endpoints, next_endpoints); - std::vector current_merged_preferred_replicas = intersection(merged_preferred_replicas, current_range_preferred_replicas); + inet_address_vector_replica_set merged = intersection(live_endpoints, next_endpoints); + inet_address_vector_replica_set current_merged_preferred_replicas = intersection(merged_preferred_replicas, current_range_preferred_replicas); // Check if there is enough endpoint for the merge to be possible. if (!is_sufficient_live_nodes(cl, ks, merged)) { break; } - std::vector filtered_merged = filter_for_query(cl, ks, merged, current_merged_preferred_replicas, pcf); + inet_address_vector_replica_set filtered_merged = filter_for_query(cl, ks, merged, current_merged_preferred_replicas, pcf); // Estimate whether merging will be a win or not if (!locator::i_endpoint_snitch::get_local_snitch_ptr()->is_worth_merging_for_range_query(filtered_merged, filtered_endpoints, next_filtered_endpoints)) { break; } else if (pcf) { // check that merged set hit rate is not to low - auto find_min = [pcf] (const std::vector& range) { + auto find_min = [pcf] (const inet_address_vector_replica_set& range) { struct { column_family* cf = nullptr; float operator()(const gms::inet_address& ep) const { @@ -4587,15 +4587,15 @@ future storage_proxy::cas(schema_ptr schema, shared_ptr reque co_return condition_met; } -std::vector storage_proxy::get_live_endpoints(keyspace& ks, const dht::token& token) const { +inet_address_vector_replica_set storage_proxy::get_live_endpoints(keyspace& ks, const dht::token& token) const { auto& rs = ks.get_replication_strategy(); - std::vector eps = rs.get_natural_endpoints_without_node_being_replaced(token); + inet_address_vector_replica_set eps = rs.get_natural_endpoints_without_node_being_replaced(token); auto itend = boost::range::remove_if(eps, std::not1(std::bind1st(std::mem_fn(&gms::gossiper::is_alive), &gms::get_local_gossiper()))); eps.erase(itend, eps.end()); return eps; } -void storage_proxy::sort_endpoints_by_proximity(std::vector& eps) { +void storage_proxy::sort_endpoints_by_proximity(inet_address_vector_replica_set& eps) { locator::i_endpoint_snitch::get_local_snitch_ptr()->sort_by_proximity(utils::fb_utilities::get_broadcast_address(), eps); // FIXME: before dynamic snitch is implement put local address (if present) at the beginning auto it = boost::range::find(eps, utils::fb_utilities::get_broadcast_address()); @@ -4604,14 +4604,14 @@ void storage_proxy::sort_endpoints_by_proximity(std::vector& } } -std::vector storage_proxy::get_live_sorted_endpoints(keyspace& ks, const dht::token& token) const { +inet_address_vector_replica_set storage_proxy::get_live_sorted_endpoints(keyspace& ks, const dht::token& token) const { auto eps = get_live_endpoints(ks, token); sort_endpoints_by_proximity(eps); return eps; } -std::vector storage_proxy::intersection(const std::vector& l1, const std::vector& l2) { - std::vector inter; +inet_address_vector_replica_set storage_proxy::intersection(const inet_address_vector_replica_set& l1, const inet_address_vector_replica_set& l2) { + inet_address_vector_replica_set inter; inter.reserve(l1.size()); std::remove_copy_if(l1.begin(), l1.end(), std::back_inserter(inter), [&l2] (const gms::inet_address& a) { return std::find(l2.begin(), l2.end(), a) == l2.end(); diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 9df7d8acc0..aaff44fa56 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -45,6 +45,7 @@ #include "query-request.hh" #include "query-result.hh" #include "query-result-set.hh" +#include "inet_address_vectors.hh" #include #include #include @@ -242,7 +243,7 @@ public: // Holds a list of endpoints participating in CAS request, for a given // consistency level, token, and state of joining/leaving nodes. struct paxos_participants { - std::vector endpoints; + inet_address_vector_replica_set endpoints; // How many participants are required for a quorum (i.e. is it SERIAL or LOCAL_SERIAL). size_t required_participants; bool has_dead_endpoints; @@ -337,7 +338,7 @@ private: std::unique_ptr mh, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit); response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr m, std::unordered_set targets, - const std::vector& pending_endpoints, std::vector, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit); + const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit); response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit); response_id_type create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit); response_id_type create_write_response_handler(const std::unordered_map>&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit); @@ -354,9 +355,9 @@ private: bool cannot_hint(const Range& targets, db::write_type type) const; bool hints_enabled(db::write_type type) const noexcept; db::hints::manager& hints_manager_for(db::write_type type); - std::vector get_live_endpoints(keyspace& ks, const dht::token& token) const; - static void sort_endpoints_by_proximity(std::vector& eps); - std::vector get_live_sorted_endpoints(keyspace& ks, const dht::token& token) const; + inet_address_vector_replica_set get_live_endpoints(keyspace& ks, const dht::token& token) const; + static void sort_endpoints_by_proximity(inet_address_vector_replica_set& eps); + inet_address_vector_replica_set get_live_sorted_endpoints(keyspace& ks, const dht::token& token) const; db::read_repair_decision new_read_repair_decision(const schema& s); ::shared_ptr get_read_executor(lw_shared_ptr cmd, schema_ptr schema, @@ -364,7 +365,7 @@ private: db::consistency_level cl, db::read_repair_decision repair_decision, tracing::trace_state_ptr trace_state, - const std::vector& preferred_endpoints, + const inet_address_vector_replica_set& preferred_endpoints, bool& is_bounced_read, service_permit permit); future>, cache_temperature>> query_result_local(schema_ptr, lw_shared_ptr cmd, const dht::partition_range& pr, @@ -379,7 +380,7 @@ private: dht::partition_range_vector partition_ranges, db::consistency_level cl, coordinator_query_options optional_params); - static std::vector intersection(const std::vector& l1, const std::vector& l2); + static inet_address_vector_replica_set intersection(const inet_address_vector_replica_set& l1, const inet_address_vector_replica_set& l2); future query_partition_key_range_concurrent(clock_type::time_point timeout, std::vector>>&& results, lw_shared_ptr cmd, @@ -433,7 +434,7 @@ private: future<> send_to_endpoint( std::unique_ptr m, gms::inet_address target, - std::vector pending_endpoints, + inet_address_vector_topology_change pending_endpoints, db::write_type type, tracing::trace_state_ptr tr_state, write_stats& stats, @@ -561,9 +562,9 @@ public: // Inspired by Cassandra's StorageProxy.sendToHintedEndpoints but without // hinted handoff support, and just one target. See also // send_to_live_endpoints() - another take on the same original function. - future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, std::vector pending_endpoints, db::write_type type, + future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type, tracing::trace_state_ptr tr_state, write_stats& stats, allow_hints allow_hints = allow_hints::yes); - future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, std::vector pending_endpoints, db::write_type type, + future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type, tracing::trace_state_ptr tr_state, allow_hints allow_hints = allow_hints::yes); // Send a mutation to a specific remote target as a hint. @@ -692,7 +693,7 @@ private: // Is either set explicitly or derived from the consistency level set in keyspace options. db::consistency_level _cl_for_learn; // Live endpoints, as per get_paxos_participants() - std::vector _live_endpoints; + inet_address_vector_replica_set _live_endpoints; // How many endpoints need to respond favourably for the protocol to progress to the next step. size_t _required_participants; // A deadline when the entire CAS operation timeout expires, derived from write_request_timeout_in_ms diff --git a/service/storage_service.cc b/service/storage_service.cc index b7af960e68..c68ea41973 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -773,12 +773,12 @@ storage_service::get_rpc_address(const inet_address& endpoint) const { return boost::lexical_cast(endpoint); } -std::unordered_map> +std::unordered_map storage_service::get_range_to_address_map(const sstring& keyspace) const { return get_range_to_address_map(keyspace, get_token_metadata().sorted_tokens()); } -std::unordered_map> +std::unordered_map storage_service::get_range_to_address_map_in_local_dc( const sstring& keyspace) const { std::function filter = [this](const inet_address& address) { @@ -786,7 +786,7 @@ storage_service::get_range_to_address_map_in_local_dc( }; auto orig_map = get_range_to_address_map(keyspace, get_tokens_in_local_dc()); - std::unordered_map> filtered_map; + std::unordered_map filtered_map; for (auto entry : orig_map) { auto& addresses = filtered_map[entry.first]; addresses.reserve(entry.second.size()); @@ -815,7 +815,7 @@ storage_service::is_local_dc(const inet_address& targetHost) const { return remote_dc == local_dc; } -std::unordered_map> +std::unordered_map storage_service::get_range_to_address_map(const sstring& keyspace, const std::vector& sorted_tokens) const { sstring ks = keyspace; @@ -2752,7 +2752,7 @@ std::unordered_multimap storage_service::get_cha slogger.debug("Node {} ranges [{}]", endpoint, ranges); - std::unordered_map> current_replica_endpoints; + std::unordered_map current_replica_endpoints; // Find (for each range) all nodes that store replicas for these ranges as well auto tmptr = get_token_metadata_ptr(); @@ -2791,7 +2791,7 @@ std::unordered_multimap storage_service::get_cha auto rg = current_replica_endpoints.equal_range(r); for (auto it = rg.first; it != rg.second; it++) { const dht::token_range& range_ = it->first; - std::vector& current_eps = it->second; + inet_address_vector_replica_set& current_eps = it->second; slogger.debug("range={}, current_replica_endpoints={}, new_replica_endpoints={}", range_, current_eps, new_replica_endpoints); for (auto ep : it->second) { auto beg = new_replica_endpoints.begin(); @@ -3149,7 +3149,7 @@ future<> storage_service::load_and_stream(sstring ks_name, sstring cf_name, ops_uuid, nr_sst_current, nr_sst_current + sst_processed.size(), nr_sst_total, sst_names); auto start_time = std::chrono::steady_clock::now(); - std::vector current_targets; + inet_address_vector_replica_set current_targets; std::unordered_map metas; size_t num_partitions_processed = 0; size_t num_bytes_read = 0; @@ -3315,19 +3315,19 @@ storage_service::get_new_source_ranges(const sstring& keyspace_name, const dht:: auto my_address = get_broadcast_address(); auto& ks = _db.local().find_keyspace(keyspace_name); auto& strat = ks.get_replication_strategy(); - std::unordered_map> range_addresses = strat.get_range_addresses(tm, utils::can_yield::yes); + std::unordered_map range_addresses = strat.get_range_addresses(tm, utils::can_yield::yes); std::unordered_multimap source_ranges; // find alive sources for our new ranges for (auto r : ranges) { - std::vector possible_nodes; + inet_address_vector_replica_set possible_nodes; auto it = range_addresses.find(r); if (it != range_addresses.end()) { possible_nodes = it->second; } auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr(); - std::vector sources = snitch->get_sorted_list_by_proximity(my_address, possible_nodes); + inet_address_vector_replica_set sources = snitch->get_sorted_list_by_proximity(my_address, possible_nodes); if (std::find(sources.begin(), sources.end(), my_address) != sources.end()) { auto err = format("get_new_source_ranges: sources={}, my_address={}", sources, my_address); @@ -3357,7 +3357,7 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_ std::vector ranges; //Token.TokenFactory tf = getPartitioner().getTokenFactory(); - std::unordered_map> range_to_address_map = + std::unordered_map range_to_address_map = include_only_local_dc ? get_range_to_address_map_in_local_dc(keyspace) : get_range_to_address_map(keyspace); @@ -3401,11 +3401,11 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_ return ranges; } -std::unordered_map> +std::unordered_map storage_service::construct_range_to_endpoint_map( const sstring& keyspace, const dht::token_range_vector& ranges) const { - std::unordered_map> res; + std::unordered_map res; for (auto r : ranges) { res[r] = _db.local().find_keyspace(keyspace).get_replication_strategy().get_natural_endpoints( r.end() ? r.end()->value() : dht::maximum_token()); @@ -3670,7 +3670,7 @@ storage_service::get_all_ranges(const std::vector& sorted_tokens) const { return ranges; } -std::vector +inet_address_vector_replica_set storage_service::get_natural_endpoints(const sstring& keyspace, const sstring& cf, const sstring& key) const { auto schema = _db.local().find_schema(keyspace, cf); @@ -3679,7 +3679,7 @@ storage_service::get_natural_endpoints(const sstring& keyspace, return get_natural_endpoints(keyspace, token); } -std::vector +inet_address_vector_replica_set storage_service::get_natural_endpoints(const sstring& keyspace, const token& pos) const { return _db.local().find_keyspace(keyspace).get_replication_strategy().get_natural_endpoints(pos); } diff --git a/service/storage_service.hh b/service/storage_service.hh index 1745331c09..4505903cff 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -43,6 +43,7 @@ #include "service/endpoint_lifecycle_subscriber.hh" #include "locator/token_metadata.hh" #include "gms/gossiper.hh" +#include "inet_address_vectors.hh" #include #include "dht/i_partitioner.hh" #include "dht/token_range_endpoints.hh" @@ -469,16 +470,16 @@ public: */ sstring get_rpc_address(const inet_address& endpoint) const; - std::unordered_map> get_range_to_address_map(const sstring& keyspace) const; + std::unordered_map get_range_to_address_map(const sstring& keyspace) const; - std::unordered_map> get_range_to_address_map_in_local_dc( + std::unordered_map get_range_to_address_map_in_local_dc( const sstring& keyspace) const; std::vector get_tokens_in_local_dc() const; bool is_local_dc(const inet_address& targetHost) const; - std::unordered_map> get_range_to_address_map(const sstring& keyspace, + std::unordered_map get_range_to_address_map(const sstring& keyspace, const std::vector& sorted_tokens) const; /** @@ -511,7 +512,7 @@ public: * @param ranges * @return mapping of ranges to the replicas responsible for them. */ - std::unordered_map> construct_range_to_endpoint_map( + std::unordered_map construct_range_to_endpoint_map( const sstring& keyspace, const dht::token_range_vector& ranges) const; public: @@ -728,7 +729,7 @@ public: * @param key key for which we need to find the endpoint * @return the endpoint responsible for this key */ - std::vector get_natural_endpoints(const sstring& keyspace, + inet_address_vector_replica_set get_natural_endpoints(const sstring& keyspace, const sstring& cf, const sstring& key) const; /** @@ -739,7 +740,7 @@ public: * @param pos position for which we need to find the endpoint * @return the endpoint responsible for this token */ - std::vector get_natural_endpoints(const sstring& keyspace, const token& pos) const; + inet_address_vector_replica_set get_natural_endpoints(const sstring& keyspace, const token& pos) const; /** * @return Vector of Token ranges (_not_ keys!) together with estimated key count, diff --git a/test/boost/network_topology_strategy_test.cc b/test/boost/network_topology_strategy_test.cc index 8f5e636e2b..b1dcc95e8d 100644 --- a/test/boost/network_topology_strategy_test.cc +++ b/test/boost/network_topology_strategy_test.cc @@ -48,7 +48,7 @@ struct ring_point { inet_address host; }; -void print_natural_endpoints(double point, const std::vector v) { +void print_natural_endpoints(double point, const inet_address_vector_replica_set v) { testlog.debug("Natural endpoints for a token {}:", point); std::string str; std::ostringstream strm(str); @@ -105,7 +105,7 @@ void strategy_sanity_check( void endpoints_check( abstract_replication_strategy* ars_ptr, - std::vector& endpoints) { + inet_address_vector_replica_set& endpoints) { // Check the total RF BOOST_CHECK(endpoints.size() == ars_ptr->get_replication_factor()); diff --git a/utils/small_vector.hh b/utils/small_vector.hh index 42667a8bf5..c387f15a47 100644 --- a/utils/small_vector.hh +++ b/utils/small_vector.hh @@ -468,4 +468,19 @@ public: } }; +template +std::ostream& operator<<(std::ostream& os, const utils::small_vector& v) { + os << "{"; + bool first = true; + for (auto&& e : v) { + if (!first) { + os << ", "; + } + first = false; + os << e; + } + os << "}"; + return os; +} + }