diff --git a/sstables/sstables.cc b/sstables/sstables.cc index bb656cf016..deac9e4c80 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -784,6 +784,15 @@ static inline sstring parent_path(const sstring& fname) { return fs::canonical(fs::path(fname)).parent_path().string(); } +future> sstable::read_and_parse_toc(file f) { + return with_closeable(make_file_input_stream(f), [] (input_stream& in) -> future> { + std::vector components; + auto all = co_await util::read_entire_stream_contiguous(in); + boost::split(components, all, boost::is_any_of("\n")); + co_return components; + }); +} + // This is small enough, and well-defined. Easier to just read it all // at once future<> sstable::read_toc() noexcept { @@ -793,21 +802,7 @@ future<> sstable::read_toc() noexcept { try { co_await do_read_simple(component_type::TOC, [&] (version_types v, file f) -> future<> { - auto bufptr = allocate_aligned_buffer(4096, 4096); - - size_t size = co_await f.dma_read(0, bufptr.get(), 4096); - // This file is supposed to be very small. Theoretically we should check its size, - // but if we so much as read a whole page from it, there is definitely something fishy - // going on - and this simplifies the code. - if (size >= 4096) { - throw malformed_sstable_exception("SSTable TOC too big: " + to_sstring(size) + " bytes", filename(component_type::TOC)); - } - - std::string_view buf(bufptr.get(), size); - std::vector comps; - - boost::split(comps , buf, boost::is_any_of("\n")); - + auto comps = co_await read_and_parse_toc(f); for (auto& c: comps) { // accept trailing newlines if (c == "") { @@ -2625,12 +2620,7 @@ future<> remove_by_toc_name(sstring sstable_toc_name, storage::sync_dir sync) { } auto toc_file = co_await open_checked_file_dma(sstable_write_error_handler, new_toc_name, open_flags::ro); std::vector components = co_await with_closeable(std::move(toc_file), [] (file& toc_file) { - return with_closeable(make_file_input_stream(toc_file), [] (input_stream& in) -> future> { - std::vector components; - auto all = co_await util::read_entire_stream_contiguous(in); - boost::split(components, all, boost::is_any_of("\n")); - co_return components; - }); + return sstable::read_and_parse_toc(toc_file); }); co_await coroutine::parallel_for_each(components, [&prefix] (sstring component) -> future<> { diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 0b39f707e2..2991c83f6d 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -603,6 +603,8 @@ public: bool has_component(component_type f) const; sstables_manager& manager() { return _manager; } const sstables_manager& manager() const { return _manager; } + + static future> read_and_parse_toc(file f); private: void unused(); // Called when reference count drops to zero future open_file(component_type, open_flags, file_open_options = {}) noexcept; diff --git a/sstables/storage.cc b/sstables/storage.cc index 1d3d1b532e..3c4a7cd047 100644 --- a/sstables/storage.cc +++ b/sstables/storage.cc @@ -16,6 +16,7 @@ #include #include #include +#include #include "sstables/exceptions.hh" #include "sstables/sstable_directory.hh" @@ -539,21 +540,19 @@ future<> s3_storage::wipe(const sstable& sst, sync_dir) noexcept { future<> s3_storage::remove_by_registry_entry(entry_descriptor desc) { auto prefix = format("/{}/{}", _bucket, desc.generation); - std::optional> toc; std::vector components; try { - toc = co_await _client->get_object_contiguous(prefix + "/" + sstable_version_constants::get_component_map(desc.version).at(component_type::TOC)); + auto f = _client->make_readable_file(prefix + "/" + sstable_version_constants::get_component_map(desc.version).at(component_type::TOC)); + components = co_await with_closeable(std::move(f), [] (file& f) { + return sstable::read_and_parse_toc(f); + }); } catch (const storage_io_error& e) { if (e.code().value() != ENOENT) { throw; } } - if (!toc) { - co_return; // missing TOC object is OK - } - boost::split(components, std::string_view(toc->get(), toc->size()), boost::is_any_of("\n")); co_await coroutine::parallel_for_each(components, [this, &prefix] (sstring comp) -> future<> { if (comp != sstable_version_constants::TOC_SUFFIX) { co_await _client->delete_object(prefix + "/" + comp); diff --git a/test/boost/s3_test.cc b/test/boost/s3_test.cc index 8532345289..75593fc114 100644 --- a/test/boost/s3_test.cc +++ b/test/boost/s3_test.cc @@ -16,6 +16,7 @@ #include #include #include +#include #include "test/lib/scylla_test_case.hh" #include "test/lib/log.hh" #include "test/lib/random_utils.hh" @@ -233,6 +234,30 @@ SEASTAR_THREAD_TEST_CASE(test_client_readable_file) { BOOST_REQUIRE_EQUAL(to_sstring(std::move(buf)), sstring("67890ABC")); } +SEASTAR_THREAD_TEST_CASE(test_client_readable_file_stream) { + const sstring name(fmt::format("/{}/teststreamobject-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid())); + + testlog.info("Make client\n"); + semaphore mem(16<<20); + auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem); + auto close_client = deferred_close(*cln); + + testlog.info("Put object {}\n", name); + sstring sample("1F2E3D4C5B6A70899807A6B5C4D3E2F1"); + temporary_buffer data(sample.c_str(), sample.size()); + cln->put_object(name, std::move(data)).get(); + auto delete_object = deferred_delete_object(cln, name); + + auto f = cln->make_readable_file(name); + auto close_readable_file = deferred_close(f); + auto in = make_file_input_stream(f); + auto close_stream = deferred_close(in); + + testlog.info("Check input stream read\n"); + auto res = seastar::util::read_entire_stream_contiguous(in).get0(); + BOOST_REQUIRE_EQUAL(res, sample); +} + SEASTAR_THREAD_TEST_CASE(test_client_put_get_tagging) { const sstring name(fmt::format("/{}/testobject-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid())); diff --git a/utils/s3/client.cc b/utils/s3/client.cc index 6fa31b0ac7..802f46d164 100644 --- a/utils/s3/client.cc +++ b/utils/s3/client.cc @@ -850,11 +850,23 @@ data_sink client::make_upload_jumbo_sink(sstring object_name, std::optional _client; sstring _object_name; + std::optional _stats; [[noreturn]] void unsupported() { throw_with_backtrace("unsupported operation on s3 readable file"); } + future<> maybe_update_stats() { + if (_stats) { + return make_ready_future<>(); + } + + return _client->get_object_stats(_object_name).then([this] (auto st) { + _stats = std::move(st); + return make_ready_future<>(); + }); + } + public: readable_file(shared_ptr cln, sstring object_name) : _client(std::move(cln)) @@ -899,26 +911,36 @@ public: } virtual future stat(void) override { - auto object_stats = co_await _client->get_object_stats(_object_name); + co_await maybe_update_stats(); struct stat ret {}; ret.st_nlink = 1; ret.st_mode = S_IFREG | S_IRUSR | S_IRGRP | S_IROTH; - ret.st_size = object_stats.size; + ret.st_size = _stats->size; ret.st_blksize = 1 << 10; // huh? - ret.st_blocks = object_stats.size >> 9; + ret.st_blocks = _stats->size >> 9; // objects are immutable on S3, therefore we can use Last-Modified to set both st_mtime and st_ctime - ret.st_mtime = object_stats.last_modified; - ret.st_ctime = object_stats.last_modified; + ret.st_mtime = _stats->last_modified; + ret.st_ctime = _stats->last_modified; co_return ret; } virtual future read_dma(uint64_t pos, void* buffer, size_t len, io_intent*) override { + co_await maybe_update_stats(); + if (pos >= _stats->size) { + co_return 0; + } + auto buf = co_await _client->get_object_contiguous(_object_name, range{ pos, len }); std::copy_n(buf.get(), buf.size(), reinterpret_cast(buffer)); co_return buf.size(); } virtual future read_dma(uint64_t pos, std::vector iov, io_intent*) override { + co_await maybe_update_stats(); + if (pos >= _stats->size) { + co_return 0; + } + auto buf = co_await _client->get_object_contiguous(_object_name, range{ pos, utils::iovec_len(iov) }); uint64_t off = 0; for (auto& v : iov) { @@ -933,6 +955,11 @@ public: } virtual future> dma_read_bulk(uint64_t offset, size_t range_size, io_intent*) override { + co_await maybe_update_stats(); + if (offset >= _stats->size) { + co_return temporary_buffer(); + } + auto buf = co_await _client->get_object_contiguous(_object_name, range{ offset, range_size }); co_return temporary_buffer(reinterpret_cast(buf.get_write()), buf.size(), buf.release()); }