From 70adf653410f02076ea47bfa8bbf2ff6c2048c9d Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Wed, 23 Oct 2019 15:33:57 +0300 Subject: [PATCH] storage_proxy: make mutation holder responsible for mutation operation Currently the code that manipulates mutations during write need to check what kind of mutations are those and (sometimes) choose different code paths. This patch encapsulates the differences in virtual functions of mutation_holder object, so that high level code will not concern itself with the details. The functions that are added: apply_locally(), apply_remotely() and store_hint(). --- service/storage_proxy.cc | 150 +++++++++++++++++++++++++-------------- service/storage_proxy.hh | 4 ++ 2 files changed, 99 insertions(+), 55 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 9d3fd80169..900b0c9cfa 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -136,9 +136,12 @@ protected: schema_ptr _schema; public: virtual ~mutation_holder() {} - // Can return a nullptr - virtual lw_shared_ptr get_mutation_for(gms::inet_address ep) = 0; - virtual std::optional get_ballot() const { return std::nullopt; } // valid only for CAS writes + virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) = 0; + virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) = 0; + virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector&& forward, + storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) = 0; virtual bool is_shared() = 0; size_t size() const { return _size; @@ -166,8 +169,36 @@ public: _mutations.emplace(m.first, std::move(fm)); } } - virtual lw_shared_ptr get_mutation_for(gms::inet_address ep) override { - return _mutations[ep]; + virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) override { + auto m = _mutations[ep]; + if (m) { + return hm.store_hint(ep, _schema, std::move(m), tr_state); + } else { + return false; + } + } + virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) override { + auto m = _mutations[utils::fb_utilities::get_broadcast_address()]; + if (m) { + tracing::trace(tr_state, "Executing a mutation locally"); + return sp.mutate_locally(_schema, *m, timeout); + } + return make_ready_future<>(); + } + virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector&& forward, + storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) override { + auto& ms = netw::get_local_messaging_service(); + auto m = _mutations[ep]; + if (m) { + tracing::trace(tr_state, "Sending a mutation to /{}", ep); + return ms.send_mutation(netw::messaging_service::msg_addr{ep, 0}, timeout, *m, + std::move(forward), utils::fb_utilities::get_broadcast_address(), engine().cpu_id(), + response_id, tracing::make_trace_info(tr_state)); + } + sp.got_response(response_id, ep, std::nullopt); + return make_ready_future<>(); } virtual bool is_shared() override { return false; @@ -195,8 +226,22 @@ public: } explicit shared_mutation(const mutation& m) : shared_mutation(frozen_mutation_and_schema{freeze(m), m.schema()}) { } - lw_shared_ptr get_mutation_for(gms::inet_address ep) override { - return _mutation; + virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) override { + return hm.store_hint(ep, _schema, _mutation, tr_state); + } + virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) override { + tracing::trace(tr_state, "Executing a mutation locally"); + return sp.mutate_locally(_schema, *_mutation, timeout); + } + virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector&& forward, + storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) override { + tracing::trace(tr_state, "Sending a mutation to /{}", ep); + auto& ms = netw::get_local_messaging_service(); + return ms.send_mutation(netw::messaging_service::msg_addr{ep, 0}, timeout, *_mutation, + std::move(forward), utils::fb_utilities::get_broadcast_address(), engine().cpu_id(), + response_id, tracing::make_trace_info(tr_state)); } virtual bool is_shared() override { return true; @@ -207,24 +252,35 @@ public: }; class cas_mutation : public mutation_holder { - lw_shared_ptr _mutation; - utils::UUID _ballot; + lw_shared_ptr _proposal; public: explicit cas_mutation(paxos::proposal proposal , schema_ptr s) - : _mutation(make_lw_shared(std::move(proposal.update))) { - _size = _mutation->representation().size(); + : _proposal(make_lw_shared(std::move(proposal))) { + _size = _proposal->update.representation().size(); _schema = std::move(s); - _ballot = proposal.ballot; } - lw_shared_ptr get_mutation_for(gms::inet_address ep) override { - return _mutation; + virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) override { + return false; // CAS does not save hints yet + } + virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) override { + tracing::trace(tr_state, "Executing a learn locally"); + return paxos::paxos_state::learn(_schema, *_proposal, timeout, tr_state); + } + virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector&& forward, + storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) override { + tracing::trace(tr_state, "Sending a learn to /{}", ep); + auto& ms = netw::get_local_messaging_service(); + return ms.send_paxos_learn(netw::messaging_service::msg_addr{ep, 0}, timeout, + *_proposal, std::move(forward), utils::fb_utilities::get_broadcast_address(), + engine().cpu_id(), response_id, tracing::make_trace_info(tr_state)); } - virtual std::optional get_ballot() const { return _ballot; } virtual bool is_shared() override { return true; } virtual void release_mutation() override { - _mutation.release(); + _proposal.release(); } }; @@ -298,9 +354,6 @@ public: bool is_counter() const { return _type == db::write_type::COUNTER; } - bool is_cas() const { - return _type == db::write_type::CAS; - } // While delayed, a request is not throttled. void unthrottle() { @@ -451,14 +504,22 @@ public: const std::vector& get_dead_endpoints() const { return _dead_endpoints; } - lw_shared_ptr get_mutation_for(gms::inet_address ep) { - return _mutation_holder->get_mutation_for(ep); + bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) { + return _mutation_holder->store_hint(hm, ep, tr_state); + } + future<> apply_locally(storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) { + return _mutation_holder->apply_locally(*_proxy, timeout, std::move(tr_state)); + } + future<> apply_remotely(gms::inet_address ep, std::vector&& forward, + storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout, + tracing::trace_state_ptr tr_state) { + return _mutation_holder->apply_remotely(*_proxy, ep, std::move(forward), response_id, timeout, std::move(tr_state)); } const schema_ptr& get_schema() const { return _mutation_holder->schema(); } - const std::optional get_ballot() const { - return _mutation_holder->get_ballot(); + const size_t get_mutation_size() const { + return _mutation_holder->size(); } storage_proxy::response_id_type id() const { return _id; @@ -2129,14 +2190,9 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo auto my_address = utils::fb_utilities::get_broadcast_address(); // lambda for applying mutation locally - auto lmutate = [handler_ptr, response_id, this, my_address, timeout] (lw_shared_ptr m) mutable { - tracing::trace(handler_ptr->get_trace_state(), "Executing a mutation locally"); - auto s = handler_ptr->get_schema(); - auto f = handler_ptr->is_cas() ? - paxos::paxos_state::learn(s, paxos::proposal(*handler_ptr->get_ballot(), *m), - timeout, handler_ptr->get_trace_state()) : - mutate_locally(std::move(s), *m, timeout); - return f.then([response_id, this, my_address, m, h = std::move(handler_ptr), p = shared_from_this()] { + auto lmutate = [handler_ptr, response_id, this, my_address, timeout] () mutable { + return handler_ptr->apply_locally(timeout, handler_ptr->get_trace_state()) + .then([response_id, this, my_address, h = std::move(handler_ptr), p = shared_from_this()] { // make mutation alive until it is processed locally, otherwise it // may disappear if write timeouts before this future is ready got_response(response_id, my_address, get_view_update_backlog()); @@ -2144,22 +2200,12 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo }; // lambda for applying mutation remotely - auto rmutate = [this, handler_ptr, timeout, response_id, my_address, &stats] (gms::inet_address coordinator, std::vector&& forward, const frozen_mutation& m) { - auto& ms = netw::get_local_messaging_service(); - auto msize = m.representation().size(); + auto rmutate = [this, handler_ptr, timeout, response_id, my_address, &stats] (gms::inet_address coordinator, std::vector&& forward) { + auto msize = handler_ptr->get_mutation_size(); // can overestimate for repair writes stats.queued_write_bytes += msize; - auto& tr_state = handler_ptr->get_trace_state(); - tracing::trace(tr_state, "Sending a mutation to /{}", coordinator); - - auto f = handler_ptr->is_cas() ? - ms.send_paxos_learn(netw::messaging_service::msg_addr{coordinator, 0}, timeout, - paxos::proposal(*handler_ptr->get_ballot(), m), std::move(forward), my_address, engine().cpu_id(), - response_id, tracing::make_trace_info(tr_state)) : - ms.send_mutation(netw::messaging_service::msg_addr{coordinator, 0}, timeout, m, - std::move(forward), my_address, engine().cpu_id(), response_id, tracing::make_trace_info(tr_state)); - - return f.finally([this, p = shared_from_this(), h = std::move(handler_ptr), msize, &stats] { + return handler_ptr->apply_remotely(coordinator, std::move(forward), response_id, timeout, handler_ptr->get_trace_state()) + .finally([this, p = shared_from_this(), h = std::move(handler_ptr), msize, &stats] { stats.queued_write_bytes -= msize; unthrottle(); }); @@ -2176,9 +2222,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo future<> f = make_ready_future<>(); - lw_shared_ptr m = handler.get_mutation_for(coordinator); - - if (!m || (handler.is_counter() && coordinator == my_address)) { + if (handler.is_counter() && coordinator == my_address) { got_response(response_id, coordinator, std::nullopt); } else { if (!handler.read_repair_write()) { @@ -2188,9 +2232,9 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo } if (coordinator == my_address) { - f = futurize::apply(lmutate, std::move(m)); + f = futurize_apply(lmutate); } else { - f = futurize::apply(rmutate, coordinator, std::move(forward), *m); + f = futurize_apply(rmutate, coordinator, std::move(forward)); } } @@ -2221,11 +2265,7 @@ size_t storage_proxy::hint_to_dead_endpoints(std::unique_ptr& m if (hints_enabled(type)) { db::hints::manager& hints_manager = hints_manager_for(type); return boost::count_if(targets, [this, &mh, tr_state = std::move(tr_state), &hints_manager] (gms::inet_address target) mutable -> bool { - auto m = mh->get_mutation_for(target); - if (!m) { - return 0; - } - return hints_manager.store_hint(target, mh->schema(), std::move(m), tr_state); + return mh->store_hint(hints_manager, target, tr_state); }); } else { return 0; diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 3cf809a47c..68ac70e6b6 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -521,6 +521,10 @@ public: friend class view_update_backlog_broker; friend class view_update_write_response_handler; friend class paxos_response_handler; + friend class mutation_holder; + friend class per_destination_mutation; + friend class shared_mutation; + friend class cas_mutation; }; // A Paxos (AKA Compare And Swap, CAS) protocol involves multiple roundtrips between the coordinator