diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index b605fdd79f..603027f0d6 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -172,7 +172,8 @@ public: uint64_t bytes_slack = 0; uint64_t segments_created = 0; uint64_t segments_destroyed = 0; - uint64_t pending_operations = 0; + uint64_t pending_writes = 0; + uint64_t pending_flushes = 0; uint64_t total_size = 0; uint64_t buffer_list_bytes = 0; uint64_t total_size_on_disk = 0; @@ -180,12 +181,21 @@ public: stats totals; - void begin_op() { + void begin_write() { _gate.enter(); - ++totals.pending_operations; + ++totals.pending_writes; } - void end_op() { - --totals.pending_operations; + void end_write() { + --totals.pending_writes; + _gate.leave(); + } + + void begin_flush() { + _gate.enter(); + ++totals.pending_flushes; + } + void end_flush() { + --totals.pending_flushes; _gate.leave(); } @@ -462,8 +472,8 @@ public: logger.trace("{} already synced! ({} < {})", *this, pos, _flush_pos); return make_ready_future(std::move(me)); } - _segment_manager->begin_op(); - return _file.flush().then_wrapped([this, pos, me](auto f) { + _segment_manager->begin_flush(); + return _file.flush().then_wrapped([this, pos, me](future<> f) { try { f.get(); // TODO: retry/ignore/fail/stop - optional behaviour in origin. @@ -477,7 +487,7 @@ public: throw; } }).finally([this, me] { - _segment_manager->end_op(); + _segment_manager->end_flush(); }); }); } @@ -565,10 +575,10 @@ public: return _dwrite.read_lock().then([this, size, off, buf = std::move(buf), me]() mutable { auto written = make_lw_shared(0); auto p = buf.get(); - _segment_manager->begin_op(); + _segment_manager->begin_write(); return repeat([this, size, off, written, p]() mutable { auto& priority_class = service::get_local_commitlog_priority(); - return _file.dma_write(off + *written, p + *written, size - *written, priority_class).then_wrapped([this, size, written](auto&& f) { + return _file.dma_write(off + *written, p + *written, size - *written, priority_class).then_wrapped([this, size, written](future&& f) { try { auto bytes = std::get<0>(f.get()); *written += bytes; @@ -592,7 +602,7 @@ public: }); }).finally([this, buf = std::move(buf)]() mutable { _segment_manager->release_buffer(std::move(buf)); - _segment_manager->end_op(); + _segment_manager->end_write(); }); }).then([me] { return make_ready_future(std::move(me)); @@ -849,8 +859,12 @@ scollectd::registrations db::commitlog::segment_manager::create_counters() { ), add_polled_metric(type_instance_id("commitlog" - , per_cpu_plugin_instance, "queue_length", "pending_operations") - , make_typed(data_type::GAUGE, totals.pending_operations) + , per_cpu_plugin_instance, "queue_length", "pending_writes") + , make_typed(data_type::GAUGE, totals.pending_writes) + ), + add_polled_metric(type_instance_id("commitlog" + , per_cpu_plugin_instance, "queue_length", "pending_flushes") + , make_typed(data_type::GAUGE, totals.pending_flushes) ), add_polled_metric(type_instance_id("commitlog" , per_cpu_plugin_instance, "memory", "total_size") @@ -1508,7 +1522,8 @@ uint64_t db::commitlog::get_flush_count() const { } uint64_t db::commitlog::get_pending_tasks() const { - return _segment_manager->totals.pending_operations; + return _segment_manager->totals.pending_writes + + _segment_manager->totals.pending_flushes; } uint64_t db::commitlog::get_num_segments_created() const {