sstables, cached_file: Avoid copying buffers from cache when parsing promoted index

This commit is contained in:
Tomasz Grabiec
2021-04-13 19:48:39 +02:00
parent 7b6f18b4ed
commit 934824394a
2 changed files with 68 additions and 5 deletions

View File

@@ -180,17 +180,21 @@ public:
promoted_index_block_parser _block_parser;
reader_permit _permit;
cached_file::stream _stream;
logalloc::allocating_section _as;
private:
// Feeds the stream into the consumer until the consumer is satisfied.
// Does not give unconsumed data back to the stream.
// Running out of stream data before the consumer reports
// data_consumer::read_status::ready is reported via on_internal_error().
template <typename Consumer>
static future<> consume_stream(cached_file::stream& s, Consumer& c) {
future<> consume_stream(cached_file::stream& s, Consumer& c) {
return repeat([&] {
return s.next().then([&] (temporary_buffer<char>&& buf) {
if (buf.empty()) {
return s.next_page_view().then([&] (cached_file::page_view&& page) {
// A page_view that converts to false carries no page, i.e. the stream is exhausted.
if (!page) {
on_internal_error(sstlog, "End of stream while parsing");
}
return stop_iteration(c.consume(buf) == data_consumer::read_status::ready);
// The buffer returned by page.get_buf() is only valid until the LSA region
// invalidates references, so it is obtained and consumed entirely inside
// the allocating section _as run against the cached file's region.
return _as(_cached_file.region(), [&] {
auto buf = page.get_buf();
// Keep iterating until the consumer reports that it is ready.
return stop_iteration(c.consume(buf) == data_consumer::read_status::ready);
});
});
});
}
@@ -204,7 +208,9 @@ private:
future<pi_offset_type> read_block_offset(pi_index_type idx, tracing::trace_state_ptr trace_state) {
_stream = _cached_file.read(_promoted_index_start + get_offset_entry_pos(idx), _pc, _permit, trace_state);
return _stream.next().then([this, idx] (temporary_buffer<char>&& buf) {
return _stream.next_page_view().then([this, idx] (cached_file::page_view page) {
temporary_buffer<char> buf = page.get_buf();
static_assert(noexcept(std::declval<data_consumer::primitive_consumer>().read_32(buf)));
if (__builtin_expect(_primitive_parser.read_32(buf) == data_consumer::read_status::ready, true)) {
return make_ready_future<pi_offset_type>(_primitive_parser._u32);
}

View File

@@ -127,6 +127,13 @@ private:
return temporary_buffer<char>(_buf.get_write(), _buf.size(), make_deleter([self = std::move(self)] {}));
}
// Produces a non-owning buffer that aliases the contents of this page.
// Holding the buffer does not keep the page from being evicted.
// The buffer becomes invalid once the page is evicted or once the owning
// LSA region invalidates references.
temporary_buffer<char> get_buf_weak() {
    auto data = _lsa_buf.get();
    auto length = _lsa_buf.size();
    // A default-constructed deleter is passed: the buffer is built directly
    // over the page's storage pointer and length.
    return temporary_buffer<char>(data, length, deleter());
}
size_t size_in_allocator() {
// lsa_buf occupies 4K in LSA even if the buf size is smaller.
// _buf is transient and not accounted here.
@@ -187,6 +194,31 @@ private:
});
}
public:
// A view onto a range of a single cached page.
//
// Bundles the page pointer, the [offset, size) range of interest within the
// page, and the optional reader-permit resource units passed at construction.
//
// A default-constructed view is empty: it holds no page, converts to false and
// yields an empty buffer from get_buf().
class page_view {
    cached_page::ptr_type _page;
    // Start of the viewed range within the page. Zero for an empty view.
    size_t _offset = 0;
    // End of the viewed range within the page. Zero for an empty view.
    size_t _size = 0;
    std::optional<reader_permit::resource_units> _units;
public:
    // Constructs an empty view. The in-class initializers guarantee that
    // _offset and _size are never left indeterminate, regardless of how
    // the object is initialized.
    page_view() = default;

    // offset - position within the page at which the view starts.
    // size   - position within the page at which the view ends.
    // page   - the page being viewed; ownership of the pointer is taken.
    // units  - optional resource units stored alongside the page.
    //
    // NOTE(review): get_buf() assumes offset <= size; confirm that callers guarantee it.
    page_view(size_t offset, size_t size, cached_page::ptr_type page, std::optional<reader_permit::resource_units> units)
        : _page(std::move(page))
        , _offset(offset)
        , _size(size)
        , _units(std::move(units))
    {}

    // Returns a buffer covering the viewed range of the page.
    // Returns an empty buffer when the view is empty, so callers which do not
    // check the view first observe "no data" rather than dereferencing a null page.
    //
    // The returned buffer is valid only until the LSA region associated with cached_file invalidates references.
    temporary_buffer<char> get_buf() {
        if (!_page) {
            return temporary_buffer<char>();
        }
        auto buf = _page->get_buf_weak();
        // Cut the tail first, then drop the leading _offset bytes.
        buf.trim(_size);
        buf.trim_front(_offset);
        return buf;
    }

    // True iff the view refers to a page.
    // Kept implicit so that existing callers keep compiling unchanged.
    operator bool() const { return bool(_page); }
};
// Generator of subsequent pages of data reflecting the contents of the file.
// Single-user.
class stream {
@@ -243,6 +275,27 @@ public:
return page;
});
}
// Yields a view of the next page of data.
// Resolves to an empty page_view (one which converts to false) when the stream
// has no cached_file or end-of-stream is reached.
// Calls must be serialized.
// This instance must be kept alive until the returned future resolves.
future<page_view> next_page_view() {
    const bool exhausted = !_cached_file || _page_idx > _cached_file->_last_page;
    if (exhausted) {
        return make_ready_future<page_view>(page_view());
    }

    // The units are obtained before the page is requested and travel with the resulting view.
    auto units = get_page_units();

    auto make_view = [this, units = std::move(units)] (cached_page::ptr_type page) mutable {
        // Only the last page may be shorter than a full page.
        const bool is_last_page = _page_idx == _cached_file->_last_page;
        size_t size = is_last_page
                ? _cached_file->_last_page_size
                : page_size;
        page_view view(_offset_in_page, size, std::move(page), std::move(units));
        // Advance the stream: the following page is viewed from offset 0.
        _offset_in_page = 0;
        ++_page_idx;
        return view;
    };

    return _cached_file->get_page_ptr(_page_idx, *_pc, _trace_state).then(std::move(make_view));
}
};
void on_evicted(cached_page& p) {
@@ -360,6 +413,10 @@ public:
// Gives access to the file object held in _file.
file& get_file() { return _file; }
// Gives access to the LSA region held in _region.
logalloc::region& region() { return _region; }
};
inline