mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-16 04:42:07 +00:00
utils: Replace local memory sink/source with seastar equivalents
Replace the local buffer_data_sink_impl and buffer_data_source_impl classes in create_memory_sink() and create_memory_source() with seastar::util::memory_data_sink and seastar::util::memory_data_source respectively, which are now available upstream. Signed-off-by: Pavel Emelyanov <xemul@scylladb.com> Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Closes scylladb/scylladb#29616
This commit is contained in:
committed by
Botond Dénes
parent
1f7d20f701
commit
30f1075544
@@ -9,6 +9,8 @@
|
||||
#include "io-wrappers.hh"
|
||||
#include "seekable_source.hh"
|
||||
#include <seastar/util/internal/iovec_utils.hh>
|
||||
#include <seastar/util/memory-data-sink.hh>
|
||||
#include <seastar/util/memory-data-source.hh>
|
||||
|
||||
using namespace seastar;
|
||||
|
||||
@@ -173,68 +175,11 @@ file create_noop_file() {
|
||||
}
|
||||
|
||||
data_sink create_memory_sink(std::vector<seastar::temporary_buffer<char>>& bufs) {
|
||||
// TODO: move to seastar. Based on memory_data_sink, but allowing us
|
||||
// to actually move away the buffers later. I don't want to modify
|
||||
// util classes in an enterprise patch.
|
||||
class buffer_data_sink_impl : public data_sink_impl {
|
||||
std::vector<temporary_buffer<char>>& _bufs;
|
||||
public:
|
||||
buffer_data_sink_impl(std::vector<temporary_buffer<char>>& bufs)
|
||||
: _bufs(bufs)
|
||||
{}
|
||||
future<> put(std::span<temporary_buffer<char>> bufs) override {
|
||||
for (auto&& buf : bufs) {
|
||||
_bufs.emplace_back(std::move(buf));
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
future<> flush() override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
future<> close() override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
size_t buffer_size() const noexcept override {
|
||||
return 128*1024;
|
||||
}
|
||||
};
|
||||
return data_sink(std::make_unique<buffer_data_sink_impl>(bufs));
|
||||
return data_sink(std::make_unique<seastar::util::memory_data_sink>(bufs, 128*1024));
|
||||
}
|
||||
|
||||
data_source create_memory_source(std::vector<seastar::temporary_buffer<char>> bufs) {
|
||||
// TODO: move to seastar. Based on buffer_input... in utils, but
|
||||
// handles potential 1+ buffers
|
||||
class buffer_data_source_impl : public data_source_impl {
|
||||
private:
|
||||
std::vector<temporary_buffer<char>> _bufs;
|
||||
size_t _index = 0;
|
||||
public:
|
||||
buffer_data_source_impl(std::vector<temporary_buffer<char>>&& bufs)
|
||||
: _bufs(std::move(bufs))
|
||||
{}
|
||||
buffer_data_source_impl(buffer_data_source_impl&&) noexcept = default;
|
||||
buffer_data_source_impl& operator=(buffer_data_source_impl&&) noexcept = default;
|
||||
|
||||
future<temporary_buffer<char>> get() override {
|
||||
if (_index < _bufs.size()) {
|
||||
return make_ready_future<temporary_buffer<char>>(std::move(_bufs.at(_index++)));
|
||||
}
|
||||
return make_ready_future<temporary_buffer<char>>();
|
||||
}
|
||||
future<temporary_buffer<char>> skip(uint64_t n) override {
|
||||
while (n > 0 && _index < _bufs.size()) {
|
||||
auto& buf = _bufs.at(_index);
|
||||
auto min = std::min(n, buf.size());
|
||||
buf.trim_front(min);
|
||||
if (buf.empty()) {
|
||||
++_index;
|
||||
}
|
||||
n -= min;
|
||||
}
|
||||
return get();
|
||||
}
|
||||
};
|
||||
return data_source(std::make_unique<buffer_data_source_impl>(std::move(bufs)));
|
||||
return data_source(std::make_unique<seastar::util::memory_data_source>(std::move(bufs)));
|
||||
}
|
||||
|
||||
seastar::file create_file_for_seekable_source(seekable_data_source src, seekable_data_source_shard_src src_func) {
|
||||
|
||||
Reference in New Issue
Block a user