mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-31 03:56:42 +00:00
commitlog: split write/flush counters
This commit is contained in:
@@ -172,7 +172,8 @@ public:
|
||||
uint64_t bytes_slack = 0;
|
||||
uint64_t segments_created = 0;
|
||||
uint64_t segments_destroyed = 0;
|
||||
uint64_t pending_operations = 0;
|
||||
uint64_t pending_writes = 0;
|
||||
uint64_t pending_flushes = 0;
|
||||
uint64_t total_size = 0;
|
||||
uint64_t buffer_list_bytes = 0;
|
||||
uint64_t total_size_on_disk = 0;
|
||||
@@ -180,12 +181,21 @@ public:
|
||||
|
||||
stats totals;
|
||||
|
||||
void begin_op() {
|
||||
void begin_write() {
|
||||
_gate.enter();
|
||||
++totals.pending_operations;
|
||||
++totals.pending_writes;
|
||||
}
|
||||
void end_op() {
|
||||
--totals.pending_operations;
|
||||
void end_write() {
|
||||
--totals.pending_writes;
|
||||
_gate.leave();
|
||||
}
|
||||
|
||||
void begin_flush() {
|
||||
_gate.enter();
|
||||
++totals.pending_flushes;
|
||||
}
|
||||
void end_flush() {
|
||||
--totals.pending_flushes;
|
||||
_gate.leave();
|
||||
}
|
||||
|
||||
@@ -462,8 +472,8 @@ public:
|
||||
logger.trace("{} already synced! ({} < {})", *this, pos, _flush_pos);
|
||||
return make_ready_future<sseg_ptr>(std::move(me));
|
||||
}
|
||||
_segment_manager->begin_op();
|
||||
return _file.flush().then_wrapped([this, pos, me](auto f) {
|
||||
_segment_manager->begin_flush();
|
||||
return _file.flush().then_wrapped([this, pos, me](future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
// TODO: retry/ignore/fail/stop - optional behaviour in origin.
|
||||
@@ -477,7 +487,7 @@ public:
|
||||
throw;
|
||||
}
|
||||
}).finally([this, me] {
|
||||
_segment_manager->end_op();
|
||||
_segment_manager->end_flush();
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -565,10 +575,10 @@ public:
|
||||
return _dwrite.read_lock().then([this, size, off, buf = std::move(buf), me]() mutable {
|
||||
auto written = make_lw_shared<size_t>(0);
|
||||
auto p = buf.get();
|
||||
_segment_manager->begin_op();
|
||||
_segment_manager->begin_write();
|
||||
return repeat([this, size, off, written, p]() mutable {
|
||||
auto& priority_class = service::get_local_commitlog_priority();
|
||||
return _file.dma_write(off + *written, p + *written, size - *written, priority_class).then_wrapped([this, size, written](auto&& f) {
|
||||
return _file.dma_write(off + *written, p + *written, size - *written, priority_class).then_wrapped([this, size, written](future<size_t>&& f) {
|
||||
try {
|
||||
auto bytes = std::get<0>(f.get());
|
||||
*written += bytes;
|
||||
@@ -592,7 +602,7 @@ public:
|
||||
});
|
||||
}).finally([this, buf = std::move(buf)]() mutable {
|
||||
_segment_manager->release_buffer(std::move(buf));
|
||||
_segment_manager->end_op();
|
||||
_segment_manager->end_write();
|
||||
});
|
||||
}).then([me] {
|
||||
return make_ready_future<sseg_ptr>(std::move(me));
|
||||
@@ -849,8 +859,12 @@ scollectd::registrations db::commitlog::segment_manager::create_counters() {
|
||||
),
|
||||
|
||||
add_polled_metric(type_instance_id("commitlog"
|
||||
, per_cpu_plugin_instance, "queue_length", "pending_operations")
|
||||
, make_typed(data_type::GAUGE, totals.pending_operations)
|
||||
, per_cpu_plugin_instance, "queue_length", "pending_writes")
|
||||
, make_typed(data_type::GAUGE, totals.pending_writes)
|
||||
),
|
||||
add_polled_metric(type_instance_id("commitlog"
|
||||
, per_cpu_plugin_instance, "queue_length", "pending_flushes")
|
||||
, make_typed(data_type::GAUGE, totals.pending_flushes)
|
||||
),
|
||||
add_polled_metric(type_instance_id("commitlog"
|
||||
, per_cpu_plugin_instance, "memory", "total_size")
|
||||
@@ -1508,7 +1522,8 @@ uint64_t db::commitlog::get_flush_count() const {
|
||||
}
|
||||
|
||||
uint64_t db::commitlog::get_pending_tasks() const {
|
||||
return _segment_manager->totals.pending_operations;
|
||||
return _segment_manager->totals.pending_writes
|
||||
+ _segment_manager->totals.pending_flushes;
|
||||
}
|
||||
|
||||
uint64_t db::commitlog::get_num_segments_created() const {
|
||||
|
||||
Reference in New Issue
Block a user