From bd6460f00a4ce3dd903b3005556360dd4ee3c8d5 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 18 Aug 2021 18:21:33 +0300 Subject: [PATCH] sstables: convert parse(summary&) to a coroutine --- sstables/sstables.cc | 50 ++++++++++++++++++++++---------------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index ff264c254b..77414c2ae6 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -508,23 +508,26 @@ void write(sstable_version_types v, file_writer& out, const disk_set_of_tagged_u future<> parse(const schema& schema, sstable_version_types v, random_access_reader& in, summary& s) { using pos_type = typename decltype(summary::positions)::value_type; - return parse(schema, v, in, s.header.min_index_interval, + co_await parse(schema, v, in, s.header.min_index_interval, s.header.size, s.header.memory_size, s.header.sampling_level, - s.header.size_at_full_sampling).then([v, &schema, &in, &s] { - return in.read_exactly(s.header.size * sizeof(pos_type)).then([&in, &s] (auto buf) { + s.header.size_at_full_sampling); + { + auto buf = co_await in.read_exactly(s.header.size * sizeof(pos_type)); + { auto len = s.header.size * sizeof(pos_type); check_buf_size(buf, len); // Positions are encoded in little-endian. auto b = buf.get(); s.positions = utils::chunked_vector(); - return do_until([&s] { return s.positions.size() == s.header.size; }, [&s, buf = std::move(buf), b] () mutable { + while (s.positions.size() != s.header.size) { s.positions.push_back(seastar::read_le(b)); b += sizeof(pos_type); - return make_ready_future<>(); - }).then([&s] { + co_await make_ready_future<>(); // yield + } + { // Since the keys in the index are not sized, we need to calculate // the start position of the index i+1 to determine the boundaries // of index i. The "memory_size" field in the header determines the @@ -532,24 +535,23 @@ future<> parse(const schema& schema, sstable_version_types v, random_access_read // can guarantee that no conditionals are used, and we can always // query the position of the "next" index. s.positions.push_back(s.header.memory_size); - return make_ready_future<>(); - }); - }).then([&in, &s] { - return in.seek(sizeof(summary::header) + s.header.memory_size); - }).then([v, &schema, &in, &s] { - return parse(schema, v, in, s.first_key, s.last_key); - }).then([&in, &s] { - return in.seek(s.positions[0] + sizeof(summary::header)); - }).then([&schema, &in, &s] { + } + + co_await in.seek(sizeof(summary::header) + s.header.memory_size); + co_await parse(schema, v, in, s.first_key, s.last_key); + co_await in.seek(s.positions[0] + sizeof(summary::header)); + s.entries.reserve(s.header.size); - return do_with(int(0), [&schema, &in, &s] (int& idx) mutable { - return do_until([&s] { return s.entries.size() == s.header.size; }, [&schema, &s, &in, &idx] () mutable { + int idx = 0; + { + while (s.entries.size() != s.header.size) { auto pos = s.positions[idx++]; auto next = s.positions[idx]; auto entrysize = next - pos; - return in.read_exactly(entrysize).then([&schema, &s, entrysize] (auto buf) mutable { + auto buf = co_await in.read_exactly(entrysize); + { check_buf_size(buf, entrysize); auto keysize = entrysize - 8; @@ -561,15 +563,13 @@ future<> parse(const schema& schema, sstable_version_types v, random_access_read auto token = schema.get_partitioner().get_token(key_view(key_data)); s.add_summary_data(token.data()); s.entries.push_back({ token, key_data, position }); - return make_ready_future<>(); - }); - }); - }).then([&s] { + } + } // Delete last element which isn't part of the on-disk format. s.positions.pop_back(); - }); - }); - }); + } + } + } } inline void write(sstable_version_types v, file_writer& out, const summary_entry& entry) {