Fixes #27268. Refs #27268. Includes the auth call in the code covered by backoff-retry on server error, moves the code to use the shared primitive for this, and increases the resilience a bit (higher retry count). v2: * Don't do backoff if we need to refresh credentials. * Use the abort source for backoff if available. v3: * Include other retryable conditions in the auth check. Closes scylladb/scylladb#27269
1146 lines
44 KiB
C++
1146 lines
44 KiB
C++
/*
|
|
* Copyright (C) 2025-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
|
|
#include "object_storage.hh"
|
|
#include "gcp_credentials.hh"
|
|
|
|
#include <algorithm>
|
|
#include <numeric>
|
|
#include <deque>
|
|
|
|
#include <boost/regex.hpp>
|
|
|
|
#include <seastar/core/align.hh>
|
|
#include <seastar/core/gate.hh>
|
|
#include <seastar/core/semaphore.hh>
|
|
#include <seastar/core/sleep.hh>
|
|
#include <seastar/http/client.hh>
|
|
#include <seastar/util/short_streams.hh>
|
|
|
|
#include "utils/rest/client.hh"
|
|
#include "utils/exponential_backoff_retry.hh"
|
|
#include "utils/http.hh"
|
|
#include "utils/overloaded_functor.hh"
|
|
|
|
// Logger used by everything in this translation unit.
static logger gcp_storage("gcp_storage");

// Uploads below this size (in bytes) are only issued for the final chunk of an upload.
static constexpr uint64_t min_gcp_storage_chunk_size = 256 * 1024;
// Amount of buffered data (in bytes) that triggers an upload; also the default read size.
static constexpr uint64_t default_gcp_storage_chunk_size = 8 * 1024 * 1024;

// OAuth scope URIs for object access, in increasing order of privilege.
static constexpr char GCP_OBJECT_SCOPE_READ_ONLY[] = "https://www.googleapis.com/auth/devstorage.read_only";
static constexpr char GCP_OBJECT_SCOPE_READ_WRITE[] = "https://www.googleapis.com/auth/devstorage.read_write";
static constexpr char GCP_OBJECT_SCOPE_FULL_CONTROL[] = "https://www.googleapis.com/auth/devstorage.full_control";

// Endpoint used when the caller does not provide one.
static constexpr char STORAGE_APIS_URI[] = "https://storage.googleapis.com";
// Content type and HTTP header names used by the requests below.
static constexpr char APPLICATION_JSON[] = "application/json";
static constexpr char LOCATION[] = "Location";
static constexpr char CONTENT_RANGE[] = "Content-Range";
static constexpr char RANGE[] = "Range";
|
|
|
|
using namespace std::string_literals;
|
|
using namespace utils::gcp;
|
|
|
|
static bool storage_scope_implies(const scopes_type& scopes, const scopes_type& check_for) {
|
|
if (default_scopes_implies_other_scope(scopes, check_for)) {
|
|
return true;
|
|
}
|
|
if (scopes_contains_scope(check_for, GCP_OBJECT_SCOPE_READ_ONLY)) {
|
|
return scopes_contains_scope(scopes, GCP_OBJECT_SCOPE_READ_WRITE)
|
|
|| scopes_contains_scope(scopes, GCP_OBJECT_SCOPE_FULL_CONTROL)
|
|
;
|
|
}
|
|
if (scopes_contains_scope(check_for, GCP_OBJECT_SCOPE_READ_WRITE)) {
|
|
return scopes_contains_scope(scopes, GCP_OBJECT_SCOPE_FULL_CONTROL);
|
|
}
|
|
return false;
|
|
}
|
|
|
|
// Parses a timestamp of the form "%FT%TZ" into a system_clock time point.
// The stream state is not inspected afterwards, so the caller gets whatever
// the extraction left in the (default-constructed) time point on failure.
static auto parse_rfc3339(const std::string& s) {
    std::istringstream input(s);
    std::chrono::system_clock::time_point parsed;
    input >> std::chrono::parse("%FT%TZ", parsed);
    return parsed;
}
|
|
|
|
/**
 * Data sink that writes an object through an upload session.
 *
 * Buffers handed to put() are queued and uploaded in the background once
 * enough data has accumulated (or on flush/close). Uploads are serialized
 * through `_semaphore` and tracked by `_gate`, so close() can wait for them.
 * The first error seen by a background upload is kept in `_exception` and
 * reported from close().
 */
class utils::gcp::storage::client::object_data_sink : public data_sink_impl {
    shared_ptr<impl> _impl;
    std::string _bucket;
    std::string _object_name;
    rjson::value _metadata;
    std::string _session_path;
    std::string _content_type;
    // Data accepted by put() but not yet handed to an upload.
    std::deque<temporary_buffer<char>> _buffers;
    // Number of bytes handed to uploads so far; doubles as the next upload offset.
    uint64_t _accumulated = 0;
    // Memory lease covering the queued buffers.
    seastar::semaphore_units<> _mem_held;
    bool _closed = false;
    bool _completed = false;
    seastar::named_gate _gate;
    seastar::semaphore _semaphore;
    std::exception_ptr _exception;
    seastar::abort_source* _as;
public:
    object_data_sink(shared_ptr<impl> i, std::string_view bucket, std::string_view object_name, rjson::value metadata, seastar::abort_source* as)
        : _impl(i)
        , _bucket(bucket)
        , _object_name(object_name)
        , _metadata(std::move(metadata))
        , _semaphore(1)
        , _as(as)
    {}

    // Queues the buffers and starts an upload if enough data is available.
    future<> put(std::span<temporary_buffer<char>> bufs) override {
        for (auto&& buf : bufs) {
            _buffers.emplace_back(std::move(buf));
        }
        co_await maybe_do_upload(false);
    }

    // Forces an upload of the queued data (subject to the minimum chunk size rule).
    future<> flush() override {
        return maybe_do_upload(true);
    }

    // Uploads remaining data, waits for background uploads and finalizes the object.
    // On failure the upload is removed and the original error is rethrown.
    future<> close() override {
        if (!std::exchange(_closed, true)) {
            co_await flush();
            co_await _gate.close();
            try {
                if (!_exception && !_completed) {
                    co_await check_upload();
                }
            } catch (...) {
                _exception = std::current_exception();
            }
            if (_exception) {
                // Keep _exception set while cleaning up: remove_upload() checks it to
                // decide whether a failed removal should be reported on its own. The
                // error that broke the upload is the one the caller needs to see, so
                // a failing cleanup is only logged.
                try {
                    co_await remove_upload();
                } catch (...) {
                    gcp_storage.warn("Failed to clean up upload of {}:{}: {}", _bucket, _object_name, std::current_exception());
                }
                std::rethrow_exception(std::exchange(_exception, {}));
            }
        }
    }

    size_t buffer_size() const noexcept override {
        return default_gcp_storage_chunk_size;
    }

    future<> acquire_session();
    future<> do_single_upload(std::deque<temporary_buffer<char>>, size_t offset, size_t len, bool final);
    future<> check_upload();
    future<> remove_upload();
    future<> adjust_memory_limit(size_t);

    // Decides whether the queued buffers should be uploaded now and, if so,
    // starts the upload in the background. `force` requests an upload even if
    // less than the default chunk size has accumulated.
    future<> maybe_do_upload(bool force) {
        auto total = std::accumulate(_buffers.begin(), _buffers.end(), size_t{}, [](size_t s, auto& buf) {
            return s + buf.size();
        });
        if (total == 0) {
            co_return;
        }

        co_await adjust_memory_limit(total);

        // GCP only allows upload of less than 256k on last chunk
        if (total < min_gcp_storage_chunk_size && !_closed) {
            co_return;
        }
        // avoid uploading unless we accumulate enough data. GCP docs says to
        // try to keep uploads to 8MB or more.
        if (force || total >= default_gcp_storage_chunk_size) {
            auto bufs = std::exchange(_buffers, {});
            auto start = _accumulated;
            auto final = _closed;

            if (!final) {
                // Non-final chunks can only be written in multiples of 256 KiB
                // (min_gcp_storage_chunk_size); move the remainder from the tail
                // of `bufs` back to the front of the queue for a later upload.
                auto rem = total % min_gcp_storage_chunk_size;
                total -= rem;
                while (rem) {
                    auto& buf = bufs.back();
                    if (buf.size() > rem) {
                        auto keep = buf.size() - rem;
                        _buffers.emplace(_buffers.begin(), buf.share(keep, rem));
                        buf.trim(keep);
                        break;
                    } else {
                        rem -= buf.size();
                        _buffers.emplace(_buffers.begin(), std::move(buf));
                        bufs.pop_back();
                    }
                }
            }
            assert(std::accumulate(bufs.begin(), bufs.end(), size_t{}, [](size_t s, auto& buf) { return s + buf.size(); }) == total);
            _accumulated += total;
            // allow this in background
            (void)do_single_upload(std::move(bufs), start, total, final);
        }
    }
};
|
|
|
|
/**
 * Seekable data source reading an object from a bucket.
 *
 * `_position` is the offset of the next byte to request from the server;
 * data already fetched but not yet returned to the caller is kept in `_buffers`.
 */
class utils::gcp::storage::client::object_data_source : public seekable_data_source_impl {
    shared_ptr<impl> _impl;
    std::string _bucket;
    std::string _object_name;
    std::string _session_path;
    uint64_t _generation = 0;
    uint64_t _size = 0;
    uint64_t _position = 0;
    std::chrono::system_clock::time_point _timestamp;
    // Memory lease covering the cached buffers.
    seastar::semaphore_units<> _limits;
    seastar::abort_source* _as;
    std::deque<temporary_buffer<char>> _buffers;

    // Total number of bytes currently cached in `_buffers`.
    size_t buffer_size() const {
        // The init value fixes the accumulator type; use size_t so that
        // the sum is not truncated to int.
        return std::accumulate(_buffers.begin(), _buffers.end(), size_t{}, [](size_t sum, auto& b) { return sum + b.size(); });
    }

public:
    object_data_source(shared_ptr<impl> i, std::string_view bucket, std::string_view object_name, seastar::abort_source* as)
        : _impl(i)
        , _bucket(bucket)
        , _object_name(object_name)
        , _as(as)
    {}
    future<temporary_buffer<char>> get() override;
    future<temporary_buffer<char>> skip(uint64_t n) override;
    future<> read_info();
    void adjust_lease();

    future<temporary_buffer<char>> get(size_t limit) override;
    future<> seek(uint64_t pos) override;
    future<uint64_t> size() override;
    future<std::chrono::system_clock::time_point> timestamp() override;
};
|
|
|
|
using body_writer = std::function<future<>(output_stream<char>&&)>;
|
|
using writer_and_size = std::pair<body_writer, size_t>;
|
|
using body_variant = std::variant<std::string, writer_and_size>;
|
|
using handler_func_ex = rest::handler_func_ex;
|
|
using headers_type = std::vector<rest::key_value>;
|
|
|
|
using namespace rest;
|
|
|
|
/**
 * Shared state behind a storage client: endpoint, optional credentials,
 * the HTTP client and the memory limit used by sinks and sources.
 */
class utils::gcp::storage::client::impl {
    std::string _endpoint;
    std::optional<google_credentials> _credentials;
    // Fallback semaphore used when the caller supplies no memory limit.
    seastar::semaphore _unlimited;
    // Refers to either the caller's semaphore or `_unlimited`.
    seastar::semaphore& _limits;
    seastar::http::experimental::client _client;
    shared_ptr<seastar::tls::certificate_credentials> _certs;
public:
    impl(const utils::http::url_info&, std::optional<google_credentials>, seastar::semaphore*, shared_ptr<seastar::tls::certificate_credentials> creds);
    impl(std::string_view endpoint, std::optional<google_credentials>, seastar::semaphore*, shared_ptr<seastar::tls::certificate_credentials> creds);

    // Sends a request, passing reply and body stream to the handler.
    future<> send_with_retry(const std::string& path, const std::string& scope, body_variant, std::string_view content_type, handler_func_ex, httpclient::method_type op, key_values headers = {}, seastar::abort_source* = nullptr);
    // Sends a request, passing reply and the fully read body to the handler.
    future<> send_with_retry(const std::string& path, const std::string& scope, body_variant, std::string_view content_type, rest::httpclient::handler_func, httpclient::method_type op, key_values headers = {}, seastar::abort_source* = nullptr);
    // Sends a request and returns status, headers, version and body.
    future<rest::httpclient::result_type> send_with_retry(const std::string& path, const std::string& scope, body_variant, std::string_view content_type, httpclient::method_type op, key_values headers = {}, seastar::abort_source* = nullptr);

    // Waits for `s` units of the memory limit.
    auto get_units(size_t s) const {
        return seastar::get_units(_limits, s);
    }
    // Takes `s` units of the memory limit without waiting, if available.
    auto try_get_units(size_t s) const {
        return seastar::try_get_units(_limits, s);
    }
    future<> close();
};
|
|
|
|
// Creates the client state for the host/port described by `url`.
// `memory` may be null, in which case an effectively unlimited internal semaphore is used.
// `certs` is used for the storage connection and is also kept in `_certs`, which is
// what send_with_retry() hands to the credential refresh; previously `_certs` was
// never initialized and thus always null.
utils::gcp::storage::client::impl::impl(const utils::http::url_info& url, std::optional<google_credentials> c, seastar::semaphore* memory, shared_ptr<seastar::tls::certificate_credentials> certs)
    : _endpoint(url.host)
    , _credentials(std::move(c))
    , _unlimited(std::numeric_limits<ssize_t>::max())
    , _limits(memory ? *memory : _unlimited)
    , _client(std::make_unique<utils::http::dns_connection_factory>(url.host, url.port, url.is_https(), gcp_storage, certs), 100, seastar::http::experimental::client::retry_requests::yes)
    , _certs(std::move(certs))
{}
|
|
|
|
// Convenience constructor: parses `endpoint` as a URL, using the default
// storage API endpoint when it is empty, and delegates to the url_info constructor.
utils::gcp::storage::client::impl::impl(std::string_view endpoint, std::optional<google_credentials> c, seastar::semaphore* memory, shared_ptr<seastar::tls::certificate_credentials> certs)
    : impl(
        utils::http::parse_simple_url(endpoint.empty() ? std::string_view(STORAGE_APIS_URI) : endpoint),
        std::move(c),
        memory,
        std::move(certs))
{}
|
|
|
|
using status_type = seastar::http::reply::status_type;
|
|
|
|
static std::string get_gcp_error_message(std::string_view body) {
|
|
if (!body.empty()) {
|
|
try {
|
|
auto json = rjson::parse(body);
|
|
if (auto* error = rjson::find(json, "error")) {
|
|
if (auto msg = rjson::get_opt<std::string>(*error, "message")) {
|
|
return *msg;
|
|
}
|
|
}
|
|
} catch (...) {
|
|
}
|
|
}
|
|
return "no info";
|
|
}
|
|
|
|
// Reads the whole response stream and extracts the error message from it.
static future<std::string> get_gcp_error_message(input_stream<char>& in) {
    const auto payload = co_await util::read_entire_stream_contiguous(in);
    co_return get_gcp_error_message(payload);
}
|
|
|
|
// Error without an associated HTTP status; the status is recorded as -1.
utils::gcp::storage::storage_error::storage_error(const std::string& msg)
    : std::runtime_error(msg), _status(-1)
{}
|
|
|
|
// Error carrying an HTTP status; the message is prefixed with "<status>: ".
utils::gcp::storage::storage_error::storage_error(int status, const std::string& msg)
    : std::runtime_error(fmt::format("{}: {}", status, msg)), _status(status)
{}
|
|
|
|
using namespace seastar::http;
|
|
using namespace std::chrono_literals;
|
|
|
|
/**
|
|
* Performs a REST post/put/get with credential refresh/retry.
|
|
*/
|
|
future<>
|
|
utils::gcp::storage::client::impl::send_with_retry(const std::string& path, const std::string& scope, body_variant body, std::string_view content_type, handler_func_ex handler, httpclient::method_type op, key_values headers, seastar::abort_source* as) {
|
|
static constexpr auto max_retries = 10;
|
|
|
|
exponential_backoff_retry exr(10ms, 10000ms);
|
|
bool do_backoff = false;
|
|
|
|
for (int retry = 0; ; ++retry) {
|
|
if (std::exchange(do_backoff, false)) {
|
|
co_await (as ? exr.retry(*as) : exr.retry());
|
|
}
|
|
|
|
rest::request_wrapper req(_endpoint);
|
|
req.target(path);
|
|
req.method(op);
|
|
|
|
if (_credentials) {
|
|
try {
|
|
try {
|
|
co_await _credentials->refresh(scope, &storage_scope_implies, _certs);
|
|
req.add_header(utils::gcp::AUTHORIZATION, format_bearer(_credentials->token));
|
|
} catch (httpd::unexpected_status_error& e) {
|
|
switch (e.status()) {
|
|
case status_type::request_timeout:
|
|
default:
|
|
if (reply::classify_status(e.status()) != reply::status_class::server_error) {
|
|
break;
|
|
}
|
|
if (retry < max_retries) {
|
|
gcp_storage.debug("Got {}: {}", e.status(), std::current_exception());
|
|
// service unavailable etc -> retry
|
|
do_backoff = true;
|
|
continue;
|
|
}
|
|
break;
|
|
}
|
|
throw;
|
|
}
|
|
} catch (...) {
|
|
gcp_storage.error("Error refreshing credentials: {}", std::current_exception());
|
|
std::throw_with_nested(permission_error("Error refreshing credentials"));
|
|
}
|
|
}
|
|
|
|
for (auto& [k,v] : headers) {
|
|
req.add_header(k, v);
|
|
}
|
|
|
|
std::visit(overloaded_functor {
|
|
[&](const std::string& s) { req.content(content_type, s); },
|
|
[&](const writer_and_size& ws) { req.content(content_type, ws.first, ws.second); }
|
|
}, body);
|
|
|
|
// GCP storage requires this even if content is empty
|
|
req.add_header("Content-Length", std::to_string(req.request().content_length));
|
|
|
|
gcp_storage.trace("Sending: {}", redacted_request_type {
|
|
req.request(),
|
|
bearer_filter()
|
|
});
|
|
|
|
try {
|
|
co_await rest::simple_send(_client, req, [&handler](const seastar::http::reply& res, seastar::input_stream<char>& in) -> future<> {
|
|
gcp_storage.trace("Result: {}", res);
|
|
if (res._status == status_type::unauthorized) {
|
|
throw permission_error(int(res._status), co_await get_gcp_error_message(in));
|
|
} else if (res._status == status_type::request_timeout || reply::classify_status(res._status) == reply::status_class::server_error) {
|
|
throw storage_error(int(res._status), co_await get_gcp_error_message(in));
|
|
}
|
|
co_await handler(res, in);
|
|
}, as);
|
|
break;
|
|
} catch (storage_error& e) {
|
|
gcp_storage.debug("{}: Got unexpected response: {}", _endpoint, e.what());
|
|
auto s = status_type(e.status());
|
|
switch (s) {
|
|
default:
|
|
if (reply::classify_status(s) != reply::status_class::server_error) {
|
|
break;
|
|
}
|
|
[[fallthrough]];
|
|
case status_type::request_timeout:
|
|
do_backoff = true;
|
|
[[fallthrough]];
|
|
case status_type::unauthorized:
|
|
if (retry < max_retries) {
|
|
continue; // retry loop.
|
|
}
|
|
break;
|
|
}
|
|
throw;
|
|
} catch (...) {
|
|
// network, whatnot. maybe add retries here as well, but should really
|
|
// be on seastar level
|
|
throw;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Variant of send_with_retry for handlers that want the complete body:
// the response stream is read in full and passed to `f` together with the reply.
future<>
utils::gcp::storage::client::impl::send_with_retry(const std::string& path, const std::string& scope, body_variant body, std::string_view content_type, rest::httpclient::handler_func f, httpclient::method_type op, key_values headers, seastar::abort_source* as) {
    auto adapter = [f](const seastar::http::reply& rep, seastar::input_stream<char>& in) -> future<> {
        auto content = co_await util::read_entire_stream_contiguous(in);
        f(rep, content);
    };
    co_await send_with_retry(path, scope, std::move(body), content_type, std::move(adapter), op, headers, as);
}
|
|
|
|
// Variant of send_with_retry that collects the response (status, body,
// headers and version) into a result object and returns it.
future<rest::httpclient::result_type>
utils::gcp::storage::client::impl::send_with_retry(const std::string& path, const std::string& scope, body_variant body, std::string_view content_type, httpclient::method_type op, key_values headers, seastar::abort_source* as) {
    rest::httpclient::result_type out;
    auto collect = [&out](const seastar::http::reply& r, std::string_view content) {
        gcp_storage.trace("{}", content);
        auto& dst = out.reply;
        dst._status = r._status;
        dst._content = sstring(content);
        dst._headers = r._headers;
        dst._version = r._version;
    };
    co_await send_with_retry(path, scope, std::move(body), content_type, std::move(collect), op, headers, as);
    co_return out;
}
|
|
|
|
// Shuts down the underlying HTTP client; completes when the client is closed.
future<> utils::gcp::storage::client::impl::close() {
    auto& http_client = _client;
    co_await http_client.close();
}
|
|
|
|
// Get an upload session for the given object
|
|
// See https://cloud.google.com/storage/docs/resumable-uploads
|
|
// See https://cloud.google.com/storage/docs/performing-resumable-uploads
|
|
future<> utils::gcp::storage::client::object_data_sink::acquire_session() {
|
|
std::string body;
|
|
if (!_metadata.IsNull()) {
|
|
body = rjson::print(_metadata);
|
|
}
|
|
auto path = fmt::format("/upload/storage/v1/b/{}/o?uploadType=resumable&name={}"
|
|
, _bucket
|
|
, _object_name
|
|
);
|
|
|
|
auto reply = co_await _impl->send_with_retry(path
|
|
, GCP_OBJECT_SCOPE_READ_WRITE
|
|
, std::move(body)
|
|
, APPLICATION_JSON
|
|
, httpclient::method_type::POST
|
|
, {}
|
|
, _as
|
|
);
|
|
|
|
if (reply.result() != status_type::ok) {
|
|
throw failed_operation(int(reply.result()), get_gcp_error_message(reply.body()));
|
|
}
|
|
std::string location = reply.reply._headers[LOCATION];
|
|
gcp_storage.debug("Upload {}/{} -> session uri {}", _bucket, _object_name, location);
|
|
_session_path = utils::http::parse_simple_url(location).path;
|
|
}
|
|
|
|
static const boost::regex range_ex("bytes=(\\d+)-(\\d+)");
|
|
|
|
// Parses a "Range: bytes=<first>-<last>" response header.
// Returns false (leaving the outputs untouched) when the header is missing
// or does not match the expected form; otherwise stores both bounds and returns true.
static bool parse_response_range(const seastar::http::reply& r, uint64_t& first, uint64_t& last) {
    const auto it = r._headers.find(RANGE);
    if (it == r._headers.end()) {
        return false;
    }

    const std::string value(it->second);
    boost::smatch match;
    if (!boost::regex_match(value, match, range_ex)) {
        return false;
    }

    first = std::stoull(match[1].str());
    last = std::stoull(match[2].str());
    return true;
}
|
|
|
|
// Makes sure the memory lease covers `total` bytes of queued data, rounded up
// to a whole number of default-sized chunks.
future<> utils::gcp::storage::client::object_data_sink::adjust_memory_limit(size_t total) {
    const auto held = _mem_held.count();
    if (held >= total) {
        co_return;
    }

    const auto want = align_up(total, default_gcp_storage_chunk_size) - held;

    if (held != 0) {
        // We already hold a lease: try to extend it to cover the bulk of the
        // buffers, but if that fails, accept it and get by with what we have.
        // If we get here we should have at least 8M in our lease, and will in
        // fact do a write, so data should get released.
        if (auto extra = _impl->try_get_units(want)) {
            _mem_held.adopt(std::move(*extra));
        }
        co_return;
    }

    // First put into the buffer queue: enforce the limit by waiting.
    _mem_held = co_await _impl->get_units(want);
}
|
|
|
|
// Write a chunk to the dest object
|
|
// See https://cloud.google.com/storage/docs/resumable-uploads
|
|
// See https://cloud.google.com/storage/docs/performing-resumable-uploads
|
|
future<> utils::gcp::storage::client::object_data_sink::do_single_upload(std::deque<temporary_buffer<char>> bufs, size_t offset, size_t len, bool final) {
|
|
// always take the whole memory lease. This might be more or less than what we actually release
|
|
// but we only ever leave sub-256k amount of data in queue, and we want the next
|
|
// put to enforce waiting for a full 8M lease...
|
|
auto mine_held = std::exchange(_mem_held, {});
|
|
|
|
// Ensure to block close from completing
|
|
auto h = _gate.hold();
|
|
// Enforce our concurrency constraints
|
|
auto sem_units = co_await seastar::get_units(_semaphore, 1);
|
|
|
|
// our file range. if the sink was closed, we can set the
|
|
// final size, otherwise, leave it open (*)
|
|
auto last = offset + std::max(len, size_t(1)) - 1; // inclusive.
|
|
auto end = offset + len;
|
|
|
|
for (;;) {
|
|
auto range = fmt::format("bytes {}-{}/{}"
|
|
, offset // first byte
|
|
, last // last byte
|
|
, final ? std::to_string(end) : "*"s
|
|
);
|
|
|
|
try {
|
|
if (_session_path.empty()) {
|
|
co_await acquire_session();
|
|
}
|
|
|
|
gcp_storage.debug("{}:{} write range {}-{}", _bucket, _object_name, offset, offset+len);
|
|
|
|
auto res = co_await _impl->send_with_retry(_session_path
|
|
, GCP_OBJECT_SCOPE_READ_WRITE
|
|
, std::make_pair([&](output_stream<char>&& os_in) -> future<> {
|
|
auto os = std::move(os_in);
|
|
for (auto& buf : bufs) {
|
|
co_await os.write(buf.share());
|
|
}
|
|
co_await os.flush();
|
|
co_await os.close();
|
|
}, len)
|
|
, ""s // no content type
|
|
, httpclient::method_type::PUT
|
|
, rest::key_values({ { CONTENT_RANGE, range } })
|
|
, _as
|
|
);
|
|
|
|
switch (res.result()) {
|
|
case status_type::ok:
|
|
case status_type::created:
|
|
_completed = true;
|
|
gcp_storage.debug("{}:{} completed ({} bytes)", _bucket, _object_name, offset+len);
|
|
co_return; // done and happy
|
|
default:
|
|
if (int(res.result()) == 308) {
|
|
uint64_t first = 0, new_last = 0;
|
|
if (parse_response_range(res.reply, first, new_last) && last != new_last) {
|
|
auto written = (new_last + 1) - offset;
|
|
|
|
gcp_storage.debug("{}:{} partial upload ({} bytes)", _bucket, _object_name, written);
|
|
|
|
if (!final && (len - written) < min_gcp_storage_chunk_size) {
|
|
written = len - std::min(min_gcp_storage_chunk_size, len);
|
|
}
|
|
|
|
auto to_remove = written;
|
|
while (to_remove) {
|
|
auto& buf = bufs.front();
|
|
auto size = std::min(to_remove, buf.size());
|
|
buf.trim_front(size);
|
|
if (buf.empty()) {
|
|
bufs.pop_front();
|
|
}
|
|
to_remove -= size;
|
|
}
|
|
offset += written;
|
|
len -= written;
|
|
auto total = std::accumulate(bufs.begin(), bufs.end(), size_t{}, [](size_t s, auto& buf) {
|
|
return s + buf.size();
|
|
});
|
|
assert(len == total);
|
|
continue;
|
|
}
|
|
// incomplete. ok for partial
|
|
gcp_storage.debug("{}:{} chunk {}:{} done", _bucket, _object_name, offset, offset+len);
|
|
co_return;
|
|
}
|
|
throw failed_upload_error(int(res.result()), get_gcp_error_message(res.body()));
|
|
}
|
|
} catch (...) {
|
|
_exception = std::current_exception();
|
|
gcp_storage.warn("Exception in upload of {}:{} ({}/{}): {}"
|
|
, _bucket
|
|
, _object_name
|
|
, offset
|
|
, len
|
|
, _exception
|
|
);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Check/close the final object.
|
|
future<> utils::gcp::storage::client::object_data_sink::check_upload() {
|
|
// Now we know the final size. Set it in range
|
|
auto range = fmt::format("bytes */{}", _accumulated);
|
|
|
|
auto res = co_await _impl->send_with_retry(_session_path
|
|
, GCP_OBJECT_SCOPE_READ_WRITE
|
|
, ""s
|
|
, APPLICATION_JSON
|
|
, httpclient::method_type::PUT
|
|
, rest::key_values({ { CONTENT_RANGE, range } })
|
|
, _as
|
|
);
|
|
|
|
switch (res.result()) {
|
|
case status_type::ok:
|
|
case status_type::created:
|
|
_completed = true;
|
|
gcp_storage.debug("{}:{} completed ({})", _bucket, _object_name, _accumulated);
|
|
co_return; // done and happy
|
|
default:
|
|
throw failed_upload_error(int(res.result()), fmt::format("{}:{} incomplete. ({}): {}"
|
|
, _bucket, _object_name, res.reply._headers[RANGE]
|
|
, get_gcp_error_message(res.body())
|
|
));
|
|
}
|
|
}
|
|
|
|
// https://cloud.google.com/storage/docs/performing-resumable-uploads#cancel-upload
|
|
future<> utils::gcp::storage::client::object_data_sink::remove_upload() {
|
|
if (_completed || _session_path.empty()) {
|
|
co_return;
|
|
}
|
|
|
|
gcp_storage.debug("Removing incomplete upload {}:{} ()", _bucket, _object_name, _session_path);
|
|
|
|
auto res = co_await _impl->send_with_retry(_session_path
|
|
, GCP_OBJECT_SCOPE_READ_WRITE
|
|
, ""s
|
|
, APPLICATION_JSON
|
|
, httpclient::method_type::DELETE
|
|
, {}
|
|
, _as
|
|
);
|
|
|
|
switch (int(res.result())) {
|
|
case 499: // not in enum yet
|
|
gcp_storage.debug("Upload of {}:{} removed ({})", _bucket, _object_name, _session_path);
|
|
co_return; // done and happy
|
|
default: {
|
|
auto msg = get_gcp_error_message(res.body());
|
|
gcp_storage.warn("Failed to remove broken upload of {}:{} ({})", _bucket, _object_name, msg);
|
|
if (!_exception) {
|
|
throw failed_upload_error(int(res.result()), fmt::format("{}:{} incomplete. ({}): {}"
|
|
, _bucket, _object_name, res.reply._headers[RANGE]
|
|
, msg
|
|
));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Read a single buffer from the source object
|
|
future<temporary_buffer<char>> utils::gcp::storage::client::object_data_source::get(size_t limit) {
|
|
// If we don't know the source size yet, get the info from server
|
|
if (_size == 0) {
|
|
co_await read_info();
|
|
}
|
|
|
|
// If we don't have buffers to give, try getting one from server
|
|
if (_buffers.empty()) {
|
|
auto to_read = std::min(_size - _position, limit);
|
|
|
|
// to_read == 0 -> eof
|
|
if (to_read != 0) {
|
|
gcp_storage.debug("Reading object {}:{} ({}-{}/{})", _bucket, _object_name, _position, _position+to_read, _size);
|
|
|
|
auto lease = _impl->try_get_units(to_read);
|
|
if (lease) {
|
|
if (_limits) {
|
|
_limits.adopt(std::move(*lease));
|
|
} else {
|
|
_limits = std::move(*lease);
|
|
}
|
|
} else {
|
|
// If we can't get a lease to cover this read, don't wait, as this
|
|
// could cause deadlock in higher layers, but instead adjust the
|
|
// size down to decrease memory pressure.
|
|
to_read = std::min(to_read, min_gcp_storage_chunk_size);
|
|
gcp_storage.debug("Reading object (adjusted) {}:{} ({}-{}/{})", _bucket, _object_name, _position, _position+to_read, _size);
|
|
}
|
|
|
|
// Ensure we read from the same generation as we queried in read_info. Note: mock server ignores this.
|
|
auto path = fmt::format("/storage/v1/b/{}/o/{}?ifGenerationMatch={}&alt=media", _bucket, _object_name, _generation);
|
|
auto range = fmt::format("bytes={}-{}", _position, _position+to_read-1); // inclusive range
|
|
|
|
co_await _impl->send_with_retry(path
|
|
, GCP_OBJECT_SCOPE_READ_ONLY
|
|
, ""s
|
|
, ""s
|
|
, [&](const seastar::http::reply& rep, seastar::input_stream<char>& in) -> future<> {
|
|
if (rep._status != status_type::ok && rep._status != status_type::partial_content) {
|
|
throw failed_operation(fmt::format("Could not read object {}: {} ({}/{} - {})", _bucket, _object_name, _position, _size, int(rep._status)));
|
|
}
|
|
auto old = _position;
|
|
// ensure these are on our coroutine frame.
|
|
auto bufs = co_await util::read_entire_stream(in);
|
|
for (auto&& buf : bufs) {
|
|
_position += buf.size();
|
|
_buffers.emplace_back(std::move(buf));
|
|
}
|
|
gcp_storage.debug("Read object {}:{} ({}-{}/{})", _bucket, _object_name, old, _position, _size);
|
|
}
|
|
, httpclient::method_type::GET
|
|
, rest::key_values({ { RANGE, range } })
|
|
, _as
|
|
);
|
|
}
|
|
}
|
|
|
|
temporary_buffer<char> res;
|
|
|
|
if (!_buffers.empty()) {
|
|
auto&& buf = _buffers.front();
|
|
if (buf.size() >= limit) {
|
|
res = buf.share(0, limit);
|
|
buf.trim_front(limit);
|
|
} else {
|
|
res = std::move(buf);
|
|
_buffers.pop_front();
|
|
}
|
|
}
|
|
|
|
adjust_lease();
|
|
|
|
co_return res;
|
|
}
|
|
|
|
// Reads the next buffer using the default chunk size as the limit.
future<temporary_buffer<char>> utils::gcp::storage::client::object_data_source::get() {
    auto chunk = co_await get(default_gcp_storage_chunk_size);
    co_return chunk;
}
|
|
|
|
// Moves the read position to `pos` (clamped to the object size).
// If `pos` lies within the cached data, only the cache in front of it is
// dropped; otherwise the whole cache is discarded.
future<> utils::gcp::storage::client::object_data_source::seek(uint64_t pos) {
    if (_size == 0) {
        co_await read_info();
    }
    auto buf_size = buffer_size();
    assert(buf_size <= _position);
    // Offset of the first cached byte, i.e. what the next get() would return.
    auto read_pos = _position - buf_size;
    if (pos < read_pos || pos >= _position) {
        _buffers.clear();
        _position = std::min(pos, _size);
        // The cache is gone, so give back the memory lease that covered it.
        // Otherwise the next get() has to acquire its lease on top of the stale one.
        adjust_lease();
        co_return;
    }
    auto n = pos - read_pos;
    // Drop superfluous cache
    while (n > 0 && !_buffers.empty()) {
        auto m = std::min(n, _buffers.front().size());
        _buffers.front().trim_front(m);
        if (_buffers.front().empty()) {
            _buffers.pop_front();
        }
        n -= m;
    }
    adjust_lease();
}
|
|
|
|
void utils::gcp::storage::client::object_data_source::adjust_lease() {
|
|
auto total = std::accumulate(_buffers.begin(), _buffers.end(), size_t{}, [](size_t s, auto& buf) {
|
|
return s + buf.size();
|
|
});
|
|
if (total < _limits.count()) {
|
|
_limits.return_units(_limits.count() - total);
|
|
}
|
|
}
|
|
|
|
// Returns the object size, querying the server first while the size is still zero.
future<uint64_t> utils::gcp::storage::client::object_data_source::size() {
    const bool not_loaded = (_size == 0);
    if (not_loaded) {
        co_await read_info();
    }
    co_return _size;
}
|
|
|
|
// Returns the object's timestamp, querying the server first while it is still at the epoch.
future<std::chrono::system_clock::time_point> utils::gcp::storage::client::object_data_source::timestamp() {
    const bool not_loaded = (_timestamp.time_since_epoch().count() == 0);
    if (not_loaded) {
        co_await read_info();
    }
    co_return _timestamp;
}
|
|
|
|
// Skips `n` bytes from the current logical read position (the first cached
// byte) and returns the next buffer.
future<temporary_buffer<char>> utils::gcp::storage::client::object_data_source::skip(uint64_t n) {
    const auto cached = buffer_size();
    assert(cached <= _position);
    const auto target = (_position - cached) + n;
    co_await seek(target);
    // And get the next buffer
    co_return co_await get();
}
|
|
|
|
// Queries the object's metadata and stores size, generation and timestamp.
// Throws failed_operation if the query fails or the reply is not an object description.
future<> utils::gcp::storage::client::object_data_source::read_info() {
    gcp_storage.debug("Read info {}:{}", _bucket, _object_name);

    const auto target = fmt::format("/storage/v1/b/{}/o/{}", _bucket, _object_name);

    auto res = co_await _impl->send_with_retry(
        target,
        GCP_OBJECT_SCOPE_READ_ONLY,
        ""s,
        ""s,
        httpclient::method_type::GET,
        {},
        _as);

    if (res.result() != status_type::ok) {
        throw failed_operation(fmt::format("Could not query object {}:{} {}", _bucket, _object_name, res.result()));
    }

    auto item = rjson::parse(std::move(res.body()));
    // Ensure we got the info we asked for/expect
    const auto kind = rjson::get<std::string>(item, "kind");
    if (kind != "storage#object"s) {
        throw failed_operation("Malformed query object reply");
    }

    // The numeric fields are transferred as strings.
    _size = std::stoull(rjson::get<std::string>(item, "size"));
    _generation = std::stoull(rjson::get<std::string>(item, "generation"));
    _timestamp = parse_rfc3339(rjson::get<std::string>(item, "updated"));
}
|
|
|
|
// Creates a client without an external memory limit.
utils::gcp::storage::client::client(std::string_view endpoint, std::optional<google_credentials> c, shared_ptr<seastar::tls::certificate_credentials> certs)
    : _impl(seastar::make_shared<impl>(
        endpoint,
        std::move(c),
        nullptr, // no caller-provided memory semaphore
        std::move(certs)))
{}
|
|
|
|
// Creates a client whose buffer memory is accounted against `memory`.
utils::gcp::storage::client::client(std::string_view endpoint, std::optional<google_credentials> c, seastar::semaphore& memory, shared_ptr<seastar::tls::certificate_credentials> certs)
    : _impl(seastar::make_shared<impl>(
        endpoint,
        std::move(c),
        &memory,
        std::move(certs)))
{}
|
|
|
|
utils::gcp::storage::client::~client() = default;
|
|
|
|
|
|
// Creates a bucket in `project` described by the JSON metadata `meta`.
// Throws failed_operation unless the server answers OK or Created.
future<> utils::gcp::storage::client::create_bucket(std::string_view project, rjson::value meta) {
    gcp_storage.debug("Create bucket {}:{}", project, rjson::get(meta, "name"));

    const auto target = fmt::format("/storage/v1/b?project={}", project);
    const auto payload = rjson::print(meta);

    auto res = co_await _impl->send_with_retry(
        target,
        GCP_OBJECT_SCOPE_FULL_CONTROL,
        payload,
        APPLICATION_JSON,
        httpclient::method_type::POST);

    const auto status = res.result();
    if (status == status_type::ok || status == status_type::created) {
        co_return; // done and happy
    }
    throw failed_operation(fmt::format("Could not create bucket {}: {}", rjson::get(meta, "name"), res.result()));
}
|
|
|
|
// Creates a bucket with uniform bucket-level access enabled.
// An empty `region` defaults to "US", an empty `storage_class` to "STANDARD".
future<> utils::gcp::storage::client::create_bucket(std::string_view project, std::string_view bucket, std::string_view region, std::string_view storage_class) {
    // Build the nested access configuration first...
    rjson::value access = rjson::empty_object();
    rjson::add(access, "enabled", true);
    rjson::value iam = rjson::empty_object();
    rjson::add(iam, "uniformBucketLevelAccess", std::move(access));

    // ...then the bucket description itself. Could fmt::format, but this is somewhat safer.
    rjson::value meta = rjson::empty_object();
    rjson::add(meta, "name", std::string(bucket));
    rjson::add(meta, "location", std::string(region.empty() ? "US" : region));
    rjson::add(meta, "storageClass", std::string(storage_class.empty() ? "STANDARD" : storage_class));
    rjson::add(meta, "iamConfiguration", std::move(iam));

    co_await create_bucket(project, std::move(meta));
}
|
|
|
|
// Deletes the given bucket. Throws failed_operation unless the server answers
// No Content (or OK, which the mock server sends).
future<> utils::gcp::storage::client::delete_bucket(std::string_view bucket_in) {
    // Own a copy: the view may not outlive the suspension points below.
    const std::string bucket(bucket_in);

    gcp_storage.debug("Delete bucket {}", bucket);

    const auto target = fmt::format("/storage/v1/b/{}", bucket);

    auto res = co_await _impl->send_with_retry(
        target,
        GCP_OBJECT_SCOPE_FULL_CONTROL,
        ""s,
        ""s,
        httpclient::method_type::DELETE);

    const auto status = res.result();
    // mock server sends wrong code (ok), but seems acceptable
    if (status == status_type::ok || status == status_type::no_content) {
        co_return; // done and happy
    }
    throw failed_operation(fmt::format("Could not delete bucket {}: {}", bucket, res.result()));
}
|
|
|
|
// Converts one JSON object resource (a listing entry or a compose reply)
// into an object_info.
//
// "name", "size", "generation" and "updated" are fetched with rjson::get;
// "contentType" is optional and defaults to an empty string. "size" and
// "generation" are read as strings and converted with std::stoull, and
// "updated" is parsed as an RFC 3339 timestamp.
static utils::gcp::storage::object_info create_info(const rjson::value& item) {
    // Reads a numeric member that the document carries as a decimal string.
    const auto u64_member = [&item](const char* key) {
        return std::stoull(rjson::get<std::string>(item, key));
    };

    utils::gcp::storage::object_info parsed;
    parsed.name = rjson::get<std::string>(item, "name");
    parsed.content_type = rjson::get_opt<std::string>(item, "contentType").value_or(""s);
    parsed.size = u64_member("size");
    parsed.generation = u64_member("generation");
    parsed.modified = parse_rfc3339(rjson::get<std::string>(item, "updated"));
    return parsed;
}
|
|
|
|
// See https://cloud.google.com/storage/docs/listing-objects
|
|
// TODO: maybe make a generator? However, we don't have a streaming
|
|
// json parsing routine as such, so however we do this, we need to
|
|
// read all data from network, etc. Thus there is not all that much
|
|
// point in it. Return chunked_vector to avoid large alloc, but keep it
|
|
// in one object... for now...
|
|
future<utils::chunked_vector<utils::gcp::storage::object_info>> utils::gcp::storage::client::list_objects(std::string_view bucket_in, std::string_view prefix, bucket_paging& pager) {
|
|
std::string bucket(bucket_in);
|
|
|
|
gcp_storage.debug("List bucket {} (prefix={}, max_results={})", bucket, prefix, pager.max_results);
|
|
|
|
auto path = fmt::format("/storage/v1/b/{}/o", bucket);
|
|
auto psep = "?";
|
|
if (!prefix.empty()) {
|
|
path += fmt::format("{}prefix={}", psep, prefix);
|
|
psep = "&&";
|
|
}
|
|
if (pager.max_results != 0) {
|
|
path += fmt::format("{}maxResults={}", psep, pager.max_results);
|
|
psep = "&&";
|
|
}
|
|
if (!pager.token.empty()) {
|
|
path += fmt::format("{}pageToken={}", psep, pager.token);
|
|
psep = "&&";
|
|
}
|
|
|
|
utils::chunked_vector<utils::gcp::storage::object_info> result;
|
|
|
|
co_await _impl->send_with_retry(path
|
|
, GCP_OBJECT_SCOPE_READ_ONLY
|
|
, ""s
|
|
, ""s
|
|
, [&](const seastar::http::reply& rep, seastar::input_stream<char>& in) -> future<> {
|
|
if (rep._status != status_type::ok) {
|
|
throw failed_operation(fmt::format("Could not list bucket {}: {} ({})", bucket, rep._status
|
|
, co_await get_gcp_error_message(in)
|
|
));
|
|
}
|
|
|
|
// ensure these are on our coroutine frame.
|
|
auto bufs = co_await util::read_entire_stream(in);
|
|
auto root = rjson::parse(std::move(bufs));
|
|
|
|
if (rjson::get<std::string>(root, "kind") != "storage#objects"s) {
|
|
throw failed_operation("Malformed list object reply");
|
|
}
|
|
|
|
auto items = rjson::find(root, "items");
|
|
if (!items) {
|
|
co_return;
|
|
}
|
|
if (!items->IsArray()) {
|
|
throw failed_operation("Malformed list object items");
|
|
}
|
|
|
|
pager.token = rjson::get_opt<std::string>(root, "nextPageToken").value_or(""s);
|
|
|
|
for (auto& item : items->GetArray()) {
|
|
object_info info = create_info(item);
|
|
result.emplace_back(std::move(info));
|
|
}
|
|
|
|
}
|
|
, httpclient::method_type::GET
|
|
);
|
|
|
|
co_return result;
|
|
}
|
|
|
|
// Convenience overload without caller-visible paging: performs a single
// listing request through a throw-away pager constructed with 0.
// NOTE(review): the continuation token written to that pager is discarded, so
// if the server splits the listing into pages only the first one is returned.
future<utils::chunked_vector<utils::gcp::storage::object_info>> utils::gcp::storage::client::list_objects(std::string_view bucket, std::string_view prefix) {
    bucket_paging single_shot(0);
    auto listing = co_await list_objects(bucket, prefix, single_shot);
    co_return listing;
}
|
|
|
|
// See https://cloud.google.com/storage/docs/deleting-objects
|
|
future<> utils::gcp::storage::client::delete_object(std::string_view bucket_in, std::string_view object_name_in) {
|
|
std::string bucket(bucket_in), object_name(object_name_in);
|
|
|
|
gcp_storage.debug("Delete object {}:{}", bucket, object_name);
|
|
|
|
auto path = fmt::format("/storage/v1/b/{}/o/{}", bucket, object_name);
|
|
|
|
auto res = co_await _impl->send_with_retry(path
|
|
, GCP_OBJECT_SCOPE_READ_WRITE
|
|
, ""s
|
|
, ""s
|
|
, httpclient::method_type::DELETE
|
|
);
|
|
|
|
switch (res.result()) {
|
|
case status_type::ok:
|
|
case status_type::no_content:
|
|
gcp_storage.debug("Deleted {}:{}", bucket, object_name);
|
|
co_return; // done and happy
|
|
case status_type::not_found:
|
|
gcp_storage.debug("Could not delete {}:{} - no such object", bucket, object_name);
|
|
co_return; // ok...?
|
|
default:
|
|
throw failed_operation(fmt::format("Could not delete object {}:{}: {} ({})", bucket, object_name, res.result()
|
|
, get_gcp_error_message(res.body())
|
|
));
|
|
}
|
|
}
|
|
|
|
// See https://cloud.google.com/storage/docs/copying-renaming-moving-objects
|
|
// GCP does not support moveTo across buckets.
|
|
future<> utils::gcp::storage::client::rename_object(std::string_view bucket, std::string_view object_name, std::string_view new_bucket, std::string_view new_name) {
|
|
co_await copy_object(bucket, object_name, new_bucket, new_name);
|
|
co_await delete_object(bucket, object_name);
|
|
}
|
|
|
|
// See https://cloud.google.com/storage/docs/copying-renaming-moving-objects
|
|
future<> utils::gcp::storage::client::rename_object(std::string_view bucket_in, std::string_view object_name_in, std::string_view new_name_in) {
|
|
std::string bucket(bucket_in), object_name(object_name_in), new_name(new_name_in);
|
|
|
|
gcp_storage.debug("Move object {}:{} -> {}", bucket, object_name, new_name);
|
|
|
|
auto path = fmt::format("/storage/v1/b/{}/o/{}/moveTo/o/{}", bucket, object_name, new_name);
|
|
auto res = co_await _impl->send_with_retry(path
|
|
, GCP_OBJECT_SCOPE_READ_WRITE
|
|
, ""s
|
|
, ""s
|
|
, httpclient::method_type::PUT
|
|
);
|
|
|
|
switch (res.result()) {
|
|
case status_type::ok:
|
|
case status_type::created:
|
|
gcp_storage.debug("Moved {}:{} to {}", bucket, object_name, new_name);
|
|
co_return; // done and happy
|
|
default:
|
|
throw failed_operation(fmt::format("Could not rename object {}:{}: {} ({})", bucket, object_name, res.result()
|
|
, get_gcp_error_message(res.body())
|
|
));
|
|
}
|
|
}
|
|
|
|
// See https://cloud.google.com/storage/docs/copying-renaming-moving-objects
|
|
// Copying an object in GCP can only process a certain amount of data in one call
|
|
// Must keep doing it until all data is copied, and check response.
|
|
future<> utils::gcp::storage::client::copy_object(std::string_view bucket_in, std::string_view object_name_in, std::string_view new_bucket_in, std::string_view to_name_in) {
|
|
std::string bucket(bucket_in), object_name(object_name_in), new_bucket(new_bucket_in), to_name(to_name_in);
|
|
|
|
auto path = fmt::format("/storage/v1/b/{}/o/{}/rewriteTo/b/{}/o/{}", bucket, object_name, new_bucket, to_name);
|
|
std::string body;
|
|
|
|
for (;;) {
|
|
auto res = co_await _impl->send_with_retry(path
|
|
, GCP_OBJECT_SCOPE_READ_WRITE
|
|
, body
|
|
, APPLICATION_JSON
|
|
, httpclient::method_type::PUT
|
|
);
|
|
|
|
if (res.result() != status_type::ok) {
|
|
throw failed_operation(fmt::format("Could not copy object {}:{}: {} ({})", bucket, object_name, res.result()
|
|
, get_gcp_error_message(res.body())
|
|
));
|
|
}
|
|
|
|
auto resp = rjson::parse(res.body());
|
|
if (rjson::get<bool>(resp, "done")) {
|
|
gcp_storage.debug("Copied {}:{} to {}:{}", bucket, object_name, new_bucket, to_name);
|
|
co_return; // done and happy
|
|
}
|
|
|
|
auto token = rjson::get<std::string>(resp, "rewriteToken");
|
|
auto written = rjson::get<uint64_t>(resp, "totalBytesRewritten");
|
|
auto size = rjson::get<uint64_t>(resp, "objectSize");
|
|
|
|
// Call 2+ must include the rewriteToken
|
|
body = fmt::format("{{\"rewriteToken\": \"{}\"}}", token);
|
|
|
|
gcp_storage.debug("Partial copy of {}:{} to {}:{} ({}/{})", bucket, object_name, new_bucket, to_name, written, size);
|
|
}
|
|
}
|
|
|
|
// Merges `source_object_names` (at most 32) into `dest_object_name` within
// `bucket_in` through the "compose" call. `metadata` is sent as the
// "destination" member of the request.
//
// Returns the object_info parsed from the reply. Throws std::invalid_argument
// when more than 32 sources are given and failed_operation on any non-200 reply.
//
// NOTE(review): `as` is accepted but not used anywhere in this function.
future<utils::gcp::storage::object_info> utils::gcp::storage::client::merge_objects(std::string_view bucket_in, std::string_view dest_object_name, std::vector<std::string> source_object_names, rjson::value metadata, seastar::abort_source* as) {
    // Validate before building anything.
    const auto source_count = source_object_names.size();
    if (source_count > 32) {
        throw std::invalid_argument(fmt::format("Can only merge up to 32 objects. {} requested.", source_count));
    }

    // Owned copies: both names are used again after the co_await.
    std::string bucket(bucket_in);
    std::string object_name(dest_object_name);

    // Request document: { "sourceObjects": [ {"name": ...}, ... ], "destination": <metadata> }
    rjson::value sources = rjson::empty_array();
    for (const auto& source_name : source_object_names) {
        rjson::value entry = rjson::empty_object();
        rjson::add(entry, "name", std::string(source_name));
        rjson::push_back(sources, std::move(entry));
    }

    rjson::value request = rjson::empty_object();
    rjson::add(request, "sourceObjects", std::move(sources));
    rjson::add(request, "destination", std::move(metadata));

    auto path = fmt::format("/storage/v1/b/{}/o/{}/compose", bucket, object_name);
    auto body = rjson::print(request);

    auto res = co_await _impl->send_with_retry(path
        , GCP_OBJECT_SCOPE_READ_WRITE
        , body
        , APPLICATION_JSON
        , httpclient::method_type::POST
    );

    const auto status = res.result();
    if (status != status_type::ok) {
        throw failed_operation(fmt::format("Could not merge to object {} -> {}:{}: {} ({})", source_object_names, bucket, object_name, status
            , get_gcp_error_message(res.body())
        ));
    }

    // The reply is the resource of the composed object.
    auto composed = rjson::parse(res.body());
    co_return create_info(composed);
}
|
|
|
|
// Same-bucket copy: forwards to the four-argument copy_object() with the
// source bucket also used as the destination bucket.
future<> utils::gcp::storage::client::copy_object(std::string_view bucket, std::string_view object_name, std::string_view to_name) {
    const std::string_view destination_bucket = bucket;
    co_await copy_object(bucket, object_name, destination_bucket, to_name);
}
|
|
|
|
// Returns a data_sink backed by an object_data_sink constructed for
// `bucket`:`object_name` with the given `metadata` and optional abort source.
seastar::data_sink utils::gcp::storage::client::create_upload_sink(std::string_view bucket, std::string_view object_name, rjson::value metadata, seastar::abort_source* as) const {
    auto sink_impl = std::make_unique<object_data_sink>(_impl, bucket, object_name, std::move(metadata), as);
    return seastar::data_sink(std::move(sink_impl));
}
|
|
|
|
// Returns a seekable_data_source backed by an object_data_source constructed
// for `bucket`:`object_name` with the optional abort source.
seekable_data_source utils::gcp::storage::client::create_download_source(std::string_view bucket, std::string_view object_name, seastar::abort_source* as) const {
    auto source_impl = std::make_unique<object_data_source>(_impl, bucket, object_name, as);
    return seekable_data_source(std::move(source_impl));
}
|
|
|
|
// Closes the client by delegating to the shared implementation object; the
// returned future is the one produced by impl::close().
future<> utils::gcp::storage::client::close() {
    auto& backend = *_impl;
    return backend.close();
}
|
|
|
|
const std::string utils::gcp::storage::client::DEFAULT_ENDPOINT = "https://storage.googleapis.com";
|