From 2b8f90e1071ac9587d590032abaecc4862ffc3c8 Mon Sep 17 00:00:00 2001 From: Ernest Zaslavsky Date: Wed, 29 Apr 2026 09:50:25 +0300 Subject: [PATCH 1/4] sstables: use make_component_sink for stream sink output sstable_stream_sink_impl::output() used open_file() followed by make_file_output_stream() to create the writable stream. For object storage (S3/GCS) backends, open_file() always returns a readable_file (read-only), so any write through file::dma_write() throws std::logic_error("unsupported operation on s3 readable file"). This broke all tablet migration streaming with S3-backed storage. Replace the open_file() + make_file_output_stream() path with make_component_sink() + output_stream(), which correctly produces an upload sink for object storage and a file_data_sink for local filesystem. This is consistent with how save_metadata() already writes to object storage in the same class. The maybe_wrap_sink() call inside make_component_sink preserves file_io_extensions support (e.g., encryption). --- sstables/sstables.cc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 62c83eb277..4f045e7513 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -4318,15 +4318,18 @@ public: if (load_save_meta) { co_await load_metadata(); } - // now we can open the component file. any extensions applied should write info into metadata - auto f = co_await _sst->open_file(_type, open_flags::wo | open_flags::create, foptions); + // now we can open the component sink. any extensions applied should write info into metadata. + // We use make_component_sink rather than open_file + make_file_output_stream because + // object storage (S3) backends only support writing through upload sinks, not through + // file::dma_write (readable_file is read-only). + auto sink = co_await _sst->_storage->make_component_sink(*_sst, _type, open_flags::wo | open_flags::create, stream_options); // Save back to disk.
if (load_save_meta) { co_await save_metadata(); } - co_return co_await make_file_output_stream(std::move(f), stream_options); + co_return output_stream<char>(std::move(sink)); } future<shared_sstable> close() override { if (_last_component) { From a3d07a468fa8716fc30f5afe1fc287b40e018bec Mon Sep 17 00:00:00 2001 From: Ernest Zaslavsky Date: Wed, 29 Apr 2026 09:50:51 +0300 Subject: [PATCH 2/4] test: verify sstable stream sink writes to object storage Add test_stream_sink_write_local, test_stream_sink_write_s3, and test_stream_sink_write_gs that exercise sstable_stream_sink_impl::output() with local filesystem and S3/GCS backends respectively. This is the code path used by stream_blob during tablet migration streaming. The local variant ensures the fix did not regress filesystem storage. The S3/GCS variants would throw std::logic_error before the fix. --- test/boost/file_stream_test.cc | 61 ++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/test/boost/file_stream_test.cc b/test/boost/file_stream_test.cc index 4bfff9e5c0..5a25889b70 100644 --- a/test/boost/file_stream_test.cc +++ b/test/boost/file_stream_test.cc @@ -12,8 +12,17 @@ #include "test/lib/log.hh" #include "test/lib/sstable_utils.hh" #include "sstables/exceptions.hh" +#include "sstables/sstables.hh" +#include "utils/overloaded_functor.hh" +#include "schema/schema_builder.hh" +#include "test/lib/random_utils.hh" +#include "test/lib/sstable_test_env.hh" +#include "test/lib/test_utils.hh" +#include "test/lib/gcs_fixture.hh" #include +#include +#include #include #include #include @@ -533,3 +542,55 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_stream_digest_mismatched_compressed) { SEASTAR_THREAD_TEST_CASE(test_sstable_stream_digest_mismatched_uncompressed) { test_sstable_stream(compress_sstable::no, corrupt_digest_component, "Digest mismatch"); } + +// Exercises the sstable_stream_sink_impl::output() path with local and object storage.
+// Before the fix, output() used open_file() which for S3 returns a readable_file +// (read-only). Writing through it threw std::logic_error("unsupported operation +// on s3 readable file"), breaking tablet migration streaming entirely. +static future<> test_stream_sink_write(sstables::test_env_config cfg) { + return sstables::test_env::do_with_async([](sstables::test_env& env) { + using namespace sstables; + + auto s = schema_builder(this_smp_shard_count(), "ks", "cf") + .with_column("pk", utf8_type, column_kind::partition_key) + .with_column("v", utf8_type) + .build(); + + auto version = get_highest_sstable_version(); + auto format = sstable::format_types::big; + auto generation = env.new_generation(); + auto& mgr = env.manager(); + auto s_opts = env.get_storage_options(); + + std::visit(overloaded_functor { + [&env] (data_dictionary::storage_options::local& o) { o.dir = env.tempdir().path().native(); }, + [&s] (data_dictionary::storage_options::object_storage& o) { o.location = s->id(); }, + }, s_opts.value); + + auto toc_basename = sstable::component_basename( + s->ks_name(), s->cf_name(), version, generation, format, component_type::TOC); + + auto sink = create_stream_sink(s, mgr, s_opts, sstable_state::normal, toc_basename, {}); + + // output() would succeed before the fix (wrapping the read-only file into a stream), + // but the write below would throw std::logic_error("unsupported operation on s3 readable file"). 
+ auto out = sink->output(file_open_options{}, file_output_stream_options{}).get(); + + auto data = tests::random::get_bytes(4096); + out.write(reinterpret_cast<const char*>(data.data()), data.size()).get(); + out.flush().get(); + out.close().get(); + }, std::move(cfg)); +} + +SEASTAR_TEST_CASE(test_stream_sink_write_local) { + return test_stream_sink_write(sstables::test_env_config{}); +} + +SEASTAR_TEST_CASE(test_stream_sink_write_s3, *boost::unit_test::precondition(tests::has_scylla_test_env)) { + return test_stream_sink_write(sstables::test_env_config{ .storage = make_test_object_storage_options("S3") }); +} + +SEASTAR_FIXTURE_TEST_CASE(test_stream_sink_write_gs, gcs_fixture, *tests::check_run_test_decorator("ENABLE_GCP_STORAGE_TEST", true)) { + return test_stream_sink_write(sstables::test_env_config{ .storage = make_test_object_storage_options("GS") }); +} From 54571ffacbafd8210174b5df909bac002eeb3f60 Mon Sep 17 00:00:00 2001 From: Ernest Zaslavsky Date: Tue, 5 May 2026 16:29:57 +0300 Subject: [PATCH 3/4] sstables: fix load_metadata to check S3 for scylla metadata component MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit sstable_stream_sink_impl::load_metadata() used file_exists() on a local filesystem path to check whether the Scylla metadata component exists. For S3-backed storage this always returns false because the file lives on object storage, not locally. This means that when streaming encrypted SSTables, each component gets a fresh encryption key (because the previous key from scylla metadata is never loaded). Only the last key survives in the metadata on S3, so all other components become unreadable — decryption with the wrong key produces garbage, leading to parse failures or OOM crashes on sst->load(). Replace file_exists() with _sst->_storage->exists(), which correctly checks for the component on whatever backend the SSTable uses (local filesystem or object storage).
Fixes https://scylladb.atlassian.net/browse/SCYLLADB-1704 --- sstables/sstables.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 4f045e7513..70a2603220 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -4281,8 +4281,7 @@ public: {} private: future<> load_metadata() const { - auto metafile = _sst->filename(sstables::component_type::Scylla); - if (!co_await file_exists(fmt::to_string(metafile))) { + if (!co_await _sst->_storage->exists(*_sst, component_type::Scylla)) { // for compatibility with streaming a non-scylla table (no scylla component) co_return; } From a651f7eef04c9d7c18b59bfc3ed6f54e3067d9ce Mon Sep 17 00:00:00 2001 From: Ernest Zaslavsky Date: Tue, 5 May 2026 16:30:14 +0300 Subject: [PATCH 4/4] test: parametrize encrypted streaming test with storage backends MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a `storage` fixture to test/cluster/conftest.py that parametrizes tests over local filesystem, S3 (MinIO), and GCS backends. When S3 is selected the fixture reuses the global MinIO instance started by test.py (via environment variables) or falls back to a per-test MinioWrapper. For GCS it starts a local fake-gcs-server container. Extract the server start / test-bucket lifecycle shared by the object_storage and s3_storage fixtures in test/cluster/object_store/conftest.py into a make_object_storage() async context manager, which the new `storage` fixture reuses. The server config and the keyspace STORAGE options are built inline in the test. Parametrize test_file_streaming_respects_encryption with the new fixture so it runs on all three backends. The local variant is the original test; the S3/GCS variants exercise the load_metadata fix (SCYLLADB-1704).
--- test/cluster/conftest.py | 18 +++++++++++++++++- test/cluster/object_store/conftest.py | 27 +++++++++++++-------------- test/cluster/test_encryption.py | 15 +++++++++++++-- 3 files changed, 43 insertions(+), 17 deletions(-) diff --git a/test/cluster/conftest.py b/test/cluster/conftest.py index 7fe99c692f..dcd67b17fe 100644 --- a/test/cluster/conftest.py +++ b/test/cluster/conftest.py @@ -20,6 +20,7 @@ from pathlib import Path from typing import TYPE_CHECKING from test import TOP_SRC_DIR, MODES_TIMEOUT_FACTOR, path_to from test.pylib.runner import PHASE_REPORT_KEY +from test.cluster.object_store.conftest import make_object_storage from test.pylib.random_tables import RandomTables from test.pylib.skip_types import skip_env from test.pylib.util import unique_name @@ -27,7 +28,7 @@ from test.pylib.manager_client import ManagerClient from test.pylib.async_cql import run_async from test.pylib.scylla_cluster import ScyllaClusterManager, ScyllaVersionDescription, get_scylla_2025_1_description from test.pylib.suite.base import get_testpy_test -from test.pylib.suite.python import add_cql_connection_options +from test.pylib.suite.python import add_cql_connection_options, add_s3_options from test.pylib.encryption_provider import KeyProvider, make_key_provider_factory import logging import pytest @@ -79,6 +80,7 @@ def pytest_addoption(parser): parser.addoption('--manager-api', action='store', help='Manager unix socket path') add_cql_connection_options(parser) + add_s3_options(parser) parser.addoption('--skip-internet-dependent-tests', action='store_true', default=False, help='Skip tests which depend on artifacts from the internet') parser.addoption('--artifacts_dir_url', action='store', type=str, default=None, dest='artifacts_dir_url', @@ -382,3 +384,17 @@ async def key_provider(request, tmpdir, scylla_binary): @pytest.fixture(scope="function") def failure_detector_timeout(build_mode): return 5000 * MODES_TIMEOUT_FACTOR[build_mode] + +@pytest.fixture(params=[None, 's3', 
'gs'], ids=['local', 's3', 'gs']) +async def storage(request, pytestconfig, tmpdir): + """Parametrize tests over local / S3 / GCS storage. + + When storage is None the test runs with local (filesystem) storage. + Otherwise the fixture yields an object-storage server handle. + """ + if request.param is None: + yield None + return + + async with make_object_storage(request.param, pytestconfig, tmpdir, request.node.name) as server: + yield server diff --git a/test/cluster/object_store/conftest.py b/test/cluster/object_store/conftest.py index 7fba2c1971..42d283e8a0 100644 --- a/test/cluster/object_store/conftest.py +++ b/test/cluster/object_store/conftest.py @@ -5,6 +5,8 @@ # +from contextlib import asynccontextmanager + import pytest from test.pylib.suite.python import add_s3_options @@ -26,9 +28,9 @@ def pytest_addoption(parser): add_s3_options(parser) -@pytest.fixture(scope="function", params=['s3', 'gs']) -async def object_storage(request, pytestconfig, tmpdir): - if request.param == 'gs': +@asynccontextmanager +async def make_object_storage(kind, pytestconfig, tmpdir, test_name): + if kind == 'gs': server = create_gs_server(tmpdir) else: server = create_s3_server(pytestconfig, tmpdir) @@ -36,7 +38,7 @@ async def object_storage(request, pytestconfig, tmpdir): bucket_created = False try: await server.start() - server.create_test_bucket(request.node.name) + server.create_test_bucket(test_name) bucket_created = True yield server finally: @@ -45,16 +47,13 @@ async def object_storage(request, pytestconfig, tmpdir): await server.stop() +@pytest.fixture(scope="function", params=['s3', 'gs']) +async def object_storage(request, pytestconfig, tmpdir): + async with make_object_storage(request.param, pytestconfig, tmpdir, request.node.name) as server: + yield server + + @pytest.fixture(scope="function") async def s3_storage(request, pytestconfig, tmpdir): - server = create_s3_server(pytestconfig, tmpdir) - bucket_created = False - try: - await server.start() - 
server.create_test_bucket(request.node.name) - bucket_created = True + async with make_object_storage('s3', pytestconfig, tmpdir, request.node.name) as server: yield server - finally: - if bucket_created: - server.destroy_test_bucket() - await server.stop() diff --git a/test/cluster/test_encryption.py b/test/cluster/test_encryption.py index 6d021e9df9..fbc89fe355 100644 --- a/test/cluster/test_encryption.py +++ b/test/cluster/test_encryption.py @@ -17,6 +17,7 @@ import json import uuid from test.pylib.manager_client import ManagerClient, ServerInfo +from test.pylib.object_storage import format_tuples from test.pylib.util import wait_for_cql_and_get_hosts from test.pylib.tablets import get_all_tablet_replicas @@ -43,12 +44,16 @@ def workdir(): with tempfile.TemporaryDirectory() as tmp_dir: yield tmp_dir -async def test_file_streaming_respects_encryption(manager: ManagerClient, workdir): +async def test_file_streaming_respects_encryption(manager: ManagerClient, storage, workdir): # pylint: disable=missing-function-docstring cfg = { 'tablets_mode_for_new_keyspaces': 'enabled', } + if storage: + cfg['object_storage_endpoints'] = storage.create_endpoint_conf() + cfg['experimental_features'] = ['keyspace-storage-options'] + cmdline = ['--smp=1'] servers = [] servers.append(await manager.server_add(config=cfg, cmdline=cmdline)) @@ -56,7 +61,13 @@ async def test_file_streaming_respects_encryption(manager: ManagerClient, workdi cql = manager.cql await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) - cql.execute("CREATE KEYSPACE ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") + ks_cmd = "CREATE KEYSPACE ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}" + if storage: + storage = format_tuples(type=storage.type, + endpoint=storage.address, + bucket=storage.bucket_name) + ks_cmd += f" AND STORAGE = {storage}" + cql.execute(ks_cmd) 
cql.execute(f"""CREATE TABLE ks.t(pk text primary key) WITH scylla_encryption_options = {{ 'cipher_algorithm' : 'AES/ECB/PKCS5Padding', 'secret_key_strength' : 128,