Merge 'compaction_test: Make compaction tests backend‑agnostic and add S3/GCS support' from Ernest Zaslavsky

This series updates the storage abstraction and extends the compaction tests to support object‑storage backends (S3 and GCS), while tightening several parts of the test environment.

The changes include:

- New exists/object_exists helpers across storage backends and clock fixes in the S3 client to make signature generation stable under test conditions.

- A new get_storage_for_tests accessor and adjustments to the test environment to avoid premature teardown of the sstable registry.

- Refactoring of compaction tests to remove direct sstable access, ensure proper schema setup, and avoid use of moved‑from objects.

- Extraction of test_env‑based logic into reusable functions and addition of S3/GCS variants of the compaction tests.

Not all tests were converted to be backend‑agnostic yet, and a few require further investigation before they can run cleanly against S3/GCS backends. These will be addressed in follow‑up work.

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-704 however, followup is needed

No backport needed since this change targeting future feature

Closes scylladb/scylladb#28790

* github.com:scylladb/scylladb:
  compaction_test: fix formatting after previous patches
  compaction_test: add S3/GCS variations to tests
  compaction_test: extract test_env-based tests into functions
  compaction_test: replace file_exists with storage::exists
  compaction_test: initialize tables with schema via make_table_for_tests
  compaction_test: use sstable APIs to manipulate component files
  compaction_test: fix use-after-move issue
  sstable_utils: add `get_storage` and `open_file` helpers
  test_env: delay unplugging sstable registry
  storage: add `exists` method to storage abstraction
  s3_client: use lowres_system_clock for aws_sigv4
  s3_client: add `object_exists` helper
  gcs_client: add `object_exists` helper
This commit is contained in:
Pavel Emelyanov
2026-04-07 13:53:48 +03:00
14 changed files with 4511 additions and 3403 deletions

View File

@@ -96,6 +96,9 @@ public:
data_source make_download_source(object_name name, abort_source* as) override {
return _client->make_chunked_download_source(name.str(), s3::full_range, as);
}
future<bool> object_exists(object_name name, abort_source* as) override {
return _client->object_exists(name.str(), as);
}
abstract_lister make_object_lister(std::string bucket, std::string prefix, lister::filter_type filter) override {
return abstract_lister::make<s3::client::bucket_lister>(_client, std::move(bucket), std::move(prefix), std::move(filter));
}
@@ -170,6 +173,9 @@ public:
data_source make_download_source(object_name name, abort_source* as) override {
return _client->create_download_source(name.bucket(), name.object(), as);
}
future<bool> object_exists(object_name name, abort_source* as) override {
return _client->object_exists(name.bucket(), name.object(), as);
}
abstract_lister make_object_lister(std::string bucket, std::string prefix, lister::filter_type filter) override {
class list_impl : public abstract_lister::impl {
shared_ptr<gcp::storage::client> _client;

View File

@@ -76,6 +76,7 @@ public:
virtual data_sink make_data_upload_sink(object_name, std::optional<unsigned> max_parts_per_piece, abort_source* = nullptr) = 0;
virtual data_sink make_upload_sink(object_name, abort_source* = nullptr) = 0;
virtual data_source make_download_source(object_name, abort_source* = nullptr) = 0;
virtual future<bool> object_exists(object_name name, abort_source* as = nullptr) = 0;
virtual abstract_lister make_object_lister(std::string bucket, std::string prefix, lister::filter_type) = 0;

View File

@@ -109,6 +109,9 @@ public:
virtual future<> unlink_component(const sstable& sst, component_type) noexcept override;
virtual sstring prefix() const override { return _dir.native(); }
future<bool> exists(const sstable& sst, component_type type) const override {
return file_exists(sst.get_filename(type).format());
}
};
future<data_sink> filesystem_storage::make_data_or_index_sink(sstable& sst, component_type type) {
@@ -667,6 +670,10 @@ public:
return std::visit([] (const auto& v) { return fmt::to_string(v); }, _location);
}
future<bool> exists(const sstable& sst, component_type type) const override {
return _client->object_exists(make_object_name(sst, type), abort_source());
}
future<> put_object(object_name name, ::memory_data_sink_buffers bufs) {
return _client->put_object(std::move(name), std::move(bufs), abort_source());
}

View File

@@ -124,6 +124,7 @@ public:
virtual future<> unlink_component(const sstable& sst, component_type) noexcept = 0;
virtual sstring prefix() const = 0;
virtual future<bool> exists(const sstable& sst, component_type type) const = 0;
};
std::unique_ptr<sstables::storage> make_storage(sstables_manager& manager, const data_dictionary::storage_options& s_opts, sstable_state state);

File diff suppressed because it is too large Load Diff

View File

@@ -32,7 +32,7 @@ custom_args:
sstable_datafile_test:
- '-c1 -m2G'
sstable_compaction_test:
- '-c1 -m2G --logger-log-level compaction=debug --logger-log-level compaction_manager=debug'
- '-c1 -m2G --logger-log-level compaction=debug --logger-log-level compaction_manager=debug --logger-log-level s3=debug --logger-log-level gcp_storage=debug'
sstable_3_x_test:
- '-c1 -m2G'
cql_query_test:

View File

@@ -226,6 +226,14 @@ public:
void set_digest(std::optional<uint32_t> digest) {
_sst->_components->digest = digest;
}
storage& get_storage() {
return *_sst->_storage;
}
future<file> open_file(component_type type, open_flags flags, file_open_options opts) {
return _sst->open_file(type, flags, opts);
}
};
inline auto replacer_fn_no_op() {

View File

@@ -301,6 +301,7 @@ future<> test_env::stop() {
}
}
co_await _impl->mgr.close();
_impl->mgr.unplug_sstables_registry();
co_await _impl->semaphore.stop();
}
@@ -366,7 +367,6 @@ future<> test_env::do_with_async(noncopyable_function<void (test_env&)> func, te
test_env env(std::move(cfg), *scf, &sstm.local());
auto close_env = defer([&] { env.stop().get(); });
env.manager().plug_sstables_registry(std::make_unique<mock_sstables_registry>());
auto unplu = defer([&env] { env.manager().unplug_sstables_registry(); });
func(env);
});
}

View File

@@ -10,7 +10,6 @@
#include "utils/aws_sigv4.hh"
#include "utils/hashers.hh"
#include "bytes.hh"
#include "db_clock.hh"
using namespace std::chrono_literals;
@@ -48,8 +47,8 @@ static std::string apply_sha256(const std::vector<temporary_buffer<char>>& msg)
return to_hex(hasher.finalize());
}
std::string format_time_point(db_clock::time_point tp) {
time_t time_point_repr = db_clock::to_time_t(tp);
std::string format_time_point(lowres_system_clock::time_point tp) {
time_t time_point_repr = lowres_system_clock::to_time_t(tp);
std::string time_point_str;
time_point_str.resize(17);
::tm time_buf;
@@ -61,8 +60,8 @@ std::string format_time_point(db_clock::time_point tp) {
void check_expiry(std::string_view signature_date) {
//FIXME: The default 15min can be changed with X-Amz-Expires header - we should honor it
std::string expiration_str = format_time_point(db_clock::now() - 15min);
std::string validity_str = format_time_point(db_clock::now() + 15min);
std::string expiration_str = format_time_point(lowres_system_clock::now() - 15min);
std::string validity_str = format_time_point(lowres_system_clock::now() + 15min);
if (signature_date < expiration_str) {
throw std::runtime_error(
fmt::format("Signature expired: {} is now earlier than {} (current time - 15 min.)",

View File

@@ -8,7 +8,8 @@
#pragma once
#include "db_clock.hh"
#include <map>
#include <seastar/core/lowres_clock.hh>
// The declared below get_signature() method makes the Signature string for AWS
// authenticated requests as described in [1]. It can be used in two ways.
@@ -33,14 +34,14 @@ namespace aws {
std::string get_signature(std::string_view access_key_id, std::string_view secret_access_key,
std::string_view host, std::string_view canonical_uri, std::string_view method,
std::optional<std::string_view> orig_datestamp, std::string_view signed_headers_str, const std::map<std::string_view, std::string_view>& signed_headers_map,
const std::vector<temporary_buffer<char>>* body_content, std::string_view region, std::string_view service, std::string_view query_string);
const std::vector<seastar::temporary_buffer<char>>* body_content, std::string_view region, std::string_view service, std::string_view query_string);
// Convenience alias not to pass obscure nullptr argument to get_signature()
inline constexpr std::vector<temporary_buffer<char>>* unsigned_content = nullptr;
inline constexpr std::vector<seastar::temporary_buffer<char>>* unsigned_content = nullptr;
// Same for datestamp checking
inline auto omit_datestamp_expiration_check = std::nullopt;
std::string format_time_point(db_clock::time_point tp);
std::string format_time_point(seastar::lowres_system_clock::time_point tp);
} // aws namespace
} // utils namespace

View File

@@ -1156,6 +1156,25 @@ seekable_data_source utils::gcp::storage::client::create_download_source(std::st
return seekable_data_source(std::make_unique<object_data_source>(_impl, bucket, object_name, as));
}
future<bool> storage::client::object_exists(std::string_view bucket, std::string_view object_name, seastar::abort_source* as) const {
gcp_storage.debug("Get object metadata {}:{}", bucket, object_name);
auto path = fmt::format("/storage/v1/b/{}/o/{}", bucket, seastar::http::internal::url_encode(object_name));
try {
auto res = co_await _impl->send_with_retry(path, GCP_OBJECT_SCOPE_READ_ONLY, ""s, ""s, httpclient::method_type::GET, {}, as);
if (res.result() != status_type::ok) {
throw failed_operation(
fmt::format("Could not retrieve object metadata {}:{}: {} ({})", bucket, object_name, res.result(), get_gcp_error_message(res.body())));
}
} catch (const storage_io_error& e) {
if (e.code().value() == ENOENT) {
co_return false;
}
throw;
}
co_return true;
}
future<> utils::gcp::storage::client::close() {
return _impl->close();
}

View File

@@ -154,7 +154,10 @@ namespace utils::gcp::storage {
* Creates a data_source for reading from a named object.
*/
seekable_data_source create_download_source(std::string_view bucket, std::string_view object_name, seastar::abort_source* = nullptr) const;
/**
* Checks if an object exists.
*/
future<bool> object_exists(std::string_view bucket, std::string_view object_name, seastar::abort_source* as = nullptr) const;
/**
* Destroys resources. Must be called before releasing object
*/

View File

@@ -166,7 +166,7 @@ future<> client::authorize(http::request& req) {
}
}
auto time_point_str = utils::aws::format_time_point(db_clock::now());
auto time_point_str = utils::aws::format_time_point(lowres_system_clock::now());
auto time_point_st = time_point_str.substr(0, 8);
req._headers["x-amz-date"] = time_point_str;
req._headers["x-amz-content-sha256"] = "UNSIGNED-PAYLOAD";
@@ -328,7 +328,7 @@ http::experimental::client::reply_handler client::wrap_handler(http::request& re
auto should_retry = possible_error->is_retryable();
if (possible_error->get_error_type() == aws::aws_error_type::REQUEST_TIME_TOO_SKEWED) {
s3l.warn("Request failed with REQUEST_TIME_TOO_SKEWED. Machine time: {}, request timestamp: {}",
utils::aws::format_time_point(db_clock::now()),
utils::aws::format_time_point(lowres_system_clock::now()),
request.get_header("x-amz-date"));
should_retry = utils::http::retryable::yes;
co_await authorize(request);
@@ -447,6 +447,18 @@ future<stats> client::get_object_stats(sstring object_name, seastar::abort_sourc
co_return st;
}
future<bool> client::object_exists(sstring object_name, seastar::abort_source* as) {
try {
co_await get_object_header(object_name, ignore_reply, as);
} catch (const storage_io_error& e) {
if (e.code().value() == ENOENT) {
co_return false;
}
throw;
}
co_return true;
}
static rapidxml::xml_node<>* first_node_of(rapidxml::xml_node<>* root,
std::initializer_list<std::string_view> names) {
SCYLLA_ASSERT(root);

View File

@@ -189,6 +189,7 @@ public:
future<uint64_t> get_object_size(sstring object_name, seastar::abort_source* = nullptr);
future<stats> get_object_stats(sstring object_name, seastar::abort_source* = nullptr);
future<bool> object_exists(sstring object_name, seastar::abort_source* = nullptr);
future<tag_set> get_object_tagging(sstring object_name, seastar::abort_source* = nullptr);
future<> put_object_tagging(sstring object_name, tag_set tagging, seastar::abort_source* = nullptr);
future<> delete_object_tagging(sstring object_name, seastar::abort_source* = nullptr);