service/storage_proxy: Consider target liveness in sent_to_endpoint()

So we don't attempt to send mutations to unreachable endpoints and
instead store a hint for them, we now check the endpoint status and
populate dead_endpoints accordingly in
storage_proxy::send_to_endpoint().

Fixes #3820

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
Message-Id: <20181007100640.2182-1-duarte@scylladb.com>
(cherry picked from commit 30d6ed8f92)
This commit is contained in:
Duarte Nunes
2018-10-07 11:06:40 +01:00
parent 53924e5c7f
commit b0a9c40ab1

View File

@@ -64,6 +64,7 @@
#include <boost/range/adaptors.hpp>
#include <boost/algorithm/cxx11/any_of.hpp>
#include <boost/algorithm/cxx11/none_of.hpp>
#include <boost/algorithm/cxx11/partition_copy.hpp>
#include <boost/range/algorithm/count_if.hpp>
#include <boost/range/algorithm/find.hpp>
#include <boost/range/algorithm/find_if.hpp>
@@ -1654,14 +1655,22 @@ future<> storage_proxy::send_to_endpoint(
// View updates use consistency level ANY in order to fall back to hinted handoff in case of a failed update
db::consistency_level cl = (type == db::write_type::VIEW) ? db::consistency_level::ANY : db::consistency_level::ONE;
std::unordered_set<gms::inet_address> targets(pending_endpoints.begin(), pending_endpoints.end());
targets.insert(std::move(target));
return mutate_prepare(std::array<mutation, 1>{std::move(m)}, cl, type,
[this, targets = std::move(targets), pending_endpoints = std::move(pending_endpoints), &stats] (
[this, target = std::array{target}, pending_endpoints = std::move(pending_endpoints), &stats] (
const mutation& m,
db::consistency_level cl,
db::write_type type) mutable {
std::unordered_set<gms::inet_address> targets;
targets.reserve(pending_endpoints.size() + 1);
std::vector<gms::inet_address> dead_endpoints;
boost::algorithm::partition_copy(
boost::range::join(pending_endpoints, target),
std::inserter(targets, targets.begin()),
std::back_inserter(dead_endpoints),
[] (gms::inet_address ep) { return gms::get_local_failure_detector().is_alive(ep); });
auto& ks = _db.local().find_keyspace(m.schema()->ks_name());
slogger.trace("Creating write handler with live: {}; dead: {}", targets, dead_endpoints);
db::assure_sufficient_live_nodes(cl, ks, targets, pending_endpoints);
return create_write_response_handler(
ks,
cl,
@@ -1669,7 +1678,7 @@ future<> storage_proxy::send_to_endpoint(
std::make_unique<shared_mutation>(m),
std::move(targets),
pending_endpoints,
{ },
std::move(dead_endpoints),
nullptr,
stats);
}).then([this, &stats, cl] (std::vector<unique_response_handler> ids) {