sstables, cached_file: Avoid copying buffers from cache when parsing promoted index
This commit is contained in:
@@ -180,17 +180,21 @@ public:
|
||||
promoted_index_block_parser _block_parser;
|
||||
reader_permit _permit;
|
||||
cached_file::stream _stream;
|
||||
logalloc::allocating_section _as;
|
||||
private:
|
||||
// Feeds the stream into the consumer until the consumer is satisfied.
|
||||
// Does not give unconsumed data back to the stream.
|
||||
template <typename Consumer>
|
||||
static future<> consume_stream(cached_file::stream& s, Consumer& c) {
|
||||
future<> consume_stream(cached_file::stream& s, Consumer& c) {
|
||||
return repeat([&] {
|
||||
return s.next().then([&] (temporary_buffer<char>&& buf) {
|
||||
if (buf.empty()) {
|
||||
return s.next_page_view().then([&] (cached_file::page_view&& page) {
|
||||
if (!page) {
|
||||
on_internal_error(sstlog, "End of stream while parsing");
|
||||
}
|
||||
return stop_iteration(c.consume(buf) == data_consumer::read_status::ready);
|
||||
return _as(_cached_file.region(), [&] {
|
||||
auto buf = page.get_buf();
|
||||
return stop_iteration(c.consume(buf) == data_consumer::read_status::ready);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -204,7 +208,9 @@ private:
|
||||
|
||||
future<pi_offset_type> read_block_offset(pi_index_type idx, tracing::trace_state_ptr trace_state) {
|
||||
_stream = _cached_file.read(_promoted_index_start + get_offset_entry_pos(idx), _pc, _permit, trace_state);
|
||||
return _stream.next().then([this, idx] (temporary_buffer<char>&& buf) {
|
||||
return _stream.next_page_view().then([this, idx] (cached_file::page_view page) {
|
||||
temporary_buffer<char> buf = page.get_buf();
|
||||
static_assert(noexcept(std::declval<data_consumer::primitive_consumer>().read_32(buf)));
|
||||
if (__builtin_expect(_primitive_parser.read_32(buf) == data_consumer::read_status::ready, true)) {
|
||||
return make_ready_future<pi_offset_type>(_primitive_parser._u32);
|
||||
}
|
||||
|
||||
@@ -127,6 +127,13 @@ private:
|
||||
return temporary_buffer<char>(_buf.get_write(), _buf.size(), make_deleter([self = std::move(self)] {}));
|
||||
}
|
||||
|
||||
// Returns a buffer which reflects contents of this page.
|
||||
// The buffer will not prevent eviction.
|
||||
// The buffer is invalidated when the page is evicted or when the owning LSA region invalidates references.
|
||||
temporary_buffer<char> get_buf_weak() {
|
||||
return temporary_buffer<char>(_lsa_buf.get(), _lsa_buf.size(), deleter());
|
||||
}
|
||||
|
||||
size_t size_in_allocator() {
|
||||
// lsa_buf occupies 4K in LSA even if the buf size is smaller.
|
||||
// _buf is transient and not accounted here.
|
||||
@@ -187,6 +194,31 @@ private:
|
||||
});
|
||||
}
|
||||
public:
|
||||
class page_view {
|
||||
cached_page::ptr_type _page;
|
||||
size_t _offset;
|
||||
size_t _size;
|
||||
std::optional<reader_permit::resource_units> _units;
|
||||
public:
|
||||
page_view() = default;
|
||||
page_view(size_t offset, size_t size, cached_page::ptr_type page, std::optional<reader_permit::resource_units> units)
|
||||
: _page(std::move(page))
|
||||
, _offset(offset)
|
||||
, _size(size)
|
||||
, _units(std::move(units))
|
||||
{}
|
||||
|
||||
// The returned buffer is valid only until the LSA region associated with cached_file invalidates references.
|
||||
temporary_buffer<char> get_buf() {
|
||||
auto buf = _page->get_buf_weak();
|
||||
buf.trim(_size);
|
||||
buf.trim_front(_offset);
|
||||
return buf;
|
||||
}
|
||||
|
||||
operator bool() const { return bool(_page); }
|
||||
};
|
||||
|
||||
// Generator of subsequent pages of data reflecting the contents of the file.
|
||||
// Single-user.
|
||||
class stream {
|
||||
@@ -243,6 +275,27 @@ public:
|
||||
return page;
|
||||
});
|
||||
}
|
||||
|
||||
// Yields the next chunk of data.
|
||||
// Returns empty buffer when end-of-stream is reached.
|
||||
// Calls must be serialized.
|
||||
// This instance must be kept alive until the returned future resolves.
|
||||
future<page_view> next_page_view() {
|
||||
if (!_cached_file || _page_idx > _cached_file->_last_page) {
|
||||
return make_ready_future<page_view>(page_view());
|
||||
}
|
||||
auto units = get_page_units();
|
||||
return _cached_file->get_page_ptr(_page_idx, *_pc, _trace_state).then(
|
||||
[this, units = std::move(units)] (cached_page::ptr_type page) mutable {
|
||||
size_t size = _page_idx == _cached_file->_last_page
|
||||
? _cached_file->_last_page_size
|
||||
: page_size;
|
||||
page_view buf(_offset_in_page, size, std::move(page), std::move(units));
|
||||
_offset_in_page = 0;
|
||||
++_page_idx;
|
||||
return buf;
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
void on_evicted(cached_page& p) {
|
||||
@@ -360,6 +413,10 @@ public:
|
||||
file& get_file() {
|
||||
return _file;
|
||||
}
|
||||
|
||||
logalloc::region& region() {
|
||||
return _region;
|
||||
}
|
||||
};
|
||||
|
||||
inline
|
||||
|
||||
Reference in New Issue
Block a user