streaming: Introduce get_session helper

To simplify streaming verb handler.

- Use get_session instead of open coded logic to get get_coordinator and
  stream_session in all the verb handlers

- Use throw instead of assert for error handling

- init_receiving_side now returns a shared_ptr<stream_result_future>
This commit is contained in:
Asias He
2016-01-29 15:38:23 +08:00
parent 360df6089c
commit cb92fe75e6
3 changed files with 61 additions and 94 deletions

View File

@@ -64,17 +64,20 @@ future<stream_state> stream_result_future::init_sending_side(UUID plan_id_, sstr
return sr->_done.get_future();
}
void stream_result_future::init_receiving_side(UUID plan_id, sstring description, inet_address from) {
shared_ptr<stream_result_future> stream_result_future::init_receiving_side(UUID plan_id, sstring description, inet_address from) {
auto& sm = get_local_stream_manager();
auto sr = sm.get_receiving_stream(plan_id);
if (sr == nullptr) {
sslog.info("[Stream #{}] Creating new streaming plan for {}, with {}", plan_id, description, from);
// The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
// TODO: stream_result_future needs a ref to stream_coordinator.
bool is_receiving = true;
sm.register_receiving(make_shared<stream_result_future>(plan_id, description, is_receiving));
if (sr) {
auto err = sprint("[Stream #%s] GOT PREPARE_MESSAGE from %s, description=%s,"
"stream_plan exists, duplicated message received?", plan_id, description, from);
sslog.warn(err.c_str());
throw std::runtime_error(err);
}
sslog.info("[Stream #{}] Received streaming plan for {}, with {}", plan_id, description, from);
sslog.info("[Stream #{}] Creating new streaming plan for {}, with {}", plan_id, description, from);
bool is_receiving = true;
sr = make_shared<stream_result_future>(plan_id, description, is_receiving);
sm.register_receiving(sr);
return sr;
}
void stream_result_future::handle_session_prepared(shared_ptr<stream_session> session) {

View File

@@ -102,7 +102,7 @@ public:
public:
static future<stream_state> init_sending_side(UUID plan_id_, sstring description_, std::vector<stream_event_handler*> listeners_, shared_ptr<stream_coordinator> coordinator_);
static void init_receiving_side(UUID plan_id, sstring description, inet_address from);
static shared_ptr<stream_result_future> init_receiving_side(UUID plan_id, sstring description, inet_address from);
public:
void add_event_listener(stream_event_handler* listener) {

View File

@@ -73,27 +73,36 @@ static auto get_stream_result_future(utils::UUID plan_id) {
return f;
}
static auto get_session(utils::UUID plan_id, gms::inet_address from, const char* verb, std::experimental::optional<utils::UUID> cf_id = {}) {
if (cf_id) {
sslog.debug("[Stream #{}] GOT {} from {}: cf_id={}", plan_id, verb, from, *cf_id);
} else {
sslog.debug("[Stream #{}] GOT {} from {}", plan_id, verb, from);
}
auto sr = get_stream_result_future(plan_id);
if (!sr) {
auto err = sprint("[Stream #%s] GOT %s from %s: Can not find stream_manager", plan_id, verb, from);
sslog.warn(err.c_str());
throw std::runtime_error(err);
}
auto coordinator = sr->get_coordinator();
if (!coordinator) {
auto err = sprint("[Stream #%s] GOT %s from %s: Can not find coordinator", plan_id, verb, from);
sslog.warn(err.c_str());
throw std::runtime_error(err);
}
return coordinator->get_or_create_session(from);
}
void stream_session::init_messaging_service_handler() {
ms().register_prepare_message([] (const rpc::client_info& cinfo, prepare_message msg, UUID plan_id, sstring description) {
const auto& src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
const auto& from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
auto dst_cpu_id = engine().cpu_id();
return smp::submit_to(dst_cpu_id, [msg = std::move(msg), plan_id, description = std::move(description), from, src_cpu_id, dst_cpu_id] () mutable {
sslog.debug("[Stream #{}] GOT PREPARE_MESSAGE from {}", plan_id, from);
auto& sm = get_local_stream_manager();
auto f = sm.get_receiving_stream(plan_id);
if (f) {
sslog.debug("[Stream #{}] GOT PREPARE_MESSAGE from {}, stream_plan exists, duplicated message received?", plan_id, from);
throw std::runtime_error("stream_plan exists");
} else {
stream_result_future::init_receiving_side(plan_id, description, from);
f = sm.get_receiving_stream(plan_id);
}
auto coordinator = f->get_coordinator();
assert(coordinator);
auto session = coordinator->get_or_create_session(from);
assert(session);
session->init(f);
auto sr = stream_result_future::init_receiving_side(plan_id, description, from);
auto session = get_session(plan_id, from, "PREPARE_MESSAGE");
session->init(sr);
session->dst_cpu_id = src_cpu_id;
session->start_keep_alive_timer();
sslog.debug("[Stream #{}] GOT PREPARE_MESSAGE from {}: get session peer={}, dst_cpu_id={}",
@@ -104,90 +113,45 @@ void stream_session::init_messaging_service_handler() {
ms().register_prepare_done_message([] (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id) {
const auto& from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return smp::submit_to(dst_cpu_id, [plan_id, from] () mutable {
sslog.debug("[Stream #{}] GOT PREPARE_DONE_MESSAGE from {}", plan_id, from);
auto f = get_stream_result_future(plan_id);
if (f) {
auto coordinator = f->get_coordinator();
assert(coordinator);
auto session = coordinator->get_or_create_session(from);
assert(session);
session->start_keep_alive_timer();
session->follower_start_sent();
return make_ready_future<>();
} else {
auto err = sprint("[Stream #%s] GOT PREPARE_DONE_MESSAGE from %s: Can not find stream_manager", plan_id, from);
sslog.warn(err.c_str());
throw std::runtime_error(err);
}
auto session = get_session(plan_id, from, "PREPARE_DONE_MESSAGE");
session->start_keep_alive_timer();
session->follower_start_sent();
return make_ready_future<>();
});
});
ms().register_stream_mutation([] (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id) {
msg_addr from = net::messaging_service::get_source(cinfo);
return smp::submit_to(dst_cpu_id, [plan_id, from, fm = std::move(fm)] () mutable {
auto from_msg_addr = net::messaging_service::get_source(cinfo);
return smp::submit_to(dst_cpu_id, [plan_id, from_msg_addr, fm = std::move(fm)] () mutable {
auto& from = from_msg_addr.addr;
auto cf_id = fm.column_family_id();
if (sslog.is_enabled(logging::log_level::debug)) {
sslog.debug("[Stream #{}] GOT STREAM_MUTATION from {}: cf_id={}", plan_id, from.addr, cf_id);
}
auto f = get_stream_result_future(plan_id);
if (f) {
auto coordinator = f->get_coordinator();
assert(coordinator);
auto session = coordinator->get_or_create_session(from.addr);
assert(session);
session->start_keep_alive_timer();
session->progress(cf_id, progress_info::direction::IN, fm.representation().size());
return service::get_schema_for_write(fm.schema_version(), from).then([&fm] (schema_ptr s) {
return service::get_storage_proxy().local().mutate_locally(std::move(s), fm);
});
} else {
auto err = sprint("[Stream #%s] GOT STREAM_MUTATION from %s: Can not find stream_manager", plan_id, from.addr);
sslog.warn(err.c_str());
throw std::runtime_error(err);
}
auto session = get_session(plan_id, from, "STREAM_MUTATION", cf_id);
session->start_keep_alive_timer();
session->progress(cf_id, progress_info::direction::IN, fm.representation().size());
return service::get_schema_for_write(fm.schema_version(), from_msg_addr).then([&fm] (schema_ptr s) {
return service::get_storage_proxy().local().mutate_locally(std::move(s), fm);
});
});
});
ms().register_stream_mutation_done([] (const rpc::client_info& cinfo, UUID plan_id, std::vector<range<dht::token>> ranges, UUID cf_id, unsigned dst_cpu_id) {
const auto& from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return smp::submit_to(dst_cpu_id, [ranges = std::move(ranges), plan_id, cf_id, from] () mutable {
sslog.debug("[Stream #{}] GOT STREAM_MUTATION_DONE from {}: cf_id={}", plan_id, from, cf_id);
auto f = get_stream_result_future(plan_id);
if (f) {
auto coordinator = f->get_coordinator();
assert(coordinator);
auto session = coordinator->get_or_create_session(from);
assert(session);
session->start_keep_alive_timer();
session->receive_task_completed(cf_id);
return session->get_db().invoke_on_all([ranges = std::move(ranges), cf_id] (database& db) {
auto& cf = db.find_column_family(cf_id);
for (auto& range : ranges) {
cf.get_row_cache().invalidate(query::to_partition_range(range));
}
});
} else {
auto err = sprint("[Stream #%s] GOT STREAM_MUTATION_DONE from %s: Can not find stream_manager", plan_id, from);
sslog.warn(err.c_str());
throw std::runtime_error(err);
}
auto session = get_session(plan_id, from, "STREAM_MUTATION_DONE", cf_id);
session->start_keep_alive_timer();
session->receive_task_completed(cf_id);
return session->get_db().invoke_on_all([ranges = std::move(ranges), cf_id] (database& db) {
auto& cf = db.find_column_family(cf_id);
for (auto& range : ranges) {
cf.get_row_cache().invalidate(query::to_partition_range(range));
}
});
});
});
ms().register_complete_message([] (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id) {
const auto& from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return smp::submit_to(dst_cpu_id, [plan_id, from, dst_cpu_id] () mutable {
sslog.debug("[Stream #{}] GOT COMPLETE_MESSAGE from {}: dst_cpu_id={}", plan_id, from, dst_cpu_id);
auto f = get_stream_result_future(plan_id);
if (f) {
auto coordinator = f->get_coordinator();
assert(coordinator);
auto session = coordinator->get_or_create_session(from);
assert(session);
session->start_keep_alive_timer();
session->complete();
} else {
auto err = sprint("[Stream #%s] COMPLETE_MESSAGE from %s: Can not find stream_manager", plan_id, from);
sslog.warn(err.c_str());
throw std::runtime_error(err);
}
auto session = get_session(plan_id, from, "COMPLETE_MESSAGE");
session->start_keep_alive_timer();
session->complete();
});
});
}