mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-28 10:41:12 +00:00
streaming: Implement stream_result_future::handle_session_prepared
Instead of playing the game of casting between stream_event and derived
class. We overload handle_stream_event with derived stream_event class.
virtual void handle_stream_event(session_complete_event event) {}
virtual void handle_stream_event(progress_event event) {}
virtual void handle_stream_event(session_prepared_event event) {}
Also, make the virtual function non pure virtual, so user can override
the interested event only without defining all of the three.
This commit is contained in:
@@ -33,7 +33,9 @@ public:
|
||||
* @see StreamEvent.Type
|
||||
* @param event Stream event.
|
||||
*/
|
||||
virtual void handle_stream_event(stream_event event) = 0;
|
||||
virtual void handle_stream_event(session_complete_event event) {}
|
||||
virtual void handle_stream_event(progress_event event) {}
|
||||
virtual void handle_stream_event(session_prepared_event event) {}
|
||||
};
|
||||
|
||||
} // namespace streaming
|
||||
|
||||
@@ -54,4 +54,26 @@ void stream_result_future::init_receiving_side(int session_index, UUID plan_id,
|
||||
sslog.info("[Stream #{}, ID#{}] Received streaming plan for {}", plan_id, session_index, description);
|
||||
}
|
||||
|
||||
void stream_result_future::handle_session_prepared(shared_ptr<stream_session> session) {
|
||||
auto si = session->get_session_info();
|
||||
sslog.info("[Stream #{} ID#{}] Prepare completed. Receiving {} files({} bytes), sending {} files({} bytes)",
|
||||
session->plan_id(),
|
||||
session->session_index(),
|
||||
si.get_total_files_to_receive(),
|
||||
si.get_total_size_to_receive(),
|
||||
si.get_total_files_to_send(),
|
||||
si.get_total_size_to_send());
|
||||
auto event = session_prepared_event(plan_id, si);
|
||||
_coordinator->add_session_info(std::move(si));
|
||||
fire_stream_event(std::move(event));
|
||||
}
|
||||
|
||||
template <typename Event>
|
||||
void stream_result_future::fire_stream_event(Event event) {
|
||||
// delegate to listener
|
||||
for (auto listener : _event_listeners) {
|
||||
listener->handle_stream_event(std::move(event));
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace streaming
|
||||
|
||||
@@ -120,21 +120,11 @@ public:
|
||||
{
|
||||
return planId.hashCode();
|
||||
}
|
||||
#endif
|
||||
|
||||
void handleSessionPrepared(StreamSession session)
|
||||
{
|
||||
SessionInfo sessionInfo = session.getSessionInfo();
|
||||
logger.info("[Stream #{} ID#{}] Prepare completed. Receiving {} files({} bytes), sending {} files({} bytes)",
|
||||
session.planId(),
|
||||
session.sessionIndex(),
|
||||
sessionInfo.getTotalFilesToReceive(),
|
||||
sessionInfo.getTotalSizeToReceive(),
|
||||
sessionInfo.getTotalFilesToSend(),
|
||||
sessionInfo.getTotalSizeToSend());
|
||||
StreamEvent.SessionPreparedEvent event = new StreamEvent.SessionPreparedEvent(planId, sessionInfo);
|
||||
coordinator.addSessionInfo(sessionInfo);
|
||||
fireStreamEvent(event);
|
||||
}
|
||||
void handle_session_prepared(shared_ptr<stream_session> session);
|
||||
|
||||
#if 0
|
||||
|
||||
void handleSessionComplete(StreamSession session)
|
||||
{
|
||||
@@ -150,14 +140,12 @@ public:
|
||||
coordinator.updateProgress(progress);
|
||||
fireStreamEvent(new StreamEvent.ProgressEvent(planId, progress));
|
||||
}
|
||||
#endif
|
||||
|
||||
synchronized void fireStreamEvent(StreamEvent event)
|
||||
{
|
||||
// delegate to listener
|
||||
for (StreamEventHandler listener : eventListeners)
|
||||
listener.handleStreamEvent(event);
|
||||
}
|
||||
template <typename Event>
|
||||
void fire_stream_event(Event event);
|
||||
|
||||
#if 0
|
||||
private synchronized void maybeComplete()
|
||||
{
|
||||
if (!coordinator.hasActiveSessions())
|
||||
|
||||
Reference in New Issue
Block a user