sstable: Generalize toc file read and parse

There are several places where the TOC file is parsed into a vector of
components -- sstable::read_toc(), remove_by_toc_name() and
remove_by_registry_entry(). All three deserve a common helper.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2023-11-24 16:08:18 +03:00
parent c5d85bdf79
commit 0da37d5fa6
3 changed files with 18 additions and 27 deletions

View File

@@ -784,6 +784,15 @@ static inline sstring parent_path(const sstring& fname) {
return fs::canonical(fs::path(fname)).parent_path().string();
}
// Reads the whole content of the TOC file `f` through an input stream and
// splits it on '\n', returning the resulting entries in file order.
// Empty entries (such as the one produced by a trailing newline) are not
// filtered out here; callers are expected to skip them.
// The input stream is wrapped in with_closeable() for the duration of the read.
future<std::vector<sstring>> sstable::read_and_parse_toc(file f) {
    return with_closeable(make_file_input_stream(f), [] (input_stream<char>& toc_stream) -> future<std::vector<sstring>> {
        // The TOC is expected to be small, so pull it in as one contiguous buffer.
        auto contents = co_await util::read_entire_stream_contiguous(toc_stream);
        std::vector<sstring> entries;
        boost::split(entries, contents, boost::is_any_of("\n"));
        co_return entries;
    });
}
// This is small enough, and well-defined. Easier to just read it all
// at once
future<> sstable::read_toc() noexcept {
@@ -793,21 +802,7 @@ future<> sstable::read_toc() noexcept {
try {
co_await do_read_simple(component_type::TOC, [&] (version_types v, file f) -> future<> {
auto bufptr = allocate_aligned_buffer<char>(4096, 4096);
size_t size = co_await f.dma_read(0, bufptr.get(), 4096);
// This file is supposed to be very small. Theoretically we should check its size,
// but if we so much as read a whole page from it, there is definitely something fishy
// going on - and this simplifies the code.
if (size >= 4096) {
throw malformed_sstable_exception("SSTable TOC too big: " + to_sstring(size) + " bytes", filename(component_type::TOC));
}
std::string_view buf(bufptr.get(), size);
std::vector<sstring> comps;
boost::split(comps , buf, boost::is_any_of("\n"));
auto comps = co_await read_and_parse_toc(f);
for (auto& c: comps) {
// accept trailing newlines
if (c == "") {
@@ -2625,12 +2620,7 @@ future<> remove_by_toc_name(sstring sstable_toc_name, storage::sync_dir sync) {
}
auto toc_file = co_await open_checked_file_dma(sstable_write_error_handler, new_toc_name, open_flags::ro);
std::vector<sstring> components = co_await with_closeable(std::move(toc_file), [] (file& toc_file) {
return with_closeable(make_file_input_stream(toc_file), [] (input_stream<char>& in) -> future<std::vector<sstring>> {
std::vector<sstring> components;
auto all = co_await util::read_entire_stream_contiguous(in);
boost::split(components, all, boost::is_any_of("\n"));
co_return components;
});
return sstable::read_and_parse_toc(toc_file);
});
co_await coroutine::parallel_for_each(components, [&prefix] (sstring component) -> future<> {

View File

@@ -603,6 +603,8 @@ public:
bool has_component(component_type f) const;
sstables_manager& manager() { return _manager; }
const sstables_manager& manager() const { return _manager; }
// Reads the entire TOC file `f` and splits its content on '\n'.
// Returns the resulting entries; empty entries are not filtered out.
static future<std::vector<sstring>> read_and_parse_toc(file f);
private:
void unused(); // Called when reference count drops to zero
future<file> open_file(component_type, open_flags, file_open_options = {}) noexcept;

View File

@@ -16,6 +16,7 @@
#include <seastar/coroutine/exception.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/file.hh>
#include <seastar/util/closeable.hh>
#include "sstables/exceptions.hh"
#include "sstables/sstable_directory.hh"
@@ -539,21 +540,19 @@ future<> s3_storage::wipe(const sstable& sst, sync_dir) noexcept {
future<> s3_storage::remove_by_registry_entry(entry_descriptor desc) {
auto prefix = format("/{}/{}", _bucket, desc.generation);
std::optional<temporary_buffer<char>> toc;
std::vector<sstring> components;
try {
toc = co_await _client->get_object_contiguous(prefix + "/" + sstable_version_constants::get_component_map(desc.version).at(component_type::TOC));
auto f = _client->make_readable_file(prefix + "/" + sstable_version_constants::get_component_map(desc.version).at(component_type::TOC));
components = co_await with_closeable(std::move(f), [] (file& f) {
return sstable::read_and_parse_toc(f);
});
} catch (const storage_io_error& e) {
if (e.code().value() != ENOENT) {
throw;
}
}
if (!toc) {
co_return; // missing TOC object is OK
}
boost::split(components, std::string_view(toc->get(), toc->size()), boost::is_any_of("\n"));
co_await coroutine::parallel_for_each(components, [this, &prefix] (sstring comp) -> future<> {
if (comp != sstable_version_constants::TOC_SUFFIX) {
co_await _client->delete_object(prefix + "/" + comp);