diff --git a/db/batchlog_manager.cc b/db/batchlog_manager.cc index d6ac33ccc4..85debb3543 100644 --- a/db/batchlog_manager.cc +++ b/db/batchlog_manager.cc @@ -341,10 +341,10 @@ future<> db::batchlog_manager::replay_all_failed_batches() { }); } -std::unordered_set db::batchlog_manager::endpoint_filter(const sstring& local_rack, const std::unordered_map>& endpoints) { +inet_address_vector_replica_set db::batchlog_manager::endpoint_filter(const sstring& local_rack, const std::unordered_map>& endpoints) { // special case for single-node data centers if (endpoints.size() == 1 && endpoints.begin()->second.size() == 1) { - return endpoints.begin()->second; + return boost::copy_range(endpoints.begin()->second); } // strip out dead endpoints and localhost @@ -364,7 +364,7 @@ std::unordered_set db::batchlog_manager::endpoint_filter(cons } } - typedef std::unordered_set return_type; + typedef inet_address_vector_replica_set return_type; if (validated.size() <= 2) { return boost::copy_range(validated | boost::adaptors::map_values); @@ -395,13 +395,13 @@ std::unordered_set db::batchlog_manager::endpoint_filter(cons racks.resize(2); } - std::unordered_set result; + inet_address_vector_replica_set result; // grab a random member of up to two racks for (auto& rack : racks) { auto cpy = boost::copy_range>(validated.equal_range(rack) | boost::adaptors::map_values); std::uniform_int_distribution rdist(0, cpy.size() - 1); - result.emplace(cpy[rdist(_e1)]); + result.emplace_back(cpy[rdist(_e1)]); } return result; diff --git a/db/batchlog_manager.hh b/db/batchlog_manager.hh index cc28853806..fcd0d01b8b 100644 --- a/db/batchlog_manager.hh +++ b/db/batchlog_manager.hh @@ -49,6 +49,7 @@ #include #include "gms/inet_address.hh" +#include "inet_address_vectors.hh" #include "db_clock.hh" #include "mutation.hh" #include "utils/UUID.hh" @@ -117,7 +118,7 @@ public: mutation get_batch_log_mutation_for(const std::vector&, const utils::UUID&, int32_t, db_clock::time_point); db_clock::duration get_batch_log_timeout() const; - std::unordered_set endpoint_filter(const sstring&, const std::unordered_map>&); + inet_address_vector_replica_set endpoint_filter(const sstring&, const std::unordered_map>&); }; extern distributed _the_batchlog_manager; diff --git a/db/consistency_level.cc b/db/consistency_level.cc index a8528f1b7a..b6dddac802 100644 --- a/db/consistency_level.cc +++ b/db/consistency_level.cc @@ -268,7 +268,7 @@ void assure_sufficient_live_nodes( } template void assure_sufficient_live_nodes(consistency_level, keyspace&, const inet_address_vector_replica_set&, const std::array&); -template void assure_sufficient_live_nodes(db::consistency_level, keyspace&, const std::unordered_set&, const utils::small_vector&); +template void assure_sufficient_live_nodes(db::consistency_level, keyspace&, const inet_address_vector_replica_set&, const utils::small_vector&); inet_address_vector_replica_set filter_for_query(consistency_level cl, diff --git a/db/consistency_level.hh b/db/consistency_level.hh index 009c2a5e37..7737378929 100644 --- a/db/consistency_level.hh +++ b/db/consistency_level.hh @@ -109,6 +109,6 @@ void assure_sufficient_live_nodes( const PendingRange& pending_endpoints = std::array()); extern template void assure_sufficient_live_nodes(consistency_level, keyspace&, const inet_address_vector_replica_set&, const std::array&); -extern template void assure_sufficient_live_nodes(db::consistency_level, keyspace&, const std::unordered_set&, const utils::small_vector&); +extern template void assure_sufficient_live_nodes(db::consistency_level, keyspace&, const inet_address_vector_replica_set&, const utils::small_vector&); } diff --git a/service/paxos/prepare_summary.cc b/service/paxos/prepare_summary.cc index 18c469e21e..5ab8303e2f 100644 --- a/service/paxos/prepare_summary.cc +++ b/service/paxos/prepare_summary.cc @@ -59,9 +59,9 @@ void prepare_summary::update_most_recent_promised_ballot(utils::UUID ballot) { } } -std::unordered_set +inet_address_vector_replica_set prepare_summary::replicas_missing_most_recent_commit(schema_ptr s, std::chrono::seconds now_in_sec) const { - std::unordered_set replicas; + inet_address_vector_replica_set replicas; // In general, we need every replica that has answered to the prepare (a quorum) to agree on the MRC (see // comment in storage_proxy::begin_and_repair_paxos(), but basically we need to make sure at least a quorum of nodes // have learned a commit before committing a new one, otherwise that previous commit is not guaranteed to have reached a @@ -79,7 +79,7 @@ prepare_summary::replicas_missing_most_recent_commit(schema_ptr s, std::chrono:: for (const auto& it: committed_ballots_by_replica) { if (it.second != most_recent_commit->ballot) { - replicas.insert(it.first); + replicas.push_back(it.first); } } return replicas; diff --git a/service/paxos/prepare_summary.hh b/service/paxos/prepare_summary.hh index cc35e2f024..ace06fad49 100644 --- a/service/paxos/prepare_summary.hh +++ b/service/paxos/prepare_summary.hh @@ -43,6 +43,7 @@ #pragma once #include "utils/UUID_gen.hh" #include "service/paxos/proposal.hh" +#include "inet_address_vectors.hh" namespace service { @@ -83,7 +84,7 @@ public: public: prepare_summary(size_t node_count); - std::unordered_set replicas_missing_most_recent_commit(schema_ptr s, std::chrono::seconds now_in_sec) const; + inet_address_vector_replica_set replicas_missing_most_recent_commit(schema_ptr s, std::chrono::seconds now_in_sec) const; void update_most_recent_promised_ballot(utils::UUID ballot); }; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index bdf4ea5eed..555cb66bee 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -372,7 +372,7 @@ protected: size_t _total_block_for = 0; db::write_type _type; std::unique_ptr _mutation_holder; - std::unordered_set _targets; // who we sent this mutation to + inet_address_vector_replica_set _targets; // who we sent this mutation to // added dead_endpoints as a memeber here as well. This to be able to carry the info across // calls in helper methods in a convinient way. Since we hope this will be empty most of the time // it should not be a huge burden. (flw) @@ -400,7 +400,7 @@ protected: public: abstract_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, - std::unique_ptr mh, std::unordered_set targets, tracing::trace_state_ptr trace_state, + std::unique_ptr mh, inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, storage_proxy::write_stats& stats, service_permit permit, size_t pending_endpoints = 0, inet_address_vector_topology_change dead_endpoints = {}) : _id(p->get_next_response_id()), _proxy(std::move(p)), _trace_state(trace_state), _cl(cl), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)), _dead_endpoints(std::move(dead_endpoints)), _stats(stats), _expire_timer([this] { timeout_cb(); }), _permit(std::move(permit)) { @@ -481,10 +481,12 @@ public: } // return true on last ack bool response(gms::inet_address from) { - auto it = _targets.find(from); + auto it = boost::find(_targets, from); if (it != _targets.end()) { signal(from); - _targets.erase(it); + using std::swap; + swap(*it, _targets.back()); + _targets.pop_back(); } else { slogger.warn("Receive outdated write ack from {}", from); } @@ -493,7 +495,7 @@ public: // return true if handler is no longer needed because // CL cannot be reached bool failure_response(gms::inet_address from, size_t count, error err) { - if (!_targets.contains(from)) { + if (boost::find(_targets, from) == _targets.end()) { // There is a little change we can get outdated reply // if the coordinator was restarted after sending a request and // getting reply back. The chance is low though since initial @@ -592,7 +594,7 @@ public: future<> wait() { return _ready.get_future(); } - const std::unordered_set& get_targets() const { + const inet_address_vector_replica_set& get_targets() const { return _targets; } const inet_address_vector_topology_change& get_dead_endpoints() const { @@ -637,7 +639,7 @@ class datacenter_write_response_handler : public abstract_write_response_handler public: datacenter_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, - std::unique_ptr mh, std::unordered_set targets, + std::unique_ptr mh, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) : abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), @@ -652,7 +654,7 @@ class write_response_handler : public abstract_write_response_handler { } public: write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, - std::unique_ptr mh, std::unordered_set targets, + std::unique_ptr mh, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) : abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), @@ -664,7 +666,7 @@ public: class view_update_write_response_handler : public write_response_handler, public bi::list_base_hook> { public: view_update_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, - std::unique_ptr mh, std::unordered_set targets, + std::unique_ptr mh, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit): write_response_handler(p, ks, cl, db::write_type::VIEW, std::move(mh), @@ -744,7 +746,7 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha } public: datacenter_sync_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, - std::unique_ptr mh, std::unordered_set targets, const inet_address_vector_topology_change& pending_endpoints, + std::unique_ptr mh, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) : abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), targets, std::move(tr_state), stats, std::move(permit), 0, dead_endpoints) { auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); @@ -884,11 +886,11 @@ paxos_response_handler::begin_and_repair_paxos(client_state& cs, unsigned& conte // those nodes, and retry. auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(ballot); - std::unordered_set missing_mrc = summary.replicas_missing_most_recent_commit(_schema, now_in_sec); + inet_address_vector_replica_set missing_mrc = summary.replicas_missing_most_recent_commit(_schema, now_in_sec); if (missing_mrc.size() > 0) { paxos::paxos_state::logger.debug("CAS[{}] Repairing replicas that missed the most recent commit", _id); tracing::trace(tr_state, "Repairing replicas that missed the most recent commit"); - std::array, schema_ptr, dht::token, std::unordered_set>, 1> + std::array, schema_ptr, dht::token, inet_address_vector_replica_set>, 1> m{std::make_tuple(make_lw_shared(std::move(*summary.most_recent_commit)), _schema, _key.token(), std::move(missing_mrc))}; // create_write_response_handler is overloaded for paxos::proposal and will // create cas_mutation holder, which consequently will ensure paxos::learn is @@ -1422,7 +1424,7 @@ future<> storage_proxy::response_wait(storage_proxy::response_id_type id, clock_ } storage_proxy::response_id_type storage_proxy::create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr m, - std::unordered_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, + inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) { shared_ptr h; @@ -1953,11 +1955,11 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok } // filter live endpoints from dead ones - std::unordered_set live_endpoints; + inet_address_vector_replica_set live_endpoints; inet_address_vector_topology_change dead_endpoints; live_endpoints.reserve(all.size()); dead_endpoints.reserve(all.size()); - std::partition_copy(all.begin(), all.end(), std::inserter(live_endpoints, live_endpoints.begin()), + std::partition_copy(all.begin(), all.end(), std::back_inserter(live_endpoints), std::back_inserter(dead_endpoints), std::bind1st(std::mem_fn(&gms::gossiper::is_alive), &gms::get_local_gossiper())); slogger.trace("creating write handler with live: {} dead: {}", live_endpoints, dead_endpoints); @@ -1990,7 +1992,8 @@ storage_proxy::create_write_response_handler(const hint_wrapper& h, db::consiste storage_proxy::response_id_type storage_proxy::create_write_response_handler(const std::unordered_map>& m, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit) { - std::unordered_set endpoints(m.size()); + inet_address_vector_replica_set endpoints; + endpoints.reserve(m.size()); boost::copy(m | boost::adaptors::map_keys, std::inserter(endpoints, endpoints.begin())); auto mh = std::make_unique(m); @@ -2013,7 +2016,7 @@ storage_proxy::create_write_response_handler(const std::tuple, schema_ptr, dht::token, std::unordered_set>& meta, +storage_proxy::create_write_response_handler(const std::tuple, schema_ptr, dht::token, inet_address_vector_replica_set>& meta, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit) { auto& [commit, s, token, endpoints] = meta; @@ -2404,7 +2407,7 @@ storage_proxy::mutate_atomically(std::vector mutations, db::consistenc service_permit _permit; const utils::UUID _batch_uuid; - const std::unordered_set _batchlog_endpoints; + const inet_address_vector_replica_set _batchlog_endpoints; public: context(storage_proxy & p, std::vector&& mutations, lw_shared_ptr&& cdc_tracker, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit) @@ -2419,7 +2422,7 @@ storage_proxy::mutate_atomically(std::vector mutations, db::consistenc , _permit(std::move(permit)) , _batch_uuid(utils::UUID_gen::get_time_UUID()) , _batchlog_endpoints( - [this]() -> std::unordered_set { + [this]() -> inet_address_vector_replica_set { auto local_addr = utils::fb_utilities::get_broadcast_address(); auto& topology = _tmptr->get_topology(); auto& local_endpoints = topology.get_datacenter_racks().at(get_local_dc()); @@ -2532,7 +2535,7 @@ future<> storage_proxy::send_to_endpoint( std::unique_ptr& m, db::consistency_level cl, db::write_type type, service_permit permit) mutable { - std::unordered_set targets; + inet_address_vector_replica_set targets; targets.reserve(pending_endpoints.size() + 1); inet_address_vector_topology_change dead_endpoints; boost::algorithm::partition_copy( @@ -5441,7 +5444,8 @@ void storage_proxy::retire_view_response_handlers(noncopyable_function mh, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit); - response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr m, std::unordered_set targets, + response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr m, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit); response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit); response_id_type create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit); response_id_type create_write_response_handler(const std::unordered_map>&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit); response_id_type create_write_response_handler(const std::tuple, schema_ptr, shared_ptr, dht::token>& proposal, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit); - response_id_type create_write_response_handler(const std::tuple, schema_ptr, dht::token, std::unordered_set>& meta, + response_id_type create_write_response_handler(const std::tuple, schema_ptr, dht::token, inet_address_vector_replica_set>& meta, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit); void register_cdc_operation_result_tracker(const storage_proxy::unique_response_handler_vector& ids, lw_shared_ptr tracker); void send_to_live_endpoints(response_id_type response_id, clock_type::time_point timeout); diff --git a/tracing/trace_state.cc b/tracing/trace_state.cc index 8f8bb7bd93..bbac968eba 100644 --- a/tracing/trace_state.cc +++ b/tracing/trace_state.cc @@ -55,7 +55,7 @@ namespace tracing { logging::logger trace_state_logger("trace_state"); struct trace_state::params_values { - std::optional> batchlog_endpoints; + std::optional batchlog_endpoints; std::optional user_timestamp; std::vector queries; std::optional cl; @@ -103,7 +103,7 @@ void trace_state::init_session_records( } -void trace_state::set_batchlog_endpoints(const std::unordered_set& val) { +void trace_state::set_batchlog_endpoints(const inet_address_vector_replica_set& val) { _params_ptr->batchlog_endpoints.emplace(val); } diff --git a/tracing/trace_state.hh b/tracing/trace_state.hh index 72e2f2b93b..ae37e80721 100644 --- a/tracing/trace_state.hh +++ b/tracing/trace_state.hh @@ -51,6 +51,7 @@ #include "db/consistency_level_type.hh" #include "types.hh" #include "timestamp.hh" +#include "inet_address_vectors.hh" namespace cql3{ class query_options; @@ -300,7 +301,7 @@ private: * * @param val the set of batchlog endpoints */ - void set_batchlog_endpoints(const std::unordered_set& val); + void set_batchlog_endpoints(const inet_address_vector_replica_set& val); /** * Stores a consistency level of a query being traced. @@ -490,7 +491,7 @@ private: friend void set_page_size(const trace_state_ptr& p, int32_t val); friend void set_request_size(const trace_state_ptr& p, size_t s) noexcept; friend void set_response_size(const trace_state_ptr& p, size_t s) noexcept; - friend void set_batchlog_endpoints(const trace_state_ptr& p, const std::unordered_set& val); + friend void set_batchlog_endpoints(const trace_state_ptr& p, const inet_address_vector_replica_set& val); friend void set_consistency_level(const trace_state_ptr& p, db::consistency_level val); friend void set_optional_serial_consistency_level(const trace_state_ptr& p, const std::optional&val); friend void add_query(const trace_state_ptr& p, sstring_view val); @@ -612,7 +613,7 @@ inline void set_response_size(const trace_state_ptr& p, size_t s) noexcept { } } -inline void set_batchlog_endpoints(const trace_state_ptr& p, const std::unordered_set& val) { +inline void set_batchlog_endpoints(const trace_state_ptr& p, const inet_address_vector_replica_set& val) { if (p) { p->set_batchlog_endpoints(val); }