From cc0b4d249b95aca0d69966e8a29b2a7927869653 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 7 Aug 2019 22:50:06 +0800 Subject: [PATCH] streaming: Send error code from the sender to receiver In case of error on the sender side, the sender does not propagate the error to the receiver. The sender will close the stream. As a result, the receiver will get nullopt from the source in get_next_mutation_fragment and pass mutation_fragment_opt with no value to the generating_reader. In turn, the generating_reader generates end of stream. However, the last element that the generating_reader has generated can be any type of mutation_fragment. This makes the sstable that consumes the generating_reader violates the mutation_fragment stream rule. To fix, we need to propagate the error. However RPC streaming does not support propagate the error in the framework. User has to send an error code explicitly. Fixes: #4789 (cherry picked from commit bac987e32ada54797cb238b549f28a8730638972) streaming: Move stream_mutation_fragments_cmd to a new file Avoid including the stream_session.hh in messaging_service.hh. More importantly, fix the build because currently messaging_service.cc and messaging_service.hh does not include stream_mutation_fragments_cmd. I am not sure why it builds on my machine. Spotted this when backporting the change to 3.0 branch. Refs: #4789 (cherry picked from commit 49a73aa2fcc5b45e5a0fe6157703866a49d9d927) streaming: Do not call rpc stream flush in send_mutation_fragments The stream close() guarantees the data sent will be flushed. No need to call the stream flush() since the stream is not reused. Follow up fix for commit bac987e32ada (streaming: Send error code from the sender to receiver). Fixes: #4789 (cherry picked from commit 288371ce759c7db62508d71e7b52f4556517b282) Message-Id: <87058e290ae3f59f874b860121786b22f24957c7.1565189319.git.asias@scylladb.com> --- idl/streaming.idl.hh | 6 ++++ message/messaging_service.cc | 15 +++++---- message/messaging_service.hh | 7 ++-- streaming/stream_mutation_fragments_cmd.hh | 33 +++++++++++++++++++ streaming/stream_session.cc | 37 +++++++++++++++++++--- streaming/stream_transfer_task.cc | 14 ++++++-- 6 files changed, 94 insertions(+), 18 deletions(-) create mode 100644 streaming/stream_mutation_fragments_cmd.hh diff --git a/idl/streaming.idl.hh b/idl/streaming.idl.hh index 5160d3f3a2..7c043d40a4 100644 --- a/idl/streaming.idl.hh +++ b/idl/streaming.idl.hh @@ -51,4 +51,10 @@ enum class stream_reason : uint8_t { repair, }; +enum class stream_mutation_fragments_cmd : uint8_t { + error, + mutation_fragment_data, + end_of_stream, +}; + } diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 9001158072..1222030b13 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -87,6 +87,7 @@ #include "frozen_mutation.hh" #include "flat_mutation_reader.hh" #include "streaming/stream_manager.hh" +#include "streaming/stream_mutation_fragments_cmd.hh" namespace netw { @@ -651,27 +652,27 @@ std::unique_ptr& messaging_service::rpc return _rpc; } -rpc::sink messaging_service::make_sink_for_stream_mutation_fragments(rpc::source& source) { +rpc::sink messaging_service::make_sink_for_stream_mutation_fragments(rpc::source>& source) { return source.make_sink(); } -future, rpc::source> +future, rpc::source> messaging_service::make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id) { if (is_stopping()) { - return make_exception_future, rpc::source>(rpc::closed_error()); + return make_exception_future, rpc::source>(rpc::closed_error()); } auto rpc_client = get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id); - return rpc_client->make_stream_sink().then([this, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (rpc::sink sink) mutable { - auto rpc_handler = rpc()->make_client (utils::UUID, utils::UUID, utils::UUID, uint64_t, streaming::stream_reason, rpc::sink)>(messaging_verb::STREAM_MUTATION_FRAGMENTS); + return rpc_client->make_stream_sink().then([this, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (rpc::sink sink) mutable { + auto rpc_handler = rpc()->make_client (utils::UUID, utils::UUID, utils::UUID, uint64_t, streaming::stream_reason, rpc::sink)>(messaging_verb::STREAM_MUTATION_FRAGMENTS); return rpc_handler(*rpc_client , plan_id, schema_id, cf_id, estimated_partitions, reason, sink).then_wrapped([sink, rpc_client] (future> source) mutable { return (source.failed() ? sink.close() : make_ready_future<>()).then([sink = std::move(sink), source = std::move(source)] () mutable { - return make_ready_future, rpc::source>(std::move(sink), std::move(source.get0())); + return make_ready_future, rpc::source>(std::move(sink), std::move(source.get0())); }); }); }); } -void messaging_service::register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional, rpc::source source)>&& func) { +void messaging_service::register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional, rpc::source> source)>&& func) { register_handler(this, messaging_verb::STREAM_MUTATION_FRAGMENTS, std::move(func)); } diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 6e4ec5a3e8..ecb21c1d48 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -36,6 +36,7 @@ #include "tracing/tracing.hh" #include "digest_algorithm.hh" #include "streaming/stream_reason.hh" +#include "streaming/stream_mutation_fragments_cmd.hh" #include @@ -256,9 +257,9 @@ public: // Wrapper for STREAM_MUTATION_FRAGMENTS // The receiver of STREAM_MUTATION_FRAGMENTS sends status code to the sender to notify any error on the receiver side. The status code is of type int32_t. 0 means successful, -1 means error, other status code value are reserved for future use. - void register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional reason_opt, rpc::source source)>&& func); - rpc::sink make_sink_for_stream_mutation_fragments(rpc::source& source); - future, rpc::source> make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id); + void register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional reason_opt, rpc::source> source)>&& func); + rpc::sink make_sink_for_stream_mutation_fragments(rpc::source>& source); + future, rpc::source> make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id); void register_stream_mutation_done(std::function (const rpc::client_info& cinfo, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id)>&& func); future<> send_stream_mutation_done(msg_addr id, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id); diff --git a/streaming/stream_mutation_fragments_cmd.hh b/streaming/stream_mutation_fragments_cmd.hh new file mode 100644 index 0000000000..e86f11b46e --- /dev/null +++ b/streaming/stream_mutation_fragments_cmd.hh @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2019 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +namespace streaming { + +enum class stream_mutation_fragments_cmd : uint8_t { + error, + mutation_fragment_data, + end_of_stream, +}; + + +} diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index c48a44750e..3555c631cf 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -63,6 +63,7 @@ #include "db/system_keyspace.hh" #include #include +#include "streaming/stream_mutation_fragments_cmd.hh" namespace streaming { @@ -214,7 +215,7 @@ void stream_session::init_messaging_service_handler() { }); }); }); - ms().register_stream_mutation_fragments([] (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional reason_opt, rpc::source source) { + ms().register_stream_mutation_fragments([] (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional reason_opt, rpc::source> source) { auto from = netw::messaging_service::get_source(cinfo); auto reason = reason_opt ? *reason_opt: stream_reason::unspecified; sslog.trace("Got stream_mutation_fragments from {} reason {}", from, int(reason)); @@ -225,15 +226,41 @@ void stream_session::init_messaging_service_handler() { return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(), [from, estimated_partitions, plan_id, schema_id, cf_id, source, reason] () mutable { return service::get_schema_for_write(schema_id, from).then([from, estimated_partitions, plan_id, schema_id, cf_id, source, reason] (schema_ptr s) mutable { auto sink = ms().make_sink_for_stream_mutation_fragments(source); - auto get_next_mutation_fragment = [source, plan_id, from, s] () mutable { - return source().then([plan_id, from, s] (stdx::optional> fmf_opt) mutable { - if (fmf_opt) { - frozen_mutation_fragment& fmf = std::get<0>(fmf_opt.value()); + struct stream_mutation_fragments_cmd_status { + bool got_cmd = false; + bool got_end_of_stream = false; + }; + auto cmd_status = make_lw_shared(); + auto get_next_mutation_fragment = [source, plan_id, from, s, cmd_status] () mutable { + return source().then([plan_id, from, s, cmd_status] (stdx::optional>> opt) mutable { + if (opt) { + auto cmd = std::get<1>(*opt); + if (cmd) { + cmd_status->got_cmd = true; + switch (*cmd) { + case stream_mutation_fragments_cmd::mutation_fragment_data: + break; + case stream_mutation_fragments_cmd::error: + return make_exception_future(std::runtime_error("Sender failed")); + case stream_mutation_fragments_cmd::end_of_stream: + cmd_status->got_end_of_stream = true; + return make_ready_future(); + default: + return make_exception_future(std::runtime_error("Sender sent wrong cmd")); + } + } + frozen_mutation_fragment& fmf = std::get<0>(*opt); auto sz = fmf.representation().size(); auto mf = fmf.unfreeze(*s); streaming::get_local_stream_manager().update_progress(plan_id, from.addr, progress_info::direction::IN, sz); return make_ready_future(std::move(mf)); } else { + // If the sender has sent stream_mutation_fragments_cmd it means it is + // a node that understands the new protocol. It must send end_of_stream + // before close the stream. + if (cmd_status->got_cmd && !cmd_status->got_end_of_stream) { + return make_exception_future(std::runtime_error("Sender did not sent end_of_stream")); + } return make_ready_future(); } }); diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index 39fa66b21d..80cb8b7d14 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -42,6 +42,7 @@ #include "streaming/stream_session.hh" #include "streaming/stream_manager.hh" #include "streaming/stream_reason.hh" +#include "streaming/stream_mutation_fragments_cmd.hh" #include "mutation_reader.hh" #include "frozen_mutation.hh" #include "mutation.hh" @@ -175,7 +176,7 @@ future<> send_mutations(lw_shared_ptr si) { future<> send_mutation_fragments(lw_shared_ptr si) { return si->estimate_partitions().then([si] (size_t estimated_partitions) { sslog.info("[Stream #{}] Start sending ks={}, cf={}, estimated_partitions={}, with new rpc streaming", si->plan_id, si->cf.schema()->ks_name(), si->cf.schema()->cf_name(), estimated_partitions); - return netw::get_local_messaging_service().make_sink_and_source_for_stream_mutation_fragments(si->reader.schema()->version(), si->plan_id, si->cf_id, estimated_partitions, si->reason, si->id).then([si] (rpc::sink sink, rpc::source source) mutable { + return netw::get_local_messaging_service().make_sink_and_source_for_stream_mutation_fragments(si->reader.schema()->version(), si->plan_id, si->cf_id, estimated_partitions, si->reason, si->id).then([si] (rpc::sink sink, rpc::source source) mutable { auto got_error_from_peer = make_lw_shared(false); auto source_op = [source, got_error_from_peer, si] () mutable -> future<> { @@ -198,18 +199,25 @@ future<> send_mutation_fragments(lw_shared_ptr si) { }(); auto sink_op = [sink, si, got_error_from_peer] () mutable -> future<> { - return do_with(std::move(sink), [si, got_error_from_peer] (rpc::sink& sink) { + return do_with(std::move(sink), [si, got_error_from_peer] (rpc::sink& sink) { return repeat([&sink, si, got_error_from_peer] () mutable { return si->reader(db::no_timeout).then([&sink, si, s = si->reader.schema(), got_error_from_peer] (mutation_fragment_opt mf) mutable { if (mf && !(*got_error_from_peer)) { frozen_mutation_fragment fmf = freeze(*s, *mf); auto size = fmf.representation().size(); streaming::get_local_stream_manager().update_progress(si->plan_id, si->id.addr, streaming::progress_info::direction::OUT, size); - return sink(fmf).then([] { return stop_iteration::no; }); + return sink(fmf, stream_mutation_fragments_cmd::mutation_fragment_data).then([] { return stop_iteration::no; }); } else { return make_ready_future(stop_iteration::yes); } }); + }).then([&sink] () mutable { + return sink(frozen_mutation_fragment(bytes_ostream()), stream_mutation_fragments_cmd::end_of_stream); + }).handle_exception([&sink] (std::exception_ptr ep) mutable { + // Notify the receiver the sender has failed + return sink(frozen_mutation_fragment(bytes_ostream()), stream_mutation_fragments_cmd::error).then([ep = std::move(ep)] () mutable { + return make_exception_future<>(std::move(ep)); + }); }).finally([&sink] () mutable { return sink.close(); });