streaming: Allow drop table during streaming
Currently, if a table is dropped during streaming, the streaming would fail with a no_such_column_family error. Since the table is dropped anyway, it makes more sense to ignore the streaming result of the dropped table, whether it is successful or failed. This allows users to drop tables during node operations, e.g., bootstrapping or decommissioning a node. This is especially useful for cloud users, where it is hard to coordinate between a node operation performed by an admin and a user CQL change. This patch also fixes a possible use-after-free issue by not passing the table reference object around. Fixes #10395 Closes #10396
This commit is contained in:
@@ -78,14 +78,12 @@ void stream_manager::init_messaging_service_handler() {
|
||||
auto from = netw::messaging_service::get_source(cinfo);
|
||||
auto reason = reason_opt ? *reason_opt: stream_reason::unspecified;
|
||||
sslog.trace("Got stream_mutation_fragments from {} reason {}", from, int(reason));
|
||||
replica::table& cf = _db.local().find_column_family(cf_id);
|
||||
if (!_sys_dist_ks.local_is_initialized() || !_view_update_generator.local_is_initialized()) {
|
||||
return make_exception_future<rpc::sink<int>>(std::runtime_error(format("Node {} is not fully initialized for streaming, try again later",
|
||||
utils::fb_utilities::get_broadcast_address())));
|
||||
}
|
||||
|
||||
return _mm.local().get_schema_for_write(schema_id, from, _ms.local()).then([this, from, estimated_partitions, plan_id, schema_id, &cf, source, reason] (schema_ptr s) mutable {
|
||||
return _db.local().obtain_reader_permit(cf, "stream-session", db::no_timeout).then([this, from, estimated_partitions, plan_id, schema_id, &cf, source, reason, s] (reader_permit permit) mutable {
|
||||
return _mm.local().get_schema_for_write(schema_id, from, _ms.local()).then([this, from, estimated_partitions, plan_id, cf_id, schema_id, source, reason] (schema_ptr s) mutable {
|
||||
return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout).then([this, from, estimated_partitions, plan_id, cf_id, schema_id, source, reason, s] (reader_permit permit) mutable {
|
||||
auto sink = _ms.local().make_sink_for_stream_mutation_fragments(source);
|
||||
struct stream_mutation_fragments_cmd_status {
|
||||
bool got_cmd = false;
|
||||
@@ -126,11 +124,15 @@ void stream_manager::init_messaging_service_handler() {
|
||||
}
|
||||
});
|
||||
};
|
||||
try {
|
||||
// Make sure the table with cf_id is still present at this point.
|
||||
// Close the sink in case the table is dropped.
|
||||
auto op = _db.local().find_column_family(cf_id).stream_in_progress();
|
||||
//FIXME: discarded future.
|
||||
(void)mutation_writer::distribute_reader_and_consume_on_shards(s,
|
||||
make_generating_reader_v1(s, permit, std::move(get_next_mutation_fragment)),
|
||||
make_streaming_consumer("streaming", _db, _sys_dist_ks, _view_update_generator, estimated_partitions, reason, is_offstrategy_supported(reason)),
|
||||
cf.stream_in_progress()
|
||||
std::move(op)
|
||||
).then_wrapped([s, plan_id, from, sink, estimated_partitions] (future<uint64_t> f) mutable {
|
||||
int32_t status = 0;
|
||||
uint64_t received_partitions = 0;
|
||||
@@ -152,6 +154,11 @@ void stream_manager::init_messaging_service_handler() {
|
||||
sslog.error("[Stream #{}] Failed to handle STREAM_MUTATION_FRAGMENTS (respond phase) for ks={}, cf={}, peer={}: {}",
|
||||
plan_id, s->ks_name(), s->cf_name(), from.addr, ep);
|
||||
});
|
||||
} catch (...) {
|
||||
return sink.close().then([sink, eptr = std::current_exception()] () -> future<rpc::sink<int>> {
|
||||
return make_exception_future<rpc::sink<int>>(eptr);
|
||||
});
|
||||
}
|
||||
return make_ready_future<rpc::sink<int>>(sink);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -200,7 +200,7 @@ future<> stream_transfer_task::execute() {
|
||||
auto si = make_lw_shared<send_info>(sm.ms(), plan_id, tbl, std::move(permit), std::move(ranges), id, dst_cpu_id, reason, [&sm, plan_id, addr = id.addr] (size_t sz) {
|
||||
sm.update_progress(plan_id, addr, streaming::progress_info::direction::OUT, sz);
|
||||
});
|
||||
return si->has_relevant_range_on_this_shard().then([si, plan_id, cf_id] (bool has_relevant_range_on_this_shard) {
|
||||
return si->has_relevant_range_on_this_shard().then([&sm, si, plan_id, cf_id] (bool has_relevant_range_on_this_shard) {
|
||||
if (!has_relevant_range_on_this_shard) {
|
||||
sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}: ignore ranges on shard={}",
|
||||
plan_id, cf_id, this_shard_id());
|
||||
@@ -219,8 +219,20 @@ future<> stream_transfer_task::execute() {
|
||||
std::rethrow_exception(ep);
|
||||
});
|
||||
}).then([this, id, plan_id, cf_id] {
|
||||
_mutation_done_sent = true;
|
||||
sslog.debug("[Stream #{}] GOT STREAM_MUTATION_DONE Reply from {}", plan_id, id.addr);
|
||||
}).handle_exception([this, plan_id, id] (auto ep){
|
||||
}).handle_exception([this, plan_id, cf_id, id] (std::exception_ptr ep) {
|
||||
// If the table is dropped during streaming, we can ignore the
|
||||
// errors and make the stream successful. This allows user to
|
||||
// drop tables during node operations like decommission or
|
||||
// bootstrap.
|
||||
if (!session->manager().db().column_family_exists(cf_id)) {
|
||||
sslog.warn("[Stream #{}] Ignore the table with table_id {} which is dropped during streaming: {}", plan_id, cf_id, ep);
|
||||
if (!_mutation_done_sent) {
|
||||
return session->manager().ms().send_stream_mutation_done(id, plan_id, _ranges, cf_id, session->dst_cpu_id);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep);
|
||||
std::rethrow_exception(ep);
|
||||
});
|
||||
|
||||
@@ -30,6 +30,7 @@ private:
|
||||
dht::token_range_vector _ranges;
|
||||
std::map<unsigned, dht::partition_range_vector> _shard_ranges;
|
||||
long _total_size;
|
||||
bool _mutation_done_sent = false;
|
||||
public:
|
||||
using UUID = utils::UUID;
|
||||
stream_transfer_task(stream_transfer_task&&) = default;
|
||||
|
||||
Reference in New Issue
Block a user