diff --git a/streaming/stream_coordinator.cc b/streaming/stream_coordinator.cc index 1b3155ac4b..bd7582cf1e 100644 --- a/streaming/stream_coordinator.cc +++ b/streaming/stream_coordinator.cc @@ -37,8 +37,8 @@ bool stream_coordinator::has_active_sessions() { return false; } -std::vector> stream_coordinator::get_all_stream_sessions() { - std::vector> results; +std::vector> stream_coordinator::get_all_stream_sessions() { + std::vector> results; for (auto& x : _peer_sessions) { auto s = x.second.get_all_stream_sessions(); std::move(s.begin(), s.end(), std::back_inserter(results)); @@ -114,11 +114,11 @@ bool stream_coordinator::host_streaming_data::has_active_sessions() { return false; } -std::shared_ptr stream_coordinator::host_streaming_data::get_or_create_next_session(inet_address peer, inet_address connecting) { +shared_ptr stream_coordinator::host_streaming_data::get_or_create_next_session(inet_address peer, inet_address connecting) { // create int size = _stream_sessions.size(); if (size < _connections_per_host) { - auto session = std::make_shared(peer, connecting, size, _keep_ss_table_level); + auto session = make_shared(peer, connecting, size, _keep_ss_table_level); _stream_sessions.emplace(++_last_returned, session); return _stream_sessions[_last_returned]; // get @@ -130,19 +130,19 @@ std::shared_ptr stream_coordinator::host_streaming_data::get_or_ } } -std::vector> stream_coordinator::host_streaming_data::get_all_stream_sessions() { - std::vector> sessions; +std::vector> stream_coordinator::host_streaming_data::get_all_stream_sessions() { + std::vector> sessions; for (auto& x : _stream_sessions) { sessions.push_back(x.second); } return sessions; } -std::shared_ptr stream_coordinator::host_streaming_data::get_or_create_session_by_id(inet_address peer, +shared_ptr stream_coordinator::host_streaming_data::get_or_create_session_by_id(inet_address peer, int id, inet_address connecting) { auto it = _stream_sessions.find(id); if (it == _stream_sessions.end()) { - it = _stream_sessions.emplace(id, std::make_shared(peer, connecting, id, _keep_ss_table_level)).first; + it = _stream_sessions.emplace(id, make_shared(peer, connecting, id, _keep_ss_table_level)).first; } return it->second; } diff --git a/streaming/stream_coordinator.hh b/streaming/stream_coordinator.hh index 1f87c57c71..82a2adf5e4 100644 --- a/streaming/stream_coordinator.hh +++ b/streaming/stream_coordinator.hh @@ -63,7 +63,7 @@ public: */ bool has_active_sessions(); - std::vector> get_all_stream_sessions(); + std::vector> get_all_stream_sessions(); bool is_receiving(); @@ -76,11 +76,11 @@ public: std::set get_peers(); public: - std::shared_ptr get_or_create_next_session(inet_address peer, inet_address connecting) { + shared_ptr get_or_create_next_session(inet_address peer, inet_address connecting) { return get_or_create_host_data(peer).get_or_create_next_session(peer, connecting); } - std::shared_ptr get_or_create_session_by_id(inet_address peer, int id, inet_address connecting) { + shared_ptr get_or_create_session_by_id(inet_address peer, int id, inet_address connecting) { return get_or_create_host_data(peer).get_or_create_session_by_id(peer, id, connecting); } @@ -132,7 +132,7 @@ private: class host_streaming_data { using inet_address = gms::inet_address; private: - std::map> _stream_sessions; + std::map> _stream_sessions; std::map _session_infos; int _last_returned = -1; int _connections_per_host; @@ -148,7 +148,7 @@ private: bool has_active_sessions(); - std::shared_ptr get_or_create_next_session(inet_address peer, inet_address connecting); + shared_ptr get_or_create_next_session(inet_address peer, inet_address connecting); void connect_all_stream_sessions() { for (auto& x : _stream_sessions) { @@ -158,9 +158,9 @@ private: } } - std::vector> get_all_stream_sessions(); + std::vector> get_all_stream_sessions(); - std::shared_ptr get_or_create_session_by_id(inet_address peer, int id, inet_address connecting); + shared_ptr get_or_create_session_by_id(inet_address peer, int id, inet_address connecting); void update_progress(progress_info info); diff --git a/streaming/stream_event.hh b/streaming/stream_event.hh index 0503dbe07f..cb28c088e0 100644 --- a/streaming/stream_event.hh +++ b/streaming/stream_event.hh @@ -53,7 +53,7 @@ struct session_complete_event : public stream_event { bool success; int session_index; - session_complete_event(std::shared_ptr session) + session_complete_event(shared_ptr session) : stream_event(stream_event::type::STREAM_COMPLETE, session->plan_id()) , peer(session->peer) , success(session->is_success()) diff --git a/streaming/stream_reader.cc b/streaming/stream_reader.cc index b802170e4c..e996668821 100644 --- a/streaming/stream_reader.cc +++ b/streaming/stream_reader.cc @@ -22,6 +22,19 @@ namespace streaming { +stream_reader::stream_reader(file_message_header header, shared_ptr session_) + : cf_id(header.cf_id) + , estimated_keys(header.estimated_keys) + , sections(header.sections) + , session(session_) + // input_version = header.format.info.getVersion(header.version) + , repaired_at(header.repaired_at) + , format(header.format) + , sstable_level(header.sstable_level) { +} + +stream_reader::~stream_reader() = default; + int64_t stream_reader::total_size() { int64_t size = 0; for (auto section : sections) diff --git a/streaming/stream_reader.hh b/streaming/stream_reader.hh index 36f407c770..b1e041c3e0 100644 --- a/streaming/stream_reader.hh +++ b/streaming/stream_reader.hh @@ -40,7 +40,7 @@ protected: UUID cf_id; int64_t estimated_keys; std::map sections; - std::shared_ptr session; + shared_ptr session; // FIXME: Version version_types input_version; int64_t repaired_at; @@ -49,16 +49,8 @@ protected: // FIXME: Descriptor //Descriptor desc; public: - stream_reader(file_message_header header, std::shared_ptr session_) - : cf_id(header.cf_id) - , estimated_keys(header.estimated_keys) - , sections(header.sections) - , session(session_) - // input_version = header.format.info.getVersion(header.version) - , repaired_at(header.repaired_at) - , format(header.format) - , sstable_level(header.sstable_level) { - } + stream_reader(file_message_header header, shared_ptr session_); + ~stream_reader(); #if 0 /** diff --git a/streaming/stream_receive_task.cc b/streaming/stream_receive_task.cc index 7b594ff543..d3b9a9ad60 100644 --- a/streaming/stream_receive_task.cc +++ b/streaming/stream_receive_task.cc @@ -24,7 +24,7 @@ namespace streaming { -stream_receive_task::stream_receive_task(std::shared_ptr _session, UUID _cf_id, int _total_files, long _total_size) +stream_receive_task::stream_receive_task(shared_ptr _session, UUID _cf_id, int _total_files, long _total_size) : stream_task(_session, _cf_id) , total_files(_total_files) , total_size(_total_size) { diff --git a/streaming/stream_receive_task.hh b/streaming/stream_receive_task.hh index 25ae1e5279..021a7eb3af 100644 --- a/streaming/stream_receive_task.hh +++ b/streaming/stream_receive_task.hh @@ -46,7 +46,7 @@ private: // holds references to SSTables received // protected Collection sstables; public: - stream_receive_task(std::shared_ptr _session, UUID _cf_id, int _total_files, long _total_size); + stream_receive_task(shared_ptr _session, UUID _cf_id, int _total_files, long _total_size); ~stream_receive_task(); /** diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 13a5c97cf9..3cf9c31c81 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -389,7 +389,7 @@ std::vector stream_session::get_column_family_stores(const sstri abort(); // FIXME: stores.addAll(Keyspace.open(keyspace).getColumnFamilyStores()); } else { - // TODO: We can move this to database class and use std::shared_ptr instead + // TODO: We can move this to database class and use shared_ptr instead for (auto& cf_name : column_families) { auto& x = db.find_column_family(keyspace, cf_name); stores.push_back(&x); diff --git a/streaming/stream_session.hh b/streaming/stream_session.hh index 93478b9487..bf2827ad03 100644 --- a/streaming/stream_session.hh +++ b/streaming/stream_session.hh @@ -115,7 +115,7 @@ void streaming_debug(const char* fmt, Args&&... args) { * session is done is is closed (closeSession()). Otherwise, the node switch to the WAIT_COMPLETE state and * send a CompleteMessage to the other side. */ -class stream_session : public gms::i_endpoint_state_change_subscriber, public std::enable_shared_from_this { +class stream_session : public gms::i_endpoint_state_change_subscriber, public enable_shared_from_this { private: using messaging_verb = net::messaging_verb; using messaging_service = net::messaging_service; diff --git a/streaming/stream_task.cc b/streaming/stream_task.cc index e5509d6137..2eed45b600 100644 --- a/streaming/stream_task.cc +++ b/streaming/stream_task.cc @@ -24,7 +24,7 @@ namespace streaming { -stream_task::stream_task(std::shared_ptr _session, UUID _cf_id) +stream_task::stream_task(shared_ptr _session, UUID _cf_id) : session(_session) , cf_id(std::move(_cf_id)) { } diff --git a/streaming/stream_task.hh b/streaming/stream_task.hh index d7b5e22285..bfc24977e6 100644 --- a/streaming/stream_task.hh +++ b/streaming/stream_task.hh @@ -24,6 +24,7 @@ #include "utils/UUID.hh" #include "streaming/stream_summary.hh" #include +#include "core/shared_ptr.hh" namespace streaming { @@ -36,11 +37,11 @@ class stream_task { public: using UUID = utils::UUID; /** StreamSession that this task belongs */ - std::shared_ptr session; + shared_ptr session; UUID cf_id; - stream_task(std::shared_ptr _session, UUID _cf_id); + stream_task(shared_ptr _session, UUID _cf_id); ~stream_task(); public: diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index af8173a244..fdc920b66d 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -30,7 +30,7 @@ namespace streaming { -stream_transfer_task::stream_transfer_task(std::shared_ptr session, UUID cf_id) +stream_transfer_task::stream_transfer_task(shared_ptr session, UUID cf_id) : stream_task(session, cf_id) { } diff --git a/streaming/stream_transfer_task.hh b/streaming/stream_transfer_task.hh index 7e627daa40..06eee576dd 100644 --- a/streaming/stream_transfer_task.hh +++ b/streaming/stream_transfer_task.hh @@ -46,7 +46,7 @@ private: long total_size; public: using UUID = utils::UUID; - stream_transfer_task(std::shared_ptr session, UUID cf_id); + stream_transfer_task(shared_ptr session, UUID cf_id); ~stream_transfer_task(); void add_transfer_file(stream_detail detail); diff --git a/streaming/stream_writer.hh b/streaming/stream_writer.hh index 2c92b0f796..dd8e6f48be 100644 --- a/streaming/stream_writer.hh +++ b/streaming/stream_writer.hh @@ -36,7 +36,7 @@ protected: sstables::sstable& sstable; std::map sections; //StreamRateLimiter limiter; - std::shared_ptr session; + shared_ptr session; #if 0 private OutputStream compressedOutput; // allocate buffer to use for transfers only once @@ -44,7 +44,7 @@ protected: #endif public: - stream_writer(sstables::sstable& sstable_, std::map sections_, std::shared_ptr session_) + stream_writer(sstables::sstable& sstable_, std::map sections_, shared_ptr session_) : sstable(sstable_) , sections(std::move(sections_)) , session(session_) {