Files
scylladb/utils/s3/client.cc
Pavel Emelyanov 89d8ae5cb6 Merge 'http: prepare http clients retry machinery refactoring' from Ernest Zaslavsky
Today the S3 client has a well-established and well-tested (hopefully) http request retry strategy; in the rest of the clients it looks like we are trying to achieve the same by writing the same code over and over again, and of course missing corner cases that have already been addressed in the S3 client.
This PR aims to extract the code that could assist other clients to detect the retryability of an error originating from the http client, reuse the built in seastar http client retryability and to minimize the boilerplate of http client exception handling

No backport needed since it is only refactoring of the existing code

Closes scylladb/scylladb#28250

* github.com:scylladb/scylladb:
  exceptions: add helper to build a chain of error handlers
  http: extract error classification code
  aws_error: extract `retryable` from aws_error
2026-02-18 10:06:37 +03:00

1960 lines
86 KiB
C++

/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <fmt/format.h>
#include <exception>
#include <initializer_list>
#include <memory>
#include <regex>
#include <stdexcept>
#if __has_include(<rapidxml.h>)
#include <rapidxml.h>
#else
#include <rapidxml/rapidxml.hpp>
#endif
#include <seastar/core/coroutine.hh>
#include <seastar/core/fstream.hh>
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/on_internal_error.hh>
#include <seastar/core/pipe.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/units.hh>
#include <seastar/core/temporary_buffer.hh>
#include <seastar/coroutine/exception.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/short_streams.hh>
#include <seastar/util/lazy.hh>
#include <seastar/http/request.hh>
#include <seastar/http/exception.hh>
#include "default_aws_retry_strategy.hh"
#include "db/config.hh"
#include "utils/assert.hh"
#include "utils/s3/aws_error.hh"
#include "utils/s3/client.hh"
#include "utils/s3/credentials_providers/environment_aws_credentials_provider.hh"
#include "utils/s3/credentials_providers/instance_profile_credentials_provider.hh"
#include "utils/s3/credentials_providers/sts_assume_role_credentials_provider.hh"
#include "utils/div_ceil.hh"
#include "utils/http.hh"
#include "utils/memory_data_sink.hh"
#include "utils/chunked_vector.hh"
#include "utils/aws_sigv4.hh"
#include "db_clock.hh"
#include "utils/log.hh"
using namespace std::chrono_literals;
using namespace aws;
// Renders an s3::tag as the <Tag> XML fragment used by the S3 tagging APIs.
template <>
struct fmt::formatter<s3::tag> {
    constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
    auto format(const s3::tag& t, fmt::format_context& ctx) const {
        return fmt::format_to(ctx.out(), "<Tag><Key>{}</Key><Value>{}</Value></Tag>",
                t.key, t.value);
    }
};
namespace utils {

// Returns the total number of bytes described by the scatter/gather list.
inline size_t iovec_len(const std::vector<iovec>& iov)
{
    size_t total = 0;
    for (const iovec& entry : iov) {
        total += entry.iov_len;
    }
    return total;
}

}
namespace s3 {
// Logger shared by the whole S3 client implementation.
static logging::logger s3l("s3");
// "Each part must be at least 5 MB in size, except the last part."
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html
static constexpr size_t aws_minimum_part_size = 5_MiB;
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
static constexpr size_t aws_maximum_part_size = 5_GiB;
// "Part numbers can be any number from 1 to 10,000, inclusive."
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html
static constexpr unsigned aws_maximum_parts_in_piece = 10'000;
// Largest object expressible as a multipart upload (parts x max part size).
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingObjects.html
static constexpr size_t aws_maximum_object_size = aws_maximum_parts_in_piece * aws_maximum_part_size;
// Reply handler that drains and discards the response body; used for
// requests where only the HTTP status matters.
future<> ignore_reply(const http::reply& rep, input_stream<char>&& in_) {
    auto body = std::move(in_);
    co_await util::skip_entire_stream(body);
}
// Constructs an S3 client for endpoint `host`. `mem` bounds memory used by
// in-flight transfers; `rs` optionally overrides the retry strategy
// (defaults to aws::default_aws_retry_strategy when null).
client::client(std::string host, endpoint_config_ptr cfg, semaphore& mem, global_factory gf, private_tag, std::unique_ptr<http::experimental::retry_strategy> rs)
: _host(std::move(host))
, _cfg(std::move(cfg))
, _creds_sem(1)
// Armed for _credentials.expires_at (see update_credentials_and_rearm()):
// drops the credentials so the next authorize() re-obtains them synchronously.
, _creds_invalidation_timer([this] {
std::ignore = [this]() -> future<> {
auto units = co_await get_units(_creds_sem, 1);
s3l.info("Credentials update attempt in background failed. Outdated credentials will be discarded, triggering synchronous re-obtainment"
" attempts for future requests.");
_credentials = {};
}();
})
// Armed one hour before expiry: refreshes credentials in the background so
// requests normally never block on credential acquisition.
, _creds_update_timer([this] {
std::ignore = [this]() -> future<> {
auto units = co_await get_units(_creds_sem, 1);
s3l.info("Update creds in the background");
try {
co_await update_credentials_and_rearm();
} catch (...) {
// On failure clear the creds; authorize() will retry synchronously.
_credentials = {};
}
}();
})
, _gf(std::move(gf))
, _memory(mem)
, _retry_strategy(std::move(rs)) {
// Providers are consulted in this order; the first yielding credentials wins.
_creds_provider_chain
.add_credentials_provider(std::make_unique<aws::environment_aws_credentials_provider>())
.add_credentials_provider(std::make_unique<aws::instance_profile_credentials_provider>())
.add_credentials_provider(std::make_unique<aws::sts_assume_role_credentials_provider>(_cfg->region, _cfg->role_arn));
// Kick off the initial background credentials update immediately.
_creds_update_timer.arm(lowres_clock::now());
if (!_retry_strategy) {
_retry_strategy = std::make_unique<aws::default_aws_retry_strategy>();
}
}
// Replaces the endpoint's region / IAM role configuration, invalidates any
// cached credentials and schedules an immediate credentials re-obtainment.
future<> client::update_config(std::string region, std::string ira) {
endpoint_config new_cfg = {
.port = _cfg->port,
.use_https = _cfg->use_https,
.region = std::move(region),
.role_arn = std::move(ira),
};
// NOTE(review): _cfg is swapped before the creds semaphore is acquired, so a
// request started in this window may sign with the new region but the old
// credentials; presumably acceptable since the creds are invalidated next —
// confirm.
_cfg = make_lw_shared<endpoint_config>(std::move(new_cfg));
auto units = co_await get_units(_creds_sem, 1);
_creds_provider_chain.invalidate_credentials();
_credentials = {};
_creds_update_timer.rearm(lowres_clock::now());
}
// Factory: builds a client for `endpoint` with a ready-made endpoint config.
shared_ptr<client> client::make(std::string endpoint, endpoint_config_ptr cfg, semaphore& mem, global_factory gf) {
return seastar::make_shared<client>(std::move(endpoint), std::move(cfg), mem, std::move(gf), private_tag{});
}
// Factory: derives host/port/https from a URL string and builds the client.
shared_ptr<client> client::make(std::string ep, std::string region, std::string iam_role_arn, semaphore& memory, global_factory gf) {
    const auto url = utils::http::parse_simple_url(ep);
    return make(url.host,
            make_lw_shared<endpoint_config>(endpoint_config{
                .port = url.port,
                .use_https = url.is_https(),
                .region = std::move(region),
                .role_arn = std::move(iam_role_arn),
            }),
            memory, gf);
}
// Obtains fresh credentials from the provider chain and (re)arms both timers:
// invalidation exactly at expiry, background refresh one hour earlier.
future<> client::update_credentials_and_rearm() {
_credentials = co_await _creds_provider_chain.get_aws_credentials();
_creds_invalidation_timer.rearm(_credentials.expires_at);
_creds_update_timer.rearm(_credentials.expires_at - 1h);
}
// Signs `req` with AWS Signature Version 4, adding the x-amz-date,
// x-amz-content-sha256 (unsigned payload), optional session token and
// Authorization headers. Obtains credentials synchronously if none are cached.
future<> client::authorize(http::request& req) {
// Double-checked locking on the creds semaphore: only the first waiter
// actually refreshes; later waiters see the populated credentials.
if (!_credentials) [[unlikely]] {
auto units = co_await get_units(_creds_sem, 1);
if (!_credentials) {
s3l.info("Update creds synchronously");
co_await update_credentials_and_rearm();
}
}
auto time_point_str = utils::aws::format_time_point(db_clock::now());
// First 8 chars (YYYYMMDD) form the date part of the credential scope.
auto time_point_st = time_point_str.substr(0, 8);
req._headers["x-amz-date"] = time_point_str;
req._headers["x-amz-content-sha256"] = "UNSIGNED-PAYLOAD";
if (!_credentials.session_token.empty()) {
req._headers["x-amz-security-token"] = _credentials.session_token;
}
// std::map keeps the signed headers sorted, as the canonical request requires.
std::map<std::string_view, std::string_view> signed_headers;
sstring signed_headers_list = "";
// AWS requires all x-... and Host: headers to be signed
// NOTE(review): operator[] on req._headers inserts an empty "Host" entry if
// the header is absent — presumably Host is always set by this point; confirm.
signed_headers["host"] = req._headers["Host"];
for (const auto& [name, value] : req._headers) {
if (name.starts_with("x-")) {
signed_headers[name] = value;
}
}
// Build the ';'-separated list, omitting the separator after the last entry.
unsigned header_nr = signed_headers.size();
for (const auto& h : signed_headers) {
signed_headers_list += seastar::format("{}{}", h.first, header_nr == 1 ? "" : ";");
header_nr--;
}
sstring query_string = "";
// Sorted, URL-encoded query parameters for the canonical query string.
std::map<std::string_view, std::string_view> query_parameters;
for (const auto& q : req.get_query_params()) {
query_parameters[q.first] = q.second.back();
}
unsigned query_nr = query_parameters.size();
for (const auto& q : query_parameters) {
query_string += seastar::format("{}={}{}", http::internal::url_encode(q.first), http::internal::url_encode(q.second), query_nr == 1 ? "" : "&");
query_nr--;
}
auto sig = utils::aws::get_signature(
_credentials.access_key_id, _credentials.secret_access_key,
_host, req._url, req._method,
utils::aws::omit_datestamp_expiration_check,
signed_headers_list, signed_headers,
utils::aws::unsigned_content,
_cfg->region, "s3", query_string);
req._headers["Authorization"] = seastar::format("AWS4-HMAC-SHA256 Credential={}/{}/{}/s3/aws4_request,SignedHeaders={},Signature={}", _credentials.access_key_id, time_point_st, _cfg->region, signed_headers_list, sig);
}
// Reserves `size` bytes from the client-wide memory semaphore; when an
// abort source is supplied, the wait is abortable through it.
future<semaphore_units<>> client::claim_memory(size_t size, abort_source* as) {
    return as ? get_units(_memory, size, *as) : get_units(_memory, size);
}
// Per-scheduling-group HTTP client, capped at `max_conn` concurrent connections.
client::group_client::group_client(std::unique_ptr<http::experimental::connection_factory> f, unsigned max_conn) : http(std::move(f), max_conn) {
}
// Registers per-endpoint, per-scheduling-class connection and transfer
// metrics under the "s3" metric group.
void client::group_client::register_metrics(std::string class_name, std::string host) {
namespace sm = seastar::metrics;
auto ep_label = sm::label("endpoint")(host);
auto sg_label = sm::label("class")(class_name);
metrics.add_group("s3", {
sm::make_gauge("nr_connections", [this] { return http.connections_nr(); },
sm::description("Total number of connections"), {ep_label, sg_label}),
sm::make_gauge("nr_active_connections", [this] { return http.connections_nr() - http.idle_connections_nr(); },
sm::description("Total number of connections with running requests"), {ep_label, sg_label}),
sm::make_counter("total_new_connections", [this] { return http.total_new_connections_nr(); },
sm::description("Total number of new connections created so far"), {ep_label, sg_label}),
sm::make_counter("total_read_requests", [this] { return read_stats.ops; },
sm::description("Total number of object read requests"), {ep_label, sg_label}),
sm::make_counter("total_write_requests", [this] { return write_stats.ops; },
sm::description("Total number of object write requests"), {ep_label, sg_label}),
sm::make_counter("total_read_bytes", [this] { return read_stats.bytes; },
sm::description("Total number of bytes read from objects"), {ep_label, sg_label}),
sm::make_counter("total_write_bytes", [this] { return write_stats.bytes; },
sm::description("Total number of bytes written to objects"), {ep_label, sg_label}),
sm::make_counter("total_read_latency_sec", [this] { return read_stats.duration.count(); },
sm::description("Total time spent reading data from objects"), {ep_label, sg_label}),
sm::make_counter("total_write_latency_sec", [this] { return write_stats.duration.count(); },
sm::description("Total time spend writing data to objects"), {ep_label, sg_label}),
sm::make_counter("total_read_prefetch_bytes", [this] { return prefetch_bytes; },
sm::description("Total number of bytes requested from object"), {ep_label, sg_label}),
sm::make_counter("downloads_blocked_on_memory",
[this] { return downloads_blocked_on_memory; },
sm::description("Counts the number of times S3 client downloads were delayed due to insufficient memory availability"),
{ep_label, sg_label})
});
}
// Returns the HTTP client for the current scheduling group, creating (and
// registering metrics for) it on first use.
client::group_client& client::find_or_create_client() {
    auto sg = current_scheduling_group();
    auto found = _https.find(sg);
    if (found != _https.end()) [[likely]] {
        return found->second;
    }
    // Limit the maximum number of connections this group's http client
    // may have proportional to its shares. Shares are typically in the
    // range of 100...1000, thus resulting in 1..10 connections
    unsigned max_connections = _cfg->max_connections.has_value() ? *_cfg->max_connections : std::max((unsigned)(sg.get_shares() / 100), 1u);
    auto factory = std::make_unique<utils::http::dns_connection_factory>(_host, _cfg->port, _cfg->use_https, s3l);
    auto it = _https.emplace(std::piecewise_construct,
            std::forward_as_tuple(sg),
            std::forward_as_tuple(std::move(factory), max_connections)).first;
    it->second.register_metrics(sg.name(), _host);
    return it->second;
}
// Translates any exception escaping the S3 request path into a
// storage_io_error carrying an errno-style code, so callers can treat S3
// failures uniformly with local I/O failures. Always throws.
[[noreturn]] void map_s3_client_exception(std::exception_ptr ex) {
// We may be formatting error strings while unwinding under memory
// pressure; allow allocations from the critical reserve.
seastar::memory::scoped_critical_alloc_section alloc;
try {
std::rethrow_exception(std::move(ex));
} catch (const aws::aws_exception& e) {
int error_code;
switch (e.error().get_error_type()) {
// "does not exist" family -> ENOENT
case aws::aws_error_type::HTTP_NOT_FOUND:
case aws::aws_error_type::RESOURCE_NOT_FOUND:
case aws::aws_error_type::NO_SUCH_BUCKET:
case aws::aws_error_type::NO_SUCH_KEY:
case aws::aws_error_type::NO_SUCH_UPLOAD:
error_code = ENOENT;
break;
// authentication/authorization family -> EACCES
case aws::aws_error_type::HTTP_FORBIDDEN:
case aws::aws_error_type::HTTP_UNAUTHORIZED:
case aws::aws_error_type::ACCESS_DENIED:
error_code = EACCES;
break;
default:
error_code = EIO;
}
throw storage_io_error{error_code, format("S3 request failed. Code: {}. Reason: {}", e.error().get_error_type(), e.what())};
} catch (const httpd::unexpected_status_error& e) {
// Raw HTTP status (reply did not carry a parseable AWS error document).
auto status = e.status();
if (http::reply::classify_status(status) == http::reply::status_class::redirection || status == http::reply::status_type::not_found) {
throw storage_io_error {ENOENT, format("S3 object doesn't exist ({})", status)};
}
if (status == http::reply::status_type::forbidden || status == http::reply::status_type::unauthorized) {
throw storage_io_error {EACCES, format("S3 access denied ({})", status)};
}
throw storage_io_error {EIO, format("S3 request failed with ({})", status)};
} catch (...) {
auto e = std::current_exception();
throw storage_io_error {EIO, format("S3 error ({})", e)};
}
}
// Wraps the caller's reply handler with the S3 error-handling layer:
// classifies non-success replies into aws_error, re-signs the request on
// clock-skew / expired-token errors so a retry can succeed, enforces the
// optional expected status, and converts handler failures into
// aws_exception so the retry strategy can classify them.
http::experimental::client::reply_handler client::wrap_handler(http::request& request,
http::experimental::client::reply_handler handler,
std::optional<http::reply::status_type> expected) {
// NOTE: `request` is captured by reference — the caller (make_request)
// keeps it alive in its coroutine frame for the whole request lifetime.
return [this, &request, expected, handler = std::move(handler)](const http::reply& rep, input_stream<char>&& in) -> future<> {
auto _in = std::move(in);
auto status_class = seastar::http::reply::classify_status(rep._status);
std::optional<aws_error> possible_error;
if (status_class != seastar::http::reply::status_class::informational && status_class != seastar::http::reply::status_class::success) {
// Error replies normally carry an XML error document; fall back to
// mapping the bare HTTP status when the body is not parseable.
possible_error = aws_error::parse(co_await seastar::util::read_entire_stream_contiguous(_in));
if (!possible_error) {
possible_error = aws_error::from_http_code(rep._status);
}
}
if (possible_error) {
auto should_retry = possible_error->is_retryable();
if (possible_error->get_error_type() == aws::aws_error_type::REQUEST_TIME_TOO_SKEWED) {
s3l.warn("Request failed with REQUEST_TIME_TOO_SKEWED. Machine time: {}, request timestamp: {}",
utils::aws::format_time_point(db_clock::now()),
request.get_header("x-amz-date"));
should_retry = utils::http::retryable::yes;
// Re-sign with a fresh timestamp so the retried request is accepted.
co_await authorize(request);
}
if (possible_error->get_error_type() == aws::aws_error_type::EXPIRED_TOKEN) {
s3l.warn("Request failed with EXPIRED_TOKEN. Resetting credentials");
_credentials = {};
should_retry = utils::http::retryable::yes;
// authorize() re-obtains credentials synchronously and re-signs.
co_await authorize(request);
}
// Propagate as aws_exception carrying the retryability verdict for
// the retry strategy.
co_await coroutine::return_exception_ptr(std::make_exception_ptr(
aws::aws_exception(aws_error(possible_error->get_error_type(), possible_error->get_error_message().c_str(), should_retry))));
}
if (expected && rep._status != *expected) {
throw seastar::httpd::unexpected_status_error(rep._status);
}
std::exception_ptr eptr;
try {
// We need to be able to simulate a retry in s3 tests
if (utils::get_local_injector().enter("s3_client_fail_authorization")) {
throw aws::aws_exception(
aws::aws_error{aws::aws_error_type::HTTP_UNAUTHORIZED, "EACCESS fault injected to simulate authorization failure", utils::http::retryable::no});
}
co_return co_await handler(rep, std::move(_in));
} catch (...) {
eptr = std::current_exception();
}
// Wrap handler failures too, so they flow through the same classification.
if (eptr) {
co_await coroutine::return_exception_ptr(std::make_exception_ptr(aws::aws_exception(aws_error::from_exception_ptr(eptr))));
}
};
}
// Convenience overload: uses the client's default retry strategy and the
// standard aws-error -> storage_io_error translation.
future<> client::make_request(http::request req,
        http::experimental::client::reply_handler handle,
        std::optional<http::reply::status_type> expected,
        seastar::abort_source* as) {
    auto translate = [](std::exception_ptr ex) {
        map_s3_client_exception(std::move(ex));
    };
    return make_request(std::move(req), std::move(handle), *_retry_strategy, std::move(translate), expected, as);
}
// Core request path: signs the request, runs it on the current scheduling
// group's http client with the given retry strategy, and routes any
// resulting exception through `err_handler`.
future<> client::make_request(http::request req,
http::experimental::client::reply_handler handle,
const http::experimental::retry_strategy& rs,
error_handler err_handler,
std::optional<http::reply::status_type> expected,
seastar::abort_source* as) {
// Keep the request alive in this coroutine frame: wrap_handler holds a
// reference to it (needed for re-signing on auth-related retries).
auto request = std::move(req);
auto handler = wrap_handler(request, std::move(handle), expected);
co_await authorize(request);
auto& gc = find_or_create_client();
co_await gc.http.make_request(request, handler, rs, std::nullopt, as).handle_exception([err_handler = std::move(err_handler)](auto ex) {
err_handler(std::move(ex));
});
}
// Variant whose handler additionally receives the group_client, letting
// handlers update per-group transfer statistics.
future<> client::make_request(http::request req,
reply_handler_ext handle_ex,
const http::experimental::retry_strategy& rs,
error_handler err_handler,
std::optional<http::reply::status_type> expected,
seastar::abort_source* as) {
auto& gc = find_or_create_client();
// NOTE(review): gc is captured by reference; this relies on _https entries
// remaining valid for the duration of the request — confirm entries are
// never erased while requests are in flight.
auto handle = [&gc, handle = std::move(handle_ex)] (const http::reply& rep, input_stream<char>&& in) {
return handle(gc, rep, std::move(in));
};
return make_request(std::move(req), std::move(handle), rs, std::move(err_handler), expected, as);
}
// Convenience overload of the extended-handler variant: default retry
// strategy plus standard aws-error -> storage_io_error translation.
future<> client::make_request(http::request req, reply_handler_ext handle_ex, std::optional<http::reply::status_type> expected, seastar::abort_source* as) {
    auto translate = [](std::exception_ptr ex) {
        map_s3_client_exception(std::move(ex));
    };
    return make_request(std::move(req), std::move(handle_ex), *_retry_strategy, std::move(translate), expected, as);
}
// Issues a HEAD request for `object_name` and feeds the reply to `handler`.
future<> client::get_object_header(sstring object_name, http::experimental::client::reply_handler handler, seastar::abort_source* as) {
    s3l.trace("HEAD {}", object_name);
    return make_request(http::request::make("HEAD", _host, object_name),
            std::move(handler), http::reply::status_type::ok, as);
}
// Returns the object's size as reported by the Content-Length of a HEAD reply.
future<uint64_t> client::get_object_size(sstring object_name, seastar::abort_source* as) {
    uint64_t len = 0;
    co_await get_object_header(std::move(object_name), [&len] (const http::reply& rep, input_stream<char>&&) -> future<> {
        // HEAD replies carry no body; only the length header matters.
        len = rep.content_length;
        return make_ready_future<>();
    }, as);
    co_return len;
}
// TODO: possibly move this to seastar's http subsystem.
//
// Parses an HTTP-date (RFC 7231, e.g. "Tue, 15 Nov 1994 08:12:31 GMT") from
// the Last-Modified header of `object_name` into a time_t.
//
// HTTP dates are always expressed in GMT, but strptime()'s "%Z" merely
// matches the zone name without adjusting the parsed fields; the previous
// mktime() call then reinterpreted the broken-down time in the *local*
// timezone, skewing the result by the local UTC offset. timegm() (a
// glibc/BSD extension, declared in <time.h> alongside strptime) converts
// the fields as UTC.
//
// On parse failure a warning is logged and the conversion of the zeroed
// struct tm is returned, matching the previous best-effort behavior.
static std::time_t parse_http_last_modified_time(const sstring& object_name, sstring last_modified) {
    std::tm tm = {0};
    // format conforms to HTTP-date, defined in the specification (RFC 7231).
    if (strptime(last_modified.c_str(), "%a, %d %b %Y %H:%M:%S %Z", &tm) == nullptr) {
        s3l.warn("Unable to parse {} as Last-Modified for {}", last_modified, object_name);
    } else {
        s3l.trace("Successfully parsed {} as Last-modified for {}", last_modified, object_name);
    }
    return timegm(&tm);
}
// Fetches size and last-modified time of an object via a single HEAD request.
future<stats> client::get_object_stats(sstring object_name, seastar::abort_source* as) {
    struct stats st{};
    co_await get_object_header(object_name, [&st, &object_name] (const http::reply& rep, input_stream<char>&&) -> future<> {
        st.size = rep.content_length;
        st.last_modified = parse_http_last_modified_time(object_name, rep.get_header("Last-Modified"));
        return make_ready_future<>();
    }, as);
    co_return st;
}
// Descends from `root` through the chain of child nodes named in `names`,
// throwing std::runtime_error naming the first missing link.
static rapidxml::xml_node<>* first_node_of(rapidxml::xml_node<>* root,
        std::initializer_list<std::string_view> names) {
    SCYLLA_ASSERT(root);
    auto* current = root;
    for (const auto& step : names) {
        auto* child = current->first_node(step.data(), step.size());
        if (child == nullptr) {
            throw std::runtime_error(fmt::format("'{}' is not found", step));
        }
        current = child;
    }
    return current;
}
// Parses a GetObjectTagging XML response body into a tag_set.
// Throws std::runtime_error on malformed XML or missing required nodes.
// NOTE: rapidxml's parse<0>() mutates `body` in place.
static tag_set parse_tagging(sstring& body) {
auto doc = std::make_unique<rapidxml::xml_document<>>();
try {
doc->parse<0>(body.data());
} catch (const rapidxml::parse_error& e) {
s3l.warn("cannot parse tagging response: {}", e.what());
throw std::runtime_error("cannot parse tagging response");
}
tag_set tags;
auto tagset_node = first_node_of(doc.get(), {"Tagging", "TagSet"});
// Iterates all siblings of the first "Tag" node; assumes TagSet only
// contains Tag children, per the API schema.
for (auto tag_node = tagset_node->first_node("Tag"); tag_node; tag_node = tag_node->next_sibling()) {
// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_Tag.html,
// both "Key" and "Value" are required, but we still need to check them.
auto key = tag_node->first_node("Key");
if (!key) {
throw std::runtime_error("'Key' missing in 'Tag'");
}
auto value = tag_node->first_node("Value");
if (!value) {
throw std::runtime_error("'Value' missing in 'Tag'");
}
tags.emplace_back(tag{key->value(), value->value()});
}
return tags;
}
// Retrieves the object's tag set via the GetObjectTagging API.
future<tag_set> client::get_object_tagging(sstring object_name, seastar::abort_source* as) {
    // see https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObjectTagging.html
    auto req = http::request::make("GET", _host, object_name);
    req.set_query_param("tagging", "");
    s3l.trace("GET {} tagging", object_name);
    tag_set tags;
    co_await make_request(std::move(req),
            [&tags] (const http::reply& reply, input_stream<char>&& in) -> future<> {
                auto body_stream = std::move(in);
                auto body = co_await util::read_entire_stream_contiguous(body_stream);
                tags = parse_tagging(body);
            }, http::reply::status_type::ok, as);
    co_return tags;
}
static auto dump_tagging(const tag_set& tags) {
// print the tags as an XML as defined by the API definition.
fmt::memory_buffer body;
fmt::format_to(fmt::appender(body), "<Tagging><TagSet>{}</TagSet></Tagging>", fmt::join(tags, ""));
return body;
}
// Replaces the object's tag set via the PutObjectTagging API.
future<> client::put_object_tagging(sstring object_name, tag_set tagging, seastar::abort_source* as) {
// see https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectTagging.html
auto req = http::request::make("PUT", _host, object_name);
req.set_query_param("tagging", "");
s3l.trace("PUT {} tagging", object_name);
auto body = dump_tagging(tagging);
size_t body_size = body.size();
// Body writer owns the buffer; the stream must be closed even on write
// failure, so the error is stashed and rethrown after close().
req.write_body("xml", body_size, [body=std::move(body)] (output_stream<char>&& out) -> future<> {
auto output = std::move(out);
std::exception_ptr ex;
try {
co_await output.write(body.data(), body.size());
co_await output.flush();
} catch (...) {
ex = std::current_exception();
}
co_await output.close();
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
}
});
co_await make_request(std::move(req), ignore_reply, http::reply::status_type::ok, as);
}
// Removes the object's entire tag set via the DeleteObjectTagging API.
future<> client::delete_object_tagging(sstring object_name, seastar::abort_source* as) {
    // see https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjectTagging.html
    s3l.trace("DELETE {} tagging", object_name);
    auto req = http::request::make("DELETE", _host, object_name);
    req.set_query_param("tagging", "");
    return make_request(std::move(req), ignore_reply, http::reply::status_type::no_content, as);
}
// Downloads `download_range` of the object (or the whole object for
// s3::full_range) into a single contiguous buffer.
future<temporary_buffer<char>> client::get_object_contiguous(sstring object_name, range download_range, seastar::abort_source* as) {
auto req = http::request::make("GET", _host, object_name);
http::reply::status_type expected = http::reply::status_type::ok;
if (download_range != s3::full_range) {
if (download_range.length() == 0) {
co_return temporary_buffer<char>();
}
s3l.trace("GET {} contiguous range='{}'", object_name, download_range);
// Ranged GETs are answered with 206 Partial Content.
req._headers["Range"] = download_range.to_header_string();
expected = http::reply::status_type::partial_content;
} else {
s3l.trace("GET {} contiguous", object_name);
}
size_t off = 0;
std::optional<temporary_buffer<char>> ret;
co_await make_request(std::move(req), [&off, &ret, &object_name, start = s3_clock::now()] (group_client& gc, const http::reply& rep, input_stream<char>&& in_) mutable -> future<> {
auto in = std::move(in_);
// Size the buffer from Content-Length; the consume loop below never
// writes past it even if the server sends extra bytes.
ret = temporary_buffer<char>(rep.content_length);
off = 0;
s3l.trace("Consume {} bytes for {}", ret->size(), object_name);
co_await in.consume([&off, &ret] (temporary_buffer<char> buf) mutable {
if (buf.empty()) {
return make_ready_future<consumption_result<char>>(stop_consuming(std::move(buf)));
}
size_t to_copy = std::min(ret->size() - off, buf.size());
if (to_copy > 0) {
std::copy_n(buf.get(), to_copy, ret->get_write() + off);
off += to_copy;
}
return make_ready_future<consumption_result<char>>(continue_consuming());
}).then([&gc, &off, start] {
gc.read_stats.update(off, s3_clock::now() - start);
});
}, expected, as);
// Trim to the number of bytes actually received (may be shorter than
// Content-Length if the stream ended early).
ret->trim(off);
s3l.trace("Consumed {} bytes of {}", off, object_name);
co_return std::move(*ret);
}
// Uploads `buf` as the object's full content with a single PUT request.
future<> client::put_object(sstring object_name, temporary_buffer<char> buf, seastar::abort_source* as) {
s3l.trace("PUT {}", object_name);
auto req = http::request::make("PUT", _host, object_name);
auto len = buf.size();
// Body writer owns the buffer; the stream must be closed even on write
// failure, so the error is stashed and rethrown after close().
req.write_body("bin", len, [buf = std::move(buf)] (output_stream<char>&& out_) -> future<> {
auto out = std::move(out_);
std::exception_ptr ex;
try {
co_await out.write(buf.get(), buf.size());
co_await out.flush();
} catch (...) {
ex = std::current_exception();
}
co_await out.close();
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
}
});
// Account the whole upload in the group's write statistics.
co_await make_request(std::move(req), [len, start = s3_clock::now()] (group_client& gc, const auto& rep, auto&& in) {
gc.write_stats.update(len, s3_clock::now() - start);
return ignore_reply(rep, std::move(in));
}, http::reply::status_type::ok, as);
}
// Uploads a list of buffers as the object's full content with a single PUT.
future<> client::put_object(sstring object_name, ::memory_data_sink_buffers bufs, seastar::abort_source* as) {
s3l.trace("PUT {} (buffers)", object_name);
auto req = http::request::make("PUT", _host, object_name);
auto len = bufs.size();
// Body writer owns the buffers; the stream must be closed even on write
// failure, so the error is stashed and rethrown after close().
req.write_body("bin", len, [bufs = std::move(bufs)] (output_stream<char>&& out_) -> future<> {
auto out = std::move(out_);
std::exception_ptr ex;
try {
for (const auto& buf : bufs.buffers()) {
co_await out.write(buf.get(), buf.size());
}
co_await out.flush();
} catch (...) {
ex = std::current_exception();
}
co_await out.close();
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
}
});
// Account the whole upload in the group's write statistics.
co_await make_request(std::move(req), [len, start = s3_clock::now()] (group_client& gc, const auto& rep, auto&& in) {
gc.write_stats.update(len, s3_clock::now() - start);
return ignore_reply(rep, std::move(in));
}, http::reply::status_type::ok, as);
}
// Deletes the object; S3 answers DELETE with 204 No Content.
future<> client::delete_object(sstring object_name, seastar::abort_source* as) {
    s3l.trace("DELETE {}", object_name);
    return make_request(http::request::make("DELETE", _host, object_name),
            ignore_reply, http::reply::status_type::no_content, as);
}
// Extracts the ETag from an UploadPartCopy <CopyPartResult> XML response.
//
// Returns an empty string both when the body cannot be parsed AND when the
// expected nodes are absent; the caller is supposed to check the etag to be
// empty and handle the error the way it prefers. (The previous version
// dereferenced first_node() results unconditionally and would crash on a
// well-formed XML body that is not a CopyPartResult, e.g. an error document.)
// NOTE: rapidxml's parse<0>() mutates `body` in place.
sstring parse_multipart_copy_upload_etag(sstring& body) {
    auto doc = std::make_unique<rapidxml::xml_document<>>();
    try {
        doc->parse<0>(body.data());
    } catch (const rapidxml::parse_error& e) {
        s3l.warn("cannot parse multipart copy upload response: {}", e.what());
        return "";
    }
    auto root_node = doc->first_node("CopyPartResult");
    if (!root_node) {
        s3l.warn("'CopyPartResult' node is missing in multipart copy upload response");
        return "";
    }
    auto etag_node = root_node->first_node("ETag");
    if (!etag_node) {
        s3l.warn("'ETag' node is missing in multipart copy upload response");
        return "";
    }
    return etag_node->value();
}
// Base class holding the shared state and operations of an S3 multipart
// upload: the upload id, per-part etags and the background-flush gate.
// start/finalize/abort and the upload_part operations are defined elsewhere.
class client::multipart_upload {
protected:
// Cap on concurrently in-flight part uploads.
static constexpr size_t _max_multipart_concurrency = 16;
shared_ptr<client> _client;
sstring _object_name;
// Non-empty once start_upload() succeeded (see upload_started()).
sstring _upload_id;
// Indexed by 0-based part number; an empty entry marks a failed part.
utils::chunked_vector<sstring> _part_etags;
// Tracks background part flushes so they can be awaited/closed on error.
named_gate _bg_flushes;
std::optional<tag> _tag;
seastar::abort_source* _as;
future<> start_upload();
future<> finalize_upload();
future<> upload_part(memory_data_sink_buffers bufs);
future<> upload_part(std::unique_ptr<upload_sink> source);
future<> abort_upload();
bool upload_started() const noexcept {
return !_upload_id.empty();
}
multipart_upload(shared_ptr<client> cln, sstring object_name, std::optional<tag> tag, seastar::abort_source* as)
: _client(std::move(cln))
, _object_name(std::move(object_name))
, _bg_flushes("s3::client::multipart_upload::bg_flushes")
, _tag(std::move(tag))
, _as(as)
{
}
public:
unsigned parts_count() const noexcept { return _part_etags.size(); }
};
// Server-side S3 object copy: a single PUT-with-copy-source for objects up
// to _max_copy_part_size, or a multipart copy upload for larger ones.
class client::copy_s3_object final : multipart_upload {
public:
    copy_s3_object(shared_ptr<client> cln, sstring source_object, sstring target_object, size_t part_size, std::optional<tag> tag, abort_source* as)
        : multipart_upload(std::move(cln), std::move(target_object), std::move(tag), as)
        , _max_copy_part_size(part_size)
        , _source_object(std::move(source_object)) {
        // Use the project-wide assert so the invariant also holds in release
        // builds (plain assert() compiles out under NDEBUG).
        SCYLLA_ASSERT(_max_copy_part_size > 0 && _max_copy_part_size <= _default_copy_part_size);
    }
    copy_s3_object(shared_ptr<client> cln, sstring source_object, sstring target_object, std::optional<tag> tag, abort_source* as)
        : copy_s3_object(std::move(cln), std::move(source_object), std::move(target_object), _default_copy_part_size, std::move(tag), as) {}
    // Entry point: picks single-request vs multipart copy by source size.
    future<> copy() {
        auto source_size = co_await _client->get_object_size(_source_object);
        if (source_size <= _max_copy_part_size) {
            co_await copy_put();
        } else {
            co_await copy_multipart(source_size);
        }
    }
private:
    // Single server-side copy via PUT with x-amz-copy-source.
    future<> copy_put() {
        auto req = http::request::make("PUT", _client->_host, _object_name);
        if (_tag) {
            req._headers["x-amz-tagging"] = seastar::format("{}={}", _tag->key, _tag->value);
        }
        req._headers["x-amz-copy-source"] = _source_object;
        co_await _client->make_request(std::move(req), ignore_reply, http::reply::status_type::ok, _as);
    }
    // Multipart copy: uploads parts concurrently then finalizes; on any
    // failure the background gate is closed and the upload aborted.
    future<> copy_multipart(size_t source_size) {
        co_await start_upload();
        auto part_size = _max_copy_part_size;
        std::exception_ptr ex;
        try {
            auto parts = std::views::iota(size_t{0}, (source_size + part_size - 1) / part_size);
            _part_etags.resize(parts.size());
            co_await max_concurrent_for_each(parts,
                    _max_multipart_concurrency,
                    [part_size, source_size, this](auto part_num) -> future<> {
                        auto part_offset = part_num * part_size;
                        auto actual_part_size = std::min(source_size - part_offset, part_size);
                        co_await copy_part(part_offset, actual_part_size, part_num);
                    });
            // Here we are going to finalize the upload and close the _bg_flushes, in case an exception is thrown the
            // gate will be closed and the upload will be aborted. See below.
            co_await finalize_upload();
        } catch (...) {
            ex = std::current_exception();
        }
        if (ex) {
            if (!_bg_flushes.is_closed()) {
                co_await _bg_flushes.close();
            }
            co_await abort_upload();
            std::rethrow_exception(ex);
        }
    }
    // Copies [offset, offset+part_size) of the source as part `part_number`
    // (0-based here; the wire protocol's partNumber is 1-based).
    future<> copy_part(size_t offset, size_t part_size, size_t part_number) {
        auto req = http::request::make("PUT", _client->_host, _object_name);
        req._headers["x-amz-copy-source"] = _source_object;
        auto range = format("bytes={}-{}", offset, offset + part_size - 1);
        // Fixed format string: the upload-id placeholder was missing, so the
        // id was silently dropped from the log line.
        s3l.trace("PUT part {}, Upload range: {}, Upload ID: {}", part_number, range, _upload_id);
        req._headers["x-amz-copy-source-range"] = range;
        req.set_query_param("partNumber", to_sstring(part_number + 1));
        req.set_query_param("uploadId", _upload_id);
        co_await _client->make_request(std::move(req),[this, part_number, start = s3_clock::now()](group_client& gc, const http::reply& reply, input_stream<char>&& in) -> future<> {
            auto _in = std::move(in);
            auto body = co_await util::read_entire_stream_contiguous(_in);
            auto etag = parse_multipart_copy_upload_etag(body);
            if (etag.empty()) {
                throw std::runtime_error("Cannot parse ETag");
            }
            // Fixed format string: it had two placeholders for three
            // arguments, logging the part number as the etag.
            s3l.trace("Part {} data -> etag = {} (upload id {})", part_number, etag, _upload_id);
            _part_etags[part_number] = std::move(etag);
        },http::reply::status_type::ok, _as)
        // NOTE(review): failures are swallowed deliberately — the missing
        // etag lets finalize_upload()/prepare_multipart_upload_parts()
        // detect the failed part and abort the whole upload.
        .handle_exception([this, part_number](auto ex) {
            s3l.warn("Failed to upload part {}, upload id {}. Reason: {}", part_number, _upload_id, ex);
        });
        co_return;
    }
    static constexpr size_t _default_copy_part_size = 5_GiB;
    const size_t _max_copy_part_size;
    sstring _source_object;
};
// Copies `source_object` to `target_object`, optionally splitting into parts
// of `part_size` bytes. The caller's abort source is now propagated to the
// underlying requests — the previous version left the parameter unnamed and
// always passed nullptr, so in-flight copies could not be aborted.
future<> client::copy_object(sstring source_object, sstring target_object, std::optional<size_t> part_size, std::optional<tag> tag, seastar::abort_source* as) {
    if (!part_size)
        co_return co_await copy_s3_object(shared_from_this(), std::move(source_object), std::move(target_object), tag, as).copy();
    co_return co_await copy_s3_object(shared_from_this(), std::move(source_object), std::move(target_object), part_size.value(), tag, as).copy();
}
// data_sink adapter over a multipart upload: data written to the sink ends
// up as parts of an S3 multipart upload of `object_name`.
class client::upload_sink_base : public multipart_upload, public data_sink_impl {
public:
    upload_sink_base(shared_ptr<client> cln, sstring object_name, std::optional<tag> tag, seastar::abort_source* as)
        : multipart_upload(std::move(cln), std::move(object_name), std::move(tag), as)
    {
    }
    virtual future<> close() override;
    // Preferred write granularity advertised to data_sink users.
    virtual size_t buffer_size() const noexcept override {
        return 128_KiB;
    }
};
// Extracts the UploadId from an InitiateMultipartUploadResult XML response.
// Returns "" when the body is not parseable XML; the caller is supposed to
// check the upload-id to be empty and handle the error the way it prefers.
// Throws std::runtime_error when the XML parses but required nodes are
// missing. NOTE: rapidxml's parse<0>() mutates `body` in place.
sstring parse_multipart_upload_id(sstring& body) {
    // Pass the explicit name length: std::string_view::data() is not
    // guaranteed to be NUL-terminated in general, and rapidxml would
    // otherwise run strlen() over it.
    auto get_node_safe = []<typename T>(const T* node, const std::string_view node_name) {
        auto child = node->first_node(node_name.data(), node_name.size());
        if (!child) {
            throw std::runtime_error(seastar::format("'{}' node is missing in InitiateMultipartUploadResult response", node_name));
        }
        return child;
    };
    auto doc = std::make_unique<rapidxml::xml_document<>>();
    try {
        doc->parse<0>(body.data());
    } catch (const rapidxml::parse_error& e) {
        s3l.warn("cannot parse initiate multipart upload response: {}", e.what());
        // The caller is supposed to check the upload-id to be empty
        // and handle the error the way it prefers
        return "";
    }
    auto root_node = get_node_safe(doc.get(), "InitiateMultipartUploadResult");
    auto uploadid_node = get_node_safe(root_node, "UploadId");
    return uploadid_node->value();
}
// Building blocks of the CompleteMultipartUpload request body. The entry
// template is instantiated once per part with the part's ETag and number;
// prepare_multipart_upload_parts() pre-computes the total length and
// dump_multipart_upload_parts() emits the body.
static constexpr std::string_view multipart_upload_complete_header =
    "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\r\n"
    "<CompleteMultipartUpload xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">";
static constexpr std::string_view multipart_upload_complete_entry =
    "<Part><ETag>{}</ETag><PartNumber>{}</PartNumber></Part>";
static constexpr std::string_view multipart_upload_complete_trailer =
    "</CompleteMultipartUpload>";
// Pre-compute the exact byte length of the CompleteMultipartUpload XML body
// for the given part ETags. Returns 0 when any ETag is empty, which means
// the corresponding part failed to upload (see comment in upload_part());
// the caller checks this and aborts the multipart upload altogether.
unsigned prepare_multipart_upload_parts(const utils::chunked_vector<sstring>& etags) {
    unsigned total = multipart_upload_complete_header.size() + multipart_upload_complete_trailer.size();
    unsigned part_nr = 1;
    for (const auto& etag : etags) {
        if (etag.empty()) {
            return 0;
        }
        // length of the format string - four braces + length of the etag + length of the number
        total += multipart_upload_complete_entry.size() - 4 + etag.size() + format("{}", part_nr).size();
        ++part_nr;
    }
    return total;
}
// Serialize the CompleteMultipartUpload XML body into 'out'. The stream is
// always closed, even on error, and the original exception (if any) is
// re-thrown afterwards. The emitted length must match exactly what
// prepare_multipart_upload_parts() calculated for the same etags.
future<> dump_multipart_upload_parts(output_stream<char> out, const utils::chunked_vector<sstring>& etags) {
    std::exception_ptr ex;
    try {
        co_await out.write(multipart_upload_complete_header.data(), multipart_upload_complete_header.size());
        unsigned nr = 1;
        for (auto& etag : etags) {
            // Empty etags were already rejected by prepare_multipart_upload_parts()
            SCYLLA_ASSERT(!etag.empty());
            co_await out.write(format(multipart_upload_complete_entry.data(), etag, nr));
            nr++;
        }
        co_await out.write(multipart_upload_complete_trailer.data(), multipart_upload_complete_trailer.size());
        co_await out.flush();
    } catch (...) {
        ex = std::current_exception();
    }
    // Close unconditionally so the HTTP body writer is not left dangling
    co_await out.close();
    if (ex) {
        co_await coroutine::return_exception_ptr(std::move(ex));
    }
}
// Initiate a multipart upload (POST <object>?uploads) and remember the
// returned upload id. Throws if the response carries no parsable UploadId.
future<> client::multipart_upload::start_upload() {
    s3l.trace("POST uploads {} (tag {})", _object_name, seastar::value_of([this] { return _tag ? _tag->key + "=" + _tag->value : "none"; }));
    auto req = http::request::make("POST", _client->_host, _object_name);
    req.set_query_param("uploads", "");
    if (_tag) {
        req._headers["x-amz-tagging"] = seastar::format("{}={}", _tag->key, _tag->value);
    }
    co_await _client->make_request(std::move(req), [this] (const http::reply& reply, input_stream<char>&& body_in) -> future<> {
        auto in = std::move(body_in);
        auto body = co_await util::read_entire_stream_contiguous(in);
        _upload_id = parse_multipart_upload_id(body);
        if (_upload_id.empty()) {
            co_await coroutine::return_exception(std::runtime_error("cannot initiate upload"));
        }
        s3l.trace("created uploads for {} -> id = {}", _object_name, _upload_id);
    }, http::reply::status_type::ok, _as);
}
// Upload one part of a multipart upload from in-memory buffers, see
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html
// The request itself runs in the background (tracked by _bg_flushes); on
// success the response handler stores the part's ETag into
// _part_etags[part_number], on failure the slot stays empty and
// finalize_upload() detects it.
future<> client::multipart_upload::upload_part(memory_data_sink_buffers bufs) {
    if (!upload_started()) {
        co_await start_upload();
    }
    // Claim memory for the payload; released once the body has been written out
    auto claim = co_await _client->claim_memory(bufs.size(), _as);
    unsigned part_number = _part_etags.size();
    _part_etags.emplace_back();
    s3l.trace("PUT part {} {} bytes in {} buffers (upload id {})", part_number, bufs.size(), bufs.buffers().size(), _upload_id);
    auto req = http::request::make("PUT", _client->_host, _object_name);
    auto size = bufs.size();
    req._headers["Content-Length"] = seastar::format("{}", size);
    // S3 part numbers are 1-based
    req.set_query_param("partNumber", seastar::format("{}", part_number + 1));
    req.set_query_param("uploadId", _upload_id);
    req.write_body("bin", size, [this, part_number, bufs = std::move(bufs), p = std::move(claim)] (output_stream<char>&& out_) mutable -> future<> {
        auto out = std::move(out_);
        std::exception_ptr ex;
        s3l.trace("upload {} part data (upload id {})", part_number, _upload_id);
        try {
            for (auto&& buf : bufs.buffers()) {
                co_await out.write(buf.get(), buf.size());
            }
            co_await out.flush();
        } catch (...) {
            ex = std::current_exception();
        }
        // Always close the body stream, then re-throw the saved error if any
        co_await out.close();
        if (ex) {
            co_await coroutine::return_exception_ptr(std::move(ex));
        }
        // note: At this point the buffers are sent, but the response is not yet
        // received. However, claim is released and next part may start uploading
    });
    // Do upload in the background so that several parts could go in parallel.
    // The gate lets the finalize_upload() wait in any background activity
    // before checking the progress.
    //
    // Upload parallelizm is managed per-sched-group -- client maintains a set
    // of http clients each with its own max-connections. When upload happens it
    // will naturally be limited with the relevant http client's connections
    // limit not affecting other groups' requests concurrency
    //
    // In case part upload goes wrong and doesn't happen, the _part_etags[part]
    // is not set, so the finalize_upload() sees it and aborts the whole thing.
    auto gh = _bg_flushes.hold();
    (void)_client->make_request(std::move(req), [this, size, part_number, start = s3_clock::now()] (group_client& gc, const http::reply& rep, input_stream<char>&& in_) mutable -> future<> {
        auto etag = rep.get_header("ETag");
        s3l.trace("uploaded {} part data -> etag = {} (upload id {})", part_number, etag, _upload_id);
        _part_etags[part_number] = std::move(etag);
        gc.write_stats.update(size, s3_clock::now() - start);
        return make_ready_future<>();
    }, http::reply::status_type::ok, _as).handle_exception([this, part_number] (auto ex) {
        // ... the exact exception only remains in logs
        s3l.warn("couldn't upload part {}: {} (upload id {})", part_number, ex, _upload_id);
    }).finally([gh = std::move(gh)] {});
}
// Abort the in-flight multipart upload, see
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
// The upload id is cleared up-front (via std::exchange), so upload_started()
// returns false even while the DELETE is still in flight. Errors are logged
// and swallowed — dangling parts can be cleaned up by other means.
future<> client::multipart_upload::abort_upload() {
    s3l.trace("DELETE upload {}", _upload_id);
    auto req = http::request::make("DELETE", _client->_host, _object_name);
    req.set_query_param("uploadId", std::exchange(_upload_id, "")); // now upload_started() returns false
    co_await _client->make_request(std::move(req), ignore_reply, http::reply::status_type::no_content)
        .handle_exception([this](const std::exception_ptr& ex) -> future<> {
        // Here we discard whatever exception is thrown when aborting multipart upload since we don't care about cleanly aborting it since there are other
        // means to clean up dangling parts, for example `rclone cleanup` or S3 bucket's Lifecycle Management Policy
        // (fixed: the log message had a stray unbalanced ')' after the reason)
        s3l.warn("Failed to abort multipart upload. Object: '{}'. Reason: {}", _object_name, ex);
        co_return;
    });
}
// Complete the multipart upload, see
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
// First waits for all background part uploads to settle (_bg_flushes), then
// POSTs the collected ETags. An empty ETag slot means some part failed, in
// which case this throws without sending the completion request.
future<> client::multipart_upload::finalize_upload() {
    s3l.trace("wait for {} parts to complete (upload id {})", _part_etags.size(), _upload_id);
    co_await _bg_flushes.close();
    unsigned parts_xml_len = prepare_multipart_upload_parts(_part_etags);
    if (parts_xml_len == 0) {
        // 0 length means at least one part has an empty ETag, i.e. it failed
        co_await coroutine::return_exception(std::runtime_error("Failed to parse ETag list. Aborting multipart upload."));
    }
    s3l.trace("POST upload completion {} parts (upload id {})", _part_etags.size(), _upload_id);
    auto req = http::request::make("POST", _client->_host, _object_name);
    req.set_query_param("uploadId", _upload_id);
    req.write_body("xml", parts_xml_len, [this] (output_stream<char>&& out) -> future<> {
        return dump_multipart_upload_parts(std::move(out), _part_etags);
    });
    // If this request fails, finalize_upload() throws, the upload should then
    // be aborted in .close() method
    co_await _client->make_request(std::move(req), [](const http::reply& rep, input_stream<char>&& in) -> future<> {
        // CompleteMultipartUpload may return an error document in the body
        // even with an OK-looking status, so the body is parsed for an AWS
        // error before the status code is checked
        auto payload = std::move(in);
        auto status_class = http::reply::classify_status(rep._status);
        std::optional<aws::aws_error> possible_error = aws::aws_error::parse(co_await util::read_entire_stream_contiguous(payload));
        if (possible_error) {
            co_await coroutine::return_exception(aws::aws_exception(std::move(possible_error.value())));
        }
        if (status_class != http::reply::status_class::informational && status_class != http::reply::status_class::success) {
            co_await coroutine::return_exception(aws::aws_exception(aws::aws_error::from_http_code(rep._status)));
        }
        if (rep._status != http::reply::status_type::ok) {
            co_await coroutine::return_exception(httpd::unexpected_status_error(rep._status));
        }
        // If we reach this point it means the request succeeded. However, the body payload was already consumed, so no response handler was invoked. At
        // this point it is ok since we are not interested in parsing this particular response
    }, http::reply::status_type::ok);
    _upload_id = ""; // now upload_started() returns false
}
// Close the sink. A still-started upload at this point means flush() never
// completed (or failed), so the multipart upload is aborted after draining
// any background part uploads.
future<> client::upload_sink_base::close() {
    if (!upload_started()) {
        s3l.trace("closing multipart upload");
        co_return;
    }
    s3l.warn("closing incomplete multipart upload -> aborting");
    // If we got here, we need to pick up any background activity as it may
    // still trying to handle successful request and 'this' should remain alive
    //
    // The gate might have been closed by finalize_upload() so need to avoid
    // double close
    if (!_bg_flushes.is_closed()) {
        co_await _bg_flushes.close();
    }
    co_await abort_upload();
}
// data_sink that accumulates written data in memory and uploads it as
// multipart parts once at least aws_minimum_part_size bytes are buffered.
// Small objects that never reach that size fall back to a plain PUT in
// flush(), saving two REST round-trips.
class client::upload_sink final : public client::upload_sink_base {
    memory_data_sink_buffers _bufs;
    // Cut the buffered data into the next part once it reaches the AWS
    // minimum part size
    future<> maybe_flush() {
        if (_bufs.size() >= aws_minimum_part_size) {
            co_await upload_part(std::move(_bufs));
        }
    }
public:
    upload_sink(shared_ptr<client> cln, sstring object_name, std::optional<tag> tag = {}, seastar::abort_source* as = nullptr)
        : upload_sink_base(std::move(cln), std::move(object_name), std::move(tag), as)
    {}
    virtual future<> put(std::span<temporary_buffer<char>> data) override {
        append_buffers(_bufs, std::move(data));
        return maybe_flush();
    }
    virtual future<> flush() override {
        if (_bufs.size() != 0) {
            // This is handy for small objects that are uploaded via the sink. It makes
            // upload happen in one REST call, instead of three (create + PUT + wrap-up)
            if (!upload_started()) {
                s3l.trace("Sink fallback to plain PUT for {}", _object_name);
                co_return co_await _client->put_object(_object_name, std::move(_bufs));
            }
            co_await upload_part(std::move(_bufs));
        }
        if (upload_started()) {
            std::exception_ptr ex;
            try {
                co_await finalize_upload();
            } catch (...) {
                ex = std::current_exception();
            }
            if (ex) {
                // Completion failed -- abort the upload and surface the error
                co_await abort_upload();
                std::rethrow_exception(ex);
            }
        }
    }
};
// Copy-upload a whole "piece" object as the next part of this upload, see
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html
// The piece is flushed and closed first, then server-side copied into the
// target via x-amz-copy-source, and deleted afterwards regardless of the
// copy outcome.
future<> client::multipart_upload::upload_part(std::unique_ptr<upload_sink> piece_ptr) {
    if (!upload_started()) {
        co_await start_upload();
    }
    // 'piece' stays valid for the whole background chain below because
    // piece_ptr is kept alive by the final .finally() capture
    auto& piece = *piece_ptr;
    unsigned part_number = _part_etags.size();
    _part_etags.emplace_back();
    s3l.trace("PUT part {} from {} (upload id {})", part_number, piece._object_name, _upload_id);
    auto req = http::request::make("PUT", _client->_host, _object_name);
    req.set_query_param("partNumber", format("{}", part_number + 1));
    req.set_query_param("uploadId", _upload_id);
    req._headers["x-amz-copy-source"] = piece._object_name;
    // See comment in upload_part(memory_data_sink_buffers) overload regarding the
    // _bg_flushes usage and _part_etags assignments
    //
    // Before the piece's object can be copied into the target one, it should be
    // flushed and closed. After the object is copied, it can be removed. If copy
    // goes wrong, the object should be removed anyway.
    auto gh = _bg_flushes.hold();
    (void)piece.flush().then([&piece] () {
        return piece.close();
    }).then([this, part_number, req = std::move(req)] () mutable {
        return _client->make_request(std::move(req), [this, part_number] (const http::reply& rep, input_stream<char>&& in_) mutable -> future<> {
            return do_with(std::move(in_), [this, part_number] (auto& in) mutable {
                return util::read_entire_stream_contiguous(in).then([this, part_number] (auto body) mutable {
                    // UploadPartCopy returns the ETag in the XML body, not in a header
                    auto etag = parse_multipart_copy_upload_etag(body);
                    if (etag.empty()) {
                        return make_exception_future<>(std::runtime_error("cannot copy part upload"));
                    }
                    s3l.trace("copy-uploaded {} part data -> etag = {} (upload id {})", part_number, etag, _upload_id);
                    _part_etags[part_number] = std::move(etag);
                    return make_ready_future<>();
                });
            });
        }, http::reply::status_type::ok, _as).handle_exception([this, part_number] (auto ex) {
            // ... the exact exception only remains in logs
            s3l.warn("couldn't copy-upload part {}: {} (upload id {})", part_number, ex, _upload_id);
        });
    }).then_wrapped([this, &piece] (auto f) {
        if (f.failed()) {
            s3l.warn("couldn't flush piece {}: {} (upload id {})", piece._object_name, f.get_exception(), _upload_id);
        }
        // Remove the temporary piece object regardless of the copy outcome
        return _client->delete_object(piece._object_name).handle_exception([&piece] (auto ex) {
            s3l.warn("failed to remove copy-upload piece {}", piece._object_name);
        });
    }).finally([gh = std::move(gh), piece_ptr = std::move(piece_ptr)] {});
}
// data_sink for objects that may exceed a single multipart upload's limits:
// data is written into intermediate "piece" objects (each itself an
// upload_sink, tagged kind=piece), and every full piece is copy-uploaded
// into the target object as one part and then deleted.
class client::upload_jumbo_sink final : public upload_sink_base {
    static constexpr tag piece_tag = { .key = "kind", .value = "piece" };
    const unsigned _maximum_parts_in_piece;
    std::unique_ptr<upload_sink> _current;
    // Rotate to a fresh piece once the current one accumulated enough parts
    future<> maybe_flush() {
        if (_current->parts_count() >= _maximum_parts_in_piece) {
            auto next = std::make_unique<upload_sink>(_client, format("{}_{}", _object_name, parts_count() + 1), piece_tag);
            co_await upload_part(std::exchange(_current, std::move(next)));
            s3l.trace("Initiated {} piece (upload_id {})", parts_count(), _upload_id);
        }
    }
public:
    upload_jumbo_sink(shared_ptr<client> cln, sstring object_name, std::optional<unsigned> max_parts_per_piece, seastar::abort_source* as)
        : upload_sink_base(std::move(cln), std::move(object_name), std::nullopt, as)
        , _maximum_parts_in_piece(max_parts_per_piece.value_or(aws_maximum_parts_in_piece))
        , _current(std::make_unique<upload_sink>(_client, format("{}_{}", _object_name, parts_count()), piece_tag))
    {}
    virtual future<> put(std::span<temporary_buffer<char>> data) override {
        co_await _current->put(std::move(data));
        co_await maybe_flush();
    }
    virtual future<> flush() override {
        // Upload the last (possibly partial) piece, then complete the upload
        if (_current) {
            co_await upload_part(std::exchange(_current, nullptr));
        }
        if (upload_started()) {
            std::exception_ptr ex;
            try {
                co_await finalize_upload();
            } catch (...) {
                ex = std::current_exception();
            }
            if (ex) {
                // Completion failed -- abort the upload and surface the error
                co_await abort_upload();
                std::rethrow_exception(ex);
            }
        }
    }
    virtual future<> close() override {
        // A non-null _current here means flush() never ran; close it first
        if (_current) {
            co_await _current->close();
            _current.reset();
        }
        co_await upload_sink_base::close();
    }
};
// Build a data_sink that streams written data into an S3 multipart upload
// (or a plain PUT for payloads below the minimum part size).
data_sink client::make_upload_sink(sstring object_name, seastar::abort_source* as) {
    auto impl = std::make_unique<upload_sink>(shared_from_this(), std::move(object_name), std::nullopt, as);
    return data_sink(std::move(impl));
}
// Build a data_sink for very large ("jumbo") objects, composed of piece
// objects that are copy-uploaded into the target.
data_sink client::make_upload_jumbo_sink(sstring object_name, std::optional<unsigned> max_parts_per_piece, seastar::abort_source* as) {
    auto impl = std::make_unique<upload_jumbo_sink>(shared_from_this(), std::move(object_name), max_parts_per_piece, as);
    return data_sink(std::move(impl));
}
// data_source that downloads an object through a background "filling fiber":
// the fiber prefetches data into a bounded deque of buffers while get() pops
// them. The queue is bounded by _max_buffers_size with low/high watermarks,
// and socket-buffer memory is claimed from the client's memory semaphore.
// Retryable errors make the fiber re-request the remaining range; a
// non-retryable error breaks _get_cv so the consumer observes the failure.
class client::chunked_download_source final : public seastar::data_source_impl {
    // A downloaded buffer paired with the memory units claimed for it; the
    // units are released together with the buffer
    struct claimed_buffer {
        temporary_buffer<char> _buffer;
        std::optional<semaphore_units<>> _claimed_memory;
        claimed_buffer(temporary_buffer<char>&& buf, std::optional<semaphore_units<>>&& claimed_memory) noexcept
            : _buffer(std::move(buf)), _claimed_memory(std::move(claimed_memory)) {}
    };
    // Parsed HTTP "Content-Range" header: bytes <start>-<end>/<total>
    struct content_range {
        uint64_t start;
        uint64_t end;
        uint64_t total;
    };
    shared_ptr<client> _client;
    sstring _object_name;
    seastar::abort_source* _as;
    range _range; // what remains to be downloaded
    static constexpr size_t _socket_buff_size = 128_KiB;
    static constexpr size_t _max_buffers_size = 5_MiB;
    static constexpr double _buffers_low_watermark = 0.5;
    static constexpr double _buffers_high_watermark = 0.9;
    std::deque<claimed_buffer> _buffers; // prefetched data, consumed by get()
    size_t _buffers_size = 0;
    bool _is_finished = false;
    bool _is_contiguous_mode = false;
    condition_variable _bg_fiber_cv; // wakes the filling fiber
    condition_variable _get_cv; // wakes get() waiters
    future<> _filling_fiber = make_ready_future<>();
    static content_range parse_content_range(const std::string& header) {
        std::regex pattern(R"(bytes (\d+)-(\d+)/(\d+))");
        std::smatch match;
        if (std::regex_match(header, match, pattern)) {
            return {std::stoull(match[1].str()), std::stoull(match[2].str()), std::stoull(match[3].str())};
        }
        throw std::runtime_error("Invalid Content-Range header format");
    }
    // Background loop: repeatedly issues ranged GETs and pushes the received
    // buffers into _buffers until the requested range is exhausted or the
    // source is closed. The http client's own retry is disabled here -- this
    // loop implements its own retry on retryable errors.
    future<> make_filling_fiber() {
        seastar::http::experimental::no_retry_strategy no_retry;
        s3l.trace("Fiber starts cycle for object '{}'", _object_name);
        while (!_is_finished) {
            try {
                // Back off until the consumer drains below the low watermark
                if (!_is_finished && _buffers_size >= _max_buffers_size * _buffers_low_watermark) {
                    co_await _bg_fiber_cv.when([this] { return _is_finished || (_buffers_size < _max_buffers_size * _buffers_low_watermark); });
                }
                // If memory is tight but the consumer still has queued data,
                // wait rather than competing for memory
                if (auto units = try_get_units(_client->_memory, _socket_buff_size); !_is_finished && !_buffers.empty() && !units) {
                    auto& gc = _client->find_or_create_client();
                    ++gc.downloads_blocked_on_memory;
                    co_await _bg_fiber_cv.when([this] {
                        return _is_finished || _buffers.empty() || try_get_units(_client->_memory, _socket_buff_size);
                    });
                }
                if (_is_finished) {
                    s3l.trace("Abandoning fiber for object '{}'", _object_name);
                    co_return;
                }
                range current_range{0};
                if (_range == s3::full_range) {
                    current_range = {0, _max_buffers_size};
                    s3l.trace(
                        "No download range for object '{}' was provided. Setting the download range to `_max_buffers_size` {}", _object_name, current_range);
                } else if (_is_contiguous_mode) {
                    current_range = _range;
                    s3l.trace("Setting contiguous download mode for '{}', range: {}", _object_name, current_range);
                } else {
                    // In non-contiguous mode we download the object in chunks of _max_buffers_size
                    current_range = {_range.offset(), std::min(_range.length(), _max_buffers_size - _buffers_size)};
                    s3l.trace("Setting ranged download mode for '{}', range: {}", _object_name, current_range);
                }
                if (current_range.length() == 0) {
                    // Nothing left to download: push an empty buffer as the EOS marker
                    s3l.trace("Fiber for object '{}' completed downloading, signals EOS and leaving fiber", _object_name);
                    _buffers.emplace_back(temporary_buffer<char>(), co_await _client->claim_memory(0, _as));
                    _get_cv.signal();
                    _is_finished = true;
                    co_return;
                }
                auto req = http::request::make("GET", _client->_host, _object_name);
                req._headers["Range"] = current_range.to_header_string();
                s3l.trace("Fiber for object '{}' will make HTTP request within range {}", _object_name, current_range);
                co_await _client->make_request(
                    std::move(req),
                    [this, start = s3_clock::now(), pf_length = current_range.length()](group_client& gc, const http::reply& reply, input_stream<char>&& in_) mutable -> future<> {
                        if (reply._status != http::reply::status_type::ok && reply._status != http::reply::status_type::partial_content) {
                            s3l.warn("Fiber for object '{}' failed: {}. Exiting", _object_name, reply._status);
                            throw httpd::unexpected_status_error(reply._status);
                        }
                        gc.read_stats.ops++;
                        gc.read_stats.duration += s3_clock::now() - start;
                        gc.prefetch_bytes += pf_length;
                        // With no explicit range, the reply's Content-Range
                        // reveals the object's total size
                        if (_range == s3::full_range && !reply.get_header("Content-Range").empty()) {
                            auto content_range_header = parse_content_range(reply.get_header("Content-Range"));
                            _range = range{content_range_header.start, content_range_header.total};
                            s3l.trace("No range for object '{}' was provided. Setting the range to {} from the Content-Range header", _object_name, _range);
                        }
                        auto in = std::move(in_);
                        while (_buffers_size < _max_buffers_size && !_is_finished) {
                            utils::get_local_injector().inject("kill_s3_inflight_req", [] {
                                // Inject non-retryable error to emulate source failure
                                throw aws::aws_exception(aws::aws_error(aws::aws_error_type::RESOURCE_NOT_FOUND, "Injected ResourceNotFound", utils::http::retryable::no));
                            });
                            s3l.trace("Fiber for object '{}' will try to read within range {}", _object_name, _range);
                            temporary_buffer<char> buf;
                            auto units = try_get_units(_client->_memory, _socket_buff_size);
                            if (_buffers.empty() || units) {
                                buf = co_await in.read();
                                assert(buf.size() <= _socket_buff_size);
                                if (units) {
                                    // Keep only as many units as the buffer actually occupies
                                    units->return_units(_socket_buff_size - buf.size());
                                }
                            } else {
                                // No memory and the consumer has queued data -- pause reading
                                break;
                            }
                            auto buff_size = buf.size();
                            gc.read_stats.bytes += buff_size;
                            _range += buff_size;
                            _buffers_size += buff_size;
                            if (buff_size == 0 && _range.length() == 0) {
                                // EOF coincides with the end of the requested range: signal EOS
                                s3l.trace("Fiber for object '{}' signals EOS", _object_name);
                                _buffers.emplace_back(std::move(buf), std::move(units));
                                _get_cv.signal();
                                _is_finished = true;
                                break;
                            }
                            if (buff_size == 0) {
                                // The requested range is fully downloaded
                                break;
                            }
                            s3l.trace("Fiber for object '{}' pushes {} bytes buffer", _object_name, buff_size);
                            _buffers.emplace_back(std::move(buf), std::move(units));
                            _get_cv.signal();
                            utils::get_local_injector().inject("break_s3_inflight_req", [] {
                                // Inject a non-`aws_error` after partial data download to verify proper
                                // handling and that the fiber retries missing chunks
                                throw std::system_error(ECONNRESET, std::system_category());
                            });
                        }
                        co_await in.close();
                    },
                    no_retry,
                    [](std::exception_ptr ex) { std::rethrow_exception(std::move(ex)); },
                    {},
                    _as);
                // If the queue didn't fill near the high watermark, stream the
                // rest contiguously in a single request next iteration
                _is_contiguous_mode = _buffers_size < _max_buffers_size * _buffers_high_watermark;
            } catch (...) {
                auto ex = std::current_exception();
                auto aws_ex = aws::aws_error::from_exception_ptr(ex);
                if (!aws_ex.is_retryable()) {
                    // Non-retryable: propagate to get() waiters and stop
                    s3l.info("Fiber for object '{}' failed: {}, exiting", _object_name, ex);
                    _get_cv.broken(ex);
                    co_return;
                }
                // Retryable: loop around and re-request the remaining range
            }
        }
        s3l.trace("Fiber for object '{}' completed", _object_name);
    }
public:
    chunked_download_source(shared_ptr<client> cln, sstring object_name, range range, seastar::abort_source* as)
        : _client(std::move(cln)), _object_name(std::move(object_name)), _as(as), _range(range) {
        s3l.trace("Constructing chunked_download_source for object '{}'", _object_name);
        // Start prefetching immediately
        _filling_fiber = make_filling_fiber();
    }
    // Pop the next prefetched buffer, waking the fiber when the queue drains
    // below the low watermark. An empty buffer signals EOS.
    future<temporary_buffer<char>> get() override {
        while (true) {
            if (!_buffers.empty()) {
                auto claimed_buff = std::move(_buffers.front());
                _buffers.pop_front();
                _buffers_size -= claimed_buff._buffer.size();
                if (_buffers_size < _max_buffers_size * _buffers_low_watermark) {
                    _bg_fiber_cv.signal();
                }
                s3l.trace("get() for object '{}' popped buffer of {} bytes", _object_name, claimed_buff._buffer.size());
                co_return std::move(claimed_buff._buffer);
            }
            _bg_fiber_cv.signal();
            s3l.trace("get() for object '{}' waiting for buffer", _object_name);
            co_await _get_cv.wait();
        }
    }
    // Stop the fiber, wake all waiters and join the fiber's future
    future<> close() override {
        _is_finished = true;
        _bg_fiber_cv.broadcast();
        _get_cv.broadcast();
        s3l.trace("Closing chunked_download_source for object '{}'", _object_name);
        co_await std::move(_filling_fiber);
    }
};
// Build a data_source that prefetches the object in chunks via a background
// fiber (see chunked_download_source).
data_source client::make_chunked_download_source(sstring object_name, range range, seastar::abort_source* as) {
    auto impl = std::make_unique<chunked_download_source>(shared_from_this(), std::move(object_name), range, as);
    return data_source(std::move(impl));
}
// data_source that reads an object's body from a single ranged GET request,
// re-issuing the request for the remaining range if the body stream breaks.
// While a body is active, the reply handler coroutine is parked on
// external_body::done so the stream stays alive for get() to read from.
class client::download_source final : public seastar::data_source_impl {
    shared_ptr<client> _client;
    sstring _object_name;
    seastar::abort_source* _as;
    range _range; // what remains to be downloaded
    struct external_body {
        // Reference to the reply body stream owned by the parked handler
        input_stream<char>& b;
        // Resolving this releases the parked reply handler
        promise<> done;
        external_body(input_stream<char>& b_) noexcept : b(b_), done() {}
    };
    std::optional<external_body> _body;
    named_gate _bg; // tracks the background request fiber
    future<external_body> request_body();
public:
    download_source(shared_ptr<client> cln, sstring object_name, range download_range, seastar::abort_source* as)
        : _client(std::move(cln))
        , _object_name(std::move(object_name))
        , _as(as)
        , _range(std::move(download_range))
        , _bg("s3::client::download_source")
    {
    }
    virtual future<temporary_buffer<char>> get() override;
    virtual future<> close() override {
        if (_body.has_value()) {
            // Unpark the reply handler so the background request can complete
            _body->done.set_value();
            _body.reset();
        }
        return _bg.close();
    }
};
// Build a data_source that streams the object body from a single ranged GET
// (see download_source).
data_source client::make_download_source(sstring object_name, range download_range, seastar::abort_source* as) {
    auto impl = std::make_unique<download_source>(shared_from_this(), std::move(object_name), download_range, as);
    return data_source(std::move(impl));
}
// Issue a ranged GET in the background and resolve the returned future once
// the reply body stream is available. The reply handler then parks on
// external_body::done so the stream remains readable by get(). The promise
// lives on the heap (bp) and is kept alive by the final .finally() capture
// because the background request may outlive this frame.
auto client::download_source::request_body() -> future<external_body> {
    auto req = http::request::make("GET", _client->_host, _object_name);
    s3l.trace("GET {} download range {}", _object_name, _range);
    req._headers["Range"] = _range.to_header_string();
    auto bp = std::make_unique<std::optional<promise<external_body>>>(std::in_place);
    auto& p = *bp;
    future<external_body> f = p->get_future();
    (void)_client->make_request(std::move(req), [this, &p] (group_client& gc, const http::reply& rep, input_stream<char>&& in_) mutable -> future<> {
        s3l.trace("GET {} got the body ({} {} bytes)", _object_name, rep._status, rep.content_length);
        if (rep._status != http::reply::status_type::partial_content && rep._status != http::reply::status_type::ok) {
            co_await coroutine::return_exception(httpd::unexpected_status_error(rep._status));
        }
        auto in = std::move(in_);
        external_body xb(in);
        auto f = xb.done.get_future();
        p->set_value(std::move(xb));
        // The promise is consumed; reset so the exception handler below
        // doesn't touch it again
        p.reset();
        // Park here until get()/close() completes the body
        co_await std::move(f);
    }, {}, _as).handle_exception([&p] (auto ex) {
        // Request failed before (or instead of) delivering the body
        if (p.has_value()) {
            p->set_exception(std::move(ex));
        }
    }).finally([bp = std::move(bp), h = _bg.hold()] {});
    return f;
}
// Read the next buffer from the current body stream, (re-)establishing the
// GET request as needed. On a read error the parked handler is completed
// with the error and a fresh request is made for the remaining range.
future<temporary_buffer<char>> client::download_source::get() {
    while (true) {
        if (_body.has_value()) {
            try {
                temporary_buffer<char> buf = co_await _body->b.read_up_to(_range.length());
                _range += buf.size();
                s3l.trace("GET {} got the {}-bytes buffer", _object_name, buf.size());
                if (buf.empty()) {
                    // EOF -- unpark the reply handler; the empty buffer is
                    // returned to the caller as EOS
                    _body->done.set_value();
                    _body.reset();
                }
                co_return std::move(buf);
            } catch (...) {
                s3l.trace("GET {} error reading body, completing it and re-trying", _object_name);
                _body->done.set_exception(std::current_exception());
                _body.reset();
            }
        }
        auto xb = co_await request_body();
        _body.emplace(std::move(xb));
    }
}
// unlike upload_sink and upload_jumbo_sink, do_upload_file reads from the
// specified file, and sends the data read from disk right away to the wire,
// without accumulating them first.
class client::do_upload_file : private multipart_upload {
    const std::filesystem::path _path;
    // Requested part size; 0 appears to mean "choose automatically" via
    // calc_part_size() -- TODO confirm against calc_part_size()
    size_t _part_size;
    upload_progress& _progress;
    // each time, we read up to transmit size from disk.
    // this is also an option which limits the number of multipart upload tasks.
    //
    // connected_socket::output() uses 8 KiB for its buffer_size, and
    // file_input_stream_options.buffer_size is also 8 KiB, taking the
    // read-ahead into consideration, for maximizing the throughput,
    // we use 64K buffer size.
    static constexpr size_t _transmit_size = 64_KiB;
    static file_input_stream_options input_stream_options() {
        // optimized for throughput
        return {
            .buffer_size = 128_KiB,
            .read_ahead = 4,
        };
    }
    // transmit data from input to output in chunks sized up to unit_size
    static future<> copy_to(input_stream<char> input,
                            output_stream<char> output,
                            size_t unit_size,
                            upload_progress& progress) {
        // Both streams are always closed, even on error; the original
        // exception (if any) is re-thrown after the cleanup
        std::exception_ptr ex;
        try {
            for (;;) {
                auto buf = co_await input.read_up_to(unit_size);
                if (buf.empty()) {
                    break;
                }
                const size_t buf_size = buf.size();
                co_await output.write(std::move(buf));
                progress.uploaded += buf_size;
            }
            co_await output.flush();
        } catch (...) {
            ex = std::current_exception();
        }
        co_await output.close();
        co_await input.close();
        if (ex) {
            co_await coroutine::return_exception_ptr(std::move(ex));
        }
    }
    future<> upload_part(file f, uint64_t offset, uint64_t part_size, uint64_t part_number) {
        // upload a part in a multipart upload, see
        // https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html
        // The memory claim bounds the amount of in-flight transmit buffers
        auto mem_units = co_await _client->claim_memory(_transmit_size, _as);
        auto req = http::request::make("PUT", _client->_host, _object_name);
        req._headers["Content-Length"] = to_sstring(part_size);
        // S3 part numbers are 1-based
        req.set_query_param("partNumber", to_sstring(part_number + 1));
        req.set_query_param("uploadId", _upload_id);
        s3l.trace("PUT part {}, {} bytes (upload id {})", part_number, part_size, _upload_id);
        req.write_body("bin", part_size, [f=std::move(f), mem_units=std::move(mem_units), offset, part_size, &progress = _progress] (output_stream<char>&& out_) {
            auto input = make_file_input_stream(f, offset, part_size, input_stream_options());
            auto output = std::move(out_);
            return copy_to(std::move(input), std::move(output), _transmit_size, progress);
        });
        co_await _client->make_request(std::move(req), [this, part_size, part_number, start = s3_clock::now()] (group_client& gc, const http::reply& reply, input_stream<char>&& in_) mutable -> future<> {
            auto etag = reply.get_header("ETag");
            s3l.trace("uploaded {} part data -> etag = {} (upload id {})", part_number, etag, _upload_id);
            _part_etags[part_number] = std::move(etag);
            gc.write_stats.update(part_size, s3_clock::now() - start);
            return make_ready_future();
        }, http::reply::status_type::ok, _as).handle_exception([this, part_number] (auto ex) {
            // Failure leaves _part_etags[part_number] empty; finalize_upload()
            // detects that and the whole upload gets aborted
            s3l.warn("couldn't upload part {}: {} (upload id {})", part_number, ex, _upload_id);
        });
    }
    // Upload all parts with bounded concurrency, then complete the upload;
    // on any failure the multipart upload is aborted and the error re-thrown
    future<> multi_part_upload(file&& f, uint64_t total_size, size_t part_size) {
        co_await start_upload();
        std::exception_ptr ex;
        try {
            co_await max_concurrent_for_each(std::views::iota(size_t{0}, (total_size + part_size - 1) / part_size),
                                             _max_multipart_concurrency,
                                             [part_size, total_size, this, f = file{f}](auto part_num) -> future<> {
                auto part_offset = part_num * part_size;
                // The last part may be shorter than part_size
                auto actual_part_size = std::min(total_size - part_offset, part_size);
                s3l.trace("upload_part: {}~{}/{}", part_offset, actual_part_size, total_size);
                co_await upload_part(f, part_offset, actual_part_size, part_num);
            });
            co_await finalize_upload();
        } catch (...) {
            ex = std::current_exception();
        }
        if (ex) {
            // finalize_upload() may or may not have closed the gate already
            if (!_bg_flushes.is_closed()) {
                co_await _bg_flushes.close();
            }
            co_await abort_upload();
            std::rethrow_exception(ex);
        }
    }
    // Single-request upload for small files
    future<> put_object(file&& f, uint64_t len) {
        // https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
        s3l.trace("PUT {} ({})", _object_name, _path.native());
        auto mem_units = co_await _client->claim_memory(_transmit_size, _as);
        auto req = http::request::make("PUT", _client->_host, _object_name);
        if (_tag) {
            req._headers["x-amz-tagging"] = seastar::format("{}={}", _tag->key, _tag->value);
        }
        req.write_body("bin", len, [f = std::move(f), &progress = _progress] (output_stream<char>&& out_) {
            auto input = make_file_input_stream(f, input_stream_options());
            auto output = std::move(out_);
            return copy_to(std::move(input), std::move(output), _transmit_size, progress);
        });
        co_await _client->make_request(std::move(req), [len, start = s3_clock::now()] (group_client& gc, const auto& rep, auto&& in) {
            gc.write_stats.update(len, s3_clock::now() - start);
            return ignore_reply(rep, std::move(in));
        }, http::reply::status_type::ok, _as);
    }
public:
    do_upload_file(shared_ptr<client> cln,
                   std::filesystem::path path,
                   sstring object_name,
                   std::optional<tag> tag,
                   size_t part_size,
                   upload_progress& up,
                   seastar::abort_source* as)
        : multipart_upload(std::move(cln), std::move(object_name), std::move(tag), as)
        , _path{std::move(path)}
        , _part_size(part_size)
        , _progress(up)
    {
    }
    // Entry point: stat the file and pick multipart vs single PUT
    future<> upload() {
        auto f = co_await open_file_dma(_path.native(), open_flags::ro);
        const auto stat = co_await f.stat();
        const uint64_t file_size = stat.st_size;
        _progress.total += file_size;
        // use multipart upload when possible in order to transmit parts in
        // parallel to improve throughput
        if (file_size > aws_minimum_part_size) {
            auto [num_parts, part_size] = calc_part_size(file_size, _part_size);
            _part_etags.resize(num_parts);
            co_await multi_part_upload(std::move(f), file_size, part_size);
        } else {
            // single part upload
            co_await put_object(std::move(f), file_size);
        }
    }
};
// Upload a local file with progress reporting; no tag, and a part size of 0
// (forwarded to do_upload_file unchanged).
future<> client::upload_file(std::filesystem::path path,
                             sstring object_name,
                             upload_progress& up,
                             seastar::abort_source* as) {
    do_upload_file uploader{shared_from_this(), std::move(path), std::move(object_name), std::nullopt, 0, up, as};
    co_await uploader.upload();
}
// Upload a local file without progress reporting (a throwaway
// upload_progress instance absorbs the updates).
future<> client::upload_file(std::filesystem::path path,
                             sstring object_name,
                             std::optional<tag> tag,
                             std::optional<size_t> part_size,
                             seastar::abort_source* as
) {
    upload_progress discarded;
    do_upload_file uploader{shared_from_this(), std::move(path), std::move(object_name), std::move(tag), part_size.value_or(0), discarded, as};
    co_await uploader.upload();
}
// file_impl adapter exposing an S3 object as a read-only seastar file.
// Reads are translated into ranged GETs (get_object_contiguous); all
// mutating operations throw. The object's size and mtime are fetched
// lazily and cached in _stats, which is safe because S3 objects are
// immutable.
class client::readable_file : public file_impl {
shared_ptr<client> _client;
sstring _object_name;
std::optional<stats> _stats; // lazily populated size + last_modified cache
seastar::abort_source* _as; // optional; may be nullptr
[[noreturn]] void unsupported() {
throw_with_backtrace<std::logic_error>("unsupported operation on s3 readable file");
}
// Fetch and cache the object's stats on first use; no-op afterwards.
future<> maybe_update_stats() {
if (_stats) {
return make_ready_future<>();
}
return _client->get_object_stats(_object_name).then([this] (auto st) {
_stats = std::move(st);
return make_ready_future<>();
});
}
public:
readable_file(shared_ptr<client> cln, sstring object_name, seastar::abort_source* as = nullptr)
: _client(std::move(cln))
, _object_name(std::move(object_name))
, _as(as)
{
}
// Mutating operations make no sense for a read-only S3 view -- reject.
virtual future<size_t> write_dma(uint64_t pos, const void* buffer, size_t len, io_intent*) override { unsupported(); }
virtual future<size_t> write_dma(uint64_t pos, std::vector<iovec> iov, io_intent*) override { unsupported(); }
virtual future<> truncate(uint64_t length) override { unsupported(); }
virtual subscription<directory_entry> list_directory(std::function<future<> (directory_entry de)> next) override { unsupported(); }
// Nothing to flush/allocate/discard for remote immutable objects.
virtual future<> flush(void) override { return make_ready_future<>(); }
virtual future<> allocate(uint64_t position, uint64_t length) override { return make_ready_future<>(); }
virtual future<> discard(uint64_t offset, uint64_t length) override { return make_ready_future<>(); }
// Handle that can be cloned/transferred; materializes a new
// readable_file on demand via the client handle.
class readable_file_handle_impl final : public file_handle_impl {
client::handle _h;
sstring _object_name;
public:
readable_file_handle_impl(client::handle h, sstring object_name)
: _h(std::move(h))
, _object_name(std::move(object_name))
{}
virtual std::unique_ptr<file_handle_impl> clone() const override {
return std::make_unique<readable_file_handle_impl>(_h, _object_name);
}
virtual shared_ptr<file_impl> to_file() && override {
// TODO: cannot traverse abort source across shards.
return make_shared<readable_file>(std::move(_h).to_client(), std::move(_object_name), nullptr);
}
};
virtual std::unique_ptr<file_handle_impl> dup() override {
return std::make_unique<readable_file_handle_impl>(client::handle(*_client), _object_name);
}
virtual future<uint64_t> size(void) override {
return _client->get_object_size(_object_name);
}
// Synthesize a POSIX stat from the cached S3 stats: a regular,
// world-readable, single-link file.
virtual future<struct stat> stat(void) override {
co_await maybe_update_stats();
struct stat ret {};
ret.st_nlink = 1;
ret.st_mode = S_IFREG | S_IRUSR | S_IRGRP | S_IROTH;
ret.st_size = _stats->size;
ret.st_blksize = 1 << 10; // huh?
ret.st_blocks = _stats->size >> 9; // st_blocks counts 512-byte units
// objects are immutable on S3, therefore we can use Last-Modified to set both st_mtime and st_ctime
ret.st_mtime = _stats->last_modified;
ret.st_ctime = _stats->last_modified;
co_return ret;
}
virtual future<size_t> read_dma(uint64_t pos, void* buffer, size_t len, io_intent*) override {
co_await maybe_update_stats();
// reading past EOF yields 0 bytes, matching regular file semantics
if (pos >= _stats->size) {
co_return 0;
}
// NOTE(review): assumes get_object_contiguous returns at most `len`
// bytes -- confirm, otherwise the copy below would overrun `buffer`
auto buf = co_await _client->get_object_contiguous(_object_name, range{ pos, len }, _as);
std::copy_n(buf.get(), buf.size(), reinterpret_cast<uint8_t*>(buffer));
co_return buf.size();
}
virtual future<size_t> read_dma(uint64_t pos, std::vector<iovec> iov, io_intent*) override {
co_await maybe_update_stats();
if (pos >= _stats->size) {
co_return 0;
}
// fetch the whole span once, then scatter it across the iovecs
auto buf = co_await _client->get_object_contiguous(_object_name, range{ pos, utils::iovec_len(iov) }, _as);
uint64_t off = 0;
for (auto& v : iov) {
auto sz = std::min(v.iov_len, buf.size() - off);
if (sz == 0) {
break;
}
std::copy_n(buf.get() + off, sz, reinterpret_cast<uint8_t*>(v.iov_base));
off += sz;
}
co_return off; // number of bytes actually scattered
}
virtual future<temporary_buffer<uint8_t>> dma_read_bulk(uint64_t offset, size_t range_size, io_intent*) override {
co_await maybe_update_stats();
if (offset >= _stats->size) {
co_return temporary_buffer<uint8_t>();
}
auto buf = co_await _client->get_object_contiguous(_object_name, range{ offset, range_size }, _as);
// re-wrap the char buffer as uint8_t without copying; release()
// transfers ownership of the underlying storage
co_return temporary_buffer<uint8_t>(reinterpret_cast<uint8_t*>(buf.get_write()), buf.size(), buf.release());
}
// No remote state to tear down -- the HTTP connections belong to _client.
virtual future<> close() override {
return make_ready_future<>();
}
};
// Wrap an S3 object into the seastar read-only file abstraction.
file client::make_readable_file(sstring object_name, seastar::abort_source* as) {
    auto impl = make_shared<readable_file>(shared_from_this(), std::move(object_name), as);
    return file(std::move(impl));
}
// Shut the client down: stop the credential timers (serialized with any
// in-flight credential refresh via _creds_sem), then close every cached
// HTTP connection group in parallel.
future<> client::close() {
    {
        auto guard = co_await get_units(_creds_sem, 1);
        _creds_invalidation_timer.cancel();
        _creds_update_timer.cancel();
    }
    co_await coroutine::parallel_for_each(_https, [] (auto& kv) {
        return kv.second.http.close();
    });
}
// Convenience constructor: lists every object under the prefix by
// delegating to the filtering constructor with an accept-all filter.
client::bucket_lister::bucket_lister(shared_ptr<client> client, sstring bucket, sstring prefix, size_t objects_per_page, size_t entries_batch)
    : bucket_lister(std::move(client), std::move(bucket), std::move(prefix),
        [] (const fs::path&, const directory_entry&) { return true; },
        objects_per_page, entries_batch)
{}
// Primary constructor: records the listing parameters and sizes the
// producer/consumer queue to entries_batch. objects_per_page is
// stringified once up front because it is sent verbatim as the
// `max-keys` query parameter on every page request.
client::bucket_lister::bucket_lister(shared_ptr<client> client, sstring bucket, sstring prefix, lister::filter_type filter, size_t objects_per_page, size_t entries_batch)
: _client(std::move(client))
, _bucket(std::move(bucket))
, _prefix(std::move(prefix))
, _max_keys(format("{}", objects_per_page))
, _filter(std::move(filter))
, _queue(entries_batch)
{
}
// Parse a ListObjectsV2 response body: returns the object keys and, if
// the listing is truncated, the continuation token for the next page
// (empty otherwise). Throws std::runtime_error on malformed responses.
static std::pair<std::vector<sstring>, sstring> parse_list_of_objects(sstring body) {
    auto doc = std::make_unique<rapidxml::xml_document<>>();
    try {
        // rapidxml parses in place, mutating `body`'s buffer
        doc->parse<0>(body.data());
    } catch (const rapidxml::parse_error& e) {
        s3l.warn("cannot parse list-objects-v2 response: {}", e.what());
        throw std::runtime_error("cannot parse objects list response");
    }
    auto root_node = doc->first_node("ListBucketResult");
    if (!root_node) {
        // guard against a well-formed document with an unexpected root
        throw std::runtime_error("no ListBucketResult in objects list response");
    }
    std::vector<sstring> names;
    // Walk only the <Contents> siblings: next_sibling("Contents") skips
    // unrelated elements (e.g. NextContinuationToken) that may follow
    // the Contents group and have no <Key> child.
    for (auto contents = root_node->first_node("Contents"); contents; contents = contents->next_sibling("Contents")) {
        auto key = contents->first_node("Key");
        if (!key) {
            throw std::runtime_error("no Key in objects list entry");
        }
        names.push_back(key->value());
    }
    sstring continuation_token;
    auto is_truncated = root_node->first_node("IsTruncated");
    if (is_truncated && std::string_view(is_truncated->value()) == "true") {
        auto continuation = root_node->first_node("NextContinuationToken");
        if (!continuation) {
            throw std::runtime_error("no continuation token in truncated list of objects");
        }
        continuation_token = continuation->value();
    }
    return {std::move(names), std::move(continuation_token)};
}
// Producer side of the lister: pages through the bucket and pushes
// matching entries into _queue. End of listing is signalled by pushing
// nullopt; a request failure aborts the queue with the error so the
// consumer (get()) observes it.
future<> client::bucket_lister::start_listing() {
// This is the implementation of paged ListObjectsV2 API call
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
sstring continuation_token;
do {
s3l.trace("GET /?list-type=2 (prefix={})", _prefix);
auto req = http::request::make("GET", _client->_host, format("/{}", _bucket));
req.set_query_param("list-type", "2");
req.set_query_param("max-keys", _max_keys);
if (!continuation_token.empty()) {
// resume where the previous page ended; clear the token so the
// loop terminates unless the reply provides a new one
req.set_query_param("continuation-token", std::exchange(continuation_token, ""));
}
if (!_prefix.empty()) {
req.set_query_param("prefix", _prefix);
}
std::vector<sstring> names;
try {
// the handler fills `names` and (if truncated) `continuation_token`
// by reference; make_request awaits it before returning
co_await _client->make_request(std::move(req),
[&names, &continuation_token] (const http::reply& reply, input_stream<char>&& in) mutable -> future<> {
auto input = std::move(in);
auto body = co_await util::read_entire_stream_contiguous(input);
auto list = parse_list_of_objects(std::move(body));
names = std::move(list.first);
continuation_token = std::move(list.second);
}, http::reply::status_type::ok);
} catch (...) {
// propagate the failure to the consumer and stop listing
_queue.abort(std::current_exception());
co_return;
}
fs::path dir(_prefix);
for (auto&& o : names) {
// entries are exposed relative to the prefix
directory_entry ent{o.substr(_prefix.size())};
if (!_filter(dir, ent)) {
continue;
}
co_await _queue.push_eventually(std::move(ent));
}
} while (!continuation_token.empty());
// nullopt marks the end of the listing for get()
co_await _queue.push_eventually(std::nullopt);
}
// Consumer side of the lister: returns the next entry, nullopt at the
// end of the listing, or rethrows the producer's failure. The producer
// coroutine is started lazily on the first call; once the listing ends
// (or fails) the lister is closed before returning.
future<std::optional<directory_entry>> client::bucket_lister::get() {
    if (!_opt_done_fut) {
        _opt_done_fut = start_listing();
    }
    std::exception_ptr eptr;
    try {
        if (auto entry = co_await _queue.pop_eventually(); entry) {
            co_return entry;
        }
    } catch (...) {
        eptr = std::current_exception();
    }
    // either the listing ended (nullopt) or it failed -- tear down first
    co_await close();
    if (eptr) {
        co_return coroutine::exception(std::move(eptr));
    }
    co_return std::nullopt;
}
// Stop the lister: abort the queue so a blocked producer wakes up, then
// wait for the producer coroutine to finish, swallowing its error (it
// is expected to observe the broken_pipe abort). Safe to call twice --
// _opt_done_fut is replaced with a ready future after the first call.
future<> client::bucket_lister::close() noexcept {
    if (!_opt_done_fut) {
        co_return;
    }
    _queue.abort(std::make_exception_ptr(broken_pipe_exception()));
    auto listing_done = std::exchange(_opt_done_fut, std::make_optional<future<>>(make_ready_future<>()));
    try {
        co_await std::move(listing_done).value();
    } catch (...) {
        // ignore all errors
    }
}
// returns pair<num_of_parts, part_size>
std::pair<unsigned, size_t> calc_part_size(size_t total_size, size_t part_size) {
if (total_size > aws_maximum_object_size) {
on_internal_error(s3l, fmt::format("object size too large: {} is larger than maximum S3 object size: {}", total_size, aws_maximum_object_size));
}
if (part_size > 0) {
if (part_size > aws_maximum_part_size) {
on_internal_error(s3l, fmt::format("part_size too large: {} is larger than maximum part size: {}", part_size, aws_maximum_part_size));
}
if (part_size < aws_minimum_part_size) {
on_internal_error(s3l, fmt::format("part_size too small: {} is smaller than minimum part size: {}", part_size, aws_minimum_part_size));
}
const size_t num_parts = div_ceil(total_size, part_size);
if (num_parts > aws_maximum_parts_in_piece) {
on_internal_error(s3l, fmt::format("too many parts: {} > {}", num_parts, aws_maximum_parts_in_piece));
}
return {num_parts, part_size};
}
// if part_size is 0, this means the caller leaves it to us to decide the part_size. The default part size for multipart upload is set to 50MiB. This
// value was determined empirically by running `perf_s3_client` with various part sizes to find the optimal one.
static constexpr size_t default_part_size = 50_MiB;
const size_t num_parts = div_ceil(total_size, default_part_size);
if (num_parts <= aws_maximum_parts_in_piece) {
return {num_parts, default_part_size};
}
part_size = align_up(div_ceil(total_size, aws_maximum_parts_in_piece), 1_MiB);
return {div_ceil(total_size, part_size), part_size};
}
} // s3 namespace