diff --git a/sstables/sstables.cc b/sstables/sstables.cc index bb656cf016..deac9e4c80 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -784,6 +784,15 @@ static inline sstring parent_path(const sstring& fname) { return fs::canonical(fs::path(fname)).parent_path().string(); } +future> sstable::read_and_parse_toc(file f) { + return with_closeable(make_file_input_stream(f), [] (input_stream& in) -> future> { + std::vector components; + auto all = co_await util::read_entire_stream_contiguous(in); + boost::split(components, all, boost::is_any_of("\n")); + co_return components; + }); +} + // This is small enough, and well-defined. Easier to just read it all // at once future<> sstable::read_toc() noexcept { @@ -793,21 +802,7 @@ future<> sstable::read_toc() noexcept { try { co_await do_read_simple(component_type::TOC, [&] (version_types v, file f) -> future<> { - auto bufptr = allocate_aligned_buffer(4096, 4096); - - size_t size = co_await f.dma_read(0, bufptr.get(), 4096); - // This file is supposed to be very small. Theoretically we should check its size, - // but if we so much as read a whole page from it, there is definitely something fishy - // going on - and this simplifies the code. - if (size >= 4096) { - throw malformed_sstable_exception("SSTable TOC too big: " + to_sstring(size) + " bytes", filename(component_type::TOC)); - } - - std::string_view buf(bufptr.get(), size); - std::vector comps; - - boost::split(comps , buf, boost::is_any_of("\n")); - + auto comps = co_await read_and_parse_toc(f); for (auto& c: comps) { // accept trailing newlines if (c == "") { @@ -2625,12 +2620,7 @@ future<> remove_by_toc_name(sstring sstable_toc_name, storage::sync_dir sync) { } auto toc_file = co_await open_checked_file_dma(sstable_write_error_handler, new_toc_name, open_flags::ro); std::vector components = co_await with_closeable(std::move(toc_file), [] (file& toc_file) { - return with_closeable(make_file_input_stream(toc_file), [] (input_stream& in) -> future> { - std::vector components; - auto all = co_await util::read_entire_stream_contiguous(in); - boost::split(components, all, boost::is_any_of("\n")); - co_return components; - }); + return sstable::read_and_parse_toc(toc_file); }); co_await coroutine::parallel_for_each(components, [&prefix] (sstring component) -> future<> { diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 0b39f707e2..2991c83f6d 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -603,6 +603,8 @@ public: bool has_component(component_type f) const; sstables_manager& manager() { return _manager; } const sstables_manager& manager() const { return _manager; } + + static future> read_and_parse_toc(file f); private: void unused(); // Called when reference count drops to zero future open_file(component_type, open_flags, file_open_options = {}) noexcept; diff --git a/sstables/storage.cc b/sstables/storage.cc index 1d3d1b532e..3c4a7cd047 100644 --- a/sstables/storage.cc +++ b/sstables/storage.cc @@ -16,6 +16,7 @@ #include #include #include +#include #include "sstables/exceptions.hh" #include "sstables/sstable_directory.hh" @@ -539,21 +540,19 @@ future<> s3_storage::wipe(const sstable& sst, sync_dir) noexcept { future<> s3_storage::remove_by_registry_entry(entry_descriptor desc) { auto prefix = format("/{}/{}", _bucket, desc.generation); - std::optional> toc; std::vector components; try { - toc = co_await _client->get_object_contiguous(prefix + "/" + sstable_version_constants::get_component_map(desc.version).at(component_type::TOC)); + auto f = _client->make_readable_file(prefix + "/" + sstable_version_constants::get_component_map(desc.version).at(component_type::TOC)); + components = co_await with_closeable(std::move(f), [] (file& f) { + return sstable::read_and_parse_toc(f); + }); } catch (const storage_io_error& e) { if (e.code().value() != ENOENT) { throw; } } - if (!toc) { - co_return; // missing TOC object is OK - } - boost::split(components, std::string_view(toc->get(), toc->size()), boost::is_any_of("\n")); co_await coroutine::parallel_for_each(components, [this, &prefix] (sstring comp) -> future<> { if (comp != sstable_version_constants::TOC_SUFFIX) { co_await _client->delete_object(prefix + "/" + comp);