streaming: Allow drop table during streaming

Currently, if a table is dropped during streaming, the streaming would
fail with no_such_column_family error.

Since the table is dropped anyway, it makes more sense to ignore the
streaming result of the dropped table, whether it is successful or
failed.

This allows users to drop tables during node operations, e.g., while
bootstrapping or decommissioning a node.

This is especially useful for cloud users, where it is hard to
coordinate between a node operation performed by an admin and a
user's CQL schema change.

This patch also fixes a possible use-after-free issue by not passing
the table reference object around.

Fixes #10395

Closes #10396
This commit is contained in:
Asias He
2022-04-19 08:35:44 +08:00
committed by Avi Kivity
parent 607ccf0393
commit 953af38281
3 changed files with 27 additions and 7 deletions

View File

@@ -78,14 +78,12 @@ void stream_manager::init_messaging_service_handler() {
auto from = netw::messaging_service::get_source(cinfo);
auto reason = reason_opt ? *reason_opt: stream_reason::unspecified;
sslog.trace("Got stream_mutation_fragments from {} reason {}", from, int(reason));
replica::table& cf = _db.local().find_column_family(cf_id);
if (!_sys_dist_ks.local_is_initialized() || !_view_update_generator.local_is_initialized()) {
return make_exception_future<rpc::sink<int>>(std::runtime_error(format("Node {} is not fully initialized for streaming, try again later",
utils::fb_utilities::get_broadcast_address())));
}
return _mm.local().get_schema_for_write(schema_id, from, _ms.local()).then([this, from, estimated_partitions, plan_id, schema_id, &cf, source, reason] (schema_ptr s) mutable {
return _db.local().obtain_reader_permit(cf, "stream-session", db::no_timeout).then([this, from, estimated_partitions, plan_id, schema_id, &cf, source, reason, s] (reader_permit permit) mutable {
return _mm.local().get_schema_for_write(schema_id, from, _ms.local()).then([this, from, estimated_partitions, plan_id, cf_id, schema_id, source, reason] (schema_ptr s) mutable {
return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout).then([this, from, estimated_partitions, plan_id, cf_id, schema_id, source, reason, s] (reader_permit permit) mutable {
auto sink = _ms.local().make_sink_for_stream_mutation_fragments(source);
struct stream_mutation_fragments_cmd_status {
bool got_cmd = false;
@@ -126,11 +124,15 @@ void stream_manager::init_messaging_service_handler() {
}
});
};
try {
// Make sure the table with cf_id is still present at this point.
// Close the sink in case the table is dropped.
auto op = _db.local().find_column_family(cf_id).stream_in_progress();
//FIXME: discarded future.
(void)mutation_writer::distribute_reader_and_consume_on_shards(s,
make_generating_reader_v1(s, permit, std::move(get_next_mutation_fragment)),
make_streaming_consumer("streaming", _db, _sys_dist_ks, _view_update_generator, estimated_partitions, reason, is_offstrategy_supported(reason)),
cf.stream_in_progress()
std::move(op)
).then_wrapped([s, plan_id, from, sink, estimated_partitions] (future<uint64_t> f) mutable {
int32_t status = 0;
uint64_t received_partitions = 0;
@@ -152,6 +154,11 @@ void stream_manager::init_messaging_service_handler() {
sslog.error("[Stream #{}] Failed to handle STREAM_MUTATION_FRAGMENTS (respond phase) for ks={}, cf={}, peer={}: {}",
plan_id, s->ks_name(), s->cf_name(), from.addr, ep);
});
} catch (...) {
return sink.close().then([sink, eptr = std::current_exception()] () -> future<rpc::sink<int>> {
return make_exception_future<rpc::sink<int>>(eptr);
});
}
return make_ready_future<rpc::sink<int>>(sink);
});
});

View File

@@ -200,7 +200,7 @@ future<> stream_transfer_task::execute() {
auto si = make_lw_shared<send_info>(sm.ms(), plan_id, tbl, std::move(permit), std::move(ranges), id, dst_cpu_id, reason, [&sm, plan_id, addr = id.addr] (size_t sz) {
sm.update_progress(plan_id, addr, streaming::progress_info::direction::OUT, sz);
});
return si->has_relevant_range_on_this_shard().then([si, plan_id, cf_id] (bool has_relevant_range_on_this_shard) {
return si->has_relevant_range_on_this_shard().then([&sm, si, plan_id, cf_id] (bool has_relevant_range_on_this_shard) {
if (!has_relevant_range_on_this_shard) {
sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}: ignore ranges on shard={}",
plan_id, cf_id, this_shard_id());
@@ -219,8 +219,20 @@ future<> stream_transfer_task::execute() {
std::rethrow_exception(ep);
});
}).then([this, id, plan_id, cf_id] {
_mutation_done_sent = true;
sslog.debug("[Stream #{}] GOT STREAM_MUTATION_DONE Reply from {}", plan_id, id.addr);
}).handle_exception([this, plan_id, id] (auto ep){
}).handle_exception([this, plan_id, cf_id, id] (std::exception_ptr ep) {
// If the table is dropped during streaming, we can ignore the
// errors and make the stream successful. This allows user to
// drop tables during node operations like decommission or
// bootstrap.
if (!session->manager().db().column_family_exists(cf_id)) {
sslog.warn("[Stream #{}] Ignore the table with table_id {} which is dropped during streaming: {}", plan_id, cf_id, ep);
if (!_mutation_done_sent) {
return session->manager().ms().send_stream_mutation_done(id, plan_id, _ranges, cf_id, session->dst_cpu_id);
}
return make_ready_future<>();
}
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep);
std::rethrow_exception(ep);
});

View File

@@ -30,6 +30,7 @@ private:
dht::token_range_vector _ranges;
std::map<unsigned, dht::partition_range_vector> _shard_ranges;
long _total_size;
bool _mutation_done_sent = false;
public:
using UUID = utils::UUID;
stream_transfer_task(stream_transfer_task&&) = default;