From 339182287f48aa3df86b7733e2dd1c1d7dd11909 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 24 Nov 2023 17:58:56 +0300 Subject: [PATCH 1/3] s3/client: Cache stats on readable_file S3-based sstables components are immutable, so every time stat is called there's no need to ping server again. But the main intention of this patch is to provide stats for read calls in the next patch. Signed-off-by: Pavel Emelyanov --- utils/s3/client.cc | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/utils/s3/client.cc b/utils/s3/client.cc index 6fa31b0ac7..66f80fa184 100644 --- a/utils/s3/client.cc +++ b/utils/s3/client.cc @@ -850,11 +850,23 @@ data_sink client::make_upload_jumbo_sink(sstring object_name, std::optional _client; sstring _object_name; + std::optional _stats; [[noreturn]] void unsupported() { throw_with_backtrace("unsupported operation on s3 readable file"); } + future<> maybe_update_stats() { + if (_stats) { + return make_ready_future<>(); + } + + return _client->get_object_stats(_object_name).then([this] (auto st) { + _stats = std::move(st); + return make_ready_future<>(); + }); + } + public: readable_file(shared_ptr cln, sstring object_name) : _client(std::move(cln)) @@ -899,16 +911,16 @@ public: } virtual future stat(void) override { - auto object_stats = co_await _client->get_object_stats(_object_name); + co_await maybe_update_stats(); struct stat ret {}; ret.st_nlink = 1; ret.st_mode = S_IFREG | S_IRUSR | S_IRGRP | S_IROTH; - ret.st_size = object_stats.size; + ret.st_size = _stats->size; ret.st_blksize = 1 << 10; // huh? 
- ret.st_blocks = object_stats.size >> 9; + ret.st_blocks = _stats->size >> 9; // objects are immutable on S3, therefore we can use Last-Modified to set both st_mtime and st_ctime - ret.st_mtime = object_stats.last_modified; - ret.st_ctime = object_stats.last_modified; + ret.st_mtime = _stats->last_modified; + ret.st_ctime = _stats->last_modified; co_return ret; } From c5d85bdf79901274ec26b2eda41a2b9ce2eda5d8 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 24 Nov 2023 17:59:14 +0300 Subject: [PATCH 2/3] s3/client: Don't GET object contents on out-of-bound reads If an S3 readable file is used inside a file input stream, the latter may call its read methods with a position that is above the file size. In that case the server replies with a generic HTTP error, and the fact that the range was invalid is encoded in the reply body's XML. Catching this via a wrong-reply-status exception and XML parsing is not great, all the more so because we can know in advance that the read is out-of-bound. Signed-off-by: Pavel Emelyanov --- test/boost/s3_test.cc | 25 +++++++++++++++++++++++++ utils/s3/client.cc | 15 +++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/test/boost/s3_test.cc b/test/boost/s3_test.cc index 8532345289..75593fc114 100644 --- a/test/boost/s3_test.cc +++ b/test/boost/s3_test.cc @@ -16,6 +16,7 @@ #include #include #include +#include #include "test/lib/scylla_test_case.hh" #include "test/lib/log.hh" #include "test/lib/random_utils.hh" @@ -233,6 +234,30 @@ SEASTAR_THREAD_TEST_CASE(test_client_readable_file) { BOOST_REQUIRE_EQUAL(to_sstring(std::move(buf)), sstring("67890ABC")); } +SEASTAR_THREAD_TEST_CASE(test_client_readable_file_stream) { + const sstring name(fmt::format("/{}/teststreamobject-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid())); + + testlog.info("Make client\n"); + semaphore mem(16<<20); + auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem); + auto close_client = deferred_close(*cln); + 
testlog.info("Put object {}\n", name); + sstring sample("1F2E3D4C5B6A70899807A6B5C4D3E2F1"); + temporary_buffer data(sample.c_str(), sample.size()); + cln->put_object(name, std::move(data)).get(); + auto delete_object = deferred_delete_object(cln, name); + + auto f = cln->make_readable_file(name); + auto close_readable_file = deferred_close(f); + auto in = make_file_input_stream(f); + auto close_stream = deferred_close(in); + + testlog.info("Check input stream read\n"); + auto res = seastar::util::read_entire_stream_contiguous(in).get0(); + BOOST_REQUIRE_EQUAL(res, sample); +} + SEASTAR_THREAD_TEST_CASE(test_client_put_get_tagging) { const sstring name(fmt::format("/{}/testobject-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid())); diff --git a/utils/s3/client.cc b/utils/s3/client.cc index 66f80fa184..802f46d164 100644 --- a/utils/s3/client.cc +++ b/utils/s3/client.cc @@ -925,12 +925,22 @@ public: } virtual future read_dma(uint64_t pos, void* buffer, size_t len, io_intent*) override { + co_await maybe_update_stats(); + if (pos >= _stats->size) { + co_return 0; + } + auto buf = co_await _client->get_object_contiguous(_object_name, range{ pos, len }); std::copy_n(buf.get(), buf.size(), reinterpret_cast(buffer)); co_return buf.size(); } virtual future read_dma(uint64_t pos, std::vector iov, io_intent*) override { + co_await maybe_update_stats(); + if (pos >= _stats->size) { + co_return 0; + } + auto buf = co_await _client->get_object_contiguous(_object_name, range{ pos, utils::iovec_len(iov) }); uint64_t off = 0; for (auto& v : iov) { @@ -945,6 +955,11 @@ public: } virtual future> dma_read_bulk(uint64_t offset, size_t range_size, io_intent*) override { + co_await maybe_update_stats(); + if (offset >= _stats->size) { + co_return temporary_buffer(); + } + auto buf = co_await _client->get_object_contiguous(_object_name, range{ offset, range_size }); co_return temporary_buffer(reinterpret_cast(buf.get_write()), buf.size(), buf.release()); } From 
0da37d5fa6fff299695cec164fdd270240a09aff Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 24 Nov 2023 16:08:18 +0300 Subject: [PATCH 3/3] sstable: Generalize toc file read and parse There are several places where TOC file is parsed into a vector of components -- sstable::read_toc(), remove_by_toc_name() and remove_by_registry_entry(). All three deserve some generalization. Signed-off-by: Pavel Emelyanov --- sstables/sstables.cc | 32 +++++++++++--------------------- sstables/sstables.hh | 2 ++ sstables/storage.cc | 11 +++++------ 3 files changed, 18 insertions(+), 27 deletions(-) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index bb656cf016..deac9e4c80 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -784,6 +784,15 @@ static inline sstring parent_path(const sstring& fname) { return fs::canonical(fs::path(fname)).parent_path().string(); } +future> sstable::read_and_parse_toc(file f) { + return with_closeable(make_file_input_stream(f), [] (input_stream& in) -> future> { + std::vector components; + auto all = co_await util::read_entire_stream_contiguous(in); + boost::split(components, all, boost::is_any_of("\n")); + co_return components; + }); +} + // This is small enough, and well-defined. Easier to just read it all // at once future<> sstable::read_toc() noexcept { @@ -793,21 +802,7 @@ future<> sstable::read_toc() noexcept { try { co_await do_read_simple(component_type::TOC, [&] (version_types v, file f) -> future<> { - auto bufptr = allocate_aligned_buffer(4096, 4096); - - size_t size = co_await f.dma_read(0, bufptr.get(), 4096); - // This file is supposed to be very small. Theoretically we should check its size, - // but if we so much as read a whole page from it, there is definitely something fishy - // going on - and this simplifies the code. 
- if (size >= 4096) { - throw malformed_sstable_exception("SSTable TOC too big: " + to_sstring(size) + " bytes", filename(component_type::TOC)); - } - - std::string_view buf(bufptr.get(), size); - std::vector comps; - - boost::split(comps , buf, boost::is_any_of("\n")); - + auto comps = co_await read_and_parse_toc(f); for (auto& c: comps) { // accept trailing newlines if (c == "") { @@ -2625,12 +2620,7 @@ future<> remove_by_toc_name(sstring sstable_toc_name, storage::sync_dir sync) { } auto toc_file = co_await open_checked_file_dma(sstable_write_error_handler, new_toc_name, open_flags::ro); std::vector components = co_await with_closeable(std::move(toc_file), [] (file& toc_file) { - return with_closeable(make_file_input_stream(toc_file), [] (input_stream& in) -> future> { - std::vector components; - auto all = co_await util::read_entire_stream_contiguous(in); - boost::split(components, all, boost::is_any_of("\n")); - co_return components; - }); + return sstable::read_and_parse_toc(toc_file); }); co_await coroutine::parallel_for_each(components, [&prefix] (sstring component) -> future<> { diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 0b39f707e2..2991c83f6d 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -603,6 +603,8 @@ public: bool has_component(component_type f) const; sstables_manager& manager() { return _manager; } const sstables_manager& manager() const { return _manager; } + + static future> read_and_parse_toc(file f); private: void unused(); // Called when reference count drops to zero future open_file(component_type, open_flags, file_open_options = {}) noexcept; diff --git a/sstables/storage.cc b/sstables/storage.cc index 1d3d1b532e..3c4a7cd047 100644 --- a/sstables/storage.cc +++ b/sstables/storage.cc @@ -16,6 +16,7 @@ #include #include #include +#include #include "sstables/exceptions.hh" #include "sstables/sstable_directory.hh" @@ -539,21 +540,19 @@ future<> s3_storage::wipe(const sstable& sst, sync_dir) noexcept { future<> 
s3_storage::remove_by_registry_entry(entry_descriptor desc) { auto prefix = format("/{}/{}", _bucket, desc.generation); - std::optional> toc; std::vector components; try { - toc = co_await _client->get_object_contiguous(prefix + "/" + sstable_version_constants::get_component_map(desc.version).at(component_type::TOC)); + auto f = _client->make_readable_file(prefix + "/" + sstable_version_constants::get_component_map(desc.version).at(component_type::TOC)); + components = co_await with_closeable(std::move(f), [] (file& f) { + return sstable::read_and_parse_toc(f); + }); } catch (const storage_io_error& e) { if (e.code().value() != ENOENT) { throw; } } - if (!toc) { - co_return; // missing TOC object is OK - } - boost::split(components, std::string_view(toc->get(), toc->size()), boost::is_any_of("\n")); co_await coroutine::parallel_for_each(components, [this, &prefix] (sstring comp) -> future<> { if (comp != sstable_version_constants::TOC_SUFFIX) { co_await _client->delete_object(prefix + "/" + comp);