Merge 'Add an API to trigger snapshot in Raft servers' from Kamil Braun

This allows the user of `raft::server` to cause it to create a snapshot
and truncate the Raft log (leaving no trailing entries; in the future we
may extend the API to specify number of trailing entries left if
needed). In a later commit we'll add a REST endpoint to Scylla to
trigger group 0 snapshots.

One use case for this API is to create group 0 snapshots in Scylla
deployments which upgraded to Raft in version 5.2 and started with an
empty Raft log with no snapshot at the beginning. This causes problems,
e.g. when a new node bootstraps to the cluster, it will not receive a
snapshot that would contain both schema and group 0 history, which would
then lead to inconsistent schema state and trigger assertion failures as
observed in scylladb/scylladb#16683.

In 5.4 the logic of initial group 0 setup was changed to start the Raft
log with a snapshot at index 1 (ff386e7a44)
but a problem remains with these existing deployments coming from 5.2,
we need a way to trigger a snapshot in them (other than performing 1000
arbitrary schema changes).

Another potential use case in the future would be to trigger snapshots
based on external memory pressure in tablet Raft groups (for strongly
consistent tables).

The PR adds the API to `raft::server` and a HTTP endpoint that uses it.

In a follow-up PR, we plan to modify group 0 server startup logic to automatically
call this API if it sees that no snapshot is present yet (to automatically
fix the aforementioned 5.2 deployments once they upgrade.)

Closes scylladb/scylladb#16816

* github.com:scylladb/scylladb:
  raft: remove `empty()` from `fsm_output`
  test: add test for manual triggering of Raft snapshots
  api: add HTTP endpoint to trigger Raft snapshots
  raft: server: add `trigger_snapshot` API
  raft: server: track last persisted snapshot descriptor index
  raft: server: framework for handling server requests
  raft: server: inline `poll_fsm_output`
  raft: server: fix indentation
  raft: server: move `io_fiber`'s processing of `batch` to a separate function
  raft: move `poll_output()` from `fsm` to `server`
  raft: move `_sm_events` from `fsm` to `server`
  raft: fsm: remove constructor used only in tests
  raft: fsm: move trace message from `poll_output` to `has_output`
  raft: fsm: extract `has_output()`
  raft: pass `max_trailing_entries` through `fsm_output` to `store_snapshot_descriptor`
  raft: server: pass `*_aborted` to `set_exception` call

(cherry picked from commit d202d32f81)

Backport notes:
- `has_output()` has a smaller condition in the backported version
  (because the condition was smaller in `poll_output()`)
- `process_fsm_output` has a smaller body (because `io_fiber` had a
  smaller body) in the backported version
- the HTTP API is only started if `raft_group_registry` is started
This commit is contained in:
Botond Dénes
2024-01-29 15:06:04 +02:00
committed by Kamil Braun
parent cbe8e05ef6
commit ce0ed29ad6
16 changed files with 590 additions and 197 deletions

View File

@@ -298,7 +298,7 @@ BOOST_AUTO_TEST_CASE(test_vote_from_any_state) {
server_id id1{utils::UUID(0, 1)}, id2{utils::UUID(0, 2)}, id3{utils::UUID(0, 3)};
raft::configuration cfg = config_from_ids({id1, id2, id3});
raft::log log{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), fd, fsm_cfg);
fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), fd, fsm_cfg);
// Follower
BOOST_CHECK(fsm.is_follower());
@@ -360,7 +360,7 @@ BOOST_AUTO_TEST_CASE(test_log_replication_1) {
server_id id1{utils::UUID(0, 1)}, id2{utils::UUID(0, 2)}, id3{utils::UUID(0, 3)};
raft::configuration cfg = config_from_ids({id1, id2, id3});
raft::log log{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
election_timeout(fsm);
BOOST_CHECK(fsm.is_candidate());
@@ -425,7 +425,7 @@ BOOST_AUTO_TEST_CASE(test_log_replication_2) {
server_id id1{utils::UUID(0, 1)}, id2{utils::UUID(0, 2)}, id3{utils::UUID(0, 3)};
raft::configuration cfg = config_from_ids({id1, id2, id3});
raft::log log{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
election_timeout(fsm);
output = fsm.get_output();
@@ -485,7 +485,7 @@ BOOST_AUTO_TEST_CASE(test_single_node_commit) {
server_id id1{utils::UUID(0, 1)};
raft::configuration cfg = config_from_ids({id1});
raft::log log{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
BOOST_CHECK(fsm.is_leader()); // Single node skips candidate state
output = fsm.get_output();
@@ -578,11 +578,11 @@ BOOST_AUTO_TEST_CASE(test_dueling_candidates) {
server_id id1{utils::UUID(0, 1)}, id2{utils::UUID(0, 2)}, id3{utils::UUID(0, 3)};
raft::configuration cfg = config_from_ids({id1, id2, id3});
raft::log log1{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg);
fsm_debug fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg);
raft::log log2{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm2(id2, term_t{}, server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg);
fsm_debug fsm2(id2, term_t{}, server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg);
raft::log log3{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm3(id3, term_t{}, server_id{}, std::move(log3), trivial_failure_detector, fsm_cfg);
fsm_debug fsm3(id3, term_t{}, server_id{}, std::move(log3), trivial_failure_detector, fsm_cfg);
// fsm1 and fsm3 don't see each other
make_candidate(fsm1);
@@ -621,11 +621,11 @@ BOOST_AUTO_TEST_CASE(test_dueling_pre_candidates) {
server_id id1{utils::UUID(0, 1)}, id2{utils::UUID(0, 2)}, id3{utils::UUID(0, 3)};
raft::configuration cfg = config_from_ids({id1, id2, id3});
raft::log log1{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg_pre);
fsm_debug fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg_pre);
raft::log log2{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm2(id2, term_t{}, server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg_pre);
fsm_debug fsm2(id2, term_t{}, server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg_pre);
raft::log log3{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm3(id3, term_t{}, server_id{}, std::move(log3), trivial_failure_detector, fsm_cfg_pre);
fsm_debug fsm3(id3, term_t{}, server_id{}, std::move(log3), trivial_failure_detector, fsm_cfg_pre);
// fsm1 and fsm3 don't see each other
make_candidate(fsm1);
@@ -667,7 +667,7 @@ BOOST_AUTO_TEST_CASE(test_single_node_pre_candidate) {
server_id id1{utils::UUID(0, 1)};
raft::configuration cfg = config_from_ids({id1});
raft::log log1{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg_pre);
fsm_debug fsm1(id1, term_t{}, server_id{}, std::move(log1), trivial_failure_detector, fsm_cfg_pre);
BOOST_CHECK(fsm1.is_leader());
}
@@ -743,7 +743,7 @@ void handle_proposal(unsigned nodes, std::vector<int> accepting_int) {
raft::configuration cfg = config_from_ids(ids);
raft::log log1{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm1(raft::server_id{utils::UUID(0, 1)}, term_t{}, server_id{}, std::move(log1),
fsm_debug fsm1(raft::server_id{utils::UUID(0, 1)}, term_t{}, server_id{}, std::move(log1),
trivial_failure_detector, fsm_cfg);
// promote 1 to become leader (i.e. gets votes)

View File

@@ -304,7 +304,7 @@ void test_election_single_node_helper(raft::fsm_config fcfg) {
server_id id1 = id();
raft::configuration cfg = config_from_ids({id1});
raft::log log{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fcfg);
fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fcfg);
election_timeout(fsm);
@@ -529,7 +529,7 @@ BOOST_AUTO_TEST_CASE(test_election_two_nodes_prevote) {
raft::configuration cfg = config_from_ids({id1, id2});
raft::log log{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fcfg);
fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), trivial_failure_detector, fcfg);
// Initial state is follower
BOOST_CHECK(fsm.is_follower());
@@ -595,7 +595,7 @@ BOOST_AUTO_TEST_CASE(test_election_four_nodes_prevote) {
raft::configuration cfg = config_from_ids({id1, id2, id3, id4});
raft::log log{raft::snapshot_descriptor{.config = cfg}};
raft::fsm fsm(id1, term_t{}, server_id{}, std::move(log), fd, fcfg);
fsm_debug fsm(id1, term_t{}, server_id{}, std::move(log), fd, fcfg);
// Initial state is follower
BOOST_CHECK(fsm.is_follower());
@@ -652,7 +652,7 @@ BOOST_AUTO_TEST_CASE(test_log_matching_rule) {
log.emplace_back(seastar::make_lw_shared<raft::log_entry>(raft::log_entry{term_t{10}, index_t{1000}}));
log.stable_to(log.last_idx());
raft::fsm fsm(id1, term_t{10}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
fsm_debug fsm(id1, term_t{10}, server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
// Initial state is follower
BOOST_CHECK(fsm.is_follower());
@@ -929,7 +929,7 @@ BOOST_AUTO_TEST_CASE(test_leader_stepdown) {
{server_addr_from_id(id1), true}, {server_addr_from_id(id2), true}, {server_addr_from_id(id3), false}});
raft::log log(raft::snapshot_descriptor{.config = cfg});
raft::fsm fsm(id1, term_t{1}, /* voted for */ server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
fsm_debug fsm(id1, term_t{1}, /* voted for */ server_id{}, std::move(log), trivial_failure_detector, fsm_cfg);
// Check that we move to candidate state on timeout_now message
fsm.step(id2, raft::timeout_now{fsm.get_current_term()});
@@ -1034,7 +1034,7 @@ BOOST_AUTO_TEST_CASE(test_leader_stepdown) {
{server_addr_from_id(id1), true}, {server_addr_from_id(id2), true}, {server_addr_from_id(id3), true}});
raft::log log2(raft::snapshot_descriptor{.config = cfg});
raft::fsm fsm2(id1, term_t{1}, /* voted for */ server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg);
fsm_debug fsm2(id1, term_t{1}, /* voted for */ server_id{}, std::move(log2), trivial_failure_detector, fsm_cfg);
election_timeout(fsm2);
// Turn to a leader
@@ -1152,7 +1152,7 @@ BOOST_AUTO_TEST_CASE(test_confchange_a_to_b) {
// A somewhat awkward way to obtain B's log for restart
log.emplace_back(make_lw_shared<raft::log_entry>(B.add_entry(config_from_ids({A_id}))));
log.stable_to(log.last_idx());
raft::fsm B_1(B_id, B.get_current_term(), B_id, std::move(log), trivial_failure_detector, fsm_cfg);
fsm_debug B_1(B_id, B.get_current_term(), B_id, std::move(log), trivial_failure_detector, fsm_cfg);
election_timeout(B_1);
communicate(A, B_1);
BOOST_CHECK(B_1.is_follower());
@@ -1469,7 +1469,7 @@ BOOST_AUTO_TEST_CASE(test_zero) {
BOOST_AUTO_TEST_CASE(test_reordered_reject) {
auto id1 = id();
raft::fsm fsm1(id1, term_t{1}, server_id{},
fsm_debug fsm1(id1, term_t{1}, server_id{},
raft::log{raft::snapshot_descriptor{.config = config_from_ids({id1})}},
trivial_failure_detector, fsm_cfg);
@@ -1481,7 +1481,7 @@ BOOST_AUTO_TEST_CASE(test_reordered_reject) {
(void)fsm1.get_output();
auto id2 = id();
raft::fsm fsm2(id2, term_t{1}, server_id{},
fsm_debug fsm2(id2, term_t{1}, server_id{},
raft::log{raft::snapshot_descriptor{.config = raft::configuration{}}},
trivial_failure_detector, fsm_cfg);

View File

@@ -94,8 +94,8 @@ communicate_impl(std::function<bool()> stop_pred, raft_routing_map& map) {
has_traffic = false;
for (auto e : map) {
raft::fsm& from = *e.second;
bool has_output;
for (auto output = from.get_output(); !output.empty(); output = from.get_output()) {
for (bool has_output = from.has_output(); has_output; has_output = from.has_output()) {
auto output = from.get_output();
if (stop_pred()) {
return;
}

View File

@@ -63,9 +63,20 @@ raft::command create_command(T val) {
extern raft::fsm_config fsm_cfg;
extern raft::fsm_config fsm_cfg_pre;
class fsm_debug : public raft::fsm {
struct sm_events_container {
seastar::condition_variable sm_events;
};
class fsm_debug : public sm_events_container, public raft::fsm {
public:
using raft::fsm::fsm;
explicit fsm_debug(raft::server_id id, raft::term_t current_term, raft::server_id voted_for, raft::log log,
raft::failure_detector& failure_detector, raft::fsm_config conf)
: sm_events_container()
, fsm(id, current_term, voted_for, std::move(log), raft::index_t{0}, failure_detector, conf, sm_events) {
}
void become_follower(raft::server_id leader) {
raft::fsm::become_follower(leader);
}

View File

@@ -0,0 +1,102 @@
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
import asyncio
import pytest
import time
import logging
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, read_barrier
logger = logging.getLogger(__name__)
async def get_raft_log_size(cql, host) -> int:
query = "select count(\"index\") from system.raft"
return (await cql.run_async(query, host=host))[0][0]
async def get_raft_snap_id(cql, host) -> str:
query = "select snapshot_id from system.raft_snapshots"
return (await cql.run_async(query, host=host))[0].snapshot_id
async def trigger_snapshot(manager: ManagerClient, group0_id: str, ip_addr) -> None:
await manager.api.client.post(f"/raft/trigger_snapshot/{group0_id}", host=ip_addr)
@pytest.mark.asyncio
async def test_raft_snapshot_request(manager: ManagerClient):
servers = [await manager.server_add() for _ in range(3)]
cql = manager.cql
assert(cql)
s1 = servers[0]
h1 = (await wait_for_cql_and_get_hosts(cql, [s1], time.time() + 60))[0]
group0_id = (await cql.run_async(
"select value from system.scylla_local where key = 'raft_group0_id'",
host=h1))[0].value
# Verify that snapshotting updates the snapshot ID and truncates the log.
log_size = await get_raft_log_size(cql, h1)
logger.info(f"Log size on {s1}: {log_size}")
snap_id = await get_raft_snap_id(cql, h1)
logger.info(f"Snapshot ID on {s1}: {snap_id}")
assert log_size > 0
await trigger_snapshot(manager, group0_id, s1.ip_addr)
new_log_size = await get_raft_log_size(cql, h1)
logger.info(f"New log size on {s1}: {new_log_size}")
new_snap_id = await get_raft_snap_id(cql, h1)
logger.info(f"New snapshot ID on {s1}: {new_snap_id}")
assert new_log_size == 0
assert new_snap_id != snap_id
# If a server misses a command and a snapshot is created on the leader,
# the server once alive should eventually receive that snapshot.
s2 = servers[2]
h2 = (await wait_for_cql_and_get_hosts(cql, [s2], time.time() + 60))[0]
s2_log_size = await get_raft_log_size(cql, h2)
logger.info(f"Log size on {s2}: {s2_log_size}")
s2_snap_id = await get_raft_snap_id(cql, h2)
logger.info(f"Snapshot ID on {s2}: {s2_snap_id}")
await manager.server_stop_gracefully(s2.server_id)
logger.info(f"Stopped {s2}")
# Restarting the two servers will cause a newly elected leader to append a dummy command.
await asyncio.gather(*(manager.server_restart(s.server_id) for s in servers[:2]))
logger.info(f"Restarted {servers[:2]}")
# Wait for one server to append the command and do a read_barrier on the other
# to make sure both appended
async def appended_command() -> int | None:
await wait_for_cql_and_get_hosts(cql, [s1], time.time() + 60)
s = await get_raft_log_size(cql, h1)
if s > 0:
return s
return None
log_size = await wait_for(appended_command, time.time() + 60)
logger.info(f"{servers[0]} appended new command")
h = (await wait_for_cql_and_get_hosts(cql, [servers[1]], time.time() + 60))[0]
await read_barrier(cql, h)
logger.info(f"Read barrier done on {servers[1]}")
# We don't know who the leader is, so trigger a snapshot on both servers.
for s in servers[:2]:
await trigger_snapshot(manager, group0_id, s.ip_addr)
h = (await wait_for_cql_and_get_hosts(cql, [s], time.time() + 60))[0]
snap = await get_raft_snap_id(cql, h)
logger.info("New snapshot ID on {s}: {snap}")
await manager.server_start(s2.server_id)
logger.info(f"Server {s2} restarted")
await wait_for_cql_and_get_hosts(cql, [s2], time.time() + 60)
async def received_snapshot() -> str | None:
new_s2_snap_id = await get_raft_snap_id(cql, h2)
if s2_snap_id != new_s2_snap_id:
return new_s2_snap_id
return None
new_s2_snap_id = await wait_for(received_snapshot, time.time() + 60)
logger.info(f"{s2} received new snapshot: {new_s2_snap_id}")
new_s2_log_size = await get_raft_log_size(cql, h2)
assert new_s2_log_size == 0