From ce0ed29ad659ad0639329dff428a281dd2889cc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 29 Jan 2024 15:06:04 +0200 Subject: [PATCH] Merge 'Add an API to trigger snapshot in Raft servers' from Kamil Braun This allows the user of `raft::server` to cause it to create a snapshot and truncate the Raft log (leaving no trailing entries; in the future we may extend the API to specify number of trailing entries left if needed). In a later commit we'll add a REST endpoint to Scylla to trigger group 0 snapshots. One use case for this API is to create group 0 snapshots in Scylla deployments which upgraded to Raft in version 5.2 and started with an empty Raft log with no snapshot at the beginning. This causes problems, e.g. when a new node bootstraps to the cluster, it will not receive a snapshot that would contain both schema and group 0 history, which would then lead to inconsistent schema state and trigger assertion failures as observed in scylladb/scylladb#16683. In 5.4 the logic of initial group 0 setup was changed to start the Raft log with a snapshot at index 1 (ff386e7a445d17ed2199a26e4ac75905b071a91b) but a problem remains with these existing deployments coming from 5.2, we need a way to trigger a snapshot in them (other than performing 1000 arbitrary schema changes). Another potential use case in the future would be to trigger snapshots based on external memory pressure in tablet Raft groups (for strongly consistent tables). The PR adds the API to `raft::server` and a HTTP endpoint that uses it. In a follow-up PR, we plan to modify group 0 server startup logic to automatically call this API if it sees that no snapshot is present yet (to automatically fix the aforementioned 5.2 deployments once they upgrade.) 
Closes scylladb/scylladb#16816 * github.com:scylladb/scylladb: raft: remove `empty()` from `fsm_output` test: add test for manual triggering of Raft snapshots api: add HTTP endpoint to trigger Raft snapshots raft: server: add `trigger_snapshot` API raft: server: track last persisted snapshot descriptor index raft: server: framework for handling server requests raft: server: inline `poll_fsm_output` raft: server: fix indentation raft: server: move `io_fiber`'s processing of `batch` to a separate function raft: move `poll_output()` from `fsm` to `server` raft: move `_sm_events` from `fsm` to `server` raft: fsm: remove constructor used only in tests raft: fsm: move trace message from `poll_output` to `has_output` raft: fsm: extract `has_output()` raft: pass `max_trailing_entries` through `fsm_output` to `store_snapshot_descriptor` raft: server: pass `*_aborted` to `set_exception` call (cherry picked from commit d202d32f8136a2db75a35423bec31bd5ad16df84) Backport notes: - `has_output()` has a smaller condition in the backported version (because the condition was smaller in `poll_output()`) - `process_fsm_output` has a smaller body (because `io_fiber` had a smaller body) in the backported version - the HTTP API is only started if `raft_group_registry` is started --- api/api-doc/raft.json | 43 ++ api/api.cc | 13 + api/api_init.hh | 3 + api/raft.cc | 70 ++++ api/raft.hh | 18 + configure.py | 2 + main.cc | 7 +- raft/fsm.cc | 28 +- raft/fsm.hh | 40 +- raft/server.cc | 386 ++++++++++++------ raft/server.hh | 16 + test/raft/etcd_test.cc | 24 +- test/raft/fsm_test.cc | 18 +- test/raft/helpers.cc | 4 +- test/raft/helpers.hh | 13 +- .../test_raft_snapshot_request.py | 102 +++++ 16 files changed, 590 insertions(+), 197 deletions(-) create mode 100644 api/api-doc/raft.json create mode 100644 api/raft.cc create mode 100644 api/raft.hh create mode 100644 test/topology_custom/test_raft_snapshot_request.py diff --git a/api/api-doc/raft.json b/api/api-doc/raft.json new file mode 100644 
index 0000000000..9673682069 --- /dev/null +++ b/api/api-doc/raft.json @@ -0,0 +1,43 @@ +{ + "apiVersion":"0.0.1", + "swaggerVersion":"1.2", + "basePath":"{{Protocol}}://{{Host}}", + "resourcePath":"/raft", + "produces":[ + "application/json" + ], + "apis":[ + { + "path":"/raft/trigger_snapshot/{group_id}", + "operations":[ + { + "method":"POST", + "summary":"Triggers snapshot creation and log truncation for the given Raft group", + "type":"string", + "nickname":"trigger_snapshot", + "produces":[ + "application/json" + ], + "parameters":[ + { + "name":"group_id", + "description":"The ID of the group which should get snapshotted", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"path" + }, + { + "name":"timeout", + "description":"Timeout in seconds after which the endpoint returns a failure. If not provided, 60s is used.", + "required":false, + "allowMultiple":false, + "type":"long", + "paramType":"query" + } + ] + } + ] + } + ] +} diff --git a/api/api.cc b/api/api.cc index a9a0a881a0..b0242b09d5 100644 --- a/api/api.cc +++ b/api/api.cc @@ -31,6 +31,7 @@ #include "api/config.hh" #include "task_manager.hh" #include "task_manager_test.hh" +#include "raft.hh" logging::logger apilog("api"); @@ -277,6 +278,18 @@ future<> set_server_task_manager_test(http_context& ctx, lw_shared_ptr set_server_raft(http_context& ctx, sharded& raft_gr) { + auto rb = std::make_shared(ctx.api_doc); + return ctx.http_server.set_routes([rb, &ctx, &raft_gr] (routes& r) { + rb->register_function(r, "raft", "The Raft API"); + set_raft(ctx, r, raft_gr); + }); +} + +future<> unset_server_raft(http_context& ctx) { + return ctx.http_server.set_routes([&ctx] (routes& r) { unset_raft(ctx, r); }); +} + void req_params::process(const request& req) { // Process mandatory parameters for (auto& [name, ent] : params) { diff --git a/api/api_init.hh b/api/api_init.hh index fd50649d45..7cb42ca7b1 100644 --- a/api/api_init.hh +++ b/api/api_init.hh @@ -22,6 +22,7 @@ namespace service 
{ class load_meter; class storage_proxy; class storage_service; +class raft_group_registry; } // namespace service @@ -116,5 +117,7 @@ future<> set_server_compaction_manager(http_context& ctx); future<> set_server_done(http_context& ctx); future<> set_server_task_manager(http_context& ctx); future<> set_server_task_manager_test(http_context& ctx, lw_shared_ptr cfg); +future<> set_server_raft(http_context&, sharded&); +future<> unset_server_raft(http_context&); } diff --git a/api/raft.cc b/api/raft.cc new file mode 100644 index 0000000000..eab13889dd --- /dev/null +++ b/api/raft.cc @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2024-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#include + +#include "api/api.hh" +#include "api/api-doc/raft.json.hh" + +#include "service/raft/raft_group_registry.hh" + +using namespace seastar::httpd; + +extern logging::logger apilog; + +namespace api { + +namespace r = httpd::raft_json; +using namespace json; + +void set_raft(http_context&, httpd::routes& r, sharded& raft_gr) { + r::trigger_snapshot.set(r, [&raft_gr] (std::unique_ptr req) -> future { + raft::group_id gid{utils::UUID{req->param["group_id"]}}; + auto timeout_dur = std::invoke([timeout_str = req->get_query_param("timeout")] { + if (timeout_str.empty()) { + return std::chrono::seconds{60}; + } + auto dur = std::stoll(timeout_str); + if (dur <= 0) { + throw std::runtime_error{"Timeout must be a positive number."}; + } + return std::chrono::seconds{dur}; + }); + + std::atomic found_srv{false}; + co_await raft_gr.invoke_on_all([gid, timeout_dur, &found_srv] (service::raft_group_registry& raft_gr) -> future<> { + auto* srv = raft_gr.find_server(gid); + if (!srv) { + co_return; + } + + found_srv = true; + abort_on_expiry aoe(lowres_clock::now() + timeout_dur); + apilog.info("Triggering Raft group {} snapshot", gid); + auto result = co_await srv->trigger_snapshot(&aoe.abort_source()); + if (result) { + apilog.info("New snapshot for Raft group {} 
created", gid); + } else { + apilog.info("Could not create new snapshot for Raft group {}, no new entries applied", gid); + } + }); + + if (!found_srv) { + throw std::runtime_error{fmt::format("Server for group ID {} not found", gid)}; + } + + co_return json_void{}; + }); +} + +void unset_raft(http_context&, httpd::routes& r) { + r::trigger_snapshot.unset(r); +} + +} + diff --git a/api/raft.hh b/api/raft.hh new file mode 100644 index 0000000000..e514a1bc22 --- /dev/null +++ b/api/raft.hh @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include "api_init.hh" + +namespace api { + +void set_raft(http_context& ctx, httpd::routes& r, sharded& raft_gr); +void unset_raft(http_context& ctx, httpd::routes& r); + +} diff --git a/configure.py b/configure.py index fde40fa8e5..38c8a67d9d 100755 --- a/configure.py +++ b/configure.py @@ -1077,6 +1077,8 @@ api = ['api/api.cc', Json2Code('api/api-doc/error_injection.json'), 'api/authorization_cache.cc', Json2Code('api/api-doc/authorization_cache.json'), + 'api/raft.cc', + Json2Code('api/api-doc/raft.json'), ] alternator = [ diff --git a/main.cc b/main.cc index aa6d19618a..08668003ac 100644 --- a/main.cc +++ b/main.cc @@ -1164,12 +1164,18 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl }).get(); cfg->host_id = sys_ks.local().load_local_host_id().get0(); + std::any stop_raft_api; if (raft_gr.local().is_enabled()) { auto my_raft_id = raft::server_id{cfg->host_id.uuid()}; supervisor::notify("starting Raft Group Registry service"); raft_gr.invoke_on_all([my_raft_id] (service::raft_group_registry& raft_gr) { return raft_gr.start(my_raft_id); }).get(); + + api::set_server_raft(ctx, raft_gr).get(); + stop_raft_api = defer_verbose_shutdown("Raft API", [&ctx] { + api::unset_server_raft(ctx).get(); + }); } else { if (cfg->check_experimental(db::experimental_features_t::feature::BROADCAST_TABLES)) { 
startlog.error("Bad configuration: RAFT feature has to be enabled if BROADCAST_TABLES is enabled"); @@ -1177,7 +1183,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl } } - group0_client.init().get(); db::sstables_format_selector sst_format_selector(gossiper.local(), feature_service, db); diff --git a/raft/fsm.cc b/raft/fsm.cc index a86d77d2d1..339a5a570d 100644 --- a/raft/fsm.cc +++ b/raft/fsm.cc @@ -19,9 +19,10 @@ leader::~leader() { } fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log, - index_t commit_idx, failure_detector& failure_detector, fsm_config config) : + index_t commit_idx, failure_detector& failure_detector, fsm_config config, + seastar::condition_variable& sm_events) : _my_id(id), _current_term(current_term), _voted_for(voted_for), - _log(std::move(log)), _failure_detector(failure_detector), _config(config) { + _log(std::move(log)), _failure_detector(failure_detector), _config(config), _sm_events(sm_events) { if (id == raft::server_id{}) { throw std::invalid_argument("raft::fsm: raft instance cannot have id zero"); } @@ -41,10 +42,6 @@ fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log, } } -fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log, - failure_detector& failure_detector, fsm_config config) : - fsm(id, current_term, voted_for, std::move(log), index_t{0}, failure_detector, config) {} - future> fsm::wait_for_memory_permit(seastar::abort_source* as, size_t size) { check_is_leader(); @@ -296,20 +293,14 @@ void fsm::become_candidate(bool is_prevote, bool is_leadership_transfer) { } } -future fsm::poll_output() { - logger.trace("fsm::poll_output() {} stable index: {} last index: {}", +bool fsm::has_output() const { + logger.trace("fsm::has_output() {} stable index: {} last index: {}", _my_id, _log.stable_idx(), _log.last_idx()); - while (true) { - auto diff = _log.last_idx() - _log.stable_idx(); + auto diff = _log.last_idx() - _log.stable_idx(); - 
if (diff > 0 || !_messages.empty() || !_observed.is_equal(*this) || _output.max_read_id_with_quorum || - (is_leader() && leader_state().last_read_id_changed) || _output.snp || !_output.snps_to_drop.empty()) { - break; - } - co_await _sm_events.wait(); - } - co_return get_output(); + return diff > 0 || !_messages.empty() || !_observed.is_equal(*this) || _output.max_read_id_with_quorum + || (is_leader() && leader_state().last_read_id_changed) || _output.snp || !_output.snps_to_drop.empty(); } fsm_output fsm::get_output() { @@ -1019,7 +1010,7 @@ bool fsm::apply_snapshot(snapshot_descriptor snp, size_t max_trailing_entries, s // If the snapshot is local, _commit_idx is larger than snp.idx. // Otherwise snp.idx becomes the new commit index. _commit_idx = std::max(_commit_idx, snp.idx); - _output.snp.emplace(fsm_output::applied_snapshot{snp, local}); + _output.snp.emplace(fsm_output::applied_snapshot{snp, local, max_trailing_entries}); size_t units = _log.apply_snapshot(std::move(snp), max_trailing_entries, max_trailing_bytes); if (is_leader()) { logger.trace("apply_snapshot[{}]: signal {} available units", _my_id, units); @@ -1132,7 +1123,6 @@ void fsm::stop() { // (in particular, abort waits on log_limiter_semaphore and prevent new ones). become_follower({}); } - _sm_events.broken(); } std::ostream& operator<<(std::ostream& os, const fsm& f) { diff --git a/raft/fsm.hh b/raft/fsm.hh index 2688a07fe0..0f0bfe464f 100644 --- a/raft/fsm.hh +++ b/raft/fsm.hh @@ -21,6 +21,11 @@ struct fsm_output { struct applied_snapshot { snapshot_descriptor snp; bool is_local; + + // Always 0 for non-local snapshots. 
+ size_t max_trailing_entries; + + // FIXME: include max_trailing_bytes here and in store_snapshot_descriptor }; std::optional> term_and_vote; std::vector log_entries; @@ -36,14 +41,6 @@ struct fsm_output { std::optional max_read_id_with_quorum; // Set to true if a leadership transfer was aborted since the last output bool abort_leadership_transfer; - - // True if there is no new output - bool empty() const { - return !term_and_vote && - log_entries.size() == 0 && messages.size() == 0 && - committed.size() == 0 && !snp && snps_to_drop.empty() && - !configuration; - } }; struct fsm_config { @@ -136,9 +133,13 @@ struct leader { // in-memory state machine with a catch-all API step(message) // method. The method handles any kind of input and performs the // needed state machine state transitions. To get state machine output -// poll_output() function has to be called. This call produces an output +// get_output() function has to be called. To check first if +// any new output is present, call has_output(). To wait for new +// new output events, use the sm_events condition variable passed +// to fsm constructor; fs` signals it each time new output may appear. +// The get_output() call produces an output // object, which encapsulates a list of actions that must be -// performed until the next poll_output() call can be made. The time is +// performed until the next get_output() call can be made. The time is // represented with a logical timer. The client is responsible for // periodically invoking tick() method, which advances the state // machine time and allows it to track such events as election or @@ -226,7 +227,7 @@ private: std::vector> _messages; // Signaled when there is a IO event to process. - seastar::condition_variable _sm_events; + seastar::condition_variable& _sm_events; // Called when one of the replicas advances its match index // so it may be the case that some entries are committed now. 
@@ -338,10 +339,8 @@ protected: // For testing public: explicit fsm(server_id id, term_t current_term, server_id voted_for, log log, - index_t commit_idx, failure_detector& failure_detector, fsm_config conf); - - explicit fsm(server_id id, term_t current_term, server_id voted_for, log log, - failure_detector& failure_detector, fsm_config conf); + index_t commit_idx, failure_detector& failure_detector, fsm_config conf, + seastar::condition_variable& sm_events); bool is_leader() const { return std::holds_alternative(_state); @@ -409,12 +408,9 @@ public: // committed to the persistent Raft log afterwards. template const log_entry& add_entry(T command); - // Wait until there is, and return state machine output that - // needs to be handled. - // This includes a list of the entries that need - // to be logged. The logged entries are eventually - // discarded from the state machine after applying a snapshot. - future poll_output(); + // Check if there is any state machine output + // that `get_output()` will return. + bool has_output() const; // Get state machine output, if there is any. Doesn't // wait. It is public for use in testing. @@ -427,7 +423,7 @@ public: // Feed one Raft RPC message into the state machine. // Advances the state machine state and generates output, - // accessible via poll_output(). + // accessible via get_output(). template void step(server_id from, Message&& msg); diff --git a/raft/server.cc b/raft/server.cc index 39a850c2c0..7257ab9fcd 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -98,6 +98,8 @@ public: future add_entry_on_leader(command command, seastar::abort_source* as); void register_metrics() override; private: + seastar::condition_variable _events; + std::unique_ptr _rpc; std::unique_ptr _state_machine; std::unique_ptr _persistence; @@ -112,6 +114,8 @@ private: std::optional _non_joint_conf_commit_promise; // Index of the last entry applied to `_state_machine`. 
index_t _applied_idx; + // Index of the last persisted snapshot descriptor. + index_t _snapshot_desc_idx; std::list _reads; std::multimap _awaited_indexes; @@ -121,13 +125,20 @@ private: // Signaled when apply index is changed condition_variable _applied_index_changed; + // Signaled when _snapshot_desc_idx is changed + condition_variable _snapshot_desc_idx_changed; + struct stop_apply_fiber{}; // exception to send when apply fiber is needs to be stopepd struct removed_from_config{}; // sent to applier_fiber when we're not a leader and we're outside the current configuration + + struct trigger_snapshot_msg{}; + using applier_fiber_message = std::variant< std::vector, snapshot_descriptor, - removed_from_config>; + removed_from_config, + trigger_snapshot_msg>; queue _apply_entries = queue(10); struct stats { @@ -201,6 +212,16 @@ private: }; absl::flat_hash_map _append_request_status; + struct server_requests { + bool snapshot = false; + + bool empty() const { + return !snapshot; + } + }; + + server_requests _new_server_requests; + // Called to commit entries (on a leader or otherwise). void notify_waiters(std::map& waiters, const std::vector& entries); @@ -212,10 +233,15 @@ private: // to be applied. void signal_applied(); - // This fiber processes FSM output by doing the following steps in order: + // Processes FSM output by doing the following steps in order: // - persist the current term and vote // - persist unstable log entries on disk. // - send out messages + future<> process_fsm_output(index_t& stable_idx, fsm_output&&); + + future<> process_server_requests(server_requests&&); + + // Processes new FSM outputs and server requests as they appear. future<> io_fiber(index_t stable_idx); // This fiber runs in the background and applies committed entries. 
@@ -265,6 +291,8 @@ private: // A helper to wait for a leader to get elected future<> wait_for_leader(seastar::abort_source* as); + virtual future trigger_snapshot(seastar::abort_source* as) override; + // Get "safe to read" index from a leader future get_read_idx(server_id leader, seastar::abort_source* as); // Wait for an entry with a specific term to get committed or @@ -337,12 +365,14 @@ future<> server_impl::start() { .append_request_threshold = _config.append_request_threshold, .max_log_size = _config.max_log_size, .enable_prevoting = _config.enable_prevoting - }); + }, + _events); _applied_idx = index_t{0}; + _snapshot_desc_idx = index_t{0}; if (snapshot.id) { co_await _state_machine->load_snapshot(snapshot.id); - _applied_idx = snapshot.idx; + _snapshot_desc_idx = _applied_idx = snapshot.idx; } if (!rpc_config.current.empty()) { @@ -403,6 +433,54 @@ future<> server_impl::wait_for_leader(seastar::abort_source* as) { } } +future server_impl::trigger_snapshot(seastar::abort_source* as) { + check_not_aborted(); + + if (_applied_idx <= _snapshot_desc_idx) { + logger.debug( + "[{}] trigger_snapshot: last persisted snapshot descriptor index is up-to-date" + ", applied index: {}, persisted snapshot descriptor index: {}, last fsm log index: {}" + ", last fsm snapshot index: {}", _id, _applied_idx, _snapshot_desc_idx, + _fsm->log_last_idx(), _fsm->log_last_snapshot_idx()); + co_return false; + } + + _new_server_requests.snapshot = true; + _events.signal(); + + // Wait for persisted snapshot index to catch up to this index. 
+ auto awaited_idx = _applied_idx; + + logger.debug("[{}] snapshot request waiting for index {}", _id, awaited_idx); + + try { + optimized_optional sub; + if (as) { + as->check(); + sub = as->subscribe([this] () noexcept { _snapshot_desc_idx_changed.broadcast(); }); + assert(sub); // due to `check()` above + } + co_await _snapshot_desc_idx_changed.when([this, as, awaited_idx] { + return (as && as->abort_requested()) || awaited_idx <= _snapshot_desc_idx; + }); + if (as) { + as->check(); + } + } catch (abort_requested_exception&) { + throw request_aborted(); + } catch (seastar::broken_condition_variable&) { + throw request_aborted(); + } + + logger.debug( + "[{}] snapshot request satisfied, awaited index {}, persisted snapshot descriptor index: {}" + ", current applied index {}, last fsm log index {}, last fsm snapshot index {}", + _id, awaited_idx, _snapshot_desc_idx, _applied_idx, + _fsm->log_last_idx(), _fsm->log_last_snapshot_idx()); + + co_return true; +} + future<> server_impl::wait_for_entry(entry_id eid, wait_type type, seastar::abort_source* as) { // The entry may have been already committed and even applied // in case it was forwarded to the leader. In this case @@ -917,141 +995,168 @@ static rpc_config_diff diff_address_sets(const server_address_set& prev, const c return result; } +future<> server_impl::process_fsm_output(index_t& last_stable, fsm_output&& batch) { + if (batch.term_and_vote) { + // Current term and vote are always persisted + // together. A vote may change independently of + // term, but it's safe to update both in this + // case. 
+ co_await _persistence->store_term_and_vote(batch.term_and_vote->first, batch.term_and_vote->second); + _stats.store_term_and_vote++; + } + + if (batch.snp) { + auto& [snp, is_local, max_trailing_entries] = *batch.snp; + logger.trace("[{}] io_fiber storing snapshot {}", _id, snp.id); + // Persist the snapshot + co_await _persistence->store_snapshot_descriptor(snp, max_trailing_entries); + _snapshot_desc_idx = snp.idx; + _snapshot_desc_idx_changed.broadcast(); + _stats.store_snapshot++; + // If this is locally generated snapshot there is no need to + // load it. + if (!is_local) { + co_await _apply_entries.push_eventually(std::move(snp)); + } + } + + for (const auto& snp_id: batch.snps_to_drop) { + _state_machine->drop_snapshot(snp_id); + } + + if (batch.log_entries.size()) { + auto& entries = batch.log_entries; + + if (last_stable >= entries[0]->idx) { + co_await _persistence->truncate_log(entries[0]->idx); + _stats.truncate_persisted_log++; + } + + utils::get_local_injector().inject("store_log_entries/test-failure", + [] { throw std::runtime_error("store_log_entries/test-failure"); }); + + // Combine saving and truncating into one call? + // will require persistence to keep track of last idx + co_await _persistence->store_log_entries(entries); + + last_stable = (*entries.crbegin())->idx; + _stats.persisted_log_entries += entries.size(); + } + + // Update RPC server address mappings. Add servers which are joining + // the cluster according to the new configuration (obtained from the + // last_conf_idx). + // + // It should be done prior to sending the messages since the RPC + // module needs to know who should it send the messages to (actual + // network addresses of the joining servers). 
+ rpc_config_diff rpc_diff; + if (batch.configuration) { + rpc_diff = diff_address_sets(get_rpc_config(), *batch.configuration); + for (const auto& addr: rpc_diff.joining) { + add_to_rpc_config(addr); + } + _rpc->on_configuration_change(rpc_diff.joining, {}); + } + + // After entries are persisted we can send messages. + for (auto&& m : batch.messages) { + try { + send_message(m.first, std::move(m.second)); + } catch(...) { + // Not being able to send a message is not a critical error + logger.debug("[{}] io_fiber failed to send a message to {}: {}", _id, m.first, std::current_exception()); + } + } + + if (batch.configuration) { + for (const auto& addr: rpc_diff.leaving) { + abort_snapshot_transfer(addr.id); + remove_from_rpc_config(addr); + } + _rpc->on_configuration_change({}, rpc_diff.leaving); + } + + // Process committed entries. + if (batch.committed.size()) { + if (_non_joint_conf_commit_promise) { + for (const auto& e: batch.committed) { + const auto* cfg = get_if(&e->data); + if (cfg != nullptr && !cfg->is_joint()) { + std::exchange(_non_joint_conf_commit_promise, std::nullopt)->promise.set_value(); + break; + } + } + } + co_await _persistence->store_commit_idx(batch.committed.back()->idx); + _stats.queue_entries_for_apply += batch.committed.size(); + co_await _apply_entries.push_eventually(std::move(batch.committed)); + } + + if (batch.max_read_id_with_quorum) { + while (!_reads.empty() && _reads.front().id <= batch.max_read_id_with_quorum) { + _reads.front().promise.set_value(_reads.front().idx); + _reads.pop_front(); + } + } + if (!_fsm->is_leader()) { + if (_stepdown_promise) { + std::exchange(_stepdown_promise, std::nullopt)->set_value(); + } + if (!_current_rpc_config.contains(_id)) { + // - It's important we push this after we pushed committed entries above. It + // will cause `applier_fiber` to drop waiters, which should be done after we + // notify all waiters for entries committed in this batch. 
+ // - This may happen multiple times if `io_fiber` gets multiple batches when + // we're outside the configuration, but it should eventually (and generally + // quickly) stop happening (we're outside the config after all). + co_await _apply_entries.push_eventually(removed_from_config{}); + } + // request aborts of snapshot transfers + abort_snapshot_transfers(); + // abort all read barriers + for (auto& r : _reads) { + r.promise.set_value(not_a_leader{_fsm->current_leader()}); + } + _reads.clear(); + } else if (batch.abort_leadership_transfer) { + if (_stepdown_promise) { + std::exchange(_stepdown_promise, std::nullopt)->set_exception(timeout_error("Stepdown process timed out")); + } + } + if (_leader_promise && _fsm->current_leader()) { + std::exchange(_leader_promise, std::nullopt)->set_value(); + } +} + +future<> server_impl::process_server_requests(server_requests&& requests) { + if (requests.snapshot) { + co_await _apply_entries.push_eventually(trigger_snapshot_msg{}); + } +} + future<> server_impl::io_fiber(index_t last_stable) { logger.trace("[{}] io_fiber start", _id); try { while (true) { - auto batch = co_await _fsm->poll_output(); + bool has_fsm_output = false; + bool has_server_request = false; + co_await _events.when([this, &has_fsm_output, &has_server_request] { + has_fsm_output = _fsm->has_output(); + has_server_request = !_new_server_requests.empty(); + return has_fsm_output || has_server_request; + }); + _stats.polls++; - if (batch.term_and_vote) { - // Current term and vote are always persisted - // together. A vote may change independently of - // term, but it's safe to update both in this - // case. 
- co_await _persistence->store_term_and_vote(batch.term_and_vote->first, batch.term_and_vote->second); - _stats.store_term_and_vote++; + if (has_fsm_output) { + auto batch = _fsm->get_output(); + co_await process_fsm_output(last_stable, std::move(batch)); } - if (batch.snp) { - auto& [snp, is_local] = *batch.snp; - logger.trace("[{}] io_fiber storing snapshot {}", _id, snp.id); - // Persist the snapshot - co_await _persistence->store_snapshot_descriptor(snp, is_local ? _config.snapshot_trailing : 0); - _stats.store_snapshot++; - // If this is locally generated snapshot there is no need to - // load it. - if (!is_local) { - co_await _apply_entries.push_eventually(std::move(snp)); - } - } - - for (const auto& snp_id: batch.snps_to_drop) { - _state_machine->drop_snapshot(snp_id); - } - - if (batch.log_entries.size()) { - auto& entries = batch.log_entries; - - if (last_stable >= entries[0]->idx) { - co_await _persistence->truncate_log(entries[0]->idx); - _stats.truncate_persisted_log++; - } - - utils::get_local_injector().inject("store_log_entries/test-failure", - [] { throw std::runtime_error("store_log_entries/test-failure"); }); - - // Combine saving and truncating into one call? - // will require persistence to keep track of last idx - co_await _persistence->store_log_entries(entries); - - last_stable = (*entries.crbegin())->idx; - _stats.persisted_log_entries += entries.size(); - } - - // Update RPC server address mappings. Add servers which are joining - // the cluster according to the new configuration (obtained from the - // last_conf_idx). - // - // It should be done prior to sending the messages since the RPC - // module needs to know who should it send the messages to (actual - // network addresses of the joining servers). 
- rpc_config_diff rpc_diff; - if (batch.configuration) { - rpc_diff = diff_address_sets(get_rpc_config(), *batch.configuration); - for (const auto& addr: rpc_diff.joining) { - add_to_rpc_config(addr); - } - _rpc->on_configuration_change(rpc_diff.joining, {}); - } - - // After entries are persisted we can send messages. - for (auto&& m : batch.messages) { - try { - send_message(m.first, std::move(m.second)); - } catch(...) { - // Not being able to send a message is not a critical error - logger.debug("[{}] io_fiber failed to send a message to {}: {}", _id, m.first, std::current_exception()); - } - } - - if (batch.configuration) { - for (const auto& addr: rpc_diff.leaving) { - abort_snapshot_transfer(addr.id); - remove_from_rpc_config(addr); - } - _rpc->on_configuration_change({}, rpc_diff.leaving); - } - - // Process committed entries. - if (batch.committed.size()) { - if (_non_joint_conf_commit_promise) { - for (const auto& e: batch.committed) { - const auto* cfg = get_if(&e->data); - if (cfg != nullptr && !cfg->is_joint()) { - std::exchange(_non_joint_conf_commit_promise, std::nullopt)->promise.set_value(); - break; - } - } - } - co_await _persistence->store_commit_idx(batch.committed.back()->idx); - _stats.queue_entries_for_apply += batch.committed.size(); - co_await _apply_entries.push_eventually(std::move(batch.committed)); - } - - if (batch.max_read_id_with_quorum) { - while (!_reads.empty() && _reads.front().id <= batch.max_read_id_with_quorum) { - _reads.front().promise.set_value(_reads.front().idx); - _reads.pop_front(); - } - } - if (!_fsm->is_leader()) { - if (_stepdown_promise) { - std::exchange(_stepdown_promise, std::nullopt)->set_value(); - } - if (!_current_rpc_config.contains(_id)) { - // - It's important we push this after we pushed committed entries above. It - // will cause `applier_fiber` to drop waiters, which should be done after we - // notify all waiters for entries committed in this batch. 
- // - This may happen multiple times if `io_fiber` gets multiple batches when - // we're outside the configuration, but it should eventually (and generally - // quickly) stop happening (we're outside the config after all). - co_await _apply_entries.push_eventually(removed_from_config{}); - } - // request aborts of snapshot transfers - abort_snapshot_transfers(); - // abort all read barriers - for (auto& r : _reads) { - r.promise.set_value(not_a_leader{_fsm->current_leader()}); - } - _reads.clear(); - } else if (batch.abort_leadership_transfer) { - if (_stepdown_promise) { - std::exchange(_stepdown_promise, std::nullopt)->set_exception(timeout_error("Stepdown process timed out")); - } - } - if (_leader_promise && _fsm->current_leader()) { - std::exchange(_leader_promise, std::nullopt)->set_value(); + if (has_server_request) { + auto requests = std::exchange(_new_server_requests, server_requests{}); + co_await process_server_requests(std::move(requests)); } } } catch (seastar::broken_condition_variable&) { @@ -1216,6 +1321,23 @@ future<> server_impl::applier_fiber() { // it may never know the status of entries it submitted. 
drop_waiters(); co_return; + }, + [this] (const trigger_snapshot_msg&) -> future<> { + auto applied_term = _fsm->log_term_for(_applied_idx); + // last truncation index <= snapshot index <= applied index + assert(applied_term); + + snapshot_descriptor snp; + snp.term = *applied_term; + snp.idx = _applied_idx; + snp.config = _fsm->log_last_conf_for(_applied_idx); + logger.trace("[{}] taking snapshot at term={}, idx={} due to request", _id, snp.term, snp.idx); + snp.id = co_await _state_machine->take_snapshot(); + if (!_fsm->apply_snapshot(snp, 0, 0, true)) { + logger.trace("[{}] while taking snapshot term={} idx={} id={} due to request," + " fsm received a later snapshot at idx={}", _id, snp.term, snp.idx, snp.id, _fsm->log_last_snapshot_idx()); + } + _stats.snapshots_taken++; } ), v); @@ -1364,6 +1486,8 @@ future<> server_impl::abort(sstring reason) { _aborted = std::move(reason); logger.trace("[{}]: abort() called", _id); _fsm->stop(); + _events.broken(); + _snapshot_desc_idx_changed.broken(); // IO and applier fibers may update waiters and start new snapshot // transfers, so abort them first diff --git a/raft/server.hh b/raft/server.hh index a5da9b29ff..979f5de67c 100644 --- a/raft/server.hh +++ b/raft/server.hh @@ -224,6 +224,22 @@ public: // of two servers iff their IDs are different. virtual void register_metrics() = 0; + // Manually trigger snapshot creation and log truncation. + // + // Does nothing if the current apply index is less or equal to the last persisted snapshot descriptor index + // and returns `false`. + // + // Otherwise returns `true`; when the future resolves, it is guaranteed that the snapshot descriptor + // is persisted, but not that the snapshot is loaded to the state machine yet (it will be eventually). + // + // The request may be resolved by the regular snapshotting mechanisms (e.g. a snapshot + // is created because the Raft log grows too large). 
In this case there is no guarantee
+    // how many trailing entries will be left trailing behind the snapshot. However,
+    // if there are no operations running on the server concurrently with the request and all
+    // committed entries are already applied, the created snapshot is guaranteed to leave
+    // zero trailing entries.
+    virtual future<bool> trigger_snapshot(seastar::abort_source* as) = 0;
+
     // Ad hoc functions for testing
     virtual void wait_until_candidate() = 0;
     virtual future<> wait_election_done() = 0;
diff --git a/test/raft/etcd_test.cc b/test/raft/etcd_test.cc
index 60fdb495f9..cde64a8513 100644
--- a/test/raft/etcd_test.cc
+++ b/test/raft/etcd_test.cc
@@ -298,7 +298,7 @@ BOOST_AUTO_TEST_CASE(test_vote_from_any_state) {
     server_id id1{utils::UUID(0, 1)}, id2{utils::UUID(0, 2)}, id3{utils::UUID(0, 3)};
     raft::configuration cfg = config_from_ids({id1, id2, id3});
     raft::log log{raft::snapshot_descriptor{.config = cfg}};
-    raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), fd, fsm_cfg);
+    fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), fd, fsm_cfg);
 
     // Follower
     BOOST_CHECK(fsm.is_follower());
@@ -360,7 +360,7 @@ BOOST_AUTO_TEST_CASE(test_log_replication_1) {
     server_id id1{utils::UUID(0, 1)}, id2{utils::UUID(0, 2)}, id3{utils::UUID(0, 3)};
     raft::configuration cfg = config_from_ids({id1, id2, id3});
     raft::log log{raft::snapshot_descriptor{.config = cfg}};
-    raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
+    fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
 
     election_timeout(fsm);
     BOOST_CHECK(fsm.is_candidate());
@@ -425,7 +425,7 @@ BOOST_AUTO_TEST_CASE(test_log_replication_2) {
     server_id id1{utils::UUID(0, 1)}, id2{utils::UUID(0, 2)}, id3{utils::UUID(0, 3)};
     raft::configuration cfg = config_from_ids({id1, id2, id3});
     raft::log log{raft::snapshot_descriptor{.config = cfg}};
-    raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector,
fsm_cfg); + fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg); election_timeout(fsm); output = fsm.get_output(); @@ -485,7 +485,7 @@ BOOST_AUTO_TEST_CASE(test_single_node_commit) { server_id id1{utils::UUID(0, 1)}; raft::configuration cfg = config_from_ids({id1}); raft::log log{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg); + fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg); BOOST_CHECK(fsm.is_leader()); // Single node skips candidate state output = fsm.get_output(); @@ -578,11 +578,11 @@ BOOST_AUTO_TEST_CASE(test_dueling_candidates) { server_id id1{utils::UUID(0, 1)}, id2{utils::UUID(0, 2)}, id3{utils::UUID(0, 3)}; raft::configuration cfg = config_from_ids({id1, id2, id3}); raft::log log1{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg); + fsm_debug fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg); raft::log log2{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm2(id2, term_t{}, server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg); + fsm_debug fsm2(id2, term_t{}, server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg); raft::log log3{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm3(id3, term_t{}, server_id{}, std::move(log3), trivial_failure_detector, fsm_cfg); + fsm_debug fsm3(id3, term_t{}, server_id{}, std::move(log3), trivial_failure_detector, fsm_cfg); // fsm1 and fsm3 don't see each other make_candidate(fsm1); @@ -621,11 +621,11 @@ BOOST_AUTO_TEST_CASE(test_dueling_pre_candidates) { server_id id1{utils::UUID(0, 1)}, id2{utils::UUID(0, 2)}, id3{utils::UUID(0, 3)}; raft::configuration cfg = config_from_ids({id1, id2, id3}); raft::log log1{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm1(id1, term_t{}, 
server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg_pre); + fsm_debug fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg_pre); raft::log log2{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm2(id2, term_t{}, server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg_pre); + fsm_debug fsm2(id2, term_t{}, server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg_pre); raft::log log3{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm3(id3, term_t{}, server_id{}, std::move(log3), trivial_failure_detector, fsm_cfg_pre); + fsm_debug fsm3(id3, term_t{}, server_id{}, std::move(log3), trivial_failure_detector, fsm_cfg_pre); // fsm1 and fsm3 don't see each other make_candidate(fsm1); @@ -667,7 +667,7 @@ BOOST_AUTO_TEST_CASE(test_single_node_pre_candidate) { server_id id1{utils::UUID(0, 1)}; raft::configuration cfg = config_from_ids({id1}); raft::log log1{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg_pre); + fsm_debug fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg_pre); BOOST_CHECK(fsm1.is_leader()); } @@ -743,7 +743,7 @@ void handle_proposal(unsigned nodes, std::vector accepting_int) { raft::configuration cfg = config_from_ids(ids); raft::log log1{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm1(raft::server_id{utils::UUID(0, 1)}, term_t{}, server_id{}, std::move(log1), + fsm_debug fsm1(raft::server_id{utils::UUID(0, 1)}, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg); // promote 1 to become leader (i.e. 
gets votes) diff --git a/test/raft/fsm_test.cc b/test/raft/fsm_test.cc index 5d41b3bb67..e57640537c 100644 --- a/test/raft/fsm_test.cc +++ b/test/raft/fsm_test.cc @@ -304,7 +304,7 @@ void test_election_single_node_helper(raft::fsm_config fcfg) { server_id id1 = id(); raft::configuration cfg = config_from_ids({id1}); raft::log log{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fcfg); + fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fcfg); election_timeout(fsm); @@ -529,7 +529,7 @@ BOOST_AUTO_TEST_CASE(test_election_two_nodes_prevote) { raft::configuration cfg = config_from_ids({id1, id2}); raft::log log{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fcfg); + fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fcfg); // Initial state is follower BOOST_CHECK(fsm.is_follower()); @@ -595,7 +595,7 @@ BOOST_AUTO_TEST_CASE(test_election_four_nodes_prevote) { raft::configuration cfg = config_from_ids({id1, id2, id3, id4}); raft::log log{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), fd, fcfg); + fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), fd, fcfg); // Initial state is follower BOOST_CHECK(fsm.is_follower()); @@ -652,7 +652,7 @@ BOOST_AUTO_TEST_CASE(test_log_matching_rule) { log.emplace_back(seastar::make_lw_shared(raft::log_entry{term_t{10}, index_t{1000}})); log.stable_to(log.last_idx()); - raft::fsm fsm(id1, term_t{10}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg); + fsm_debug fsm(id1, term_t{10}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg); // Initial state is follower BOOST_CHECK(fsm.is_follower()); @@ -929,7 +929,7 @@ BOOST_AUTO_TEST_CASE(test_leader_stepdown) { {server_addr_from_id(id1), true}, {server_addr_from_id(id2), true}, 
{server_addr_from_id(id3), false}}); raft::log log(raft::snapshot_descriptor{.config = cfg}); - raft::fsm fsm(id1, term_t{1}, /* voted for */ server_id{}, std::move(log), trivial_failure_detector, fsm_cfg); + fsm_debug fsm(id1, term_t{1}, /* voted for */ server_id{}, std::move(log), trivial_failure_detector, fsm_cfg); // Check that we move to candidate state on timeout_now message fsm.step(id2, raft::timeout_now{fsm.get_current_term()}); @@ -1034,7 +1034,7 @@ BOOST_AUTO_TEST_CASE(test_leader_stepdown) { {server_addr_from_id(id1), true}, {server_addr_from_id(id2), true}, {server_addr_from_id(id3), true}}); raft::log log2(raft::snapshot_descriptor{.config = cfg}); - raft::fsm fsm2(id1, term_t{1}, /* voted for */ server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg); + fsm_debug fsm2(id1, term_t{1}, /* voted for */ server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg); election_timeout(fsm2); // Turn to a leader @@ -1152,7 +1152,7 @@ BOOST_AUTO_TEST_CASE(test_confchange_a_to_b) { // A somewhat awkward way to obtain B's log for restart log.emplace_back(make_lw_shared(B.add_entry(config_from_ids({A_id})))); log.stable_to(log.last_idx()); - raft::fsm B_1(B_id, B.get_current_term(), B_id, std::move(log), trivial_failure_detector, fsm_cfg); + fsm_debug B_1(B_id, B.get_current_term(), B_id, std::move(log), trivial_failure_detector, fsm_cfg); election_timeout(B_1); communicate(A, B_1); BOOST_CHECK(B_1.is_follower()); @@ -1469,7 +1469,7 @@ BOOST_AUTO_TEST_CASE(test_zero) { BOOST_AUTO_TEST_CASE(test_reordered_reject) { auto id1 = id(); - raft::fsm fsm1(id1, term_t{1}, server_id{}, + fsm_debug fsm1(id1, term_t{1}, server_id{}, raft::log{raft::snapshot_descriptor{.config = config_from_ids({id1})}}, trivial_failure_detector, fsm_cfg); @@ -1481,7 +1481,7 @@ BOOST_AUTO_TEST_CASE(test_reordered_reject) { (void)fsm1.get_output(); auto id2 = id(); - raft::fsm fsm2(id2, term_t{1}, server_id{}, + fsm_debug fsm2(id2, term_t{1}, server_id{}, 
raft::log{raft::snapshot_descriptor{.config = raft::configuration{}}}, trivial_failure_detector, fsm_cfg); diff --git a/test/raft/helpers.cc b/test/raft/helpers.cc index abce00f5f5..06a68ee6db 100644 --- a/test/raft/helpers.cc +++ b/test/raft/helpers.cc @@ -94,8 +94,8 @@ communicate_impl(std::function stop_pred, raft_routing_map& map) { has_traffic = false; for (auto e : map) { raft::fsm& from = *e.second; - bool has_output; - for (auto output = from.get_output(); !output.empty(); output = from.get_output()) { + for (bool has_output = from.has_output(); has_output; has_output = from.has_output()) { + auto output = from.get_output(); if (stop_pred()) { return; } diff --git a/test/raft/helpers.hh b/test/raft/helpers.hh index 3b23a17b16..01810f7d27 100644 --- a/test/raft/helpers.hh +++ b/test/raft/helpers.hh @@ -63,9 +63,20 @@ raft::command create_command(T val) { extern raft::fsm_config fsm_cfg; extern raft::fsm_config fsm_cfg_pre; -class fsm_debug : public raft::fsm { +struct sm_events_container { + seastar::condition_variable sm_events; +}; + +class fsm_debug : public sm_events_container, public raft::fsm { public: using raft::fsm::fsm; + + explicit fsm_debug(raft::server_id id, raft::term_t current_term, raft::server_id voted_for, raft::log log, + raft::failure_detector& failure_detector, raft::fsm_config conf) + : sm_events_container() + , fsm(id, current_term, voted_for, std::move(log), raft::index_t{0}, failure_detector, conf, sm_events) { + } + void become_follower(raft::server_id leader) { raft::fsm::become_follower(leader); } diff --git a/test/topology_custom/test_raft_snapshot_request.py b/test/topology_custom/test_raft_snapshot_request.py new file mode 100644 index 0000000000..4a0af6073b --- /dev/null +++ b/test/topology_custom/test_raft_snapshot_request.py @@ -0,0 +1,102 @@ +# +# Copyright (C) 2024-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# + +import asyncio +import pytest +import time +import logging + +from 
test.pylib.manager_client import ManagerClient +from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, read_barrier + + +logger = logging.getLogger(__name__) + + +async def get_raft_log_size(cql, host) -> int: + query = "select count(\"index\") from system.raft" + return (await cql.run_async(query, host=host))[0][0] + + +async def get_raft_snap_id(cql, host) -> str: + query = "select snapshot_id from system.raft_snapshots" + return (await cql.run_async(query, host=host))[0].snapshot_id + + +async def trigger_snapshot(manager: ManagerClient, group0_id: str, ip_addr) -> None: + await manager.api.client.post(f"/raft/trigger_snapshot/{group0_id}", host=ip_addr) + + +@pytest.mark.asyncio +async def test_raft_snapshot_request(manager: ManagerClient): + servers = [await manager.server_add() for _ in range(3)] + cql = manager.cql + assert(cql) + + s1 = servers[0] + h1 = (await wait_for_cql_and_get_hosts(cql, [s1], time.time() + 60))[0] + group0_id = (await cql.run_async( + "select value from system.scylla_local where key = 'raft_group0_id'", + host=h1))[0].value + + # Verify that snapshotting updates the snapshot ID and truncates the log. + log_size = await get_raft_log_size(cql, h1) + logger.info(f"Log size on {s1}: {log_size}") + snap_id = await get_raft_snap_id(cql, h1) + logger.info(f"Snapshot ID on {s1}: {snap_id}") + assert log_size > 0 + await trigger_snapshot(manager, group0_id, s1.ip_addr) + new_log_size = await get_raft_log_size(cql, h1) + logger.info(f"New log size on {s1}: {new_log_size}") + new_snap_id = await get_raft_snap_id(cql, h1) + logger.info(f"New snapshot ID on {s1}: {new_snap_id}") + assert new_log_size == 0 + assert new_snap_id != snap_id + + # If a server misses a command and a snapshot is created on the leader, + # the server once alive should eventually receive that snapshot. 
+ s2 = servers[2] + h2 = (await wait_for_cql_and_get_hosts(cql, [s2], time.time() + 60))[0] + s2_log_size = await get_raft_log_size(cql, h2) + logger.info(f"Log size on {s2}: {s2_log_size}") + s2_snap_id = await get_raft_snap_id(cql, h2) + logger.info(f"Snapshot ID on {s2}: {s2_snap_id}") + await manager.server_stop_gracefully(s2.server_id) + logger.info(f"Stopped {s2}") + # Restarting the two servers will cause a newly elected leader to append a dummy command. + await asyncio.gather(*(manager.server_restart(s.server_id) for s in servers[:2])) + logger.info(f"Restarted {servers[:2]}") + # Wait for one server to append the command and do a read_barrier on the other + # to make sure both appended + async def appended_command() -> int | None: + await wait_for_cql_and_get_hosts(cql, [s1], time.time() + 60) + s = await get_raft_log_size(cql, h1) + if s > 0: + return s + return None + log_size = await wait_for(appended_command, time.time() + 60) + logger.info(f"{servers[0]} appended new command") + h = (await wait_for_cql_and_get_hosts(cql, [servers[1]], time.time() + 60))[0] + await read_barrier(cql, h) + logger.info(f"Read barrier done on {servers[1]}") + # We don't know who the leader is, so trigger a snapshot on both servers. 
+    for s in servers[:2]:
+        await trigger_snapshot(manager, group0_id, s.ip_addr)
+        h = (await wait_for_cql_and_get_hosts(cql, [s], time.time() + 60))[0]
+        snap = await get_raft_snap_id(cql, h)
+        logger.info(f"New snapshot ID on {s}: {snap}")
+    await manager.server_start(s2.server_id)
+    logger.info(f"Server {s2} restarted")
+    await wait_for_cql_and_get_hosts(cql, [s2], time.time() + 60)
+    async def received_snapshot() -> str | None:
+        new_s2_snap_id = await get_raft_snap_id(cql, h2)
+        if s2_snap_id != new_s2_snap_id:
+            return new_s2_snap_id
+        return None
+    new_s2_snap_id = await wait_for(received_snapshot, time.time() + 60)
+    logger.info(f"{s2} received new snapshot: {new_s2_snap_id}")
+    new_s2_log_size = await get_raft_log_size(cql, h2)
+    assert new_s2_log_size == 0