/*
 * Modified by ScyllaDB
 * Copyright (C) 2015-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
 */

#include "utils/log.hh"
#include "streaming/stream_fwd.hh"
#include "streaming/stream_detail.hh"
#include "streaming/stream_transfer_task.hh"
#include "streaming/stream_session.hh"
#include "streaming/stream_manager.hh"
#include "streaming/stream_reason.hh"
#include "streaming/stream_mutation_fragments_cmd.hh"
#include "service/topology_guard.hh"
#include "readers/mutation_fragment_v1_stream.hh"
#include "mutation/mutation_fragment_stream_validator.hh"
#include "mutation/frozen_mutation.hh"
#include "mutation/mutation.hh"
#include "message/messaging_service.hh"
#include "utils/interval.hh"
#include "dht/i_partitioner.hh"
#include "dht/sharder.hh"
// NOTE(review): the three angle-bracket include targets below were lost in the
// paste; reconstructed from what the code visibly uses (boost::icl::interval_set,
// co_await/co_return, coroutine::maybe_yield) — confirm against the original file.
#include <boost/icl/interval_set.hpp>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include "sstables/sstables.hh"
#include "replica/database.hh"
#include "streaming/table_check.hh"
#include "gms/feature_service.hh"
#include "utils/error_injection.hh"
#include "idl/streaming.dist.hh"
// NOTE(review): stripped include target reconstructed; _ranges (a vector) is
// formatted with "{}" in sort_and_merge_ranges, which needs fmt range support —
// confirm against the original file.
#include <fmt/ranges.h>

namespace streaming {

extern logging::logger sslog;

// Streams the given token ranges of one table (cf_id) to the peer of `session`
// using the sink/source based "new rpc streaming" path.
stream_transfer_task::stream_transfer_task(shared_ptr<stream_session> session, table_id cf_id, dht::token_range_vector ranges, long total_size)
    : stream_task(session, cf_id)
    , _ranges(std::move(ranges))
    , _total_size(total_size) {
}

stream_transfer_task::~stream_transfer_task() = default;

// Per-shard sender state for one table transfer: identity of the stream, the
// streaming reader over the requested ranges, and a progress callback.
struct send_info {
    netw::messaging_service& ms;
    streaming::plan_id plan_id;
    table_id cf_id;
    locator::host_id id;                // peer node we are sending to
    uint32_t dst_cpu_id;
    stream_reason reason;
    service::frozen_topology_guard topo_guard;
    size_t mutations_nr{0};
    semaphore mutations_done{0};
    bool error_logged = false;
    lw_shared_ptr<replica::table> cf;
    dht::token_range_vector ranges;
    dht::partition_range_vector prs;    // `ranges` converted to partition ranges, feeds `reader`
    mutation_fragment_v1_stream reader;
    // Progress callback, invoked with the serialized size of each fragment sent.
    noncopyable_function<void(size_t)> update;
    send_info(netw::messaging_service& ms_, streaming::plan_id plan_id_, lw_shared_ptr<replica::table> tbl_, reader_permit permit_,
            dht::token_range_vector ranges_, locator::host_id id_, uint32_t dst_cpu_id_,
            stream_reason reason_, service::frozen_topology_guard topo_guard_, noncopyable_function<void(size_t)> update_fn)
        : ms(ms_)
        , plan_id(plan_id_)
        , cf_id(tbl_->schema()->id())
        , id(id_)
        , dst_cpu_id(dst_cpu_id_)
        , reason(reason_)
        , topo_guard(topo_guard_)
        , cf(tbl_)
        , ranges(std::move(ranges_))
        , prs(dht::to_partition_ranges(ranges))
        , reader(cf->make_streaming_reader(cf->schema(), std::move(permit_), prs, gc_clock::now())) {
        update = std::move(update_fn);
    }
    // Returns true if at least one of `ranges` intersects this shard, i.e.
    // this shard has data to stream.
    future<bool> has_relevant_range_on_this_shard() {
        return do_with(false, ranges.begin(), [this] (bool& found_relevant_range, dht::token_range_vector::iterator& ranges_it) {
            auto stop_cond = [this, &found_relevant_range, &ranges_it] {
                return ranges_it == ranges.end() || found_relevant_range;
            };
            return do_until(std::move(stop_cond), [this, &found_relevant_range, &ranges_it] {
                dht::token_range range = *ranges_it++;
                if (!found_relevant_range) {
                    auto& table_sharder = cf->get_effective_replication_map()->get_sharder(*cf->schema());
                    auto sharder = dht::selective_token_range_sharder(table_sharder, std::move(range), this_shard_id());
                    // A non-empty first sub-range means this shard owns part of `range`.
                    auto range_shard = sharder.next();
                    if (range_shard) {
                        found_relevant_range = true;
                    }
                }
                return make_ready_future<>();
            }).then([&found_relevant_range] {
                return found_relevant_range;
            });
        });
    }
    // Sums the table's estimated partition count over all requested ranges;
    // used only to size the receiver side, so an estimate is fine.
    future<size_t> estimate_partitions() {
        // NOTE(review): `sstables` is otherwise unused; presumably held to pin
        // the current sstable set while estimating — confirm before removing.
        auto sstables = cf->get_sstables();
        size_t partition_count = 0;
        for (auto& range : ranges) {
            partition_count += co_await cf->estimated_partitions_in_range(range);
            co_await coroutine::maybe_yield();
        }
        co_return partition_count;
    }
};

// Sends all mutation fragments produced by si->reader to the peer over a
// dedicated rpc sink, while concurrently watching the paired rpc source for
// status codes from the receiver. Resolves when both fibers finish; throws if
// the peer reported a failure (unless the failure was "table dropped").
future<> send_mutation_fragments(lw_shared_ptr<send_info> si) {
    return si->reader.has_more_fragments().then([si] (bool there_is_more) {
        if (!there_is_more) {
            // The reader contains no data
            sslog.info("[Stream #{}] Skip sending ks={}, cf={}, reader contains no data, with new rpc streaming", si->plan_id, si->cf->schema()->ks_name(), si->cf->schema()->cf_name());
            return make_ready_future<>();
        }
        return si->estimate_partitions().then([si] (size_t estimated_partitions) {
            sslog.info("[Stream #{}] Start sending ks={}, cf={}, estimated_partitions={}, with new rpc streaming", si->plan_id, si->cf->schema()->ks_name(), si->cf->schema()->cf_name(), estimated_partitions);
            // NOTE(review): sink/source template arguments were stripped from the
            // paste; generic lambda parameters sidestep re-spelling them.
            return si->ms.make_sink_and_source_for_stream_mutation_fragments(si->reader.schema()->version(), si->plan_id, si->cf_id, estimated_partitions, si->reason, si->topo_guard, si->id).then_unpack([si] (auto sink, auto source) mutable {
                auto got_error_from_peer = make_lw_shared(false);
                auto table_is_dropped = make_lw_shared(false);
                // Fiber 1: drain the rpc source. The peer sends a status code:
                // -1 = generic failure, -2 = failure because the table was dropped.
                auto source_op = [source, got_error_from_peer, table_is_dropped, si] () mutable -> future<> {
                    return repeat([source, got_error_from_peer, table_is_dropped, si] () mutable {
                        return source().then([source, got_error_from_peer, table_is_dropped, si] (auto status_opt) mutable {
                            if (status_opt) {
                                auto status = std::get<0>(*status_opt);
                                if (status == -1) {
                                    *got_error_from_peer = true;
                                } else if (status == -2) {
                                    *got_error_from_peer = true;
                                    *table_is_dropped = true;
                                }
                                sslog.debug("Got status code from peer={}, plan_id={}, cf_id={}, status={}", si->id, si->plan_id, si->cf_id, status);
                                // we've got an error from the other side, but we cannot just abandon rpc::source we
                                // need to continue reading until EOS since this will signal that no more work
                                // is left and rpc::source can be destroyed. The sender closes connection immediately
                                // after sending the status, so EOS should arrive shortly.
                                return stop_iteration::no;
                            } else {
                                return stop_iteration::yes;
                            }
                        });
                    });
                }();
                // Fiber 2: pump fragments from the reader into the sink, validating
                // fragment order as we go; always terminate the stream with an
                // end_of_stream (or error) command and close the sink.
                auto sink_op = [sink, si, got_error_from_peer] () mutable -> future<> {
                    mutation_fragment_stream_validator validator(*(si->reader.schema()));
                    return do_with(std::move(sink), std::move(validator), [si, got_error_from_peer] (auto& sink, mutation_fragment_stream_validator& validator) {
                        return repeat([&sink, &validator, si, got_error_from_peer] () mutable {
                            return si->reader().then([&sink, &validator, si, s = si->reader.schema(), got_error_from_peer] (mutation_fragment_opt mf) mutable {
                                if (*got_error_from_peer) {
                                    // Stop sending as soon as the peer reported failure.
                                    return make_exception_future<stop_iteration>(std::runtime_error("Got status error code from peer"));
                                }
                                if (mf) {
                                    if (!validator(mf->mutation_fragment_kind())) {
                                        return make_exception_future<stop_iteration>(std::runtime_error(format("Stream reader mutation_fragment validator failed, previous={}, current={}", validator.previous_mutation_fragment_kind(), mf->mutation_fragment_kind())));
                                    }
                                    frozen_mutation_fragment fmf = freeze(*s, *mf);
                                    auto size = fmf.representation().size();
                                    si->update(size);
                                    return sink(fmf, stream_mutation_fragments_cmd::mutation_fragment_data).then([] {
                                        return stop_iteration::no;
                                    });
                                } else {
                                    if (!validator.on_end_of_stream()) {
                                        return make_exception_future<stop_iteration>(std::runtime_error(format("Stream reader mutation_fragment validator failed on end_of_stream, previous={}, current=end_of_stream", validator.previous_mutation_fragment_kind())));
                                    }
                                    return make_ready_future<stop_iteration>(stop_iteration::yes);
                                }
                            });
                        }).then([&sink] () mutable {
                            return sink(frozen_mutation_fragment(bytes_ostream()), stream_mutation_fragments_cmd::end_of_stream);
                        }).handle_exception([&sink] (std::exception_ptr ep) mutable {
                            // Notify the receiver the sender has failed
                            return sink(frozen_mutation_fragment(bytes_ostream()), stream_mutation_fragments_cmd::error).then([ep = std::move(ep)] () mutable {
                                return make_exception_future<>(std::move(ep));
                            });
                        }).finally([&sink] () mutable {
                            return sink.close();
                        });
                    });
                }();
                return when_all_succeed(std::move(source_op), std::move(sink_op)).then_unpack([got_error_from_peer, table_is_dropped, si] {
                    if (*got_error_from_peer) {
                        if (*table_is_dropped) {
                            // A dropped table is not a stream failure; just log it.
                            sslog.info("[Stream #{}] Skipped streaming the dropped table {}.{}", si->plan_id, si->cf->schema()->ks_name(), si->cf->schema()->cf_name());
                        } else {
                            throw std::runtime_error(format("Peer failed to process mutation_fragment peer={}, plan_id={}, cf_id={}", si->id, si->plan_id, si->cf_id));
                        }
                    }
                });
            });
        });
    });
}

// Runs the transfer on every shard, then tells the peer the table is done via
// STREAM_MUTATION_DONE. Table drops during the transfer are silenced and the
// task still completes successfully.
future<> stream_transfer_task::execute() {
    auto plan_id = session->plan_id();
    auto cf_id = this->cf_id;
    auto id = session->peer;
    auto& sm = session->manager();
    auto table_dropped = co_await streaming::with_table_drop_silenced(sm.db(), sm.mm(), cf_id, [this, &sm, cf_id, plan_id, id] (const table_id&) {
        auto dst_cpu_id = session->dst_cpu_id;
        sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}", plan_id, cf_id);
        sort_and_merge_ranges();
        auto reason = session->get_reason();
        auto topo_guard = session->topo_guard();
        return sm.container().invoke_on_all([plan_id, cf_id, id, dst_cpu_id, ranges=this->_ranges, reason, topo_guard] (stream_manager& sm) mutable {
            auto tbl = sm.db().find_column_family(cf_id).shared_from_this();
            return sm.db().obtain_reader_permit(*tbl, "stream-transfer-task", db::no_timeout, {}).then([&sm, tbl, plan_id, cf_id, id, dst_cpu_id, ranges=std::move(ranges), reason, topo_guard] (reader_permit permit) mutable {
                auto si = make_lw_shared<send_info>(sm.ms(), plan_id, tbl, std::move(permit), std::move(ranges), id, dst_cpu_id, reason, topo_guard, [&sm, plan_id, id] (size_t sz) {
                    sm.update_progress(plan_id, id, streaming::progress_info::direction::OUT, sz);
                });
                return si->has_relevant_range_on_this_shard().then([si, plan_id, cf_id] (bool has_relevant_range_on_this_shard) {
                    if (!has_relevant_range_on_this_shard) {
                        // Nothing of `ranges` lives on this shard — skip it.
                        sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}: ignore ranges on shard={}", plan_id, cf_id, this_shard_id());
                        return make_ready_future<>();
                    }
                    return send_mutation_fragments(std::move(si));
                }).finally([si] {
                    return si->reader.close();
                });
            });
        }).then([this, plan_id, cf_id, id, &sm] {
            sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, cf_id={}", plan_id, id, cf_id);
            return ser::streaming_rpc_verbs::send_stream_mutation_done(&sm.ms(), id, plan_id, _ranges, cf_id, session->dst_cpu_id).handle_exception([plan_id, id] (auto ep) {
                sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep);
                std::rethrow_exception(ep);
            });
        }).then([this, id, plan_id] {
            _mutation_done_sent = true;
            sslog.debug("[Stream #{}] GOT STREAM_MUTATION_DONE Reply from {}", plan_id, id);
        }).handle_exception([plan_id, id, &sm] (std::exception_ptr ep) {
            sslog.warn("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep);
            // Error-injection hook for testing the table-dropped path: looking up
            // a null table id throws no_such_column_family.
            utils::get_local_injector().inject("stream_mutation_fragments_table_dropped", [&sm] () {
                sm.db().find_column_family(table_id::create_null_id());
            });
            std::rethrow_exception(ep);
        });
    });
    // If the table is dropped during streaming, we can ignore the
    // errors and make the stream successful. This allows user to
    // drop tables during node operations like decommission or
    // bootstrap.
    if (table_dropped) {
        sslog.warn("[Stream #{}] Ignore the table with table_id {} which is dropped during streaming", plan_id, cf_id);
        if (!_mutation_done_sent) {
            co_await ser::streaming_rpc_verbs::send_stream_mutation_done(&session->manager().ms(), id, plan_id, _ranges, cf_id, session->dst_cpu_id);
        }
    }
}

// Appends more ranges to the pending set; they are deduplicated/merged later
// by sort_and_merge_ranges().
void stream_transfer_task::append_ranges(const dht::token_range_vector& ranges) {
    _ranges.insert(_ranges.end(), ranges.begin(), ranges.end());
}

// Normalizes _ranges: converts to a boost::icl interval set (which sorts and
// coalesces overlapping/adjacent intervals) and back to token ranges.
void stream_transfer_task::sort_and_merge_ranges() {
    boost::icl::interval_set<dht::token> myset;
    dht::token_range_vector ranges;
    sslog.debug("cf_id = {}, before ranges = {}, size={}", cf_id, _ranges, _ranges.size());
    _ranges.swap(ranges);
    for (auto& range : ranges) {
        // TODO: We should convert range_to_interval and interval_to_range to
        // take interval ranges.
        myset += locator::token_metadata::range_to_interval(std::move(range));
    }
    ranges.clear();
    ranges.shrink_to_fit();
    for (auto& i : myset) {
        auto r = locator::token_metadata::interval_to_range(i);
        _ranges.push_back(dht::token_range(std::move(r)));
    }
    sslog.debug("cf_id = {}, after ranges = {}, size={}", cf_id, _ranges, _ranges.size());
}

} // namespace streaming