Merge "Streaming updates" from Asias

"With this series:

1) We can verify that data from Node A to Node B are streamed correctly.

2) Session completion are handled now.

Node A:
[Stream #08a2d480-2f7b-11e5-ae28-000000000000] Session with 127.0.0.2 is complete
[Stream #08a2d480-2f7b-11e5-ae28-000000000000] All sessions completed

Node B:
[Stream #08a2d480-2f7b-11e5-ae28-000000000000] Session with 127.0.0.1 is complete
[Stream #08a2d480-2f7b-11e5-ae28-000000000000] All sessions completed"
This commit is contained in:
Avi Kivity
2015-07-21 11:29:26 +03:00
41 changed files with 387 additions and 663 deletions

View File

@@ -9,7 +9,7 @@
#include <algorithm>
#include <vector>
#include <boost/range/iterator_range.hpp>
#include "util/serialization.hh"
#include "utils/serialization.hh"
#include "unimplemented.hh"
// value_traits is meant to abstract away whether we are working on 'bytes'

View File

@@ -347,8 +347,6 @@ urchin_core = (['database.cc',
'streaming/progress_info.cc',
'streaming/session_info.cc',
'streaming/stream_coordinator.cc',
'streaming/stream_writer.cc',
'streaming/stream_reader.cc',
'streaming/stream_manager.cc',
'streaming/stream_result_future.cc',
'streaming/messages/stream_init_message.cc',

View File

@@ -5,7 +5,7 @@
#include "serializer.hh"
#include "database.hh"
#include "types.hh"
#include "util/serialization.hh"
#include "utils/serialization.hh"
typedef uint32_t count_type; // Me thinks 32-bits are enough for "normal" count purposes.

View File

@@ -22,7 +22,7 @@
#pragma once
#include "types.hh"
#include "util/serialization.hh"
#include "utils/serialization.hh"
#include "gms/heart_beat_state.hh"
#include "gms/application_state.hh"
#include "gms/versioned_value.hh"

View File

@@ -23,7 +23,7 @@
#include "types.hh"
#include "core/sstring.hh"
#include "util/serialization.hh"
#include "utils/serialization.hh"
#include "gms/inet_address.hh"
namespace gms {

View File

@@ -22,7 +22,7 @@
#pragma once
#include "types.hh"
#include "util/serialization.hh"
#include "utils/serialization.hh"
#include "gms/gossip_digest.hh"
#include "gms/inet_address.hh"
#include "gms/endpoint_state.hh"

View File

@@ -22,7 +22,7 @@
#pragma once
#include "types.hh"
#include "util/serialization.hh"
#include "utils/serialization.hh"
#include "gms/gossip_digest.hh"
#include "gms/inet_address.hh"
#include "gms/endpoint_state.hh"

View File

@@ -23,7 +23,7 @@
#include "types.hh"
#include "core/sstring.hh"
#include "util/serialization.hh"
#include "utils/serialization.hh"
#include "gms/gossip_digest.hh"
namespace gms {

View File

@@ -23,7 +23,7 @@
#include "gms/version_generator.hh"
#include "types.hh"
#include "util/serialization.hh"
#include "utils/serialization.hh"
#include <ostream>
namespace gms {

View File

@@ -6,7 +6,7 @@
#include "types.hh"
#include "net/ip.hh"
#include "util/serialization.hh"
#include "utils/serialization.hh"
#include <sstream>
namespace gms {

View File

@@ -23,7 +23,7 @@
#include "types.hh"
#include "core/sstring.hh"
#include "util/serialization.hh"
#include "utils/serialization.hh"
#include "utils/UUID.hh"
#include "version_generator.hh"
#include "gms/inet_address.hh"

View File

@@ -329,20 +329,34 @@ future<unsigned> messaging_service::send_stream_init_message(shard_id id, stream
}
void messaging_service::register_prepare_message(std::function<future<streaming::messages::prepare_message> (streaming::messages::prepare_message msg, UUID plan_id,
inet_address from, inet_address connecting, unsigned dst_cpu_id)>&& func) {
inet_address from, inet_address connecting, unsigned src_cpu_id, unsigned dst_cpu_id)>&& func) {
register_handler(this, messaging_verb::PREPARE_MESSAGE, std::move(func));
}
future<streaming::messages::prepare_message> messaging_service::send_prepare_message(shard_id id, streaming::messages::prepare_message msg, UUID plan_id,
inet_address from, inet_address connecting, unsigned dst_cpu_id) {
inet_address from, inet_address connecting, unsigned src_cpu_id, unsigned dst_cpu_id) {
return send_message<streaming::messages::prepare_message>(this, messaging_verb::PREPARE_MESSAGE, std::move(id), std::move(msg),
std::move(plan_id), std::move(from), std::move(connecting), std::move(dst_cpu_id));
std::move(plan_id), std::move(from), std::move(connecting), std::move(src_cpu_id), std::move(dst_cpu_id));
}
void messaging_service::register_stream_mutation(std::function<future<> (frozen_mutation fm, unsigned dst_cpu_id)>&& func) {
void messaging_service::register_stream_mutation(std::function<future<> (UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id)>&& func) {
register_handler(this, messaging_verb::STREAM_MUTATION, std::move(func));
}
future<> messaging_service::send_stream_mutation(shard_id id, frozen_mutation fm, unsigned dst_cpu_id) {
return send_message<void>(this, messaging_verb::STREAM_MUTATION, std::move(id), std::move(fm), std::move(dst_cpu_id));
future<> messaging_service::send_stream_mutation(shard_id id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id) {
return send_message<void>(this, messaging_verb::STREAM_MUTATION, std::move(id), std::move(plan_id), std::move(fm), std::move(dst_cpu_id));
}
void messaging_service::register_stream_mutation_done(std::function<future<> (UUID plan_id, UUID cf_id, inet_address from, inet_address connecting, unsigned dst_cpu_id)>&& func) {
register_handler(this, messaging_verb::STREAM_MUTATION_DONE, std::move(func));
}
future<> messaging_service::send_stream_mutation_done(shard_id id, UUID plan_id, UUID cf_id, inet_address from, inet_address connecting, unsigned dst_cpu_id) {
return send_message<void>(this, messaging_verb::STREAM_MUTATION_DONE, std::move(id), std::move(plan_id), std::move(cf_id), std::move(from), std::move(connecting), std::move(dst_cpu_id));
}
void messaging_service::register_complete_message(std::function<rpc::no_wait_type (UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id)>&& func) {
register_handler(this, messaging_verb::COMPLETE_MESSAGE, std::move(func));
}
future<> messaging_service::send_complete_message(shard_id id, UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id) {
return send_message_oneway(this, messaging_verb::COMPLETE_MESSAGE, std::move(id), std::move(plan_id), std::move(from), std::move(connecting), std::move(dst_cpu_id));
}
void messaging_service::register_echo(std::function<future<> ()>&& func) {

View File

@@ -10,7 +10,7 @@
#include "core/print.hh"
#include "core/sstring.hh"
#include "net/api.hh"
#include "util/serialization.hh"
#include "utils/serialization.hh"
#include "gms/inet_address.hh"
#include "rpc/rpc_types.hh"
#include <unordered_map>
@@ -93,6 +93,7 @@ enum class messaging_verb : int32_t {
STREAM_INIT_MESSAGE,
PREPARE_MESSAGE,
STREAM_MUTATION,
STREAM_MUTATION_DONE,
INCOMING_FILE_MESSAGE,
OUTGOING_FILE_MESSAGE,
RECEIVED_MESSAGE,
@@ -430,13 +431,19 @@ public:
// Wrapper for PREPARE_MESSAGE verb
void register_prepare_message(std::function<future<streaming::messages::prepare_message> (streaming::messages::prepare_message msg, UUID plan_id,
inet_address from, inet_address connecting, unsigned dst_cpu_id)>&& func);
inet_address from, inet_address connecting, unsigned src_cpu_id, unsigned dst_cpu_id)>&& func);
future<streaming::messages::prepare_message> send_prepare_message(shard_id id, streaming::messages::prepare_message msg, UUID plan_id,
inet_address from, inet_address connecting, unsigned dst_cpu_id);
inet_address from, inet_address connecting, unsigned src_cpu_id, unsigned dst_cpu_id);
// Wrapper for STREAM_MUTATION verb
void register_stream_mutation(std::function<future<> (frozen_mutation fm, unsigned dst_cpu_id)>&& func);
future<> send_stream_mutation(shard_id id, frozen_mutation fm, unsigned dst_cpu_id);
void register_stream_mutation(std::function<future<> (UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id)>&& func);
future<> send_stream_mutation(shard_id id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id);
void register_stream_mutation_done(std::function<future<> (UUID plan_id, UUID cf_id, inet_address from, inet_address connecting, unsigned dst_cpu_id)>&& func);
future<> send_stream_mutation_done(shard_id id, UUID plan_id, UUID cf_id, inet_address from, inet_address connecting, unsigned dst_cpu_id);
void register_complete_message(std::function<rpc::no_wait_type (UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id)>&& func);
future<> send_complete_message(shard_id id, UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id);
// Wrapper for ECHO verb
void register_echo(std::function<future<> ()>&& func);

View File

@@ -1,4 +1,4 @@
#include <util/serialization.hh>
#include <utils/serialization.hh>
#include <iterator>
#include "serialization_format.hh"

View File

@@ -20,7 +20,7 @@
*/
#include "streaming/messages/file_message_header.hh"
#include "types.hh"
#include "util/serialization.hh"
#include "utils/serialization.hh"
namespace streaming {
namespace messages {

View File

@@ -22,7 +22,7 @@
#include "streaming/messages/prepare_message.hh"
#include "types.hh"
#include "util/serialization.hh"
#include "utils/serialization.hh"
namespace streaming {
namespace messages {

View File

@@ -21,7 +21,7 @@
#include "streaming/messages/received_message.hh"
#include "types.hh"
#include "util/serialization.hh"
#include "utils/serialization.hh"
namespace streaming {
namespace messages {

View File

@@ -21,7 +21,7 @@
#include "streaming/messages/retry_message.hh"
#include "types.hh"
#include "util/serialization.hh"
#include "utils/serialization.hh"
namespace streaming {
namespace messages {

View File

@@ -21,7 +21,7 @@
#include "streaming/messages/stream_init_message.hh"
#include "types.hh"
#include "util/serialization.hh"
#include "utils/serialization.hh"
namespace streaming {
namespace messages {

View File

@@ -114,7 +114,7 @@ public:
/**
* @return total number of files to send in the session
*/
long getTotalFilesToSend() {
long get_total_files_to_send() {
return get_total_files(sending_summaries);
}

View File

@@ -22,11 +22,13 @@
#include "streaming/stream_detail.hh"
#include "streaming/stream_session_state.hh"
#include "streaming/stream_coordinator.hh"
#include "log.hh"
namespace streaming {
extern logging::logger sslog;
using gms::inet_address;
//using stream_coordinator::host_streaming_data;
bool stream_coordinator::has_active_sessions() {
for (auto& x : _peer_sessions) {
@@ -100,8 +102,11 @@ stream_coordinator::host_streaming_data& stream_coordinator::get_host_data(inet_
}
stream_coordinator::host_streaming_data& stream_coordinator::get_or_create_host_data(inet_address peer) {
_peer_sessions[peer] = host_streaming_data(_connections_per_host, _keep_ss_table_level);
return _peer_sessions[peer];
auto it = _peer_sessions.find(peer);
if (it == _peer_sessions.end()) {
it = _peer_sessions.emplace(peer, host_streaming_data(_connections_per_host, _keep_ss_table_level)).first;
}
return it->second;
}
bool stream_coordinator::host_streaming_data::has_active_sessions() {
@@ -166,4 +171,17 @@ std::vector<session_info> stream_coordinator::host_streaming_data::get_all_sessi
return sessions;
}
void stream_coordinator::connect_all_stream_sessions() {
for (auto& data : _peer_sessions) {
data.second.connect_all_stream_sessions();
}
}
void stream_coordinator::host_streaming_data::connect_all_stream_sessions() {
for (auto& x : _stream_sessions) {
auto& session = x.second;
session->start();
sslog.info("[Stream #{}, ID#{}] Beginning stream session with {}", session->plan_id(), session->session_index(), session->peer);
}
}
} // namespace streaming

View File

@@ -67,12 +67,7 @@ public:
bool is_receiving();
void connect_all_stream_sessions() {
for (auto& data : _peer_sessions) {
data.second.connect_all_stream_sessions();
}
}
void connect_all_stream_sessions();
std::set<inet_address> get_peers();
public:
@@ -150,13 +145,7 @@ private:
shared_ptr<stream_session> get_or_create_next_session(inet_address peer, inet_address connecting);
void connect_all_stream_sessions() {
for (auto& x : _stream_sessions) {
auto& session = x.second;
session->start();
// logger.info("[Stream #{}, ID#{}] Beginning stream session with {}", session.plan_id(), session.session_index(), session.peer);
}
}
void connect_all_stream_sessions();
std::vector<shared_ptr<stream_session>> get_all_stream_sessions();

View File

@@ -26,13 +26,16 @@
namespace streaming {
class stream_event_handler /* extends FutureCallback<StreamState> */ {
public:
/**
* Callback for various streaming events.
*
* @see StreamEvent.Type
* @param event Stream event.
*/
virtual void handle_stream_event(stream_event event) = 0;
virtual void handle_stream_event(session_complete_event event) {}
virtual void handle_stream_event(progress_event event) {}
virtual void handle_stream_event(session_prepared_event event) {}
};
} // namespace streaming

View File

@@ -1,45 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Modified by Cloudius Systems.
* Copyright 2015 Cloudius Systems.
*/
#include "streaming/stream_reader.hh"
namespace streaming {
stream_reader::stream_reader(file_message_header header, shared_ptr<stream_session> session_)
: cf_id(header.cf_id)
, estimated_keys(header.estimated_keys)
, sections(header.sections)
, session(session_)
// input_version = header.format.info.getVersion(header.version)
, repaired_at(header.repaired_at)
, format(header.format)
, sstable_level(header.sstable_level) {
}
stream_reader::~stream_reader() = default;
int64_t stream_reader::total_size() {
int64_t size = 0;
for (auto section : sections)
size += section.second - section.first;
return size;
}
} // namespace streaming

View File

@@ -1,141 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Modified by Cloudius Systems.
* Copyright 2015 Cloudius Systems.
*/
#pragma once
#include "utils/UUID.hh"
#include "streaming/stream_session.hh"
#include "sstables/sstables.hh"
#include "streaming/messages/file_message_header.hh"
#include <map>
namespace streaming {
/**
* StreamReader reads from stream and writes to SSTable.
*/
class stream_reader {
using UUID = utils::UUID;
using format_types = sstables::sstable::format_types;
using version_types = sstables::sstable::version_types;
using file_message_header = streaming::messages::file_message_header;
protected:
UUID cf_id;
int64_t estimated_keys;
std::map<int64_t, int64_t> sections;
shared_ptr<stream_session> session;
// FIXME: Version
version_types input_version;
int64_t repaired_at;
format_types format;
int sstable_level;
// FIXME: Descriptor
//Descriptor desc;
public:
stream_reader(file_message_header header, shared_ptr<stream_session> session_);
~stream_reader();
#if 0
/**
* @param channel where this reads data from
* @return SSTable transferred
* @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
*/
public SSTableWriter read(ReadableByteChannel channel) throws IOException
{
logger.debug("reading file from {}, repairedAt = {}, level = {}", session.peer, repaired_at, sstableLevel);
long totalSize = totalSize();
Pair<String, String> kscf = Schema.instance.getCF(cf_id);
if (kscf == null)
{
// schema was dropped during streaming
throw new IOException("CF " + cf_id + " was dropped during streaming");
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
SSTableWriter writer = createWriter(cfs, totalSize, repaired_at, format);
DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
BytesReadTracker in = new BytesReadTracker(dis);
try
{
while (in.getBytesRead() < totalSize)
{
writeRow(writer, in, cfs);
// TODO move this to BytesReadTracker
session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
}
return writer;
} catch (Throwable e)
{
writer.abort();
drain(dis, in.getBytesRead());
if (e instanceof IOException)
throw (IOException) e;
else
throw Throwables.propagate(e);
}
}
protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repaired_at, SSTableFormat.Type format) throws IOException
{
Directories.DataDirectory localDir = cfs.directories.getWriteableLocation(totalSize);
if (localDir == null)
throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir), format));
return SSTableWriter.create(desc, estimated_keys, repaired_at, sstableLevel);
}
protected void drain(InputStream dis, long bytesRead) throws IOException
{
long toSkip = totalSize() - bytesRead;
// InputStream.skip can return -1 if dis is inaccessible.
long skipped = dis.skip(toSkip);
if (skipped == -1)
return;
toSkip = toSkip - skipped;
while (toSkip > 0)
{
skipped = dis.skip(toSkip);
if (skipped == -1)
break;
toSkip = toSkip - skipped;
}
}
#endif
protected:
int64_t total_size();
#if 0
protected void writeRow(SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException
{
DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
writer.appendFromStream(key, cfs.metadata, in, input_version);
cfs.invalidateCachedRow(key);
}
#endif
};
} // namespace streaming

View File

@@ -20,20 +20,98 @@
#include "streaming/stream_result_future.hh"
#include "streaming/stream_manager.hh"
#include "log.hh"
namespace streaming {
extern logging::logger sslog;
void stream_result_future::init(UUID plan_id_, sstring description_, std::vector<stream_event_handler*> listeners_, shared_ptr<stream_coordinator> coordinator_) {
auto future = create_and_register(plan_id_, description_, coordinator_);
for (auto& listener : listeners_) {
future->add_event_listener(listener);
}
sslog.info("[Stream #{}] Executing streaming plan for {}", plan_id_, description_);
// Initialize and start all sessions
for (auto& session : coordinator_->get_all_stream_sessions()) {
session->init(future);
}
coordinator_->connect_all_stream_sessions();
}
void stream_result_future::init_receiving_side(int session_index, UUID plan_id,
sstring description, inet_address from, bool keep_ss_table_level) {
auto& sm = get_local_stream_manager();
auto f = sm.get_receiving_stream(plan_id);
if (f == nullptr) {
// logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, description);
sslog.info("[Stream #{} ID#{}] Creating new streaming plan for {}", plan_id, session_index, description);
// The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
// TODO: stream_result_future needs a ref to stream_coordinator.
sm.register_receiving(make_shared<stream_result_future>(plan_id, description, keep_ss_table_level));
}
// logger.info("[Stream #{}, ID#{}] Received streaming plan for {}", planId, sessionIndex, description);
sslog.info("[Stream #{} ID#{}] Received streaming plan for {}", plan_id, session_index, description);
}
void stream_result_future::handle_session_prepared(shared_ptr<stream_session> session) {
auto si = session->get_session_info();
sslog.info("[Stream #{} ID#{}] Prepare completed. Receiving {} files({} bytes), sending {} files({} bytes)",
session->plan_id(),
session->session_index(),
si.get_total_files_to_receive(),
si.get_total_size_to_receive(),
si.get_total_files_to_send(),
si.get_total_size_to_send());
auto event = session_prepared_event(plan_id, si);
_coordinator->add_session_info(std::move(si));
fire_stream_event(std::move(event));
}
void stream_result_future::handle_session_complete(shared_ptr<stream_session> session) {
sslog.info("[Stream #{}] Session with {} is complete", session->plan_id(), session->peer);
auto event = session_complete_event(session);
fire_stream_event(std::move(event));
auto si = session->get_session_info();
_coordinator->add_session_info(std::move(si));
maybe_complete();
}
template <typename Event>
void stream_result_future::fire_stream_event(Event event) {
// delegate to listener
for (auto listener : _event_listeners) {
listener->handle_stream_event(std::move(event));
}
}
void stream_result_future::maybe_complete() {
if (!_coordinator->has_active_sessions()) {
auto final_state = get_current_state();
if (final_state.has_failed_session()) {
sslog.warn("[Stream #{}] Stream failed", plan_id);
// FIXME: setException(new StreamException(finalState, "Stream failed"));
} else {
sslog.info("[Stream #{}] All sessions completed", plan_id);
// FIXME: set(finalState);
}
}
}
stream_state stream_result_future::get_current_state() {
return stream_state(plan_id, description, _coordinator->get_all_session_info());
}
void stream_result_future::handle_progress(progress_info progress) {
_coordinator->update_progress(progress);
fire_stream_event(progress_event(plan_id, std::move(progress)));
}
shared_ptr<stream_result_future> stream_result_future::create_and_register(UUID plan_id_, sstring description_, shared_ptr<stream_coordinator> coordinator_) {
auto future = make_shared<stream_result_future>(plan_id_, description_, coordinator_);
auto& sm = get_local_stream_manager();
sm.register_receiving(future);
return future;
}
} // namespace streaming

View File

@@ -26,6 +26,8 @@
#include "gms/inet_address.hh"
#include "streaming/stream_coordinator.hh"
#include "streaming/stream_event_handler.hh"
#include "streaming/stream_state.hh"
#include "streaming/progress_info.hh"
#include <vector>
namespace streaming {
@@ -80,53 +82,24 @@ public:
shared_ptr<stream_coordinator> get_coordinator() { return _coordinator; };
public:
static void init(UUID plan_id_, sstring description_, std::vector<stream_event_handler*> listeners_, shared_ptr<stream_coordinator> coordinator_) {
auto future = create_and_register(plan_id_, description_, coordinator_);
for (auto& listener : listeners_) {
future->add_event_listener(listener);
}
//logger.info("[Stream #{}] Executing streaming plan for {}", plan_id, description);
// Initialize and start all sessions
for (auto& session : coordinator_->get_all_stream_sessions()) {
session->init(future);
}
coordinator_->connect_all_stream_sessions();
}
static void init(UUID plan_id_, sstring description_, std::vector<stream_event_handler*> listeners_, shared_ptr<stream_coordinator> coordinator_);
static void init_receiving_side(int session_index, UUID plan_id,
sstring description, inet_address from, bool keep_ss_table_level);
private:
static shared_ptr<stream_result_future> create_and_register(UUID plan_id_, sstring description_, shared_ptr<stream_coordinator> coordinator_) {
auto future = make_shared<stream_result_future>(plan_id_, description_, coordinator_);
// FIXME: StreamManager.instance.register(future);
return future;
}
static shared_ptr<stream_result_future> create_and_register(UUID plan_id_, sstring description_, shared_ptr<stream_coordinator> coordinator_);
#if 0
private void attachSocket(InetAddress from, int sessionIndex, Socket socket, boolean isForOutgoing, int version) throws IOException
{
StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, socket.getInetAddress());
session.init(this);
session.handler.initiateOnReceivingSide(socket, isForOutgoing, version);
}
#endif
public:
void add_event_listener(stream_event_handler* listener) {
// FIXME: Futures.addCallback(this, listener);
_event_listeners.push_back(listener);
}
#if 0
/**
* @return Current snapshot of streaming progress.
*/
public StreamState getCurrentState()
{
return new StreamState(planId, description, coordinator.getAllSessionInfo());
}
stream_state get_current_state();
#if 0
@Override
public boolean equals(Object o)
@@ -142,62 +115,20 @@ public:
{
return planId.hashCode();
}
void handleSessionPrepared(StreamSession session)
{
SessionInfo sessionInfo = session.getSessionInfo();
logger.info("[Stream #{} ID#{}] Prepare completed. Receiving {} files({} bytes), sending {} files({} bytes)",
session.planId(),
session.sessionIndex(),
sessionInfo.getTotalFilesToReceive(),
sessionInfo.getTotalSizeToReceive(),
sessionInfo.getTotalFilesToSend(),
sessionInfo.getTotalSizeToSend());
StreamEvent.SessionPreparedEvent event = new StreamEvent.SessionPreparedEvent(planId, sessionInfo);
coordinator.addSessionInfo(sessionInfo);
fireStreamEvent(event);
}
void handleSessionComplete(StreamSession session)
{
logger.info("[Stream #{}] Session with {} is complete", session.planId(), session.peer);
fireStreamEvent(new StreamEvent.SessionCompleteEvent(session));
SessionInfo sessionInfo = session.getSessionInfo();
coordinator.addSessionInfo(sessionInfo);
maybeComplete();
}
public void handleProgress(ProgressInfo progress)
{
coordinator.updateProgress(progress);
fireStreamEvent(new StreamEvent.ProgressEvent(planId, progress));
}
synchronized void fireStreamEvent(StreamEvent event)
{
// delegate to listener
for (StreamEventHandler listener : eventListeners)
listener.handleStreamEvent(event);
}
private synchronized void maybeComplete()
{
if (!coordinator.hasActiveSessions())
{
StreamState finalState = getCurrentState();
if (finalState.hasFailedSession())
{
logger.warn("[Stream #{}] Stream failed", planId);
setException(new StreamException(finalState, "Stream failed"));
}
else
{
logger.info("[Stream #{}] All sessions completed", planId);
set(finalState);
}
}
}
#endif
void handle_session_prepared(shared_ptr<stream_session> session);
void handle_session_complete(shared_ptr<stream_session> session);
void handle_progress(progress_info progress);
template <typename Event>
void fire_stream_event(Event event);
private:
void maybe_complete();
};
} // namespace streaming

View File

@@ -37,10 +37,13 @@
#include "utils/fb_utilities.hh"
#include "streaming/stream_plan.hh"
#include "core/sleep.hh"
#include "service/storage_service.hh"
#include "core/thread.hh"
#include "cql3/query_processor.hh"
namespace streaming {
thread_local logging::logger sslog("stream_session");
logging::logger sslog("stream_session");
void stream_session::init_messaging_service_handler() {
ms().register_stream_init_message([] (messages::stream_init_message msg, unsigned src_cpu_id) {
@@ -52,18 +55,20 @@ void stream_session::init_messaging_service_handler() {
return make_ready_future<unsigned>(dst_cpu_id);
});
});
ms().register_prepare_message([] (messages::prepare_message msg, UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id) {
sslog.debug("GOT PREPARE_MESSAGE");
return smp::submit_to(dst_cpu_id, [msg = std::move(msg), plan_id = std::move(plan_id), from, connecting] () mutable {
ms().register_prepare_message([] (messages::prepare_message msg, UUID plan_id, inet_address from, inet_address connecting, unsigned src_cpu_id, unsigned dst_cpu_id) {
return smp::submit_to(dst_cpu_id, [msg = std::move(msg), plan_id, from, connecting, src_cpu_id] () mutable {
auto& sm = get_local_stream_manager();
auto f = sm.get_receiving_stream(plan_id);
sslog.debug("PREPARE_MESSAGE: plan_id={}, description={}, from={}, connecting={}", f->plan_id, f->description, from, connecting);
sslog.debug("GOT PREPARE_MESSAGE: plan_id={}, description={}, from={}, connecting={}", f->plan_id, f->description, from, connecting);
if (f) {
auto coordinator = f->get_coordinator();
assert(coordinator);
auto session = coordinator->get_or_create_next_session(from, from);
assert(session);
sslog.debug("PREPARE_MESSAGE: get session peer={} connecting={}", session->peer, session->connecting);
session->init(f);
session->dst_cpu_id = src_cpu_id;
sslog.debug("PREPARE_MESSAGE: get session peer={} connecting={} plan_id={} src_cpu_id={}, dst_cpu_id={}",
session->peer, session->connecting, session->plan_id(), session->src_cpu_id, session->dst_cpu_id);
auto msg_ret = session->prepare(std::move(msg.requests), std::move(msg.summaries));
return make_ready_future<messages::prepare_message>(std::move(msg_ret));
}
@@ -73,16 +78,40 @@ void stream_session::init_messaging_service_handler() {
return make_ready_future<messages::prepare_message>(std::move(msg_ret));
});
});
ms().register_stream_mutation([] (frozen_mutation fm, unsigned dst_cpu_id) {
sslog.debug("GOT STREAM_MUTATION");
return smp::submit_to(dst_cpu_id, [fm = std::move(fm)] () mutable {
ms().register_stream_mutation([] (UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id) {
return smp::submit_to(dst_cpu_id, [plan_id, fm = std::move(fm)] () mutable {
auto cf_id = fm.column_family_id();
auto& db = stream_session::get_local_db();
auto& cf = db.find_column_family(cf_id);
cf.apply(fm, db::replay_position());
sslog.debug("GOT STREAM_MUTATION: plan_id={}, cf_id={}", plan_id, cf_id);
try {
auto& db = stream_session::get_local_db();
auto& cf = db.find_column_family(cf_id);
cf.apply(fm, db::replay_position());
} catch (no_such_column_family) {
// TODO: Send error msg back
sslog.warn("stream_session: {} does not exist\n", cf_id);
}
return make_ready_future<>();
});
});
ms().register_stream_mutation_done([] (UUID plan_id, UUID cf_id, inet_address from, inet_address connecting, unsigned dst_cpu_id) {
return smp::submit_to(dst_cpu_id, [plan_id, cf_id, from, connecting] () mutable {
sslog.debug("GOT STREAM_MUTATION_DONE: plan_id={}, cf_id={}, from={}, connecting={}", plan_id, cf_id, from, connecting);
auto& sm = get_local_stream_manager();
auto f = sm.get_receiving_stream(plan_id);
if (f) {
auto coordinator = f->get_coordinator();
assert(coordinator);
auto session = coordinator->get_or_create_next_session(from, from);
assert(session);
session->receive_task_completed(cf_id);
return make_ready_future<>();
} else {
auto err = sprint("STREAM_MUTATION_DONE: Can not find stream_manager for plan_id=%s", plan_id);
sslog.warn(err.c_str());
throw std::runtime_error(err);
}
});
});
#if 0
ms().register_handler(messaging_verb::RETRY_MESSAGE, [] (messages::retry_message msg, unsigned dst_cpu_id) {
return smp::submit_to(dst_cpu_id, [msg = std::move(msg)] () mutable {
@@ -90,13 +119,27 @@ void stream_session::init_messaging_service_handler() {
return make_ready_future<>();
});
});
ms().register_handler(messaging_verb::COMPLETE_MESSAGE, [] (messages::complete_message msg, unsigned dst_cpu_id) {
return smp::submit_to(dst_cpu_id, [msg = std::move(msg)] () mutable {
// TODO
messages::complete_message msg_ret;
return make_ready_future<messages::complete_message>(std::move(msg_ret));
#endif
ms().register_complete_message([] (UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id) {
smp::submit_to(dst_cpu_id, [plan_id, from, connecting, dst_cpu_id] () mutable {
sslog.debug("GOT COMPLETE_MESSAGE, plan_id={}, from={}, connecting={}, dst_cpu_id={}", plan_id, from, connecting, dst_cpu_id);
auto& sm = get_local_stream_manager();
auto f = sm.get_receiving_stream(plan_id);
if (f) {
auto coordinator = f->get_coordinator();
assert(coordinator);
auto session = coordinator->get_or_create_next_session(from, from);
assert(session);
session->complete();
} else {
auto err = sprint("COMPLETE_MESSAGE: Can not find stream_manager for plan_id=%s", plan_id);
sslog.warn(err.c_str());
throw std::runtime_error(err);
}
});
return messaging_service::no_wait();
});
#if 0
ms().register_handler(messaging_verb::SESSION_FAILED_MESSAGE, [] (messages::session_failed_message msg, unsigned dst_cpu_id) {
smp::submit_to(dst_cpu_id, [msg = std::move(msg)] () mutable {
// TODO
@@ -143,32 +186,49 @@ future<> stream_session::init_streaming_service(distributed<database>& db) {
});
}
future<> stream_session::test() {
future<> stream_session::test(distributed<cql3::query_processor>& qp) {
if (utils::fb_utilities::get_broadcast_address() == inet_address("127.0.0.1")) {
return sleep(std::chrono::seconds(5)).then([] {
sslog.debug("================ STREAM_PLAN TEST ==============\n");
auto sp = stream_plan("MYPLAN");
auto to = inet_address("127.0.0.2");
std::vector<query::range<token>> ranges;
std::vector<sstring> cfs{"local"};
sp.transfer_ranges(to, to, "system", std::move(ranges), std::move(cfs)).execute();
auto tester = make_shared<timer<lowres_clock>>();
tester->set_callback ([tester, &qp] {
seastar::async([&qp] {
sslog.debug("================ STREAM_PLAN TEST ==============");
auto cs = service::client_state::for_external_calls();
service::query_state qs(cs);
auto opts = make_shared<cql3::query_options>(cql3::query_options::DEFAULT);
qp.local().process("CREATE KEYSPACE ks WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };", qs, *opts).get();
sslog.debug("CREATE KEYSPACE = KS DONE");
qp.local().process("CREATE TABLE ks.tb ( key text PRIMARY KEY, C0 text, C1 text, C2 text, C3 blob, C4 text);", qs, *opts).get();
sslog.debug("CREATE TABLE = TB DONE");
qp.local().process("insert into ks.tb (key,c0) values ('1','1');", qs, *opts).get();
sslog.debug("INSERT VALUE DONE: 1");
qp.local().process("insert into ks.tb (key,c0) values ('2','2');", qs, *opts).get();
sslog.debug("INSERT VALUE DONE: 2");
qp.local().process("insert into ks.tb (key,c0) values ('3','3');", qs, *opts).get();
sslog.debug("INSERT VALUE DONE: 3");
qp.local().process("insert into ks.tb (key,c0) values ('4','4');", qs, *opts).get();
sslog.debug("INSERT VALUE DONE: 4");
qp.local().process("insert into ks.tb (key,c0) values ('5','5');", qs, *opts).get();
sslog.debug("INSERT VALUE DONE: 5");
qp.local().process("insert into ks.tb (key,c0) values ('6','6');", qs, *opts).get();
sslog.debug("INSERT VALUE DONE: 6");
}).then([] {
sleep(std::chrono::seconds(10)).then([] {
sslog.debug("================ START STREAM ==============");
auto sp = stream_plan("MYPLAN");
auto to = inet_address("127.0.0.2");
std::vector<query::range<token>> ranges = {query::range<token>::make_open_ended_both_sides()};
std::vector<sstring> cfs{"tb"};
sp.transfer_ranges(to, to, "ks", std::move(ranges), std::move(cfs)).execute();
});
});
});
tester->arm(std::chrono::seconds(10));
}
return make_ready_future<>();
}
future<> stream_session::initiate() {
#if 0
logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
Socket incomingSocket = session.createConnection();
incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION);
incoming.sendInitMessage(incomingSocket, true);
logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
Socket outgoingSocket = session.createConnection();
outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION);
outgoing.sendInitMessage(outgoingSocket, false);
#endif
sslog.debug("[Stream #{}] Sending stream init for incoming stream", plan_id());
auto from = utils::fb_utilities::get_broadcast_address();
bool is_for_outgoing = true;
messages::stream_init_message msg(from, session_index(), plan_id(), description(),
@@ -194,12 +254,8 @@ future<> stream_session::on_initialization_complete() {
auto id = shard_id{this->peer, this->dst_cpu_id};
auto from = utils::fb_utilities::get_broadcast_address();
sslog.debug("SEND PREPARE_MESSAGE to {}", id);
return ms().send_prepare_message(id, std::move(prepare), plan_id(), from, this->connecting, this->dst_cpu_id).then([this] (messages::prepare_message msg) {
return ms().send_prepare_message(id, std::move(prepare), plan_id(), from, this->connecting, this->src_cpu_id, this->dst_cpu_id).then([this] (messages::prepare_message msg) {
sslog.debug("GOT PREPARE_MESSAGE Reply");
for (auto& request : msg.requests) {
// always flush on stream request
add_transfer_ranges(request.keyspace, request.ranges, request.column_families, true, request.repaired_at);
}
for (auto& summary : msg.summaries) {
prepare_receiving(summary);
}
@@ -211,8 +267,8 @@ future<> stream_session::on_initialization_complete() {
}
void stream_session::on_error() {
sslog.error("[Stream #{}] Streaming error occurred", plan_id());
#if 0
//logger.error("[Stream #{}] Streaming error occurred", planId(), e);
// send session failure message
if (handler.is_outgoing_connected()) {
handler.sendMessage(session_failed_message());
@@ -222,6 +278,7 @@ void stream_session::on_error() {
close_session(stream_session_state::FAILED);
}
// Only follower calls this function upon receiving of prepare_message from initiator
messages::prepare_message stream_session::prepare(std::vector<stream_request> requests, std::vector<stream_summary> summaries) {
// prepare tasks
set_state(stream_session_state::PREPARING);
@@ -233,14 +290,13 @@ messages::prepare_message stream_session::prepare(std::vector<stream_request> re
prepare_receiving(summary);
}
// send back prepare message if prepare message contains stream request
// Always send a prepare_message back to follower
messages::prepare_message prepare;
if (!requests.empty()) {
for (auto& x: _transfers) {
auto& task = x.second;
prepare.summaries.emplace_back(task.get_summary());
}
//handler.send_message(std::move(prepare));
}
// if there are files to stream
@@ -279,8 +335,8 @@ void stream_session::receive(messages::incoming_file_message message) {
}
void stream_session::progress(/* Descriptor desc */ progress_info::direction dir, long bytes, long total) {
// auto progress = progress_info(peer, _index, /* desc.filenameFor(Component.DATA),*/ dir, bytes, total);
// streamResult.handleProgress(progress);
auto progress = progress_info(peer, _index, "", dir, bytes, total);
_stream_result->handle_progress(std::move(progress));
}
void stream_session::received(UUID cf_id, int sequence_number) {
@@ -301,10 +357,11 @@ void stream_session::retry(UUID cf_id, int sequence_number) {
void stream_session::complete() {
if (_state == stream_session_state::WAIT_COMPLETE) {
if (!_complete_sent) {
//handler.sendMessage(new CompleteMessage());
_complete_sent = true;
send_complete_message().then([this] {
_complete_sent = true;
});
}
close_session(stream_session_state::COMPLETE);
close_session(stream_session_state::COMPLETE);
} else {
set_state(stream_session_state::WAIT_COMPLETE);
}
@@ -326,6 +383,12 @@ session_info stream_session::get_session_info() {
return session_info(peer, _index, connecting, std::move(receiving_summaries), std::move(transfer_summaries), _state);
}
void stream_session::receive_task_completed(UUID cf_id) {
_receivers.erase(cf_id);
sslog.debug("receive_task_completed: cf_id={} done, {} {}", cf_id, _receivers.size(), _transfers.size());
maybe_completed();
}
void stream_session::task_completed(stream_receive_task& completed_task) {
_receivers.erase(completed_task.cf_id);
maybe_completed();
@@ -336,20 +399,40 @@ void stream_session::task_completed(stream_transfer_task& completed_task) {
maybe_completed();
}
future<> stream_session::send_complete_message() {
auto from = utils::fb_utilities::get_broadcast_address();
auto id = shard_id{this->peer, this->dst_cpu_id};
sslog.debug("SEND COMPLETE_MESSAGE to {}, plan_id={}", id, this->plan_id());
return this->ms().send_complete_message(id, this->plan_id(), from, this->connecting, this->dst_cpu_id).then_wrapped([] (auto&& f) {
try {
f.get();
sslog.debug("GOT COMPLETE_MESSAGE Reply");
return make_ready_future<>();
} catch (...) {
sslog.warn("ERROR COMPLETE_MESSAGE Reply");
return make_ready_future<>();
}
});
}
bool stream_session::maybe_completed() {
bool completed = _receivers.empty() && _transfers.empty();
if (completed) {
if (_state == stream_session_state::WAIT_COMPLETE) {
if (!_complete_sent) {
//handler.sendMessage(new CompleteMessage());
_complete_sent = true;
send_complete_message().then([this] {
_complete_sent = true;
});
}
close_session(stream_session_state::COMPLETE);
sslog.debug("session complete");
} else {
// notify peer that this session is completed
//handler.sendMessage(new CompleteMessage());
_complete_sent = true;
set_state(stream_session_state::WAIT_COMPLETE);
send_complete_message().then([this] {
_complete_sent = true;
set_state(stream_session_state::WAIT_COMPLETE);
sslog.debug("session wait complete");
});
}
}
return completed;
@@ -363,9 +446,8 @@ void stream_session::prepare_receiving(stream_summary& summary) {
}
void stream_session::start_streaming_files() {
_stream_result->handle_session_prepared(shared_from_this());
#if 0
streamResult.handleSessionPrepared(this);
state(State.STREAMING);
for (StreamTransferTask task : transfers.values())
{
@@ -407,7 +489,7 @@ std::vector<column_family*> stream_session::get_column_family_stores(const sstri
}
void stream_session::add_transfer_ranges(sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families, bool flush_tables, long repaired_at) {
std::vector<stream_detail> sstable_details;
std::vector<stream_detail> stream_details;
auto cfs = get_column_family_stores(keyspace, column_families);
if (flush_tables) {
// FIXME: flushSSTables(stores);
@@ -429,15 +511,15 @@ void stream_session::add_transfer_ranges(sstring keyspace, std::vector<query::ra
mutation_reader mr = make_combined_reader(std::move(readers));
// FIXME: sstable.estimatedKeysForRanges(ranges)
long estimated_keys = 0;
sstable_details.emplace_back(std::move(cf_id), std::move(prs), std::move(mr), estimated_keys, repaired_at);
stream_details.emplace_back(std::move(cf_id), std::move(prs), std::move(mr), estimated_keys, repaired_at);
}
if (!sstable_details.empty()) {
add_transfer_files(std::move(sstable_details));
if (!stream_details.empty()) {
add_transfer_files(std::move(stream_details));
}
}
void stream_session::add_transfer_files(std::vector<stream_detail> sstable_details) {
for (auto& detail : sstable_details) {
void stream_session::add_transfer_files(std::vector<stream_detail> stream_details) {
for (auto& detail : stream_details) {
#if 0
if (details.sections.empty()) {
// A reference was acquired on the sstable and we won't stream it
@@ -472,20 +554,21 @@ void stream_session::close_session(stream_session_state final_state) {
// Note that we shouldn't block on this close because this method is called on the handler
// incoming thread (so we would deadlock).
//handler.close();
//streamResult.handleSessionComplete(this);
_stream_result->handle_session_complete(shared_from_this());
}
}
void stream_session::start() {
if (_requests.empty() && _transfers.empty()) {
//logger.info("[Stream #{}] Session does not have any tasks.", planId());
sslog.info("[Stream #{}] Session does not have any tasks.", plan_id());
close_session(stream_session_state::COMPLETE);
return;
}
// logger.info("[Stream #{}] Starting streaming to {}{}", plan_id(),
// peer, peer == connecting ? "" : " through " + connecting);
if (peer == connecting) {
sslog.info("[Stream #{}] Starting streaming to {}", plan_id(), peer);
} else {
sslog.info("[Stream #{}] Starting streaming to {} through {}", plan_id(), peer, connecting);
}
initiate().then([this] {
return on_initialization_complete();
}).then_wrapped([this] (auto&& f) {

View File

@@ -23,6 +23,7 @@
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "core/distributed.hh"
#include "cql3/query_processor.hh"
#include "message/messaging_service.hh"
#include "utils/UUID.hh"
#include "streaming/stream_session_state.hh"
@@ -135,7 +136,7 @@ public:
}
static database& get_local_db() { return _db->local(); }
static future<> init_streaming_service(distributed<database>& db);
static future<> test();
static future<> test(distributed<cql3::query_processor>& qp);
public:
/**
* Streaming endpoint.
@@ -373,6 +374,8 @@ public:
*/
session_info get_session_info();
void receive_task_completed(UUID cf_id);
void task_completed(stream_receive_task& completed_task);
void task_completed(stream_transfer_task& completed_task);
@@ -387,6 +390,7 @@ public:
virtual void on_restart(inet_address endpoint, endpoint_state ep_state) override { close_session(stream_session_state::FAILED); }
private:
future<> send_complete_message();
bool maybe_completed();
#if 0

View File

@@ -23,7 +23,7 @@
#include "utils/UUID.hh"
#include "streaming/session_info.hh"
#include <set>
#include <vector>
namespace streaming {
@@ -35,9 +35,9 @@ public:
using UUID = utils::UUID;
UUID plan_id;
sstring description;
std::set<session_info> sessions;
std::vector<session_info> sessions;
stream_state(UUID plan_id_, sstring description_, std::set<session_info> sessions_)
stream_state(UUID plan_id_, sstring description_, std::vector<session_info> sessions_)
: plan_id(std::move(plan_id_))
, description(std::move(description_))
, sessions(std::move(sessions_)) {

View File

@@ -21,7 +21,7 @@
#include "streaming/stream_summary.hh"
#include "types.hh"
#include "util/serialization.hh"
#include "utils/serialization.hh"
namespace streaming {

View File

@@ -31,7 +31,7 @@
namespace streaming {
extern thread_local logging::logger sslog;
extern logging::logger sslog;
stream_transfer_task::stream_transfer_task(shared_ptr<stream_session> session, UUID cf_id)
: stream_task(session, cf_id) {
@@ -61,7 +61,7 @@ void stream_transfer_task::start() {
return consume(msg.detail.mr, [this, seq, &id, &msg] (mutation&& m) {
auto fm = make_lw_shared<const frozen_mutation>(m);
sslog.debug("SEND STREAM_MUTATION to {}, cf_id={}", id, fm->column_family_id());
return session->ms().send_stream_mutation(id, *fm, session->dst_cpu_id).then([this, fm] {
return session->ms().send_stream_mutation(id, session->plan_id(), *fm, session->dst_cpu_id).then([this, fm] {
sslog.debug("GOT STREAM_MUTATION Reply");
return stop_iteration::no;
});
@@ -72,4 +72,31 @@ void stream_transfer_task::start() {
}
}
void stream_transfer_task::complete(int sequence_number) {
// ScheduledFuture timeout = timeoutTasks.remove(sequenceNumber);
// if (timeout != null)
// timeout.cancel(false);
files.erase(sequence_number);
sslog.debug("stream_transfer_task: complete cf_id = {}, seq = {}", this->cf_id, sequence_number);
auto signal_complete = files.empty();
// all file sent, notify session this task is complete.
if (signal_complete) {
using shard_id = net::messaging_service::shard_id;
auto from = utils::fb_utilities::get_broadcast_address();
auto id = shard_id{session->peer, session->dst_cpu_id};
session->ms().send_stream_mutation_done(id, session->plan_id(), this->cf_id, from, session->connecting, session->dst_cpu_id).then_wrapped([this] (auto&& f) {
try {
f.get();
sslog.debug("GOT STREAM_MUTATION_DONE Reply");
session->task_completed(*this);
} catch (...) {
sslog.warn("ERROR STREAM_MUTATION_DONE Reply ");
}
});
}
}
} // namespace streaming

View File

@@ -56,27 +56,7 @@ public:
*
* @param sequenceNumber sequence number of file
*/
void complete(int sequence_number) {
#if 0
boolean signalComplete;
synchronized (this)
{
ScheduledFuture timeout = timeoutTasks.remove(sequenceNumber);
if (timeout != null)
timeout.cancel(false);
OutgoingFileMessage file = files.remove(sequenceNumber);
if (file != null)
file.sstable.releaseReference();
signalComplete = files.isEmpty();
}
// all file sent, notify session this task is complete.
if (signalComplete)
session.taskCompleted(this);
#endif
}
void complete(int sequence_number);
public:
virtual void abort() override {

View File

@@ -1,33 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Modified by Cloudius Systems.
* Copyright 2015 Cloudius Systems.
*/
#include "streaming/stream_writer.hh"
namespace streaming {
int64_t stream_writer::total_size() {
int64_t size = 0;
for (auto section : sections) {
size += section.second - section.first;
}
return size;
}
} // namespace streaming

View File

@@ -1,147 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Modified by Cloudius Systems.
* Copyright 2015 Cloudius Systems.
*/
#pragma once
#include "streaming/stream_session.hh"
#include "sstables/sstables.hh"
#include <memory>
#include <map>
namespace streaming {
/**
* StreamWriter writes given section of the SSTable to given channel.
*/
class stream_writer {
static constexpr int DEFAULT_CHUNK_SIZE = 64 * 1024;
protected:
sstables::sstable& sstable;
std::map<int64_t, int64_t> sections;
//StreamRateLimiter limiter;
shared_ptr<stream_session> session;
#if 0
private OutputStream compressedOutput;
// allocate buffer to use for transfers only once
private byte[] transferBuffer;
#endif
public:
stream_writer(sstables::sstable& sstable_, std::map<int64_t, int64_t> sections_, shared_ptr<stream_session> session_)
: sstable(sstable_)
, sections(std::move(sections_))
, session(session_) {
//this.limiter = StreamManager.getRateLimiter(session.peer);
}
#if 0
/**
* Stream file of specified sections to given channel.
*
* StreamWriter uses LZF compression on wire to decrease size to transfer.
*
* @param channel where this writes data to
* @throws IOException on any I/O error
*/
public void write(WritableByteChannel channel) throws IOException
{
long totalSize = totalSize();
RandomAccessReader file = sstable.openDataReader();
ChecksumValidator validator = new File(sstable.descriptor.filenameFor(Component.CRC)).exists()
? DataIntegrityMetadata.checksumValidator(sstable.descriptor)
: null;
transferBuffer = validator == null ? new byte[DEFAULT_CHUNK_SIZE] : new byte[validator.chunkSize];
// setting up data compression stream
compressedOutput = new LZFOutputStream(Channels.newOutputStream(channel));
long progress = 0L;
try
{
// stream each of the required sections of the file
for (Pair<Long, Long> section : sections)
{
long start = validator == null ? section.left : validator.chunkStart(section.left);
int readOffset = (int) (section.left - start);
// seek to the beginning of the section
file.seek(start);
if (validator != null)
validator.seek(start);
// length of the section to read
long length = section.right - start;
// tracks write progress
long bytesRead = 0;
while (bytesRead < length)
{
long lastBytesRead = write(file, validator, readOffset, length, bytesRead);
bytesRead += lastBytesRead;
progress += (lastBytesRead - readOffset);
session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
readOffset = 0;
}
// make sure that current section is send
compressedOutput.flush();
}
}
finally
{
// no matter what happens close file
FileUtils.closeQuietly(file);
FileUtils.closeQuietly(validator);
}
}
#endif
protected:
int64_t total_size();
#if 0
/**
* Sequentially read bytes from the file and write them to the output stream
*
* @param reader The file reader to read from
* @param validator validator to verify data integrity
* @param start number of bytes to skip transfer, but include for validation.
* @param length The full length that should be read from {@code reader}
* @param bytesTransferred Number of bytes already read out of {@code length}
*
* @return Number of bytes read
*
* @throws java.io.IOException on any I/O error
*/
protected long write(RandomAccessReader reader, ChecksumValidator validator, int start, long length, long bytesTransferred) throws IOException
{
int toTransfer = (int) Math.min(transferBuffer.length, length - bytesTransferred);
int minReadable = (int) Math.min(transferBuffer.length, reader.length() - reader.getFilePointer());
reader.readFully(transferBuffer, 0, minReadable);
if (validator != null)
validator.validate(transferBuffer, 0, minReadable);
limiter.acquire(toTransfer - start);
compressedOutput.write(transferBuffer, start, (toTransfer - start));
return toTransfer;
}
#endif
};
} // namespace streaming

View File

@@ -3,7 +3,7 @@
#include <sstream>
#include <initializer_list>
#include "util/serialization.hh"
#include "utils/serialization.hh"
#define BOOST_TEST_MODULE test-serialization
#define BOOST_TEST_DYN_LINK

View File

@@ -1,4 +1,3 @@
#if 0
#include "core/reactor.hh"
#include "core/app-template.hh"
#include "core/sstring.hh"
@@ -7,26 +6,11 @@
#include "gms/gossip_digest_ack.hh"
#include "gms/gossip_digest_ack2.hh"
#include "gms/gossip_digest.hh"
#include "core/sleep.hh"
#include "api/api.hh"
using namespace std::chrono_literals;
using namespace net;
struct empty_msg {
void serialize(bytes::iterator& out) const {
}
static empty_msg deserialize(bytes_view& v) {
return empty_msg();
}
size_t serialized_size() const {
return 0;
}
friend inline std::ostream& operator<<(std::ostream& os, const empty_msg& ack) {
return os << "empty_msg";
}
};
class tester {
private:
messaging_service& ms;
@@ -53,7 +37,7 @@ public:
}
public:
void init_handler() {
ms.register_handler(messaging_verb::GOSSIP_DIGEST_SYN, [] (gms::gossip_digest_syn msg) {
ms.register_gossip_digest_syn([] (gms::gossip_digest_syn msg) {
print("Server got syn msg = %s\n", msg);
auto ep1 = inet_address("1.1.1.1");
auto ep2 = inet_address("2.2.2.2");
@@ -68,25 +52,23 @@ public:
{ep2, endpoint_state()},
};
gms::gossip_digest_ack ack(std::move(digests), std::move(eps));
return make_ready_future<gms::gossip_digest_ack>(ack);
return make_ready_future<gms::gossip_digest_ack>(std::move(ack));
});
ms.register_handler(messaging_verb::GOSSIP_DIGEST_ACK2, [] (gms::gossip_digest_ack2 msg) {
ms.register_gossip_digest_ack2([] (gms::gossip_digest_ack2 msg) {
print("Server got ack2 msg = %s\n", msg);
return messaging_service::no_wait();
});
ms.register_handler(messaging_verb::GOSSIP_SHUTDOWN, [] (empty_msg msg) {
print("Server got shutdown msg = %s\n", msg);
ms.register_gossip_shutdown([] (inet_address from) {
print("Server got shutdown msg = %s\n", from);
return messaging_service::no_wait();
});
ms.register_handler(messaging_verb::ECHO, [] (int x, int y) {
print("Server got echo msg = (%d, %ld) \n", x, y);
return make_ready_future<int, long>(x*x, y*y);
});
ms.register_handler(messaging_verb::UNUSED_1, [] (int x, int y) {
print("Server got echo msg = (%d, %ld) \n", x, y);
ms.register_echo([] {
print("Server got echo msg\n");
throw std::runtime_error("I'm throwing runtime_error exception");
long ret = x + y;
return make_ready_future<decltype(ret)>(ret);
return make_ready_future<>();
});
}
@@ -104,8 +86,7 @@ public:
{ep2, gen++, ver++},
};
gms::gossip_digest_syn syn("my_cluster", "my_partition", digests);
using RetMsg = gms::gossip_digest_ack;
return ms.send_message<RetMsg>(messaging_verb::GOSSIP_DIGEST_SYN, std::move(id), std::move(syn)).then([this, id] (RetMsg ack) {
return ms.send_gossip_digest_syn(id, std::move(syn)).then([this, id] (gms::gossip_digest_ack ack) {
print("Client sent gossip_digest_syn got gossip_digest_ack reply = %s\n", ack);
// Prepare gossip_digest_ack2 message
auto ep1 = inet_address("3.3.3.3");
@@ -113,7 +94,7 @@ public:
{ep1, endpoint_state()},
};
gms::gossip_digest_ack2 ack2(std::move(eps));
return ms.send_message_oneway(messaging_verb::GOSSIP_DIGEST_ACK2, std::move(id), std::move(ack2)).then([] () {
return ms.send_gossip_digest_ack2(id, std::move(ack2)).then([] () {
print("Client sent gossip_digest_ack2 got reply = void\n");
return make_ready_future<>();
});
@@ -123,8 +104,8 @@ public:
future<> test_gossip_shutdown() {
print("=== %s ===\n", __func__);
auto id = get_shard_id();
empty_msg msg;
return ms.send_message_oneway(messaging_verb::GOSSIP_SHUTDOWN, std::move(id), std::move(msg)).then([] () {
inet_address from("127.0.0.1");
return ms.send_gossip_shutdown(id, from).then([] () {
print("Client sent gossip_shutdown got reply = void\n");
return make_ready_future<>();
});
@@ -133,31 +114,12 @@ public:
future<> test_echo() {
print("=== %s ===\n", __func__);
auto id = get_shard_id();
using RetMsg = future<int, long>;
return ms.send_message<RetMsg>(messaging_verb::ECHO, id, 30, 60).then_wrapped([] (future<int, long> f) {
return ms.send_echo(id).then_wrapped([] (auto&& f) {
try {
auto msg = f.get();
print("Client sent echo got reply = (%d , %ld)\n", std::get<0>(msg), std::get<1>(msg));
return sleep(100ms).then([]{
return make_ready_future<>();
});
} catch (std::runtime_error& e) {
print("test_echo: %s\n", e.what());
}
return make_ready_future<>();
});
}
future<> test_exception() {
print("=== %s ===\n", __func__);
auto id = get_shard_id();
return ms.send_message<long>(messaging_verb::UNUSED_1, id, 3, 6).then_wrapped([] (future<long> f) {
try {
auto ret = std::get<0>(f.get());
print("Client sent UNUSED_1 got reply = %ld\n", ret);
f.get();
return make_ready_future<>();
} catch (std::runtime_error& e) {
print("Client sent UNUSED_1 got exception: %s\n", e.what());
print("test_echo: %s\n", e.what());
}
return make_ready_future<>();
});
@@ -207,8 +169,6 @@ int main(int ac, char ** av) {
return t->test_gossip_shutdown();
}).then([testers, t] {
return t->test_echo();
}).then([testers, t] {
return t->test_exception();
}).then([testers, t, stay_alive] {
if (stay_alive) {
return;
@@ -225,5 +185,3 @@ int main(int ac, char ** av) {
});
});
}
#endif
int main(int ac, char ** av) {}

View File

@@ -9,7 +9,7 @@
#include "core/print.hh"
#include "net/ip.hh"
#include "database.hh"
#include "util/serialization.hh"
#include "utils/serialization.hh"
#include "combine.hh"
#include <cmath>
#include <sstream>

View File

@@ -15,7 +15,7 @@
#include "core/print.hh"
#include "net/byteorder.hh"
#include "bytes.hh"
#include "util/serialization.hh"
#include "utils/serialization.hh"
namespace utils {

View File

@@ -10,7 +10,7 @@
#include <boost/algorithm/string.hpp>
#include <string>
#include "core/sstring.hh"
#include "util/serialization.hh"
#include "utils/serialization.hh"
#include "types.hh"
namespace utils {