From c5aa3f2897ca6be0ab9fa4b43c7314dd0640baf5 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Fri, 12 Dec 2025 12:42:59 -0300 Subject: [PATCH] test: Add reproducer for split vs intra-node migration race This is a problem caught after removing split from add_sstable_and_update_cache(), which was used by intra node migration when loading new sstables into the destination shard. Signed-off-by: Raphael S. Carvalho (cherry picked from commit a0a7941eb1c4811a7c386c3860b2816e8da3f9e6) --- compaction/compaction_manager.cc | 4 ++ test/cluster/test_tablets2.py | 77 ++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 23e002e202..0d2aa989b4 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -1855,6 +1855,10 @@ protected: throw make_compaction_stopped_exception(); } }, false); + if (utils::get_local_injector().is_enabled("split_sstable_force_stop_exception")) { + throw make_compaction_stopped_exception(); + } + co_return co_await do_rewrite_sstable(std::move(sst)); } }; diff --git a/test/cluster/test_tablets2.py b/test/cluster/test_tablets2.py index 80805673ae..805269367e 100644 --- a/test/cluster/test_tablets2.py +++ b/test/cluster/test_tablets2.py @@ -1991,3 +1991,80 @@ async def test_split_and_incremental_repair_synchronization(manager: ManagerClie await manager.server_start(servers[0].server_id) hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) await manager.servers_see_each_other(servers) + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_split_and_intranode_synchronization(manager: ManagerClient): + logger.info('Bootstrapping cluster') + cfg = { 'enable_tablets': True, + 'tablet_load_stats_refresh_interval_in_seconds': 1 + } + cmdline = [ + '--logger-log-level', 'load_balancer=debug', + '--logger-log-level', 'debug_error_injection=debug', + '--logger-log-level', 'compaction=debug', + '--smp', '2', + ] + servers = await manager.servers_add(1, cmdline=cmdline, config=cfg) + server = servers[0] + + cql = manager.get_cql() + + await manager.api.disable_tablet_balancing(servers[0].ip_addr) + + initial_tablets = 1 + + async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks: + await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};") + + # insert data + pks = range(256) + await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks]) + + # flush the table + await manager.api.flush_keyspace(server.ip_addr, ks) + + log = await manager.server_open_log(server.server_id) + mark = await log.mark() + + tablet_token = 0 # Doesn't matter since there is one tablet + replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token) + + host_id = await manager.get_host_id(server.server_id) + src_shard = replica[1] + + # if tablet replica is at shard 0, move it to shard 1. + if src_shard == 0: + dst_shard = 1 + await manager.api.move_tablet(server.ip_addr, ks, "test", replica[0], src_shard, replica[0], dst_shard, tablet_token) + + await manager.api.enable_tablet_balancing(server.ip_addr) + + await manager.api.enable_injection(server.ip_addr, 'tablet_resize_finalization_postpone', one_shot=False) + await manager.api.enable_injection(server.ip_addr, "split_sstable_force_stop_exception", one_shot=False) + + # force split on the test table + expected_tablet_count = initial_tablets * 2 + await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}") + + # Check that shard 0 ACKed split. + mark, _ = await log.wait_for('Setting split ready sequence number to', from_mark=mark) + + # Move tablet replica back to shard 0, where split was already ACKed. + src_shard = 1 + dst_shard = 0 + migration_task = asyncio.create_task(manager.api.move_tablet(server.ip_addr, ks, "test", replica[0], src_shard, replica[0], dst_shard, tablet_token)) + + mark, _ = await log.wait_for("Finished intra-node streaming of tablet", from_mark=mark) + + await manager.api.stop_compaction(server.ip_addr, "SPLIT") + + await migration_task + + await manager.api.disable_injection(server.ip_addr, "tablet_resize_finalization_postpone") + + async def finished_splitting(): + tablet_count = await get_tablet_count(manager, server, ks, 'test') + return tablet_count >= expected_tablet_count or None + # Give enough time for split to happen in debug mode + await wait_for(finished_splitting, time.time() + 120) \ No newline at end of file