mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-22 01:20:39 +00:00
reactor: add counters for file reads, writes, fsyncs, and threaded fallbacks
Reviewed-by: Pekka Enberg <penberg@cloudius-systems.com>
This commit is contained in:
@@ -374,6 +374,20 @@ reactor::submit_io(Func prepare_io) {
|
||||
});
|
||||
}
|
||||
|
||||
template <typename Func>
|
||||
future<io_event>
|
||||
reactor::submit_io_read(Func prepare_io) {
|
||||
++_aio_reads;
|
||||
return submit_io(std::move(prepare_io));
|
||||
}
|
||||
|
||||
template <typename Func>
|
||||
future<io_event>
|
||||
reactor::submit_io_write(Func prepare_io) {
|
||||
++_aio_writes;
|
||||
return submit_io(std::move(prepare_io));
|
||||
}
|
||||
|
||||
bool reactor::process_io()
|
||||
{
|
||||
io_event ev[max_aio];
|
||||
@@ -391,7 +405,7 @@ bool reactor::process_io()
|
||||
|
||||
future<size_t>
|
||||
posix_file_impl::write_dma(uint64_t pos, const void* buffer, size_t len) {
|
||||
return engine().submit_io([this, pos, buffer, len] (iocb& io) {
|
||||
return engine().submit_io_write([this, pos, buffer, len] (iocb& io) {
|
||||
io_prep_pwrite(&io, _fd, const_cast<void*>(buffer), len, pos);
|
||||
}).then([] (io_event ev) {
|
||||
throw_kernel_error(long(ev.res));
|
||||
@@ -401,7 +415,7 @@ posix_file_impl::write_dma(uint64_t pos, const void* buffer, size_t len) {
|
||||
|
||||
future<size_t>
|
||||
posix_file_impl::write_dma(uint64_t pos, std::vector<iovec> iov) {
|
||||
return engine().submit_io([this, pos, iov = std::move(iov)] (iocb& io) {
|
||||
return engine().submit_io_write([this, pos, iov = std::move(iov)] (iocb& io) {
|
||||
io_prep_pwritev(&io, _fd, iov.data(), iov.size(), pos);
|
||||
}).then([] (io_event ev) {
|
||||
throw_kernel_error(long(ev.res));
|
||||
@@ -411,7 +425,7 @@ posix_file_impl::write_dma(uint64_t pos, std::vector<iovec> iov) {
|
||||
|
||||
future<size_t>
|
||||
posix_file_impl::read_dma(uint64_t pos, void* buffer, size_t len) {
|
||||
return engine().submit_io([this, pos, buffer, len] (iocb& io) {
|
||||
return engine().submit_io_read([this, pos, buffer, len] (iocb& io) {
|
||||
io_prep_pread(&io, _fd, buffer, len, pos);
|
||||
}).then([] (io_event ev) {
|
||||
throw_kernel_error(long(ev.res));
|
||||
@@ -421,7 +435,7 @@ posix_file_impl::read_dma(uint64_t pos, void* buffer, size_t len) {
|
||||
|
||||
future<size_t>
|
||||
posix_file_impl::read_dma(uint64_t pos, std::vector<iovec> iov) {
|
||||
return engine().submit_io([this, pos, iov = std::move(iov)] (iocb& io) {
|
||||
return engine().submit_io_read([this, pos, iov = std::move(iov)] (iocb& io) {
|
||||
io_prep_preadv(&io, _fd, iov.data(), iov.size(), pos);
|
||||
}).then([] (io_event ev) {
|
||||
throw_kernel_error(long(ev.res));
|
||||
@@ -509,6 +523,7 @@ reactor::make_directory(sstring name) {
|
||||
|
||||
future<>
|
||||
posix_file_impl::flush(void) {
|
||||
++engine()._fsyncs;
|
||||
return engine()._thread_pool.submit<syscall_result<int>>([this] {
|
||||
return wrap_syscall<int>(::fsync(_fd));
|
||||
}).then([] (syscall_result<int> sr) {
|
||||
@@ -794,6 +809,31 @@ reactor::register_collectd_metrics() {
|
||||
, scollectd::make_typed(scollectd::data_type::GAUGE,
|
||||
[this] () -> uint32_t { return _load * 100; })
|
||||
),
|
||||
// total_operations value:DERIVE:0:U
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("reactor"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "total_operations", "aio-reads")
|
||||
, scollectd::make_typed(scollectd::data_type::DERIVE, _aio_reads)
|
||||
),
|
||||
// total_operations value:DERIVE:0:U
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("reactor"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "total_operations", "aio-writes")
|
||||
, scollectd::make_typed(scollectd::data_type::DERIVE, _aio_writes)
|
||||
),
|
||||
// total_operations value:DERIVE:0:U
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("reactor"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "total_operations", "fsyncs")
|
||||
, scollectd::make_typed(scollectd::data_type::DERIVE, _fsyncs)
|
||||
),
|
||||
// total_operations value:DERIVE:0:U
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("reactor"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "total_operations", "io-threaded-fallbacks")
|
||||
, scollectd::make_typed(scollectd::data_type::DERIVE,
|
||||
std::bind(&thread_pool::operation_count, &_thread_pool))
|
||||
),
|
||||
scollectd::add_polled_metric(
|
||||
scollectd::type_instance_id("memory",
|
||||
scollectd::per_cpu_plugin_instance,
|
||||
|
||||
@@ -507,6 +507,7 @@ private:
|
||||
};
|
||||
|
||||
class thread_pool {
|
||||
uint64_t _aio_threaded_fallbacks = 0;
|
||||
#ifndef HAVE_OSV
|
||||
// FIXME: implement using reactor_notifier abstraction we used for SMP
|
||||
syscall_work_queue inter_thread_wq;
|
||||
@@ -518,6 +519,7 @@ public:
|
||||
~thread_pool();
|
||||
template <typename T, typename Func>
|
||||
future<T> submit(Func func) {return inter_thread_wq.submit<T>(std::move(func));}
|
||||
uint64_t operation_count() const { return _aio_threaded_fallbacks; }
|
||||
#else
|
||||
public:
|
||||
template <typename T, typename Func>
|
||||
@@ -671,6 +673,9 @@ private:
|
||||
seastar::timer_set<timer<lowres_clock>, &timer<lowres_clock>::_link>::timer_list_t _expired_lowres_timers;
|
||||
io_context_t _io_context;
|
||||
semaphore _io_context_available;
|
||||
uint64_t _aio_reads = 0;
|
||||
uint64_t _aio_writes = 0;
|
||||
uint64_t _fsyncs = 0;
|
||||
circular_buffer<std::unique_ptr<task>> _pending_tasks;
|
||||
circular_buffer<std::unique_ptr<task>> _at_destroy_tasks;
|
||||
size_t _task_quota;
|
||||
@@ -767,6 +772,10 @@ public:
|
||||
|
||||
template <typename Func>
|
||||
future<io_event> submit_io(Func prepare_io);
|
||||
template <typename Func>
|
||||
future<io_event> submit_io_read(Func prepare_io);
|
||||
template <typename Func>
|
||||
future<io_event> submit_io_write(Func prepare_io);
|
||||
|
||||
int run();
|
||||
void exit(int ret);
|
||||
|
||||
Reference in New Issue
Block a user