Merge "streaming: Support both pushing and pulling of data"

"stream_plan.transfer_ranges() sends data from local to remote node.
stream_plan.request_ranges() asks remote to send data to local.

After streaming, both nodes contains all the keys."
This commit is contained in:
Avi Kivity
2015-07-22 10:13:10 +03:00
5 changed files with 22 additions and 11 deletions

View File

@@ -41,14 +41,14 @@ void prepare_message::serialize(bytes::iterator& out) const {
prepare_message prepare_message::deserialize(bytes_view& v) {
auto num = read_simple<int32_t>(v);
std::vector<stream_request> requests_(num);
std::vector<stream_request> requests_;
for (int32_t i = 0; i < num; i++) {
auto r = stream_request::deserialize(v);
requests_.push_back(std::move(r));
}
num = read_simple<int32_t>(v);
std::vector<stream_summary> summaries_(num);
std::vector<stream_summary> summaries_;
for (int32_t i = 0; i < num; i++) {
auto s = stream_summary::deserialize(v);
summaries_.push_back(std::move(s));

View File

@@ -181,7 +181,7 @@ void stream_coordinator::host_streaming_data::connect_all_stream_sessions() {
for (auto& x : _stream_sessions) {
auto& session = x.second;
session->start();
sslog.info("[Stream #{}, ID#{}] Beginning stream session with {}", session->plan_id(), session->session_index(), session->peer);
sslog.info("[Stream #{} ID#{}] Beginning stream session with {}", session->plan_id(), session->session_index(), session->peer);
}
}
} // namespace streaming

View File

@@ -43,13 +43,14 @@ stream_request stream_request::deserialize(bytes_view& v) {
auto keyspace_ = read_simple_short_string(v);
auto num = read_simple<int32_t>(v);
std::vector<query::range<token>> ranges_(num);
std::vector<query::range<token>> ranges_;
for (int32_t i = 0; i < num; i++) {
// FIXME: query::range<token>
ranges_.push_back(query::range<token>::make_open_ended_both_sides());
}
num = read_simple<int32_t>(v);
std::vector<sstring> column_families_(num);
std::vector<sstring> column_families_;
for (int32_t i = 0; i < num; i++) {
auto s = read_simple_short_string(v);
column_families_.push_back(std::move(s));
@@ -78,4 +79,12 @@ size_t stream_request::serialized_size() const {
return size;
}
std::ostream& operator<<(std::ostream& os, const stream_request& sr) {
os << "[ ks = " << sr.keyspace << " cf = ";
for (auto& cf : sr.column_families) {
os << cf << " ";
}
return os << "]";
}
} // namespace streaming;

View File

@@ -42,6 +42,7 @@ public:
, column_families(std::move(_column_families))
, repaired_at(_repaired_at) {
}
friend std::ostream& operator<<(std::ostream& os, const stream_request& r);
public:
void serialize(bytes::iterator& out) const;
static stream_request deserialize(bytes_view& v);

View File

@@ -216,9 +216,11 @@ future<> stream_session::test(distributed<cql3::query_processor>& qp) {
sslog.debug("================ START STREAM ==============");
auto sp = stream_plan("MYPLAN");
auto to = inet_address("127.0.0.2");
auto tb = sstring("tb");
auto ks = sstring("ks");
std::vector<query::range<token>> ranges = {query::range<token>::make_open_ended_both_sides()};
std::vector<sstring> cfs{"tb"};
sp.transfer_ranges(to, to, "ks", std::move(ranges), std::move(cfs)).execute();
std::vector<sstring> cfs{tb};
sp.transfer_ranges(to, to, ks, ranges, cfs).request_ranges(to, to, ks, ranges, cfs).execute();
});
});
});
@@ -259,10 +261,7 @@ future<> stream_session::on_initialization_complete() {
for (auto& summary : msg.summaries) {
prepare_receiving(summary);
}
// if we don't need to prepare for receiving stream, start sending files immediately
if (_requests.empty()) {
start_streaming_files();
}
start_streaming_files();
});
}
@@ -280,10 +279,12 @@ void stream_session::on_error() {
// Only follower calls this function upon receiving of prepare_message from initiator
messages::prepare_message stream_session::prepare(std::vector<stream_request> requests, std::vector<stream_summary> summaries) {
sslog.debug("stream_session::prepare requests nr={}, summaries nr={}", requests.size(), summaries.size());
// prepare tasks
set_state(stream_session_state::PREPARING);
for (auto& request : requests) {
// always flush on stream request
sslog.debug("stream_session::prepare stream_request={}", request);
add_transfer_ranges(request.keyspace, request.ranges, request.column_families, true, request.repaired_at);
}
for (auto& summary : summaries) {