From 4c466ace4fced32e667bc487c8d95c321e2b0356 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 27 Oct 2025 20:34:25 -0300 Subject: [PATCH] sstables_loader: Don't bypass synchronization with busy topology The patch c543059f86 fixed the synchronization issue between tablet split and load-and-stream. The synchronization worked only with raft topology, and therefore was disabled with gossip. To do the check, storage_service::raft_topology_change_enabled() was used, but the topology kind is only available/set on shard 0, so it caused the synchronization to be bypassed when load-and-stream runs on any shard other than 0. The reason the reproducer didn't catch it is that it was restricted to single cpu. It will now run with multi cpu and catch the problem observed. Fixes #22707 Signed-off-by: Raphael S. Carvalho Closes scylladb/scylladb#26730 (cherry picked from commit 7f34366b9ded7cf0af006e835f4db4a5e07a7600) --- service/storage_service.cc | 14 +++ service/storage_service.hh | 8 +- sstables_loader.cc | 61 +++++++++++ test/cluster/test_tablets2.py | 185 ++++++++++++++++++++++++++++++++++ 4 files changed, 262 insertions(+), 6 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 6015391cc6..6c8e7a9cfb 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -7890,6 +7890,20 @@ void storage_service::set_topology_change_kind(topology_change_kind kind) { _gossiper.set_topology_state_machine(kind == topology_change_kind::raft ? 
& _topology_state_machine : nullptr); } +bool storage_service::raft_topology_change_enabled() const { + if (this_shard_id() != 0) { + on_internal_error(slogger, "raft_topology_change_enabled() must run on shard 0"); + } + return _topology_change_kind_enabled == topology_change_kind::raft; +} + +bool storage_service::legacy_topology_change_enabled() const { + if (this_shard_id() != 0) { + on_internal_error(slogger, "legacy_topology_change_enabled() must run on shard 0"); + } + return _topology_change_kind_enabled == topology_change_kind::legacy; +} + future<> storage_service::register_protocol_server(protocol_server& server, bool start_instantly) { _protocol_servers.push_back(&server); if (start_instantly) { diff --git a/service/storage_service.hh b/service/storage_service.hh index 4457828a40..ee1f93db65 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -840,12 +840,8 @@ private: topology_change_kind upgrade_state_to_topology_op_kind(topology::upgrade_state_type upgrade_state) const; public: - bool raft_topology_change_enabled() const { - return _topology_change_kind_enabled == topology_change_kind::raft; - } - bool legacy_topology_change_enabled() const { - return _topology_change_kind_enabled == topology_change_kind::legacy; - } + bool raft_topology_change_enabled() const; + bool legacy_topology_change_enabled() const; private: future<> _raft_state_monitor = make_ready_future<>(); diff --git a/sstables_loader.cc b/sstables_loader.cc index b5dcfba05c..a823566273 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -512,6 +512,67 @@ static std::unique_ptr make_sstable_streamer(bool uses_tablets return std::make_unique(std::forward(args)...); } +<<<<<<< HEAD +||||||| parent of 7f34366b9d (sstables_loader: Don't bypass synchronization with busy topology) +future sstables_loader::await_topology_quiesced_and_get_erm(::table_id table_id) { + // By waiting for topology to quiesce, we guarantee load-and-stream will not start in the middle + 
// of a topology operation that changes the token range boundaries, e.g. split or merge. + // Split, for example, first executes the barrier and then splits the tablets. + // So it can happen a sstable is generated between those steps and will incorrectly span two + // tablets. We want to serialize load-and-stream and split finalization (a topology op). + + locator::effective_replication_map_ptr erm; + while (true) { + auto& t = _db.local().find_column_family(table_id); + erm = t.get_effective_replication_map(); + auto expected_topology_version = erm->get_token_metadata().get_version(); + auto& ss = _ss.local(); + + // optimistically attempt to grab an erm on quiesced topology + // The awaiting is only needed with tablet over raft, so we're bypassing the check + // when raft is disabled. + if (!ss.raft_topology_change_enabled() || co_await ss.verify_topology_quiesced(expected_topology_version)) { + break; + } + erm = nullptr; + co_await _ss.local().await_topology_quiesced(); + } + + co_return std::move(erm); +} + +======= +future sstables_loader::await_topology_quiesced_and_get_erm(::table_id table_id) { + // By waiting for topology to quiesce, we guarantee load-and-stream will not start in the middle + // of a topology operation that changes the token range boundaries, e.g. split or merge. + // Split, for example, first executes the barrier and then splits the tablets. + // So it can happen a sstable is generated between those steps and will incorrectly span two + // tablets. We want to serialize load-and-stream and split finalization (a topology op). + + locator::effective_replication_map_ptr erm; + while (true) { + auto& t = _db.local().find_column_family(table_id); + erm = t.get_effective_replication_map(); + auto expected_topology_version = erm->get_token_metadata().get_version(); + auto& ss = _ss.local(); + + // The awaiting only works with raft enabled, and we only need it with tablets, + // so let's bypass the awaiting when tablet is disabled. 
+ if (!t.uses_tablets()) { + break; + } + // optimistically attempt to grab an erm on quiesced topology + if (co_await ss.verify_topology_quiesced(expected_topology_version)) { + break; + } + erm = nullptr; + co_await _ss.local().await_topology_quiesced(); + } + + co_return std::move(erm); +} + +>>>>>>> 7f34366b9d (sstables_loader: Don't bypass synchronization with busy topology) future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name, ::table_id table_id, std::vector sstables, bool primary, bool unlink, stream_scope scope, shared_ptr progress) { diff --git a/test/cluster/test_tablets2.py b/test/cluster/test_tablets2.py index 927f6e5186..dba2c56e4f 100644 --- a/test/cluster/test_tablets2.py +++ b/test/cluster/test_tablets2.py @@ -1974,3 +1974,188 @@ async def test_split_correctness_on_tablet_count_change(manager: ManagerClient): await manager.api.message_injection(server.ip_addr, "splitting_mutation_writer_switch_wait") await asyncio.sleep(.1) await manager.api.message_injection(server.ip_addr, "merge_completion_fiber") +<<<<<<< HEAD +||||||| parent of 7f34366b9d (sstables_loader: Don't bypass synchronization with busy topology) + +# Reproducer for https://github.com/scylladb/scylladb/issues/26041. 
+@pytest.mark.parametrize("primary_replica_only", [False, True]) +@skip_mode('release', 'error injections are not supported in release mode') +async def test_tablet_load_and_stream_and_split_synchronization(manager: ManagerClient, primary_replica_only): + logger.info("Bootstrapping cluster") + cmdline = [ + '--logger-log-level', 'storage_service=debug', + '--logger-log-level', 'table=debug', + '--smp', '1', + ] + servers = [await manager.server_add(config={ + 'tablet_load_stats_refresh_interval_in_seconds': 1 + }, cmdline=cmdline)] + server = servers[0] + + await manager.api.disable_tablet_balancing(servers[0].ip_addr) + + cql = manager.get_cql() + + initial_tablets = 1 + + async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks: + await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};") + + keys = range(100) + await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys]) + + async def check(ks_name: str): + logger.info("Checking table") + cql = manager.get_cql() + rows = await cql.run_async(f"SELECT * FROM {ks_name}.test BYPASS CACHE;") + assert len(rows) == len(keys) + for r in rows: + assert r.c == r.pk + + await manager.api.flush_keyspace(servers[0].ip_addr, ks) + await check(ks) + + node_workdir = await manager.server_get_workdir(servers[0].server_id) + + cql = await safe_server_stop_gracefully(manager, servers[0].server_id) + + table_dir = glob.glob(os.path.join(node_workdir, "data", ks, "test-*"))[0] + logger.info(f"Table dir: {table_dir}") + + def move_sstables_to_upload(table_dir: str): + logger.info("Moving sstables to upload dir") + table_upload_dir = os.path.join(table_dir, "upload") + for sst in glob.glob(os.path.join(table_dir, "*-Data.db")): + for src_path in glob.glob(os.path.join(table_dir, sst.removesuffix("-Data.db") + "*")): + dst_path = 
os.path.join(table_upload_dir, os.path.basename(src_path)) + logger.info(f"Moving sstable file {src_path} to {dst_path}") + os.rename(src_path, dst_path) + + move_sstables_to_upload(table_dir) + + await manager.server_start(servers[0].server_id) + cql = manager.get_cql() + await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) + + rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;") + assert len(rows) == 0 + + await manager.api.disable_tablet_balancing(servers[0].ip_addr) + + await manager.api.enable_injection(servers[0].ip_addr, "tablet_resize_finalization_post_barrier", one_shot=True) + + s1_log = await manager.server_open_log(servers[0].server_id) + s1_mark = await s1_log.mark() + + await manager.api.enable_tablet_balancing(servers[0].ip_addr) + + await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {initial_tablets * 2}}}") + + await s1_log.wait_for(f"tablet_resize_finalization_post_barrier: waiting", from_mark=s1_mark) + + await manager.api.enable_injection(servers[0].ip_addr, "stream_mutation_fragments", one_shot=True) + + load_and_stream_task = asyncio.create_task(manager.api.load_new_sstables(servers[0].ip_addr, ks, "test", primary_replica_only)) + await s1_log.wait_for(f"Loading new SSTables for keyspace", from_mark=s1_mark) + + await manager.api.message_injection(server.ip_addr, "tablet_resize_finalization_post_barrier") + await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark) + + await s1_log.wait_for(f"stream_mutation_fragments: waiting", from_mark=s1_mark) + await manager.api.message_injection(server.ip_addr, "stream_mutation_fragments") + + await load_and_stream_task + + await check(ks) +======= + +# Reproducer for https://github.com/scylladb/scylladb/issues/26041. 
+@pytest.mark.parametrize("primary_replica_only", [False, True]) +@skip_mode('release', 'error injections are not supported in release mode') +async def test_tablet_load_and_stream_and_split_synchronization(manager: ManagerClient, primary_replica_only): + logger.info("Bootstrapping cluster") + cmdline = [ + '--logger-log-level', 'storage_service=debug', + '--logger-log-level', 'table=debug', + ] + servers = [await manager.server_add(config={ + 'tablet_load_stats_refresh_interval_in_seconds': 1 + }, cmdline=cmdline)] + server = servers[0] + + await manager.api.disable_tablet_balancing(servers[0].ip_addr) + + cql = manager.get_cql() + + initial_tablets = 1 + + async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks: + await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};") + + keys = range(100) + await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys]) + + async def check(ks_name: str): + logger.info("Checking table") + cql = manager.get_cql() + rows = await cql.run_async(f"SELECT * FROM {ks_name}.test BYPASS CACHE;") + assert len(rows) == len(keys) + for r in rows: + assert r.c == r.pk + + await manager.api.flush_keyspace(servers[0].ip_addr, ks) + await check(ks) + + node_workdir = await manager.server_get_workdir(servers[0].server_id) + + cql = await safe_server_stop_gracefully(manager, servers[0].server_id) + + table_dir = glob.glob(os.path.join(node_workdir, "data", ks, "test-*"))[0] + logger.info(f"Table dir: {table_dir}") + + def move_sstables_to_upload(table_dir: str): + logger.info("Moving sstables to upload dir") + table_upload_dir = os.path.join(table_dir, "upload") + for sst in glob.glob(os.path.join(table_dir, "*-Data.db")): + for src_path in glob.glob(os.path.join(table_dir, sst.removesuffix("-Data.db") + "*")): + dst_path = os.path.join(table_upload_dir, 
os.path.basename(src_path)) + logger.info(f"Moving sstable file {src_path} to {dst_path}") + os.rename(src_path, dst_path) + + move_sstables_to_upload(table_dir) + + await manager.server_start(servers[0].server_id) + cql = manager.get_cql() + await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) + + rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;") + assert len(rows) == 0 + + await manager.api.disable_tablet_balancing(servers[0].ip_addr) + + await manager.api.enable_injection(servers[0].ip_addr, "tablet_resize_finalization_post_barrier", one_shot=True) + + s1_log = await manager.server_open_log(servers[0].server_id) + s1_mark = await s1_log.mark() + + await manager.api.enable_tablet_balancing(servers[0].ip_addr) + + await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {initial_tablets * 2}}}") + + await s1_log.wait_for(f"tablet_resize_finalization_post_barrier: waiting", from_mark=s1_mark) + + await manager.api.enable_injection(servers[0].ip_addr, "stream_mutation_fragments", one_shot=True) + + load_and_stream_task = asyncio.create_task(manager.api.load_new_sstables(servers[0].ip_addr, ks, "test", primary_replica_only)) + await s1_log.wait_for(f"Loading new SSTables for keyspace", from_mark=s1_mark) + + await manager.api.message_injection(server.ip_addr, "tablet_resize_finalization_post_barrier") + await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark) + + await s1_log.wait_for(f"stream_mutation_fragments: waiting", from_mark=s1_mark) + await manager.api.message_injection(server.ip_addr, "stream_mutation_fragments") + + await load_and_stream_task + + await check(ks) +>>>>>>> 7f34366b9d (sstables_loader: Don't bypass synchronization with busy topology)