diff --git a/utils/io-wrappers.cc b/utils/io-wrappers.cc index 67910170bc..4b774d8a3f 100644 --- a/utils/io-wrappers.cc +++ b/utils/io-wrappers.cc @@ -7,6 +7,7 @@ */ #include "io-wrappers.hh" +#include "seekable_source.hh" #include using namespace seastar; @@ -243,3 +244,85 @@ data_source create_memory_source(std::vector> bu return data_source(std::make_unique(std::move(bufs))); } +seastar::file create_file_for_seekable_source(seekable_data_source src, seekable_data_source_shard_src src_func) { + class seekable_data_source_file_impl : public noop_file_impl { + seekable_data_source _source; + seekable_data_source_shard_src _src_func; + public: + seekable_data_source_file_impl(seekable_data_source source, seekable_data_source_shard_src src_func) + : _source(std::move(source)) + , _src_func(std::move(src_func)) + {} + future read_dma(uint64_t pos, void* buffer, size_t len, io_intent*) override { + co_await _source.seek(pos); + size_t res = 0; + auto dst = reinterpret_cast(buffer); + while (len) { + auto buf = co_await _source.get(len); + assert(buf.size() <= len); + if (buf.empty()) { + break; + } + dst = std::copy(buf.begin(), buf.end(), dst); + len -= buf.size(); + res += buf.size(); + } + co_return res; + } + future read_dma(uint64_t pos, std::vector iov, io_intent* intent) override { + size_t res = 0; + for (auto& iv : iov) { + auto n = co_await read_dma(pos, iv.iov_base, iv.iov_len, intent); + if (n == 0) { + break; + } + res += n; + } + co_return res; + } + future size(void) override { + return _source.size(); + } + future stat(void) override { + struct stat res = {}; + res.st_nlink = 1; + res.st_mode = S_IFREG | S_IRUSR | S_IRGRP | S_IROTH; + res.st_size = co_await _source.size(); + res.st_blksize = 1 << 10; // huh? + res.st_blocks = res.st_size >> 9; + res.st_mtime = std::chrono::system_clock::to_time_t(co_await _source.timestamp()); + res.st_ctime = res.st_mtime; + co_return res; + } + future> dma_read_bulk(uint64_t offset, size_t range_size, io_intent* intent) override { + temporary_buffer res(range_size); + auto n = co_await read_dma(offset, res.get_write(), range_size, intent); + res.trim(n); + co_return res; + } + std::unique_ptr dup() override { + if (!_src_func) { + not_implemented(); + } + class my_file_handle_impl : public file_handle_impl { + seekable_data_source_shard_src _func; + public: + my_file_handle_impl(seekable_data_source_shard_src func) + : _func(std::move(func)) + {} + std::unique_ptr clone() const override { + return std::make_unique(_func); + } + shared_ptr to_file() && override { + auto src = _func(); + return seastar::make_shared(std::move(src), std::move(_func)); + } + }; + return std::make_unique(_src_func); + } + future<> close() override { + co_await _source.close(); + } + }; + return file{seastar::make_shared(std::move(src), std::move(src_func))}; +} diff --git a/utils/io-wrappers.hh b/utils/io-wrappers.hh index 6ae4ed9785..ee49154407 100644 --- a/utils/io-wrappers.hh +++ b/utils/io-wrappers.hh @@ -45,3 +45,9 @@ seastar::data_sink create_memory_sink(std::vector>); + +class seekable_data_source; + +using seekable_data_source_shard_src = std::function; + +seastar::file create_file_for_seekable_source(seekable_data_source, seekable_data_source_shard_src = {});