Large reserves in allocating_section can cause stalls. We already log reserve increases, but we don't know which table they belong to, e.g.: `lsa - LSA allocation failure, increasing reserve in section 0x600009f94590 to 128 segments`. Allocating sections used for updating the row cache on memtable flush are notoriously problematic. Each table has its own row_cache, and therefore its own allocating_section(s). If we attached the table name to those sections, we could identify which table is causing problems. In some issues we suspected system.raft, but we couldn't be sure. This patch allows naming allocating_sections so that they can be identified in such log messages. I use abstract_formatter for this purpose to avoid the cost of formatting strings on the hot path (e.g. index_reader), and also to avoid duplicating strings which are already stored elsewhere. Fixes #25799 Closes scylladb/scylladb#27470
658 lines
26 KiB
C++
658 lines
26 KiB
C++
/*
|
|
* Copyright (C) 2019-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include "reader_permit.hh"
|
|
#include "utils/assert.hh"
|
|
#include "utils/div_ceil.hh"
|
|
#include "utils/bptree.hh"
|
|
#include "utils/logalloc.hh"
|
|
#include "utils/lru.hh"
|
|
#include "utils/error_injection.hh"
|
|
#include "tracing/trace_state.hh"
|
|
#include "utils/cached_file_stats.hh"
|
|
|
|
#include <seastar/core/file.hh>
|
|
#include <seastar/coroutine/maybe_yield.hh>
|
|
|
|
using namespace seastar;
|
|
|
|
/// \brief A read-through cache of a file.
|
|
///
|
|
/// Caches contents with page granularity (4 KiB).
|
|
/// Cached pages are evicted by the LRU or manually using the invalidate_*() method family, or when the object is destroyed.
|
|
///
|
|
/// Concurrent reading is allowed.
|
|
///
|
|
/// The object is movable but this is only allowed before readers are created.
|
|
///
|
|
class cached_file {
|
|
public:
|
|
// Must be aligned to _file.disk_read_dma_alignment(). 4K is always safe.
|
|
static constexpr size_t page_size = 4096;
|
|
|
|
// The content of the underlying file (_file) is divided into pages
|
|
// of equal size (page_size). This type is used to identify pages.
|
|
// Pages are assigned consecutive identifiers starting from 0.
|
|
using page_idx_type = uint64_t;
|
|
|
|
// Represents the number of pages in a sequence
|
|
using page_count_type = uint64_t;
|
|
|
|
using offset_type = uint64_t;
|
|
|
|
private:
|
|
class cached_page final : public index_evictable {
|
|
public:
|
|
cached_file* parent;
|
|
page_idx_type idx;
|
|
logalloc::lsa_buffer _lsa_buf;
|
|
temporary_buffer<char> _buf; // Empty when not shared. May mirror _lsa_buf when shared.
|
|
size_t _use_count = 0;
|
|
public:
|
|
struct cached_page_del {
|
|
void operator()(cached_page* cp) {
|
|
if (--cp->_use_count == 0) {
|
|
cp->parent->_metrics.bytes_in_std -= cp->_buf.size();
|
|
cp->_buf = {};
|
|
cp->parent->_lru.add(*cp);
|
|
}
|
|
}
|
|
};
|
|
|
|
using ptr_type = std::unique_ptr<cached_page, cached_page_del>;
|
|
|
|
// As long as any ptr_type is alive, this cached_page will not be destroyed
|
|
// because it will not be linked in the LRU.
|
|
ptr_type share() noexcept {
|
|
if (_use_count++ == 0) {
|
|
if (is_linked()) {
|
|
parent->_lru.remove(*this);
|
|
}
|
|
}
|
|
return std::unique_ptr<cached_page, cached_page_del>(this);
|
|
}
|
|
|
|
bool only_ref() const {
|
|
return _use_count <= 1;
|
|
}
|
|
public:
|
|
explicit cached_page(cached_file* parent, page_idx_type idx, temporary_buffer<char> buf)
|
|
: parent(parent)
|
|
, idx(idx)
|
|
, _buf(std::move(buf))
|
|
{
|
|
_lsa_buf = parent->_region.alloc_buf(_buf.size());
|
|
parent->_metrics.bytes_in_std += _buf.size();
|
|
std::copy(_buf.begin(), _buf.end(), _lsa_buf.get());
|
|
}
|
|
|
|
cached_page(cached_page&&) noexcept {
|
|
// The move constructor is required by allocation_strategy::construct() due to generic bplus::tree,
|
|
// but the object is always allocated in the standard allocator context so never actually moved.
|
|
// We cannot properly implement the move constructor because "this" is captured in various places.
|
|
abort();
|
|
}
|
|
|
|
~cached_page() {
|
|
SCYLLA_ASSERT(!_use_count);
|
|
}
|
|
|
|
size_t pos() const {
|
|
return idx * page_size;
|
|
}
|
|
|
|
void on_evicted() noexcept override;
|
|
|
|
temporary_buffer<char> get_buf() {
|
|
auto self = share();
|
|
if (!_buf) {
|
|
_buf = temporary_buffer<char>(_lsa_buf.size());
|
|
parent->_metrics.bytes_in_std += _lsa_buf.size();
|
|
std::copy(_lsa_buf.get(), _lsa_buf.get() + _lsa_buf.size(), _buf.get_write());
|
|
}
|
|
// Holding to a temporary buffer holds the cached page so that the buffer can be reused by concurrent hits.
|
|
// Also, sharing cached_page keeps the temporary_buffer's storage alive.
|
|
return temporary_buffer<char>(_buf.get_write(), _buf.size(), make_deleter([self = std::move(self)] {}));
|
|
}
|
|
|
|
// Returns a pointer to the contents of the page.
|
|
// The buffer is invalidated when the page is evicted or when the owning LSA region invalidates references.
|
|
char* begin() {
|
|
return _lsa_buf.get();
|
|
}
|
|
|
|
// Returns a pointer to the contents of the page.
|
|
// The buffer can be invalidated when the page is evicted or when the owning LSA region invalidates references.
|
|
std::span<const std::byte> get_view() const {
|
|
return std::as_bytes(std::span<const char>(_lsa_buf.get(), _lsa_buf.size()));
|
|
}
|
|
|
|
size_t size_in_allocator() {
|
|
// lsa_buf occupies 4K in LSA even if the buf size is smaller.
|
|
// _buf is transient and not accounted here.
|
|
return page_size;
|
|
}
|
|
};
|
|
|
|
struct page_idx_less_comparator {
|
|
bool operator()(page_idx_type lhs, page_idx_type rhs) const noexcept {
|
|
return lhs < rhs;
|
|
}
|
|
};
|
|
|
|
file _file;
|
|
sstring _file_name; // for logging / tracing
|
|
cached_file_stats& _metrics;
|
|
lru& _lru;
|
|
logalloc::region& _region;
|
|
logalloc::allocating_section _as;
|
|
|
|
using cache_type = bplus::tree<page_idx_type, cached_page, page_idx_less_comparator, 12, bplus::key_search::linear>;
|
|
cache_type _cache;
|
|
|
|
const offset_type _size;
|
|
offset_type _cached_bytes = 0;
|
|
|
|
offset_type _last_page_size;
|
|
page_idx_type _last_page;
|
|
public:
|
|
using ptr_type = cached_page::ptr_type;
|
|
struct page_read_result {
|
|
ptr_type ptr;
|
|
bool was_already_cached;
|
|
page_read_result(ptr_type p, bool cached)
|
|
: ptr(std::move(p))
|
|
, was_already_cached(cached)
|
|
{}
|
|
};
|
|
future<page_read_result> get_shared_page(size_t global_pos, tracing::trace_state_ptr trace_state) {
|
|
return get_page_ptr(global_pos / page_size, 1, trace_state);
|
|
}
|
|
future<page_read_result> get_shared_page(size_t global_pos, reader_permit permit, tracing::trace_state_ptr trace_state) {
|
|
return get_page_ptr(global_pos / page_size, 1, trace_state, permit);
|
|
}
|
|
private:
|
|
future<page_read_result> get_page_ptr(page_idx_type idx,
|
|
page_count_type read_ahead,
|
|
tracing::trace_state_ptr trace_state,
|
|
std::optional<reader_permit> permit = {}) {
|
|
auto i = _cache.lower_bound(idx);
|
|
if (i != _cache.end() && i->idx == idx) {
|
|
++_metrics.page_hits;
|
|
tracing::trace(trace_state, "page cache hit: file={}, page={}", _file_name, idx);
|
|
cached_page& cp = *i;
|
|
return make_ready_future<page_read_result>(cp.share(), true);
|
|
}
|
|
tracing::trace(trace_state, "page cache miss: file={}, page={}, readahead={}", _file_name, idx, read_ahead);
|
|
++_metrics.page_misses;
|
|
size_t size = (idx + read_ahead) > _last_page
|
|
? (_last_page_size + (_last_page - idx) * page_size)
|
|
: read_ahead * page_size;
|
|
|
|
std::optional<reader_permit::resource_units> units;
|
|
std::optional<reader_permit::awaits_guard> await_guard;
|
|
if (permit) {
|
|
units = permit->consume_memory(size);
|
|
await_guard.emplace(*permit);
|
|
}
|
|
|
|
return _file.dma_read_exactly<char>(idx * page_size, size)
|
|
.then([this, ag = std::move(await_guard), units = std::move(units), idx] (temporary_buffer<char>&& buf) mutable {
|
|
cached_page::ptr_type first_page;
|
|
while (buf.size()) {
|
|
auto this_size = std::min(page_size, buf.size());
|
|
// _cache.emplace() needs to run under allocating section even though it lives in the std space
|
|
// because bplus::tree operations are not reentrant, so we need to prevent memory reclamation.
|
|
auto [cp, missed] = _as(_region, [&] {
|
|
auto this_buf = buf.share();
|
|
this_buf.trim(this_size);
|
|
return _cache.emplace(idx, this, idx, std::move(this_buf));
|
|
});
|
|
buf.trim_front(this_size);
|
|
++idx;
|
|
if (missed) {
|
|
++_metrics.page_populations;
|
|
_metrics.cached_bytes += cp->size_in_allocator();
|
|
_cached_bytes += cp->size_in_allocator();
|
|
}
|
|
// pages read ahead will be placed into LRU, as there's no guarantee they will be fetched later.
|
|
cached_page::ptr_type ptr = cp->share();
|
|
if (!first_page) {
|
|
first_page = std::move(ptr);
|
|
}
|
|
}
|
|
utils::get_local_injector().inject("cached_file_get_first_page", []() {
|
|
throw std::bad_alloc();
|
|
});
|
|
return page_read_result{std::move(first_page), false};
|
|
});
|
|
}
|
|
// Returns (page, true) if the page was cached, and (page, false) if the page was uncached.
|
|
future<std::pair<temporary_buffer<char>, bool>> get_page(page_idx_type idx,
|
|
page_count_type count,
|
|
tracing::trace_state_ptr trace_state,
|
|
std::optional<reader_permit> permit = {}) {
|
|
return get_page_ptr(idx, count, std::move(trace_state), permit).then([permit] (page_read_result read_result) mutable {
|
|
auto buf = read_result.ptr->get_buf();
|
|
if (permit) {
|
|
auto units = permit->consume_memory(buf.size());
|
|
buf = temporary_buffer<char>(buf.get_write(), buf.size(),
|
|
make_object_deleter(buf.release(), std::move(units)));
|
|
}
|
|
return std::make_pair(std::move(buf), read_result.was_already_cached);
|
|
});
|
|
}
|
|
public:
|
|
class page_view {
|
|
cached_page::ptr_type _page;
|
|
size_t _offset;
|
|
size_t _size = 0;
|
|
std::optional<reader_permit::resource_units> _units;
|
|
public:
|
|
page_view() = default;
|
|
|
|
page_view(size_t offset, size_t size, cached_page::ptr_type page, std::optional<reader_permit::resource_units> units)
|
|
: _page(std::move(page))
|
|
, _offset(offset)
|
|
, _size(size)
|
|
, _units(std::move(units))
|
|
{}
|
|
|
|
page_view(page_view&& o) noexcept
|
|
: _page(std::move(o._page))
|
|
, _offset(std::exchange(o._offset, 0))
|
|
, _size(std::exchange(o._size, 0))
|
|
, _units(std::move(o._units))
|
|
{}
|
|
|
|
page_view& operator=(page_view&& o) noexcept {
|
|
_page = std::move(o._page);
|
|
_offset = std::exchange(o._offset, 0);
|
|
_size = std::exchange(o._size, 0);
|
|
_units = std::move(o._units);
|
|
return *this;
|
|
}
|
|
|
|
// Fills the page with garbage, releases the pointer and evicts the page so that it's no longer in cache.
|
|
// For testing use-after-free on the buffer space.
|
|
// After the call, the object is the same state as after being moved-from.
|
|
void release_and_scramble() noexcept {
|
|
if (_page->only_ref()) {
|
|
std::memset(_page->_lsa_buf.get(), 0xfe, _page->_lsa_buf.size());
|
|
cached_page& cp = *_page;
|
|
_page = nullptr;
|
|
cp.parent->_lru.remove(cp);
|
|
cp.on_evicted();
|
|
} else {
|
|
_page = nullptr;
|
|
}
|
|
_size = 0;
|
|
_offset = 0;
|
|
_units = std::nullopt;
|
|
}
|
|
|
|
operator bool() const { return bool(_page) && _size; }
|
|
public: // ContiguousSharedBuffer concept
|
|
const char* begin() const { return _page ? _page->begin() + _offset : nullptr; }
|
|
const char* get() const { return begin(); }
|
|
const char* end() const { return begin() + _size; }
|
|
size_t size() const { return _size; }
|
|
bool empty() const { return !_size; }
|
|
char* get_write() { return const_cast<char*>(begin()); }
|
|
|
|
void trim(size_t pos) {
|
|
_size = pos;
|
|
}
|
|
|
|
void trim_front(size_t n) {
|
|
_offset += n;
|
|
_size -= n;
|
|
}
|
|
|
|
page_view share() {
|
|
return share(0, _size);
|
|
}
|
|
|
|
page_view share(size_t pos, size_t len) {
|
|
return page_view(_offset + pos, len, _page->share(), {});
|
|
}
|
|
};
|
|
|
|
// Generator of subsequent pages of data reflecting the contents of the file.
|
|
// Single-user.
|
|
class stream {
|
|
cached_file* _cached_file;
|
|
std::optional<reader_permit> _permit;
|
|
page_idx_type _page_idx;
|
|
offset_type _offset_in_page;
|
|
offset_type _size_hint;
|
|
tracing::trace_state_ptr _trace_state;
|
|
private:
|
|
std::optional<reader_permit::resource_units> get_page_units(size_t size = page_size) {
|
|
return _permit
|
|
? std::make_optional(_permit->consume_memory(size))
|
|
: std::nullopt;
|
|
}
|
|
void shrink_size_hint(bool page_was_cached) {
|
|
if (page_was_cached) {
|
|
// If the page was cached, shrink the _size_hint by page_size,
|
|
// but don't reduce it below page_size.
|
|
_size_hint = std::max(_size_hint, 2 * page_size) - page_size;
|
|
} else {
|
|
// If the page was uncached, then get_page read the entire _size_hint bytes from disk,
|
|
// (in one I/O operation) and inserted the read pages into the cache.
|
|
// We will most likely serve the remainder of the stream from them.
|
|
//
|
|
// But if some of those pages happen to be evicted before we complete the read
|
|
// (this shouldn't really happen in practice, because in practice stay in cache
|
|
// for much, much longer than any read takes, but still), we don't want to read
|
|
// something on the order of _size_hint again, as that could result, in theory,
|
|
// in a quadratic amount of work.
|
|
//
|
|
// So in the very unlikely chance that we will have to re-read something from disk,
|
|
// let's do it page-by-page.
|
|
_size_hint = page_size;
|
|
}
|
|
}
|
|
public:
|
|
// Creates an empty stream.
|
|
stream()
|
|
: _cached_file(nullptr)
|
|
{ }
|
|
|
|
stream(cached_file& cf, std::optional<reader_permit> permit, tracing::trace_state_ptr trace_state,
|
|
page_idx_type start_page, offset_type start_offset_in_page, offset_type size_hint)
|
|
: _cached_file(&cf)
|
|
, _permit(std::move(permit))
|
|
, _page_idx(start_page)
|
|
, _offset_in_page(start_offset_in_page)
|
|
, _size_hint(size_hint)
|
|
, _trace_state(std::move(trace_state))
|
|
{ }
|
|
|
|
// Yields the next chunk of data.
|
|
// Returns empty buffer when end-of-stream is reached.
|
|
// Calls must be serialized.
|
|
// This instance must be kept alive until the returned future resolves.
|
|
future<temporary_buffer<char>> next() {
|
|
if (!_cached_file || _page_idx > _cached_file->_last_page) {
|
|
return make_ready_future<temporary_buffer<char>>(temporary_buffer<char>());
|
|
}
|
|
page_count_type readahead = div_ceil(_size_hint, page_size);
|
|
return _cached_file->get_page(_page_idx, readahead, _trace_state, _permit).then(
|
|
[this] (std::pair<temporary_buffer<char>, bool> read_result) mutable {
|
|
auto page = std::move(read_result.first);
|
|
if (_page_idx == _cached_file->_last_page) {
|
|
page.trim(_cached_file->_last_page_size);
|
|
}
|
|
page.trim_front(_offset_in_page);
|
|
_offset_in_page = 0;
|
|
++_page_idx;
|
|
shrink_size_hint(read_result.second);
|
|
return page;
|
|
});
|
|
}
|
|
|
|
// Yields the next chunk of data.
|
|
// Returns empty buffer when end-of-stream is reached.
|
|
// Calls must be serialized.
|
|
// This instance must be kept alive until the returned future resolves.
|
|
future<page_view> next_page_view() {
|
|
if (!_cached_file || _page_idx > _cached_file->_last_page) {
|
|
return make_ready_future<page_view>(page_view());
|
|
}
|
|
page_count_type readahead = div_ceil(_size_hint, page_size);
|
|
return _cached_file->get_page_ptr(_page_idx, readahead, _trace_state, _permit).then(
|
|
[this] (page_read_result read_result) mutable {
|
|
auto page = std::move(read_result.ptr);
|
|
size_t size = _page_idx == _cached_file->_last_page
|
|
? _cached_file->_last_page_size
|
|
: page_size;
|
|
page_view buf(_offset_in_page, size - _offset_in_page, std::move(page), get_page_units());
|
|
_offset_in_page = 0;
|
|
++_page_idx;
|
|
shrink_size_hint(read_result.was_already_cached);
|
|
return buf;
|
|
});
|
|
}
|
|
};
|
|
|
|
void on_evicted(cached_page& p) {
|
|
_metrics.cached_bytes -= p.size_in_allocator();
|
|
_cached_bytes -= p.size_in_allocator();
|
|
++_metrics.page_evictions;
|
|
}
|
|
|
|
size_t evict_range(cache_type::iterator start, cache_type::iterator end) noexcept {
|
|
return with_allocator(standard_allocator(), [&] {
|
|
size_t count = 0;
|
|
auto disposer = [] (auto* p) noexcept {};
|
|
while (start != end) {
|
|
if (start->is_linked()) {
|
|
++count;
|
|
_lru.remove(*start);
|
|
on_evicted(*start);
|
|
start = start.erase_and_dispose(disposer, page_idx_less_comparator());
|
|
} else {
|
|
++start;
|
|
}
|
|
}
|
|
return count;
|
|
});
|
|
}
|
|
public:
|
|
/// \brief Constructs a cached_file.
|
|
///
|
|
/// The cached area will reflect subset of f from the byte range [start, start + size).
|
|
///
|
|
/// \param m Metrics object which should be updated from operations on this object.
|
|
/// The metrics object can be shared by many cached_file instances, in which case it
|
|
/// will reflect the sum of operations on all cached_file instances.
|
|
cached_file(file f, cached_file_stats& m, lru& l, logalloc::region& reg, offset_type size, sstring file_name = {})
|
|
: _file(std::move(f))
|
|
, _file_name(std::move(file_name))
|
|
, _metrics(m)
|
|
, _lru(l)
|
|
, _region(reg)
|
|
, _as(abstract_formatter([this] (fmt::format_context& ctx) {
|
|
fmt::format_to(ctx.out(), "cached_file {}", _file_name);
|
|
}))
|
|
, _cache(page_idx_less_comparator())
|
|
, _size(size)
|
|
{
|
|
offset_type last_byte_offset = _size ? (_size - 1) : 0;
|
|
_last_page_size = (last_byte_offset % page_size) + (_size ? 1 : 0);
|
|
_last_page = last_byte_offset / page_size;
|
|
}
|
|
|
|
cached_file(cached_file&&) = delete; // captured this
|
|
cached_file(const cached_file&) = delete;
|
|
|
|
~cached_file() {
|
|
evict_range(_cache.begin(), _cache.end());
|
|
SCYLLA_ASSERT(_cache.empty());
|
|
}
|
|
|
|
/// \brief Invalidates [start, end) or less.
|
|
///
|
|
/// Invariants:
|
|
///
|
|
/// - all bytes outside [start, end) which were cached before the call will still be cached.
|
|
///
|
|
void invalidate_at_most(offset_type start, offset_type end, tracing::trace_state_ptr trace_state = {}) {
|
|
auto lo_page = start / page_size
|
|
// If start is 0 then we can drop the containing page
|
|
// Otherwise we cannot drop the page.
|
|
+ bool(start % page_size) * bool(start != 0);
|
|
|
|
auto hi_page = (end) / page_size;
|
|
|
|
if (lo_page < hi_page) {
|
|
auto count = evict_range(_cache.lower_bound(lo_page), _cache.lower_bound(hi_page));
|
|
if (count) {
|
|
tracing::trace(trace_state, "page cache: evicted {} page(s) in [{}, {}), file={}", count,
|
|
lo_page, hi_page, _file_name);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// \brief Equivalent to \ref invalidate_at_most(0, end).
|
|
void invalidate_at_most_front(offset_type end, tracing::trace_state_ptr trace_state = {}) {
|
|
auto hi_page = end / page_size;
|
|
auto count = evict_range(_cache.begin(), _cache.lower_bound(hi_page));
|
|
if (count) {
|
|
tracing::trace(trace_state, "page cache: evicted {} page(s) in [0, {}), file={}", count,
|
|
hi_page, _file_name);
|
|
}
|
|
}
|
|
|
|
/// \brief Read from the file
|
|
///
|
|
/// Returns a stream with data which starts at position pos in the area managed by this instance.
|
|
/// This cached_file instance must outlive the returned stream and buffers returned by the stream.
|
|
/// The stream does not do any read-ahead.
|
|
///
|
|
/// \param pos The offset of the first byte to read, relative to the cached file area.
|
|
/// \param permit Holds reader_permit under which returned buffers should be accounted.
|
|
/// When disengaged, no accounting is done.
|
|
stream read(offset_type global_pos, std::optional<reader_permit> permit,
|
|
tracing::trace_state_ptr trace_state = {},
|
|
size_t size_hint = page_size) {
|
|
if (global_pos >= _size) {
|
|
return stream();
|
|
}
|
|
auto offset = global_pos % page_size;
|
|
auto page_idx = global_pos / page_size;
|
|
return stream(*this, std::move(permit), std::move(trace_state), page_idx, offset, size_hint);
|
|
}
|
|
|
|
/// \brief Returns the number of bytes in the area managed by this instance.
|
|
offset_type size() const {
|
|
return _size;
|
|
}
|
|
|
|
/// \brief Returns the number of bytes cached.
|
|
size_t cached_bytes() const {
|
|
return _cached_bytes;
|
|
}
|
|
|
|
/// \brief Returns the underlying file.
|
|
file& get_file() {
|
|
return _file;
|
|
}
|
|
|
|
logalloc::region& region() {
|
|
return _region;
|
|
}
|
|
|
|
// Evicts all unused pages.
|
|
// Pages which are used are not removed.
|
|
future<> evict_gently() {
|
|
auto i = _cache.begin();
|
|
while (i != _cache.end()) {
|
|
if (i->is_linked()) {
|
|
_lru.remove(*i);
|
|
on_evicted(*i);
|
|
i = i.erase(page_idx_less_comparator());
|
|
} else {
|
|
++i;
|
|
}
|
|
if (need_preempt() && i != _cache.end()) {
|
|
auto key = i->idx;
|
|
co_await coroutine::maybe_yield();
|
|
i = _cache.lower_bound(key);
|
|
}
|
|
}
|
|
}
|
|
};
|
|
|
|
inline
|
|
void cached_file::cached_page::on_evicted() noexcept {
|
|
parent->on_evicted(*this);
|
|
with_allocator(standard_allocator(), [this] {
|
|
cached_file::cache_type::iterator it(this);
|
|
it.erase(page_idx_less_comparator());
|
|
});
|
|
}
|
|
|
|
class cached_file_impl : public file_impl {
|
|
cached_file& _cf;
|
|
tracing::trace_state_ptr _trace_state;
|
|
private:
|
|
[[noreturn]] void unsupported() {
|
|
throw_with_backtrace<std::logic_error>("unsupported operation");
|
|
}
|
|
public:
|
|
cached_file_impl(cached_file& cf, tracing::trace_state_ptr trace_state = {})
|
|
: file_impl(*get_file_impl(cf.get_file()))
|
|
, _cf(cf)
|
|
, _trace_state(std::move(trace_state))
|
|
{ }
|
|
|
|
// unsupported
|
|
virtual future<size_t> write_dma(uint64_t pos, const void* buffer, size_t len, io_intent*) override { unsupported(); }
|
|
virtual future<size_t> write_dma(uint64_t pos, std::vector<iovec> iov, io_intent*) override { unsupported(); }
|
|
virtual future<> flush(void) override { unsupported(); }
|
|
virtual future<> truncate(uint64_t length) override { unsupported(); }
|
|
virtual future<> discard(uint64_t offset, uint64_t length) override { unsupported(); }
|
|
virtual future<> allocate(uint64_t position, uint64_t length) override { unsupported(); }
|
|
virtual subscription<directory_entry> list_directory(std::function<future<>(directory_entry)>) override { unsupported(); }
|
|
|
|
// delegating
|
|
virtual future<struct stat> stat(void) override { return _cf.get_file().stat(); }
|
|
virtual future<uint64_t> size(void) override { return _cf.get_file().size(); }
|
|
virtual future<> close() override { return _cf.get_file().close(); }
|
|
virtual std::unique_ptr<seastar::file_handle_impl> dup() override { return get_file_impl(_cf.get_file())->dup(); }
|
|
|
|
virtual future<temporary_buffer<uint8_t>> dma_read_bulk(uint64_t offset, size_t size, io_intent* intent) override {
|
|
return do_with(_cf.read(offset, std::nullopt, _trace_state, size), size, temporary_buffer<uint8_t>(),
|
|
[this, size] (cached_file::stream& s, size_t& size_left, temporary_buffer<uint8_t>& result) {
|
|
if (size_left == 0) {
|
|
return make_ready_future<temporary_buffer<uint8_t>>(std::move(result));
|
|
}
|
|
return repeat([this, &s, &size_left, &result, size] {
|
|
return s.next().then([this, &size_left, &result, size] (temporary_buffer<char> buf) {
|
|
if (!buf) {
|
|
throw seastar::file::eof_error();
|
|
}
|
|
if (!result) {
|
|
if (buf.size() >= size_left) {
|
|
result = temporary_buffer<uint8_t>(reinterpret_cast<uint8_t*>(buf.get_write()), size_left, buf.release());
|
|
return stop_iteration::yes;
|
|
}
|
|
result = temporary_buffer<uint8_t>::aligned(_memory_dma_alignment, size_left);
|
|
}
|
|
size_t this_len = std::min(buf.size(), size_left);
|
|
std::copy(buf.begin(), buf.begin() + this_len, result.get_write() + (size - size_left));
|
|
size_left -= this_len;
|
|
return stop_iteration(size_left == 0);
|
|
});
|
|
}).then([&] {
|
|
return std::move(result);
|
|
});
|
|
});
|
|
}
|
|
|
|
virtual future<size_t> read_dma(uint64_t pos, void* buffer, size_t len, io_intent*) override {
|
|
unsupported(); // FIXME
|
|
}
|
|
|
|
virtual future<size_t> read_dma(uint64_t pos, std::vector<iovec> iov, io_intent*) override {
|
|
unsupported(); // FIXME
|
|
}
|
|
};
|
|
|
|
// Creates a seastar::file object which will read through a given cached_file instance.
|
|
// The cached_file object must be kept alive as long as the file is in use.
|
|
inline
|
|
file make_cached_seastar_file(cached_file& cf) {
|
|
return file(make_shared<cached_file_impl>(cf));
|
|
}
|