diff --git a/test/boost/s3_test.cc b/test/boost/s3_test.cc index 66992d5624..80c2812502 100644 --- a/test/boost/s3_test.cc +++ b/test/boost/s3_test.cc @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -25,6 +26,8 @@ #include "test/lib/test_utils.hh" #include "test/lib/tmpdir.hh" #include "utils/assert.hh" +#include "utils/error_injection.hh" +#include "utils/s3/aws_error.hh" #include "utils/s3/client.hh" #include "utils/s3/creds.hh" #include "utils/s3/utils/manip_s3.hh" @@ -675,6 +678,91 @@ SEASTAR_THREAD_TEST_CASE(test_chunked_download_data_source_proxy) { test_download_data_source(make_proxy_client, true, 3 * 1024); } +void test_chunked_download_data_source(const client_maker_function& client_maker, size_t object_size) { + const sstring base_name(fmt::format("test_object-{}", ::getpid())); + + tmpdir tmp; + const auto file_path = tmp.path() / base_name; + + file f = open_file_dma(file_path.native(), open_flags::create | open_flags::wo).get(); + auto output = make_file_output_stream(std::move(f)).get(); + + for (size_t bytes_written = 0; bytes_written < object_size;) { + auto rnd = tests::random::get_bytes(std::min(object_size - bytes_written, 1024ul)); + output.write(reinterpret_cast(rnd.data()), rnd.size()).get(); + bytes_written += rnd.size(); + } + output.close().get(); + + testlog.info("Make client\n"); + semaphore mem(16 << 20); + auto cln = client_maker(mem); + auto close_client = deferred_close(*cln); + const auto object_name = fmt::format("/{}/{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), base_name); + auto delete_object = deferred_delete_object(cln, object_name); + cln->upload_file(file_path, object_name).get(); + + testlog.info("Download object"); + auto in = input_stream(cln->make_chunked_download_source(object_name, s3::full_range)); + auto close = seastar::deferred_close(in); + + file rf = open_file_dma(file_path.native(), open_flags::ro).get(); + auto file_input = make_file_input_stream(std::move(rf)); + auto 
close_file = seastar::deferred_close(file_input); + + size_t total_size = 0; + size_t trigger_counter = 0; + while (true) { + // We want the background fiber to fill the buffer queue and start waiting to drain it + seastar::sleep(100us).get(); + auto buf = in.read().get(); + total_size += buf.size(); + if (buf.empty()) { + break; + } + ++trigger_counter; + if (trigger_counter % 10 == 0) { + utils::get_local_injector().enable("break_s3_inflight_req", true); + } + + auto file_buf = file_input.read_exactly(buf.size()).get(); + BOOST_REQUIRE_EQUAL(memcmp(buf.begin(), file_buf.begin(), buf.size()), 0); + } + + BOOST_REQUIRE_EQUAL(total_size, object_size); +#ifdef SCYLLA_ENABLE_ERROR_INJECTION + utils::get_local_injector().enable("kill_s3_inflight_req"); + auto in_throw = input_stream(cln->make_chunked_download_source(object_name, s3::full_range)); + auto close_throw = seastar::deferred_close(in_throw); + + auto reader = [&in_throw] { + while (true) { + auto buf = in_throw.read().get(); + if (buf.empty()) { + break; + } + } + }; + BOOST_REQUIRE_EXCEPTION( + reader(), storage_io_error, [](const storage_io_error& e) { + return e.what() == "S3 request failed. Code: 16. 
Reason: "sv; + }); +#else + testlog.info("Skipping error injection test, as it requires SCYLLA_ENABLE_ERROR_INJECTION to be enabled"); +#endif + + cln->delete_object(object_name).get(); + cln->close().get(); +} + +SEASTAR_THREAD_TEST_CASE(test_chunked_download_data_source_with_delays_minio) { + test_chunked_download_data_source(make_minio_client, 20_MiB); +} + +SEASTAR_THREAD_TEST_CASE(test_chunked_download_data_source_with_delays_proxy) { + test_chunked_download_data_source(make_proxy_client, 20_MiB); +} + void test_object_copy(const client_maker_function& client_maker, size_t chunk_size, size_t chunks) { const sstring name(fmt::format("/{}/testobject-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid())); const sstring name_copy(fmt::format("/{}/testobject-{}-copy", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid())); diff --git a/test/boost/test_config.yaml b/test/boost/test_config.yaml index 53875549ff..819f923257 100644 --- a/test/boost/test_config.yaml +++ b/test/boost/test_config.yaml @@ -48,6 +48,6 @@ custom_args: bloom_filter_test: - '-c1' s3_test: - - '-c2 -m2G --logger-log-level s3=trace --logger-log-level http=trace' + - '-c2 -m2G --logger-log-level s3=trace --logger-log-level http=trace --logger-log-level default_retry_strategy=trace' run_in_debug: - logalloc_standard_allocator_segment_pool_backend_test diff --git a/test/pylib/s3_proxy.py b/test/pylib/s3_proxy.py index eb3edd5c4b..28104010aa 100644 --- a/test/pylib/s3_proxy.py +++ b/test/pylib/s3_proxy.py @@ -57,6 +57,11 @@ class LRUCache: if len(self.cache) > self.capacity: self.cache.popitem(last=False) + def remove(self, key: str): + with self.lock: + if key in self.cache: + del self.cache[key] + # Simple proxy between s3 client and minio to randomly inject errors and simulate cases when the request succeeds but the wire got "broken" def true_or_false(): @@ -187,6 +192,8 @@ class InjectingHandler(BaseHTTPRequestHandler): policy.error_count += 1 
self.respond_with_error(reset_connection=policy.server_should_fail) else: + # Once the request is successfully processed, we remove the policy from the cache to make the following requests to the resource eligible to fail + self.policies.remove(self.path) self.send_response(response.status_code) for key, value in response.headers.items(): if key.upper() != 'CONTENT-LENGTH': diff --git a/utils/s3/client.cc b/utils/s3/client.cc index 99af7fba26..733c42065e 100644 --- a/utils/s3/client.cc +++ b/utils/s3/client.cc @@ -287,6 +287,8 @@ client::group_client& client::find_or_create_client() { } throw storage_io_error {EIO, format("S3 request failed with ({})", status)}; + } catch (const filler_exception&) { + throw; } catch (...) { auto e = std::current_exception(); throw storage_io_error {EIO, format("S3 error ({})", e)}; @@ -1144,12 +1146,19 @@ class client::chunked_download_source final : public seastar::data_source_impl { _object_name, current_range); } else if (_is_contiguous_mode) { - s3l.trace("Setting contiguous download mode for '{}'", _object_name); current_range = _range; + s3l.trace("Setting contiguous download mode for '{}', range: {}", _object_name, current_range); } else { // In non-contiguous mode we download the object in chunks of _max_buffers_size - s3l.trace("Setting ranged download mode for '{}'", _object_name); current_range = {_range.offset(), std::min(_range.length(), _max_buffers_size - _buffers_size)}; + s3l.trace("Setting ranged download mode for '{}', range: {}", _object_name, current_range); + } + if (current_range.length() == 0) { + s3l.trace("Fiber for object '{}' completed downloading, signals EOS and leaving fiber", _object_name); + _buffers.emplace_back(temporary_buffer(), co_await _client->claim_memory(0)); + _get_cv.signal(); + _is_finished = true; + co_return; + } req._headers["Range"] = current_range.to_header_string(); s3l.trace("Fiber for object '{}' will make HTTP request within range {}", _object_name, current_range); @@ -1160,42 
+1169,67 @@ class client::chunked_download_source final : public seastar::data_source_impl { s3l.warn("Fiber for object '{}' failed: {}. Exiting", _object_name, reply._status); throw httpd::unexpected_status_error(reply._status); } - if (_range == s3::full_range && reply.get_header("Content-Range").empty()) { + if (_range == s3::full_range && !reply.get_header("Content-Range").empty()) { auto content_range_header = parse_content_range(reply.get_header("Content-Range")); _range = range{content_range_header.start, content_range_header.total}; - s3l.trace("No range for object '{}' was provided. Setting the range to {} form the Content-Range header", + s3l.trace("No range for object '{}' was provided. Setting the range to {} from the Content-Range header", _object_name, _range); } auto in = std::move(in_); - while (_buffers_size < _max_buffers_size && !_is_finished) { - auto start = s3_clock::now(); - s3l.trace("Fiber for object '{}' will try to read within range {}", _object_name, _range); - auto buf = co_await in.read(); - auto buff_size = buf.size(); - gc.read_stats.update(buff_size, s3_clock::now() - start); - _range += buff_size; - _buffers_size += buff_size; - if (buff_size == 0 && _range.length() == 0) { - s3l.trace("Fiber for object '{}' signals EOS", _object_name); + std::exception_ptr ex; + try { + while (_buffers_size < _max_buffers_size && !_is_finished) { + utils::get_local_injector().inject("kill_s3_inflight_req", [] { + // Inject non-retryable error to emulate source failure + throw aws::aws_exception(aws::aws_error::get_errors().at("ResourceNotFound")); + }); + auto start = s3_clock::now(); + s3l.trace("Fiber for object '{}' will try to read within range {}", _object_name, _range); + auto buf = co_await in.read(); + auto buff_size = buf.size(); + gc.read_stats.update(buff_size, s3_clock::now() - start); + _range += buff_size; + _buffers_size += buff_size; + if (buff_size == 0 && _range.length() == 0) { + s3l.trace("Fiber for object '{}' signals EOS", 
_object_name); + _buffers.emplace_back(std::move(buf), co_await _client->claim_memory(buff_size)); + _get_cv.signal(); + _is_finished = true; + break; + } + if (buff_size == 0) { + // The requested range is fully downloaded + break; + } + s3l.trace("Fiber for object '{}' pushes {} bytes buffer", _object_name, buff_size); _buffers.emplace_back(std::move(buf), co_await _client->claim_memory(buff_size)); _get_cv.signal(); - _is_finished = true; - break; + utils::get_local_injector().inject("break_s3_inflight_req", [] { + // Inject retryable error after some data was already downloaded + throw aws::aws_exception(aws::aws_error::get_errors().at("ThrottlingException")); + }); } - if (buff_size == 0) { - // The requested range is fully downloaded - break; - } - s3l.trace("Fiber for object '{}' pushes {} bytes buffer", _object_name, buff_size); - _buffers.emplace_back(std::move(buf), co_await _client->claim_memory(buff_size)); - _get_cv.signal(); + } catch (...) { + ex = std::current_exception(); } co_await in.close(); + if (ex) { + try { + std::rethrow_exception(ex); + } catch (const aws::aws_exception& aws_ex) { + if (aws_ex.error().is_retryable()) { + throw filler_exception(format("{}", std::current_exception()).c_str()); + } + throw; + } + } }, {}, _as); _is_contiguous_mode = _buffers_size < _max_buffers_size * _buffers_high_watermark; + } catch (const filler_exception& ex) { + s3l.warn("Fiber for object '{}' experienced an error in buffer filling loop. Reason: {}. Re-issuing the request", _object_name, ex); } catch (...) 
{ s3l.trace("Fiber for object '{}' failed: {}, exiting", _object_name, std::current_exception()); _get_cv.broken(std::current_exception()); diff --git a/utils/s3/client.hh b/utils/s3/client.hh index 3dc2b42452..a3b3495053 100644 --- a/utils/s3/client.hh +++ b/utils/s3/client.hh @@ -90,6 +90,10 @@ struct stats { std::time_t last_modified; }; +struct filler_exception final : std::runtime_error { + explicit filler_exception(const char* msg) : std::runtime_error(msg) {} +}; + future<> ignore_reply(const http::reply& rep, input_stream&& in_); [[noreturn]] void map_s3_client_exception(std::exception_ptr ex);