streaming: Kill stream_result_future::create_and_register

The helper is used only once in init_sending_side and in
init_receiving_side we do not use create_and_register to create
stream_result_future. Kill the trivial helper to make the code more
consistent.

In addition, rename variables "future" and "f" to sr (streaming_result).
This commit is contained in:
Asias He
2016-01-22 17:07:07 +08:00
parent face74a8f2
commit bc4ac5004e
2 changed files with 11 additions and 19 deletions

View File

@@ -44,28 +44,30 @@ namespace streaming {
extern logging::logger sslog;
future<stream_state> stream_result_future::init_sending_side(UUID plan_id_, sstring description_, std::vector<stream_event_handler*> listeners_, shared_ptr<stream_coordinator> coordinator_) {
auto future = create_and_register(plan_id_, description_, coordinator_);
future<stream_state> stream_result_future::init_sending_side(UUID plan_id_, sstring description_,
std::vector<stream_event_handler*> listeners_, shared_ptr<stream_coordinator> coordinator_) {
auto sr = make_shared<stream_result_future>(plan_id_, description_, coordinator_);
get_local_stream_manager().register_sending(sr);
for (auto& listener : listeners_) {
future->add_event_listener(listener);
sr->add_event_listener(listener);
}
sslog.info("[Stream #{}] Executing streaming plan for {}", plan_id_, description_);
// Initialize and start all sessions
for (auto& session : coordinator_->get_all_stream_sessions()) {
session->init(future);
session->init(sr);
}
coordinator_->connect_all_stream_sessions();
return future->_done.get_future();
return sr->_done.get_future();
}
void stream_result_future::init_receiving_side(UUID plan_id,
sstring description, inet_address from) {
void stream_result_future::init_receiving_side(UUID plan_id, sstring description, inet_address from) {
auto& sm = get_local_stream_manager();
auto f = sm.get_receiving_stream(plan_id);
if (f == nullptr) {
auto sr = sm.get_receiving_stream(plan_id);
if (sr == nullptr) {
sslog.info("[Stream #{}] Creating new streaming plan for {}, with {}", plan_id, description, from);
// The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
// TODO: stream_result_future needs a ref to stream_coordinator.
@@ -134,11 +136,4 @@ void stream_result_future::handle_progress(progress_info progress) {
fire_stream_event(progress_event(plan_id, std::move(progress)));
}
shared_ptr<stream_result_future> stream_result_future::create_and_register(UUID plan_id_, sstring description_, shared_ptr<stream_coordinator> coordinator_) {
auto future = make_shared<stream_result_future>(plan_id_, description_, coordinator_);
auto& sm = get_local_stream_manager();
sm.register_sending(future);
return future;
}
} // namespace streaming

View File

@@ -104,9 +104,6 @@ public:
static future<stream_state> init_sending_side(UUID plan_id_, sstring description_, std::vector<stream_event_handler*> listeners_, shared_ptr<stream_coordinator> coordinator_);
static void init_receiving_side(UUID plan_id, sstring description, inet_address from);
private:
static shared_ptr<stream_result_future> create_and_register(UUID plan_id_, sstring description_, shared_ptr<stream_coordinator> coordinator_);
public:
void add_event_listener(stream_event_handler* listener) {
// FIXME: Futures.addCallback(this, listener);