diff --git a/core/reactor.cc b/core/reactor.cc index c92fa4011f..ef75b2daec 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -374,6 +374,20 @@ reactor::submit_io(Func prepare_io) { }); } +template +future +reactor::submit_io_read(Func prepare_io) { + ++_aio_reads; + return submit_io(std::move(prepare_io)); +} + +template +future +reactor::submit_io_write(Func prepare_io) { + ++_aio_writes; + return submit_io(std::move(prepare_io)); +} + bool reactor::process_io() { io_event ev[max_aio]; @@ -391,7 +405,7 @@ bool reactor::process_io() future posix_file_impl::write_dma(uint64_t pos, const void* buffer, size_t len) { - return engine().submit_io([this, pos, buffer, len] (iocb& io) { + return engine().submit_io_write([this, pos, buffer, len] (iocb& io) { io_prep_pwrite(&io, _fd, const_cast(buffer), len, pos); }).then([] (io_event ev) { throw_kernel_error(long(ev.res)); @@ -401,7 +415,7 @@ posix_file_impl::write_dma(uint64_t pos, const void* buffer, size_t len) { future posix_file_impl::write_dma(uint64_t pos, std::vector iov) { - return engine().submit_io([this, pos, iov = std::move(iov)] (iocb& io) { + return engine().submit_io_write([this, pos, iov = std::move(iov)] (iocb& io) { io_prep_pwritev(&io, _fd, iov.data(), iov.size(), pos); }).then([] (io_event ev) { throw_kernel_error(long(ev.res)); @@ -411,7 +425,7 @@ posix_file_impl::write_dma(uint64_t pos, std::vector iov) { future posix_file_impl::read_dma(uint64_t pos, void* buffer, size_t len) { - return engine().submit_io([this, pos, buffer, len] (iocb& io) { + return engine().submit_io_read([this, pos, buffer, len] (iocb& io) { io_prep_pread(&io, _fd, buffer, len, pos); }).then([] (io_event ev) { throw_kernel_error(long(ev.res)); @@ -421,7 +435,7 @@ posix_file_impl::read_dma(uint64_t pos, void* buffer, size_t len) { future posix_file_impl::read_dma(uint64_t pos, std::vector iov) { - return engine().submit_io([this, pos, iov = std::move(iov)] (iocb& io) { + return engine().submit_io_read([this, pos, iov = std::move(iov)] (iocb& io) { io_prep_preadv(&io, _fd, iov.data(), iov.size(), pos); }).then([] (io_event ev) { throw_kernel_error(long(ev.res)); @@ -509,6 +523,7 @@ reactor::make_directory(sstring name) { future<> posix_file_impl::flush(void) { + ++engine()._fsyncs; return engine()._thread_pool.submit>([this] { return wrap_syscall(::fsync(_fd)); }).then([] (syscall_result sr) { @@ -794,6 +809,31 @@ reactor::register_collectd_metrics() { , scollectd::make_typed(scollectd::data_type::GAUGE, [this] () -> uint32_t { return _load * 100; }) ), + // total_operations value:DERIVE:0:U + scollectd::add_polled_metric(scollectd::type_instance_id("reactor" + , scollectd::per_cpu_plugin_instance + , "total_operations", "aio-reads") + , scollectd::make_typed(scollectd::data_type::DERIVE, _aio_reads) + ), + // total_operations value:DERIVE:0:U + scollectd::add_polled_metric(scollectd::type_instance_id("reactor" + , scollectd::per_cpu_plugin_instance + , "total_operations", "aio-writes") + , scollectd::make_typed(scollectd::data_type::DERIVE, _aio_writes) + ), + // total_operations value:DERIVE:0:U + scollectd::add_polled_metric(scollectd::type_instance_id("reactor" + , scollectd::per_cpu_plugin_instance + , "total_operations", "fsyncs") + , scollectd::make_typed(scollectd::data_type::DERIVE, _fsyncs) + ), + // total_operations value:DERIVE:0:U + scollectd::add_polled_metric(scollectd::type_instance_id("reactor" + , scollectd::per_cpu_plugin_instance + , "total_operations", "io-threaded-fallbacks") + , scollectd::make_typed(scollectd::data_type::DERIVE, + std::bind(&thread_pool::operation_count, &_thread_pool)) + ), scollectd::add_polled_metric( scollectd::type_instance_id("memory", scollectd::per_cpu_plugin_instance, diff --git a/core/reactor.hh b/core/reactor.hh index 064ae62a9e..7cf7dcd9cf 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -507,6 +507,7 @@ private: }; class thread_pool { + uint64_t _aio_threaded_fallbacks = 0; #ifndef HAVE_OSV // FIXME: implement using reactor_notifier abstraction we used for SMP syscall_work_queue inter_thread_wq; @@ -518,6 +519,7 @@ public: ~thread_pool(); template future submit(Func func) {return inter_thread_wq.submit(std::move(func));} + uint64_t operation_count() const { return _aio_threaded_fallbacks; } #else public: template @@ -671,6 +673,9 @@ private: seastar::timer_set, &timer::_link>::timer_list_t _expired_lowres_timers; io_context_t _io_context; semaphore _io_context_available; + uint64_t _aio_reads = 0; + uint64_t _aio_writes = 0; + uint64_t _fsyncs = 0; circular_buffer> _pending_tasks; circular_buffer> _at_destroy_tasks; size_t _task_quota; @@ -767,6 +772,10 @@ public: template future submit_io(Func prepare_io); + template + future submit_io_read(Func prepare_io); + template + future submit_io_write(Func prepare_io); int run(); void exit(int ret);