service: raft: check and update state IDs during group 0 operations

The group 0 state machine will only modify state during command
application if the provided "previous state ID" is equal to the
last state ID present in the history table. Otherwise, the command will
be a no-op.

To ensure linearizability of group 0 changes, the performer of the
change must first read the last state ID, and only then read the state
and send a command to the state machine. If a concurrent change
races with this command and manages to modify the state, we will detect
that the last state ID does not match during `apply`; all calls to
`apply` are serialized, and `apply` adds the new entry to the history
table at the end, after modifying the group 0 state.

The details of this mechanism are abstracted away with `group0_guard`.
To perform a group 0 change, one needs to call `announce`, which
requires a `group0_guard` to be passed in. The only way to obtain a
`group0_guard` is by calling `start_group0_operation`, which underneath
performs a read barrier on group 0, obtains the last state ID from the
history table, and constructs a new state ID that the change will append
to the history table. The read barrier ensures that all previously
completed changes are visible to this operation. The caller can then
perform any necessary validation, construct mutations which modify group
0 state, and finally call `announce`.

The guard also provides a timestamp which is used by the caller
to construct the mutations. The timestamp is obtained from the new state ID.
We ensure that it is greater than the timestamp of the last state ID.
Thus, if the change is successful, the applied mutations will have greater
timestamps than the previously applied mutations.

We also add two locks. The more important one, used to ensure
correctness, is `read_apply_mutex`. It is held when modifying group 0
state (in `apply` and `transfer_snapshot`) and when reading it (it's
taken when obtaining a `group0_guard` and released before a command is
sent in `announce`). Its goal is to ensure that we don't read partial
state, which could happen without it because group 0 state consists of
many parts and `apply` (or `transfer_snapshot`) potentially modifies all
of them. Note: this doesn't give us 100% protection; if we crash in the
middle of `apply` (or `transfer_snapshot`), then after restart we may
read partial state. To remove this possibility we need to ensure that
commands which were being applied before restart but not finished are
re-applied after restart, before anyone can read the state. I left a
TODO in `apply`.

The second lock, `operation_mutex`, is used to improve liveness. It is
taken when obtaining a `group0_guard` and released after a command is
applied (compare to `read_apply_mutex` which is released before a
command is sent). It is not taken inside `apply` or `transfer_snapshot`.
This lock ensures that multiple fibers running on the same node do not
attempt to modify group 0 concurrently; this would cause some of them to
fail (due to the concurrent modification protection described above).
This is mostly important during the first boot of the first node, when
services start for the first time and try to create their internal
tables. This lock serializes these attempts, ensuring that all of them
succeed.
Author: Kamil Braun
Date: 2022-01-06 19:47:35 +01:00
Parent: 509ac2130f
Commit: 6a00e790c7
3 changed files with 284 additions and 32 deletions


@@ -17,6 +17,7 @@
#include "schema_registry.hh"
#include "service/migration_manager.hh"
#include "service/storage_proxy.hh"
#include "service/raft/group0_state_machine.hh"
#include "service/migration_listener.hh"
#include "message/messaging_service.hh"
@@ -28,7 +29,7 @@
#include "replica/database.hh"
#include "db/schema_tables.hh"
#include "types/user.hh"
#include "db/schema_tables.hh"
#include "db/system_keyspace.hh"
#include "cql3/functions/user_aggregate.hh"
#include "serialization_visitors.hh"
@@ -38,6 +39,10 @@
#include "serializer_impl.hh"
#include "idl/frozen_schema.dist.impl.hh"
#include "idl/uuid.dist.impl.hh"
#include "idl/raft_storage.dist.hh"
#include "idl/raft_storage.dist.impl.hh"
#include "idl/group0_state_machine.dist.hh"
#include "idl/group0_state_machine.dist.impl.hh"
namespace service {
@@ -52,6 +57,7 @@ static future<schema_ptr> get_schema_definition(table_schema_version v, netw::me
migration_manager::migration_manager(migration_notifier& notifier, gms::feature_service& feat, netw::messaging_service& ms, service::storage_proxy& storage_proxy, gms::gossiper& gossiper, service::raft_group_registry& raft_gr) :
_notifier(notifier), _feat(feat), _messaging(ms), _storage_proxy(storage_proxy), _gossiper(gossiper), _raft_gr(raft_gr)
, _schema_push([this] { return passive_announce(); })
, _group0_read_apply_mutex{1}, _group0_operation_mutex{1}
{
}
@@ -870,15 +876,92 @@ future<> migration_manager::push_schema_mutation(const gms::inet_address& endpoi
return _messaging.send_definitions_update(id, std::move(fm), std::move(cm));
}
/* *** Linearizing group 0 operations ***
*
* Group 0 changes (e.g. schema changes) are performed through Raft commands, which are executed in the same order
* on every node, according to the order they appear in the Raft log
* (executing a command happens in `group0_state_machine::apply`).
* The commands contain mutations which modify tables that store group 0 state.
*
* However, constructing these mutations often requires reading the current state and validating the change against it.
* This happens outside the code which applies the commands in order and may race with it. At the moment of applying
* a command, the mutations stored within may be 'invalid' because a different command managed to be concurrently applied,
* changing the state.
*
* For example, consider the sequence of commands:
*
* C1, C2, C3.
*
* Suppose that mutations inside C2 were constructed on a node which already applied C1. Thus, when applying C2,
* the state of group 0 is the same as when the change was validated and its mutations were constructed.
*
* On the other hand, suppose that mutations inside C3 were also constructed on a node which applied C1, but didn't
* apply C2 yet. This could easily happen e.g. when C2 and C3 were constructed concurrently on two different nodes.
* Thus, when applying C3, the state of group 0 is different than it was when validating the change and constructing
* its mutations: the state consists of the changes from C1 and C2, but when C3 was created, it used the state consisting
* of changes from C1 (but not C2). Thus the mutations in C3 are not valid and we must not apply them.
*
* To protect ourselves from applying such 'obsolete' changes, we detect such commands during `group0_state_machine::apply`
* and skip their mutations.
*
* For this, group 0 state was extended with a 'history table' (system.group0_history), which stores a sequence of
* 'group 0 state IDs' (which are timeuuids). Each group 0 command also holds a unique state ID; if the command is successful,
* the ID is appended to the history table. Each command also stores a 'previous state ID'; the change described by the command
* is only applied when this 'previous state ID' is equal to the last state ID in the history table. If it's different,
* we skip the change.
*
* To perform a group 0 change the user must first read the last state ID from the history table. This happens by obtaining
* a `group0_guard` through `migration_manager::start_group0_operation`; the observed last state ID is stored in
* `_observed_group0_state_id`. `start_group0_operation` also generates a new state ID for this change and stores it in
* `_new_group0_state_id`. We ensure that the new state ID is greater than the observed state ID (in timeuuid order).
*
* The user then reads group 0 state, validates the change against the observed state, and constructs the mutations
* which modify group 0 state. Finally, the user calls `announce`, passing the mutations and the guard.
*
* `announce` constructs a command for the group 0 state machine. The command stores the mutations and the state IDs.
*
* When the command is applied, we compare the stored observed state ID against the last state ID in the history table.
* If it's the same, that means no change happened in between - no other command managed to 'sneak in' between the moment
* the user started the operation and the moment the command was applied.
*
* The user must use `group0_guard::write_timestamp()` when constructing the mutations. The timestamp is extracted
* from the new state ID. This ensures that mutations applied by successful commands have monotonic timestamps.
* Indeed: the state IDs of successful commands are increasing (the previous state ID of a command that is successful
* is equal to the new state ID of the previous successful command, and we ensure that the new state ID of a command
* is greater than the previous state ID of this command).
*
* To perform a linearized group 0 read the user must also obtain a `group0_guard`. This ensures that all previously
* completed changes are visible on this node, as obtaining the guard requires performing a Raft read barrier.
*
* Furthermore, obtaining the guard ensures that we don't read partial state, since it holds a lock that is also taken
* during command application (`_read_apply_mutex_holder`). The lock is released just before sending the command to Raft.
* TODO: we may still read partial state if we crash in the middle of command application.
* See `group0_state_machine::apply` for a proposed fix.
*
* Obtaining the guard also ensures that there is no concurrent group 0 operation running on this node using another lock
* (`_operation_mutex_holder`); if we allowed multiple concurrent operations to run, some of them could fail
* due to the state ID protection. Concurrent operations may still run on different nodes. This lock is thus used
* for improving liveness of operations running on the same node by serializing them.
*/
struct group0_guard::impl {
api::timestamp_type _write_timestamp;
semaphore_units<> _operation_mutex_holder;
semaphore_units<> _read_apply_mutex_holder;
utils::UUID _observed_group0_state_id;
utils::UUID _new_group0_state_id;
impl(const impl&) = delete;
impl& operator=(const impl&) = delete;
impl(api::timestamp_type write_timestamp)
: _write_timestamp(write_timestamp)
impl(semaphore_units<> operation_mutex_holder, semaphore_units<> read_apply_mutex_holder, utils::UUID observed_group0_state_id, utils::UUID new_group0_state_id)
: _operation_mutex_holder(std::move(operation_mutex_holder)), _read_apply_mutex_holder(std::move(read_apply_mutex_holder)),
_observed_group0_state_id(observed_group0_state_id), _new_group0_state_id(new_group0_state_id)
{}
void release_read_apply_mutex() {
assert(_read_apply_mutex_holder.count() == 1);
_read_apply_mutex_holder.return_units(1);
}
};
group0_guard::group0_guard(std::unique_ptr<impl> p) : _impl(std::move(p)) {}
@@ -887,24 +970,46 @@ group0_guard::~group0_guard() = default;
group0_guard::group0_guard(group0_guard&&) noexcept = default;
utils::UUID group0_guard::observed_group0_state_id() const {
return _impl->_observed_group0_state_id;
}
utils::UUID group0_guard::new_group0_state_id() const {
return _impl->_new_group0_state_id;
}
api::timestamp_type group0_guard::write_timestamp() const {
return _impl->_write_timestamp;
return utils::UUID_gen::micros_timestamp(_impl->_new_group0_state_id);
}
future<> migration_manager::announce_with_raft(std::vector<mutation> schema, group0_guard guard) {
assert(this_shard_id() == 0);
auto schema_features = _feat.cluster_schema_features();
auto adjusted_schema = db::schema_tables::adjust_schema_for_schema_features(schema, schema_features);
group0_command group0_cmd {
.change{schema_change{
.mutations{adjusted_schema.begin(), adjusted_schema.end()},
}},
.history_append{db::system_keyspace::make_group0_history_state_id_mutation(guard.new_group0_state_id())},
.prev_state_id{guard.observed_group0_state_id()},
.new_state_id{guard.new_group0_state_id()},
.creator_addr{utils::fb_utilities::get_broadcast_address()},
.creator_id{_raft_gr.group0().id()},
};
raft::command cmd;
ser::serialize(cmd, std::vector<canonical_mutation>(adjusted_schema.begin(), adjusted_schema.end()));
// todo: add schema version into command, to apply
// only on condition the version is the same.
// qqq: what happens if there is a command in between?
// there is a new schema version, apply skipped, but
// we don't get a proper error.
co_return co_await _raft_gr.group0().add_entry(std::move(cmd), raft::wait_type::applied);
// TODO: return "retry" error if apply is a no-op - check
// new schema version
ser::serialize(cmd, group0_cmd);
// Release the read_apply mutex so `group0_state_machine::apply` can take it.
guard._impl->release_read_apply_mutex();
co_await _raft_gr.group0().add_entry(std::move(cmd), raft::wait_type::applied);
// dropping the guard releases `_group0_operation_mutex`, allowing other operations
// on this node to proceed
}
future<> migration_manager::announce_without_raft(std::vector<mutation> schema) {
@@ -931,9 +1036,34 @@ future<> migration_manager::announce_without_raft(std::vector<mutation> schema)
// Returns a future on the local application of the schema
future<> migration_manager::announce(std::vector<mutation> schema, group0_guard guard) {
if (_raft_gr.is_enabled()) {
return announce_with_raft(std::move(schema), std::move(guard));
if (this_shard_id() != 0) {
// This should not happen since all places which construct `group0_guard` also check that they are on shard 0.
// Note: `group0_guard::impl` is private to this module, making this easy to verify.
on_internal_error(mlogger, "announce: must run on shard 0");
}
auto new_group0_state_id = guard.new_group0_state_id();
co_await announce_with_raft(std::move(schema), std::move(guard));
if (!(co_await db::system_keyspace::group0_history_contains(new_group0_state_id))) {
// The command was applied but the history table does not contain the new group 0 state ID.
// This means `apply` skipped the change due to previous state ID mismatch.
throw group0_concurrent_modification{};
}
} else {
co_await announce_without_raft(std::move(schema));
}
return announce_without_raft(std::move(schema));
}
static utils::UUID generate_group0_state_id(utils::UUID prev_state_id) {
auto ts = api::new_timestamp();
if (prev_state_id != utils::UUID{}) {
auto lower_bound = utils::UUID_gen::micros_timestamp(prev_state_id);
if (ts <= lower_bound) {
ts = lower_bound + 1;
}
}
return utils::UUID_gen::get_random_time_UUID_from_micros(std::chrono::microseconds{ts});
}
future<group0_guard> migration_manager::start_group0_operation() {
@@ -942,12 +1072,32 @@ future<group0_guard> migration_manager::start_group0_operation() {
on_internal_error(mlogger, "start_group0_operation: must run on shard 0");
}
auto operation_holder = co_await get_units(_group0_operation_mutex, 1);
co_await _raft_gr.group0().read_barrier();
// Take `_group0_read_apply_mutex` *after* read barrier.
// Read barrier may wait for `group0_state_machine::apply` which also takes this mutex.
auto read_apply_holder = co_await get_units(_group0_read_apply_mutex, 1);
auto observed_group0_state_id = co_await db::system_keyspace::get_last_group0_state_id();
auto new_group0_state_id = generate_group0_state_id(observed_group0_state_id);
co_return group0_guard {
std::make_unique<group0_guard::impl>(
std::move(operation_holder),
std::move(read_apply_holder),
observed_group0_state_id,
new_group0_state_id
)
};
}
co_return group0_guard {
std::make_unique<group0_guard::impl>(
api::new_timestamp()
semaphore_units<>{},
semaphore_units<>{},
utils::UUID{},
generate_group0_state_id(utils::UUID{})
)
};
}


@@ -31,7 +31,9 @@
class canonical_mutation;
class frozen_mutation;
namespace cql3 { namespace functions { class user_function; class user_aggregate; }}
namespace cql3 {
namespace functions { class user_function; class user_aggregate; }
}
namespace netw { class messaging_service; }
namespace gms {
@@ -49,6 +51,9 @@ class storage_proxy;
template<typename M>
concept MergeableMutation = std::is_same<M, canonical_mutation>::value || std::is_same<M, frozen_mutation>::value;
// Obtaining this object means that all previously finished operations on group 0 are visible on this node.
// It is also required in order to perform group 0 changes (through `announce`).
// See `group0_guard::impl` for more detailed explanations.
class group0_guard {
friend class migration_manager;
struct impl;
@@ -60,10 +65,20 @@ public:
~group0_guard();
group0_guard(group0_guard&&) noexcept;
utils::UUID observed_group0_state_id() const;
utils::UUID new_group0_state_id() const;
// Use this timestamp when creating group 0 mutations.
api::timestamp_type write_timestamp() const;
};
class group0_concurrent_modification : public std::runtime_error {
public:
group0_concurrent_modification()
: std::runtime_error("Failed to apply group 0 change due to concurrent modification")
{}
};
class migration_manager : public seastar::async_sharded_service<migration_manager>,
public gms::i_endpoint_state_change_subscriber,
public seastar::peering_sharded_service<migration_manager> {
@@ -82,6 +97,11 @@ private:
service::raft_group_registry& _raft_gr;
serialized_action _schema_push;
utils::UUID _schema_version_to_publish;
friend class group0_state_machine;
// See `group0_guard::impl` for explanation of the purpose of these locks.
semaphore _group0_read_apply_mutex;
semaphore _group0_operation_mutex;
public:
migration_manager(migration_notifier&, gms::feature_service&, netw::messaging_service& ms, service::storage_proxy&, gms::gossiper& gossiper, service::raft_group_registry& raft_gr);
@@ -153,10 +173,14 @@ public:
future<std::vector<mutation>> prepare_view_drop_announcement(const sstring& ks_name, const sstring& cf_name, api::timestamp_type);
// The function needs to be called if the user wants to read most up-to-date group0 state (including schema state)
// The function needs to be called if the user wants to read most up-to-date group 0 state (including schema state)
// (the function ensures that all previously finished group0 operations are visible on this node) or to write it.
//
// Call this *before* reading group 0 state (e.g. when performing a schema change, call this before validation).
// Use `group0_guard::write_timestamp()` when creating mutations which modify group 0 (e.g. schema tables mutations).
//
// Call ONLY on shard 0.
// Requires a quorum of nodes to be available.
// Requires a quorum of nodes to be available in order to finish.
future<group0_guard> start_group0_operation();
// used to check if raft is enabled on the cluster


@@ -14,27 +14,87 @@
#include "frozen_schema.hh"
#include "serialization_visitors.hh"
#include "serializer.hh"
#include "idl/frozen_schema.dist.hh"
#include "idl/uuid.dist.hh"
#include "serializer_impl.hh"
#include "idl/frozen_schema.dist.impl.hh"
#include "idl/uuid.dist.hh"
#include "idl/uuid.dist.impl.hh"
#include "idl/frozen_schema.dist.hh"
#include "idl/frozen_schema.dist.impl.hh"
#include "idl/group0_state_machine.dist.hh"
#include "idl/group0_state_machine.dist.impl.hh"
#include "idl/raft_storage.dist.hh"
#include "idl/raft_storage.dist.impl.hh"
#include "service/migration_manager.hh"
#include "db/system_keyspace.hh"
#include "service/storage_proxy.hh"
namespace service {
static logging::logger slogger("schema_raft_sm");
static logging::logger slogger("group0_raft_sm");
static mutation extract_history_mutation(std::vector<canonical_mutation>& muts, const data_dictionary::database db) {
auto s = db.find_schema(db::system_keyspace::NAME, db::system_keyspace::GROUP0_HISTORY);
auto it = std::find_if(muts.begin(), muts.end(), [history_table_id = s->id()]
(canonical_mutation& m) { return m.column_family_id() == history_table_id; });
if (it == muts.end()) {
on_internal_error(slogger, "group0 history table mutation not found");
}
auto res = it->to_mutation(s);
muts.erase(it);
return res;
}
static mutation convert_history_mutation(canonical_mutation m, const data_dictionary::database db) {
return m.to_mutation(db.find_schema(db::system_keyspace::NAME, db::system_keyspace::GROUP0_HISTORY));
}
future<> group0_state_machine::apply(std::vector<raft::command_cref> command) {
(void)_sp;
slogger.trace("apply() is called");
for (auto&& c : command) {
auto is = ser::as_input_stream(c);
std::vector<canonical_mutation> mutations =
ser::deserialize(is, boost::type<std::vector<canonical_mutation>>());
auto cmd = ser::deserialize(is, boost::type<group0_command>{});
slogger.trace("merging schema mutations");
co_await _mm.merge_schema_from(netw::messaging_service::msg_addr(gms::inet_address{}), std::move(mutations));
slogger.trace("cmd: prev_state_id: {}, new_state_id: {}, creator_addr: {}, creator_id: {}",
cmd.prev_state_id, cmd.new_state_id, cmd.creator_addr, cmd.creator_id);
slogger.trace("cmd.history_append: {}", cmd.history_append);
auto read_apply_mutex_holder = co_await get_units(_mm._group0_read_apply_mutex, 1);
if (cmd.prev_state_id) {
auto last_group0_state_id = co_await db::system_keyspace::get_last_group0_state_id();
if (*cmd.prev_state_id != last_group0_state_id) {
// This command used obsolete state. Make it a no-op.
// BTW. on restart, all commands after last snapshot descriptor become no-ops even when they originally weren't no-ops.
// This is because we don't restart from snapshot descriptor, but using current state of the tables so the last state ID
// is the one given by the last command.
// Similar thing may happen when we pull group0 state in transfer_snapshot - we pull the latest state of remote tables,
// not state at the snapshot descriptor.
slogger.trace("cmd.prev_state_id ({}) different than last group 0 state ID in history table ({})",
cmd.prev_state_id, last_group0_state_id);
continue;
}
} else {
slogger.trace("unconditional modification, cmd.new_state_id: {}", cmd.new_state_id);
}
// We assume that `cmd.change` was constructed using group0 state which was observed *after* `cmd.prev_state_id` was obtained.
// It is now important that we apply the change *before* we append the group0 state ID to the history table.
//
// If we crash before appending the state ID, when we reapply the command after restart, the change will be applied because
// the state ID was not yet appended so the above check will pass.
// TODO: reapplication of a command after a crash may require contacting a quorum (we need to learn that the command
// is committed from a leader). But we may want to ensure that group 0 state is consistent after restart even without
// access to quorum, which means we cannot allow partially applied commands. We need to ensure that either the entire
// change is applied and the state ID is updated or none of this happens.
// E.g. use a write-ahead-entry which contains all this information and make sure it's replayed during restarts.
co_await std::visit(make_visitor(
[&] (schema_change& chng) -> future<> {
return _mm.merge_schema_from(netw::messaging_service::msg_addr(std::move(cmd.creator_addr)), std::move(chng.mutations));
}
), cmd.change);
co_await _sp.mutate_locally({convert_history_mutation(std::move(cmd.history_append), _sp.data_dictionary())}, nullptr);
}
}
@@ -51,10 +111,28 @@ future<> group0_state_machine::load_snapshot(raft::snapshot_id id) {
}
future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::snapshot_descriptor snp) {
// Note that this may bring newer state than the schema state machine raft's
// Note that this may bring newer state than the group0 state machine raft's
// log, so some raft entries may be double applied, but since the state
// machine idempotent it is not a problem.
return _mm.submit_migration_task(from, false);
// machine is idempotent it is not a problem.
slogger.trace("transfer snapshot from {} index {} snp id {}", from, snp.idx, snp.id);
netw::messaging_service::msg_addr addr{from, 0};
// (Ab)use MIGRATION_REQUEST to also transfer group0 history table mutation besides schema tables mutations.
auto [_, cm] = co_await _mm._messaging.send_migration_request(addr, netw::schema_pull_options { .group0_snapshot_transfer = true });
if (!cm) {
// If we're running this code then remote supports Raft group 0, so it should also support canonical mutations
// (which were introduced a long time ago).
on_internal_error(slogger, "Expected MIGRATION_REQUEST to return canonical mutations");
}
auto history_mut = extract_history_mutation(*cm, _sp.data_dictionary());
// TODO ensure atomicity of snapshot application in presence of crashes (see TODO in `apply`)
auto read_apply_mutex_holder = co_await get_units(_mm._group0_read_apply_mutex, 1);
co_await _mm.merge_schema_from(addr, std::move(*cm));
co_await _sp.mutate_locally({std::move(history_mut)}, nullptr);
}
future<> group0_state_machine::abort() {