storage_proxy: make mutation holder responsible for mutation operation

Currently the code that manipulates mutations during write need to
check what kind of mutations are those and (sometimes) choose different
code paths. This patch encapsulates the differences in virtual
functions of mutation_holder object, so that high level code will not
concern itself with the details. The functions that are added:
apply_locally(), apply_remotely() and store_hint().
This commit is contained in:
Gleb Natapov
2019-10-23 15:33:57 +03:00
committed by Konstantin Osipov
parent b3e01a45d7
commit 70adf65341
2 changed files with 99 additions and 55 deletions

View File

@@ -136,9 +136,12 @@ protected:
schema_ptr _schema;
public:
virtual ~mutation_holder() {}
// Can return a nullptr
virtual lw_shared_ptr<const frozen_mutation> get_mutation_for(gms::inet_address ep) = 0;
virtual std::optional<utils::UUID> get_ballot() const { return std::nullopt; } // valid only for CAS writes
virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) = 0;
virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) = 0;
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector<gms::inet_address>&& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) = 0;
virtual bool is_shared() = 0;
size_t size() const {
return _size;
@@ -166,8 +169,36 @@ public:
_mutations.emplace(m.first, std::move(fm));
}
}
virtual lw_shared_ptr<const frozen_mutation> get_mutation_for(gms::inet_address ep) override {
return _mutations[ep];
virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) override {
auto m = _mutations[ep];
if (m) {
return hm.store_hint(ep, _schema, std::move(m), tr_state);
} else {
return false;
}
}
virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) override {
auto m = _mutations[utils::fb_utilities::get_broadcast_address()];
if (m) {
tracing::trace(tr_state, "Executing a mutation locally");
return sp.mutate_locally(_schema, *m, timeout);
}
return make_ready_future<>();
}
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector<gms::inet_address>&& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) override {
auto& ms = netw::get_local_messaging_service();
auto m = _mutations[ep];
if (m) {
tracing::trace(tr_state, "Sending a mutation to /{}", ep);
return ms.send_mutation(netw::messaging_service::msg_addr{ep, 0}, timeout, *m,
std::move(forward), utils::fb_utilities::get_broadcast_address(), engine().cpu_id(),
response_id, tracing::make_trace_info(tr_state));
}
sp.got_response(response_id, ep, std::nullopt);
return make_ready_future<>();
}
virtual bool is_shared() override {
return false;
@@ -195,8 +226,22 @@ public:
}
explicit shared_mutation(const mutation& m) : shared_mutation(frozen_mutation_and_schema{freeze(m), m.schema()}) {
}
lw_shared_ptr<const frozen_mutation> get_mutation_for(gms::inet_address ep) override {
return _mutation;
virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) override {
return hm.store_hint(ep, _schema, _mutation, tr_state);
}
virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) override {
tracing::trace(tr_state, "Executing a mutation locally");
return sp.mutate_locally(_schema, *_mutation, timeout);
}
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector<gms::inet_address>&& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) override {
tracing::trace(tr_state, "Sending a mutation to /{}", ep);
auto& ms = netw::get_local_messaging_service();
return ms.send_mutation(netw::messaging_service::msg_addr{ep, 0}, timeout, *_mutation,
std::move(forward), utils::fb_utilities::get_broadcast_address(), engine().cpu_id(),
response_id, tracing::make_trace_info(tr_state));
}
virtual bool is_shared() override {
return true;
@@ -207,24 +252,35 @@ public:
};
class cas_mutation : public mutation_holder {
lw_shared_ptr<const frozen_mutation> _mutation;
utils::UUID _ballot;
lw_shared_ptr<paxos::proposal> _proposal;
public:
explicit cas_mutation(paxos::proposal proposal , schema_ptr s)
: _mutation(make_lw_shared<const frozen_mutation>(std::move(proposal.update))) {
_size = _mutation->representation().size();
: _proposal(make_lw_shared<paxos::proposal>(std::move(proposal))) {
_size = _proposal->update.representation().size();
_schema = std::move(s);
_ballot = proposal.ballot;
}
lw_shared_ptr<const frozen_mutation> get_mutation_for(gms::inet_address ep) override {
return _mutation;
virtual bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) override {
return false; // CAS does not save hints yet
}
virtual future<> apply_locally(storage_proxy& sp, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) override {
tracing::trace(tr_state, "Executing a learn locally");
return paxos::paxos_state::learn(_schema, *_proposal, timeout, tr_state);
}
virtual future<> apply_remotely(storage_proxy& sp, gms::inet_address ep, std::vector<gms::inet_address>&& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) override {
tracing::trace(tr_state, "Sending a learn to /{}", ep);
auto& ms = netw::get_local_messaging_service();
return ms.send_paxos_learn(netw::messaging_service::msg_addr{ep, 0}, timeout,
*_proposal, std::move(forward), utils::fb_utilities::get_broadcast_address(),
engine().cpu_id(), response_id, tracing::make_trace_info(tr_state));
}
virtual std::optional<utils::UUID> get_ballot() const { return _ballot; }
virtual bool is_shared() override {
return true;
}
virtual void release_mutation() override {
_mutation.release();
_proposal.release();
}
};
@@ -298,9 +354,6 @@ public:
bool is_counter() const {
return _type == db::write_type::COUNTER;
}
bool is_cas() const {
return _type == db::write_type::CAS;
}
// While delayed, a request is not throttled.
void unthrottle() {
@@ -451,14 +504,22 @@ public:
const std::vector<gms::inet_address>& get_dead_endpoints() const {
return _dead_endpoints;
}
lw_shared_ptr<const frozen_mutation> get_mutation_for(gms::inet_address ep) {
return _mutation_holder->get_mutation_for(ep);
bool store_hint(db::hints::manager& hm, gms::inet_address ep, tracing::trace_state_ptr tr_state) {
return _mutation_holder->store_hint(hm, ep, tr_state);
}
future<> apply_locally(storage_proxy::clock_type::time_point timeout, tracing::trace_state_ptr tr_state) {
return _mutation_holder->apply_locally(*_proxy, timeout, std::move(tr_state));
}
future<> apply_remotely(gms::inet_address ep, std::vector<gms::inet_address>&& forward,
storage_proxy::response_id_type response_id, storage_proxy::clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) {
return _mutation_holder->apply_remotely(*_proxy, ep, std::move(forward), response_id, timeout, std::move(tr_state));
}
const schema_ptr& get_schema() const {
return _mutation_holder->schema();
}
const std::optional<utils::UUID> get_ballot() const {
return _mutation_holder->get_ballot();
const size_t get_mutation_size() const {
return _mutation_holder->size();
}
storage_proxy::response_id_type id() const {
return _id;
@@ -2129,14 +2190,9 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
auto my_address = utils::fb_utilities::get_broadcast_address();
// lambda for applying mutation locally
auto lmutate = [handler_ptr, response_id, this, my_address, timeout] (lw_shared_ptr<const frozen_mutation> m) mutable {
tracing::trace(handler_ptr->get_trace_state(), "Executing a mutation locally");
auto s = handler_ptr->get_schema();
auto f = handler_ptr->is_cas() ?
paxos::paxos_state::learn(s, paxos::proposal(*handler_ptr->get_ballot(), *m),
timeout, handler_ptr->get_trace_state()) :
mutate_locally(std::move(s), *m, timeout);
return f.then([response_id, this, my_address, m, h = std::move(handler_ptr), p = shared_from_this()] {
auto lmutate = [handler_ptr, response_id, this, my_address, timeout] () mutable {
return handler_ptr->apply_locally(timeout, handler_ptr->get_trace_state())
.then([response_id, this, my_address, h = std::move(handler_ptr), p = shared_from_this()] {
// make mutation alive until it is processed locally, otherwise it
// may disappear if write timeouts before this future is ready
got_response(response_id, my_address, get_view_update_backlog());
@@ -2144,22 +2200,12 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
};
// lambda for applying mutation remotely
auto rmutate = [this, handler_ptr, timeout, response_id, my_address, &stats] (gms::inet_address coordinator, std::vector<gms::inet_address>&& forward, const frozen_mutation& m) {
auto& ms = netw::get_local_messaging_service();
auto msize = m.representation().size();
auto rmutate = [this, handler_ptr, timeout, response_id, my_address, &stats] (gms::inet_address coordinator, std::vector<gms::inet_address>&& forward) {
auto msize = handler_ptr->get_mutation_size(); // can overestimate for repair writes
stats.queued_write_bytes += msize;
auto& tr_state = handler_ptr->get_trace_state();
tracing::trace(tr_state, "Sending a mutation to /{}", coordinator);
auto f = handler_ptr->is_cas() ?
ms.send_paxos_learn(netw::messaging_service::msg_addr{coordinator, 0}, timeout,
paxos::proposal(*handler_ptr->get_ballot(), m), std::move(forward), my_address, engine().cpu_id(),
response_id, tracing::make_trace_info(tr_state)) :
ms.send_mutation(netw::messaging_service::msg_addr{coordinator, 0}, timeout, m,
std::move(forward), my_address, engine().cpu_id(), response_id, tracing::make_trace_info(tr_state));
return f.finally([this, p = shared_from_this(), h = std::move(handler_ptr), msize, &stats] {
return handler_ptr->apply_remotely(coordinator, std::move(forward), response_id, timeout, handler_ptr->get_trace_state())
.finally([this, p = shared_from_this(), h = std::move(handler_ptr), msize, &stats] {
stats.queued_write_bytes -= msize;
unthrottle();
});
@@ -2176,9 +2222,7 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
future<> f = make_ready_future<>();
lw_shared_ptr<const frozen_mutation> m = handler.get_mutation_for(coordinator);
if (!m || (handler.is_counter() && coordinator == my_address)) {
if (handler.is_counter() && coordinator == my_address) {
got_response(response_id, coordinator, std::nullopt);
} else {
if (!handler.read_repair_write()) {
@@ -2188,9 +2232,9 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
}
if (coordinator == my_address) {
f = futurize<void>::apply(lmutate, std::move(m));
f = futurize_apply(lmutate);
} else {
f = futurize<void>::apply(rmutate, coordinator, std::move(forward), *m);
f = futurize_apply(rmutate, coordinator, std::move(forward));
}
}
@@ -2221,11 +2265,7 @@ size_t storage_proxy::hint_to_dead_endpoints(std::unique_ptr<mutation_holder>& m
if (hints_enabled(type)) {
db::hints::manager& hints_manager = hints_manager_for(type);
return boost::count_if(targets, [this, &mh, tr_state = std::move(tr_state), &hints_manager] (gms::inet_address target) mutable -> bool {
auto m = mh->get_mutation_for(target);
if (!m) {
return 0;
}
return hints_manager.store_hint(target, mh->schema(), std::move(m), tr_state);
return mh->store_hint(hints_manager, target, tr_state);
});
} else {
return 0;

View File

@@ -521,6 +521,10 @@ public:
friend class view_update_backlog_broker;
friend class view_update_write_response_handler;
friend class paxos_response_handler;
friend class mutation_holder;
friend class per_destination_mutation;
friend class shared_mutation;
friend class cas_mutation;
};
// A Paxos (AKA Compare And Swap, CAS) protocol involves multiple roundtrips between the coordinator