diff --git a/idl/streaming.idl.hh b/idl/streaming.idl.hh index 5160d3f3a2..7c043d40a4 100644 --- a/idl/streaming.idl.hh +++ b/idl/streaming.idl.hh @@ -51,4 +51,10 @@ enum class stream_reason : uint8_t { repair, }; +enum class stream_mutation_fragments_cmd : uint8_t { + error, + mutation_fragment_data, + end_of_stream, +}; + } diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 9001158072..1222030b13 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -87,6 +87,7 @@ #include "frozen_mutation.hh" #include "flat_mutation_reader.hh" #include "streaming/stream_manager.hh" +#include "streaming/stream_mutation_fragments_cmd.hh" namespace netw { @@ -651,27 +652,27 @@ std::unique_ptr& messaging_service::rpc return _rpc; } -rpc::sink messaging_service::make_sink_for_stream_mutation_fragments(rpc::source& source) { +rpc::sink messaging_service::make_sink_for_stream_mutation_fragments(rpc::source>& source) { return source.make_sink(); } -future, rpc::source> +future, rpc::source> messaging_service::make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id) { if (is_stopping()) { - return make_exception_future, rpc::source>(rpc::closed_error()); + return make_exception_future, rpc::source>(rpc::closed_error()); } auto rpc_client = get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id); - return rpc_client->make_stream_sink().then([this, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (rpc::sink sink) mutable { - auto rpc_handler = rpc()->make_client (utils::UUID, utils::UUID, utils::UUID, uint64_t, streaming::stream_reason, rpc::sink)>(messaging_verb::STREAM_MUTATION_FRAGMENTS); + return rpc_client->make_stream_sink().then([this, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (rpc::sink sink) mutable { + auto rpc_handler = rpc()->make_client (utils::UUID, utils::UUID, utils::UUID, uint64_t, streaming::stream_reason, rpc::sink)>(messaging_verb::STREAM_MUTATION_FRAGMENTS); return rpc_handler(*rpc_client , plan_id, schema_id, cf_id, estimated_partitions, reason, sink).then_wrapped([sink, rpc_client] (future> source) mutable { return (source.failed() ? sink.close() : make_ready_future<>()).then([sink = std::move(sink), source = std::move(source)] () mutable { - return make_ready_future, rpc::source>(std::move(sink), std::move(source.get0())); + return make_ready_future, rpc::source>(std::move(sink), std::move(source.get0())); }); }); }); } -void messaging_service::register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional, rpc::source source)>&& func) { +void messaging_service::register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional, rpc::source> source)>&& func) { register_handler(this, messaging_verb::STREAM_MUTATION_FRAGMENTS, std::move(func)); } diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 6e4ec5a3e8..ecb21c1d48 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -36,6 +36,7 @@ #include "tracing/tracing.hh" #include "digest_algorithm.hh" #include "streaming/stream_reason.hh" +#include "streaming/stream_mutation_fragments_cmd.hh" #include @@ -256,9 +257,9 @@ public: // Wrapper for STREAM_MUTATION_FRAGMENTS // The receiver of STREAM_MUTATION_FRAGMENTS sends status code to the sender to notify any error on the receiver side. The status code is of type int32_t. 0 means successful, -1 means error, other status code value are reserved for future use. - void register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional reason_opt, rpc::source source)>&& func); - rpc::sink make_sink_for_stream_mutation_fragments(rpc::source& source); - future, rpc::source> make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id); + void register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional reason_opt, rpc::source> source)>&& func); + rpc::sink make_sink_for_stream_mutation_fragments(rpc::source>& source); + future, rpc::source> make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id); void register_stream_mutation_done(std::function (const rpc::client_info& cinfo, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id)>&& func); future<> send_stream_mutation_done(msg_addr id, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id); diff --git a/streaming/stream_mutation_fragments_cmd.hh b/streaming/stream_mutation_fragments_cmd.hh new file mode 100644 index 0000000000..e86f11b46e --- /dev/null +++ b/streaming/stream_mutation_fragments_cmd.hh @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2019 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +namespace streaming { + +enum class stream_mutation_fragments_cmd : uint8_t { + error, + mutation_fragment_data, + end_of_stream, +}; + + +} diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index c48a44750e..3555c631cf 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -63,6 +63,7 @@ #include "db/system_keyspace.hh" #include #include +#include "streaming/stream_mutation_fragments_cmd.hh" namespace streaming { @@ -214,7 +215,7 @@ void stream_session::init_messaging_service_handler() { }); }); }); - ms().register_stream_mutation_fragments([] (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional reason_opt, rpc::source source) { + ms().register_stream_mutation_fragments([] (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional reason_opt, rpc::source> source) { auto from = netw::messaging_service::get_source(cinfo); auto reason = reason_opt ? *reason_opt: stream_reason::unspecified; sslog.trace("Got stream_mutation_fragments from {} reason {}", from, int(reason)); @@ -225,15 +226,41 @@ void stream_session::init_messaging_service_handler() { return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(), [from, estimated_partitions, plan_id, schema_id, cf_id, source, reason] () mutable { return service::get_schema_for_write(schema_id, from).then([from, estimated_partitions, plan_id, schema_id, cf_id, source, reason] (schema_ptr s) mutable { auto sink = ms().make_sink_for_stream_mutation_fragments(source); - auto get_next_mutation_fragment = [source, plan_id, from, s] () mutable { - return source().then([plan_id, from, s] (stdx::optional> fmf_opt) mutable { - if (fmf_opt) { - frozen_mutation_fragment& fmf = std::get<0>(fmf_opt.value()); + struct stream_mutation_fragments_cmd_status { + bool got_cmd = false; + bool got_end_of_stream = false; + }; + auto cmd_status = make_lw_shared(); + auto get_next_mutation_fragment = [source, plan_id, from, s, cmd_status] () mutable { + return source().then([plan_id, from, s, cmd_status] (stdx::optional>> opt) mutable { + if (opt) { + auto cmd = std::get<1>(*opt); + if (cmd) { + cmd_status->got_cmd = true; + switch (*cmd) { + case stream_mutation_fragments_cmd::mutation_fragment_data: + break; + case stream_mutation_fragments_cmd::error: + return make_exception_future(std::runtime_error("Sender failed")); + case stream_mutation_fragments_cmd::end_of_stream: + cmd_status->got_end_of_stream = true; + return make_ready_future(); + default: + return make_exception_future(std::runtime_error("Sender sent wrong cmd")); + } + } + frozen_mutation_fragment& fmf = std::get<0>(*opt); auto sz = fmf.representation().size(); auto mf = fmf.unfreeze(*s); streaming::get_local_stream_manager().update_progress(plan_id, from.addr, progress_info::direction::IN, sz); return make_ready_future(std::move(mf)); } else { + // If the sender has sent stream_mutation_fragments_cmd it means it is + // a node that understands the new protocol. It must send end_of_stream + // before close the stream. + if (cmd_status->got_cmd && !cmd_status->got_end_of_stream) { + return make_exception_future(std::runtime_error("Sender did not sent end_of_stream")); + } return make_ready_future(); } }); diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index 39fa66b21d..80cb8b7d14 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -42,6 +42,7 @@ #include "streaming/stream_session.hh" #include "streaming/stream_manager.hh" #include "streaming/stream_reason.hh" +#include "streaming/stream_mutation_fragments_cmd.hh" #include "mutation_reader.hh" #include "frozen_mutation.hh" #include "mutation.hh" @@ -175,7 +176,7 @@ future<> send_mutations(lw_shared_ptr si) { future<> send_mutation_fragments(lw_shared_ptr si) { return si->estimate_partitions().then([si] (size_t estimated_partitions) { sslog.info("[Stream #{}] Start sending ks={}, cf={}, estimated_partitions={}, with new rpc streaming", si->plan_id, si->cf.schema()->ks_name(), si->cf.schema()->cf_name(), estimated_partitions); - return netw::get_local_messaging_service().make_sink_and_source_for_stream_mutation_fragments(si->reader.schema()->version(), si->plan_id, si->cf_id, estimated_partitions, si->reason, si->id).then([si] (rpc::sink sink, rpc::source source) mutable { + return netw::get_local_messaging_service().make_sink_and_source_for_stream_mutation_fragments(si->reader.schema()->version(), si->plan_id, si->cf_id, estimated_partitions, si->reason, si->id).then([si] (rpc::sink sink, rpc::source source) mutable { auto got_error_from_peer = make_lw_shared(false); auto source_op = [source, got_error_from_peer, si] () mutable -> future<> { @@ -198,18 +199,25 @@ future<> send_mutation_fragments(lw_shared_ptr si) { }(); auto sink_op = [sink, si, got_error_from_peer] () mutable -> future<> { - return do_with(std::move(sink), [si, got_error_from_peer] (rpc::sink& sink) { + return do_with(std::move(sink), [si, got_error_from_peer] (rpc::sink& sink) { return repeat([&sink, si, got_error_from_peer] () mutable { return si->reader(db::no_timeout).then([&sink, si, s = si->reader.schema(), got_error_from_peer] (mutation_fragment_opt mf) mutable { if (mf && !(*got_error_from_peer)) { frozen_mutation_fragment fmf = freeze(*s, *mf); auto size = fmf.representation().size(); streaming::get_local_stream_manager().update_progress(si->plan_id, si->id.addr, streaming::progress_info::direction::OUT, size); - return sink(fmf).then([] { return stop_iteration::no; }); + return sink(fmf, stream_mutation_fragments_cmd::mutation_fragment_data).then([] { return stop_iteration::no; }); } else { return make_ready_future(stop_iteration::yes); } }); + }).then([&sink] () mutable { + return sink(frozen_mutation_fragment(bytes_ostream()), stream_mutation_fragments_cmd::end_of_stream); + }).handle_exception([&sink] (std::exception_ptr ep) mutable { + // Notify the receiver the sender has failed + return sink(frozen_mutation_fragment(bytes_ostream()), stream_mutation_fragments_cmd::error).then([ep = std::move(ep)] () mutable { + return make_exception_future<>(std::move(ep)); + }); }).finally([&sink] () mutable { return sink.close(); });