raft: replication test: partition helper

Add a partition handling helper to raft_cluster.

Signed-off-by: Alejo Sanchez <alejo.sanchez@scylladb.com>
This commit is contained in:
Alejo Sanchez
2021-05-07 17:46:04 -04:00
parent 848c244932
commit 6db730c500

View File

@@ -513,6 +513,8 @@ public:
future<> change_configuration(size_t total_values, set_config sc, size_t& leader,
std::vector<seastar::timer<lowres_clock>>& tickers);
future<> reconfigure_all(size_t leader, std::vector<seastar::timer<lowres_clock>>& tickers);
future<> partition(::partition p, size_t& leader,
std::vector<seastar::timer<lowres_clock>>& tickers);
const std::unordered_set<size_t>& get_configuration() {
return _in_configuration; // Servers in current configuration
}
@@ -803,6 +805,47 @@ future<> raft_cluster::reconfigure_all(size_t leader,
}
}
future<> raft_cluster::partition(::partition p, size_t& leader,
std::vector<seastar::timer<lowres_clock>>& tickers) {
std::unordered_set<size_t> partition_servers;
std::optional<size_t> next_leader;
for (auto s: p) {
size_t id;
if (std::holds_alternative<struct leader>(s)) {
next_leader = std::get<struct leader>(s).id;
id = *next_leader;
} else {
id = std::get<int>(s);
}
partition_servers.insert(id);
}
if (next_leader) {
// Wait for log to propagate to next leader, before disconnections
co_await wait_log(leader, *next_leader);
} else {
// No leader specified, wait log for all connected servers, before disconnections
for (auto s: partition_servers) {
co_await wait_log(leader, s);
}
}
pause_tickers(tickers);
_connected->connect_all();
for (size_t s = 0; s < _servers.size(); ++s) {
if (partition_servers.find(s) == partition_servers.end()) {
// Disconnect servers not in main partition
_connected->disconnect(to_raft_id(s));
}
}
if (next_leader) {
// New leader specified, elect it
leader = co_await elect_new_leader(leader, *next_leader);
} else if (partition_servers.find(leader) == partition_servers.end() && p.size() > 0) {
// Old leader disconnected and not specified new, free election
leader = co_await free_election();
}
restart_tickers(tickers);
}
using raft_ticker_type = seastar::timer<lowres_clock>;
std::vector<raft_ticker_type> init_raft_tickers(raft_cluster& rafts) {
@@ -880,44 +923,7 @@ future<> run_test(test_case test, bool prevote, bool packet_drops) {
leader = co_await rafts.elect_new_leader(leader, next_leader);
restart_tickers(tickers);
} else if (std::holds_alternative<partition>(update)) {
auto p = std::get<partition>(update);
std::unordered_set<size_t> partition_servers;
std::optional<size_t> next_leader;
for (auto s: p) {
size_t id;
if (std::holds_alternative<struct leader>(s)) {
next_leader = std::get<struct leader>(s).id;
id = *next_leader;
} else {
id = std::get<int>(s);
}
partition_servers.insert(id);
}
if (next_leader) {
// Wait for log to propagate to next leader, before disconnections
co_await rafts.wait_log(leader, *next_leader);
} else {
// No leader specified, wait log for all connected servers, before disconnections
for (auto s: partition_servers) {
co_await rafts.wait_log(leader, s);
}
}
pause_tickers(tickers);
connected->connect_all();
for (size_t s = 0; s < test.nodes; ++s) {
if (partition_servers.find(s) == partition_servers.end()) {
// Disconnect servers not in main partition
connected->disconnect(to_raft_id(s));
}
}
if (next_leader) {
// New leader specified, elect it
leader = co_await rafts.elect_new_leader(leader, *next_leader);
} else if (partition_servers.find(leader) == partition_servers.end() && p.size() > 0) {
// Old leader disconnected and not specified new, free election
leader = co_await rafts.free_election();
}
restart_tickers(tickers);
co_await rafts.partition(std::get<partition>(update), leader, tickers);
} else if (std::holds_alternative<set_config>(update)) {
co_await rafts.wait_log_all(leader);
auto sc = std::get<set_config>(update);