Merge "Avoid some memory copies in lwt" from Gleb

* seastar-dev.git gleb/lwt-shared-proposal:
 lwt: pass paxos::proposal as a shared pointer everywhere
 lwt: do not copy proposal in paxos_state::accept
 lwt: make load_paxos_state to take partition_key_view instead of a
    deference
This commit is contained in:
Tomasz Grabiec
2020-04-22 13:41:38 +02:00
5 changed files with 75 additions and 76 deletions

View File

@@ -2190,13 +2190,13 @@ future<std::vector<view_build_progress>> load_view_build_progress() {
});
}
future<service::paxos::paxos_state> load_paxos_state(const partition_key& key, schema_ptr s, gc_clock::time_point now,
future<service::paxos::paxos_state> load_paxos_state(partition_key_view key, schema_ptr s, gc_clock::time_point now,
db::timeout_clock::time_point timeout) {
static auto cql = format("SELECT * FROM system.{} WHERE row_key = ? AND cf_id = ?", PAXOS);
// FIXME: we need execute_cql_with_now()
(void)now;
auto f = execute_cql_with_timeout(cql, timeout, to_legacy(*key.get_compound_type(*s), key.representation()), s->id());
return f.then([s, key] (shared_ptr<cql3::untyped_result_set> results) mutable {
return f.then([s, key = std::move(key)] (shared_ptr<cql3::untyped_result_set> results) mutable {
if (results->empty()) {
return service::paxos::paxos_state();
}

View File

@@ -642,7 +642,7 @@ future<std::vector<view_name>> load_built_views();
future<std::vector<view_build_progress>> load_view_build_progress();
// Paxos related functions
future<service::paxos::paxos_state> load_paxos_state(const partition_key& key, schema_ptr s, gc_clock::time_point now,
future<service::paxos::paxos_state> load_paxos_state(partition_key_view key, schema_ptr s, gc_clock::time_point now,
db::timeout_clock::time_point timeout);
future<> save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout);
future<> save_paxos_proposal(const schema& s, const service::paxos::proposal& proposal, db::timeout_clock::time_point timeout);

View File

@@ -125,13 +125,12 @@ future<bool> paxos_state::accept(tracing::trace_state_ptr tr_state, schema_ptr s
utils::latency_counter lc;
lc.start();
return with_locked_key(token, timeout, [proposal = std::move(proposal), schema, tr_state, timeout] () mutable {
return with_locked_key(token, timeout, [&proposal, schema, tr_state, timeout] () mutable {
auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(proposal.ballot);
auto f = db::system_keyspace::load_paxos_state(proposal.update.decorated_key(*schema).key(), schema,
gc_clock::time_point(now_in_sec), timeout);
return f.then([proposal = std::move(proposal), tr_state, schema, timeout] (paxos_state state) {
auto f = db::system_keyspace::load_paxos_state(proposal.update.key(), schema, gc_clock::time_point(now_in_sec), timeout);
return f.then([&proposal, tr_state, schema, timeout] (paxos_state state) {
return utils::get_local_injector().inject("paxos_state_accept_timeout", timeout).then(
[proposal = std::move(proposal), state = std::move(state), tr_state, schema, timeout] {
[&proposal, state = std::move(state), tr_state, schema, timeout] {
// Accept the proposal if we promised to accept it or the proposal is newer than the one we promised.
// Otherwise the proposal was cutoff by another Paxos proposer and has to be rejected.
if (proposal.ballot == state._promised_ballot || proposal.ballot.timestamp() > state._promised_ballot.timestamp()) {

View File

@@ -303,8 +303,8 @@ class cas_mutation : public mutation_holder {
lw_shared_ptr<paxos::proposal> _proposal;
shared_ptr<paxos_response_handler> _handler;
public:
explicit cas_mutation(paxos::proposal proposal, schema_ptr s, shared_ptr<paxos_response_handler> handler)
: _proposal(make_lw_shared<paxos::proposal>(std::move(proposal))), _handler(std::move(handler)) {
explicit cas_mutation(lw_shared_ptr<paxos::proposal> proposal, schema_ptr s, shared_ptr<paxos_response_handler> handler)
: _proposal(std::move(proposal)), _handler(std::move(handler)) {
_size = _proposal->update.representation().size();
_schema = std::move(s);
}
@@ -835,25 +835,25 @@ paxos_response_handler::begin_and_repair_paxos(client_state& cs, unsigned& conte
} else {
++_proxy->get_stats().cas_read_unfinished_commit;
}
return do_with(paxos::proposal(ballot, std::move(in_progress->update)),
[this, &contentions] (paxos::proposal& refreshed_in_progress) {
return accept_proposal(refreshed_in_progress, false).then([this, &contentions, &refreshed_in_progress] (bool is_accepted) mutable {
if (is_accepted) {
return learn_decision(std::move(refreshed_in_progress), false).then([] {
return make_ready_future<std::optional<ballot_and_data>>(std::optional<ballot_and_data>());
}).handle_exception_type([] (mutation_write_timeout_exception& e) {
e.type = db::write_type::CAS;
// we're still doing preparation for the paxos rounds, so we want to use the CAS (see cASSANDRA-8672)
return make_exception_future<std::optional<ballot_and_data>>(std::move(e));
});
} else {
paxos::paxos_state::logger.debug("CAS[{}] Some replicas have already promised a higher ballot than ours; aborting", _id);
tracing::trace(tr_state, "Some replicas have already promised a higher ballot than ours; aborting");
// sleep a random amount to give the other proposer a chance to finish
contentions++;
return sleep_and_restart();
}
});
auto refreshed_in_progress = make_lw_shared<paxos::proposal>(ballot, std::move(in_progress->update));
return accept_proposal(refreshed_in_progress, false).then([this, &contentions, refreshed_in_progress] (bool is_accepted) mutable {
if (is_accepted) {
return learn_decision(std::move(refreshed_in_progress), false).then([] {
return make_ready_future<std::optional<ballot_and_data>>(std::optional<ballot_and_data>());
}).handle_exception_type([] (mutation_write_timeout_exception& e) {
e.type = db::write_type::CAS;
// we're still doing preparation for the paxos rounds, so we want to use the CAS (see cASSANDRA-8672)
return make_exception_future<std::optional<ballot_and_data>>(std::move(e));
});
} else {
paxos::paxos_state::logger.debug("CAS[{}] Some replicas have already promised a higher ballot than ours; aborting", _id);
tracing::trace(tr_state, "Some replicas have already promised a higher ballot than ours; aborting");
// sleep a random amount to give the other proposer a chance to finish
contentions++;
return sleep_and_restart();
}
});
}
@@ -869,8 +869,8 @@ paxos_response_handler::begin_and_repair_paxos(client_state& cs, unsigned& conte
if (missing_mrc.size() > 0) {
paxos::paxos_state::logger.debug("CAS[{}] Repairing replicas that missed the most recent commit", _id);
tracing::trace(tr_state, "Repairing replicas that missed the most recent commit");
std::array<std::tuple<paxos::proposal, schema_ptr, dht::token, std::unordered_set<gms::inet_address>>, 1>
m{std::make_tuple(std::move(*summary.most_recent_commit), _schema, _key.token(), std::move(missing_mrc))};
std::array<std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, dht::token, std::unordered_set<gms::inet_address>>, 1>
m{std::make_tuple(make_lw_shared<paxos::proposal>(std::move(*summary.most_recent_commit)), _schema, _key.token(), std::move(missing_mrc))};
// create_write_response_handler is overloaded for paxos::proposal and will
// create cas_mutation holder, which consequently will ensure paxos::learn is
// used.
@@ -1055,7 +1055,7 @@ future<paxos::prepare_summary> paxos_response_handler::prepare_ballot(utils::UUI
}
// This function implements accept stage of the Paxos protocol.
future<bool> paxos_response_handler::accept_proposal(const paxos::proposal& proposal, bool timeout_if_partially_accepted) {
future<bool> paxos_response_handler::accept_proposal(lw_shared_ptr<paxos::proposal> proposal, bool timeout_if_partially_accepted) {
struct {
// the promise can be set before all replies are received at which point
// the optional will be disengaged so further replies are ignored
@@ -1086,20 +1086,20 @@ future<bool> paxos_response_handler::accept_proposal(const paxos::proposal& prop
auto f = request_tracker.p->get_future();
// We may continue collecting propose responses in the background after the reply is ready
(void)do_with(std::move(request_tracker), shared_from_this(), [this, timeout_if_partially_accepted, &proposal]
(void)do_with(std::move(request_tracker), shared_from_this(), [this, timeout_if_partially_accepted, proposal = std::move(proposal)]
(auto& request_tracker, shared_ptr<paxos_response_handler>& prh) {
paxos::paxos_state::logger.trace("CAS[{}] accept_proposal: sending commit {} to {}", _id, proposal, _live_endpoints);
return parallel_for_each(_live_endpoints, [this, &request_tracker, timeout_if_partially_accepted, &proposal] (gms::inet_address peer) mutable {
paxos::paxos_state::logger.trace("CAS[{}] accept_proposal: sending commit {} to {}", _id, *proposal, _live_endpoints);
return parallel_for_each(_live_endpoints, [this, &request_tracker, timeout_if_partially_accepted, proposal = std::move(proposal)] (gms::inet_address peer) mutable {
return futurize_invoke([&] {
if (fbu::is_me(peer)) {
tracing::trace(tr_state, "accept_proposal: accept {} locally", proposal);
return paxos::paxos_state::accept(tr_state, _schema, proposal.update.decorated_key(*_schema).token(), proposal, _timeout);
tracing::trace(tr_state, "accept_proposal: accept {} locally", *proposal);
return paxos::paxos_state::accept(tr_state, _schema, proposal->update.decorated_key(*_schema).token(), *proposal, _timeout);
} else {
tracing::trace(tr_state, "accept_proposal: send accept {} to {}", proposal, peer);
tracing::trace(tr_state, "accept_proposal: send accept {} to {}", *proposal, peer);
netw::messaging_service& ms = netw::get_local_messaging_service();
return ms.send_paxos_accept(peer, _timeout, proposal, tracing::make_trace_info(tr_state));
return ms.send_paxos_accept(peer, _timeout, *proposal, tracing::make_trace_info(tr_state));
}
}).then_wrapped([this, &request_tracker, timeout_if_partially_accepted, &proposal, peer] (future<bool> accepted_f) {
}).then_wrapped([this, &request_tracker, timeout_if_partially_accepted, proposal, peer] (future<bool> accepted_f) {
if (!request_tracker.p) {
accepted_f.ignore_ready_future();
// Ignore the response since a completion was already signaled.
@@ -1111,11 +1111,11 @@ future<bool> paxos_response_handler::accept_proposal(const paxos::proposal& prop
auto ex = accepted_f.get_exception();
if (is_timeout_exception(ex)) {
paxos::paxos_state::logger.trace("CAS[{}] accept_proposal: timeout while sending proposal {} to {}",
_id, proposal, peer);
_id, *proposal, peer);
is_timeout = true;
} else {
paxos::paxos_state::logger.trace("CAS[{}] accept_proposal: failure while sending proposal {} to {}: {}", _id,
proposal, peer, ex);
*proposal, peer, ex);
request_tracker.errors++;
}
} else {
@@ -1198,9 +1198,9 @@ std::ostream& operator<<(std::ostream& os, const paxos_response_handler& h) {
}
// This function implements learning stage of Paxos protocol
future<> paxos_response_handler::learn_decision(paxos::proposal decision, bool allow_hints) {
tracing::trace(tr_state, "learn_decision: committing {} with cl={}", decision, _cl_for_learn);
paxos::paxos_state::logger.trace("CAS[{}] learn_decision: committing {} with cl={}", _id, decision, _cl_for_learn);
future<> paxos_response_handler::learn_decision(lw_shared_ptr<paxos::proposal> decision, bool allow_hints) {
tracing::trace(tr_state, "learn_decision: committing {} with cl={}", *decision, _cl_for_learn);
paxos::paxos_state::logger.trace("CAS[{}] learn_decision: committing {} with cl={}", _id, *decision, _cl_for_learn);
// FIXME: allow_hints is ignored. Consider if we should follow it and remove if not.
// Right now we do not store hints for when committing decisions.
@@ -1210,7 +1210,7 @@ future<> paxos_response_handler::learn_decision(paxos::proposal decision, bool a
// Attempts to send both kinds of mutations in one shot caused an infinite loop.
future<> f_cdc = make_ready_future<>();
if (_schema->cdc_options().enabled()) {
auto update_mut = decision.update.unfreeze(_schema);
auto update_mut = decision->update.unfreeze(_schema);
const auto base_tbl_id = update_mut.column_family_id();
std::vector<mutation> update_mut_vec{std::move(update_mut)};
@@ -1232,7 +1232,7 @@ future<> paxos_response_handler::learn_decision(paxos::proposal decision, bool a
}
// Path for the "base" mutations
std::array<std::tuple<paxos::proposal, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>, 1> m{std::make_tuple(std::move(decision), _schema, shared_from_this(), _key.token())};
std::array<std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>, 1> m{std::make_tuple(std::move(decision), _schema, shared_from_this(), _key.token())};
future<> f_lwt = _proxy->mutate_internal(std::move(m), _cl_for_learn, false, tr_state, _permit, _timeout);
return when_all_succeed(std::move(f_cdc), std::move(f_lwt));
@@ -1930,7 +1930,7 @@ storage_proxy::create_write_response_handler(const std::unordered_map<gms::inet_
}
storage_proxy::response_id_type
storage_proxy::create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>& meta,
storage_proxy::create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>& meta,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit) {
auto& [commit, s, h, t] = meta;
@@ -1939,7 +1939,7 @@ storage_proxy::create_write_response_handler(const std::tuple<paxos::proposal, s
}
storage_proxy::response_id_type
storage_proxy::create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, dht::token, std::unordered_set<gms::inet_address>>& meta,
storage_proxy::create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, dht::token, std::unordered_set<gms::inet_address>>& meta,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit) {
auto& [commit, s, token, endpoints] = meta;
@@ -4427,29 +4427,29 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
++get_stats().cas_write_condition_not_met;
return make_ready_future<std::optional<bool>>(false);
}
return do_with(paxos::proposal(ballot, freeze(*mutation)),
[handler, &contentions] (paxos::proposal& proposal) {
paxos::paxos_state::logger.debug("CAS[{}] precondition is met; proposing client-requested updates for {}",
handler->id(), proposal.ballot);
tracing::trace(handler->tr_state, "CAS precondition is met; proposing client-requested updates for {}",
proposal.ballot);
return handler->accept_proposal(proposal).then([handler, &proposal, &contentions] (bool is_accepted) {
if (is_accepted) {
// The majority (aka a QUORUM) has promised the coordinator to
// accept the action associated with the computed ballot.
// Apply the mutation.
return handler->learn_decision(std::move(proposal)).then([handler] {
paxos::paxos_state::logger.debug("CAS[{}] successful", handler->id());
tracing::trace(handler->tr_state, "CAS successful");
return std::optional<bool>(true);
});
}
paxos::paxos_state::logger.debug("CAS[{}] PAXOS proposal not accepted (pre-empted by a higher ballot)",
handler->id());
tracing::trace(handler->tr_state, "PAXOS proposal not accepted (pre-empted by a higher ballot)");
++contentions;
return sleep_approx_50ms().then([] { return std::optional<bool>(); });
});
auto proposal = make_lw_shared<paxos::proposal>(ballot, freeze(*mutation));
paxos::paxos_state::logger.debug("CAS[{}] precondition is met; proposing client-requested updates for {}",
handler->id(), proposal->ballot);
tracing::trace(handler->tr_state, "CAS precondition is met; proposing client-requested updates for {}",
proposal->ballot);
return handler->accept_proposal(proposal).then([handler, proposal, &contentions] (bool is_accepted) mutable {
if (is_accepted) {
// The majority (aka a QUORUM) has promised the coordinator to
// accept the action associated with the computed ballot.
// Apply the mutation.
return handler->learn_decision(std::move(proposal)).then([handler] {
paxos::paxos_state::logger.debug("CAS[{}] successful", handler->id());
tracing::trace(handler->tr_state, "CAS successful");
return std::optional<bool>(true);
});
}
paxos::paxos_state::logger.debug("CAS[{}] PAXOS proposal not accepted (pre-empted by a higher ballot)",
handler->id());
tracing::trace(handler->tr_state, "PAXOS proposal not accepted (pre-empted by a higher ballot)");
++contentions;
return sleep_approx_50ms().then([] { return std::optional<bool>(); });
});
});
});

View File

@@ -318,9 +318,9 @@ private:
response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const std::unordered_map<gms::inet_address, std::optional<mutation>>&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>& proposal,
response_id_type create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>& proposal,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, dht::token, std::unordered_set<gms::inet_address>>& meta,
response_id_type create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, dht::token, std::unordered_set<gms::inet_address>>& meta,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
void register_cdc_operation_result_tracker(const std::vector<storage_proxy::unique_response_handler>& ids, lw_shared_ptr<cdc::operation_result_tracker> tracker);
void send_to_live_endpoints(response_id_type response_id, clock_type::time_point timeout);
@@ -699,8 +699,8 @@ public:
// Steps of the Paxos protocol
future<ballot_and_data> begin_and_repair_paxos(client_state& cs, unsigned& contentions, bool is_write);
future<paxos::prepare_summary> prepare_ballot(utils::UUID ballot);
future<bool> accept_proposal(const paxos::proposal& proposal, bool timeout_if_partially_accepted = true);
future<> learn_decision(paxos::proposal decision, bool allow_hints = false);
future<bool> accept_proposal(lw_shared_ptr<paxos::proposal> proposal, bool timeout_if_partially_accepted = true);
future<> learn_decision(lw_shared_ptr<paxos::proposal> proposal, bool allow_hints = false);
void prune(utils::UUID ballot);
uint64_t id() const {
return _id;