raft server, log size limit in bytes

Before this patch we could get an OOM if we
received several big commands. The number of
commands was small, but their total size
in bytes was large.

snapshot_trailing_size is needed to guarantee
progress. Without this limit the fsm could
get stuck if the size of the next item is
greater than max_log_size - (size of trailing entries).
This commit is contained in:
Petr Gusev
2022-09-01 18:19:29 +04:00
parent ad2f1dc704
commit 27e60ecbf4
11 changed files with 219 additions and 83 deletions

View File

@@ -44,10 +44,11 @@ fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
failure_detector& failure_detector, fsm_config config) :
fsm(id, current_term, voted_for, std::move(log), index_t{0}, failure_detector, config) {}
future<> fsm::wait_max_log_size(seastar::abort_source* as) {
future<> fsm::consume_memory(seastar::abort_source* as, size_t size) {
check_is_leader();
return as ? leader_state().log_limiter_semaphore->wait(*as) : leader_state().log_limiter_semaphore->wait();
auto& sm = *leader_state().log_limiter_semaphore;
return as ? sm.wait(*as, size) : sm.wait(size);
}
const configuration& fsm::get_configuration() const {
@@ -156,7 +157,20 @@ void fsm::reset_election_timeout() {
void fsm::become_leader() {
assert(!std::holds_alternative<leader>(_state));
_state.emplace<leader>(_config.max_log_size, *this);
leader_state().log_limiter_semaphore->consume(_log.in_memory_size());
// The semaphore is not used on the follower, so the limit could
// be temporarily exceeded here, and the value of
// the counter in the semaphore could become negative.
// This is not a problem though as applier_fiber triggers a snapshot
// if memory usage approaches the limit.
// As _applied_idx moves forward, snapshots will eventually release
// sufficient memory for at least one waiter (add_entry) to proceed.
// The amount of memory used by log::apply_snapshot for trailing items
// is limited by the condition
// _config.snapshot_trailing_size <= _config.max_log_size - max_command_size,
// which means that at least one command will eventually return from semaphore::wait.
leader_state().log_limiter_semaphore->consume(_log.memory_usage());
_last_election_time = _clock.now();
_ping_leader = false;
// a new leader needs to commit at lease one entry to make sure that
@@ -976,7 +990,7 @@ void fsm::install_snapshot_reply(server_id from, snapshot_reply&& reply) {
// again and snapshot transfer will be attempted one more time.
}
bool fsm::apply_snapshot(snapshot_descriptor snp, size_t trailing, bool local) {
bool fsm::apply_snapshot(snapshot_descriptor snp, size_t max_trailing_entries, size_t max_trailing_bytes, bool local) {
logger.trace("apply_snapshot[{}]: current term: {}, term: {}, idx: {}, id: {}, local: {}",
_my_id, _current_term, snp.term, snp.idx, snp.id, local);
// If the snapshot is locally generated, all entries up to its index must have been locally applied,
@@ -1002,7 +1016,7 @@ bool fsm::apply_snapshot(snapshot_descriptor snp, size_t trailing, bool local) {
// Otherwise snp.idx becomes the new commit index.
_commit_idx = std::max(_commit_idx, snp.idx);
_output.snp.emplace(fsm_output::applied_snapshot{snp, local});
size_t units = _log.apply_snapshot(std::move(snp), trailing);
size_t units = _log.apply_snapshot(std::move(snp), max_trailing_entries, max_trailing_bytes);
if (is_leader()) {
logger.trace("apply_snapshot[{}]: signal {} available units", _my_id, units);
leader_state().log_limiter_semaphore->signal(units);

View File

@@ -49,11 +49,10 @@ struct fsm_output {
struct fsm_config {
// max size of appended entries in bytes
size_t append_request_threshold;
// Max number of entries of in-memory part of the log after
// Limit in bytes on the size of in-memory part of the log after
// which requests are stopped to be admitted until the log
// is shrunk back by a snapshot. Should be greater than
// whatever the default number of trailing log entries
// is configured by the snapshot, otherwise the state
// the sum of sizes of trailing log entries, otherwise the state
// machine will deadlock.
size_t max_log_size;
// If set to true will enable prevoting stage during election
@@ -86,7 +85,7 @@ struct candidate {
struct leader {
// A state for each follower
raft::tracker tracker;
// Used to acces new leader to set semaphore exception
// Used to access new leader to set semaphore exception
const raft::fsm& fsm;
// Used to limit log size
std::unique_ptr<seastar::semaphore> log_limiter_semaphore;
@@ -397,10 +396,11 @@ public:
_ping_leader = true;
}
// Call this function to wait for the number of log entries to
// go below max_log_size.
// Call this function to wait for the total size in bytes of log entries to
// go below max_log_size.
// Can only be called on a leader.
// On abort throws `semaphore_aborted`.
future<> wait_max_log_size(seastar::abort_source* as);
future<> consume_memory(seastar::abort_source* as, size_t size);
// Return current configuration.
const configuration& get_configuration() const;
@@ -460,15 +460,20 @@ public:
}
// This call will update the log to point to the new snapshot
// and will truncate the log prefix up to (snp.idx - trailing)
// entry. Returns false if the snapshot is older than existing one,
// and will truncate the log prefix so that the number of
// remaining applied entries is <= max_trailing_entries and their total size is <= max_trailing_bytes.
// Returns false if the snapshot is older than existing one,
// the passed snapshot will be dropped in this case.
bool apply_snapshot(snapshot_descriptor snp, size_t traling, bool local);
bool apply_snapshot(snapshot_descriptor snp, size_t max_trailing_entries, size_t max_trailing_bytes, bool local);
std::optional<std::pair<read_id, index_t>> start_read_barrier(server_id requester);
size_t in_memory_log_size() const {
return _log.in_memory_size();
}
size_t log_memory_usage() const {
return _log.memory_usage();
};
server_id id() const { return _my_id; }
@@ -517,7 +522,7 @@ void fsm::step(server_id from, const follower& c, Message&& msg) {
request_vote(from, std::move(msg));
} else if constexpr (std::is_same_v<Message, install_snapshot>) {
send_to(from, snapshot_reply{.current_term = _current_term,
.success = apply_snapshot(std::move(msg.snp), 0, false)});
.success = apply_snapshot(std::move(msg.snp), 0, 0, false)});
} else if constexpr (std::is_same_v<Message, timeout_now>) {
// Leadership transfers never use pre-vote; we know we are not
// recovering from a partition so there is no need for the

View File

@@ -17,6 +17,14 @@ const log_entry_ptr& log::get_entry(index_t i) const {
return _log[i - _first_idx];
}
size_t log::range_memory_usage(log_entries::iterator first, log_entries::iterator last) const {
size_t result = 0;
for (auto it = first; it != last; ++it) {
result += memory_usage_of(**it, _max_command_size);
}
return result;
}
log_entry_ptr& log::operator[](size_t i) {
assert(!_log.empty() && index_t(i) >= _first_idx);
return get_entry(index_t(i));
@@ -24,6 +32,7 @@ log_entry_ptr& log::operator[](size_t i) {
void log::emplace_back(log_entry_ptr&& e) {
_log.emplace_back(std::move(e));
_memory_usage += memory_usage_of(*_log.back(), _max_command_size);
if (std::holds_alternative<configuration>(_log.back()->data)) {
_prev_conf_idx = _last_conf_idx;
_last_conf_idx = last_idx();
@@ -55,7 +64,9 @@ index_t log::next_idx() const {
void log::truncate_uncommitted(index_t idx) {
assert(idx >= _first_idx);
auto it = _log.begin() + (idx - _first_idx);
const auto released_memory = range_memory_usage(it, _log.end());
_log.erase(it, _log.end());
_memory_usage -= released_memory;
stable_to(std::min(_stable_idx, last_idx()));
if (_last_conf_idx > last_idx()) {
// If _prev_conf_idx is 0, this log does not contain any
@@ -215,24 +226,33 @@ const configuration* log::get_prev_configuration() const {
return nullptr;
}
size_t log::apply_snapshot(snapshot_descriptor&& snp, size_t trailing) {
size_t log::apply_snapshot(snapshot_descriptor&& snp, size_t max_trailing_entries, size_t max_trailing_bytes) {
assert (snp.idx > _snapshot.idx);
size_t removed;
size_t released_memory;
auto idx = snp.idx;
if (idx > last_idx()) {
// Remove all entries ignoring the 'trailing' argument,
// Remove all entries ignoring 'trailing' arguments,
// since otherwise there would be a gap between old
// entries and the next entry index.
removed = _log.size();
released_memory = std::exchange(_memory_usage, 0);
_log.clear();
_first_idx = idx + index_t{1};
} else {
removed = _log.size() - (last_idx() - idx);
removed -= std::min(trailing, removed);
_log.erase(_log.begin(), _log.begin() + removed);
_first_idx = _first_idx + index_t{removed};
auto entries_to_remove = _log.size() - (last_idx() - idx);
size_t trailing_bytes = 0;
for (int i = 0; i < max_trailing_entries && entries_to_remove > 0; ++i) {
trailing_bytes += memory_usage_of(*_log[entries_to_remove - 1], _max_command_size);
if (trailing_bytes > max_trailing_bytes) {
break;
}
--entries_to_remove;
}
released_memory = range_memory_usage(_log.begin(), _log.begin() + entries_to_remove);
_log.erase(_log.begin(), _log.begin() + entries_to_remove);
_memory_usage -= released_memory;
_first_idx = _first_idx + index_t{entries_to_remove};
}
_stable_idx = std::max(idx, _stable_idx);
@@ -250,7 +270,7 @@ size_t log::apply_snapshot(snapshot_descriptor&& snp, size_t trailing) {
_snapshot = std::move(snp);
return removed;
return released_memory;
}
std::ostream& operator<<(std::ostream& os, const log& l) {

View File

@@ -22,9 +22,10 @@ namespace raft {
class log {
// Snapshot of the prefix of the log.
snapshot_descriptor _snapshot;
// We need something that can be truncated from both sides.
// std::deque move constructor is not nothrow hence cannot be used
// In-memory log data.
log_entries _log;
// Max command size, is used to calculate memory usage.
size_t _max_command_size;
// Raft log index of the first entry in the log.
// Usually it's simply _snapshot.idx + 1,
// but if apply_snapshot() with non-zero trailing was used,
@@ -57,6 +58,7 @@ class log {
// The previous value of _last_conf_idx, to avoid scanning
// the log backwards after truncate().
index_t _prev_conf_idx = index_t{0};
size_t _memory_usage;
private:
// Drop uncommitted log entries not present on the leader.
void truncate_uncommitted(index_t i);
@@ -65,9 +67,10 @@ private:
void init_last_conf_idx();
log_entry_ptr& get_entry(index_t);
const log_entry_ptr& get_entry(index_t) const;
size_t range_memory_usage(log_entries::iterator first, log_entries::iterator last) const;
public:
explicit log(snapshot_descriptor snp, log_entries log = {})
: _snapshot(std::move(snp)), _log(std::move(log)) {
explicit log(snapshot_descriptor snp, log_entries log = {}, size_t max_command_size = sizeof(log_entry))
: _snapshot(std::move(snp)), _log(std::move(log)), _max_command_size(max_command_size) {
if (_log.empty()) {
_first_idx = _snapshot.idx + index_t{1};
} else {
@@ -77,6 +80,7 @@ public:
// perform an initial state transfer.
assert(_first_idx <= _snapshot.idx + 1);
}
_memory_usage = range_memory_usage(_log.begin(), _log.end());
// The snapshot index is at least 0, so _first_idx
// is at least 1
assert(_first_idx > 0);
@@ -116,16 +120,21 @@ public:
size_t in_memory_size() const {
return _log.size();
}
// Returns memory usage of the log entries in bytes
size_t memory_usage() const {
return _memory_usage;
}
// The function returns current snapshot state of the log
const snapshot_descriptor& get_snapshot() const {
return _snapshot;
}
// This call will update the log to point to the new snaphot
// and will truncate the log prefix up to (snp.idx - trailing)
// entry. Return value specifies how many log entries were dropped
size_t apply_snapshot(snapshot_descriptor&& snp, size_t trailing);
// This call will update the log to point to the new snapshot
// and will truncate the log prefix so that the number of
// remaining applied entries is <= max_trailing_entries and their total size is <= max_trailing_bytes.
// Return value specifies the size in bytes of the dropped log entries.
size_t apply_snapshot(snapshot_descriptor&& snp, size_t max_trailing_entries, size_t max_trailing_bytes);
// 3.5
// Raft maintains the following properties, which
@@ -177,6 +186,32 @@ public:
index_t maybe_append(std::vector<log_entry_ptr>&& entries);
friend std::ostream& operator<<(std::ostream& os, const log& l);
// The log keeps track of the memory it uses. This function returns the number
// of bytes that will be marked as used when a log_entry is added to the log.
// This logic should match the handling of log_limiter_semaphore,
// which is currently incremented only for command, but not for configuration and log_entry::dummy.
// This is why zero is returned for other log_entry elements
// and why this function has been kept separate from entry_size,
// which returns non-zero for configuration.
template <typename T>
requires std::is_same_v<T, log_entry> ||
std::is_same_v<T, command> || std::is_same_v<T, configuration> || std::is_same_v<T, log_entry::dummy>
static inline size_t memory_usage_of(const T& v, size_t max_command_size) {
if constexpr(std::is_same_v<T, command>) {
// We account for sizeof(log_entry) for "small" commands,
// since the overhead of log_entries can take up significant memory.
return max_command_size > sizeof(log_entry) && v.size() < max_command_size - sizeof(log_entry)
? v.size() + sizeof(log_entry)
: v.size();
}
if constexpr(std::is_same_v<T, log_entry>) {
if (const auto* c = get_if<command>(&v.data); c != nullptr) {
return memory_usage_of(*c, max_command_size);
}
}
return 0;
}
};
}

View File

@@ -471,8 +471,10 @@ using rpc_message = std::variant<append_request,
read_quorum,
read_quorum_reply>;
// we need something that can be truncated form both sides.
// we need something that can be truncated from both sides.
// std::deque move constructor is not nothrow hence cannot be used
// also, boost::deque deallocates blocks when items are removed,
// we don't want to hold on to memory we don't use.
using log_entries = boost::container::deque<log_entry_ptr>;
// 3.4 Leader election

View File

@@ -288,8 +288,22 @@ server_impl::server_impl(server_id uuid, std::unique_ptr<rpc> rpc,
_persistence(std::move(persistence)), _failure_detector(failure_detector),
_id(uuid), _config(config) {
set_rpc_server(_rpc.get());
if (_config.snapshot_threshold > _config.max_log_size) {
throw config_error("snapshot_threshold has to be smaller than max_log_size");
if (_config.snapshot_threshold_log_size > _config.max_log_size) {
throw config_error(fmt::format("[{}] snapshot_threshold_log_size ({}) must not be greater than max_log_size ({})",
_id, _config.snapshot_threshold_log_size, _config.max_log_size));
}
if (_config.snapshot_trailing_size > _config.snapshot_threshold_log_size) {
throw config_error(fmt::format("[{}] snapshot_trailing_size ({}) must not be greater than snapshot_threshold_log_size ({})",
_id, _config.snapshot_trailing_size, _config.snapshot_threshold_log_size));
}
if (_config.max_command_size > _config.max_log_size - _config.snapshot_trailing_size) {
throw config_error(fmt::format(
"[{}] max_command_size ({}) must not be greater than "
"max_log_size - snapshot_trailing_size ({} - {} = {})",
_id,
_config.max_command_size,
_config.max_log_size, _config.snapshot_trailing_size,
_config.max_log_size - _config.snapshot_trailing_size));
}
}
@@ -297,7 +311,7 @@ future<> server_impl::start() {
auto [term, vote] = co_await _persistence->load_term_and_vote();
auto snapshot = co_await _persistence->load_snapshot_descriptor();
auto log_entries = co_await _persistence->load_log();
auto log = raft::log(snapshot, std::move(log_entries));
auto log = raft::log(snapshot, std::move(log_entries), _config.max_command_size);
auto commit_idx = co_await _persistence->load_commit_idx();
raft::configuration rpc_config = log.get_configuration();
index_t stable_idx = log.stable_idx();
@@ -457,9 +471,9 @@ future<> server_impl::wait_for_entry(entry_id eid, wait_type type, seastar::abor
}
future<entry_id> server_impl::add_entry_on_leader(command cmd, seastar::abort_source* as) {
// Wait for a new slot to become available
// Wait for sufficient memory to become available
try {
co_await _fsm->wait_max_log_size(as);
co_await _fsm->consume_memory(as, log::memory_usage_of(cmd, _config.max_command_size));
} catch (semaphore_aborted&) {
throw request_aborted();
}
@@ -485,7 +499,7 @@ future<add_entry_reply> server_impl::execute_add_entry(server_id from, command c
}
future<> server_impl::add_entry(command command, wait_type type, seastar::abort_source* as) {
if (_config.max_command_size > 0 && command.size() > _config.max_command_size) {
if (command.size() > _config.max_command_size) {
logger.trace("[{}] add_entry command size exceeds the limit: {} > {}",
id(), command.size(), _config.max_command_size);
throw command_is_too_big_error(command.size(), _config.max_command_size);
@@ -1044,7 +1058,7 @@ future<> server_impl::applier_fiber() {
try {
co_await _state_machine->apply(std::move(commands));
} catch (abort_requested_exception& e) {
logger.info("[{}] applier fiber stopped because state machine was aborter: {}", _id, e);
logger.info("[{}] applier fiber stopped because state machine was aborted: {}", _id, e);
co_return;
} catch (...) {
std::throw_with_nested(raft::state_machine_error{});
@@ -1060,7 +1074,11 @@ future<> server_impl::applier_fiber() {
// (i.e. didn't yet receive from _apply_entries queue) but will soon. We avoid unnecessary work
// of taking snapshots ourselves but comparing our last index directly with what's currently in _fsm.
auto last_snap_idx = _fsm->log_last_snapshot_idx();
if (_applied_idx >= last_snap_idx && _applied_idx - last_snap_idx >= _config.snapshot_threshold) {
if (_applied_idx > last_snap_idx &&
(_applied_idx - last_snap_idx >= _config.snapshot_threshold ||
_fsm->log_memory_usage() >= _config.snapshot_threshold_log_size))
{
snapshot_descriptor snp;
snp.term = last_term;
snp.idx = _applied_idx;
@@ -1070,7 +1088,7 @@ future<> server_impl::applier_fiber() {
// Note that at this point (after the `co_await`), _fsm may already have applied a later snapshot.
// That's fine, `_fsm->apply_snapshot` will simply ignore our current attempt; we will soon receive
// a later snapshot from the queue.
if (!_fsm->apply_snapshot(snp, _config.snapshot_trailing, true)) {
if (!_fsm->apply_snapshot(snp, _config.snapshot_trailing, _config.snapshot_trailing_size, true)) {
logger.trace("[{}] applier fiber: while taking snapshot term={} idx={} id={},"
" fsm received a later snapshot at idx={}", _id, snp.term, snp.idx, snp.id, _fsm->log_last_snapshot_idx());
}
@@ -1446,7 +1464,9 @@ void server_impl::register_metrics() {
sm::description("how many time the user's state machine was snapshotted"), {server_id_label(_id)}),
sm::make_gauge("in_memory_log_size", [this] { return _fsm->in_memory_log_size(); },
sm::description("size of in-memory part of the log"), {server_id_label(_id)}),
sm::description("size of in-memory part of the log"), {server_id_label(_id)}),
sm::make_gauge("log_memory_usage", [this] { return _fsm->log_memory_usage(); },
sm::description("memory usage of in-memory part of the log in bytes"), {server_id_label(_id)}),
});
}

View File

@@ -23,17 +23,29 @@ public:
// automatically snapshot state machine after applying
// this number of entries
size_t snapshot_threshold = 1024;
// how many entries to leave in the log after tacking a snapshot
// Automatically snapshot state machine if the log memory usage exceeds this value.
// The value is in bytes.
// Must be smaller than max_log_size.
// It is recommended to set this value to no more than half of the max_log_size,
// so that snapshots are taken in advance and there is no backpressure due to max_log_size.
size_t snapshot_threshold_log_size = 2 * 1024 * 1024;
// how many entries to leave in the log after taking a snapshot
size_t snapshot_trailing = 200;
// Limit on the total number of bytes, consumed by snapshot trailing entries.
// Must be smaller than snapshot_threshold_log_size.
// It is recommended to set this value to no more than half of snapshot_threshold_log_size
// so that not all memory is held for trailing when taking a snapshot.
size_t snapshot_trailing_size = 1 * 1024 * 1024;
// max size of appended entries in bytes
size_t append_request_threshold = 100000;
// Max number of entries of in-memory part of the log after
// Limit in bytes on the size of in-memory part of the log after
// which requests are stopped to be admitted until the log
// is shrunk back by a snapshot. Should be greater than
// whatever the default number of trailing log entries
// is configured by the snapshot, otherwise the state
// machine will deadlock on attempt to submit a new entry.
size_t max_log_size = 5000;
// is shrunk back by a snapshot.
// The following condition must be satisfied:
// max_command_size <= max_log_size - snapshot_trailing_size
// this ensures that trailing log entries won't block incoming commands and at least
// one command can fit in the log
size_t max_log_size = 4 * 1024 * 1024;
// If set to true will enable prevoting stage during election
bool enable_prevoting = true;
// If set to true, forward configuration and entries from
@@ -43,8 +55,11 @@ public:
bool enable_forwarding = true;
// Max size of a single command, add_entry with a bigger command will throw command_is_too_big_error.
// The value of zero means no limit.
size_t max_command_size = 0;
// The following condition must be satisfied:
// max_command_size <= max_log_size - snapshot_trailing_size
// this ensures that trailing log entries won't block incoming commands and at least
// one command can fit in the log
size_t max_command_size = 100 * 1024;
// A callback to invoke if one of internal server
// background activities has stopped because of an error.
std::function<void(std::exception_ptr e)> on_background_error;

View File

@@ -175,18 +175,23 @@ raft_server_for_group raft_group0::create_server_for_group0(raft::group_id gid,
auto storage = std::make_unique<raft_sys_table_storage>(_qp, gid, my_addr.id);
auto& persistence_ref = *storage;
auto* cl = _qp.proxy().get_db().local().commitlog();
auto config = raft::server::configuration {
.on_background_error = [gid, this](std::exception_ptr e) {
_raft_gr.abort_server(gid, fmt::format("background error, {}", e));
_status_for_monitoring = status_for_monitoring::aborted;
}
};
if (cl) {
// Dividing by two is to protect against paddings that the
// commit log can add for each mutation, as well as
// against different commit log limits on different nodes.
config.max_command_size = cl->max_record_size() / 2;
config.max_log_size = 3 * config.max_command_size;
config.snapshot_threshold_log_size = config.max_log_size / 2;
config.snapshot_trailing_size = config.snapshot_threshold_log_size / 2;
};
auto server = raft::create_server(my_addr.id, std::move(rpc), std::move(state_machine),
std::move(storage), _raft_gr.failure_detector(),
raft::server::configuration {
// Dividing by two is to protect against paddings that the
// commit log can add for each mutation, as well as
// against different commit log limits on different nodes.
.max_command_size = cl ? cl->max_record_size() / 2 : 0,
.on_background_error = [gid, this](std::exception_ptr e) {
_raft_gr.abort_server(gid, fmt::format("background error, {}", e));
_status_for_monitoring = status_for_monitoring::aborted;
}
});
std::move(storage), _raft_gr.failure_detector(), config);
// initialize the corresponding timer to tick the raft server instance
auto ticker = std::make_unique<raft_ticker_type>([srv = server.get()] { srv->tick(); });

View File

@@ -248,7 +248,7 @@ BOOST_AUTO_TEST_CASE(test_log_last_conf_idx) {
add_entry(log, cfg);
BOOST_CHECK_EQUAL(log.last_conf_idx(), 3);
// apply snapshot truncates the log and resets last_conf_idx()
log.apply_snapshot(log_snapshot(log, log.last_idx()), 0);
log.apply_snapshot(log_snapshot(log, log.last_idx()), 0, 0);
BOOST_CHECK_EQUAL(log.last_conf_idx(), log.get_snapshot().idx);
// log::last_term() is maintained correctly by truncate_head/truncate_tail() (snapshotting)
BOOST_CHECK_EQUAL(log.last_term(), log.get_snapshot().term);
@@ -263,7 +263,7 @@ BOOST_AUTO_TEST_CASE(test_log_last_conf_idx) {
// entries, despite that trailing is given, a gap
// between old log entries and a snapshot would violate
// log continuity.
log.apply_snapshot(log_snapshot(log, log.last_idx() + index_t{GAP}), GAP * 2);
log.apply_snapshot(log_snapshot(log, log.last_idx() + index_t{GAP}), GAP * 2, std::numeric_limits<size_t>::max());
BOOST_CHECK(log.empty());
BOOST_CHECK_EQUAL(log.next_idx(), log.get_snapshot().idx + index_t{1});
add_entry(log, log_entry::dummy{});
@@ -271,31 +271,31 @@ BOOST_AUTO_TEST_CASE(test_log_last_conf_idx) {
add_entry(log, log_entry::dummy{});
BOOST_CHECK_EQUAL(log.in_memory_size(), 2);
// Set trailing longer than the length of the log.
log.apply_snapshot(log_snapshot(log, log.last_idx()), 3);
log.apply_snapshot(log_snapshot(log, log.last_idx()), 3, std::numeric_limits<size_t>::max());
BOOST_CHECK_EQUAL(log.in_memory_size(), 2);
// Set trailing the same length as the current log length
add_entry(log, log_entry::dummy{});
BOOST_CHECK_EQUAL(log.in_memory_size(), 3);
log.apply_snapshot(log_snapshot(log, log.last_idx()), 3);
log.apply_snapshot(log_snapshot(log, log.last_idx()), 3, std::numeric_limits<size_t>::max());
BOOST_CHECK_EQUAL(log.in_memory_size(), 3);
BOOST_CHECK_EQUAL(log.last_conf_idx(), log.get_snapshot().idx);
add_entry(log, log_entry::dummy{});
// Set trailing shorter than the length of the log
log.apply_snapshot(log_snapshot(log, log.last_idx()), 1);
log.apply_snapshot(log_snapshot(log, log.last_idx()), 1, std::numeric_limits<size_t>::max());
BOOST_CHECK_EQUAL(log.in_memory_size(), 1);
// check that configuration from snapshot is used and not config entries from a trailing
add_entry(log, cfg);
add_entry(log, cfg);
add_entry(log, log_entry::dummy{});
auto snp_idx = log.last_idx();
log.apply_snapshot(log_snapshot(log, snp_idx), 10);
log.apply_snapshot(log_snapshot(log, snp_idx), 10, std::numeric_limits<size_t>::max());
BOOST_CHECK_EQUAL(log.last_conf_idx(), snp_idx);
// Check that configuration from the log is used if it has higher index then snapshot idx
add_entry(log, log_entry::dummy{});
snp_idx = log.last_idx();
add_entry(log, cfg);
add_entry(log, cfg);
log.apply_snapshot(log_snapshot(log, snp_idx), 10);
log.apply_snapshot(log_snapshot(log, snp_idx), 10, std::numeric_limits<size_t>::max());
BOOST_CHECK_EQUAL(log.last_conf_idx(), log.last_idx());
}
@@ -1689,13 +1689,13 @@ BOOST_AUTO_TEST_CASE(test_non_voter_voter_loop) {
// If iteration count is large, this helps save some
// memory
if (rolladice(1./1000)) {
A.get_log().apply_snapshot(log_snapshot(A.get_log(), A.log_last_idx()), 0);
A.get_log().apply_snapshot(log_snapshot(A.get_log(), A.log_last_idx()), 0, 0);
}
if (rolladice(1./100)) {
B.get_log().apply_snapshot(log_snapshot(A.get_log(), B.log_last_idx()), 0);
B.get_log().apply_snapshot(log_snapshot(A.get_log(), B.log_last_idx()), 0, 0);
}
if (rolladice(1./5000)) {
C.get_log().apply_snapshot(log_snapshot(A.get_log(), B.log_last_idx()), 0);
C.get_log().apply_snapshot(log_snapshot(A.get_log(), B.log_last_idx()), 0, 0);
}
}
BOOST_CHECK(A.is_leader());
@@ -1731,7 +1731,7 @@ BOOST_AUTO_TEST_CASE(test_non_voter_confchange_in_snapshot) {
BOOST_CHECK_EQUAL(A.get_configuration().current.find(config_member_from_id(C_id))->can_vote, false);
A.tick();
raft::snapshot_descriptor A_snp{.idx = A.log_last_idx(), .term = A.log_last_term(), .config = A.get_configuration()};
A.apply_snapshot(A_snp, 0, true);
A.apply_snapshot(A_snp, 0, 0, true);
A.tick();
communicate(A, B, C);
BOOST_CHECK(A.is_leader());
@@ -1756,7 +1756,7 @@ BOOST_AUTO_TEST_CASE(test_non_voter_confchange_in_snapshot) {
BOOST_CHECK_EQUAL(A.get_configuration().current.find(config_member_from_id(C_id))->can_vote, true);
A.tick();
A_snp = raft::snapshot_descriptor{.idx = A.log_last_idx(), .term = A.log_last_term(), .config = A.get_configuration()};
A.apply_snapshot(A_snp, 0, true);
A.apply_snapshot(A_snp, 0, 0, true);
A.tick();
communicate(A, B, C);
BOOST_CHECK(A.is_leader());
@@ -2060,9 +2060,9 @@ BOOST_AUTO_TEST_CASE(test_reject_outdated_remote_snapshot) {
auto snp_term = B.get_log().term_for(snp_idx);
BOOST_CHECK(snp_term);
auto snp = raft::snapshot_descriptor{.idx = index_t{1}, .term = *snp_term, .config = cfg};
BOOST_CHECK(!B.apply_snapshot(snp, 0, false));
BOOST_CHECK(!B.apply_snapshot(snp, 0, 0, false));
// But it should apply this snapshot if it's locally generated
BOOST_CHECK(B.apply_snapshot(snp, 0, true));
BOOST_CHECK(B.apply_snapshot(snp, 0, 0, true));
}
// A server should sometimes become a candidate even though it is outside the current configuration,
@@ -2276,7 +2276,7 @@ BOOST_AUTO_TEST_CASE(test_append_entry_inside_snapshot) {
C.step(A_id, std::move(append));
(void)C.get_output();
// C snapshots the log
C.apply_snapshot(log_snapshot(C.get_log(), C.log_last_idx()), 0, true);
C.apply_snapshot(log_snapshot(C.get_log(), C.log_last_idx()), 0, 0, true);
// Try to add one more entry
A.add_entry(log_entry::dummy{});

View File

@@ -2216,9 +2216,12 @@ SEASTAR_TEST_CASE(test_frequent_snapshotting) {
}, 10'000);
const auto server_config = raft::server::configuration {
.snapshot_threshold{1},
.snapshot_threshold_log_size{150},
.snapshot_trailing{5},
.max_log_size{20},
.enable_forwarding{true}
.snapshot_trailing_size{75},
.max_log_size{300},
.enable_forwarding{true},
.max_command_size{30}
};
auto leader_id = co_await env.new_server(true, server_config);
@@ -3257,12 +3260,16 @@ SEASTAR_TEST_CASE(basic_generator_test) {
bool nemesis_crashes = true;
// TODO: randomize the snapshot thresholds between different servers for more chaos.
const auto max_command_size = 2 * sizeof(raft::log_entry);
auto srv_cfg = frequent_snapshotting
? raft::server::configuration {
.snapshot_threshold{10},
.snapshot_threshold_log_size{3 * (max_command_size + sizeof(raft::log_entry))},
.snapshot_trailing{5},
.max_log_size{20},
.snapshot_trailing_size{max_command_size + sizeof(raft::log_entry)},
.max_log_size{5 * (max_command_size + sizeof(raft::log_entry))},
.enable_forwarding{forwarding},
.max_command_size{max_command_size}
}
: raft::server::configuration {
.enable_forwarding{forwarding},

View File

@@ -158,7 +158,20 @@ RAFT_TEST_CASE(take_snapshot, (test_case{
// 2 nodes doing simple replication/snapshoting while leader's log size is limited
RAFT_TEST_CASE(backpressure, (test_case{
.nodes = 2,
.config = {{.snapshot_threshold = 10, .snapshot_trailing = 5, .max_log_size = 20}, {.snapshot_threshold = 20, .snapshot_trailing = 10}},
.config = {
{
.snapshot_threshold = 10,
.snapshot_threshold_log_size = 200,
.snapshot_trailing = 5,
.snapshot_trailing_size = 100,
.max_log_size = 450,
.max_command_size = 150
},
{
.snapshot_threshold = 20,
.snapshot_trailing = 10
}
},
.updates = {entries{100}}}));
// 3 nodes, add entries, drop leader 0, add entries [implicit re-join all]