paxos: co-routinize paxos_state::accept function
This commit is contained in:
@@ -10,6 +10,7 @@
|
||||
#include <exception>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/coroutine/all.hh>
|
||||
#include "seastar/coroutine/exception.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "service/paxos/proposal.hh"
|
||||
#include "service/paxos/paxos_state.hh"
|
||||
@@ -151,43 +152,43 @@ future<prepare_response> paxos_state::prepare(storage_proxy& sp, db::system_keys
|
||||
|
||||
future<bool> paxos_state::accept(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal,
|
||||
clock_type::time_point timeout) {
|
||||
return utils::get_local_injector().inject("paxos_accept_proposal_timeout", timeout,
|
||||
[&sp, &sys_ks, token = std::move(token), &proposal, schema, tr_state, timeout] {
|
||||
utils::latency_counter lc;
|
||||
lc.start();
|
||||
// FIXME: Handle tablet intra-node migration: #16594.
|
||||
// The shard can change concurrently, so we cannot rely on locking on this shard.
|
||||
return with_locked_key(token, timeout, [&sys_ks, &proposal, schema, tr_state, timeout] () mutable {
|
||||
auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(proposal.ballot);
|
||||
auto f = sys_ks.load_paxos_state(proposal.update.key(), schema, gc_clock::time_point(now_in_sec), timeout);
|
||||
return f.then([&sys_ks, &proposal, tr_state, schema, timeout] (paxos_state state) {
|
||||
// Accept the proposal if we promised to accept it or the proposal is newer than the one we promised.
|
||||
// Otherwise the proposal was cutoff by another Paxos proposer and has to be rejected.
|
||||
if (proposal.ballot == state._promised_ballot || proposal.ballot.timestamp() > state._promised_ballot.timestamp()) {
|
||||
logger.debug("Accepting proposal {}", proposal);
|
||||
tracing::trace(tr_state, "Accepting proposal {}", proposal);
|
||||
co_await utils::get_local_injector().inject("paxos_accept_proposal_timeout", timeout);
|
||||
utils::latency_counter lc;
|
||||
lc.start();
|
||||
|
||||
if (utils::get_local_injector().enter("paxos_error_before_save_proposal")) {
|
||||
return make_exception_future<bool>(utils::injected_error("injected_error_before_save_proposal"));
|
||||
}
|
||||
|
||||
return sys_ks.save_paxos_proposal(*schema, proposal, timeout).then([] {
|
||||
if (utils::get_local_injector().enter("paxos_error_after_save_proposal")) {
|
||||
return make_exception_future<bool>(utils::injected_error("injected_error_after_save_proposal"));
|
||||
}
|
||||
return make_ready_future<bool>(true);
|
||||
});
|
||||
} else {
|
||||
logger.debug("Rejecting proposal for {} because in_progress is now {}", proposal, state._promised_ballot);
|
||||
tracing::trace(tr_state, "Rejecting proposal for {} because in_progress is now {}", proposal, state._promised_ballot);
|
||||
return make_ready_future<bool>(false);
|
||||
}
|
||||
});
|
||||
}).finally([&sp, schema, lc] () mutable {
|
||||
auto& stats = sp.get_db().local().find_column_family(schema).get_stats();
|
||||
stats.cas_accept.mark(lc.stop().latency());
|
||||
});
|
||||
auto stats_updater = defer([&sp, schema, lc] () mutable {
|
||||
auto& stats = sp.get_db().local().find_column_family(schema).get_stats();
|
||||
stats.cas_accept.mark(lc.stop().latency());
|
||||
});
|
||||
|
||||
// FIXME: Handle tablet intra-node migration: #16594.
|
||||
// The shard can change concurrently, so we cannot rely on locking on this shard.
|
||||
auto guard = co_await get_replica_lock(token, timeout);
|
||||
|
||||
auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(proposal.ballot);
|
||||
paxos_state state = co_await sys_ks.load_paxos_state(proposal.update.key(), schema, gc_clock::time_point(now_in_sec), timeout);
|
||||
|
||||
// Accept the proposal if we promised to accept it or the proposal is newer than the one we promised.
|
||||
// Otherwise the proposal was cutoff by another Paxos proposer and has to be rejected.
|
||||
if (proposal.ballot == state._promised_ballot || proposal.ballot.timestamp() > state._promised_ballot.timestamp()) {
|
||||
logger.debug("Accepting proposal {}", proposal);
|
||||
tracing::trace(tr_state, "Accepting proposal {}", proposal);
|
||||
|
||||
if (utils::get_local_injector().enter("paxos_error_before_save_proposal")) {
|
||||
co_await coroutine::return_exception(utils::injected_error("injected_error_before_save_proposal"));
|
||||
}
|
||||
|
||||
co_await sys_ks.save_paxos_proposal(*schema, proposal, timeout);
|
||||
|
||||
if (utils::get_local_injector().enter("paxos_error_after_save_proposal")) {
|
||||
co_await coroutine::return_exception(utils::injected_error("injected_error_after_save_proposal"));
|
||||
}
|
||||
co_return true;
|
||||
} else {
|
||||
logger.debug("Rejecting proposal for {} because in_progress is now {}", proposal, state._promised_ballot);
|
||||
tracing::trace(tr_state, "Rejecting proposal for {} because in_progress is now {}", proposal, state._promised_ballot);
|
||||
co_return false;
|
||||
}
|
||||
}
|
||||
|
||||
future<> paxos_state::learn(storage_proxy& sp, db::system_keyspace& sys_ks, schema_ptr schema, proposal decision, clock_type::time_point timeout,
|
||||
|
||||
Reference in New Issue
Block a user