Drop the AGPL license in favor of a source-available license. See the blog post [1] for details. [1] https://www.scylladb.com/2024/12/18/why-were-moving-to-a-source-available-license/
240 lines
9.9 KiB
C++
240 lines
9.9 KiB
C++
/*
|
|
* Copyright (C) 2019-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include <seastar/core/future.hh>
|
|
#include <seastar/core/format.hh>
|
|
|
|
#include "tracing/traced_file.hh"
|
|
|
|
using namespace seastar;
|
|
|
|
class traced_file_impl : public file_impl {
|
|
private:
|
|
file _f;
|
|
|
|
tracing::trace_state_ptr _trace_state;
|
|
sstring _trace_prefix;
|
|
|
|
public:
|
|
traced_file_impl(file, tracing::trace_state_ptr, sstring);
|
|
|
|
virtual future<size_t> write_dma(uint64_t pos, const void* buf, size_t len, io_intent*) override;
|
|
virtual future<size_t> write_dma(uint64_t pos, std::vector<iovec> iov, io_intent*) override;
|
|
virtual future<size_t> read_dma(uint64_t pos, void* buf, size_t len, io_intent*) override;
|
|
virtual future<size_t> read_dma(uint64_t pos, std::vector<iovec> iov, io_intent*) override;
|
|
virtual future<> flush(void) override;
|
|
virtual future<struct stat> stat(void) override;
|
|
virtual future<> truncate(uint64_t length) override;
|
|
virtual future<> discard(uint64_t offset, uint64_t length) override;
|
|
virtual future<> allocate(uint64_t position, uint64_t length) override;
|
|
virtual future<uint64_t> size(void) override;
|
|
virtual future<> close() override;
|
|
virtual std::unique_ptr<seastar::file_handle_impl> dup() override;
|
|
virtual subscription<directory_entry> list_directory(std::function<future<> (directory_entry de)> next) override;
|
|
virtual future<temporary_buffer<uint8_t>> dma_read_bulk(uint64_t offset, size_t range_size, io_intent*) override;
|
|
};
|
|
|
|
traced_file_impl::traced_file_impl(file f, tracing::trace_state_ptr trace_state, sstring trace_prefix)
|
|
: file_impl(*get_file_impl(f)), _f(std::move(f)), _trace_state(std::move(trace_state)), _trace_prefix(std::move(trace_prefix)) {
|
|
}
|
|
|
|
future<size_t>
|
|
traced_file_impl::write_dma(uint64_t pos, const void* buf, size_t len, io_intent* intent) {
|
|
tracing::trace(_trace_state, "{} scheduling DMA write of {} bytes at position {}", _trace_prefix, len, pos);
|
|
return get_file_impl(_f)->write_dma(pos, buf, len, intent).then_wrapped([this, pos, len] (future<size_t> f) {
|
|
try {
|
|
auto ret = f.get();
|
|
tracing::trace(_trace_state, "{} finished DMA write of {} bytes at position {}, successfully wrote {} bytes", _trace_prefix, len, pos, ret);
|
|
return ret;
|
|
} catch (...) {
|
|
tracing::trace(_trace_state, "{} failed DMA write of {} bytes at position {}: {}", _trace_prefix, len, pos, std::current_exception());
|
|
throw;
|
|
}
|
|
});
|
|
}
|
|
|
|
future<size_t>
|
|
traced_file_impl::write_dma(uint64_t pos, std::vector<iovec> iov, io_intent* intent) {
|
|
tracing::trace(_trace_state, "{} scheduling DMA write at position {}", _trace_prefix, pos);
|
|
return get_file_impl(_f)->write_dma(pos, std::move(iov), intent).then_wrapped([this, pos] (future<size_t> f) {
|
|
try {
|
|
auto ret = f.get();
|
|
tracing::trace(_trace_state, "{} finished DMA write at position {}, successfully wrote {} bytes", _trace_prefix, pos, ret);
|
|
return ret;
|
|
} catch (...) {
|
|
tracing::trace(_trace_state, "{} failed DMA write of at position {}: {}", _trace_prefix, pos, std::current_exception());
|
|
throw;
|
|
}
|
|
});
|
|
}
|
|
|
|
future<size_t>
|
|
traced_file_impl::read_dma(uint64_t pos, void* buf, size_t len, io_intent* intent) {
|
|
tracing::trace(_trace_state, "{} scheduling DMA read of {} bytes at position {}", _trace_prefix, len, pos);
|
|
return get_file_impl(_f)->read_dma(pos, buf, len, intent).then_wrapped([this, pos, len] (future<size_t> f) {
|
|
try {
|
|
auto ret = f.get();
|
|
tracing::trace(_trace_state, "{} finished DMA read of {} bytes at position {}, successfully read {} bytes", _trace_prefix, len, pos, ret);
|
|
return ret;
|
|
} catch (...) {
|
|
tracing::trace(_trace_state, "{} failed DMA read of {} bytes at position {}: {}", _trace_prefix, len, pos, std::current_exception());
|
|
throw;
|
|
}
|
|
});
|
|
}
|
|
|
|
future<size_t>
|
|
traced_file_impl::read_dma(uint64_t pos, std::vector<iovec> iov, io_intent* intent) {
|
|
tracing::trace(_trace_state, "{} scheduling DMA read at position {}", _trace_prefix, pos);
|
|
return get_file_impl(_f)->read_dma(pos, std::move(iov), intent).then_wrapped([this, pos] (future<size_t> f) {
|
|
try {
|
|
auto ret = f.get();
|
|
tracing::trace(_trace_state, "{} finished DMA read at position {}, successfully read {} bytes", _trace_prefix, pos, ret);
|
|
return ret;
|
|
} catch (...) {
|
|
tracing::trace(_trace_state, "{} failed DMA read at position {}: {}", _trace_prefix, pos, std::current_exception());
|
|
throw;
|
|
}
|
|
});
|
|
}
|
|
|
|
future<>
|
|
traced_file_impl::flush(void) {
|
|
tracing::trace(_trace_state, "{} scheduling flush", _trace_prefix);
|
|
return get_file_impl(_f)->flush().then_wrapped([this] (future<> f) {
|
|
try {
|
|
f.get();
|
|
tracing::trace(_trace_state, "{} finished flush", _trace_prefix);
|
|
} catch (...) {
|
|
tracing::trace(_trace_state, "{} failed flush: {}", _trace_prefix, std::current_exception());
|
|
throw;
|
|
}
|
|
});
|
|
}
|
|
|
|
future<struct stat>
|
|
traced_file_impl::stat(void) {
|
|
tracing::trace(_trace_state, "{} scheduling file status retrieval", _trace_prefix);
|
|
return get_file_impl(_f)->stat().then_wrapped([this] (future<struct stat> f) {
|
|
try {
|
|
auto s = f.get();
|
|
tracing::trace(_trace_state, "{} finished file status retrieval", _trace_prefix);
|
|
return s;
|
|
} catch (...) {
|
|
tracing::trace(_trace_state, "{} failed to retrieve file status: {}", _trace_prefix, std::current_exception());
|
|
throw;
|
|
}
|
|
});
|
|
}
|
|
|
|
future<>
|
|
traced_file_impl::truncate(uint64_t length) {
|
|
tracing::trace(_trace_state, "{} scheduling truncate to {} bytes", _trace_prefix, length);
|
|
return get_file_impl(_f)->truncate(length).then_wrapped([this, length] (future<> f) {
|
|
try {
|
|
f.get();
|
|
tracing::trace(_trace_state, "{} finished truncate to {} bytes", _trace_prefix, length);
|
|
} catch (...) {
|
|
tracing::trace(_trace_state, "{} failed to truncate file to {} bytes: {}", _trace_prefix, length, std::current_exception());
|
|
throw;
|
|
}
|
|
});
|
|
}
|
|
|
|
future<>
|
|
traced_file_impl::discard(uint64_t offset, uint64_t length) {
|
|
tracing::trace(_trace_state, "{} scheduling discard of {} bytes at offset {}", _trace_prefix, length, offset);
|
|
return get_file_impl(_f)->discard(offset, length).then_wrapped([this, offset, length] (future<> f) {
|
|
try {
|
|
f.get();
|
|
tracing::trace(_trace_state, "{} finished discard of {} bytes at offset {}", _trace_prefix, length, offset);
|
|
} catch (...) {
|
|
tracing::trace(_trace_state, "{} failed to discard {} bytes at offset {}: {}", _trace_prefix, length, offset, std::current_exception());
|
|
throw;
|
|
}
|
|
});
|
|
}
|
|
|
|
future<>
|
|
traced_file_impl::allocate(uint64_t position, uint64_t length) {
|
|
tracing::trace(_trace_state, "{} scheduling allocation of {} bytes at position {}", _trace_prefix, length, position);
|
|
return get_file_impl(_f)->allocate(position, length).then_wrapped([this, position, length] (future<> f) {
|
|
try {
|
|
f.get();
|
|
tracing::trace(_trace_state, "{} finished allocation of {} bytes at position {}", _trace_prefix, length, position);
|
|
} catch (...) {
|
|
tracing::trace(_trace_state, "{} failed to allocate {} bytes at position {}: {}", _trace_prefix, length, position, std::current_exception());
|
|
throw;
|
|
}
|
|
});
|
|
}
|
|
|
|
future<uint64_t>
|
|
traced_file_impl::size(void) {
|
|
tracing::trace(_trace_state, "{} scheduling size retrieval", _trace_prefix);
|
|
return get_file_impl(_f)->size().then_wrapped([this] (future<uint64_t> f) {
|
|
try {
|
|
auto ret = f.get();
|
|
tracing::trace(_trace_state, "{} retrieved size = {}", _trace_prefix, ret);
|
|
return ret;
|
|
} catch (...) {
|
|
tracing::trace(_trace_state, "{} failed to retrieve size: {}", _trace_prefix, std::current_exception());
|
|
throw;
|
|
}
|
|
});
|
|
}
|
|
|
|
future<>
|
|
traced_file_impl::close() {
|
|
tracing::trace(_trace_state, "{} scheduling close", _trace_prefix);
|
|
return get_file_impl(_f)->close().then_wrapped([this] (future<> f) {
|
|
try {
|
|
f.get();
|
|
tracing::trace(_trace_state, "{} closed file", _trace_prefix);
|
|
} catch (...) {
|
|
tracing::trace(_trace_state, "{} failed to close: {}", _trace_prefix, std::current_exception());
|
|
throw;
|
|
}
|
|
});
|
|
}
|
|
|
|
std::unique_ptr<seastar::file_handle_impl>
|
|
traced_file_impl::dup() {
|
|
return get_file_impl(_f)->dup();
|
|
}
|
|
|
|
subscription<directory_entry>
|
|
traced_file_impl::list_directory(std::function<future<> (directory_entry de)> next) {
|
|
return get_file_impl(_f)->list_directory(std::move(next));
|
|
}
|
|
|
|
future<temporary_buffer<uint8_t>>
|
|
traced_file_impl::dma_read_bulk(uint64_t offset, size_t range_size, io_intent* intent) {
|
|
tracing::trace(_trace_state, "{} scheduling bulk DMA read of size {} at offset {}", _trace_prefix, range_size, offset);
|
|
return get_file_impl(_f)->dma_read_bulk(offset, range_size, intent).then_wrapped([this, offset, range_size] (future<temporary_buffer<uint8_t>> f) {
|
|
try {
|
|
auto ret = f.get();
|
|
tracing::trace(_trace_state, "{} finished bulk DMA read of size {} at offset {}, successfully read {} bytes",
|
|
_trace_prefix, range_size, offset, ret.size());
|
|
return ret;
|
|
} catch (...) {
|
|
tracing::trace(_trace_state, "{} failed a bulk DMA read of size {} at offset {}: {}",
|
|
_trace_prefix, range_size, offset, std::current_exception());
|
|
throw;
|
|
}
|
|
});
|
|
}
|
|
|
|
namespace tracing {
|
|
|
|
file make_traced_file(file f, trace_state_ptr trace_state, sstring trace_prefix) {
|
|
return file(::make_shared<traced_file_impl>(std::move(f), std::move(trace_state), std::move(trace_prefix)));
|
|
}
|
|
|
|
} // namespace tracing
|