paxos: co-routinize paxos_state::learn function
This commit is contained in:
@@ -194,60 +194,56 @@ future<bool> paxos_state::accept(storage_proxy& sp, db::system_keyspace& sys_ks,
|
||||
// Learn (commit) a decided Paxos proposal on this replica.
//
// Steps, in order:
//   1. If the "paxos_error_before_learn" error injection is enabled, fail
//      immediately with utils::injected_error.
//   2. Start a latency counter; a deferred action marks the elapsed latency in
//      the table's cas_learn stats when this coroutine's locals are destroyed.
//   3. Unless the decision's ballot timestamp predates the table's last
//      truncation time, apply decision.update locally with a forced commitlog
//      sync.
//   4. Save the decision through sys_ks.save_paxos_decision().
//
// Parameters:
//   sp       - storage proxy used to look up the table and apply the mutation.
//   sys_ks   - system keyspace used to persist the decision.
//   schema   - current schema of the table; decision.update must carry the
//              same schema version.
//   decision - the decided proposal (ballot + update); taken by value so it
//              lives in the coroutine frame across suspension points.
//   timeout  - deadline forwarded to the injection points, the local mutation
//              and the system keyspace write.
//   tr_state - tracing state used for trace messages and the local mutation.
//
// The returned future resolves when the decision has been saved, or carries
// the exception raised by any of the awaited operations.
future<> paxos_state::learn(storage_proxy& sp, db::system_keyspace& sys_ks, schema_ptr schema, proposal decision, clock_type::time_point timeout,
        tracing::trace_state_ptr tr_state) {
    if (utils::get_local_injector().enter("paxos_error_before_learn")) {
        co_await coroutine::return_exception(utils::injected_error("injected_error_before_learn"));
    }

    utils::latency_counter lc;
    lc.start();

    // Record the learn latency when the coroutine finishes.
    // NOTE(review): the table is looked up again inside the deferred action;
    // confirm find_column_family() cannot throw here (e.g. table dropped
    // while learn was in flight).
    auto stats_updater = defer([&sp, schema, lc] () mutable {
        auto& stats = sp.get_db().local().find_column_family(schema).get_stats();
        stats.cas_learn.mark(lc.stop().latency());
    });

    co_await utils::get_local_injector().inject("paxos_state_learn_timeout", timeout);

    replica::table& cf = sp.get_db().local().find_column_family(schema);
    db_clock::time_point t = cf.get_truncation_time();
    auto truncated_at = std::chrono::duration_cast<std::chrono::milliseconds>(t.time_since_epoch());
    // When saving a decision, also delete the last accepted proposal. This is just an
    // optimization to save space.
    // Even though there is no guarantee we will see decisions in the right order,
    // because messages can get delayed, so this decision can be older than our current most
    // recent accepted proposal/committed decision, saving it is always safe due to column timestamps.
    // Since the mutation uses the decision ballot timestamp, if cell timestamp of any current cell
    // is strictly greater than the decision one, saving the decision will not erase it.
    //
    // The table may have been truncated since the proposal was initiated. In that case, we
    // don't want to perform the mutation and potentially resurrect truncated data.
    if (utils::UUID_gen::unix_timestamp(decision.ballot) >= truncated_at) {
        logger.debug("Committing decision {}", decision);
        tracing::trace(tr_state, "Committing decision {}", decision);

        // The decision's mutation is expected to be built against the schema
        // version passed in; no upgrade is attempted here, a mismatch is
        // reported as an internal error.
        if (decision.update.schema_version() != schema->version()) {
            on_internal_error(logger, format("schema version in learn does not match current schema"));
        }

        co_await sp.mutate_locally(schema, decision.update, tr_state, db::commitlog::force_sync::yes, timeout);
    } else {
        logger.debug("Not committing decision {} as ballot timestamp predates last truncation time", decision);
        tracing::trace(tr_state, "Not committing decision {} as ballot timestamp predates last truncation time", decision);
    }

    // We don't need to lock the partition key if there is no gap between loading paxos
    // state and saving it, and here we're just blindly updating.
    co_await utils::get_local_injector().inject("paxos_timeout_after_save_decision", timeout);
    co_return co_await sys_ks.save_paxos_decision(*schema, decision, timeout);
}
|
||||
|
||||
future<> paxos_state::prune(db::system_keyspace& sys_ks, schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,
|
||||
|
||||
Reference in New Issue
Block a user