diff --git a/api/api-doc/raft.json b/api/api-doc/raft.json new file mode 100644 index 0000000000..9673682069 --- /dev/null +++ b/api/api-doc/raft.json @@ -0,0 +1,43 @@ +{ + "apiVersion":"0.0.1", + "swaggerVersion":"1.2", + "basePath":"{{Protocol}}://{{Host}}", + "resourcePath":"/raft", + "produces":[ + "application/json" + ], + "apis":[ + { + "path":"/raft/trigger_snapshot/{group_id}", + "operations":[ + { + "method":"POST", + "summary":"Triggers snapshot creation and log truncation for the given Raft group", + "type":"string", + "nickname":"trigger_snapshot", + "produces":[ + "application/json" + ], + "parameters":[ + { + "name":"group_id", + "description":"The ID of the group which should get snapshotted", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"path" + }, + { + "name":"timeout", + "description":"Timeout in seconds after which the endpoint returns a failure. If not provided, 60s is used.", + "required":false, + "allowMultiple":false, + "type":"long", + "paramType":"query" + } + ] + } + ] + } + ] +} diff --git a/api/api.cc b/api/api.cc index a9a0a881a0..b0242b09d5 100644 --- a/api/api.cc +++ b/api/api.cc @@ -31,6 +31,7 @@ #include "api/config.hh" #include "task_manager.hh" #include "task_manager_test.hh" +#include "raft.hh" logging::logger apilog("api"); @@ -277,6 +278,18 @@ future<> set_server_task_manager_test(http_context& ctx, lw_shared_ptr set_server_raft(http_context& ctx, sharded& raft_gr) { + auto rb = std::make_shared(ctx.api_doc); + return ctx.http_server.set_routes([rb, &ctx, &raft_gr] (routes& r) { + rb->register_function(r, "raft", "The Raft API"); + set_raft(ctx, r, raft_gr); + }); +} + +future<> unset_server_raft(http_context& ctx) { + return ctx.http_server.set_routes([&ctx] (routes& r) { unset_raft(ctx, r); }); +} + void req_params::process(const request& req) { // Process mandatory parameters for (auto& [name, ent] : params) { diff --git a/api/api_init.hh b/api/api_init.hh index fd50649d45..7cb42ca7b1 100644 
--- a/api/api_init.hh +++ b/api/api_init.hh @@ -22,6 +22,7 @@ namespace service { class load_meter; class storage_proxy; class storage_service; +class raft_group_registry; } // namespace service @@ -116,5 +117,7 @@ future<> set_server_compaction_manager(http_context& ctx); future<> set_server_done(http_context& ctx); future<> set_server_task_manager(http_context& ctx); future<> set_server_task_manager_test(http_context& ctx, lw_shared_ptr cfg); +future<> set_server_raft(http_context&, sharded&); +future<> unset_server_raft(http_context&); } diff --git a/api/raft.cc b/api/raft.cc new file mode 100644 index 0000000000..eab13889dd --- /dev/null +++ b/api/raft.cc @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2024-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#include + +#include "api/api.hh" +#include "api/api-doc/raft.json.hh" + +#include "service/raft/raft_group_registry.hh" + +using namespace seastar::httpd; + +extern logging::logger apilog; + +namespace api { + +namespace r = httpd::raft_json; +using namespace json; + +void set_raft(http_context&, httpd::routes& r, sharded& raft_gr) { + r::trigger_snapshot.set(r, [&raft_gr] (std::unique_ptr req) -> future { + raft::group_id gid{utils::UUID{req->param["group_id"]}}; + auto timeout_dur = std::invoke([timeout_str = req->get_query_param("timeout")] { + if (timeout_str.empty()) { + return std::chrono::seconds{60}; + } + auto dur = std::stoll(timeout_str); + if (dur <= 0) { + throw std::runtime_error{"Timeout must be a positive number."}; + } + return std::chrono::seconds{dur}; + }); + + std::atomic found_srv{false}; + co_await raft_gr.invoke_on_all([gid, timeout_dur, &found_srv] (service::raft_group_registry& raft_gr) -> future<> { + auto* srv = raft_gr.find_server(gid); + if (!srv) { + co_return; + } + + found_srv = true; + abort_on_expiry aoe(lowres_clock::now() + timeout_dur); + apilog.info("Triggering Raft group {} snapshot", gid); + auto result = co_await 
srv->trigger_snapshot(&aoe.abort_source()); + if (result) { + apilog.info("New snapshot for Raft group {} created", gid); + } else { + apilog.info("Could not create new snapshot for Raft group {}, no new entries applied", gid); + } + }); + + if (!found_srv) { + throw std::runtime_error{fmt::format("Server for group ID {} not found", gid)}; + } + + co_return json_void{}; + }); +} + +void unset_raft(http_context&, httpd::routes& r) { + r::trigger_snapshot.unset(r); +} + +} + diff --git a/api/raft.hh b/api/raft.hh new file mode 100644 index 0000000000..e514a1bc22 --- /dev/null +++ b/api/raft.hh @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2023-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include "api_init.hh" + +namespace api { + +void set_raft(http_context& ctx, httpd::routes& r, sharded& raft_gr); +void unset_raft(http_context& ctx, httpd::routes& r); + +} diff --git a/configure.py b/configure.py index fde40fa8e5..38c8a67d9d 100755 --- a/configure.py +++ b/configure.py @@ -1077,6 +1077,8 @@ api = ['api/api.cc', Json2Code('api/api-doc/error_injection.json'), 'api/authorization_cache.cc', Json2Code('api/api-doc/authorization_cache.json'), + 'api/raft.cc', + Json2Code('api/api-doc/raft.json'), ] alternator = [ diff --git a/main.cc b/main.cc index aa6d19618a..08668003ac 100644 --- a/main.cc +++ b/main.cc @@ -1164,12 +1164,18 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl }).get(); cfg->host_id = sys_ks.local().load_local_host_id().get0(); + std::any stop_raft_api; if (raft_gr.local().is_enabled()) { auto my_raft_id = raft::server_id{cfg->host_id.uuid()}; supervisor::notify("starting Raft Group Registry service"); raft_gr.invoke_on_all([my_raft_id] (service::raft_group_registry& raft_gr) { return raft_gr.start(my_raft_id); }).get(); + + api::set_server_raft(ctx, raft_gr).get(); + stop_raft_api = defer_verbose_shutdown("Raft API", [&ctx] { + api::unset_server_raft(ctx).get(); + }); } 
else { if (cfg->check_experimental(db::experimental_features_t::feature::BROADCAST_TABLES)) { startlog.error("Bad configuration: RAFT feature has to be enabled if BROADCAST_TABLES is enabled"); @@ -1177,7 +1183,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl } } - group0_client.init().get(); db::sstables_format_selector sst_format_selector(gossiper.local(), feature_service, db); diff --git a/raft/fsm.cc b/raft/fsm.cc index a86d77d2d1..339a5a570d 100644 --- a/raft/fsm.cc +++ b/raft/fsm.cc @@ -19,9 +19,10 @@ leader::~leader() { } fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log, - index_t commit_idx, failure_detector& failure_detector, fsm_config config) : + index_t commit_idx, failure_detector& failure_detector, fsm_config config, + seastar::condition_variable& sm_events) : _my_id(id), _current_term(current_term), _voted_for(voted_for), - _log(std::move(log)), _failure_detector(failure_detector), _config(config) { + _log(std::move(log)), _failure_detector(failure_detector), _config(config), _sm_events(sm_events) { if (id == raft::server_id{}) { throw std::invalid_argument("raft::fsm: raft instance cannot have id zero"); } @@ -41,10 +42,6 @@ fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log, } } -fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log, - failure_detector& failure_detector, fsm_config config) : - fsm(id, current_term, voted_for, std::move(log), index_t{0}, failure_detector, config) {} - future> fsm::wait_for_memory_permit(seastar::abort_source* as, size_t size) { check_is_leader(); @@ -296,20 +293,14 @@ void fsm::become_candidate(bool is_prevote, bool is_leadership_transfer) { } } -future fsm::poll_output() { - logger.trace("fsm::poll_output() {} stable index: {} last index: {}", +bool fsm::has_output() const { + logger.trace("fsm::has_output() {} stable index: {} last index: {}", _my_id, _log.stable_idx(), _log.last_idx()); - while (true) { - auto 
diff = _log.last_idx() - _log.stable_idx(); + auto diff = _log.last_idx() - _log.stable_idx(); - if (diff > 0 || !_messages.empty() || !_observed.is_equal(*this) || _output.max_read_id_with_quorum || - (is_leader() && leader_state().last_read_id_changed) || _output.snp || !_output.snps_to_drop.empty()) { - break; - } - co_await _sm_events.wait(); - } - co_return get_output(); + return diff > 0 || !_messages.empty() || !_observed.is_equal(*this) || _output.max_read_id_with_quorum + || (is_leader() && leader_state().last_read_id_changed) || _output.snp || !_output.snps_to_drop.empty(); } fsm_output fsm::get_output() { @@ -1019,7 +1010,7 @@ bool fsm::apply_snapshot(snapshot_descriptor snp, size_t max_trailing_entries, s // If the snapshot is local, _commit_idx is larger than snp.idx. // Otherwise snp.idx becomes the new commit index. _commit_idx = std::max(_commit_idx, snp.idx); - _output.snp.emplace(fsm_output::applied_snapshot{snp, local}); + _output.snp.emplace(fsm_output::applied_snapshot{snp, local, max_trailing_entries}); size_t units = _log.apply_snapshot(std::move(snp), max_trailing_entries, max_trailing_bytes); if (is_leader()) { logger.trace("apply_snapshot[{}]: signal {} available units", _my_id, units); @@ -1132,7 +1123,6 @@ void fsm::stop() { // (in particular, abort waits on log_limiter_semaphore and prevent new ones). become_follower({}); } - _sm_events.broken(); } std::ostream& operator<<(std::ostream& os, const fsm& f) { diff --git a/raft/fsm.hh b/raft/fsm.hh index 2688a07fe0..0f0bfe464f 100644 --- a/raft/fsm.hh +++ b/raft/fsm.hh @@ -21,6 +21,11 @@ struct fsm_output { struct applied_snapshot { snapshot_descriptor snp; bool is_local; + + // Always 0 for non-local snapshots. 
+ size_t max_trailing_entries; + + // FIXME: include max_trailing_bytes here and in store_snapshot_descriptor }; std::optional> term_and_vote; std::vector log_entries; @@ -36,14 +41,6 @@ struct fsm_output { std::optional max_read_id_with_quorum; // Set to true if a leadership transfer was aborted since the last output bool abort_leadership_transfer; - - // True if there is no new output - bool empty() const { - return !term_and_vote && - log_entries.size() == 0 && messages.size() == 0 && - committed.size() == 0 && !snp && snps_to_drop.empty() && - !configuration; - } }; struct fsm_config { @@ -136,9 +133,13 @@ struct leader { // in-memory state machine with a catch-all API step(message) // method. The method handles any kind of input and performs the // needed state machine state transitions. To get state machine output -// poll_output() function has to be called. This call produces an output +// get_output() function has to be called. To check first if +// any new output is present, call has_output(). To wait for new +// new output events, use the sm_events condition variable passed +// to fsm constructor; fs` signals it each time new output may appear. +// The get_output() call produces an output // object, which encapsulates a list of actions that must be -// performed until the next poll_output() call can be made. The time is +// performed until the next get_output() call can be made. The time is // represented with a logical timer. The client is responsible for // periodically invoking tick() method, which advances the state // machine time and allows it to track such events as election or @@ -226,7 +227,7 @@ private: std::vector> _messages; // Signaled when there is a IO event to process. - seastar::condition_variable _sm_events; + seastar::condition_variable& _sm_events; // Called when one of the replicas advances its match index // so it may be the case that some entries are committed now. 
@@ -338,10 +339,8 @@ protected: // For testing public: explicit fsm(server_id id, term_t current_term, server_id voted_for, log log, - index_t commit_idx, failure_detector& failure_detector, fsm_config conf); - - explicit fsm(server_id id, term_t current_term, server_id voted_for, log log, - failure_detector& failure_detector, fsm_config conf); + index_t commit_idx, failure_detector& failure_detector, fsm_config conf, + seastar::condition_variable& sm_events); bool is_leader() const { return std::holds_alternative(_state); @@ -409,12 +408,9 @@ public: // committed to the persistent Raft log afterwards. template const log_entry& add_entry(T command); - // Wait until there is, and return state machine output that - // needs to be handled. - // This includes a list of the entries that need - // to be logged. The logged entries are eventually - // discarded from the state machine after applying a snapshot. - future poll_output(); + // Check if there is any state machine output + // that `get_output()` will return. + bool has_output() const; // Get state machine output, if there is any. Doesn't // wait. It is public for use in testing. @@ -427,7 +423,7 @@ public: // Feed one Raft RPC message into the state machine. // Advances the state machine state and generates output, - // accessible via poll_output(). + // accessible via get_output(). template void step(server_id from, Message&& msg); diff --git a/raft/server.cc b/raft/server.cc index 39a850c2c0..7257ab9fcd 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -98,6 +98,8 @@ public: future add_entry_on_leader(command command, seastar::abort_source* as); void register_metrics() override; private: + seastar::condition_variable _events; + std::unique_ptr _rpc; std::unique_ptr _state_machine; std::unique_ptr _persistence; @@ -112,6 +114,8 @@ private: std::optional _non_joint_conf_commit_promise; // Index of the last entry applied to `_state_machine`. 
index_t _applied_idx; + // Index of the last persisted snapshot descriptor. + index_t _snapshot_desc_idx; std::list _reads; std::multimap _awaited_indexes; @@ -121,13 +125,20 @@ private: // Signaled when apply index is changed condition_variable _applied_index_changed; + // Signaled when _snapshot_desc_idx is changed + condition_variable _snapshot_desc_idx_changed; + struct stop_apply_fiber{}; // exception to send when apply fiber is needs to be stopepd struct removed_from_config{}; // sent to applier_fiber when we're not a leader and we're outside the current configuration + + struct trigger_snapshot_msg{}; + using applier_fiber_message = std::variant< std::vector, snapshot_descriptor, - removed_from_config>; + removed_from_config, + trigger_snapshot_msg>; queue _apply_entries = queue(10); struct stats { @@ -201,6 +212,16 @@ private: }; absl::flat_hash_map _append_request_status; + struct server_requests { + bool snapshot = false; + + bool empty() const { + return !snapshot; + } + }; + + server_requests _new_server_requests; + // Called to commit entries (on a leader or otherwise). void notify_waiters(std::map& waiters, const std::vector& entries); @@ -212,10 +233,15 @@ private: // to be applied. void signal_applied(); - // This fiber processes FSM output by doing the following steps in order: + // Processes FSM output by doing the following steps in order: // - persist the current term and vote // - persist unstable log entries on disk. // - send out messages + future<> process_fsm_output(index_t& stable_idx, fsm_output&&); + + future<> process_server_requests(server_requests&&); + + // Processes new FSM outputs and server requests as they appear. future<> io_fiber(index_t stable_idx); // This fiber runs in the background and applies committed entries. 
@@ -265,6 +291,8 @@ private: // A helper to wait for a leader to get elected future<> wait_for_leader(seastar::abort_source* as); + virtual future trigger_snapshot(seastar::abort_source* as) override; + // Get "safe to read" index from a leader future get_read_idx(server_id leader, seastar::abort_source* as); // Wait for an entry with a specific term to get committed or @@ -337,12 +365,14 @@ future<> server_impl::start() { .append_request_threshold = _config.append_request_threshold, .max_log_size = _config.max_log_size, .enable_prevoting = _config.enable_prevoting - }); + }, + _events); _applied_idx = index_t{0}; + _snapshot_desc_idx = index_t{0}; if (snapshot.id) { co_await _state_machine->load_snapshot(snapshot.id); - _applied_idx = snapshot.idx; + _snapshot_desc_idx = _applied_idx = snapshot.idx; } if (!rpc_config.current.empty()) { @@ -403,6 +433,54 @@ future<> server_impl::wait_for_leader(seastar::abort_source* as) { } } +future server_impl::trigger_snapshot(seastar::abort_source* as) { + check_not_aborted(); + + if (_applied_idx <= _snapshot_desc_idx) { + logger.debug( + "[{}] trigger_snapshot: last persisted snapshot descriptor index is up-to-date" + ", applied index: {}, persisted snapshot descriptor index: {}, last fsm log index: {}" + ", last fsm snapshot index: {}", _id, _applied_idx, _snapshot_desc_idx, + _fsm->log_last_idx(), _fsm->log_last_snapshot_idx()); + co_return false; + } + + _new_server_requests.snapshot = true; + _events.signal(); + + // Wait for persisted snapshot index to catch up to this index. 
+ auto awaited_idx = _applied_idx; + + logger.debug("[{}] snapshot request waiting for index {}", _id, awaited_idx); + + try { + optimized_optional sub; + if (as) { + as->check(); + sub = as->subscribe([this] () noexcept { _snapshot_desc_idx_changed.broadcast(); }); + assert(sub); // due to `check()` above + } + co_await _snapshot_desc_idx_changed.when([this, as, awaited_idx] { + return (as && as->abort_requested()) || awaited_idx <= _snapshot_desc_idx; + }); + if (as) { + as->check(); + } + } catch (abort_requested_exception&) { + throw request_aborted(); + } catch (seastar::broken_condition_variable&) { + throw request_aborted(); + } + + logger.debug( + "[{}] snapshot request satisfied, awaited index {}, persisted snapshot descriptor index: {}" + ", current applied index {}, last fsm log index {}, last fsm snapshot index {}", + _id, awaited_idx, _snapshot_desc_idx, _applied_idx, + _fsm->log_last_idx(), _fsm->log_last_snapshot_idx()); + + co_return true; +} + future<> server_impl::wait_for_entry(entry_id eid, wait_type type, seastar::abort_source* as) { // The entry may have been already committed and even applied // in case it was forwarded to the leader. In this case @@ -917,141 +995,168 @@ static rpc_config_diff diff_address_sets(const server_address_set& prev, const c return result; } +future<> server_impl::process_fsm_output(index_t& last_stable, fsm_output&& batch) { + if (batch.term_and_vote) { + // Current term and vote are always persisted + // together. A vote may change independently of + // term, but it's safe to update both in this + // case. 
+ co_await _persistence->store_term_and_vote(batch.term_and_vote->first, batch.term_and_vote->second); + _stats.store_term_and_vote++; + } + + if (batch.snp) { + auto& [snp, is_local, max_trailing_entries] = *batch.snp; + logger.trace("[{}] io_fiber storing snapshot {}", _id, snp.id); + // Persist the snapshot + co_await _persistence->store_snapshot_descriptor(snp, max_trailing_entries); + _snapshot_desc_idx = snp.idx; + _snapshot_desc_idx_changed.broadcast(); + _stats.store_snapshot++; + // If this is locally generated snapshot there is no need to + // load it. + if (!is_local) { + co_await _apply_entries.push_eventually(std::move(snp)); + } + } + + for (const auto& snp_id: batch.snps_to_drop) { + _state_machine->drop_snapshot(snp_id); + } + + if (batch.log_entries.size()) { + auto& entries = batch.log_entries; + + if (last_stable >= entries[0]->idx) { + co_await _persistence->truncate_log(entries[0]->idx); + _stats.truncate_persisted_log++; + } + + utils::get_local_injector().inject("store_log_entries/test-failure", + [] { throw std::runtime_error("store_log_entries/test-failure"); }); + + // Combine saving and truncating into one call? + // will require persistence to keep track of last idx + co_await _persistence->store_log_entries(entries); + + last_stable = (*entries.crbegin())->idx; + _stats.persisted_log_entries += entries.size(); + } + + // Update RPC server address mappings. Add servers which are joining + // the cluster according to the new configuration (obtained from the + // last_conf_idx). + // + // It should be done prior to sending the messages since the RPC + // module needs to know who should it send the messages to (actual + // network addresses of the joining servers). 
+ rpc_config_diff rpc_diff; + if (batch.configuration) { + rpc_diff = diff_address_sets(get_rpc_config(), *batch.configuration); + for (const auto& addr: rpc_diff.joining) { + add_to_rpc_config(addr); + } + _rpc->on_configuration_change(rpc_diff.joining, {}); + } + + // After entries are persisted we can send messages. + for (auto&& m : batch.messages) { + try { + send_message(m.first, std::move(m.second)); + } catch(...) { + // Not being able to send a message is not a critical error + logger.debug("[{}] io_fiber failed to send a message to {}: {}", _id, m.first, std::current_exception()); + } + } + + if (batch.configuration) { + for (const auto& addr: rpc_diff.leaving) { + abort_snapshot_transfer(addr.id); + remove_from_rpc_config(addr); + } + _rpc->on_configuration_change({}, rpc_diff.leaving); + } + + // Process committed entries. + if (batch.committed.size()) { + if (_non_joint_conf_commit_promise) { + for (const auto& e: batch.committed) { + const auto* cfg = get_if(&e->data); + if (cfg != nullptr && !cfg->is_joint()) { + std::exchange(_non_joint_conf_commit_promise, std::nullopt)->promise.set_value(); + break; + } + } + } + co_await _persistence->store_commit_idx(batch.committed.back()->idx); + _stats.queue_entries_for_apply += batch.committed.size(); + co_await _apply_entries.push_eventually(std::move(batch.committed)); + } + + if (batch.max_read_id_with_quorum) { + while (!_reads.empty() && _reads.front().id <= batch.max_read_id_with_quorum) { + _reads.front().promise.set_value(_reads.front().idx); + _reads.pop_front(); + } + } + if (!_fsm->is_leader()) { + if (_stepdown_promise) { + std::exchange(_stepdown_promise, std::nullopt)->set_value(); + } + if (!_current_rpc_config.contains(_id)) { + // - It's important we push this after we pushed committed entries above. It + // will cause `applier_fiber` to drop waiters, which should be done after we + // notify all waiters for entries committed in this batch. 
+ // - This may happen multiple times if `io_fiber` gets multiple batches when + // we're outside the configuration, but it should eventually (and generally + // quickly) stop happening (we're outside the config after all). + co_await _apply_entries.push_eventually(removed_from_config{}); + } + // request aborts of snapshot transfers + abort_snapshot_transfers(); + // abort all read barriers + for (auto& r : _reads) { + r.promise.set_value(not_a_leader{_fsm->current_leader()}); + } + _reads.clear(); + } else if (batch.abort_leadership_transfer) { + if (_stepdown_promise) { + std::exchange(_stepdown_promise, std::nullopt)->set_exception(timeout_error("Stepdown process timed out")); + } + } + if (_leader_promise && _fsm->current_leader()) { + std::exchange(_leader_promise, std::nullopt)->set_value(); + } +} + +future<> server_impl::process_server_requests(server_requests&& requests) { + if (requests.snapshot) { + co_await _apply_entries.push_eventually(trigger_snapshot_msg{}); + } +} + future<> server_impl::io_fiber(index_t last_stable) { logger.trace("[{}] io_fiber start", _id); try { while (true) { - auto batch = co_await _fsm->poll_output(); + bool has_fsm_output = false; + bool has_server_request = false; + co_await _events.when([this, &has_fsm_output, &has_server_request] { + has_fsm_output = _fsm->has_output(); + has_server_request = !_new_server_requests.empty(); + return has_fsm_output || has_server_request; + }); + _stats.polls++; - if (batch.term_and_vote) { - // Current term and vote are always persisted - // together. A vote may change independently of - // term, but it's safe to update both in this - // case. 
- co_await _persistence->store_term_and_vote(batch.term_and_vote->first, batch.term_and_vote->second); - _stats.store_term_and_vote++; + if (has_fsm_output) { + auto batch = _fsm->get_output(); + co_await process_fsm_output(last_stable, std::move(batch)); } - if (batch.snp) { - auto& [snp, is_local] = *batch.snp; - logger.trace("[{}] io_fiber storing snapshot {}", _id, snp.id); - // Persist the snapshot - co_await _persistence->store_snapshot_descriptor(snp, is_local ? _config.snapshot_trailing : 0); - _stats.store_snapshot++; - // If this is locally generated snapshot there is no need to - // load it. - if (!is_local) { - co_await _apply_entries.push_eventually(std::move(snp)); - } - } - - for (const auto& snp_id: batch.snps_to_drop) { - _state_machine->drop_snapshot(snp_id); - } - - if (batch.log_entries.size()) { - auto& entries = batch.log_entries; - - if (last_stable >= entries[0]->idx) { - co_await _persistence->truncate_log(entries[0]->idx); - _stats.truncate_persisted_log++; - } - - utils::get_local_injector().inject("store_log_entries/test-failure", - [] { throw std::runtime_error("store_log_entries/test-failure"); }); - - // Combine saving and truncating into one call? - // will require persistence to keep track of last idx - co_await _persistence->store_log_entries(entries); - - last_stable = (*entries.crbegin())->idx; - _stats.persisted_log_entries += entries.size(); - } - - // Update RPC server address mappings. Add servers which are joining - // the cluster according to the new configuration (obtained from the - // last_conf_idx). - // - // It should be done prior to sending the messages since the RPC - // module needs to know who should it send the messages to (actual - // network addresses of the joining servers). 
- rpc_config_diff rpc_diff; - if (batch.configuration) { - rpc_diff = diff_address_sets(get_rpc_config(), *batch.configuration); - for (const auto& addr: rpc_diff.joining) { - add_to_rpc_config(addr); - } - _rpc->on_configuration_change(rpc_diff.joining, {}); - } - - // After entries are persisted we can send messages. - for (auto&& m : batch.messages) { - try { - send_message(m.first, std::move(m.second)); - } catch(...) { - // Not being able to send a message is not a critical error - logger.debug("[{}] io_fiber failed to send a message to {}: {}", _id, m.first, std::current_exception()); - } - } - - if (batch.configuration) { - for (const auto& addr: rpc_diff.leaving) { - abort_snapshot_transfer(addr.id); - remove_from_rpc_config(addr); - } - _rpc->on_configuration_change({}, rpc_diff.leaving); - } - - // Process committed entries. - if (batch.committed.size()) { - if (_non_joint_conf_commit_promise) { - for (const auto& e: batch.committed) { - const auto* cfg = get_if(&e->data); - if (cfg != nullptr && !cfg->is_joint()) { - std::exchange(_non_joint_conf_commit_promise, std::nullopt)->promise.set_value(); - break; - } - } - } - co_await _persistence->store_commit_idx(batch.committed.back()->idx); - _stats.queue_entries_for_apply += batch.committed.size(); - co_await _apply_entries.push_eventually(std::move(batch.committed)); - } - - if (batch.max_read_id_with_quorum) { - while (!_reads.empty() && _reads.front().id <= batch.max_read_id_with_quorum) { - _reads.front().promise.set_value(_reads.front().idx); - _reads.pop_front(); - } - } - if (!_fsm->is_leader()) { - if (_stepdown_promise) { - std::exchange(_stepdown_promise, std::nullopt)->set_value(); - } - if (!_current_rpc_config.contains(_id)) { - // - It's important we push this after we pushed committed entries above. It - // will cause `applier_fiber` to drop waiters, which should be done after we - // notify all waiters for entries committed in this batch. 
- // - This may happen multiple times if `io_fiber` gets multiple batches when - // we're outside the configuration, but it should eventually (and generally - // quickly) stop happening (we're outside the config after all). - co_await _apply_entries.push_eventually(removed_from_config{}); - } - // request aborts of snapshot transfers - abort_snapshot_transfers(); - // abort all read barriers - for (auto& r : _reads) { - r.promise.set_value(not_a_leader{_fsm->current_leader()}); - } - _reads.clear(); - } else if (batch.abort_leadership_transfer) { - if (_stepdown_promise) { - std::exchange(_stepdown_promise, std::nullopt)->set_exception(timeout_error("Stepdown process timed out")); - } - } - if (_leader_promise && _fsm->current_leader()) { - std::exchange(_leader_promise, std::nullopt)->set_value(); + if (has_server_request) { + auto requests = std::exchange(_new_server_requests, server_requests{}); + co_await process_server_requests(std::move(requests)); } } } catch (seastar::broken_condition_variable&) { @@ -1216,6 +1321,23 @@ future<> server_impl::applier_fiber() { // it may never know the status of entries it submitted. 
drop_waiters(); co_return; + }, + [this] (const trigger_snapshot_msg&) -> future<> { + auto applied_term = _fsm->log_term_for(_applied_idx); + // last truncation index <= snapshot index <= applied index + assert(applied_term); + + snapshot_descriptor snp; + snp.term = *applied_term; + snp.idx = _applied_idx; + snp.config = _fsm->log_last_conf_for(_applied_idx); + logger.trace("[{}] taking snapshot at term={}, idx={} due to request", _id, snp.term, snp.idx); + snp.id = co_await _state_machine->take_snapshot(); + if (!_fsm->apply_snapshot(snp, 0, 0, true)) { + logger.trace("[{}] while taking snapshot term={} idx={} id={} due to request," + " fsm received a later snapshot at idx={}", _id, snp.term, snp.idx, snp.id, _fsm->log_last_snapshot_idx()); + } + _stats.snapshots_taken++; } ), v); @@ -1364,6 +1486,8 @@ future<> server_impl::abort(sstring reason) { _aborted = std::move(reason); logger.trace("[{}]: abort() called", _id); _fsm->stop(); + _events.broken(); + _snapshot_desc_idx_changed.broken(); // IO and applier fibers may update waiters and start new snapshot // transfers, so abort them first diff --git a/raft/server.hh b/raft/server.hh index a5da9b29ff..979f5de67c 100644 --- a/raft/server.hh +++ b/raft/server.hh @@ -224,6 +224,22 @@ public: // of two servers iff their IDs are different. virtual void register_metrics() = 0; + // Manually trigger snapshot creation and log truncation. + // + // Does nothing if the current apply index is less or equal to the last persisted snapshot descriptor index + // and returns `false`. + // + // Otherwise returns `true`; when the future resolves, it is guaranteed that the snapshot descriptor + // is persisted, but not that the snapshot is loaded to the state machine yet (it will be eventually). + // + // The request may be resolved by the regular snapshotting mechanisms (e.g. a snapshot + // is created because the Raft log grows too large). 
In this case there is no guarantee + // how many trailing entries will be left trailing behind the snapshot. However, + // if there are no operations running on the server concurrently with the request and all + // committed entries are already applied, the created snapshot is guaranteed to leave + // zero trailing entries. + virtual future trigger_snapshot(seastar::abort_source* as) = 0; + // Ad hoc functions for testing virtual void wait_until_candidate() = 0; virtual future<> wait_election_done() = 0; diff --git a/test/raft/etcd_test.cc b/test/raft/etcd_test.cc index 60fdb495f9..cde64a8513 100644 --- a/test/raft/etcd_test.cc +++ b/test/raft/etcd_test.cc @@ -298,7 +298,7 @@ BOOST_AUTO_TEST_CASE(test_vote_from_any_state) { server_id id1{utils::UUID(0, 1)}, id2{utils::UUID(0, 2)}, id3{utils::UUID(0, 3)}; raft::configuration cfg = config_from_ids({id1, id2, id3}); raft::log log{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), fd, fsm_cfg); + fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), fd, fsm_cfg); // Follower BOOST_CHECK(fsm.is_follower()); @@ -360,7 +360,7 @@ BOOST_AUTO_TEST_CASE(test_log_replication_1) { server_id id1{utils::UUID(0, 1)}, id2{utils::UUID(0, 2)}, id3{utils::UUID(0, 3)}; raft::configuration cfg = config_from_ids({id1, id2, id3}); raft::log log{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg); + fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg); election_timeout(fsm); BOOST_CHECK(fsm.is_candidate()); @@ -425,7 +425,7 @@ BOOST_AUTO_TEST_CASE(test_log_replication_2) { server_id id1{utils::UUID(0, 1)}, id2{utils::UUID(0, 2)}, id3{utils::UUID(0, 3)}; raft::configuration cfg = config_from_ids({id1, id2, id3}); raft::log log{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, 
fsm_cfg); + fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg); election_timeout(fsm); output = fsm.get_output(); @@ -485,7 +485,7 @@ BOOST_AUTO_TEST_CASE(test_single_node_commit) { server_id id1{utils::UUID(0, 1)}; raft::configuration cfg = config_from_ids({id1}); raft::log log{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg); + fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg); BOOST_CHECK(fsm.is_leader()); // Single node skips candidate state output = fsm.get_output(); @@ -578,11 +578,11 @@ BOOST_AUTO_TEST_CASE(test_dueling_candidates) { server_id id1{utils::UUID(0, 1)}, id2{utils::UUID(0, 2)}, id3{utils::UUID(0, 3)}; raft::configuration cfg = config_from_ids({id1, id2, id3}); raft::log log1{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg); + fsm_debug fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg); raft::log log2{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm2(id2, term_t{}, server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg); + fsm_debug fsm2(id2, term_t{}, server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg); raft::log log3{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm3(id3, term_t{}, server_id{}, std::move(log3), trivial_failure_detector, fsm_cfg); + fsm_debug fsm3(id3, term_t{}, server_id{}, std::move(log3), trivial_failure_detector, fsm_cfg); // fsm1 and fsm3 don't see each other make_candidate(fsm1); @@ -621,11 +621,11 @@ BOOST_AUTO_TEST_CASE(test_dueling_pre_candidates) { server_id id1{utils::UUID(0, 1)}, id2{utils::UUID(0, 2)}, id3{utils::UUID(0, 3)}; raft::configuration cfg = config_from_ids({id1, id2, id3}); raft::log log1{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm1(id1, term_t{}, 
server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg_pre); + fsm_debug fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg_pre); raft::log log2{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm2(id2, term_t{}, server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg_pre); + fsm_debug fsm2(id2, term_t{}, server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg_pre); raft::log log3{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm3(id3, term_t{}, server_id{}, std::move(log3), trivial_failure_detector, fsm_cfg_pre); + fsm_debug fsm3(id3, term_t{}, server_id{}, std::move(log3), trivial_failure_detector, fsm_cfg_pre); // fsm1 and fsm3 don't see each other make_candidate(fsm1); @@ -667,7 +667,7 @@ BOOST_AUTO_TEST_CASE(test_single_node_pre_candidate) { server_id id1{utils::UUID(0, 1)}; raft::configuration cfg = config_from_ids({id1}); raft::log log1{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg_pre); + fsm_debug fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg_pre); BOOST_CHECK(fsm1.is_leader()); } @@ -743,7 +743,7 @@ void handle_proposal(unsigned nodes, std::vector accepting_int) { raft::configuration cfg = config_from_ids(ids); raft::log log1{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm1(raft::server_id{utils::UUID(0, 1)}, term_t{}, server_id{}, std::move(log1), + fsm_debug fsm1(raft::server_id{utils::UUID(0, 1)}, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg); // promote 1 to become leader (i.e. 
gets votes) diff --git a/test/raft/fsm_test.cc b/test/raft/fsm_test.cc index 5d41b3bb67..e57640537c 100644 --- a/test/raft/fsm_test.cc +++ b/test/raft/fsm_test.cc @@ -304,7 +304,7 @@ void test_election_single_node_helper(raft::fsm_config fcfg) { server_id id1 = id(); raft::configuration cfg = config_from_ids({id1}); raft::log log{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fcfg); + fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fcfg); election_timeout(fsm); @@ -529,7 +529,7 @@ BOOST_AUTO_TEST_CASE(test_election_two_nodes_prevote) { raft::configuration cfg = config_from_ids({id1, id2}); raft::log log{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fcfg); + fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fcfg); // Initial state is follower BOOST_CHECK(fsm.is_follower()); @@ -595,7 +595,7 @@ BOOST_AUTO_TEST_CASE(test_election_four_nodes_prevote) { raft::configuration cfg = config_from_ids({id1, id2, id3, id4}); raft::log log{raft::snapshot_descriptor{.config = cfg}}; - raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), fd, fcfg); + fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), fd, fcfg); // Initial state is follower BOOST_CHECK(fsm.is_follower()); @@ -652,7 +652,7 @@ BOOST_AUTO_TEST_CASE(test_log_matching_rule) { log.emplace_back(seastar::make_lw_shared(raft::log_entry{term_t{10}, index_t{1000}})); log.stable_to(log.last_idx()); - raft::fsm fsm(id1, term_t{10}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg); + fsm_debug fsm(id1, term_t{10}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg); // Initial state is follower BOOST_CHECK(fsm.is_follower()); @@ -929,7 +929,7 @@ BOOST_AUTO_TEST_CASE(test_leader_stepdown) { {server_addr_from_id(id1), true}, {server_addr_from_id(id2), true}, 
{server_addr_from_id(id3), false}}); raft::log log(raft::snapshot_descriptor{.config = cfg}); - raft::fsm fsm(id1, term_t{1}, /* voted for */ server_id{}, std::move(log), trivial_failure_detector, fsm_cfg); + fsm_debug fsm(id1, term_t{1}, /* voted for */ server_id{}, std::move(log), trivial_failure_detector, fsm_cfg); // Check that we move to candidate state on timeout_now message fsm.step(id2, raft::timeout_now{fsm.get_current_term()}); @@ -1034,7 +1034,7 @@ BOOST_AUTO_TEST_CASE(test_leader_stepdown) { {server_addr_from_id(id1), true}, {server_addr_from_id(id2), true}, {server_addr_from_id(id3), true}}); raft::log log2(raft::snapshot_descriptor{.config = cfg}); - raft::fsm fsm2(id1, term_t{1}, /* voted for */ server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg); + fsm_debug fsm2(id1, term_t{1}, /* voted for */ server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg); election_timeout(fsm2); // Turn to a leader @@ -1152,7 +1152,7 @@ BOOST_AUTO_TEST_CASE(test_confchange_a_to_b) { // A somewhat awkward way to obtain B's log for restart log.emplace_back(make_lw_shared(B.add_entry(config_from_ids({A_id})))); log.stable_to(log.last_idx()); - raft::fsm B_1(B_id, B.get_current_term(), B_id, std::move(log), trivial_failure_detector, fsm_cfg); + fsm_debug B_1(B_id, B.get_current_term(), B_id, std::move(log), trivial_failure_detector, fsm_cfg); election_timeout(B_1); communicate(A, B_1); BOOST_CHECK(B_1.is_follower()); @@ -1469,7 +1469,7 @@ BOOST_AUTO_TEST_CASE(test_zero) { BOOST_AUTO_TEST_CASE(test_reordered_reject) { auto id1 = id(); - raft::fsm fsm1(id1, term_t{1}, server_id{}, + fsm_debug fsm1(id1, term_t{1}, server_id{}, raft::log{raft::snapshot_descriptor{.config = config_from_ids({id1})}}, trivial_failure_detector, fsm_cfg); @@ -1481,7 +1481,7 @@ BOOST_AUTO_TEST_CASE(test_reordered_reject) { (void)fsm1.get_output(); auto id2 = id(); - raft::fsm fsm2(id2, term_t{1}, server_id{}, + fsm_debug fsm2(id2, term_t{1}, server_id{}, 
raft::log{raft::snapshot_descriptor{.config = raft::configuration{}}}, trivial_failure_detector, fsm_cfg); diff --git a/test/raft/helpers.cc b/test/raft/helpers.cc index abce00f5f5..06a68ee6db 100644 --- a/test/raft/helpers.cc +++ b/test/raft/helpers.cc @@ -94,8 +94,8 @@ communicate_impl(std::function stop_pred, raft_routing_map& map) { has_traffic = false; for (auto e : map) { raft::fsm& from = *e.second; - bool has_output; - for (auto output = from.get_output(); !output.empty(); output = from.get_output()) { + for (bool has_output = from.has_output(); has_output; has_output = from.has_output()) { + auto output = from.get_output(); if (stop_pred()) { return; } diff --git a/test/raft/helpers.hh b/test/raft/helpers.hh index 3b23a17b16..01810f7d27 100644 --- a/test/raft/helpers.hh +++ b/test/raft/helpers.hh @@ -63,9 +63,20 @@ raft::command create_command(T val) { extern raft::fsm_config fsm_cfg; extern raft::fsm_config fsm_cfg_pre; -class fsm_debug : public raft::fsm { +struct sm_events_container { + seastar::condition_variable sm_events; +}; + +class fsm_debug : public sm_events_container, public raft::fsm { public: using raft::fsm::fsm; + + explicit fsm_debug(raft::server_id id, raft::term_t current_term, raft::server_id voted_for, raft::log log, + raft::failure_detector& failure_detector, raft::fsm_config conf) + : sm_events_container() + , fsm(id, current_term, voted_for, std::move(log), raft::index_t{0}, failure_detector, conf, sm_events) { + } + void become_follower(raft::server_id leader) { raft::fsm::become_follower(leader); } diff --git a/test/topology_custom/test_raft_snapshot_request.py b/test/topology_custom/test_raft_snapshot_request.py new file mode 100644 index 0000000000..4a0af6073b --- /dev/null +++ b/test/topology_custom/test_raft_snapshot_request.py @@ -0,0 +1,102 @@ +# +# Copyright (C) 2024-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# + +import asyncio +import pytest +import time +import logging + +from 
test.pylib.manager_client import ManagerClient +from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, read_barrier + + +logger = logging.getLogger(__name__) + + +async def get_raft_log_size(cql, host) -> int: + query = "select count(\"index\") from system.raft" + return (await cql.run_async(query, host=host))[0][0] + + +async def get_raft_snap_id(cql, host) -> str: + query = "select snapshot_id from system.raft_snapshots" + return (await cql.run_async(query, host=host))[0].snapshot_id + + +async def trigger_snapshot(manager: ManagerClient, group0_id: str, ip_addr) -> None: + await manager.api.client.post(f"/raft/trigger_snapshot/{group0_id}", host=ip_addr) + + +@pytest.mark.asyncio +async def test_raft_snapshot_request(manager: ManagerClient): + servers = [await manager.server_add() for _ in range(3)] + cql = manager.cql + assert(cql) + + s1 = servers[0] + h1 = (await wait_for_cql_and_get_hosts(cql, [s1], time.time() + 60))[0] + group0_id = (await cql.run_async( + "select value from system.scylla_local where key = 'raft_group0_id'", + host=h1))[0].value + + # Verify that snapshotting updates the snapshot ID and truncates the log. + log_size = await get_raft_log_size(cql, h1) + logger.info(f"Log size on {s1}: {log_size}") + snap_id = await get_raft_snap_id(cql, h1) + logger.info(f"Snapshot ID on {s1}: {snap_id}") + assert log_size > 0 + await trigger_snapshot(manager, group0_id, s1.ip_addr) + new_log_size = await get_raft_log_size(cql, h1) + logger.info(f"New log size on {s1}: {new_log_size}") + new_snap_id = await get_raft_snap_id(cql, h1) + logger.info(f"New snapshot ID on {s1}: {new_snap_id}") + assert new_log_size == 0 + assert new_snap_id != snap_id + + # If a server misses a command and a snapshot is created on the leader, + # the server once alive should eventually receive that snapshot. 
+ s2 = servers[2] + h2 = (await wait_for_cql_and_get_hosts(cql, [s2], time.time() + 60))[0] + s2_log_size = await get_raft_log_size(cql, h2) + logger.info(f"Log size on {s2}: {s2_log_size}") + s2_snap_id = await get_raft_snap_id(cql, h2) + logger.info(f"Snapshot ID on {s2}: {s2_snap_id}") + await manager.server_stop_gracefully(s2.server_id) + logger.info(f"Stopped {s2}") + # Restarting the two servers will cause a newly elected leader to append a dummy command. + await asyncio.gather(*(manager.server_restart(s.server_id) for s in servers[:2])) + logger.info(f"Restarted {servers[:2]}") + # Wait for one server to append the command and do a read_barrier on the other + # to make sure both appended + async def appended_command() -> int | None: + await wait_for_cql_and_get_hosts(cql, [s1], time.time() + 60) + s = await get_raft_log_size(cql, h1) + if s > 0: + return s + return None + log_size = await wait_for(appended_command, time.time() + 60) + logger.info(f"{servers[0]} appended new command") + h = (await wait_for_cql_and_get_hosts(cql, [servers[1]], time.time() + 60))[0] + await read_barrier(cql, h) + logger.info(f"Read barrier done on {servers[1]}") + # We don't know who the leader is, so trigger a snapshot on both servers. 
+ for s in servers[:2]: + await trigger_snapshot(manager, group0_id, s.ip_addr) + h = (await wait_for_cql_and_get_hosts(cql, [s], time.time() + 60))[0] + snap = await get_raft_snap_id(cql, h) + logger.info(f"New snapshot ID on {s}: {snap}") + await manager.server_start(s2.server_id) + logger.info(f"Server {s2} restarted") + await wait_for_cql_and_get_hosts(cql, [s2], time.time() + 60) + async def received_snapshot() -> str | None: + new_s2_snap_id = await get_raft_snap_id(cql, h2) + if s2_snap_id != new_s2_snap_id: + return new_s2_snap_id + return None + new_s2_snap_id = await wait_for(received_snapshot, time.time() + 60) + logger.info(f"{s2} received new snapshot: {new_s2_snap_id}") + new_s2_log_size = await get_raft_log_size(cql, h2) + assert new_s2_log_size == 0