Merge '[Backport 2025.3] S3 chunked download source bug fixes' from Scylladb[bot]

- Fix missing negation in the `if` in the background downloading fiber
- Add test to catch this case
- Improve the s3 proxy to inject errors if the same resource is requested more than once
- Suppress client retry since retrying the same request when each produces multiple buffers may lead to the same data appearing more than once in the buffer deque
- Inject exception from the test to simulate response callback failure in the middle

No need to backport anything since this class is not used yet

- (cherry picked from commit f1d0690194)

- (cherry picked from commit e73b83e039)

- (cherry picked from commit 6d9cec558a)

- (cherry picked from commit ec59fcd5e4)

- (cherry picked from commit c75acd274c)

- (cherry picked from commit d2d69cbc8c)

- (cherry picked from commit e50f247bf1)

- (cherry picked from commit 49e8c14a86)

- (cherry picked from commit a5246bbe53)

- (cherry picked from commit acf15eba8e)

Parent PR: #24657

Closes scylladb/scylladb#24943

* github.com:scylladb/scylladb:
  s3_test: Add s3_client test for non-retryable error handling
  s3_test: Add trace logging for default_retry_strategy
  s3_client: Fix edge case when the range is exhausted
  s3_client: Fix indentation in try..catch block
  s3_client: Stop retries in chunked download source
  s3_client: Enhance test coverage for retry logic
  s3_client: Add test for Content-Range fix
  s3_client: Fix missing negation
  s3_client: Refine logging
  s3_client: Improve logging placement for current_range output
This commit is contained in:
Botond Dénes
2025-07-15 15:28:48 +03:00
5 changed files with 157 additions and 24 deletions

View File

@@ -15,6 +15,7 @@
#include <seastar/core/reactor.hh>
#include <seastar/core/file.hh>
#include <seastar/core/fstream.hh>
#include <seastar/core/sleep.hh>
#include <seastar/http/exception.hh>
#include <seastar/util/closeable.hh>
#include <seastar/util/short_streams.hh>
@@ -25,6 +26,8 @@
#include "test/lib/test_utils.hh"
#include "test/lib/tmpdir.hh"
#include "utils/assert.hh"
#include "utils/error_injection.hh"
#include "utils/s3/aws_error.hh"
#include "utils/s3/client.hh"
#include "utils/s3/creds.hh"
#include "utils/s3/utils/manip_s3.hh"
@@ -675,6 +678,91 @@ SEASTAR_THREAD_TEST_CASE(test_chunked_download_data_source_proxy) {
test_download_data_source(make_proxy_client, true, 3 * 1024);
}
void test_chunked_download_data_source(const client_maker_function& client_maker, size_t object_size) {
const sstring base_name(fmt::format("test_object-{}", ::getpid()));
tmpdir tmp;
const auto file_path = tmp.path() / base_name;
file f = open_file_dma(file_path.native(), open_flags::create | open_flags::wo).get();
auto output = make_file_output_stream(std::move(f)).get();
for (size_t bytes_written = 0; bytes_written < object_size;) {
auto rnd = tests::random::get_bytes(std::min(object_size - bytes_written, 1024ul));
output.write(reinterpret_cast<char*>(rnd.data()), rnd.size()).get();
bytes_written += rnd.size();
}
output.close().get();
testlog.info("Make client\n");
semaphore mem(16 << 20);
auto cln = client_maker(mem);
auto close_client = deferred_close(*cln);
const auto object_name = fmt::format("/{}/{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), base_name);
auto delete_object = deferred_delete_object(cln, object_name);
cln->upload_file(file_path, object_name).get();
testlog.info("Download object");
auto in = input_stream<char>(cln->make_chunked_download_source(object_name, s3::full_range));
auto close = seastar::deferred_close(in);
file rf = open_file_dma(file_path.native(), open_flags::ro).get();
auto file_input = make_file_input_stream(std::move(rf));
auto close_file = seastar::deferred_close(file_input);
size_t total_size = 0;
size_t trigger_counter = 0;
while (true) {
// We want the background fiber to fill the buffer queue and start waiting to drain it
seastar::sleep(100us).get();
auto buf = in.read().get();
total_size += buf.size();
if (buf.empty()) {
break;
}
++trigger_counter;
if (trigger_counter % 10 == 0) {
utils::get_local_injector().enable("break_s3_inflight_req", true);
}
auto file_buf = file_input.read_exactly(buf.size()).get();
BOOST_REQUIRE_EQUAL(memcmp(buf.begin(), file_buf.begin(), buf.size()), 0);
}
BOOST_REQUIRE_EQUAL(total_size, object_size);
#ifdef SCYLLA_ENABLE_ERROR_INJECTION
utils::get_local_injector().enable("kill_s3_inflight_req");
auto in_throw = input_stream<char>(cln->make_chunked_download_source(object_name, s3::full_range));
auto close_throw = seastar::deferred_close(in_throw);
auto reader = [&in_throw] {
while (true) {
auto buf = in_throw.read().get();
if (buf.empty()) {
break;
}
}
};
BOOST_REQUIRE_EXCEPTION(
reader(), storage_io_error, [](const storage_io_error& e) {
return e.what() == "S3 request failed. Code: 16. Reason: "sv;
});
#else
testlog.info("Skipping error injection test, as it requires SCYLLA_ENABLE_ERROR_INJECTION to be enabled");
#endif
cln->delete_object(object_name).get();
cln->close().get();
}
SEASTAR_THREAD_TEST_CASE(test_chunked_download_data_source_with_delays_minio) {
test_chunked_download_data_source(make_minio_client, 20_MiB);
}
SEASTAR_THREAD_TEST_CASE(test_chunked_download_data_source_with_delays_proxy) {
test_chunked_download_data_source(make_proxy_client, 20_MiB);
}
void test_object_copy(const client_maker_function& client_maker, size_t chunk_size, size_t chunks) {
const sstring name(fmt::format("/{}/testobject-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid()));
const sstring name_copy(fmt::format("/{}/testobject-{}-copy", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid()));

View File

@@ -48,6 +48,6 @@ custom_args:
bloom_filter_test:
- '-c1'
s3_test:
- '-c2 -m2G --logger-log-level s3=trace --logger-log-level http=trace'
- '-c2 -m2G --logger-log-level s3=trace --logger-log-level http=trace --logger-log-level default_retry_strategy=trace'
run_in_debug:
- logalloc_standard_allocator_segment_pool_backend_test

View File

@@ -57,6 +57,11 @@ class LRUCache:
if len(self.cache) > self.capacity:
self.cache.popitem(last=False)
def remove(self, key: str):
with self.lock:
if key in self.cache:
del self.cache[key]
# Simple proxy between s3 client and minio to randomly inject errors and simulate cases when the request succeeds but the wire got "broken"
def true_or_false():
@@ -187,6 +192,8 @@ class InjectingHandler(BaseHTTPRequestHandler):
policy.error_count += 1
self.respond_with_error(reset_connection=policy.server_should_fail)
else:
# Once the request is successfully processed, we remove the policy from the cache to make the following request to the resource eligible to fail
self.policies.remove(self.path)
self.send_response(response.status_code)
for key, value in response.headers.items():
if key.upper() != 'CONTENT-LENGTH':

View File

@@ -287,6 +287,8 @@ client::group_client& client::find_or_create_client() {
}
throw storage_io_error {EIO, format("S3 request failed with ({})", status)};
} catch (const filler_exception&) {
throw;
} catch (...) {
auto e = std::current_exception();
throw storage_io_error {EIO, format("S3 error ({})", e)};
@@ -1144,12 +1146,19 @@ class client::chunked_download_source final : public seastar::data_source_impl {
_object_name,
current_range);
} else if (_is_contiguous_mode) {
s3l.trace("Setting contiguous download mode for '{}'", _object_name);
current_range = _range;
s3l.trace("Setting contiguous download mode for '{}', range: {}", _object_name, current_range);
} else {
// In non-contiguous mode we download the object in chunks of _max_buffers_size
s3l.trace("Setting ranged download mode for '{}'", _object_name);
current_range = {_range.offset(), std::min(_range.length(), _max_buffers_size - _buffers_size)};
s3l.trace("Setting ranged download mode for '{}', range: {}", _object_name, current_range);
}
if (current_range.length() == 0) {
s3l.trace("Fiber for object '{}' completed downloading, signals EOS and leaving fiber", _object_name);
_buffers.emplace_back(temporary_buffer<char>(), co_await _client->claim_memory(0));
_get_cv.signal();
_is_finished = true;
co_return;
}
req._headers["Range"] = current_range.to_header_string();
s3l.trace("Fiber for object '{}' will make HTTP request within range {}", _object_name, current_range);
@@ -1160,42 +1169,67 @@ class client::chunked_download_source final : public seastar::data_source_impl {
s3l.warn("Fiber for object '{}' failed: {}. Exiting", _object_name, reply._status);
throw httpd::unexpected_status_error(reply._status);
}
if (_range == s3::full_range && reply.get_header("Content-Range").empty()) {
if (_range == s3::full_range && !reply.get_header("Content-Range").empty()) {
auto content_range_header = parse_content_range(reply.get_header("Content-Range"));
_range = range{content_range_header.start, content_range_header.total};
s3l.trace("No range for object '{}' was provided. Setting the range to {} form the Content-Range header",
s3l.trace("No range for object '{}' was provided. Setting the range to {} from the Content-Range header",
_object_name,
_range);
}
auto in = std::move(in_);
while (_buffers_size < _max_buffers_size && !_is_finished) {
auto start = s3_clock::now();
s3l.trace("Fiber for object '{}' will try to read within range {}", _object_name, _range);
auto buf = co_await in.read();
auto buff_size = buf.size();
gc.read_stats.update(buff_size, s3_clock::now() - start);
_range += buff_size;
_buffers_size += buff_size;
if (buff_size == 0 && _range.length() == 0) {
s3l.trace("Fiber for object '{}' signals EOS", _object_name);
std::exception_ptr ex;
try {
while (_buffers_size < _max_buffers_size && !_is_finished) {
utils::get_local_injector().inject("kill_s3_inflight_req", [] {
// Inject non-retryable error to emulate source failure
throw aws::aws_exception(aws::aws_error::get_errors().at("ResourceNotFound"));
});
auto start = s3_clock::now();
s3l.trace("Fiber for object '{}' will try to read within range {}", _object_name, _range);
auto buf = co_await in.read();
auto buff_size = buf.size();
gc.read_stats.update(buff_size, s3_clock::now() - start);
_range += buff_size;
_buffers_size += buff_size;
if (buff_size == 0 && _range.length() == 0) {
s3l.trace("Fiber for object '{}' signals EOS", _object_name);
_buffers.emplace_back(std::move(buf), co_await _client->claim_memory(buff_size));
_get_cv.signal();
_is_finished = true;
break;
}
if (buff_size == 0) {
// The requested range is fully downloaded
break;
}
s3l.trace("Fiber for object '{}' pushes {} bytes buffer", _object_name, buff_size);
_buffers.emplace_back(std::move(buf), co_await _client->claim_memory(buff_size));
_get_cv.signal();
_is_finished = true;
break;
utils::get_local_injector().inject("break_s3_inflight_req", [] {
// Inject retryable error after some data was already downloaded
throw aws::aws_exception(aws::aws_error::get_errors().at("ThrottlingException"));
});
}
if (buff_size == 0) {
// The requested range is fully downloaded
break;
}
s3l.trace("Fiber for object '{}' pushes {} bytes buffer", _object_name, buff_size);
_buffers.emplace_back(std::move(buf), co_await _client->claim_memory(buff_size));
_get_cv.signal();
} catch (...) {
ex = std::current_exception();
}
co_await in.close();
if (ex) {
try {
std::rethrow_exception(ex);
} catch (const aws::aws_exception& aws_ex) {
if (aws_ex.error().is_retryable()) {
throw filler_exception(format("{}", std::current_exception()).c_str());
}
throw;
}
}
},
{},
_as);
_is_contiguous_mode = _buffers_size < _max_buffers_size * _buffers_high_watermark;
} catch (const filler_exception& ex) {
s3l.warn("Fiber for object '{}' experienced an error in buffer filling loop. Reason: {}. Re-issuing the request", _object_name, ex);
} catch (...) {
s3l.trace("Fiber for object '{}' failed: {}, exiting", _object_name, std::current_exception());
_get_cv.broken(std::current_exception());

View File

@@ -90,6 +90,10 @@ struct stats {
std::time_t last_modified;
};
struct filler_exception final : std::runtime_error {
explicit filler_exception(const char* msg) : std::runtime_error(msg) {}
};
future<> ignore_reply(const http::reply& rep, input_stream<char>&& in_);
[[noreturn]] void map_s3_client_exception(std::exception_ptr ex);