Merge 'sstables: fix stream sink write failure with S3-backed storage' from Ernest Zaslavsky

During tablet migration with S3-backed storage, every streaming attempt fails with `std::logic_error("unsupported operation on s3 readable file")`. The tablet load balancer continuously retries, and after prolonged failure the cluster degrades — S3 objects start returning 404, write timeouts cascade, and the cluster becomes effectively non-functional.

Two bugs in `sstable_stream_sink_impl`:

1. **`output()` used `open_file()` + `make_file_output_stream()`** to create the writable stream for receiving SSTable data on the follower side. For object storage backends, `open_file()` ignores write flags and always returns a read-only `readable_file`. Writing through it throws `std::logic_error("unsupported operation on s3 readable file")`, breaking all tablet migration streaming with S3-backed storage.

2. **`load_metadata()` used `file_exists()` on a local filesystem path** to check whether the Scylla metadata component exists before loading it. For S3-backed storage this always returns false. When encryption is enabled, each streamed SSTable component gets a fresh encryption key (because the previous key from scylla metadata is never loaded from S3). Only the last key survives, making all other components unreadable — decryption with the wrong key produces garbage, leading to parse failures or OOM crashes on `sst->load()`.

- Replace `open_file()` + `make_file_output_stream()` with `make_component_sink()` + `output_stream()`, which correctly produces an upload sink for S3/GCS and a `file_data_sink` for local filesystem. This is the same mechanism already used by `save_metadata()` in the same class.

- Replace `file_exists(local_path)` with `_sst->_storage->exists()` which correctly checks for the component on whatever backend the SSTable uses.

- C++ unit tests (`test_stream_sink_write_local`, `test_stream_sink_write_s3`, `test_stream_sink_write_gs`) in `file_stream_test.cc` exercise `sstable_stream_sink_impl::output()` with all storage backends.

- Parametrize the existing `test_file_streaming_respects_encryption` Python integration test over local, S3, and GCS backends using a new `storage` fixture in `test/cluster/conftest.py`. The S3/GCS variants exercise the `load_metadata` fix — without it, streaming fails with `malformed_sstable_exception` (decryption with the wrong key) and the migration never completes.

- Add reusable `make_cfg()` and `make_ks_opts()` helpers to `test/cluster/util.py` for building storage-parametrized server config and keyspace CQL options.

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-1704
Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-1829

Backport is not needed since no release supports object-storage-backed keyspaces.

Closes scylladb/scylladb#29698

* github.com:scylladb/scylladb:
  test: parametrize encrypted streaming test with storage backends
  sstables: fix load_metadata to check S3 for scylla metadata component
  test: verify sstable stream sink writes to object storage
  sstables: use make_component_sink for stream sink output
This commit is contained in:
Pavel Emelyanov
2026-06-03 10:21:31 +03:00
5 changed files with 111 additions and 22 deletions

View File

@@ -4281,8 +4281,7 @@ public:
{}
private:
future<> load_metadata() const {
auto metafile = _sst->filename(sstables::component_type::Scylla);
if (!co_await file_exists(fmt::to_string(metafile))) {
if (!co_await _sst->_storage->exists(*_sst, component_type::Scylla)) {
// for compatibility with streaming a non-scylla table (no scylla component)
co_return;
}
@@ -4318,15 +4317,18 @@ public:
if (load_save_meta) {
co_await load_metadata();
}
// now we can open the component file. any extensions applied should write info into metadata
auto f = co_await _sst->open_file(_type, open_flags::wo | open_flags::create, foptions);
// now we can open the component sink. any extensions applied should write info into metadata.
// We use make_component_sink rather than open_file + make_file_output_stream because
// object storage (S3) backends only support writing through upload sinks, not through
// file::write_dma (readable_file is read-only).
auto sink = co_await _sst->_storage->make_component_sink(*_sst, _type, open_flags::wo | open_flags::create, stream_options);
// Save back to disk.
if (load_save_meta) {
co_await save_metadata();
}
co_return co_await make_file_output_stream(std::move(f), stream_options);
co_return output_stream<char>(std::move(sink));
}
future<shared_sstable> close() override {
if (_last_component) {

View File

@@ -12,8 +12,17 @@
#include "test/lib/log.hh"
#include "test/lib/sstable_utils.hh"
#include "sstables/exceptions.hh"
#include "sstables/sstables.hh"
#include "utils/overloaded_functor.hh"
#include "schema/schema_builder.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/sstable_test_env.hh"
#include "test/lib/test_utils.hh"
#include "test/lib/gcs_fixture.hh"
#include <boost/lexical_cast.hpp>
#include <seastar/testing/test_case.hh>
#include <seastar/testing/test_fixture.hh>
#include <seastar/testing/thread_test_case.hh>
#include <seastar/util/short_streams.hh>
#include <seastar/core/seastar.hh>
@@ -533,3 +542,55 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_stream_digest_mismatched_compressed) {
SEASTAR_THREAD_TEST_CASE(test_sstable_stream_digest_mismatched_uncompressed) {
test_sstable_stream(compress_sstable::no, corrupt_digest_component, "Digest mismatch");
}
// Exercises the sstable_stream_sink_impl::output() path with local and object storage.
// Before the fix, output() used open_file() which for S3 returns a readable_file
// (read-only). Writing through it threw std::logic_error("unsupported operation
// on s3 readable file"), breaking tablet migration streaming entirely.
static future<> test_stream_sink_write(sstables::test_env_config cfg) {
return sstables::test_env::do_with_async([](sstables::test_env& env) {
using namespace sstables;
auto s = schema_builder(this_smp_shard_count(), "ks", "cf")
.with_column("pk", utf8_type, column_kind::partition_key)
.with_column("v", utf8_type)
.build();
auto version = get_highest_sstable_version();
auto format = sstable::format_types::big;
auto generation = env.new_generation();
auto& mgr = env.manager();
auto s_opts = env.get_storage_options();
std::visit(overloaded_functor {
[&env] (data_dictionary::storage_options::local& o) { o.dir = env.tempdir().path().native(); },
[&s] (data_dictionary::storage_options::object_storage& o) { o.location = s->id(); },
}, s_opts.value);
auto toc_basename = sstable::component_basename(
s->ks_name(), s->cf_name(), version, generation, format, component_type::TOC);
auto sink = create_stream_sink(s, mgr, s_opts, sstable_state::normal, toc_basename, {});
// output() would succeed before the fix (wrapping the read-only file into a stream),
// but the write below would throw std::logic_error("unsupported operation on s3 readable file").
auto out = sink->output(file_open_options{}, file_output_stream_options{}).get();
auto data = tests::random::get_bytes(4096);
out.write(reinterpret_cast<const char*>(data.data()), data.size()).get();
out.flush().get();
out.close().get();
}, std::move(cfg));
}
SEASTAR_TEST_CASE(test_stream_sink_write_local) {
return test_stream_sink_write(sstables::test_env_config{});
}
SEASTAR_TEST_CASE(test_stream_sink_write_s3, *boost::unit_test::precondition(tests::has_scylla_test_env)) {
return test_stream_sink_write(sstables::test_env_config{ .storage = make_test_object_storage_options("S3") });
}
SEASTAR_FIXTURE_TEST_CASE(test_stream_sink_write_gs, gcs_fixture, *tests::check_run_test_decorator("ENABLE_GCP_STORAGE_TEST", true)) {
return test_stream_sink_write(sstables::test_env_config{ .storage = make_test_object_storage_options("GS") });
}

View File

@@ -20,6 +20,7 @@ from pathlib import Path
from typing import TYPE_CHECKING
from test import TOP_SRC_DIR, MODES_TIMEOUT_FACTOR, path_to
from test.pylib.runner import PHASE_REPORT_KEY
from test.cluster.object_store.conftest import make_object_storage
from test.pylib.random_tables import RandomTables
from test.pylib.skip_types import skip_env
from test.pylib.util import unique_name
@@ -27,7 +28,7 @@ from test.pylib.manager_client import ManagerClient
from test.pylib.async_cql import run_async
from test.pylib.scylla_cluster import ScyllaClusterManager, ScyllaVersionDescription, get_scylla_2025_1_description
from test.pylib.suite.base import get_testpy_test
from test.pylib.suite.python import add_cql_connection_options
from test.pylib.suite.python import add_cql_connection_options, add_s3_options
from test.pylib.encryption_provider import KeyProvider, make_key_provider_factory
import logging
import pytest
@@ -79,6 +80,7 @@ def pytest_addoption(parser):
parser.addoption('--manager-api', action='store',
help='Manager unix socket path')
add_cql_connection_options(parser)
add_s3_options(parser)
parser.addoption('--skip-internet-dependent-tests', action='store_true', default=False,
help='Skip tests which depend on artifacts from the internet')
parser.addoption('--artifacts_dir_url', action='store', type=str, default=None, dest='artifacts_dir_url',
@@ -382,3 +384,17 @@ async def key_provider(request, tmpdir, scylla_binary):
@pytest.fixture(scope="function")
def failure_detector_timeout(build_mode):
return 5000 * MODES_TIMEOUT_FACTOR[build_mode]
@pytest.fixture(params=[None, 's3', 'gs'], ids=['local', 's3', 'gs'])
async def storage(request, pytestconfig, tmpdir):
"""Parametrize tests over local / S3 / GCS storage.
When storage is None the test runs with local (filesystem) storage.
Otherwise the fixture yields an object-storage server handle.
"""
if request.param is None:
yield None
return
async with make_object_storage(request.param, pytestconfig, tmpdir, request.node.name) as server:
yield server

View File

@@ -5,6 +5,8 @@
#
from contextlib import asynccontextmanager
import pytest
from test.pylib.suite.python import add_s3_options
@@ -26,9 +28,9 @@ def pytest_addoption(parser):
add_s3_options(parser)
@pytest.fixture(scope="function", params=['s3', 'gs'])
async def object_storage(request, pytestconfig, tmpdir):
if request.param == 'gs':
@asynccontextmanager
async def make_object_storage(kind, pytestconfig, tmpdir, test_name):
if kind == 'gs':
server = create_gs_server(tmpdir)
else:
server = create_s3_server(pytestconfig, tmpdir)
@@ -36,7 +38,7 @@ async def object_storage(request, pytestconfig, tmpdir):
bucket_created = False
try:
await server.start()
server.create_test_bucket(request.node.name)
server.create_test_bucket(test_name)
bucket_created = True
yield server
finally:
@@ -45,16 +47,13 @@ async def object_storage(request, pytestconfig, tmpdir):
await server.stop()
@pytest.fixture(scope="function", params=['s3', 'gs'])
async def object_storage(request, pytestconfig, tmpdir):
async with make_object_storage(request.param, pytestconfig, tmpdir, request.node.name) as server:
yield server
@pytest.fixture(scope="function")
async def s3_storage(request, pytestconfig, tmpdir):
server = create_s3_server(pytestconfig, tmpdir)
bucket_created = False
try:
await server.start()
server.create_test_bucket(request.node.name)
bucket_created = True
async with make_object_storage('s3', pytestconfig, tmpdir, request.node.name) as server:
yield server
finally:
if bucket_created:
server.destroy_test_bucket()
await server.stop()

View File

@@ -17,6 +17,7 @@ import json
import uuid
from test.pylib.manager_client import ManagerClient, ServerInfo
from test.pylib.object_storage import format_tuples
from test.pylib.util import wait_for_cql_and_get_hosts
from test.pylib.tablets import get_all_tablet_replicas
@@ -43,12 +44,16 @@ def workdir():
with tempfile.TemporaryDirectory() as tmp_dir:
yield tmp_dir
async def test_file_streaming_respects_encryption(manager: ManagerClient, workdir):
async def test_file_streaming_respects_encryption(manager: ManagerClient, storage, workdir):
# pylint: disable=missing-function-docstring
cfg = {
'tablets_mode_for_new_keyspaces': 'enabled',
}
if storage:
cfg['object_storage_endpoints'] = storage.create_endpoint_conf()
cfg['experimental_features'] = ['keyspace-storage-options']
cmdline = ['--smp=1']
servers = []
servers.append(await manager.server_add(config=cfg, cmdline=cmdline))
@@ -56,7 +61,13 @@ async def test_file_streaming_respects_encryption(manager: ManagerClient, workdi
cql = manager.cql
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
cql.execute("CREATE KEYSPACE ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};")
ks_cmd = "CREATE KEYSPACE ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}"
if storage:
storage = format_tuples(type=storage.type,
endpoint=storage.address,
bucket=storage.bucket_name)
ks_cmd += f" AND STORAGE = {storage}"
cql.execute(ks_cmd)
cql.execute(f"""CREATE TABLE ks.t(pk text primary key) WITH scylla_encryption_options = {{
'cipher_algorithm' : 'AES/ECB/PKCS5Padding',
'secret_key_strength' : 128,