diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 9d3fd80169..900b0c9cfa 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -136,9 +136,12 @@ protected: schema_ptr _schema; public: virtual ~mutation_holder() {} - // Can return a nullptr - virtual lw_shared_ptr get_mutation_for(gms::inet_address ep) = 0; - virtual std::optional get_ballot() const { return std::nullopt; } // valid only for CAS writes + virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) = 0; + virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) = 0; + virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector&& forward, + storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) = 0; virtual bool is_shared() = 0; size_t size() const { return _size; @@ -166,8 +169,36 @@ public: _mutations.emplace(m.first, std::move(fm)); } } - virtual lw_shared_ptr get_mutation_for(gms::inet_address ep) override { - return _mutations[ep]; + virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) override { + auto m = _mutations[ep]; + if (m) { + return hm.store_hint(ep, _schema, std::move(m), tr_state); + } else { + return false; + } + } + virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) override { + auto m = _mutations[utils::fb_utilities::get_broadcast_address()]; + if (m) { + tracing::trace(tr_state, "Executing a mutation locally"); + return sp.mutate_locally(_schema, *m, timeout); + } + return make_ready_future<>(); + } + virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector&& forward, + storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) override { + auto& ms = netw::get_local_messaging_service(); + auto m = _mutations[ep]; + if (m) { + tracing::trace(tr_state, "Sending a mutation to /{}", ep); + return ms.send_mutation(netw::messaging_service::msg_addr{ep, 0}, timeout, *m, + std::move(forward), utils::fb_utilities::get_broadcast_address(), engine().cpu_id(), + response_id, tracing::make_trace_info(tr_state)); + } + sp.got_response(response_id, ep, std::nullopt); + return make_ready_future<>(); } virtual bool is_shared() override { return false; @@ -195,8 +226,22 @@ public: } explicit shared_mutation(const mutation& m) : shared_mutation(frozen_mutation_and_schema{freeze(m), m.schema()}) { } - lw_shared_ptr get_mutation_for(gms::inet_address ep) override { - return _mutation; + virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) override { + return hm.store_hint(ep, _schema, _mutation, tr_state); + } + virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) override { + tracing::trace(tr_state, "Executing a mutation locally"); + return sp.mutate_locally(_schema, *_mutation, timeout); + } + virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector&& forward, + storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) override { + tracing::trace(tr_state, "Sending a mutation to /{}", ep); + auto& ms = netw::get_local_messaging_service(); + return ms.send_mutation(netw::messaging_service::msg_addr{ep, 0}, timeout, *_mutation, + std::move(forward), utils::fb_utilities::get_broadcast_address(), engine().cpu_id(), + response_id, tracing::make_trace_info(tr_state)); } virtual bool is_shared() override { return true; @@ -207,24 +252,35 @@ public: }; class cas_mutation : public mutation_holder { - lw_shared_ptr _mutation; - utils::UUID _ballot; + lw_shared_ptr _proposal; public: explicit cas_mutation(paxos::proposal proposal , schema_ptr s) - : _mutation(make_lw_shared(std::move(proposal.update))) { - _size = _mutation->representation().size(); + : _proposal(make_lw_shared(std::move(proposal))) { + _size = _proposal->update.representation().size(); _schema = std::move(s); - _ballot = proposal.ballot; } - lw_shared_ptr get_mutation_for(gms::inet_address ep) override { - return _mutation; + virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) override { + return false; // CAS does not save hints yet + } + virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) override { + tracing::trace(tr_state, "Executing a learn locally"); + return paxos::paxos_state::learn(_schema, *_proposal, timeout, tr_state); + } + virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector&& forward, + storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) override { + tracing::trace(tr_state, "Sending a learn to /{}", ep); + auto& ms = netw::get_local_messaging_service(); + return ms.send_paxos_learn(netw::messaging_service::msg_addr{ep, 0}, timeout, + *_proposal, std::move(forward), utils::fb_utilities::get_broadcast_address(), + engine().cpu_id(), response_id, tracing::make_trace_info(tr_state)); } - virtual std::optional get_ballot() const { return _ballot; } virtual bool is_shared() override { return true; } virtual void release_mutation() override { - _mutation.release(); + _proposal.release(); } }; @@ -298,9 +354,6 @@ public: bool is_counter() const { return _type == db::write_type::COUNTER; } - bool is_cas() const { - return _type == db::write_type::CAS; - } // While delayed, a request is not throttled. void unthrottle() { @@ -451,14 +504,22 @@ public: const std::vector& get_dead_endpoints() const { return _dead_endpoints; } - lw_shared_ptr get_mutation_for(gms::inet_address ep) { - return _mutation_holder->get_mutation_for(ep); + bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) { + return _mutation_holder->store_hint(hm, ep, tr_state); + } + future<> apply_locally(storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) { + return _mutation_holder->apply_locally(*_proxy, timeout, std::move(tr_state)); + } + future<> apply_remotely(gms::inet_address ep, std::vector&& forward, + storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) { + return _mutation_holder->apply_remotely(*_proxy, ep, std::move(forward), response_id, timeout, std::move(tr_state)); } const schema_ptr& get_schema() const { return _mutation_holder->schema(); } - const std::optional get_ballot() const { - return _mutation_holder->get_ballot(); + const size_t get_mutation_size() const { + return _mutation_holder->size(); } storage_proxy::response_id_type id() const { return _id; @@ -2129,14 +2190,9 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo auto my_address = utils::fb_utilities::get_broadcast_address(); // lambda for applying mutation locally - auto lmutate = [handler_ptr, response_id, this, my_address, timeout] (lw_shared_ptr m) mutable { - tracing::trace(handler_ptr->get_trace_state(), "Executing a mutation locally"); - auto s = handler_ptr->get_schema(); - auto f = handler_ptr->is_cas() ? - paxos::paxos_state::learn(s, paxos::proposal(*handler_ptr->get_ballot(), *m), - timeout, handler_ptr->get_trace_state()) : - mutate_locally(std::move(s), *m, timeout); - return f.then([response_id, this, my_address, m, h = std::move(handler_ptr), p = shared_from_this()] { + auto lmutate = [handler_ptr, response_id, this, my_address, timeout] () mutable { + return handler_ptr->apply_locally(timeout, handler_ptr->get_trace_state()) + .then([response_id, this, my_address, h = std::move(handler_ptr), p = shared_from_this()] { // make mutation alive until it is processed locally, otherwise it // may disappear if write timeouts before this future is ready got_response(response_id, my_address, get_view_update_backlog()); @@ -2144,22 +2200,12 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo }; // lambda for applying mutation remotely - auto rmutate = [this, handler_ptr, timeout, response_id, my_address, &stats] (gms::inet_address coordinator, std::vector&& forward, const frozen_mutation& m) { - auto& ms = netw::get_local_messaging_service(); - auto msize = m.representation().size(); + auto rmutate = [this, handler_ptr, timeout, response_id, my_address, &stats] (gms::inet_address coordinator, std::vector&& forward) { + auto msize = handler_ptr->get_mutation_size(); // can overestimate for repair writes stats.queued_write_bytes += msize; - auto& tr_state = handler_ptr->get_trace_state(); - tracing::trace(tr_state, "Sending a mutation to /{}", coordinator); - - auto f = handler_ptr->is_cas() ? - ms.send_paxos_learn(netw::messaging_service::msg_addr{coordinator, 0}, timeout, - paxos::proposal(*handler_ptr->get_ballot(), m), std::move(forward), my_address, engine().cpu_id(), - response_id, tracing::make_trace_info(tr_state)) : - ms.send_mutation(netw::messaging_service::msg_addr{coordinator, 0}, timeout, m, - std::move(forward), my_address, engine().cpu_id(), response_id, tracing::make_trace_info(tr_state)); - - return f.finally([this, p = shared_from_this(), h = std::move(handler_ptr), msize, &stats] { + return handler_ptr->apply_remotely(coordinator, std::move(forward), response_id, timeout, handler_ptr->get_trace_state()) + .finally([this, p = shared_from_this(), h = std::move(handler_ptr), msize, &stats] { stats.queued_write_bytes -= msize; unthrottle(); }); @@ -2176,9 +2222,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo future<> f = make_ready_future<>(); - lw_shared_ptr m = handler.get_mutation_for(coordinator); - - if (!m || (handler.is_counter() && coordinator == my_address)) { + if (handler.is_counter() && coordinator == my_address) { got_response(response_id, coordinator, std::nullopt); } else { if (!handler.read_repair_write()) { @@ -2188,9 +2232,9 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo } if (coordinator == my_address) { - f = futurize::apply(lmutate, std::move(m)); + f = futurize_apply(lmutate); } else { - f = futurize::apply(rmutate, coordinator, std::move(forward), *m); + f = futurize_apply(rmutate, coordinator, std::move(forward)); } } @@ -2221,11 +2265,7 @@ size_t storage_proxy::hint_to_dead_endpoints(std::unique_ptr& m if (hints_enabled(type)) { db::hints::manager& hints_manager = hints_manager_for(type); return boost::count_if(targets, [this, &mh, tr_state = std::move(tr_state), &hints_manager] (gms::inet_address target) mutable -> bool { - auto m = mh->get_mutation_for(target); - if (!m) { - return 0; - } - return hints_manager.store_hint(target, mh->schema(), std::move(m), tr_state); + return mh->store_hint(hints_manager, target, tr_state); }); } else { return 0; diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 3cf809a47c..68ac70e6b6 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -521,6 +521,10 @@ public: friend class view_update_backlog_broker; friend class view_update_write_response_handler; friend class paxos_response_handler; + friend class mutation_holder; + friend class per_destination_mutation; + friend class shared_mutation; + friend class cas_mutation; }; // A Paxos (AKA Compare And Swap, CAS) protocol involves multiple roundtrips between the coordinator