Files
scylladb/service/strong_consistency/state_machine.cc
Petr Gusev 4eee5bc273 strong_consistency: add state_machine and raft_command
These commands will be used by strongly consistent tablets to submit
mutations to Raft. A simple state_machine implementation is introduced
to apply these commands.

We apply commands in batches to reduce commitlog I/O overhead. The
batched variant of database::apply has known atomicity issues. For
example, it does not guarantee atomicity under memory pressure: some
mutations may be published to the memtable while others are blocked in
run_when_memory_available. We will address these issues later.
2026-01-21 14:56:00 +01:00

77 lines
2.2 KiB
C++

/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "state_machine.hh"
#include "serializer_impl.hh"
#include "idl/strong_consistency/state_machine.dist.hh"
#include "idl/strong_consistency/state_machine.dist.impl.hh"
#include "replica/database.hh"
namespace service::strong_consistency {
class state_machine : public raft_state_machine {
locator::global_tablet_id _tablet;
raft::group_id _group_id;
replica::database& _db;
public:
state_machine(locator::global_tablet_id tablet,
raft::group_id gid,
replica::database& db)
: _tablet(tablet)
, _group_id(gid)
, _db(db)
{
}
future<> apply(std::vector<raft::command_cref> command) override {
try {
utils::chunked_vector<frozen_mutation> muts;
muts.reserve(command.size());
for (const auto& c: command) {
auto is = ser::as_input_stream(c);
auto cmd = ser::deserialize(is, std::type_identity<raft_command>{});
muts.push_back(std::move(cmd.mutation));
}
co_await _db.apply(std::move(muts), db::no_timeout);
} catch (...) {
throw std::runtime_error(::format(
"tablet {}, group id {}: error while applying mutations {}",
_tablet, _group_id, std::current_exception()));
}
}
future<raft::snapshot_id> take_snapshot() override {
throw std::runtime_error("take_snapshot() not implemented");
}
void drop_snapshot(raft::snapshot_id id) override {
throw std::runtime_error("drop_snapshot() not implemented");
}
future<> load_snapshot(raft::snapshot_id id) override {
return make_ready_future<>();
}
future<> abort() override {
return make_ready_future<>();
}
future<> transfer_snapshot(raft::server_id from_id, raft::snapshot_descriptor snp) override {
throw std::runtime_error("transfer_snapshot() not implemented");
}
};
std::unique_ptr<raft_state_machine> make_state_machine(locator::global_tablet_id tablet,
raft::group_id gid,
replica::database& db)
{
return std::make_unique<state_machine>(tablet, gid, db);
}
};