Merge '[Backport 2025.2] gossiper: fix issues in processing gossip status during the startup and when messages are delayed to avoid empty host ids' from Scylladb[bot]

Populate the local state during gossiper initialization in start_gossiping, preventing an empty state from being added to _endpoint_state_map and returned in get_endpoint_states responses, which was causing an 'empty host id' issue on other nodes during node restarts.

Check for a race condition in do_apply_state_locally. In do_apply_state_locally, a race condition can occur if a task is suspended at a preemption point while the node entry is not locked.
During this time, the host may be removed from _endpoint_state_map. When the task resumes, this can lead to inserting an entry with an empty host ID into the map, causing various errors, including a node crash.

This change adds a check after locking the map entry: if a gossip ACK update does not contain a host ID, we verify that an entry for that node's host ID still exists in the gossiper's _endpoint_state_map.

Fixes https://github.com/scylladb/scylladb/issues/25831
Fixes https://github.com/scylladb/scylladb/issues/25803
Fixes https://github.com/scylladb/scylladb/issues/25702
Fixes https://github.com/scylladb/scylladb/issues/25621

Ref https://github.com/scylladb/scylla-enterprise/issues/5613

Backport: The issue affects all current releases (2025.x); therefore this PR needs to be backported to all of 2025.1–2025.3.

- (cherry picked from commit 28e0f42a83)

- (cherry picked from commit f08df7c9d7)

- (cherry picked from commit 775642ea23)

- (cherry picked from commit b34d543f30)

Parent PR: #25849

Closes scylladb/scylladb#25897

* https://github.com/scylladb/scylladb:
  gossiper: fix empty initial local node state
  gossiper: add test for a race condition in start_gossiping
  gossiper: check for a race condition in `do_apply_state_locally`
  test/gossiper: add reproducible test for race condition during node decommission
This commit is contained in:
Patryk Jędrzejczak
2025-09-09 12:23:56 +02:00
3 changed files with 211 additions and 3 deletions

View File

@@ -607,10 +607,27 @@ future<> gossiper::send_gossip(gossip_digest_syn message, std::set<T> epset) {
future<> gossiper::do_apply_state_locally(locator::host_id node, endpoint_state remote_state, bool shadow_round) {
co_await utils::get_local_injector().inject("delay_gossiper_apply", [&node, &remote_state](auto& handler) -> future<> {
const auto gossip_delay_node = handler.template get<std::string_view>("delay_node");
if (gossip_delay_node && !remote_state.get_host_id() && inet_address(sstring(gossip_delay_node.value())) == remote_state.get_ip()) {
logger.debug("delay_gossiper_apply: suspend for node {}", node);
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
logger.debug("delay_gossiper_apply: resume for node {}", node);
}
});
// If state does not exist just add it. If it does then add it if the remote generation is greater.
// If there is a generation tie, attempt to break it by heartbeat version.
auto permit = co_await lock_endpoint(node, null_permit_id);
auto es = get_endpoint_state_ptr(node);
// If remote state update does not contain a host id, check whether the endpoint still
// exists in the `_endpoint_state_map` since after a preemption point it could have been deleted.
if (!remote_state.get_host_id() && !es) {
throw std::runtime_error(format("Entry for host id {} does not exist in the endpoint state map.", node));
}
if (es) {
endpoint_state local_state = *es;
auto local_generation = local_state.get_heart_beat_state().get_generation();
@@ -1333,6 +1350,12 @@ utils::chunked_vector<gossip_digest> gossiper::make_random_gossip_digest() const
}
future<> gossiper::replicate(endpoint_state es, permit_id pid) {
if (!es.get_host_id()) {
// TODO (#25818): re-introduce the on_internal_error() call once all the code paths leading to this are fixed
logger.warn("attempting to add a state with empty host id for ip: {}", es.get_ip());
co_return;
}
verify_permit(es.get_host_id(), pid);
// First pass: replicate the new endpoint_state on all shards.
@@ -2066,15 +2089,19 @@ future<> gossiper::start_gossiping(gms::generation_type generation_nbr, applicat
generation_nbr = gms::generation_type(_gcfg.force_gossip_generation());
logger.warn("Use the generation number provided by user: generation = {}", generation_nbr);
}
endpoint_state local_state = my_endpoint_state();
// Create a new local state.
endpoint_state local_state{get_broadcast_address()};
local_state.set_heart_beat_state_and_update_timestamp(heart_beat_state(generation_nbr));
for (auto& entry : preload_local_states) {
local_state.add_application_state(entry.first, entry.second);
}
co_await utils::get_local_injector().inject("gossiper_publish_local_state_pause", utils::wait_for_message(5min));
co_await replicate(local_state, permit.id());
logger.info("Gossip started with local state: {}", local_state);
logger.info("Gossip started with local state: {}", my_endpoint_state());
_enabled = true;
_nr_run = 0;
_scheduled_gossip_task.arm(INTERVAL);
@@ -2154,7 +2181,11 @@ future<> gossiper::do_shadow_round(std::unordered_set<gms::inet_address> nodes,
});
for (auto& response : responses) {
co_await apply_state_locally_in_shadow_round(std::move(response.endpoint_state_map));
try {
co_await apply_state_locally_in_shadow_round(std::move(response.endpoint_state_map));
} catch (const std::exception& exception) {
logger.warn("Error while applying node state {}", exception.what());
}
}
if (!nodes_talked.empty()) {
break;

View File

@@ -0,0 +1,80 @@
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from aiohttp import ServerDisconnectedError
import pytest
import asyncio
import logging
from test.cluster.conftest import skip_mode
from test.cluster.util import get_coordinator_host
from test.pylib.manager_client import ManagerClient
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_gossiper_empty_self_id_on_shadow_round(manager: ManagerClient):
    """
    Test gossiper race condition on bootstrap that can lead to an empty self host ID
    sent in replies to other nodes.

    1. Enable gossiper_publish_local_state_pause on one of the nodes to pause the gossiper in
       `gossiper::start_gossiping` when it has just created a self node entry in
       `_endpoint_state_map` with an empty state.
    2. Start the nodes normally to allow them to join the cluster.
    3. Restart the 1st node, so that it pauses in `gossiper::start_gossiping`.
    4. After it pauses, start the second node and make sure it performs a gossip shadow round.
       At this step, if the fix is not in place, the second node will receive an empty host ID
       (which is the race).
    5. After the shadow round on the 2nd node is done, unpause the 1st node.
    6. Make sure the nodes started successfully.
    """
    cmdline = [
        '--logger-log-level=gossip=debug'
    ]
    cfg = {
        'error_injections_at_startup': [
            {
                'name': 'gossiper_publish_local_state_pause'
            }
        ]
    }
    logging.info("Starting cluster normally")
    node1 = await manager.server_add(cmdline=cmdline, start=False, config=cfg)
    # FIX: this call was previously not awaited, so the coroutine never ran and the
    # server was never actually added (Python would warn "coroutine was never awaited").
    await manager.server_add(cmdline=cmdline, start=False)
    node1_log = await manager.server_open_log(node1.server_id)
    node2 = await manager.server_add(cmdline=cmdline, start=False, seeds=[node1.ip_addr])
    node2_log = await manager.server_open_log(node2.server_id)

    # First startup: node1 pauses on the injection; release it once reached so both
    # nodes can finish joining the cluster.
    task1 = asyncio.create_task(manager.server_start(node1.server_id))
    task2 = asyncio.create_task(manager.server_start(node2.server_id))
    await node1_log.wait_for("gossiper_publish_local_state_pause: waiting for message")
    await manager.api.message_injection(node1.ip_addr, 'gossiper_publish_local_state_pause')
    await task1
    await task2

    logging.info("Stopping cluster")
    await manager.server_stop_gracefully(node1.server_id)
    await manager.server_stop_gracefully(node2.server_id)

    # Remember log positions so the waits below only match events from the restart.
    paused_node_mark = await node1_log.mark()
    reading_node_mark = await node2_log.mark()

    logging.info("Restarting cluster")
    # Start the first node and make sure it's paused on gossiper_publish_local_state_pause.
    task1 = asyncio.create_task(
        manager.server_start(node1.server_id, wait_interval=120))
    await node1_log.wait_for("gossiper_publish_local_state_pause: waiting for message", from_mark=paused_node_mark)
    logging.info("Found gossiper_publish_local_state_pause")
    # After the first node started and paused in start_gossiping, start the second node.
    task2 = asyncio.create_task(manager.server_start(node2.server_id))
    # Make sure the 2nd node received a `get_endpoint_states` response while node1 is
    # still paused — the window in which the unfixed code returns an empty host ID.
    await node2_log.wait_for(f"Got get_endpoint_states response from {node1.ip_addr}", from_mark=reading_node_mark)
    # Unpause the 1st node.
    await manager.api.message_injection(node1.ip_addr, 'gossiper_publish_local_state_pause')
    await task1
    await task2

View File

@@ -0,0 +1,97 @@
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from aiohttp import ServerDisconnectedError
import pytest
from test.cluster.conftest import skip_mode
from test.cluster.util import get_coordinator_host
from test.pylib.manager_client import ManagerClient
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_gossiper_race_on_decommission(manager: ManagerClient):
    """
    Test for gossiper race scenario (https://github.com/scylladb/scylladb/issues/25621):
    - Create a cluster with multiple nodes
    - Decommission one node while injecting delays in gossip processing
    - Check for the race condition where get_host_id() is called on a removed endpoint
    """
    cmdline = [
        '--logger-log-level=gossip=debug',
        '--logger-log-level=raft_topology=debug'
    ]
    # Create a cluster with more nodes to increase gossip traffic.
    servers = await manager.servers_add(3, cmdline=cmdline)
    coordinator = await get_coordinator_host(manager=manager)
    coordinator_log = await manager.server_open_log(server_id=coordinator.server_id)
    coordinator_log_mark = await coordinator_log.mark()
    # Pick any node other than the coordinator to decommission.
    decom_node = next(s for s in servers if s.server_id != coordinator.server_id)
    # Enable the delay_gossiper_apply injection: gossip updates about decom_node
    # will be suspended on the coordinator until message_injection() is called.
    await manager.api.enable_injection(
        node_ip=coordinator.ip_addr,
        injection="delay_gossiper_apply",
        one_shot=False,
        parameters={"delay_node": decom_node.ip_addr},
    )
    # Wait for the "delay_gossiper_apply" error injection to take effect
    # - wait for multiple occurrences to be batched, so that there is a higher chance of one of them
    #   failing down in `gossiper::do_on_change_notifications()`.
    for _ in range(5):
        log_mark = await coordinator_log.mark()
        await coordinator_log.wait_for(
            "delay_gossiper_apply: suspend for node",
            from_mark=log_mark,
        )
    coordinator_log_mark = await coordinator_log.mark()
    # Start the decommission task.
    await manager.decommission_node(decom_node.server_id)
    # Wait for the node to finish the removal.
    await coordinator_log.wait_for(
        "Finished to force remove node",
        from_mark=coordinator_log_mark,
    )
    coordinator_log_mark = await coordinator_log.mark()
    try:
        # Unblock the delay_gossiper_apply injection: the suspended gossip updates now
        # run against an endpoint that has already been removed from the map.
        await manager.api.message_injection(
            node_ip=coordinator.ip_addr,
            injection="delay_gossiper_apply",
        )
    except ServerDisconnectedError:
        # The server might get disconnected in the failure case because of abort
        # - we detect that later (with more informative error handling), so we ignore this here.
        pass
    # Wait for the "delay_gossiper_apply" error injection to be unblocked.
    await coordinator_log.wait_for(
        "delay_gossiper_apply: resume for node",
        from_mark=coordinator_log_mark,
    )
    # Test that the coordinator node didn't hit the case where it would try to add
    # a state with an empty host id.
    empty_host_found = await coordinator_log.grep(
        "gossip - attempting to add a state with empty host id",
        from_mark=coordinator_log_mark,
    )
    assert not empty_host_found, "Empty host ID has been found in gossiper::replicate()"
    # Secondary check - ensure the coordinator node is still running (didn't crash).
    running_servers = await manager.running_servers()
    assert coordinator.server_id in [s.server_id for s in running_servers]