From 69d2a90106e1e8f7b8d835db52b57b54bad20106 Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Wed, 15 Apr 2026 12:49:41 +0200 Subject: [PATCH 1/2] streaming: stream_blob: hold table for streaming When initializing streaming sources in tablet_stream_files_handler we use a reference to the table. We should hold the table while doing so, because otherwise the table may be dropped and destroyed when we yield. Use the table.stream_in_progress() phaser to hold the table while we access it. For sstable file streaming we can release the table after the snapshot is initialized, and the table may be dropped safely because the files are held by the snapshot and we don't access the table anymore. There was a single access to the table for logging but it is replaced by a pre-calculated variable. For logstor segment streaming, currently it doesn't support discarding the segments while they are streamed - when the table is dropped it discards the segments by overwriting and freeing them, so they shouldn't be accessed after that. Therefore, in that case continue to hold the table until streaming is completed. 
Fixes SCYLLADB-1533 --- streaming/stream_blob.cc | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/streaming/stream_blob.cc b/streaming/stream_blob.cc index 0266c44b20..f1f898b301 100644 --- a/streaming/stream_blob.cc +++ b/streaming/stream_blob.cc @@ -742,10 +742,12 @@ tablet_stream_files(netw::messaging_service& ms, std::list sou future tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req) { stream_files_response resp; auto& table = db.find_column_family(req.table); + auto table_stream_op = table.stream_in_progress(); auto files = std::list(); auto reader = co_await db.obtain_reader_permit(table, "tablet_file_streaming", db::no_timeout, {}); + bool is_logstor_table = table.uses_logstor(); - if (table.uses_logstor()) { + if (is_logstor_table) { auto segments = co_await table.take_logstor_snapshot(req.range); for (auto& seg : segments) { auto& info = files.emplace_back(); @@ -807,6 +809,9 @@ future tablet_stream_files_handler(replica::database& db, // that sstable's content has been fully streamed. sstables.clear(); + // Release the table - we don't need to access it anymore and the files are held by the snapshot. + table_stream_op = {}; + blogger.debug("stream_sstables[{}] Started sending sstable_nr={} files_nr={} files={} range={}", req.ops_id, sstable_nr, files.size(), files, req.range); } @@ -820,7 +825,7 @@ future tablet_stream_files_handler(replica::database& db, resp.stream_bytes = stream_bytes; auto duration = std::chrono::steady_clock::now() - ops_start_time; blogger.info("stream_{}[{}] Finished sending files_nr={} range={} stream_bytes={} stream_time={} stream_bw={}", - table.uses_logstor() ? "logstor_segments" : "sstables", + is_logstor_table ? 
"logstor_segments" : "sstables", req.ops_id, files_nr, req.range, stream_bytes, duration, get_bw(stream_bytes, ops_start_time)); co_return resp; } From cc94467097b6e23e9fe2a25b0fb1a1a713dfac82 Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Wed, 15 Apr 2026 13:13:16 +0200 Subject: [PATCH 2/2] test: test drop table during streaming Add a test that drops a table while tablet streaming is running for the table. The table is dropped after taking the storage snapshot and initializing streaming sources - after that streaming should be able to complete or abort correctly if the table is dropped. We want to verify there is no incorrect access to the destroyed table. The test tests both types of streaming in stream_blob - sstables and logstor segments. --- streaming/stream_blob.cc | 3 +- test/cluster/test_tablets2.py | 66 +++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/streaming/stream_blob.cc b/streaming/stream_blob.cc index f1f898b301..d200fec150 100644 --- a/streaming/stream_blob.cc +++ b/streaming/stream_blob.cc @@ -749,6 +749,7 @@ future tablet_stream_files_handler(replica::database& db, if (is_logstor_table) { auto segments = co_await table.take_logstor_snapshot(req.range); + co_await utils::get_local_injector().inject("wait_before_tablet_stream_files_after_snapshot", utils::wait_for_message(std::chrono::seconds(60))); for (auto& seg : segments) { auto& info = files.emplace_back(); info.filename = format("logstor_segment_{}", seg.segment_id); // used only for logging @@ -761,6 +762,7 @@ future tablet_stream_files_handler(replica::database& db, req.ops_id, segments.size(), req.range); } else { auto sstables = co_await table.take_storage_snapshot(req.range); + co_await utils::get_local_injector().inject("wait_before_tablet_stream_files_after_snapshot", utils::wait_for_message(std::chrono::seconds(60))); co_await utils::get_local_injector().inject("order_sstables_for_streaming", [&sstables] (auto& handler) -> 
future<> { if (sstables.size() == 3) { // make sure the sstables are ordered so that the sstable containing shadowed data is streamed last @@ -818,7 +820,6 @@ future tablet_stream_files_handler(replica::database& db, if (files.empty()) { co_return resp; } - co_await utils::get_local_injector().inject("wait_before_tablet_stream_files_after_snapshot", utils::wait_for_message(std::chrono::seconds(60))); auto ops_start_time = std::chrono::steady_clock::now(); auto files_nr = files.size(); size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), req.targets, req.table, req.ops_id, req.topo_guard); diff --git a/test/cluster/test_tablets2.py b/test/cluster/test_tablets2.py index 87fd594814..e876184b49 100644 --- a/test/cluster/test_tablets2.py +++ b/test/cluster/test_tablets2.py @@ -1598,6 +1598,72 @@ async def test_drop_table_and_truncate_after_migration(manager: ManagerClient, o logger.info(f"Running {operation} {ks}.test") await cql.run_async(f"{operation} {ks}.test") + +@pytest.mark.asyncio +@pytest.mark.parametrize("streaming_mode", ["sstables_during_snapshot", "sstables_after_snapshot", "logstor_after_snapshot"]) +@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') +async def test_drop_table_during_streaming(manager: ManagerClient, streaming_mode: str): + cmdline = ['--logger-log-level', 'stream_blob=debug'] + cfg = {} + uses_logstor = streaming_mode == "logstor_after_snapshot" + if uses_logstor: + cfg['experimental_features'] = ['logstor'] + + servers = await manager.servers_add(2, cmdline=cmdline, config=cfg) + + await manager.disable_tablet_balancing() + + cql = manager.get_cql() + inj = 'take_storage_snapshot' if streaming_mode == "sstables_during_snapshot" else 'wait_before_tablet_stream_files_after_snapshot' + + async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks: + table_opts = " WITH 
storage_engine = 'logstor'" if uses_logstor else "" + await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c text){table_opts}") + + for k in range(128): + await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, '{k}')") + + tablet_token = 0 + src_host_id = await manager.get_host_id(servers[0].server_id) + dst_host_id = await manager.get_host_id(servers[1].server_id) + replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token) + if replica[0] != src_host_id: + src_host_id, dst_host_id = dst_host_id, src_host_id + servers = [servers[1], servers[0]] + + src_server = servers[0] + if uses_logstor: + await manager.api.logstor_flush(src_server.ip_addr) + else: + await manager.api.keyspace_flush(src_server.ip_addr, ks, "test") + + src_log = await manager.server_open_log(src_server.server_id) + src_mark = await src_log.mark() + + await manager.api.enable_injection(src_server.ip_addr, inj, one_shot=True) + + logger.info("Starting tablet migration with sender paused at %s", inj) + migration_task = asyncio.create_task( + manager.api.move_tablet(servers[0].ip_addr, ks, "test", src_host_id, replica[1], dst_host_id, 0, tablet_token) + ) + + await src_log.wait_for(f'{inj}: waiting for message', from_mark=src_mark) + + logger.info("Dropping table while sender-side tablet stream is paused after snapshot") + drop_task = cql.run_async(f"DROP TABLE {ks}.test") + + await src_log.wait_for(f"Dropping {ks}.test", from_mark=src_mark) + await asyncio.sleep(1) + + await manager.api.message_injection(src_server.ip_addr, inj) + + await drop_task + + try: + await migration_task + except HTTPError as e: + logger.info("Tablet migration failed after drop as expected: %s", e.message) + @pytest.mark.asyncio @pytest.mark.nightly @pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')