mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-29 11:10:40 +00:00
Merge 'raft topology: wait_for_peers_to_enter_synchronize_state doesn't need to resolve all IPs' from Mikołaj Grzebieluch
Another node can stop after it joined the group0 but before it advertised itself in gossip. `get_inet_addrs` will try to resolve all IPs and `wait_for_peers_to_enter_synchronize_state` will loop indefinitely. But `wait_for_peers_to_enter_synchronize_state` can return early if one of the nodes confirms that the upgrade procedure has finished. For that, it doesn't need the IPs of all group 0 members - only the IP of some nodes which can do the confirmation. This pr restructures the code so that IPs of nodes are resolved inside the `max_concurrent_for_each` that `wait_for_peers_to_enter_synchronize_state` performs. Then, even if some IPs won't be resolved, but one of the nodes confirms a successful upgrade, we can continue. Fixes #13543 Closes #14046 * github.com:scylladb/scylladb: raft topology: test: check if aborted node replacing blocks bootstrap raft topology: `wait_for_peers_to_enter_synchronize_state` doesn't need to resolve all IPs
This commit is contained in:
@@ -33,6 +33,7 @@
|
||||
#include <seastar/util/log.hh>
|
||||
#include <seastar/util/defer.hh>
|
||||
#include <seastar/rpc/rpc_types.hh>
|
||||
#include <stdexcept>
|
||||
|
||||
#include "idl/group0.dist.hh"
|
||||
|
||||
@@ -469,6 +470,13 @@ struct group0_members {
|
||||
const raft::server& _group0_server;
|
||||
const raft_address_map& _address_map;
|
||||
|
||||
raft::config_member_set get_members() const {
|
||||
return _group0_server.get_configuration().current;
|
||||
}
|
||||
|
||||
std::optional<gms::inet_address> get_inet_addr(const raft::config_member& member) const {
|
||||
return _address_map.find(member.addr.id);
|
||||
}
|
||||
|
||||
std::vector<gms::inet_address> get_inet_addrs(seastar::compat::source_location l =
|
||||
seastar::compat::source_location::current()) const {
|
||||
@@ -576,6 +584,10 @@ future<> raft_group0::setup_group0(
|
||||
co_await join_group0(std::move(seeds), false /* non-voter */, ss, qp, mm, cdc_gen_service);
|
||||
group0_log.info("setup_group0: successfully joined group 0.");
|
||||
|
||||
utils::get_local_injector().inject("stop_after_joining_group0", [&] {
|
||||
throw std::runtime_error{"injection: stop_after_joining_group0"};
|
||||
});
|
||||
|
||||
if (replace_info) {
|
||||
// Insert the replaced node's (Raft ID, IP address) pair into `raft_address_map`.
|
||||
// In general, the mapping won't be obtained through the regular gossiping route:
|
||||
@@ -1192,10 +1204,7 @@ static future<bool> wait_for_peers_to_enter_synchronize_state(
|
||||
|
||||
for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) {
|
||||
// We fetch the config again on every attempt to handle the possibility of removing failed nodes.
|
||||
auto current_config = members0.get_inet_addrs();
|
||||
if (current_config.empty()) {
|
||||
continue;
|
||||
}
|
||||
auto current_members_set = members0.get_members();
|
||||
|
||||
::tracker<bool> tracker;
|
||||
auto retry = make_lw_shared<bool>(false);
|
||||
@@ -1209,10 +1218,20 @@ static future<bool> wait_for_peers_to_enter_synchronize_state(
|
||||
}
|
||||
|
||||
(void) [] (netw::messaging_service& ms, abort_source& as, gate::holder pause_shutdown,
|
||||
std::vector<gms::inet_address> current_config,
|
||||
raft::config_member_set current_members_set, group0_members members0,
|
||||
lw_shared_ptr<std::unordered_set<gms::inet_address>> entered_synchronize,
|
||||
lw_shared_ptr<bool> retry, ::tracker<bool> tracker) -> future<> {
|
||||
co_await max_concurrent_for_each(current_config, max_concurrency, [&] (const gms::inet_address& node) -> future<> {
|
||||
co_await max_concurrent_for_each(current_members_set, max_concurrency, [&] (const raft::config_member& member) -> future<> {
|
||||
auto node_opt = members0.get_inet_addr(member);
|
||||
|
||||
if (!node_opt.has_value()) {
|
||||
upgrade_log.warn("wait_for_peers_to_enter_synchronize_state: cannot resolve the IP of {}", member);
|
||||
*retry = true;
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto node = *node_opt;
|
||||
|
||||
if (entered_synchronize->contains(node)) {
|
||||
co_return;
|
||||
}
|
||||
@@ -1249,7 +1268,7 @@ static future<bool> wait_for_peers_to_enter_synchronize_state(
|
||||
});
|
||||
|
||||
tracker.set_value(false);
|
||||
}(ms, as, pause_shutdown, std::move(current_config), entered_synchronize, retry, tracker);
|
||||
}(ms, as, pause_shutdown, std::move(current_members_set), members0, entered_synchronize, retry, tracker);
|
||||
|
||||
auto finish_early = co_await tracker.get();
|
||||
if (finish_early) {
|
||||
|
||||
@@ -6,3 +6,5 @@ extra_scylla_config_options:
|
||||
authenticator: AllowAllAuthenticator
|
||||
authorizer: AllowAllAuthorizer
|
||||
experimental_features: ['raft']
|
||||
skip_in_release:
|
||||
- test_blocked_bootstrap
|
||||
|
||||
51
test/topology_experimental_raft/test_blocked_bootstrap.py
Normal file
51
test/topology_experimental_raft/test_blocked_bootstrap.py
Normal file
@@ -0,0 +1,51 @@
|
||||
# Copyright (C) 2023-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
from test.pylib.scylla_cluster import ReplaceConfig
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
|
||||
import pytest
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_blocked_bootstrap(manager: ManagerClient):
|
||||
"""
|
||||
Scenario:
|
||||
1. Start a cluster with nodes node1, node2, node3
|
||||
2. Start node4 replacing node node2
|
||||
3. Stop node node4 after it joined group0 but before it advertised itself in gossip
|
||||
4. Start node5 replacing node node2
|
||||
|
||||
Test simulates the behavior described in #13543.
|
||||
|
||||
Test passes only if `wait_for_peers_to_enter_synchronize_state` doesn't need to
|
||||
resolve all IPs to return early. If not, node5 will hang trying to resolve the
|
||||
IP of node4:
|
||||
```
|
||||
raft_group0_upgrade - : failed to resolve IP addresses of some of the cluster members ([node4's host ID])
|
||||
```
|
||||
"""
|
||||
servers = [await manager.server_add() for _ in range(3)]
|
||||
|
||||
logger.info(f"Stopping node {servers[0]}")
|
||||
await manager.server_stop_gracefully(servers[0].server_id)
|
||||
|
||||
logger.info(f"Replacing node {servers[0]}")
|
||||
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = False)
|
||||
|
||||
try:
|
||||
await manager.server_add(replace_cfg, config={
|
||||
'error_injections_at_startup': ['stop_after_joining_group0']
|
||||
})
|
||||
except:
|
||||
# Node stops before it advertised itself in gossip, so manager.server_add throws an exception
|
||||
pass
|
||||
else:
|
||||
assert False, "Node should stop before it advertised itself in gossip"
|
||||
|
||||
logger.info(f"Replacing node {servers[0]}")
|
||||
await manager.server_add(replace_cfg)
|
||||
Reference in New Issue
Block a user