utils::io-wrappers: Add file wrapper type for seekable_source

Provides a read-only file interface for a seekable_source object.
This commit is contained in:
Calle Wilund
2025-09-10 13:45:20 +00:00
parent e62a18304e
commit 91c0467282
2 changed files with 89 additions and 0 deletions

View File

@@ -7,6 +7,7 @@
*/
#include "io-wrappers.hh"
#include "seekable_source.hh"
#include <seastar/util/internal/iovec_utils.hh>
using namespace seastar;
@@ -243,3 +244,85 @@ data_source create_memory_source(std::vector<seastar::temporary_buffer<char>> bu
return data_source(std::make_unique<buffer_data_source_impl>(std::move(bufs)));
}
seastar::file create_file_for_seekable_source(seekable_data_source src, seekable_data_source_shard_src src_func) {
class seekable_data_source_file_impl : public noop_file_impl {
seekable_data_source _source;
seekable_data_source_shard_src _src_func;
public:
seekable_data_source_file_impl(seekable_data_source source, seekable_data_source_shard_src src_func)
: _source(std::move(source))
, _src_func(std::move(src_func))
{}
future<size_t> read_dma(uint64_t pos, void* buffer, size_t len, io_intent*) override {
co_await _source.seek(pos);
size_t res = 0;
auto dst = reinterpret_cast<char*>(buffer);
while (len) {
auto buf = co_await _source.get(len);
assert(buf.size() <= len);
if (buf.empty()) {
break;
}
dst = std::copy(buf.begin(), buf.end(), dst);
len -= buf.size();
res += buf.size();
}
co_return res;
}
future<size_t> read_dma(uint64_t pos, std::vector<iovec> iov, io_intent* intent) override {
size_t res = 0;
for (auto& iv : iov) {
auto n = co_await read_dma(pos, iv.iov_base, iv.iov_len, intent);
if (n == 0) {
break;
}
res += n;
}
co_return res;
}
future<uint64_t> size(void) override {
return _source.size();
}
future<struct stat> stat(void) override {
struct stat res = {};
res.st_nlink = 1;
res.st_mode = S_IFREG | S_IRUSR | S_IRGRP | S_IROTH;
res.st_size = co_await _source.size();
res.st_blksize = 1 << 10; // huh?
res.st_blocks = res.st_size >> 9;
res.st_mtime = std::chrono::system_clock::to_time_t(co_await _source.timestamp());
res.st_ctime = res.st_mtime;
co_return res;
}
future<temporary_buffer<uint8_t>> dma_read_bulk(uint64_t offset, size_t range_size, io_intent* intent) override {
temporary_buffer<uint8_t> res(range_size);
auto n = co_await read_dma(offset, res.get_write(), range_size, intent);
res.trim(n);
co_return res;
}
std::unique_ptr<seastar::file_handle_impl> dup() override {
if (!_src_func) {
not_implemented();
}
class my_file_handle_impl : public file_handle_impl {
seekable_data_source_shard_src _func;
public:
my_file_handle_impl(seekable_data_source_shard_src func)
: _func(std::move(func))
{}
std::unique_ptr<file_handle_impl> clone() const override {
return std::make_unique<my_file_handle_impl>(_func);
}
shared_ptr<file_impl> to_file() && override {
auto src = _func();
return seastar::make_shared<seekable_data_source_file_impl>(std::move(src), std::move(_func));
}
};
return std::make_unique<my_file_handle_impl>(_src_func);
}
future<> close() override {
co_await _source.close();
}
};
return file{seastar::make_shared<seekable_data_source_file_impl>(std::move(src), std::move(src_func))};
}

View File

@@ -45,3 +45,9 @@ seastar::data_sink create_memory_sink(std::vector<seastar::temporary_buffer<char
* from the source vector buffers.
*/
seastar::data_source create_memory_source(std::vector<seastar::temporary_buffer<char>>);
class seekable_data_source;
using seekable_data_source_shard_src = std::function<seekable_data_source()>;
seastar::file create_file_for_seekable_source(seekable_data_source, seekable_data_source_shard_src = {});