streaming: Better stats

Log the number of bytes streamed and streaming bandwidth summary in the same line with session
complete message.
This commit is contained in:
Asias He
2017-07-17 15:24:51 +08:00
parent 64ef7aa5e4
commit d0dffd7346

View File

@@ -54,7 +54,7 @@ future<stream_state> stream_result_future::init_sending_side(UUID plan_id_, sstr
sr->add_event_listener(listener);
}
sslog.info("[Stream #{}] Executing streaming plan for {}", plan_id_, description_);
sslog.info("[Stream #{}] Executing streaming plan for {} with peers={}, master", plan_id_, description_, coordinator_->get_peers());
// Initialize and start all sessions
for (auto& session : coordinator_->get_all_stream_sessions()) {
@@ -74,7 +74,7 @@ shared_ptr<stream_result_future> stream_result_future::init_receiving_side(UUID
sslog.warn(err.c_str());
throw std::runtime_error(err);
}
sslog.info("[Stream #{}] Creating new streaming plan for {}, with {}", plan_id, description, from);
sslog.info("[Stream #{}] Executing streaming plan for {} with peers={}, slave", plan_id, description, from);
bool is_receiving = true;
sr = make_shared<stream_result_future>(plan_id, description, is_receiving);
sm.register_receiving(sr);
@@ -120,25 +120,25 @@ void stream_result_future::maybe_complete() {
sm.show_streams();
}
auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(lowres_clock::now() - _start_time).count();
sm.get_progress_on_all_shards(plan_id).then([plan_id, duration] (auto sbytes) {
auto tx_bw = sstring("+inf");
auto rx_bw = sstring("+inf");
auto stats = make_lw_shared<sstring>("");
sm.get_progress_on_all_shards(plan_id).then([plan_id, duration, stats] (auto sbytes) {
auto tx_bw = sstring("0");
auto rx_bw = sstring("0");
if (std::fabs(duration) > FLT_EPSILON) {
tx_bw = sprint("%.3f", sbytes.bytes_sent / duration / (1024 * 1024));
rx_bw = sprint("%.3f", sbytes.bytes_received / duration / (1024 * 1024));
tx_bw = sprint("%.2f", sbytes.bytes_sent / duration / 1024);
rx_bw = sprint("%.2f", sbytes.bytes_received / duration / 1024);
}
sslog.info("[Stream #{}] bytes_sent = {}, bytes_received = {}, tx_bandwidth = {} MiB/s, rx_bandwidth = {} MiB/s",
plan_id, sbytes.bytes_sent, sbytes.bytes_received, tx_bw, rx_bw);
*stats = sprint("tx=%ld KiB, %s KiB/s, rx=%ld KiB, %s KiB/s", sbytes.bytes_sent / 1024, tx_bw, sbytes.bytes_received / 1024, rx_bw);
}).handle_exception([plan_id] (auto ep) {
sslog.warn("[Stream #{}] Fail to get progess on all shards: {}", plan_id, ep);
}).finally([this, plan_id, &sm] {
}).finally([this, plan_id, stats, &sm] () {
sm.remove_stream(plan_id);
auto final_state = get_current_state();
if (final_state.has_failed_session()) {
sslog.warn("[Stream #{}] Stream failed for streaming plan {}, peers={}", plan_id, description, _coordinator->get_peers());
sslog.warn("[Stream #{}] Streaming plan for {} failed, peers={}, {}", plan_id, description, _coordinator->get_peers(), *stats);
_done.set_exception(stream_exception(final_state, "Stream failed"));
} else {
sslog.info("[Stream #{}] All sessions completed for streaming plan {}, peers={}", plan_id, description, _coordinator->get_peers());
sslog.info("[Stream #{}] Streaming plan for {} succeeded, peers={}, {}", plan_id, description, _coordinator->get_peers(), *stats);
_done.set_value(final_state);
}
});