diff --git a/test/boost/s3_test.cc b/test/boost/s3_test.cc index 49b018b5a3..fbb452d876 100644 --- a/test/boost/s3_test.cc +++ b/test/boost/s3_test.cc @@ -26,6 +26,7 @@ #include "test/lib/test_utils.hh" #include "test/lib/tmpdir.hh" #include "utils/assert.hh" +#include "utils/error_injection.hh" #include "utils/s3/client.hh" #include "utils/s3/creds.hh" #include "utils/s3/utils/manip_s3.hh" @@ -709,12 +710,20 @@ void test_chunked_download_data_source(const client_maker_function& client_maker auto close_file = seastar::deferred_close(file_input); size_t total_size = 0; + size_t trigger_counter = 0; while (true) { + // We want the background fiber to fill the buffer queue and start waiting to drain it + seastar::sleep(100us).get(); auto buf = in.read().get(); total_size += buf.size(); if (buf.empty()) { break; } + ++trigger_counter; + if (trigger_counter % 10 == 0) { + utils::get_local_injector().enable("break_s3_inflight_req", true); + } + auto file_buf = file_input.read_exactly(buf.size()).get(); BOOST_REQUIRE_EQUAL(memcmp(buf.begin(), file_buf.begin(), buf.size()), 0); } diff --git a/utils/s3/client.cc b/utils/s3/client.cc index fabc62f291..9e328daef6 100644 --- a/utils/s3/client.cc +++ b/utils/s3/client.cc @@ -287,6 +287,8 @@ client::group_client& client::find_or_create_client() { } throw storage_io_error {EIO, format("S3 request failed with ({})", status)}; + } catch (const filler_exception&) { + throw; } catch (...) 
{ auto e = std::current_exception(); throw storage_io_error {EIO, format("S3 error ({})", e)}; @@ -1168,6 +1170,8 @@ class client::chunked_download_source final : public seastar::data_source_impl { _range); } auto in = std::move(in_); + std::exception_ptr ex; + try { while (_buffers_size < _max_buffers_size && !_is_finished) { auto start = s3_clock::now(); s3l.trace("Fiber for object '{}' will try to read within range {}", _object_name, _range); @@ -1190,12 +1194,31 @@ class client::chunked_download_source final : public seastar::data_source_impl { s3l.trace("Fiber for object '{}' pushes {} bytes buffer", _object_name, buff_size); _buffers.emplace_back(std::move(buf), co_await _client->claim_memory(buff_size)); _get_cv.signal(); + utils::get_local_injector().inject("break_s3_inflight_req", [] { + // Inject retryable error after some data was already downloaded + throw aws::aws_exception(aws::aws_error::get_errors().at("ThrottlingException")); + }); + } + } catch (...) { + ex = std::current_exception(); } co_await in.close(); + if (ex) { + try { + std::rethrow_exception(ex); + } catch (const aws::aws_exception& aws_ex) { + if (aws_ex.error().is_retryable()) { + throw filler_exception(format("{}", std::current_exception()).c_str()); + } + throw; + } + } }, {}, _as); _is_contiguous_mode = _buffers_size < _max_buffers_size * _buffers_high_watermark; + } catch (const filler_exception& ex) { + s3l.warn("Fiber for object '{}' experienced an error in buffer filling loop. Reason: {}. Re-issuing the request", _object_name, ex.what()); } catch (...) 
{ s3l.trace("Fiber for object '{}' failed: {}, exiting", _object_name, std::current_exception()); _get_cv.broken(std::current_exception()); diff --git a/utils/s3/client.hh b/utils/s3/client.hh index 3dc2b42452..a3b3495053 100644 --- a/utils/s3/client.hh +++ b/utils/s3/client.hh @@ -90,6 +90,10 @@ struct stats { std::time_t last_modified; }; +/* Marker exception for a retryable failure inside the chunked-download buffer-filling loop. In this patch the loop throws it when a caught aws::aws_exception reports is_retryable(); the request error handler rethrows it unchanged instead of wrapping it into storage_io_error; and the download fiber catches it and only logs a warning (its message says the request is re-issued) rather than breaking _get_cv as it does for other exceptions. */ struct filler_exception final : std::runtime_error { + /* msg is forwarded to std::runtime_error and is what what() returns. */ explicit filler_exception(const char* msg) : std::runtime_error(msg) {} +}; + future<> ignore_reply(const http::reply& rep, input_stream&& in_); [[noreturn]] void map_s3_client_exception(std::exception_ptr ex);