diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 89a95d2046..a5c14fc1f2 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -80,13 +80,12 @@ void stream_session::init_messaging_service_handler() { session->dst_cpu_id = src_cpu_id; sslog.debug("PREPARE_MESSAGE: get session peer={} connecting={} plan_id={} src_cpu_id={}, dst_cpu_id={}", session->peer, session->connecting, session->plan_id(), session->src_cpu_id, session->dst_cpu_id); - auto msg_ret = session->prepare(std::move(msg.requests), std::move(msg.summaries)); - return make_ready_future(std::move(msg_ret)); + return session->prepare(std::move(msg.requests), std::move(msg.summaries)); + } else { + auto err = sprint("PREPARE_MESSAGE: Can not find stream_manager for plan_id=%s", plan_id); + sslog.warn(err.c_str()); + throw std::runtime_error(err); } - // TODO: Send error msg back - print("stream_session:: session does not exist within plan_id = %s\n", plan_id); - auto msg_ret = messages::prepare_message(); - return make_ready_future(std::move(msg_ret)); }); }); ms().register_stream_mutation([] (UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id) { @@ -291,16 +290,38 @@ void stream_session::on_error() { } // Only follower calls this function upon receiving of prepare_message from initiator -messages::prepare_message stream_session::prepare(std::vector requests, std::vector summaries) { +future stream_session::prepare(std::vector requests, std::vector summaries) { sslog.debug("stream_session::prepare requests nr={}, summaries nr={}", requests.size(), summaries.size()); // prepare tasks set_state(stream_session_state::PREPARING); + auto& db = get_local_db(); for (auto& request : requests) { // always flush on stream request sslog.debug("stream_session::prepare stream_request={}", request); + auto ks = request.keyspace; + // Make sure cf requested by peer node exists + for (auto& cf : request.column_families) { + try { + db.find_column_family(ks, cf); + } catch (no_such_column_family) { + auto err = sprint("prepare: requested ks={} cf={} does not exist", ks, cf); + sslog.error(err.c_str()); + throw std::runtime_error(err); + } + } add_transfer_ranges(request.keyspace, request.ranges, request.column_families, true, request.repaired_at); } for (auto& summary : summaries) { + sslog.debug("stream_session::prepare stream_summary={}", summary); + auto cf_id = summary.cf_id; + // Make sure cf the peer node will sent to us exists + try { + db.find_column_family(cf_id); + } catch (no_such_column_family) { + auto err = sprint("prepare: cf_id=%s does not exist", cf_id); + sslog.error(err.c_str()); + throw std::runtime_error(err); + } prepare_receiving(summary); } @@ -318,7 +339,7 @@ messages::prepare_message stream_session::prepare(std::vector re start_streaming_files(); } - return prepare; + return make_ready_future(std::move(prepare)); } void stream_session::file_sent(const messages::file_message_header& header) { diff --git a/streaming/stream_session.hh b/streaming/stream_session.hh index 6655e578eb..64bbd57886 100644 --- a/streaming/stream_session.hh +++ b/streaming/stream_session.hh @@ -318,7 +318,7 @@ public: /** * Prepare this session for sending/receiving files. */ - messages::prepare_message prepare(std::vector requests, std::vector summaries); + future prepare(std::vector requests, std::vector summaries); /** * Call back after sending FileMessageHeader.