// File: scylladb/service/strong_consistency/state_machine.cc (C++, 192 lines, 8.3 KiB)

/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/core/shard_id.hh>
#include <seastar/core/on_internal_error.hh>
#include "state_machine.hh"
#include "db/schema_tables.hh"
#include "mutation/frozen_mutation.hh"
#include "schema/schema_registry.hh"
#include "serializer_impl.hh"
#include "idl/strong_consistency/state_machine.dist.hh"
#include "idl/strong_consistency/state_machine.dist.impl.hh"
#include "replica/database.hh"
#include "service/migration_manager.hh"
#include "db/system_keyspace.hh"
#include "utils/loading_cache.hh"
#include "utils/error_injection.hh"
// Enables the std::chrono literal suffixes used further down in this file (`20min`, `1h`).
using namespace std::chrono_literals;
namespace service::strong_consistency {
// File-local logger named "sc_state_machine"; used for the state machine's own
// messages and also handed to the column-mapping cache below.
static logging::logger logger("sc_state_machine");
/// Raft state machine bound to one tablet (`_tablet`) and one raft group (`_group_id`).
///
/// Each command passed to `apply()` is deserialized as a `raft_command`; the frozen
/// mutation it carries is, when needed, upgraded to the table's current schema and
/// then written through `replica::database::apply()`.
///
/// Snapshots are not implemented: `take_snapshot()`, `drop_snapshot()` and
/// `transfer_snapshot()` report an error, while `load_snapshot()` and `abort()`
/// complete immediately without doing anything.
class state_machine : public raft_state_machine {
    // Tablet this state machine serves; `_tablet.table` identifies the table whose
    // schema is looked up, and the whole id is printed in error messages.
    locator::global_tablet_id _tablet;
    // Raft group id; only used to label error messages.
    raft::group_id _group_id;
    // Target of the mutations and source of the table's current schema.
    replica::database& _db;
    // Provides the group0 barrier used when a mutation's schema is not known yet.
    service::migration_manager& _mm;
    // Used to look up historical column mappings by schema version.
    db::system_keyspace& _sys_ks;

public:
    /// Stores the tablet/group identifiers and references to the services used by `apply()`.
    /// The referenced objects are not owned; the caller must keep them alive.
    state_machine(locator::global_tablet_id tablet,
            raft::group_id gid,
            replica::database& db,
            service::migration_manager& mm,
            db::system_keyspace& sys_ks)
        : _tablet(tablet)
        , _group_id(gid)
        , _db(db)
        , _mm(mm)
        , _sys_ks(sys_ks)
    {
    }

    /// Applies a batch of raft commands to the local database.
    ///
    /// Every command is deserialized into a `raft_command` and its mutation collected;
    /// the mutations are then schema-resolved (and upgraded if necessary) and written
    /// with a single `_db.apply()` call without a timeout.
    ///
    /// If the table no longer exists (`replica::no_such_column_family`), the mutations
    /// are ignored and a rate-limited warning is logged. Any other failure is rethrown
    /// as `std::runtime_error` whose message contains the tablet, the group id and the
    /// original exception.
    future<> apply(std::vector<raft::command_cref> command) override {
        // One rate limiter per shard (thread_local), constructed with a 10 second period.
        static thread_local logging::logger::rate_limit rate_limit(std::chrono::seconds(10));
        try {
            // Error-injection hook; presumably lets tests hold `apply()` here until a message
            // arrives (20 minute limit) -- inactive unless the injection is enabled.
            co_await utils::get_local_injector().inject("strong_consistency_state_machine_wait_before_apply", utils::wait_for_message(20min));
            utils::chunked_vector<frozen_mutation> muts;
            muts.reserve(command.size());
            for (const auto& c: command) {
                auto is = ser::as_input_stream(c);
                auto cmd = ser::deserialize(is, std::type_identity<raft_command>{});
                muts.push_back(std::move(cmd.mutation));
            }
            // Hold pointers to schemas until `_db.apply()` is finished
            auto schemas = co_await get_schema_and_upgrade_mutations(muts);
            co_await _db.apply(std::move(muts), db::no_timeout);
        } catch (const replica::no_such_column_family&) {
            // If the table doesn't exist, it means it was already dropped.
            // This cannot happen if the table wasn't created yet on the node
            // because the state machine is created only after the table is created
            // (see `schema_applier::commit_on_shard()` and `storage_service::commit_token_metadata_change()`).
            // In this case, we should just ignore mutations without throwing an error.
            logger.log(log_level::warn, rate_limit, "apply(): table {} was already dropped, ignoring mutations", _tablet.table);
        } catch (...) {
            // Add tablet/group context to whatever went wrong. Note that the original
            // exception type is not preserved, only its textual form.
            throw std::runtime_error(::format(
                    "tablet {}, group id {}: error while applying mutations {}",
                    _tablet, _group_id, std::current_exception()));
        }
    }

    /// Not implemented. The failure is reported through the returned future rather than
    /// thrown synchronously, so callers see it the same way whether they `co_await` the
    /// result or attach a continuation.
    future<raft::snapshot_id> take_snapshot() override {
        return make_exception_future<raft::snapshot_id>(std::runtime_error("take_snapshot() not implemented"));
    }

    /// Not implemented; always throws `std::runtime_error` (the function returns `void`,
    /// so throwing is the only way to report the failure).
    void drop_snapshot(raft::snapshot_id id) override {
        throw std::runtime_error("drop_snapshot() not implemented");
    }

    /// No-op: completes immediately and ignores `id`.
    future<> load_snapshot(raft::snapshot_id id) override {
        return make_ready_future<>();
    }

    /// No-op: completes immediately.
    future<> abort() override {
        return make_ready_future<>();
    }

    /// Not implemented. As with `take_snapshot()`, the error is delivered as a failed future.
    future<> transfer_snapshot(raft::server_id from_id, raft::snapshot_descriptor snp) override {
        return make_exception_future<>(std::runtime_error("transfer_snapshot() not implemented"));
    }

private:
    // Cache of column mappings keyed by schema version.
    using column_mappings_cache = utils::loading_cache<table_schema_version, column_mapping>;
    // For each schema version seen in a batch: the schema to apply with and, when the
    // mutation has to be upgraded, the column mapping of the mutation's own version
    // (null when no upgrade is needed).
    using schema_store = std::unordered_map<table_schema_version, std::pair<schema_ptr, column_mappings_cache::value_ptr>>;

    /// Resolves a schema for every mutation in `muts` and upgrades, in place, those
    /// mutations whose schema version is not present in the local schema registry.
    ///
    /// For a version that is not in the registry, the table's current schema is taken from
    /// `_db` and the version's column mapping is looked up (cache first, then
    /// `db::schema_tables::get_column_mapping_if_exists()`). If the mapping is not found,
    /// the group0 barrier is triggered -- at most once per call -- and the lookup retried.
    ///
    /// Returns the map of resolved schemas; the caller keeps it alive so the schema
    /// pointers stay valid while the mutations are applied.
    ///
    /// Throws `replica::no_such_column_family` (from `_db.find_schema()`) if the table was
    /// dropped, and calls `on_internal_error()` if a version cannot be resolved even after
    /// the barrier.
    future<schema_store> get_schema_and_upgrade_mutations(utils::chunked_vector<frozen_mutation>& muts) {
        // Cache column mappings to avoid querying `system.scylla_table_schema_history` multiple times.
        // One instance per shard (thread_local), shared by all state machines on that shard;
        // constructed with an unbounded size and a 1h duration argument.
        static thread_local column_mappings_cache column_mapping_cache(std::numeric_limits<size_t>::max(), 1h, logger);
        // Stores schema pointer and optional column mapping for each schema version present in the mutations
        schema_store schema_mappings;
        bool barrier_executed = false;

        // Looks up the schema (and, if an upgrade is required, the column mapping) for one
        // schema version. Returns {nullptr, nullptr} when the column mapping is unknown.
        auto get_schema = [&] (table_schema_version schema_version) -> future<std::pair<schema_ptr, column_mappings_cache::value_ptr>> {
            auto schema = local_schema_registry().get_or_null(schema_version);
            if (schema) {
                // The version is known locally: no upgrade needed, hence no column mapping.
                co_return std::pair{std::move(schema), nullptr};
            }
            // `_db.find_schema()` may throw `replica::no_such_column_family` if the table was already dropped.
            schema = _db.find_schema(_tablet.table);
            // The column mapping may be already present in the cache from another `apply()` call
            auto cm_ptr = column_mapping_cache.find(schema_version);
            if (cm_ptr) {
                co_return std::pair{std::move(schema), std::move(cm_ptr)};
            }
            // We may not find the column mapping if the mutation schema is newer than the present schema.
            // In this case, we should trigger the barrier to wait for the schema to be updated and then try again.
            auto cm_opt = co_await db::schema_tables::get_column_mapping_if_exists(_sys_ks, _tablet.table, schema_version);
            if (!cm_opt) {
                co_return std::pair{nullptr, nullptr};
            }
            // Seed the cache with the mapping we just read. The loader is `mutable` so that
            // `std::move(cm)` really moves the captured mapping (in a non-mutable lambda the
            // capture is const and the "move" would silently copy). It is a plain function
            // returning a ready future rather than a coroutine, so nothing refers to the
            // temporary closure after the call returns.
            cm_ptr = co_await column_mapping_cache.get_ptr(schema_version, [cm = std::move(*cm_opt)] (auto schema_version) mutable -> future<column_mapping> {
                return make_ready_future<column_mapping>(std::move(cm));
            });
            co_return std::pair{std::move(schema), std::move(cm_ptr)};
        };

        // Returns the `schema_mappings` entry for the mutation's schema version, resolving
        // and inserting it on first use; returns nullptr if the version cannot be resolved.
        // Pointers to `std::unordered_map` elements remain valid across later insertions.
        auto resolve_schema = [&] (const frozen_mutation& mut) -> future<const schema_store::mapped_type*> {
            auto schema_version = mut.schema_version();
            auto it = schema_mappings.find(schema_version);
            if (it != schema_mappings.end()) {
                co_return &it->second;
            }
            auto schema_cm = co_await get_schema(schema_version);
            if (!schema_cm.first && !barrier_executed) {
                // Error-injection hook: when the first injection is enabled, the second one is
                // switched off before waiting on the barrier (presumably so that the barrier can
                // make progress in tests -- confirm against the tests using these injections).
                if (utils::get_local_injector().enter("disable_raft_drop_append_entries_for_specified_group")) {
                    utils::get_local_injector().disable("raft_drop_incoming_append_entries_for_specified_group");
                }
                // TODO: pass valid abort source
                co_await _mm.get_group0_barrier().trigger();
                // One barrier per batch is enough: a later miss will not trigger it again.
                barrier_executed = true;
                schema_cm = co_await get_schema(schema_version);
            }
            if (schema_cm.first) {
                const auto [inserted_it, inserted] = schema_mappings.insert({schema_version, std::move(schema_cm)});
                co_return &inserted_it->second;
            }
            co_return nullptr;
        };

        for (auto& m: muts) {
            auto schema_entry = co_await resolve_schema(m);
            if (!schema_entry) {
                // Old schemas are TTLed after 10 days (see comment in `schema_applier::finalize_tables_and_views()`),
                // so this error theoretically may be triggered if a node is stuck longer than this.
                // But in practice we should do a snapshot much earlier, that's why `on_internal_error()` here.
                // And if the table was already dropped, `no_such_column_family` will be thrown earlier.
                on_internal_error(logger, fmt::format("couldn't find schema for table {} and mutation schema version {}", _tablet.table, m.schema_version()));
            }
            if (schema_entry->second) {
                // A column mapping is present only when the mutation's version differs from
                // what the registry knows: rewrite the mutation against the resolved schema.
                m = freeze(m.unfreeze_upgrading(schema_entry->first, *schema_entry->second));
            }
        }
        // We only need vector of schema pointers but we're returning the whole map
        // to avoid another allocation
        co_return std::move(schema_mappings);
    }
};
/// Factory for the strongly-consistent tablet state machine.
///
/// Constructs a `state_machine` from the given tablet id, raft group id and service
/// references and returns it through the `raft_state_machine` interface. The references
/// are stored, not copied, so the referenced objects must outlive the returned object.
std::unique_ptr<raft_state_machine> make_state_machine(
        locator::global_tablet_id tablet,
        raft::group_id gid,
        replica::database& db,
        service::migration_manager& mm,
        db::system_keyspace& sys_ks) {
    std::unique_ptr<raft_state_machine> machine =
            std::make_unique<state_machine>(tablet, gid, db, mm, sys_ks);
    return machine;
}
};