mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
s3_client: Stop retries in chunked download source
Disable retries for S3 requests in the chunked download source to prevent duplicate chunks from corrupting the buffer queue. The response handler now throws an exception to bypass the retry strategy, allowing the next range to be attempted cleanly. This exception is only triggered for retryable errors; unretryable ones immediately halt further requests.
This commit is contained in:
@@ -26,6 +26,7 @@
|
||||
#include "test/lib/test_utils.hh"
|
||||
#include "test/lib/tmpdir.hh"
|
||||
#include "utils/assert.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
#include "utils/s3/client.hh"
|
||||
#include "utils/s3/creds.hh"
|
||||
#include "utils/s3/utils/manip_s3.hh"
|
||||
@@ -709,12 +710,20 @@ void test_chunked_download_data_source(const client_maker_function& client_maker
|
||||
auto close_file = seastar::deferred_close(file_input);
|
||||
|
||||
size_t total_size = 0;
|
||||
size_t trigger_counter = 0;
|
||||
while (true) {
|
||||
// We want the background fiber to fill the buffer queue and start waiting to drain it
|
||||
seastar::sleep(100us).get();
|
||||
auto buf = in.read().get();
|
||||
total_size += buf.size();
|
||||
if (buf.empty()) {
|
||||
break;
|
||||
}
|
||||
++trigger_counter;
|
||||
if (trigger_counter % 10 == 0) {
|
||||
utils::get_local_injector().enable("break_s3_inflight_req", true);
|
||||
}
|
||||
|
||||
auto file_buf = file_input.read_exactly(buf.size()).get();
|
||||
BOOST_REQUIRE_EQUAL(memcmp(buf.begin(), file_buf.begin(), buf.size()), 0);
|
||||
}
|
||||
|
||||
@@ -287,6 +287,8 @@ client::group_client& client::find_or_create_client() {
|
||||
}
|
||||
|
||||
throw storage_io_error {EIO, format("S3 request failed with ({})", status)};
|
||||
} catch (const filler_exception&) {
|
||||
throw;
|
||||
} catch (...) {
|
||||
auto e = std::current_exception();
|
||||
throw storage_io_error {EIO, format("S3 error ({})", e)};
|
||||
@@ -1168,6 +1170,8 @@ class client::chunked_download_source final : public seastar::data_source_impl {
|
||||
_range);
|
||||
}
|
||||
auto in = std::move(in_);
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
while (_buffers_size < _max_buffers_size && !_is_finished) {
|
||||
auto start = s3_clock::now();
|
||||
s3l.trace("Fiber for object '{}' will try to read within range {}", _object_name, _range);
|
||||
@@ -1190,12 +1194,31 @@ class client::chunked_download_source final : public seastar::data_source_impl {
|
||||
s3l.trace("Fiber for object '{}' pushes {} bytes buffer", _object_name, buff_size);
|
||||
_buffers.emplace_back(std::move(buf), co_await _client->claim_memory(buff_size));
|
||||
_get_cv.signal();
|
||||
utils::get_local_injector().inject("break_s3_inflight_req", [] {
|
||||
// Inject retryable error after some data was already downloaded
|
||||
throw aws::aws_exception(aws::aws_error::get_errors().at("ThrottlingException"));
|
||||
});
|
||||
}
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
co_await in.close();
|
||||
if (ex) {
|
||||
try {
|
||||
std::rethrow_exception(ex);
|
||||
} catch (const aws::aws_exception& aws_ex) {
|
||||
if (aws_ex.error().is_retryable()) {
|
||||
throw filler_exception(format("{}", std::current_exception()).c_str());
|
||||
}
|
||||
throw;
|
||||
}
|
||||
}
|
||||
},
|
||||
{},
|
||||
_as);
|
||||
_is_contiguous_mode = _buffers_size < _max_buffers_size * _buffers_high_watermark;
|
||||
} catch (const filler_exception& ex) {
|
||||
s3l.warn("Fiber for object '{}' experienced an error in buffer filling loop. Reason: {}. Re-issuing the request", _object_name, ex.what());
|
||||
} catch (...) {
|
||||
s3l.trace("Fiber for object '{}' failed: {}, exiting", _object_name, std::current_exception());
|
||||
_get_cv.broken(std::current_exception());
|
||||
|
||||
@@ -90,6 +90,10 @@ struct stats {
|
||||
std::time_t last_modified;
|
||||
};
|
||||
|
||||
struct filler_exception final : std::runtime_error {
|
||||
explicit filler_exception(const char* msg) : std::runtime_error(msg) {}
|
||||
};
|
||||
|
||||
future<> ignore_reply(const http::reply& rep, input_stream<char>&& in_);
|
||||
[[noreturn]] void map_s3_client_exception(std::exception_ptr ex);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user