diff --git a/gms/gossiper.cc b/gms/gossiper.cc
index 8a68f0c7a7..aac03a12e8 100644
--- a/gms/gossiper.cc
+++ b/gms/gossiper.cc
@@ -607,10 +607,27 @@ future<> gossiper::send_gossip(gossip_digest_syn message, std::set<locator::host_id> epset) {
 
 future<> gossiper::do_apply_state_locally(locator::host_id node, endpoint_state remote_state, bool shadow_round) {
+
+    co_await utils::get_local_injector().inject("delay_gossiper_apply", [&node, &remote_state](auto& handler) -> future<> {
+        const auto gossip_delay_node = handler.template get<std::string_view>("delay_node");
+        if (gossip_delay_node && !remote_state.get_host_id() && inet_address(sstring(gossip_delay_node.value())) == remote_state.get_ip()) {
+            logger.debug("delay_gossiper_apply: suspend for node {}", node);
+            co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
+            logger.debug("delay_gossiper_apply: resume for node {}", node);
+        }
+    });
+
     // If state does not exist just add it. If it does then add it if the remote generation is greater.
     // If there is a generation tie, attempt to break it by heartbeat version.
     auto permit = co_await lock_endpoint(node, null_permit_id);
     auto es = get_endpoint_state_ptr(node);
+
+    // If the remote state update does not contain a host id, check whether the endpoint still
+    // exists in the `_endpoint_state_map`, since after a preemption point it could have been deleted.
+    if (!remote_state.get_host_id() && !es) {
+        throw std::runtime_error(format("Entry for host id {} does not exist in the endpoint state map.", node));
+    }
+
     if (es) {
         endpoint_state local_state = *es;
         auto local_generation = local_state.get_heart_beat_state().get_generation();
@@ -1333,6 +1350,12 @@ utils::chunked_vector<gossip_digest> gossiper::make_random_gossip_digest() const
 }
 
 future<> gossiper::replicate(endpoint_state es, permit_id pid) {
+    if (!es.get_host_id()) {
+        // TODO (#25818): re-introduce the on_internal_error() call once all the code paths leading to this are fixed
+        logger.warn("attempting to add a state with empty host id for ip: {}", es.get_ip());
+        co_return;
+    }
+
     verify_permit(es.get_host_id(), pid);
 
     // First pass: replicate the new endpoint_state on all shards.
@@ -2066,15 +2089,19 @@ future<> gossiper::start_gossiping(gms::generation_type generation_nbr, applicat
         generation_nbr = gms::generation_type(_gcfg.force_gossip_generation());
         logger.warn("Use the generation number provided by user: generation = {}", generation_nbr);
     }
-    endpoint_state local_state = my_endpoint_state();
+
+    // Create a new local state.
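+    // Note: the state is built as a local value (instead of through my_endpoint_state())
+    // so that it is published to `_endpoint_state_map` only by the replicate() call below,
+    // after the heartbeat state and the preloaded application states have been filled in.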
+    endpoint_state local_state{get_broadcast_address()};
     local_state.set_heart_beat_state_and_update_timestamp(heart_beat_state(generation_nbr));
     for (auto& entry : preload_local_states) {
         local_state.add_application_state(entry.first, entry.second);
     }
+
+    co_await utils::get_local_injector().inject("gossiper_publish_local_state_pause", utils::wait_for_message(5min));
+
     co_await replicate(local_state, permit.id());
-    logger.info("Gossip started with local state: {}", local_state);
+    logger.info("Gossip started with local state: {}", my_endpoint_state());
     _enabled = true;
     _nr_run = 0;
     _scheduled_gossip_task.arm(INTERVAL);
@@ -2154,7 +2181,11 @@ future<> gossiper::do_shadow_round(std::unordered_set<gms::inet_address> nodes,
     });
 
     for (auto& response : responses) {
-        co_await apply_state_locally_in_shadow_round(std::move(response.endpoint_state_map));
+        try {
+            co_await apply_state_locally_in_shadow_round(std::move(response.endpoint_state_map));
+        } catch (const std::exception& exception) {
+            logger.warn("Error while applying node state {}", exception.what());
+        }
     }
     if (!nodes_talked.empty()) {
         break;
diff --git a/test/cluster/test_gossiper_empty_self_id_on_shadow_round.py b/test/cluster/test_gossiper_empty_self_id_on_shadow_round.py
new file mode 100644
index 0000000000..290112c0f8
--- /dev/null
+++ b/test/cluster/test_gossiper_empty_self_id_on_shadow_round.py
@@ -0,0 +1,80 @@
+#
+# Copyright (C) 2025-present ScyllaDB
+#
+# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
+#
+
+
+import pytest
+import asyncio
+import logging
+
+from test.cluster.conftest import skip_mode
+from test.pylib.manager_client import ManagerClient
+
+
+@pytest.mark.asyncio
+@skip_mode('release', 'error injections are not supported in release mode')
+async def test_gossiper_empty_self_id_on_shadow_round(manager: ManagerClient):
+    """
+    Test a gossiper race condition on bootstrap that can lead to an empty self host ID being sent in replies to other nodes.
+    1. Enable gossiper_publish_local_state_pause on one of the nodes to pause the gossiper in
+       `gossiper::start_gossiping` right after it has created a self node entry in `_endpoint_state_map`
+       with an empty state.
+    2. Start the nodes normally to allow them to join the cluster.
+    3. Restart the 1st node, so that it will pause in `gossiper::start_gossiping`.
+    4. After it pauses, start the second node and make sure it performs a gossip shadow round. At this
+       step, if the fix is not in place, the second node will receive an empty host ID (which is the race).
+    5. After the shadow round on the 2nd node is done, unpause the 1st node.
+    6. Make sure both nodes started successfully.
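+
+    Note: without the fix, at the pause point the restarted node's self entry already
+    exists in `_endpoint_state_map` but carries no host ID yet, which is what a
+    concurrent shadow-round reply would expose to the 2nd node.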
+ """ + + cmdline = [ + '--logger-log-level=gossip=debug' + ] + + cfg = { + 'error_injections_at_startup': [ + { + 'name': 'gossiper_publish_local_state_pause' + } + ] + } + + logging.info("Starting cluster normally") + node1 = await manager.server_add(cmdline=cmdline, start=False, config=cfg) + manager.server_add(cmdline=cmdline, start=False) + node1_log = await manager.server_open_log(node1.server_id) + node2 = await manager.server_add(cmdline=cmdline, start=False, seeds=[node1.ip_addr]) + node2_log = await manager.server_open_log(node2.server_id) + task1 = asyncio.create_task(manager.server_start(node1.server_id)) + task2 = asyncio.create_task(manager.server_start(node2.server_id)) + await node1_log.wait_for("gossiper_publish_local_state_pause: waiting for message") + await manager.api.message_injection(node1.ip_addr, 'gossiper_publish_local_state_pause') + await task1 + await task2 + + logging.info("Stopping cluster") + await manager.server_stop_gracefully(node1.server_id) + await manager.server_stop_gracefully(node2.server_id) + + # Remember logs + paused_node_mark = await node1_log.mark() + reading_node_mark = await node2_log.mark() + + logging.info("Restarting cluster") + # Start first node and make sure it's paused on gossiper_publish_local_state_pause + task1 = asyncio.create_task( + manager.server_start(node1.server_id, wait_interval=120)) + await node1_log.wait_for("gossiper_publish_local_state_pause: waiting for message", from_mark=paused_node_mark) + logging.info("Found gossiper_publish_local_state_pause") + # After the first node started and paused in start_gossiping, start the second node/ + task2 = asyncio.create_task(manager.server_start(node2.server_id)) + # Make sure the 2nd node received a `get_endpoint_states` request response. + await node2_log.wait_for(f"Got get_endpoint_states response from {node1.ip_addr}", from_mark=reading_node_mark) + # Unpause the 1st node. 
+    await manager.api.message_injection(node1.ip_addr, 'gossiper_publish_local_state_pause')
+    await task1
+    await task2
diff --git a/test/cluster/test_gossiper_race.py b/test/cluster/test_gossiper_race.py
new file mode 100644
index 0000000000..d42bc39937
--- /dev/null
+++ b/test/cluster/test_gossiper_race.py
@@ -0,0 +1,97 @@
+#
+# Copyright (C) 2025-present ScyllaDB
+#
+# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
+#
+
+
+from aiohttp import ServerDisconnectedError
+import pytest
+
+from test.cluster.conftest import skip_mode
+from test.cluster.util import get_coordinator_host
+from test.pylib.manager_client import ManagerClient
+
+
+@pytest.mark.asyncio
+@skip_mode('release', 'error injections are not supported in release mode')
+async def test_gossiper_race_on_decommission(manager: ManagerClient):
+    """
+    Test for the gossiper race scenario (https://github.com/scylladb/scylladb/issues/25621):
+    - Create a cluster with multiple nodes
+    - Decommission one node while injecting delays into gossip processing
+    - Check for the race condition where get_host_id() is called on a removed endpoint
+    """
+    cmdline = [
+        '--logger-log-level=gossip=debug',
+        '--logger-log-level=raft_topology=debug'
+    ]
+
+    # Create a cluster with more nodes to increase gossip traffic
+    servers = await manager.servers_add(3, cmdline=cmdline)
+
+    coordinator = await get_coordinator_host(manager=manager)
+    coordinator_log = await manager.server_open_log(server_id=coordinator.server_id)
+    coordinator_log_mark = await coordinator_log.mark()
+
+    decom_node = next(s for s in servers if s.server_id != coordinator.server_id)
+
+    # enable the delay_gossiper_apply injection
+    await manager.api.enable_injection(
+        node_ip=coordinator.ip_addr,
+        injection="delay_gossiper_apply",
+        one_shot=False,
+        parameters={"delay_node": decom_node.ip_addr},
+    )
+
+    # wait for the "delay_gossiper_apply" error injection to take effect
+    # - wait for multiple occurrences to be batched, so that there is a higher chance of one of them
+    #   failing inside `gossiper::do_on_change_notifications()`
+    for _ in range(5):
+        log_mark = await coordinator_log.mark()
+        await coordinator_log.wait_for(
+            "delay_gossiper_apply: suspend for node",
+            from_mark=log_mark,
+        )
+
+    coordinator_log_mark = await coordinator_log.mark()
+
+    # start the decommission of the node
+    await manager.decommission_node(decom_node.server_id)
+
+    # wait for the node to finish the removal
+    await coordinator_log.wait_for(
+        "Finished to force remove node",
+        from_mark=coordinator_log_mark,
+    )
+
+    coordinator_log_mark = await coordinator_log.mark()
+
+    try:
+        # unblock the delay_gossiper_apply injection
+        await manager.api.message_injection(
+            node_ip=coordinator.ip_addr,
+            injection="delay_gossiper_apply",
+        )
+    except ServerDisconnectedError:
+        # the server might get disconnected in the failure case because of an abort
+        # - we detect that later (with more informative error handling), so we ignore it here
+        pass
+
+    # wait for the "delay_gossiper_apply" error injection to be unblocked
+    await coordinator_log.wait_for(
+        "delay_gossiper_apply: resume for node",
+        from_mark=coordinator_log_mark,
+    )
+
+    # test that the coordinator node didn't hit the case where it would try to add a state with an empty host id
+    empty_host_found = await coordinator_log.grep(
+        "gossip - attempting to add a state with empty host id",
+        from_mark=coordinator_log_mark,
+    )
+
+    assert not empty_host_found, "Empty host ID has been found in gossiper::replicate()"
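+    # (the grepped warning is the empty-host-id guard added to gossiper::replicate())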
+
+    # secondary test - ensure the coordinator node is still running
+    running_servers = await manager.running_servers()
+    assert coordinator.server_id in [s.server_id for s in running_servers]