From d0dffd734616e7c4d29be6dbf1f69324ac3ed40c Mon Sep 17 00:00:00 2001 From: Asias He Date: Mon, 17 Jul 2017 15:24:51 +0800 Subject: [PATCH] streaming: Better stats Log the number of bytes streamed and streaming bandwidth summary in the same line with session complete message. --- streaming/stream_result_future.cc | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/streaming/stream_result_future.cc b/streaming/stream_result_future.cc index 0ce4d1b977..6e940403cc 100644 --- a/streaming/stream_result_future.cc +++ b/streaming/stream_result_future.cc @@ -54,7 +54,7 @@ future stream_result_future::init_sending_side(UUID plan_id_, sstr sr->add_event_listener(listener); } - sslog.info("[Stream #{}] Executing streaming plan for {}", plan_id_, description_); + sslog.info("[Stream #{}] Executing streaming plan for {} with peers={}, master", plan_id_, description_, coordinator_->get_peers()); // Initialize and start all sessions for (auto& session : coordinator_->get_all_stream_sessions()) { @@ -74,7 +74,7 @@ shared_ptr stream_result_future::init_receiving_side(UUID sslog.warn(err.c_str()); throw std::runtime_error(err); } - sslog.info("[Stream #{}] Creating new streaming plan for {}, with {}", plan_id, description, from); + sslog.info("[Stream #{}] Executing streaming plan for {} with peers={}, slave", plan_id, description, from); bool is_receiving = true; sr = make_shared(plan_id, description, is_receiving); sm.register_receiving(sr); @@ -120,25 +120,25 @@ void stream_result_future::maybe_complete() { sm.show_streams(); } auto duration = std::chrono::duration_cast>(lowres_clock::now() - _start_time).count(); - sm.get_progress_on_all_shards(plan_id).then([plan_id, duration] (auto sbytes) { - auto tx_bw = sstring("+inf"); - auto rx_bw = sstring("+inf"); + auto stats = make_lw_shared(""); + sm.get_progress_on_all_shards(plan_id).then([plan_id, duration, stats] (auto sbytes) { + auto tx_bw = sstring("0"); + auto rx_bw = sstring("0"); if (std::fabs(duration) > FLT_EPSILON) { - tx_bw = sprint("%.3f", sbytes.bytes_sent / duration / (1024 * 1024)); - rx_bw = sprint("%.3f", sbytes.bytes_received / duration / (1024 * 1024)); + tx_bw = sprint("%.2f", sbytes.bytes_sent / duration / 1024); + rx_bw = sprint("%.2f", sbytes.bytes_received / duration / 1024); } - sslog.info("[Stream #{}] bytes_sent = {}, bytes_received = {}, tx_bandwidth = {} MiB/s, rx_bandwidth = {} MiB/s", - plan_id, sbytes.bytes_sent, sbytes.bytes_received, tx_bw, rx_bw); + *stats = sprint("tx=%ld KiB, %s KiB/s, rx=%ld KiB, %s KiB/s", sbytes.bytes_sent / 1024, tx_bw, sbytes.bytes_received / 1024, rx_bw); }).handle_exception([plan_id] (auto ep) { sslog.warn("[Stream #{}] Fail to get progess on all shards: {}", plan_id, ep); - }).finally([this, plan_id, &sm] { + }).finally([this, plan_id, stats, &sm] () { sm.remove_stream(plan_id); auto final_state = get_current_state(); if (final_state.has_failed_session()) { - sslog.warn("[Stream #{}] Stream failed for streaming plan {}, peers={}", plan_id, description, _coordinator->get_peers()); + sslog.warn("[Stream #{}] Streaming plan for {} failed, peers={}, {}", plan_id, description, _coordinator->get_peers(), *stats); _done.set_exception(stream_exception(final_state, "Stream failed")); } else { - sslog.info("[Stream #{}] All sessions completed for streaming plan {}, peers={}", plan_id, description, _coordinator->get_peers()); + sslog.info("[Stream #{}] Streaming plan for {} succeeded, peers={}, {}", plan_id, description, _coordinator->get_peers(), *stats); _done.set_value(final_state); } });