diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 24dddb8e4a..eb18cf68c8 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -2141,16 +2141,23 @@ gms::inet_address storage_proxy::find_leader_for_counter_update(const mutation& throw exceptions::unavailable_exception(cl, block_for(ks, cl), 0); } - auto local_endpoints = boost::copy_range>(live_endpoints | boost::adaptors::filtered([&] (auto&& ep) { + const auto my_address = utils::fb_utilities::get_broadcast_address(); + // Early return if coordinator can become the leader (so one extra internode message can be + // avoided). With token-aware drivers this is the expected case, so we are doing it ASAP. + if (boost::algorithm::any_of_equal(live_endpoints, my_address)) { + return my_address; + } + + const auto local_endpoints = boost::copy_range>(live_endpoints | boost::adaptors::filtered([&] (auto&& ep) { return db::is_local(ep); })); + if (local_endpoints.empty()) { // FIXME: O(n log n) to get maximum auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr(); - snitch->sort_by_proximity(utils::fb_utilities::get_broadcast_address(), live_endpoints); + snitch->sort_by_proximity(my_address, live_endpoints); return live_endpoints[0]; } else { - // FIXME: favour ourselves to avoid additional hop? static thread_local std::default_random_engine re{std::random_device{}()}; std::uniform_int_distribution<> dist(0, local_endpoints.size() - 1); return local_endpoints[dist(re)]; @@ -2206,6 +2213,10 @@ future<> storage_proxy::mutate_counters(Range&& mutations, db::consistency_level return std::move(m.fm); })); + // Coordinator is preferred as the leader - if it's not selected we can assume + // that the query was non-token-aware and bump relevant metric. + get_stats().writes_coordinator_outside_replica_set += fms.size(); + auto msg_addr = netw::messaging_service::msg_addr{ endpoint_and_mutations.first, 0 }; tracing::trace(tr_state, "Enqueuing counter update to {}", msg_addr); f = _messaging.send_counter_mutation(msg_addr, timeout, std::move(fms), cl, tracing::make_trace_info(tr_state));