raft: group0_state_machine: extract merger to its own header
Move `merger` to its own header file. Leave the logic of applying commands to `group0_state_machine`. Remove `group0_state_machine` dependencies from `merger` to make it an independent module. Add `static` and `const` keywords to its methods signature. Change it to `class`. Add documentation. With this patch, it is easier to write unit tests for the merger.
This commit is contained in:
@@ -1083,6 +1083,7 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'lang/wasm_alien_thread_runner.cc',
|
||||
'lang/wasm_instance_cache.cc',
|
||||
'service/raft/group0_state_machine.cc',
|
||||
'service/raft/group0_state_machine_merger.cc',
|
||||
'service/raft/raft_sys_table_storage.cc',
|
||||
'serializer.cc',
|
||||
'release.cc',
|
||||
|
||||
@@ -17,6 +17,7 @@ target_sources(service
|
||||
qos/standard_service_level_distributed_data_accessor.cc
|
||||
raft/discovery.cc
|
||||
raft/group0_state_machine.cc
|
||||
raft/group0_state_machine_merger.cc
|
||||
raft/raft_group0.cc
|
||||
raft/raft_group0_client.cc
|
||||
raft/raft_group_registry.cc
|
||||
|
||||
@@ -40,6 +40,7 @@
|
||||
#include <optional>
|
||||
#include "db/config.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "service/raft/group0_state_machine_merger.hh"
|
||||
|
||||
namespace service {
|
||||
|
||||
@@ -57,10 +58,6 @@ static mutation extract_history_mutation(std::vector<canonical_mutation>& muts,
|
||||
return res;
|
||||
}
|
||||
|
||||
static mutation convert_history_mutation(canonical_mutation m, const data_dictionary::database db) {
|
||||
return m.to_mutation(db.find_schema(db::system_keyspace::NAME, db::system_keyspace::GROUP0_HISTORY));
|
||||
}
|
||||
|
||||
static future<> write_mutations_to_database(storage_proxy& proxy, gms::inet_address from, std::vector<canonical_mutation> cms) {
|
||||
std::vector<mutation> mutations;
|
||||
mutations.reserve(cms.size());
|
||||
@@ -77,169 +74,51 @@ static future<> write_mutations_to_database(storage_proxy& proxy, gms::inet_addr
|
||||
co_await proxy.mutate_locally(std::move(mutations), tracing::trace_state_ptr());
|
||||
}
|
||||
|
||||
future<> group0_state_machine::merge_and_apply(group0_state_machine_merger& merger) {
|
||||
auto [_cmd, history] = merger.merge();
|
||||
auto cmd = std::move(_cmd);
|
||||
|
||||
// We assume that `cmd.change` was constructed using group0 state which was observed *after* `cmd.prev_state_id` was obtained.
|
||||
// It is now important that we apply the change *before* we append the group0 state ID to the history table.
|
||||
//
|
||||
// If we crash before appending the state ID, when we reapply the command after restart, the change will be applied because
|
||||
// the state ID was not yet appended so the above check will pass.
|
||||
|
||||
// TODO: reapplication of a command after a crash may require contacting a quorum (we need to learn that the command
|
||||
// is committed from a leader). But we may want to ensure that group 0 state is consistent after restart even without
|
||||
// access to quorum, which means we cannot allow partially applied commands. We need to ensure that either the entire
|
||||
// change is applied and the state ID is updated or none of this happens.
|
||||
// E.g. use a write-ahead-entry which contains all this information and make sure it's replayed during restarts.
|
||||
|
||||
co_await std::visit(make_visitor(
|
||||
[&] (schema_change& chng) -> future<> {
|
||||
return _mm.merge_schema_from(netw::messaging_service::msg_addr(std::move(cmd.creator_addr)), std::move(chng.mutations));
|
||||
},
|
||||
[&] (broadcast_table_query& query) -> future<> {
|
||||
auto result = co_await service::broadcast_tables::execute_broadcast_table_query(_sp, query.query, cmd.new_state_id);
|
||||
_client.set_query_result(cmd.new_state_id, std::move(result));
|
||||
},
|
||||
[&] (topology_change& chng) -> future<> {
|
||||
co_await write_mutations_to_database(_sp, cmd.creator_addr, std::move(chng.mutations));
|
||||
co_await _ss.topology_transition(_cdc_gen_svc);
|
||||
},
|
||||
[&] (write_mutations& muts) -> future<> {
|
||||
return write_mutations_to_database(_sp, cmd.creator_addr, std::move(muts.mutations));
|
||||
}
|
||||
), cmd.change);
|
||||
|
||||
co_await _sp.mutate_locally({std::move(history)}, nullptr);
|
||||
}
|
||||
|
||||
future<> group0_state_machine::apply(std::vector<raft::command_cref> command) {
|
||||
slogger.trace("apply() is called with {} commands", command.size());
|
||||
|
||||
struct merger {
|
||||
std::vector<group0_command> cmd_to_merge;
|
||||
std::optional<mutation> merged_history_mutation;
|
||||
utils::UUID last_group0_state_id;
|
||||
group0_state_machine& sm;
|
||||
size_t size = 0;
|
||||
semaphore_units<> read_apply_mutex_holder;
|
||||
const size_t max_command_size;
|
||||
merger(group0_state_machine& sm_, utils::UUID id, semaphore_units<> mux) : last_group0_state_id(id)
|
||||
, sm(sm_)
|
||||
, read_apply_mutex_holder(std::move(mux))
|
||||
// max_mutation_size = 1/2 of commitlog segment size, thus max_command_size is set 1/3 of commitlog segment size to leave space for metadata.
|
||||
, max_command_size(sm._sp.data_dictionary().get_config().commitlog_segment_size_in_mb() * 1024 * 1024 / 3) {}
|
||||
|
||||
size_t cmd_size(group0_command& cmd) {
|
||||
if (holds_alternative<broadcast_table_query>(cmd.change)) {
|
||||
return 0;
|
||||
}
|
||||
auto r = get_command_mutations(cmd) | boost::adaptors::transformed([] (const canonical_mutation& m) { return m.representation().size(); });
|
||||
return std::accumulate(std::begin(r), std::end(r), size_t(0));
|
||||
}
|
||||
bool can_merge(group0_command& cmd, size_t s) {
|
||||
if (!cmd_to_merge.empty()) {
|
||||
// broadcast table commands or different type of commands cannot be merged
|
||||
if (cmd_to_merge[0].change.index() != cmd.change.index() || holds_alternative<broadcast_table_query>(cmd.change)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Check that merged command will not be larger than half of commitlog segment.
|
||||
// Merged command can be, in fact, much smaller but better to be safe than sorry.
|
||||
// Skip the check for the first command.
|
||||
if (size && size + s > max_command_size) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
void add(group0_command&& cmd, size_t added_size) {
|
||||
slogger.trace("add to merging set new_state_id: {}", cmd.new_state_id);
|
||||
auto m = convert_history_mutation(std::move(cmd.history_append), sm._sp.data_dictionary());
|
||||
// Set `last_group0_state_id` to the maximum of the current value and `cmd.new_state_id`,
|
||||
// but make sure we compare them the same way timeuuids are compared in clustering keys
|
||||
// (i.e. in the same order that the history table is sorted).
|
||||
if (utils::timeuuid_tri_compare(last_group0_state_id, cmd.new_state_id) < 0) {
|
||||
last_group0_state_id = cmd.new_state_id;
|
||||
}
|
||||
cmd_to_merge.push_back(std::move(cmd));
|
||||
size += added_size;
|
||||
if (merged_history_mutation) {
|
||||
merged_history_mutation->apply(std::move(m));
|
||||
} else {
|
||||
merged_history_mutation = std::move(m);
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<canonical_mutation>& get_command_mutations(group0_command& cmd) {
|
||||
return std::visit(make_visitor(
|
||||
[] (schema_change& chng) -> std::vector<canonical_mutation>& {
|
||||
return chng.mutations;
|
||||
},
|
||||
[] (broadcast_table_query& query) -> std::vector<canonical_mutation>& {
|
||||
on_internal_error(slogger, "trying to merge broadcast table command");
|
||||
},
|
||||
[] (topology_change& chng) -> std::vector<canonical_mutation>& {
|
||||
return chng.mutations;
|
||||
},
|
||||
[] (write_mutations& muts) -> std::vector<canonical_mutation>& {
|
||||
return muts.mutations;
|
||||
}
|
||||
), cmd.change);
|
||||
}
|
||||
|
||||
std::pair<group0_command, mutation> merge() {
|
||||
auto& cmd = cmd_to_merge.back(); // use metadata from the last merged command
|
||||
slogger.trace("merge new_state_id: {}", cmd.new_state_id);
|
||||
using mutation_set_type = std::unordered_set<mutation, mutation_hash_by_key, mutation_equals_by_key>;
|
||||
std::unordered_map<table_id, mutation_set_type> mutations;
|
||||
|
||||
if (cmd_to_merge.size() > 1) {
|
||||
// skip merging if there is only one command
|
||||
for (auto&& c : cmd_to_merge) {
|
||||
for (auto&& cm : get_command_mutations(c)) {
|
||||
auto schema = sm._sp.data_dictionary().find_schema(cm.column_family_id());
|
||||
auto m = cm.to_mutation(schema);
|
||||
auto& tbl_muts = mutations[cm.column_family_id()];
|
||||
auto it = tbl_muts.find(m);
|
||||
if (it == tbl_muts.end()) {
|
||||
tbl_muts.emplace(std::move(m));
|
||||
} else {
|
||||
const_cast<mutation&>(*it).apply(std::move(m)); // Won't change key
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<canonical_mutation> ms;
|
||||
for (auto&& tables : mutations) {
|
||||
for (auto&& partitions : tables.second) {
|
||||
ms.push_back(canonical_mutation(partitions));
|
||||
}
|
||||
}
|
||||
|
||||
get_command_mutations(cmd) = std::move(ms);
|
||||
}
|
||||
auto res = std::make_pair(std::move(cmd), std::move(merged_history_mutation).value());
|
||||
cmd_to_merge.clear();
|
||||
merged_history_mutation.reset();
|
||||
return res;
|
||||
}
|
||||
|
||||
future<> apply(group0_command cmd, mutation history) {
|
||||
// We assume that `cmd.change` was constructed using group0 state which was observed *after* `cmd.prev_state_id` was obtained.
|
||||
// It is now important that we apply the change *before* we append the group0 state ID to the history table.
|
||||
//
|
||||
// If we crash before appending the state ID, when we reapply the command after restart, the change will be applied because
|
||||
// the state ID was not yet appended so the above check will pass.
|
||||
|
||||
// TODO: reapplication of a command after a crash may require contacting a quorum (we need to learn that the command
|
||||
// is committed from a leader). But we may want to ensure that group 0 state is consistent after restart even without
|
||||
// access to quorum, which means we cannot allow partially applied commands. We need to ensure that either the entire
|
||||
// change is applied and the state ID is updated or none of this happens.
|
||||
// E.g. use a write-ahead-entry which contains all this information and make sure it's replayed during restarts.
|
||||
|
||||
co_await std::visit(make_visitor(
|
||||
[&] (schema_change& chng) -> future<> {
|
||||
return sm._mm.merge_schema_from(netw::messaging_service::msg_addr(std::move(cmd.creator_addr)), std::move(chng.mutations));
|
||||
},
|
||||
[&] (broadcast_table_query& query) -> future<> {
|
||||
auto result = co_await service::broadcast_tables::execute_broadcast_table_query(sm._sp, query.query, cmd.new_state_id);
|
||||
sm._client.set_query_result(cmd.new_state_id, std::move(result));
|
||||
},
|
||||
[&] (topology_change& chng) -> future<> {
|
||||
co_await write_mutations_to_database(sm._sp, cmd.creator_addr, std::move(chng.mutations));
|
||||
co_await sm._ss.topology_transition(sm._cdc_gen_svc);
|
||||
},
|
||||
[&] (write_mutations& muts) -> future<> {
|
||||
return write_mutations_to_database(sm._sp, cmd.creator_addr, std::move(muts.mutations));
|
||||
}
|
||||
), cmd.change);
|
||||
|
||||
co_await sm._sp.mutate_locally({std::move(history)}, nullptr);
|
||||
}
|
||||
|
||||
future<> merge_and_apply() {
|
||||
auto [c, h] = merge();
|
||||
return apply(std::move(c), std::move(h));
|
||||
}
|
||||
|
||||
bool empty() const {
|
||||
return cmd_to_merge.empty();
|
||||
}
|
||||
|
||||
utils::UUID last_id() const {
|
||||
return last_group0_state_id;
|
||||
}
|
||||
};
|
||||
|
||||
auto read_apply_mutex_holder = co_await _client.hold_read_apply_mutex();
|
||||
|
||||
merger m(*this, co_await db::system_keyspace::get_last_group0_state_id(), std::move(read_apply_mutex_holder));
|
||||
// max_mutation_size = 1/2 of commitlog segment size, thus max_command_size is set 1/3 of commitlog segment size to leave space for metadata.
|
||||
size_t max_command_size = _sp.data_dictionary().get_config().commitlog_segment_size_in_mb() * 1024 * 1024 / 3;
|
||||
group0_state_machine_merger m(co_await db::system_keyspace::get_last_group0_state_id(), std::move(read_apply_mutex_holder),
|
||||
max_command_size, _sp.data_dictionary());
|
||||
|
||||
for (auto&& c : command) {
|
||||
auto is = ser::as_input_stream(c);
|
||||
@@ -267,7 +146,7 @@ future<> group0_state_machine::apply(std::vector<raft::command_cref> command) {
|
||||
|
||||
auto size = m.cmd_size(cmd);
|
||||
if (!m.can_merge(cmd, size)) {
|
||||
co_await m.merge_and_apply();
|
||||
co_await merge_and_apply(m);
|
||||
}
|
||||
|
||||
m.add(std::move(cmd), size);
|
||||
@@ -275,7 +154,7 @@ future<> group0_state_machine::apply(std::vector<raft::command_cref> command) {
|
||||
|
||||
if (!m.empty()) {
|
||||
// apply remainder
|
||||
co_await m.merge_and_apply();
|
||||
co_await merge_and_apply(m);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ class raft_group0_client;
|
||||
class migration_manager;
|
||||
class storage_proxy;
|
||||
class storage_service;
|
||||
struct group0_state_machine_merger;
|
||||
|
||||
struct schema_change {
|
||||
// Mutations of schema tables (such as `system_schema.keyspaces`, `system_schema.tables` etc.)
|
||||
@@ -84,6 +85,8 @@ class group0_state_machine : public raft_state_machine {
|
||||
storage_proxy& _sp;
|
||||
storage_service& _ss;
|
||||
cdc::generation_service& _cdc_gen_svc;
|
||||
|
||||
future<> merge_and_apply(group0_state_machine_merger& merger);
|
||||
public:
|
||||
group0_state_machine(raft_group0_client& client, migration_manager& mm, storage_proxy& sp, storage_service& ss, cdc::generation_service& cdc_gen_svc) : _client(client), _mm(mm), _sp(sp), _ss(ss), _cdc_gen_svc(cdc_gen_svc) {}
|
||||
future<> apply(std::vector<raft::command_cref> command) override;
|
||||
|
||||
124
service/raft/group0_state_machine_merger.cc
Normal file
124
service/raft/group0_state_machine_merger.cc
Normal file
@@ -0,0 +1,124 @@
|
||||
/*
|
||||
* Copyright (C) 2023-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
#include "db/config.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "service/raft/group0_state_machine_merger.hh"
|
||||
|
||||
namespace service {
|
||||
|
||||
static logging::logger slogger("group0_raft_sm_merger");
|
||||
|
||||
static mutation convert_history_mutation(canonical_mutation m, const data_dictionary::database db) {
|
||||
return m.to_mutation(db.find_schema(db::system_keyspace::NAME, db::system_keyspace::GROUP0_HISTORY));
|
||||
}
|
||||
|
||||
group0_state_machine_merger::group0_state_machine_merger(utils::UUID id, semaphore_units<> mux, size_t max_command_size, data_dictionary::database db)
|
||||
: _last_group0_state_id(id)
|
||||
, _read_apply_mutex_holder(std::move(mux))
|
||||
, _max_command_size(max_command_size)
|
||||
, _db{std::move(db)} {}
|
||||
|
||||
size_t group0_state_machine_merger::cmd_size(group0_command& cmd) {
|
||||
if (holds_alternative<broadcast_table_query>(cmd.change)) {
|
||||
return 0;
|
||||
}
|
||||
auto r = get_command_mutations(cmd) | boost::adaptors::transformed([] (const canonical_mutation& m) { return m.representation().size(); });
|
||||
return std::accumulate(std::begin(r), std::end(r), size_t(0));
|
||||
}
|
||||
|
||||
bool group0_state_machine_merger::can_merge(group0_command& cmd, size_t s) const {
|
||||
if (!_cmd_to_merge.empty()) {
|
||||
// broadcast table commands or different type of commands cannot be merged
|
||||
if (_cmd_to_merge[0].change.index() != cmd.change.index() || holds_alternative<broadcast_table_query>(cmd.change)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Check that merged command will not be larger than half of commitlog segment.
|
||||
// Merged command can be, in fact, much smaller but better to be safe than sorry.
|
||||
// Skip the check for the first command.
|
||||
if (_size && _size + s > _max_command_size) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void group0_state_machine_merger::add(group0_command&& cmd, size_t added_size) {
|
||||
slogger.trace("add to merging set new_state_id: {}", cmd.new_state_id);
|
||||
auto m = convert_history_mutation(std::move(cmd.history_append), _db);
|
||||
// Set `last_group0_state_id` to the maximum of the current value and `cmd.new_state_id`,
|
||||
// but make sure we compare them the same way timeuuids are compared in clustering keys
|
||||
// (i.e. in the same order that the history table is sorted).
|
||||
if (utils::timeuuid_tri_compare(_last_group0_state_id, cmd.new_state_id) < 0) {
|
||||
_last_group0_state_id = cmd.new_state_id;
|
||||
}
|
||||
_cmd_to_merge.push_back(std::move(cmd));
|
||||
_size += added_size;
|
||||
if (_merged_history_mutation) {
|
||||
_merged_history_mutation->apply(std::move(m));
|
||||
} else {
|
||||
_merged_history_mutation = std::move(m);
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<canonical_mutation>& group0_state_machine_merger::get_command_mutations(group0_command& cmd) {
|
||||
return std::visit(make_visitor(
|
||||
[] (schema_change& chng) -> std::vector<canonical_mutation>& {
|
||||
return chng.mutations;
|
||||
},
|
||||
[] (broadcast_table_query& query) -> std::vector<canonical_mutation>& {
|
||||
on_internal_error(slogger, "trying to merge broadcast table command");
|
||||
},
|
||||
[] (topology_change& chng) -> std::vector<canonical_mutation>& {
|
||||
return chng.mutations;
|
||||
},
|
||||
[] (write_mutations& muts) -> std::vector<canonical_mutation>& {
|
||||
return muts.mutations;
|
||||
}
|
||||
), cmd.change);
|
||||
}
|
||||
|
||||
std::pair<group0_command, mutation> group0_state_machine_merger::merge() {
|
||||
auto& cmd = _cmd_to_merge.back(); // use metadata from the last merged command
|
||||
slogger.trace("merge new_state_id: {}", cmd.new_state_id);
|
||||
using mutation_set_type = std::unordered_set<mutation, mutation_hash_by_key, mutation_equals_by_key>;
|
||||
std::unordered_map<table_id, mutation_set_type> mutations;
|
||||
|
||||
if (_cmd_to_merge.size() > 1) {
|
||||
// skip merging if there is only one command
|
||||
for (auto&& c : _cmd_to_merge) {
|
||||
for (auto&& cm : get_command_mutations(c)) {
|
||||
auto schema = _db.find_schema(cm.column_family_id());
|
||||
auto m = cm.to_mutation(schema);
|
||||
auto& tbl_muts = mutations[cm.column_family_id()];
|
||||
auto it = tbl_muts.find(m);
|
||||
if (it == tbl_muts.end()) {
|
||||
tbl_muts.emplace(std::move(m));
|
||||
} else {
|
||||
const_cast<mutation&>(*it).apply(std::move(m)); // Won't change key
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<canonical_mutation> ms;
|
||||
for (auto&& tables : mutations) {
|
||||
for (auto&& partitions : tables.second) {
|
||||
ms.push_back(canonical_mutation(partitions));
|
||||
}
|
||||
}
|
||||
|
||||
get_command_mutations(cmd) = std::move(ms);
|
||||
}
|
||||
auto res = std::make_pair(std::move(cmd), std::move(_merged_history_mutation).value());
|
||||
_cmd_to_merge.clear();
|
||||
_merged_history_mutation.reset();
|
||||
return res;
|
||||
}
|
||||
|
||||
} // end of namespace service
|
||||
77
service/raft/group0_state_machine_merger.hh
Normal file
77
service/raft/group0_state_machine_merger.hh
Normal file
@@ -0,0 +1,77 @@
|
||||
/*
|
||||
* Copyright (C) 2023-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include "data_dictionary/data_dictionary.hh"
|
||||
#include "service/raft/group0_state_machine.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
|
||||
#include <boost/range/algorithm/transform.hpp>
|
||||
|
||||
namespace service {
|
||||
|
||||
/**
|
||||
* Since most group0 commands are just mutations it is easy to combine them
|
||||
* before passing them to a subsystem they are destined to since it is more
|
||||
* efficient. The logic that handles those mutations in a subsystem will
|
||||
* run once for each batch of commands instead of for each individual
|
||||
* command. This is especially useful when a node catches up to a leader and
|
||||
* gets a lot of commands together.
|
||||
*
|
||||
* The `group0_state_machine_merger` does exactly that. It combines commands
|
||||
* into a single command if possible, but it preserves an order between commands,
|
||||
* so each time it encounters a command to a different subsystem it flushes already
|
||||
* combined batch and starts a new one. This extra safety assumes that
|
||||
* there are dependencies between subsystems managed by group0, so the order
|
||||
* matters. It may be not the case now, but we prefer to be on a safe side.
|
||||
*
|
||||
* Broadcast table commands are not mutations, so they are never combined.
|
||||
*/
|
||||
class group0_state_machine_merger {
|
||||
std::vector<group0_command> _cmd_to_merge;
|
||||
std::optional<mutation> _merged_history_mutation;
|
||||
utils::UUID _last_group0_state_id;
|
||||
size_t _size = 0;
|
||||
semaphore_units<> _read_apply_mutex_holder;
|
||||
const size_t _max_command_size;
|
||||
const data_dictionary::database _db;
|
||||
|
||||
public:
|
||||
group0_state_machine_merger(utils::UUID id, semaphore_units<> mux, size_t max_command_size, data_dictionary::database db);
|
||||
|
||||
// Returns size in bytes of mutations stored in the command.
|
||||
// Broadcast table commands have size 0.
|
||||
static size_t cmd_size(group0_command& cmd);
|
||||
|
||||
// Returns true if the command can be merged with the current batch.
|
||||
// Command can be merged if it is of the same type as commands in the current batch
|
||||
// and the size of the batch will not exceed the limit. Empties the current batch.
|
||||
// Broadcast table commands cannot be merged with any other type of commands.
|
||||
bool can_merge(group0_command& cmd, size_t s) const;
|
||||
|
||||
// Adds a command to the current batch.
|
||||
void add(group0_command&& cmd, size_t added_size);
|
||||
|
||||
// Returns mutations stored in the command.
|
||||
// It must not be called for broadcast table commands.
|
||||
static std::vector<canonical_mutation>& get_command_mutations(group0_command& cmd);
|
||||
|
||||
// Returns a command that contains all mutations from the current batch and
|
||||
// merged history mutation.
|
||||
std::pair<group0_command, mutation> merge();
|
||||
|
||||
bool empty() const {
|
||||
return _cmd_to_merge.empty();
|
||||
}
|
||||
|
||||
utils::UUID last_id() const {
|
||||
return _last_group0_state_id;
|
||||
}
|
||||
};
|
||||
|
||||
} // end of namespace service
|
||||
Reference in New Issue
Block a user