diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 699950683b..cb8d851401 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -2758,11 +2758,6 @@ future system_keyspace::load_topology_state() { } ret.req_param.emplace(host_id, service::rebuild_param{*rebuild_option}); break; - case service::node_state::rollback_to_normal: - if (replaced_id) { - ret.req_param.emplace(host_id, service::removenode_param{std::move(ignored_ids)}); - } - break; default: // no parameters for other operations break; @@ -2824,14 +2819,13 @@ future system_keyspace::load_topology_state() { if (some_row.has("transition_state")) { ret.tstate = service::transition_state_from_string(some_row.get_as("transition_state")); } else { - // Any remaining transition_nodes must be in rebuilding or rollback_to_normal state. + // Any remaining transition_nodes must be in rebuilding state. auto it = std::find_if(ret.transition_nodes.begin(), ret.transition_nodes.end(), - [] (auto& p) { return p.second.state != service::node_state::rebuilding && - p.second.state != service::node_state::rollback_to_normal; }); + [] (auto& p) { return p.second.state != service::node_state::rebuilding; }); if (it != ret.transition_nodes.end()) { on_internal_error(slogger, format( "load_topology_state: topology not in transition state" - " but transition node {} in state {} is present", it->first, it->second.state)); + " but transition node {} in rebuilding state is present", it->first)); } } diff --git a/service/storage_service.cc b/service/storage_service.cc index ab71390d51..a7d5b333ec 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -334,7 +334,6 @@ static locator::node::state to_topology_node_state(node_state ns) { case node_state::decommissioning: return locator::node::state::being_decommissioned; case node_state::removing: return locator::node::state::being_removed; case node_state::normal: return locator::node::state::normal; - case node_state::rollback_to_normal: 
return locator::node::state::normal; case node_state::left: return locator::node::state::left; case node_state::replacing: return locator::node::state::replacing; case node_state::rebuilding: return locator::node::state::normal; @@ -463,6 +462,11 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm } [[fallthrough]]; case node_state::removing: + if (_topology_state_machine._topology.tstate == topology::transition_state::rollback_to_normal) { + // no need for double writes anymore since op failed + co_await process_normal_node(id, rs); + break; + } update_topology(host_id, ip, rs); co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id); tmptr->add_leaving_endpoint(host_id); @@ -490,10 +494,6 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm // Rebuilding node is normal co_await process_normal_node(id, rs); break; - case node_state::rollback_to_normal: - // no need for double writes anymore since op failed - co_await process_normal_node(id, rs); - break; default: on_fatal_internal_error(rtlogger, ::format("Unexpected state {} for node {}", rs.state, id)); } @@ -583,6 +583,8 @@ future<> storage_service::topology_state_load() { case topology::transition_state::write_both_read_old: [[fallthrough]]; case topology::transition_state::left_token_ring: + [[fallthrough]]; + case topology::transition_state::rollback_to_normal: return read_new_t::no; case topology::transition_state::write_both_read_new: return read_new_t::yes; @@ -4908,7 +4910,6 @@ future storage_service::raft_topology_cmd_handler(raft case node_state::left: case node_state::none: case node_state::removing: - case node_state::rollback_to_normal: on_fatal_internal_error(rtlogger, ::format("Node {} got streaming request in state {}. 
It should be either dead or not part of the cluster", raft_server.id(), rs.state)); break; diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 1ef5aa7af1..5364274183 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -1678,6 +1678,41 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str)); } break; + case topology::transition_state::rollback_to_normal: { + auto node = get_node_to_work_on(std::move(guard)); + + // The barrier waits for all double writes started during the operation to complete. It is allowed to fail + // since we will fence the requests later. + bool barrier_failed = false; + try { + node.guard = co_await exec_global_command(std::move(node.guard), raft_topology_cmd::command::barrier_and_drain, get_excluded_nodes(node), drop_guard_and_retake::yes); + } catch (term_changed_error&) { + throw; + } catch(...) 
{ + rtlogger.warn("failed to run barrier_and_drain during rollback {}", std::current_exception()); + barrier_failed = true; + } + + if (barrier_failed) { + node.guard = co_await start_operation(); + } + + node = retake_node(std::move(node.guard), node.id); + + topology_mutation_builder builder(node.guard.write_timestamp()); + topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id); + builder.set_fence_version(_topo_sm._topology.version) // fence requests in case the drain above failed + .set_transition_state(topology::transition_state::tablet_migration) // in case tablet drain failed we need to complete tablet transitions + .with_node(node.id) + .set("node_state", node_state::normal); + rtbuilder.done(); + + auto str = fmt::format("complete rollback of {} to state normal", node.id); + + rtlogger.info("{}", str); + co_await update_topology_state(std::move(node.guard), {builder.build(), rtbuilder.build()}, str); + } + break; } co_return true; }; @@ -1870,39 +1905,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { co_await update_topology_state(take_guard(std::move(node)), {builder.build(), rtbuilder.build()}, "rebuilding completed"); } break; - case node_state::rollback_to_normal: { - // The barrier waits for all double writes started during the operation to complete. It allowed to fail - // since we will fence the requests later. - bool barrier_failed = false; - try { - node.guard = co_await exec_global_command(std::move(node.guard),raft_topology_cmd::command::barrier_and_drain, get_excluded_nodes(node), drop_guard_and_retake::yes); - } catch (term_changed_error&) { - throw; - } catch(...) 
{ - rtlogger.warn("failed to run barrier_and_drain during rollback {}", std::current_exception()); - barrier_failed = true; - } - - if (barrier_failed) { - node.guard =co_await start_operation(); - } - - node = retake_node(std::move(node.guard), node.id); - - topology_mutation_builder builder(node.guard.write_timestamp()); - topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id); - builder.set_fence_version(_topo_sm._topology.version) // fence requests in case the drain above failed - .set_transition_state(topology::transition_state::tablet_migration) // in case tablet drain failed we need to complete tablet transitions - .with_node(node.id) - .set("node_state", node_state::normal); - rtbuilder.done(); - - auto str = fmt::format("complete rollback of {} to state normal", node.id); - - rtlogger.info("{}", str); - co_await update_topology_state(std::move(node.guard), {builder.build(), rtbuilder.build()}, str); - } - break; case node_state::bootstrapping: case node_state::decommissioning: case node_state::removing: @@ -2260,8 +2262,7 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard // Look for a node which operation should be aborted // (there should be one since we are in the rollback) node_to_work_on node = get_node_to_work_on(std::move(guard)); - node_state state = node.rs->state; - std::optional transition_state; + topology::transition_state transition_state; switch (node.rs->state) { case node_state::bootstrapping: @@ -2277,8 +2278,8 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard co_await _group0.group0_server().modify_config({raft::config_member{{node.id, {}}, false}}, {}, &_as); [[fallthrough]]; case node_state::decommissioning: - // to rollback decommission or remove just move a node that we tried to remove back to normal state - state = node_state::rollback_to_normal; + // To rollback decommission or remove just move the topology to rollback_to_normal. 
+ transition_state = topology::transition_state::rollback_to_normal; break; default: on_internal_error(rtlogger, fmt::format("tried to rollback in unsupported state {}", node.rs->state)); @@ -2286,18 +2287,8 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard topology_mutation_builder builder(node.guard.write_timestamp()); topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id); - std::string str; - if (transition_state) { - builder.set_transition_state(*transition_state); - str = fmt::format("rollback {} after {} failure, moving transition state to {} and setting cleanup flag", - node.id, node.rs->state, *transition_state); - } else { - builder.del_transition_state(); - str = fmt::format("rollback {} after {} failure to state {} and setting cleanup flag", node.id, node.rs->state, state); - } - builder.set_version(_topo_sm._topology.version + 1) - .with_node(node.id) - .set("node_state", state); + builder.set_transition_state(transition_state) + .set_version(_topo_sm._topology.version + 1); rtbuilder.set("error", fmt::format("Rolled back: {}", *_rollback)); std::vector muts; @@ -2307,6 +2298,8 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard muts.emplace_back(builder.build()); muts.emplace_back(rtbuilder.build()); + std::string str = fmt::format("rollback {} after {} failure, moving transition state to {} and setting cleanup flag", + node.id, node.rs->state, transition_state); rtlogger.info("{}", str); co_await update_topology_state(std::move(node.guard), std::move(muts), str); } diff --git a/service/topology_state_machine.cc b/service/topology_state_machine.cc index 5e8a09b3dd..0436356ad5 100644 --- a/service/topology_state_machine.cc +++ b/service/topology_state_machine.cc @@ -169,6 +169,7 @@ static std::unordered_map transition_state_ {topology::transition_state::tablet_migration, "tablet migration"}, {topology::transition_state::tablet_draining, "tablet draining"}, 
{topology::transition_state::left_token_ring, "left token ring"}, + {topology::transition_state::rollback_to_normal, "rollback to normal"}, }; std::ostream& operator<<(std::ostream& os, topology::transition_state s) { @@ -197,7 +198,6 @@ static std::unordered_map node_state_to_name_map = { {node_state::replacing, "replacing"}, {node_state::rebuilding, "rebuilding"}, {node_state::none, "none"}, - {node_state::rollback_to_normal, "rollback_to_normal"}, }; std::ostream& operator<<(std::ostream& os, node_state s) { diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index e63deb617b..12a869b2fc 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -34,7 +34,6 @@ enum class node_state: uint16_t { rebuilding, // the node is being rebuild and is streaming data from other replicas normal, // the node does not do any streaming and serves the slice of the ring that belongs to it left, // the node left the cluster and group0 - rollback_to_normal, // the node rolls back failed decommission/remove node operation }; // The order of the requests is a priority @@ -116,6 +115,7 @@ struct topology { write_both_read_new, tablet_migration, left_token_ring, + rollback_to_normal, }; std::optional tstate; diff --git a/test/topology/test_topology_failure_recovery.py b/test/topology/test_topology_failure_recovery.py index c83019c358..2930742d6f 100644 --- a/test/topology/test_topology_failure_recovery.py +++ b/test/topology/test_topology_failure_recovery.py @@ -27,7 +27,8 @@ async def test_topology_streaming_failure(request, manager: ManagerClient): await manager.decommission_node(servers[2].server_id, expected_error="Decommission failed. 
See earlier errors") servers = await manager.running_servers() assert len(servers) == 3 - matches = [await log.grep("raft_topology - rollback.*after decommissioning failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - rollback.*after decommissioning failure, moving transition state to rollback to normal", + from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 1 # remove failure marks = [await log.mark() for log in logs] @@ -36,7 +37,8 @@ async def test_topology_streaming_failure(request, manager: ManagerClient): await manager.server_stop_gracefully(servers[3].server_id) await manager.api.enable_injection(servers[2].ip_addr, 'stream_ranges_fail', one_shot=True) await manager.remove_node(servers[0].server_id, servers[3].server_id, expected_error="Removenode failed. See earlier errors") - matches = [await log.grep("raft_topology - rollback.*after removing failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - rollback.*after removing failure, moving transition state to rollback to normal", + from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 1 await manager.server_start(servers[3].server_id) await manager.servers_see_each_other(servers) diff --git a/test/topology_custom/test_topology_failure_recovery.py b/test/topology_custom/test_topology_failure_recovery.py index b73766a3b8..91cc6206cc 100644 --- a/test/topology_custom/test_topology_failure_recovery.py +++ b/test/topology_custom/test_topology_failure_recovery.py @@ -38,7 +38,8 @@ async def test_tablet_drain_failure_during_decommission(manager: ManagerClient): await manager.decommission_node(servers[2].server_id, expected_error="Decommission failed. 
See earlier errors") - matches = [await log.grep("raft_topology - rollback.*after decommissioning failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - rollback.*after decommissioning failure, moving transition state to rollback to normal", + from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 1 await cql.run_async("DROP KEYSPACE test;")