diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 1a75fefeca..3848f41995 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -1934,6 +1934,10 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { rtbuilder.done(); switch(node.rs->state) { case node_state::bootstrapping: { + co_await utils::get_local_injector().inject("delay_node_bootstrap", [](auto& handler) { + rtlogger.info("delay_node_bootstrap: waiting for message"); + return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5)); + }); std::vector muts; // Since after bootstrapping a new node some nodes lost some ranges they need to cleanup muts = mark_nodes_as_cleanup_needed(node, false); @@ -1947,8 +1951,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { "bootstrap: read fence completed"); } break; - case node_state::removing: + case node_state::removing: { + co_await utils::get_local_injector().inject("delay_node_removal", [](auto& handler) { + rtlogger.info("delay_node_removal: waiting for message"); + return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5)); + }); node = retake_node(co_await remove_from_group0(std::move(node.guard), node.id), node.id); + } [[fallthrough]]; case node_state::decommissioning: { topology_mutation_builder builder(node.guard.write_timestamp()); diff --git a/test/nodetool/test_status.py b/test/nodetool/test_status.py index 97e596947e..06c50eaa48 100644 --- a/test/nodetool/test_status.py +++ b/test/nodetool/test_status.py @@ -462,38 +462,6 @@ def test_status_keyspace_multi_dc(request, nodetool, uses_tablets, table): _do_test_status(request, nodetool, status_target, nodes) -def test_status_keyspace_joining_node(request, nodetool): - """ Joining nodes do not have some attributes available yet: - * load - * host_id - """ - nodes = [ - Node( - endpoint="127.0.0.1", - host_id="78a9c1d0-b341-467e-a076-9eff4cf7ffc6", - load=206015, - tokens=["-9175818098208185248", "-3983536194780899528"], - datacenter="datacenter1", - rack="rack1", - status=NodeStatus.Up, - state=NodeState.Normal, - ), - Node( - endpoint="127.0.0.2", - host_id=None, - load=None, - tokens=["-1810801828328238220", "2983536194780899528"], - datacenter="datacenter1", - rack="rack2", - status=NodeStatus.Up, - state=NodeState.Joining, - ), - ] - - status_target = StatusQueryTarget(keyspace="ks", table=None, uses_tablets=False) - _do_test_status(request, nodetool, status_target, nodes) - - @pytest.mark.parametrize("resolve", (None, '-r', '--resolve-ip')) def test_status_resolve(request, nodetool, resolve): nodes = [ @@ -510,3 +478,40 @@ def test_status_resolve(request, nodetool, resolve): ] _do_test_status(request, nodetool, None, nodes, resolve) + + +def test_status_with_zero_token_nodes(request, nodetool): + nodes = [ + Node( + endpoint="127.0.0.1", + host_id="78a9c1d0-b341-467e-a076-9eff4cf7ffc6", + load=206015, + tokens=["-9175818098208185248", "-3983536194780899528"], + datacenter="datacenter1", + rack="rack1", + status=NodeStatus.Up, + state=NodeState.Normal, + ), + Node( + endpoint="127.0.0.2", + host_id="ed341f60-b12a-4fd4-9917-e80977ded0f9", + load=277624, + tokens=[], + datacenter="datacenter1", + rack="rack2", + status=NodeStatus.Up, + state=NodeState.Normal, + ), + Node( + endpoint="127.0.0.3", + host_id="1e77eb26-a372-4eb4-aeaa-72f224cf6b4c", + load=353236, + tokens=[], + datacenter="datacenter1", + rack="rack3", + status=NodeStatus.Down, + state=NodeState.Leaving, + ), + ] + + _do_test_status(request, nodetool, None, nodes) diff --git a/test/topology/test_nodetool.py b/test/topology/test_nodetool.py new file mode 100644 index 0000000000..7725b5e680 --- /dev/null +++ b/test/topology/test_nodetool.py @@ -0,0 +1,240 @@ +import pytest +import asyncio +import subprocess +from test.pylib.manager_client import ManagerClient +from test.pylib.util import wait_for_first_completed +from test.topology.conftest import skip_mode + + +async def validate_status_operation(result: str, live_eps: list, down_eps: list, leaving: list, joining: list, + zero_token_eps: list, host_id_map: dict, num_tokens: object): + """Validates the nodetool status output""" + + lines = result.split('\n') + i = 0 + dc_line = lines[i] + assert dc_line.startswith("Datacenter:") + result_dc = dc_line.split()[1] + assert result_dc == "datacenter1" + dc_line_len = len(dc_line) + + i += 1 + assert lines[i] == "=" * dc_line_len + + i += 1 + assert lines[i] == "Status=Up/Down" + + i += 1 + assert lines[i] == "|/ State=Normal/Leaving/Joining/Moving" + + i += 1 + assert lines[i].split() == ["--", "Address", "Load", "Tokens", "Owns", "Host", "ID", "Rack"] + + visited_eps = set() + for j in range(len(live_eps + down_eps)): + i += 1 + assert lines[i] != "" + pieces = tuple(lines[i].split()) + if len(pieces) == 8: + status_state, ep, load, load_unit, tokens, owns, host_id, rack = pieces + else: + status_state, ep, load, tokens, owns, host_id, rack = pieces + + assert ep not in visited_eps + visited_eps.add(ep) + + assert ep in (live_eps + down_eps) + + assert status_state[0] == ('U' if ep in live_eps else 'D') + + if ep in joining: + assert status_state[1] == 'J' + elif ep in leaving: + assert status_state[1] == 'L' + else: + assert status_state[1] == 'N' + + if ep in zero_token_eps: + assert int(tokens) == 0 + else: + assert int(tokens) == num_tokens + + assert host_id == host_id_map[ep] + assert rack == "rack1" + + i += 1 + assert lines[i] == "" + + +@pytest.mark.asyncio +async def test_zero_token_node_normal(manager: ManagerClient): + zero_token_nodes = await manager.servers_add(servers_num=2, config={'join_ring': False}) + + servers = await manager.running_servers() + + exe_path = await manager.server_get_exe(servers[0].server_id) + cmd = [exe_path, "nodetool", "status"] + ["--logger-log-level", + "scylla-nodetool=trace", + "-h", servers[0].ip_addr] + + result = subprocess.run(cmd, capture_output=True, text=True) + + live_eps = await manager.api.client.get_json("/gossiper/endpoint/live", host=servers[0].ip_addr) + down_eps = await manager.api.client.get_json("/gossiper/endpoint/down", host=servers[0].ip_addr) + leaving = await manager.api.client.get_json("/storage_service/nodes/leaving", host=servers[0].ip_addr) + + server_eps = [s.ip_addr for s in servers] + + assert len(server_eps) == len(live_eps) + for ep in live_eps: + assert ep in server_eps + assert down_eps == [] + assert leaving == [] + + zero_token_eps = [z.ip_addr for z in zero_token_nodes] + + host_id_map = {} + for srv in servers: + host_id_map[srv.ip_addr] = await manager.get_host_id(srv.server_id) + for srv in zero_token_nodes: + host_id_map[srv.ip_addr] = await manager.get_host_id(srv.server_id) + + config = await manager.server_get_config(servers[0].server_id) + await validate_status_operation(result.stdout, live_eps, down_eps, leaving, [], zero_token_eps, host_id_map, config['num_tokens']) + + zero_token_cmd = [exe_path, "nodetool", "status"] + ["--logger-log-level", + "scylla-nodetool=trace", + "-h", zero_token_eps[0]] + + zero_token_result = subprocess.run(zero_token_cmd, capture_output=True, text=True) + await validate_status_operation(zero_token_result.stdout, live_eps, down_eps, leaving, [], zero_token_eps, + host_id_map, config['num_tokens']) + + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_zero_token_node_down_leaving(manager: ManagerClient): + servers = await manager.running_servers() + [await manager.api.enable_injection(s.ip_addr, 'delay_node_removal', one_shot=True) for s in servers] + + zero_token_node = await manager.server_add(config={'join_ring': False}) + await manager.server_stop_gracefully(zero_token_node.server_id) + task = asyncio.create_task(manager.remove_node(servers[0].server_id, zero_token_node.server_id)) + + exe_path = await manager.server_get_exe(servers[0].server_id) + cmd = [exe_path, "nodetool", "status"] + ["--logger-log-level", + "scylla-nodetool=trace", + "-h", servers[0].ip_addr] + + logs = [await manager.server_open_log(srv.server_id) for srv in servers] + + await wait_for_first_completed([log.wait_for("delay_node_removal: waiting for message") for log in logs]) + + result = subprocess.run(cmd, capture_output=True, text=True) + + live_eps = await manager.api.client.get_json("/gossiper/endpoint/live", host=servers[0].ip_addr) + down_eps = await manager.api.client.get_json("/gossiper/endpoint/down", host=servers[0].ip_addr) + leaving = await manager.api.client.get_json("/storage_service/nodes/leaving", host=servers[0].ip_addr) + + server_eps = [s.ip_addr for s in servers] + + assert len(server_eps) == len(live_eps) + for ep in live_eps: + assert ep in server_eps + assert down_eps == [zero_token_node.ip_addr] + assert leaving == [zero_token_node.ip_addr] + + host_id_map = {} + for srv in servers: + host_id_map[srv.ip_addr] = await manager.get_host_id(srv.server_id) + host_id_map[zero_token_node.ip_addr] = await manager.get_host_id(zero_token_node.server_id) + + config = await manager.server_get_config(servers[0].server_id) + await validate_status_operation(result.stdout, live_eps, down_eps, leaving, [], [zero_token_node.ip_addr], + host_id_map, config['num_tokens']) + [await manager.api.message_injection(s.ip_addr, 'delay_node_removal') for s in servers] + + await task + + +@pytest.mark.asyncio +async def test_zero_token_node_down_normal(manager: ManagerClient): + servers = await manager.running_servers() + + zero_token_node = await manager.server_add(config={'join_ring': False}) + await manager.server_stop_gracefully(zero_token_node.server_id) + + exe_path = await manager.server_get_exe(servers[0].server_id) + cmd = [exe_path, "nodetool", "status"] + ["--logger-log-level", + "scylla-nodetool=trace", + "-h", servers[0].ip_addr] + + result = subprocess.run(cmd, capture_output=True, text=True) + + live_eps = await manager.api.client.get_json("/gossiper/endpoint/live", host=servers[0].ip_addr) + down_eps = await manager.api.client.get_json("/gossiper/endpoint/down", host=servers[0].ip_addr) + leaving = await manager.api.client.get_json("/storage_service/nodes/leaving", host=servers[0].ip_addr) + + server_eps = [s.ip_addr for s in servers] + + assert len(server_eps) == len(live_eps) + for ep in live_eps: + assert ep in server_eps + assert down_eps == [zero_token_node.ip_addr] + assert leaving == [] + + host_id_map = {} + for srv in servers: + host_id_map[srv.ip_addr] = await manager.get_host_id(srv.server_id) + host_id_map[zero_token_node.ip_addr] = await manager.get_host_id(zero_token_node.server_id) + + config = await manager.server_get_config(servers[0].server_id) + await validate_status_operation(result.stdout, live_eps, down_eps, leaving, [], [zero_token_node.ip_addr], + host_id_map, config['num_tokens']) + + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_regular_node_joining(manager: ManagerClient): + servers = await manager.running_servers() + [await manager.api.enable_injection(s.ip_addr, 'delay_node_bootstrap', one_shot=True) for s in servers] + + joining_server = await manager.server_add(start=False) + task = asyncio.create_task(manager.server_start(joining_server.server_id)) + + exe_path = await manager.server_get_exe(servers[0].server_id) + cmd = [exe_path, "nodetool", "status"] + ["--logger-log-level", + "scylla-nodetool=trace", + "-h", servers[0].ip_addr] + + logs = [await manager.server_open_log(srv.server_id) for srv in servers] + + await wait_for_first_completed([log.wait_for("delay_node_bootstrap: waiting for message") for log in logs]) + result = subprocess.run(cmd, capture_output=True, text=True) + + live_eps = await manager.api.client.get_json("/gossiper/endpoint/live", host=servers[0].ip_addr) + down_eps = await manager.api.client.get_json("/gossiper/endpoint/down", host=servers[0].ip_addr) + leaving = await manager.api.client.get_json("/storage_service/nodes/leaving", host=servers[0].ip_addr) + joining = await manager.api.client.get_json("/storage_service/nodes/joining", host=servers[0].ip_addr) + + server_eps = [s.ip_addr for s in servers] + + assert len(live_eps) == 4 + assert len(server_eps) == 3 + assert len(joining) == 1 + for ep in live_eps: + assert ep in (server_eps + joining) + assert down_eps == [] + assert leaving == [] + + host_id_map = {} + for srv in servers: + host_id_map[srv.ip_addr] = await manager.get_host_id(srv.server_id) + host_id_map[joining_server.ip_addr] = await manager.get_host_id(joining_server.server_id) + + config = await manager.server_get_config(servers[0].server_id) + await validate_status_operation(result.stdout, live_eps, down_eps, leaving, joining, [], host_id_map, + config['num_tokens']) + [await manager.api.message_injection(s.ip_addr, 'delay_node_bootstrap') for s in servers] + + await task diff --git a/tools/scylla-nodetool.cc b/tools/scylla-nodetool.cc index b9d321c110..f7148fe810 100644 --- a/tools/scylla-nodetool.cc +++ b/tools/scylla-nodetool.cc @@ -2082,6 +2082,9 @@ void status_operation(scylla_rest_client& client, const bpo::variables_map& vm) const auto ep = sstring(rjson::to_string_view(te["value"])); // We are not printing the actual tokens, so it is enough just to count them. ++endpoint_tokens[ep]; + } + + for (const auto& [ep, host_id] : endpoint_host_id) { if (endpoint_rack.contains(ep)) { continue; } @@ -2129,7 +2132,7 @@ void status_operation(scylla_rest_client& client, const bpo::variables_map& vm) fmt::format("{}{}", status, state), address, load, - token_count_unknown ? "?" : fmt::to_string(endpoint_tokens.at(ep)), + token_count_unknown ? "?" : fmt::to_string(endpoint_tokens.contains(ep) ? endpoint_tokens.at(ep) : 0), !is_effective_ownership_unknown ? format("{:.1f}%", endpoint_ownership.at(ep) * 100) : "?", endpoint_host_id.contains(ep) ? endpoint_host_id.at(ep) : "?", endpoint_rack.at(ep));