Compare commits

1 Commit

Author: Yaniv Michael Kaul
Date:   2026-04-24 11:25:31 +03:00

9b65d6d0fc topology: propagate error messages through raft_topology_cmd_result
When a topology command (e.g., rebuild) fails on a target node, the
exception message was being swallowed at multiple levels:

1. raft_topology_cmd_handler caught exceptions and returned a bare
   fail status with no error details.
2. exec_direct_command_helper saw the fail status and threw a generic
   "failed status returned from {id}" message.
3. The rebuilding handler caught that and stored a hardcoded
   "streaming failed" message.

This meant users only saw "rebuild failed: streaming failed" instead
of the actionable error from the safety check (e.g., "it is unsafe
to use source_dc=dc2 to rebuild keyspace=...").

Fix by:
- Adding an error_message field to raft_topology_cmd_result (with
  [[version 2026.2]] for wire compatibility).
- Populating error_message with the exception text in the handler's
  catch blocks.
- Including error_message in the exception thrown by
  exec_direct_command_helper.
- Passing the actual error through to rtbuilder.done() instead of
  the hardcoded "streaming failed".

A follow-up test is in https://github.com/scylladb/scylladb/pull/29363

Fixes: SCYLLADB-1404
11 changed files with 38 additions and 26 deletions
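Condensed into a standalone sketch, the pattern the patch introduces looks like this (the struct, field names, and message formats follow the diffs below; std::string, std::format, and a direct function call stand in for seastar's sstring, ::format, and the RPC hop, so everything outside those named pieces is illustrative scaffolding):

#include <format>
#include <iostream>
#include <stdexcept>
#include <string>

struct raft_topology_cmd_result {
    enum class command_status { fail, success };
    command_status status = command_status::fail;
    std::string error_message;  // new field: carries the failure detail
};

// Target-node side (cf. raft_topology_cmd_handler): record the exception
// text instead of returning a bare fail status.
raft_topology_cmd_result handle_cmd() {
    raft_topology_cmd_result result;
    try {
        throw std::runtime_error(
            "it is unsafe to use source_dc=dc2 to rebuild keyspace=ks");
    } catch (const std::exception& e) {
        result.error_message = e.what();  // previously this text was lost
    }
    return result;
}

// Coordinator side (cf. exec_direct_command_helper): surface the detail in
// the rethrown error, keeping the old generic text as a fallback.
void exec_direct_command(const std::string& id) {
    auto result = handle_cmd();
    if (result.status == raft_topology_cmd_result::command_status::fail) {
        auto msg = result.error_message.empty()
            ? std::format("failed status returned from {}", id)
            : std::format("failed status returned from {}: {}", id, result.error_message);
        throw std::runtime_error(msg);
    }
}

int main() {
    try {
        exec_direct_command("node-1");
    } catch (const std::exception& e) {
        // Prints the actionable safety-check error instead of a generic
        // "streaming failed":
        // failed status returned from node-1: it is unsafe to use
        // source_dc=dc2 to rebuild keyspace=ks
        std::cout << e.what() << '\n';
    }
}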

View File

@@ -72,6 +72,7 @@ struct raft_topology_cmd_result {
success
};
service::raft_topology_cmd_result::command_status status;
+sstring error_message [[version 2026.2]];
};
struct raft_snapshot {
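
The [[version 2026.2]] attribute is the wire-compatibility piece the commit message calls out: during a rolling upgrade, a peer still on an older version neither sends nor expects the field, so the receiving side must treat an absent field as a default-constructed (empty) string. A minimal sketch of that assumption, with std::optional modeling "absent on the wire" (the function name and the optional-based modeling are illustrative, not Scylla's IDL API):

#include <iostream>
#include <optional>
#include <string>

// A field an old peer never wrote decodes as the empty string.
std::string decode_error_message(std::optional<std::string> on_wire) {
    return std::move(on_wire).value_or("");
}

int main() {
    std::cout << '[' << decode_error_message(std::nullopt) << "]\n";      // old peer: []
    std::cout << '[' << decode_error_message("rebuild unsafe") << "]\n";  // new peer
}

This is also why exec_direct_command_helper keeps the generic "failed status returned from {}" message as a fallback for the case where error_message comes back empty.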

View File

@@ -4792,8 +4792,13 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
}
} catch (const raft::request_aborted& e) {
rtlogger.warn("raft_topology_cmd {} failed with: {}", cmd.cmd, e);
+result.error_message = e.what();
} catch (const std::exception& e) {
rtlogger.error("raft_topology_cmd {} failed with: {}", cmd.cmd, e);
+result.error_message = e.what();
} catch (...) {
rtlogger.error("raft_topology_cmd {} failed with: {}", cmd.cmd, std::current_exception());
result.error_message = "unknown error";
}
rtlogger.info("topology cmd rpc {} completed with status={} index={}",

View File

@@ -443,8 +443,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
co_await ser::storage_service_rpc_verbs::send_raft_topology_cmd(
&_messaging, to_host_id(id), id, _term, cmd_index, cmd);
if (result.status == raft_topology_cmd_result::command_status::fail) {
+auto msg = result.error_message.empty()
+? ::format("failed status returned from {}", id)
+: ::format("failed status returned from {}: {}", id, result.error_message);
co_await coroutine::exception(std::make_exception_ptr(
-std::runtime_error(::format("failed status returned from {}", id))));
+std::runtime_error(std::move(msg))));
}
};
@@ -3909,10 +3912,15 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
throw;
} catch (seastar::abort_requested_exception&) {
throw;
} catch (const std::exception& e) {
rtlogger.error("send_raft_topology_cmd(stream_ranges) failed with exception"
" (node state is rebuilding): {}", e);
rtbuilder.done(e.what());
retake = true;
} catch (...) {
rtlogger.error("send_raft_topology_cmd(stream_ranges) failed with exception"
" (node state is rebuilding): {}", std::current_exception());
rtbuilder.done("streaming failed");
rtbuilder.done("unknown error");
retake = true;
}
if (retake) {

View File

@@ -318,6 +318,9 @@ struct raft_topology_cmd_result {
success
};
command_status status = command_status::fail;
+// Carries the error description back to the topology coordinator
+// when the command fails.
+sstring error_message;
};
// This class is used in RPC's signatures to hold the topology_version of the caller.

View File

@@ -18,7 +18,7 @@ from concurrent.futures.thread import ThreadPoolExecutor
from multiprocessing import Event
from pathlib import Path
from typing import TYPE_CHECKING
-from test import TOP_SRC_DIR, MODES_TIMEOUT_FACTOR, path_to
+from test import TOP_SRC_DIR, path_to
from test.pylib.random_tables import RandomTables
from test.pylib.skip_types import skip_env
from test.pylib.util import unique_name
@@ -394,8 +394,3 @@ async def key_provider(request, tmpdir, scylla_binary):
"""Encryption providers fixture"""
async with make_key_provider_factory(request.param, tmpdir, scylla_binary) as res:
yield res
@pytest.fixture(scope="function")
def failure_detector_timeout(build_mode):
return 2000 * MODES_TIMEOUT_FACTOR[build_mode]

View File

@@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
-async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout) -> None:
+async def test_kill_coordinator_during_op(manager: ManagerClient) -> None:
""" Kill coordinator with error injection while topology operation is running for cluster: decommission,
bootstrap, removenode, replace.
@@ -41,7 +41,7 @@ async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detect
"""
# Decrease the failure detector threshold so we don't have to wait for too long.
config = {
-'failure_detector_timeout_in_ms': failure_detector_timeout
+'failure_detector_timeout_in_ms': 2000
}
cmdline = [
'--logger-log-level', 'raft_topology=trace',

View File

@@ -22,11 +22,11 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.nightly
-async def test_banned_node_notification(manager: ManagerClient, failure_detector_timeout) -> None:
+async def test_banned_node_notification(manager: ManagerClient) -> None:
"""Test that a node banned from the cluster get notification about been banned"""
# Decrease the failure detector threshold so we don't have to wait for too long.
config = {
-'failure_detector_timeout_in_ms': failure_detector_timeout
+'failure_detector_timeout_in_ms': 2000
}
srvs = await manager.servers_add(3, config=config, auto_rack_dc="dc")
cql = manager.get_cql()

View File

@@ -60,14 +60,14 @@ async def make_servers(manager: ManagerClient, servers_num: int,
@pytest.mark.asyncio
-async def test_raft_replace_ignore_nodes(manager: ManagerClient, failure_detector_timeout) -> None:
+async def test_raft_replace_ignore_nodes(manager: ManagerClient) -> None:
"""Replace 3 dead nodes.
This is a slow test with a 7 node cluster and 3 replace operations,
we want to run it only in dev mode.
"""
logger.info("Booting initial cluster")
-servers = await make_servers(manager, 7, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
+servers = await make_servers(manager, 7, config={'failure_detector_timeout_in_ms': 2000})
s1_id = await manager.get_host_id(servers[1].server_id)
s2_id = await manager.get_host_id(servers[2].server_id)

View File

@@ -21,9 +21,9 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
-async def test_replace_different_ip(manager: ManagerClient, failure_detector_timeout) -> None:
+async def test_replace_different_ip(manager: ManagerClient) -> None:
"""Replace an existing node with new node using a different IP address"""
-servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
+servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
logger.info(f"cluster started, servers {servers}")
logger.info(f"replacing server {servers[0]}")
@@ -67,18 +67,18 @@ async def test_replace_different_ip(manager: ManagerClient, failure_detector_tim
logger.info(f"server {s} system.peers and gossiper state is valid")
@pytest.mark.asyncio
-async def test_replace_different_ip_using_host_id(manager: ManagerClient, failure_detector_timeout) -> None:
+async def test_replace_different_ip_using_host_id(manager: ManagerClient) -> None:
"""Replace an existing node with new node reusing the replaced node host id"""
-servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
+servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = True)
await manager.server_add(replace_cfg)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
@pytest.mark.asyncio
-async def test_replace_reuse_ip(request, manager: ManagerClient, failure_detector_timeout) -> None:
+async def test_replace_reuse_ip(request, manager: ManagerClient) -> None:
"""Replace an existing node with new node using the same IP address"""
-servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout}, auto_rack_dc="dc1")
+servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000}, auto_rack_dc="dc1")
host2 = (await wait_for_cql_and_get_hosts(manager.get_cql(), [servers[2]], time.time() + 60))[0]
logger.info(f"creating test table")
@@ -130,9 +130,9 @@ async def test_replace_reuse_ip(request, manager: ManagerClient, failure_detecto
await manager.server_sees_other_server(servers[2].ip_addr, servers[0].ip_addr)
@pytest.mark.asyncio
-async def test_replace_reuse_ip_using_host_id(manager: ManagerClient, failure_detector_timeout) -> None:
+async def test_replace_reuse_ip_using_host_id(manager: ManagerClient) -> None:
"""Replace an existing node with new node using the same IP address and same host id"""
-servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
+servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True, use_host_id = True)
await manager.server_add(replace_cfg)

View File

@@ -14,9 +14,9 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
-async def test_replace_with_same_ip_twice(manager: ManagerClient, failure_detector_timeout) -> None:
+async def test_replace_with_same_ip_twice(manager: ManagerClient) -> None:
logger.info("starting a cluster with two nodes")
-servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
+servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
logger.info(f"cluster started {servers}")
async def replace_with_same_ip(s: ServerInfo) -> ServerInfo:

View File

@@ -119,14 +119,14 @@ async def test_tablet_transition_sanity(manager: ManagerClient, action):
@pytest.mark.parametrize("fail_stage", ["streaming", "allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "cleanup", "cleanup_target", "end_migration", "revert_migration"])
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
-async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage, failure_detector_timeout):
+async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage):
if fail_stage == 'cleanup' and fail_replica == 'destination':
skip_env('Failing destination during cleanup is pointless')
if fail_stage == 'cleanup_target' and fail_replica == 'source':
skip_env('Failing source during target cleanup is pointless')
logger.info("Bootstrapping cluster")
-cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': failure_detector_timeout}
+cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': 2000}
host_ids = []
servers = []