utils: Replace local memory sink/source with seastar equivalents

Replace the local buffer_data_sink_impl and buffer_data_source_impl
classes in create_memory_sink() and create_memory_source() with
seastar::util::memory_data_sink and seastar::util::memory_data_source
respectively, which are now available upstream.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

Closes scylladb/scylladb#29616
This commit is contained in:
Pavel Emelyanov
2026-04-23 11:42:41 +03:00
committed by Botond Dénes
parent 1f7d20f701
commit 30f1075544

View File

@@ -9,6 +9,8 @@
#include "io-wrappers.hh"
#include "seekable_source.hh"
#include <seastar/util/internal/iovec_utils.hh>
#include <seastar/util/memory-data-sink.hh>
#include <seastar/util/memory-data-source.hh>
using namespace seastar;
@@ -173,68 +175,11 @@ file create_noop_file() {
}
data_sink create_memory_sink(std::vector<seastar::temporary_buffer<char>>& bufs) {
// TODO: move to seastar. Based on memory_data_sink, but allowing us
// to actually move away the buffers later. I don't want to modify
// util classes in an enterprise patch.
class buffer_data_sink_impl : public data_sink_impl {
std::vector<temporary_buffer<char>>& _bufs;
public:
buffer_data_sink_impl(std::vector<temporary_buffer<char>>& bufs)
: _bufs(bufs)
{}
future<> put(std::span<temporary_buffer<char>> bufs) override {
for (auto&& buf : bufs) {
_bufs.emplace_back(std::move(buf));
}
return make_ready_future<>();
}
future<> flush() override {
return make_ready_future<>();
}
future<> close() override {
return make_ready_future<>();
}
size_t buffer_size() const noexcept override {
return 128*1024;
}
};
return data_sink(std::make_unique<buffer_data_sink_impl>(bufs));
return data_sink(std::make_unique<seastar::util::memory_data_sink>(bufs, 128*1024));
}
data_source create_memory_source(std::vector<seastar::temporary_buffer<char>> bufs) {
// TODO: move to seastar. Based on buffer_input... in utils, but
// handles potential 1+ buffers
class buffer_data_source_impl : public data_source_impl {
private:
std::vector<temporary_buffer<char>> _bufs;
size_t _index = 0;
public:
buffer_data_source_impl(std::vector<temporary_buffer<char>>&& bufs)
: _bufs(std::move(bufs))
{}
buffer_data_source_impl(buffer_data_source_impl&&) noexcept = default;
buffer_data_source_impl& operator=(buffer_data_source_impl&&) noexcept = default;
future<temporary_buffer<char>> get() override {
if (_index < _bufs.size()) {
return make_ready_future<temporary_buffer<char>>(std::move(_bufs.at(_index++)));
}
return make_ready_future<temporary_buffer<char>>();
}
future<temporary_buffer<char>> skip(uint64_t n) override {
while (n > 0 && _index < _bufs.size()) {
auto& buf = _bufs.at(_index);
auto min = std::min(n, buf.size());
buf.trim_front(min);
if (buf.empty()) {
++_index;
}
n -= min;
}
return get();
}
};
return data_source(std::make_unique<buffer_data_source_impl>(std::move(bufs)));
return data_source(std::make_unique<seastar::util::memory_data_source>(std::move(bufs)));
}
seastar::file create_file_for_seekable_source(seekable_data_source src, seekable_data_source_shard_src src_func) {