storage_proxy: change unordered_set<inet_address> to small_vector in write path

The write paths in storage_proxy pass replica sets as
std::unordered_set<gms::inet_address>. This is a complex type, with
N+1 allocations for N members, so we change it to a small_vector (via
inet_address_vector_replica_set) which requires just one allocation, and
even zero when up to three replicas are used.

This change is more nuanced than the corresponding change to the read path
abe3d7d7 ("Merge 'storage_proxy: use small_vector for vectors of
inet_address' from Avi Kivity"), for two reasons:

 - there is a quadratic algorithm in
   abstract_write_response_handler::response(): it searches for a replica
   and erases it. Since this happens for every replica, it happens N^2/2
   times.
 - replica sets for writes always include all datacenters, while reads
   usually involve just one datacenter.

So, a write to a keyspace that has 5 datacenters (with 3 replicas each, 15
replicas in total) will invoke 15*(15-1)/2 = 105 compares.

We could remove this quadratic behavior by sending the index of the replica in the replica
set to the replica and ask it to include the index in the response, but
I think that this is unnecessary. Those 105 compares need to be only
105/15 = 7 times cheaper than the corresponding unordered_set operation,
which they surely will be. Handling a response after a cross-datacenter round
trip surely involves L3 cache misses, and a small_vector reduces these
to a minimum compared to an unordered_set with its bucket table, linked
list walking and management, and table rehashing.

Tests using perf_simple_query --write --smp 1 --operations-per-shard 1000000
 --task-quota-ms show two allocations removed (as expected) and a nice
reduction in instructions executed.

before: median 204842.54 tps ( 54.2 allocs/op,  13.2 tasks/op,   49890 insns/op)
after:  median 206077.65 tps ( 52.2 allocs/op,  13.2 tasks/op,   49138 insns/op)

Closes #8847
This commit is contained in:
Avi Kivity
2021-06-12 11:09:54 +03:00
committed by Nadav Har'El
parent 98cdeaf0f2
commit 4d70f3baee
10 changed files with 47 additions and 40 deletions

View File

@@ -341,10 +341,10 @@ future<> db::batchlog_manager::replay_all_failed_batches() {
});
}
std::unordered_set<gms::inet_address> db::batchlog_manager::endpoint_filter(const sstring& local_rack, const std::unordered_map<sstring, std::unordered_set<gms::inet_address>>& endpoints) {
inet_address_vector_replica_set db::batchlog_manager::endpoint_filter(const sstring& local_rack, const std::unordered_map<sstring, std::unordered_set<gms::inet_address>>& endpoints) {
// special case for single-node data centers
if (endpoints.size() == 1 && endpoints.begin()->second.size() == 1) {
return endpoints.begin()->second;
return boost::copy_range<inet_address_vector_replica_set>(endpoints.begin()->second);
}
// strip out dead endpoints and localhost
@@ -364,7 +364,7 @@ std::unordered_set<gms::inet_address> db::batchlog_manager::endpoint_filter(cons
}
}
typedef std::unordered_set<gms::inet_address> return_type;
typedef inet_address_vector_replica_set return_type;
if (validated.size() <= 2) {
return boost::copy_range<return_type>(validated | boost::adaptors::map_values);
@@ -395,13 +395,13 @@ std::unordered_set<gms::inet_address> db::batchlog_manager::endpoint_filter(cons
racks.resize(2);
}
std::unordered_set<gms::inet_address> result;
inet_address_vector_replica_set result;
// grab a random member of up to two racks
for (auto& rack : racks) {
auto cpy = boost::copy_range<std::vector<gms::inet_address>>(validated.equal_range(rack) | boost::adaptors::map_values);
std::uniform_int_distribution<size_t> rdist(0, cpy.size() - 1);
result.emplace(cpy[rdist(_e1)]);
result.emplace_back(cpy[rdist(_e1)]);
}
return result;

View File

@@ -49,6 +49,7 @@
#include <seastar/core/metrics_registration.hh>
#include "gms/inet_address.hh"
#include "inet_address_vectors.hh"
#include "db_clock.hh"
#include "mutation.hh"
#include "utils/UUID.hh"
@@ -117,7 +118,7 @@ public:
mutation get_batch_log_mutation_for(const std::vector<mutation>&, const utils::UUID&, int32_t, db_clock::time_point);
db_clock::duration get_batch_log_timeout() const;
std::unordered_set<gms::inet_address> endpoint_filter(const sstring&, const std::unordered_map<sstring, std::unordered_set<gms::inet_address>>&);
inet_address_vector_replica_set endpoint_filter(const sstring&, const std::unordered_map<sstring, std::unordered_set<gms::inet_address>>&);
};
extern distributed<batchlog_manager> _the_batchlog_manager;

View File

@@ -268,7 +268,7 @@ void assure_sufficient_live_nodes(
}
template void assure_sufficient_live_nodes(consistency_level, keyspace&, const inet_address_vector_replica_set&, const std::array<gms::inet_address, 0>&);
template void assure_sufficient_live_nodes(db::consistency_level, keyspace&, const std::unordered_set<gms::inet_address>&, const utils::small_vector<gms::inet_address, 1ul>&);
template void assure_sufficient_live_nodes(db::consistency_level, keyspace&, const inet_address_vector_replica_set&, const utils::small_vector<gms::inet_address, 1ul>&);
inet_address_vector_replica_set
filter_for_query(consistency_level cl,

View File

@@ -109,6 +109,6 @@ void assure_sufficient_live_nodes(
const PendingRange& pending_endpoints = std::array<gms::inet_address, 0>());
extern template void assure_sufficient_live_nodes(consistency_level, keyspace&, const inet_address_vector_replica_set&, const std::array<gms::inet_address, 0>&);
extern template void assure_sufficient_live_nodes(db::consistency_level, keyspace&, const std::unordered_set<gms::inet_address>&, const utils::small_vector<gms::inet_address, 1ul>&);
extern template void assure_sufficient_live_nodes(db::consistency_level, keyspace&, const inet_address_vector_replica_set&, const utils::small_vector<gms::inet_address, 1ul>&);
}

View File

@@ -59,9 +59,9 @@ void prepare_summary::update_most_recent_promised_ballot(utils::UUID ballot) {
}
}
std::unordered_set<gms::inet_address>
inet_address_vector_replica_set
prepare_summary::replicas_missing_most_recent_commit(schema_ptr s, std::chrono::seconds now_in_sec) const {
std::unordered_set<gms::inet_address> replicas;
inet_address_vector_replica_set replicas;
// In general, we need every replica that has answered to the prepare (a quorum) to agree on the MRC (see
// comment in storage_proxy::begin_and_repair_paxos(), but basically we need to make sure at least a quorum of nodes
// have learned a commit before committing a new one, otherwise that previous commit is not guaranteed to have reached a
@@ -79,7 +79,7 @@ prepare_summary::replicas_missing_most_recent_commit(schema_ptr s, std::chrono::
for (const auto& it: committed_ballots_by_replica) {
if (it.second != most_recent_commit->ballot) {
replicas.insert(it.first);
replicas.push_back(it.first);
}
}
return replicas;

View File

@@ -43,6 +43,7 @@
#pragma once
#include "utils/UUID_gen.hh"
#include "service/paxos/proposal.hh"
#include "inet_address_vectors.hh"
namespace service {
@@ -83,7 +84,7 @@ public:
public:
prepare_summary(size_t node_count);
std::unordered_set<gms::inet_address> replicas_missing_most_recent_commit(schema_ptr s, std::chrono::seconds now_in_sec) const;
inet_address_vector_replica_set replicas_missing_most_recent_commit(schema_ptr s, std::chrono::seconds now_in_sec) const;
void update_most_recent_promised_ballot(utils::UUID ballot);
};

View File

@@ -372,7 +372,7 @@ protected:
size_t _total_block_for = 0;
db::write_type _type;
std::unique_ptr<mutation_holder> _mutation_holder;
std::unordered_set<gms::inet_address> _targets; // who we sent this mutation to
inet_address_vector_replica_set _targets; // who we sent this mutation to
// added dead_endpoints as a memeber here as well. This to be able to carry the info across
// calls in helper methods in a convinient way. Since we hope this will be empty most of the time
// it should not be a huge burden. (flw)
@@ -400,7 +400,7 @@ protected:
public:
abstract_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets, tracing::trace_state_ptr trace_state,
std::unique_ptr<mutation_holder> mh, inet_address_vector_replica_set targets, tracing::trace_state_ptr trace_state,
storage_proxy::write_stats& stats, service_permit permit, size_t pending_endpoints = 0, inet_address_vector_topology_change dead_endpoints = {})
: _id(p->get_next_response_id()), _proxy(std::move(p)), _trace_state(trace_state), _cl(cl), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)),
_dead_endpoints(std::move(dead_endpoints)), _stats(stats), _expire_timer([this] { timeout_cb(); }), _permit(std::move(permit)) {
@@ -481,10 +481,12 @@ public:
}
// return true on last ack
bool response(gms::inet_address from) {
auto it = _targets.find(from);
auto it = boost::find(_targets, from);
if (it != _targets.end()) {
signal(from);
_targets.erase(it);
using std::swap;
swap(*it, _targets.back());
_targets.pop_back();
} else {
slogger.warn("Receive outdated write ack from {}", from);
}
@@ -493,7 +495,7 @@ public:
// return true if handler is no longer needed because
// CL cannot be reached
bool failure_response(gms::inet_address from, size_t count, error err) {
if (!_targets.contains(from)) {
if (boost::find(_targets, from) == _targets.end()) {
// There is a little change we can get outdated reply
// if the coordinator was restarted after sending a request and
// getting reply back. The chance is low though since initial
@@ -592,7 +594,7 @@ public:
future<> wait() {
return _ready.get_future();
}
const std::unordered_set<gms::inet_address>& get_targets() const {
const inet_address_vector_replica_set& get_targets() const {
return _targets;
}
const inet_address_vector_topology_change& get_dead_endpoints() const {
@@ -637,7 +639,7 @@ class datacenter_write_response_handler : public abstract_write_response_handler
public:
datacenter_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets,
std::unique_ptr<mutation_holder> mh, inet_address_vector_replica_set targets,
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state,
storage_proxy::write_stats& stats, service_permit permit) :
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh),
@@ -652,7 +654,7 @@ class write_response_handler : public abstract_write_response_handler {
}
public:
write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets,
std::unique_ptr<mutation_holder> mh, inet_address_vector_replica_set targets,
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state,
storage_proxy::write_stats& stats, service_permit permit) :
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh),
@@ -664,7 +666,7 @@ public:
class view_update_write_response_handler : public write_response_handler, public bi::list_base_hook<bi::link_mode<bi::auto_unlink>> {
public:
view_update_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl,
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets,
std::unique_ptr<mutation_holder> mh, inet_address_vector_replica_set targets,
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state,
storage_proxy::write_stats& stats, service_permit permit):
write_response_handler(p, ks, cl, db::write_type::VIEW, std::move(mh),
@@ -744,7 +746,7 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha
}
public:
datacenter_sync_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets, const inet_address_vector_topology_change& pending_endpoints,
std::unique_ptr<mutation_holder> mh, inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints,
inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit) :
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), targets, std::move(tr_state), stats, std::move(permit), 0, dead_endpoints) {
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
@@ -884,11 +886,11 @@ paxos_response_handler::begin_and_repair_paxos(client_state& cs, unsigned& conte
// those nodes, and retry.
auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(ballot);
std::unordered_set<gms::inet_address> missing_mrc = summary.replicas_missing_most_recent_commit(_schema, now_in_sec);
inet_address_vector_replica_set missing_mrc = summary.replicas_missing_most_recent_commit(_schema, now_in_sec);
if (missing_mrc.size() > 0) {
paxos::paxos_state::logger.debug("CAS[{}] Repairing replicas that missed the most recent commit", _id);
tracing::trace(tr_state, "Repairing replicas that missed the most recent commit");
std::array<std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, dht::token, std::unordered_set<gms::inet_address>>, 1>
std::array<std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, dht::token, inet_address_vector_replica_set>, 1>
m{std::make_tuple(make_lw_shared<paxos::proposal>(std::move(*summary.most_recent_commit)), _schema, _key.token(), std::move(missing_mrc))};
// create_write_response_handler is overloaded for paxos::proposal and will
// create cas_mutation holder, which consequently will ensure paxos::learn is
@@ -1422,7 +1424,7 @@ future<> storage_proxy::response_wait(storage_proxy::response_id_type id, clock_
}
storage_proxy::response_id_type storage_proxy::create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m,
std::unordered_set<gms::inet_address> targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state,
inet_address_vector_replica_set targets, const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change dead_endpoints, tracing::trace_state_ptr tr_state,
storage_proxy::write_stats& stats, service_permit permit)
{
shared_ptr<abstract_write_response_handler> h;
@@ -1953,11 +1955,11 @@ storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::tok
}
// filter live endpoints from dead ones
std::unordered_set<gms::inet_address> live_endpoints;
inet_address_vector_replica_set live_endpoints;
inet_address_vector_topology_change dead_endpoints;
live_endpoints.reserve(all.size());
dead_endpoints.reserve(all.size());
std::partition_copy(all.begin(), all.end(), std::inserter(live_endpoints, live_endpoints.begin()),
std::partition_copy(all.begin(), all.end(), std::back_inserter(live_endpoints),
std::back_inserter(dead_endpoints), std::bind1st(std::mem_fn(&gms::gossiper::is_alive), &gms::get_local_gossiper()));
slogger.trace("creating write handler with live: {} dead: {}", live_endpoints, dead_endpoints);
@@ -1990,7 +1992,8 @@ storage_proxy::create_write_response_handler(const hint_wrapper& h, db::consiste
storage_proxy::response_id_type
storage_proxy::create_write_response_handler(const std::unordered_map<gms::inet_address, std::optional<mutation>>& m, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit) {
std::unordered_set<gms::inet_address> endpoints(m.size());
inet_address_vector_replica_set endpoints;
endpoints.reserve(m.size());
boost::copy(m | boost::adaptors::map_keys, std::inserter(endpoints, endpoints.begin()));
auto mh = std::make_unique<per_destination_mutation>(m);
@@ -2013,7 +2016,7 @@ storage_proxy::create_write_response_handler(const std::tuple<lw_shared_ptr<paxo
}
storage_proxy::response_id_type
storage_proxy::create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, dht::token, std::unordered_set<gms::inet_address>>& meta,
storage_proxy::create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, dht::token, inet_address_vector_replica_set>& meta,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit) {
auto& [commit, s, token, endpoints] = meta;
@@ -2404,7 +2407,7 @@ storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistenc
service_permit _permit;
const utils::UUID _batch_uuid;
const std::unordered_set<gms::inet_address> _batchlog_endpoints;
const inet_address_vector_replica_set _batchlog_endpoints;
public:
context(storage_proxy & p, std::vector<mutation>&& mutations, lw_shared_ptr<cdc::operation_result_tracker>&& cdc_tracker, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit)
@@ -2419,7 +2422,7 @@ storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistenc
, _permit(std::move(permit))
, _batch_uuid(utils::UUID_gen::get_time_UUID())
, _batchlog_endpoints(
[this]() -> std::unordered_set<gms::inet_address> {
[this]() -> inet_address_vector_replica_set {
auto local_addr = utils::fb_utilities::get_broadcast_address();
auto& topology = _tmptr->get_topology();
auto& local_endpoints = topology.get_datacenter_racks().at(get_local_dc());
@@ -2532,7 +2535,7 @@ future<> storage_proxy::send_to_endpoint(
std::unique_ptr<mutation_holder>& m,
db::consistency_level cl,
db::write_type type, service_permit permit) mutable {
std::unordered_set<gms::inet_address> targets;
inet_address_vector_replica_set targets;
targets.reserve(pending_endpoints.size() + 1);
inet_address_vector_topology_change dead_endpoints;
boost::algorithm::partition_copy(
@@ -5441,7 +5444,8 @@ void storage_proxy::retire_view_response_handlers(noncopyable_function<bool(cons
void storage_proxy::on_down(const gms::inet_address& endpoint) {
return retire_view_response_handlers([endpoint] (const abstract_write_response_handler& handler) {
return handler.get_targets().contains(endpoint);
const auto& targets = handler.get_targets();
return boost::find(targets, endpoint) != targets.end();
});
};

View File

@@ -329,14 +329,14 @@ private:
response_id_type create_write_response_handler_helper(schema_ptr s, const dht::token& token,
std::unique_ptr<mutation_holder> mh, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state,
service_permit permit);
response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m, std::unordered_set<gms::inet_address> targets,
response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m, inet_address_vector_replica_set targets,
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit);
response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const std::unordered_map<gms::inet_address, std::optional<mutation>>&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>& proposal,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, dht::token, std::unordered_set<gms::inet_address>>& meta,
response_id_type create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, dht::token, inet_address_vector_replica_set>& meta,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
void register_cdc_operation_result_tracker(const storage_proxy::unique_response_handler_vector& ids, lw_shared_ptr<cdc::operation_result_tracker> tracker);
void send_to_live_endpoints(response_id_type response_id, clock_type::time_point timeout);

View File

@@ -55,7 +55,7 @@ namespace tracing {
logging::logger trace_state_logger("trace_state");
struct trace_state::params_values {
std::optional<std::unordered_set<gms::inet_address>> batchlog_endpoints;
std::optional<inet_address_vector_replica_set> batchlog_endpoints;
std::optional<api::timestamp_type> user_timestamp;
std::vector<sstring> queries;
std::optional<db::consistency_level> cl;
@@ -103,7 +103,7 @@ void trace_state::init_session_records(
}
void trace_state::set_batchlog_endpoints(const std::unordered_set<gms::inet_address>& val) {
void trace_state::set_batchlog_endpoints(const inet_address_vector_replica_set& val) {
_params_ptr->batchlog_endpoints.emplace(val);
}

View File

@@ -51,6 +51,7 @@
#include "db/consistency_level_type.hh"
#include "types.hh"
#include "timestamp.hh"
#include "inet_address_vectors.hh"
namespace cql3{
class query_options;
@@ -300,7 +301,7 @@ private:
*
* @param val the set of batchlog endpoints
*/
void set_batchlog_endpoints(const std::unordered_set<gms::inet_address>& val);
void set_batchlog_endpoints(const inet_address_vector_replica_set& val);
/**
* Stores a consistency level of a query being traced.
@@ -490,7 +491,7 @@ private:
friend void set_page_size(const trace_state_ptr& p, int32_t val);
friend void set_request_size(const trace_state_ptr& p, size_t s) noexcept;
friend void set_response_size(const trace_state_ptr& p, size_t s) noexcept;
friend void set_batchlog_endpoints(const trace_state_ptr& p, const std::unordered_set<gms::inet_address>& val);
friend void set_batchlog_endpoints(const trace_state_ptr& p, const inet_address_vector_replica_set& val);
friend void set_consistency_level(const trace_state_ptr& p, db::consistency_level val);
friend void set_optional_serial_consistency_level(const trace_state_ptr& p, const std::optional<db::consistency_level>&val);
friend void add_query(const trace_state_ptr& p, sstring_view val);
@@ -612,7 +613,7 @@ inline void set_response_size(const trace_state_ptr& p, size_t s) noexcept {
}
}
inline void set_batchlog_endpoints(const trace_state_ptr& p, const std::unordered_set<gms::inet_address>& val) {
inline void set_batchlog_endpoints(const trace_state_ptr& p, const inet_address_vector_replica_set& val) {
if (p) {
p->set_batchlog_endpoints(val);
}