From 115e8c85e4b5bbd0f5d596424e3253fd1a7fc7dd Mon Sep 17 00:00:00 2001 From: Ernest Zaslavsky Date: Sun, 29 Jun 2025 13:40:42 +0300 Subject: [PATCH 01/10] s3_client: Improve logging placement for current_range output Relocated logging to occur after determining the `current_range`, ensuring more relevant output during S3 client operations. (cherry picked from commit f1d06901949275ef8f5c19f2f34b1cb1fdbd642b) --- utils/s3/client.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/utils/s3/client.cc b/utils/s3/client.cc index 99af7fba26..bc47efaef3 100644 --- a/utils/s3/client.cc +++ b/utils/s3/client.cc @@ -1144,12 +1144,12 @@ class client::chunked_download_source final : public seastar::data_source_impl { _object_name, current_range); } else if (_is_contiguous_mode) { - s3l.trace("Setting contiguous download mode for '{}'", _object_name); current_range = _range; + s3l.trace("Setting contiguous download mode for '{}', range: {}", _object_name, current_range); } else { // In non-contiguous mode we download the object in chunks of _max_buffers_size - s3l.trace("Setting ranged download mode for '{}'", _object_name); current_range = {_range.offset(), std::min(_range.length(), _max_buffers_size - _buffers_size)}; + s3l.trace("Setting ranged download mode for '{}', range: {}", _object_name, current_range); } req._headers["Range"] = current_range.to_header_string(); s3l.trace("Fiber for object '{}' will make HTTP request within range {}", _object_name, current_range); From 4cd17925287f14ad9c82592d416260cb6695c11e Mon Sep 17 00:00:00 2001 From: Ernest Zaslavsky Date: Sun, 29 Jun 2025 13:41:27 +0300 Subject: [PATCH 02/10] s3_client: Refine logging Fix typo in log message to improve clarity and accuracy during S3 operations. 
(cherry picked from commit e73b83e0391e9ecafff5811e92333d08f5ca3d19) --- utils/s3/client.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/s3/client.cc b/utils/s3/client.cc index bc47efaef3..1075c9ec0a 100644 --- a/utils/s3/client.cc +++ b/utils/s3/client.cc @@ -1163,7 +1163,7 @@ class client::chunked_download_source final : public seastar::data_source_impl { if (_range == s3::full_range && reply.get_header("Content-Range").empty()) { auto content_range_header = parse_content_range(reply.get_header("Content-Range")); _range = range{content_range_header.start, content_range_header.total}; - s3l.trace("No range for object '{}' was provided. Setting the range to {} form the Content-Range header", + s3l.trace("No range for object '{}' was provided. Setting the range to {} from the Content-Range header", _object_name, _range); } From 00f10e7f1dfb5fa91a70ca78b939059acecf503d Mon Sep 17 00:00:00 2001 From: Ernest Zaslavsky Date: Sun, 29 Jun 2025 13:47:54 +0300 Subject: [PATCH 03/10] s3_client: Fix missing negation Restore a missing `not` in a conditional check that caused incorrect behavior during S3 client execution. (cherry picked from commit 6d9cec558a10d18806a6884836a9c452ffef69f7) --- utils/s3/client.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/s3/client.cc b/utils/s3/client.cc index 1075c9ec0a..fabc62f291 100644 --- a/utils/s3/client.cc +++ b/utils/s3/client.cc @@ -1160,7 +1160,7 @@ class client::chunked_download_source final : public seastar::data_source_impl { s3l.warn("Fiber for object '{}' failed: {}. 
Exiting", _object_name, reply._status); throw httpd::unexpected_status_error(reply._status); } - if (_range == s3::full_range && reply.get_header("Content-Range").empty()) { + if (_range == s3::full_range && !reply.get_header("Content-Range").empty()) { auto content_range_header = parse_content_range(reply.get_header("Content-Range")); _range = range{content_range_header.start, content_range_header.total}; s3l.trace("No range for object '{}' was provided. Setting the range to {} from the Content-Range header", From c748a97170e3249fab7b6f8df67fc7d5c20f441b Mon Sep 17 00:00:00 2001 From: Ernest Zaslavsky Date: Sun, 29 Jun 2025 13:46:54 +0300 Subject: [PATCH 04/10] s3_client: Add test for Content-Range fix Introduce a test that accurately verifies the Content-Range behavior, ensuring the previous fix is properly validated. (cherry picked from commit ec59fcd5e4a522e9adfad9d0c97aab1585e6b62d) --- test/boost/s3_test.cc | 57 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/test/boost/s3_test.cc b/test/boost/s3_test.cc index 66992d5624..49b018b5a3 100644 --- a/test/boost/s3_test.cc +++ b/test/boost/s3_test.cc @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -675,6 +676,62 @@ SEASTAR_THREAD_TEST_CASE(test_chunked_download_data_source_proxy) { test_download_data_source(make_proxy_client, true, 3 * 1024); } +void test_chunked_download_data_source(const client_maker_function& client_maker, size_t object_size) { + const sstring base_name(fmt::format("test_object-{}", ::getpid())); + + tmpdir tmp; + const auto file_path = tmp.path() / base_name; + + file f = open_file_dma(file_path.native(), open_flags::create | open_flags::wo).get(); + auto output = make_file_output_stream(std::move(f)).get(); + + for (size_t bytes_written = 0; bytes_written < object_size;) { + auto rnd = tests::random::get_bytes(std::min(object_size - bytes_written, 1024ul)); + output.write(reinterpret_cast(rnd.data()), 
rnd.size()).get(); + bytes_written += rnd.size(); + } + output.close().get(); + + testlog.info("Make client\n"); + semaphore mem(16 << 20); + auto cln = client_maker(mem); + auto close_client = deferred_close(*cln); + const auto object_name = fmt::format("/{}/{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), base_name); + auto delete_object = deferred_delete_object(cln, object_name); + cln->upload_file(file_path, object_name).get(); + + testlog.info("Download object"); + auto in = input_stream(cln->make_chunked_download_source(object_name, s3::full_range)); + auto close = seastar::deferred_close(in); + + file rf = open_file_dma(file_path.native(), open_flags::ro).get(); + auto file_input = make_file_input_stream(std::move(rf)); + auto close_file = seastar::deferred_close(file_input); + + size_t total_size = 0; + while (true) { + auto buf = in.read().get(); + total_size += buf.size(); + if (buf.empty()) { + break; + } + auto file_buf = file_input.read_exactly(buf.size()).get(); + BOOST_REQUIRE_EQUAL(memcmp(buf.begin(), file_buf.begin(), buf.size()), 0); + } + + cln->delete_object(object_name).get(); + cln->close().get(); + BOOST_REQUIRE_EQUAL(total_size, object_size); +} + +SEASTAR_THREAD_TEST_CASE(test_chunked_download_data_source_with_delays_minio) { + test_chunked_download_data_source(make_minio_client, 20_MiB); +} + +SEASTAR_THREAD_TEST_CASE(test_chunked_download_data_source_with_delays_proxy) { + test_chunked_download_data_source(make_proxy_client, 20_MiB); +} + void test_object_copy(const client_maker_function& client_maker, size_t chunk_size, size_t chunks) { const sstring name(fmt::format("/{}/testobject-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid())); const sstring name_copy(fmt::format("/{}/testobject-{}-copy", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid())); From c841ffe398dff92d154b5b2a6f802b92b0678d44 Mon Sep 17 00:00:00 2001 From: Ernest Zaslavsky Date: Sun, 29 Jun 2025 13:49:46 +0300 Subject: [PATCH 05/10] s3_client: Enhance test 
coverage for retry logic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extend the S3 proxy to support error injection when the client makes multiple requests to the same resource—useful for testing retry behavior and failure handling. (cherry picked from commit c75acd274c41e591c36d8a28d919fd5673c0bc65) --- test/pylib/s3_proxy.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/test/pylib/s3_proxy.py b/test/pylib/s3_proxy.py index eb3edd5c4b..28104010aa 100644 --- a/test/pylib/s3_proxy.py +++ b/test/pylib/s3_proxy.py @@ -57,6 +57,11 @@ class LRUCache: if len(self.cache) > self.capacity: self.cache.popitem(last=False) + def remove(self, key: str): + with self.lock: + if key in self.cache: + del self.cache[key] + # Simple proxy between s3 client and minio to randomly inject errors and simulate cases when the request succeeds but the wire got "broken" def true_or_false(): @@ -187,6 +192,8 @@ class InjectingHandler(BaseHTTPRequestHandler): policy.error_count += 1 self.respond_with_error(reset_connection=policy.server_should_fail) else: + # Once the request is successfully processed, we remove the policy from the cache to make the following request to the resource eligible to fail + self.policies.remove(self.path) self.send_response(response.status_code) for key, value in response.headers.items(): if key.upper() != 'CONTENT-LENGTH': From 54db6ca08877d2c705c5855e2f102ae0617b1eb5 Mon Sep 17 00:00:00 2001 From: Ernest Zaslavsky Date: Sun, 29 Jun 2025 13:58:03 +0300 Subject: [PATCH 06/10] s3_client: Stop retries in chunked download source Disable retries for S3 requests in the chunked download source to prevent duplicate chunks from corrupting the buffer queue. The response handler now throws an exception to bypass the retry strategy, allowing the next range to be attempted cleanly. This exception is only triggered for retryable errors; unretryable ones immediately halt further requests. 
(cherry picked from commit d2d69cbc8c90cd3f863c7ec90bc132e54caaba3d) --- test/boost/s3_test.cc | 9 +++++++++ utils/s3/client.cc | 23 +++++++++++++++++++++++ utils/s3/client.hh | 4 ++++ 3 files changed, 36 insertions(+) diff --git a/test/boost/s3_test.cc b/test/boost/s3_test.cc index 49b018b5a3..fbb452d876 100644 --- a/test/boost/s3_test.cc +++ b/test/boost/s3_test.cc @@ -26,6 +26,7 @@ #include "test/lib/test_utils.hh" #include "test/lib/tmpdir.hh" #include "utils/assert.hh" +#include "utils/error_injection.hh" #include "utils/s3/client.hh" #include "utils/s3/creds.hh" #include "utils/s3/utils/manip_s3.hh" @@ -709,12 +710,20 @@ void test_chunked_download_data_source(const client_maker_function& client_maker auto close_file = seastar::deferred_close(file_input); size_t total_size = 0; + size_t trigger_counter = 0; while (true) { + // We want the background fiber to fill the buffer queue and start waiting to drain it + seastar::sleep(100us).get(); auto buf = in.read().get(); total_size += buf.size(); if (buf.empty()) { break; } + ++trigger_counter; + if (trigger_counter % 10 == 0) { + utils::get_local_injector().enable("break_s3_inflight_req", true); + } + auto file_buf = file_input.read_exactly(buf.size()).get(); BOOST_REQUIRE_EQUAL(memcmp(buf.begin(), file_buf.begin(), buf.size()), 0); } diff --git a/utils/s3/client.cc b/utils/s3/client.cc index fabc62f291..9e328daef6 100644 --- a/utils/s3/client.cc +++ b/utils/s3/client.cc @@ -287,6 +287,8 @@ client::group_client& client::find_or_create_client() { } throw storage_io_error {EIO, format("S3 request failed with ({})", status)}; + } catch (const filler_exception&) { + throw; } catch (...) 
{ auto e = std::current_exception(); throw storage_io_error {EIO, format("S3 error ({})", e)}; @@ -1168,6 +1170,8 @@ class client::chunked_download_source final : public seastar::data_source_impl { _range); } auto in = std::move(in_); + std::exception_ptr ex; + try { while (_buffers_size < _max_buffers_size && !_is_finished) { auto start = s3_clock::now(); s3l.trace("Fiber for object '{}' will try to read within range {}", _object_name, _range); @@ -1190,12 +1194,31 @@ class client::chunked_download_source final : public seastar::data_source_impl { s3l.trace("Fiber for object '{}' pushes {} bytes buffer", _object_name, buff_size); _buffers.emplace_back(std::move(buf), co_await _client->claim_memory(buff_size)); _get_cv.signal(); + utils::get_local_injector().inject("break_s3_inflight_req", [] { + // Inject retryable error after some data was already downloaded + throw aws::aws_exception(aws::aws_error::get_errors().at("ThrottlingException")); + }); + } + } catch (...) { + ex = std::current_exception(); } co_await in.close(); + if (ex) { + try { + std::rethrow_exception(ex); + } catch (const aws::aws_exception& aws_ex) { + if (aws_ex.error().is_retryable()) { + throw filler_exception(format("{}", std::current_exception()).c_str()); + } + throw; + } + } }, {}, _as); _is_contiguous_mode = _buffers_size < _max_buffers_size * _buffers_high_watermark; + } catch (const filler_exception& ex) { + s3l.warn("Fiber for object '{}' experienced an error in buffer filling loop. Reason: {}. Re-issuing the request", _object_name, ex.what()); } catch (...) 
{ s3l.trace("Fiber for object '{}' failed: {}, exiting", _object_name, std::current_exception()); _get_cv.broken(std::current_exception()); diff --git a/utils/s3/client.hh b/utils/s3/client.hh index 3dc2b42452..a3b3495053 100644 --- a/utils/s3/client.hh +++ b/utils/s3/client.hh @@ -90,6 +90,10 @@ struct stats { std::time_t last_modified; }; +struct filler_exception final : std::runtime_error { + explicit filler_exception(const char* msg) : std::runtime_error(msg) {} +}; + future<> ignore_reply(const http::reply& rep, input_stream&& in_); [[noreturn]] void map_s3_client_exception(std::exception_ptr ex); From 22739df69f07bf89e52b1f349696c2dc906bab4f Mon Sep 17 00:00:00 2001 From: Ernest Zaslavsky Date: Sun, 29 Jun 2025 13:58:58 +0300 Subject: [PATCH 07/10] s3_client: Fix indentation in try..catch block Correct indentation in the `try..catch` block to improve code readability and maintain consistent formatting. (cherry picked from commit e50f247bf1bc531ce7a28a8fd92af542ac6a064c) --- utils/s3/client.cc | 48 +++++++++++++++++++++++----------------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/utils/s3/client.cc b/utils/s3/client.cc index 9e328daef6..cce53fc7a6 100644 --- a/utils/s3/client.cc +++ b/utils/s3/client.cc @@ -1172,32 +1172,32 @@ class client::chunked_download_source final : public seastar::data_source_impl { auto in = std::move(in_); std::exception_ptr ex; try { - while (_buffers_size < _max_buffers_size && !_is_finished) { - auto start = s3_clock::now(); - s3l.trace("Fiber for object '{}' will try to read within range {}", _object_name, _range); - auto buf = co_await in.read(); - auto buff_size = buf.size(); - gc.read_stats.update(buff_size, s3_clock::now() - start); - _range += buff_size; - _buffers_size += buff_size; - if (buff_size == 0 && _range.length() == 0) { - s3l.trace("Fiber for object '{}' signals EOS", _object_name); + while (_buffers_size < _max_buffers_size && !_is_finished) { + auto start = s3_clock::now(); + 
s3l.trace("Fiber for object '{}' will try to read within range {}", _object_name, _range); + auto buf = co_await in.read(); + auto buff_size = buf.size(); + gc.read_stats.update(buff_size, s3_clock::now() - start); + _range += buff_size; + _buffers_size += buff_size; + if (buff_size == 0 && _range.length() == 0) { + s3l.trace("Fiber for object '{}' signals EOS", _object_name); + _buffers.emplace_back(std::move(buf), co_await _client->claim_memory(buff_size)); + _get_cv.signal(); + _is_finished = true; + break; + } + if (buff_size == 0) { + // The requested range is fully downloaded + break; + } + s3l.trace("Fiber for object '{}' pushes {} bytes buffer", _object_name, buff_size); _buffers.emplace_back(std::move(buf), co_await _client->claim_memory(buff_size)); _get_cv.signal(); - _is_finished = true; - break; - } - if (buff_size == 0) { - // The requested range is fully downloaded - break; - } - s3l.trace("Fiber for object '{}' pushes {} bytes buffer", _object_name, buff_size); - _buffers.emplace_back(std::move(buf), co_await _client->claim_memory(buff_size)); - _get_cv.signal(); - utils::get_local_injector().inject("break_s3_inflight_req", [] { - // Inject retryable error after some data was already downloaded - throw aws::aws_exception(aws::aws_error::get_errors().at("ThrottlingException")); - }); + utils::get_local_injector().inject("break_s3_inflight_req", [] { + // Inject retryable error after some data was already downloaded + throw aws::aws_exception(aws::aws_error::get_errors().at("ThrottlingException")); + }); } } catch (...) { ex = std::current_exception(); From 7f303bfda3aa3ad37910ebbf979419c9077a283e Mon Sep 17 00:00:00 2001 From: Ernest Zaslavsky Date: Tue, 1 Jul 2025 16:19:40 +0300 Subject: [PATCH 08/10] s3_client: Fix edge case when the range is exhausted Handle case where the download loop exits after consuming all data, but before receiving an empty buffer signaling EOF. 
Without this, the next request is sent with a non-zero offset and zero length, resulting in "Range request cannot be satisfied" errors. Now, an empty buffer is pushed to indicate completion and exit the fiber properly. (cherry picked from commit 49e8c14a862259562f5331203d9b9119258b879d) --- utils/s3/client.cc | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/utils/s3/client.cc b/utils/s3/client.cc index cce53fc7a6..4ddc75c687 100644 --- a/utils/s3/client.cc +++ b/utils/s3/client.cc @@ -1153,6 +1153,13 @@ class client::chunked_download_source final : public seastar::data_source_impl { current_range = {_range.offset(), std::min(_range.length(), _max_buffers_size - _buffers_size)}; s3l.trace("Setting ranged download mode for '{}', range: {}", _object_name, current_range); } + if (current_range.length() == 0) { + s3l.trace("Fiber for object '{}' completed downloading, signals EOS and leaving fiber", _object_name); + _buffers.emplace_back(temporary_buffer(), co_await _client->claim_memory(0)); + _get_cv.signal(); + _is_finished = true; + co_return; + } req._headers["Range"] = current_range.to_header_string(); s3l.trace("Fiber for object '{}' will make HTTP request within range {}", _object_name, current_range); co_await _client->make_request( @@ -1218,7 +1225,7 @@ class client::chunked_download_source final : public seastar::data_source_impl { _as); _is_contiguous_mode = _buffers_size < _max_buffers_size * _buffers_high_watermark; } catch (const filler_exception& ex) { - s3l.warn("Fiber for object '{}' experienced an error in buffer filling loop. Reason: {}. Re-issuing the request", _object_name, ex.what()); + s3l.warn("Fiber for object '{}' experienced an error in buffer filling loop. Reason: {}. Re-issuing the request", _object_name, ex); } catch (...) 
{ s3l.trace("Fiber for object '{}' failed: {}, exiting", _object_name, std::current_exception()); _get_cv.broken(std::current_exception()); From dbf4bd162e95768fbe0cb532f25ddb54d7301e14 Mon Sep 17 00:00:00 2001 From: Ernest Zaslavsky Date: Sun, 29 Jun 2025 14:00:16 +0300 Subject: [PATCH 09/10] s3_test: Add trace logging for default_retry_strategy Introduce trace-level logging for `default_retry_strategy` in `s3_test` to improve visibility into retry logic during test execution. (cherry picked from commit a5246bbe53f98952eb2488c333b45d3f13b79474) --- test/boost/test_config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/boost/test_config.yaml b/test/boost/test_config.yaml index 53875549ff..819f923257 100644 --- a/test/boost/test_config.yaml +++ b/test/boost/test_config.yaml @@ -48,6 +48,6 @@ custom_args: bloom_filter_test: - '-c1' s3_test: - - '-c2 -m2G --logger-log-level s3=trace --logger-log-level http=trace' + - '-c2 -m2G --logger-log-level s3=trace --logger-log-level http=trace --logger-log-level default_retry_strategy=trace' run_in_debug: - logalloc_standard_allocator_segment_pool_backend_test From 873c8503cd08091f5add3162f7406f939c6f213c Mon Sep 17 00:00:00 2001 From: Ernest Zaslavsky Date: Sun, 29 Jun 2025 15:20:48 +0300 Subject: [PATCH 10/10] s3_test: Add s3_client test for non-retryable error handling Introduce a test that injects a non-retryable error and verifies that the chunked download source throws an exception as expected. 
(cherry picked from commit acf15eba8e2ca73fbf88049799eaaf9af5f7c1a1) --- test/boost/s3_test.cc | 24 +++++++++++++++++++++++- utils/s3/client.cc | 4 ++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/test/boost/s3_test.cc b/test/boost/s3_test.cc index fbb452d876..80c2812502 100644 --- a/test/boost/s3_test.cc +++ b/test/boost/s3_test.cc @@ -27,6 +27,7 @@ #include "test/lib/tmpdir.hh" #include "utils/assert.hh" #include "utils/error_injection.hh" +#include "utils/s3/aws_error.hh" #include "utils/s3/client.hh" #include "utils/s3/creds.hh" #include "utils/s3/utils/manip_s3.hh" @@ -728,9 +729,30 @@ void test_chunked_download_data_source(const client_maker_function& client_maker BOOST_REQUIRE_EQUAL(memcmp(buf.begin(), file_buf.begin(), buf.size()), 0); } + BOOST_REQUIRE_EQUAL(total_size, object_size); +#ifdef SCYLLA_ENABLE_ERROR_INJECTION + utils::get_local_injector().enable("kill_s3_inflight_req"); + auto in_throw = input_stream(cln->make_chunked_download_source(object_name, s3::full_range)); + auto close_throw = seastar::deferred_close(in_throw); + + auto reader = [&in_throw] { + while (true) { + auto buf = in_throw.read().get(); + if (buf.empty()) { + break; + } + } + }; + BOOST_REQUIRE_EXCEPTION( + reader(), storage_io_error, [](const storage_io_error& e) { + return e.what() == "S3 request failed. Code: 16. 
Reason: "sv; + }); +#else + testlog.info("Skipping error injection test, as it requires SCYLLA_ENABLE_ERROR_INJECTION to be enabled"); +#endif + cln->delete_object(object_name).get(); cln->close().get(); - BOOST_REQUIRE_EQUAL(total_size, object_size); } SEASTAR_THREAD_TEST_CASE(test_chunked_download_data_source_with_delays_minio) { diff --git a/utils/s3/client.cc b/utils/s3/client.cc index 4ddc75c687..733c42065e 100644 --- a/utils/s3/client.cc +++ b/utils/s3/client.cc @@ -1180,6 +1180,10 @@ class client::chunked_download_source final : public seastar::data_source_impl { std::exception_ptr ex; try { while (_buffers_size < _max_buffers_size && !_is_finished) { + utils::get_local_injector().inject("kill_s3_inflight_req", [] { + // Inject non-retryable error to emulate source failure + throw aws::aws_exception(aws::aws_error::get_errors().at("ResourceNotFound")); + }); auto start = s3_clock::now(); s3l.trace("Fiber for object '{}' will try to read within range {}", _object_name, _range); auto buf = co_await in.read();