Merge 'streaming: stream_blob: hold table for streaming' from Michael Litvak

When initializing streaming sources in tablet_stream_files_handler we
use a reference to the table. We should hold the table while doing so,
because otherwise the table may be dropped and destroyed when we yield.
Use the table.stream_in_progress() phaser to hold the table while we
access it.

For sstable file streaming we can release the table after the snapshot
is initialized, and the table may be dropped safely because the files
are held by the snapshot and we don't access the table anymore. There
was a single access to the table for logging but it is replaced by a
pre-calculated variable.

For logstor segment streaming, discarding segments while they are being
streamed is currently not supported - when the table is dropped, it
discards the segments by overwriting and freeing them, so they must not
be accessed after that. Therefore, in that case we continue to hold the
table until streaming is completed.

Fixes [SCYLLADB-1533](https://scylladb.atlassian.net/browse/SCYLLADB-1533)

It's a pre-existing use-after-free issue in sstable file streaming, so it should be backported to all releases.
It was also made worse by the recent logstor changes, and it also affects non-logstor tables, so the logstor fixes should be included in the same release (2026.2).

[SCYLLADB-1533]: https://scylladb.atlassian.net/browse/SCYLLADB-1533?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ

Closes scylladb/scylladb#29488

* github.com:scylladb/scylladb:
  test: test drop table during streaming
  streaming: stream_blob: hold table for streaming
This commit is contained in:
Botond Dénes
2026-04-16 12:12:42 +03:00
2 changed files with 75 additions and 3 deletions

View File

@@ -742,11 +742,14 @@ tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sou
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req) {
stream_files_response resp;
auto& table = db.find_column_family(req.table);
auto table_stream_op = table.stream_in_progress();
auto files = std::list<stream_blob_info>();
auto reader = co_await db.obtain_reader_permit(table, "tablet_file_streaming", db::no_timeout, {});
bool is_logstor_table = table.uses_logstor();
if (table.uses_logstor()) {
if (is_logstor_table) {
auto segments = co_await table.take_logstor_snapshot(req.range);
co_await utils::get_local_injector().inject("wait_before_tablet_stream_files_after_snapshot", utils::wait_for_message(std::chrono::seconds(60)));
for (auto& seg : segments) {
auto& info = files.emplace_back();
info.filename = format("logstor_segment_{}", seg.segment_id); // used only for logging
@@ -759,6 +762,7 @@ future<stream_files_response> tablet_stream_files_handler(replica::database& db,
req.ops_id, segments.size(), req.range);
} else {
auto sstables = co_await table.take_storage_snapshot(req.range);
co_await utils::get_local_injector().inject("wait_before_tablet_stream_files_after_snapshot", utils::wait_for_message(std::chrono::seconds(60)));
co_await utils::get_local_injector().inject("order_sstables_for_streaming", [&sstables] (auto& handler) -> future<> {
if (sstables.size() == 3) {
// make sure the sstables are ordered so that the sstable containing shadowed data is streamed last
@@ -807,20 +811,22 @@ future<stream_files_response> tablet_stream_files_handler(replica::database& db,
// that sstable's content has been fully streamed.
sstables.clear();
// Release the table - we don't need to access it anymore and the files are held by the snapshot.
table_stream_op = {};
blogger.debug("stream_sstables[{}] Started sending sstable_nr={} files_nr={} files={} range={}",
req.ops_id, sstable_nr, files.size(), files, req.range);
}
if (files.empty()) {
co_return resp;
}
co_await utils::get_local_injector().inject("wait_before_tablet_stream_files_after_snapshot", utils::wait_for_message(std::chrono::seconds(60)));
auto ops_start_time = std::chrono::steady_clock::now();
auto files_nr = files.size();
size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), req.targets, req.table, req.ops_id, req.topo_guard);
resp.stream_bytes = stream_bytes;
auto duration = std::chrono::steady_clock::now() - ops_start_time;
blogger.info("stream_{}[{}] Finished sending files_nr={} range={} stream_bytes={} stream_time={} stream_bw={}",
table.uses_logstor() ? "logstor_segments" : "sstables",
is_logstor_table ? "logstor_segments" : "sstables",
req.ops_id, files_nr, req.range, stream_bytes, duration, get_bw(stream_bytes, ops_start_time));
co_return resp;
}

View File

@@ -1598,6 +1598,72 @@ async def test_drop_table_and_truncate_after_migration(manager: ManagerClient, o
logger.info(f"Running {operation} {ks}.test")
await cql.run_async(f"{operation} {ks}.test")
# Regression test for SCYLLADB-1533: dropping a table while the sender side
# of a tablet migration is streaming its files must not crash the node
# (use-after-free on the dropped table). The sender is paused with an error
# injection, the table is dropped, and the migration is then resumed; it is
# expected to fail gracefully.
# NOTE(review): indentation below reflects the diff rendering of this commit
# page, not the actual source file layout.
@pytest.mark.asyncio
@pytest.mark.parametrize("streaming_mode", ["sstables_during_snapshot", "sstables_after_snapshot", "logstor_after_snapshot"])
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_drop_table_during_streaming(manager: ManagerClient, streaming_mode: str):
# Debug logging for stream_blob so the log markers below are emitted.
cmdline = ['--logger-log-level', 'stream_blob=debug']
cfg = {}
uses_logstor = streaming_mode == "logstor_after_snapshot"
if uses_logstor:
# logstor is experimental; enable it only for the logstor variant.
cfg['experimental_features'] = ['logstor']
servers = await manager.servers_add(2, cmdline=cmdline, config=cfg)
# Prevent the balancer from moving tablets on its own during the test.
await manager.disable_tablet_balancing()
cql = manager.get_cql()
# Choose where the sender pauses: inside take_storage_snapshot (during
# the snapshot) or right after the snapshot is taken, before sending files.
inj = 'take_storage_snapshot' if streaming_mode == "sstables_during_snapshot" else 'wait_before_tablet_stream_files_after_snapshot'
# RF=1 with a single tablet so exactly one replica streams during migration.
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
table_opts = " WITH storage_engine = 'logstor'" if uses_logstor else ""
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c text){table_opts}")
for k in range(128):
await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, '{k}')")
tablet_token = 0
src_host_id = await manager.get_host_id(servers[0].server_id)
dst_host_id = await manager.get_host_id(servers[1].server_id)
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
# Normalize so servers[0]/src_host_id is the node that currently holds
# the tablet replica (i.e. the streaming sender).
if replica[0] != src_host_id:
src_host_id, dst_host_id = dst_host_id, src_host_id
servers = [servers[1], servers[0]]
src_server = servers[0]
# Flush so there is on-disk data (logstor segments or sstables) to stream.
if uses_logstor:
await manager.api.logstor_flush(src_server.ip_addr)
else:
await manager.api.keyspace_flush(src_server.ip_addr, ks, "test")
src_log = await manager.server_open_log(src_server.server_id)
src_mark = await src_log.mark()
# Arm the one-shot injection that will pause the sender at `inj`.
await manager.api.enable_injection(src_server.ip_addr, inj, one_shot=True)
logger.info("Starting tablet migration with sender paused at %s", inj)
migration_task = asyncio.create_task(
manager.api.move_tablet(servers[0].ip_addr, ks, "test", src_host_id, replica[1], dst_host_id, 0, tablet_token)
)
# Wait until the sender is actually parked at the injection point.
await src_log.wait_for(f'{inj}: waiting for message', from_mark=src_mark)
logger.info("Dropping table while sender-side tablet stream is paused after snapshot")
drop_task = cql.run_async(f"DROP TABLE {ks}.test")
await src_log.wait_for(f"Dropping {ks}.test", from_mark=src_mark)
# Give the drop some extra time to progress on the sender before resuming.
await asyncio.sleep(1)
# Release the paused sender; streaming now proceeds against a dropped table.
await manager.api.message_injection(src_server.ip_addr, inj)
await drop_task
# The migration is expected to fail because the table was dropped
# mid-stream; the essential check is that the node survives (no
# use-after-free / crash), so the HTTP error is merely logged.
try:
await migration_task
except HTTPError as e:
logger.info("Tablet migration failed after drop as expected: %s", e.message)
@pytest.mark.asyncio
@pytest.mark.nightly
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')