/* * Copyright (C) 2025-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #include "object_storage.hh" #include "gcp_credentials.hh" #include #include #include #include #include #include #include #include #include #include #include "utils/rest/client.hh" #include "utils/exponential_backoff_retry.hh" #include "utils/http.hh" #include "utils/overloaded_functor.hh" static logger gcp_storage("gcp_storage"); static constexpr uint64_t min_gcp_storage_chunk_size = 256*1024; static constexpr uint64_t default_gcp_storage_chunk_size = 8*1024*1024; static constexpr char GCP_OBJECT_SCOPE_READ_ONLY[] = "https://www.googleapis.com/auth/devstorage.read_only"; static constexpr char GCP_OBJECT_SCOPE_READ_WRITE[] = "https://www.googleapis.com/auth/devstorage.read_write"; static constexpr char GCP_OBJECT_SCOPE_FULL_CONTROL[] = "https://www.googleapis.com/auth/devstorage.full_control"; static constexpr char STORAGE_APIS_URI[] = "https://storage.googleapis.com"; static constexpr char APPLICATION_JSON[] = "application/json"; static constexpr char LOCATION[] = "Location"; static constexpr char CONTENT_RANGE[] = "Content-Range"; static constexpr char RANGE[] = "Range"; using namespace std::string_literals; using namespace utils::gcp; static bool storage_scope_implies(const scopes_type& scopes, const scopes_type& check_for) { if (default_scopes_implies_other_scope(scopes, check_for)) { return true; } if (scopes_contains_scope(check_for, GCP_OBJECT_SCOPE_READ_ONLY)) { return scopes_contains_scope(scopes, GCP_OBJECT_SCOPE_READ_WRITE) || scopes_contains_scope(scopes, GCP_OBJECT_SCOPE_FULL_CONTROL) ; } if (scopes_contains_scope(check_for, GCP_OBJECT_SCOPE_READ_WRITE)) { return scopes_contains_scope(scopes, GCP_OBJECT_SCOPE_FULL_CONTROL); } return false; } static auto parse_rfc3339(const std::string& s) { std::chrono::system_clock::time_point t; std::istringstream is(s); is >> std::chrono::parse("%FT%TZ", t); return t; } class 
utils::gcp::storage::client::object_data_sink : public data_sink_impl { shared_ptr _impl; std::string _bucket; std::string _object_name; rjson::value _metadata; std::string _session_path; std::string _content_type; std::deque> _buffers; uint64_t _accumulated = 0; seastar::semaphore_units<> _mem_held; bool _closed = false; bool _completed = false; seastar::named_gate _gate; seastar::semaphore _semaphore; std::exception_ptr _exception; seastar::abort_source* _as; public: object_data_sink(shared_ptr i, std::string_view bucket, std::string_view object_name, rjson::value metadata, seastar::abort_source* as) : _impl(i) , _bucket(bucket) , _object_name(object_name) , _metadata(std::move(metadata)) , _semaphore(1) , _as(as) {} future<> put(std::span> bufs) override { for (auto&& buf : bufs) { _buffers.emplace_back(std::move(buf)); } co_await maybe_do_upload(false); } future<> flush() override { return maybe_do_upload(true); } future<> close() override { if (!std::exchange(_closed, true)) { co_await flush(); co_await _gate.close(); try { if (!_exception && !_completed) { co_await check_upload(); } } catch (...) { _exception = std::current_exception(); } if (auto ex = std::exchange(_exception, {})) { co_await remove_upload(); std::rethrow_exception(ex); } } } size_t buffer_size() const noexcept override { return default_gcp_storage_chunk_size; } future<> acquire_session(); future<> do_single_upload(std::deque>, size_t offset, size_t len, bool final); future<> check_upload(); future<> remove_upload(); future<> adjust_memory_limit(size_t); future<> maybe_do_upload(bool force) { auto total = std::accumulate(_buffers.begin(), _buffers.end(), size_t{}, [](size_t s, auto& buf) { return s + buf.size(); }); if (total == 0) { co_return; } co_await adjust_memory_limit(total); // GCP only allows upload of less than 256k on last chunk if (total < min_gcp_storage_chunk_size && !_closed) { co_return; } // avoid uploading unless we accumulate enough data. 
GCP docs says to // try to keep uploads to 8MB or more. if (force || total >= default_gcp_storage_chunk_size) { auto bufs = std::exchange(_buffers, {}); auto start = _accumulated; auto final = _closed; if (!final) { // can only write in multiples of 256. auto rem = total % min_gcp_storage_chunk_size; total -= rem; while (rem) { auto& buf = bufs.back(); if (buf.size() > rem) { auto keep = buf.size() - rem; _buffers.emplace(_buffers.begin(), buf.share(keep, rem)); buf.trim(keep); break; } else { rem -= buf.size(); _buffers.emplace(_buffers.begin(), std::move(buf)); bufs.pop_back(); } } } assert(std::accumulate(bufs.begin(), bufs.end(), size_t{}, [](size_t s, auto& buf) { return s + buf.size(); }) == total); _accumulated += total; // allow this in background (void)do_single_upload(std::move(bufs), start, total, final); } } }; class utils::gcp::storage::client::object_data_source : public seekable_data_source_impl { shared_ptr _impl; std::string _bucket; std::string _object_name; std::string _session_path; uint64_t _generation = 0; uint64_t _size = 0; uint64_t _position = 0; std::chrono::system_clock::time_point _timestamp; seastar::semaphore_units<> _limits; seastar::abort_source* _as; std::deque> _buffers; size_t buffer_size() const { return std::accumulate(_buffers.begin(), _buffers.end(), 0, [](size_t sum, auto& b) { return sum + b.size(); }); } public: object_data_source(shared_ptr i, std::string_view bucket, std::string_view object_name, seastar::abort_source* as) : _impl(i) , _bucket(bucket) , _object_name(object_name) , _as(as) {} future> get() override; future> skip(uint64_t n) override; future<> read_info(); void adjust_lease(); future> get(size_t limit) override; future<> seek(uint64_t pos) override; future size() override; future timestamp() override; }; using body_writer = std::function(output_stream&&)>; using writer_and_size = std::pair; using body_variant = std::variant; using handler_func_ex = rest::handler_func_ex; using headers_type = std::vector; 
using namespace rest; class utils::gcp::storage::client::impl { std::string _endpoint; std::optional _credentials; seastar::semaphore _unlimited; seastar::semaphore& _limits; seastar::http::experimental::client _client; shared_ptr _certs; public: impl(const utils::http::url_info&, std::optional, seastar::semaphore*, shared_ptr creds); impl(std::string_view endpoint, std::optional, seastar::semaphore*, shared_ptr creds); future<> send_with_retry(const std::string& path, const std::string& scope, body_variant, std::string_view content_type, handler_func_ex, httpclient::method_type op, key_values headers = {}, seastar::abort_source* = nullptr); future<> send_with_retry(const std::string& path, const std::string& scope, body_variant, std::string_view content_type, rest::httpclient::handler_func, httpclient::method_type op, key_values headers = {}, seastar::abort_source* = nullptr); future send_with_retry(const std::string& path, const std::string& scope, body_variant, std::string_view content_type, httpclient::method_type op, key_values headers = {}, seastar::abort_source* = nullptr); auto get_units(size_t s) const { return seastar::get_units(_limits, s); } auto try_get_units(size_t s) const { return seastar::try_get_units(_limits, s); } future<> close(); }; utils::gcp::storage::client::impl::impl(const utils::http::url_info& url, std::optional c, seastar::semaphore* memory, shared_ptr certs) : _endpoint(url.host) , _credentials(std::move(c)) , _unlimited(std::numeric_limits::max()) , _limits(memory ? *memory : _unlimited) , _client(std::make_unique(url.host, url.port, url.is_https(), gcp_storage, certs), 100, seastar::http::experimental::client::retry_requests::yes) {} utils::gcp::storage::client::impl::impl(std::string_view endpoint, std::optional c, seastar::semaphore* memory, shared_ptr certs) : impl(utils::http::parse_simple_url(endpoint.empty() ? 
STORAGE_APIS_URI : endpoint), std::move(c), memory, std::move(certs)) {} using status_type = seastar::http::reply::status_type; static std::string get_gcp_error_message(std::string_view body) { if (!body.empty()) { try { auto json = rjson::parse(body); if (auto* error = rjson::find(json, "error")) { if (auto msg = rjson::get_opt(*error, "message")) { return *msg; } } } catch (...) { } } return "no info"; } static future get_gcp_error_message(input_stream& in) { auto s = co_await util::read_entire_stream_contiguous(in); co_return get_gcp_error_message(s); } utils::gcp::storage::storage_error::storage_error(const std::string& msg) : std::runtime_error(msg) , _status(-1) {} utils::gcp::storage::storage_error::storage_error(int status, const std::string& msg) : std::runtime_error(fmt::format("{}: {}", status, msg)) , _status(status) {} using namespace seastar::http; using namespace std::chrono_literals; /** * Performs a REST post/put/get with credential refresh/retry. */ future<> utils::gcp::storage::client::impl::send_with_retry(const std::string& path, const std::string& scope, body_variant body, std::string_view content_type, handler_func_ex handler, httpclient::method_type op, key_values headers, seastar::abort_source* as) { static constexpr auto max_retries = 10; exponential_backoff_retry exr(10ms, 10000ms); bool do_backoff = false; for (int retry = 0; ; ++retry) { if (std::exchange(do_backoff, false)) { co_await (as ? 
exr.retry(*as) : exr.retry()); } rest::request_wrapper req(_endpoint); req.target(path); req.method(op); if (_credentials) { try { try { co_await _credentials->refresh(scope, &storage_scope_implies, _certs); req.add_header(utils::gcp::AUTHORIZATION, format_bearer(_credentials->token)); } catch (httpd::unexpected_status_error& e) { switch (e.status()) { case status_type::request_timeout: default: if (reply::classify_status(e.status()) != reply::status_class::server_error) { break; } if (retry < max_retries) { gcp_storage.debug("Got {}: {}", e.status(), std::current_exception()); // service unavailable etc -> retry do_backoff = true; continue; } break; } throw; } } catch (...) { gcp_storage.error("Error refreshing credentials: {}", std::current_exception()); std::throw_with_nested(permission_error("Error refreshing credentials")); } } for (auto& [k,v] : headers) { req.add_header(k, v); } std::visit(overloaded_functor { [&](const std::string& s) { req.content(content_type, s); }, [&](const writer_and_size& ws) { req.content(content_type, ws.first, ws.second); } }, body); // GCP storage requires this even if content is empty req.add_header("Content-Length", std::to_string(req.request().content_length)); gcp_storage.trace("Sending: {}", redacted_request_type { req.request(), bearer_filter() }); try { co_await rest::simple_send(_client, req, [&handler](const seastar::http::reply& res, seastar::input_stream& in) -> future<> { gcp_storage.trace("Result: {}", res); if (res._status == status_type::unauthorized) { throw permission_error(int(res._status), co_await get_gcp_error_message(in)); } else if (res._status == status_type::request_timeout || reply::classify_status(res._status) == reply::status_class::server_error) { throw storage_error(int(res._status), co_await get_gcp_error_message(in)); } co_await handler(res, in); }, as); break; } catch (storage_error& e) { gcp_storage.debug("{}: Got unexpected response: {}", _endpoint, e.what()); auto s = status_type(e.status()); 
switch (s) { default: if (reply::classify_status(s) != reply::status_class::server_error) { break; } [[fallthrough]]; case status_type::request_timeout: do_backoff = true; [[fallthrough]]; case status_type::unauthorized: if (retry < max_retries) { continue; // retry loop. } break; } throw; } catch (...) { // network, whatnot. maybe add retries here as well, but should really // be on seastar level throw; } } } future<> utils::gcp::storage::client::impl::send_with_retry(const std::string& path, const std::string& scope, body_variant body, std::string_view content_type, rest::httpclient::handler_func f, httpclient::method_type op, key_values headers, seastar::abort_source* as) { co_await send_with_retry(path, scope, std::move(body), content_type, [f](const seastar::http::reply& rep, seastar::input_stream& in) -> future<> { // ensure these are on our coroutine frame. auto& resp_handler = f; auto result = co_await util::read_entire_stream_contiguous(in); resp_handler(rep, result); }, op, headers, as); } future utils::gcp::storage::client::impl::send_with_retry(const std::string& path, const std::string& scope, body_variant body, std::string_view content_type, httpclient::method_type op, key_values headers, seastar::abort_source* as) { rest::httpclient::result_type res; co_await send_with_retry(path, scope, std::move(body), content_type, [&res](const seastar::http::reply& r, std::string_view body) { gcp_storage.trace("{}", body); res.reply._status = r._status; res.reply._content = sstring(body); res.reply._headers = r._headers; res.reply._version = r._version; }, op, headers, as); co_return res; } future<> utils::gcp::storage::client::impl::close() { co_await _client.close(); } // Get an upload session for the given object // See https://cloud.google.com/storage/docs/resumable-uploads // See https://cloud.google.com/storage/docs/performing-resumable-uploads future<> utils::gcp::storage::client::object_data_sink::acquire_session() { std::string body; if 
(!_metadata.IsNull()) { body = rjson::print(_metadata); } auto path = fmt::format("/upload/storage/v1/b/{}/o?uploadType=resumable&name={}" , _bucket , _object_name ); auto reply = co_await _impl->send_with_retry(path , GCP_OBJECT_SCOPE_READ_WRITE , std::move(body) , APPLICATION_JSON , httpclient::method_type::POST , {} , _as ); if (reply.result() != status_type::ok) { throw failed_operation(int(reply.result()), get_gcp_error_message(reply.body())); } std::string location = reply.reply._headers[LOCATION]; gcp_storage.debug("Upload {}/{} -> session uri {}", _bucket, _object_name, location); _session_path = utils::http::parse_simple_url(location).path; } static const boost::regex range_ex("bytes=(\\d+)-(\\d+)"); static bool parse_response_range(const seastar::http::reply& r, uint64_t& first, uint64_t& last) { auto& res_headers = r._headers; auto i = res_headers.find(RANGE); if (i == res_headers.end()) { return false; } boost::smatch m; std::string tmp(i->second); if (!boost::regex_match(tmp, m, range_ex)) { return false; } first = std::stoull(m[1].str()); last = std::stoull(m[2].str()); return true; } future<> utils::gcp::storage::client::object_data_sink::adjust_memory_limit(size_t total) { auto held = _mem_held.count(); if (held < total) { auto want = align_up(total, default_gcp_storage_chunk_size) - held; if (held == 0) { // first put into buffer queue. enforce. _mem_held = co_await _impl->get_units(want); } else { // try to get units to cover the bulk of buffers // but if we fail, we accept it and try to get by // with the lease we have. If we get here we should // have at least 8M in our lease, and will in fact do // a write, so data should get released. 
auto h = _impl->try_get_units(want); if (h) { _mem_held.adopt(std::move(*h)); } } } } // Write a chunk to the dest object // See https://cloud.google.com/storage/docs/resumable-uploads // See https://cloud.google.com/storage/docs/performing-resumable-uploads future<> utils::gcp::storage::client::object_data_sink::do_single_upload(std::deque> bufs, size_t offset, size_t len, bool final) { // always take the whole memory lease. This might be more or less than what we actually release // but we only ever leave sub-256k amount of data in queue, and we want the next // put to enforce waiting for a full 8M lease... auto mine_held = std::exchange(_mem_held, {}); // Ensure to block close from completing auto h = _gate.hold(); // Enforce our concurrency constraints auto sem_units = co_await seastar::get_units(_semaphore, 1); // our file range. if the sink was closed, we can set the // final size, otherwise, leave it open (*) auto last = offset + std::max(len, size_t(1)) - 1; // inclusive. auto end = offset + len; for (;;) { auto range = fmt::format("bytes {}-{}/{}" , offset // first byte , last // last byte , final ? 
std::to_string(end) : "*"s ); try { if (_session_path.empty()) { co_await acquire_session(); } gcp_storage.debug("{}:{} write range {}-{}", _bucket, _object_name, offset, offset+len); auto res = co_await _impl->send_with_retry(_session_path , GCP_OBJECT_SCOPE_READ_WRITE , std::make_pair([&](output_stream&& os_in) -> future<> { auto os = std::move(os_in); for (auto& buf : bufs) { co_await os.write(buf.share()); } co_await os.flush(); co_await os.close(); }, len) , ""s // no content type , httpclient::method_type::PUT , rest::key_values({ { CONTENT_RANGE, range } }) , _as ); switch (res.result()) { case status_type::ok: case status_type::created: _completed = true; gcp_storage.debug("{}:{} completed ({} bytes)", _bucket, _object_name, offset+len); co_return; // done and happy default: if (int(res.result()) == 308) { uint64_t first = 0, new_last = 0; if (parse_response_range(res.reply, first, new_last) && last != new_last) { auto written = (new_last + 1) - offset; gcp_storage.debug("{}:{} partial upload ({} bytes)", _bucket, _object_name, written); if (!final && (len - written) < min_gcp_storage_chunk_size) { written = len - std::min(min_gcp_storage_chunk_size, len); } auto to_remove = written; while (to_remove) { auto& buf = bufs.front(); auto size = std::min(to_remove, buf.size()); buf.trim_front(size); if (buf.empty()) { bufs.pop_front(); } to_remove -= size; } offset += written; len -= written; auto total = std::accumulate(bufs.begin(), bufs.end(), size_t{}, [](size_t s, auto& buf) { return s + buf.size(); }); assert(len == total); continue; } // incomplete. ok for partial gcp_storage.debug("{}:{} chunk {}:{} done", _bucket, _object_name, offset, offset+len); co_return; } throw failed_upload_error(int(res.result()), get_gcp_error_message(res.body())); } } catch (...) 
{ _exception = std::current_exception(); gcp_storage.warn("Exception in upload of {}:{} ({}/{}): {}" , _bucket , _object_name , offset , len , _exception ); break; } } } // Check/close the final object. future<> utils::gcp::storage::client::object_data_sink::check_upload() { // Now we know the final size. Set it in range auto range = fmt::format("bytes */{}", _accumulated); auto res = co_await _impl->send_with_retry(_session_path , GCP_OBJECT_SCOPE_READ_WRITE , ""s , APPLICATION_JSON , httpclient::method_type::PUT , rest::key_values({ { CONTENT_RANGE, range } }) , _as ); switch (res.result()) { case status_type::ok: case status_type::created: _completed = true; gcp_storage.debug("{}:{} completed ({})", _bucket, _object_name, _accumulated); co_return; // done and happy default: throw failed_upload_error(int(res.result()), fmt::format("{}:{} incomplete. ({}): {}" , _bucket, _object_name, res.reply._headers[RANGE] , get_gcp_error_message(res.body()) )); } } // https://cloud.google.com/storage/docs/performing-resumable-uploads#cancel-upload future<> utils::gcp::storage::client::object_data_sink::remove_upload() { if (_completed || _session_path.empty()) { co_return; } gcp_storage.debug("Removing incomplete upload {}:{} ()", _bucket, _object_name, _session_path); auto res = co_await _impl->send_with_retry(_session_path , GCP_OBJECT_SCOPE_READ_WRITE , ""s , APPLICATION_JSON , httpclient::method_type::DELETE , {} , _as ); switch (int(res.result())) { case 499: // not in enum yet gcp_storage.debug("Upload of {}:{} removed ({})", _bucket, _object_name, _session_path); co_return; // done and happy default: { auto msg = get_gcp_error_message(res.body()); gcp_storage.warn("Failed to remove broken upload of {}:{} ({})", _bucket, _object_name, msg); if (!_exception) { throw failed_upload_error(int(res.result()), fmt::format("{}:{} incomplete. 
({}): {}" , _bucket, _object_name, res.reply._headers[RANGE] , msg )); } } } } // Read a single buffer from the source object future> utils::gcp::storage::client::object_data_source::get(size_t limit) { // If we don't know the source size yet, get the info from server if (_size == 0) { co_await read_info(); } // If we don't have buffers to give, try getting one from server if (_buffers.empty()) { auto to_read = std::min(_size - _position, limit); // to_read == 0 -> eof if (to_read != 0) { gcp_storage.debug("Reading object {}:{} ({}-{}/{})", _bucket, _object_name, _position, _position+to_read, _size); auto lease = _impl->try_get_units(to_read); if (lease) { if (_limits) { _limits.adopt(std::move(*lease)); } else { _limits = std::move(*lease); } } else { // If we can't get a lease to cover this read, don't wait, as this // could cause deadlock in higher layers, but instead adjust the // size down to decrease memory pressure. to_read = std::min(to_read, min_gcp_storage_chunk_size); gcp_storage.debug("Reading object (adjusted) {}:{} ({}-{}/{})", _bucket, _object_name, _position, _position+to_read, _size); } // Ensure we read from the same generation as we queried in read_info. Note: mock server ignores this. auto path = fmt::format("/storage/v1/b/{}/o/{}?ifGenerationMatch={}&alt=media", _bucket, _object_name, _generation); auto range = fmt::format("bytes={}-{}", _position, _position+to_read-1); // inclusive range co_await _impl->send_with_retry(path , GCP_OBJECT_SCOPE_READ_ONLY , ""s , ""s , [&](const seastar::http::reply& rep, seastar::input_stream& in) -> future<> { if (rep._status != status_type::ok && rep._status != status_type::partial_content) { throw failed_operation(fmt::format("Could not read object {}: {} ({}/{} - {})", _bucket, _object_name, _position, _size, int(rep._status))); } auto old = _position; // ensure these are on our coroutine frame. 
auto bufs = co_await util::read_entire_stream(in); for (auto&& buf : bufs) { _position += buf.size(); _buffers.emplace_back(std::move(buf)); } gcp_storage.debug("Read object {}:{} ({}-{}/{})", _bucket, _object_name, old, _position, _size); } , httpclient::method_type::GET , rest::key_values({ { RANGE, range } }) , _as ); } } temporary_buffer res; if (!_buffers.empty()) { auto&& buf = _buffers.front(); if (buf.size() >= limit) { res = buf.share(0, limit); buf.trim_front(limit); } else { res = std::move(buf); _buffers.pop_front(); } } adjust_lease(); co_return res; } future> utils::gcp::storage::client::object_data_source::get() { // If we don't have buffers to give, try getting one from server co_return co_await get(default_gcp_storage_chunk_size); } future<> utils::gcp::storage::client::object_data_source::seek(uint64_t pos) { if (_size == 0) { co_await read_info(); } auto buf_size = buffer_size(); assert(buf_size <= _position); auto read_pos = _position - buf_size; if (pos < read_pos || pos >= _position) { _buffers.clear(); _position = std::min(pos, _size); co_return; } auto n = pos - read_pos; // Drop superfluous cache while (n > 0 && !_buffers.empty()) { auto m = std::min(n, _buffers.front().size()); _buffers.front().trim_front(m); if (_buffers.front().empty()) { _buffers.pop_front(); } n -= m; } adjust_lease(); } void utils::gcp::storage::client::object_data_source::adjust_lease() { auto total = std::accumulate(_buffers.begin(), _buffers.end(), size_t{}, [](size_t s, auto& buf) { return s + buf.size(); }); if (total < _limits.count()) { _limits.return_units(_limits.count() - total); } } future utils::gcp::storage::client::object_data_source::size() { if (_size == 0) { co_await read_info(); } co_return _size; } future utils::gcp::storage::client::object_data_source::timestamp() { if (_timestamp.time_since_epoch().count() == 0) { co_await read_info(); } co_return _timestamp; } future> utils::gcp::storage::client::object_data_source::skip(uint64_t n) { auto 
buf_size = buffer_size(); assert(buf_size <= _position); auto read_pos = _position - buf_size; co_await seek(read_pos + n); // And get the next buffer co_return co_await get(); } future<> utils::gcp::storage::client::object_data_source::read_info() { gcp_storage.debug("Read info {}:{}", _bucket, _object_name); auto path = fmt::format("/storage/v1/b/{}/o/{}", _bucket, _object_name); auto res = co_await _impl->send_with_retry(path , GCP_OBJECT_SCOPE_READ_ONLY , ""s , ""s , httpclient::method_type::GET , {} , _as ); if (res.result() != status_type::ok) { throw failed_operation(fmt::format("Could not query object {}:{} {}", _bucket, _object_name, res.result())); } auto item = rjson::parse(std::move(res.body())); // Ensure we got the info we asked for/expect if (rjson::get(item, "kind") != "storage#object"s) { throw failed_operation("Malformed query object reply"); } _size = std::stoull(rjson::get(item, "size")); _generation = std::stoull(rjson::get(item, "generation")); _timestamp = parse_rfc3339(rjson::get(item, "updated")); } utils::gcp::storage::client::client(std::string_view endpoint, std::optional c, shared_ptr certs) : _impl(seastar::make_shared(endpoint, std::move(c), nullptr, std::move(certs))) {} utils::gcp::storage::client::client(std::string_view endpoint, std::optional c, seastar::semaphore& memory, shared_ptr certs) : _impl(seastar::make_shared(endpoint, std::move(c), &memory, std::move(certs))) {} utils::gcp::storage::client::~client() = default; future<> utils::gcp::storage::client::create_bucket(std::string_view project, rjson::value meta) { gcp_storage.debug("Create bucket {}:{}", project, rjson::get(meta, "name")); auto path = fmt::format("/storage/v1/b?project={}", project); auto body = rjson::print(meta); auto res = co_await _impl->send_with_retry(path , GCP_OBJECT_SCOPE_FULL_CONTROL , body , APPLICATION_JSON , httpclient::method_type::POST ); switch (res.result()) { case status_type::ok: case status_type::created: co_return; // done and happy 
default: throw failed_operation(fmt::format("Could not create bucket {}: {}", rjson::get(meta, "name"), res.result())); } } future<> utils::gcp::storage::client::create_bucket(std::string_view project, std::string_view bucket, std::string_view region, std::string_view storage_class) { // Construct metadata. Could fmt::format, but this is somewhat safer. rjson::value meta = rjson::empty_object(); rjson::add(meta, "name", std::string(bucket)); rjson::add(meta, "location", std::string(region.empty() ? "US" : region)); rjson::add(meta, "storageClass", std::string(storage_class.empty() ? "STANDARD" : storage_class)); rjson::value uniformBucketLevelAccess = rjson::empty_object(); rjson::add(uniformBucketLevelAccess, "enabled", true); rjson::value iamConfiguration = rjson::empty_object(); rjson::add(iamConfiguration, "uniformBucketLevelAccess", std::move(uniformBucketLevelAccess)); rjson::add(meta, "iamConfiguration", std::move(iamConfiguration)); co_await create_bucket(project, std::move(meta)); } future<> utils::gcp::storage::client::delete_bucket(std::string_view bucket_in) { std::string bucket(bucket_in); gcp_storage.debug("Delete bucket {}", bucket); auto path = fmt::format("/storage/v1/b/{}", bucket); auto res = co_await _impl->send_with_retry(path , GCP_OBJECT_SCOPE_FULL_CONTROL , ""s , ""s , httpclient::method_type::DELETE ); switch (res.result()) { case status_type::ok: // mock server sends wrong code, but seems acceptable case status_type::no_content: co_return; // done and happy default: throw failed_operation(fmt::format("Could not delete bucket {}: {}", bucket, res.result())); } } static utils::gcp::storage::object_info create_info(const rjson::value& item) { utils::gcp::storage::object_info info; info.name = rjson::get(item, "name"); info.content_type = rjson::get_opt(item, "contentType").value_or(""s); info.size = std::stoull(rjson::get(item, "size")); info.generation = std::stoull(rjson::get(item, "generation")); info.modified = 
parse_rfc3339(rjson::get(item, "updated")); return info; } // See https://cloud.google.com/storage/docs/listing-objects // TODO: maybe make a generator? However, we don't have a streaming // json parsing routine as such, so however we do this, we need to // read all data from network, etc. Thus there is not all that much // point in it. Return chunked_vector to avoid large alloc, but keep it // in one object... for now... future> utils::gcp::storage::client::list_objects(std::string_view bucket_in, std::string_view prefix, bucket_paging& pager) { std::string bucket(bucket_in); gcp_storage.debug("List bucket {} (prefix={}, max_results={})", bucket, prefix, pager.max_results); auto path = fmt::format("/storage/v1/b/{}/o", bucket); auto psep = "?"; if (!prefix.empty()) { path += fmt::format("{}prefix={}", psep, prefix); psep = "&&"; } if (pager.max_results != 0) { path += fmt::format("{}maxResults={}", psep, pager.max_results); psep = "&&"; } if (!pager.token.empty()) { path += fmt::format("{}pageToken={}", psep, pager.token); psep = "&&"; } utils::chunked_vector result; co_await _impl->send_with_retry(path , GCP_OBJECT_SCOPE_READ_ONLY , ""s , ""s , [&](const seastar::http::reply& rep, seastar::input_stream& in) -> future<> { if (rep._status != status_type::ok) { throw failed_operation(fmt::format("Could not list bucket {}: {} ({})", bucket, rep._status , co_await get_gcp_error_message(in) )); } // ensure these are on our coroutine frame. 
auto bufs = co_await util::read_entire_stream(in); auto root = rjson::parse(std::move(bufs)); if (rjson::get(root, "kind") != "storage#objects"s) { throw failed_operation("Malformed list object reply"); } auto items = rjson::find(root, "items"); if (!items) { co_return; } if (!items->IsArray()) { throw failed_operation("Malformed list object items"); } pager.token = rjson::get_opt(root, "nextPageToken").value_or(""s); for (auto& item : items->GetArray()) { object_info info = create_info(item); result.emplace_back(std::move(info)); } } , httpclient::method_type::GET ); co_return result; } future> utils::gcp::storage::client::list_objects(std::string_view bucket, std::string_view prefix) { bucket_paging dummy(0); co_return co_await list_objects(bucket, prefix, dummy); } // See https://cloud.google.com/storage/docs/deleting-objects future<> utils::gcp::storage::client::delete_object(std::string_view bucket_in, std::string_view object_name_in) { std::string bucket(bucket_in), object_name(object_name_in); gcp_storage.debug("Delete object {}:{}", bucket, object_name); auto path = fmt::format("/storage/v1/b/{}/o/{}", bucket, object_name); auto res = co_await _impl->send_with_retry(path , GCP_OBJECT_SCOPE_READ_WRITE , ""s , ""s , httpclient::method_type::DELETE ); switch (res.result()) { case status_type::ok: case status_type::no_content: gcp_storage.debug("Deleted {}:{}", bucket, object_name); co_return; // done and happy case status_type::not_found: gcp_storage.debug("Could not delete {}:{} - no such object", bucket, object_name); co_return; // ok...? default: throw failed_operation(fmt::format("Could not delete object {}:{}: {} ({})", bucket, object_name, res.result() , get_gcp_error_message(res.body()) )); } } // See https://cloud.google.com/storage/docs/copying-renaming-moving-objects // GCP does not support moveTo across buckets. 
// Renames by copying to the destination and then deleting the source, which
// also works when the destination is in another bucket.
future<> utils::gcp::storage::client::rename_object(std::string_view bucket, std::string_view object_name, std::string_view new_bucket, std::string_view new_name) {
    // The source names are needed again after the copy has suspended this
    // coroutine, so keep owned copies instead of relying on the caller's
    // storage staying alive - the same precaution the other operations in
    // this file take. The destination names are only handed to copy_object().
    std::string src_bucket(bucket), src_name(object_name);

    co_await copy_object(src_bucket, src_name, new_bucket, new_name);
    co_await delete_object(src_bucket, src_name);
}

// See https://cloud.google.com/storage/docs/copying-renaming-moving-objects
// Renames an object inside one bucket with a single moveTo request.
// Throws failed_operation unless the server answers 200 or 201.
future<> utils::gcp::storage::client::rename_object(std::string_view bucket_in, std::string_view object_name_in, std::string_view new_name_in) {
    std::string bucket(bucket_in), object_name(object_name_in), new_name(new_name_in);

    gcp_storage.debug("Move object {}:{} -> {}", bucket, object_name, new_name);

    auto path = fmt::format("/storage/v1/b/{}/o/{}/moveTo/o/{}", bucket, object_name, new_name);

    auto res = co_await _impl->send_with_retry(path
        , GCP_OBJECT_SCOPE_READ_WRITE
        , ""s
        , ""s
        , httpclient::method_type::PUT
    );

    switch (res.result()) {
    case status_type::ok:
    case status_type::created:
        gcp_storage.debug("Moved {}:{} to {}", bucket, object_name, new_name);
        co_return; // done and happy
    default:
        throw failed_operation(fmt::format("Could not rename object {}:{}: {} ({})", bucket, object_name, res.result()
            , get_gcp_error_message(res.body())
        ));
    }
}

// See https://cloud.google.com/storage/docs/copying-renaming-moving-objects
// Copying an object in GCP can only process a certain amount of data in one call
// Must keep doing it until all data is copied, and check response.
future<> utils::gcp::storage::client::copy_object(std::string_view bucket_in, std::string_view object_name_in, std::string_view new_bucket_in, std::string_view to_name_in) { std::string bucket(bucket_in), object_name(object_name_in), new_bucket(new_bucket_in), to_name(to_name_in); auto path = fmt::format("/storage/v1/b/{}/o/{}/rewriteTo/b/{}/o/{}", bucket, object_name, new_bucket, to_name); std::string body; for (;;) { auto res = co_await _impl->send_with_retry(path , GCP_OBJECT_SCOPE_READ_WRITE , body , APPLICATION_JSON , httpclient::method_type::PUT ); if (res.result() != status_type::ok) { throw failed_operation(fmt::format("Could not copy object {}:{}: {} ({})", bucket, object_name, res.result() , get_gcp_error_message(res.body()) )); } auto resp = rjson::parse(res.body()); if (rjson::get(resp, "done")) { gcp_storage.debug("Copied {}:{} to {}:{}", bucket, object_name, new_bucket, to_name); co_return; // done and happy } auto token = rjson::get(resp, "rewriteToken"); auto written = rjson::get(resp, "totalBytesRewritten"); auto size = rjson::get(resp, "objectSize"); // Call 2+ must include the rewriteToken body = fmt::format("{{\"rewriteToken\": \"{}\"}}", token); gcp_storage.debug("Partial copy of {}:{} to {}:{} ({}/{})", bucket, object_name, new_bucket, to_name, written, size); } } future utils::gcp::storage::client::merge_objects(std::string_view bucket_in, std::string_view dest_object_name, std::vector source_object_names, rjson::value metadata, seastar::abort_source* as) { rjson::value compose = rjson::empty_object(); rjson::value source_objects = rjson::empty_array(); if (source_object_names.size() > 32) { throw std::invalid_argument(fmt::format("Can only merge up to 32 objects. 
{} requested.", source_object_names.size())); } for (auto& src : source_object_names) { rjson::value obj = rjson::empty_object(); rjson::add(obj, "name", src); rjson::push_back(source_objects, std::move(obj)); } rjson::add(compose, "sourceObjects", std::move(source_objects)); rjson::add(compose, "destination", std::move(metadata)); std::string bucket(bucket_in), object_name(dest_object_name); auto path = fmt::format("/storage/v1/b/{}/o/{}/compose", bucket, object_name); auto body = rjson::print(compose); auto res = co_await _impl->send_with_retry(path , GCP_OBJECT_SCOPE_READ_WRITE , body , APPLICATION_JSON , httpclient::method_type::POST ); if (res.result() != status_type::ok) { throw failed_operation(fmt::format("Could not merge to object {} -> {}:{}: {} ({})", source_object_names, bucket, object_name, res.result() , get_gcp_error_message(res.body()) )); } auto resp = rjson::parse(res.body()); co_return create_info(resp); } future<> utils::gcp::storage::client::copy_object(std::string_view bucket, std::string_view object_name, std::string_view to_name) { co_await copy_object(bucket, object_name, bucket, to_name); } seastar::data_sink utils::gcp::storage::client::create_upload_sink(std::string_view bucket, std::string_view object_name, rjson::value metadata, seastar::abort_source* as) const { return seastar::data_sink(std::make_unique(_impl, bucket, object_name, std::move(metadata), as)); } seekable_data_source utils::gcp::storage::client::create_download_source(std::string_view bucket, std::string_view object_name, seastar::abort_source* as) const { return seekable_data_source(std::make_unique(_impl, bucket, object_name, as)); } future<> utils::gcp::storage::client::close() { return _impl->close(); } const std::string utils::gcp::storage::client::DEFAULT_ENDPOINT = "https://storage.googleapis.com";