Merge 'GCS object storage. Fix incompatibility issues with "real" GCS' from Calle Wilund
Fixes #28398 Fixes #28399 When used as path elements in Google storage paths, the object names need to be URL-encoded. Due to a.) tests not really using prefixes that include non-URL-valid chars (e.g. '/') and b.) the mock server used for most testing not enforcing this particular aspect, this was missed. Modified unit tests to use prefixing for all names, so that when running against real GCS, any errors like this will show. "Real" GCS also behaves a bit differently from the mock when listing with a pager: the former will not give a pager token for the last page, only for the penultimate one. Adds handling for this. Needs backport to the releases that have (though might not really use) the feature, as it is technically possible to use Google storage for backup and whatnot there, and it should work as expected. Closes scylladb/scylladb#28400 * github.com:scylladb/scylladb: utils/gcp/object_storage: URL-encode object names in URL:s utils::gcp::object_storage: Fix list object pager end condition detection
This commit is contained in:
@@ -113,15 +113,23 @@ static future<> compare_object_data(const local_gcs_wrapper& env, std::string_vi
|
||||
BOOST_REQUIRE_EQUAL(read, total);
|
||||
}
|
||||
|
||||
using namespace std::string_literals;
|
||||
static constexpr auto prefix = "bork/ninja/"s;
|
||||
|
||||
// #28398 include a prefix in all names.
|
||||
static std::string make_name() {
|
||||
return fmt::format("{}{}", prefix, utils::UUID_gen::get_time_UUID());
|
||||
}
|
||||
|
||||
static future<> test_read_write_helper(const local_gcs_wrapper& env, size_t dest_size, std::optional<size_t> specific_buffer_size = std::nullopt) {
|
||||
auto& c = env.client();
|
||||
auto uuid = fmt::format("{}", utils::UUID_gen::get_time_UUID());
|
||||
auto name = make_name();
|
||||
std::vector<temporary_buffer<char>> written;
|
||||
|
||||
// ensure we remove the object
|
||||
env.objects_to_delete.emplace_back(uuid);
|
||||
co_await create_object_of_size(c, env.bucket, uuid, dest_size, &written, specific_buffer_size);
|
||||
co_await compare_object_data(env, uuid, std::move(written));
|
||||
env.objects_to_delete.emplace_back(name);
|
||||
co_await create_object_of_size(c, env.bucket, name, dest_size, &written, specific_buffer_size);
|
||||
co_await compare_object_data(env, name, std::move(written));
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE(gcs_tests, *seastar::testing::async_fixture<gcs_fixture>())
|
||||
@@ -147,21 +155,28 @@ SEASTAR_FIXTURE_TEST_CASE(test_gcp_storage_list_objects, local_gcs_wrapper, *che
|
||||
auto& c = env.client();
|
||||
std::unordered_map<std::string, uint64_t> names;
|
||||
for (size_t i = 0; i < 10; ++i) {
|
||||
auto name = fmt::format("{}", utils::UUID_gen::get_time_UUID());
|
||||
auto name = make_name();
|
||||
auto size = tests::random::get_int(size_t(1), size_t(2*1024*1024));
|
||||
env.objects_to_delete.emplace_back(name);
|
||||
co_await create_object_of_size(c, env.bucket, name, size);
|
||||
names.emplace(name, size);
|
||||
}
|
||||
|
||||
auto infos = co_await c.list_objects(env.bucket);
|
||||
utils::gcp::storage::bucket_paging paging;
|
||||
size_t n_found = 0;
|
||||
|
||||
for (auto& info : infos) {
|
||||
auto i = names.find(info.name);
|
||||
if (i != names.end()) {
|
||||
BOOST_REQUIRE_EQUAL(info.size, i->second);
|
||||
++n_found;
|
||||
for (;;) {
|
||||
auto infos = co_await c.list_objects(env.bucket, "", paging);
|
||||
|
||||
for (auto& info : infos) {
|
||||
auto i = names.find(info.name);
|
||||
if (i != names.end()) {
|
||||
BOOST_REQUIRE_EQUAL(info.size, i->second);
|
||||
++n_found;
|
||||
}
|
||||
}
|
||||
if (infos.empty()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(n_found, names.size());
|
||||
@@ -170,7 +185,7 @@ SEASTAR_FIXTURE_TEST_CASE(test_gcp_storage_list_objects, local_gcs_wrapper, *che
|
||||
SEASTAR_FIXTURE_TEST_CASE(test_gcp_storage_delete_object, local_gcs_wrapper, *check_gcp_storage_test_enabled()) {
|
||||
auto& env = *this;
|
||||
auto& c = env.client();
|
||||
auto name = fmt::format("{}", utils::UUID_gen::get_time_UUID());
|
||||
auto name = make_name();
|
||||
env.objects_to_delete.emplace_back(name);
|
||||
co_await create_object_of_size(c, env.bucket, name, 128);
|
||||
{
|
||||
@@ -190,7 +205,7 @@ SEASTAR_FIXTURE_TEST_CASE(test_gcp_storage_delete_object, local_gcs_wrapper, *ch
|
||||
SEASTAR_FIXTURE_TEST_CASE(test_gcp_storage_skip_read, local_gcs_wrapper, *check_gcp_storage_test_enabled()) {
|
||||
auto& env = *this;
|
||||
auto& c = env.client();
|
||||
auto name = fmt::format("{}", utils::UUID_gen::get_time_UUID());
|
||||
auto name = make_name();
|
||||
std::vector<temporary_buffer<char>> bufs;
|
||||
constexpr size_t file_size = 12*1024*1024 + 384*7 + 31;
|
||||
|
||||
@@ -243,7 +258,7 @@ SEASTAR_FIXTURE_TEST_CASE(test_merge_objects, local_gcs_wrapper, *check_gcp_stor
|
||||
|
||||
size_t total = 0;
|
||||
for (size_t i = 0; i < 32; ++i) {
|
||||
auto name = fmt::format("{}", utils::UUID_gen::get_time_UUID());
|
||||
auto name = make_name();
|
||||
auto size = tests::random::get_int(size_t(1), size_t(2*1024*1024));
|
||||
env.objects_to_delete.emplace_back(name);
|
||||
co_await create_object_of_size(c, env.bucket, name, size, &bufs);
|
||||
@@ -251,7 +266,7 @@ SEASTAR_FIXTURE_TEST_CASE(test_merge_objects, local_gcs_wrapper, *check_gcp_stor
|
||||
total += size;
|
||||
}
|
||||
|
||||
auto name = fmt::format("{}", utils::UUID_gen::get_time_UUID());
|
||||
auto name = make_name();
|
||||
env.objects_to_delete.emplace_back(name);
|
||||
|
||||
auto info = co_await c.merge_objects(env.bucket, name, names);
|
||||
|
||||
@@ -429,7 +429,7 @@ future<> utils::gcp::storage::client::object_data_sink::acquire_session() {
|
||||
}
|
||||
auto path = fmt::format("/upload/storage/v1/b/{}/o?uploadType=resumable&name={}"
|
||||
, _bucket
|
||||
, _object_name
|
||||
, seastar::http::internal::url_encode(_object_name)
|
||||
);
|
||||
|
||||
auto reply = co_await _impl->send_with_retry(path
|
||||
@@ -689,7 +689,11 @@ future<temporary_buffer<char>> utils::gcp::storage::client::object_data_source::
|
||||
}
|
||||
|
||||
// Ensure we read from the same generation as we queried in read_info. Note: mock server ignores this.
|
||||
auto path = fmt::format("/storage/v1/b/{}/o/{}?ifGenerationMatch={}&alt=media", _bucket, _object_name, _generation);
|
||||
auto path = fmt::format("/storage/v1/b/{}/o/{}?ifGenerationMatch={}&alt=media"
|
||||
, _bucket
|
||||
, seastar::http::internal::url_encode(_object_name)
|
||||
, _generation
|
||||
);
|
||||
auto range = fmt::format("bytes={}-{}", _position, _position+to_read-1); // inclusive range
|
||||
|
||||
co_await _impl->send_with_retry(path
|
||||
@@ -799,7 +803,7 @@ future<temporary_buffer<char>> utils::gcp::storage::client::object_data_source::
|
||||
future<> utils::gcp::storage::client::object_data_source::read_info() {
|
||||
gcp_storage.debug("Read info {}:{}", _bucket, _object_name);
|
||||
|
||||
auto path = fmt::format("/storage/v1/b/{}/o/{}", _bucket, _object_name);
|
||||
auto path = fmt::format("/storage/v1/b/{}/o/{}", _bucket, seastar::http::internal::url_encode(_object_name));
|
||||
|
||||
auto res = co_await _impl->send_with_retry(path
|
||||
, GCP_OBJECT_SCOPE_READ_ONLY
|
||||
@@ -916,6 +920,12 @@ static utils::gcp::storage::object_info create_info(const rjson::value& item) {
|
||||
// point in it. Return chunked_vector to avoid large alloc, but keep it
|
||||
// in one object... for now...
|
||||
future<utils::chunked_vector<utils::gcp::storage::object_info>> utils::gcp::storage::client::list_objects(std::string_view bucket_in, std::string_view prefix, bucket_paging& pager) {
|
||||
utils::chunked_vector<utils::gcp::storage::object_info> result;
|
||||
|
||||
if (pager.done) {
|
||||
co_return result;
|
||||
}
|
||||
|
||||
std::string bucket(bucket_in);
|
||||
|
||||
gcp_storage.debug("List bucket {} (prefix={}, max_results={})", bucket, prefix, pager.max_results);
|
||||
@@ -935,8 +945,6 @@ future<utils::chunked_vector<utils::gcp::storage::object_info>> utils::gcp::stor
|
||||
psep = "&&";
|
||||
}
|
||||
|
||||
utils::chunked_vector<utils::gcp::storage::object_info> result;
|
||||
|
||||
co_await _impl->send_with_retry(path
|
||||
, GCP_OBJECT_SCOPE_READ_ONLY
|
||||
, ""s
|
||||
@@ -965,6 +973,7 @@ future<utils::chunked_vector<utils::gcp::storage::object_info>> utils::gcp::stor
|
||||
}
|
||||
|
||||
pager.token = rjson::get_opt<std::string>(root, "nextPageToken").value_or(""s);
|
||||
pager.done = pager.token.empty();
|
||||
|
||||
for (auto& item : items->GetArray()) {
|
||||
object_info info = create_info(item);
|
||||
@@ -989,7 +998,7 @@ future<> utils::gcp::storage::client::delete_object(std::string_view bucket_in,
|
||||
|
||||
gcp_storage.debug("Delete object {}:{}", bucket, object_name);
|
||||
|
||||
auto path = fmt::format("/storage/v1/b/{}/o/{}", bucket, object_name);
|
||||
auto path = fmt::format("/storage/v1/b/{}/o/{}", bucket, seastar::http::internal::url_encode(object_name));
|
||||
|
||||
auto res = co_await _impl->send_with_retry(path
|
||||
, GCP_OBJECT_SCOPE_READ_WRITE
|
||||
@@ -1026,7 +1035,11 @@ future<> utils::gcp::storage::client::rename_object(std::string_view bucket_in,
|
||||
|
||||
gcp_storage.debug("Move object {}:{} -> {}", bucket, object_name, new_name);
|
||||
|
||||
auto path = fmt::format("/storage/v1/b/{}/o/{}/moveTo/o/{}", bucket, object_name, new_name);
|
||||
auto path = fmt::format("/storage/v1/b/{}/o/{}/moveTo/o/{}"
|
||||
, bucket
|
||||
, seastar::http::internal::url_encode(object_name)
|
||||
, seastar::http::internal::url_encode(new_name)
|
||||
);
|
||||
auto res = co_await _impl->send_with_retry(path
|
||||
, GCP_OBJECT_SCOPE_READ_WRITE
|
||||
, ""s
|
||||
@@ -1052,7 +1065,12 @@ future<> utils::gcp::storage::client::rename_object(std::string_view bucket_in,
|
||||
future<> utils::gcp::storage::client::copy_object(std::string_view bucket_in, std::string_view object_name_in, std::string_view new_bucket_in, std::string_view to_name_in) {
|
||||
std::string bucket(bucket_in), object_name(object_name_in), new_bucket(new_bucket_in), to_name(to_name_in);
|
||||
|
||||
auto path = fmt::format("/storage/v1/b/{}/o/{}/rewriteTo/b/{}/o/{}", bucket, object_name, new_bucket, to_name);
|
||||
auto path = fmt::format("/storage/v1/b/{}/o/{}/rewriteTo/b/{}/o/{}"
|
||||
, bucket
|
||||
, seastar::http::internal::url_encode(object_name)
|
||||
, new_bucket
|
||||
, seastar::http::internal::url_encode(to_name)
|
||||
);
|
||||
std::string body;
|
||||
|
||||
for (;;) {
|
||||
@@ -1105,7 +1123,7 @@ future<utils::gcp::storage::object_info> utils::gcp::storage::client::merge_obje
|
||||
|
||||
std::string bucket(bucket_in), object_name(dest_object_name);
|
||||
|
||||
auto path = fmt::format("/storage/v1/b/{}/o/{}/compose", bucket, object_name);
|
||||
auto path = fmt::format("/storage/v1/b/{}/o/{}/compose", bucket, seastar::http::internal::url_encode(object_name));
|
||||
auto body = rjson::print(compose);
|
||||
|
||||
auto res = co_await _impl->send_with_retry(path
|
||||
|
||||
@@ -49,10 +49,12 @@ namespace utils::gcp::storage {
|
||||
private:
|
||||
uint32_t max_results;
|
||||
std::string token;
|
||||
bool done;
|
||||
friend class client;
|
||||
public:
|
||||
bucket_paging(uint64_t max = 1000)
|
||||
: max_results(max)
|
||||
, done(false)
|
||||
{}
|
||||
bucket_paging(const bucket_paging&) = delete;
|
||||
bucket_paging(bucket_paging&&) = default;
|
||||
|
||||
Reference in New Issue
Block a user