Files
scylladb/utils/gcp/object_storage.cc
Pavel Emelyanov c4a0f6f2e6 object_store: Don't leave dangling objects by iterating moved-from names vector
The code in upload_file std::move()-s vector of names into
merge_objects() method, then iterates over this vector to delete
objects. The iteration is apparently a no-op on moved-from vector.

The fix is to make merge_objects() helper get vector of names by const
reference -- the method doesn't modify the names collection, the caller
keeps one in stable storage.

Fixes #29060

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes scylladb/scylladb#29061
2026-03-20 10:09:30 +02:00

1164 lines
46 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "object_storage.hh"
#include "gcp_credentials.hh"
#include "object_storage_retry_strategy.hh"
#include <algorithm>
#include <numeric>
#include <deque>
#include <boost/regex.hpp>
#include <seastar/core/align.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/units.hh>
#include <seastar/http/client.hh>
#include <seastar/util/short_streams.hh>
#include "utils/rest/client.hh"
#include "utils/exponential_backoff_retry.hh"
#include "utils/error_injection.hh"
#include "utils/exceptions.hh"
#include "utils/http.hh"
#include "utils/http_client_error_processing.hh"
#include "utils/overloaded_functor.hh"
static logger gcp_storage("gcp_storage");
static constexpr uint64_t min_gcp_storage_chunk_size = 256*1024;
static constexpr uint64_t default_gcp_storage_chunk_size = 8*1024*1024;
static constexpr char GCP_OBJECT_SCOPE_READ_ONLY[] = "https://www.googleapis.com/auth/devstorage.read_only";
static constexpr char GCP_OBJECT_SCOPE_READ_WRITE[] = "https://www.googleapis.com/auth/devstorage.read_write";
static constexpr char GCP_OBJECT_SCOPE_FULL_CONTROL[] = "https://www.googleapis.com/auth/devstorage.full_control";
static constexpr char STORAGE_APIS_URI[] = "https://storage.googleapis.com";
static constexpr char APPLICATION_JSON[] = "application/json";
static constexpr char LOCATION[] = "Location";
static constexpr char CONTENT_RANGE[] = "Content-Range";
static constexpr char RANGE[] = "Range";
using namespace std::string_literals;
using namespace utils::gcp;
// Decides whether the granted `scopes` are sufficient for the requested
// `check_for` scopes. After the generic check, the storage scopes are
// treated as a ladder: read_write or full_control satisfy a read_only
// request, and full_control satisfies a read_write request.
static bool storage_scope_implies(const scopes_type& scopes, const scopes_type& check_for) {
    if (default_scopes_implies_other_scope(scopes, check_for)) {
        return true;
    }
    if (scopes_contains_scope(check_for, GCP_OBJECT_SCOPE_READ_ONLY)) {
        // Read-only is requested: any stronger storage scope will do.
        if (scopes_contains_scope(scopes, GCP_OBJECT_SCOPE_READ_WRITE)) {
            return true;
        }
        return scopes_contains_scope(scopes, GCP_OBJECT_SCOPE_FULL_CONTROL);
    }
    // Read-write is only satisfied by full control; anything else is not implied.
    return scopes_contains_scope(check_for, GCP_OBJECT_SCOPE_READ_WRITE)
        && scopes_contains_scope(scopes, GCP_OBJECT_SCOPE_FULL_CONTROL);
}
// Parses a UTC timestamp using the format "%FT%TZ" into a
// system_clock time_point. The stream state is not checked after parsing,
// so a string that does not match yields the default-constructed time_point.
static auto parse_rfc3339(const std::string& s) {
    std::istringstream input(s);
    std::chrono::system_clock::time_point parsed;
    input >> std::chrono::parse("%FT%TZ", parsed);
    return parsed;
}
/**
 * data_sink implementation that writes an object to GCP storage using a
 * resumable upload session.
 *
 * Incoming buffers are queued in _buffers. Once enough data has accumulated
 * (or a flush/close forces it) the queue is handed to do_single_upload(),
 * which runs in the background, guarded by _gate (so close() waits for it)
 * and serialized by _semaphore. An error raised by a background upload is
 * stored in _exception and reported from close().
 */
class utils::gcp::storage::client::object_data_sink : public data_sink_impl {
    shared_ptr<impl> _impl;
    std::string _bucket;
    std::string _object_name;
    rjson::value _metadata;
    // Path of the resumable upload session; empty until acquire_session() has run.
    std::string _session_path;
    std::string _content_type;
    // Data received via put() that has not yet been handed to an upload.
    std::deque<temporary_buffer<char>> _buffers;
    // Number of bytes already handed to do_single_upload().
    uint64_t _accumulated = 0;
    // Memory lease covering the queued buffers.
    seastar::semaphore_units<> _mem_held;
    bool _closed = false;
    bool _completed = false;
    seastar::named_gate _gate;
    seastar::semaphore _semaphore;
    std::exception_ptr _exception;
    seastar::abort_source* _as;
public:
    object_data_sink(shared_ptr<impl> i, std::string_view bucket, std::string_view object_name, rjson::value metadata, seastar::abort_source* as)
        : _impl(i)
        , _bucket(bucket)
        , _object_name(object_name)
        , _metadata(std::move(metadata))
        , _semaphore(1)
        , _as(as)
    {}
    // Queues the buffers and starts an upload if enough data has accumulated.
    future<> put(std::span<temporary_buffer<char>> bufs) override {
        for (auto&& buf : bufs) {
            _buffers.emplace_back(std::move(buf));
        }
        co_await maybe_do_upload(false);
    }
    // Forces an upload of queued data (subject to the minimum chunk size while not closed).
    future<> flush() override {
        return maybe_do_upload(true);
    }
    // Flushes remaining data, waits for background uploads and finalizes the object.
    // If any step failed, the incomplete upload is removed and the original error rethrown.
    // Only the first call does any work.
    future<> close() override {
        if (!std::exchange(_closed, true)) {
            co_await flush();
            co_await _gate.close();
            try {
                if (!_exception && !_completed) {
                    co_await check_upload();
                }
            } catch (...) {
                _exception = std::current_exception();
            }
            if (_exception) {
                // Keep _exception set while cleaning up: remove_upload() checks it so that
                // a failed cleanup does not replace the original upload error.
                co_await remove_upload();
                std::rethrow_exception(std::exchange(_exception, {}));
            }
        }
    }
    size_t buffer_size() const noexcept override {
        return default_gcp_storage_chunk_size;
    }
    future<> acquire_session();
    future<> do_single_upload(std::deque<temporary_buffer<char>>, size_t offset, size_t len, bool final);
    future<> check_upload();
    future<> remove_upload();
    future<> adjust_memory_limit(size_t);
    // Starts a background upload of the queued buffers when either `force` is set
    // or at least default_gcp_storage_chunk_size bytes are queued.
    future<> maybe_do_upload(bool force) {
        auto total = std::accumulate(_buffers.begin(), _buffers.end(), size_t{}, [](size_t s, auto& buf) {
            return s + buf.size();
        });
        if (total == 0) {
            co_return;
        }
        co_await adjust_memory_limit(total);
        // GCP only allows upload of less than 256k on last chunk
        if (total < min_gcp_storage_chunk_size && !_closed) {
            co_return;
        }
        // avoid uploading unless we accumulate enough data. GCP docs says to
        // try to keep uploads to 8MB or more.
        if (force || total >= default_gcp_storage_chunk_size) {
            auto bufs = std::exchange(_buffers, {});
            auto start = _accumulated;
            auto final = _closed;
            if (!final) {
                // can only write in multiples of 256k. Move the remainder from the
                // tail of the outgoing queue back to the front of _buffers,
                // preserving byte order.
                auto rem = total % min_gcp_storage_chunk_size;
                total -= rem;
                while (rem) {
                    auto& buf = bufs.back();
                    if (buf.size() > rem) {
                        // Split the last buffer: its tail stays queued, its head is uploaded.
                        auto keep = buf.size() - rem;
                        _buffers.emplace(_buffers.begin(), buf.share(keep, rem));
                        buf.trim(keep);
                        break;
                    } else {
                        rem -= buf.size();
                        _buffers.emplace(_buffers.begin(), std::move(buf));
                        bufs.pop_back();
                    }
                }
            }
            assert(std::accumulate(bufs.begin(), bufs.end(), size_t{}, [](size_t s, auto& buf) { return s + buf.size(); }) == total);
            _accumulated += total;
            // allow this in background
            (void)do_single_upload(std::move(bufs), start, total, final);
        }
    }
};
/**
 * Seekable data source reading an object from GCP storage.
 *
 * Object metadata (size, generation, timestamp) is fetched via read_info().
 * Data already fetched but not yet returned to the caller is kept in
 * _buffers; _position is the object offset just past the buffered data.
 */
class utils::gcp::storage::client::object_data_source : public seekable_data_source_impl {
    shared_ptr<impl> _impl;
    std::string _bucket;
    std::string _object_name;
    std::string _session_path;
    uint64_t _generation = 0;
    uint64_t _size = 0;
    uint64_t _position = 0;
    std::chrono::system_clock::time_point _timestamp;
    // Memory lease covering the buffered data.
    seastar::semaphore_units<> _limits;
    seastar::abort_source* _as;
    std::deque<temporary_buffer<char>> _buffers;
    // Total number of bytes currently held in _buffers.
    size_t buffer_size() const {
        // The init value must be size_t: std::accumulate accumulates in the type of
        // its init argument, so a plain 0 would sum (and truncate) in int.
        return std::accumulate(_buffers.begin(), _buffers.end(), size_t{}, [](size_t sum, auto& b) { return sum + b.size(); });
    }
public:
    object_data_source(shared_ptr<impl> i, std::string_view bucket, std::string_view object_name, seastar::abort_source* as)
        : _impl(i)
        , _bucket(bucket)
        , _object_name(object_name)
        , _as(as)
    {}
    future<temporary_buffer<char>> get() override;
    future<temporary_buffer<char>> skip(uint64_t n) override;
    future<> read_info();
    void adjust_lease();
    future<temporary_buffer<char>> get(size_t limit) override;
    future<> seek(uint64_t pos) override;
    future<uint64_t> size() override;
    future<std::chrono::system_clock::time_point> timestamp() override;
};
using body_writer = std::function<future<>(output_stream<char>&&)>;
using writer_and_size = std::pair<body_writer, size_t>;
using body_variant = std::variant<std::string, writer_and_size>;
using handler_func_ex = rest::handler_func_ex;
using headers_type = std::vector<rest::key_value>;
using namespace rest;
// Shared implementation behind utils::gcp::storage::client: holds the endpoint,
// optional credentials, the HTTP client and the semaphore used for memory leases.
class utils::gcp::storage::client::impl {
std::string _endpoint;
std::optional<google_credentials> _credentials;
// Semaphore that _limits refers to when the constructor is given no memory semaphore.
seastar::semaphore _unlimited;
// Semaphore from which get_units()/try_get_units() take their units.
seastar::semaphore& _limits;
seastar::http::experimental::client _client;
shared_ptr<seastar::tls::certificate_credentials> _certs;
// If credentials are present, refreshes them for `scope` and adds the
// authorization header to `req`; otherwise does nothing.
future<> authorize(request_wrapper& req, const std::string& scope);
public:
impl(const utils::http::url_info&, std::optional<google_credentials>, seastar::semaphore*, shared_ptr<seastar::tls::certificate_credentials> creds);
impl(std::string_view endpoint, std::optional<google_credentials>, seastar::semaphore*, shared_ptr<seastar::tls::certificate_credentials> creds);
// Sends a request to `path` with the given body and headers. The three overloads differ
// in how the response is delivered: a streaming handler, a handler receiving the
// whole body, or a returned result object.
future<> send_with_retry(const std::string& path, const std::string& scope, body_variant, std::string_view content_type, handler_func_ex, httpclient::method_type op, key_values headers = {}, seastar::abort_source* = nullptr);
future<> send_with_retry(const std::string& path, const std::string& scope, body_variant, std::string_view content_type, rest::httpclient::handler_func, httpclient::method_type op, key_values headers = {}, seastar::abort_source* = nullptr);
future<rest::httpclient::result_type> send_with_retry(const std::string& path, const std::string& scope, body_variant, std::string_view content_type, httpclient::method_type op, key_values headers = {}, seastar::abort_source* = nullptr);
// Waits until `s` units are available from the memory semaphore.
auto get_units(size_t s) const {
return seastar::get_units(_limits, s);
}
// Non-waiting variant of get_units().
auto try_get_units(size_t s) const {
return seastar::try_get_units(_limits, s);
}
// Closes the underlying HTTP client.
future<> close();
};
// Adds a bearer authorization header to `req` for the given `scope`.
// Without configured credentials the request is left untouched.
future<> storage::client::impl::authorize(request_wrapper& req, const std::string& scope) {
    if (!_credentials) {
        co_return;
    }
    // Make sure the token is valid for the scope before using it.
    co_await _credentials->refresh(scope, &storage_scope_implies, _certs);
    req.add_header(utils::gcp::AUTHORIZATION, format_bearer(_credentials->token));
}
// Constructs the implementation for an already-parsed endpoint URL.
//  - c:      optional credentials used by authorize()
//  - memory: optional semaphore limiting buffer memory; when null an internal,
//            effectively unlimited semaphore is used instead
//  - certs:  TLS credentials, used both for the storage connections and (via _certs)
//            for credential refresh in authorize()
utils::gcp::storage::client::impl::impl(const utils::http::url_info& url, std::optional<google_credentials> c, seastar::semaphore* memory, shared_ptr<seastar::tls::certificate_credentials> certs)
    : _endpoint(url.host)
    , _credentials(std::move(c))
    , _unlimited(std::numeric_limits<ssize_t>::max())
    , _limits(memory ? *memory : _unlimited)
    , _client(std::make_unique<utils::http::dns_connection_factory>(url.host, url.port, url.is_https(), gcp_storage, certs), 100, seastar::http::experimental::client::retry_requests::yes)
    // _certs is declared after _client, so `certs` is still intact when copied above
    // and can safely be moved from here.
    , _certs(std::move(certs))
{}
// Convenience constructor taking the endpoint as a string. An empty endpoint
// selects STORAGE_APIS_URI; the URL is parsed and the work delegated to the
// url_info constructor.
utils::gcp::storage::client::impl::impl(std::string_view endpoint, std::optional<google_credentials> c, seastar::semaphore* memory, shared_ptr<seastar::tls::certificate_credentials> certs)
: impl(utils::http::parse_simple_url(endpoint.empty() ? STORAGE_APIS_URI : endpoint), std::move(c), memory, std::move(certs))
{}
using status_type = seastar::http::reply::status_type;
static std::string get_gcp_error_message(std::string_view body) {
if (!body.empty()) {
try {
auto json = rjson::parse(body);
if (auto* error = rjson::find(json, "error")) {
if (auto msg = rjson::get_opt<std::string>(*error, "message")) {
return *msg;
}
}
} catch (...) {
}
}
return "no info";
}
// Stream variant: reads the entire response body from `in` and extracts the
// error message from it.
static future<std::string> get_gcp_error_message(input_stream<char>& in) {
    auto whole_body = co_await util::read_entire_stream_contiguous(in);
    co_return get_gcp_error_message(whole_body);
}
// Error without an associated HTTP status; the status is recorded as -1.
utils::gcp::storage::storage_error::storage_error(const std::string& msg)
: std::runtime_error(msg)
, _status(-1)
{}
// Error carrying a status code; what() is formatted as "<status>: <msg>".
utils::gcp::storage::storage_error::storage_error(int status, const std::string& msg)
: std::runtime_error(fmt::format("{}: {}", status, msg))
, _status(status)
{}
using namespace seastar::http;
using namespace std::chrono_literals;
/**
 * Performs a REST post/put/get with credential refresh/retry.
 *
 * Builds the request (target, method, headers, body, Content-Length), authorizes it
 * for `scope` and sends it via rest::simple_send using an object_storage_retry_strategy.
 *
 * The user `handler` is only invoked for informational/success responses and for
 * 308 (permanent redirect, used by resumable uploads). Any other status is turned into
 * an unexpected_status_error inside the wrapped handler; on 401 the credentials are
 * refreshed first.
 *
 * All failures leave this function as storage_io_error:
 *  - ENOENT for redirection-class statuses and 404
 *  - EACCES for 401/403
 *  - EIO for any other status or any other exception
 */
future<>
utils::gcp::storage::client::impl::send_with_retry(const std::string& path, const std::string& scope, body_variant body, std::string_view content_type, handler_func_ex handler, httpclient::method_type op, key_values headers, seastar::abort_source* as) {
    rest::request_wrapper req(_endpoint);
    req.target(path);
    req.method(op);
    for (auto& [k,v] : headers) {
        req.add_header(k, v);
    }
    std::visit(overloaded_functor {
        [&](const std::string& s) { req.content(content_type, s); },
        [&](const writer_and_size& ws) { req.content(content_type, ws.first, ws.second); }
    }, body);
    // GCP storage requires this even if content is empty
    req.add_header("Content-Length", std::to_string(req.request().content_length));
    gcp_storage.trace("Sending: {}", redacted_request_type {
        req.request(),
        bearer_filter()
    });
    try {
        try {
            co_await authorize(req, scope);
        } catch (...) {
            // just disregard the failure, we will retry below in the wrapped handler
        }
        // `scope` and `path` are copied into the closure; `req` is referenced and lives in this frame.
        auto wrapped_handler = [this, handler = std::move(handler), &req, scope, path](const reply& rep, input_stream<char>& in) -> future<> {
            auto _in = std::move(in);
            auto status_class = reply::classify_status(rep._status);
            /*
             * Surprisingly Google Cloud Storage (GCS) commonly returns HTTP 308 during resumable uploads, including when you use PUT. This is expected behavior and
             * not an error. The 308 tells the client to continue the upload at the same URL without changing the method or body, which is exactly how GCSs
             * resumable upload protocol works.
             */
            if (status_class != reply::status_class::informational && status_class != reply::status_class::success &&
                rep._status != status_type::permanent_redirect) {
                if (rep._status == status_type::unauthorized) {
                    gcp_storage.warn("Request to {} failed with status {}. Refreshing credentials.", path, rep._status);
                    co_await authorize(req, scope);
                }
                // Drain the body so it can be logged, then report the status as an error.
                auto content = co_await util::read_entire_stream_contiguous(_in);
                gcp_storage.debug("Got unexpected response status: {}, content: {}", rep._status, content);
                co_await coroutine::return_exception_ptr(std::make_exception_ptr(httpd::unexpected_status_error(rep._status)));
            }
            std::exception_ptr eptr;
            try {
                // TODO: rename the fault injection point to something more generic
                if (utils::get_local_injector().enter("s3_client_fail_authorization")) {
                    throw httpd::unexpected_status_error(status_type::unauthorized);
                }
                co_await handler(rep, _in);
            } catch (...) {
                eptr = std::current_exception();
            }
            if (eptr) {
                co_await coroutine::return_exception_ptr(std::move(eptr));
            }
        };
        object_storage_retry_strategy retry_strategy(10,10ms,10000ms, as);
        co_return co_await rest::simple_send(_client, req, wrapped_handler, &retry_strategy, as);
    } catch (...) {
        // Translate whatever went wrong into a storage_io_error with a matching errno.
        try {
            std::rethrow_exception(std::current_exception());
        } catch (const httpd::unexpected_status_error& e) {
            auto status = e.status();
            if (reply::classify_status(status) == reply::status_class::redirection || status == reply::status_type::not_found) {
                throw storage_io_error{ENOENT, format("GCP object doesn't exist ({})", status)};
            }
            if (status == reply::status_type::forbidden || status == reply::status_type::unauthorized) {
                throw storage_io_error{EACCES, format("GCP access denied ({})", status)};
            }
            throw storage_io_error{EIO, format("GCP request failed with ({})", status)};
        } catch (...) {
            throw storage_io_error{EIO, format("GCP error ({})", std::current_exception())};
        }
    }
}
// Overload for handlers that want the whole response body at once: wraps `f` in a
// streaming handler that reads the entire stream and then calls `f(reply, body)`.
future<>
utils::gcp::storage::client::impl::send_with_retry(const std::string& path, const std::string& scope, body_variant body, std::string_view content_type, rest::httpclient::handler_func f, httpclient::method_type op, key_values headers, seastar::abort_source* as) {
co_await send_with_retry(path, scope, std::move(body), content_type, [f](const seastar::http::reply& rep, seastar::input_stream<char>& in) -> future<> {
// ensure these are on our coroutine frame.
// NOTE(review): this binds a reference to the captured `f`, not a copy — confirm
// the closure is guaranteed to outlive the coroutine.
auto& resp_handler = f;
auto result = co_await util::read_entire_stream_contiguous(in);
resp_handler(rep, result);
}, op, headers, as);
}
// Overload that returns the response instead of invoking a handler: status,
// body, headers and version of the reply are copied into the returned result.
future<rest::httpclient::result_type>
utils::gcp::storage::client::impl::send_with_retry(const std::string& path, const std::string& scope, body_variant body, std::string_view content_type, httpclient::method_type op, key_values headers, seastar::abort_source* as) {
rest::httpclient::result_type res;
// `res` is captured by reference; it lives in this coroutine's frame for the duration of the call.
co_await send_with_retry(path, scope, std::move(body), content_type, [&res](const seastar::http::reply& r, std::string_view body) {
gcp_storage.trace("{}", body);
res.reply._status = r._status;
res.reply._content = sstring(body);
res.reply._headers = r._headers;
res.reply._version = r._version;
}, op, headers, as);
co_return res;
}
// Shuts down the underlying HTTP client.
future<> utils::gcp::storage::client::impl::close() {
    co_return co_await _client.close();
}
// Opens a resumable upload session for the destination object and remembers
// the session path for subsequent chunk uploads. The object metadata, when
// present, is sent as the JSON request body.
// See https://cloud.google.com/storage/docs/resumable-uploads
// See https://cloud.google.com/storage/docs/performing-resumable-uploads
future<> utils::gcp::storage::client::object_data_sink::acquire_session() {
    auto session_request = fmt::format("/upload/storage/v1/b/{}/o?uploadType=resumable&name={}"
        , _bucket
        , seastar::http::internal::url_encode(_object_name)
    );
    std::string metadata_json;
    if (!_metadata.IsNull()) {
        metadata_json = rjson::print(_metadata);
    }
    auto response = co_await _impl->send_with_retry(session_request
        , GCP_OBJECT_SCOPE_READ_WRITE
        , std::move(metadata_json)
        , APPLICATION_JSON
        , httpclient::method_type::POST
        , {}
        , _as
    );
    if (response.result() != status_type::ok) {
        throw failed_operation(int(response.result()), get_gcp_error_message(response.body()));
    }
    // The session URI is delivered in the Location header; only its path is kept.
    std::string session_uri = response.reply._headers[LOCATION];
    gcp_storage.debug("Upload {}/{} -> session uri {}", _bucket, _object_name, session_uri);
    _session_path = utils::http::parse_simple_url(session_uri).path;
}
static const boost::regex range_ex("bytes=(\\d+)-(\\d+)");
// Parses the "Range" response header of the form "bytes=<first>-<last>".
// On success stores both offsets in `first`/`last` and returns true; returns
// false (leaving the outputs untouched) when the header is missing or malformed.
static bool parse_response_range(const seastar::http::reply& r, uint64_t& first, uint64_t& last) {
    const auto range_hdr = r._headers.find(RANGE);
    if (range_hdr == r._headers.end()) {
        return false;
    }
    const std::string header_value(range_hdr->second);
    boost::smatch groups;
    if (!boost::regex_match(header_value, groups, range_ex)) {
        return false;
    }
    first = std::stoull(groups[1].str());
    last = std::stoull(groups[2].str());
    return true;
}
// Grows the memory lease so that it covers `total` buffered bytes, rounded up
// to a multiple of the default chunk size. Does nothing if the current lease
// is already large enough.
future<> utils::gcp::storage::client::object_data_sink::adjust_memory_limit(size_t total) {
    auto held = _mem_held.count();
    if (held >= total) {
        co_return;
    }
    auto want = align_up(total, default_gcp_storage_chunk_size) - held;
    if (held != 0) {
        // try to get units to cover the bulk of buffers
        // but if we fail, we accept it and try to get by
        // with the lease we have. If we get here we should
        // have at least 8M in our lease, and will in fact do
        // a write, so data should get released.
        if (auto extra = _impl->try_get_units(want)) {
            _mem_held.adopt(std::move(*extra));
        }
        co_return;
    }
    // first put into buffer queue. enforce.
    _mem_held = co_await _impl->get_units(want);
}
// Write a chunk to the dest object
// See https://cloud.google.com/storage/docs/resumable-uploads
// See https://cloud.google.com/storage/docs/performing-resumable-uploads
// Uploads `bufs` (holding `len` bytes) as the byte range starting at `offset`.
// When `final` is set the total object size is included in the Content-Range,
// otherwise it is left open ("*"). Errors are not thrown to the caller:
// they are stored in _exception and the loop is left.
future<> utils::gcp::storage::client::object_data_sink::do_single_upload(std::deque<temporary_buffer<char>> bufs, size_t offset, size_t len, bool final) {
// always take the whole memory lease. This might be more or less than what we actually release
// but we only ever leave sub-256k amount of data in queue, and we want the next
// put to enforce waiting for a full 8M lease...
auto mine_held = std::exchange(_mem_held, {});
// Ensure to block close from completing
auto h = _gate.hold();
// Enforce our concurrency constraints
auto sem_units = co_await seastar::get_units(_semaphore, 1);
// our file range. if the sink was closed, we can set the
// final size, otherwise, leave it open (*)
auto last = offset + std::max(len, size_t(1)) - 1; // inclusive.
auto end = offset + len;
// Each iteration sends whatever is still left in `bufs`; a partial acceptance by
// the server trims `bufs` and loops again with the advanced offset.
for (;;) {
auto range = fmt::format("bytes {}-{}/{}"
, offset // first byte
, last // last byte
, final ? std::to_string(end) : "*"s
);
try {
// The session is created lazily, on the first chunk.
if (_session_path.empty()) {
co_await acquire_session();
}
gcp_storage.debug("{}:{} write range {}-{}", _bucket, _object_name, offset, offset+len);
auto res = co_await _impl->send_with_retry(_session_path
, GCP_OBJECT_SCOPE_READ_WRITE
, std::make_pair([&](output_stream<char>&& os_in) -> future<> {
auto os = std::move(os_in);
// Buffers are shared, not moved, so they remain available for a resend.
for (auto& buf : bufs) {
co_await os.write(buf.share());
}
co_await os.flush();
co_await os.close();
}, len)
, ""s // no content type
, httpclient::method_type::PUT
, rest::key_values({ { CONTENT_RANGE, range } })
, _as
);
switch (res.result()) {
case status_type::ok:
case status_type::created:
_completed = true;
gcp_storage.debug("{}:{} completed ({} bytes)", _bucket, _object_name, offset+len);
co_return; // done and happy
default:
// 308: the chunk was received but the upload as a whole is not complete.
if (int(res.result()) == 308) {
uint64_t first = 0, new_last = 0;
// A reported last byte different from ours means only part of the chunk was stored.
if (parse_response_range(res.reply, first, new_last) && last != new_last) {
auto written = (new_last + 1) - offset;
gcp_storage.debug("{}:{} partial upload ({} bytes)", _bucket, _object_name, written);
// For a non-final chunk make sure at least min_gcp_storage_chunk_size bytes
// (or everything, if less) are left to resend.
if (!final && (len - written) < min_gcp_storage_chunk_size) {
written = len - std::min(min_gcp_storage_chunk_size, len);
}
// Drop the accepted prefix from the front of the buffer queue.
auto to_remove = written;
while (to_remove) {
auto& buf = bufs.front();
auto size = std::min(to_remove, buf.size());
buf.trim_front(size);
if (buf.empty()) {
bufs.pop_front();
}
to_remove -= size;
}
offset += written;
len -= written;
auto total = std::accumulate(bufs.begin(), bufs.end(), size_t{}, [](size_t s, auto& buf) {
return s + buf.size();
});
assert(len == total);
continue;
}
// incomplete. ok for partial
gcp_storage.debug("{}:{} chunk {}:{} done", _bucket, _object_name, offset, offset+len);
co_return;
}
throw failed_upload_error(int(res.result()), get_gcp_error_message(res.body()));
}
} catch (...) {
// Remember the failure for close() to report; this function itself does not throw it.
_exception = std::current_exception();
gcp_storage.warn("Exception in upload of {}:{} ({}/{}): {}"
, _bucket
, _object_name
, offset
, len
, _exception
);
break;
}
}
}
// Finalizes the upload: sends an empty PUT whose Content-Range states the total
// number of bytes handed to the upload ("bytes */<size>"). Marks the sink as
// completed on 200/201 and throws failed_upload_error for any other status.
future<> utils::gcp::storage::client::object_data_sink::check_upload() {
    // Now we know the final size. Set it in range
    auto final_range = fmt::format("bytes */{}", _accumulated);
    auto res = co_await _impl->send_with_retry(_session_path
        , GCP_OBJECT_SCOPE_READ_WRITE
        , ""s
        , APPLICATION_JSON
        , httpclient::method_type::PUT
        , rest::key_values({ { CONTENT_RANGE, final_range } })
        , _as
    );
    const auto status = res.result();
    if (status != status_type::ok && status != status_type::created) {
        throw failed_upload_error(int(status), fmt::format("{}:{} incomplete. ({}): {}"
            , _bucket, _object_name, res.reply._headers[RANGE]
            , get_gcp_error_message(res.body())
        ));
    }
    _completed = true;
    gcp_storage.debug("{}:{} completed ({})", _bucket, _object_name, _accumulated);
}
// Cancels an incomplete resumable upload by sending DELETE to the session path.
// Nothing is done if the upload already completed or no session was ever opened.
// A 499 response is treated as successful cancellation. Any other response is
// logged, and additionally reported as failed_upload_error unless an earlier
// error is already recorded in _exception.
// https://cloud.google.com/storage/docs/performing-resumable-uploads#cancel-upload
future<> utils::gcp::storage::client::object_data_sink::remove_upload() {
    if (_completed || _session_path.empty()) {
        co_return;
    }
    gcp_storage.debug("Removing incomplete upload {}:{} ({})", _bucket, _object_name, _session_path);
    auto res = co_await _impl->send_with_retry(_session_path
        , GCP_OBJECT_SCOPE_READ_WRITE
        , ""s
        , APPLICATION_JSON
        , httpclient::method_type::DELETE
        , {}
        , _as
    );
    switch (int(res.result())) {
    case 499: // not in enum yet
        gcp_storage.debug("Upload of {}:{} removed ({})", _bucket, _object_name, _session_path);
        co_return; // done and happy
    default: {
        auto msg = get_gcp_error_message(res.body());
        gcp_storage.warn("Failed to remove broken upload of {}:{} ({})", _bucket, _object_name, msg);
        // Do not mask an already recorded upload error with the cleanup failure.
        if (!_exception) {
            throw failed_upload_error(int(res.result()), fmt::format("{}:{} incomplete. ({}): {}"
                , _bucket, _object_name, res.reply._headers[RANGE]
                , msg
            ));
        }
    }
    }
}
// Read a single buffer from the source object.
//
// Returns at most `limit` bytes. If no data is buffered, up to `limit` bytes
// (bounded by the remaining object size) are fetched from the server first.
// An empty buffer is returned when there is nothing left to read.
future<temporary_buffer<char>> utils::gcp::storage::client::object_data_source::get(size_t limit) {
    // If we don't know the source size yet, get the info from server
    if (_size == 0) {
        co_await read_info();
    }
    // If we don't have buffers to give, try getting one from server
    if (_buffers.empty()) {
        auto to_read = std::min(_size - _position, limit);
        // to_read == 0 -> eof
        if (to_read != 0) {
            gcp_storage.debug("Reading object {}:{} ({}-{}/{})", _bucket, _object_name, _position, _position+to_read, _size);
            auto lease = _impl->try_get_units(to_read);
            if (lease) {
                if (_limits) {
                    _limits.adopt(std::move(*lease));
                } else {
                    _limits = std::move(*lease);
                }
            } else {
                // If we can't get a lease to cover this read, don't wait, as this
                // could cause deadlock in higher layers, but instead adjust the
                // size down to decrease memory pressure.
                to_read = std::min(to_read, min_gcp_storage_chunk_size);
                gcp_storage.debug("Reading object (adjusted) {}:{} ({}-{}/{})", _bucket, _object_name, _position, _position+to_read, _size);
            }
            // Ensure we read from the same generation as we queried in read_info. Note: mock server ignores this.
            auto path = fmt::format("/storage/v1/b/{}/o/{}?ifGenerationMatch={}&alt=media"
                , _bucket
                , seastar::http::internal::url_encode(_object_name)
                , _generation
            );
            auto range = fmt::format("bytes={}-{}", _position, _position+to_read-1); // inclusive range
            co_await _impl->send_with_retry(path
                , GCP_OBJECT_SCOPE_READ_ONLY
                , ""s
                , ""s
                , [&](const seastar::http::reply& rep, seastar::input_stream<char>& in) -> future<> {
                    if (rep._status != status_type::ok && rep._status != status_type::partial_content) {
                        throw failed_operation(fmt::format("Could not read object {}: {} ({}/{} - {})", _bucket, _object_name, _position, _size, int(rep._status)));
                    }
                    auto old = _position;
                    // Queue everything the server sent and advance the read position accordingly.
                    auto bufs = co_await util::read_entire_stream(in);
                    for (auto&& buf : bufs) {
                        _position += buf.size();
                        _buffers.emplace_back(std::move(buf));
                    }
                    gcp_storage.debug("Read object {}:{} ({}-{}/{})", _bucket, _object_name, old, _position, _size);
                }
                , httpclient::method_type::GET
                , rest::key_values({ { RANGE, range } })
                , _as
            );
        }
    }
    temporary_buffer<char> res;
    if (!_buffers.empty()) {
        auto&& buf = _buffers.front();
        // Split only when the buffer is strictly larger than the limit. A buffer of
        // exactly `limit` bytes must be handed out whole and popped; splitting it would
        // leave an empty buffer queued, and the next call would return it as a bogus EOF.
        if (buf.size() > limit) {
            res = buf.share(0, limit);
            buf.trim_front(limit);
        } else {
            res = std::move(buf);
            _buffers.pop_front();
        }
    }
    // Give back lease units no longer backed by buffered data.
    adjust_lease();
    co_return res;
}
// Reads the next buffer using the default chunk size as the limit.
future<temporary_buffer<char>> utils::gcp::storage::client::object_data_source::get() {
// If we don't have buffers to give, try getting one from server
co_return co_await get(default_gcp_storage_chunk_size);
}
// Moves the read position to `pos`. If `pos` lies inside the currently
// buffered range, the data before it is dropped and the rest kept; otherwise
// all buffers are discarded and the position is set to `pos`, clamped to the object size.
future<> utils::gcp::storage::client::object_data_source::seek(uint64_t pos) {
    if (_size == 0) {
        co_await read_info();
    }
    auto cached = buffer_size();
    assert(cached <= _position);
    // Object offset of the first buffered byte.
    auto cache_start = _position - cached;
    bool inside_cache = pos >= cache_start && pos < _position;
    if (!inside_cache) {
        _buffers.clear();
        _position = std::min(pos, _size);
        co_return;
    }
    // Drop superfluous cache
    auto to_drop = pos - cache_start;
    while (to_drop > 0 && !_buffers.empty()) {
        auto& head = _buffers.front();
        auto chunk = std::min(to_drop, head.size());
        head.trim_front(chunk);
        if (head.empty()) {
            _buffers.pop_front();
        }
        to_drop -= chunk;
    }
    adjust_lease();
}
// Shrinks the memory lease to the number of bytes currently buffered,
// returning any surplus units. The lease is never grown here.
void utils::gcp::storage::client::object_data_source::adjust_lease() {
    size_t buffered = 0;
    for (auto& buf : _buffers) {
        buffered += buf.size();
    }
    if (buffered < _limits.count()) {
        _limits.return_units(_limits.count() - buffered);
    }
}
// Returns the object size, querying the server first while the cached size is 0.
future<uint64_t> utils::gcp::storage::client::object_data_source::size() {
if (_size == 0) {
co_await read_info();
}
co_return _size;
}
// Returns the object's timestamp as recorded by read_info(). A timestamp equal
// to the clock epoch is taken to mean "not loaded yet" and triggers a query.
future<std::chrono::system_clock::time_point> utils::gcp::storage::client::object_data_source::timestamp() {
if (_timestamp.time_since_epoch().count() == 0) {
co_await read_info();
}
co_return _timestamp;
}
// Skips `n` bytes relative to the current logical read position (the offset of
// the first buffered byte) and returns the buffer that follows.
future<temporary_buffer<char>> utils::gcp::storage::client::object_data_source::skip(uint64_t n) {
    auto cached = buffer_size();
    assert(cached <= _position);
    auto logical_pos = _position - cached;
    co_await seek(logical_pos + n);
    // And get the next buffer
    co_return co_await get();
}
// Queries the object's metadata and caches its size, generation and
// "updated" timestamp. Throws failed_operation if the query does not return
// 200 or the reply is not a "storage#object" document.
future<> utils::gcp::storage::client::object_data_source::read_info() {
    gcp_storage.debug("Read info {}:{}", _bucket, _object_name);
    auto object_path = fmt::format("/storage/v1/b/{}/o/{}", _bucket, seastar::http::internal::url_encode(_object_name));
    auto res = co_await _impl->send_with_retry(object_path
        , GCP_OBJECT_SCOPE_READ_ONLY
        , ""s
        , ""s
        , httpclient::method_type::GET
        , {}
        , _as
    );
    if (res.result() != status_type::ok) {
        throw failed_operation(fmt::format("Could not query object {}:{} {}", _bucket, _object_name, res.result()));
    }
    auto doc = rjson::parse(std::move(res.body()));
    // Ensure we got the info we asked for/expect
    const auto kind = rjson::get<std::string>(doc, "kind");
    if (kind != "storage#object"s) {
        throw failed_operation("Malformed query object reply");
    }
    // Numeric fields arrive as JSON strings.
    _size = std::stoull(rjson::get<std::string>(doc, "size"));
    _generation = std::stoull(rjson::get<std::string>(doc, "generation"));
    _timestamp = parse_rfc3339(rjson::get<std::string>(doc, "updated"));
}
// Creates a client without an external memory semaphore (nullptr is passed to impl).
utils::gcp::storage::client::client(std::string_view endpoint, std::optional<google_credentials> c, shared_ptr<seastar::tls::certificate_credentials> certs)
: _impl(seastar::make_shared<impl>(endpoint, std::move(c), nullptr, std::move(certs)))
{}
// Creates a client whose buffer memory is leased from the caller-provided
// semaphore `memory`.
utils::gcp::storage::client::client(std::string_view endpoint, std::optional<google_credentials> c, seastar::semaphore& memory, shared_ptr<seastar::tls::certificate_credentials> certs)
: _impl(seastar::make_shared<impl>(endpoint, std::move(c), &memory, std::move(certs)))
{}
// Defaulted destructor, defined in this translation unit where impl is a complete type.
utils::gcp::storage::client::~client() = default;
// Creates a bucket in `project` from the given JSON metadata (which carries
// the bucket "name"). Throws failed_operation unless the server answers 200 or 201.
future<> utils::gcp::storage::client::create_bucket(std::string_view project, rjson::value meta) {
    gcp_storage.debug("Create bucket {}:{}", project, rjson::get(meta, "name"));
    auto request_body = rjson::print(meta);
    auto request_path = fmt::format("/storage/v1/b?project={}", project);
    auto res = co_await _impl->send_with_retry(request_path
        , GCP_OBJECT_SCOPE_FULL_CONTROL
        , request_body
        , APPLICATION_JSON
        , httpclient::method_type::POST
    );
    const auto status = res.result();
    if (status == status_type::ok || status == status_type::created) {
        co_return; // done and happy
    }
    throw failed_operation(fmt::format("Could not create bucket {}: {}", rjson::get(meta, "name"), res.result()));
}
// Creates a bucket from individual settings. An empty `region` defaults to
// "US" and an empty `storage_class` to "STANDARD"; uniform bucket-level access
// is always enabled.
future<> utils::gcp::storage::client::create_bucket(std::string_view project, std::string_view bucket, std::string_view region, std::string_view storage_class) {
    // Construct metadata. Could fmt::format, but this is somewhat safer.
    rjson::value bucket_meta = rjson::empty_object();
    rjson::add(bucket_meta, "name", std::string(bucket));
    rjson::add(bucket_meta, "location", std::string(region.empty() ? "US" : region));
    rjson::add(bucket_meta, "storageClass", std::string(storage_class.empty() ? "STANDARD" : storage_class));

    // { "iamConfiguration": { "uniformBucketLevelAccess": { "enabled": true } } }
    rjson::value uniform_access = rjson::empty_object();
    rjson::add(uniform_access, "enabled", true);
    rjson::value iam_config = rjson::empty_object();
    rjson::add(iam_config, "uniformBucketLevelAccess", std::move(uniform_access));
    rjson::add(bucket_meta, "iamConfiguration", std::move(iam_config));

    co_await create_bucket(project, std::move(bucket_meta));
}
// Deletes the named bucket. Throws failed_operation unless the server answers
// 204 (or 200, which the mock server sends).
future<> utils::gcp::storage::client::delete_bucket(std::string_view bucket_in) {
    // Own the name: the view may not outlive the suspension points below.
    std::string bucket(bucket_in);
    gcp_storage.debug("Delete bucket {}", bucket);
    auto bucket_path = fmt::format("/storage/v1/b/{}", bucket);
    auto res = co_await _impl->send_with_retry(bucket_path
        , GCP_OBJECT_SCOPE_FULL_CONTROL
        , ""s
        , ""s
        , httpclient::method_type::DELETE
    );
    const auto status = res.result();
    // mock server sends wrong code (ok), but seems acceptable
    if (status == status_type::ok || status == status_type::no_content) {
        co_return; // done and happy
    }
    throw failed_operation(fmt::format("Could not delete bucket {}: {}", bucket, res.result()));
}
// Builds an object_info from one JSON object resource. "name", "size",
// "generation" and "updated" are required; "contentType" is optional and
// defaults to an empty string. Numeric fields are read as JSON strings.
static utils::gcp::storage::object_info create_info(const rjson::value& item) {
    auto text_field = [&item](const char* key) {
        return rjson::get<std::string>(item, key);
    };
    utils::gcp::storage::object_info result;
    result.name = text_field("name");
    result.content_type = rjson::get_opt<std::string>(item, "contentType").value_or(""s);
    result.size = std::stoull(text_field("size"));
    result.generation = std::stoull(text_field("generation"));
    result.modified = parse_rfc3339(text_field("updated"));
    return result;
}
// Lists one page of objects in a bucket, optionally restricted to names
// starting with `prefix`.
//
// `pager` carries the paging state between calls: its max_results (if non-zero)
// and token are sent with the request, and after the call its token/done fields
// reflect the server's nextPageToken. Once pager.done is set, further calls return
// an empty result without contacting the server.
//
// Throws failed_operation on a non-200 status or a malformed reply.
//
// See https://cloud.google.com/storage/docs/listing-objects
// TODO: maybe make a generator? However, we don't have a streaming
// json parsing routine as such, so however we do this, we need to
// read all data from network, etc. Thus there is not all that much
// point in it. Return chunked_vector to avoid large alloc, but keep it
// in one object... for now...
future<utils::chunked_vector<utils::gcp::storage::object_info>> utils::gcp::storage::client::list_objects(std::string_view bucket_in, std::string_view prefix, bucket_paging& pager) {
    utils::chunked_vector<utils::gcp::storage::object_info> result;
    if (pager.done) {
        co_return result;
    }
    std::string bucket(bucket_in);
    gcp_storage.debug("List bucket {} (prefix={}, max_results={})", bucket, prefix, pager.max_results);
    auto path = fmt::format("/storage/v1/b/{}/o", bucket);
    // Query parameters: the first is introduced by '?', the rest by '&'. Values that
    // may contain reserved characters (prefix, page token) are URL-encoded.
    const char* psep = "?";
    if (!prefix.empty()) {
        path += fmt::format("{}prefix={}", psep, seastar::http::internal::url_encode(prefix));
        psep = "&";
    }
    if (pager.max_results != 0) {
        path += fmt::format("{}maxResults={}", psep, pager.max_results);
        psep = "&";
    }
    if (!pager.token.empty()) {
        path += fmt::format("{}pageToken={}", psep, seastar::http::internal::url_encode(pager.token));
        psep = "&";
    }
    co_await _impl->send_with_retry(path
        , GCP_OBJECT_SCOPE_READ_ONLY
        , ""s
        , ""s
        , [&](const seastar::http::reply& rep, seastar::input_stream<char>& in) -> future<> {
            if (rep._status != status_type::ok) {
                throw failed_operation(fmt::format("Could not list bucket {}: {} ({})", bucket, rep._status
                    , co_await get_gcp_error_message(in)
                ));
            }
            auto bufs = co_await util::read_entire_stream(in);
            auto root = rjson::parse(std::move(bufs));
            if (rjson::get<std::string>(root, "kind") != "storage#objects"s) {
                throw failed_operation("Malformed list object reply");
            }
            // Update the paging state before looking at the items: a page without
            // "items" must still advance (or finish) the pager, otherwise a caller
            // looping until pager.done would never terminate on an empty listing.
            pager.token = rjson::get_opt<std::string>(root, "nextPageToken").value_or(""s);
            pager.done = pager.token.empty();
            auto items = rjson::find(root, "items");
            if (!items) {
                co_return;
            }
            if (!items->IsArray()) {
                throw failed_operation("Malformed list object items");
            }
            for (auto& item : items->GetArray()) {
                object_info info = create_info(item);
                result.emplace_back(std::move(info));
            }
        }
        , httpclient::method_type::GET
    );
    co_return result;
}
// Lists all objects in `bucket` whose names start with `prefix`.
//
// Drives the paged overload with a private pager (max_results = 0, i.e. no
// explicit page size is sent) and concatenates every page, so listings
// larger than one server page are not silently truncated.
//
// The loop stops when the pager reports `done`, or when a request did not
// change the page token (no progress), which guards against looping forever.
future<utils::chunked_vector<utils::gcp::storage::object_info>> utils::gcp::storage::client::list_objects(std::string_view bucket, std::string_view prefix) {
    // Own the strings: they are used again after suspension points.
    std::string bucket_name(bucket), name_prefix(prefix);

    bucket_paging pager(0);
    // Token the most recent request was issued with.
    auto issued_token = pager.token;

    auto all = co_await list_objects(bucket_name, name_prefix, pager);

    while (!pager.done && pager.token != issued_token) {
        issued_token = pager.token;
        auto page = co_await list_objects(bucket_name, name_prefix, pager);
        for (auto& info : page) {
            all.emplace_back(std::move(info));
        }
    }

    co_return all;
}
// See https://cloud.google.com/storage/docs/deleting-objects
// Deletes `object_name_in` from `bucket_in`.
//
// The object name is URL-encoded into the request path. A `storage_io_error`
// whose code is ENOENT is logged and treated as success; any other
// `storage_io_error` propagates unchanged. A reply other than 200/204
// raises `failed_operation` with the status and the server's error text.
future<> utils::gcp::storage::client::delete_object(std::string_view bucket_in, std::string_view object_name_in) {
    std::string bucket(bucket_in);
    std::string object_name(object_name_in);

    gcp_storage.debug("Delete object {}:{}", bucket, object_name);

    auto encoded_name = seastar::http::internal::url_encode(object_name);
    auto path = fmt::format("/storage/v1/b/{}/o/{}", bucket, encoded_name);

    httpclient::result_type res;
    try {
        res = co_await _impl->send_with_retry(path, GCP_OBJECT_SCOPE_READ_WRITE, ""s, ""s, httpclient::method_type::DELETE);
    } catch (const storage_io_error& ex) {
        if (ex.code().value() != ENOENT) {
            throw;
        }
        gcp_storage.debug("Could not delete {}:{} - no such object", bucket, object_name);
        co_return; // ok...?
    }

    const auto status = res.result();
    const bool deleted = status == status_type::ok || status == status_type::no_content;
    if (!deleted) {
        throw failed_operation(fmt::format("Could not delete object {}:{}: {} ({})", bucket, object_name, status
            , get_gcp_error_message(res.body())
        ));
    }

    gcp_storage.debug("Deleted {}:{}", bucket, object_name);
    // done and happy
}
// See https://cloud.google.com/storage/docs/copying-renaming-moving-objects
// GCP does not support moveTo across buckets.
// Renames an object, possibly into another bucket, as copy followed by
// delete of the source. The two steps are separate requests, so a failure
// of the delete leaves both source and destination in place.
//
// Renaming an object onto itself (same bucket, same name) is a no-op:
// copying it onto itself and then deleting the "source" would destroy it.
future<> utils::gcp::storage::client::rename_object(std::string_view bucket, std::string_view object_name, std::string_view new_bucket, std::string_view new_name) {
    if (bucket == new_bucket && object_name == new_name) {
        co_return;
    }
    co_await copy_object(bucket, object_name, new_bucket, new_name);
    co_await delete_object(bucket, object_name);
}
// See https://cloud.google.com/storage/docs/copying-renaming-moving-objects
// Renames `object_name_in` to `new_name_in` inside `bucket_in` using the
// `moveTo` operation.
//
// Both object names are URL-encoded into the request path. The request is
// sent as POST, which is the method the JSON API documents for `moveTo`.
// Completes normally on 200 or 201; any other status raises
// `failed_operation` with the status and the server's error text.
future<> utils::gcp::storage::client::rename_object(std::string_view bucket_in, std::string_view object_name_in, std::string_view new_name_in) {
    std::string bucket(bucket_in), object_name(object_name_in), new_name(new_name_in);

    gcp_storage.debug("Move object {}:{} -> {}", bucket, object_name, new_name);

    auto path = fmt::format("/storage/v1/b/{}/o/{}/moveTo/o/{}"
        , bucket
        , seastar::http::internal::url_encode(object_name)
        , seastar::http::internal::url_encode(new_name)
    );
    auto res = co_await _impl->send_with_retry(path
        , GCP_OBJECT_SCOPE_READ_WRITE
        , ""s
        , ""s
        , httpclient::method_type::POST
    );

    switch (res.result()) {
    case status_type::ok:
    case status_type::created:
        gcp_storage.debug("Moved {}:{} to {}", bucket, object_name, new_name);
        co_return; // done and happy
    default:
        throw failed_operation(fmt::format("Could not rename object {}:{}: {} ({})", bucket, object_name, res.result()
            , get_gcp_error_message(res.body())
        ));
    }
}
// See https://cloud.google.com/storage/docs/copying-renaming-moving-objects
// Copying an object in GCP can only process a certain amount of data in one call
// Must keep doing it until all data is copied, and check response.
// Copies `bucket_in:object_name_in` to `new_bucket_in:to_name_in` using the
// `rewriteTo` operation.
//
// A single rewrite call may copy only part of the data. The call is
// repeated until the reply's "done" flag is true; each follow-up call
// passes the "rewriteToken" from the previous reply as the `rewriteToken`
// query parameter, which is where the JSON API expects it.
//
// Requests are sent as POST, the method documented for `rewriteTo`.
// Throws `failed_operation` if any call returns a status other than 200.
future<> utils::gcp::storage::client::copy_object(std::string_view bucket_in, std::string_view object_name_in, std::string_view new_bucket_in, std::string_view to_name_in) {
    std::string bucket(bucket_in), object_name(object_name_in), new_bucket(new_bucket_in), to_name(to_name_in);

    const auto base_path = fmt::format("/storage/v1/b/{}/o/{}/rewriteTo/b/{}/o/{}"
        , bucket
        , seastar::http::internal::url_encode(object_name)
        , new_bucket
        , seastar::http::internal::url_encode(to_name)
    );
    // First call carries no token; later calls append it as a query parameter.
    auto path = base_path;
    // No metadata overrides are sent, so the body stays empty.
    std::string body;

    for (;;) {
        auto res = co_await _impl->send_with_retry(path
            , GCP_OBJECT_SCOPE_READ_WRITE
            , body
            , APPLICATION_JSON
            , httpclient::method_type::POST
        );

        if (res.result() != status_type::ok) {
            throw failed_operation(fmt::format("Could not copy object {}:{}: {} ({})", bucket, object_name, res.result()
                , get_gcp_error_message(res.body())
            ));
        }

        auto resp = rjson::parse(res.body());
        if (rjson::get<bool>(resp, "done")) {
            gcp_storage.debug("Copied {}:{} to {}:{}", bucket, object_name, new_bucket, to_name);
            co_return; // done and happy
        }

        auto token = rjson::get<std::string>(resp, "rewriteToken");
        // NOTE(review): these two values only feed the debug log below.
        // Confirm rjson::get<uint64_t> accepts the representation the
        // server uses for them (create_info reads similar fields as strings).
        auto written = rjson::get<uint64_t>(resp, "totalBytesRewritten");
        auto size = rjson::get<uint64_t>(resp, "objectSize");

        // Call 2+ must include the rewriteToken
        path = fmt::format("{}?rewriteToken={}", base_path, seastar::http::internal::url_encode(token));

        gcp_storage.debug("Partial copy of {}:{} to {}:{} ({}/{})", bucket, object_name, new_bucket, to_name, written, size);
    }
}
// Composes up to 32 existing objects of `bucket_in` into `dest_object_name`.
//
// Builds a JSON request with a "sourceObjects" array (one {"name": ...}
// entry per element of `source_object_names`, in order) and `metadata` as
// "destination", then POSTs it to the destination object's `/compose` path.
//
// Returns the `object_info` parsed from the reply.
// Throws std::invalid_argument when more than 32 sources are given, and
// `failed_operation` when the reply status is not 200.
//
// NOTE(review): `as` is not referenced in this function body - confirm
// whether the request is meant to be abortable.
future<utils::gcp::storage::object_info> utils::gcp::storage::client::merge_objects(std::string_view bucket_in, std::string_view dest_object_name, const std::vector<std::string>& source_object_names, rjson::value metadata, seastar::abort_source* as) {
    const auto source_count = source_object_names.size();
    if (source_count > 32) {
        throw std::invalid_argument(fmt::format("Can only merge up to 32 objects. {} requested.", source_count));
    }

    rjson::value sources = rjson::empty_array();
    for (const auto& source_name : source_object_names) {
        rjson::value entry = rjson::empty_object();
        rjson::add(entry, "name", source_name);
        rjson::push_back(sources, std::move(entry));
    }

    rjson::value request = rjson::empty_object();
    rjson::add(request, "sourceObjects", std::move(sources));
    rjson::add(request, "destination", std::move(metadata));

    std::string bucket(bucket_in);
    std::string object_name(dest_object_name);

    auto encoded_name = seastar::http::internal::url_encode(object_name);
    auto path = fmt::format("/storage/v1/b/{}/o/{}/compose", bucket, encoded_name);
    auto body = rjson::print(request);

    auto res = co_await _impl->send_with_retry(path
        , GCP_OBJECT_SCOPE_READ_WRITE
        , body
        , APPLICATION_JSON
        , httpclient::method_type::POST
    );

    if (res.result() == status_type::ok) {
        auto reply = rjson::parse(res.body());
        co_return create_info(reply);
    }

    throw failed_operation(fmt::format("Could not merge to object {} -> {}:{}: {} ({})", source_object_names, bucket, object_name, res.result()
        , get_gcp_error_message(res.body())
    ));
}
// Copies `object_name` to `to_name` within the same bucket.
// Convenience overload: forwards to the four-argument copy_object, passing
// `bucket` as both the source and the destination bucket.
future<> utils::gcp::storage::client::copy_object(std::string_view bucket, std::string_view object_name, std::string_view to_name) {
co_await copy_object(bucket, object_name, bucket, to_name);
}
// Creates a data_sink for writing `object_name` in `bucket`.
// The sink is backed by a newly allocated object_data_sink constructed from
// this client's implementation object, the target names, `metadata` (moved
// in) and the optional abort source `as`.
seastar::data_sink utils::gcp::storage::client::create_upload_sink(std::string_view bucket, std::string_view object_name, rjson::value metadata, seastar::abort_source* as) const {
    auto sink_impl = std::make_unique<object_data_sink>(_impl, bucket, object_name, std::move(metadata), as);
    return seastar::data_sink(std::move(sink_impl));
}
// Creates a seekable_data_source for reading `object_name` from `bucket`.
// The source is backed by a newly allocated object_data_source constructed
// from this client's implementation object, the object's names and the
// optional abort source `as`.
seekable_data_source utils::gcp::storage::client::create_download_source(std::string_view bucket, std::string_view object_name, seastar::abort_source* as) const {
    auto source_impl = std::make_unique<object_data_source>(_impl, bucket, object_name, as);
    return seekable_data_source(std::move(source_impl));
}
// Closes the client by delegating to the implementation object; the
// returned future is the one produced by _impl->close().
future<> utils::gcp::storage::client::close() {
return _impl->close();
}
// Out-of-class definition of the static member client::DEFAULT_ENDPOINT:
// the base URL of Google's public Cloud Storage service.
// NOTE(review): presumably used when no explicit endpoint is configured -
// confirm against the client constructor.
const std::string utils::gcp::storage::client::DEFAULT_ENDPOINT = "https://storage.googleapis.com";