stream_session: Make sure cf exists before streaming

We use storage_proxy::mutate_locally() to apply the mutations when we
receive them. mutate_locally() will ignore the mutation if the cf does not
exist. We check in the prepare phase to make sure all the cf's exist.
This commit is contained in:
Asias He
2015-08-04 15:03:39 +08:00
parent 36eb1d1d79
commit 924ca5915e
2 changed files with 30 additions and 9 deletions

View File

@@ -80,13 +80,12 @@ void stream_session::init_messaging_service_handler() {
session->dst_cpu_id = src_cpu_id;
sslog.debug("PREPARE_MESSAGE: get session peer={} connecting={} plan_id={} src_cpu_id={}, dst_cpu_id={}",
session->peer, session->connecting, session->plan_id(), session->src_cpu_id, session->dst_cpu_id);
auto msg_ret = session->prepare(std::move(msg.requests), std::move(msg.summaries));
return make_ready_future<messages::prepare_message>(std::move(msg_ret));
return session->prepare(std::move(msg.requests), std::move(msg.summaries));
} else {
auto err = sprint("PREPARE_MESSAGE: Can not find stream_manager for plan_id=%s", plan_id);
sslog.warn(err.c_str());
throw std::runtime_error(err);
}
// TODO: Send error msg back
print("stream_session:: session does not exist within plan_id = %s\n", plan_id);
auto msg_ret = messages::prepare_message();
return make_ready_future<messages::prepare_message>(std::move(msg_ret));
});
});
ms().register_stream_mutation([] (UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id) {
@@ -291,16 +290,38 @@ void stream_session::on_error() {
}
// Only follower calls this function upon receiving of prepare_message from initiator
messages::prepare_message stream_session::prepare(std::vector<stream_request> requests, std::vector<stream_summary> summaries) {
future<messages::prepare_message> stream_session::prepare(std::vector<stream_request> requests, std::vector<stream_summary> summaries) {
sslog.debug("stream_session::prepare requests nr={}, summaries nr={}", requests.size(), summaries.size());
// prepare tasks
set_state(stream_session_state::PREPARING);
auto& db = get_local_db();
for (auto& request : requests) {
// always flush on stream request
sslog.debug("stream_session::prepare stream_request={}", request);
auto ks = request.keyspace;
// Make sure cf requested by peer node exists
for (auto& cf : request.column_families) {
try {
db.find_column_family(ks, cf);
} catch (no_such_column_family) {
auto err = sprint("prepare: requested ks={} cf={} does not exist", ks, cf);
sslog.error(err.c_str());
throw std::runtime_error(err);
}
}
add_transfer_ranges(request.keyspace, request.ranges, request.column_families, true, request.repaired_at);
}
for (auto& summary : summaries) {
sslog.debug("stream_session::prepare stream_summary={}", summary);
auto cf_id = summary.cf_id;
// Make sure cf the peer node will sent to us exists
try {
db.find_column_family(cf_id);
} catch (no_such_column_family) {
auto err = sprint("prepare: cf_id=%s does not exist", cf_id);
sslog.error(err.c_str());
throw std::runtime_error(err);
}
prepare_receiving(summary);
}
@@ -318,7 +339,7 @@ messages::prepare_message stream_session::prepare(std::vector<stream_request> re
start_streaming_files();
}
return prepare;
return make_ready_future<messages::prepare_message>(std::move(prepare));
}
void stream_session::file_sent(const messages::file_message_header& header) {

View File

@@ -318,7 +318,7 @@ public:
/**
* Prepare this session for sending/receiving files.
*/
messages::prepare_message prepare(std::vector<stream_request> requests, std::vector<stream_summary> summaries);
future<messages::prepare_message> prepare(std::vector<stream_request> requests, std::vector<stream_summary> summaries);
/**
* Call back after sending FileMessageHeader.