From f83d0bf7c4dcf76bf107ea0ce374e024e7654c0f Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 22 Jul 2015 11:40:12 +0800 Subject: [PATCH 1/7] streaming: Fix info printout format --- streaming/stream_coordinator.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/stream_coordinator.cc b/streaming/stream_coordinator.cc index 43a660e035..9aa5c52abf 100644 --- a/streaming/stream_coordinator.cc +++ b/streaming/stream_coordinator.cc @@ -181,7 +181,7 @@ void stream_coordinator::host_streaming_data::connect_all_stream_sessions() { for (auto& x : _stream_sessions) { auto& session = x.second; session->start(); - sslog.info("[Stream #{}, ID#{}] Beginning stream session with {}", session->plan_id(), session->session_index(), session->peer); + sslog.info("[Stream #{} ID#{}] Beginning stream session with {}", session->plan_id(), session->session_index(), session->peer); } } } // namespace streaming From d934b2c7612434d2dfd5ee2b4b6e58c4617499af Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 22 Jul 2015 11:16:20 +0800 Subject: [PATCH 2/7] streaming: Fix prepare_message and stream_request deserialization vector(size_type count) constructs the container with count default-inserted instances of T. The deserialization loops then push_back() the real elements on top of those, so the current code ends up with 2*num elements, which is wrong. 
--- streaming/messages/prepare_message.cc | 4 ++-- streaming/stream_request.cc | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/streaming/messages/prepare_message.cc b/streaming/messages/prepare_message.cc index 51f27d8951..957c5276a4 100644 --- a/streaming/messages/prepare_message.cc +++ b/streaming/messages/prepare_message.cc @@ -41,14 +41,14 @@ void prepare_message::serialize(bytes::iterator& out) const { prepare_message prepare_message::deserialize(bytes_view& v) { auto num = read_simple(v); - std::vector requests_(num); + std::vector requests_; for (int32_t i = 0; i < num; i++) { auto r = stream_request::deserialize(v); requests_.push_back(std::move(r)); } num = read_simple(v); - std::vector summaries_(num); + std::vector summaries_; for (int32_t i = 0; i < num; i++) { auto s = stream_summary::deserialize(v); summaries_.push_back(std::move(s)); diff --git a/streaming/stream_request.cc b/streaming/stream_request.cc index c1fe28f730..5ddea21f15 100644 --- a/streaming/stream_request.cc +++ b/streaming/stream_request.cc @@ -43,13 +43,13 @@ stream_request stream_request::deserialize(bytes_view& v) { auto keyspace_ = read_simple_short_string(v); auto num = read_simple(v); - std::vector> ranges_(num); + std::vector> ranges_; for (int32_t i = 0; i < num; i++) { // FIXME: query::range } num = read_simple(v); - std::vector column_families_(num); + std::vector column_families_; for (int32_t i = 0; i < num; i++) { auto s = read_simple_short_string(v); column_families_.push_back(std::move(s)); From d39dd0f9d19aafa3627a14a337064208bf13a1f0 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 22 Jul 2015 11:21:13 +0800 Subject: [PATCH 3/7] streaming: Add operator<< for stream_request --- streaming/stream_request.cc | 8 ++++++++ streaming/stream_request.hh | 1 + 2 files changed, 9 insertions(+) diff --git a/streaming/stream_request.cc b/streaming/stream_request.cc index 5ddea21f15..a8f2522125 100644 --- a/streaming/stream_request.cc +++ 
b/streaming/stream_request.cc @@ -78,4 +78,12 @@ size_t stream_request::serialized_size() const { return size; } +std::ostream& operator<<(std::ostream& os, const stream_request& sr) { + os << "[ ks = " << sr.keyspace << " cf = "; + for (auto& cf : sr.column_families) { + os << cf << " "; + } + return os << "]"; +} + } // namespace streaming; diff --git a/streaming/stream_request.hh b/streaming/stream_request.hh index 77e35ab79b..4d95b57fb8 100644 --- a/streaming/stream_request.hh +++ b/streaming/stream_request.hh @@ -42,6 +42,7 @@ public: , column_families(std::move(_column_families)) , repaired_at(_repaired_at) { } + friend std::ostream& operator<<(std::ostream& os, const stream_request& r); public: void serialize(bytes::iterator& out) const; static stream_request deserialize(bytes_view& v); From 736c0bc08fc99eff7816872f61c6e414ffe4fb4f Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 22 Jul 2015 11:22:01 +0800 Subject: [PATCH 4/7] streaming: Improve query::range deserialization a bit query::range is not serialized ATM. Always use full range for now. 
--- streaming/stream_request.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/streaming/stream_request.cc b/streaming/stream_request.cc index a8f2522125..960e856553 100644 --- a/streaming/stream_request.cc +++ b/streaming/stream_request.cc @@ -46,6 +46,7 @@ stream_request stream_request::deserialize(bytes_view& v) { std::vector> ranges_; for (int32_t i = 0; i < num; i++) { // FIXME: query::range + ranges_.push_back(query::range::make_open_ended_both_sides()); } num = read_simple(v); From 1c6084472716db51b98822090343162eba234a63 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 22 Jul 2015 11:25:07 +0800 Subject: [PATCH 5/7] streaming: Always start_streaming_files upon receiving of PREPARE_MESSAGE reply --- streaming/stream_session.cc | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 45d25456f0..f78c981c1e 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -259,10 +259,7 @@ future<> stream_session::on_initialization_complete() { for (auto& summary : msg.summaries) { prepare_receiving(summary); } - // if we don't need to prepare for receiving stream, start sending files immediately - if (_requests.empty()) { - start_streaming_files(); - } + start_streaming_files(); }); } From 96049a99cfc20dfc737dc9986220cf92af535921 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 22 Jul 2015 11:27:58 +0800 Subject: [PATCH 6/7] streaming: Add more debug print for stream_session::prepare --- streaming/stream_session.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index f78c981c1e..1dcad3164b 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -277,10 +277,12 @@ void stream_session::on_error() { // Only follower calls this function upon receiving of prepare_message from initiator messages::prepare_message stream_session::prepare(std::vector requests, std::vector summaries) { + 
sslog.debug("stream_session::prepare requests nr={}, summaries nr={}", requests.size(), summaries.size()); // prepare tasks set_state(stream_session_state::PREPARING); for (auto& request : requests) { // always flush on stream request + sslog.debug("stream_session::prepare stream_request={}", request); add_transfer_ranges(request.keyspace, request.ranges, request.column_families, true, request.repaired_at); } for (auto& summary : summaries) { From 54d482afe498b33e8cdf66f2789c693b9bc4f995 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 22 Jul 2015 11:29:21 +0800 Subject: [PATCH 7/7] streaming: Test both pushing and pulling of data stream_plan.transfer_ranges() sends data from the local node to the remote node. stream_plan.request_ranges() asks the remote node to send data to the local node. After streaming, both nodes contain all the keys. $ cat /tmp/out1|grep "\[Stream" [Stream #9fd8c3c0-3023-11e5-b450-000000000000] Executing streaming plan for MYPLAN [Stream #9fd8c3c0-3023-11e5-b450-000000000000] Starting streaming to 127.0.0.2 [Stream #9fd8c3c0-3023-11e5-b450-000000000000] Sending stream init for incoming stream [Stream #9fd8c3c0-3023-11e5-b450-000000000000 ID#0] Beginning stream session with 127.0.0.2 [Stream #9fd8c3c0-3023-11e5-b450-000000000000 ID#0] Prepare completed. 
Receiving 1 files(105553124104160 bytes), sending 1 files(105553124400080 bytes) [Stream #9fd8c3c0-3023-11e5-b450-000000000000] Session with 127.0.0.1 is complete [Stream #9fd8c3c0-3023-11e5-b450-000000000000] All sessions completed Node 1 $ sstable2json tmp/1/ks/*/la-1-big-Data.db | grep key | sort {"key": "1", {"key": "2", {"key": "3", {"key": "4", {"key": "5", {"key": "6", Node 2 $ sstable2json tmp/2/ks/*/la-1-big-Data.db | grep key | sort {"key": "1", {"key": "2", {"key": "3", {"key": "4", {"key": "5", {"key": "6", --- streaming/stream_session.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 1dcad3164b..ad7de30dc7 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -216,9 +216,11 @@ future<> stream_session::test(distributed& qp) { sslog.debug("================ START STREAM =============="); auto sp = stream_plan("MYPLAN"); auto to = inet_address("127.0.0.2"); + auto tb = sstring("tb"); + auto ks = sstring("ks"); std::vector> ranges = {query::range::make_open_ended_both_sides()}; - std::vector cfs{"tb"}; - sp.transfer_ranges(to, to, "ks", std::move(ranges), std::move(cfs)).execute(); + std::vector cfs{tb}; + sp.transfer_ranges(to, to, ks, ranges, cfs).request_ranges(to, to, ks, ranges, cfs).execute(); }); }); });