diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 62c83eb277..70a2603220 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -4281,8 +4281,7 @@ public: {} private: future<> load_metadata() const { - auto metafile = _sst->filename(sstables::component_type::Scylla); - if (!co_await file_exists(fmt::to_string(metafile))) { + if (!co_await _sst->_storage->exists(*_sst, component_type::Scylla)) { // for compatibility with streaming a non-scylla table (no scylla component) co_return; } @@ -4318,15 +4317,18 @@ public: if (load_save_meta) { co_await load_metadata(); } - // now we can open the component file. any extensions applied should write info into metadata - auto f = co_await _sst->open_file(_type, open_flags::wo | open_flags::create, foptions); + // now we can open the component sink. any extensions applied should write info into metadata. + // We use make_component_sink rather than open_file + make_file_output_stream because + // object storage (S3) backends only support writing through upload sinks, not through + // file::write_dma (readable_file is read-only). + auto sink = co_await _sst->_storage->make_component_sink(*_sst, _type, open_flags::wo | open_flags::create, stream_options); // Save back to disk. if (load_save_meta) { co_await save_metadata(); } - co_return co_await make_file_output_stream(std::move(f), stream_options); + co_return output_stream(std::move(sink)); } future close() override { if (_last_component) { diff --git a/test/boost/file_stream_test.cc b/test/boost/file_stream_test.cc index 4bfff9e5c0..5a25889b70 100644 --- a/test/boost/file_stream_test.cc +++ b/test/boost/file_stream_test.cc @@ -12,8 +12,17 @@ #include "test/lib/log.hh" #include "test/lib/sstable_utils.hh" #include "sstables/exceptions.hh" +#include "sstables/sstables.hh" +#include "utils/overloaded_functor.hh" +#include "schema/schema_builder.hh" +#include "test/lib/random_utils.hh" +#include "test/lib/sstable_test_env.hh" +#include "test/lib/test_utils.hh" +#include "test/lib/gcs_fixture.hh" #include +#include +#include #include #include #include @@ -533,3 +542,55 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_stream_digest_mismatched_compressed) { SEASTAR_THREAD_TEST_CASE(test_sstable_stream_digest_mismatched_uncompressed) { test_sstable_stream(compress_sstable::no, corrupt_digest_component, "Digest mismatch"); } + +// Exercises the sstable_stream_sink_impl::output() path with local and object storage. +// Before the fix, output() used open_file() which for S3 returns a readable_file +// (read-only). Writing through it threw std::logic_error("unsupported operation +// on s3 readable file"), breaking tablet migration streaming entirely. +static future<> test_stream_sink_write(sstables::test_env_config cfg) { + return sstables::test_env::do_with_async([](sstables::test_env& env) { + using namespace sstables; + + auto s = schema_builder(this_smp_shard_count(), "ks", "cf") + .with_column("pk", utf8_type, column_kind::partition_key) + .with_column("v", utf8_type) + .build(); + + auto version = get_highest_sstable_version(); + auto format = sstable::format_types::big; + auto generation = env.new_generation(); + auto& mgr = env.manager(); + auto s_opts = env.get_storage_options(); + + std::visit(overloaded_functor { + [&env] (data_dictionary::storage_options::local& o) { o.dir = env.tempdir().path().native(); }, + [&s] (data_dictionary::storage_options::object_storage& o) { o.location = s->id(); }, + }, s_opts.value); + + auto toc_basename = sstable::component_basename( + s->ks_name(), s->cf_name(), version, generation, format, component_type::TOC); + + auto sink = create_stream_sink(s, mgr, s_opts, sstable_state::normal, toc_basename, {}); + + // output() would succeed before the fix (wrapping the read-only file into a stream), + // but the write below would throw std::logic_error("unsupported operation on s3 readable file"). + auto out = sink->output(file_open_options{}, file_output_stream_options{}).get(); + + auto data = tests::random::get_bytes(4096); + out.write(reinterpret_cast(data.data()), data.size()).get(); + out.flush().get(); + out.close().get(); + }, std::move(cfg)); +} + +SEASTAR_TEST_CASE(test_stream_sink_write_local) { + return test_stream_sink_write(sstables::test_env_config{}); +} + +SEASTAR_TEST_CASE(test_stream_sink_write_s3, *boost::unit_test::precondition(tests::has_scylla_test_env)) { + return test_stream_sink_write(sstables::test_env_config{ .storage = make_test_object_storage_options("S3") }); +} + +SEASTAR_FIXTURE_TEST_CASE(test_stream_sink_write_gs, gcs_fixture, *tests::check_run_test_decorator("ENABLE_GCP_STORAGE_TEST", true)) { + return test_stream_sink_write(sstables::test_env_config{ .storage = make_test_object_storage_options("GS") }); +} diff --git a/test/cluster/conftest.py b/test/cluster/conftest.py index 7fe99c692f..dcd67b17fe 100644 --- a/test/cluster/conftest.py +++ b/test/cluster/conftest.py @@ -20,6 +20,7 @@ from pathlib import Path from typing import TYPE_CHECKING from test import TOP_SRC_DIR, MODES_TIMEOUT_FACTOR, path_to from test.pylib.runner import PHASE_REPORT_KEY +from test.cluster.object_store.conftest import make_object_storage from test.pylib.random_tables import RandomTables from test.pylib.skip_types import skip_env from test.pylib.util import unique_name @@ -27,7 +28,7 @@ from test.pylib.manager_client import ManagerClient from test.pylib.async_cql import run_async from test.pylib.scylla_cluster import ScyllaClusterManager, ScyllaVersionDescription, get_scylla_2025_1_description from test.pylib.suite.base import get_testpy_test -from test.pylib.suite.python import add_cql_connection_options +from test.pylib.suite.python import add_cql_connection_options, add_s3_options from test.pylib.encryption_provider import KeyProvider, make_key_provider_factory import logging import pytest @@ -79,6 +80,7 @@ def pytest_addoption(parser): parser.addoption('--manager-api', action='store', help='Manager unix socket path') add_cql_connection_options(parser) + add_s3_options(parser) parser.addoption('--skip-internet-dependent-tests', action='store_true', default=False, help='Skip tests which depend on artifacts from the internet') parser.addoption('--artifacts_dir_url', action='store', type=str, default=None, dest='artifacts_dir_url', @@ -382,3 +384,17 @@ async def key_provider(request, tmpdir, scylla_binary): @pytest.fixture(scope="function") def failure_detector_timeout(build_mode): return 5000 * MODES_TIMEOUT_FACTOR[build_mode] + +@pytest.fixture(params=[None, 's3', 'gs'], ids=['local', 's3', 'gs']) +async def storage(request, pytestconfig, tmpdir): + """Parametrize tests over local / S3 / GCS storage. + + When storage is None the test runs with local (filesystem) storage. + Otherwise the fixture yields an object-storage server handle. + """ + if request.param is None: + yield None + return + + async with make_object_storage(request.param, pytestconfig, tmpdir, request.node.name) as server: + yield server diff --git a/test/cluster/object_store/conftest.py b/test/cluster/object_store/conftest.py index 7fba2c1971..42d283e8a0 100644 --- a/test/cluster/object_store/conftest.py +++ b/test/cluster/object_store/conftest.py @@ -5,6 +5,8 @@ # +from contextlib import asynccontextmanager + import pytest from test.pylib.suite.python import add_s3_options @@ -26,9 +28,9 @@ def pytest_addoption(parser): add_s3_options(parser) -@pytest.fixture(scope="function", params=['s3', 'gs']) -async def object_storage(request, pytestconfig, tmpdir): - if request.param == 'gs': +@asynccontextmanager +async def make_object_storage(kind, pytestconfig, tmpdir, test_name): + if kind == 'gs': server = create_gs_server(tmpdir) else: server = create_s3_server(pytestconfig, tmpdir) @@ -36,7 +38,7 @@ async def object_storage(request, pytestconfig, tmpdir): bucket_created = False try: await server.start() - server.create_test_bucket(request.node.name) + server.create_test_bucket(test_name) bucket_created = True yield server finally: @@ -45,16 +47,13 @@ async def object_storage(request, pytestconfig, tmpdir): await server.stop() +@pytest.fixture(scope="function", params=['s3', 'gs']) +async def object_storage(request, pytestconfig, tmpdir): + async with make_object_storage(request.param, pytestconfig, tmpdir, request.node.name) as server: + yield server + + @pytest.fixture(scope="function") async def s3_storage(request, pytestconfig, tmpdir): - server = create_s3_server(pytestconfig, tmpdir) - bucket_created = False - try: - await server.start() - server.create_test_bucket(request.node.name) - bucket_created = True + async with make_object_storage('s3', pytestconfig, tmpdir, request.node.name) as server: yield server - finally: - if bucket_created: - server.destroy_test_bucket() - await server.stop() diff --git a/test/cluster/test_encryption.py b/test/cluster/test_encryption.py index 6d021e9df9..fbc89fe355 100644 --- a/test/cluster/test_encryption.py +++ b/test/cluster/test_encryption.py @@ -17,6 +17,7 @@ import json import uuid from test.pylib.manager_client import ManagerClient, ServerInfo +from test.pylib.object_storage import format_tuples from test.pylib.util import wait_for_cql_and_get_hosts from test.pylib.tablets import get_all_tablet_replicas @@ -43,12 +44,16 @@ def workdir(): with tempfile.TemporaryDirectory() as tmp_dir: yield tmp_dir -async def test_file_streaming_respects_encryption(manager: ManagerClient, workdir): +async def test_file_streaming_respects_encryption(manager: ManagerClient, storage, workdir): # pylint: disable=missing-function-docstring cfg = { 'tablets_mode_for_new_keyspaces': 'enabled', } + if storage: + cfg['object_storage_endpoints'] = storage.create_endpoint_conf() + cfg['experimental_features'] = ['keyspace-storage-options'] + cmdline = ['--smp=1'] servers = [] servers.append(await manager.server_add(config=cfg, cmdline=cmdline)) @@ -56,7 +61,13 @@ async def test_file_streaming_respects_encryption(manager: ManagerClient, workdi cql = manager.cql await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) - cql.execute("CREATE KEYSPACE ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") + ks_cmd = "CREATE KEYSPACE ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}" + if storage: + storage = format_tuples(type=storage.type, + endpoint=storage.address, + bucket=storage.bucket_name) + ks_cmd += f" AND STORAGE = {storage}" + cql.execute(ks_cmd) cql.execute(f"""CREATE TABLE ks.t(pk text primary key) WITH scylla_encryption_options = {{ 'cipher_algorithm' : 'AES/ECB/PKCS5Padding', 'secret_key_strength' : 128,