diff --git a/streaming/messages/prepare_message.cc b/streaming/messages/prepare_message.cc index 51f27d8951..957c5276a4 100644 --- a/streaming/messages/prepare_message.cc +++ b/streaming/messages/prepare_message.cc @@ -41,14 +41,14 @@ void prepare_message::serialize(bytes::iterator& out) const { prepare_message prepare_message::deserialize(bytes_view& v) { auto num = read_simple(v); - std::vector requests_(num); + std::vector requests_; for (int32_t i = 0; i < num; i++) { auto r = stream_request::deserialize(v); requests_.push_back(std::move(r)); } num = read_simple(v); - std::vector summaries_(num); + std::vector summaries_; for (int32_t i = 0; i < num; i++) { auto s = stream_summary::deserialize(v); summaries_.push_back(std::move(s)); diff --git a/streaming/stream_coordinator.cc b/streaming/stream_coordinator.cc index 43a660e035..9aa5c52abf 100644 --- a/streaming/stream_coordinator.cc +++ b/streaming/stream_coordinator.cc @@ -181,7 +181,7 @@ void stream_coordinator::host_streaming_data::connect_all_stream_sessions() { for (auto& x : _stream_sessions) { auto& session = x.second; session->start(); - sslog.info("[Stream #{}, ID#{}] Beginning stream session with {}", session->plan_id(), session->session_index(), session->peer); + sslog.info("[Stream #{} ID#{}] Beginning stream session with {}", session->plan_id(), session->session_index(), session->peer); } } } // namespace streaming diff --git a/streaming/stream_request.cc b/streaming/stream_request.cc index c1fe28f730..960e856553 100644 --- a/streaming/stream_request.cc +++ b/streaming/stream_request.cc @@ -43,13 +43,14 @@ stream_request stream_request::deserialize(bytes_view& v) { auto keyspace_ = read_simple_short_string(v); auto num = read_simple(v); - std::vector> ranges_(num); + std::vector> ranges_; for (int32_t i = 0; i < num; i++) { // FIXME: query::range + ranges_.push_back(query::range::make_open_ended_both_sides()); } num = read_simple(v); - std::vector column_families_(num); + std::vector column_families_; for (int32_t i = 0; i < num; i++) { auto s = read_simple_short_string(v); column_families_.push_back(std::move(s)); @@ -78,4 +79,12 @@ size_t stream_request::serialized_size() const { return size; } +std::ostream& operator<<(std::ostream& os, const stream_request& sr) { + os << "[ ks = " << sr.keyspace << " cf = "; + for (auto& cf : sr.column_families) { + os << cf << " "; + } + return os << "]"; +} + } // namespace streaming; diff --git a/streaming/stream_request.hh b/streaming/stream_request.hh index 77e35ab79b..4d95b57fb8 100644 --- a/streaming/stream_request.hh +++ b/streaming/stream_request.hh @@ -42,6 +42,7 @@ public: , column_families(std::move(_column_families)) , repaired_at(_repaired_at) { } + friend std::ostream& operator<<(std::ostream& os, const stream_request& r); public: void serialize(bytes::iterator& out) const; static stream_request deserialize(bytes_view& v); diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 45d25456f0..ad7de30dc7 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -216,9 +216,11 @@ future<> stream_session::test(distributed& qp) { sslog.debug("================ START STREAM =============="); auto sp = stream_plan("MYPLAN"); auto to = inet_address("127.0.0.2"); + auto tb = sstring("tb"); + auto ks = sstring("ks"); std::vector> ranges = {query::range::make_open_ended_both_sides()}; - std::vector cfs{"tb"}; - sp.transfer_ranges(to, to, "ks", std::move(ranges), std::move(cfs)).execute(); + std::vector cfs{tb}; + sp.transfer_ranges(to, to, ks, ranges, cfs).request_ranges(to, to, ks, ranges, cfs).execute(); }); }); }); @@ -259,10 +261,7 @@ future<> stream_session::on_initialization_complete() { for (auto& summary : msg.summaries) { prepare_receiving(summary); } - // if we don't need to prepare for receiving stream, start sending files immediately - if (_requests.empty()) { - start_streaming_files(); - } + start_streaming_files(); }); } @@ -280,10 +279,12 @@ void stream_session::on_error() { // Only follower calls this function upon receiving of prepare_message from initiator messages::prepare_message stream_session::prepare(std::vector requests, std::vector summaries) { + sslog.debug("stream_session::prepare requests nr={}, summaries nr={}", requests.size(), summaries.size()); // prepare tasks set_state(stream_session_state::PREPARING); for (auto& request : requests) { // always flush on stream request + sslog.debug("stream_session::prepare stream_request={}", request); add_transfer_ranges(request.keyspace, request.ranges, request.column_families, true, request.repaired_at); } for (auto& summary : summaries) {