From 2e802ca65061207308dd90bab38149f8cbdcc9b6 Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Thu, 21 Nov 2019 15:15:00 +0100 Subject: [PATCH 1/5] hh: add HINT_MUTATION verb Introduce a new verb dedicated for receiving and sending hints: HINT_MUTATION. It is handled on the streaming connection, which is separate from the one used for handling mutations sent by coordinator during a write. The intent of using a separate connection is to increase fariness while handling hints and user requests - this way, a situation can be avoided in which one type of requests saturate the connection, negatively impacting the other one. --- message/messaging_service.cc | 14 ++++++++++++++ message/messaging_service.hh | 9 ++++++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index d48d4619d9..050c1094ce 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -480,6 +480,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { case messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM: case messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM: case messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM: + case messaging_verb::HINT_MUTATION: return 2; case messaging_verb::MUTATION_DONE: case messaging_verb::MUTATION_FAILED: @@ -1276,4 +1277,17 @@ future<> messaging_service::send_paxos_learn(msg_addr id, clock_type::time_point std::move(reply_to), std::move(shard), std::move(response_id), std::move(trace_info)); } +void messaging_service::register_hint_mutation(std::function (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector forward, + inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional> trace_info)>&& func) { + register_handler(this, netw::messaging_verb::HINT_MUTATION, std::move(func)); +} +void messaging_service::unregister_hint_mutation() { + _rpc->unregister_handler(netw::messaging_verb::HINT_MUTATION); +} +future<> messaging_service::send_hint_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector forward, + inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info) { + return send_message_oneway_timeout(this, timeout, messaging_verb::HINT_MUTATION, std::move(id), fm, std::move(forward), + std::move(reply_to), std::move(shard), std::move(response_id), std::move(trace_info)); +} + } // namespace net diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 6df9c577ac..70d76465c6 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -138,7 +138,8 @@ enum class messaging_verb : int32_t { PAXOS_PREPARE = 39, PAXOS_ACCEPT = 40, PAXOS_LEARN = 41, - LAST = 42, + HINT_MUTATION = 42, + LAST = 43, }; } // namespace netw @@ -490,6 +491,12 @@ public: std::vector forward, inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info = std::nullopt); + void register_hint_mutation(std::function (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector forward, + inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional> trace_info)>&& func); + void unregister_hint_mutation(); + future<> send_hint_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector forward, + inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info = std::nullopt); + void foreach_server_connection_stats(std::function&& f) const; private: bool remove_rpc_client_one(clients_map& clients, msg_addr id, bool dead_only); From 6198ee27353ad63c2005df822f7c4f1498646c6c Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Wed, 20 Nov 2019 18:02:53 +0100 Subject: [PATCH 2/5] hh: introduce HINTED_HANDOFF_SEPARATE_CONNECTION feature The feature introduced by this commit declares that hints can be sent using the new dedicated RPC verb. Before using the new verb, nodes need to know if other nodes in the cluster will be able to handle the new RPC verb. --- service/storage_service.cc | 6 +++++- service/storage_service.hh | 5 +++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index a360e5366d..001b680fb0 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -114,6 +114,7 @@ static const sstring DIGEST_INSENSITIVE_TO_EXPIRY = "DIGEST_INSENSITIVE_TO_EXPIR static const sstring COMPUTED_COLUMNS_FEATURE = "COMPUTED_COLUMNS"; static const sstring CDC_FEATURE = "CDC"; static const sstring NONFROZEN_UDTS_FEATURE = "NONFROZEN_UDTS"; +static const sstring HINTED_HANDOFF_SEPARATE_CONNECTION_FEATURE = "HINTED_HANDOFF_SEPARATE_CONNECTION_FEATURE"; static const sstring SSTABLE_FORMAT_PARAM_NAME = "sstable_format"; @@ -175,6 +176,7 @@ storage_service::storage_service(abort_source& abort_source, distributed storage_service::get_config_supported_features_set() { DIGEST_INSENSITIVE_TO_EXPIRY, COMPUTED_COLUMNS_FEATURE, NONFROZEN_UDTS_FEATURE, + HINTED_HANDOFF_SEPARATE_CONNECTION_FEATURE, }; // Do not respect config in the case database is not started diff --git a/service/storage_service.hh b/service/storage_service.hh index d7bec0b18d..7a69e2cebf 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -340,6 +340,7 @@ private: gms::feature _computed_columns; gms::feature _cdc_feature; gms::feature _nonfrozen_udts; + gms::feature _hinted_handoff_separate_connection; sstables::sstable_version_types _sstables_format = sstables::sstable_version_types::ka; seastar::named_semaphore _feature_listeners_sem = {1, named_semaphore_exception_factory{"feature listeners"}}; @@ -2378,6 +2379,10 @@ public: return bool(_nonfrozen_udts); } + bool cluster_supports_hinted_handoff_separate_connection() { + return bool(_hinted_handoff_separate_connection); + } + // Returns schema features which all nodes in the cluster advertise as supported. db::schema_features cluster_schema_features() const; From 26090650905261548252077a86d27c566d22904e Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Thu, 21 Nov 2019 16:41:57 +0100 Subject: [PATCH 3/5] storage_proxy: move register_mutation handler to local lambda This refactor makes it possible to reuse the lambda in following commits. --- service/storage_proxy.cc | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 3c6be2407a..d0740bafcb 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -4420,7 +4420,8 @@ void storage_proxy::init_messaging_service() { }); }; - ms.register_mutation([] (const rpc::client_info& cinfo, rpc::opt_time_point t, frozen_mutation in, std::vector forward, gms::inet_address reply_to, unsigned shard, storage_proxy::response_id_type response_id, rpc::optional> trace_info) { + auto receive_mutation_handler = [] (const rpc::client_info& cinfo, rpc::opt_time_point t, frozen_mutation in, std::vector forward, + gms::inet_address reply_to, unsigned shard, storage_proxy::response_id_type response_id, rpc::optional> trace_info) { tracing::trace_state_ptr trace_state_ptr; auto src_addr = netw::messaging_service::get_source(cinfo); @@ -4437,7 +4438,9 @@ void storage_proxy::init_messaging_service() { auto& ms = netw::get_local_messaging_service(); return ms.send_mutation(addr, timeout, m, {}, reply_to, shard, response_id, std::move(trace_info)); }); - }); + }; + ms.register_mutation(receive_mutation_handler); + ms.register_paxos_learn([] (const rpc::client_info& cinfo, rpc::opt_time_point t, paxos::proposal decision, std::vector forward, gms::inet_address reply_to, unsigned shard, storage_proxy::response_id_type response_id, std::optional trace_info) { From 77d2ceaeba9c96aa415afaf46603400727b0399c Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Thu, 21 Nov 2019 16:23:38 +0100 Subject: [PATCH 4/5] storage_proxy: handle hints through separate rpc verb --- db/hints/manager.cc | 2 +- service/storage_proxy.cc | 44 ++++++++++++++++++++++++++++++++++++++++ service/storage_proxy.hh | 6 ++++++ 3 files changed, 51 insertions(+), 1 deletion(-) diff --git a/db/hints/manager.cc b/db/hints/manager.cc index 9e8e602619..f6513b96fd 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -402,7 +402,7 @@ future<> manager::end_point_hints_manager::sender::do_send_one_mutation(frozen_m // to be generated as a result of hints sending. if (boost::range::find(natural_endpoints, end_point_key()) != natural_endpoints.end()) { manager_logger.trace("Sending directly to {}", end_point_key()); - return _proxy.send_to_endpoint(std::move(m), end_point_key(), { }, write_type::SIMPLE, service::allow_hints::no); + return _proxy.send_hint_to_endpoint(std::move(m), end_point_key()); } else { manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", end_point_key()); // FIXME: using 1h as infinite timeout. If a node is down, we should get an diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index d0740bafcb..f7cb80d93d 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -224,6 +224,7 @@ public: // same mutation for each destination class shared_mutation : public mutation_holder { +protected: lw_shared_ptr _mutation; public: explicit shared_mutation(frozen_mutation_and_schema&& fm_a_s) @@ -258,6 +259,28 @@ public: } }; +// shared mutation, but gets sent as a hint +class hint_mutation : public shared_mutation { +public: + using shared_mutation::shared_mutation; + virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) override { + throw std::runtime_error("Attempted to store a hint for a hint"); + } + virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) override { + return make_exception_future<>(std::runtime_error("Executing hint locally doesn't make sense")); + } + virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector&& forward, + storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) override { + tracing::trace(tr_state, "Sending a hint to /{}", ep); + auto& ms = netw::get_local_messaging_service(); + return ms.send_hint_mutation(netw::messaging_service::msg_addr{ep, 0}, timeout, *_mutation, + std::move(forward), utils::fb_utilities::get_broadcast_address(), engine().cpu_id(), + response_id, tracing::make_trace_info(tr_state)); + } +}; + class cas_mutation : public mutation_holder { lw_shared_ptr _proposal; public: @@ -2230,6 +2253,26 @@ future<> storage_proxy::send_to_endpoint( allow_hints); } +future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target) { + if (!service::get_local_storage_service().cluster_supports_hinted_handoff_separate_connection()) { + return send_to_endpoint( + std::make_unique(std::move(fm_a_s)), + std::move(target), + { }, + db::write_type::SIMPLE, + _stats, + allow_hints::no); + } + + return send_to_endpoint( + std::make_unique(std::move(fm_a_s)), + std::move(target), + { }, + db::write_type::SIMPLE, + _stats, + allow_hints::no); +} + /** * Send the mutations to the right targets, write it locally if it corresponds or writes a hint when the node * is not available. @@ -4440,6 +4483,7 @@ void storage_proxy::init_messaging_service() { }); }; ms.register_mutation(receive_mutation_handler); + ms.register_hint_mutation(receive_mutation_handler); ms.register_paxos_learn([] (const rpc::client_info& cinfo, rpc::opt_time_point t, paxos::proposal decision, std::vector forward, gms::inet_address reply_to, unsigned shard, diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index f342788818..6705529d79 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -466,6 +466,11 @@ public: future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, std::vector pending_endpoints, db::write_type type, write_stats& stats, allow_hints allow_hints = allow_hints::yes); future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, std::vector pending_endpoints, db::write_type type, allow_hints allow_hints = allow_hints::yes); + // Send a mutation to a specific remote target as a hint. + // Unlike regular mutations during write operations, hints are sent on the streaming connection + // and use different RPC verb. + future<> send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target); + /** * Performs the truncate operatoin, which effectively deletes all data from * the column family cfname @@ -541,6 +546,7 @@ public: friend class mutation_holder; friend class per_destination_mutation; friend class shared_mutation; + friend class hint_mutation; friend class cas_mutation; }; From adfa7d7b8d8416a078b9ed57072f410d67d70324 Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Wed, 4 Dec 2019 14:16:47 +0100 Subject: [PATCH 5/5] messaging_service: don't move `unsigned` values in handlers Performing std::move on integral types is pointless. This commit gets rid of moves of values of `unsigned` type in rpc handlers. --- message/messaging_service.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 050c1094ce..159e85207b 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -995,7 +995,7 @@ void messaging_service::unregister_mutation() { future<> messaging_service::send_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector forward, inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info) { return send_message_oneway_timeout(this, timeout, messaging_verb::MUTATION, std::move(id), fm, std::move(forward), - std::move(reply_to), std::move(shard), std::move(response_id), std::move(trace_info)); + std::move(reply_to), shard, std::move(response_id), std::move(trace_info)); } void messaging_service::register_counter_mutation(std::function (const rpc::client_info&, rpc::opt_time_point, std::vector fms, db::consistency_level cl, std::optional trace_info)>&& func) { @@ -1015,7 +1015,7 @@ void messaging_service::unregister_mutation_done() { _rpc->unregister_handler(netw::messaging_verb::MUTATION_DONE); } future<> messaging_service::send_mutation_done(msg_addr id, unsigned shard, response_id_type response_id, db::view::update_backlog backlog) { - return send_message_oneway(this, messaging_verb::MUTATION_DONE, std::move(id), std::move(shard), std::move(response_id), std::move(backlog)); + return send_message_oneway(this, messaging_verb::MUTATION_DONE, std::move(id), shard, std::move(response_id), std::move(backlog)); } void messaging_service::register_mutation_failed(std::function (const rpc::client_info& cinfo, unsigned shard, response_id_type response_id, size_t num_failed, rpc::optional backlog)>&& func) { @@ -1025,7 +1025,7 @@ void messaging_service::unregister_mutation_failed() { _rpc->unregister_handler(netw::messaging_verb::MUTATION_FAILED); } future<> messaging_service::send_mutation_failed(msg_addr id, unsigned shard, response_id_type response_id, size_t num_failed, db::view::update_backlog backlog) { - return send_message_oneway(this, messaging_verb::MUTATION_FAILED, std::move(id), std::move(shard), std::move(response_id), num_failed, std::move(backlog)); + return send_message_oneway(this, messaging_verb::MUTATION_FAILED, std::move(id), shard, std::move(response_id), num_failed, std::move(backlog)); } void messaging_service::register_read_data(std::function>, cache_temperature>> (const rpc::client_info&, rpc::opt_time_point t, query::read_command cmd, ::compat::wrapping_partition_range pr, rpc::optional oda)>&& func) { @@ -1274,7 +1274,7 @@ future<> messaging_service::send_paxos_learn(msg_addr id, clock_type::time_point std::vector forward, inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info) { return send_message_oneway_timeout(this, timeout, messaging_verb::PAXOS_LEARN, std::move(id), decision, std::move(forward), - std::move(reply_to), std::move(shard), std::move(response_id), std::move(trace_info)); + std::move(reply_to), shard, std::move(response_id), std::move(trace_info)); } void messaging_service::register_hint_mutation(std::function (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector forward, @@ -1287,7 +1287,7 @@ void messaging_service::unregister_hint_mutation() { future<> messaging_service::send_hint_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector forward, inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info) { return send_message_oneway_timeout(this, timeout, messaging_verb::HINT_MUTATION, std::move(id), fm, std::move(forward), - std::move(reply_to), std::move(shard), std::move(response_id), std::move(trace_info)); + std::move(reply_to), shard, std::move(response_id), std::move(trace_info)); } } // namespace net