diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 6a64489654..7392bf27e1 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -607,6 +607,16 @@ future<> gossiper::send_gossip(gossip_digest_syn message, std::set epset) { future<> gossiper::do_apply_state_locally(locator::host_id node, endpoint_state remote_state, bool shadow_round) { + + co_await utils::get_local_injector().inject("delay_gossiper_apply", [&node, &remote_state](auto& handler) -> future<> { + const auto gossip_delay_node = handler.template get("delay_node"); + if (gossip_delay_node && !remote_state.get_host_id() && inet_address(sstring(gossip_delay_node.value())) == remote_state.get_ip()) { + logger.debug("delay_gossiper_apply: suspend for node {}", node); + co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5}); + logger.debug("delay_gossiper_apply: resume for node {}", node); + } + }); + // If state does not exist just add it. If it does then add it if the remote generation is greater. // If there is a generation tie, attempt to break it by heartbeat version. auto permit = co_await lock_endpoint(node, null_permit_id); @@ -1337,6 +1347,10 @@ utils::chunked_vector gossiper::make_random_gossip_digest() const } future<> gossiper::replicate(endpoint_state es, permit_id pid) { + if (!es.get_host_id()) { + on_internal_error(logger, fmt::format("adding a state with empty host id for ip: {}", es.get_ip())); + } + verify_permit(es.get_host_id(), pid); // First pass: replicate the new endpoint_state on all shards. 
diff --git a/test/cluster/test_gossiper_race.py b/test/cluster/test_gossiper_race.py
new file mode 100644
index 0000000000..1eae59be18
--- /dev/null
+++ b/test/cluster/test_gossiper_race.py
@@ -0,0 +1,98 @@
+#
+# Copyright (C) 2025-present ScyllaDB
+#
+# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
+#
+
+
+from aiohttp import ServerDisconnectedError
+import pytest
+
+from test.cluster.conftest import skip_mode
+from test.cluster.util import get_coordinator_host
+from test.pylib.manager_client import ManagerClient
+
+
+@pytest.mark.asyncio
+@skip_mode('release', 'error injections are not supported in release mode')
+@pytest.mark.xfail(reason="https://github.com/scylladb/scylladb/issues/25621")
+async def test_gossiper_race_on_decommission(manager: ManagerClient):
+    """
+    Reproducer for the gossiper race tracked in https://github.com/scylladb/scylladb/issues/25621:
+    - Create a 3-node cluster and pick a node other than the coordinator to decommission
+    - On the coordinator, suspend the "delay_gossiper_apply" injection for that node's IP, then decommission it
+    - Resume the injection and check the coordinator neither logged an empty-host-id state nor stopped running
+    """
+    cmdline = [
+        '--logger-log-level=gossip=debug',
+        '--logger-log-level=raft_topology=debug'
+    ]
+
+    # Create cluster with more nodes to increase gossip traffic
+    servers = await manager.servers_add(3, cmdline=cmdline)
+
+    coordinator = await get_coordinator_host(manager=manager)
+    coordinator_log = await manager.server_open_log(server_id=coordinator.server_id)
+    coordinator_log_mark = await coordinator_log.mark()  # NOTE(review): reassigned below before it is first read
+
+    decom_node = next(s for s in servers if s.server_id != coordinator.server_id)
+
+    # enable the delay_gossiper_apply injection on the coordinator, keyed by the IP of the node to decommission
+    await manager.api.enable_injection(
+        node_ip=coordinator.ip_addr,
+        injection="delay_gossiper_apply",
+        one_shot=False,
+        parameters={"delay_node": decom_node.ip_addr},
+    )
+
+    # wait for the "delay_gossiper_apply" error injection to take effect
+    # - wait for multiple occurrences to be batched, so that there is a higher chance of one of them
+    #   failing further down, in `gossiper::do_on_change_notifications()`
+    for _ in range(5):
+        log_mark = await coordinator_log.mark()
+        await coordinator_log.wait_for(
+            "delay_gossiper_apply: suspend for node",
+            from_mark=log_mark,
+        )
+
+    coordinator_log_mark = await coordinator_log.mark()
+
+    # start the decommission task
+    await manager.decommission_node(decom_node.server_id)
+
+    # wait until the coordinator logs that the node removal has finished
+    await coordinator_log.wait_for(
+        "Finished to force remove node",
+        from_mark=coordinator_log_mark,
+    )
+
+    coordinator_log_mark = await coordinator_log.mark()
+
+    try:
+        # unblock the delay_gossiper_apply injection
+        await manager.api.message_injection(
+            node_ip=coordinator.ip_addr,
+            injection="delay_gossiper_apply",
+        )
+    except ServerDisconnectedError:
+        # the server might get disconnected in the failure case because of abort
+        # - we detect that later (with more informative error handling), so we ignore this here
+        pass
+
+    # wait for the "delay_gossiper_apply" error injection to be unblocked
+    await coordinator_log.wait_for(
+        "delay_gossiper_apply: resume for node",
+        from_mark=coordinator_log_mark,
+    )
+
+    # test that the coordinator node didn't hit the empty host id internal error after the resume
+    empty_host_found = await coordinator_log.grep(
+        "gossip - adding a state with empty host id",
+        from_mark=coordinator_log_mark,
+    )
+
+    assert not empty_host_found, "Empty host ID has been found in gossiper::replicate()"
+
+    # secondary test - ensure the coordinator node is still running
+    running_servers = await manager.running_servers()
+    assert coordinator.server_id in [s.server_id for s in running_servers]