diff --git a/db/schema_tables.cc b/db/schema_tables.cc index efcd4a8e88..cdd1c6cb67 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -1294,7 +1294,6 @@ static future<> do_merge_schema(distributed& proxy, shar schema_ptr s = keyspaces(); // compare before/after schemas of the affected keyspaces only std::set keyspaces; - std::set column_families; std::unordered_map affected_tables; bool has_tablet_mutations = false; for (auto&& mutation : mutations) { @@ -1309,7 +1308,6 @@ static future<> do_merge_schema(distributed& proxy, shar } keyspaces.emplace(std::move(keyspace_name)); - column_families.emplace(mutation.column_family_id()); // We must force recalculation of schema version after the merge, since the resulting // schema may be a mix of the old and new schemas, with the exception of entries // that originate from group 0. diff --git a/service/storage_service.cc b/service/storage_service.cc index ce6bda9c1c..4854af3a73 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -502,20 +502,29 @@ future storage_service::sync_raft_t switch (rs.state) { case node_state::bootstrapping: if (rs.ring.has_value()) { - if (ip && !is_me(*ip)) { - // Save ip -> id mapping in peers table because we need it on restart, but do not save tokens until owned - sys_ks_futures.push_back(_sys_ks.local().update_peer_info(*ip, host_id, {})); - } - update_topology(host_id, ip, rs); - if (_topology_state_machine._topology.normal_nodes.empty()) { - // This is the first node in the cluster. Insert the tokens as normal to the token ring early - // so we can perform writes to regular 'distributed' tables during the bootstrap procedure - // (such as the CDC generation write). - // It doesn't break anything to set the tokens to normal early in this single-node case. 
- co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id); - } else { - tmptr->add_bootstrap_tokens(rs.ring.value().tokens, host_id); - co_await update_topology_change_info(tmptr, ::format("bootstrapping node {}/{}", id, ip)); + if (ip) { + if (!is_me(*ip)) { + utils::get_local_injector().inject("crash-before-bootstrapping-node-added", [] { + rtlogger.error("crash-before-bootstrapping-node-added hit, killing the node"); + _exit(1); + }); + + // Save ip -> id mapping in peers table because we need it on restart, but do not save tokens until owned + sys_ks_futures.push_back(_sys_ks.local().update_peer_info(*ip, host_id, {})); + } + update_topology(host_id, ip, rs); + if (_topology_state_machine._topology.normal_nodes.empty()) { + // This is the first node in the cluster. Insert the tokens as normal to the token ring early + // so we can perform writes to regular 'distributed' tables during the bootstrap procedure + // (such as the CDC generation write). + // It doesn't break anything to set the tokens to normal early in this single-node case. 
+ co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id); + } else { + tmptr->add_bootstrap_tokens(rs.ring.value().tokens, host_id); + co_await update_topology_change_info(tmptr, ::format("bootstrapping node {}/{}", id, ip)); + } + } else if (_topology_state_machine._topology.tstate == topology::transition_state::write_both_read_new) { + on_internal_error(rtlogger, format("Bootstrapping node {} does not have IP mapping but the topology is in the write_both_read_new state", id)); } } break; @@ -5346,6 +5355,14 @@ future storage_service::raft_topology_cmd_handler(raft } break; case raft_topology_cmd::command::barrier_and_drain: { + if (_topology_state_machine._topology.tstate == topology::transition_state::write_both_read_old) { + for (auto& n : _topology_state_machine._topology.transition_nodes) { + if (!_group0->address_map().find(n.first)) { + rtlogger.error("The topology transition is in a double write state but the IP of the node in transition is not known"); + break; + } + } + } co_await container().invoke_on_all([version] (storage_service& ss) -> future<> { const auto current_version = ss._shared_token_metadata.get()->get_version(); rtlogger.debug("Got raft_topology_cmd::barrier_and_drain, version {}, current version {}",
diff --git a/test/topology_custom/test_ip_mappings.py b/test/topology_custom/test_ip_mappings.py
new file mode 100644
index 0000000000..913a3aa612
--- /dev/null
+++ b/test/topology_custom/test_ip_mappings.py
@@ -0,0 +1,72 @@
+# Copyright (C) 2024-present ScyllaDB
+#
+# SPDX-License-Identifier: AGPL-3.0-or-later
+#
+import asyncio
+from test.pylib.manager_client import ManagerClient
+
+import pytest
+import logging
+
+from test.pylib.rest_client import inject_error_one_shot
+
+logger = logging.getLogger(__name__)
+
+@pytest.mark.asyncio
+async def test_broken_bootstrap(manager: ManagerClient):
+    """Check that server_a still serves reads and writes after the bootstrap of server_b was interrupted.
+
+    Steps:
+      1. Start server_a and add server_b without starting it.
+      2. Create test.test and write 100 rows to it.
+      3. Arm the one-shot "crash-before-bootstrapping-node-added" injection on
+         server_a and try to start server_b; the start is expected to fail.
+      4. Stop both servers, then start server_a alone and reconnect the driver.
+      5. For up to 20 seconds keep writing rows and reading them back,
+         checking that every read returns the value that was just written.
+    """
+    server_a = await manager.server_add()
+    server_b = await manager.server_add(start=False)
+
+    await manager.cql.run_async("CREATE KEYSPACE test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}")
+    await manager.cql.run_async("CREATE TABLE test.test (a int PRIMARY KEY, b int)")
+    for i in range(100):
+        await manager.cql.run_async(f"INSERT INTO test.test (a, b) VALUES ({i}, {i})")
+    await inject_error_one_shot(manager.api, server_a.ip_addr, "crash-before-bootstrapping-node-added")
+    try:
+        # Timeout fast since we do not expect the operation to complete
+        # because the coordinator is dead by now due to the error injection
+        # above
+        await manager.server_start(server_b.server_id, timeout=5)
+    except Exception as e:
+        logger.info(f"Starting server_b failed as expected: {e}")
+    else:
+        pytest.fail("Expected server_start to fail")
+
+    await manager.server_stop(server_b.server_id)
+    await manager.server_stop(server_a.server_id)
+
+    stop_event = asyncio.Event()
+    async def worker():
+        logger.info("Worker started")
+        # Write every row and read it back, pass after pass, until told to stop.
+        while not stop_event.is_set():
+            for i in range(100):
+                await manager.cql.run_async(f"INSERT INTO test.test (a, b) VALUES ({i}, {i})")
+                response = await manager.cql.run_async(f"SELECT * FROM test.test WHERE a = {i}")
+                assert response[0].b == i
+            await asyncio.sleep(0.1)
+        logger.info("Worker stopped")
+
+    await manager.server_start(server_a.server_id)
+    await manager.driver_connect()
+
+    worker_task = asyncio.create_task(worker())
+    try:
+        # Let the worker run for 20 seconds, but return as soon as it dies so
+        # that a failed check is reported without waiting out the whole period.
+        await asyncio.wait([worker_task], timeout=20)
+    finally:
+        # Always stop the worker and await it: this re-raises its failure, if any.
+        stop_event.set()
+        await worker_task