The code in upload_file std::move()-s vector of names into merge_objects() method, then iterates over this vector to delete objects. The iteration is apparently a no-op on moved-from vector. The fix is to make merge_objects() helper get vector of names by const reference -- the method doesn't modify the names collection, the caller keeps one in stable storage. Fixes #29060 Signed-off-by: Pavel Emelyanov <xemul@scylladb.com> Closes scylladb/scylladb#29061
1164 lines
46 KiB
C++
1164 lines
46 KiB
C++
/*
|
||
* Copyright (C) 2025-present ScyllaDB
|
||
*/
|
||
|
||
/*
|
||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||
*/
|
||
|
||
|
||
#include "object_storage.hh"
|
||
#include "gcp_credentials.hh"
|
||
#include "object_storage_retry_strategy.hh"
|
||
|
||
#include <algorithm>
|
||
#include <numeric>
|
||
#include <deque>
|
||
|
||
#include <boost/regex.hpp>
|
||
|
||
#include <seastar/core/align.hh>
|
||
#include <seastar/core/gate.hh>
|
||
#include <seastar/core/semaphore.hh>
|
||
#include <seastar/core/sleep.hh>
|
||
#include <seastar/core/units.hh>
|
||
#include <seastar/http/client.hh>
|
||
#include <seastar/util/short_streams.hh>
|
||
|
||
#include "utils/rest/client.hh"
|
||
#include "utils/exponential_backoff_retry.hh"
|
||
#include "utils/error_injection.hh"
|
||
#include "utils/exceptions.hh"
|
||
#include "utils/http.hh"
|
||
#include "utils/http_client_error_processing.hh"
|
||
#include "utils/overloaded_functor.hh"
|
||
|
||
// Logger shared by all GCP storage client code in this file.
static logger gcp_storage("gcp_storage");

// GCS resumable uploads require every non-final chunk to be a multiple of 256 KiB.
static constexpr uint64_t min_gcp_storage_chunk_size = 256*1024;
// GCS documentation recommends chunks of 8 MiB or more; also used as sink buffer size.
static constexpr uint64_t default_gcp_storage_chunk_size = 8*1024*1024;

// OAuth2 scopes for Google Cloud Storage object access, in increasing order
// of privilege (read_only < read_write < full_control).
static constexpr char GCP_OBJECT_SCOPE_READ_ONLY[] = "https://www.googleapis.com/auth/devstorage.read_only";
static constexpr char GCP_OBJECT_SCOPE_READ_WRITE[] = "https://www.googleapis.com/auth/devstorage.read_write";
static constexpr char GCP_OBJECT_SCOPE_FULL_CONTROL[] = "https://www.googleapis.com/auth/devstorage.full_control";

// Default public endpoint, used when the caller supplies an empty endpoint string.
static constexpr char STORAGE_APIS_URI[] = "https://storage.googleapis.com";
static constexpr char APPLICATION_JSON[] = "application/json";
// HTTP header names used below.
static constexpr char LOCATION[] = "Location";
static constexpr char CONTENT_RANGE[] = "Content-Range";
static constexpr char RANGE[] = "Range";

using namespace std::string_literals;
using namespace utils::gcp;
|
||
|
||
static bool storage_scope_implies(const scopes_type& scopes, const scopes_type& check_for) {
|
||
if (default_scopes_implies_other_scope(scopes, check_for)) {
|
||
return true;
|
||
}
|
||
if (scopes_contains_scope(check_for, GCP_OBJECT_SCOPE_READ_ONLY)) {
|
||
return scopes_contains_scope(scopes, GCP_OBJECT_SCOPE_READ_WRITE)
|
||
|| scopes_contains_scope(scopes, GCP_OBJECT_SCOPE_FULL_CONTROL)
|
||
;
|
||
}
|
||
if (scopes_contains_scope(check_for, GCP_OBJECT_SCOPE_READ_WRITE)) {
|
||
return scopes_contains_scope(scopes, GCP_OBJECT_SCOPE_FULL_CONTROL);
|
||
}
|
||
return false;
|
||
}
|
||
|
||
// Parses an RFC 3339 UTC timestamp (e.g. "2025-01-02T03:04:05Z", as returned
// by the GCS JSON API "updated" field) into a system_clock time point.
// On parse failure the stream simply fails and the default-constructed
// (epoch) time point is returned.
static auto parse_rfc3339(const std::string& s) {
    std::istringstream in(s);
    std::chrono::system_clock::time_point tp;
    in >> std::chrono::parse("%FT%TZ", tp);
    return tp;
}
|
||
|
||
// data_sink that writes an object using the GCS resumable-upload protocol.
// Incoming buffers are queued in _buffers and flushed to the server in
// 256 KiB-aligned chunks by do_single_upload(), which runs in the background
// behind _gate; close() waits for outstanding uploads, finalizes or cancels
// the session, and rethrows any deferred upload error.
class utils::gcp::storage::client::object_data_sink : public data_sink_impl {
    shared_ptr<impl> _impl;
    std::string _bucket;
    std::string _object_name;
    // Optional object metadata sent when the upload session is created.
    rjson::value _metadata;
    // Path of the resumable-upload session URI; empty until acquire_session().
    std::string _session_path;
    std::string _content_type;
    // Data queued but not yet handed to an upload.
    std::deque<temporary_buffer<char>> _buffers;
    // Total bytes handed off to uploads so far (== next chunk's file offset).
    uint64_t _accumulated = 0;
    // Memory lease covering the queued buffers.
    seastar::semaphore_units<> _mem_held;
    bool _closed = false;
    // Set once the server acknowledged the complete object.
    bool _completed = false;
    // Keeps close() from completing while background uploads are in flight.
    seastar::named_gate _gate;
    // Serializes uploads (initialized to 1 unit in the constructor).
    seastar::semaphore _semaphore;
    // First error seen by a background upload; rethrown from close().
    std::exception_ptr _exception;
    seastar::abort_source* _as;
public:
    object_data_sink(shared_ptr<impl> i, std::string_view bucket, std::string_view object_name, rjson::value metadata, seastar::abort_source* as)
        : _impl(i)
        , _bucket(bucket)
        , _object_name(object_name)
        , _metadata(std::move(metadata))
        , _semaphore(1)
        , _as(as)
    {}
    // Queue the buffers and upload if enough data has accumulated.
    future<> put(std::span<temporary_buffer<char>> bufs) override {
        for (auto&& buf : bufs) {
            _buffers.emplace_back(std::move(buf));
        }
        co_await maybe_do_upload(false);
    }
    // Force an upload of whatever is queued (subject to 256 KiB alignment).
    future<> flush() override {
        return maybe_do_upload(true);
    }
    // Flush remaining data, wait for background uploads, finalize the object.
    // On error, cancels the upload session and rethrows the stored exception.
    future<> close() override {
        if (!std::exchange(_closed, true)) {
            co_await flush();
            co_await _gate.close();
            try {
                if (!_exception && !_completed) {
                    // Server has the data but has not acknowledged completion yet.
                    co_await check_upload();
                }
            } catch (...) {
                _exception = std::current_exception();
            }
            if (auto ex = std::exchange(_exception, {})) {
                co_await remove_upload();
                std::rethrow_exception(ex);
            }
        }
    }

    size_t buffer_size() const noexcept override {
        return default_gcp_storage_chunk_size;
    }

    future<> acquire_session();
    future<> do_single_upload(std::deque<temporary_buffer<char>>, size_t offset, size_t len, bool final);
    future<> check_upload();
    future<> remove_upload();
    future<> adjust_memory_limit(size_t);
    // Kick off a background upload when forced or when >= 8 MiB is queued.
    // Non-final uploads are trimmed to a multiple of 256 KiB; the remainder
    // is pushed back to the front of the queue.
    future<> maybe_do_upload(bool force) {
        auto total = std::accumulate(_buffers.begin(), _buffers.end(), size_t{}, [](size_t s, auto& buf) {
            return s + buf.size();
        });
        if (total == 0) {
            co_return;
        }

        co_await adjust_memory_limit(total);

        // GCP only allows upload of less than 256k on last chunk
        if (total < min_gcp_storage_chunk_size && !_closed) {
            co_return;
        }
        // avoid uploading unless we accumulate enough data. GCP docs says to
        // try to keep uploads to 8MB or more.
        if (force || total >= default_gcp_storage_chunk_size) {
            auto bufs = std::exchange(_buffers, {});
            auto start = _accumulated;
            auto final = _closed;

            if (!final) {
                // can only write in multiples of 256.
                auto rem = total % min_gcp_storage_chunk_size;
                total -= rem;
                // Move the trailing "rem" bytes back to the queue front so
                // they lead the next chunk, preserving byte order.
                while (rem) {
                    auto& buf = bufs.back();
                    if (buf.size() > rem) {
                        auto keep = buf.size() - rem;
                        _buffers.emplace(_buffers.begin(), buf.share(keep, rem));
                        buf.trim(keep);
                        break;
                    } else {
                        rem -= buf.size();
                        _buffers.emplace(_buffers.begin(), std::move(buf));
                        bufs.pop_back();
                    }
                }
            }
            assert(std::accumulate(bufs.begin(), bufs.end(), size_t{}, [](size_t s, auto& buf) { return s + buf.size(); }) == total);
            _accumulated += total;
            // allow this in background
            (void)do_single_upload(std::move(bufs), start, total, final);
        }
    }
};
|
||
|
||
// Seekable data_source reading an object via ranged GETs. Object size,
// generation and timestamp are fetched lazily by read_info(); downloaded
// data is cached in _buffers and handed out by get()/get(limit).
class utils::gcp::storage::client::object_data_source : public seekable_data_source_impl {
    shared_ptr<impl> _impl;
    std::string _bucket;
    std::string _object_name;
    std::string _session_path;
    // Object generation captured by read_info(); later reads pin it with
    // ifGenerationMatch so we never mix data from different versions.
    uint64_t _generation = 0;
    uint64_t _size = 0;
    // Server-side read position; cached-but-unconsumed bytes sit behind it.
    uint64_t _position = 0;
    std::chrono::system_clock::time_point _timestamp;
    // Memory lease covering the cached buffers.
    seastar::semaphore_units<> _limits;
    seastar::abort_source* _as;
    std::deque<temporary_buffer<char>> _buffers;

    // Total bytes currently cached in _buffers. Note: the initial value must
    // be size_t{}, not 0 — a plain int literal would make std::accumulate
    // sum into int and truncate/overflow totals >= 2 GiB (and matches the
    // size_t{} convention used by adjust_lease()).
    size_t buffer_size() const {
        return std::accumulate(_buffers.begin(), _buffers.end(), size_t{}, [](size_t sum, auto& b) { return sum + b.size(); });
    }

public:
    object_data_source(shared_ptr<impl> i, std::string_view bucket, std::string_view object_name, seastar::abort_source* as)
        : _impl(i)
        , _bucket(bucket)
        , _object_name(object_name)
        , _as(as)
    {}
    future<temporary_buffer<char>> get() override;
    future<temporary_buffer<char>> skip(uint64_t n) override;
    future<> read_info();
    void adjust_lease();

    future<temporary_buffer<char>> get(size_t limit) override;
    future<> seek(uint64_t pos) override;
    future<uint64_t> size() override;
    future<std::chrono::system_clock::time_point> timestamp() override;
};
|
||
|
||
// A request body is either a complete string, or a streaming writer
// callback paired with the exact number of bytes it will produce.
using body_writer = std::function<future<>(output_stream<char>&&)>;
using writer_and_size = std::pair<body_writer, size_t>;
using body_variant = std::variant<std::string, writer_and_size>;
using handler_func_ex = rest::handler_func_ex;
using headers_type = std::vector<rest::key_value>;

using namespace rest;
|
||
|
||
// Shared client implementation: holds the endpoint, credentials, the HTTP
// connection pool and the memory-accounting semaphore used to throttle
// buffered reads/writes.
class utils::gcp::storage::client::impl {
    std::string _endpoint;
    std::optional<google_credentials> _credentials;
    // Fallback semaphore with an effectively unbounded count, used when the
    // caller did not supply a memory semaphore.
    seastar::semaphore _unlimited;
    seastar::semaphore& _limits;
    seastar::http::experimental::client _client;
    shared_ptr<seastar::tls::certificate_credentials> _certs;
    // Refresh credentials for the scope (if present) and add the bearer header.
    future<> authorize(request_wrapper& req, const std::string& scope);
public:
    impl(const utils::http::url_info&, std::optional<google_credentials>, seastar::semaphore*, shared_ptr<seastar::tls::certificate_credentials> creds);
    impl(std::string_view endpoint, std::optional<google_credentials>, seastar::semaphore*, shared_ptr<seastar::tls::certificate_credentials> creds);

    // Send a request with auth refresh and retry; three flavors of response
    // handling: streaming handler, buffered-body handler, or collected result.
    future<> send_with_retry(const std::string& path, const std::string& scope, body_variant, std::string_view content_type, handler_func_ex, httpclient::method_type op, key_values headers = {}, seastar::abort_source* = nullptr);
    future<> send_with_retry(const std::string& path, const std::string& scope, body_variant, std::string_view content_type, rest::httpclient::handler_func, httpclient::method_type op, key_values headers = {}, seastar::abort_source* = nullptr);
    future<rest::httpclient::result_type> send_with_retry(const std::string& path, const std::string& scope, body_variant, std::string_view content_type, httpclient::method_type op, key_values headers = {}, seastar::abort_source* = nullptr);

    // Memory accounting helpers: blocking and non-blocking acquisition.
    auto get_units(size_t s) const {
        return seastar::get_units(_limits, s);
    }
    auto try_get_units(size_t s) const {
        return seastar::try_get_units(_limits, s);
    }
    future<> close();
};
|
||
|
||
// Refresh the credentials (if the client has any) for the given scope and
// attach the resulting bearer token to the request. Without credentials the
// request is sent unauthenticated.
future<> storage::client::impl::authorize(request_wrapper& req, const std::string& scope) {
    if (_credentials) {
        co_await _credentials->refresh(scope, &storage_scope_implies, _certs);
        req.add_header(utils::gcp::AUTHORIZATION, format_bearer(_credentials->token));
    }
}
|
||
|
||
// memory == nullptr means "no memory accounting": _limits then aliases the
// effectively-unbounded _unlimited semaphore.
utils::gcp::storage::client::impl::impl(const utils::http::url_info& url, std::optional<google_credentials> c, seastar::semaphore* memory, shared_ptr<seastar::tls::certificate_credentials> certs)
    : _endpoint(url.host)
    , _credentials(std::move(c))
    , _unlimited(std::numeric_limits<ssize_t>::max())
    , _limits(memory ? *memory : _unlimited)
    , _client(std::make_unique<utils::http::dns_connection_factory>(url.host, url.port, url.is_https(), gcp_storage, certs), 100, seastar::http::experimental::client::retry_requests::yes)
{}
|
||
|
||
// Delegating constructor: an empty endpoint selects the public GCS endpoint.
utils::gcp::storage::client::impl::impl(std::string_view endpoint, std::optional<google_credentials> c, seastar::semaphore* memory, shared_ptr<seastar::tls::certificate_credentials> certs)
    : impl(utils::http::parse_simple_url(endpoint.empty() ? STORAGE_APIS_URI : endpoint), std::move(c), memory, std::move(certs))
{}
|
||
|
||
using status_type = seastar::http::reply::status_type;
|
||
|
||
static std::string get_gcp_error_message(std::string_view body) {
|
||
if (!body.empty()) {
|
||
try {
|
||
auto json = rjson::parse(body);
|
||
if (auto* error = rjson::find(json, "error")) {
|
||
if (auto msg = rjson::get_opt<std::string>(*error, "message")) {
|
||
return *msg;
|
||
}
|
||
}
|
||
} catch (...) {
|
||
}
|
||
}
|
||
return "no info";
|
||
}
|
||
|
||
// Stream overload: drains the whole input stream and extracts the GCP error
// message from it.
static future<std::string> get_gcp_error_message(input_stream<char>& in) {
    auto s = co_await util::read_entire_stream_contiguous(in);
    co_return get_gcp_error_message(s);
}
|
||
|
||
// Storage error without an associated HTTP status (_status is set to -1).
utils::gcp::storage::storage_error::storage_error(const std::string& msg)
    : std::runtime_error(msg)
    , _status(-1)
{}

// Storage error carrying the HTTP status that caused it; the status is also
// prepended to the message text.
utils::gcp::storage::storage_error::storage_error(int status, const std::string& msg)
    : std::runtime_error(fmt::format("{}: {}", status, msg))
    , _status(status)
{}
|
||
|
||
using namespace seastar::http;
|
||
using namespace std::chrono_literals;
|
||
|
||
/**
|
||
* Performs a REST post/put/get with credential refresh/retry.
|
||
*/
|
||
future<>
|
||
utils::gcp::storage::client::impl::send_with_retry(const std::string& path, const std::string& scope, body_variant body, std::string_view content_type, handler_func_ex handler, httpclient::method_type op, key_values headers, seastar::abort_source* as) {
|
||
rest::request_wrapper req(_endpoint);
|
||
req.target(path);
|
||
req.method(op);
|
||
|
||
for (auto& [k,v] : headers) {
|
||
req.add_header(k, v);
|
||
}
|
||
|
||
std::visit(overloaded_functor {
|
||
[&](const std::string& s) { req.content(content_type, s); },
|
||
[&](const writer_and_size& ws) { req.content(content_type, ws.first, ws.second); }
|
||
}, body);
|
||
|
||
// GCP storage requires this even if content is empty
|
||
req.add_header("Content-Length", std::to_string(req.request().content_length));
|
||
|
||
gcp_storage.trace("Sending: {}", redacted_request_type {
|
||
req.request(),
|
||
bearer_filter()
|
||
});
|
||
|
||
try {
|
||
try {
|
||
co_await authorize(req, scope);
|
||
} catch (...) {
|
||
// just disregard the failure, we will retry below in the wrapped handler
|
||
}
|
||
auto wrapped_handler = [this, handler = std::move(handler), &req, scope](const reply& rep, input_stream<char>& in) -> future<> {
|
||
auto _in = std::move(in);
|
||
auto status_class = reply::classify_status(rep._status);
|
||
/*
|
||
* Surprisingly Google Cloud Storage (GCS) commonly returns HTTP 308 during resumable uploads, including when you use PUT. This is expected behavior and
|
||
* not an error. The 308 tells the client to continue the upload at the same URL without changing the method or body, which is exactly how GCS’s
|
||
* resumable upload protocol works.
|
||
*/
|
||
if (status_class != reply::status_class::informational && status_class != reply::status_class::success &&
|
||
rep._status != status_type::permanent_redirect) {
|
||
if (rep._status == status_type::unauthorized) {
|
||
gcp_storage.warn("Request to failed with status {}. Refreshing credentials.", rep._status);
|
||
co_await authorize(req, scope);
|
||
}
|
||
auto content = co_await util::read_entire_stream_contiguous(_in);
|
||
auto error_msg = get_gcp_error_message(std::string_view(content));
|
||
gcp_storage.debug("Got unexpected response status: {}, content: {}", rep._status, content);
|
||
co_await coroutine::return_exception_ptr(std::make_exception_ptr(httpd::unexpected_status_error(rep._status)));
|
||
}
|
||
|
||
std::exception_ptr eptr;
|
||
try {
|
||
// TODO: rename the fault injection point to something more generic
|
||
if (utils::get_local_injector().enter("s3_client_fail_authorization")) {
|
||
throw httpd::unexpected_status_error(status_type::unauthorized);
|
||
}
|
||
co_await handler(rep, _in);
|
||
} catch (...) {
|
||
eptr = std::current_exception();
|
||
}
|
||
if (eptr) {
|
||
co_await coroutine::return_exception_ptr(std::move(eptr));
|
||
}
|
||
};
|
||
object_storage_retry_strategy retry_strategy(10,10ms,10000ms, as);
|
||
co_return co_await rest::simple_send(_client, req, wrapped_handler, &retry_strategy, as);
|
||
} catch (...) {
|
||
try {
|
||
std::rethrow_exception(std::current_exception());
|
||
} catch (const httpd::unexpected_status_error& e) {
|
||
auto status = e.status();
|
||
|
||
if (reply::classify_status(status) == reply::status_class::redirection || status == reply::status_type::not_found) {
|
||
throw storage_io_error{ENOENT, format("GCP object doesn't exist ({})", status)};
|
||
}
|
||
if (status == reply::status_type::forbidden || status == reply::status_type::unauthorized) {
|
||
throw storage_io_error{EACCES, format("GCP access denied ({})", status)};
|
||
}
|
||
|
||
throw storage_io_error{EIO, format("GCP request failed with ({})", status)};
|
||
} catch (...) {
|
||
throw storage_io_error{EIO, format("GCP error ({})", std::current_exception())};
|
||
}
|
||
}
|
||
}
|
||
|
||
// Convenience overload: buffers the entire response body into a string and
// hands it to a simple (reply, body) handler.
future<>
utils::gcp::storage::client::impl::send_with_retry(const std::string& path, const std::string& scope, body_variant body, std::string_view content_type, rest::httpclient::handler_func f, httpclient::method_type op, key_values headers, seastar::abort_source* as) {
    co_await send_with_retry(path, scope, std::move(body), content_type, [f](const seastar::http::reply& rep, seastar::input_stream<char>& in) -> future<> {
        // ensure these are on our coroutine frame.
        auto& resp_handler = f;
        auto result = co_await util::read_entire_stream_contiguous(in);
        resp_handler(rep, result);
    }, op, headers, as);
}
|
||
|
||
// Convenience overload: collects status, headers, version and body into a
// result_type the caller can inspect after the call completes.
future<rest::httpclient::result_type>
utils::gcp::storage::client::impl::send_with_retry(const std::string& path, const std::string& scope, body_variant body, std::string_view content_type, httpclient::method_type op, key_values headers, seastar::abort_source* as) {
    rest::httpclient::result_type res;
    co_await send_with_retry(path, scope, std::move(body), content_type, [&res](const seastar::http::reply& r, std::string_view body) {
        gcp_storage.trace("{}", body);
        res.reply._status = r._status;
        res.reply._content = sstring(body);
        res.reply._headers = r._headers;
        res.reply._version = r._version;
    }, op, headers, as);
    co_return res;
}
|
||
|
||
// Close the underlying HTTP client, shutting down its pooled connections.
future<> utils::gcp::storage::client::impl::close() {
    co_await _client.close();
}
|
||
|
||
// Get an upload session for the given object
// See https://cloud.google.com/storage/docs/resumable-uploads
// See https://cloud.google.com/storage/docs/performing-resumable-uploads
future<> utils::gcp::storage::client::object_data_sink::acquire_session() {
    // Optional object metadata travels in the session-initiation body.
    std::string body;
    if (!_metadata.IsNull()) {
        body = rjson::print(_metadata);
    }
    auto path = fmt::format("/upload/storage/v1/b/{}/o?uploadType=resumable&name={}"
        , _bucket
        , seastar::http::internal::url_encode(_object_name)
    );

    auto reply = co_await _impl->send_with_retry(path
        , GCP_OBJECT_SCOPE_READ_WRITE
        , std::move(body)
        , APPLICATION_JSON
        , httpclient::method_type::POST
        , {}
        , _as
    );

    if (reply.result() != status_type::ok) {
        throw failed_operation(int(reply.result()), get_gcp_error_message(reply.body()));
    }
    // The session URI that chunks are PUT to comes back in the Location header.
    std::string location = reply.reply._headers[LOCATION];
    gcp_storage.debug("Upload {}/{} -> session uri {}", _bucket, _object_name, location);
    _session_path = utils::http::parse_simple_url(location).path;
}
|
||
|
||
// Matches the "Range: bytes=<first>-<last>" header GCS returns on 308 replies.
static const boost::regex range_ex("bytes=(\\d+)-(\\d+)");

// Extract the inclusive byte range the server has persisted so far from a
// resumable-upload reply. Returns false (leaving first/last untouched) when
// the Range header is missing or malformed.
static bool parse_response_range(const seastar::http::reply& r, uint64_t& first, uint64_t& last) {
    auto& res_headers = r._headers;
    auto i = res_headers.find(RANGE);
    if (i == res_headers.end()) {
        return false;
    }

    boost::smatch m;
    // boost::regex_match needs a std::string lvalue; header value may be sstring.
    std::string tmp(i->second);
    if (!boost::regex_match(tmp, m, range_ex)) {
        return false;
    }
    first = std::stoull(m[1].str());
    last = std::stoull(m[2].str());

    return true;
}
|
||
|
||
// Grow the sink's memory lease to cover "total" queued bytes, rounded up to
// the chunk size. The very first acquisition blocks (enforces the limit);
// later top-ups are best-effort to avoid stalling an upload that is about to
// release memory anyway.
future<> utils::gcp::storage::client::object_data_sink::adjust_memory_limit(size_t total) {
    auto held = _mem_held.count();
    if (held < total) {
        auto want = align_up(total, default_gcp_storage_chunk_size) - held;

        if (held == 0) {
            // first put into buffer queue. enforce.
            _mem_held = co_await _impl->get_units(want);
        } else {
            // try to get units to cover the bulk of buffers
            // but if we fail, we accept it and try to get by
            // with the lease we have. If we get here we should
            // have at least 8M in our lease, and will in fact do
            // a write, so data should get released.
            auto h = _impl->try_get_units(want);
            if (h) {
                _mem_held.adopt(std::move(*h));
            }
        }
    }
}
|
||
|
||
// Write a chunk to the dest object
// See https://cloud.google.com/storage/docs/resumable-uploads
// See https://cloud.google.com/storage/docs/performing-resumable-uploads
//
// Runs as a background fiber (caller discards the future). Loops PUT-ing the
// buffers to the session URI, handling partial acceptance (HTTP 308 with a
// Range header) by trimming what was persisted and resending the rest.
// Errors are stored in _exception for close() to rethrow - never thrown out.
future<> utils::gcp::storage::client::object_data_sink::do_single_upload(std::deque<temporary_buffer<char>> bufs, size_t offset, size_t len, bool final) {
    // always take the whole memory lease. This might be more or less than what we actually release
    // but we only ever leave sub-256k amount of data in queue, and we want the next
    // put to enforce waiting for a full 8M lease...
    auto mine_held = std::exchange(_mem_held, {});

    // Ensure to block close from completing
    auto h = _gate.hold();
    // Enforce our concurrency constraints
    auto sem_units = co_await seastar::get_units(_semaphore, 1);

    // our file range. if the sink was closed, we can set the
    // final size, otherwise, leave it open (*)
    auto last = offset + std::max(len, size_t(1)) - 1; // inclusive.
    auto end = offset + len;

    for (;;) {
        auto range = fmt::format("bytes {}-{}/{}"
            , offset // first byte
            , last // last byte
            , final ? std::to_string(end) : "*"s
        );

        try {
            if (_session_path.empty()) {
                co_await acquire_session();
            }

            gcp_storage.debug("{}:{} write range {}-{}", _bucket, _object_name, offset, offset+len);

            auto res = co_await _impl->send_with_retry(_session_path
                , GCP_OBJECT_SCOPE_READ_WRITE
                , std::make_pair([&](output_stream<char>&& os_in) -> future<> {
                    // Stream the queued buffers as the request body; share()
                    // keeps them reusable for a resend after a partial 308.
                    auto os = std::move(os_in);
                    for (auto& buf : bufs) {
                        co_await os.write(buf.share());
                    }
                    co_await os.flush();
                    co_await os.close();
                }, len)
                , ""s // no content type
                , httpclient::method_type::PUT
                , rest::key_values({ { CONTENT_RANGE, range } })
                , _as
            );

            switch (res.result()) {
            case status_type::ok:
            case status_type::created:
                _completed = true;
                gcp_storage.debug("{}:{} completed ({} bytes)", _bucket, _object_name, offset+len);
                co_return; // done and happy
            default:
                if (int(res.result()) == 308) {
                    // 308 = resume incomplete; the Range header tells us how
                    // much the server actually persisted.
                    uint64_t first = 0, new_last = 0;
                    if (parse_response_range(res.reply, first, new_last) && last != new_last) {
                        auto written = (new_last + 1) - offset;

                        gcp_storage.debug("{}:{} partial upload ({} bytes)", _bucket, _object_name, written);

                        // Keep the resend 256 KiB-aligned for non-final chunks.
                        if (!final && (len - written) < min_gcp_storage_chunk_size) {
                            written = len - std::min(min_gcp_storage_chunk_size, len);
                        }

                        // Drop the bytes the server already has from the queue.
                        auto to_remove = written;
                        while (to_remove) {
                            auto& buf = bufs.front();
                            auto size = std::min(to_remove, buf.size());
                            buf.trim_front(size);
                            if (buf.empty()) {
                                bufs.pop_front();
                            }
                            to_remove -= size;
                        }
                        offset += written;
                        len -= written;
                        auto total = std::accumulate(bufs.begin(), bufs.end(), size_t{}, [](size_t s, auto& buf) {
                            return s + buf.size();
                        });
                        assert(len == total);
                        continue;
                    }
                    // incomplete. ok for partial
                    gcp_storage.debug("{}:{} chunk {}:{} done", _bucket, _object_name, offset, offset+len);
                    co_return;
                }
                throw failed_upload_error(int(res.result()), get_gcp_error_message(res.body()));
            }
        } catch (...) {
            // Defer the error to close(); this fiber's future is discarded.
            _exception = std::current_exception();
            gcp_storage.warn("Exception in upload of {}:{} ({}/{}): {}"
                , _bucket
                , _object_name
                , offset
                , len
                , _exception
            );
            break;
        }
    }
}
|
||
|
||
// Check/close the final object.
// Sends an empty PUT with "Content-Range: bytes */<size>" to tell the server
// the upload is complete and verify it accepted all _accumulated bytes.
future<> utils::gcp::storage::client::object_data_sink::check_upload() {
    // Now we know the final size. Set it in range
    auto range = fmt::format("bytes */{}", _accumulated);

    auto res = co_await _impl->send_with_retry(_session_path
        , GCP_OBJECT_SCOPE_READ_WRITE
        , ""s
        , APPLICATION_JSON
        , httpclient::method_type::PUT
        , rest::key_values({ { CONTENT_RANGE, range } })
        , _as
    );

    switch (res.result()) {
    case status_type::ok:
    case status_type::created:
        _completed = true;
        gcp_storage.debug("{}:{} completed ({})", _bucket, _object_name, _accumulated);
        co_return; // done and happy
    default:
        throw failed_upload_error(int(res.result()), fmt::format("{}:{} incomplete. ({}): {}"
            , _bucket, _object_name, res.reply._headers[RANGE]
            , get_gcp_error_message(res.body())
        ));
    }
}
|
||
|
||
// https://cloud.google.com/storage/docs/performing-resumable-uploads#cancel-upload
// Cancel an incomplete resumable upload by DELETE-ing the session URI.
// No-op if the object completed or no session was ever created. A prior
// upload error (_exception) takes precedence over a failed cancellation.
future<> utils::gcp::storage::client::object_data_sink::remove_upload() {
    if (_completed || _session_path.empty()) {
        co_return;
    }

    gcp_storage.debug("Removing incomplete upload {}:{} ()", _bucket, _object_name, _session_path);

    auto res = co_await _impl->send_with_retry(_session_path
        , GCP_OBJECT_SCOPE_READ_WRITE
        , ""s
        , APPLICATION_JSON
        , httpclient::method_type::DELETE
        , {}
        , _as
    );

    // GCS signals a successful cancellation with status 499.
    switch (int(res.result())) {
    case 499: // not in enum yet
        gcp_storage.debug("Upload of {}:{} removed ({})", _bucket, _object_name, _session_path);
        co_return; // done and happy
    default: {
        auto msg = get_gcp_error_message(res.body());
        gcp_storage.warn("Failed to remove broken upload of {}:{} ({})", _bucket, _object_name, msg);
        if (!_exception) {
            throw failed_upload_error(int(res.result()), fmt::format("{}:{} incomplete. ({}): {}"
                , _bucket, _object_name, res.reply._headers[RANGE]
                , msg
            ));
        }
    }
    }
}
|
||
|
||
// Read a single buffer from the source object
// Fetches up to "limit" bytes. Refills the local cache with a ranged GET
// when empty, pinned to the generation observed by read_info(). Returns an
// empty buffer at EOF.
future<temporary_buffer<char>> utils::gcp::storage::client::object_data_source::get(size_t limit) {
    // If we don't know the source size yet, get the info from server
    if (_size == 0) {
        co_await read_info();
    }

    // If we don't have buffers to give, try getting one from server
    if (_buffers.empty()) {
        auto to_read = std::min(_size - _position, limit);

        // to_read == 0 -> eof
        if (to_read != 0) {
            gcp_storage.debug("Reading object {}:{} ({}-{}/{})", _bucket, _object_name, _position, _position+to_read, _size);

            // Best-effort memory accounting for the data we are about to cache.
            auto lease = _impl->try_get_units(to_read);
            if (lease) {
                if (_limits) {
                    _limits.adopt(std::move(*lease));
                } else {
                    _limits = std::move(*lease);
                }
            } else {
                // If we can't get a lease to cover this read, don't wait, as this
                // could cause deadlock in higher layers, but instead adjust the
                // size down to decrease memory pressure.
                to_read = std::min(to_read, min_gcp_storage_chunk_size);
                gcp_storage.debug("Reading object (adjusted) {}:{} ({}-{}/{})", _bucket, _object_name, _position, _position+to_read, _size);
            }

            // Ensure we read from the same generation as we queried in read_info. Note: mock server ignores this.
            auto path = fmt::format("/storage/v1/b/{}/o/{}?ifGenerationMatch={}&alt=media"
                , _bucket
                , seastar::http::internal::url_encode(_object_name)
                , _generation
            );
            auto range = fmt::format("bytes={}-{}", _position, _position+to_read-1); // inclusive range

            co_await _impl->send_with_retry(path
                , GCP_OBJECT_SCOPE_READ_ONLY
                , ""s
                , ""s
                , [&](const seastar::http::reply& rep, seastar::input_stream<char>& in) -> future<> {
                    if (rep._status != status_type::ok && rep._status != status_type::partial_content) {
                        throw failed_operation(fmt::format("Could not read object {}: {} ({}/{} - {})", _bucket, _object_name, _position, _size, int(rep._status)));
                    }
                    auto old = _position;
                    // ensure these are on our coroutine frame.
                    auto bufs = co_await util::read_entire_stream(in);
                    for (auto&& buf : bufs) {
                        _position += buf.size();
                        _buffers.emplace_back(std::move(buf));
                    }
                    gcp_storage.debug("Read object {}:{} ({}-{}/{})", _bucket, _object_name, old, _position, _size);
                }
                , httpclient::method_type::GET
                , rest::key_values({ { RANGE, range } })
                , _as
            );
        }
    }

    temporary_buffer<char> res;

    // Hand out at most "limit" bytes from the front of the cache.
    if (!_buffers.empty()) {
        auto&& buf = _buffers.front();
        if (buf.size() >= limit) {
            res = buf.share(0, limit);
            buf.trim_front(limit);
        } else {
            res = std::move(buf);
            _buffers.pop_front();
        }
    }

    // Return memory lease units no longer backed by cached data.
    adjust_lease();

    co_return res;
}
|
||
|
||
// Default-sized read: delegates to get(limit) with the 8 MiB default chunk.
future<temporary_buffer<char>> utils::gcp::storage::client::object_data_source::get() {
    // If we don't have buffers to give, try getting one from server
    co_return co_await get(default_gcp_storage_chunk_size);
}
|
||
|
||
// Reposition the source. If the target falls inside the cached window
// [read_pos, _position) we just trim cached buffers; otherwise the cache is
// dropped and _position moved (clamped to the object size).
future<> utils::gcp::storage::client::object_data_source::seek(uint64_t pos) {
    if (_size == 0) {
        co_await read_info();
    }
    auto buf_size = buffer_size();
    assert(buf_size <= _position);
    // First byte still cached; _position is the end of the cached window.
    auto read_pos = _position - buf_size;
    if (pos < read_pos || pos >= _position) {
        _buffers.clear();
        _position = std::min(pos, _size);
        co_return;
    }
    auto n = pos - read_pos;
    // Drop superfluous cache
    while (n > 0 && !_buffers.empty()) {
        auto m = std::min(n, _buffers.front().size());
        _buffers.front().trim_front(m);
        if (_buffers.front().empty()) {
            _buffers.pop_front();
        }
        n -= m;
    }
    adjust_lease();
}
|
||
|
||
void utils::gcp::storage::client::object_data_source::adjust_lease() {
|
||
auto total = std::accumulate(_buffers.begin(), _buffers.end(), size_t{}, [](size_t s, auto& buf) {
|
||
return s + buf.size();
|
||
});
|
||
if (total < _limits.count()) {
|
||
_limits.return_units(_limits.count() - total);
|
||
}
|
||
}
|
||
|
||
// Object size in bytes, fetching metadata lazily on first use.
future<uint64_t> utils::gcp::storage::client::object_data_source::size() {
    if (_size == 0) {
        co_await read_info();
    }
    co_return _size;
}
|
||
|
||
// Last-update timestamp of the object, fetching metadata lazily on first use.
future<std::chrono::system_clock::time_point> utils::gcp::storage::client::object_data_source::timestamp() {
    if (_timestamp.time_since_epoch().count() == 0) {
        co_await read_info();
    }
    co_return _timestamp;
}
|
||
|
||
// Skip n bytes forward from the current logical read position (start of the
// cached window), then return the next buffer.
future<temporary_buffer<char>> utils::gcp::storage::client::object_data_source::skip(uint64_t n) {
    auto buf_size = buffer_size();
    assert(buf_size <= _position);
    auto read_pos = _position - buf_size;
    co_await seek(read_pos + n);
    // And get the next buffer
    co_return co_await get();
}
|
||
|
||
// Query object metadata (size, generation, update time) from the JSON API
// and cache it in the source's members.
future<> utils::gcp::storage::client::object_data_source::read_info() {
    gcp_storage.debug("Read info {}:{}", _bucket, _object_name);

    auto path = fmt::format("/storage/v1/b/{}/o/{}", _bucket, seastar::http::internal::url_encode(_object_name));

    auto res = co_await _impl->send_with_retry(path
        , GCP_OBJECT_SCOPE_READ_ONLY
        , ""s
        , ""s
        , httpclient::method_type::GET
        , {}
        , _as
    );

    if (res.result() != status_type::ok) {
        throw failed_operation(fmt::format("Could not query object {}:{} {}", _bucket, _object_name, res.result()));
    }

    auto item = rjson::parse(std::move(res.body()));
    // Ensure we got the info we asked for/expect
    if (rjson::get<std::string>(item, "kind") != "storage#object"s) {
        throw failed_operation("Malformed query object reply");
    }

    // The JSON API returns numeric fields as strings.
    _size = std::stoull(rjson::get<std::string>(item, "size"));
    _generation = std::stoull(rjson::get<std::string>(item, "generation"));
    _timestamp = parse_rfc3339(rjson::get<std::string>(item, "updated"));
}
|
||
|
||
// Client without memory accounting: buffered reads/writes are not throttled.
utils::gcp::storage::client::client(std::string_view endpoint, std::optional<google_credentials> c, shared_ptr<seastar::tls::certificate_credentials> certs)
    : _impl(seastar::make_shared<impl>(endpoint, std::move(c), nullptr, std::move(certs)))
{}

// Client whose buffer usage is accounted against the supplied semaphore.
utils::gcp::storage::client::client(std::string_view endpoint, std::optional<google_credentials> c, seastar::semaphore& memory, shared_ptr<seastar::tls::certificate_credentials> certs)
    : _impl(seastar::make_shared<impl>(endpoint, std::move(c), &memory, std::move(certs)))
{}

// Defaulted here (not in the header) since impl is only defined in this file.
utils::gcp::storage::client::~client() = default;
|
||
|
||
|
||
// Create a bucket in the given project from caller-supplied JSON metadata
// (must contain at least "name"). Throws failed_operation on any status
// other than 200/201.
future<> utils::gcp::storage::client::create_bucket(std::string_view project, rjson::value meta) {
    gcp_storage.debug("Create bucket {}:{}", project, rjson::get(meta, "name"));

    auto path = fmt::format("/storage/v1/b?project={}", project);
    auto body = rjson::print(meta);

    auto res = co_await _impl->send_with_retry(path
        , GCP_OBJECT_SCOPE_FULL_CONTROL
        , body
        , APPLICATION_JSON
        , httpclient::method_type::POST
    );

    switch (res.result()) {
    case status_type::ok:
    case status_type::created:
        co_return; // done and happy
    default:
        throw failed_operation(fmt::format("Could not create bucket {}: {}", rjson::get(meta, "name"), res.result()));
    }
}
|
||
|
||
// Convenience overload: build the metadata document from individual fields
// and delegate to create_bucket(project, meta).
future<> utils::gcp::storage::client::create_bucket(std::string_view project, std::string_view bucket, std::string_view region, std::string_view storage_class) {
    // Construct metadata programmatically - safer than fmt::format:ing JSON.
    rjson::value meta = rjson::empty_object();
    rjson::add(meta, "name", std::string(bucket));
    rjson::add(meta, "location", std::string(region.empty() ? "US" : region));
    rjson::add(meta, "storageClass", std::string(storage_class.empty() ? "STANDARD" : storage_class));

    // Request uniform bucket-level access (no per-object ACLs).
    rjson::value ubla = rjson::empty_object();
    rjson::add(ubla, "enabled", true);
    rjson::value iam = rjson::empty_object();
    rjson::add(iam, "uniformBucketLevelAccess", std::move(ubla));
    rjson::add(meta, "iamConfiguration", std::move(iam));

    co_await create_bucket(project, std::move(meta));
}
|
||
|
||
// Delete a (presumably empty) bucket.
future<> utils::gcp::storage::client::delete_bucket(std::string_view bucket_in) {
    // Pin the view in coroutine-frame storage before the first suspension.
    std::string bucket(bucket_in);

    gcp_storage.debug("Delete bucket {}", bucket);

    auto res = co_await _impl->send_with_retry(fmt::format("/storage/v1/b/{}", bucket)
        , GCP_OBJECT_SCOPE_FULL_CONTROL
        , ""s
        , ""s
        , httpclient::method_type::DELETE
    );

    const auto status = res.result();
    // The mock server answers "ok" rather than "no content"; accept both.
    if (status != status_type::ok && status != status_type::no_content) {
        throw failed_operation(fmt::format("Could not delete bucket {}: {}", bucket, res.result()));
    }
}
|
||
|
||
// Translate a "storage#object" JSON resource into an object_info value.
// GCP encodes the numeric fields (size, generation) as JSON strings.
static utils::gcp::storage::object_info create_info(const rjson::value& item) {
    auto str = [&](const char* field) {
        return rjson::get<std::string>(item, field);
    };

    utils::gcp::storage::object_info res;
    res.name = str("name");
    res.content_type = rjson::get_opt<std::string>(item, "contentType").value_or(""s);
    res.size = std::stoull(str("size"));
    res.generation = std::stoull(str("generation"));
    res.modified = parse_rfc3339(str("updated"));
    return res;
}
|
||
|
||
// See https://cloud.google.com/storage/docs/listing-objects
|
||
// TODO: maybe make a generator? However, we don't have a streaming
|
||
// json parsing routine as such, so however we do this, we need to
|
||
// read all data from network, etc. Thus there is not all that much
|
||
// point in it. Return chunked_vector to avoid large alloc, but keep it
|
||
// in one object... for now...
|
||
// List (at most pager.max_results) objects in a bucket, optionally filtered
// by name prefix. The pager carries the nextPageToken between calls; once a
// reply arrives without a token, pager.done is set and subsequent calls
// return an empty result.
future<utils::chunked_vector<utils::gcp::storage::object_info>> utils::gcp::storage::client::list_objects(std::string_view bucket_in, std::string_view prefix, bucket_paging& pager) {
    utils::chunked_vector<utils::gcp::storage::object_info> result;

    if (pager.done) {
        co_return result;
    }

    // Pin the view in coroutine-frame storage before the first suspension.
    std::string bucket(bucket_in);

    gcp_storage.debug("List bucket {} (prefix={}, max_results={})", bucket, prefix, pager.max_results);

    auto path = fmt::format("/storage/v1/b/{}/o", bucket);
    // Query parameters are joined with a single '&'. The previous separator
    // was "&&", which injected an empty parameter between every pair.
    auto psep = "?";
    if (!prefix.empty()) {
        path += fmt::format("{}prefix={}", psep, prefix);
        psep = "&";
    }
    if (pager.max_results != 0) {
        path += fmt::format("{}maxResults={}", psep, pager.max_results);
        psep = "&";
    }
    if (!pager.token.empty()) {
        path += fmt::format("{}pageToken={}", psep, pager.token);
        psep = "&";
    }

    co_await _impl->send_with_retry(path
        , GCP_OBJECT_SCOPE_READ_ONLY
        , ""s
        , ""s
        , [&](const seastar::http::reply& rep, seastar::input_stream<char>& in) -> future<> {
            if (rep._status != status_type::ok) {
                throw failed_operation(fmt::format("Could not list bucket {}: {} ({})", bucket, rep._status
                    , co_await get_gcp_error_message(in)
                ));
            }

            // ensure these are on our coroutine frame.
            auto bufs = co_await util::read_entire_stream(in);
            auto root = rjson::parse(std::move(bufs));

            if (rjson::get<std::string>(root, "kind") != "storage#objects"s) {
                throw failed_operation("Malformed list object reply");
            }

            auto items = rjson::find(root, "items");
            if (!items) {
                // No "items" at all: an empty listing, not an error.
                co_return;
            }
            if (!items->IsArray()) {
                throw failed_operation("Malformed list object items");
            }

            // An absent/empty nextPageToken means this was the last page.
            pager.token = rjson::get_opt<std::string>(root, "nextPageToken").value_or(""s);
            pager.done = pager.token.empty();

            for (auto& item : items->GetArray()) {
                result.emplace_back(create_info(item));
            }
        }
        , httpclient::method_type::GET
    );

    co_return result;
}
|
||
|
||
// Non-paged listing: a zero-sized pager asks for everything in one go.
future<utils::chunked_vector<utils::gcp::storage::object_info>> utils::gcp::storage::client::list_objects(std::string_view bucket, std::string_view prefix) {
    bucket_paging all(0);
    co_return co_await list_objects(bucket, prefix, all);
}
|
||
|
||
// See https://cloud.google.com/storage/docs/deleting-objects
|
||
// Delete a single object. Deleting an object that does not exist is treated
// as success (logged at debug level).
future<> utils::gcp::storage::client::delete_object(std::string_view bucket_in, std::string_view object_name_in) {
    // Pin the views in coroutine-frame storage before the first suspension.
    std::string bucket(bucket_in), object_name(object_name_in);

    gcp_storage.debug("Delete object {}:{}", bucket, object_name);

    auto path = fmt::format("/storage/v1/b/{}/o/{}", bucket, seastar::http::internal::url_encode(object_name));

    httpclient::result_type res;
    try {
        res = co_await _impl->send_with_retry(path, GCP_OBJECT_SCOPE_READ_WRITE, ""s, ""s, httpclient::method_type::DELETE);
    } catch (const storage_io_error& ex) {
        if (ex.code().value() != ENOENT) {
            // Note: rethrow_exception, not "throw;" - we are in a coroutine.
            std::rethrow_exception(std::current_exception());
        }
        gcp_storage.debug("Could not delete {}:{} - no such object", bucket, object_name);
        co_return; // ok...?
    }

    const auto status = res.result();
    if (status != status_type::ok && status != status_type::no_content) {
        throw failed_operation(fmt::format("Could not delete object {}:{}: {} ({})", bucket, object_name, res.result()
            , get_gcp_error_message(res.body())
        ));
    }
    gcp_storage.debug("Deleted {}:{}", bucket, object_name);
}
|
||
|
||
// See https://cloud.google.com/storage/docs/copying-renaming-moving-objects
|
||
// GCP does not support moveTo across buckets.
|
||
// Cross-bucket rename is implemented as copy-then-delete, since the GCP
// moveTo operation cannot cross bucket boundaries.
future<> utils::gcp::storage::client::rename_object(std::string_view bucket, std::string_view object_name, std::string_view new_bucket, std::string_view new_name) {
    co_await copy_object(bucket, object_name, new_bucket, new_name);
    co_await delete_object(bucket, object_name);
}
|
||
|
||
// See https://cloud.google.com/storage/docs/copying-renaming-moving-objects
|
||
// Same-bucket rename via the native moveTo operation.
future<> utils::gcp::storage::client::rename_object(std::string_view bucket_in, std::string_view object_name_in, std::string_view new_name_in) {
    // Pin the views in coroutine-frame storage before the first suspension.
    std::string bucket(bucket_in), object_name(object_name_in), new_name(new_name_in);

    gcp_storage.debug("Move object {}:{} -> {}", bucket, object_name, new_name);

    auto res = co_await _impl->send_with_retry(
        fmt::format("/storage/v1/b/{}/o/{}/moveTo/o/{}"
            , bucket
            , seastar::http::internal::url_encode(object_name)
            , seastar::http::internal::url_encode(new_name)
        )
        , GCP_OBJECT_SCOPE_READ_WRITE
        , ""s
        , ""s
        , httpclient::method_type::PUT
    );

    const auto status = res.result();
    if (status == status_type::ok || status == status_type::created) {
        gcp_storage.debug("Moved {}:{} to {}", bucket, object_name, new_name);
        co_return;
    }
    throw failed_operation(fmt::format("Could not rename object {}:{}: {} ({})", bucket, object_name, res.result()
        , get_gcp_error_message(res.body())
    ));
}
|
||
|
||
// See https://cloud.google.com/storage/docs/copying-renaming-moving-objects
|
||
// Copying an object in GCP can only process a certain amount of data in one call
|
||
// Must keep doing it until all data is copied, and check response.
|
||
// Copy one object to another, possibly across buckets, via the rewriteTo API.
// A single rewrite call may only process a bounded amount of data; we loop,
// feeding the returned rewriteToken back in the request body, until the
// response reports "done".
future<> utils::gcp::storage::client::copy_object(std::string_view bucket_in, std::string_view object_name_in, std::string_view new_bucket_in, std::string_view to_name_in) {
    // Pin the views in coroutine-frame storage before the first suspension.
    std::string bucket(bucket_in), object_name(object_name_in), new_bucket(new_bucket_in), to_name(to_name_in);

    auto path = fmt::format("/storage/v1/b/{}/o/{}/rewriteTo/b/{}/o/{}"
        , bucket
        , seastar::http::internal::url_encode(object_name)
        , new_bucket
        , seastar::http::internal::url_encode(to_name)
    );
    // First request carries an empty body; later ones carry the rewriteToken.
    std::string body;

    for (;;) {
        auto res = co_await _impl->send_with_retry(path
            , GCP_OBJECT_SCOPE_READ_WRITE
            , body
            , APPLICATION_JSON
            , httpclient::method_type::PUT
        );

        if (res.result() != status_type::ok) {
            throw failed_operation(fmt::format("Could not copy object {}:{}: {} ({})", bucket, object_name, res.result()
                , get_gcp_error_message(res.body())
            ));
        }

        auto resp = rjson::parse(res.body());
        if (rjson::get<bool>(resp, "done")) {
            gcp_storage.debug("Copied {}:{} to {}:{}", bucket, object_name, new_bucket, to_name);
            co_return; // done and happy
        }

        auto token = rjson::get<std::string>(resp, "rewriteToken");
        auto written = rjson::get<uint64_t>(resp, "totalBytesRewritten");
        auto size = rjson::get<uint64_t>(resp, "objectSize");

        // Call 2+ must include the rewriteToken
        body = fmt::format("{{\"rewriteToken\": \"{}\"}}", token);

        gcp_storage.debug("Partial copy of {}:{} to {}:{} ({}/{})", bucket, object_name, new_bucket, to_name, written, size);
    }
}
|
||
|
||
// Merge (GCS "compose") up to 32 source objects in 'bucket_in' into a single
// destination object 'dest_object_name' described by 'metadata'. Returns the
// composed object's info.
//
// 'source_object_names' is deliberately taken by const reference: this method
// only reads the names, and the caller keeps the vector in stable storage
// (e.g. to iterate it again and delete the sources after the merge) - see
// scylladb/scylladb#29060.
future<utils::gcp::storage::object_info> utils::gcp::storage::client::merge_objects(std::string_view bucket_in, std::string_view dest_object_name, const std::vector<std::string>& source_object_names, rjson::value metadata, seastar::abort_source* as) {
    rjson::value compose = rjson::empty_object();
    rjson::value source_objects = rjson::empty_array();

    // The compose API accepts at most 32 source components per call.
    if (source_object_names.size() > 32) {
        throw std::invalid_argument(fmt::format("Can only merge up to 32 objects. {} requested.", source_object_names.size()));
    }

    for (auto& src : source_object_names) {
        rjson::value obj = rjson::empty_object();
        rjson::add(obj, "name", src);
        rjson::push_back(source_objects, std::move(obj));
    }

    rjson::add(compose, "sourceObjects", std::move(source_objects));
    rjson::add(compose, "destination", std::move(metadata));

    // Pin the views in coroutine-frame storage before the first suspension.
    std::string bucket(bucket_in), object_name(dest_object_name);

    auto path = fmt::format("/storage/v1/b/{}/o/{}/compose", bucket, seastar::http::internal::url_encode(object_name));
    auto body = rjson::print(compose);

    // NOTE(review): 'as' is not forwarded to send_with_retry here, while
    // e.g. read_info() passes an abort_source as a trailing argument -
    // confirm whether abort is meant to apply to the compose request.
    auto res = co_await _impl->send_with_retry(path
        , GCP_OBJECT_SCOPE_READ_WRITE
        , body
        , APPLICATION_JSON
        , httpclient::method_type::POST
    );

    if (res.result() != status_type::ok) {
        throw failed_operation(fmt::format("Could not merge to object {} -> {}:{}: {} ({})", source_object_names, bucket, object_name, res.result()
            , get_gcp_error_message(res.body())
        ));
    }

    auto resp = rjson::parse(res.body());

    co_return create_info(resp);
}
|
||
|
||
// Same-bucket copy: delegate to the general cross-bucket variant.
future<> utils::gcp::storage::client::copy_object(std::string_view bucket, std::string_view object_name, std::string_view to_name) {
    co_await copy_object(bucket, object_name, bucket, to_name);
}
|
||
|
||
// Wrap an object_data_sink for the given destination in a generic data_sink.
seastar::data_sink utils::gcp::storage::client::create_upload_sink(std::string_view bucket, std::string_view object_name, rjson::value metadata, seastar::abort_source* as) const {
    auto sink = std::make_unique<object_data_sink>(_impl, bucket, object_name, std::move(metadata), as);
    return seastar::data_sink(std::move(sink));
}
|
||
|
||
// Wrap an object_data_source for the given object in a seekable_data_source.
seekable_data_source utils::gcp::storage::client::create_download_source(std::string_view bucket, std::string_view object_name, seastar::abort_source* as) const {
    auto src = std::make_unique<object_data_source>(_impl, bucket, object_name, as);
    return seekable_data_source(std::move(src));
}
|
||
|
||
// Shut down the client by forwarding to the shared impl's close().
future<> utils::gcp::storage::client::close() {
    return _impl->close();
}
|
||
|
||
// Default endpoint: the public Google Cloud Storage API host.
const std::string utils::gcp::storage::client::DEFAULT_ENDPOINT = "https://storage.googleapis.com";
|