From 4d70f3baeee6cee058b404bc6f0152b149614c59 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Sat, 12 Jun 2021 11:09:54 +0300 Subject: [PATCH] storage_proxy: change unordered_set to small_vector in write path The write paths in storage_proxy pass replica sets as std::unordered_set. This is a complex type, with N+1 allocations for N members, so we change it to a small_vector (via inet_address_vector_replica_set) which requires just one allocation, and even zero when up to three replicas are used. This change is more nuanced than the corresponding change to the read path abe3d7d7 ("Merge 'storage_proxy: use small_vector for vectors of inet_address' from Avi Kivity"), for two reasons: - there is a quadratic algorithm in abstract_write_response_handler::response(): it searches for a replica and erases it. Since this happens for every replica, it happens N^2/2 times. - replica sets for writes always include all datacenters, while reads usually involve just one datacenter. So, a write to a keyspace that has 5 datacenters will invoke 15*(15-1)/2 =105 compares. We could remove this by sending the index of the replica in the replica set to the replica and ask it to include the index in the response, but I think that this is unnecessary. Those 105 compares need to be only 105/15 = 7 times cheaper than the corresponding unordered_set operation, which they surely will. Handling a response after a cross-datacenter round trip surely involves L3 cache misses, and a small_vector reduces these to a minimum compared to an unordered_set with its bucket table, linked list walking and management, and table rehashing. Tests using perf_simple_query --write --smp 1 --operations-per-shard 1000000 --task-quota-ms show two allocations removed (as expected) and a nice reduction in instructions executed. 
before: median 204842.54 tps ( 54.2 allocs/op, 13.2 tasks/op, 49890 insns/op) after: median 206077.65 tps ( 52.2 allocs/op, 13.2 tasks/op, 49138 insns/op) Closes #8847 --- db/batchlog_manager.cc | 10 +++---- db/batchlog_manager.hh | 3 ++- db/consistency_level.cc | 2 +- db/consistency_level.hh | 2 +- service/paxos/prepare_summary.cc | 6 ++--- service/paxos/prepare_summary.hh | 3 ++- service/storage_proxy.cc | 46 +++++++++++++++++--------------- service/storage_proxy.hh | 4 +-- tracing/trace_state.cc | 4 +-- tracing/trace_state.hh | 7 ++--- 10 files changed, 47 insertions(+), 40 deletions(-) diff --git a/db/batchlog_manager.cc b/db/batchlog_manager.cc index d6ac33ccc4..85debb3543 100644 --- a/db/batchlog_manager.cc +++ b/db/batchlog_manager.cc @@ -341,10 +341,10 @@ future<> db::batchlog_manager::replay_all_failed_batches() { }); } -std::unordered_set db::batchlog_manager::endpoint_filter(const sstring& local_rack, const std::unordered_map>& endpoints) { +inet_address_vector_replica_set db::batchlog_manager::endpoint_filter(const sstring& local_rack, const std::unordered_map>& endpoints) { // special case for single-node data centers if (endpoints.size() == 1 && endpoints.begin()->second.size() == 1) { - return endpoints.begin()->second; + return boost::copy_range(endpoints.begin()->second); } // strip out dead endpoints and localhost @@ -364,7 +364,7 @@ std::unordered_set db::batchlog_manager::endpoint_filter(cons } } - typedef std::unordered_set return_type; + typedef inet_address_vector_replica_set return_type; if (validated.size() <= 2) { return boost::copy_range(validated | boost::adaptors::map_values); @@ -395,13 +395,13 @@ std::unordered_set db::batchlog_manager::endpoint_filter(cons racks.resize(2); } - std::unordered_set result; + inet_address_vector_replica_set result; // grab a random member of up to two racks for (auto& rack : racks) { auto cpy = boost::copy_range>(validated.equal_range(rack) | boost::adaptors::map_values); std::uniform_int_distribution 
rdist(0, cpy.size() - 1); - result.emplace(cpy[rdist(_e1)]); + result.emplace_back(cpy[rdist(_e1)]); } return result; diff --git a/db/batchlog_manager.hh b/db/batchlog_manager.hh index cc28853806..fcd0d01b8b 100644 --- a/db/batchlog_manager.hh +++ b/db/batchlog_manager.hh @@ -49,6 +49,7 @@ #include #include "gms/inet_address.hh" +#include "inet_address_vectors.hh" #include "db_clock.hh" #include "mutation.hh" #include "utils/UUID.hh" @@ -117,7 +118,7 @@ public: mutation get_batch_log_mutation_for(const std::vector&, const utils::UUID&, int32_t, db_clock::time_point); db_clock::duration get_batch_log_timeout() const; - std::unordered_set endpoint_filter(const sstring&, const std::unordered_map>&); + inet_address_vector_replica_set endpoint_filter(const sstring&, const std::unordered_map>&); }; extern distributed _the_batchlog_manager; diff --git a/db/consistency_level.cc b/db/consistency_level.cc index a8528f1b7a..b6dddac802 100644 --- a/db/consistency_level.cc +++ b/db/consistency_level.cc @@ -268,7 +268,7 @@ void assure_sufficient_live_nodes( } template void assure_sufficient_live_nodes(consistency_level, keyspace&, const inet_address_vector_replica_set&, const std::array&); -template void assure_sufficient_live_nodes(db::consistency_level, keyspace&, const std::unordered_set&, const utils::small_vector&); +template void assure_sufficient_live_nodes(db::consistency_level, keyspace&, const inet_address_vector_replica_set&, const utils::small_vector&); inet_address_vector_replica_set filter_for_query(consistency_level cl, diff --git a/db/consistency_level.hh b/db/consistency_level.hh index 009c2a5e37..7737378929 100644 --- a/db/consistency_level.hh +++ b/db/consistency_level.hh @@ -109,6 +109,6 @@ void assure_sufficient_live_nodes( const PendingRange& pending_endpoints = std::array()); extern template void assure_sufficient_live_nodes(consistency_level, keyspace&, const inet_address_vector_replica_set&, const std::array&); -extern template void 
assure_sufficient_live_nodes(db::consistency_level, keyspace&, const std::unordered_set&, const utils::small_vector&); +extern template void assure_sufficient_live_nodes(db::consistency_level, keyspace&, const inet_address_vector_replica_set&, const utils::small_vector&); } diff --git a/service/paxos/prepare_summary.cc b/service/paxos/prepare_summary.cc index 18c469e21e..5ab8303e2f 100644 --- a/service/paxos/prepare_summary.cc +++ b/service/paxos/prepare_summary.cc @@ -59,9 +59,9 @@ void prepare_summary::update_most_recent_promised_ballot(utils::UUID ballot) { } } -std::unordered_set +inet_address_vector_replica_set prepare_summary::replicas_missing_most_recent_commit(schema_ptr s, std::chrono::seconds now_in_sec) const { - std::unordered_set replicas; + inet_address_vector_replica_set replicas; // In general, we need every replica that has answered to the prepare (a quorum) to agree on the MRC (see // comment in storage_proxy::begin_and_repair_paxos(), but basically we need to make sure at least a quorum of nodes // have learned a commit before committing a new one, otherwise that previous commit is not guaranteed to have reached a @@ -79,7 +79,7 @@ prepare_summary::replicas_missing_most_recent_commit(schema_ptr s, std::chrono:: for (const auto& it: committed_ballots_by_replica) { if (it.second != most_recent_commit->ballot) { - replicas.insert(it.first); + replicas.push_back(it.first); } } return replicas; diff --git a/service/paxos/prepare_summary.hh b/service/paxos/prepare_summary.hh index cc35e2f024..ace06fad49 100644 --- a/service/paxos/prepare_summary.hh +++ b/service/paxos/prepare_summary.hh @@ -43,6 +43,7 @@ #pragma once #include "utils/UUID_gen.hh" #include "service/paxos/proposal.hh" +#include "inet_address_vectors.hh" namespace service { @@ -83,7 +84,7 @@ public: public: prepare_summary(size_t node_count); - std::unordered_set replicas_missing_most_recent_commit(schema_ptr s, std::chrono::seconds now_in_sec) const; + inet_address_vector_replica_set 
replicas_missing_most_recent_commit(schema_ptr s, std::chrono::seconds now_in_sec) const; void update_most_recent_promised_ballot(utils::UUID ballot); }; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index bdf4ea5eed..555cb66bee 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -372,7 +372,7 @@ protected: size_t _total_block_for = 0; db::write_type _type; std::unique_ptr _mutation_holder; - std::unordered_set _targets; // who we sent this mutation to + inet_address_vector_replica_set _targets; // who we sent this mutation to // added dead_endpoints as a memeber here as well. This to be able to carry the info across // calls in helper methods in a convinient way. Since we hope this will be empty most of the time // it should not be a huge burden. (flw) @@ -400,7 +400,7 @@ protected: public: abstract_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, - std::unique_ptr mh, std::unordered_set targets, tracing::trace_state_ptr trace_state, + std::unique_ptr mh, inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state, storage_proxy::write_stats& stats, service_permit permit, size_t pending_endpoints = 0, inet_address_vector_topology_change dead_endpoints = {}) : _id(p->get_next_response_id()), _proxy(std::move(p)), _trace_state(trace_state), _cl(cl), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)), _dead_endpoints(std::move(dead_endpoints)), _stats(stats), _expire_timer([this] { timeout_cb(); }), _permit(std::move(permit)) { @@ -481,10 +481,12 @@ public: } // return true on last ack bool response(gms::inet_address from) { - auto it = _targets.find(from); + auto it = boost::find(_targets, from); if (it != _targets.end()) { signal(from); - _targets.erase(it); + using std::swap; + swap(*it, _targets.back()); + _targets.pop_back(); } else { slogger.warn("Receive outdated write ack from {}", from); } @@ -493,7 +495,7 @@ public: // return 
true if handler is no longer needed because // CL cannot be reached bool failure_response(gms::inet_address from, size_t count, error err) { - if (!_targets.contains(from)) { + if (boost::find(_targets, from) == _targets.end()) { // There is a little change we can get outdated reply // if the coordinator was restarted after sending a request and // getting reply back. The chance is low though since initial @@ -592,7 +594,7 @@ public: future<> wait() { return _ready.get_future(); } - const std::unordered_set& get_targets() const { + const inet_address_vector_replica_set& get_targets() const { return _targets; } const inet_address_vector_topology_change& get_dead_endpoints() const { @@ -637,7 +639,7 @@ class datacenter_write_response_handler : public abstract_write_response_handler public: datacenter_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, - std::unique_ptr mh, std::unordered_set targets, + std::unique_ptr mh, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) : abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), @@ -652,7 +654,7 @@ class write_response_handler : public abstract_write_response_handler { } public: write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, - std::unique_ptr mh, std::unordered_set targets, + std::unique_ptr mh, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) : abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), @@ -664,7 +666,7 @@ public: class view_update_write_response_handler : public 
write_response_handler, public bi::list_base_hook> { public: view_update_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, - std::unique_ptr mh, std::unordered_set targets, + std::unique_ptr mh, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit): write_response_handler(p, ks, cl, db::write_type::VIEW, std::move(mh), @@ -744,7 +746,7 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha } public: datacenter_sync_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, - std::unique_ptr mh, std::unordered_set targets, const inet_address_vector_topology_change& pending_endpoints, + std::unique_ptr mh, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) : abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), targets, std::move(tr_state), stats, std::move(permit), 0, dead_endpoints) { auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); @@ -884,11 +886,11 @@ paxos_response_handler::begin_and_repair_paxos(client_state& cs, unsigned& conte // those nodes, and retry. 
auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(ballot); - std::unordered_set missing_mrc = summary.replicas_missing_most_recent_commit(_schema, now_in_sec); + inet_address_vector_replica_set missing_mrc = summary.replicas_missing_most_recent_commit(_schema, now_in_sec); if (missing_mrc.size() > 0) { paxos::paxos_state::logger.debug("CAS[{}] Repairing replicas that missed the most recent commit", _id); tracing::trace(tr_state, "Repairing replicas that missed the most recent commit"); - std::array, schema_ptr, dht::token, std::unordered_set>, 1> + std::array, schema_ptr, dht::token, inet_address_vector_replica_set>, 1> m{std::make_tuple(make_lw_shared(std::move(*summary.most_recent_commit)), _schema, _key.token(), std::move(missing_mrc))}; // create_write_response_handler is overloaded for paxos::proposal and will // create cas_mutation holder, which consequently will ensure paxos::learn is @@ -1422,7 +1424,7 @@ future<> storage_proxy::response_wait(storage_proxy::response_id_type id, clock_ } storage_proxy::response_id_type storage_proxy::create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr m, - std::unordered_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, + inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) { shared_ptr h; @@ -1953,11 +1955,11 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok } // filter live endpoints from dead ones - std::unordered_set live_endpoints; + inet_address_vector_replica_set live_endpoints; inet_address_vector_topology_change dead_endpoints; live_endpoints.reserve(all.size()); dead_endpoints.reserve(all.size()); - 
std::partition_copy(all.begin(), all.end(), std::inserter(live_endpoints, live_endpoints.begin()), + std::partition_copy(all.begin(), all.end(), std::back_inserter(live_endpoints), std::back_inserter(dead_endpoints), std::bind1st(std::mem_fn(&gms::gossiper::is_alive), &gms::get_local_gossiper())); slogger.trace("creating write handler with live: {} dead: {}", live_endpoints, dead_endpoints); @@ -1990,7 +1992,8 @@ storage_proxy::create_write_response_handler(const hint_wrapper& h, db::consiste storage_proxy::response_id_type storage_proxy::create_write_response_handler(const std::unordered_map>& m, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit) { - std::unordered_set endpoints(m.size()); + inet_address_vector_replica_set endpoints; + endpoints.reserve(m.size()); boost::copy(m | boost::adaptors::map_keys, std::inserter(endpoints, endpoints.begin())); auto mh = std::make_unique(m); @@ -2013,7 +2016,7 @@ storage_proxy::create_write_response_handler(const std::tuple, schema_ptr, dht::token, std::unordered_set>& meta, +storage_proxy::create_write_response_handler(const std::tuple, schema_ptr, dht::token, inet_address_vector_replica_set>& meta, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit) { auto& [commit, s, token, endpoints] = meta; @@ -2404,7 +2407,7 @@ storage_proxy::mutate_atomically(std::vector mutations, db::consistenc service_permit _permit; const utils::UUID _batch_uuid; - const std::unordered_set _batchlog_endpoints; + const inet_address_vector_replica_set _batchlog_endpoints; public: context(storage_proxy & p, std::vector&& mutations, lw_shared_ptr&& cdc_tracker, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit) @@ -2419,7 +2422,7 @@ storage_proxy::mutate_atomically(std::vector mutations, db::consistenc , _permit(std::move(permit)) , _batch_uuid(utils::UUID_gen::get_time_UUID()) , 
_batchlog_endpoints( - [this]() -> std::unordered_set { + [this]() -> inet_address_vector_replica_set { auto local_addr = utils::fb_utilities::get_broadcast_address(); auto& topology = _tmptr->get_topology(); auto& local_endpoints = topology.get_datacenter_racks().at(get_local_dc()); @@ -2532,7 +2535,7 @@ future<> storage_proxy::send_to_endpoint( std::unique_ptr& m, db::consistency_level cl, db::write_type type, service_permit permit) mutable { - std::unordered_set targets; + inet_address_vector_replica_set targets; targets.reserve(pending_endpoints.size() + 1); inet_address_vector_topology_change dead_endpoints; boost::algorithm::partition_copy( @@ -5441,7 +5444,8 @@ void storage_proxy::retire_view_response_handlers(noncopyable_function mh, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit); - response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr m, std::unordered_set targets, + response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr m, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit); response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit); response_id_type create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit); response_id_type create_write_response_handler(const std::unordered_map>&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit); response_id_type create_write_response_handler(const std::tuple, schema_ptr, shared_ptr, 
dht::token>& proposal, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit); - response_id_type create_write_response_handler(const std::tuple, schema_ptr, dht::token, std::unordered_set>& meta, + response_id_type create_write_response_handler(const std::tuple, schema_ptr, dht::token, inet_address_vector_replica_set>& meta, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit); void register_cdc_operation_result_tracker(const storage_proxy::unique_response_handler_vector& ids, lw_shared_ptr tracker); void send_to_live_endpoints(response_id_type response_id, clock_type::time_point timeout); diff --git a/tracing/trace_state.cc b/tracing/trace_state.cc index 8f8bb7bd93..bbac968eba 100644 --- a/tracing/trace_state.cc +++ b/tracing/trace_state.cc @@ -55,7 +55,7 @@ namespace tracing { logging::logger trace_state_logger("trace_state"); struct trace_state::params_values { - std::optional> batchlog_endpoints; + std::optional batchlog_endpoints; std::optional user_timestamp; std::vector queries; std::optional cl; @@ -103,7 +103,7 @@ void trace_state::init_session_records( } -void trace_state::set_batchlog_endpoints(const std::unordered_set& val) { +void trace_state::set_batchlog_endpoints(const inet_address_vector_replica_set& val) { _params_ptr->batchlog_endpoints.emplace(val); } diff --git a/tracing/trace_state.hh b/tracing/trace_state.hh index 72e2f2b93b..ae37e80721 100644 --- a/tracing/trace_state.hh +++ b/tracing/trace_state.hh @@ -51,6 +51,7 @@ #include "db/consistency_level_type.hh" #include "types.hh" #include "timestamp.hh" +#include "inet_address_vectors.hh" namespace cql3{ class query_options; @@ -300,7 +301,7 @@ private: * * @param val the set of batchlog endpoints */ - void set_batchlog_endpoints(const std::unordered_set& val); + void set_batchlog_endpoints(const inet_address_vector_replica_set& val); /** * Stores a consistency level of a query being traced. 
@@ -490,7 +491,7 @@ private: friend void set_page_size(const trace_state_ptr& p, int32_t val); friend void set_request_size(const trace_state_ptr& p, size_t s) noexcept; friend void set_response_size(const trace_state_ptr& p, size_t s) noexcept; - friend void set_batchlog_endpoints(const trace_state_ptr& p, const std::unordered_set& val); + friend void set_batchlog_endpoints(const trace_state_ptr& p, const inet_address_vector_replica_set& val); friend void set_consistency_level(const trace_state_ptr& p, db::consistency_level val); friend void set_optional_serial_consistency_level(const trace_state_ptr& p, const std::optional&val); friend void add_query(const trace_state_ptr& p, sstring_view val); @@ -612,7 +613,7 @@ inline void set_response_size(const trace_state_ptr& p, size_t s) noexcept { } } -inline void set_batchlog_endpoints(const trace_state_ptr& p, const std::unordered_set& val) { +inline void set_batchlog_endpoints(const trace_state_ptr& p, const inet_address_vector_replica_set& val) { if (p) { p->set_batchlog_endpoints(val); }