diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index d111621515..2b9e3e3dea 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -64,6 +64,7 @@ #include #include #include +#include #include #include #include @@ -1654,14 +1655,22 @@ future<> storage_proxy::send_to_endpoint( // View updates use consistency level ANY in order to fall back to hinted handoff in case of a failed update db::consistency_level cl = (type == db::write_type::VIEW) ? db::consistency_level::ANY : db::consistency_level::ONE; - std::unordered_set targets(pending_endpoints.begin(), pending_endpoints.end()); - targets.insert(std::move(target)); return mutate_prepare(std::array{std::move(m)}, cl, type, - [this, targets = std::move(targets), pending_endpoints = std::move(pending_endpoints), &stats] ( + [this, target = std::array{target}, pending_endpoints = std::move(pending_endpoints), &stats] ( const mutation& m, db::consistency_level cl, db::write_type type) mutable { + std::unordered_set targets; + targets.reserve(pending_endpoints.size() + 1); + std::vector dead_endpoints; + boost::algorithm::partition_copy( + boost::range::join(pending_endpoints, target), + std::inserter(targets, targets.begin()), + std::back_inserter(dead_endpoints), + [] (gms::inet_address ep) { return gms::get_local_failure_detector().is_alive(ep); }); auto& ks = _db.local().find_keyspace(m.schema()->ks_name()); + slogger.trace("Creating write handler with live: {}; dead: {}", targets, dead_endpoints); + db::assure_sufficient_live_nodes(cl, ks, targets, pending_endpoints); return create_write_response_handler( ks, cl, @@ -1669,7 +1678,7 @@ future<> storage_proxy::send_to_endpoint( std::make_unique(m), std::move(targets), pending_endpoints, - { }, + std::move(dead_endpoints), nullptr, stats); }).then([this, &stats, cl] (std::vector ids) {