Merge "handle hints on separate connection and scheduling group" from Piotr
Introduce a new verb dedicated to receiving and sending hints: HINT_MUTATION. It is handled on the streaming connection, which is separate from the one used for handling mutations sent by the coordinator during a write. The intent of using a separate connection is to increase fairness while handling hints and user requests - this way, a situation can be avoided in which one type of request saturates the connection, negatively impacting the other one. Information about support for the new RPC is propagated through the new gossip feature HINTED_HANDOFF_SEPARATE_CONNECTION. Fixes #4974. Tests: unit(release)
This commit is contained in:
@@ -402,7 +402,7 @@ future<> manager::end_point_hints_manager::sender::do_send_one_mutation(frozen_m
|
||||
// to be generated as a result of hints sending.
|
||||
if (boost::range::find(natural_endpoints, end_point_key()) != natural_endpoints.end()) {
|
||||
manager_logger.trace("Sending directly to {}", end_point_key());
|
||||
return _proxy.send_to_endpoint(std::move(m), end_point_key(), { }, write_type::SIMPLE, service::allow_hints::no);
|
||||
return _proxy.send_hint_to_endpoint(std::move(m), end_point_key());
|
||||
} else {
|
||||
manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", end_point_key());
|
||||
// FIXME: using 1h as infinite timeout. If a node is down, we should get an
|
||||
|
||||
@@ -480,6 +480,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
|
||||
case messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM:
|
||||
case messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM:
|
||||
case messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM:
|
||||
case messaging_verb::HINT_MUTATION:
|
||||
return 2;
|
||||
case messaging_verb::MUTATION_DONE:
|
||||
case messaging_verb::MUTATION_FAILED:
|
||||
@@ -994,7 +995,7 @@ void messaging_service::unregister_mutation() {
|
||||
future<> messaging_service::send_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector<inet_address> forward,
|
||||
inet_address reply_to, unsigned shard, response_id_type response_id, std::optional<tracing::trace_info> trace_info) {
|
||||
return send_message_oneway_timeout(this, timeout, messaging_verb::MUTATION, std::move(id), fm, std::move(forward),
|
||||
std::move(reply_to), std::move(shard), std::move(response_id), std::move(trace_info));
|
||||
std::move(reply_to), shard, std::move(response_id), std::move(trace_info));
|
||||
}
|
||||
|
||||
void messaging_service::register_counter_mutation(std::function<future<> (const rpc::client_info&, rpc::opt_time_point, std::vector<frozen_mutation> fms, db::consistency_level cl, std::optional<tracing::trace_info> trace_info)>&& func) {
|
||||
@@ -1014,7 +1015,7 @@ void messaging_service::unregister_mutation_done() {
|
||||
_rpc->unregister_handler(netw::messaging_verb::MUTATION_DONE);
|
||||
}
|
||||
future<> messaging_service::send_mutation_done(msg_addr id, unsigned shard, response_id_type response_id, db::view::update_backlog backlog) {
|
||||
return send_message_oneway(this, messaging_verb::MUTATION_DONE, std::move(id), std::move(shard), std::move(response_id), std::move(backlog));
|
||||
return send_message_oneway(this, messaging_verb::MUTATION_DONE, std::move(id), shard, std::move(response_id), std::move(backlog));
|
||||
}
|
||||
|
||||
void messaging_service::register_mutation_failed(std::function<future<rpc::no_wait_type> (const rpc::client_info& cinfo, unsigned shard, response_id_type response_id, size_t num_failed, rpc::optional<db::view::update_backlog> backlog)>&& func) {
|
||||
@@ -1024,7 +1025,7 @@ void messaging_service::unregister_mutation_failed() {
|
||||
_rpc->unregister_handler(netw::messaging_verb::MUTATION_FAILED);
|
||||
}
|
||||
future<> messaging_service::send_mutation_failed(msg_addr id, unsigned shard, response_id_type response_id, size_t num_failed, db::view::update_backlog backlog) {
|
||||
return send_message_oneway(this, messaging_verb::MUTATION_FAILED, std::move(id), std::move(shard), std::move(response_id), num_failed, std::move(backlog));
|
||||
return send_message_oneway(this, messaging_verb::MUTATION_FAILED, std::move(id), shard, std::move(response_id), num_failed, std::move(backlog));
|
||||
}
|
||||
|
||||
void messaging_service::register_read_data(std::function<future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> (const rpc::client_info&, rpc::opt_time_point t, query::read_command cmd, ::compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> oda)>&& func) {
|
||||
@@ -1273,7 +1274,20 @@ future<> messaging_service::send_paxos_learn(msg_addr id, clock_type::time_point
|
||||
std::vector<inet_address> forward, inet_address reply_to, unsigned shard, response_id_type response_id,
|
||||
std::optional<tracing::trace_info> trace_info) {
|
||||
return send_message_oneway_timeout(this, timeout, messaging_verb::PAXOS_LEARN, std::move(id), decision, std::move(forward),
|
||||
std::move(reply_to), std::move(shard), std::move(response_id), std::move(trace_info));
|
||||
std::move(reply_to), shard, std::move(response_id), std::move(trace_info));
|
||||
}
|
||||
|
||||
void messaging_service::register_hint_mutation(std::function<future<rpc::no_wait_type> (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector<inet_address> forward,
|
||||
inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional<std::optional<tracing::trace_info>> trace_info)>&& func) {
|
||||
register_handler(this, netw::messaging_verb::HINT_MUTATION, std::move(func));
|
||||
}
|
||||
// Removes the handler registered for the HINT_MUTATION verb from the RPC layer.
void messaging_service::unregister_hint_mutation() {
    const auto hint_verb = netw::messaging_verb::HINT_MUTATION;
    _rpc->unregister_handler(hint_verb);
}
|
||||
// Sends the frozen mutation `fm` to `id` as a one-way HINT_MUTATION message
// bounded by `timeout`. The remaining arguments are forwarded unchanged as the
// message payload.
future<> messaging_service::send_hint_mutation(
        msg_addr id,
        clock_type::time_point timeout,
        const frozen_mutation& fm,
        std::vector<inet_address> forward,
        inet_address reply_to,
        unsigned shard,
        response_id_type response_id,
        std::optional<tracing::trace_info> trace_info) {
    return send_message_oneway_timeout(
            this,
            timeout,
            messaging_verb::HINT_MUTATION,
            std::move(id),
            fm,
            std::move(forward),
            std::move(reply_to),
            shard,
            std::move(response_id),
            std::move(trace_info));
}
|
||||
|
||||
} // namespace net
|
||||
|
||||
@@ -138,7 +138,8 @@ enum class messaging_verb : int32_t {
|
||||
PAXOS_PREPARE = 39,
|
||||
PAXOS_ACCEPT = 40,
|
||||
PAXOS_LEARN = 41,
|
||||
LAST = 42,
|
||||
HINT_MUTATION = 42,
|
||||
LAST = 43,
|
||||
};
|
||||
|
||||
} // namespace netw
|
||||
@@ -490,6 +491,12 @@ public:
|
||||
std::vector<inet_address> forward, inet_address reply_to, unsigned shard, response_id_type response_id,
|
||||
std::optional<tracing::trace_info> trace_info = std::nullopt);
|
||||
|
||||
void register_hint_mutation(std::function<future<rpc::no_wait_type> (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector<inet_address> forward,
|
||||
inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional<std::optional<tracing::trace_info>> trace_info)>&& func);
|
||||
void unregister_hint_mutation();
|
||||
future<> send_hint_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector<inet_address> forward,
|
||||
inet_address reply_to, unsigned shard, response_id_type response_id, std::optional<tracing::trace_info> trace_info = std::nullopt);
|
||||
|
||||
void foreach_server_connection_stats(std::function<void(const rpc::client_info&, const rpc::stats&)>&& f) const;
|
||||
private:
|
||||
bool remove_rpc_client_one(clients_map& clients, msg_addr id, bool dead_only);
|
||||
|
||||
@@ -224,6 +224,7 @@ public:
|
||||
|
||||
// same mutation for each destination
|
||||
class shared_mutation : public mutation_holder {
|
||||
protected:
|
||||
lw_shared_ptr<const frozen_mutation> _mutation;
|
||||
public:
|
||||
explicit shared_mutation(frozen_mutation_and_schema&& fm_a_s)
|
||||
@@ -258,6 +259,28 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
// shared mutation, but gets sent as a hint
|
||||
class hint_mutation : public shared_mutation {
|
||||
public:
|
||||
using shared_mutation::shared_mutation;
|
||||
virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) override {
|
||||
throw std::runtime_error("Attempted to store a hint for a hint");
|
||||
}
|
||||
virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout,
|
||||
tracing::trace_state_ptr tr_state) override {
|
||||
return make_exception_future<>(std::runtime_error("Executing hint locally doesn't make sense"));
|
||||
}
|
||||
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector<gms::inet_address>&& forward,
|
||||
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
|
||||
tracing::trace_state_ptr tr_state) override {
|
||||
tracing::trace(tr_state, "Sending a hint to /{}", ep);
|
||||
auto& ms = netw::get_local_messaging_service();
|
||||
return ms.send_hint_mutation(netw::messaging_service::msg_addr{ep, 0}, timeout, *_mutation,
|
||||
std::move(forward), utils::fb_utilities::get_broadcast_address(), engine().cpu_id(),
|
||||
response_id, tracing::make_trace_info(tr_state));
|
||||
}
|
||||
};
|
||||
|
||||
class cas_mutation : public mutation_holder {
|
||||
lw_shared_ptr<paxos::proposal> _proposal;
|
||||
public:
|
||||
@@ -2230,6 +2253,26 @@ future<> storage_proxy::send_to_endpoint(
|
||||
allow_hints);
|
||||
}
|
||||
|
||||
// Sends `fm_a_s` to `target` as a hint, with hinting disabled for the write.
// The hint_mutation holder is used only when the cluster advertises the
// hinted-handoff-separate-connection feature; otherwise a plain
// shared_mutation holder is used.
future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target) {
    const bool separate_connection_supported =
            service::get_local_storage_service().cluster_supports_hinted_handoff_separate_connection();

    // hint_mutation derives from shared_mutation, so both holders fit here.
    std::unique_ptr<shared_mutation> holder;
    if (separate_connection_supported) {
        holder = std::make_unique<hint_mutation>(std::move(fm_a_s));
    } else {
        holder = std::make_unique<shared_mutation>(std::move(fm_a_s));
    }

    return send_to_endpoint(
            std::move(holder),
            std::move(target),
            { },
            db::write_type::SIMPLE,
            _stats,
            allow_hints::no);
}
|
||||
|
||||
/**
|
||||
* Send the mutations to the right targets, write it locally if it corresponds or writes a hint when the node
|
||||
* is not available.
|
||||
@@ -4420,7 +4463,8 @@ void storage_proxy::init_messaging_service() {
|
||||
});
|
||||
};
|
||||
|
||||
ms.register_mutation([] (const rpc::client_info& cinfo, rpc::opt_time_point t, frozen_mutation in, std::vector<gms::inet_address> forward, gms::inet_address reply_to, unsigned shard, storage_proxy::response_id_type response_id, rpc::optional<std::optional<tracing::trace_info>> trace_info) {
|
||||
auto receive_mutation_handler = [] (const rpc::client_info& cinfo, rpc::opt_time_point t, frozen_mutation in, std::vector<gms::inet_address> forward,
|
||||
gms::inet_address reply_to, unsigned shard, storage_proxy::response_id_type response_id, rpc::optional<std::optional<tracing::trace_info>> trace_info) {
|
||||
tracing::trace_state_ptr trace_state_ptr;
|
||||
auto src_addr = netw::messaging_service::get_source(cinfo);
|
||||
|
||||
@@ -4437,7 +4481,10 @@ void storage_proxy::init_messaging_service() {
|
||||
auto& ms = netw::get_local_messaging_service();
|
||||
return ms.send_mutation(addr, timeout, m, {}, reply_to, shard, response_id, std::move(trace_info));
|
||||
});
|
||||
});
|
||||
};
|
||||
ms.register_mutation(receive_mutation_handler);
|
||||
ms.register_hint_mutation(receive_mutation_handler);
|
||||
|
||||
ms.register_paxos_learn([] (const rpc::client_info& cinfo, rpc::opt_time_point t, paxos::proposal decision,
|
||||
std::vector<gms::inet_address> forward, gms::inet_address reply_to, unsigned shard,
|
||||
storage_proxy::response_id_type response_id, std::optional<tracing::trace_info> trace_info) {
|
||||
|
||||
@@ -466,6 +466,11 @@ public:
|
||||
future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, std::vector<gms::inet_address> pending_endpoints, db::write_type type, write_stats& stats, allow_hints allow_hints = allow_hints::yes);
|
||||
future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, std::vector<gms::inet_address> pending_endpoints, db::write_type type, allow_hints allow_hints = allow_hints::yes);
|
||||
|
||||
// Send a mutation to a specific remote target as a hint.
|
||||
// Unlike regular mutations during write operations, hints are sent on the streaming connection
|
||||
// and use different RPC verb.
|
||||
future<> send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target);
|
||||
|
||||
/**
|
||||
* Performs the truncate operation, which effectively deletes all data from
|
||||
* the column family cfname
|
||||
@@ -541,6 +546,7 @@ public:
|
||||
friend class mutation_holder;
|
||||
friend class per_destination_mutation;
|
||||
friend class shared_mutation;
|
||||
friend class hint_mutation;
|
||||
friend class cas_mutation;
|
||||
};
|
||||
|
||||
|
||||
@@ -114,6 +114,7 @@ static const sstring DIGEST_INSENSITIVE_TO_EXPIRY = "DIGEST_INSENSITIVE_TO_EXPIR
|
||||
static const sstring COMPUTED_COLUMNS_FEATURE = "COMPUTED_COLUMNS";
|
||||
static const sstring CDC_FEATURE = "CDC";
|
||||
static const sstring NONFROZEN_UDTS_FEATURE = "NONFROZEN_UDTS";
|
||||
// Name under which the hinted-handoff-separate-connection feature is registered.
// NOTE(review): the neighbouring feature names ("COMPUTED_COLUMNS", "CDC",
// "NONFROZEN_UDTS") carry no "_FEATURE" suffix in their string value, while this
// one does - confirm the suffix in the advertised name is intended.
static const sstring HINTED_HANDOFF_SEPARATE_CONNECTION_FEATURE = "HINTED_HANDOFF_SEPARATE_CONNECTION_FEATURE";
|
||||
|
||||
static const sstring SSTABLE_FORMAT_PARAM_NAME = "sstable_format";
|
||||
|
||||
@@ -175,6 +176,7 @@ storage_service::storage_service(abort_source& abort_source, distributed<databas
|
||||
, _computed_columns(_feature_service, COMPUTED_COLUMNS_FEATURE)
|
||||
, _cdc_feature(_feature_service, CDC_FEATURE)
|
||||
, _nonfrozen_udts(_feature_service, NONFROZEN_UDTS_FEATURE)
|
||||
, _hinted_handoff_separate_connection(_feature_service, HINTED_HANDOFF_SEPARATE_CONNECTION_FEATURE)
|
||||
, _la_feature_listener(*this, _feature_listeners_sem, sstables::sstable_version_types::la)
|
||||
, _mc_feature_listener(*this, _feature_listeners_sem, sstables::sstable_version_types::mc)
|
||||
, _replicate_action([this] { return do_replicate_to_all_cores(); })
|
||||
@@ -234,7 +236,8 @@ void storage_service::enable_all_features() {
|
||||
std::ref(_digest_insensitive_to_expiry),
|
||||
std::ref(_computed_columns),
|
||||
std::ref(_cdc_feature),
|
||||
std::ref(_nonfrozen_udts)
|
||||
std::ref(_nonfrozen_udts),
|
||||
std::ref(_hinted_handoff_separate_connection)
|
||||
})
|
||||
{
|
||||
if (features.count(f.name())) {
|
||||
@@ -341,6 +344,7 @@ std::set<sstring> storage_service::get_config_supported_features_set() {
|
||||
DIGEST_INSENSITIVE_TO_EXPIRY,
|
||||
COMPUTED_COLUMNS_FEATURE,
|
||||
NONFROZEN_UDTS_FEATURE,
|
||||
HINTED_HANDOFF_SEPARATE_CONNECTION_FEATURE,
|
||||
};
|
||||
|
||||
// Do not respect config in the case database is not started
|
||||
|
||||
@@ -340,6 +340,7 @@ private:
|
||||
gms::feature _computed_columns;
|
||||
gms::feature _cdc_feature;
|
||||
gms::feature _nonfrozen_udts;
|
||||
gms::feature _hinted_handoff_separate_connection;
|
||||
|
||||
sstables::sstable_version_types _sstables_format = sstables::sstable_version_types::ka;
|
||||
seastar::named_semaphore _feature_listeners_sem = {1, named_semaphore_exception_factory{"feature listeners"}};
|
||||
@@ -2378,6 +2379,10 @@ public:
|
||||
return bool(_nonfrozen_udts);
|
||||
}
|
||||
|
||||
bool cluster_supports_hinted_handoff_separate_connection() {
|
||||
return bool(_hinted_handoff_separate_connection);
|
||||
}
|
||||
|
||||
// Returns schema features which all nodes in the cluster advertise as supported.
|
||||
db::schema_features cluster_schema_features() const;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user