raft: group0_state_machine: extract merger to its own header

Move `merger` to its own header file. Leave the logic of applying commands to
`group0_state_machine`. Remove `group0_state_machine` dependencies from `merger`
to make it an independent module. Add `static` and `const` keywords to its
method signatures. Change it to `class`. Add documentation.

With this patch, it is easier to write unit tests for the merger.
This commit is contained in:
Mikołaj Grzebieluch
2023-07-13 11:56:56 +02:00
parent dad5caf141
commit 96c6e0d0f7
6 changed files with 249 additions and 164 deletions

View File

@@ -1083,6 +1083,7 @@ scylla_core = (['message/messaging_service.cc',
'lang/wasm_alien_thread_runner.cc',
'lang/wasm_instance_cache.cc',
'service/raft/group0_state_machine.cc',
'service/raft/group0_state_machine_merger.cc',
'service/raft/raft_sys_table_storage.cc',
'serializer.cc',
'release.cc',

View File

@@ -17,6 +17,7 @@ target_sources(service
qos/standard_service_level_distributed_data_accessor.cc
raft/discovery.cc
raft/group0_state_machine.cc
raft/group0_state_machine_merger.cc
raft/raft_group0.cc
raft/raft_group0_client.cc
raft/raft_group_registry.cc

View File

@@ -40,6 +40,7 @@
#include <optional>
#include "db/config.hh"
#include "replica/database.hh"
#include "service/raft/group0_state_machine_merger.hh"
namespace service {
@@ -57,10 +58,6 @@ static mutation extract_history_mutation(std::vector<canonical_mutation>& muts,
return res;
}
static mutation convert_history_mutation(canonical_mutation m, const data_dictionary::database db) {
return m.to_mutation(db.find_schema(db::system_keyspace::NAME, db::system_keyspace::GROUP0_HISTORY));
}
static future<> write_mutations_to_database(storage_proxy& proxy, gms::inet_address from, std::vector<canonical_mutation> cms) {
std::vector<mutation> mutations;
mutations.reserve(cms.size());
@@ -77,169 +74,51 @@ static future<> write_mutations_to_database(storage_proxy& proxy, gms::inet_addr
co_await proxy.mutate_locally(std::move(mutations), tracing::trace_state_ptr());
}
// Flushes the merger's current batch: takes the merged command and the merged history
// mutation from `merger.merge()`, applies the command's change through the handler that
// matches its type, and only afterwards writes the history mutation locally.
future<> group0_state_machine::merge_and_apply(group0_state_machine_merger& merger) {
auto [_cmd, history] = merger.merge();
// The lambdas below reach `cmd` through their `[&]` capture.
// NOTE(review): presumably rebound to a plain local because the structured binding
// `_cmd` could not be captured directly - confirm.
auto cmd = std::move(_cmd);
// We assume that `cmd.change` was constructed using group0 state which was observed *after* `cmd.prev_state_id` was obtained.
// It is now important that we apply the change *before* we append the group0 state ID to the history table.
//
// If we crash before appending the state ID, when we reapply the command after restart, the change will be applied because
// the state ID was not yet appended so the above check will pass.
// NOTE(review): "the above check" is not part of this function; it presumably refers to the
// state ID check done by the caller before a command is handed to the merger - confirm.
// TODO: reapplication of a command after a crash may require contacting a quorum (we need to learn that the command
// is committed from a leader). But we may want to ensure that group 0 state is consistent after restart even without
// access to quorum, which means we cannot allow partially applied commands. We need to ensure that either the entire
// change is applied and the state ID is updated or none of this happens.
// E.g. use a write-ahead-entry which contains all this information and make sure it's replayed during restarts.
co_await std::visit(make_visitor(
// Schema changes are handed to the migration manager.
[&] (schema_change& chng) -> future<> {
return _mm.merge_schema_from(netw::messaging_service::msg_addr(std::move(cmd.creator_addr)), std::move(chng.mutations));
},
// Broadcast table queries are executed and their result is stored in the client under the command's new state ID.
[&] (broadcast_table_query& query) -> future<> {
auto result = co_await service::broadcast_tables::execute_broadcast_table_query(_sp, query.query, cmd.new_state_id);
_client.set_query_result(cmd.new_state_id, std::move(result));
},
// Topology changes are written to the database first, then the storage service is asked to run the topology transition.
[&] (topology_change& chng) -> future<> {
co_await write_mutations_to_database(_sp, cmd.creator_addr, std::move(chng.mutations));
co_await _ss.topology_transition(_cdc_gen_svc);
},
// Plain mutations are only written to the database.
[&] (write_mutations& muts) -> future<> {
return write_mutations_to_database(_sp, cmd.creator_addr, std::move(muts.mutations));
}
), cmd.change);
// Append to the history only after the change itself has been applied (see the comment above).
co_await _sp.mutate_locally({std::move(history)}, nullptr);
}
future<> group0_state_machine::apply(std::vector<raft::command_cref> command) {
slogger.trace("apply() is called with {} commands", command.size());
struct merger {
std::vector<group0_command> cmd_to_merge;
std::optional<mutation> merged_history_mutation;
utils::UUID last_group0_state_id;
group0_state_machine& sm;
size_t size = 0;
semaphore_units<> read_apply_mutex_holder;
const size_t max_command_size;
merger(group0_state_machine& sm_, utils::UUID id, semaphore_units<> mux) : last_group0_state_id(id)
, sm(sm_)
, read_apply_mutex_holder(std::move(mux))
// max_mutation_size = 1/2 of commitlog segment size, thus max_command_size is set 1/3 of commitlog segment size to leave space for metadata.
, max_command_size(sm._sp.data_dictionary().get_config().commitlog_segment_size_in_mb() * 1024 * 1024 / 3) {}
size_t cmd_size(group0_command& cmd) {
if (holds_alternative<broadcast_table_query>(cmd.change)) {
return 0;
}
auto r = get_command_mutations(cmd) | boost::adaptors::transformed([] (const canonical_mutation& m) { return m.representation().size(); });
return std::accumulate(std::begin(r), std::end(r), size_t(0));
}
bool can_merge(group0_command& cmd, size_t s) {
if (!cmd_to_merge.empty()) {
// broadcast table commands or different type of commands cannot be merged
if (cmd_to_merge[0].change.index() != cmd.change.index() || holds_alternative<broadcast_table_query>(cmd.change)) {
return false;
}
}
// Check that merged command will not be larger than half of commitlog segment.
// Merged command can be, in fact, much smaller but better to be safe than sorry.
// Skip the check for the first command.
if (size && size + s > max_command_size) {
return false;
}
return true;
}
void add(group0_command&& cmd, size_t added_size) {
slogger.trace("add to merging set new_state_id: {}", cmd.new_state_id);
auto m = convert_history_mutation(std::move(cmd.history_append), sm._sp.data_dictionary());
// Set `last_group0_state_id` to the maximum of the current value and `cmd.new_state_id`,
// but make sure we compare them the same way timeuuids are compared in clustering keys
// (i.e. in the same order that the history table is sorted).
if (utils::timeuuid_tri_compare(last_group0_state_id, cmd.new_state_id) < 0) {
last_group0_state_id = cmd.new_state_id;
}
cmd_to_merge.push_back(std::move(cmd));
size += added_size;
if (merged_history_mutation) {
merged_history_mutation->apply(std::move(m));
} else {
merged_history_mutation = std::move(m);
}
}
std::vector<canonical_mutation>& get_command_mutations(group0_command& cmd) {
return std::visit(make_visitor(
[] (schema_change& chng) -> std::vector<canonical_mutation>& {
return chng.mutations;
},
[] (broadcast_table_query& query) -> std::vector<canonical_mutation>& {
on_internal_error(slogger, "trying to merge broadcast table command");
},
[] (topology_change& chng) -> std::vector<canonical_mutation>& {
return chng.mutations;
},
[] (write_mutations& muts) -> std::vector<canonical_mutation>& {
return muts.mutations;
}
), cmd.change);
}
std::pair<group0_command, mutation> merge() {
auto& cmd = cmd_to_merge.back(); // use metadata from the last merged command
slogger.trace("merge new_state_id: {}", cmd.new_state_id);
using mutation_set_type = std::unordered_set<mutation, mutation_hash_by_key, mutation_equals_by_key>;
std::unordered_map<table_id, mutation_set_type> mutations;
if (cmd_to_merge.size() > 1) {
// skip merging if there is only one command
for (auto&& c : cmd_to_merge) {
for (auto&& cm : get_command_mutations(c)) {
auto schema = sm._sp.data_dictionary().find_schema(cm.column_family_id());
auto m = cm.to_mutation(schema);
auto& tbl_muts = mutations[cm.column_family_id()];
auto it = tbl_muts.find(m);
if (it == tbl_muts.end()) {
tbl_muts.emplace(std::move(m));
} else {
const_cast<mutation&>(*it).apply(std::move(m)); // Won't change key
}
}
}
std::vector<canonical_mutation> ms;
for (auto&& tables : mutations) {
for (auto&& partitions : tables.second) {
ms.push_back(canonical_mutation(partitions));
}
}
get_command_mutations(cmd) = std::move(ms);
}
auto res = std::make_pair(std::move(cmd), std::move(merged_history_mutation).value());
cmd_to_merge.clear();
merged_history_mutation.reset();
return res;
}
future<> apply(group0_command cmd, mutation history) {
// We assume that `cmd.change` was constructed using group0 state which was observed *after* `cmd.prev_state_id` was obtained.
// It is now important that we apply the change *before* we append the group0 state ID to the history table.
//
// If we crash before appending the state ID, when we reapply the command after restart, the change will be applied because
// the state ID was not yet appended so the above check will pass.
// TODO: reapplication of a command after a crash may require contacting a quorum (we need to learn that the command
// is committed from a leader). But we may want to ensure that group 0 state is consistent after restart even without
// access to quorum, which means we cannot allow partially applied commands. We need to ensure that either the entire
// change is applied and the state ID is updated or none of this happens.
// E.g. use a write-ahead-entry which contains all this information and make sure it's replayed during restarts.
co_await std::visit(make_visitor(
[&] (schema_change& chng) -> future<> {
return sm._mm.merge_schema_from(netw::messaging_service::msg_addr(std::move(cmd.creator_addr)), std::move(chng.mutations));
},
[&] (broadcast_table_query& query) -> future<> {
auto result = co_await service::broadcast_tables::execute_broadcast_table_query(sm._sp, query.query, cmd.new_state_id);
sm._client.set_query_result(cmd.new_state_id, std::move(result));
},
[&] (topology_change& chng) -> future<> {
co_await write_mutations_to_database(sm._sp, cmd.creator_addr, std::move(chng.mutations));
co_await sm._ss.topology_transition(sm._cdc_gen_svc);
},
[&] (write_mutations& muts) -> future<> {
return write_mutations_to_database(sm._sp, cmd.creator_addr, std::move(muts.mutations));
}
), cmd.change);
co_await sm._sp.mutate_locally({std::move(history)}, nullptr);
}
future<> merge_and_apply() {
auto [c, h] = merge();
return apply(std::move(c), std::move(h));
}
bool empty() const {
return cmd_to_merge.empty();
}
utils::UUID last_id() const {
return last_group0_state_id;
}
};
auto read_apply_mutex_holder = co_await _client.hold_read_apply_mutex();
merger m(*this, co_await db::system_keyspace::get_last_group0_state_id(), std::move(read_apply_mutex_holder));
// max_mutation_size = 1/2 of commitlog segment size, thus max_command_size is set 1/3 of commitlog segment size to leave space for metadata.
size_t max_command_size = _sp.data_dictionary().get_config().commitlog_segment_size_in_mb() * 1024 * 1024 / 3;
group0_state_machine_merger m(co_await db::system_keyspace::get_last_group0_state_id(), std::move(read_apply_mutex_holder),
max_command_size, _sp.data_dictionary());
for (auto&& c : command) {
auto is = ser::as_input_stream(c);
@@ -267,7 +146,7 @@ future<> group0_state_machine::apply(std::vector<raft::command_cref> command) {
auto size = m.cmd_size(cmd);
if (!m.can_merge(cmd, size)) {
co_await m.merge_and_apply();
co_await merge_and_apply(m);
}
m.add(std::move(cmd), size);
@@ -275,7 +154,7 @@ future<> group0_state_machine::apply(std::vector<raft::command_cref> command) {
if (!m.empty()) {
// apply remainder
co_await m.merge_and_apply();
co_await merge_and_apply(m);
}
}

View File

@@ -22,6 +22,7 @@ class raft_group0_client;
class migration_manager;
class storage_proxy;
class storage_service;
// Forward declaration. Uses the same class-key (`class`) as the definition in
// service/raft/group0_state_machine_merger.hh to avoid mismatched-tags diagnostics.
class group0_state_machine_merger;
struct schema_change {
// Mutations of schema tables (such as `system_schema.keyspaces`, `system_schema.tables` etc.)
@@ -84,6 +85,8 @@ class group0_state_machine : public raft_state_machine {
storage_proxy& _sp;
storage_service& _ss;
cdc::generation_service& _cdc_gen_svc;
future<> merge_and_apply(group0_state_machine_merger& merger);
public:
group0_state_machine(raft_group0_client& client, migration_manager& mm, storage_proxy& sp, storage_service& ss, cdc::generation_service& cdc_gen_svc) : _client(client), _mm(mm), _sp(sp), _ss(ss), _cdc_gen_svc(cdc_gen_svc) {}
future<> apply(std::vector<raft::command_cref> command) override;

View File

@@ -0,0 +1,124 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "db/config.hh"
#include "db/system_keyspace.hh"
#include "service/raft/group0_state_machine_merger.hh"
namespace service {
static logging::logger slogger("group0_raft_sm_merger");
// Turns a canonical history mutation into a `mutation`, using the schema of the
// group 0 history table as found in `db`.
static mutation convert_history_mutation(canonical_mutation m, const data_dictionary::database db) {
    const auto history_schema = db.find_schema(db::system_keyspace::NAME, db::system_keyspace::GROUP0_HISTORY);
    return m.to_mutation(history_schema);
}
// Creates a merger with an empty batch.
//   id               - initial value reported by `last_id()`; `add()` replaces it only with greater IDs.
//   mux              - semaphore units moved into the merger and stored in it.
//   max_command_size - size limit consulted by `can_merge()`.
//   db               - database handle used for schema lookups when converting canonical mutations.
group0_state_machine_merger::group0_state_machine_merger(utils::UUID id, semaphore_units<> mux, size_t max_command_size, data_dictionary::database db)
: _last_group0_state_id(id)
, _read_apply_mutex_holder(std::move(mux))
, _max_command_size(max_command_size)
, _db{std::move(db)} {}
// Sums the sizes of the representations of all mutations carried by `cmd`.
// Broadcast table queries are not inspected and always yield 0.
size_t group0_state_machine_merger::cmd_size(group0_command& cmd) {
    if (holds_alternative<broadcast_table_query>(cmd.change)) {
        return 0;
    }

    size_t total = 0;
    for (const canonical_mutation& cm : get_command_mutations(cmd)) {
        total += cm.representation().size();
    }
    return total;
}
// Tells whether `cmd`, whose size is `s`, may join the batch collected so far.
// The merger itself is not modified.
bool group0_state_machine_merger::can_merge(group0_command& cmd, size_t s) const {
    const bool batch_started = !_cmd_to_merge.empty();
    if (batch_started) {
        // Only commands of the same kind as the batch may join it, and
        // broadcast table queries never join an existing batch.
        const bool same_kind = _cmd_to_merge.front().change.index() == cmd.change.index();
        const bool is_broadcast_query = holds_alternative<broadcast_table_query>(cmd.change);
        if (!same_kind || is_broadcast_query) {
            return false;
        }
    }

    // Keep the accumulated size within `_max_command_size`. The merged command may
    // in fact end up much smaller, but better to be safe than sorry.
    // A zero `_size` means nothing has been accumulated yet, so the check is skipped.
    const bool over_budget = _size != 0 && _size + s > _max_command_size;
    return !over_budget;
}
// Appends `cmd` to the current batch, accounts for `added_size`, and folds the
// command's history mutation into the batch's merged history mutation.
void group0_state_machine_merger::add(group0_command&& cmd, size_t added_size) {
    slogger.trace("add to merging set new_state_id: {}", cmd.new_state_id);

    auto history_mut = convert_history_mutation(std::move(cmd.history_append), _db);

    // Track the greatest state ID seen so far. The comparison must order IDs the way
    // timeuuids are ordered in clustering keys, i.e. the way the history table is sorted.
    const bool cmd_is_newer = utils::timeuuid_tri_compare(_last_group0_state_id, cmd.new_state_id) < 0;
    if (cmd_is_newer) {
        _last_group0_state_id = cmd.new_state_id;
    }

    _cmd_to_merge.push_back(std::move(cmd));
    _size += added_size;

    // The first history mutation of a batch seeds the merged one; later ones are applied onto it.
    if (!_merged_history_mutation) {
        _merged_history_mutation = std::move(history_mut);
        return;
    }
    _merged_history_mutation->apply(std::move(history_mut));
}
// Gives mutable access to the mutation list held by `cmd.change`.
// Broadcast table queries hold no such list; asking for one is reported as an internal error.
std::vector<canonical_mutation>& group0_state_machine_merger::get_command_mutations(group0_command& cmd) {
    using mutations_ref = std::vector<canonical_mutation>&;

    return std::visit(make_visitor(
        [] (schema_change& sc) -> mutations_ref {
            return sc.mutations;
        },
        [] (broadcast_table_query&) -> mutations_ref {
            on_internal_error(slogger, "trying to merge broadcast table command");
        },
        [] (topology_change& tc) -> mutations_ref {
            return tc.mutations;
        },
        [] (write_mutations& wm) -> mutations_ref {
            return wm.mutations;
        }
    ), cmd.change);
}
std::pair<group0_command, mutation> group0_state_machine_merger::merge() {
auto& cmd = _cmd_to_merge.back(); // use metadata from the last merged command
slogger.trace("merge new_state_id: {}", cmd.new_state_id);
using mutation_set_type = std::unordered_set<mutation, mutation_hash_by_key, mutation_equals_by_key>;
std::unordered_map<table_id, mutation_set_type> mutations;
if (_cmd_to_merge.size() > 1) {
// skip merging if there is only one command
for (auto&& c : _cmd_to_merge) {
for (auto&& cm : get_command_mutations(c)) {
auto schema = _db.find_schema(cm.column_family_id());
auto m = cm.to_mutation(schema);
auto& tbl_muts = mutations[cm.column_family_id()];
auto it = tbl_muts.find(m);
if (it == tbl_muts.end()) {
tbl_muts.emplace(std::move(m));
} else {
const_cast<mutation&>(*it).apply(std::move(m)); // Won't change key
}
}
}
std::vector<canonical_mutation> ms;
for (auto&& tables : mutations) {
for (auto&& partitions : tables.second) {
ms.push_back(canonical_mutation(partitions));
}
}
get_command_mutations(cmd) = std::move(ms);
}
auto res = std::make_pair(std::move(cmd), std::move(_merged_history_mutation).value());
_cmd_to_merge.clear();
_merged_history_mutation.reset();
return res;
}
} // end of namespace service

View File

@@ -0,0 +1,77 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "data_dictionary/data_dictionary.hh"
#include "service/raft/group0_state_machine.hh"
#include "service/storage_proxy.hh"
#include <boost/range/algorithm/transform.hpp>
namespace service {
/**
* Most group0 commands are just mutations, so it is easy - and more efficient -
* to combine them before passing them to the subsystem they are destined for.
* The logic that handles those mutations in a subsystem then runs once per
* batch of commands instead of once per individual command. This is especially
* useful when a node catches up to a leader and receives many commands at once.
*
* The `group0_state_machine_merger` does exactly that. It combines commands
* into a single command where possible while preserving the order between
* commands: each time a command destined for a different subsystem is encountered,
* the batch combined so far has to be flushed and a new one started. This extra
* safety assumes that there are dependencies between subsystems managed by group0,
* so the order matters. That may not be the case now, but we prefer to be on the
* safe side.
*
* Broadcast table commands are not mutations, so they are never combined.
*/
class group0_state_machine_merger {
// Commands of the current batch, in the order they were added.
std::vector<group0_command> _cmd_to_merge;
// History mutations of the batched commands applied onto a single mutation;
// has no value while the batch is empty.
std::optional<mutation> _merged_history_mutation;
// Greatest state ID seen so far (in timeuuid order), starting from the ID given at construction.
utils::UUID _last_group0_state_id;
// Accumulated size of the added commands; compared against `_max_command_size` by `can_merge()`.
size_t _size = 0;
// Semaphore units handed over at construction and stored in the merger.
semaphore_units<> _read_apply_mutex_holder;
// Size limit checked by `can_merge()`.
const size_t _max_command_size;
// Used to look up table schemas when converting canonical mutations.
const data_dictionary::database _db;
public:
group0_state_machine_merger(utils::UUID id, semaphore_units<> mux, size_t max_command_size, data_dictionary::database db);
// Returns size in bytes of mutations stored in the command.
// Broadcast table commands have size 0.
static size_t cmd_size(group0_command& cmd);
// Returns true if the command can be merged with the current batch.
// A command can be merged if it is of the same type as the commands in the current batch
// and the accumulated size will not exceed the limit.
// Broadcast table commands cannot be merged with any other commands.
// Does not modify the merger.
bool can_merge(group0_command& cmd, size_t s) const;
// Adds a command to the current batch.
// `added_size` is the size to account for the command (see `cmd_size()`).
void add(group0_command&& cmd, size_t added_size);
// Returns mutations stored in the command.
// It must not be called for broadcast table commands.
static std::vector<canonical_mutation>& get_command_mutations(group0_command& cmd);
// Returns a command that contains all mutations from the current batch and
// the merged history mutation. The metadata of the returned command comes from
// the last command added to the batch. Empties the current batch.
// It must not be called when `empty()` returns true.
std::pair<group0_command, mutation> merge();
// Returns true if no commands are waiting in the current batch.
bool empty() const {
return _cmd_to_merge.empty();
}
// Returns the greatest state ID among the ID given at construction and the IDs
// of all commands added so far.
utils::UUID last_id() const {
return _last_group0_state_id;
}
};
} // end of namespace service