From c97ce32f47bcd565ebe7975eded1bf3f25f44754 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Mon, 13 Apr 2026 10:34:37 +0200 Subject: [PATCH] Update position in dma_read(iovec) in create_file_for_seekable_source Fixes: SCYLLADB-1523 The returned file object does not increment file pos as is. One line fix. Added test to make sure this read path works as expected. Closes scylladb/scylladb#29456 --- test/boost/gcp_object_storage_test.cc | 58 ++++++++++++++++++++++----- utils/io-wrappers.cc | 1 + 2 files changed, 49 insertions(+), 10 deletions(-) diff --git a/test/boost/gcp_object_storage_test.cc b/test/boost/gcp_object_storage_test.cc index dd5cb0d6e5..6c7b5c0e5f 100644 --- a/test/boost/gcp_object_storage_test.cc +++ b/test/boost/gcp_object_storage_test.cc @@ -89,16 +89,7 @@ static future<> create_object_of_size(storage::client& c co_await sink.close(); } -static future<> compare_object_data(const local_gcs_wrapper& env, std::string_view object_name, std::vector>&& bufs) { - auto& c = env.client(); - auto total = std::accumulate(bufs.begin(), bufs.end(), size_t{}, [](size_t s, auto& buf) { - return s + buf.size(); - }); - - auto source = c.create_download_source(env.bucket, object_name); - auto is1 = seastar::input_stream(std::move(source)); - auto is2 = seastar::input_stream(create_memory_source(std::move(bufs))); - +static future<> compare_stream_data(seastar::input_stream& is1, seastar::input_stream& is2, size_t total) { uint64_t read = 0; while (!is1.eof()) { auto buf = co_await is1.read(); @@ -113,6 +104,23 @@ static future<> compare_object_data(const local_gcs_wrapper& env, std::string_vi BOOST_REQUIRE_EQUAL(read, total); } +static std::tuple, size_t> stream_from_buffers(std::vector>&& bufs) { + auto total = std::accumulate(bufs.begin(), bufs.end(), size_t{}, [](size_t s, auto& buf) { + return s + buf.size(); + }); + auto is = seastar::input_stream(create_memory_source(std::move(bufs))); + return std::make_tuple(std::move(is), total); +} + +static future<> compare_object_data(const local_gcs_wrapper& env, std::string_view object_name, std::vector>&& bufs) { + auto& c = env.client(); + auto source = c.create_download_source(env.bucket, object_name); + auto is1 = seastar::input_stream(std::move(source)); + auto [is2, total] = stream_from_buffers(std::move(bufs)); + + co_await compare_stream_data(is1, is2, total); +} + using namespace std::string_literals; static constexpr auto prefix = "bork/ninja/"s; @@ -277,4 +285,34 @@ SEASTAR_FIXTURE_TEST_CASE(test_merge_objects, local_gcs_wrapper, *check_gcp_stor co_await compare_object_data(env, name, std::move(bufs)); } + +SEASTAR_FIXTURE_TEST_CASE(test_gcp_storage_read_large_object_iov, local_gcs_wrapper, *check_gcp_storage_test_enabled()) { + auto& c = client(); + auto name = make_name(); + std::vector> written; + + auto dest_size = 32*1024*1024 + 357 + 1022*67; + + // ensure we remove the object + objects_to_delete.emplace_back(name); + co_await create_object_of_size(c, bucket, name, dest_size, &written); + auto source = c.create_download_source(bucket, name); + auto f = create_file_for_seekable_source(std::move(source)); + auto [is2, total] = stream_from_buffers(std::move(std::move(written))); + std::vector> bufs; + std::vector vecs; + for (size_t i = 0; i < total; ) { + auto n = std::min(total - i, size_t(8192)); + bufs.emplace_back(n); + vecs.emplace_back(iovec{ .iov_base = bufs.back().get_write(), .iov_len = n }); + i += n; + } + auto read = co_await f.dma_read(0, std::move(vecs)); + BOOST_REQUIRE_EQUAL(read, total); + auto [is1, total2] = stream_from_buffers(std::move(std::move(bufs))); + BOOST_REQUIRE_EQUAL(total, total2); + co_await compare_stream_data(is1, is2, total); + +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/utils/io-wrappers.cc b/utils/io-wrappers.cc index 69102ffa06..6258a12e22 100644 --- a/utils/io-wrappers.cc +++ b/utils/io-wrappers.cc @@ -270,6 +270,7 @@ seastar::file create_file_for_seekable_source(seekable_data_source src, seekable break; } res += n; + pos += n; } co_return res; }