sstables: convert parse(summary&) to a coroutine

This commit is contained in:
Avi Kivity
2021-08-18 18:21:33 +03:00
parent 3a1cf3aa88
commit bd6460f00a

View File

@@ -508,23 +508,26 @@ void write(sstable_version_types v, file_writer& out, const disk_set_of_tagged_u
future<> parse(const schema& schema, sstable_version_types v, random_access_reader& in, summary& s) {
using pos_type = typename decltype(summary::positions)::value_type;
return parse(schema, v, in, s.header.min_index_interval,
co_await parse(schema, v, in, s.header.min_index_interval,
s.header.size,
s.header.memory_size,
s.header.sampling_level,
s.header.size_at_full_sampling).then([v, &schema, &in, &s] {
return in.read_exactly(s.header.size * sizeof(pos_type)).then([&in, &s] (auto buf) {
s.header.size_at_full_sampling);
{
auto buf = co_await in.read_exactly(s.header.size * sizeof(pos_type));
{
auto len = s.header.size * sizeof(pos_type);
check_buf_size(buf, len);
// Positions are encoded in little-endian.
auto b = buf.get();
s.positions = utils::chunked_vector<pos_type>();
return do_until([&s] { return s.positions.size() == s.header.size; }, [&s, buf = std::move(buf), b] () mutable {
while (s.positions.size() != s.header.size) {
s.positions.push_back(seastar::read_le<pos_type>(b));
b += sizeof(pos_type);
return make_ready_future<>();
}).then([&s] {
co_await make_ready_future<>(); // yield
}
{
// Since the keys in the index are not sized, we need to calculate
// the start position of the index i+1 to determine the boundaries
// of index i. The "memory_size" field in the header determines the
@@ -532,24 +535,23 @@ future<> parse(const schema& schema, sstable_version_types v, random_access_read
// can guarantee that no conditionals are used, and we can always
// query the position of the "next" index.
s.positions.push_back(s.header.memory_size);
return make_ready_future<>();
});
}).then([&in, &s] {
return in.seek(sizeof(summary::header) + s.header.memory_size);
}).then([v, &schema, &in, &s] {
return parse(schema, v, in, s.first_key, s.last_key);
}).then([&in, &s] {
return in.seek(s.positions[0] + sizeof(summary::header));
}).then([&schema, &in, &s] {
}
co_await in.seek(sizeof(summary::header) + s.header.memory_size);
co_await parse(schema, v, in, s.first_key, s.last_key);
co_await in.seek(s.positions[0] + sizeof(summary::header));
s.entries.reserve(s.header.size);
return do_with(int(0), [&schema, &in, &s] (int& idx) mutable {
return do_until([&s] { return s.entries.size() == s.header.size; }, [&schema, &s, &in, &idx] () mutable {
int idx = 0;
{
while (s.entries.size() != s.header.size) {
auto pos = s.positions[idx++];
auto next = s.positions[idx];
auto entrysize = next - pos;
return in.read_exactly(entrysize).then([&schema, &s, entrysize] (auto buf) mutable {
auto buf = co_await in.read_exactly(entrysize);
{
check_buf_size(buf, entrysize);
auto keysize = entrysize - 8;
@@ -561,15 +563,13 @@ future<> parse(const schema& schema, sstable_version_types v, random_access_read
auto token = schema.get_partitioner().get_token(key_view(key_data));
s.add_summary_data(token.data());
s.entries.push_back({ token, key_data, position });
return make_ready_future<>();
});
});
}).then([&s] {
}
}
// Delete last element which isn't part of the on-disk format.
s.positions.pop_back();
});
});
});
}
}
}
}
inline void write(sstable_version_types v, file_writer& out, const summary_entry& entry) {