streaming: Send error code from the sender to receiver

In case of error on the sender side, the sender does not propagate the
error to the receiver. The sender will close the stream. As a result,
the receiver will get nullopt from the source in
get_next_mutation_fragment and pass mutation_fragment_opt with no value
to the generating_reader. In turn, the generating_reader generates end
of stream. However, the last element that the generating_reader has
generated can be any type of mutation_fragment. This makes the sstable
that consumes the generating_reader violates the mutation_fragment
stream rule.

To fix, we need to propagate the error. However RPC streaming does not
support propagate the error in the framework. User has to send an error
code explicitly.

Fixes: #4789
(cherry picked from commit bac987e32a)

streaming: Move stream_mutation_fragments_cmd to a new file

Avoid including the stream_session.hh in messaging_service.hh.

More importantly, fix the build because currently messaging_service.cc
and messaging_service.hh does not include stream_mutation_fragments_cmd.
I am not sure why it builds on my machine. Spotted this when backporting
the change to 3.0 branch.

Refs: #4789
(cherry picked from commit 49a73aa2fc)

streaming: Do not call rpc stream flush in send_mutation_fragments

The stream close() guarantees the data sent will be flushed. No need to
call the stream flush() since the stream is not reused.

Follow up fix for commit bac987e32a (streaming: Send error code from
the sender to receiver).

Fixes: #4789
(cherry picked from commit 288371ce75)
Message-Id: <87058e290ae3f59f874b860121786b22f24957c7.1565189319.git.asias@scylladb.com>
This commit is contained in:
Asias He
2019-08-07 22:50:06 +08:00
committed by Tomasz Grabiec
parent e10afc7f50
commit cc0b4d249b
6 changed files with 94 additions and 18 deletions

View File

@@ -51,4 +51,10 @@ enum class stream_reason : uint8_t {
repair,
};
enum class stream_mutation_fragments_cmd : uint8_t {
error,
mutation_fragment_data,
end_of_stream,
};
}

View File

@@ -87,6 +87,7 @@
#include "frozen_mutation.hh"
#include "flat_mutation_reader.hh"
#include "streaming/stream_manager.hh"
#include "streaming/stream_mutation_fragments_cmd.hh"
namespace netw {
@@ -651,27 +652,27 @@ std::unique_ptr<messaging_service::rpc_protocol_wrapper>& messaging_service::rpc
return _rpc;
}
rpc::sink<int32_t> messaging_service::make_sink_for_stream_mutation_fragments(rpc::source<frozen_mutation_fragment>& source) {
rpc::sink<int32_t> messaging_service::make_sink_for_stream_mutation_fragments(rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>>& source) {
return source.make_sink<netw::serializer, int32_t>();
}
future<rpc::sink<frozen_mutation_fragment>, rpc::source<int32_t>>
future<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>>
messaging_service::make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id) {
if (is_stopping()) {
return make_exception_future<rpc::sink<frozen_mutation_fragment>, rpc::source<int32_t>>(rpc::closed_error());
return make_exception_future<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>>(rpc::closed_error());
}
auto rpc_client = get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id);
return rpc_client->make_stream_sink<netw::serializer, frozen_mutation_fragment>().then([this, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (rpc::sink<frozen_mutation_fragment> sink) mutable {
auto rpc_handler = rpc()->make_client<rpc::source<int32_t> (utils::UUID, utils::UUID, utils::UUID, uint64_t, streaming::stream_reason, rpc::sink<frozen_mutation_fragment>)>(messaging_verb::STREAM_MUTATION_FRAGMENTS);
return rpc_client->make_stream_sink<netw::serializer, frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>().then([this, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd> sink) mutable {
auto rpc_handler = rpc()->make_client<rpc::source<int32_t> (utils::UUID, utils::UUID, utils::UUID, uint64_t, streaming::stream_reason, rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>)>(messaging_verb::STREAM_MUTATION_FRAGMENTS);
return rpc_handler(*rpc_client , plan_id, schema_id, cf_id, estimated_partitions, reason, sink).then_wrapped([sink, rpc_client] (future<rpc::source<int32_t>> source) mutable {
return (source.failed() ? sink.close() : make_ready_future<>()).then([sink = std::move(sink), source = std::move(source)] () mutable {
return make_ready_future<rpc::sink<frozen_mutation_fragment>, rpc::source<int32_t>>(std::move(sink), std::move(source.get0()));
return make_ready_future<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>>(std::move(sink), std::move(source.get0()));
});
});
});
}
void messaging_service::register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason>, rpc::source<frozen_mutation_fragment> source)>&& func) {
void messaging_service::register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason>, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source)>&& func) {
register_handler(this, messaging_verb::STREAM_MUTATION_FRAGMENTS, std::move(func));
}

View File

@@ -36,6 +36,7 @@
#include "tracing/tracing.hh"
#include "digest_algorithm.hh"
#include "streaming/stream_reason.hh"
#include "streaming/stream_mutation_fragments_cmd.hh"
#include <seastar/net/tls.hh>
@@ -256,9 +257,9 @@ public:
// Wrapper for STREAM_MUTATION_FRAGMENTS
// The receiver of STREAM_MUTATION_FRAGMENTS sends status code to the sender to notify any error on the receiver side. The status code is of type int32_t. 0 means successful, -1 means error, other status code value are reserved for future use.
void register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason> reason_opt, rpc::source<frozen_mutation_fragment> source)>&& func);
rpc::sink<int32_t> make_sink_for_stream_mutation_fragments(rpc::source<frozen_mutation_fragment>& source);
future<rpc::sink<frozen_mutation_fragment>, rpc::source<int32_t>> make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id);
void register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason> reason_opt, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source)>&& func);
rpc::sink<int32_t> make_sink_for_stream_mutation_fragments(rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>>& source);
future<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>> make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id);
void register_stream_mutation_done(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id)>&& func);
future<> send_stream_mutation_done(msg_addr id, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id);

View File

@@ -0,0 +1,33 @@
/*
* Copyright (C) 2019 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
namespace streaming {
enum class stream_mutation_fragments_cmd : uint8_t {
error,
mutation_fragment_data,
end_of_stream,
};
}

View File

@@ -63,6 +63,7 @@
#include "db/system_keyspace.hh"
#include <boost/algorithm/cxx11/any_of.hpp>
#include <boost/range/adaptor/map.hpp>
#include "streaming/stream_mutation_fragments_cmd.hh"
namespace streaming {
@@ -214,7 +215,7 @@ void stream_session::init_messaging_service_handler() {
});
});
});
ms().register_stream_mutation_fragments([] (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional<stream_reason> reason_opt, rpc::source<frozen_mutation_fragment> source) {
ms().register_stream_mutation_fragments([] (const rpc::client_info& cinfo, UUID plan_id, UUID schema_id, UUID cf_id, uint64_t estimated_partitions, rpc::optional<stream_reason> reason_opt, rpc::source<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>> source) {
auto from = netw::messaging_service::get_source(cinfo);
auto reason = reason_opt ? *reason_opt: stream_reason::unspecified;
sslog.trace("Got stream_mutation_fragments from {} reason {}", from, int(reason));
@@ -225,15 +226,41 @@ void stream_session::init_messaging_service_handler() {
return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(), [from, estimated_partitions, plan_id, schema_id, cf_id, source, reason] () mutable {
return service::get_schema_for_write(schema_id, from).then([from, estimated_partitions, plan_id, schema_id, cf_id, source, reason] (schema_ptr s) mutable {
auto sink = ms().make_sink_for_stream_mutation_fragments(source);
auto get_next_mutation_fragment = [source, plan_id, from, s] () mutable {
return source().then([plan_id, from, s] (stdx::optional<std::tuple<frozen_mutation_fragment>> fmf_opt) mutable {
if (fmf_opt) {
frozen_mutation_fragment& fmf = std::get<0>(fmf_opt.value());
struct stream_mutation_fragments_cmd_status {
bool got_cmd = false;
bool got_end_of_stream = false;
};
auto cmd_status = make_lw_shared<stream_mutation_fragments_cmd_status>();
auto get_next_mutation_fragment = [source, plan_id, from, s, cmd_status] () mutable {
return source().then([plan_id, from, s, cmd_status] (stdx::optional<std::tuple<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>>> opt) mutable {
if (opt) {
auto cmd = std::get<1>(*opt);
if (cmd) {
cmd_status->got_cmd = true;
switch (*cmd) {
case stream_mutation_fragments_cmd::mutation_fragment_data:
break;
case stream_mutation_fragments_cmd::error:
return make_exception_future<mutation_fragment_opt>(std::runtime_error("Sender failed"));
case stream_mutation_fragments_cmd::end_of_stream:
cmd_status->got_end_of_stream = true;
return make_ready_future<mutation_fragment_opt>();
default:
return make_exception_future<mutation_fragment_opt>(std::runtime_error("Sender sent wrong cmd"));
}
}
frozen_mutation_fragment& fmf = std::get<0>(*opt);
auto sz = fmf.representation().size();
auto mf = fmf.unfreeze(*s);
streaming::get_local_stream_manager().update_progress(plan_id, from.addr, progress_info::direction::IN, sz);
return make_ready_future<mutation_fragment_opt>(std::move(mf));
} else {
// If the sender has sent stream_mutation_fragments_cmd it means it is
// a node that understands the new protocol. It must send end_of_stream
// before close the stream.
if (cmd_status->got_cmd && !cmd_status->got_end_of_stream) {
return make_exception_future<mutation_fragment_opt>(std::runtime_error("Sender did not sent end_of_stream"));
}
return make_ready_future<mutation_fragment_opt>();
}
});

View File

@@ -42,6 +42,7 @@
#include "streaming/stream_session.hh"
#include "streaming/stream_manager.hh"
#include "streaming/stream_reason.hh"
#include "streaming/stream_mutation_fragments_cmd.hh"
#include "mutation_reader.hh"
#include "frozen_mutation.hh"
#include "mutation.hh"
@@ -175,7 +176,7 @@ future<> send_mutations(lw_shared_ptr<send_info> si) {
future<> send_mutation_fragments(lw_shared_ptr<send_info> si) {
return si->estimate_partitions().then([si] (size_t estimated_partitions) {
sslog.info("[Stream #{}] Start sending ks={}, cf={}, estimated_partitions={}, with new rpc streaming", si->plan_id, si->cf.schema()->ks_name(), si->cf.schema()->cf_name(), estimated_partitions);
return netw::get_local_messaging_service().make_sink_and_source_for_stream_mutation_fragments(si->reader.schema()->version(), si->plan_id, si->cf_id, estimated_partitions, si->reason, si->id).then([si] (rpc::sink<frozen_mutation_fragment> sink, rpc::source<int32_t> source) mutable {
return netw::get_local_messaging_service().make_sink_and_source_for_stream_mutation_fragments(si->reader.schema()->version(), si->plan_id, si->cf_id, estimated_partitions, si->reason, si->id).then([si] (rpc::sink<frozen_mutation_fragment, stream_mutation_fragments_cmd> sink, rpc::source<int32_t> source) mutable {
auto got_error_from_peer = make_lw_shared<bool>(false);
auto source_op = [source, got_error_from_peer, si] () mutable -> future<> {
@@ -198,18 +199,25 @@ future<> send_mutation_fragments(lw_shared_ptr<send_info> si) {
}();
auto sink_op = [sink, si, got_error_from_peer] () mutable -> future<> {
return do_with(std::move(sink), [si, got_error_from_peer] (rpc::sink<frozen_mutation_fragment>& sink) {
return do_with(std::move(sink), [si, got_error_from_peer] (rpc::sink<frozen_mutation_fragment, stream_mutation_fragments_cmd>& sink) {
return repeat([&sink, si, got_error_from_peer] () mutable {
return si->reader(db::no_timeout).then([&sink, si, s = si->reader.schema(), got_error_from_peer] (mutation_fragment_opt mf) mutable {
if (mf && !(*got_error_from_peer)) {
frozen_mutation_fragment fmf = freeze(*s, *mf);
auto size = fmf.representation().size();
streaming::get_local_stream_manager().update_progress(si->plan_id, si->id.addr, streaming::progress_info::direction::OUT, size);
return sink(fmf).then([] { return stop_iteration::no; });
return sink(fmf, stream_mutation_fragments_cmd::mutation_fragment_data).then([] { return stop_iteration::no; });
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
});
}).then([&sink] () mutable {
return sink(frozen_mutation_fragment(bytes_ostream()), stream_mutation_fragments_cmd::end_of_stream);
}).handle_exception([&sink] (std::exception_ptr ep) mutable {
// Notify the receiver the sender has failed
return sink(frozen_mutation_fragment(bytes_ostream()), stream_mutation_fragments_cmd::error).then([ep = std::move(ep)] () mutable {
return make_exception_future<>(std::move(ep));
});
}).finally([&sink] () mutable {
return sink.close();
});