raft topology: make rollback_to_normal a transition state

After changing `left_token_ring` from a node state to a transition
state in scylladb/scylladb#17009, we do the same for
`rollback_to_normal`. `rollback_to_normal` was created as a node
state because `left_token_ring` was a node state.

This change will allow us to distinguish a failed removenode from
a failed decommission in the `rollback_to_normal` handler.
Currently, we use the same logic for both of them, so it's not
required. However, this might change, as it has happened with the
decommission and the failed bootstrap/replace in the
`left_token_ring` state (scylladb/scylladb#16797). We are making
this change now because it would be much harder after branching.

The change also simplifies the code in
`topology_coordinator::rollback_current_topology_op`.

Moving the `rollback_to_normal` handler from
`handle_node_transition` to `handle_topology_transition` created a
large diff. There is only one change - adding
`auto node = get_node_to_work_on(std::move(guard));`.
This commit is contained in:
Patryk Jędrzejczak
2024-02-02 16:09:21 +01:00
parent c7a01b9eb4
commit 25b90f5554
7 changed files with 60 additions and 69 deletions

View File

@@ -2758,11 +2758,6 @@ future<service::topology> system_keyspace::load_topology_state() {
}
ret.req_param.emplace(host_id, service::rebuild_param{*rebuild_option});
break;
case service::node_state::rollback_to_normal:
if (replaced_id) {
ret.req_param.emplace(host_id, service::removenode_param{std::move(ignored_ids)});
}
break;
default:
// no parameters for other operations
break;
@@ -2824,14 +2819,13 @@ future<service::topology> system_keyspace::load_topology_state() {
if (some_row.has("transition_state")) {
ret.tstate = service::transition_state_from_string(some_row.get_as<sstring>("transition_state"));
} else {
// Any remaining transition_nodes must be in rebuilding or rollback_to_normal state.
// Any remaining transition_nodes must be in rebuilding state.
auto it = std::find_if(ret.transition_nodes.begin(), ret.transition_nodes.end(),
[] (auto& p) { return p.second.state != service::node_state::rebuilding &&
p.second.state != service::node_state::rollback_to_normal; });
[] (auto& p) { return p.second.state != service::node_state::rebuilding; });
if (it != ret.transition_nodes.end()) {
on_internal_error(slogger, format(
"load_topology_state: topology not in transition state"
" but transition node {} in state {} is present", it->first, it->second.state));
" but transition node {} in rebuilding state is present", it->first));
}
}

View File

@@ -334,7 +334,6 @@ static locator::node::state to_topology_node_state(node_state ns) {
case node_state::decommissioning: return locator::node::state::being_decommissioned;
case node_state::removing: return locator::node::state::being_removed;
case node_state::normal: return locator::node::state::normal;
case node_state::rollback_to_normal: return locator::node::state::normal;
case node_state::left: return locator::node::state::left;
case node_state::replacing: return locator::node::state::replacing;
case node_state::rebuilding: return locator::node::state::normal;
@@ -463,6 +462,11 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm
}
[[fallthrough]];
case node_state::removing:
if (_topology_state_machine._topology.tstate == topology::transition_state::rollback_to_normal) {
// no need for double writes anymore since op failed
co_await process_normal_node(id, rs);
break;
}
update_topology(host_id, ip, rs);
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
tmptr->add_leaving_endpoint(host_id);
@@ -490,10 +494,6 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm
// Rebuilding node is normal
co_await process_normal_node(id, rs);
break;
case node_state::rollback_to_normal:
// no need for double writes anymore since op failed
co_await process_normal_node(id, rs);
break;
default:
on_fatal_internal_error(rtlogger, ::format("Unexpected state {} for node {}", rs.state, id));
}
@@ -583,6 +583,8 @@ future<> storage_service::topology_state_load() {
case topology::transition_state::write_both_read_old:
[[fallthrough]];
case topology::transition_state::left_token_ring:
[[fallthrough]];
case topology::transition_state::rollback_to_normal:
return read_new_t::no;
case topology::transition_state::write_both_read_new:
return read_new_t::yes;
@@ -4908,7 +4910,6 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
case node_state::left:
case node_state::none:
case node_state::removing:
case node_state::rollback_to_normal:
on_fatal_internal_error(rtlogger, ::format("Node {} got streaming request in state {}. It should be either dead or not part of the cluster",
raft_server.id(), rs.state));
break;

View File

@@ -1678,6 +1678,41 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str));
}
break;
case topology::transition_state::rollback_to_normal: {
auto node = get_node_to_work_on(std::move(guard));
// The barrier waits for all double writes started during the operation to complete. It allowed to fail
// since we will fence the requests later.
bool barrier_failed = false;
try {
node.guard = co_await exec_global_command(std::move(node.guard),raft_topology_cmd::command::barrier_and_drain, get_excluded_nodes(node), drop_guard_and_retake::yes);
} catch (term_changed_error&) {
throw;
} catch(...) {
rtlogger.warn("failed to run barrier_and_drain during rollback {}", std::current_exception());
barrier_failed = true;
}
if (barrier_failed) {
node.guard =co_await start_operation();
}
node = retake_node(std::move(node.guard), node.id);
topology_mutation_builder builder(node.guard.write_timestamp());
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
builder.set_fence_version(_topo_sm._topology.version) // fence requests in case the drain above failed
.set_transition_state(topology::transition_state::tablet_migration) // in case tablet drain failed we need to complete tablet transitions
.with_node(node.id)
.set("node_state", node_state::normal);
rtbuilder.done();
auto str = fmt::format("complete rollback of {} to state normal", node.id);
rtlogger.info("{}", str);
co_await update_topology_state(std::move(node.guard), {builder.build(), rtbuilder.build()}, str);
}
break;
}
co_return true;
};
@@ -1870,39 +1905,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
co_await update_topology_state(take_guard(std::move(node)), {builder.build(), rtbuilder.build()}, "rebuilding completed");
}
break;
case node_state::rollback_to_normal: {
// The barrier waits for all double writes started during the operation to complete. It allowed to fail
// since we will fence the requests later.
bool barrier_failed = false;
try {
node.guard = co_await exec_global_command(std::move(node.guard),raft_topology_cmd::command::barrier_and_drain, get_excluded_nodes(node), drop_guard_and_retake::yes);
} catch (term_changed_error&) {
throw;
} catch(...) {
rtlogger.warn("failed to run barrier_and_drain during rollback {}", std::current_exception());
barrier_failed = true;
}
if (barrier_failed) {
node.guard =co_await start_operation();
}
node = retake_node(std::move(node.guard), node.id);
topology_mutation_builder builder(node.guard.write_timestamp());
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
builder.set_fence_version(_topo_sm._topology.version) // fence requests in case the drain above failed
.set_transition_state(topology::transition_state::tablet_migration) // in case tablet drain failed we need to complete tablet transitions
.with_node(node.id)
.set("node_state", node_state::normal);
rtbuilder.done();
auto str = fmt::format("complete rollback of {} to state normal", node.id);
rtlogger.info("{}", str);
co_await update_topology_state(std::move(node.guard), {builder.build(), rtbuilder.build()}, str);
}
break;
case node_state::bootstrapping:
case node_state::decommissioning:
case node_state::removing:
@@ -2260,8 +2262,7 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard
// Look for a node which operation should be aborted
// (there should be one since we are in the rollback)
node_to_work_on node = get_node_to_work_on(std::move(guard));
node_state state = node.rs->state;
std::optional<topology::transition_state> transition_state;
topology::transition_state transition_state;
switch (node.rs->state) {
case node_state::bootstrapping:
@@ -2277,8 +2278,8 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard
co_await _group0.group0_server().modify_config({raft::config_member{{node.id, {}}, false}}, {}, &_as);
[[fallthrough]];
case node_state::decommissioning:
// to rollback decommission or remove just move a node that we tried to remove back to normal state
state = node_state::rollback_to_normal;
// To rollback decommission or remove just move the topology to rollback_to_normal.
transition_state = topology::transition_state::rollback_to_normal;
break;
default:
on_internal_error(rtlogger, fmt::format("tried to rollback in unsupported state {}", node.rs->state));
@@ -2286,18 +2287,8 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard
topology_mutation_builder builder(node.guard.write_timestamp());
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
std::string str;
if (transition_state) {
builder.set_transition_state(*transition_state);
str = fmt::format("rollback {} after {} failure, moving transition state to {} and setting cleanup flag",
node.id, node.rs->state, *transition_state);
} else {
builder.del_transition_state();
str = fmt::format("rollback {} after {} failure to state {} and setting cleanup flag", node.id, node.rs->state, state);
}
builder.set_version(_topo_sm._topology.version + 1)
.with_node(node.id)
.set("node_state", state);
builder.set_transition_state(transition_state)
.set_version(_topo_sm._topology.version + 1);
rtbuilder.set("error", fmt::format("Rolled back: {}", *_rollback));
std::vector<canonical_mutation> muts;
@@ -2307,6 +2298,8 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard
muts.emplace_back(builder.build());
muts.emplace_back(rtbuilder.build());
std::string str = fmt::format("rollback {} after {} failure, moving transition state to {} and setting cleanup flag",
node.id, node.rs->state, transition_state);
rtlogger.info("{}", str);
co_await update_topology_state(std::move(node.guard), std::move(muts), str);
}

View File

@@ -169,6 +169,7 @@ static std::unordered_map<topology::transition_state, sstring> transition_state_
{topology::transition_state::tablet_migration, "tablet migration"},
{topology::transition_state::tablet_draining, "tablet draining"},
{topology::transition_state::left_token_ring, "left token ring"},
{topology::transition_state::rollback_to_normal, "rollback to normal"},
};
std::ostream& operator<<(std::ostream& os, topology::transition_state s) {
@@ -197,7 +198,6 @@ static std::unordered_map<node_state, sstring> node_state_to_name_map = {
{node_state::replacing, "replacing"},
{node_state::rebuilding, "rebuilding"},
{node_state::none, "none"},
{node_state::rollback_to_normal, "rollback_to_normal"},
};
std::ostream& operator<<(std::ostream& os, node_state s) {

View File

@@ -34,7 +34,6 @@ enum class node_state: uint16_t {
rebuilding, // the node is being rebuild and is streaming data from other replicas
normal, // the node does not do any streaming and serves the slice of the ring that belongs to it
left, // the node left the cluster and group0
rollback_to_normal, // the node rolls back failed decommission/remove node operation
};
// The order of the requests is a priority
@@ -116,6 +115,7 @@ struct topology {
write_both_read_new,
tablet_migration,
left_token_ring,
rollback_to_normal,
};
std::optional<transition_state> tstate;

View File

@@ -27,7 +27,8 @@ async def test_topology_streaming_failure(request, manager: ManagerClient):
await manager.decommission_node(servers[2].server_id, expected_error="Decommission failed. See earlier errors")
servers = await manager.running_servers()
assert len(servers) == 3
matches = [await log.grep("raft_topology - rollback.*after decommissioning failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)]
matches = [await log.grep("raft_topology - rollback.*after decommissioning failure, moving transition state to rollback to normal",
from_mark=mark) for log, mark in zip(logs, marks)]
assert sum(len(x) for x in matches) == 1
# remove failure
marks = [await log.mark() for log in logs]
@@ -36,7 +37,8 @@ async def test_topology_streaming_failure(request, manager: ManagerClient):
await manager.server_stop_gracefully(servers[3].server_id)
await manager.api.enable_injection(servers[2].ip_addr, 'stream_ranges_fail', one_shot=True)
await manager.remove_node(servers[0].server_id, servers[3].server_id, expected_error="Removenode failed. See earlier errors")
matches = [await log.grep("raft_topology - rollback.*after removing failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)]
matches = [await log.grep("raft_topology - rollback.*after removing failure, moving transition state to rollback to normal",
from_mark=mark) for log, mark in zip(logs, marks)]
assert sum(len(x) for x in matches) == 1
await manager.server_start(servers[3].server_id)
await manager.servers_see_each_other(servers)

View File

@@ -38,7 +38,8 @@ async def test_tablet_drain_failure_during_decommission(manager: ManagerClient):
await manager.decommission_node(servers[2].server_id, expected_error="Decommission failed. See earlier errors")
matches = [await log.grep("raft_topology - rollback.*after decommissioning failure to state rollback_to_normal", from_mark=mark) for log, mark in zip(logs, marks)]
matches = [await log.grep("raft_topology - rollback.*after decommissioning failure, moving transition state to rollback to normal",
from_mark=mark) for log, mark in zip(logs, marks)]
assert sum(len(x) for x in matches) == 1
await cql.run_async("DROP KEYSPACE test;")