Merge "streaming: handle cf is deleted" from Asias

"Fixes #979
 Fixes #976"
This commit is contained in:
Pekka Enberg
2016-03-09 14:52:27 +02:00
4 changed files with 33 additions and 22 deletions

View File

@@ -1374,6 +1374,10 @@ const column_family& database::find_column_family(const utils::UUID& uuid) const
}
}
bool database::column_family_exists(const utils::UUID& uuid) const {
return _column_families.count(uuid);
}
void
keyspace::create_replication_strategy(const std::map<sstring, sstring>& options) {
using namespace locator;

View File

@@ -644,6 +644,7 @@ public:
const column_family& find_column_family(const utils::UUID&) const throw (no_such_column_family);
column_family& find_column_family(const schema_ptr&) throw (no_such_column_family);
const column_family& find_column_family(const schema_ptr&) const throw (no_such_column_family);
bool column_family_exists(const utils::UUID& uuid) const;
schema_ptr find_schema(const sstring& ks_name, const sstring& cf_name) const throw (no_such_column_family);
schema_ptr find_schema(const utils::UUID&) const throw (no_such_column_family);
bool has_schema(const sstring& ks_name, const sstring& cf_name) const;

View File

@@ -121,7 +121,14 @@ void stream_session::init_messaging_service_handler() {
return do_with(std::move(fm), [plan_id, from] (const auto& fm) {
auto fm_size = fm.representation().size();
get_local_stream_manager().update_progress(plan_id, from.addr, progress_info::direction::IN, fm_size);
return service::get_schema_for_write(fm.schema_version(), from).then([&fm] (schema_ptr s) {
return service::get_schema_for_write(fm.schema_version(), from).then([plan_id, from, &fm] (schema_ptr s) {
auto cf_id = fm.column_family_id();
auto& db = service::get_local_storage_proxy().get_db().local();
if (!db.column_family_exists(cf_id)) {
sslog.debug("[Stream #{}] STREAM_MUTATION from {}: cf_id={} is missing, assume the table is dropped",
plan_id, from.addr, cf_id);
return make_ready_future<>();
}
return service::get_storage_proxy().local().mutate_locally(std::move(s), fm);
});
});
@@ -131,11 +138,17 @@ void stream_session::init_messaging_service_handler() {
return smp::submit_to(dst_cpu_id, [ranges = std::move(ranges), plan_id, cf_id, from] () mutable {
auto session = get_session(plan_id, from, "STREAM_MUTATION_DONE", cf_id);
session->receive_task_completed(cf_id);
return session->get_db().invoke_on_all([ranges = std::move(ranges), cf_id] (database& db) {
return session->get_db().invoke_on_all([ranges = std::move(ranges), plan_id, from, cf_id] (database& db) {
if (!db.column_family_exists(cf_id)) {
sslog.debug("[Stream #{}] STREAM_MUTATION_DONE from {}: cf_id={} is missing, assume the table is dropped",
plan_id, from, cf_id);
return make_ready_future<>();
}
auto& cf = db.find_column_family(cf_id);
for (auto& range : ranges) {
cf.get_row_cache().invalidate(query::to_partition_range(range));
}
return make_ready_future<>();
});
});
});

View File

@@ -87,24 +87,13 @@ future<stop_iteration> do_send_mutations(auto si, auto fm) {
return get_local_stream_manager().mutation_send_limiter().wait().then([si, fm = std::move(fm)] () mutable {
sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
auto fm_size = fm.representation().size();
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then_wrapped([si, fm_size] (auto&& f) {
try {
f.get();
sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr);
get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size);
si->mutations_done.signal();
} catch (std::exception& e) {
auto err = std::string(e.what());
// Seastar RPC does not provide exception type info, so we can not catch no_such_column_family here
// Need to compare the exception error msg
if (err.find("Can't find a column family with UUID") != std::string::npos) {
sslog.info("[Stream #{}] remote node {} does not have the cf_id = {}", si->plan_id, si->id, si->cf_id);
si->mutations_done.signal();
} else {
sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, err);
si->mutations_done.broken();
}
}
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] {
sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr);
get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size);
si->mutations_done.signal();
}).handle_exception([si] (auto ep) {
sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep);
si->mutations_done.broken();
}).finally([] {
get_local_stream_manager().mutation_send_limiter().signal();
});
@@ -118,7 +107,7 @@ future<> send_mutations(auto si) {
return do_with(cf.make_reader(cf.schema(), si->pr, priority), [si] (auto& reader) {
return repeat([si, &reader] () {
return reader().then([si] (auto mopt) {
if (mopt) {
if (mopt && si->db.column_family_exists(si->cf_id)) {
si->mutations_nr++;
auto fm = frozen_mutation(*mopt);
return do_send_mutations(si, std::move(fm));
@@ -156,7 +145,11 @@ void stream_transfer_task::start() {
});
}).then([this, plan_id, cf_id, id] {
sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, cf_id={}", plan_id, id, cf_id);
return session->ms().send_stream_mutation_done(id, plan_id, _ranges, cf_id, session->dst_cpu_id);
return session->ms().send_stream_mutation_done(id, plan_id, _ranges,
cf_id, session->dst_cpu_id).handle_exception([plan_id, id, cf_id] (auto ep) {
sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep);
throw;
});
}).then([this, id, plan_id, cf_id] {
sslog.debug("[Stream #{}] GOT STREAM_MUTATION_DONE Reply from {}", plan_id, id.addr);
session->start_keep_alive_timer();