From 84ea06f15b8076ed29e9e7a6c6493b54870ab384 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 5 May 2021 12:10:17 +0300 Subject: [PATCH 1/4] hints: messages.hh: add missing #include Make the header self-contained. --- db/hints/messages.hh | 1 + 1 file changed, 1 insertion(+) diff --git a/db/hints/messages.hh b/db/hints/messages.hh index 642f935964..d7133b462b 100644 --- a/db/hints/messages.hh +++ b/db/hints/messages.hh @@ -23,6 +23,7 @@ #include #include +#include "gms/inet_address.hh" #include "utils/UUID.hh" From 3114f09d765e84c85162ca1587e1eff6edae635c Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 29 Apr 2021 16:05:07 +0300 Subject: [PATCH 2/4] utils: small_vector: add print operator for std::ostream In order to replace std::vector with utils::small_vector, it needs to support this feature too. --- utils/small_vector.hh | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/utils/small_vector.hh b/utils/small_vector.hh index 42667a8bf5..c387f15a47 100644 --- a/utils/small_vector.hh +++ b/utils/small_vector.hh @@ -468,4 +468,19 @@ public: } }; +template +std::ostream& operator<<(std::ostream& os, const utils::small_vector& v) { + os << "{"; + bool first = true; + for (auto&& e : v) { + if (!first) { + os << ", "; + } + first = false; + os << e; + } + os << "}"; + return os; +} + } From cea5493cb7679907c950d8fc77485a6a4568066a Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Tue, 27 Apr 2021 16:24:14 +0300 Subject: [PATCH 3/4] storage_proxy, treewide: introduce names for vectors of inet_address storage_proxy works with vectors of inet_addresses for replica sets and for topology changes (pending endpoints, dead nodes). This patch introduces new names for these (without changing the underlying type - it's still std::vector). This is so that the following patch, that changes those types to utils::small_vector, will be less noisy and highlight the real changes that take place. 
--- api/storage_service.cc | 2 +- db/consistency_level.cc | 18 +-- db/consistency_level.hh | 15 +-- db/hints/manager.cc | 4 +- db/hints/manager.hh | 3 +- db/view/view.cc | 2 +- dht/range_streamer.cc | 2 +- inet_address_vectors.hh | 29 +++++ locator/abstract_replication_strategy.cc | 14 +-- locator/abstract_replication_strategy.hh | 15 +-- locator/everywhere_replication_strategy.cc | 8 +- locator/everywhere_replication_strategy.hh | 4 +- locator/local_strategy.cc | 6 +- locator/local_strategy.hh | 4 +- locator/network_topology_strategy.cc | 2 +- locator/network_topology_strategy.hh | 2 +- locator/simple_snitch.hh | 2 +- locator/simple_strategy.cc | 4 +- locator/simple_strategy.hh | 2 +- locator/snitch_base.cc | 16 +-- locator/snitch_base.hh | 27 ++-- locator/token_metadata.cc | 10 +- locator/token_metadata.hh | 3 +- message/messaging_service.cc | 6 +- message/messaging_service.hh | 7 +- repair/repair.cc | 6 +- service/storage_proxy.cc | 122 +++++++++---------- service/storage_proxy.hh | 23 ++-- service/storage_service.cc | 30 ++--- service/storage_service.hh | 13 +- test/boost/network_topology_strategy_test.cc | 4 +- 31 files changed, 221 insertions(+), 184 deletions(-) create mode 100644 inet_address_vectors.hh diff --git a/api/storage_service.cc b/api/storage_service.cc index 38efac84fd..953d5dc6d8 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -415,7 +415,7 @@ void set_storage_service(http_context& ctx, routes& r) { auto keyspace = validate_keyspace(ctx, req->param); std::vector res; return make_ready_future(stream_range_as_array(service::get_local_storage_service().get_range_to_address_map(keyspace), - [](const std::pair>& entry){ + [](const std::pair& entry){ ss::maplist_mapper m; if (entry.first.start()) { m.key.push(entry.first.start().value().value().to_sstring()); diff --git a/db/consistency_level.cc b/db/consistency_level.cc index 1f9df0b5cb..573f2eadd3 100644 --- a/db/consistency_level.cc +++ b/db/consistency_level.cc @@ -155,11 
+155,11 @@ bool is_local(gms::inet_address endpoint) { snitch_ptr->get_datacenter(endpoint); } -std::vector +inet_address_vector_replica_set filter_for_query(consistency_level cl, keyspace& ks, - std::vector live_endpoints, - const std::vector& preferred_endpoints, + inet_address_vector_replica_set live_endpoints, + const inet_address_vector_replica_set& preferred_endpoints, read_repair_decision read_repair, gms::inet_address* extra, column_family* cf) { @@ -187,7 +187,7 @@ filter_for_query(consistency_level cl, return std::move(live_endpoints); } - std::vector selected_endpoints; + inet_address_vector_replica_set selected_endpoints; // Pre-select endpoints based on client preference. If the endpoints // selected this way aren't enough to satisfy CL requirements select the @@ -201,7 +201,7 @@ filter_for_query(consistency_level cl, if (extra) { *extra = selected == bf ? live_endpoints.front() : *(it + bf); } - return std::vector(it, it + bf); + return inet_address_vector_replica_set(it, it + bf); } else if (selected) { selected_endpoints.reserve(bf); std::move(it, live_endpoints.end(), std::back_inserter(selected_endpoints)); @@ -255,10 +255,10 @@ filter_for_query(consistency_level cl, return selected_endpoints; } -std::vector filter_for_query(consistency_level cl, +inet_address_vector_replica_set filter_for_query(consistency_level cl, keyspace& ks, - std::vector& live_endpoints, - const std::vector& preferred_endpoints, + inet_address_vector_replica_set& live_endpoints, + const inet_address_vector_replica_set& preferred_endpoints, column_family* cf) { return filter_for_query(cl, ks, live_endpoints, preferred_endpoints, read_repair_decision::NONE, nullptr, cf); } @@ -266,7 +266,7 @@ std::vector filter_for_query(consistency_level cl, bool is_sufficient_live_nodes(consistency_level cl, keyspace& ks, - const std::vector& live_endpoints) { + const inet_address_vector_replica_set& live_endpoints) { using namespace locator; switch (cl) { diff --git 
a/db/consistency_level.hh b/db/consistency_level.hh index bfc6ba3e9c..5db558c922 100644 --- a/db/consistency_level.hh +++ b/db/consistency_level.hh @@ -47,6 +47,7 @@ #include "exceptions/exceptions.hh" #include "utils/fb_utilities.hh" #include "gms/inet_address.hh" +#include "inet_address_vectors.hh" #include "database.hh" #include @@ -75,19 +76,19 @@ inline size_t count_local_endpoints(const Range& live_endpoints) { return std::count_if(live_endpoints.begin(), live_endpoints.end(), is_local); } -std::vector +inet_address_vector_replica_set filter_for_query(consistency_level cl, keyspace& ks, - std::vector live_endpoints, - const std::vector& preferred_endpoints, + inet_address_vector_replica_set live_endpoints, + const inet_address_vector_replica_set& preferred_endpoints, read_repair_decision read_repair, gms::inet_address* extra, column_family* cf); -std::vector filter_for_query(consistency_level cl, +inet_address_vector_replica_set filter_for_query(consistency_level cl, keyspace& ks, - std::vector& live_endpoints, - const std::vector& preferred_endpoints, + inet_address_vector_replica_set& live_endpoints, + const inet_address_vector_replica_set& preferred_endpoints, column_family* cf); struct dc_node_count { @@ -132,7 +133,7 @@ inline std::unordered_map count_per_dc_endpoints( bool is_sufficient_live_nodes(consistency_level cl, keyspace& ks, - const std::vector& live_endpoints); + const inet_address_vector_replica_set& live_endpoints); template inline bool assure_sufficient_live_nodes_each_quorum( diff --git a/db/hints/manager.cc b/db/hints/manager.cc index 513f10ef39..6ba7cda42b 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -418,7 +418,7 @@ future manager::end_point_hints_manager::sender::get_last_file_modific }); } -future<> manager::end_point_hints_manager::sender::do_send_one_mutation(frozen_mutation_and_schema m, const std::vector& natural_endpoints) noexcept { +future<> 
manager::end_point_hints_manager::sender::do_send_one_mutation(frozen_mutation_and_schema m, const inet_address_vector_replica_set& natural_endpoints) noexcept { return futurize_invoke([this, m = std::move(m), &natural_endpoints] () mutable -> future<> { // The fact that we send with CL::ALL in both cases below ensures that new hints are not going // to be generated as a result of hints sending. @@ -804,7 +804,7 @@ future<> manager::end_point_hints_manager::sender::send_one_mutation(frozen_muta keyspace& ks = _db.find_keyspace(m.s->ks_name()); auto& rs = ks.get_replication_strategy(); auto token = dht::get_token(*m.s, m.fm.key()); - std::vector natural_endpoints = rs.get_natural_endpoints(std::move(token)); + inet_address_vector_replica_set natural_endpoints = rs.get_natural_endpoints(std::move(token)); return do_send_one_mutation(std::move(m), natural_endpoints); } diff --git a/db/hints/manager.hh b/db/hints/manager.hh index 4d0c641de9..8046422150 100644 --- a/db/hints/manager.hh +++ b/db/hints/manager.hh @@ -36,6 +36,7 @@ #include "lister.hh" #include "gms/gossiper.hh" #include "locator/snitch_base.hh" +#include "inet_address_vectors.hh" #include "service/endpoint_lifecycle_subscriber.hh" #include "db/commitlog/commitlog.hh" #include "utils/loading_shared_values.hh" @@ -292,7 +293,7 @@ public: /// \param m mutation to send /// \param natural_endpoints current replicas for the given mutation /// \return future that resolves when the operation is complete - future<> do_send_one_mutation(frozen_mutation_and_schema m, const std::vector& natural_endpoints) noexcept; + future<> do_send_one_mutation(frozen_mutation_and_schema m, const inet_address_vector_replica_set& natural_endpoints) noexcept; /// \brief Send one mutation out. 
/// diff --git a/db/view/view.cc b/db/view/view.cc index 0593a51fe8..3db6e3dd2f 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1169,7 +1169,7 @@ get_view_natural_endpoint(const sstring& keyspace_name, return view_endpoints[base_it - base_endpoints.begin()]; } -static future<> apply_to_remote_endpoints(gms::inet_address target, std::vector&& pending_endpoints, +static future<> apply_to_remote_endpoints(gms::inet_address target, inet_address_vector_topology_change&& pending_endpoints, frozen_mutation_and_schema& mut, const dht::token& base_token, const dht::token& view_token, service::allow_hints allow_hints, tracing::trace_state_ptr tr_state) { diff --git a/dht/range_streamer.cc b/dht/range_streamer.cc index 8b26316b8d..c0cdd5c84f 100644 --- a/dht/range_streamer.cc +++ b/dht/range_streamer.cc @@ -131,7 +131,7 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, dh } const range& src_range = x.first; if (src_range.contains(desired_range, dht::tri_compare)) { - std::vector& addresses = x.second; + inet_address_vector_replica_set& addresses = x.second; auto preferred = snitch->get_sorted_list_by_proximity(_address, addresses); for (inet_address& p : preferred) { range_sources[desired_range].push_back(p); diff --git a/inet_address_vectors.hh b/inet_address_vectors.hh new file mode 100644 index 0000000000..b9803cf9d7 --- /dev/null +++ b/inet_address_vectors.hh @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include "gms/inet_address.hh" +#include + +using inet_address_vector_replica_set = std::vector; + +using inet_address_vector_topology_change = std::vector; diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index aae401ca9c..c023923f02 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -74,11 +74,11 @@ void abstract_replication_strategy::validate_replication_strategy(const sstring& } } -std::vector abstract_replication_strategy::get_natural_endpoints(const token& search_token, can_yield can_yield) { +inet_address_vector_replica_set abstract_replication_strategy::get_natural_endpoints(const token& search_token, can_yield can_yield) { return do_get_natural_endpoints(search_token, *_shared_token_metadata.get(), can_yield); } -std::vector abstract_replication_strategy::do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield can_yield) { +inet_address_vector_replica_set abstract_replication_strategy::do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield can_yield) { const token& key_token = tm.first_token(search_token); auto& cached_endpoints = get_cached_endpoints(tm); auto res = cached_endpoints.find(key_token); @@ -94,9 +94,9 @@ std::vector abstract_replication_strategy::do_get_natural_endpoint return res->second; } -std::vector abstract_replication_strategy::get_natural_endpoints_without_node_being_replaced(const token& search_token, can_yield can_yield) { +inet_address_vector_replica_set abstract_replication_strategy::get_natural_endpoints_without_node_being_replaced(const token& search_token, can_yield can_yield) { token_metadata_ptr tmptr = _shared_token_metadata.get(); - std::vector natural_endpoints = 
do_get_natural_endpoints(search_token, *tmptr, can_yield); + inet_address_vector_replica_set natural_endpoints = do_get_natural_endpoints(search_token, *tmptr, can_yield); if (tmptr->is_any_node_being_replaced() && allow_remove_node_being_replaced_from_natural_endpoints()) { // When a new node is started to replace an existing dead node, we want @@ -132,7 +132,7 @@ void abstract_replication_strategy::validate_replication_factor(sstring rf) cons } } -inline std::unordered_map>& +inline std::unordered_map& abstract_replication_strategy::get_cached_endpoints(const token_metadata& tm) { auto ring_version = tm.get_ring_version(); if (_last_invalidated_ring_version != ring_version) { @@ -272,9 +272,9 @@ abstract_replication_strategy::get_address_ranges(const token_metadata& tm, inet return ret; } -std::unordered_map> +std::unordered_map abstract_replication_strategy::get_range_addresses(const token_metadata& tm, can_yield can_yield) const { - std::unordered_map> ret; + std::unordered_map ret; for (auto& t : tm.sorted_tokens()) { dht::token_range_vector ranges = tm.get_primary_ranges_for(t); auto eps = calculate_natural_endpoints(t, tm, can_yield); diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index 987be64325..b4881bbda2 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -25,6 +25,7 @@ #include #include #include "gms/inet_address.hh" +#include "locator/snitch_base.hh" #include "dht/i_partitioner.hh" #include "token_metadata.hh" #include "snitch_base.hh" @@ -51,12 +52,12 @@ using can_yield = utils::can_yield; class abstract_replication_strategy { private: long _last_invalidated_ring_version = 0; - std::unordered_map> _cached_endpoints; + std::unordered_map _cached_endpoints; uint64_t _cache_hits_count = 0; static logging::logger logger; - std::unordered_map>& + std::unordered_map& get_cached_endpoints(const token_metadata& tm); protected: sstring _ks_name; @@ -91,15 
+92,15 @@ public: snitch_ptr& snitch, const std::map& config_options, replication_strategy_type my_type); - virtual std::vector calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield = can_yield::no) const = 0; + virtual inet_address_vector_replica_set calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield = can_yield::no) const = 0; virtual ~abstract_replication_strategy() {} static std::unique_ptr create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, const shared_token_metadata& stm, const std::map& config_options); static void validate_replication_strategy(const sstring& ks_name, const sstring& strategy_name, const shared_token_metadata& stm, const std::map& config_options); - std::vector get_natural_endpoints(const token& search_token, can_yield = can_yield::no); - std::vector get_natural_endpoints_without_node_being_replaced(const token& search_token, can_yield = can_yield::no); + inet_address_vector_replica_set get_natural_endpoints(const token& search_token, can_yield = can_yield::no); + inet_address_vector_replica_set get_natural_endpoints_without_node_being_replaced(const token& search_token, can_yield = can_yield::no); virtual void validate_options() const = 0; virtual std::optional> recognized_options() const = 0; virtual size_t get_replication_factor() const = 0; @@ -128,7 +129,7 @@ public: private: // Caller must ensure that token_metadata will not change throughout the call if can_yield::yes. 
dht::token_range_vector do_get_ranges(inet_address ep, const token_metadata_ptr tmptr, can_yield) const; - virtual std::vector do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield); + virtual inet_address_vector_replica_set do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield); public: // get_primary_ranges() returns the list of "primary ranges" for the given @@ -146,7 +147,7 @@ public: std::unordered_multimap get_address_ranges(const token_metadata& tm, can_yield) const; std::unordered_multimap get_address_ranges(const token_metadata& tm, inet_address endpoint, can_yield) const; - std::unordered_map> get_range_addresses(const token_metadata& tm, can_yield) const; + std::unordered_map get_range_addresses(const token_metadata& tm, can_yield) const; dht::token_range_vector get_pending_address_ranges(const token_metadata_ptr tmptr, token pending_token, inet_address pending_address, can_yield) const; diff --git a/locator/everywhere_replication_strategy.cc b/locator/everywhere_replication_strategy.cc index 59a2c9848d..bab2c6b7d2 100644 --- a/locator/everywhere_replication_strategy.cc +++ b/locator/everywhere_replication_strategy.cc @@ -47,13 +47,13 @@ namespace locator { everywhere_replication_strategy::everywhere_replication_strategy(const sstring& keyspace_name, const shared_token_metadata& token_metadata, snitch_ptr& snitch, const std::map& config_options) : abstract_replication_strategy(keyspace_name, token_metadata, snitch, config_options, replication_strategy_type::everywhere_topology) {} -std::vector everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const { - return tm.get_all_endpoints(); +inet_address_vector_replica_set everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const { + return boost::copy_range(tm.get_all_endpoints()); } -std::vector 
everywhere_replication_strategy::do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield can_yield) { +inet_address_vector_replica_set everywhere_replication_strategy::do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield can_yield) { if (tm.sorted_tokens().empty()) { - return std::vector({utils::fb_utilities::get_broadcast_address()}); + return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()}); } return calculate_natural_endpoints(search_token, tm, can_yield); } diff --git a/locator/everywhere_replication_strategy.hh b/locator/everywhere_replication_strategy.hh index ff447fd174..b87a9cc66d 100644 --- a/locator/everywhere_replication_strategy.hh +++ b/locator/everywhere_replication_strategy.hh @@ -46,8 +46,8 @@ class everywhere_replication_strategy : public abstract_replication_strategy { public: everywhere_replication_strategy(const sstring& keyspace_name, const shared_token_metadata& token_metadata, snitch_ptr& snitch, const std::map& config_options); - virtual std::vector calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override; - std::vector do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) override; + virtual inet_address_vector_replica_set calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override; + inet_address_vector_replica_set do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) override; virtual void validate_options() const override { /* noop */ } diff --git a/locator/local_strategy.cc b/locator/local_strategy.cc index b7280d3c74..954b4b4ed0 100644 --- a/locator/local_strategy.cc +++ b/locator/local_strategy.cc @@ -30,12 +30,12 @@ namespace locator { local_strategy::local_strategy(const sstring& keyspace_name, const shared_token_metadata& token_metadata, snitch_ptr& snitch, const 
std::map& config_options) : abstract_replication_strategy(keyspace_name, token_metadata, snitch, config_options, replication_strategy_type::local) {} -std::vector local_strategy::do_get_natural_endpoints(const token& t, const token_metadata& tm, can_yield can_yield) { +inet_address_vector_replica_set local_strategy::do_get_natural_endpoints(const token& t, const token_metadata& tm, can_yield can_yield) { return calculate_natural_endpoints(t, tm, can_yield); } -std::vector local_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm, can_yield) const { - return std::vector({utils::fb_utilities::get_broadcast_address()}); +inet_address_vector_replica_set local_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm, can_yield) const { + return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()}); } void local_strategy::validate_options() const { diff --git a/locator/local_strategy.hh b/locator/local_strategy.hh index a8f119380d..3affd43b69 100644 --- a/locator/local_strategy.hh +++ b/locator/local_strategy.hh @@ -36,7 +36,7 @@ using token = dht::token; class local_strategy : public abstract_replication_strategy { protected: - virtual std::vector calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override; + virtual inet_address_vector_replica_set calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override; public: local_strategy(const sstring& keyspace_name, const shared_token_metadata& token_metadata, snitch_ptr& snitch, const std::map& config_options); virtual ~local_strategy() {}; @@ -46,7 +46,7 @@ public: * because the default implementation depends on token calculations but * LocalStrategy may be used before tokens are set up. 
*/ - std::vector do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) override; + inet_address_vector_replica_set do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) override; virtual void validate_options() const override; diff --git a/locator/network_topology_strategy.cc b/locator/network_topology_strategy.cc index 29c62aea90..15dbae174a 100644 --- a/locator/network_topology_strategy.cc +++ b/locator/network_topology_strategy.cc @@ -102,7 +102,7 @@ network_topology_strategy::network_topology_strategy( } } -std::vector +inet_address_vector_replica_set network_topology_strategy::calculate_natural_endpoints( const token& search_token, const token_metadata& tm, can_yield can_yield) const { diff --git a/locator/network_topology_strategy.hh b/locator/network_topology_strategy.hh index abe22a02f4..50478e94dd 100644 --- a/locator/network_topology_strategy.hh +++ b/locator/network_topology_strategy.hh @@ -75,7 +75,7 @@ protected: * calculate endpoints in one pass through the tokens by tracking our * progress in each DC, rack etc. 
*/ - virtual std::vector calculate_natural_endpoints( + virtual inet_address_vector_replica_set calculate_natural_endpoints( const token& search_token, const token_metadata& tm, can_yield) const override; virtual void validate_options() const override; diff --git a/locator/simple_snitch.hh b/locator/simple_snitch.hh index fa5cf20659..10776818e3 100644 --- a/locator/simple_snitch.hh +++ b/locator/simple_snitch.hh @@ -66,7 +66,7 @@ struct simple_snitch : public snitch_base { } virtual void sort_by_proximity( - inet_address address, std::vector& addresses) override { + inet_address address, inet_address_vector_replica_set& addresses) override { // Optimization to avoid walking the list } diff --git a/locator/simple_strategy.cc b/locator/simple_strategy.cc index 4840c925d8..b527c765b2 100644 --- a/locator/simple_strategy.cc +++ b/locator/simple_strategy.cc @@ -42,11 +42,11 @@ simple_strategy::simple_strategy(const sstring& keyspace_name, const shared_toke } } -std::vector simple_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm, can_yield can_yield) const { +inet_address_vector_replica_set simple_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm, can_yield can_yield) const { const std::vector& tokens = tm.sorted_tokens(); if (tokens.empty()) { - return std::vector(); + return inet_address_vector_replica_set(); } size_t replicas = get_replication_factor(); diff --git a/locator/simple_strategy.hh b/locator/simple_strategy.hh index 90a8965a97..9082dfba0a 100644 --- a/locator/simple_strategy.hh +++ b/locator/simple_strategy.hh @@ -30,7 +30,7 @@ namespace locator { class simple_strategy : public abstract_replication_strategy { protected: - virtual std::vector calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override; + virtual inet_address_vector_replica_set calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override; 
public: simple_strategy(const sstring& keyspace_name, const shared_token_metadata& token_metadata, snitch_ptr& snitch, const std::map& config_options); virtual ~simple_strategy() {}; diff --git a/locator/snitch_base.cc b/locator/snitch_base.cc index 2c056a0a5a..f7412726ed 100644 --- a/locator/snitch_base.cc +++ b/locator/snitch_base.cc @@ -49,11 +49,11 @@ snitch_base::get_endpoint_info(inet_address endpoint, return ep_state ? std::optional(ep_state->value) : std::nullopt; } -std::vector snitch_base::get_sorted_list_by_proximity( +inet_address_vector_replica_set snitch_base::get_sorted_list_by_proximity( inet_address address, - std::vector& unsorted_address) { + inet_address_vector_replica_set& unsorted_address) { - std::vector + inet_address_vector_replica_set preferred(unsorted_address.begin(), unsorted_address.end()); sort_by_proximity(address, preferred); @@ -61,7 +61,7 @@ std::vector snitch_base::get_sorted_list_by_proximity( } void snitch_base::sort_by_proximity( - inet_address address, std::vector& addresses) { + inet_address address, inet_address_vector_replica_set& addresses) { std::sort(addresses.begin(), addresses.end(), [this, &address](inet_address& a1, inet_address& a2) @@ -122,9 +122,9 @@ int snitch_base::compare_endpoints( } bool snitch_base::is_worth_merging_for_range_query( - std::vector& merged, - std::vector& l1, - std::vector& l2) { + inet_address_vector_replica_set& merged, + inet_address_vector_replica_set& l1, + inet_address_vector_replica_set& l2) { // // Querying remote DC is likely to be an order of magnitude slower than // querying locally, so 2 queries to local nodes is likely to still be @@ -136,7 +136,7 @@ bool snitch_base::is_worth_merging_for_range_query( : true; } -bool snitch_base::has_remote_node(std::vector& l) { +bool snitch_base::has_remote_node(inet_address_vector_replica_set& l) { for (auto&& ep : l) { if (_my_dc != get_datacenter(ep)) { return true; diff --git a/locator/snitch_base.hh b/locator/snitch_base.hh index 
e4a5d47be4..423205d18b 100644 --- a/locator/snitch_base.hh +++ b/locator/snitch_base.hh @@ -44,6 +44,7 @@ #include #include "gms/inet_address.hh" +#include "inet_address_vectors.hh" #include "gms/versioned_value.hh" #include #include @@ -94,16 +95,16 @@ public: /** * returns a new List sorted by proximity to the given endpoint */ - virtual std::vector get_sorted_list_by_proximity( + virtual inet_address_vector_replica_set get_sorted_list_by_proximity( inet_address address, - std::vector& unsorted_address) = 0; + inet_address_vector_replica_set& unsorted_address) = 0; /** * This method will sort the List by proximity to the given * address. */ virtual void sort_by_proximity( - inet_address address, std::vector& addresses) = 0; + inet_address address, inet_address_vector_replica_set& addresses) = 0; /** * compares two endpoints in relation to the target endpoint, returning as @@ -127,9 +128,9 @@ public: * against l2. */ virtual bool is_worth_merging_for_range_query( - std::vector& merged, - std::vector& l1, - std::vector& l2) = 0; + inet_address_vector_replica_set& merged, + inet_address_vector_replica_set& l1, + inet_address_vector_replica_set& l2) = 0; virtual ~i_endpoint_snitch() { assert(_state == snitch_state::stopped); }; @@ -426,25 +427,25 @@ public: // virtual sstring get_datacenter(inet_address endpoint) = 0; // - virtual std::vector get_sorted_list_by_proximity( + virtual inet_address_vector_replica_set get_sorted_list_by_proximity( inet_address address, - std::vector& unsorted_address) override; + inet_address_vector_replica_set& unsorted_address) override; virtual void sort_by_proximity( - inet_address address, std::vector& addresses) override; + inet_address address, inet_address_vector_replica_set& addresses) override; virtual int compare_endpoints( inet_address& address, inet_address& a1, inet_address& a2) override; virtual bool is_worth_merging_for_range_query( - std::vector& merged, - std::vector& l1, - std::vector& l2) override; + 
inet_address_vector_replica_set& merged, + inet_address_vector_replica_set& l1, + inet_address_vector_replica_set& l2) override; virtual future<> gossip_snitch_info(std::list> info) override; private: - bool has_remote_node(std::vector& l); + bool has_remote_node(inet_address_vector_replica_set& l); protected: static std::optional get_endpoint_info(inet_address endpoint, diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index e4491e6afb..200cf180de 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -735,7 +735,7 @@ public: #endif public: // returns empty vector if keyspace_name not found. - std::vector pending_endpoints_for(const token& token, const sstring& keyspace_name) const; + inet_address_vector_topology_change pending_endpoints_for(const token& token, const sstring& keyspace_name) const; #if 0 /** * @deprecated retained for benefit of old tests @@ -1676,7 +1676,7 @@ void token_metadata_impl::del_replacing_endpoint(inet_address existing_node) { _replacing_endpoints.erase(existing_node); } -std::vector token_metadata_impl::pending_endpoints_for(const token& token, const sstring& keyspace_name) const { +inet_address_vector_topology_change token_metadata_impl::pending_endpoints_for(const token& token, const sstring& keyspace_name) const { // Fast path 0: pending ranges not found for this keyspace_name const auto pr_it = _pending_ranges_interval_map.find(keyspace_name); if (pr_it == _pending_ranges_interval_map.end()) { @@ -1690,12 +1690,12 @@ std::vector token_metadata_impl::pending_endpoints_for(const } // Slow path: lookup pending ranges - std::vector endpoints; + inet_address_vector_topology_change endpoints; auto interval = range_to_interval(range(token)); const auto it = ks_map.find(interval); if (it != ks_map.end()) { // interval_map does not work with std::vector, convert to std::vector of ips - endpoints = std::vector(it->second.begin(), it->second.end()); + endpoints = 
inet_address_vector_topology_change(it->second.begin(), it->second.end()); } return endpoints; } @@ -2050,7 +2050,7 @@ token_metadata::count_normal_token_owners() const { return _impl->count_normal_token_owners(); } -std::vector +inet_address_vector_topology_change token_metadata::pending_endpoints_for(const token& token, const sstring& keyspace_name) const { return _impl->pending_endpoints_for(token, keyspace_name); } diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 6fe5994c4d..feb9ad053a 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -43,6 +43,7 @@ #include #include "gms/inet_address.hh" #include "dht/i_partitioner.hh" +#include "inet_address_vectors.hh" #include "utils/UUID.hh" #include #include @@ -334,7 +335,7 @@ public: size_t count_normal_token_owners() const; // returns empty vector if keyspace_name not found. - std::vector pending_endpoints_for(const token& token, const sstring& keyspace_name) const; + inet_address_vector_topology_change pending_endpoints_for(const token& token, const sstring& keyspace_name) const; /** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */ std::multimap get_endpoint_to_token_map_for_reading() const; diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 9780b086bc..e712b0bc6d 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -1189,7 +1189,7 @@ void messaging_service::register_mutation(std::function messaging_service::unregister_mutation() { return unregister_handler(netw::messaging_verb::MUTATION); } -future<> messaging_service::send_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector forward, +future<> messaging_service::send_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, inet_address_vector_replica_set forward, inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info) { return 
send_message_oneway_timeout(this, timeout, messaging_verb::MUTATION, std::move(id), fm, std::move(forward), std::move(reply_to), shard, std::move(response_id), std::move(trace_info)); @@ -1462,7 +1462,7 @@ future<> messaging_service::unregister_paxos_learn() { return unregister_handler(netw::messaging_verb::PAXOS_LEARN); } future<> messaging_service::send_paxos_learn(msg_addr id, clock_type::time_point timeout, const service::paxos::proposal& decision, - std::vector forward, inet_address reply_to, unsigned shard, response_id_type response_id, + inet_address_vector_replica_set forward, inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info) { return send_message_oneway_timeout(this, timeout, messaging_verb::PAXOS_LEARN, std::move(id), decision, std::move(forward), std::move(reply_to), shard, std::move(response_id), std::move(trace_info)); @@ -1488,7 +1488,7 @@ void messaging_service::register_hint_mutation(std::function messaging_service::unregister_hint_mutation() { return unregister_handler(netw::messaging_verb::HINT_MUTATION); } -future<> messaging_service::send_hint_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector forward, +future<> messaging_service::send_hint_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, inet_address_vector_replica_set forward, inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info) { return send_message_oneway_timeout(this, timeout, messaging_verb::HINT_MUTATION, std::move(id), fm, std::move(forward), std::move(reply_to), shard, std::move(response_id), std::move(trace_info)); diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 3d374750db..ec7aac3b04 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -27,6 +27,7 @@ #include #include #include "gms/inet_address.hh" +#include "inet_address_vectors.hh" #include #include #include 
"query-request.hh" @@ -462,7 +463,7 @@ public: void register_mutation(std::function (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector forward, inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional> trace_info)>&& func); future<> unregister_mutation(); - future<> send_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector forward, + future<> send_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, inet_address_vector_replica_set forward, inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info = std::nullopt); // Wrapper for COUNTER_MUTATION @@ -543,7 +544,7 @@ public: future<> unregister_paxos_learn(); future<> send_paxos_learn(msg_addr id, clock_type::time_point timeout, const service::paxos::proposal& decision, - std::vector forward, inet_address reply_to, unsigned shard, response_id_type response_id, + inet_address_vector_replica_set forward, inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info = std::nullopt); void register_paxos_prune(std::function(const rpc::client_info&, rpc::opt_time_point, UUID schema_id, partition_key key, @@ -557,7 +558,7 @@ public: void register_hint_mutation(std::function (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector forward, inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional> trace_info)>&& func); future<> unregister_hint_mutation(); - future<> send_hint_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector forward, + future<> send_hint_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, inet_address_vector_replica_set forward, inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info = std::nullopt); void register_hint_sync_point_create(std::function 
(db::hints::sync_point_create_request request)>&& func); diff --git a/repair/repair.cc b/repair/repair.cc index 5071d6c415..5549df1a38 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1591,7 +1591,7 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded> current_replica_endpoints; + std::unordered_map current_replica_endpoints; // Find (for each range) all nodes that store replicas for these ranges as well for (auto& r : ranges) { auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); @@ -1614,8 +1614,8 @@ static future<> do_decommission_removenode_with_repair(seastar::shardedcheck_abort(); } auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); - const std::vector new_eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp, utils::can_yield::yes); - const std::vector& current_eps = current_replica_endpoints[r]; + const inet_address_vector_replica_set new_eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp, utils::can_yield::yes); + const inet_address_vector_replica_set& current_eps = current_replica_endpoints[r]; std::unordered_set neighbors_set(new_eps.begin(), new_eps.end()); bool skip_this_range = false; auto new_owner = neighbors_set; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 0ffa7e0b34..b4eaa2056b 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -171,7 +171,7 @@ public: virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) = 0; virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) = 0; - virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector&& forward, + virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, inet_address_vector_replica_set&& forward, storage_proxy::response_id_type response_id, 
storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) = 0; virtual bool is_shared() = 0; @@ -222,7 +222,7 @@ public: } return make_ready_future<>(); } - virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector&& forward, + virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, inet_address_vector_replica_set&& forward, storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) override { auto m = _mutations[ep]; @@ -270,7 +270,7 @@ public: tracing::trace(tr_state, "Executing a mutation locally"); return sp.mutate_locally(_schema, *_mutation, std::move(tr_state), db::commitlog::force_sync::no, timeout); } - virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector&& forward, + virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, inet_address_vector_replica_set&& forward, storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) override { tracing::trace(tr_state, "Sending a mutation to /{}", ep); @@ -299,7 +299,7 @@ public: // becomes unavailable - this might include the current node return sp.mutate_hint(_schema, *_mutation, std::move(tr_state), timeout); } - virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector&& forward, + virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, inet_address_vector_replica_set&& forward, storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) override { tracing::trace(tr_state, "Sending a hint to /{}", ep); @@ -326,7 +326,7 @@ public: tracing::trace(tr_state, "Executing a learn locally"); return paxos::paxos_state::learn(_schema, *_proposal, timeout, tr_state); } - virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector&& forward, + 
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, inet_address_vector_replica_set&& forward, storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) override { tracing::trace(tr_state, "Sending a learn to /{}", ep); @@ -369,7 +369,7 @@ protected: // added dead_endpoints as a memeber here as well. This to be able to carry the info across // calls in helper methods in a convinient way. Since we hope this will be empty most of the time // it should not be a huge burden. (flw) - std::vector _dead_endpoints; + inet_address_vector_topology_change _dead_endpoints; size_t _cl_acks = 0; bool _cl_achieved = false; bool _throttled = false; @@ -394,7 +394,7 @@ protected: public: abstract_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr mh, std::unordered_set targets, tracing::trace_state_ptr trace_state, - storage_proxy::write_stats& stats, service_permit permit, size_t pending_endpoints = 0, std::vector dead_endpoints = {}) + storage_proxy::write_stats& stats, service_permit permit, size_t pending_endpoints = 0, inet_address_vector_topology_change dead_endpoints = {}) : _id(p->get_next_response_id()), _proxy(std::move(p)), _trace_state(trace_state), _cl(cl), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)), _dead_endpoints(std::move(dead_endpoints)), _stats(stats), _expire_timer([this] { timeout_cb(); }), _permit(std::move(permit)) { // original comment from cassandra: @@ -588,7 +588,7 @@ public: const std::unordered_set& get_targets() const { return _targets; } - const std::vector& get_dead_endpoints() const { + const inet_address_vector_topology_change& get_dead_endpoints() const { return _dead_endpoints; } bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) { @@ -597,7 +597,7 @@ public: future<> apply_locally(storage_proxy::clock_type::time_point 
timeout, tracing::trace_state_ptr tr_state) { return _mutation_holder->apply_locally(*_proxy, timeout, std::move(tr_state)); } - future<> apply_remotely(gms::inet_address ep, std::vector&& forward, + future<> apply_remotely(gms::inet_address ep, inet_address_vector_replica_set&& forward, storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) { return _mutation_holder->apply_remotely(*_proxy, ep, std::move(forward), response_id, timeout, std::move(tr_state)); @@ -631,7 +631,7 @@ class datacenter_write_response_handler : public abstract_write_response_handler public: datacenter_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr mh, std::unordered_set targets, - const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state, + const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) : abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), std::move(targets), std::move(tr_state), stats, std::move(permit), db::count_local_endpoints(pending_endpoints), std::move(dead_endpoints)) { @@ -646,7 +646,7 @@ class write_response_handler : public abstract_write_response_handler { public: write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr mh, std::unordered_set targets, - const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state, + const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) : abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), std::move(targets), 
std::move(tr_state), stats, std::move(permit), pending_endpoints.size(), std::move(dead_endpoints)) { @@ -658,7 +658,7 @@ class view_update_write_response_handler : public write_response_handler, public public: view_update_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, std::unique_ptr mh, std::unordered_set targets, - const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state, + const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit): write_response_handler(p, ks, cl, db::write_type::VIEW, std::move(mh), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit)) { @@ -737,8 +737,8 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha } public: datacenter_sync_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, - std::unique_ptr mh, std::unordered_set targets, const std::vector& pending_endpoints, - std::vector dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) : + std::unique_ptr mh, std::unordered_set targets, const inet_address_vector_topology_change& pending_endpoints, + inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) : abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), targets, std::move(tr_state), stats, std::move(permit), 0, dead_endpoints) { auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); @@ -1289,9 +1289,9 @@ bool paxos_response_handler::learned(gms::inet_address ep) { return false; } -static std::vector +static inet_address_vector_replica_set replica_ids_to_endpoints(const locator::token_metadata& tm, const 
std::vector& replica_ids) { - std::vector endpoints; + inet_address_vector_replica_set endpoints; endpoints.reserve(replica_ids.size()); for (const auto& replica_id : replica_ids) { @@ -1304,7 +1304,7 @@ replica_ids_to_endpoints(const locator::token_metadata& tm, const std::vector -endpoints_to_replica_ids(const locator::token_metadata& tm, const std::vector& endpoints) { +endpoints_to_replica_ids(const locator::token_metadata& tm, const inet_address_vector_replica_set& endpoints) { std::vector replica_ids; replica_ids.reserve(endpoints.size()); @@ -1421,7 +1421,7 @@ future<> storage_proxy::response_wait(storage_proxy::response_id_type id, clock_ } storage_proxy::response_id_type storage_proxy::create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr m, - std::unordered_set targets, const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state, + std::unordered_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) { shared_ptr h; @@ -1908,8 +1908,8 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok auto keyspace_name = s->ks_name(); keyspace& ks = _db.local().find_keyspace(keyspace_name); auto& rs = ks.get_replication_strategy(); - std::vector natural_endpoints = rs.get_natural_endpoints_without_node_being_replaced(token); - std::vector pending_endpoints = get_token_metadata_ptr()->pending_endpoints_for(token, keyspace_name); + inet_address_vector_replica_set natural_endpoints = rs.get_natural_endpoints_without_node_being_replaced(token); + inet_address_vector_topology_change pending_endpoints = get_token_metadata_ptr()->pending_endpoints_for(token, keyspace_name); slogger.trace("creating write handler for token: {} natural: {} pending: {}", token, natural_endpoints, 
pending_endpoints); tracing::trace(tr_state, "Creating write handler for token: {} natural: {} pending: {}", token, natural_endpoints ,pending_endpoints); @@ -1945,7 +1945,7 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok // filter live endpoints from dead ones std::unordered_set live_endpoints; - std::vector dead_endpoints; + inet_address_vector_topology_change dead_endpoints; live_endpoints.reserve(all.size()); dead_endpoints.reserve(all.size()); std::partition_copy(all.begin(), all.end(), std::inserter(live_endpoints, live_endpoints.begin()), @@ -1991,7 +1991,7 @@ storage_proxy::create_write_response_handler(const std::unordered_mapschema()->ks_name(); keyspace& ks = _db.local().find_keyspace(keyspace_name); - return create_write_response_handler(ks, cl, type, std::move(mh), std::move(endpoints), std::vector(), std::vector(), std::move(tr_state), get_stats(), std::move(permit)); + return create_write_response_handler(ks, cl, type, std::move(mh), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit)); } storage_proxy::response_id_type @@ -2015,7 +2015,7 @@ storage_proxy::create_write_response_handler(const std::tuple(std::move(commit), s, nullptr), std::move(endpoints), - std::vector(), std::vector(), std::move(tr_state), get_stats(), std::move(permit)); + inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit)); } void storage_proxy::register_cdc_operation_result_tracker(const std::vector& ids, lw_shared_ptr tracker) { @@ -2151,7 +2151,7 @@ gms::inet_address storage_proxy::find_leader_for_counter_update(const mutation& return my_address; } - const auto local_endpoints = boost::copy_range>(live_endpoints | boost::adaptors::filtered([&] (auto&& ep) { + const auto local_endpoints = boost::copy_range(live_endpoints | boost::adaptors::filtered([&] (auto&& ep) { 
return db::is_local(ep); })); @@ -2232,8 +2232,8 @@ storage_proxy::paxos_participants storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token &token, db::consistency_level cl_for_paxos) { keyspace& ks = _db.local().find_keyspace(ks_name); auto& rs = ks.get_replication_strategy(); - std::vector natural_endpoints = rs.get_natural_endpoints_without_node_being_replaced(token); - std::vector pending_endpoints = get_token_metadata_ptr()->pending_endpoints_for(token, ks_name); + inet_address_vector_replica_set natural_endpoints = rs.get_natural_endpoints_without_node_being_replaced(token); + inet_address_vector_topology_change pending_endpoints = get_token_metadata_ptr()->pending_endpoints_for(token, ks_name); if (cl_for_paxos == db::consistency_level::LOCAL_SERIAL) { auto itend = boost::range::remove_if(natural_endpoints, std::not_fn(std::cref(db::is_local))); @@ -2253,7 +2253,7 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token & const size_t quorum_size = natural_endpoints.size() / 2 + 1; const size_t required_participants = quorum_size + pending_endpoints.size(); - std::vector live_endpoints; + inet_address_vector_replica_set live_endpoints; live_endpoints.reserve(participants); boost::copy(boost::range::join(natural_endpoints, pending_endpoints) | @@ -2503,7 +2503,7 @@ bool storage_proxy::cannot_hint(const Range& targets, db::write_type type) const future<> storage_proxy::send_to_endpoint( std::unique_ptr m, gms::inet_address target, - std::vector pending_endpoints, + inet_address_vector_topology_change pending_endpoints, db::write_type type, tracing::trace_state_ptr tr_state, write_stats& stats, @@ -2525,7 +2525,7 @@ future<> storage_proxy::send_to_endpoint( db::write_type type, service_permit permit) mutable { std::unordered_set targets; targets.reserve(pending_endpoints.size() + 1); - std::vector dead_endpoints; + inet_address_vector_topology_change dead_endpoints; boost::algorithm::partition_copy( 
boost::range::join(pending_endpoints, target), std::inserter(targets, targets.begin()), @@ -2555,7 +2555,7 @@ future<> storage_proxy::send_to_endpoint( future<> storage_proxy::send_to_endpoint( frozen_mutation_and_schema fm_a_s, gms::inet_address target, - std::vector pending_endpoints, + inet_address_vector_topology_change pending_endpoints, db::write_type type, tracing::trace_state_ptr tr_state, allow_hints allow_hints) { @@ -2572,7 +2572,7 @@ future<> storage_proxy::send_to_endpoint( future<> storage_proxy::send_to_endpoint( frozen_mutation_and_schema fm_a_s, gms::inet_address target, - std::vector pending_endpoints, + inet_address_vector_topology_change pending_endpoints, db::write_type type, tracing::trace_state_ptr tr_state, write_stats& stats, @@ -2637,8 +2637,8 @@ future<> storage_proxy::send_hint_to_all_replicas(frozen_mutation_and_schema fm_ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type response_id, clock_type::time_point timeout) { // extra-datacenter replicas, grouped by dc - std::unordered_map> dc_groups; - std::vector>> local; + std::unordered_map dc_groups; + std::vector> local; local.reserve(3); auto handler_ptr = get_write_response_handler(response_id); @@ -2650,7 +2650,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo sstring dc = get_dc(dest); // read repair writes do not go through coordinator since mutations are per destination if (handler.read_repair_write() || dc == get_local_dc()) { - local.emplace_back("", std::vector({dest})); + local.emplace_back("", inet_address_vector_replica_set({dest})); } else { dc_groups[dc].push_back(dest); } @@ -2670,7 +2670,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo }; // lambda for applying mutation remotely - auto rmutate = [this, handler_ptr, timeout, response_id, my_address, &global_stats] (gms::inet_address coordinator, std::vector&& forward) { + auto rmutate = [this, handler_ptr, timeout, response_id, 
my_address, &global_stats] (gms::inet_address coordinator, inet_address_vector_replica_set&& forward) { auto msize = handler_ptr->get_mutation_size(); // can overestimate for repair writes global_stats.queued_write_bytes += msize; @@ -3390,7 +3390,7 @@ public: class abstract_read_executor : public enable_shared_from_this { protected: - using targets_iterator = std::vector::iterator; + using targets_iterator = inet_address_vector_replica_set::iterator; using digest_resolver_ptr = ::shared_ptr; using data_resolver_ptr = ::shared_ptr; using clock_type = storage_proxy::clock_type; @@ -3402,9 +3402,9 @@ protected: dht::partition_range _partition_range; db::consistency_level _cl; size_t _block_for; - std::vector _targets; + inet_address_vector_replica_set _targets; // Targets that were succesfully used for a data or digest request - std::vector _used_targets; + inet_address_vector_replica_set _used_targets; promise>> _result_promise; tracing::trace_state_ptr _trace_state; lw_shared_ptr _cf; @@ -3419,7 +3419,7 @@ private: } public: abstract_read_executor(schema_ptr s, lw_shared_ptr cf, shared_ptr proxy, lw_shared_ptr cmd, dht::partition_range pr, db::consistency_level cl, size_t block_for, - std::vector targets, tracing::trace_state_ptr trace_state, service_permit permit) : + inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, service_permit permit) : _schema(std::move(s)), _proxy(std::move(proxy)), _cmd(std::move(cmd)), _partition_range(std::move(pr)), _cl(cl), _block_for(block_for), _targets(std::move(targets)), _trace_state(std::move(trace_state)), _cf(std::move(cf)), _permit(std::move(permit)) { _proxy->get_stats().reads++; @@ -3434,7 +3434,7 @@ public: /// /// Only filled after the request is finished, call only after /// execute()'s future is ready. 
- std::vector used_targets() const { + inet_address_vector_replica_set used_targets() const { return _used_targets; } @@ -3722,7 +3722,7 @@ public: class never_speculating_read_executor : public abstract_read_executor { public: - never_speculating_read_executor(schema_ptr s, lw_shared_ptr cf, shared_ptr proxy, lw_shared_ptr cmd, dht::partition_range pr, db::consistency_level cl, std::vector targets, tracing::trace_state_ptr trace_state, + never_speculating_read_executor(schema_ptr s, lw_shared_ptr cf, shared_ptr proxy, lw_shared_ptr cmd, dht::partition_range pr, db::consistency_level cl, inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, service_permit permit) : abstract_read_executor(std::move(s), std::move(cf), std::move(proxy), std::move(cmd), std::move(pr), cl, 0, std::move(targets), std::move(trace_state), std::move(permit)) { _block_for = _targets.size(); @@ -3816,7 +3816,7 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s db::consistency_level cl, db::read_repair_decision repair_decision, tracing::trace_state_ptr trace_state, - const std::vector& preferred_endpoints, + const inet_address_vector_replica_set& preferred_endpoints, bool& is_read_non_local, service_permit permit) { const dht::token& token = pr.start()->value().token(); @@ -3824,7 +3824,7 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s speculative_retry::type retry_type = schema->speculative_retry().get_type(); gms::inet_address extra_replica; - std::vector all_replicas = get_live_sorted_endpoints(ks, token); + inet_address_vector_replica_set all_replicas = get_live_sorted_endpoints(ks, token); // Check for a non-local read before heat-weighted load balancing // reordering of endpoints happens. 
The local endpoint, if // present, is always first in the list, as get_live_sorted_endpoints() @@ -3832,7 +3832,7 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s is_read_non_local |= !all_replicas.empty() && all_replicas.front() != utils::fb_utilities::get_broadcast_address(); auto cf = _db.local().find_column_family(schema).shared_from_this(); - std::vector target_replicas = db::filter_for_query(cl, ks, all_replicas, preferred_endpoints, repair_decision, + inet_address_vector_replica_set target_replicas = db::filter_for_query(cl, ks, all_replicas, preferred_endpoints, repair_decision, retry_type == speculative_retry::type::NONE ? nullptr : &extra_replica, _db.local().get_config().cache_hit_rate_read_balancing() ? &*cf : nullptr); @@ -3962,7 +3962,7 @@ storage_proxy::query_singular(lw_shared_ptr cmd, auto token_range = dht::token_range::make_singular(pr.start()->value().token()); auto it = query_options.preferred_replicas.find(token_range); const auto replicas = it == query_options.preferred_replicas.end() - ? std::vector{} : replica_ids_to_endpoints(*tmptr, it->second); + ? inet_address_vector_replica_set{} : replica_ids_to_endpoints(*tmptr, it->second); auto read_executor = get_read_executor(cmd, schema, std::move(pr), cl, repair_decision, query_options.trace_state, replicas, is_read_non_local, @@ -4038,7 +4038,7 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t const auto preferred_replicas_for_range = [this, &preferred_replicas, tmptr] (const dht::partition_range& r) { auto it = preferred_replicas.find(r.transform(std::mem_fn(&dht::ring_position::token))); - return it == preferred_replicas.end() ? std::vector{} : replica_ids_to_endpoints(*tmptr, it->second); + return it == preferred_replicas.end() ? 
inet_address_vector_replica_set{} : replica_ids_to_endpoints(*tmptr, it->second); }; const auto to_token_range = [] (const dht::partition_range& r) { return r.transform(std::mem_fn(&dht::ring_position::token)); }; @@ -4053,9 +4053,9 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t while (i != ranges.end()) { dht::partition_range& range = *i; - std::vector live_endpoints = get_live_sorted_endpoints(ks, end_token(range)); - std::vector merged_preferred_replicas = preferred_replicas_for_range(*i); - std::vector filtered_endpoints = filter_for_query(cl, ks, live_endpoints, merged_preferred_replicas, pcf); + inet_address_vector_replica_set live_endpoints = get_live_sorted_endpoints(ks, end_token(range)); + inet_address_vector_replica_set merged_preferred_replicas = preferred_replicas_for_range(*i); + inet_address_vector_replica_set filtered_endpoints = filter_for_query(cl, ks, live_endpoints, merged_preferred_replicas, pcf); std::vector merged_ranges{to_token_range(range)}; ++i; @@ -4066,8 +4066,8 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t { const auto current_range_preferred_replicas = preferred_replicas_for_range(*i); dht::partition_range& next_range = *i; - std::vector next_endpoints = get_live_sorted_endpoints(ks, end_token(next_range)); - std::vector next_filtered_endpoints = filter_for_query(cl, ks, next_endpoints, current_range_preferred_replicas, pcf); + inet_address_vector_replica_set next_endpoints = get_live_sorted_endpoints(ks, end_token(next_range)); + inet_address_vector_replica_set next_filtered_endpoints = filter_for_query(cl, ks, next_endpoints, current_range_preferred_replicas, pcf); // Origin has this to say here: // * If the current range right is the min token, we should stop merging because CFS.getRangeSlice @@ -4080,22 +4080,22 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t break; } - std::vector merged = intersection(live_endpoints, 
next_endpoints); - std::vector current_merged_preferred_replicas = intersection(merged_preferred_replicas, current_range_preferred_replicas); + inet_address_vector_replica_set merged = intersection(live_endpoints, next_endpoints); + inet_address_vector_replica_set current_merged_preferred_replicas = intersection(merged_preferred_replicas, current_range_preferred_replicas); // Check if there is enough endpoint for the merge to be possible. if (!is_sufficient_live_nodes(cl, ks, merged)) { break; } - std::vector filtered_merged = filter_for_query(cl, ks, merged, current_merged_preferred_replicas, pcf); + inet_address_vector_replica_set filtered_merged = filter_for_query(cl, ks, merged, current_merged_preferred_replicas, pcf); // Estimate whether merging will be a win or not if (!locator::i_endpoint_snitch::get_local_snitch_ptr()->is_worth_merging_for_range_query(filtered_merged, filtered_endpoints, next_filtered_endpoints)) { break; } else if (pcf) { // check that merged set hit rate is not to low - auto find_min = [pcf] (const std::vector& range) { + auto find_min = [pcf] (const inet_address_vector_replica_set& range) { struct { column_family* cf = nullptr; float operator()(const gms::inet_address& ep) const { @@ -4593,15 +4593,15 @@ future storage_proxy::cas(schema_ptr schema, shared_ptr reque co_return condition_met; } -std::vector storage_proxy::get_live_endpoints(keyspace& ks, const dht::token& token) const { +inet_address_vector_replica_set storage_proxy::get_live_endpoints(keyspace& ks, const dht::token& token) const { auto& rs = ks.get_replication_strategy(); - std::vector eps = rs.get_natural_endpoints_without_node_being_replaced(token); + inet_address_vector_replica_set eps = rs.get_natural_endpoints_without_node_being_replaced(token); auto itend = boost::range::remove_if(eps, std::not1(std::bind1st(std::mem_fn(&gms::gossiper::is_alive), &gms::get_local_gossiper()))); eps.erase(itend, eps.end()); return eps; } -void 
storage_proxy::sort_endpoints_by_proximity(std::vector& eps) { +void storage_proxy::sort_endpoints_by_proximity(inet_address_vector_replica_set& eps) { locator::i_endpoint_snitch::get_local_snitch_ptr()->sort_by_proximity(utils::fb_utilities::get_broadcast_address(), eps); // FIXME: before dynamic snitch is implement put local address (if present) at the beginning auto it = boost::range::find(eps, utils::fb_utilities::get_broadcast_address()); @@ -4610,14 +4610,14 @@ void storage_proxy::sort_endpoints_by_proximity(std::vector& } } -std::vector storage_proxy::get_live_sorted_endpoints(keyspace& ks, const dht::token& token) const { +inet_address_vector_replica_set storage_proxy::get_live_sorted_endpoints(keyspace& ks, const dht::token& token) const { auto eps = get_live_endpoints(ks, token); sort_endpoints_by_proximity(eps); return eps; } -std::vector storage_proxy::intersection(const std::vector& l1, const std::vector& l2) { - std::vector inter; +inet_address_vector_replica_set storage_proxy::intersection(const inet_address_vector_replica_set& l1, const inet_address_vector_replica_set& l2) { + inet_address_vector_replica_set inter; inter.reserve(l1.size()); std::remove_copy_if(l1.begin(), l1.end(), std::back_inserter(inter), [&l2] (const gms::inet_address& a) { return std::find(l2.begin(), l2.end(), a) == l2.end(); diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 9b47e4b6b9..4110b465ce 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -45,6 +45,7 @@ #include "query-request.hh" #include "query-result.hh" #include "query-result-set.hh" +#include "inet_address_vectors.hh" #include #include #include @@ -243,7 +244,7 @@ public: // Holds a list of endpoints participating in CAS request, for a given // consistency level, token, and state of joining/leaving nodes. struct paxos_participants { - std::vector endpoints; + inet_address_vector_replica_set endpoints; // How many participants are required for a quorum (i.e. 
is it SERIAL or LOCAL_SERIAL). size_t required_participants; bool has_dead_endpoints; @@ -338,7 +339,7 @@ private: std::unique_ptr mh, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit); response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr m, std::unordered_set targets, - const std::vector& pending_endpoints, std::vector, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit); + const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit); response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit); response_id_type create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit); response_id_type create_write_response_handler(const std::unordered_map>&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit); @@ -355,9 +356,9 @@ private: bool cannot_hint(const Range& targets, db::write_type type) const; bool hints_enabled(db::write_type type) const noexcept; db::hints::manager& hints_manager_for(db::write_type type); - std::vector get_live_endpoints(keyspace& ks, const dht::token& token) const; - static void sort_endpoints_by_proximity(std::vector& eps); - std::vector get_live_sorted_endpoints(keyspace& ks, const dht::token& token) const; + inet_address_vector_replica_set get_live_endpoints(keyspace& ks, const dht::token& token) const; + static void sort_endpoints_by_proximity(inet_address_vector_replica_set& eps); + inet_address_vector_replica_set get_live_sorted_endpoints(keyspace& ks, const dht::token& token) const; 
db::read_repair_decision new_read_repair_decision(const schema& s); ::shared_ptr get_read_executor(lw_shared_ptr cmd, schema_ptr schema, @@ -365,7 +366,7 @@ private: db::consistency_level cl, db::read_repair_decision repair_decision, tracing::trace_state_ptr trace_state, - const std::vector& preferred_endpoints, + const inet_address_vector_replica_set& preferred_endpoints, bool& is_bounced_read, service_permit permit); future>, cache_temperature>> query_result_local(schema_ptr, lw_shared_ptr cmd, const dht::partition_range& pr, @@ -380,7 +381,7 @@ private: dht::partition_range_vector partition_ranges, db::consistency_level cl, coordinator_query_options optional_params); - static std::vector intersection(const std::vector& l1, const std::vector& l2); + static inet_address_vector_replica_set intersection(const inet_address_vector_replica_set& l1, const inet_address_vector_replica_set& l2); future query_partition_key_range_concurrent(clock_type::time_point timeout, std::vector>>&& results, lw_shared_ptr cmd, @@ -434,7 +435,7 @@ private: future<> send_to_endpoint( std::unique_ptr m, gms::inet_address target, - std::vector pending_endpoints, + inet_address_vector_topology_change pending_endpoints, db::write_type type, tracing::trace_state_ptr tr_state, write_stats& stats, @@ -562,9 +563,9 @@ public: // Inspired by Cassandra's StorageProxy.sendToHintedEndpoints but without // hinted handoff support, and just one target. See also // send_to_live_endpoints() - another take on the same original function. 
- future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, std::vector pending_endpoints, db::write_type type, + future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type, tracing::trace_state_ptr tr_state, write_stats& stats, allow_hints allow_hints = allow_hints::yes); - future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, std::vector pending_endpoints, db::write_type type, + future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type, tracing::trace_state_ptr tr_state, allow_hints allow_hints = allow_hints::yes); // Send a mutation to a specific remote target as a hint. @@ -693,7 +694,7 @@ private: // Is either set explicitly or derived from the consistency level set in keyspace options. db::consistency_level _cl_for_learn; // Live endpoints, as per get_paxos_participants() - std::vector _live_endpoints; + inet_address_vector_replica_set _live_endpoints; // How many endpoints need to respond favourably for the protocol to progress to the next step. 
size_t _required_participants; // A deadline when the entire CAS operation timeout expires, derived from write_request_timeout_in_ms diff --git a/service/storage_service.cc b/service/storage_service.cc index dc8d7df19a..37b08879e4 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -765,12 +765,12 @@ storage_service::get_rpc_address(const inet_address& endpoint) const { return boost::lexical_cast(endpoint); } -std::unordered_map> +std::unordered_map storage_service::get_range_to_address_map(const sstring& keyspace) const { return get_range_to_address_map(keyspace, get_token_metadata().sorted_tokens()); } -std::unordered_map> +std::unordered_map storage_service::get_range_to_address_map_in_local_dc( const sstring& keyspace) const { std::function filter = [this](const inet_address& address) { @@ -778,7 +778,7 @@ storage_service::get_range_to_address_map_in_local_dc( }; auto orig_map = get_range_to_address_map(keyspace, get_tokens_in_local_dc()); - std::unordered_map> filtered_map; + std::unordered_map filtered_map; for (auto entry : orig_map) { auto& addresses = filtered_map[entry.first]; addresses.reserve(entry.second.size()); @@ -807,7 +807,7 @@ storage_service::is_local_dc(const inet_address& targetHost) const { return remote_dc == local_dc; } -std::unordered_map> +std::unordered_map storage_service::get_range_to_address_map(const sstring& keyspace, const std::vector& sorted_tokens) const { sstring ks = keyspace; @@ -2433,7 +2433,7 @@ std::unordered_multimap storage_service::get_cha slogger.debug("Node {} ranges [{}]", endpoint, ranges); - std::unordered_map> current_replica_endpoints; + std::unordered_map current_replica_endpoints; // Find (for each range) all nodes that store replicas for these ranges as well auto tmptr = get_token_metadata_ptr(); @@ -2472,7 +2472,7 @@ std::unordered_multimap storage_service::get_cha auto rg = current_replica_endpoints.equal_range(r); for (auto it = rg.first; it != rg.second; it++) { const 
dht::token_range& range_ = it->first; - std::vector& current_eps = it->second; + inet_address_vector_replica_set& current_eps = it->second; slogger.debug("range={}, current_replica_endpoints={}, new_replica_endpoints={}", range_, current_eps, new_replica_endpoints); for (auto ep : it->second) { auto beg = new_replica_endpoints.begin(); @@ -2830,7 +2830,7 @@ future<> storage_service::load_and_stream(sstring ks_name, sstring cf_name, ops_uuid, nr_sst_current, nr_sst_current + sst_processed.size(), nr_sst_total, sst_names); auto start_time = std::chrono::steady_clock::now(); - std::vector current_targets; + inet_address_vector_replica_set current_targets; std::unordered_map metas; size_t num_partitions_processed = 0; size_t num_bytes_read = 0; @@ -2996,19 +2996,19 @@ storage_service::get_new_source_ranges(const sstring& keyspace_name, const dht:: auto my_address = get_broadcast_address(); auto& ks = _db.local().find_keyspace(keyspace_name); auto& strat = ks.get_replication_strategy(); - std::unordered_map> range_addresses = strat.get_range_addresses(tm, utils::can_yield::yes); + std::unordered_map range_addresses = strat.get_range_addresses(tm, utils::can_yield::yes); std::unordered_multimap source_ranges; // find alive sources for our new ranges for (auto r : ranges) { - std::vector possible_nodes; + inet_address_vector_replica_set possible_nodes; auto it = range_addresses.find(r); if (it != range_addresses.end()) { possible_nodes = it->second; } auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr(); - std::vector sources = snitch->get_sorted_list_by_proximity(my_address, possible_nodes); + inet_address_vector_replica_set sources = snitch->get_sorted_list_by_proximity(my_address, possible_nodes); if (std::find(sources.begin(), sources.end(), my_address) != sources.end()) { auto err = format("get_new_source_ranges: sources={}, my_address={}", sources, my_address); @@ -3038,7 +3038,7 @@ storage_service::describe_ring(const sstring& keyspace, bool 
include_only_local_ std::vector ranges; //Token.TokenFactory tf = getPartitioner().getTokenFactory(); - std::unordered_map> range_to_address_map = + std::unordered_map range_to_address_map = include_only_local_dc ? get_range_to_address_map_in_local_dc(keyspace) : get_range_to_address_map(keyspace); @@ -3082,11 +3082,11 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_ return ranges; } -std::unordered_map> +std::unordered_map storage_service::construct_range_to_endpoint_map( const sstring& keyspace, const dht::token_range_vector& ranges) const { - std::unordered_map> res; + std::unordered_map res; for (auto r : ranges) { res[r] = _db.local().find_keyspace(keyspace).get_replication_strategy().get_natural_endpoints( r.end() ? r.end()->value() : dht::maximum_token()); @@ -3351,7 +3351,7 @@ storage_service::get_all_ranges(const std::vector& sorted_tokens) const { return ranges; } -std::vector +inet_address_vector_replica_set storage_service::get_natural_endpoints(const sstring& keyspace, const sstring& cf, const sstring& key) const { auto schema = _db.local().find_schema(keyspace, cf); @@ -3360,7 +3360,7 @@ storage_service::get_natural_endpoints(const sstring& keyspace, return get_natural_endpoints(keyspace, token); } -std::vector +inet_address_vector_replica_set storage_service::get_natural_endpoints(const sstring& keyspace, const token& pos) const { return _db.local().find_keyspace(keyspace).get_replication_strategy().get_natural_endpoints(pos); } diff --git a/service/storage_service.hh b/service/storage_service.hh index 6abe9bcdce..7ddf57fe52 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -43,6 +43,7 @@ #include "service/endpoint_lifecycle_subscriber.hh" #include "locator/token_metadata.hh" #include "gms/gossiper.hh" +#include "inet_address_vectors.hh" #include #include "dht/i_partitioner.hh" #include "dht/token_range_endpoints.hh" @@ -468,16 +469,16 @@ public: */ sstring get_rpc_address(const 
inet_address& endpoint) const; - std::unordered_map> get_range_to_address_map(const sstring& keyspace) const; + std::unordered_map get_range_to_address_map(const sstring& keyspace) const; - std::unordered_map> get_range_to_address_map_in_local_dc( + std::unordered_map get_range_to_address_map_in_local_dc( const sstring& keyspace) const; std::vector get_tokens_in_local_dc() const; bool is_local_dc(const inet_address& targetHost) const; - std::unordered_map> get_range_to_address_map(const sstring& keyspace, + std::unordered_map get_range_to_address_map(const sstring& keyspace, const std::vector& sorted_tokens) const; /** @@ -510,7 +511,7 @@ public: * @param ranges * @return mapping of ranges to the replicas responsible for them. */ - std::unordered_map> construct_range_to_endpoint_map( + std::unordered_map construct_range_to_endpoint_map( const sstring& keyspace, const dht::token_range_vector& ranges) const; public: @@ -727,7 +728,7 @@ public: * @param key key for which we need to find the endpoint * @return the endpoint responsible for this key */ - std::vector get_natural_endpoints(const sstring& keyspace, + inet_address_vector_replica_set get_natural_endpoints(const sstring& keyspace, const sstring& cf, const sstring& key) const; /** @@ -738,7 +739,7 @@ public: * @param pos position for which we need to find the endpoint * @return the endpoint responsible for this token */ - std::vector get_natural_endpoints(const sstring& keyspace, const token& pos) const; + inet_address_vector_replica_set get_natural_endpoints(const sstring& keyspace, const token& pos) const; /** * @return Vector of Token ranges (_not_ keys!) 
together with estimated key count, diff --git a/test/boost/network_topology_strategy_test.cc b/test/boost/network_topology_strategy_test.cc index 8f5e636e2b..b1dcc95e8d 100644 --- a/test/boost/network_topology_strategy_test.cc +++ b/test/boost/network_topology_strategy_test.cc @@ -48,7 +48,7 @@ struct ring_point { inet_address host; }; -void print_natural_endpoints(double point, const std::vector v) { +void print_natural_endpoints(double point, const inet_address_vector_replica_set v) { testlog.debug("Natural endpoints for a token {}:", point); std::string str; std::ostringstream strm(str); @@ -105,7 +105,7 @@ void strategy_sanity_check( void endpoints_check( abstract_replication_strategy* ars_ptr, - std::vector& endpoints) { + inet_address_vector_replica_set& endpoints) { // Check the total RF BOOST_CHECK(endpoints.size() == ars_ptr->get_replication_factor()); From e9802348b59fa6f9b1d96bcbee9dc6059b38060a Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 28 Apr 2021 14:59:07 +0300 Subject: [PATCH 4/4] storage_proxy, treewide: use utils::small_vector inet_address_vector:s Replace std::vector with a small_vector of size 3 for replica sets (reflecting the common case of local reads, and the somewhat less common case of single-datacenter writes). Vectors used to describe topology changes are of size 1, reflecting that up to one node is usually involved with topology changes. At those counts and below we save an allocation; above those counts everything still works, but small_vector allocates like std::vector. In a few places we need to convert between std::vector and the new types, but these are all out of the hot paths (or are in a hot path, but behind a cache). 
--- db/consistency_level.cc | 2 +- inet_address_vectors.hh | 6 +++--- locator/network_topology_strategy.cc | 2 +- locator/simple_strategy.cc | 2 +- repair/repair.cc | 4 ++-- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/db/consistency_level.cc b/db/consistency_level.cc index 573f2eadd3..e783f040a7 100644 --- a/db/consistency_level.cc +++ b/db/consistency_level.cc @@ -242,7 +242,7 @@ filter_for_query(consistency_level cl, if (!old_node && ht_max - ht_min > 0.01) { // if there is old node or hit rates are close skip calculations // local node is always first if present (see storage_proxy::get_live_sorted_endpoints) unsigned local_idx = epi[0].first == utils::fb_utilities::get_broadcast_address() ? 0 : epi.size() + 1; - live_endpoints = miss_equalizing_combination(epi, local_idx, remaining_bf, bool(extra)); + live_endpoints = boost::copy_range<inet_address_vector_replica_set>(miss_equalizing_combination(epi, local_idx, remaining_bf, bool(extra))); } } diff --git a/inet_address_vectors.hh b/inet_address_vectors.hh index b9803cf9d7..596a38bcf9 100644 --- a/inet_address_vectors.hh +++ b/inet_address_vectors.hh @@ -22,8 +22,8 @@ #pragma once #include "gms/inet_address.hh" -#include <vector> +#include "utils/small_vector.hh" -using inet_address_vector_replica_set = std::vector<gms::inet_address>; +using inet_address_vector_replica_set = utils::small_vector<gms::inet_address, 3>; -using inet_address_vector_topology_change = std::vector<gms::inet_address>; +using inet_address_vector_topology_change = utils::small_vector<gms::inet_address, 1>; diff --git a/locator/network_topology_strategy.cc b/locator/network_topology_strategy.cc index 15dbae174a..62d8a053b7 100644 --- a/locator/network_topology_strategy.cc +++ b/locator/network_topology_strategy.cc @@ -258,7 +258,7 @@ network_topology_strategy::calculate_natural_endpoints( } } - return std::move(replicas.get_vector()); + return boost::copy_range<inet_address_vector_replica_set>(replicas.get_vector()); } void network_topology_strategy::validate_options() const { diff --git a/locator/simple_strategy.cc b/locator/simple_strategy.cc index b527c765b2..2b9dd79a2b
100644 --- a/locator/simple_strategy.cc +++ b/locator/simple_strategy.cc @@ -66,7 +66,7 @@ inet_address_vector_replica_set simple_strategy::calculate_natural_endpoints(con endpoints.push_back(*ep); } - return std::move(endpoints.get_vector()); + return boost::copy_range(endpoints.get_vector()); } size_t simple_strategy::get_replication_factor() const { diff --git a/repair/repair.cc b/repair/repair.cc index 5549df1a38..74b5f18858 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -284,7 +284,7 @@ static std::vector get_neighbors(database& db, ret.erase(it, ret.end()); } - return ret; + return boost::copy_range>(std::move(ret)); #if 0 // Origin's ActiveRepairService.getNeighbors() also verifies that the @@ -1686,7 +1686,7 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded>(new_eps)); } } else { throw std::runtime_error(format("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, wrong nubmer of new owner node={}",