Merge 'Make node replace procedure work with Raft' from Kamil Braun

We need to obtain the Raft ID of the replaced node during the shadow round and
place it in the address map. It won't be placed by the regular gossiping route
if we're replacing using the same IP, because we override the application state
of the replaced node. Even if we replace a node with a different IP, it is not
guaranteed that background gossiping manages update the address map before we
need it, especially in tests where we set ring_delay to 0 and disable
wait_for_gossip_to_settle. The shadow round, on the other hand, performs a
synchronous request (and if it fails during bootstrap, bootstrap will fail -
because we also won't be able to obtain the tokens and Host ID of the replaced
node).

Fetch the Raft ID of the replaced node in `prepare_replacement_info`,
which runs the shadow round. Return it in `replacement_info`. Then
`join_token_ring` passes it to `setup_group0`, which stores it in the
address map. It does that after `join_group0` so the entry is
non-expiring (the replaced node is a member of group 0). Later in the
replace procedure, we call `remove_from_group0` for the replaced node.
`remove_from_group0` will be able to reverse-translate the IP of the
replaced node to its Raft ID using the address map.

Also remove an unconditional 60 seconds sleep from the replace code. Make it
dependent on ring_delay.

Enable the replace tests.

Modify some code related to removing servers from group 0 which depended on
storing IP addresses in the group 0 configuration.

Closes #12172

* github.com:scylladb/scylladb:
  test/topology: enable replace tests
  service/raft: report an error when Raft ID can't be found in `raft_group0::remove_from_group0`
  service: handle replace correctly with Raft enabled
  gms/gossiper: fetch RAFT_SERVER_ID during shadow round
  service: storage_service: sleep 2*ring_delay instead of BROADCAST_INTERVAL before replace
This commit is contained in:
Tomasz Grabiec
2022-12-07 15:30:25 +01:00
6 changed files with 90 additions and 33 deletions

View File

@@ -1885,7 +1885,8 @@ future<> gossiper::do_shadow_round(std::unordered_set<gms::inet_address> nodes)
gms::application_state::DC,
gms::application_state::RACK,
gms::application_state::SUPPORTED_FEATURES,
gms::application_state::SNITCH_NAME}};
gms::application_state::SNITCH_NAME,
gms::application_state::RAFT_SERVER_ID}};
logger.info("Gossip shadow round started with nodes={}", nodes);
std::unordered_set<gms::inet_address> nodes_talked;
size_t nodes_down = 0;

View File

@@ -488,7 +488,9 @@ static future<bool> synchronize_schema(
const noncopyable_function<future<bool>()>& can_finish_early,
abort_source&);
future<> raft_group0::setup_group0(db::system_keyspace& sys_ks, const std::unordered_set<gms::inet_address>& initial_contact_nodes) {
future<> raft_group0::setup_group0(
db::system_keyspace& sys_ks, const std::unordered_set<gms::inet_address>& initial_contact_nodes,
std::optional<replace_info> replace_info) {
assert(this_shard_id() == 0);
if (!_raft_gr.is_enabled()) {
@@ -524,6 +526,13 @@ future<> raft_group0::setup_group0(db::system_keyspace& sys_ks, const std::unord
co_return;
}
if (replace_info && !replace_info->raft_id) {
auto msg = format("Cannot perform replace operation: Raft ID of the replaced server (IP: {}) is missing.",
replace_info->ip_addr);
group0_log.error("{}", msg);
throw std::runtime_error{std::move(msg)};
}
std::vector<gms::inet_address> seeds;
for (auto& addr: initial_contact_nodes) {
if (addr != _gossiper.get_broadcast_address()) {
@@ -535,6 +544,25 @@ future<> raft_group0::setup_group0(db::system_keyspace& sys_ks, const std::unord
co_await join_group0(std::move(seeds), false /* non-voter */);
group0_log.info("setup_group0: successfully joined group 0.");
if (replace_info) {
// Insert the replaced node's (Raft ID, IP address) pair into `raft_address_map`.
// In general, the mapping won't be obtained through the regular gossiping route:
// if we (the replacing node) use the replaced node's IP address, the replaced node's
// application states are gone by this point (our application states overrode them
// - the application states map is using the IP address as the key).
// Even when we use a different IP, there's no guarantee the IPs were exchanged by now.
// Instead, we obtain `replace_info` during the shadow round (which guarantees to contact
// another node and fetch application states from it) and pass it to `setup_group0`.
// Checked earlier.
assert(replace_info->raft_id);
group0_log.info("Replacing a node with Raft ID: {}, IP address: {}",
*replace_info->raft_id, replace_info->ip_addr);
// `opt_add_entry` is shard-local, but that's fine - we only need this info on shard 0.
_raft_gr.address_map().opt_add_entry(*replace_info->raft_id, replace_info->ip_addr);
}
// Enter `synchronize` upgrade state in case the cluster we're joining has recently enabled Raft
// and is currently in the middle of `upgrade_to_group0()`. For that procedure to finish
// every member of group 0 (now including us) needs to enter `synchronize` state.
@@ -808,23 +836,15 @@ future<> raft_group0::remove_from_group0(gms::inet_address node) {
// (if they are still a member of group 0); hence we provide `my_id` to skip us in the search.
auto their_id = _raft_gr.address_map().find_replace_id(node, my_id);
if (!their_id) {
// The address map is updated with the ID of every member of the configuration.
// We could not find them in the address map. This could mean two things:
// 1. they are not a member.
// 2. They are a member, but we don't know about it yet; e.g. we just upgraded
// and joined group 0 but the leader is still pushing entires to us (including config entries)
// and we don't yet have the entry which contains `their_id`.
// This may mean that the node is a member of group 0 but due to a bug we're missing
// an entry from the address map so we're not able to identify the member that must be removed.
// In this case group 0 will have a 'garbage' member corresponding to a node that has left,
// and that member may be a voter, which is bad because group 0's availability is reduced.
//
// To handle the second case we perform a read barrier now and check the address again.
// Ignore the returned guard, we don't need it.
group0_log.info("remove_from_group0({}): did not find them in group 0 configuration, synchronizing Raft before retrying...", node);
co_await _raft_gr.group0().read_barrier(&_abort_source);
their_id = _raft_gr.address_map().find_replace_id(node, my_id);
if (!their_id) {
group0_log.info("remove_from_group0({}): did not find them in group 0 configuration. Skipping.", node);
co_return;
}
// TODO: there is no API to remove members from group 0 manually. #12153
group0_log.error("remove_from_group0({}): could not find the Raft ID of this node."
" Manual removal from group 0 may be required.", node);
co_return;
}
group0_log.info(
@@ -832,6 +852,10 @@ future<> raft_group0::remove_from_group0(gms::inet_address node) {
node, *their_id);
co_await remove_from_raft_config(*their_id);
group0_log.info(
"remove_from_group0({}): finished removing from group 0 configuration (Raft ID: {})",
node, *their_id);
}
future<> raft_group0::remove_from_raft_config(raft::server_id id) {

View File

@@ -95,6 +95,14 @@ class raft_group0 {
} _status_for_monitoring;
public:
// Passed to `setup_group0` when replacing a node.
struct replace_info {
gms::inet_address ip_addr;
// Optional because it might be missing when Raft is disabled or in RECOVERY mode.
// `setup_group0` will verify that it's present when it's required.
std::optional<raft::server_id> raft_id;
};
// Assumes that the provided services are fully started.
raft_group0(seastar::abort_source& abort_source,
service::raft_group_registry& raft_gr,
@@ -114,18 +122,21 @@ public:
// Call before destroying the object.
future<> abort();
// Call during the startup procedure.
// Call during the startup procedure, after gossiping has started.
//
// If we're performing the replace operation, pass the IP and Raft ID of the replaced node
// obtained using the shadow round through the `replace_info` parameter.
//
// If the local RAFT feature is enabled, does one of the following:
// - join group 0 (if we're bootstrapping),
// - start existing group 0 server (if we bootstrapped before),
// - (TODO: not implemented yet) prepare us for the upgrade procedure, which will create group 0 later.
// - prepare us for the upgrade procedure, which will create group 0 later (if we're upgrading).
//
// Cannot be called twice.
//
// Also make sure to call `finish_setup_after_join` after the node has joined the cluster and entered NORMAL state.
// TODO: specify dependencies on other services: where during startup should we setup group 0?
future<> setup_group0(db::system_keyspace&, const std::unordered_set<gms::inet_address>& initial_contact_nodes);
future<> setup_group0(db::system_keyspace&, const std::unordered_set<gms::inet_address>& initial_contact_nodes,
std::optional<replace_info>);
// Call at the end of the startup procedure, after the node entered NORMAL state.
// `setup_group0()` must have finished earlier.
@@ -157,7 +168,7 @@ public:
// In both cases, `setup_group0()` must have finished earlier.
//
// The provided address may be our own - if we're replacing a node that had the same address as ours.
// We'll look for the other node's Raft ID in the group 0 configuration.
// We'll look for the other node's Raft ID in the Raft address map.
future<> remove_from_group0(gms::inet_address host);
// Assumes that this node's Raft server ID is already initialized and returns it.

View File

@@ -313,6 +313,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
bool replacing_a_node_with_same_ip = false;
bool replacing_a_node_with_diff_ip = false;
std::optional<raft_group0::replace_info> raft_replace_info;
auto tmlock = std::make_unique<token_metadata_lock>(co_await get_token_metadata_lock());
auto tmptr = co_await get_mutable_token_metadata_ptr();
if (is_replacing()) {
@@ -330,6 +331,10 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
get_broadcast_address(), *replace_address);
tmptr->update_topology(*replace_address, std::move(ri.dc_rack));
co_await tmptr->update_normal_tokens(bootstrap_tokens, *replace_address);
raft_replace_info = raft_group0::replace_info {
.ip_addr = *replace_address,
.raft_id = std::move(ri.raft_id)
};
} else if (should_bootstrap()) {
co_await check_for_endpoint_collision(initial_contact_nodes, loaded_peer_features);
} else {
@@ -441,7 +446,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
co_await _gossiper.start_gossiping(generation_number, app_states, advertise);
assert(_group0);
co_await _group0->setup_group0(_sys_ks.local(), initial_contact_nodes);
co_await _group0->setup_group0(_sys_ks.local(), initial_contact_nodes, raft_replace_info);
auto schema_change_announce = _db.local().observable_schema_version().observe([this] (table_schema_version schema_version) mutable {
_migration_manager.local().passive_announce(std::move(schema_version));
@@ -497,7 +502,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
// Sleep additionally to make sure that the server actually is not alive
// and giving it more time to gossip if alive.
slogger.info("Sleeping before replacing {}...", *replace_addr);
co_await sleep_abortable(service::load_broadcaster::BROADCAST_INTERVAL, _abort_source);
co_await sleep_abortable(2 * get_ring_delay(), _abort_source);
// check for operator errors...
const auto tmptr = get_token_metadata_ptr();
@@ -735,7 +740,6 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st
slogger.info("Replace: removing {} from group 0...", *replace_addr);
assert(_group0);
_group0->remove_from_group0(*replace_addr).get();
slogger.info("Replace: {} removed from group 0.", *replace_addr);
slogger.info("Starting to bootstrap...");
run_replace_ops(bootstrap_tokens);
@@ -1674,6 +1678,19 @@ future<> storage_service::remove_endpoint(inet_address endpoint) {
}
}
// If the Raft ID is present in gossiper and non-empty, return it.
// Otherwise return nullopt.
static std::optional<raft::server_id> get_raft_id_for(gms::gossiper& g, gms::inet_address ep) {
auto app_state = g.get_application_state_ptr(ep, gms::application_state::RAFT_SERVER_ID);
if (app_state) {
auto value = utils::UUID{app_state->value};
if (value) {
return raft::server_id{value};
}
}
return std::nullopt;
}
future<storage_service::replacement_info>
storage_service::prepare_replacement_info(std::unordered_set<gms::inet_address> initial_contact_nodes, const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features) {
if (!get_replace_address()) {
@@ -1713,6 +1730,7 @@ storage_service::prepare_replacement_info(std::unordered_set<gms::inet_address>
}
auto dc_rack = get_dc_rack_for(replace_address);
auto raft_id = get_raft_id_for(_gossiper, replace_address);
// use the replacee's host Id as our own so we receive hints, etc
auto host_id = _gossiper.get_host_id(replace_address);
@@ -1723,6 +1741,7 @@ storage_service::prepare_replacement_info(std::unordered_set<gms::inet_address>
co_return replacement_info {
.tokens = std::move(tokens),
.dc_rack = std::move(dc_rack),
.raft_id = std::move(raft_id),
};
}
@@ -2489,7 +2508,6 @@ future<> storage_service::removenode(locator::host_id host_id, std::list<locator
slogger.info("removenode[{}]: removing node {} from group 0", uuid, endpoint);
assert(ss._group0);
ss._group0->remove_from_group0(endpoint).get();
slogger.info("removenode[{}]: node {} removed from group 0", uuid, endpoint);
slogger.info("removenode[{}]: Finished removenode operation, removing node={}, sync_nodes={}, ignore_nodes={}", uuid, endpoint, nodes, ignore_nodes);
} catch (...) {

View File

@@ -41,6 +41,7 @@
#include <seastar/core/lowres_clock.hh>
#include "locator/snitch_base.hh"
#include "cdc/generation_id.hh"
#include "raft/raft.hh"
class node_ops_cmd_request;
class node_ops_cmd_response;
@@ -289,10 +290,11 @@ private:
future<> do_stop_ms();
future<> shutdown_protocol_servers();
// Tokens and the CDC streams timestamp of the replaced node.
struct replacement_info {
std::unordered_set<token> tokens;
locator::endpoint_dc_rack dc_rack;
// Present only if Raft is enabled.
std::optional<raft::server_id> raft_id;
};
future<replacement_info> prepare_replacement_info(std::unordered_set<gms::inet_address> initial_contact_nodes,
const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features);

View File

@@ -60,6 +60,7 @@ async def test_remove_node_add_column(manager, random_tables):
await manager.remove_node(servers[0].server_id, servers[1].server_id) # Remove [1]
await table.add_column()
await random_tables.verify_schema()
# TODO: check that group 0 no longer contains the removed node (#12153)
@pytest.mark.asyncio
@@ -92,24 +93,23 @@ async def test_decommission_node_add_column(manager, random_tables):
await manager.decommission_node(decommission_target.server_id)
await table.add_column()
await random_tables.verify_schema()
# TODO: check that group 0 no longer contains the decommissioned node (#12153)
@pytest.mark.asyncio
@pytest.mark.skip(reason="Replace operation sleeps for 60 seconds")
async def test_replace_different_ip(manager: ManagerClient, random_tables) -> None:
servers = await manager.running_servers()
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False)
await manager.server_add(replace_cfg)
# TODO: check that group 0 no longer contains the replaced node
# TODO: check that group 0 no longer contains the replaced node (#12153)
@pytest.mark.asyncio
@pytest.mark.skip(reason="As above + the new node cannot join group 0")
async def replace_reuse_ip(manager: ManagerClient, random_tables) -> None:
async def test_replace_reuse_ip(manager: ManagerClient, random_tables) -> None:
servers = await manager.running_servers()
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True)
await manager.server_add(replace_cfg)
# TODO: check that group 0 no longer contains the replaced node
# TODO: check that group 0 no longer contains the replaced node (#12153)
@pytest.mark.asyncio
@@ -162,6 +162,7 @@ async def test_remove_node_with_concurrent_ddl(manager, random_tables):
await manager.wait_for_host_down(initiator_ip, target_ip)
logger.info(f'do_remove_node [{i}], invoking remove_node')
await manager.remove_node(initiator_ip, target_ip, target_host_id)
# TODO: check that group 0 no longer contains the removed node (#12153)
logger.info(f'do_remove_node [{i}], remove_node done')
new_server_ip = await manager.server_add()
logger.info(f'do_remove_node [{i}], server_add [{new_server_ip}] done')