diff --git a/db/hints/manager.cc b/db/hints/manager.cc index 9e8e602619..f6513b96fd 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -402,7 +402,7 @@ future<> manager::end_point_hints_manager::sender::do_send_one_mutation(frozen_m // to be generated as a result of hints sending. if (boost::range::find(natural_endpoints, end_point_key()) != natural_endpoints.end()) { manager_logger.trace("Sending directly to {}", end_point_key()); - return _proxy.send_to_endpoint(std::move(m), end_point_key(), { }, write_type::SIMPLE, service::allow_hints::no); + return _proxy.send_hint_to_endpoint(std::move(m), end_point_key()); } else { manager_logger.trace("Endpoints set has changed and {} is no longer a replica. Mutating from scratch...", end_point_key()); // FIXME: using 1h as infinite timeout. If a node is down, we should get an diff --git a/message/messaging_service.cc b/message/messaging_service.cc index d48d4619d9..159e85207b 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -480,6 +480,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { case messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM: case messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM: case messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM: + case messaging_verb::HINT_MUTATION: return 2; case messaging_verb::MUTATION_DONE: case messaging_verb::MUTATION_FAILED: @@ -994,7 +995,7 @@ void messaging_service::unregister_mutation() { future<> messaging_service::send_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector forward, inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info) { return send_message_oneway_timeout(this, timeout, messaging_verb::MUTATION, std::move(id), fm, std::move(forward), - std::move(reply_to), std::move(shard), std::move(response_id), std::move(trace_info)); + std::move(reply_to), shard, std::move(response_id), std::move(trace_info)); } 
void messaging_service::register_counter_mutation(std::function (const rpc::client_info&, rpc::opt_time_point, std::vector fms, db::consistency_level cl, std::optional trace_info)>&& func) { @@ -1014,7 +1015,7 @@ void messaging_service::unregister_mutation_done() { _rpc->unregister_handler(netw::messaging_verb::MUTATION_DONE); } future<> messaging_service::send_mutation_done(msg_addr id, unsigned shard, response_id_type response_id, db::view::update_backlog backlog) { - return send_message_oneway(this, messaging_verb::MUTATION_DONE, std::move(id), std::move(shard), std::move(response_id), std::move(backlog)); + return send_message_oneway(this, messaging_verb::MUTATION_DONE, std::move(id), shard, std::move(response_id), std::move(backlog)); } void messaging_service::register_mutation_failed(std::function (const rpc::client_info& cinfo, unsigned shard, response_id_type response_id, size_t num_failed, rpc::optional backlog)>&& func) { @@ -1024,7 +1025,7 @@ void messaging_service::unregister_mutation_failed() { _rpc->unregister_handler(netw::messaging_verb::MUTATION_FAILED); } future<> messaging_service::send_mutation_failed(msg_addr id, unsigned shard, response_id_type response_id, size_t num_failed, db::view::update_backlog backlog) { - return send_message_oneway(this, messaging_verb::MUTATION_FAILED, std::move(id), std::move(shard), std::move(response_id), num_failed, std::move(backlog)); + return send_message_oneway(this, messaging_verb::MUTATION_FAILED, std::move(id), shard, std::move(response_id), num_failed, std::move(backlog)); } void messaging_service::register_read_data(std::function>, cache_temperature>> (const rpc::client_info&, rpc::opt_time_point t, query::read_command cmd, ::compat::wrapping_partition_range pr, rpc::optional oda)>&& func) { @@ -1273,7 +1274,20 @@ future<> messaging_service::send_paxos_learn(msg_addr id, clock_type::time_point std::vector forward, inet_address reply_to, unsigned shard, response_id_type response_id, std::optional 
trace_info) { return send_message_oneway_timeout(this, timeout, messaging_verb::PAXOS_LEARN, std::move(id), decision, std::move(forward), - std::move(reply_to), std::move(shard), std::move(response_id), std::move(trace_info)); + std::move(reply_to), shard, std::move(response_id), std::move(trace_info)); +} + +void messaging_service::register_hint_mutation(std::function (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector forward, + inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional> trace_info)>&& func) { + register_handler(this, netw::messaging_verb::HINT_MUTATION, std::move(func)); +} +void messaging_service::unregister_hint_mutation() { + _rpc->unregister_handler(netw::messaging_verb::HINT_MUTATION); +} +future<> messaging_service::send_hint_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector forward, + inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info) { + return send_message_oneway_timeout(this, timeout, messaging_verb::HINT_MUTATION, std::move(id), fm, std::move(forward), + std::move(reply_to), shard, std::move(response_id), std::move(trace_info)); } } // namespace net diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 6df9c577ac..70d76465c6 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -138,7 +138,8 @@ enum class messaging_verb : int32_t { PAXOS_PREPARE = 39, PAXOS_ACCEPT = 40, PAXOS_LEARN = 41, - LAST = 42, + HINT_MUTATION = 42, + LAST = 43, }; } // namespace netw @@ -490,6 +491,12 @@ public: std::vector forward, inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info = std::nullopt); + void register_hint_mutation(std::function (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector forward, + inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional> trace_info)>&& 
func); + void unregister_hint_mutation(); + future<> send_hint_mutation(msg_addr id, clock_type::time_point timeout, const frozen_mutation& fm, std::vector forward, + inet_address reply_to, unsigned shard, response_id_type response_id, std::optional trace_info = std::nullopt); + void foreach_server_connection_stats(std::function&& f) const; private: bool remove_rpc_client_one(clients_map& clients, msg_addr id, bool dead_only); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 3c6be2407a..f7cb80d93d 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -224,6 +224,7 @@ public: // same mutation for each destination class shared_mutation : public mutation_holder { +protected: lw_shared_ptr _mutation; public: explicit shared_mutation(frozen_mutation_and_schema&& fm_a_s) @@ -258,6 +259,28 @@ public: } }; +// shared mutation, but gets sent as a hint +class hint_mutation : public shared_mutation { +public: + using shared_mutation::shared_mutation; + virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) override { + throw std::runtime_error("Attempted to store a hint for a hint"); + } + virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) override { + return make_exception_future<>(std::runtime_error("Executing hint locally doesn't make sense")); + } + virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector&& forward, + storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) override { + tracing::trace(tr_state, "Sending a hint to /{}", ep); + auto& ms = netw::get_local_messaging_service(); + return ms.send_hint_mutation(netw::messaging_service::msg_addr{ep, 0}, timeout, *_mutation, + std::move(forward), utils::fb_utilities::get_broadcast_address(), engine().cpu_id(), + response_id, 
tracing::make_trace_info(tr_state)); + } +}; + class cas_mutation : public mutation_holder { lw_shared_ptr _proposal; public: @@ -2230,6 +2253,26 @@ future<> storage_proxy::send_to_endpoint( allow_hints); } +future<> storage_proxy::send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target) { + if (!service::get_local_storage_service().cluster_supports_hinted_handoff_separate_connection()) { + return send_to_endpoint( + std::make_unique(std::move(fm_a_s)), + std::move(target), + { }, + db::write_type::SIMPLE, + _stats, + allow_hints::no); + } + + return send_to_endpoint( + std::make_unique(std::move(fm_a_s)), + std::move(target), + { }, + db::write_type::SIMPLE, + _stats, + allow_hints::no); +} + /** * Send the mutations to the right targets, write it locally if it corresponds or writes a hint when the node * is not available. @@ -4420,7 +4463,8 @@ void storage_proxy::init_messaging_service() { }); }; - ms.register_mutation([] (const rpc::client_info& cinfo, rpc::opt_time_point t, frozen_mutation in, std::vector forward, gms::inet_address reply_to, unsigned shard, storage_proxy::response_id_type response_id, rpc::optional> trace_info) { + auto receive_mutation_handler = [] (const rpc::client_info& cinfo, rpc::opt_time_point t, frozen_mutation in, std::vector forward, + gms::inet_address reply_to, unsigned shard, storage_proxy::response_id_type response_id, rpc::optional> trace_info) { tracing::trace_state_ptr trace_state_ptr; auto src_addr = netw::messaging_service::get_source(cinfo); @@ -4437,7 +4481,10 @@ void storage_proxy::init_messaging_service() { auto& ms = netw::get_local_messaging_service(); return ms.send_mutation(addr, timeout, m, {}, reply_to, shard, response_id, std::move(trace_info)); }); - }); + }; + ms.register_mutation(receive_mutation_handler); + ms.register_hint_mutation(receive_mutation_handler); + ms.register_paxos_learn([] (const rpc::client_info& cinfo, rpc::opt_time_point t, paxos::proposal decision, std::vector 
forward, gms::inet_address reply_to, unsigned shard, storage_proxy::response_id_type response_id, std::optional trace_info) { diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index f342788818..6705529d79 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -466,6 +466,11 @@ public: future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, std::vector pending_endpoints, db::write_type type, write_stats& stats, allow_hints allow_hints = allow_hints::yes); future<> send_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target, std::vector pending_endpoints, db::write_type type, allow_hints allow_hints = allow_hints::yes); + // Send a mutation to a specific remote target as a hint. + // Unlike regular mutations during write operations, hints are sent on the streaming connection + // and use a different RPC verb. + future<> send_hint_to_endpoint(frozen_mutation_and_schema fm_a_s, gms::inet_address target); + /** * Performs the truncate operatoin, which effectively deletes all data from * the column family cfname @@ -541,6 +546,7 @@ public: friend class mutation_holder; friend class per_destination_mutation; friend class shared_mutation; + friend class hint_mutation; friend class cas_mutation; }; diff --git a/service/storage_service.cc b/service/storage_service.cc index a360e5366d..001b680fb0 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -114,6 +114,7 @@ static const sstring DIGEST_INSENSITIVE_TO_EXPIRY = "DIGEST_INSENSITIVE_TO_EXPIR static const sstring COMPUTED_COLUMNS_FEATURE = "COMPUTED_COLUMNS"; static const sstring CDC_FEATURE = "CDC"; static const sstring NONFROZEN_UDTS_FEATURE = "NONFROZEN_UDTS"; +static const sstring HINTED_HANDOFF_SEPARATE_CONNECTION_FEATURE = "HINTED_HANDOFF_SEPARATE_CONNECTION_FEATURE"; static const sstring SSTABLE_FORMAT_PARAM_NAME = "sstable_format"; @@ -175,6 +176,7 @@ storage_service::storage_service(abort_source& abort_source, 
distributed storage_service::get_config_supported_features_set() { DIGEST_INSENSITIVE_TO_EXPIRY, COMPUTED_COLUMNS_FEATURE, NONFROZEN_UDTS_FEATURE, + HINTED_HANDOFF_SEPARATE_CONNECTION_FEATURE, }; // Do not respect config in the case database is not started diff --git a/service/storage_service.hh b/service/storage_service.hh index d7bec0b18d..7a69e2cebf 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -340,6 +340,7 @@ private: gms::feature _computed_columns; gms::feature _cdc_feature; gms::feature _nonfrozen_udts; + gms::feature _hinted_handoff_separate_connection; sstables::sstable_version_types _sstables_format = sstables::sstable_version_types::ka; seastar::named_semaphore _feature_listeners_sem = {1, named_semaphore_exception_factory{"feature listeners"}}; @@ -2378,6 +2379,10 @@ public: return bool(_nonfrozen_udts); } + bool cluster_supports_hinted_handoff_separate_connection() { + return bool(_hinted_handoff_separate_connection); + } + // Returns schema features which all nodes in the cluster advertise as supported. db::schema_features cluster_schema_features() const;