From d2d69cbc8c90cd3f863c7ec90bc132e54caaba3d Mon Sep 17 00:00:00 2001 From: Ernest Zaslavsky Date: Sun, 29 Jun 2025 13:58:03 +0300 Subject: [PATCH] s3_client: Stop retries in chunked download source Disable retries for S3 requests in the chunked download source to prevent duplicate chunks from corrupting the buffer queue. The response handler now throws an exception to bypass the retry strategy, allowing the next range to be attempted cleanly. This exception is only triggered for retryable errors; unretryable ones immediately halt further requests. --- test/boost/s3_test.cc | 9 +++++++++ utils/s3/client.cc | 23 +++++++++++++++++++++++ utils/s3/client.hh | 4 ++++ 3 files changed, 36 insertions(+) diff --git a/test/boost/s3_test.cc b/test/boost/s3_test.cc index 49b018b5a3..fbb452d876 100644 --- a/test/boost/s3_test.cc +++ b/test/boost/s3_test.cc @@ -26,6 +26,7 @@ #include "test/lib/test_utils.hh" #include "test/lib/tmpdir.hh" #include "utils/assert.hh" +#include "utils/error_injection.hh" #include "utils/s3/client.hh" #include "utils/s3/creds.hh" #include "utils/s3/utils/manip_s3.hh" @@ -709,12 +710,20 @@ void test_chunked_download_data_source(const client_maker_function& client_maker auto close_file = seastar::deferred_close(file_input); size_t total_size = 0; + size_t trigger_counter = 0; while (true) { + // We want the background fiber to fill the buffer queue and start waiting to drain it + seastar::sleep(100us).get(); auto buf = in.read().get(); total_size += buf.size(); if (buf.empty()) { break; } + ++trigger_counter; + if (trigger_counter % 10 == 0) { + utils::get_local_injector().enable("break_s3_inflight_req", true); + } + auto file_buf = file_input.read_exactly(buf.size()).get(); BOOST_REQUIRE_EQUAL(memcmp(buf.begin(), file_buf.begin(), buf.size()), 0); } diff --git a/utils/s3/client.cc b/utils/s3/client.cc index fabc62f291..9e328daef6 100644 --- a/utils/s3/client.cc +++ b/utils/s3/client.cc @@ -287,6 +287,8 @@ client::group_client& client::find_or_create_client() { } throw storage_io_error {EIO, format("S3 request failed with ({})", status)}; + } catch (const filler_exception&) { + throw; } catch (...) { auto e = std::current_exception(); throw storage_io_error {EIO, format("S3 error ({})", e)}; @@ -1168,6 +1170,8 @@ class client::chunked_download_source final : public seastar::data_source_impl { _range); } auto in = std::move(in_); + std::exception_ptr ex; + try { while (_buffers_size < _max_buffers_size && !_is_finished) { auto start = s3_clock::now(); s3l.trace("Fiber for object '{}' will try to read within range {}", _object_name, _range); @@ -1190,12 +1194,31 @@ class client::chunked_download_source final : public seastar::data_source_impl { s3l.trace("Fiber for object '{}' pushes {} bytes buffer", _object_name, buff_size); _buffers.emplace_back(std::move(buf), co_await _client->claim_memory(buff_size)); _get_cv.signal(); + utils::get_local_injector().inject("break_s3_inflight_req", [] { + // Inject retryable error after some data was already downloaded + throw aws::aws_exception(aws::aws_error::get_errors().at("ThrottlingException")); + }); + } + } catch (...) { + ex = std::current_exception(); } co_await in.close(); + if (ex) { + try { + std::rethrow_exception(ex); + } catch (const aws::aws_exception& aws_ex) { + if (aws_ex.error().is_retryable()) { + throw filler_exception(format("{}", std::current_exception()).c_str()); + } + throw; + } + } }, {}, _as); _is_contiguous_mode = _buffers_size < _max_buffers_size * _buffers_high_watermark; + } catch (const filler_exception& ex) { + s3l.warn("Fiber for object '{}' experienced an error in buffer filling loop. Reason: {}. Re-issuing the request", _object_name, ex.what()); } catch (...) { s3l.trace("Fiber for object '{}' failed: {}, exiting", _object_name, std::current_exception()); _get_cv.broken(std::current_exception()); diff --git a/utils/s3/client.hh b/utils/s3/client.hh index 3dc2b42452..a3b3495053 100644 --- a/utils/s3/client.hh +++ b/utils/s3/client.hh @@ -90,6 +90,10 @@ struct stats { std::time_t last_modified; }; +struct filler_exception final : std::runtime_error { + explicit filler_exception(const char* msg) : std::runtime_error(msg) {} +}; + future<> ignore_reply(const http::reply& rep, input_stream&& in_); [[noreturn]] void map_s3_client_exception(std::exception_ptr ex);