diff --git a/database.cc b/database.cc index b5a1babc6b..e671ca46fe 100644 --- a/database.cc +++ b/database.cc @@ -1374,6 +1374,10 @@ const column_family& database::find_column_family(const utils::UUID& uuid) const } } +bool database::column_family_exists(const utils::UUID& uuid) const { + return _column_families.count(uuid); +} + void keyspace::create_replication_strategy(const std::map& options) { using namespace locator; diff --git a/database.hh b/database.hh index eb31bee832..5fe2607549 100644 --- a/database.hh +++ b/database.hh @@ -644,6 +644,7 @@ public: const column_family& find_column_family(const utils::UUID&) const throw (no_such_column_family); column_family& find_column_family(const schema_ptr&) throw (no_such_column_family); const column_family& find_column_family(const schema_ptr&) const throw (no_such_column_family); + bool column_family_exists(const utils::UUID& uuid) const; schema_ptr find_schema(const sstring& ks_name, const sstring& cf_name) const throw (no_such_column_family); schema_ptr find_schema(const utils::UUID&) const throw (no_such_column_family); bool has_schema(const sstring& ks_name, const sstring& cf_name) const; diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 4b7e16ea53..61529c44cc 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -121,7 +121,14 @@ void stream_session::init_messaging_service_handler() { return do_with(std::move(fm), [plan_id, from] (const auto& fm) { auto fm_size = fm.representation().size(); get_local_stream_manager().update_progress(plan_id, from.addr, progress_info::direction::IN, fm_size); - return service::get_schema_for_write(fm.schema_version(), from).then([&fm] (schema_ptr s) { + return service::get_schema_for_write(fm.schema_version(), from).then([plan_id, from, &fm] (schema_ptr s) { + auto cf_id = fm.column_family_id(); + auto& db = service::get_local_storage_proxy().get_db().local(); + if (!db.column_family_exists(cf_id)) { + sslog.debug("[Stream #{}] STREAM_MUTATION from {}: cf_id={} is missing, assume the table is dropped", + plan_id, from.addr, cf_id); + return make_ready_future<>(); + } return service::get_storage_proxy().local().mutate_locally(std::move(s), fm); }); }); @@ -131,11 +138,17 @@ void stream_session::init_messaging_service_handler() { return smp::submit_to(dst_cpu_id, [ranges = std::move(ranges), plan_id, cf_id, from] () mutable { auto session = get_session(plan_id, from, "STREAM_MUTATION_DONE", cf_id); session->receive_task_completed(cf_id); - return session->get_db().invoke_on_all([ranges = std::move(ranges), cf_id] (database& db) { + return session->get_db().invoke_on_all([ranges = std::move(ranges), plan_id, from, cf_id] (database& db) { + if (!db.column_family_exists(cf_id)) { + sslog.debug("[Stream #{}] STREAM_MUTATION_DONE from {}: cf_id={} is missing, assume the table is dropped", + plan_id, from, cf_id); + return make_ready_future<>(); + } auto& cf = db.find_column_family(cf_id); for (auto& range : ranges) { cf.get_row_cache().invalidate(query::to_partition_range(range)); } + return make_ready_future<>(); }); }); }); diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index 699861055a..19bb22515c 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -87,24 +87,13 @@ future do_send_mutations(auto si, auto fm) { return get_local_stream_manager().mutation_send_limiter().wait().then([si, fm = std::move(fm)] () mutable { sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id); auto fm_size = fm.representation().size(); - net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then_wrapped([si, fm_size] (auto&& f) { - try { - f.get(); - sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr); - get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size); - si->mutations_done.signal(); - } catch (std::exception& e) { - auto err = std::string(e.what()); - // Seastar RPC does not provide exception type info, so we can not catch no_such_column_family here - // Need to compare the exception error msg - if (err.find("Can't find a column family with UUID") != std::string::npos) { - sslog.info("[Stream #{}] remote node {} does not have the cf_id = {}", si->plan_id, si->id, si->cf_id); - si->mutations_done.signal(); - } else { - sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, err); - si->mutations_done.broken(); - } - } + net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] { + sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr); + get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size); + si->mutations_done.signal(); + }).handle_exception([si] (auto ep) { + sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep); + si->mutations_done.broken(); }).finally([] { get_local_stream_manager().mutation_send_limiter().signal(); }); @@ -118,7 +107,7 @@ future<> send_mutations(auto si) { return do_with(cf.make_reader(cf.schema(), si->pr, priority), [si] (auto& reader) { return repeat([si, &reader] () { return reader().then([si] (auto mopt) { - if (mopt) { + if (mopt && si->db.column_family_exists(si->cf_id)) { si->mutations_nr++; auto fm = frozen_mutation(*mopt); return do_send_mutations(si, std::move(fm)); @@ -156,7 +145,11 @@ void stream_transfer_task::start() { }); }).then([this, plan_id, cf_id, id] { sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, cf_id={}", plan_id, id, cf_id); - return session->ms().send_stream_mutation_done(id, plan_id, _ranges, cf_id, session->dst_cpu_id); + return session->ms().send_stream_mutation_done(id, plan_id, _ranges, + cf_id, session->dst_cpu_id).handle_exception([plan_id, id, cf_id] (auto ep) { + sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep); + throw; + }); }).then([this, id, plan_id, cf_id] { sslog.debug("[Stream #{}] GOT STREAM_MUTATION_DONE Reply from {}", plan_id, id.addr); session->start_keep_alive_timer();