From 2312a7cd236594fe4e589fecc4b0287f02f28d68 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Thu, 19 Sep 2024 14:43:25 +0300 Subject: [PATCH] topology coordinator: do metadata barrier before calling finish_accepting_node() during replace During replace with the same IP a node may get queries that were intended for the node it was replacing since the new node declares itself UP before it advertises that it is a replacement. But after the node starts replacing procedure the old node is marked as "being replaced" and queries no longer sent there. It is important to do so before the new node start to get raft snapshot since the snapshot application is not atomic and queries that run parallel with it may see partial state and fail in weird ways. Queries that are sent before that will fail because schema is empty, so they will not find any tables in the first place. The is pre-existing and not addressed by this patch. (cherry picked from commit 644e7a20122e9b53bf62b284d947605e252ba6ef) --- service/topology_coordinator.cc | 41 +++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index ce058b20e8..04b50749a5 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -1567,7 +1567,30 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { rtlogger.info("entered `{}` transition state", *tstate); switch (*tstate) { case topology::transition_state::join_group0: { - auto [node, accepted] = co_await finish_accepting_node(get_node_to_work_on(std::move(guard))); + auto node = get_node_to_work_on(std::move(guard)); + if (node.rs->state == node_state::replacing) { + // Make sure all nodes are no longer trying to write to a node being replaced. This is important + // if the new node have the same IP, so that old write will not go to the new node by mistake after this point. + // It is important to do so before the call to finish_accepting_node() below since after this call the new node becomes + // a full member of the cluster and it starts loading an initial snapshot. Since snapshot loading is not atomic any queries + // that are done in parallel may see a partial state. + try { + node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)), node.id); + } catch (term_changed_error&) { + throw; + } catch (group0_concurrent_modification&) { + throw; + } catch (...) { + rtlogger.error("transition_state::join_group0, " + "global_token_metadata_barrier failed, error {}", + std::current_exception()); + _rollback = fmt::format("global_token_metadata_barrier failed in join_group0 state {}", std::current_exception()); + break; + } + } + + bool accepted; + std::tie(node, accepted) = co_await finish_accepting_node(std::move(node)); // If responding to the joining node failed, move the node to the left state and // stop the topology transition. @@ -1618,22 +1641,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { break; case node_state::replacing: { assert(!node.rs->ring); - // Make sure all nodes are no longer trying to write to a node being replaced. This is important if the new node have the same IP, so that old write will not - // go to the new node by mistake - try { - node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)), node.id); - } catch (term_changed_error&) { - throw; - } catch (group0_concurrent_modification&) { - throw; - } catch (...) { - rtlogger.error("transition_state::join_group0, " - "global_token_metadata_barrier failed, error {}", - std::current_exception()); - _rollback = fmt::format("global_token_metadata_barrier failed in join_group0 state {}", std::current_exception()); - break; - } - auto replaced_id = std::get(node.req_param.value()).replaced_id; auto it = _topo_sm._topology.normal_nodes.find(replaced_id); assert(it != _topo_sm._topology.normal_nodes.end());