diff --git a/streaming/stream_result_future.cc b/streaming/stream_result_future.cc index 5f01d62166..17e37ae7f7 100644 --- a/streaming/stream_result_future.cc +++ b/streaming/stream_result_future.cc @@ -64,17 +64,20 @@ future stream_result_future::init_sending_side(UUID plan_id_, sstr return sr->_done.get_future(); } -void stream_result_future::init_receiving_side(UUID plan_id, sstring description, inet_address from) { +shared_ptr stream_result_future::init_receiving_side(UUID plan_id, sstring description, inet_address from) { auto& sm = get_local_stream_manager(); auto sr = sm.get_receiving_stream(plan_id); - if (sr == nullptr) { - sslog.info("[Stream #{}] Creating new streaming plan for {}, with {}", plan_id, description, from); - // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure. - // TODO: stream_result_future needs a ref to stream_coordinator. - bool is_receiving = true; - sm.register_receiving(make_shared(plan_id, description, is_receiving)); + if (sr) { + auto err = sprint("[Stream #%s] GOT PREPARE_MESSAGE from %s, description=%s," + "stream_plan exists, duplicated message received?", plan_id, description, from); + sslog.warn(err.c_str()); + throw std::runtime_error(err); } - sslog.info("[Stream #{}] Received streaming plan for {}, with {}", plan_id, description, from); + sslog.info("[Stream #{}] Creating new streaming plan for {}, with {}", plan_id, description, from); + bool is_receiving = true; + sr = make_shared(plan_id, description, is_receiving); + sm.register_receiving(sr); + return sr; } void stream_result_future::handle_session_prepared(shared_ptr session) { diff --git a/streaming/stream_result_future.hh b/streaming/stream_result_future.hh index f9b00b766e..c11c2270eb 100644 --- a/streaming/stream_result_future.hh +++ b/streaming/stream_result_future.hh @@ -102,7 +102,7 @@ public: public: static future init_sending_side(UUID plan_id_, sstring description_, std::vector listeners_, shared_ptr coordinator_); - static void init_receiving_side(UUID plan_id, sstring description, inet_address from); + static shared_ptr init_receiving_side(UUID plan_id, sstring description, inet_address from); public: void add_event_listener(stream_event_handler* listener) { diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index a2830af3ef..4504e56047 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -73,27 +73,36 @@ static auto get_stream_result_future(utils::UUID plan_id) { return f; } +static auto get_session(utils::UUID plan_id, gms::inet_address from, const char* verb, std::experimental::optional cf_id = {}) { + if (cf_id) { + sslog.debug("[Stream #{}] GOT {} from {}: cf_id={}", plan_id, verb, from, *cf_id); + } else { + sslog.debug("[Stream #{}] GOT {} from {}", plan_id, verb, from); + } + auto sr = get_stream_result_future(plan_id); + if (!sr) { + auto err = sprint("[Stream #%s] GOT %s from %s: Can not find stream_manager", plan_id, verb, from); + sslog.warn(err.c_str()); + throw std::runtime_error(err); + } + auto coordinator = sr->get_coordinator(); + if (!coordinator) { + auto err = sprint("[Stream #%s] GOT %s from %s: Can not find coordinator", plan_id, verb, from); + sslog.warn(err.c_str()); + throw std::runtime_error(err); + } + return coordinator->get_or_create_session(from); +} + void stream_session::init_messaging_service_handler() { ms().register_prepare_message([] (const rpc::client_info& cinfo, prepare_message msg, UUID plan_id, sstring description) { const auto& src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); const auto& from = cinfo.retrieve_auxiliary("baddr"); auto dst_cpu_id = engine().cpu_id(); return smp::submit_to(dst_cpu_id, [msg = std::move(msg), plan_id, description = std::move(description), from, src_cpu_id, dst_cpu_id] () mutable { - sslog.debug("[Stream #{}] GOT PREPARE_MESSAGE from {}", plan_id, from); - auto& sm = get_local_stream_manager(); - auto f = sm.get_receiving_stream(plan_id); - if (f) { - sslog.debug("[Stream #{}] GOT PREPARE_MESSAGE from {}, stream_plan exists, duplicated message received?", plan_id, from); - throw std::runtime_error("stream_plan exists"); - } else { - stream_result_future::init_receiving_side(plan_id, description, from); - f = sm.get_receiving_stream(plan_id); - } - auto coordinator = f->get_coordinator(); - assert(coordinator); - auto session = coordinator->get_or_create_session(from); - assert(session); - session->init(f); + auto sr = stream_result_future::init_receiving_side(plan_id, description, from); + auto session = get_session(plan_id, from, "PREPARE_MESSAGE"); + session->init(sr); session->dst_cpu_id = src_cpu_id; session->start_keep_alive_timer(); sslog.debug("[Stream #{}] GOT PREPARE_MESSAGE from {}: get session peer={}, dst_cpu_id={}", @@ -104,90 +113,45 @@ void stream_session::init_messaging_service_handler() { ms().register_prepare_done_message([] (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id) { const auto& from = cinfo.retrieve_auxiliary("baddr"); return smp::submit_to(dst_cpu_id, [plan_id, from] () mutable { - sslog.debug("[Stream #{}] GOT PREPARE_DONE_MESSAGE from {}", plan_id, from); - auto f = get_stream_result_future(plan_id); - if (f) { - auto coordinator = f->get_coordinator(); - assert(coordinator); - auto session = coordinator->get_or_create_session(from); - assert(session); - session->start_keep_alive_timer(); - session->follower_start_sent(); - return make_ready_future<>(); - } else { - auto err = sprint("[Stream #%s] GOT PREPARE_DONE_MESSAGE from %s: Can not find stream_manager", plan_id, from); - sslog.warn(err.c_str()); - throw std::runtime_error(err); - } + auto session = get_session(plan_id, from, "PREPARE_DONE_MESSAGE"); + session->start_keep_alive_timer(); + session->follower_start_sent(); + return make_ready_future<>(); }); }); ms().register_stream_mutation([] (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id) { - msg_addr from = net::messaging_service::get_source(cinfo); - return smp::submit_to(dst_cpu_id, [plan_id, from, fm = std::move(fm)] () mutable { + auto from_msg_addr = net::messaging_service::get_source(cinfo); + return smp::submit_to(dst_cpu_id, [plan_id, from_msg_addr, fm = std::move(fm)] () mutable { + auto& from = from_msg_addr.addr; auto cf_id = fm.column_family_id(); - if (sslog.is_enabled(logging::log_level::debug)) { - sslog.debug("[Stream #{}] GOT STREAM_MUTATION from {}: cf_id={}", plan_id, from.addr, cf_id); - } - auto f = get_stream_result_future(plan_id); - if (f) { - auto coordinator = f->get_coordinator(); - assert(coordinator); - auto session = coordinator->get_or_create_session(from.addr); - assert(session); - session->start_keep_alive_timer(); - session->progress(cf_id, progress_info::direction::IN, fm.representation().size()); - return service::get_schema_for_write(fm.schema_version(), from).then([&fm] (schema_ptr s) { - return service::get_storage_proxy().local().mutate_locally(std::move(s), fm); - }); - } else { - auto err = sprint("[Stream #%s] GOT STREAM_MUTATION from %s: Can not find stream_manager", plan_id, from.addr); - sslog.warn(err.c_str()); - throw std::runtime_error(err); - } + auto session = get_session(plan_id, from, "STREAM_MUTATION", cf_id); + session->start_keep_alive_timer(); + session->progress(cf_id, progress_info::direction::IN, fm.representation().size()); + return service::get_schema_for_write(fm.schema_version(), from_msg_addr).then([&fm] (schema_ptr s) { + return service::get_storage_proxy().local().mutate_locally(std::move(s), fm); + }); }); }); ms().register_stream_mutation_done([] (const rpc::client_info& cinfo, UUID plan_id, std::vector> ranges, UUID cf_id, unsigned dst_cpu_id) { const auto& from = cinfo.retrieve_auxiliary("baddr"); return smp::submit_to(dst_cpu_id, [ranges = std::move(ranges), plan_id, cf_id, from] () mutable { - sslog.debug("[Stream #{}] GOT STREAM_MUTATION_DONE from {}: cf_id={}", plan_id, from, cf_id); - auto f = get_stream_result_future(plan_id); - if (f) { - auto coordinator = f->get_coordinator(); - assert(coordinator); - auto session = coordinator->get_or_create_session(from); - assert(session); - session->start_keep_alive_timer(); - session->receive_task_completed(cf_id); - return session->get_db().invoke_on_all([ranges = std::move(ranges), cf_id] (database& db) { - auto& cf = db.find_column_family(cf_id); - for (auto& range : ranges) { - cf.get_row_cache().invalidate(query::to_partition_range(range)); - } - }); - } else { - auto err = sprint("[Stream #%s] GOT STREAM_MUTATION_DONE from %s: Can not find stream_manager", plan_id, from); - sslog.warn(err.c_str()); - throw std::runtime_error(err); - } + auto session = get_session(plan_id, from, "STREAM_MUTATION_DONE", cf_id); + session->start_keep_alive_timer(); + session->receive_task_completed(cf_id); + return session->get_db().invoke_on_all([ranges = std::move(ranges), cf_id] (database& db) { + auto& cf = db.find_column_family(cf_id); + for (auto& range : ranges) { + cf.get_row_cache().invalidate(query::to_partition_range(range)); + } + }); }); }); ms().register_complete_message([] (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id) { const auto& from = cinfo.retrieve_auxiliary("baddr"); return smp::submit_to(dst_cpu_id, [plan_id, from, dst_cpu_id] () mutable { - sslog.debug("[Stream #{}] GOT COMPLETE_MESSAGE from {}: dst_cpu_id={}", plan_id, from, dst_cpu_id); - auto f = get_stream_result_future(plan_id); - if (f) { - auto coordinator = f->get_coordinator(); - assert(coordinator); - auto session = coordinator->get_or_create_session(from); - assert(session); - session->start_keep_alive_timer(); - session->complete(); - } else { - auto err = sprint("[Stream #%s] COMPLETE_MESSAGE from %s: Can not find stream_manager", plan_id, from); - sslog.warn(err.c_str()); - throw std::runtime_error(err); - } + auto session = get_session(plan_id, from, "COMPLETE_MESSAGE"); + session->start_keep_alive_timer(); + session->complete(); }); }); }