diff --git a/raft/raft.hh b/raft/raft.hh
index 9cc20b1cfe..fab1a3148c 100644
--- a/raft/raft.hh
+++ b/raft/raft.hh
@@ -200,7 +200,7 @@ struct error : public std::runtime_error {
struct not_a_leader : public error {
server_id leader;
- explicit not_a_leader(server_id l) : error("Not a leader"), leader(l) {}
+ explicit not_a_leader(server_id l) : error(format("Not a leader, leader: {}", l)), leader(l) {}
};
struct dropped_entry : public error {
diff --git a/test/raft/logical_timer.hh b/test/raft/logical_timer.hh
new file mode 100644
index 0000000000..1a86527891
--- /dev/null
+++ b/test/raft/logical_timer.hh
@@ -0,0 +1,224 @@
+/*
+ * Copyright (C) 2021 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+
+#include "raft/logical_clock.hh"
+
+using namespace seastar;
+
+constexpr raft::logical_clock::duration operator "" _t(unsigned long long ticks) { // Literal suffix for logical-clock durations: `20_t` is a duration of 20 ticks.
+ return raft::logical_clock::duration{ticks};
+}
+
+// A wrapper around `raft::logical_clock` that allows scheduling events to happen after a certain number of ticks.
+class logical_timer {
+ struct scheduled_impl { // Base for a scheduled event; subclasses supply the action in `do_resolve()`.
+ private:
+ bool _resolved = false; // True once the event has fired or has been marked as resolved.
+
+ virtual void do_resolve() = 0; // The action of the event; `resolve()` calls it only while `_resolved` is false.
+
+ public:
+ virtual ~scheduled_impl() { }
+
+ void resolve() { // Runs `do_resolve()` unless already resolved, then marks the event as resolved.
+ if (!_resolved) {
+ do_resolve();
+ _resolved = true;
+ }
+ }
+
+ void mark_resolved() { _resolved = true; } // Makes a later `resolve()` a no-op without running `do_resolve()`.
+ bool resolved() { return _resolved; }
+ };
+
+ struct scheduled { // Heap entry: a time point paired with the event to resolve at that time.
+ raft::logical_clock::time_point _at;
+ seastar::shared_ptr _impl;
+
+ void resolve() { _impl->resolve(); }
+ bool resolved() { return _impl->resolved(); }
+ };
+
+ raft::logical_clock _clock;
+
+ // A min-heap of `scheduled` events sorted by `_at`.
+ std::vector _scheduled;
+
+ // Comparator for the `_scheduled` min-heap.
+ static bool cmp(const scheduled& a, const scheduled& b) {
+ return a._at > b._at; // Reversed comparison, so the std heap functions keep the earliest `_at` at the front.
+ }
+
+public:
+ template
+ class timed_out : public raft::error { // Exception produced by `with_timeout` when the time point arrives first.
+ // lw_shared_ptr to make the exception copyable
+ lw_shared_ptr> _fut;
+
+ public:
+ timed_out(seastar::future f) : error("timed out"), _fut(make_lw_shared(std::move(f))) {}
+ timed_out(const timed_out&) = default;
+ timed_out(timed_out&&) = default;
+
+ seastar::future& get_future() { // Returns the future stored in this exception (shared between copies of the exception).
+ return *_fut;
+ }
+ };
+
+ logical_timer() = default;
+
+ logical_timer(const logical_timer&) = delete;
+ logical_timer(logical_timer&&) = default;
+
+ // Returns the current logical time (number of `tick()`s since initialization).
+ raft::logical_clock::time_point now() {
+ return _clock.now();
+ }
+
+ // Tick the internal clock.
+ // Resolve all events whose scheduled times arrive after that tick.
+ void tick() {
+ _clock.advance();
+ while (!_scheduled.empty() && _scheduled.front()._at <= _clock.now()) { // `front()` is the earliest scheduled event.
+ _scheduled.front().resolve();
+ std::pop_heap(_scheduled.begin(), _scheduled.end(), cmp); // Moves the front entry to the back of the vector...
+ _scheduled.pop_back(); // ...where it is removed.
+ }
+ }
+
+ // Given a future `f` and a logical time point `tp`, returns a future that resolves
+ // when either `f` resolves or the time point `tp` arrives (according to the number of `tick()` calls),
+ // whichever comes first.
+ //
+ // If `tp` comes first, the returned future is resolved with the `timed_out` exception.
+ // The exception contains a future equivalent to the original future (i.e. when the original future
+ // resolves, the future inside the exception will contain the original future's result).
+ //
+ // Note: it is highly recommended to pass in futures that always resolve eventually
+ // since we attach continuations to these futures, allocating memory.
+ //
+ // Note: there is a possibility that the internal heap will grow unbounded if we call `with_timeout`
+ // more often than we `tick`, so don't do that. It is recommended to call at least one
+ // `tick` per one `with_timeout` call (on average in the long run).
+ template
+ future with_timeout(raft::logical_clock::time_point tp, future f) {
+ if (f.available()) { // Already resolved: return it as is, no heap entry is created.
+ return f;
+ }
+
+ if (tp <= now()) { // Time point already reached: fail immediately, handing `f` back inside the exception.
+ return make_exception_future(timed_out{std::move(f)});
+ }
+
+ struct sched : public scheduled_impl {
+ // The original future (the `f` argument), when it resolves, will set value on `_p`.
+ // Before timeout, `_p` is connected to the future returned to the user (`res` below).
+ // After timeout, `_p` is a new promise connected to the future returned inside the timed_out exception.
+ // Therefore:
+ // 1. if `f` resolves before timeout, it will resolve `res`,
+ // 2. otherwise it will resolve the future we return inside `timed_out` which is returned through `res`.
+ promise _p;
+
+ virtual ~sched() override { }
+ virtual void do_resolve() override { // Timeout path, reached through `tick()`.
+ promise new_p;
+ _p.set_exception(timed_out{new_p.get_future()}); // Fail `res` with an exception carrying the future of `new_p`.
+ std::swap(new_p, _p); // From now on the result of `f` is delivered through the exception's future.
+ }
+ };
+
+ auto s = make_shared();
+ auto res = s->_p.get_future();
+
+ _scheduled.push_back(scheduled{
+ ._at = tp,
+ ._impl = s // Copied, not moved: `s` is also captured by the continuation below.
+ });
+ std::push_heap(_scheduled.begin(), _scheduled.end(), cmp);
+
+ (void)f.then_wrapped([s = std::move(s)] (future f) mutable { // Returned future is discarded on purpose; the result travels through `s->_p`.
+ // If we're before timeout, `s->_p` is connected to `res` so we're returning the result of `f` to our caller.
+ // Otherwise `s->_p` is connected to a future which was previously returned to our caller inside `timed_out`.
+ f.forward_to(std::move(s->_p));
+ s->mark_resolved(); // A later `tick()` will then skip `do_resolve()` for this entry.
+ // tick() will (eventually) clear the `_scheduled` entry
+ // (unless we're already past timeout, in which case the entry has already been cleared).
+ });
+
+ return res;
+ }
+
+ // Returns a future that resolves after a number of `tick()`s represented by `d`.
+ // Example usage: `sleep(20_t)` resolves after 20 `tick()`s.
+ // Note: analogous remark applies as for `with_timeout`, i.e. make sure to call at least one `tick`
+ // per one `sleep` call on average.
+ future<> sleep(raft::logical_clock::duration d) {
+ if (d == raft::logical_clock::duration{0}) { // Zero duration: ready immediately, nothing is scheduled.
+ return make_ready_future<>();
+ }
+
+ struct sched : public scheduled_impl {
+ promise<> _p;
+ virtual ~sched() override {}
+ virtual void do_resolve() override { _p.set_value(); }
+ };
+
+ auto s = make_shared();
+ auto f = s->_p.get_future();
+ _scheduled.push_back(scheduled{
+ ._at = now() + d, // Wake-up time is relative to the current logical time.
+ ._impl = std::move(s)
+ });
+ std::push_heap(_scheduled.begin(), _scheduled.end(), cmp);
+
+ return f;
+ }
+
+ // Schedule `f` to be called at logical time point `tp` (according to this timer's clock).
+ template
+ void schedule(raft::logical_clock::time_point tp, F f) {
+ if (tp <= now()) { // Time point already reached: call `f` right away instead of scheduling it.
+ f();
+ return;
+ }
+
+ struct sched : public scheduled_impl {
+ sched(F f) : _f(std::move(f)) {}
+ virtual ~sched() override {}
+ virtual void do_resolve() override { _f(); } // Invoked from `tick()` through `resolve()`.
+
+ F _f;
+ };
+
+ _scheduled.push_back(scheduled {
+ ._at = tp,
+ ._impl = make_shared(std::move(f))
+ });
+ std::push_heap(_scheduled.begin(), _scheduled.end(), cmp);
+ }
+};
+
diff --git a/test/raft/randomized_nemesis_test.cc b/test/raft/randomized_nemesis_test.cc
index 98d6758a7f..edd93bd320 100644
--- a/test/raft/randomized_nemesis_test.cc
+++ b/test/raft/randomized_nemesis_test.cc
@@ -36,6 +36,9 @@
#include "idl/uuid.dist.hh"
#include "idl/uuid.dist.impl.hh"
+#include "test/raft/logical_timer.hh"
+#include "test/raft/ticker.hh"
+
using namespace seastar;
using namespace std::chrono_literals;
@@ -66,145 +69,6 @@ requires (typename M::state_t s, typename M::input_t i) {
requires std::is_same_v;
};
-constexpr raft::logical_clock::duration operator "" _t(unsigned long long ticks) {
- return raft::logical_clock::duration{ticks};
-}
-
-// A wrapper around `raft::logical_clock` that allows scheduling events to happen after a certain number of ticks.
-class logical_timer {
- struct scheduled_impl {
- private:
- bool _resolved = false;
-
- virtual void do_resolve() = 0;
-
- public:
- virtual ~scheduled_impl() { }
-
- void resolve() {
- if (!_resolved) {
- do_resolve();
- _resolved = true;
- }
- }
-
- void mark_resolved() { _resolved = true; }
- bool resolved() { return _resolved; }
- };
-
- struct scheduled {
- raft::logical_clock::time_point _at;
- seastar::shared_ptr _impl;
-
- void resolve() { _impl->resolve(); }
- bool resolved() { return _impl->resolved(); }
- };
-
- raft::logical_clock _clock;
-
- // A min-heap of `scheduled` events sorted by `_at`.
- std::vector _scheduled;
-
- // Comparator for the `_scheduled` min-heap.
- static bool cmp(const scheduled& a, const scheduled& b) {
- return a._at > b._at;
- }
-
-public:
- logical_timer() = default;
-
- logical_timer(const logical_timer&) = delete;
- logical_timer(logical_timer&&) = default;
-
- // Returns the current logical time (number of `tick()`s since initialization).
- raft::logical_clock::time_point now() {
- return _clock.now();
- }
-
- // Tick the internal clock.
- // Resolve all events whose scheduled times arrive after that tick.
- void tick() {
- _clock.advance();
- while (!_scheduled.empty() && _scheduled.front()._at <= _clock.now()) {
- _scheduled.front().resolve();
- std::pop_heap(_scheduled.begin(), _scheduled.end(), cmp);
- _scheduled.pop_back();
- }
- }
-
- // Given a future `f` and a logical time point `tp`, returns a future that resolves
- // when either `f` resolves or the time point `tp` arrives (according to the number of `tick()` calls),
- // whichever comes first.
- //
- // Note: if `tp` comes first, that doesn't cancel any in-progress computations behind `f`.
- // `f` effectively becomes a discarded future.
- // Note: there is a possibility that the internal heap will grow unbounded if we call `with_timeout`
- // more often than we `tick`, so don't do that. It is recommended to call at least one
- // `tick` per one `with_timeout` call (on average in the long run).
- template
- future with_timeout(raft::logical_clock::time_point tp, future f) {
- if (f.available()) {
- return f;
- }
-
- struct sched : public scheduled_impl {
- promise _p;
-
- virtual ~sched() override { }
- virtual void do_resolve() override {
- _p.set_exception(std::make_exception_ptr(timed_out_error()));
- }
- };
-
- auto s = make_shared();
- auto res = s->_p.get_future();
-
- _scheduled.push_back(scheduled{
- ._at = tp,
- ._impl = s
- });
- std::push_heap(_scheduled.begin(), _scheduled.end(), cmp);
-
- (void)f.then_wrapped([s = std::move(s)] (auto&& f) mutable {
- if (s->resolved()) {
- f.ignore_ready_future();
- } else {
- f.forward_to(std::move(s->_p));
- s->mark_resolved();
- // tick() will (eventually) clear the `_scheduled` entry.
- }
- });
-
- return res;
- }
-
- // Returns a future that resolves after a number of `tick()`s represented by `d`.
- // Example usage: `sleep(20_t)` resolves after 20 `tick()`s.
- // Note: analogous remark applies as for `with_timeout`, i.e. make sure to call at least one `tick`
- // per one `sleep` call on average.
- future<> sleep(raft::logical_clock::duration d) {
- if (d == raft::logical_clock::duration{0}) {
- return make_ready_future<>();
- }
-
- struct sched : public scheduled_impl {
- promise<> _p;
- virtual ~sched() override {}
- virtual void do_resolve() override { _p.set_value(); }
- };
-
- auto s = make_shared();
- auto f = s->_p.get_future();
- _scheduled.push_back(scheduled{
- ._at = now() + d,
- ._impl = std::move(s)
- });
- std::push_heap(_scheduled.begin(), _scheduled.end(), cmp);
-
- return f;
- }
-};
-
// Used to uniquely identify commands passed into `apply` in order to return
// the outputs of these commands. See `impure_state_machine` and `call`.
using cmd_id_t = utils::UUID;
@@ -271,6 +135,7 @@ public:
// We are on the leader server where the client submitted the command
// and waits for the output. Send it to them.
it->second.set_value(std::move(output));
+ _output_channels.erase(it);
} else {
// This is not the leader on which the command was submitted,
// or it is but the client already gave up on us and deallocated the channel.
@@ -284,7 +149,7 @@ public:
future take_snapshot() override {
auto id = raft::snapshot_id::create_random_id();
- _snapshots[id] = _val;
+ assert(_snapshots.emplace(id, _val).second);
co_return id;
}
@@ -303,6 +168,10 @@ public:
return _gate.close();
}
+ struct output_channel_dropped : public raft::error {
+ output_channel_dropped() : error("output channel dropped") {}
+ };
+
// Before sending a command to Raft, the client must obtain a command ID
// and an output channel using this function.
template
@@ -313,7 +182,13 @@ public:
auto cmd_id = utils::make_random_uuid();
assert(_output_channels.emplace(cmd_id, std::move(p)).second);
- auto guard = defer([this, cmd_id] { _output_channels.erase(cmd_id); });
+ auto guard = defer([this, cmd_id] {
+ auto it = _output_channels.find(cmd_id);
+ if (it != _output_channels.end()) {
+ it->second.set_exception(output_channel_dropped{});
+ _output_channels.erase(it);
+ }
+ });
return f(cmd_id, std::move(fut)).finally([guard = std::move(guard)] {});
});
}
@@ -330,7 +205,7 @@ raft::command make_command(const cmd_id_t& cmd_id, const Input& input) {
// TODO: handle other errors?
template
-using call_result_t = std::variant;
+using call_result_t = std::variant;
// Sends a given `input` as a command to `server`, waits until the command gets replicated
// and applied on that server and returns the produced output.
@@ -349,13 +224,20 @@ future> call(
logical_timer& timer,
raft::server& server,
impure_state_machine& sm) {
+ using output_channel_dropped = typename impure_state_machine::output_channel_dropped;
return sm.with_output_channel([&, input = std::move(input), timeout] (cmd_id_t cmd_id, future f) {
return timer.with_timeout(timeout, [&] (typename M::input_t input, future f) {
return server.add_entry(
make_command(std::move(cmd_id), std::move(input)),
raft::wait_type::applied
- ).then([f = std::move(f)] () mutable {
- return std::move(f);
+ ).then_wrapped([output_f = std::move(f)] (future<> add_entry_f) mutable {
+ if (add_entry_f.failed()) {
+ // We need to discard `output_f`; the only expected exception is:
+ (void)output_f.discard_result().handle_exception_type([] (const output_channel_dropped&) {});
+ std::rethrow_exception(add_entry_f.get_exception());
+ }
+
+ return std::move(output_f);
});
}(std::move(input), std::move(f)));
}).then([] (typename M::output_t output) {
@@ -365,8 +247,13 @@ future> call(
std::rethrow_exception(eptr);
} catch (raft::not_a_leader e) {
return make_ready_future>(e);
- } catch (timed_out_error e) {
+ } catch (raft::dropped_entry e) {
return make_ready_future>(e);
+ } catch (logical_timer::timed_out e) {
+ (void)e.get_future().discard_result()
+ .handle_exception_type([] (const output_channel_dropped&) {})
+ .handle_exception_type([] (const raft::dropped_entry&) {});
+ return make_ready_future>(timed_out_error{});
}
});
}
@@ -546,7 +433,7 @@ public:
}
};
-template
+template
class persistence : public raft::persistence {
snapshots_t& _snapshots;
@@ -577,13 +464,13 @@ public:
// containing opnly this server's ID which must be also provided here as `init_config_id`.
// Otherwise it must be initialized with an empty configuration (it will be added to the cluster
// through a configuration change) and `init_config_id` must be `nullopt`.
- persistence(snapshots_t& snaps, std::optional init_config_id)
+ persistence(snapshots_t& snaps, std::optional init_config_id, State init_state)
: _snapshots(snaps)
, _stored_snapshot(
raft::snapshot{
.config = init_config_id ? raft::configuration{*init_config_id} : raft::configuration{}
},
- init_state)
+ std::move(init_state))
, _stored_term_and_vote(raft::term_t{1}, raft::server_id{})
{}
@@ -605,6 +492,12 @@ public:
assert(it != _snapshots.end());
_stored_snapshot = {snap, it->second};
+ if (!_stored_entries.empty() && snap.idx > _stored_entries.back()->idx) {
+ // Clear the log in order to not create a gap.
+ _stored_entries.clear();
+ co_return;
+ }
+
auto first_to_remain = snap.idx + 1 >= preserve_log_entries ? raft::index_t{snap.idx + 1 - preserve_log_entries} : raft::index_t{0};
_stored_entries.erase(_stored_entries.begin(), find(first_to_remain));
@@ -613,7 +506,7 @@ public:
virtual future load_snapshot() override {
auto [snap, state] = _stored_snapshot;
- _snapshots[snap.id] = std::move(state);
+ _snapshots.insert_or_assign(snap.id, std::move(state));
co_return snap;
}
@@ -679,11 +572,14 @@ private:
// The last time we sent a heartbeat.
raft::logical_clock::time_point _last_beat;
+ // How long from the last received heartbeat does it take to convict a node as dead.
+ const raft::logical_clock::duration _convict_threshold;
+
send_heartbeat_t _send_heartbeat;
public:
- failure_detector(send_heartbeat_t f)
- : _send_heartbeat(std::move(f))
+ failure_detector(raft::logical_clock::duration convict_threshold, send_heartbeat_t f)
+ : _convict_threshold(convict_threshold), _send_heartbeat(std::move(f))
{
send_heartbeats();
assert(_last_beat == _clock.now());
@@ -723,9 +619,6 @@ public:
}
bool is_alive(raft::server_id id) override {
- // TODO: make it adjustable
- static const raft::logical_clock::duration _convict_threshold = 50_t;
-
return _clock.now() < _last_heard[id] + _convict_threshold;
}
};
@@ -775,17 +668,19 @@ private:
raft::logical_clock _clock;
+ // How long does it take to deliver a message?
+ // TODO: use a random distribution or let the user change this dynamically
+ raft::logical_clock::duration _delivery_delay;
+
public:
- network(deliver_t f)
- : _deliver(std::move(f)) {}
+ network(raft::logical_clock::duration delivery_delay, deliver_t f)
+ : _deliver(std::move(f)), _delivery_delay(delivery_delay) {}
void send(raft::server_id src, raft::server_id dst, Payload payload) {
// Predict the delivery time in advance.
// Our prediction may be wrong if a grudge exists at this expected moment of delivery.
// Messages may also be reordered.
- // TODO: scale with number of msgs already in transit and payload size?
- // TODO: randomize the delivery time
- auto delivery_time = _clock.now() + 5_t;
+ auto delivery_time = _clock.now() + _delivery_delay;
_events.push_back(event{delivery_time, message{src, dst, make_lw_shared(std::move(payload))}});
std::push_heap(_events.begin(), _events.end(), cmp);
@@ -796,6 +691,14 @@ public:
deliver();
}
+ void add_grudge(raft::server_id src, raft::server_id dst) {
+ _grudges[dst].insert(src);
+ }
+
+ void remove_grudge(raft::server_id src, raft::server_id dst) {
+ _grudges[dst].erase(src);
+ }
+
private:
void deliver() {
// Deliver every message whose time has come.
@@ -897,6 +800,39 @@ private:
}
};
+using reconfigure_result_t = std::variant;
+
+future reconfigure( // Calls `server.set_configuration` with the given `ids` under a timeout; the failures caught below are returned as values instead of being thrown.
+ const std::vector& ids,
+ raft::logical_clock::time_point timeout,
+ logical_timer& timer,
+ raft::server& server) {
+ raft::server_address_set config;
+ for (auto id : ids) {
+ config.insert(raft::server_address { .id = id }); // Only the `id` field of each address is set here.
+ }
+
+ try {
+ co_await timer.with_timeout(timeout, [&server, config = std::move(config)] () { // The lambda is invoked immediately; only its future is passed to `with_timeout`.
+ return server.set_configuration(std::move(config));
+ }());
+ co_return std::monostate{}; // `set_configuration` completed without an exception.
+ } catch (raft::not_a_leader e) {
+ co_return e;
+ } catch (raft::dropped_entry e) {
+ co_return e;
+ } catch (raft::commit_status_unknown e) {
+ co_return e;
+ } catch (raft::conf_change_in_progress e) {
+ co_return e;
+ } catch (logical_timer::timed_out e) { // The time point `timeout` arrived before `set_configuration` resolved.
+ (void)e.get_future().discard_result() // Detach the future carried by the exception...
+ .handle_exception_type([] (const raft::dropped_entry&) {}); // ...ignoring a `dropped_entry` failure if it ends that way.
+ co_return timed_out_error{};
+ }
+}
+
// Contains a `raft::server` and other facilities needed for it and the underlying
// modules (persistence, rpc, etc.) to run, and to communicate with the external environment.
template
@@ -939,7 +875,7 @@ public:
auto snapshots = std::make_unique>();
auto sm = std::make_unique>(*snapshots);
auto rpc_ = std::make_unique>(*snapshots, std::move(send_rpc));
- auto persistence_ = std::make_unique>(*snapshots, first_server ? std::optional{id} : std::nullopt);
+ auto persistence_ = std::make_unique>(*snapshots, first_server ? std::optional{id} : std::nullopt, M::init);
auto queue = std::make_unique>(*rpc_);
auto& sm_ref = *sm;
@@ -1009,10 +945,13 @@ public:
});
}
- future<> set_configuration(raft::server_address_set c) {
+ future reconfigure(
+ const std::vector& ids,
+ raft::logical_clock::time_point timeout,
+ logical_timer& timer) {
assert(_started);
- return with_gate(_gate, [this, c = std::move(c)] {
- return _server->set_configuration(std::move(c));
+ return with_gate(_gate, [this, &ids, timeout, &timer] {
+ return ::reconfigure(ids, timeout, timer, *_server);
});
}
@@ -1059,6 +998,11 @@ static raft::server_id to_raft_id(size_t id) {
return raft::server_id{utils::UUID{0, id}};
}
+struct environment_config { // Parameters passed to the `environment` constructor.
+ raft::logical_clock::duration network_delay; // Forwarded to the `network` constructor as its message delivery delay.
+ raft::logical_clock::duration fd_convict_threshold; // Forwarded to every failure detector the environment creates.
+};
+
// A set of `raft_server`s connected by a `network`.
//
// The `network` is initialized with a message delivery function
@@ -1079,6 +1023,9 @@ class environment : public seastar::weakly_referencable> {
shared_ptr _fd;
};
+ // Passed to newly created failure detectors.
+ const raft::logical_clock::duration _fd_convict_threshold;
+
// Used to deliver messages coming from the network to appropriate servers and their failure detectors.
// Also keeps the servers and the failure detectors alive (owns them).
// Before we show a Raft server to others we must add it to this map.
@@ -1098,8 +1045,8 @@ class environment : public seastar::weakly_referencable> {
seastar::gate _gate;
public:
- environment()
- : _network(
+ environment(environment_config cfg)
+ : _fd_convict_threshold(cfg.fd_convict_threshold), _network(cfg.network_delay,
[this] (raft::server_id src, raft::server_id dst, const message_t& m) {
auto& [s, fd] = _routes.at(dst);
fd->receive_heartbeat(src);
@@ -1145,7 +1092,7 @@ public:
// the first creating the failure detector for a node and wiring it up, the second creating a server on a given node.
// We will also possibly need to introduce some kind of ``node IDs'' which `failure_detector` (and `network`)
// will operate on (currently they operate on `raft::server_id`s, assuming a 1-1 mapping of server-to-node).
- auto fd = seastar::make_shared([id, this] (raft::server_id dst) {
+ auto fd = seastar::make_shared(_fd_convict_threshold, [id, this] (raft::server_id dst) {
_network.send(id, dst, std::nullopt);
});
@@ -1176,6 +1123,10 @@ public:
return *_routes.at(id)._server;
}
+ network& get_network() {
+ return _network;
+ }
+
// Must be called before we are destroyed unless `new_server` was never called.
future<> abort() {
// Close the gate before iterating over _routes to prevent concurrent modification by other methods.
@@ -1187,88 +1138,18 @@ public:
}
};
-// Given a set of functions and associated positive natural numbers,
-// calls the functions with periods defined by their corresponding numbers,
-// yielding in between.
-//
-// Define a "tick" to be a (possibly empty) set of calls to some of the given functions
-// followed by a yield. The ticker executes a sequence of ticks. Given {n, f}, where n
-// is a number and f is a function, f will be called each nth tick.
-//
-// For example, suppose the ticker was started with the set {{2, f}, {4, g}, {4, h}}.
-// Then the functions called in each tick are:
-// tick 1: f, g, h tick 2: none, tick 3: f, tick 4: none, tick 5: f, g, h tick 2: none, and so on.
-//
-// The order of calls within a single tick is unspecified.
-//
-// The number of ticks can be limited. We crash if the ticker reaches the limit before it's `abort()`ed.
-//
-// Call `start` to provide the distribution and start the ticking.
-class ticker {
- bool _stop = false;
- std::optional> _ticker;
- std::optional> _promise;
+template &, ticker&> F>
+auto with_env_and_ticker(environment_config cfg, F f) {
+ return do_with(std::move(f), std::make_unique>(std::move(cfg)), std::make_unique(tlogger),
+ [] (F& f, std::unique_ptr>& env, std::unique_ptr& t) {
+ return f(*env, *t).finally([&env_ = env, &t_ = t] () mutable -> future<> {
+ // move into coroutine body so they don't get destroyed with the lambda (on first co_await)
+ auto& env = env_;
+ auto& t = t_;
-public:
- ticker() = default;
- ticker(const ticker&) = delete;
- ticker(ticker&&) = delete;
-
- ~ticker() {
- assert(!_ticker);
- }
-
- using on_tick_t = noncopyable_function;
-
- template
- void start(std::pair (&&tick_funs)[N], uint64_t limit = std::numeric_limits::max()) {
- assert(!_ticker);
- _ticker = tick(std::move(tick_funs), limit);
- }
-
- future<> abort() {
- if (_ticker) {
- _stop = true;
- co_await *std::exchange(_ticker, std::nullopt);
- }
- }
-
-private:
- template
- future<> tick(std::pair (&&tick_funs)[N], uint64_t limit) {
- static_assert(N > 0);
-
- auto funs = std::to_array(std::move(tick_funs));
- for (uint64_t tick = 0; tick < limit; ++tick) {
- if (_stop) {
- tlogger.debug("ticker: finishing after {} ticks", tick);
- co_return;
- }
-
- for (auto& [n, f] : funs) {
- if (tick % n == 0) {
- f();
- }
- }
-
- co_await seastar::later();
- }
-
- tlogger.error("ticker: limit reached");
- assert(false);
- }
-};
-
-template
-future<> with_env_and_ticker(noncopyable_function(environment&, ticker&)> f) {
- auto env = std::make_unique>();
- auto t = std::make_unique();
- return f(*env, *t).finally([env_ = std::move(env), t_ = std::move(t)] () mutable -> future<> {
- // move into coroutine body so they don't get destroyed with the lambda (on first co_await)
- auto env = std::move(env_);
- auto t = std::move(t_);
- co_await t->abort();
- co_await env->abort();
+ co_await t->abort();
+ co_await env->abort();
+ });
});
}
@@ -1299,9 +1180,11 @@ struct ExReg {
), input);
}
- static const state_t init = 0;
+ static const state_t init;
};
+const ExReg::state_t ExReg::init = 0;
+
namespace ser {
template <>
struct serializer {
@@ -1330,9 +1213,45 @@ namespace ser {
bool operator==(ExReg::ret a, ExReg::ret b) { return a.x == b.x; }
+// Wait until either one of `nodes` in `env` becomes a leader, or duration `d` expires according to `timer` (whichever happens first).
+// If the leader is found, returns it. Otherwise throws a `logical_timer::timed_out` exception.
+template
+struct wait_for_leader {
+ // FIXME: change into free function after clang bug #50345 is fixed
+ future operator()(
+ environment& env,
+ std::vector nodes,
+ logical_timer& timer,
+ raft::logical_clock::duration d) {
+ auto l = co_await timer.with_timeout(timer.now() + d, [] (weak_ptr> env, std::vector nodes) -> future { // The polling coroutine is started immediately; its future is passed to `with_timeout`.
+ while (true) {
+ if (!env) { // The weak pointer to the environment is no longer valid: stop polling.
+ co_return raft::server_id{};
+ }
+
+ auto it = std::find_if(nodes.begin(), nodes.end(), [&env] (raft::server_id id) { return env->get_server(id).is_leader(); });
+ if (it != nodes.end()) {
+ co_return *it; // First node in `nodes` that reports itself as leader.
+ }
+
+ co_await seastar::later(); // Yield before polling again.
+ }
+ }(env.weak_from_this(), std::move(nodes)));
+
+ assert(l != raft::server_id{}); // A default-constructed id is what the polling loop returns when `env` is gone.
+ assert(env.get_server(l).is_leader());
+
+ co_return l;
+ }
+};
+
SEASTAR_TEST_CASE(basic_test) {
logical_timer timer;
- co_await with_env_and_ticker([&timer] (environment& env, ticker& t) -> future<> {
+ environment_config cfg {
+ .network_delay = 5_t,
+ .fd_convict_threshold = 50_t,
+ };
+ co_await with_env_and_ticker(cfg, [&timer] (environment& env, ticker& t) -> future<> {
using output_t = typename ExReg::output_t;
t.start({
@@ -1348,16 +1267,7 @@ SEASTAR_TEST_CASE(basic_test) {
auto leader_id = co_await env.new_server(true);
// Wait at most 1000 ticks for the server to elect itself as a leader.
- co_await timer.with_timeout(timer.now() + 1000_t, ([] (weak_ptr> env, raft::server_id leader_id) -> future<> {
- while (true) {
- if (!env || env->get_server(leader_id).is_leader()) {
- co_return;
- }
- co_await seastar::later();
- }
- })(env.weak_from_this(), leader_id));
-
- assert(env.get_server(leader_id).is_leader());
+ assert(co_await wait_for_leader{}(env, {leader_id}, timer, 1000_t) == leader_id);
auto call = [&] (ExReg::input_t input, raft::logical_clock::duration timeout) {
return env.get_server(leader_id).call(std::move(input), timer.now() + timeout, timer);
@@ -1378,7 +1288,8 @@ SEASTAR_TEST_CASE(basic_test) {
tlogger.debug("Started 2 more servers, changing configuration");
- co_await env.get_server(leader_id).set_configuration({{.id = leader_id}, {.id = id2}, {.id = id3}});
+ assert(std::holds_alternative(
+ co_await env.get_server(leader_id).reconfigure({leader_id, id2, id3}, timer.now() + 100_t, timer)));
tlogger.debug("Configuration changed");
diff --git a/test/raft/ticker.hh b/test/raft/ticker.hh
new file mode 100644
index 0000000000..549cfda653
--- /dev/null
+++ b/test/raft/ticker.hh
@@ -0,0 +1,100 @@
+/*
+ * Copyright (C) 2021 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+
+using namespace seastar;
+
+// Given a set of functions and associated positive natural numbers,
+// calls the functions with periods defined by their corresponding numbers,
+// yielding in between.
+//
+// Define a "tick" to be a (possibly empty) set of calls to some of the given functions
+// followed by a yield. The ticker executes a sequence of ticks. Given {n, f}, where n
+// is a number and f is a function, f will be called each nth tick.
+//
+// For example, suppose the ticker was started with the set {{2, f}, {4, g}, {4, h}}.
+// Then the functions called in each tick are:
+// tick 1: f, g, h; tick 2: none; tick 3: f; tick 4: none; tick 5: f, g, h; tick 6: none; and so on.
+//
+// The order of calls within a single tick is unspecified.
+//
+// The number of ticks can be limited. We crash if the ticker reaches the limit before it's `abort()`ed.
+//
+// Call `start` to provide the distribution and start the ticking.
+class ticker {
+ bool _stop = false; // Set by `abort()`; checked at the beginning of every tick.
+ std::optional> _ticker; // Future of the running `tick` loop; empty before `start()` and after `abort()`.
+ seastar::logger& _logger; // Logger used for the ticker's own messages.
+
+public:
+ ticker(seastar::logger& l) : _logger(l) {}
+ ticker(const ticker&) = delete;
+ ticker(ticker&&) = delete;
+
+ ~ticker() {
+ assert(!_ticker); // A started ticker must be `abort()`ed before it is destroyed.
+ }
+
+ using on_tick_t = noncopyable_function;
+
+ template
+ void start(std::pair (&&tick_funs)[N], uint64_t limit = std::numeric_limits::max()) { // Starts the tick loop with the given {period, function} pairs.
+ assert(!_ticker); // Must not already be started.
+ _ticker = tick(std::move(tick_funs), limit);
+ }
+
+ future<> abort() { // Requests the loop to stop and waits for it to finish; does nothing if no loop is running.
+ if (_ticker) {
+ _stop = true;
+ co_await *std::exchange(_ticker, std::nullopt); // `_ticker` is emptied before the wait, so the destructor's assert holds afterwards.
+ }
+ }
+
+private:
+ template
+ future<> tick(std::pair (&&tick_funs)[N], uint64_t limit) {
+ static_assert(N > 0);
+
+ auto funs = std::to_array(std::move(tick_funs)); // Move the pairs into a local array owned by this coroutine.
+ for (uint64_t tick = 0; tick < limit; ++tick) {
+ if (_stop) {
+ _logger.debug("ticker: finishing after {} ticks", tick);
+ co_return;
+ }
+
+ for (auto& [n, f] : funs) {
+ if (tick % n == 0) { // Counting starts at 0, so every function is called on the first tick.
+ f();
+ }
+ }
+
+ co_await seastar::later(); // The yield that ends each tick.
+ }
+
+ _logger.error("ticker: limit reached");
+ assert(false); // Reaching the limit before `abort()` is treated as a fatal error.
+ }
+};