Files
scylladb/utils/gcp/object_storage.hh
Calle Wilund 01f4dfed84 utils::gcp::object_storage: Add optional memory limits to up/download
Adds an optional memory semaphore to limit buffer memory usage in sinks/sources.
Note that bookkeeping is not exact, to avoid deadlock issues in higher layers.

For uploads, we over-lease on the first buffer put to ensure we can at least fill
the desired 8M of buffers. We try to adjust when going over; if that fails, we
proceed anyway, but at least the upload is initiated, so memory will soon be
released. On the next put, we try to grab multiples of 8M again, and so forth.
This can cause waiting for resources, but never to the point where no active
sink is uploading.

For downloads (sources), we try to get a lease for as much as we want to read;
if that fails, we reduce the request to 256k and download anyway. Since this
memory will typically be released immediately, we don't overrun for long, and
again we avoid stopping entirely, throttling the rate instead.
2025-10-13 08:53:27 +00:00

191 lines
6.5 KiB
C++

/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <chrono>
#include <cstdint>
#include <limits>
#include <optional>
#include <stdexcept>
#include <string>
#include <string_view>
#include <variant>
#include <vector>

#include <seastar/core/future.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/net/tls.hh>

#include "utils/rjson.hh"
#include "utils/chunked_vector.hh"
#include "utils/seekable_source.hh"
// Forward declarations. The declarations in this header do not require the
// complete definitions of these types, so only their names are introduced here.
namespace seastar { class abort_source; }
namespace utils::gcp { struct google_credentials; }
namespace utils::gcp::storage {
/**
 * List info on a named object in a bucket.
 *
 * Remains an aggregate (brace/designated initialization still works). The
 * numeric fields carry default member initializers so that a
 * default-initialized instance (`object_info info;`) never holds
 * indeterminate values.
 */
struct object_info {
    std::string name;
    std::string content_type;
    uint64_t size = 0;        // object size (presumably bytes — TODO confirm)
    uint64_t generation = 0;  // object generation as reported by the service
    std::chrono::system_clock::time_point modified{};
    // TODO: what info do we need?
};
class client;

/**
 * Paging state for client::list_objects. The page size and the token are
 * private and only accessible to client (friend), which receives this object
 * by non-const reference.
 *
 * Move-only: copying is deleted; moving (construction and assignment) is allowed.
 */
struct bucket_paging {
private:
    uint32_t max_results;
    std::string token;
    friend class client;
public:
    /**
     * @max - maximum number of results per page. Stored as uint32_t; values
     *        above UINT32_MAX are clamped rather than silently wrapped
     *        (previously e.g. 2^32 would have become 0).
     */
    bucket_paging(uint64_t max = 1000)
        : max_results(max > std::numeric_limits<uint32_t>::max()
                          ? std::numeric_limits<uint32_t>::max()
                          : static_cast<uint32_t>(max))
    {}
    bucket_paging(const bucket_paging&) = delete;
    bucket_paging& operator=(const bucket_paging&) = delete;
    bucket_paging(bucket_paging&&) = default;
    bucket_paging& operator=(bucket_paging&&) = default;
};
/**
 * Minimal GCP object storage client.
 *
 * All state lives behind a shared implementation object (`_impl`). close()
 * must be called before the client is released.
 */
class client {
    class impl;
    class object_data_sink;
    class object_data_source;
    shared_ptr<impl> _impl;
public:
    // Default storage endpoint URL (per the constructor docs, presumably
    // https://storage.googleapis.com — confirm in the implementation).
    static const std::string DEFAULT_ENDPOINT;
    // Move constructor. Because a move constructor is user-declared, the
    // implicit copy operations are deleted and no move assignment is
    // generated: a client can be moved into place but not copied or reassigned.
    client(client&&);
    /**
     * @endpoint - typically https://storage.googleapis.com.
     * @credentials - google credentials for accessing the bucket(s). Can be nullopt, but this is only really
     *                useful for mock-up testing.
     * @certs - optional TLS parameters (truststore) for connecting to the endpoint. Normally not required
     *          (the default is to use the system trust store iff the endpoint is an https URL).
     */
    client(std::string_view endpoint, std::optional<google_credentials> credentials, shared_ptr<seastar::tls::certificate_credentials> certs={});
    /**
     * Same as above, but with an additional memory-limiting semaphore, shared across all upload sinks and
     * download sources created by this client, to limit buffer memory usage.
     *
     * @memory_limit - passed by reference and shared (not copied), so it must outlive the client and every
     *                 sink/source created from it. Accounting against it is intentionally approximate, to avoid
     *                 deadlocks in higher layers: uploads may over-lease up front so a full buffer can be filled,
     *                 and downloads fall back to a smaller lease instead of stalling.
     */
    client(std::string_view endpoint, std::optional<google_credentials> credentials, seastar::semaphore& memory_limit, shared_ptr<seastar::tls::certificate_credentials> certs={});
    ~client();
    /**
     * Creates a named bucket in @project and @region, using @storage_class.
     * @region and @storage_class may be left empty.
     */
    future<> create_bucket(std::string_view project, std::string_view bucket, std::string_view region = {}, std::string_view storage_class = {});
    /**
     * Creates a named bucket in @project, using the provided metadata json.
     * See https://cloud.google.com/storage/docs/creating-buckets
     */
    future<> create_bucket(std::string_view project, rjson::value meta);
    /**
     * Deletes a bucket. Note: the bucket must be empty.
     */
    future<> delete_bucket(std::string_view bucket);
    /**
     * Lists objects in @bucket. Optionally applies @prefix as a filter.
     */
    future<utils::chunked_vector<object_info>> list_objects(std::string_view bucket, std::string_view prefix = {});
    /**
     * Lists objects in @bucket. Optionally applies @prefix as a filter. Uses the page size and continuation
     * state held by the bucket_paging argument. That argument is taken by non-const reference and is presumably
     * updated so that a subsequent call returns the next page — confirm against the implementation.
     */
    future<utils::chunked_vector<object_info>> list_objects(std::string_view bucket, std::string_view prefix, bucket_paging&);
    /**
     * Deletes a named object from @bucket.
     */
    future<> delete_object(std::string_view bucket, std::string_view object_name);
    /**
     * Renames a named object within @bucket to @new_name.
     */
    future<> rename_object(std::string_view bucket, std::string_view object_name, std::string_view new_name);
    /**
     * Moves a named object from @bucket to @new_bucket, under the name @new_name.
     */
    future<> rename_object(std::string_view bucket, std::string_view object_name, std::string_view new_bucket, std::string_view new_name);
    /**
     * Copies a named object to @to_name within the same bucket.
     */
    future<> copy_object(std::string_view bucket, std::string_view object_name, std::string_view to_name);
    /**
     * Copies a named object to @new_bucket, under the name @to_name.
     */
    future<> copy_object(std::string_view bucket, std::string_view object_name, std::string_view new_bucket, std::string_view to_name);
    /**
     * Merges sub-objects into the new destination object @dest_object_name. The resulting object is composed
     * of the sub-objects in the order they appear in @source_objects.
     * @metadata - optional metadata, presumably applied to the merged object.
     * The trailing abort_source pointer is optional (may be nullptr).
     * @return info of the created, merged object.
     */
    future<object_info> merge_objects(std::string_view bucket, std::string_view dest_object_name, std::vector<std::string> source_objects, rjson::value metadata = {}, seastar::abort_source* = nullptr);
    /**
     * Creates a data_sink for uploading data to a given name in @bucket.
     *
     * @object_name - name of the object to create/overwrite.
     * @metadata - optional metadata to set in the created object.
     * The trailing abort_source pointer is optional (may be nullptr).
     *
     * Note: this will overwrite any existing object of the same name.
     */
    seastar::data_sink create_upload_sink(std::string_view bucket, std::string_view object_name, rjson::value metadata = {}, seastar::abort_source* = nullptr) const;
    /**
     * Creates a seekable data source for reading from the named object in @bucket.
     * The trailing abort_source pointer is optional (may be nullptr).
     */
    seekable_data_source create_download_source(std::string_view bucket, std::string_view object_name, seastar::abort_source* = nullptr) const;
    /**
     * Destroys resources. Must be called before releasing the object.
     */
    future<> close();
};
/**
 * Base class for errors reported by the object storage client. Carries an
 * integer status code in addition to the std::runtime_error message.
 */
class storage_error : public std::runtime_error {
    // Default member initializer: if an (out-of-line) constructor leaves the
    // status unset, status() returns 0 instead of reading an indeterminate value.
    // Constructors that do set it are unaffected.
    int _status = 0;
public:
    storage_error(const std::string&);
    storage_error(int status, const std::string&);
    // TODO: make http::status_type non-nested type, and forward declarable
    /**
     * @return the status code associated with this error (presumably an HTTP
     *         status, per the TODO above). Never throws.
     */
    int status() const noexcept {
        return _status;
    }
};
class permission_error : public storage_error {
public:
using mybase = storage_error;
using mybase::mybase;
};
class failed_operation : public storage_error {
public:
using mybase = storage_error;
using mybase::mybase;
};
class failed_upload_error : public failed_operation {
public:
using mybase = failed_operation;
using mybase::mybase;
};
}