messaging_service: Get rid of timeout and retry logic for streaming verb

With the "Use range_streamer everywhere" (7217b7ab36) seires, all
the user of streaming now do streaming with relative small ranges and
can retry streaming at higher level.

There are problems with timeout and retry at RPC verb level in streaming:
1) Timeout can be false negative.
2) We can not cancel the send operations which are already called. When
user aborts the streaming, the retry logic keeps running for a long
time.

This patch removes all the timeout and retry logic for streaming verbs.
After this, the timeout is the job of TCP, the retry is the job of the
upper layer.

Message-Id: <df20303c1fa728dcfdf06430417cf2bd7a843b00.1503994267.git.asias@scylladb.com>
This commit is contained in:
Asias He
2017-08-29 16:12:31 +08:00
committed by Avi Kivity
parent d1209c548a
commit 8fa35d6ddf

View File

@@ -640,59 +640,6 @@ auto send_message_timeout(messaging_service* ms, messaging_verb verb, msg_addr i
});
}
template <typename MsgIn, typename... MsgOut>
auto send_message_timeout_and_retry(messaging_service* ms, messaging_verb verb, msg_addr id,
std::chrono::seconds timeout, int nr_retry, std::chrono::seconds wait, MsgOut... msg) {
using MsgInTuple = typename futurize_t<MsgIn>::value_type;
return do_with(int(nr_retry), std::move(msg)..., [ms, verb, id, timeout, wait, nr_retry] (auto& retry, const auto&... messages) {
return repeat_until_value([ms, verb, id, timeout, wait, nr_retry, &retry, &messages...] {
return send_message_timeout<MsgIn>(ms, verb, id, timeout, messages...).then_wrapped(
[ms, verb, id, timeout, wait, nr_retry, &retry] (auto&& f) mutable {
auto vb = int(verb);
try {
MsgInTuple ret = f.get();
if (retry != nr_retry) {
mlogger.info("Retry verb={} to {}, retry={}: OK", vb, id, retry);
}
return make_ready_future<stdx::optional<MsgInTuple>>(std::move(ret));
} catch (rpc::timeout_error&) {
mlogger.info("Retry verb={} to {}, retry={}: timeout in {} seconds", vb, id, retry, timeout.count());
throw;
} catch (rpc::closed_error&) {
mlogger.info("Retry verb={} to {}, retry={}: {}", vb, id, retry, std::current_exception());
// Stop retrying if retry reaches 0 or message service is shutdown
// or the remote node is removed from gossip (on_remove())
retry--;
if (retry == 0) {
mlogger.debug("Retry verb={} to {}, retry={}: stop retrying: retry == 0", vb, id, retry);
throw;
}
if (ms->is_stopping()) {
mlogger.debug("Retry verb={} to {}, retry={}: stop retrying: messaging_service is stopped",
vb, id, retry);
throw;
}
if (!gms::get_local_gossiper().is_known_endpoint(id.addr)) {
mlogger.debug("Retry verb={} to {}, retry={}: stop retrying: node is removed from the cluster",
vb, id, retry);
throw;
}
return sleep_abortable(wait).then([] {
return make_ready_future<stdx::optional<MsgInTuple>>(stdx::nullopt);
}).handle_exception([vb, id, retry] (std::exception_ptr ep) {
mlogger.debug("Retry verb={} to {}, retry={}: stop retrying: {}", vb, id, retry, ep);
return make_exception_future<stdx::optional<MsgInTuple>>(ep);
});
} catch (...) {
throw;
}
});
}).then([ms = ms->shared_from_this()] (MsgInTuple result) {
return futurize<MsgIn>::from_tuple(std::move(result));
});
});
}
// Send one way message for verb
template <typename... MsgOut>
auto send_message_oneway(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) {
@@ -707,13 +654,6 @@ auto send_message_oneway_timeout(messaging_service* ms, Timeout timeout, messagi
// Wrappers for verbs
// Retransmission parameters for streaming verbs.
// A stream plan gives up retrying in 10*30 + 10*60 seconds (15 minutes) at
// most, 10*30 seconds (5 minutes) at least.
static constexpr int streaming_nr_retry = 10;
static constexpr std::chrono::seconds streaming_timeout{10*60};
static constexpr std::chrono::seconds streaming_wait_before_retry{30};
// PREPARE_MESSAGE
void messaging_service::register_prepare_message(std::function<future<streaming::prepare_message> (const rpc::client_info& cinfo,
streaming::prepare_message msg, UUID plan_id, sstring description)>&& func) {
@@ -721,8 +661,7 @@ void messaging_service::register_prepare_message(std::function<future<streaming:
}
future<streaming::prepare_message> messaging_service::send_prepare_message(msg_addr id, streaming::prepare_message msg, UUID plan_id,
sstring description) {
return send_message_timeout_and_retry<streaming::prepare_message>(this, messaging_verb::PREPARE_MESSAGE, id,
streaming_timeout, streaming_nr_retry, streaming_wait_before_retry,
return send_message<streaming::prepare_message>(this, messaging_verb::PREPARE_MESSAGE, id,
std::move(msg), plan_id, std::move(description));
}
@@ -731,8 +670,7 @@ void messaging_service::register_prepare_done_message(std::function<future<> (co
register_handler(this, messaging_verb::PREPARE_DONE_MESSAGE, std::move(func));
}
future<> messaging_service::send_prepare_done_message(msg_addr id, UUID plan_id, unsigned dst_cpu_id) {
return send_message_timeout_and_retry<void>(this, messaging_verb::PREPARE_DONE_MESSAGE, id,
streaming_timeout, streaming_nr_retry, streaming_wait_before_retry,
return send_message<void>(this, messaging_verb::PREPARE_DONE_MESSAGE, id,
plan_id, dst_cpu_id);
}
@@ -741,8 +679,7 @@ void messaging_service::register_stream_mutation(std::function<future<> (const r
register_handler(this, messaging_verb::STREAM_MUTATION, std::move(func));
}
future<> messaging_service::send_stream_mutation(msg_addr id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, bool fragmented) {
return send_message_timeout_and_retry<void>(this, messaging_verb::STREAM_MUTATION, id,
streaming_timeout, streaming_nr_retry, streaming_wait_before_retry,
return send_message<void>(this, messaging_verb::STREAM_MUTATION, id,
plan_id, std::move(fm), dst_cpu_id, fragmented);
}
@@ -757,8 +694,7 @@ void messaging_service::register_stream_mutation_done(std::function<future<> (co
});
}
future<> messaging_service::send_stream_mutation_done(msg_addr id, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id) {
return send_message_timeout_and_retry<void>(this, messaging_verb::STREAM_MUTATION_DONE, id,
streaming_timeout, streaming_nr_retry, streaming_wait_before_retry,
return send_message<void>(this, messaging_verb::STREAM_MUTATION_DONE, id,
plan_id, std::move(ranges), cf_id, dst_cpu_id);
}
@@ -767,8 +703,8 @@ void messaging_service::register_complete_message(std::function<future<> (const
register_handler(this, messaging_verb::COMPLETE_MESSAGE, std::move(func));
}
future<> messaging_service::send_complete_message(msg_addr id, UUID plan_id, unsigned dst_cpu_id, bool failed) {
return send_message_timeout<void>(this, messaging_verb::COMPLETE_MESSAGE, id,
streaming_timeout, plan_id, dst_cpu_id, failed);
return send_message<void>(this, messaging_verb::COMPLETE_MESSAGE, id,
plan_id, dst_cpu_id, failed);
}
void messaging_service::register_gossip_echo(std::function<future<> ()>&& func) {