diff --git a/streaming/stream_blob.cc b/streaming/stream_blob.cc index 0266c44b20..d200fec150 100644 --- a/streaming/stream_blob.cc +++ b/streaming/stream_blob.cc @@ -742,11 +742,14 @@ tablet_stream_files(netw::messaging_service& ms, std::list sou future tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req) { stream_files_response resp; auto& table = db.find_column_family(req.table); + auto table_stream_op = table.stream_in_progress(); auto files = std::list(); auto reader = co_await db.obtain_reader_permit(table, "tablet_file_streaming", db::no_timeout, {}); + bool is_logstor_table = table.uses_logstor(); - if (table.uses_logstor()) { + if (is_logstor_table) { auto segments = co_await table.take_logstor_snapshot(req.range); + co_await utils::get_local_injector().inject("wait_before_tablet_stream_files_after_snapshot", utils::wait_for_message(std::chrono::seconds(60))); for (auto& seg : segments) { auto& info = files.emplace_back(); info.filename = format("logstor_segment_{}", seg.segment_id); // used only for logging @@ -759,6 +762,7 @@ future tablet_stream_files_handler(replica::database& db, req.ops_id, segments.size(), req.range); } else { auto sstables = co_await table.take_storage_snapshot(req.range); + co_await utils::get_local_injector().inject("wait_before_tablet_stream_files_after_snapshot", utils::wait_for_message(std::chrono::seconds(60))); co_await utils::get_local_injector().inject("order_sstables_for_streaming", [&sstables] (auto& handler) -> future<> { if (sstables.size() == 3) { // make sure the sstables are ordered so that the sstable containing shadowed data is streamed last @@ -807,20 +811,22 @@ future tablet_stream_files_handler(replica::database& db, // that sstable's content has been fully streamed. sstables.clear(); + // Release the table - we don't need to access it anymore and the files are held by the snapshot. + table_stream_op = {}; + blogger.debug("stream_sstables[{}] Started sending sstable_nr={} files_nr={} files={} range={}", req.ops_id, sstable_nr, files.size(), files, req.range); } if (files.empty()) { co_return resp; } - co_await utils::get_local_injector().inject("wait_before_tablet_stream_files_after_snapshot", utils::wait_for_message(std::chrono::seconds(60))); auto ops_start_time = std::chrono::steady_clock::now(); auto files_nr = files.size(); size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), req.targets, req.table, req.ops_id, req.topo_guard); resp.stream_bytes = stream_bytes; auto duration = std::chrono::steady_clock::now() - ops_start_time; blogger.info("stream_{}[{}] Finished sending files_nr={} range={} stream_bytes={} stream_time={} stream_bw={}", - table.uses_logstor() ? "logstor_segments" : "sstables", + is_logstor_table ? "logstor_segments" : "sstables", req.ops_id, files_nr, req.range, stream_bytes, duration, get_bw(stream_bytes, ops_start_time)); co_return resp; } diff --git a/test/cluster/test_tablets2.py b/test/cluster/test_tablets2.py index 87fd594814..e876184b49 100644 --- a/test/cluster/test_tablets2.py +++ b/test/cluster/test_tablets2.py @@ -1598,6 +1598,72 @@ async def test_drop_table_and_truncate_after_migration(manager: ManagerClient, o logger.info(f"Running {operation} {ks}.test") await cql.run_async(f"{operation} {ks}.test") + +@pytest.mark.asyncio +@pytest.mark.parametrize("streaming_mode", ["sstables_during_snapshot", "sstables_after_snapshot", "logstor_after_snapshot"]) +@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') +async def test_drop_table_during_streaming(manager: ManagerClient, streaming_mode: str): + cmdline = ['--logger-log-level', 'stream_blob=debug'] + cfg = {} + uses_logstor = streaming_mode == "logstor_after_snapshot" + if uses_logstor: + cfg['experimental_features'] = ['logstor'] + + servers = await manager.servers_add(2, cmdline=cmdline, config=cfg) + + await manager.disable_tablet_balancing() + + cql = manager.get_cql() + inj = 'take_storage_snapshot' if streaming_mode == "sstables_during_snapshot" else 'wait_before_tablet_stream_files_after_snapshot' + + async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks: + table_opts = " WITH storage_engine = 'logstor'" if uses_logstor else "" + await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c text){table_opts}") + + for k in range(128): + await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, '{k}')") + + tablet_token = 0 + src_host_id = await manager.get_host_id(servers[0].server_id) + dst_host_id = await manager.get_host_id(servers[1].server_id) + replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token) + if replica[0] != src_host_id: + src_host_id, dst_host_id = dst_host_id, src_host_id + servers = [servers[1], servers[0]] + + src_server = servers[0] + if uses_logstor: + await manager.api.logstor_flush(src_server.ip_addr) + else: + await manager.api.keyspace_flush(src_server.ip_addr, ks, "test") + + src_log = await manager.server_open_log(src_server.server_id) + src_mark = await src_log.mark() + + await manager.api.enable_injection(src_server.ip_addr, inj, one_shot=True) + + logger.info("Starting tablet migration with sender paused at %s", inj) + migration_task = asyncio.create_task( + manager.api.move_tablet(servers[0].ip_addr, ks, "test", src_host_id, replica[1], dst_host_id, 0, tablet_token) + ) + + await src_log.wait_for(f'{inj}: waiting for message', from_mark=src_mark) + + logger.info("Dropping table while sender-side tablet stream is paused after snapshot") + drop_task = cql.run_async(f"DROP TABLE {ks}.test") + + await src_log.wait_for(f"Dropping {ks}.test", from_mark=src_mark) + await asyncio.sleep(1) + + await manager.api.message_injection(src_server.ip_addr, inj) + + await drop_task + + try: + await migration_task + except HTTPError as e: + logger.info("Tablet migration failed after drop as expected: %s", e.message) + @pytest.mark.asyncio @pytest.mark.nightly @pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')