From 4abaacfc616370436eda6fbc9b127ffc47a76c61 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 9 Mar 2016 08:25:58 +0800 Subject: [PATCH 1/3] db: Introduce column_family_exists It is cheaper than throwing a no_such_column_family exception to test if a cf is gone, e.g., deleted. --- database.cc | 4 ++++ database.hh | 1 + 2 files changed, 5 insertions(+) diff --git a/database.cc b/database.cc index b5a1babc6b..e671ca46fe 100644 --- a/database.cc +++ b/database.cc @@ -1374,6 +1374,10 @@ const column_family& database::find_column_family(const utils::UUID& uuid) const } } +bool database::column_family_exists(const utils::UUID& uuid) const { + return _column_families.count(uuid); +} + void keyspace::create_replication_strategy(const std::map& options) { using namespace locator; diff --git a/database.hh b/database.hh index eb31bee832..5fe2607549 100644 --- a/database.hh +++ b/database.hh @@ -644,6 +644,7 @@ public: const column_family& find_column_family(const utils::UUID&) const throw (no_such_column_family); column_family& find_column_family(const schema_ptr&) throw (no_such_column_family); const column_family& find_column_family(const schema_ptr&) const throw (no_such_column_family); + bool column_family_exists(const utils::UUID& uuid) const; schema_ptr find_schema(const sstring& ks_name, const sstring& cf_name) const throw (no_such_column_family); schema_ptr find_schema(const utils::UUID&) const throw (no_such_column_family); bool has_schema(const sstring& ks_name, const sstring& cf_name) const; From efa74dbae0fce52da12e865d05d765cfdab981b3 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 9 Mar 2016 08:27:41 +0800 Subject: [PATCH 2/3] streaming: Do not send if the cf is deleted It is possible that a cf is deleted after we make the cf reader. Skip sending its mutations in that case, to avoid the unnecessary overhead of sending them on the wire only for the peer node to drop the received mutations. 
--- streaming/stream_transfer_task.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index 699861055a..cf9c84e848 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -118,7 +118,7 @@ future<> send_mutations(auto si) { return do_with(cf.make_reader(cf.schema(), si->pr, priority), [si] (auto& reader) { return repeat([si, &reader] () { return reader().then([si] (auto mopt) { - if (mopt) { + if (mopt && si->db.column_family_exists(si->cf_id)) { si->mutations_nr++; auto fm = frozen_mutation(*mopt); return do_send_mutations(si, std::move(fm)); From d9ead889f3bad22171599318766a9a023fc37e04 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 3 Mar 2016 15:13:00 +0800 Subject: [PATCH 3/3] streaming: Handle cf is deleted when sending STREAM_MUTATION_DONE In the preparation phase of streaming, we check that remote node has all the cf_id which are needed for the entire streaming process, including the cf_id which local node will send to remote node and vice versa. So, at later time, if the cf_id is missing, it must be that the cf_id is deleted. It is fine to ignore no_such_column_family exception. In this patch, we change the code to ignore at server side to avoid sending the exception back, to avoid handling the exception in an IDL compatible way. One thing we can improve is that the sender might know the cf is deleted later than the receiver does. In this case, the sender will send some more mutations if we send the no_such_column_family back to the sender. However, since we do not throw exceptions in the receiver stream mutation handler, it will not cause a lot of overhead, the receiver will just ignore the mutation received. 
Fixes #979 --- streaming/stream_session.cc | 17 +++++++++++++++-- streaming/stream_transfer_task.cc | 31 ++++++++++++------------------- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 4b7e16ea53..61529c44cc 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -121,7 +121,14 @@ void stream_session::init_messaging_service_handler() { return do_with(std::move(fm), [plan_id, from] (const auto& fm) { auto fm_size = fm.representation().size(); get_local_stream_manager().update_progress(plan_id, from.addr, progress_info::direction::IN, fm_size); - return service::get_schema_for_write(fm.schema_version(), from).then([&fm] (schema_ptr s) { + return service::get_schema_for_write(fm.schema_version(), from).then([plan_id, from, &fm] (schema_ptr s) { + auto cf_id = fm.column_family_id(); + auto& db = service::get_local_storage_proxy().get_db().local(); + if (!db.column_family_exists(cf_id)) { + sslog.debug("[Stream #{}] STREAM_MUTATION from {}: cf_id={} is missing, assume the table is dropped", + plan_id, from.addr, cf_id); + return make_ready_future<>(); + } return service::get_storage_proxy().local().mutate_locally(std::move(s), fm); }); }); @@ -131,11 +138,17 @@ void stream_session::init_messaging_service_handler() { return smp::submit_to(dst_cpu_id, [ranges = std::move(ranges), plan_id, cf_id, from] () mutable { auto session = get_session(plan_id, from, "STREAM_MUTATION_DONE", cf_id); session->receive_task_completed(cf_id); - return session->get_db().invoke_on_all([ranges = std::move(ranges), cf_id] (database& db) { + return session->get_db().invoke_on_all([ranges = std::move(ranges), plan_id, from, cf_id] (database& db) { + if (!db.column_family_exists(cf_id)) { + sslog.debug("[Stream #{}] STREAM_MUTATION_DONE from {}: cf_id={} is missing, assume the table is dropped", + plan_id, from, cf_id); + return make_ready_future<>(); + } auto& cf = db.find_column_family(cf_id); 
for (auto& range : ranges) { cf.get_row_cache().invalidate(query::to_partition_range(range)); } + return make_ready_future<>(); }); }); }); diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index cf9c84e848..19bb22515c 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -87,24 +87,13 @@ future do_send_mutations(auto si, auto fm) { return get_local_stream_manager().mutation_send_limiter().wait().then([si, fm = std::move(fm)] () mutable { sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id); auto fm_size = fm.representation().size(); - net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then_wrapped([si, fm_size] (auto&& f) { - try { - f.get(); - sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr); - get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size); - si->mutations_done.signal(); - } catch (std::exception& e) { - auto err = std::string(e.what()); - // Seastar RPC does not provide exception type info, so we can not catch no_such_column_family here - // Need to compare the exception error msg - if (err.find("Can't find a column family with UUID") != std::string::npos) { - sslog.info("[Stream #{}] remote node {} does not have the cf_id = {}", si->plan_id, si->id, si->cf_id); - si->mutations_done.signal(); - } else { - sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, err); - si->mutations_done.broken(); - } - } + net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] { + sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr); + get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size); + 
si->mutations_done.signal(); + }).handle_exception([si] (auto ep) { + sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep); + si->mutations_done.broken(); }).finally([] { get_local_stream_manager().mutation_send_limiter().signal(); }); @@ -156,7 +145,11 @@ void stream_transfer_task::start() { }); }).then([this, plan_id, cf_id, id] { sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, cf_id={}", plan_id, id, cf_id); - return session->ms().send_stream_mutation_done(id, plan_id, _ranges, cf_id, session->dst_cpu_id); + return session->ms().send_stream_mutation_done(id, plan_id, _ranges, + cf_id, session->dst_cpu_id).handle_exception([plan_id, id, cf_id] (auto ep) { + sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep); + throw; + }); }).then([this, id, plan_id, cf_id] { sslog.debug("[Stream #{}] GOT STREAM_MUTATION_DONE Reply from {}", plan_id, id.addr); session->start_keep_alive_timer();