diff --git a/raft/fsm.cc b/raft/fsm.cc index 2a122a0ccc..b7a55f64d2 100644 --- a/raft/fsm.cc +++ b/raft/fsm.cc @@ -44,10 +44,11 @@ fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log, failure_detector& failure_detector, fsm_config config) : fsm(id, current_term, voted_for, std::move(log), index_t{0}, failure_detector, config) {} -future<> fsm::wait_max_log_size(seastar::abort_source* as) { +future<> fsm::consume_memory(seastar::abort_source* as, size_t size) { check_is_leader(); - return as ? leader_state().log_limiter_semaphore->wait(*as) : leader_state().log_limiter_semaphore->wait(); + auto& sm = *leader_state().log_limiter_semaphore; + return as ? sm.wait(*as, size) : sm.wait(size); } const configuration& fsm::get_configuration() const { @@ -156,7 +157,20 @@ void fsm::reset_election_timeout() { void fsm::become_leader() { assert(!std::holds_alternative(_state)); _state.emplace(_config.max_log_size, *this); - leader_state().log_limiter_semaphore->consume(_log.in_memory_size()); + + // The semaphore is not used on the follower, so the limit could + // be temporarily exceeded here, and the value of + // the counter in the semaphore could become negative. + // This is not a problem though as applier_fiber triggers a snapshot + // if memory usage approaches the limit. + // As _applied_idx moves forward, snapshots will eventually release + // sufficient memory for at least one waiter (add_entry) to proceed. + // The amount of memory used by log::apply_snapshot for trailing items + // is limited by the condition + // _config.snapshot_trailing_size <= _config.max_log_size - max_command_size, + // which means that at least one command will eventually return from semaphore::wait. 
+ leader_state().log_limiter_semaphore->consume(_log.memory_usage()); + _last_election_time = _clock.now(); _ping_leader = false; // a new leader needs to commit at lease one entry to make sure that @@ -976,7 +990,7 @@ void fsm::install_snapshot_reply(server_id from, snapshot_reply&& reply) { // again and snapshot transfer will be attempted one more time. } -bool fsm::apply_snapshot(snapshot_descriptor snp, size_t trailing, bool local) { +bool fsm::apply_snapshot(snapshot_descriptor snp, size_t max_trailing_entries, size_t max_trailing_bytes, bool local) { logger.trace("apply_snapshot[{}]: current term: {}, term: {}, idx: {}, id: {}, local: {}", _my_id, _current_term, snp.term, snp.idx, snp.id, local); // If the snapshot is locally generated, all entries up to its index must have been locally applied, @@ -1002,7 +1016,7 @@ bool fsm::apply_snapshot(snapshot_descriptor snp, size_t trailing, bool local) { // Otherwise snp.idx becomes the new commit index. _commit_idx = std::max(_commit_idx, snp.idx); _output.snp.emplace(fsm_output::applied_snapshot{snp, local}); - size_t units = _log.apply_snapshot(std::move(snp), trailing); + size_t units = _log.apply_snapshot(std::move(snp), max_trailing_entries, max_trailing_bytes); if (is_leader()) { logger.trace("apply_snapshot[{}]: signal {} available units", _my_id, units); leader_state().log_limiter_semaphore->signal(units); diff --git a/raft/fsm.hh b/raft/fsm.hh index 4416f9d7f5..59b27af1a4 100644 --- a/raft/fsm.hh +++ b/raft/fsm.hh @@ -49,11 +49,10 @@ struct fsm_output { struct fsm_config { // max size of appended entries in bytes size_t append_request_threshold; - // Max number of entries of in-memory part of the log after + // Limit in bytes on the size of in-memory part of the log after // which requests are stopped to be admitted until the log // is shrunk back by a snapshot. 
Should be greater than - // whatever the default number of trailing log entries - // is configured by the snapshot, otherwise the state + // the sum of sizes of trailing log entries, otherwise the state // machine will deadlock. size_t max_log_size; // If set to true will enable prevoting stage during election @@ -86,7 +85,7 @@ struct candidate { struct leader { // A state for each follower raft::tracker tracker; - // Used to acces new leader to set semaphore exception + // Used to access new leader to set semaphore exception const raft::fsm& fsm; // Used to limit log size std::unique_ptr log_limiter_semaphore; @@ -397,10 +396,11 @@ public: _ping_leader = true; } - // Call this function to wait for the number of log entries to - // go below max_log_size. + // Call this function to wait until `size` more bytes of log entries fit + // under max_log_size; the bytes are then taken from the log limiter semaphore. + // Can only be called on a leader. // On abort throws `semaphore_aborted`. - future<> wait_max_log_size(seastar::abort_source* as); + future<> consume_memory(seastar::abort_source* as, size_t size); // Return current configuration. const configuration& get_configuration() const; @@ -460,15 +460,20 @@ public: } // This call will update the log to point to the new snapshot - // and will truncate the log prefix up to (snp.idx - trailing) - // entry. Returns false if the snapshot is older than existing one, + // and will truncate the log prefix so that the number of + // remaining applied entries is <= max_trailing_entries and their total size is <= max_trailing_bytes. + // Returns false if the snapshot is older than existing one, // the passed snapshot will be dropped in this case.
- bool apply_snapshot(snapshot_descriptor snp, size_t traling, bool local); + bool apply_snapshot(snapshot_descriptor snp, size_t max_trailing_entries, size_t max_trailing_bytes, bool local); std::optional> start_read_barrier(server_id requester); size_t in_memory_log_size() const { return _log.in_memory_size(); + } + + size_t log_memory_usage() const { + return _log.memory_usage(); }; server_id id() const { return _my_id; } @@ -517,7 +522,7 @@ void fsm::step(server_id from, const follower& c, Message&& msg) { request_vote(from, std::move(msg)); } else if constexpr (std::is_same_v) { send_to(from, snapshot_reply{.current_term = _current_term, - .success = apply_snapshot(std::move(msg.snp), 0, false)}); + .success = apply_snapshot(std::move(msg.snp), 0, 0, false)}); } else if constexpr (std::is_same_v) { // Leadership transfers never use pre-vote; we know we are not // recovering from a partition so there is no need for the diff --git a/raft/log.cc b/raft/log.cc index 21c53b4767..472038ea8c 100644 --- a/raft/log.cc +++ b/raft/log.cc @@ -17,6 +17,14 @@ const log_entry_ptr& log::get_entry(index_t i) const { return _log[i - _first_idx]; } +size_t log::range_memory_usage(log_entries::iterator first, log_entries::iterator last) const { + size_t result = 0; + for (auto it = first; it != last; ++it) { + result += memory_usage_of(**it, _max_command_size); + } + return result; +} + log_entry_ptr& log::operator[](size_t i) { assert(!_log.empty() && index_t(i) >= _first_idx); return get_entry(index_t(i)); @@ -24,6 +32,7 @@ log_entry_ptr& log::operator[](size_t i) { void log::emplace_back(log_entry_ptr&& e) { _log.emplace_back(std::move(e)); + _memory_usage += memory_usage_of(*_log.back(), _max_command_size); if (std::holds_alternative(_log.back()->data)) { _prev_conf_idx = _last_conf_idx; _last_conf_idx = last_idx(); @@ -55,7 +64,9 @@ index_t log::next_idx() const { void log::truncate_uncommitted(index_t idx) { assert(idx >= _first_idx); auto it = _log.begin() + (idx - 
_first_idx); + const auto released_memory = range_memory_usage(it, _log.end()); _log.erase(it, _log.end()); + _memory_usage -= released_memory; stable_to(std::min(_stable_idx, last_idx())); if (_last_conf_idx > last_idx()) { // If _prev_conf_idx is 0, this log does not contain any @@ -215,24 +226,33 @@ const configuration* log::get_prev_configuration() const { return nullptr; } -size_t log::apply_snapshot(snapshot_descriptor&& snp, size_t trailing) { +size_t log::apply_snapshot(snapshot_descriptor&& snp, size_t max_trailing_entries, size_t max_trailing_bytes) { assert (snp.idx > _snapshot.idx); - size_t removed; + size_t released_memory; auto idx = snp.idx; if (idx > last_idx()) { - // Remove all entries ignoring the 'trailing' argument, + // Remove all entries ignoring 'trailing' arguments, // since otherwise there would be a gap between old // entries and the next entry index. - removed = _log.size(); + released_memory = std::exchange(_memory_usage, 0); _log.clear(); _first_idx = idx + index_t{1}; } else { - removed = _log.size() - (last_idx() - idx); - removed -= std::min(trailing, removed); - _log.erase(_log.begin(), _log.begin() + removed); - _first_idx = _first_idx + index_t{removed}; + auto entries_to_remove = _log.size() - (last_idx() - idx); + size_t trailing_bytes = 0; + for (size_t i = 0; i < max_trailing_entries && entries_to_remove > 0; ++i) { // keep the newest applied entries while both trailing limits hold + trailing_bytes += memory_usage_of(*_log[entries_to_remove - 1], _max_command_size); + if (trailing_bytes > max_trailing_bytes) { + break; + } + --entries_to_remove; + } + released_memory = range_memory_usage(_log.begin(), _log.begin() + entries_to_remove); + _log.erase(_log.begin(), _log.begin() + entries_to_remove); + _memory_usage -= released_memory; + _first_idx = _first_idx + index_t{entries_to_remove}; } _stable_idx = std::max(idx, _stable_idx); @@ -250,7 +270,7 @@ size_t log::apply_snapshot(snapshot_descriptor&& snp, size_t trailing) { _snapshot = std::move(snp); - return removed; + return released_memory; }
std::ostream& operator<<(std::ostream& os, const log& l) { diff --git a/raft/log.hh b/raft/log.hh index fc7b92cd4f..c94f7a3d23 100644 --- a/raft/log.hh +++ b/raft/log.hh @@ -22,9 +22,10 @@ namespace raft { class log { // Snapshot of the prefix of the log. snapshot_descriptor _snapshot; - // We need something that can be truncated from both sides. - // std::deque move constructor is not nothrow hence cannot be used + // In-memory log data. log_entries _log; + // Max command size, is used to calculate memory usage. + size_t _max_command_size; // Raft log index of the first entry in the log. // Usually it's simply _snapshot.idx + 1, // but if apply_snapshot() with non-zero trailing was used, @@ -57,6 +58,7 @@ class log { // The previous value of _last_conf_idx, to avoid scanning // the log backwards after truncate(). index_t _prev_conf_idx = index_t{0}; + size_t _memory_usage; private: // Drop uncommitted log entries not present on the leader. void truncate_uncommitted(index_t i); @@ -65,9 +67,10 @@ private: void init_last_conf_idx(); log_entry_ptr& get_entry(index_t); const log_entry_ptr& get_entry(index_t) const; + size_t range_memory_usage(log_entries::iterator first, log_entries::iterator last) const; public: - explicit log(snapshot_descriptor snp, log_entries log = {}) - : _snapshot(std::move(snp)), _log(std::move(log)) { + explicit log(snapshot_descriptor snp, log_entries log = {}, size_t max_command_size = sizeof(log_entry)) + : _snapshot(std::move(snp)), _log(std::move(log)), _max_command_size(max_command_size) { if (_log.empty()) { _first_idx = _snapshot.idx + index_t{1}; } else { @@ -77,6 +80,7 @@ public: // perform an initial state transfer. 
assert(_first_idx <= _snapshot.idx + 1); } + _memory_usage = range_memory_usage(_log.begin(), _log.end()); // The snapshot index is at least 0, so _first_idx // is at least 1 assert(_first_idx > 0); @@ -116,16 +120,21 @@ public: size_t in_memory_size() const { return _log.size(); } + // Returns memory usage of the log entries in bytes + size_t memory_usage() const { + return _memory_usage; + } // The function returns current snapshot state of the log const snapshot_descriptor& get_snapshot() const { return _snapshot; } - // This call will update the log to point to the new snaphot - // and will truncate the log prefix up to (snp.idx - trailing) - // entry. Return value specifies how many log entries were dropped - size_t apply_snapshot(snapshot_descriptor&& snp, size_t trailing); + // This call will update the log to point to the new snapshot + // and will truncate the log prefix so that the number of + // remaining applied entries is <= max_trailing_entries and their total size is <= max_trailing_bytes. + // Return value specifies the size in bytes of the dropped log entries. + size_t apply_snapshot(snapshot_descriptor&& snp, size_t max_trailing_entries, size_t max_trailing_bytes); // 3.5 // Raft maintains the following properties, which @@ -177,6 +186,32 @@ public: index_t maybe_append(std::vector&& entries); friend std::ostream& operator<<(std::ostream& os, const log& l); + + // The log keeps track of the memory it uses. This function returns the number + // of bytes that will be marked as used when a log_entry is added to the log. + // This logic should match the handling of log_limiter_semaphore, + // which is currently incremented only for command, but not for configuration and log_entry::dummy. + // This is why zero is returned for other log_entry elements + // and why this function has been kept separate from entry_size, + // which returns non-zero for configuration. 
+ template + requires std::is_same_v || + std::is_same_v || std::is_same_v || std::is_same_v + static inline size_t memory_usage_of(const T& v, size_t max_command_size) { + if constexpr(std::is_same_v) { + // We account for sizeof(log_entry) for "small" commands, + // since the overhead of log_entries can take up significant memory. + return max_command_size > sizeof(log_entry) && v.size() < max_command_size - sizeof(log_entry) + ? v.size() + sizeof(log_entry) + : v.size(); + } + if constexpr(std::is_same_v) { + if (const auto* c = get_if(&v.data); c != nullptr) { + return memory_usage_of(*c, max_command_size); + } + } + return 0; + } }; } diff --git a/raft/raft.hh b/raft/raft.hh index c18d50423a..aac53d7229 100644 --- a/raft/raft.hh +++ b/raft/raft.hh @@ -471,8 +471,10 @@ using rpc_message = std::variant; -// we need something that can be truncated form both sides. +// we need something that can be truncated from both sides. // std::deque move constructor is not nothrow hence cannot be used +// also, boost::deque deallocates blocks when items are removed, +// we don't want to hold on to memory we don't use. 
using log_entries = boost::container::deque; // 3.4 Leader election diff --git a/raft/server.cc b/raft/server.cc index b873871a04..93a425f45f 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -288,8 +288,22 @@ server_impl::server_impl(server_id uuid, std::unique_ptr rpc, _persistence(std::move(persistence)), _failure_detector(failure_detector), _id(uuid), _config(config) { set_rpc_server(_rpc.get()); - if (_config.snapshot_threshold > _config.max_log_size) { - throw config_error("snapshot_threshold has to be smaller than max_log_size"); + if (_config.snapshot_threshold_log_size > _config.max_log_size) { + throw config_error(fmt::format("[{}] snapshot_threshold_log_size ({}) must not be greater than max_log_size ({})", + _id, _config.snapshot_threshold_log_size, _config.max_log_size)); + } + if (_config.snapshot_trailing_size > _config.snapshot_threshold_log_size) { + throw config_error(fmt::format("[{}] snapshot_trailing_size ({}) must not be greater than snapshot_threshold_log_size ({})", + _id, _config.snapshot_trailing_size, _config.snapshot_threshold_log_size)); + } + if (_config.max_command_size > _config.max_log_size - _config.snapshot_trailing_size) { + throw config_error(fmt::format( + "[{}] max_command_size ({}) must not be greater than " + "max_log_size - snapshot_trailing_size ({} - {} = {})", + _id, + _config.max_command_size, + _config.max_log_size, _config.snapshot_trailing_size, + _config.max_log_size - _config.snapshot_trailing_size)); } } @@ -297,7 +311,7 @@ future<> server_impl::start() { auto [term, vote] = co_await _persistence->load_term_and_vote(); auto snapshot = co_await _persistence->load_snapshot_descriptor(); auto log_entries = co_await _persistence->load_log(); - auto log = raft::log(snapshot, std::move(log_entries)); + auto log = raft::log(snapshot, std::move(log_entries), _config.max_command_size); auto commit_idx = co_await _persistence->load_commit_idx(); raft::configuration rpc_config = log.get_configuration(); index_t stable_idx 
= log.stable_idx(); @@ -457,9 +471,9 @@ future<> server_impl::wait_for_entry(entry_id eid, wait_type type, seastar::abor } future server_impl::add_entry_on_leader(command cmd, seastar::abort_source* as) { - // Wait for a new slot to become available + // Wait for sufficient memory to become available try { - co_await _fsm->wait_max_log_size(as); + co_await _fsm->consume_memory(as, log::memory_usage_of(cmd, _config.max_command_size)); } catch (semaphore_aborted&) { throw request_aborted(); } @@ -485,7 +499,7 @@ future server_impl::execute_add_entry(server_id from, command c } future<> server_impl::add_entry(command command, wait_type type, seastar::abort_source* as) { - if (_config.max_command_size > 0 && command.size() > _config.max_command_size) { + if (command.size() > _config.max_command_size) { logger.trace("[{}] add_entry command size exceeds the limit: {} > {}", id(), command.size(), _config.max_command_size); throw command_is_too_big_error(command.size(), _config.max_command_size); @@ -1044,7 +1058,7 @@ future<> server_impl::applier_fiber() { try { co_await _state_machine->apply(std::move(commands)); } catch (abort_requested_exception& e) { - logger.info("[{}] applier fiber stopped because state machine was aborter: {}", _id, e); + logger.info("[{}] applier fiber stopped because state machine was aborted: {}", _id, e); co_return; } catch (...) { std::throw_with_nested(raft::state_machine_error{}); @@ -1060,7 +1074,11 @@ future<> server_impl::applier_fiber() { // (i.e. didn't yet receive from _apply_entries queue) but will soon. We avoid unnecessary work // of taking snapshots ourselves but comparing our last index directly with what's currently in _fsm. 
auto last_snap_idx = _fsm->log_last_snapshot_idx(); - if (_applied_idx >= last_snap_idx && _applied_idx - last_snap_idx >= _config.snapshot_threshold) { + + if (_applied_idx > last_snap_idx && + (_applied_idx - last_snap_idx >= _config.snapshot_threshold || + _fsm->log_memory_usage() >= _config.snapshot_threshold_log_size)) + { snapshot_descriptor snp; snp.term = last_term; snp.idx = _applied_idx; @@ -1070,7 +1088,7 @@ future<> server_impl::applier_fiber() { // Note that at this point (after the `co_await`), _fsm may already have applied a later snapshot. // That's fine, `_fsm->apply_snapshot` will simply ignore our current attempt; we will soon receive // a later snapshot from the queue. - if (!_fsm->apply_snapshot(snp, _config.snapshot_trailing, true)) { + if (!_fsm->apply_snapshot(snp, _config.snapshot_trailing, _config.snapshot_trailing_size, true)) { logger.trace("[{}] applier fiber: while taking snapshot term={} idx={} id={}," " fsm received a later snapshot at idx={}", _id, snp.term, snp.idx, snp.id, _fsm->log_last_snapshot_idx()); } @@ -1446,7 +1464,9 @@ void server_impl::register_metrics() { sm::description("how many time the user's state machine was snapshotted"), {server_id_label(_id)}), sm::make_gauge("in_memory_log_size", [this] { return _fsm->in_memory_log_size(); }, - sm::description("size of in-memory part of the log"), {server_id_label(_id)}), + sm::description("size of in-memory part of the log"), {server_id_label(_id)}), + sm::make_gauge("log_memory_usage", [this] { return _fsm->log_memory_usage(); }, + sm::description("memory usage of in-memory part of the log in bytes"), {server_id_label(_id)}), }); } diff --git a/raft/server.hh b/raft/server.hh index 2e7a9d46f7..14764879c3 100644 --- a/raft/server.hh +++ b/raft/server.hh @@ -23,17 +23,29 @@ public: // automatically snapshot state machine after applying // this number of entries size_t snapshot_threshold = 1024; - // how many entries to leave in the log after tacking a snapshot + // 
Automatically snapshot state machine if the log memory usage reaches this value. + // The value is in bytes. + // Must not be greater than max_log_size. + // It is recommended to set this value to no more than half of the max_log_size, + // so that snapshots are taken in advance and there is no backpressure due to max_log_size. + size_t snapshot_threshold_log_size = 2 * 1024 * 1024; + // how many entries to leave in the log after taking a snapshot size_t snapshot_trailing = 200; + // Limit on the total number of bytes, consumed by snapshot trailing entries. + // Must not be greater than snapshot_threshold_log_size. + // It is recommended to set this value to no more than half of snapshot_threshold_log_size + // so that not all memory is held for trailing when taking a snapshot. + size_t snapshot_trailing_size = 1 * 1024 * 1024; // max size of appended entries in bytes size_t append_request_threshold = 100000; - // Max number of entries of in-memory part of the log after + // Limit in bytes on the size of in-memory part of the log after // which requests are stopped to be admitted until the log - // is shrunk back by a snapshot. Should be greater than - // whatever the default number of trailing log entries - // is configured by the snapshot, otherwise the state - // machine will deadlock on attempt to submit a new entry. - size_t max_log_size = 5000; + // is shrunk back by a snapshot. + // The following condition must be satisfied: + // max_command_size <= max_log_size - snapshot_trailing_size + // this ensures that trailing log entries won't block incoming commands and at least + // one command can fit in the log + size_t max_log_size = 4 * 1024 * 1024; // If set to true will enable prevoting stage during election bool enable_prevoting = true; // If set to true, forward configuration and entries from @@ -43,8 +55,11 @@ public: bool enable_forwarding = true; // Max size of a single command, add_entry with a bigger command will throw command_is_too_big_error.
- // The value of zero means no limit. - size_t max_command_size = 0; + // The following condition must be satisfied: + // max_command_size <= max_log_size - snapshot_trailing_size + // this ensures that trailing log entries won't block incoming commands and at least + // one command can fit in the log + size_t max_command_size = 100 * 1024; // A callback to invoke if one of internal server // background activities has stopped because of an error. std::function on_background_error; diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc index 96300f5009..3e577778cd 100644 --- a/service/raft/raft_group0.cc +++ b/service/raft/raft_group0.cc @@ -175,18 +175,23 @@ raft_server_for_group raft_group0::create_server_for_group0(raft::group_id gid, auto storage = std::make_unique(_qp, gid, my_addr.id); auto& persistence_ref = *storage; auto* cl = _qp.proxy().get_db().local().commitlog(); + auto config = raft::server::configuration { + .on_background_error = [gid, this](std::exception_ptr e) { + _raft_gr.abort_server(gid, fmt::format("background error, {}", e)); + _status_for_monitoring = status_for_monitoring::aborted; + } + }; + if (cl) { + // Dividing by two is to protect against paddings that the + // commit log can add for each mutation, as well as + // against different commit log limits on different nodes. + config.max_command_size = cl->max_record_size() / 2; + config.max_log_size = 3 * config.max_command_size; + config.snapshot_threshold_log_size = config.max_log_size / 2; + config.snapshot_trailing_size = config.snapshot_threshold_log_size / 2; + } auto server = raft::create_server(my_addr.id, std::move(rpc), std::move(state_machine), - std::move(storage), _raft_gr.failure_detector(), - raft::server::configuration { - // Dividing by two is to protect against paddings that the - // commit log can add for each mutation, as well as - // against different commit log limits on different nodes. - .max_command_size = cl ?
cl->max_record_size() / 2 : 0, - .on_background_error = [gid, this](std::exception_ptr e) { - _raft_gr.abort_server(gid, fmt::format("background error, {}", e)); - _status_for_monitoring = status_for_monitoring::aborted; - } - }); + std::move(storage), _raft_gr.failure_detector(), config); // initialize the corresponding timer to tick the raft server instance auto ticker = std::make_unique([srv = server.get()] { srv->tick(); }); diff --git a/test/raft/fsm_test.cc b/test/raft/fsm_test.cc index 6e87896ff0..5d41b3bb67 100644 --- a/test/raft/fsm_test.cc +++ b/test/raft/fsm_test.cc @@ -248,7 +248,7 @@ BOOST_AUTO_TEST_CASE(test_log_last_conf_idx) { add_entry(log, cfg); BOOST_CHECK_EQUAL(log.last_conf_idx(), 3); // apply snapshot truncates the log and resets last_conf_idx() - log.apply_snapshot(log_snapshot(log, log.last_idx()), 0); + log.apply_snapshot(log_snapshot(log, log.last_idx()), 0, 0); BOOST_CHECK_EQUAL(log.last_conf_idx(), log.get_snapshot().idx); // log::last_term() is maintained correctly by truncate_head/truncate_tail() (snapshotting) BOOST_CHECK_EQUAL(log.last_term(), log.get_snapshot().term); @@ -263,7 +263,7 @@ BOOST_AUTO_TEST_CASE(test_log_last_conf_idx) { // entries, despite that trailing is given, a gap // between old log entries and a snapshot would violate // log continuity. - log.apply_snapshot(log_snapshot(log, log.last_idx() + index_t{GAP}), GAP * 2); + log.apply_snapshot(log_snapshot(log, log.last_idx() + index_t{GAP}), GAP * 2, std::numeric_limits::max()); BOOST_CHECK(log.empty()); BOOST_CHECK_EQUAL(log.next_idx(), log.get_snapshot().idx + index_t{1}); add_entry(log, log_entry::dummy{}); @@ -271,31 +271,31 @@ BOOST_AUTO_TEST_CASE(test_log_last_conf_idx) { add_entry(log, log_entry::dummy{}); BOOST_CHECK_EQUAL(log.in_memory_size(), 2); // Set trailing longer than the length of the log. 
- log.apply_snapshot(log_snapshot(log, log.last_idx()), 3); + log.apply_snapshot(log_snapshot(log, log.last_idx()), 3, std::numeric_limits::max()); BOOST_CHECK_EQUAL(log.in_memory_size(), 2); // Set trailing the same length as the current log length add_entry(log, log_entry::dummy{}); BOOST_CHECK_EQUAL(log.in_memory_size(), 3); - log.apply_snapshot(log_snapshot(log, log.last_idx()), 3); + log.apply_snapshot(log_snapshot(log, log.last_idx()), 3, std::numeric_limits::max()); BOOST_CHECK_EQUAL(log.in_memory_size(), 3); BOOST_CHECK_EQUAL(log.last_conf_idx(), log.get_snapshot().idx); add_entry(log, log_entry::dummy{}); // Set trailing shorter than the length of the log - log.apply_snapshot(log_snapshot(log, log.last_idx()), 1); + log.apply_snapshot(log_snapshot(log, log.last_idx()), 1, std::numeric_limits::max()); BOOST_CHECK_EQUAL(log.in_memory_size(), 1); // check that configuration from snapshot is used and not config entries from a trailing add_entry(log, cfg); add_entry(log, cfg); add_entry(log, log_entry::dummy{}); auto snp_idx = log.last_idx(); - log.apply_snapshot(log_snapshot(log, snp_idx), 10); + log.apply_snapshot(log_snapshot(log, snp_idx), 10, std::numeric_limits::max()); BOOST_CHECK_EQUAL(log.last_conf_idx(), snp_idx); // Check that configuration from the log is used if it has higher index then snapshot idx add_entry(log, log_entry::dummy{}); snp_idx = log.last_idx(); add_entry(log, cfg); add_entry(log, cfg); - log.apply_snapshot(log_snapshot(log, snp_idx), 10); + log.apply_snapshot(log_snapshot(log, snp_idx), 10, std::numeric_limits::max()); BOOST_CHECK_EQUAL(log.last_conf_idx(), log.last_idx()); } @@ -1689,13 +1689,13 @@ BOOST_AUTO_TEST_CASE(test_non_voter_voter_loop) { // If iteration count is large, this helps save some // memory if (rolladice(1./1000)) { - A.get_log().apply_snapshot(log_snapshot(A.get_log(), A.log_last_idx()), 0); + A.get_log().apply_snapshot(log_snapshot(A.get_log(), A.log_last_idx()), 0, 0); } if (rolladice(1./100)) { - 
B.get_log().apply_snapshot(log_snapshot(A.get_log(), B.log_last_idx()), 0); + B.get_log().apply_snapshot(log_snapshot(A.get_log(), B.log_last_idx()), 0, 0); } if (rolladice(1./5000)) { - C.get_log().apply_snapshot(log_snapshot(A.get_log(), B.log_last_idx()), 0); + C.get_log().apply_snapshot(log_snapshot(A.get_log(), B.log_last_idx()), 0, 0); } } BOOST_CHECK(A.is_leader()); @@ -1731,7 +1731,7 @@ BOOST_AUTO_TEST_CASE(test_non_voter_confchange_in_snapshot) { BOOST_CHECK_EQUAL(A.get_configuration().current.find(config_member_from_id(C_id))->can_vote, false); A.tick(); raft::snapshot_descriptor A_snp{.idx = A.log_last_idx(), .term = A.log_last_term(), .config = A.get_configuration()}; - A.apply_snapshot(A_snp, 0, true); + A.apply_snapshot(A_snp, 0, 0, true); A.tick(); communicate(A, B, C); BOOST_CHECK(A.is_leader()); @@ -1756,7 +1756,7 @@ BOOST_AUTO_TEST_CASE(test_non_voter_confchange_in_snapshot) { BOOST_CHECK_EQUAL(A.get_configuration().current.find(config_member_from_id(C_id))->can_vote, true); A.tick(); A_snp = raft::snapshot_descriptor{.idx = A.log_last_idx(), .term = A.log_last_term(), .config = A.get_configuration()}; - A.apply_snapshot(A_snp, 0, true); + A.apply_snapshot(A_snp, 0, 0, true); A.tick(); communicate(A, B, C); BOOST_CHECK(A.is_leader()); @@ -2060,9 +2060,9 @@ BOOST_AUTO_TEST_CASE(test_reject_outdated_remote_snapshot) { auto snp_term = B.get_log().term_for(snp_idx); BOOST_CHECK(snp_term); auto snp = raft::snapshot_descriptor{.idx = index_t{1}, .term = *snp_term, .config = cfg}; - BOOST_CHECK(!B.apply_snapshot(snp, 0, false)); + BOOST_CHECK(!B.apply_snapshot(snp, 0, 0, false)); // But it should apply this snapshot if it's locally generated - BOOST_CHECK(B.apply_snapshot(snp, 0, true)); + BOOST_CHECK(B.apply_snapshot(snp, 0, 0, true)); } // A server should sometimes become a candidate even though it is outside the current configuration, @@ -2276,7 +2276,7 @@ BOOST_AUTO_TEST_CASE(test_append_entry_inside_snapshot) { C.step(A_id, std::move(append)); 
(void)C.get_output(); // C snapshots the log - C.apply_snapshot(log_snapshot(C.get_log(), C.log_last_idx()), 0, true); + C.apply_snapshot(log_snapshot(C.get_log(), C.log_last_idx()), 0, 0, true); // Try to add one more entry A.add_entry(log_entry::dummy{}); diff --git a/test/raft/randomized_nemesis_test.cc b/test/raft/randomized_nemesis_test.cc index d3418d4e05..85eb10d28c 100644 --- a/test/raft/randomized_nemesis_test.cc +++ b/test/raft/randomized_nemesis_test.cc @@ -2216,9 +2216,12 @@ SEASTAR_TEST_CASE(test_frequent_snapshotting) { }, 10'000); const auto server_config = raft::server::configuration { .snapshot_threshold{1}, + .snapshot_threshold_log_size{150}, .snapshot_trailing{5}, - .max_log_size{20}, - .enable_forwarding{true} + .snapshot_trailing_size{75}, + .max_log_size{300}, + .enable_forwarding{true}, + .max_command_size{30} }; auto leader_id = co_await env.new_server(true, server_config); @@ -3257,12 +3260,16 @@ SEASTAR_TEST_CASE(basic_generator_test) { bool nemesis_crashes = true; // TODO: randomize the snapshot thresholds between different servers for more chaos. + const auto max_command_size = 2 * sizeof(raft::log_entry); auto srv_cfg = frequent_snapshotting ? 
raft::server::configuration { .snapshot_threshold{10}, + .snapshot_threshold_log_size{3 * (max_command_size + sizeof(raft::log_entry))}, .snapshot_trailing{5}, - .max_log_size{20}, + .snapshot_trailing_size{max_command_size + sizeof(raft::log_entry)}, + .max_log_size{5 * (max_command_size + sizeof(raft::log_entry))}, .enable_forwarding{forwarding}, + .max_command_size{max_command_size} } : raft::server::configuration { .enable_forwarding{forwarding}, diff --git a/test/raft/replication_test.cc b/test/raft/replication_test.cc index 4125df2481..4637a2d13b 100644 --- a/test/raft/replication_test.cc +++ b/test/raft/replication_test.cc @@ -158,7 +158,20 @@ RAFT_TEST_CASE(take_snapshot, (test_case{ // 2 nodes doing simple replication/snapshoting while leader's log size is limited RAFT_TEST_CASE(backpressure, (test_case{ .nodes = 2, - .config = {{.snapshot_threshold = 10, .snapshot_trailing = 5, .max_log_size = 20}, {.snapshot_threshold = 20, .snapshot_trailing = 10}}, + .config = { + { + .snapshot_threshold = 10, + .snapshot_threshold_log_size = 200, + .snapshot_trailing = 5, + .snapshot_trailing_size = 100, + .max_log_size = 450, + .max_command_size = 150 + }, + { + .snapshot_threshold = 20, + .snapshot_trailing = 10 + } + }, .updates = {entries{100}}})); // 3 nodes, add entries, drop leader 0, add entries [implicit re-join all]