Merge 'storage_proxy: use small_vector for vectors of inet_address' from Avi Kivity

storage_proxy uses std::vector<inet_address> for small lists of nodes - for replication (often 2-3 replicas per operation) and for pending operations (usually 0-1). These vectors require an allocation, sometimes more than one if reserve() is not used correctly.

This series switches storage_proxy to use utils::small_vector instead, removing the allocations in the common case.

Test results (perf_simple_query --smp 1 --task-quota-ms 10):

```
before: median 184810.98 tps ( 91.1 allocs/op,  20.1 tasks/op,   54564 insns/op)
after:  median 192125.99 tps ( 87.1 allocs/op,  20.1 tasks/op,   53673 insns/op)
```

4 allocations and ~900 instructions per operation are removed (the tps figure is also improved, but it is less reliable due to cpu frequency changes).

The type change is unfortunately not contained in storage_proxy - the abstraction leaks to providers of replica sets and topology change vectors. This is sad but IMO the benefits make it worthwhile.

I expect more such changes can be applied in storage_proxy, specifically std::unordered_set<gms::inet_address> and vectors of response handles.

Closes #8592

* github.com:scylladb/scylla:
  storage_proxy, treewide: use utils::small_vector inet_address_vector:s
  storage_proxy, treewide: introduce names for vectors of inet_address
  utils: small_vector: add print operator for std::ostream
  hints: messages.hh: add missing #include
This commit is contained in:
Tomasz Grabiec
2021-05-06 18:00:53 +02:00
33 changed files with 242 additions and 189 deletions

View File

@@ -415,7 +415,7 @@ void set_storage_service(http_context& ctx, routes& r) {
auto keyspace = validate_keyspace(ctx, req->param);
std::vector<ss::maplist_mapper> res;
return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().get_range_to_address_map(keyspace),
[](const std::pair<dht::token_range, std::vector<gms::inet_address>>& entry){
[](const std::pair<dht::token_range, inet_address_vector_replica_set>& entry){
ss::maplist_mapper m;
if (entry.first.start()) {
m.key.push(entry.first.start().value().value().to_sstring());

View File

@@ -155,11 +155,11 @@ bool is_local(gms::inet_address endpoint) {
snitch_ptr->get_datacenter(endpoint);
}
std::vector<gms::inet_address>
inet_address_vector_replica_set
filter_for_query(consistency_level cl,
keyspace& ks,
std::vector<gms::inet_address> live_endpoints,
const std::vector<gms::inet_address>& preferred_endpoints,
inet_address_vector_replica_set live_endpoints,
const inet_address_vector_replica_set& preferred_endpoints,
read_repair_decision read_repair,
gms::inet_address* extra,
column_family* cf) {
@@ -187,7 +187,7 @@ filter_for_query(consistency_level cl,
return std::move(live_endpoints);
}
std::vector<gms::inet_address> selected_endpoints;
inet_address_vector_replica_set selected_endpoints;
// Pre-select endpoints based on client preference. If the endpoints
// selected this way aren't enough to satisfy CL requirements select the
@@ -201,7 +201,7 @@ filter_for_query(consistency_level cl,
if (extra) {
*extra = selected == bf ? live_endpoints.front() : *(it + bf);
}
return std::vector<gms::inet_address>(it, it + bf);
return inet_address_vector_replica_set(it, it + bf);
} else if (selected) {
selected_endpoints.reserve(bf);
std::move(it, live_endpoints.end(), std::back_inserter(selected_endpoints));
@@ -242,7 +242,7 @@ filter_for_query(consistency_level cl,
if (!old_node && ht_max - ht_min > 0.01) { // if there is old node or hit rates are close skip calculations
// local node is always first if present (see storage_proxy::get_live_sorted_endpoints)
unsigned local_idx = epi[0].first == utils::fb_utilities::get_broadcast_address() ? 0 : epi.size() + 1;
live_endpoints = miss_equalizing_combination(epi, local_idx, remaining_bf, bool(extra));
live_endpoints = boost::copy_range<inet_address_vector_replica_set>(miss_equalizing_combination(epi, local_idx, remaining_bf, bool(extra)));
}
}
@@ -255,10 +255,10 @@ filter_for_query(consistency_level cl,
return selected_endpoints;
}
std::vector<gms::inet_address> filter_for_query(consistency_level cl,
inet_address_vector_replica_set filter_for_query(consistency_level cl,
keyspace& ks,
std::vector<gms::inet_address>& live_endpoints,
const std::vector<gms::inet_address>& preferred_endpoints,
inet_address_vector_replica_set& live_endpoints,
const inet_address_vector_replica_set& preferred_endpoints,
column_family* cf) {
return filter_for_query(cl, ks, live_endpoints, preferred_endpoints, read_repair_decision::NONE, nullptr, cf);
}
@@ -266,7 +266,7 @@ std::vector<gms::inet_address> filter_for_query(consistency_level cl,
bool
is_sufficient_live_nodes(consistency_level cl,
keyspace& ks,
const std::vector<gms::inet_address>& live_endpoints) {
const inet_address_vector_replica_set& live_endpoints) {
using namespace locator;
switch (cl) {

View File

@@ -47,6 +47,7 @@
#include "exceptions/exceptions.hh"
#include "utils/fb_utilities.hh"
#include "gms/inet_address.hh"
#include "inet_address_vectors.hh"
#include "database.hh"
#include <iosfwd>
@@ -75,19 +76,19 @@ inline size_t count_local_endpoints(const Range& live_endpoints) {
return std::count_if(live_endpoints.begin(), live_endpoints.end(), is_local);
}
std::vector<gms::inet_address>
inet_address_vector_replica_set
filter_for_query(consistency_level cl,
keyspace& ks,
std::vector<gms::inet_address> live_endpoints,
const std::vector<gms::inet_address>& preferred_endpoints,
inet_address_vector_replica_set live_endpoints,
const inet_address_vector_replica_set& preferred_endpoints,
read_repair_decision read_repair,
gms::inet_address* extra,
column_family* cf);
std::vector<gms::inet_address> filter_for_query(consistency_level cl,
inet_address_vector_replica_set filter_for_query(consistency_level cl,
keyspace& ks,
std::vector<gms::inet_address>& live_endpoints,
const std::vector<gms::inet_address>& preferred_endpoints,
inet_address_vector_replica_set& live_endpoints,
const inet_address_vector_replica_set& preferred_endpoints,
column_family* cf);
struct dc_node_count {
@@ -132,7 +133,7 @@ inline std::unordered_map<sstring, dc_node_count> count_per_dc_endpoints(
bool
is_sufficient_live_nodes(consistency_level cl,
keyspace& ks,
const std::vector<gms::inet_address>& live_endpoints);
const inet_address_vector_replica_set& live_endpoints);
template<typename Range, typename PendingRange>
inline bool assure_sufficient_live_nodes_each_quorum(

View File

@@ -418,7 +418,7 @@ future<timespec> manager::end_point_hints_manager::sender::get_last_file_modific
});
}
future<> manager::end_point_hints_manager::sender::do_send_one_mutation(frozen_mutation_and_schema m, const std::vector<gms::inet_address>& natural_endpoints) noexcept {
future<> manager::end_point_hints_manager::sender::do_send_one_mutation(frozen_mutation_and_schema m, const inet_address_vector_replica_set& natural_endpoints) noexcept {
return futurize_invoke([this, m = std::move(m), &natural_endpoints] () mutable -> future<> {
// The fact that we send with CL::ALL in both cases below ensures that new hints are not going
// to be generated as a result of hints sending.
@@ -804,7 +804,7 @@ future<> manager::end_point_hints_manager::sender::send_one_mutation(frozen_muta
keyspace& ks = _db.find_keyspace(m.s->ks_name());
auto& rs = ks.get_replication_strategy();
auto token = dht::get_token(*m.s, m.fm.key());
std::vector<gms::inet_address> natural_endpoints = rs.get_natural_endpoints(std::move(token));
inet_address_vector_replica_set natural_endpoints = rs.get_natural_endpoints(std::move(token));
return do_send_one_mutation(std::move(m), natural_endpoints);
}

View File

@@ -36,6 +36,7 @@
#include "lister.hh"
#include "gms/gossiper.hh"
#include "locator/snitch_base.hh"
#include "inet_address_vectors.hh"
#include "service/endpoint_lifecycle_subscriber.hh"
#include "db/commitlog/commitlog.hh"
#include "utils/loading_shared_values.hh"
@@ -292,7 +293,7 @@ public:
/// \param m mutation to send
/// \param natural_endpoints current replicas for the given mutation
/// \return future that resolves when the operation is complete
future<> do_send_one_mutation(frozen_mutation_and_schema m, const std::vector<gms::inet_address>& natural_endpoints) noexcept;
future<> do_send_one_mutation(frozen_mutation_and_schema m, const inet_address_vector_replica_set& natural_endpoints) noexcept;
/// \brief Send one mutation out.
///

View File

@@ -23,6 +23,7 @@
#include <vector>
#include <seastar/core/lowres_clock.hh>
#include "gms/inet_address.hh"
#include "utils/UUID.hh"
#include "gms/inet_address.hh"

View File

@@ -1169,7 +1169,7 @@ get_view_natural_endpoint(const sstring& keyspace_name,
return view_endpoints[base_it - base_endpoints.begin()];
}
static future<> apply_to_remote_endpoints(gms::inet_address target, std::vector<gms::inet_address>&& pending_endpoints,
static future<> apply_to_remote_endpoints(gms::inet_address target, inet_address_vector_topology_change&& pending_endpoints,
frozen_mutation_and_schema& mut, const dht::token& base_token, const dht::token& view_token,
service::allow_hints allow_hints, tracing::trace_state_ptr tr_state) {

View File

@@ -131,7 +131,7 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, dh
}
const range<token>& src_range = x.first;
if (src_range.contains(desired_range, dht::tri_compare)) {
std::vector<inet_address>& addresses = x.second;
inet_address_vector_replica_set& addresses = x.second;
auto preferred = snitch->get_sorted_list_by_proximity(_address, addresses);
for (inet_address& p : preferred) {
range_sources[desired_range].push_back(p);

29
inet_address_vectors.hh Normal file
View File

@@ -0,0 +1,29 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "gms/inet_address.hh"
#include "utils/small_vector.hh"
// Vectors of node addresses used throughout storage_proxy and its providers.
// utils::small_vector stores up to N elements inline, avoiding a heap
// allocation in the common case (see the commit message: replica sets are
// often 2-3 nodes, pending topology-change endpoints usually 0-1).

// Replica set for a token/operation; inline capacity 3 matches the common RF=3 case.
using inet_address_vector_replica_set = utils::small_vector<gms::inet_address, 3>;
// Pending endpoints during topology changes; inline capacity 1 since there is usually 0 or 1.
using inet_address_vector_topology_change = utils::small_vector<gms::inet_address, 1>;

View File

@@ -74,11 +74,11 @@ void abstract_replication_strategy::validate_replication_strategy(const sstring&
}
}
std::vector<inet_address> abstract_replication_strategy::get_natural_endpoints(const token& search_token, can_yield can_yield) {
inet_address_vector_replica_set abstract_replication_strategy::get_natural_endpoints(const token& search_token, can_yield can_yield) {
return do_get_natural_endpoints(search_token, *_shared_token_metadata.get(), can_yield);
}
std::vector<inet_address> abstract_replication_strategy::do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield can_yield) {
inet_address_vector_replica_set abstract_replication_strategy::do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield can_yield) {
const token& key_token = tm.first_token(search_token);
auto& cached_endpoints = get_cached_endpoints(tm);
auto res = cached_endpoints.find(key_token);
@@ -94,9 +94,9 @@ std::vector<inet_address> abstract_replication_strategy::do_get_natural_endpoint
return res->second;
}
std::vector<inet_address> abstract_replication_strategy::get_natural_endpoints_without_node_being_replaced(const token& search_token, can_yield can_yield) {
inet_address_vector_replica_set abstract_replication_strategy::get_natural_endpoints_without_node_being_replaced(const token& search_token, can_yield can_yield) {
token_metadata_ptr tmptr = _shared_token_metadata.get();
std::vector<gms::inet_address> natural_endpoints = do_get_natural_endpoints(search_token, *tmptr, can_yield);
inet_address_vector_replica_set natural_endpoints = do_get_natural_endpoints(search_token, *tmptr, can_yield);
if (tmptr->is_any_node_being_replaced() &&
allow_remove_node_being_replaced_from_natural_endpoints()) {
// When a new node is started to replace an existing dead node, we want
@@ -132,7 +132,7 @@ void abstract_replication_strategy::validate_replication_factor(sstring rf) cons
}
}
inline std::unordered_map<token, std::vector<inet_address>>&
inline std::unordered_map<token, inet_address_vector_replica_set>&
abstract_replication_strategy::get_cached_endpoints(const token_metadata& tm) {
auto ring_version = tm.get_ring_version();
if (_last_invalidated_ring_version != ring_version) {
@@ -272,9 +272,9 @@ abstract_replication_strategy::get_address_ranges(const token_metadata& tm, inet
return ret;
}
std::unordered_map<dht::token_range, std::vector<inet_address>>
std::unordered_map<dht::token_range, inet_address_vector_replica_set>
abstract_replication_strategy::get_range_addresses(const token_metadata& tm, can_yield can_yield) const {
std::unordered_map<dht::token_range, std::vector<inet_address>> ret;
std::unordered_map<dht::token_range, inet_address_vector_replica_set> ret;
for (auto& t : tm.sorted_tokens()) {
dht::token_range_vector ranges = tm.get_primary_ranges_for(t);
auto eps = calculate_natural_endpoints(t, tm, can_yield);

View File

@@ -25,6 +25,7 @@
#include <functional>
#include <unordered_map>
#include "gms/inet_address.hh"
#include "locator/snitch_base.hh"
#include "dht/i_partitioner.hh"
#include "token_metadata.hh"
#include "snitch_base.hh"
@@ -51,12 +52,12 @@ using can_yield = utils::can_yield;
class abstract_replication_strategy {
private:
long _last_invalidated_ring_version = 0;
std::unordered_map<token, std::vector<inet_address>> _cached_endpoints;
std::unordered_map<token, inet_address_vector_replica_set> _cached_endpoints;
uint64_t _cache_hits_count = 0;
static logging::logger logger;
std::unordered_map<token, std::vector<inet_address>>&
std::unordered_map<token, inet_address_vector_replica_set>&
get_cached_endpoints(const token_metadata& tm);
protected:
sstring _ks_name;
@@ -91,15 +92,15 @@ public:
snitch_ptr& snitch,
const std::map<sstring, sstring>& config_options,
replication_strategy_type my_type);
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield = can_yield::no) const = 0;
virtual inet_address_vector_replica_set calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield = can_yield::no) const = 0;
virtual ~abstract_replication_strategy() {}
static std::unique_ptr<abstract_replication_strategy> create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, const shared_token_metadata& stm, const std::map<sstring, sstring>& config_options);
static void validate_replication_strategy(const sstring& ks_name,
const sstring& strategy_name,
const shared_token_metadata& stm,
const std::map<sstring, sstring>& config_options);
std::vector<inet_address> get_natural_endpoints(const token& search_token, can_yield = can_yield::no);
std::vector<inet_address> get_natural_endpoints_without_node_being_replaced(const token& search_token, can_yield = can_yield::no);
inet_address_vector_replica_set get_natural_endpoints(const token& search_token, can_yield = can_yield::no);
inet_address_vector_replica_set get_natural_endpoints_without_node_being_replaced(const token& search_token, can_yield = can_yield::no);
virtual void validate_options() const = 0;
virtual std::optional<std::set<sstring>> recognized_options() const = 0;
virtual size_t get_replication_factor() const = 0;
@@ -128,7 +129,7 @@ public:
private:
// Caller must ensure that token_metadata will not change throughout the call if can_yield::yes.
dht::token_range_vector do_get_ranges(inet_address ep, const token_metadata_ptr tmptr, can_yield) const;
virtual std::vector<inet_address> do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield);
virtual inet_address_vector_replica_set do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield);
public:
// get_primary_ranges() returns the list of "primary ranges" for the given
@@ -146,7 +147,7 @@ public:
std::unordered_multimap<inet_address, dht::token_range> get_address_ranges(const token_metadata& tm, can_yield) const;
std::unordered_multimap<inet_address, dht::token_range> get_address_ranges(const token_metadata& tm, inet_address endpoint, can_yield) const;
std::unordered_map<dht::token_range, std::vector<inet_address>> get_range_addresses(const token_metadata& tm, can_yield) const;
std::unordered_map<dht::token_range, inet_address_vector_replica_set> get_range_addresses(const token_metadata& tm, can_yield) const;
dht::token_range_vector get_pending_address_ranges(const token_metadata_ptr tmptr, token pending_token, inet_address pending_address, can_yield) const;

View File

@@ -47,13 +47,13 @@ namespace locator {
everywhere_replication_strategy::everywhere_replication_strategy(const sstring& keyspace_name, const shared_token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring, sstring>& config_options) :
abstract_replication_strategy(keyspace_name, token_metadata, snitch, config_options, replication_strategy_type::everywhere_topology) {}
std::vector<inet_address> everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const {
return tm.get_all_endpoints();
inet_address_vector_replica_set everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const {
return boost::copy_range<inet_address_vector_replica_set>(tm.get_all_endpoints());
}
std::vector<inet_address> everywhere_replication_strategy::do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield can_yield) {
inet_address_vector_replica_set everywhere_replication_strategy::do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield can_yield) {
if (tm.sorted_tokens().empty()) {
return std::vector<inet_address>({utils::fb_utilities::get_broadcast_address()});
return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()});
}
return calculate_natural_endpoints(search_token, tm, can_yield);
}

View File

@@ -46,8 +46,8 @@ class everywhere_replication_strategy : public abstract_replication_strategy {
public:
everywhere_replication_strategy(const sstring& keyspace_name, const shared_token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring,sstring>& config_options);
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override;
std::vector<inet_address> do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) override;
virtual inet_address_vector_replica_set calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override;
inet_address_vector_replica_set do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) override;
virtual void validate_options() const override { /* noop */ }

View File

@@ -30,12 +30,12 @@ namespace locator {
local_strategy::local_strategy(const sstring& keyspace_name, const shared_token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring, sstring>& config_options) :
abstract_replication_strategy(keyspace_name, token_metadata, snitch, config_options, replication_strategy_type::local) {}
std::vector<inet_address> local_strategy::do_get_natural_endpoints(const token& t, const token_metadata& tm, can_yield can_yield) {
inet_address_vector_replica_set local_strategy::do_get_natural_endpoints(const token& t, const token_metadata& tm, can_yield can_yield) {
return calculate_natural_endpoints(t, tm, can_yield);
}
std::vector<inet_address> local_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm, can_yield) const {
return std::vector<inet_address>({utils::fb_utilities::get_broadcast_address()});
inet_address_vector_replica_set local_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm, can_yield) const {
return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()});
}
void local_strategy::validate_options() const {

View File

@@ -36,7 +36,7 @@ using token = dht::token;
class local_strategy : public abstract_replication_strategy {
protected:
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override;
virtual inet_address_vector_replica_set calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override;
public:
local_strategy(const sstring& keyspace_name, const shared_token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring, sstring>& config_options);
virtual ~local_strategy() {};
@@ -46,7 +46,7 @@ public:
* because the default implementation depends on token calculations but
* LocalStrategy may be used before tokens are set up.
*/
std::vector<inet_address> do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) override;
inet_address_vector_replica_set do_get_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) override;
virtual void validate_options() const override;

View File

@@ -102,7 +102,7 @@ network_topology_strategy::network_topology_strategy(
}
}
std::vector<inet_address>
inet_address_vector_replica_set
network_topology_strategy::calculate_natural_endpoints(
const token& search_token, const token_metadata& tm, can_yield can_yield) const {
@@ -258,7 +258,7 @@ network_topology_strategy::calculate_natural_endpoints(
}
}
return std::move(replicas.get_vector());
return boost::copy_range<inet_address_vector_replica_set>(replicas.get_vector());
}
void network_topology_strategy::validate_options() const {

View File

@@ -75,7 +75,7 @@ protected:
* calculate endpoints in one pass through the tokens by tracking our
* progress in each DC, rack etc.
*/
virtual std::vector<inet_address> calculate_natural_endpoints(
virtual inet_address_vector_replica_set calculate_natural_endpoints(
const token& search_token, const token_metadata& tm, can_yield) const override;
virtual void validate_options() const override;

View File

@@ -66,7 +66,7 @@ struct simple_snitch : public snitch_base {
}
virtual void sort_by_proximity(
inet_address address, std::vector<inet_address>& addresses) override {
inet_address address, inet_address_vector_replica_set& addresses) override {
// Optimization to avoid walking the list
}

View File

@@ -42,11 +42,11 @@ simple_strategy::simple_strategy(const sstring& keyspace_name, const shared_toke
}
}
std::vector<inet_address> simple_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm, can_yield can_yield) const {
inet_address_vector_replica_set simple_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm, can_yield can_yield) const {
const std::vector<token>& tokens = tm.sorted_tokens();
if (tokens.empty()) {
return std::vector<inet_address>();
return inet_address_vector_replica_set();
}
size_t replicas = get_replication_factor();
@@ -66,7 +66,7 @@ std::vector<inet_address> simple_strategy::calculate_natural_endpoints(const tok
endpoints.push_back(*ep);
}
return std::move(endpoints.get_vector());
return boost::copy_range<inet_address_vector_replica_set>(endpoints.get_vector());
}
size_t simple_strategy::get_replication_factor() const {

View File

@@ -30,7 +30,7 @@ namespace locator {
class simple_strategy : public abstract_replication_strategy {
protected:
virtual std::vector<inet_address> calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override;
virtual inet_address_vector_replica_set calculate_natural_endpoints(const token& search_token, const token_metadata& tm, can_yield) const override;
public:
simple_strategy(const sstring& keyspace_name, const shared_token_metadata& token_metadata, snitch_ptr& snitch, const std::map<sstring, sstring>& config_options);
virtual ~simple_strategy() {};

View File

@@ -49,11 +49,11 @@ snitch_base::get_endpoint_info(inet_address endpoint,
return ep_state ? std::optional(ep_state->value) : std::nullopt;
}
std::vector<inet_address> snitch_base::get_sorted_list_by_proximity(
inet_address_vector_replica_set snitch_base::get_sorted_list_by_proximity(
inet_address address,
std::vector<inet_address>& unsorted_address) {
inet_address_vector_replica_set& unsorted_address) {
std::vector<inet_address>
inet_address_vector_replica_set
preferred(unsorted_address.begin(), unsorted_address.end());
sort_by_proximity(address, preferred);
@@ -61,7 +61,7 @@ std::vector<inet_address> snitch_base::get_sorted_list_by_proximity(
}
void snitch_base::sort_by_proximity(
inet_address address, std::vector<inet_address>& addresses) {
inet_address address, inet_address_vector_replica_set& addresses) {
std::sort(addresses.begin(), addresses.end(),
[this, &address](inet_address& a1, inet_address& a2)
@@ -122,9 +122,9 @@ int snitch_base::compare_endpoints(
}
bool snitch_base::is_worth_merging_for_range_query(
std::vector<inet_address>& merged,
std::vector<inet_address>& l1,
std::vector<inet_address>& l2) {
inet_address_vector_replica_set& merged,
inet_address_vector_replica_set& l1,
inet_address_vector_replica_set& l2) {
//
// Querying remote DC is likely to be an order of magnitude slower than
// querying locally, so 2 queries to local nodes is likely to still be
@@ -136,7 +136,7 @@ bool snitch_base::is_worth_merging_for_range_query(
: true;
}
bool snitch_base::has_remote_node(std::vector<inet_address>& l) {
bool snitch_base::has_remote_node(inet_address_vector_replica_set& l) {
for (auto&& ep : l) {
if (_my_dc != get_datacenter(ep)) {
return true;

View File

@@ -44,6 +44,7 @@
#include <boost/signals2/dummy_mutex.hpp>
#include "gms/inet_address.hh"
#include "inet_address_vectors.hh"
#include "gms/versioned_value.hh"
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/thread.hh>
@@ -94,16 +95,16 @@ public:
/**
* returns a new <tt>List</tt> sorted by proximity to the given endpoint
*/
virtual std::vector<inet_address> get_sorted_list_by_proximity(
virtual inet_address_vector_replica_set get_sorted_list_by_proximity(
inet_address address,
std::vector<inet_address>& unsorted_address) = 0;
inet_address_vector_replica_set& unsorted_address) = 0;
/**
* This method will sort the <tt>List</tt> by proximity to the given
* address.
*/
virtual void sort_by_proximity(
inet_address address, std::vector<inet_address>& addresses) = 0;
inet_address address, inet_address_vector_replica_set& addresses) = 0;
/**
* compares two endpoints in relation to the target endpoint, returning as
@@ -127,9 +128,9 @@ public:
* against l2.
*/
virtual bool is_worth_merging_for_range_query(
std::vector<inet_address>& merged,
std::vector<inet_address>& l1,
std::vector<inet_address>& l2) = 0;
inet_address_vector_replica_set& merged,
inet_address_vector_replica_set& l1,
inet_address_vector_replica_set& l2) = 0;
virtual ~i_endpoint_snitch() { assert(_state == snitch_state::stopped); };
@@ -426,25 +427,25 @@ public:
// virtual sstring get_datacenter(inet_address endpoint) = 0;
//
virtual std::vector<inet_address> get_sorted_list_by_proximity(
virtual inet_address_vector_replica_set get_sorted_list_by_proximity(
inet_address address,
std::vector<inet_address>& unsorted_address) override;
inet_address_vector_replica_set& unsorted_address) override;
virtual void sort_by_proximity(
inet_address address, std::vector<inet_address>& addresses) override;
inet_address address, inet_address_vector_replica_set& addresses) override;
virtual int compare_endpoints(
inet_address& address, inet_address& a1, inet_address& a2) override;
virtual bool is_worth_merging_for_range_query(
std::vector<inet_address>& merged,
std::vector<inet_address>& l1,
std::vector<inet_address>& l2) override;
inet_address_vector_replica_set& merged,
inet_address_vector_replica_set& l1,
inet_address_vector_replica_set& l2) override;
virtual future<> gossip_snitch_info(std::list<std::pair<gms::application_state, gms::versioned_value>> info) override;
private:
bool has_remote_node(std::vector<inet_address>& l);
bool has_remote_node(inet_address_vector_replica_set& l);
protected:
static std::optional<sstring> get_endpoint_info(inet_address endpoint,

View File

@@ -735,7 +735,7 @@ public:
#endif
public:
// returns empty vector if keyspace_name not found.
std::vector<gms::inet_address> pending_endpoints_for(const token& token, const sstring& keyspace_name) const;
inet_address_vector_topology_change pending_endpoints_for(const token& token, const sstring& keyspace_name) const;
#if 0
/**
* @deprecated retained for benefit of old tests
@@ -1676,7 +1676,7 @@ void token_metadata_impl::del_replacing_endpoint(inet_address existing_node) {
_replacing_endpoints.erase(existing_node);
}
std::vector<gms::inet_address> token_metadata_impl::pending_endpoints_for(const token& token, const sstring& keyspace_name) const {
inet_address_vector_topology_change token_metadata_impl::pending_endpoints_for(const token& token, const sstring& keyspace_name) const {
// Fast path 0: pending ranges not found for this keyspace_name
const auto pr_it = _pending_ranges_interval_map.find(keyspace_name);
if (pr_it == _pending_ranges_interval_map.end()) {
@@ -1690,12 +1690,12 @@ std::vector<gms::inet_address> token_metadata_impl::pending_endpoints_for(const
}
// Slow path: lookup pending ranges
std::vector<gms::inet_address> endpoints;
inet_address_vector_topology_change endpoints;
auto interval = range_to_interval(range<dht::token>(token));
const auto it = ks_map.find(interval);
if (it != ks_map.end()) {
// interval_map does not work with std::vector, convert to std::vector of ips
endpoints = std::vector<gms::inet_address>(it->second.begin(), it->second.end());
endpoints = inet_address_vector_topology_change(it->second.begin(), it->second.end());
}
return endpoints;
}
@@ -2050,7 +2050,7 @@ token_metadata::count_normal_token_owners() const {
return _impl->count_normal_token_owners();
}
std::vector<gms::inet_address>
inet_address_vector_topology_change
token_metadata::pending_endpoints_for(const token& token, const sstring& keyspace_name) const {
return _impl->pending_endpoints_for(token, keyspace_name);
}

View File

@@ -43,6 +43,7 @@
#include <unordered_map>
#include "gms/inet_address.hh"
#include "dht/i_partitioner.hh"
#include "inet_address_vectors.hh"
#include "utils/UUID.hh"
#include <optional>
#include <memory>
@@ -334,7 +335,7 @@ public:
size_t count_normal_token_owners() const;
// returns empty vector if keyspace_name not found.
std::vector<gms::inet_address> pending_endpoints_for(const token& token, const sstring& keyspace_name) const;
inet_address_vector_topology_change pending_endpoints_for(const token& token, const sstring& keyspace_name) const;
/** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */
std::multimap<inet_address, token> get_endpoint_to_token_map_for_reading() const;

View File

@@ -1189,7 +1189,7 @@ void messaging_service::register_mutation(std::function<future<rpc::no_wait_type
future<> messaging_service::unregister_mutation() {
return unregister_handler(netw::messaging_verb::MUTATION);
}
future<> messaging_service::send_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector<inet_address> forward,
future<> messaging_service::send_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, inet_address_vector_replica_set forward,
inet_address reply_to, unsigned shard, response_id_type response_id, std::optional<tracing::trace_info> trace_info) {
return send_message_oneway_timeout(this, timeout, messaging_verb::MUTATION, std::move(id), fm, std::move(forward),
std::move(reply_to), shard, std::move(response_id), std::move(trace_info));
@@ -1462,7 +1462,7 @@ future<> messaging_service::unregister_paxos_learn() {
return unregister_handler(netw::messaging_verb::PAXOS_LEARN);
}
future<> messaging_service::send_paxos_learn(msg_addr id, clock_type::time_point timeout, const service::paxos::proposal& decision,
std::vector<inet_address> forward, inet_address reply_to, unsigned shard, response_id_type response_id,
inet_address_vector_replica_set forward, inet_address reply_to, unsigned shard, response_id_type response_id,
std::optional<tracing::trace_info> trace_info) {
return send_message_oneway_timeout(this, timeout, messaging_verb::PAXOS_LEARN, std::move(id), decision, std::move(forward),
std::move(reply_to), shard, std::move(response_id), std::move(trace_info));
@@ -1488,7 +1488,7 @@ void messaging_service::register_hint_mutation(std::function<future<rpc::no_wait
future<> messaging_service::unregister_hint_mutation() {
return unregister_handler(netw::messaging_verb::HINT_MUTATION);
}
future<> messaging_service::send_hint_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector<inet_address> forward,
future<> messaging_service::send_hint_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, inet_address_vector_replica_set forward,
inet_address reply_to, unsigned shard, response_id_type response_id, std::optional<tracing::trace_info> trace_info) {
return send_message_oneway_timeout(this, timeout, messaging_verb::HINT_MUTATION, std::move(id), fm, std::move(forward),
std::move(reply_to), shard, std::move(response_id), std::move(trace_info));

View File

@@ -27,6 +27,7 @@
#include <seastar/core/distributed.hh>
#include <seastar/core/sstring.hh>
#include "gms/inet_address.hh"
#include "inet_address_vectors.hh"
#include <seastar/rpc/rpc_types.hh>
#include <unordered_map>
#include "query-request.hh"
@@ -462,7 +463,7 @@ public:
void register_mutation(std::function<future<rpc::no_wait_type> (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector<inet_address> forward,
inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional<std::optional<tracing::trace_info>> trace_info)>&& func);
future<> unregister_mutation();
future<> send_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector<inet_address> forward,
future<> send_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, inet_address_vector_replica_set forward,
inet_address reply_to, unsigned shard, response_id_type response_id, std::optional<tracing::trace_info> trace_info = std::nullopt);
// Wrapper for COUNTER_MUTATION
@@ -543,7 +544,7 @@ public:
future<> unregister_paxos_learn();
future<> send_paxos_learn(msg_addr id, clock_type::time_point timeout, const service::paxos::proposal& decision,
std::vector<inet_address> forward, inet_address reply_to, unsigned shard, response_id_type response_id,
inet_address_vector_replica_set forward, inet_address reply_to, unsigned shard, response_id_type response_id,
std::optional<tracing::trace_info> trace_info = std::nullopt);
void register_paxos_prune(std::function<future<rpc::no_wait_type>(const rpc::client_info&, rpc::opt_time_point, UUID schema_id, partition_key key,
@@ -557,7 +558,7 @@ public:
void register_hint_mutation(std::function<future<rpc::no_wait_type> (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector<inet_address> forward,
inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional<std::optional<tracing::trace_info>> trace_info)>&& func);
future<> unregister_hint_mutation();
future<> send_hint_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector<inet_address> forward,
future<> send_hint_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, inet_address_vector_replica_set forward,
inet_address reply_to, unsigned shard, response_id_type response_id, std::optional<tracing::trace_info> trace_info = std::nullopt);
void register_hint_sync_point_create(std::function<future<db::hints::sync_point_create_response> (db::hints::sync_point_create_request request)>&& func);

View File

@@ -285,7 +285,7 @@ static std::vector<gms::inet_address> get_neighbors(database& db,
ret.erase(it, ret.end());
}
return ret;
return boost::copy_range<std::vector<gms::inet_address>>(std::move(ret));
#if 0
// Origin's ActiveRepairService.getNeighbors() also verifies that the
@@ -1592,7 +1592,7 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
rlogger.info("{}: started with keyspace={}, leaving_node={}, nr_ranges={}", op, keyspace_name, leaving_node, ranges.size());
size_t nr_ranges_total = ranges.size();
size_t nr_ranges_skipped = 0;
std::unordered_map<dht::token_range, std::vector<inet_address>> current_replica_endpoints;
std::unordered_map<dht::token_range, inet_address_vector_replica_set> current_replica_endpoints;
// Find (for each range) all nodes that store replicas for these ranges as well
for (auto& r : ranges) {
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
@@ -1615,8 +1615,8 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
ops->check_abort();
}
auto end_token = r.end() ? r.end()->value() : dht::maximum_token();
const std::vector<inet_address> new_eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp, utils::can_yield::yes);
const std::vector<inet_address>& current_eps = current_replica_endpoints[r];
const inet_address_vector_replica_set new_eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp, utils::can_yield::yes);
const inet_address_vector_replica_set& current_eps = current_replica_endpoints[r];
std::unordered_set<inet_address> neighbors_set(new_eps.begin(), new_eps.end());
bool skip_this_range = false;
auto new_owner = neighbors_set;
@@ -1687,7 +1687,7 @@ static future<> do_decommission_removenode_with_repair(seastar::sharded<database
// Choose the decommission node n3 to run repair to
// sync with one of the replica nodes, e.g., n1, in the
// local DC.
neighbors_set = get_neighbors_set(new_eps);
neighbors_set = get_neighbors_set(boost::copy_range<std::vector<inet_address>>(new_eps));
}
} else {
throw std::runtime_error(format("{}: keyspace={}, range={}, current_replica_endpoints={}, new_replica_endpoints={}, wrong nubmer of new owner node={}",

View File

@@ -171,7 +171,7 @@ public:
virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) = 0;
virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) = 0;
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector<gms::inet_address>&& forward,
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, inet_address_vector_replica_set&& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) = 0;
virtual bool is_shared() = 0;
@@ -222,7 +222,7 @@ public:
}
return make_ready_future<>();
}
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector<gms::inet_address>&& forward,
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, inet_address_vector_replica_set&& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) override {
auto m = _mutations[ep];
@@ -270,7 +270,7 @@ public:
tracing::trace(tr_state, "Executing a mutation locally");
return sp.mutate_locally(_schema, *_mutation, std::move(tr_state), db::commitlog::force_sync::no, timeout);
}
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector<gms::inet_address>&& forward,
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, inet_address_vector_replica_set&& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) override {
tracing::trace(tr_state, "Sending a mutation to /{}", ep);
@@ -299,7 +299,7 @@ public:
// becomes unavailable - this might include the current node
return sp.mutate_hint(_schema, *_mutation, std::move(tr_state), timeout);
}
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector<gms::inet_address>&& forward,
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, inet_address_vector_replica_set&& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) override {
tracing::trace(tr_state, "Sending a hint to /{}", ep);
@@ -326,7 +326,7 @@ public:
tracing::trace(tr_state, "Executing a learn locally");
return paxos::paxos_state::learn(_schema, *_proposal, timeout, tr_state);
}
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector<gms::inet_address>&& forward,
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, inet_address_vector_replica_set&& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) override {
tracing::trace(tr_state, "Sending a learn to /{}", ep);
@@ -369,7 +369,7 @@ protected:
// added dead_endpoints as a member here as well. This is to be able to carry the info across
// calls in helper methods in a convenient way. Since we hope this will be empty most of the time
// it should not be a huge burden. (flw)
std::vector<gms::inet_address> _dead_endpoints;
inet_address_vector_topology_change _dead_endpoints;
size_t _cl_acks = 0;
bool _cl_achieved = false;
bool _throttled = false;
@@ -394,7 +394,7 @@ protected:
public:
abstract_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets, tracing::trace_state_ptr trace_state,
storage_proxy::write_stats& stats, service_permit permit, size_t pending_endpoints = 0, std::vector<gms::inet_address> dead_endpoints = {})
storage_proxy::write_stats& stats, service_permit permit, size_t pending_endpoints = 0, inet_address_vector_topology_change dead_endpoints = {})
: _id(p->get_next_response_id()), _proxy(std::move(p)), _trace_state(trace_state), _cl(cl), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)),
_dead_endpoints(std::move(dead_endpoints)), _stats(stats), _expire_timer([this] { timeout_cb(); }), _permit(std::move(permit)) {
// original comment from cassandra:
@@ -588,7 +588,7 @@ public:
const std::unordered_set<gms::inet_address>& get_targets() const {
return _targets;
}
const std::vector<gms::inet_address>& get_dead_endpoints() const {
const inet_address_vector_topology_change& get_dead_endpoints() const {
return _dead_endpoints;
}
bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) {
@@ -597,7 +597,7 @@ public:
future<> apply_locally(storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) {
return _mutation_holder->apply_locally(*_proxy, timeout, std::move(tr_state));
}
future<> apply_remotely(gms::inet_address ep, std::vector<gms::inet_address>&& forward,
future<> apply_remotely(gms::inet_address ep, inet_address_vector_replica_set&& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) {
return _mutation_holder->apply_remotely(*_proxy, ep, std::move(forward), response_id, timeout, std::move(tr_state));
@@ -631,7 +631,7 @@ class datacenter_write_response_handler : public abstract_write_response_handler
public:
datacenter_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets,
const std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address> dead_endpoints, tracing::trace_state_ptr tr_state,
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state,
storage_proxy::write_stats& stats, service_permit permit) :
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh),
std::move(targets), std::move(tr_state), stats, std::move(permit), db::count_local_endpoints(pending_endpoints), std::move(dead_endpoints)) {
@@ -646,7 +646,7 @@ class write_response_handler : public abstract_write_response_handler {
public:
write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets,
const std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address> dead_endpoints, tracing::trace_state_ptr tr_state,
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state,
storage_proxy::write_stats& stats, service_permit permit) :
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh),
std::move(targets), std::move(tr_state), stats, std::move(permit), pending_endpoints.size(), std::move(dead_endpoints)) {
@@ -658,7 +658,7 @@ class view_update_write_response_handler : public write_response_handler, public
public:
view_update_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl,
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets,
const std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address> dead_endpoints, tracing::trace_state_ptr tr_state,
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state,
storage_proxy::write_stats& stats, service_permit permit):
write_response_handler(p, ks, cl, db::write_type::VIEW, std::move(mh),
std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats, std::move(permit)) {
@@ -737,8 +737,8 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha
}
public:
datacenter_sync_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets, const std::vector<gms::inet_address>& pending_endpoints,
std::vector<gms::inet_address> dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) :
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets, const inet_address_vector_topology_change& pending_endpoints,
inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) :
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), targets, std::move(tr_state), stats, std::move(permit), 0, dead_endpoints) {
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
@@ -1289,9 +1289,9 @@ bool paxos_response_handler::learned(gms::inet_address ep) {
return false;
}
static std::vector<gms::inet_address>
static inet_address_vector_replica_set
replica_ids_to_endpoints(const locator::token_metadata& tm, const std::vector<utils::UUID>& replica_ids) {
std::vector<gms::inet_address> endpoints;
inet_address_vector_replica_set endpoints;
endpoints.reserve(replica_ids.size());
for (const auto& replica_id : replica_ids) {
@@ -1304,7 +1304,7 @@ replica_ids_to_endpoints(const locator::token_metadata& tm, const std::vector<ut
}
static std::vector<utils::UUID>
endpoints_to_replica_ids(const locator::token_metadata& tm, const std::vector<gms::inet_address>& endpoints) {
endpoints_to_replica_ids(const locator::token_metadata& tm, const inet_address_vector_replica_set& endpoints) {
std::vector<utils::UUID> replica_ids;
replica_ids.reserve(endpoints.size());
@@ -1415,7 +1415,7 @@ future<> storage_proxy::response_wait(storage_proxy::response_id_type id, clock_
}
storage_proxy::response_id_type storage_proxy::create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m,
std::unordered_set<gms::inet_address> targets, const std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address> dead_endpoints, tracing::trace_state_ptr tr_state,
std::unordered_set<gms::inet_address> targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state,
storage_proxy::write_stats& stats, service_permit permit)
{
shared_ptr<abstract_write_response_handler> h;
@@ -1902,8 +1902,8 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok
auto keyspace_name = s->ks_name();
keyspace& ks = _db.local().find_keyspace(keyspace_name);
auto& rs = ks.get_replication_strategy();
std::vector<gms::inet_address> natural_endpoints = rs.get_natural_endpoints_without_node_being_replaced(token);
std::vector<gms::inet_address> pending_endpoints = get_token_metadata_ptr()->pending_endpoints_for(token, keyspace_name);
inet_address_vector_replica_set natural_endpoints = rs.get_natural_endpoints_without_node_being_replaced(token);
inet_address_vector_topology_change pending_endpoints = get_token_metadata_ptr()->pending_endpoints_for(token, keyspace_name);
slogger.trace("creating write handler for token: {} natural: {} pending: {}", token, natural_endpoints, pending_endpoints);
tracing::trace(tr_state, "Creating write handler for token: {} natural: {} pending: {}", token, natural_endpoints ,pending_endpoints);
@@ -1939,7 +1939,7 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok
// filter live endpoints from dead ones
std::unordered_set<gms::inet_address> live_endpoints;
std::vector<gms::inet_address> dead_endpoints;
inet_address_vector_topology_change dead_endpoints;
live_endpoints.reserve(all.size());
dead_endpoints.reserve(all.size());
std::partition_copy(all.begin(), all.end(), std::inserter(live_endpoints, live_endpoints.begin()),
@@ -1985,7 +1985,7 @@ storage_proxy::create_write_response_handler(const std::unordered_map<gms::inet_
auto keyspace_name = mh->schema()->ks_name();
keyspace& ks = _db.local().find_keyspace(keyspace_name);
return create_write_response_handler(ks, cl, type, std::move(mh), std::move(endpoints), std::vector<gms::inet_address>(), std::vector<gms::inet_address>(), std::move(tr_state), get_stats(), std::move(permit));
return create_write_response_handler(ks, cl, type, std::move(mh), std::move(endpoints), inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit));
}
storage_proxy::response_id_type
@@ -2009,7 +2009,7 @@ storage_proxy::create_write_response_handler(const std::tuple<lw_shared_ptr<paxo
keyspace& ks = _db.local().find_keyspace(keyspace_name);
return create_write_response_handler(ks, cl, db::write_type::CAS, std::make_unique<cas_mutation>(std::move(commit), s, nullptr), std::move(endpoints),
std::vector<gms::inet_address>(), std::vector<gms::inet_address>(), std::move(tr_state), get_stats(), std::move(permit));
inet_address_vector_topology_change(), inet_address_vector_topology_change(), std::move(tr_state), get_stats(), std::move(permit));
}
void storage_proxy::register_cdc_operation_result_tracker(const std::vector<storage_proxy::unique_response_handler>& ids, lw_shared_ptr<cdc::operation_result_tracker> tracker) {
@@ -2145,7 +2145,7 @@ gms::inet_address storage_proxy::find_leader_for_counter_update(const mutation&
return my_address;
}
const auto local_endpoints = boost::copy_range<std::vector<gms::inet_address>>(live_endpoints | boost::adaptors::filtered([&] (auto&& ep) {
const auto local_endpoints = boost::copy_range<inet_address_vector_replica_set>(live_endpoints | boost::adaptors::filtered([&] (auto&& ep) {
return db::is_local(ep);
}));
@@ -2226,8 +2226,8 @@ storage_proxy::paxos_participants
storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token &token, db::consistency_level cl_for_paxos) {
keyspace& ks = _db.local().find_keyspace(ks_name);
auto& rs = ks.get_replication_strategy();
std::vector<gms::inet_address> natural_endpoints = rs.get_natural_endpoints_without_node_being_replaced(token);
std::vector<gms::inet_address> pending_endpoints = get_token_metadata_ptr()->pending_endpoints_for(token, ks_name);
inet_address_vector_replica_set natural_endpoints = rs.get_natural_endpoints_without_node_being_replaced(token);
inet_address_vector_topology_change pending_endpoints = get_token_metadata_ptr()->pending_endpoints_for(token, ks_name);
if (cl_for_paxos == db::consistency_level::LOCAL_SERIAL) {
auto itend = boost::range::remove_if(natural_endpoints, std::not_fn(std::cref(db::is_local)));
@@ -2247,7 +2247,7 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token &
const size_t quorum_size = natural_endpoints.size() / 2 + 1;
const size_t required_participants = quorum_size + pending_endpoints.size();
std::vector<gms::inet_address> live_endpoints;
inet_address_vector_replica_set live_endpoints;
live_endpoints.reserve(participants);
boost::copy(boost::range::join(natural_endpoints, pending_endpoints) |
@@ -2497,7 +2497,7 @@ bool storage_proxy::cannot_hint(const Range& targets, db::write_type type) const
future<> storage_proxy::send_to_endpoint(
std::unique_ptr<mutation_holder> m,
gms::inet_address target,
std::vector<gms::inet_address> pending_endpoints,
inet_address_vector_topology_change pending_endpoints,
db::write_type type,
tracing::trace_state_ptr tr_state,
write_stats& stats,
@@ -2519,7 +2519,7 @@ future<> storage_proxy::send_to_endpoint(
db::write_type type, service_permit permit) mutable {
std::unordered_set<gms::inet_address> targets;
targets.reserve(pending_endpoints.size() + 1);
std::vector<gms::inet_address> dead_endpoints;
inet_address_vector_topology_change dead_endpoints;
boost::algorithm::partition_copy(
boost::range::join(pending_endpoints, target),
std::inserter(targets, targets.begin()),
@@ -2549,7 +2549,7 @@ future<> storage_proxy::send_to_endpoint(
future<> storage_proxy::send_to_endpoint(
frozen_mutation_and_schema fm_a_s,
gms::inet_address target,
std::vector<gms::inet_address> pending_endpoints,
inet_address_vector_topology_change pending_endpoints,
db::write_type type,
tracing::trace_state_ptr tr_state,
allow_hints allow_hints) {
@@ -2566,7 +2566,7 @@ future<> storage_proxy::send_to_endpoint(
future<> storage_proxy::send_to_endpoint(
frozen_mutation_and_schema fm_a_s,
gms::inet_address target,
std::vector<gms::inet_address> pending_endpoints,
inet_address_vector_topology_change pending_endpoints,
db::write_type type,
tracing::trace_state_ptr tr_state,
write_stats& stats,
@@ -2631,8 +2631,8 @@ future<> storage_proxy::send_hint_to_all_replicas(frozen_mutation_and_schema fm_
void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type response_id, clock_type::time_point timeout)
{
// extra-datacenter replicas, grouped by dc
std::unordered_map<sstring, std::vector<gms::inet_address>> dc_groups;
std::vector<std::pair<const sstring, std::vector<gms::inet_address>>> local;
std::unordered_map<sstring, inet_address_vector_replica_set> dc_groups;
std::vector<std::pair<const sstring, inet_address_vector_replica_set>> local;
local.reserve(3);
auto handler_ptr = get_write_response_handler(response_id);
@@ -2644,7 +2644,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
sstring dc = get_dc(dest);
// read repair writes do not go through coordinator since mutations are per destination
if (handler.read_repair_write() || dc == get_local_dc()) {
local.emplace_back("", std::vector<gms::inet_address>({dest}));
local.emplace_back("", inet_address_vector_replica_set({dest}));
} else {
dc_groups[dc].push_back(dest);
}
@@ -2664,7 +2664,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
};
// lambda for applying mutation remotely
auto rmutate = [this, handler_ptr, timeout, response_id, my_address, &global_stats] (gms::inet_address coordinator, std::vector<gms::inet_address>&& forward) {
auto rmutate = [this, handler_ptr, timeout, response_id, my_address, &global_stats] (gms::inet_address coordinator, inet_address_vector_replica_set&& forward) {
auto msize = handler_ptr->get_mutation_size(); // can overestimate for repair writes
global_stats.queued_write_bytes += msize;
@@ -3384,7 +3384,7 @@ public:
class abstract_read_executor : public enable_shared_from_this<abstract_read_executor> {
protected:
using targets_iterator = std::vector<gms::inet_address>::iterator;
using targets_iterator = inet_address_vector_replica_set::iterator;
using digest_resolver_ptr = ::shared_ptr<digest_read_resolver>;
using data_resolver_ptr = ::shared_ptr<data_read_resolver>;
using clock_type = storage_proxy::clock_type;
@@ -3396,9 +3396,9 @@ protected:
dht::partition_range _partition_range;
db::consistency_level _cl;
size_t _block_for;
std::vector<gms::inet_address> _targets;
inet_address_vector_replica_set _targets;
// Targets that were successfully used for a data or digest request
std::vector<gms::inet_address> _used_targets;
inet_address_vector_replica_set _used_targets;
promise<foreign_ptr<lw_shared_ptr<query::result>>> _result_promise;
tracing::trace_state_ptr _trace_state;
lw_shared_ptr<column_family> _cf;
@@ -3413,7 +3413,7 @@ private:
}
public:
abstract_read_executor(schema_ptr s, lw_shared_ptr<column_family> cf, shared_ptr<storage_proxy> proxy, lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, size_t block_for,
std::vector<gms::inet_address> targets, tracing::trace_state_ptr trace_state, service_permit permit) :
inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, service_permit permit) :
_schema(std::move(s)), _proxy(std::move(proxy)), _cmd(std::move(cmd)), _partition_range(std::move(pr)), _cl(cl), _block_for(block_for), _targets(std::move(targets)), _trace_state(std::move(trace_state)),
_cf(std::move(cf)), _permit(std::move(permit)) {
_proxy->get_stats().reads++;
@@ -3428,7 +3428,7 @@ public:
///
/// Only filled after the request is finished, call only after
/// execute()'s future is ready.
std::vector<gms::inet_address> used_targets() const {
inet_address_vector_replica_set used_targets() const {
return _used_targets;
}
@@ -3716,7 +3716,7 @@ public:
class never_speculating_read_executor : public abstract_read_executor {
public:
never_speculating_read_executor(schema_ptr s, lw_shared_ptr<column_family> cf, shared_ptr<storage_proxy> proxy, lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, std::vector<gms::inet_address> targets, tracing::trace_state_ptr trace_state,
never_speculating_read_executor(schema_ptr s, lw_shared_ptr<column_family> cf, shared_ptr<storage_proxy> proxy, lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state,
service_permit permit) :
abstract_read_executor(std::move(s), std::move(cf), std::move(proxy), std::move(cmd), std::move(pr), cl, 0, std::move(targets), std::move(trace_state), std::move(permit)) {
_block_for = _targets.size();
@@ -3810,7 +3810,7 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s
db::consistency_level cl,
db::read_repair_decision repair_decision,
tracing::trace_state_ptr trace_state,
const std::vector<gms::inet_address>& preferred_endpoints,
const inet_address_vector_replica_set& preferred_endpoints,
bool& is_read_non_local,
service_permit permit) {
const dht::token& token = pr.start()->value().token();
@@ -3818,7 +3818,7 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s
speculative_retry::type retry_type = schema->speculative_retry().get_type();
gms::inet_address extra_replica;
std::vector<gms::inet_address> all_replicas = get_live_sorted_endpoints(ks, token);
inet_address_vector_replica_set all_replicas = get_live_sorted_endpoints(ks, token);
// Check for a non-local read before heat-weighted load balancing
// reordering of endpoints happens. The local endpoint, if
// present, is always first in the list, as get_live_sorted_endpoints()
@@ -3826,7 +3826,7 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s
is_read_non_local |= !all_replicas.empty() && all_replicas.front() != utils::fb_utilities::get_broadcast_address();
auto cf = _db.local().find_column_family(schema).shared_from_this();
std::vector<gms::inet_address> target_replicas = db::filter_for_query(cl, ks, all_replicas, preferred_endpoints, repair_decision,
inet_address_vector_replica_set target_replicas = db::filter_for_query(cl, ks, all_replicas, preferred_endpoints, repair_decision,
retry_type == speculative_retry::type::NONE ? nullptr : &extra_replica,
_db.local().get_config().cache_hit_rate_read_balancing() ? &*cf : nullptr);
@@ -3956,7 +3956,7 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd,
auto token_range = dht::token_range::make_singular(pr.start()->value().token());
auto it = query_options.preferred_replicas.find(token_range);
const auto replicas = it == query_options.preferred_replicas.end()
? std::vector<gms::inet_address>{} : replica_ids_to_endpoints(*tmptr, it->second);
? inet_address_vector_replica_set{} : replica_ids_to_endpoints(*tmptr, it->second);
auto read_executor = get_read_executor(cmd, schema, std::move(pr), cl, repair_decision,
query_options.trace_state, replicas, is_read_non_local,
@@ -4032,7 +4032,7 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
const auto preferred_replicas_for_range = [this, &preferred_replicas, tmptr] (const dht::partition_range& r) {
auto it = preferred_replicas.find(r.transform(std::mem_fn(&dht::ring_position::token)));
return it == preferred_replicas.end() ? std::vector<gms::inet_address>{} : replica_ids_to_endpoints(*tmptr, it->second);
return it == preferred_replicas.end() ? inet_address_vector_replica_set{} : replica_ids_to_endpoints(*tmptr, it->second);
};
const auto to_token_range = [] (const dht::partition_range& r) { return r.transform(std::mem_fn(&dht::ring_position::token)); };
@@ -4047,9 +4047,9 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
while (i != ranges.end()) {
dht::partition_range& range = *i;
std::vector<gms::inet_address> live_endpoints = get_live_sorted_endpoints(ks, end_token(range));
std::vector<gms::inet_address> merged_preferred_replicas = preferred_replicas_for_range(*i);
std::vector<gms::inet_address> filtered_endpoints = filter_for_query(cl, ks, live_endpoints, merged_preferred_replicas, pcf);
inet_address_vector_replica_set live_endpoints = get_live_sorted_endpoints(ks, end_token(range));
inet_address_vector_replica_set merged_preferred_replicas = preferred_replicas_for_range(*i);
inet_address_vector_replica_set filtered_endpoints = filter_for_query(cl, ks, live_endpoints, merged_preferred_replicas, pcf);
std::vector<dht::token_range> merged_ranges{to_token_range(range)};
++i;
@@ -4060,8 +4060,8 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
{
const auto current_range_preferred_replicas = preferred_replicas_for_range(*i);
dht::partition_range& next_range = *i;
std::vector<gms::inet_address> next_endpoints = get_live_sorted_endpoints(ks, end_token(next_range));
std::vector<gms::inet_address> next_filtered_endpoints = filter_for_query(cl, ks, next_endpoints, current_range_preferred_replicas, pcf);
inet_address_vector_replica_set next_endpoints = get_live_sorted_endpoints(ks, end_token(next_range));
inet_address_vector_replica_set next_filtered_endpoints = filter_for_query(cl, ks, next_endpoints, current_range_preferred_replicas, pcf);
// Origin has this to say here:
// * If the current range right is the min token, we should stop merging because CFS.getRangeSlice
@@ -4074,22 +4074,22 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
break;
}
std::vector<gms::inet_address> merged = intersection(live_endpoints, next_endpoints);
std::vector<gms::inet_address> current_merged_preferred_replicas = intersection(merged_preferred_replicas, current_range_preferred_replicas);
inet_address_vector_replica_set merged = intersection(live_endpoints, next_endpoints);
inet_address_vector_replica_set current_merged_preferred_replicas = intersection(merged_preferred_replicas, current_range_preferred_replicas);
// Check if there are enough endpoints for the merge to be possible.
if (!is_sufficient_live_nodes(cl, ks, merged)) {
break;
}
std::vector<gms::inet_address> filtered_merged = filter_for_query(cl, ks, merged, current_merged_preferred_replicas, pcf);
inet_address_vector_replica_set filtered_merged = filter_for_query(cl, ks, merged, current_merged_preferred_replicas, pcf);
// Estimate whether merging will be a win or not
if (!locator::i_endpoint_snitch::get_local_snitch_ptr()->is_worth_merging_for_range_query(filtered_merged, filtered_endpoints, next_filtered_endpoints)) {
break;
} else if (pcf) {
// check that merged set hit rate is not too low
auto find_min = [pcf] (const std::vector<gms::inet_address>& range) {
auto find_min = [pcf] (const inet_address_vector_replica_set& range) {
struct {
column_family* cf = nullptr;
float operator()(const gms::inet_address& ep) const {
@@ -4587,15 +4587,15 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
co_return condition_met;
}
std::vector<gms::inet_address> storage_proxy::get_live_endpoints(keyspace& ks, const dht::token& token) const {
inet_address_vector_replica_set storage_proxy::get_live_endpoints(keyspace& ks, const dht::token& token) const {
auto& rs = ks.get_replication_strategy();
std::vector<gms::inet_address> eps = rs.get_natural_endpoints_without_node_being_replaced(token);
inet_address_vector_replica_set eps = rs.get_natural_endpoints_without_node_being_replaced(token);
auto itend = boost::range::remove_if(eps, std::not1(std::bind1st(std::mem_fn(&gms::gossiper::is_alive), &gms::get_local_gossiper())));
eps.erase(itend, eps.end());
return eps;
}
void storage_proxy::sort_endpoints_by_proximity(std::vector<gms::inet_address>& eps) {
void storage_proxy::sort_endpoints_by_proximity(inet_address_vector_replica_set& eps) {
locator::i_endpoint_snitch::get_local_snitch_ptr()->sort_by_proximity(utils::fb_utilities::get_broadcast_address(), eps);
// FIXME: before dynamic snitch is implemented, put local address (if present) at the beginning
auto it = boost::range::find(eps, utils::fb_utilities::get_broadcast_address());
@@ -4604,14 +4604,14 @@ void storage_proxy::sort_endpoints_by_proximity(std::vector<gms::inet_address>&
}
}
std::vector<gms::inet_address> storage_proxy::get_live_sorted_endpoints(keyspace& ks, const dht::token& token) const {
inet_address_vector_replica_set storage_proxy::get_live_sorted_endpoints(keyspace& ks, const dht::token& token) const {
auto eps = get_live_endpoints(ks, token);
sort_endpoints_by_proximity(eps);
return eps;
}
std::vector<gms::inet_address> storage_proxy::intersection(const std::vector<gms::inet_address>& l1, const std::vector<gms::inet_address>& l2) {
std::vector<gms::inet_address> inter;
inet_address_vector_replica_set storage_proxy::intersection(const inet_address_vector_replica_set& l1, const inet_address_vector_replica_set& l2) {
inet_address_vector_replica_set inter;
inter.reserve(l1.size());
std::remove_copy_if(l1.begin(), l1.end(), std::back_inserter(inter), [&l2] (const gms::inet_address& a) {
return std::find(l2.begin(), l2.end(), a) == l2.end();

View File

@@ -45,6 +45,7 @@
#include "query-request.hh"
#include "query-result.hh"
#include "query-result-set.hh"
#include "inet_address_vectors.hh"
#include <seastar/core/distributed.hh>
#include <seastar/core/execution_stage.hh>
#include <seastar/core/scheduling_specific.hh>
@@ -242,7 +243,7 @@ public:
// Holds a list of endpoints participating in CAS request, for a given
// consistency level, token, and state of joining/leaving nodes.
struct paxos_participants {
std::vector<gms::inet_address> endpoints;
inet_address_vector_replica_set endpoints;
// How many participants are required for a quorum (i.e. is it SERIAL or LOCAL_SERIAL).
size_t required_participants;
bool has_dead_endpoints;
@@ -337,7 +338,7 @@ private:
std::unique_ptr<mutation_holder> mh, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state,
service_permit permit);
response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m, std::unordered_set<gms::inet_address> targets,
const std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address>, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit);
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit);
response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const std::unordered_map<gms::inet_address, std::optional<mutation>>&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
@@ -354,9 +355,9 @@ private:
bool cannot_hint(const Range& targets, db::write_type type) const;
bool hints_enabled(db::write_type type) const noexcept;
db::hints::manager& hints_manager_for(db::write_type type);
std::vector<gms::inet_address> get_live_endpoints(keyspace& ks, const dht::token& token) const;
static void sort_endpoints_by_proximity(std::vector<gms::inet_address>& eps);
std::vector<gms::inet_address> get_live_sorted_endpoints(keyspace& ks, const dht::token& token) const;
inet_address_vector_replica_set get_live_endpoints(keyspace& ks, const dht::token& token) const;
static void sort_endpoints_by_proximity(inet_address_vector_replica_set& eps);
inet_address_vector_replica_set get_live_sorted_endpoints(keyspace& ks, const dht::token& token) const;
db::read_repair_decision new_read_repair_decision(const schema& s);
::shared_ptr<abstract_read_executor> get_read_executor(lw_shared_ptr<query::read_command> cmd,
schema_ptr schema,
@@ -364,7 +365,7 @@ private:
db::consistency_level cl,
db::read_repair_decision repair_decision,
tracing::trace_state_ptr trace_state,
const std::vector<gms::inet_address>& preferred_endpoints,
const inet_address_vector_replica_set& preferred_endpoints,
bool& is_bounced_read,
service_permit permit);
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> query_result_local(schema_ptr, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr,
@@ -379,7 +380,7 @@ private:
dht::partition_range_vector partition_ranges,
db::consistency_level cl,
coordinator_query_options optional_params);
static std::vector<gms::inet_address> intersection(const std::vector<gms::inet_address>& l1, const std::vector<gms::inet_address>& l2);
static inet_address_vector_replica_set intersection(const inet_address_vector_replica_set& l1, const inet_address_vector_replica_set& l2);
future<query_partition_key_range_concurrent_result> query_partition_key_range_concurrent(clock_type::time_point timeout,
std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results,
lw_shared_ptr<query::read_command> cmd,
@@ -433,7 +434,7 @@ private:
future<> send_to_endpoint(
std::unique_ptr<mutation_holder> m,
gms::inet_address target,
std::vector<gms::inet_address> pending_endpoints,
inet_address_vector_topology_change pending_endpoints,
db::write_type type,
tracing::trace_state_ptr tr_state,
write_stats& stats,
@@ -561,9 +562,9 @@ public:
// Inspired by Cassandra's StorageProxy.sendToHintedEndpoints but without
// hinted handoff support, and just one target. See also
// send_to_live_endpoints() - another take on the same original function.
future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, std::vector<gms::inet_address> pending_endpoints, db::write_type type,
future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type,
tracing::trace_state_ptr tr_state, write_stats& stats, allow_hints allow_hints = allow_hints::yes);
future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, std::vector<gms::inet_address> pending_endpoints, db::write_type type,
future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, inet_address_vector_topology_change pending_endpoints, db::write_type type,
tracing::trace_state_ptr tr_state, allow_hints allow_hints = allow_hints::yes);
// Send a mutation to a specific remote target as a hint.
@@ -692,7 +693,7 @@ private:
// Is either set explicitly or derived from the consistency level set in keyspace options.
db::consistency_level _cl_for_learn;
// Live endpoints, as per get_paxos_participants()
std::vector<gms::inet_address> _live_endpoints;
inet_address_vector_replica_set _live_endpoints;
// How many endpoints need to respond favourably for the protocol to progress to the next step.
size_t _required_participants;
// A deadline when the entire CAS operation timeout expires, derived from write_request_timeout_in_ms

View File

@@ -773,12 +773,12 @@ storage_service::get_rpc_address(const inet_address& endpoint) const {
return boost::lexical_cast<std::string>(endpoint);
}
std::unordered_map<dht::token_range, std::vector<inet_address>>
std::unordered_map<dht::token_range, inet_address_vector_replica_set>
storage_service::get_range_to_address_map(const sstring& keyspace) const {
return get_range_to_address_map(keyspace, get_token_metadata().sorted_tokens());
}
std::unordered_map<dht::token_range, std::vector<inet_address>>
std::unordered_map<dht::token_range, inet_address_vector_replica_set>
storage_service::get_range_to_address_map_in_local_dc(
const sstring& keyspace) const {
std::function<bool(const inet_address&)> filter = [this](const inet_address& address) {
@@ -786,7 +786,7 @@ storage_service::get_range_to_address_map_in_local_dc(
};
auto orig_map = get_range_to_address_map(keyspace, get_tokens_in_local_dc());
std::unordered_map<dht::token_range, std::vector<inet_address>> filtered_map;
std::unordered_map<dht::token_range, inet_address_vector_replica_set> filtered_map;
for (auto entry : orig_map) {
auto& addresses = filtered_map[entry.first];
addresses.reserve(entry.second.size());
@@ -815,7 +815,7 @@ storage_service::is_local_dc(const inet_address& targetHost) const {
return remote_dc == local_dc;
}
std::unordered_map<dht::token_range, std::vector<inet_address>>
std::unordered_map<dht::token_range, inet_address_vector_replica_set>
storage_service::get_range_to_address_map(const sstring& keyspace,
const std::vector<token>& sorted_tokens) const {
sstring ks = keyspace;
@@ -2752,7 +2752,7 @@ std::unordered_multimap<dht::token_range, inet_address> storage_service::get_cha
slogger.debug("Node {} ranges [{}]", endpoint, ranges);
std::unordered_map<dht::token_range, std::vector<inet_address>> current_replica_endpoints;
std::unordered_map<dht::token_range, inet_address_vector_replica_set> current_replica_endpoints;
// Find (for each range) all nodes that store replicas for these ranges as well
auto tmptr = get_token_metadata_ptr();
@@ -2791,7 +2791,7 @@ std::unordered_multimap<dht::token_range, inet_address> storage_service::get_cha
auto rg = current_replica_endpoints.equal_range(r);
for (auto it = rg.first; it != rg.second; it++) {
const dht::token_range& range_ = it->first;
std::vector<inet_address>& current_eps = it->second;
inet_address_vector_replica_set& current_eps = it->second;
slogger.debug("range={}, current_replica_endpoints={}, new_replica_endpoints={}", range_, current_eps, new_replica_endpoints);
for (auto ep : it->second) {
auto beg = new_replica_endpoints.begin();
@@ -3149,7 +3149,7 @@ future<> storage_service::load_and_stream(sstring ks_name, sstring cf_name,
ops_uuid, nr_sst_current, nr_sst_current + sst_processed.size(), nr_sst_total, sst_names);
auto start_time = std::chrono::steady_clock::now();
std::vector<gms::inet_address> current_targets;
inet_address_vector_replica_set current_targets;
std::unordered_map<gms::inet_address, send_meta_data> metas;
size_t num_partitions_processed = 0;
size_t num_bytes_read = 0;
@@ -3315,19 +3315,19 @@ storage_service::get_new_source_ranges(const sstring& keyspace_name, const dht::
auto my_address = get_broadcast_address();
auto& ks = _db.local().find_keyspace(keyspace_name);
auto& strat = ks.get_replication_strategy();
std::unordered_map<dht::token_range, std::vector<inet_address>> range_addresses = strat.get_range_addresses(tm, utils::can_yield::yes);
std::unordered_map<dht::token_range, inet_address_vector_replica_set> range_addresses = strat.get_range_addresses(tm, utils::can_yield::yes);
std::unordered_multimap<inet_address, dht::token_range> source_ranges;
// find alive sources for our new ranges
for (auto r : ranges) {
std::vector<inet_address> possible_nodes;
inet_address_vector_replica_set possible_nodes;
auto it = range_addresses.find(r);
if (it != range_addresses.end()) {
possible_nodes = it->second;
}
auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr();
std::vector<inet_address> sources = snitch->get_sorted_list_by_proximity(my_address, possible_nodes);
inet_address_vector_replica_set sources = snitch->get_sorted_list_by_proximity(my_address, possible_nodes);
if (std::find(sources.begin(), sources.end(), my_address) != sources.end()) {
auto err = format("get_new_source_ranges: sources={}, my_address={}", sources, my_address);
@@ -3357,7 +3357,7 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_
std::vector<token_range_endpoints> ranges;
//Token.TokenFactory tf = getPartitioner().getTokenFactory();
std::unordered_map<dht::token_range, std::vector<inet_address>> range_to_address_map =
std::unordered_map<dht::token_range, inet_address_vector_replica_set> range_to_address_map =
include_only_local_dc
? get_range_to_address_map_in_local_dc(keyspace)
: get_range_to_address_map(keyspace);
@@ -3401,11 +3401,11 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_
return ranges;
}
std::unordered_map<dht::token_range, std::vector<inet_address>>
std::unordered_map<dht::token_range, inet_address_vector_replica_set>
storage_service::construct_range_to_endpoint_map(
const sstring& keyspace,
const dht::token_range_vector& ranges) const {
std::unordered_map<dht::token_range, std::vector<inet_address>> res;
std::unordered_map<dht::token_range, inet_address_vector_replica_set> res;
for (auto r : ranges) {
res[r] = _db.local().find_keyspace(keyspace).get_replication_strategy().get_natural_endpoints(
r.end() ? r.end()->value() : dht::maximum_token());
@@ -3670,7 +3670,7 @@ storage_service::get_all_ranges(const std::vector<token>& sorted_tokens) const {
return ranges;
}
std::vector<gms::inet_address>
inet_address_vector_replica_set
storage_service::get_natural_endpoints(const sstring& keyspace,
const sstring& cf, const sstring& key) const {
auto schema = _db.local().find_schema(keyspace, cf);
@@ -3679,7 +3679,7 @@ storage_service::get_natural_endpoints(const sstring& keyspace,
return get_natural_endpoints(keyspace, token);
}
std::vector<gms::inet_address>
inet_address_vector_replica_set
storage_service::get_natural_endpoints(const sstring& keyspace, const token& pos) const {
return _db.local().find_keyspace(keyspace).get_replication_strategy().get_natural_endpoints(pos);
}

View File

@@ -43,6 +43,7 @@
#include "service/endpoint_lifecycle_subscriber.hh"
#include "locator/token_metadata.hh"
#include "gms/gossiper.hh"
#include "inet_address_vectors.hh"
#include <seastar/core/distributed.hh>
#include "dht/i_partitioner.hh"
#include "dht/token_range_endpoints.hh"
@@ -469,16 +470,16 @@ public:
*/
sstring get_rpc_address(const inet_address& endpoint) const;
std::unordered_map<dht::token_range, std::vector<inet_address>> get_range_to_address_map(const sstring& keyspace) const;
std::unordered_map<dht::token_range, inet_address_vector_replica_set> get_range_to_address_map(const sstring& keyspace) const;
std::unordered_map<dht::token_range, std::vector<inet_address>> get_range_to_address_map_in_local_dc(
std::unordered_map<dht::token_range, inet_address_vector_replica_set> get_range_to_address_map_in_local_dc(
const sstring& keyspace) const;
std::vector<token> get_tokens_in_local_dc() const;
bool is_local_dc(const inet_address& targetHost) const;
std::unordered_map<dht::token_range, std::vector<inet_address>> get_range_to_address_map(const sstring& keyspace,
std::unordered_map<dht::token_range, inet_address_vector_replica_set> get_range_to_address_map(const sstring& keyspace,
const std::vector<token>& sorted_tokens) const;
/**
@@ -511,7 +512,7 @@ public:
* @param ranges
* @return mapping of ranges to the replicas responsible for them.
*/
std::unordered_map<dht::token_range, std::vector<inet_address>> construct_range_to_endpoint_map(
std::unordered_map<dht::token_range, inet_address_vector_replica_set> construct_range_to_endpoint_map(
const sstring& keyspace,
const dht::token_range_vector& ranges) const;
public:
@@ -728,7 +729,7 @@ public:
* @param key key for which we need to find the endpoint
* @return the endpoint responsible for this key
*/
std::vector<gms::inet_address> get_natural_endpoints(const sstring& keyspace,
inet_address_vector_replica_set get_natural_endpoints(const sstring& keyspace,
const sstring& cf, const sstring& key) const;
/**
@@ -739,7 +740,7 @@ public:
* @param pos position for which we need to find the endpoint
* @return the endpoint responsible for this token
*/
std::vector<gms::inet_address> get_natural_endpoints(const sstring& keyspace, const token& pos) const;
inet_address_vector_replica_set get_natural_endpoints(const sstring& keyspace, const token& pos) const;
/**
* @return Vector of Token ranges (_not_ keys!) together with estimated key count,

View File

@@ -48,7 +48,7 @@ struct ring_point {
inet_address host;
};
void print_natural_endpoints(double point, const std::vector<inet_address> v) {
void print_natural_endpoints(double point, const inet_address_vector_replica_set v) {
testlog.debug("Natural endpoints for a token {}:", point);
std::string str;
std::ostringstream strm(str);
@@ -105,7 +105,7 @@ void strategy_sanity_check(
void endpoints_check(
abstract_replication_strategy* ars_ptr,
std::vector<inet_address>& endpoints) {
inet_address_vector_replica_set& endpoints) {
// Check the total RF
BOOST_CHECK(endpoints.size() == ars_ptr->get_replication_factor());

View File

@@ -468,4 +468,19 @@ public:
}
};
// Print a small_vector as "{e1, e2, ...}", matching the formatting used for
// other container types in logging/debug output.
template <typename T, size_t N>
std::ostream& operator<<(std::ostream& os, const utils::small_vector<T, N>& v) {
    os << "{";
    // Separator starts empty and becomes ", " after the first element,
    // which avoids tracking a boolean "first" flag.
    const char* sep = "";
    for (const auto& elem : v) {
        os << sep << elem;
        sep = ", ";
    }
    os << "}";
    return os;
}
}