From aaf67bcbaa73578cc72db2197ce7f49941b7da72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 21 Dec 2017 10:25:10 +0200 Subject: [PATCH] Consider preferred replicas when choosing endpoints for query_singular() Propagate the preferred_replicas to db::filter_for_query() and consider them when selecting the endpoints. The algorithm for selecting the endpoints is as follows: * Compute the intersection of the endpoint candidates and the preferred endpoints. * If this yields a set of endpoints that already satisfies the CL requirements use this set. * Otherwise select the remaining endpoints according to the load-balancing strategy, just like before. --- db/consistency_level.cc | 39 +++++++++++++++++++++++++++++++++------ db/consistency_level.hh | 5 ++++- service/storage_proxy.cc | 31 ++++++++++++++++++++++++++++--- service/storage_proxy.hh | 6 +++++- 4 files changed, 70 insertions(+), 11 deletions(-) diff --git a/db/consistency_level.cc b/db/consistency_level.cc index fd46d8ea23..b95ac3dc0f 100644 --- a/db/consistency_level.cc +++ b/db/consistency_level.cc @@ -157,7 +157,10 @@ std::vector<gms::inet_address> filter_for_query(consistency_level cl, keyspace& ks, std::vector<gms::inet_address> live_endpoints, - read_repair_decision read_repair, gms::inet_address* extra, column_family* cf) { + const std::vector<gms::inet_address>& preferred_endpoints, + read_repair_decision read_repair, + gms::inet_address* extra, + column_family* cf) { size_t local_count; if (read_repair == read_repair_decision::GLOBAL) { // take RRD.GLOBAL out of the way @@ -182,6 +185,30 @@ filter_for_query(consistency_level cl, return std::move(live_endpoints); } + std::vector<gms::inet_address> selected_endpoints; + + // Pre-select endpoints based on client preference. If the endpoints + // selected this way aren't enough to satisfy CL requirements select the + // remaining ones according to the load-balancing strategy as before. 
+ if (!preferred_endpoints.empty()) { + const auto it = boost::stable_partition(live_endpoints, [&preferred_endpoints] (const gms::inet_address& a) { + return std::find(preferred_endpoints.cbegin(), preferred_endpoints.cend(), a) == preferred_endpoints.end(); + }); + const size_t selected = std::distance(it, live_endpoints.end()); + if (selected >= bf) { + if (extra) { + *extra = selected == bf ? live_endpoints.front() : *(it + bf); + } + return std::vector<gms::inet_address>(it, it + bf); + } else if (selected) { + selected_endpoints.reserve(bf); + std::move(it, live_endpoints.end(), std::back_inserter(selected_endpoints)); + live_endpoints.erase(it, live_endpoints.end()); + } + } + + const auto remaining_bf = bf - selected_endpoints.size(); + if (cf) { auto get_hit_rate = [cf] (gms::inet_address ep) -> float { constexpr float max_hit_rate = 0.999; @@ -213,21 +240,21 @@ filter_for_query(consistency_level cl, if (!old_node && ht_max - ht_min > 0.01) { // if there is old node or hit rates are close skip calculations // local node is always first if present (see storage_proxy::get_live_sorted_endpoints) unsigned local_idx = epi[0].first == utils::fb_utilities::get_broadcast_address() ? 
0 : epi.size() + 1; - live_endpoints = miss_equalizing_combination(epi, local_idx, bf, bool(extra)); + live_endpoints = miss_equalizing_combination(epi, local_idx, remaining_bf, bool(extra)); } } if (extra) { - *extra = live_endpoints[bf]; // extra replica for speculation + *extra = live_endpoints[remaining_bf]; // extra replica for speculation } - live_endpoints.erase(live_endpoints.begin() + bf, live_endpoints.end()); + std::move(live_endpoints.begin(), live_endpoints.begin() + remaining_bf, std::back_inserter(selected_endpoints)); - return std::move(live_endpoints); + return selected_endpoints; } std::vector<gms::inet_address> filter_for_query(consistency_level cl, keyspace& ks, std::vector<gms::inet_address>& live_endpoints, column_family* cf) { - return filter_for_query(cl, ks, live_endpoints, read_repair_decision::NONE, nullptr, cf); + return filter_for_query(cl, ks, live_endpoints, {}, read_repair_decision::NONE, nullptr, cf); } bool diff --git a/db/consistency_level.hh b/db/consistency_level.hh index fedf437ef0..b9ba56b7db 100644 --- a/db/consistency_level.hh +++ b/db/consistency_level.hh @@ -79,7 +79,10 @@ std::vector<gms::inet_address> filter_for_query(consistency_level cl, keyspace& ks, std::vector<gms::inet_address> live_endpoints, - read_repair_decision read_repair, gms::inet_address* extra, column_family* cf); + const std::vector<gms::inet_address>& preferred_endpoints, + read_repair_decision read_repair, + gms::inet_address* extra, + column_family* cf); std::vector<gms::inet_address> filter_for_query(consistency_level cl, keyspace& ks, std::vector<gms::inet_address>& live_endpoints, column_family* cf); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index be4f8acd33..26d4915e1a 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -402,6 +402,22 @@ public: } }; +static std::vector<gms::inet_address> +replica_ids_to_endpoints(const std::vector<utils::UUID>& replica_ids) { + const auto& tm = get_local_storage_service().get_token_metadata(); + + std::vector<gms::inet_address> endpoints; + endpoints.reserve(replica_ids.size()); + + for (const auto& replica_id : replica_ids) { + if (auto endpoint_opt 
= tm.get_endpoint_for_host_id(replica_id)) { + endpoints.push_back(*endpoint_opt); + } + } + + return endpoints; +} + bool storage_proxy::need_throttle_writes() const { return _stats.background_write_bytes > memory::stats().total_memory() / 10 || _stats.queued_write_bytes > 6*1024*1024; } @@ -2845,7 +2861,11 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s return db::read_repair_decision::NONE; } -::shared_ptr<abstract_read_executor> storage_proxy::get_read_executor(lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, tracing::trace_state_ptr trace_state) { +::shared_ptr<abstract_read_executor> storage_proxy::get_read_executor(lw_shared_ptr<query::read_command> cmd, + dht::partition_range pr, + db::consistency_level cl, + tracing::trace_state_ptr trace_state, + const std::vector<gms::inet_address>& preferred_endpoints) { const dht::token& token = pr.start()->value().token(); schema_ptr schema = local_schema_registry().get(cmd->schema_version); keyspace& ks = _db.local().find_keyspace(schema->ks_name()); @@ -2855,7 +2875,7 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s std::vector<gms::inet_address> all_replicas = get_live_sorted_endpoints(ks, token); db::read_repair_decision repair_decision = new_read_repair_decision(*schema); auto cf = _db.local().find_column_family(schema).shared_from_this(); - std::vector<gms::inet_address> target_replicas = db::filter_for_query(cl, ks, all_replicas, repair_decision, + std::vector<gms::inet_address> target_replicas = db::filter_for_query(cl, ks, all_replicas, preferred_endpoints, repair_decision, retry_type == speculative_retry::type::NONE ? nullptr : &extra_replica, _db.local().get_config().cache_hit_rate_read_balancing() ? 
&*cf : nullptr); @@ -2969,7 +2989,12 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd, if (!pr.is_singular()) { throw std::runtime_error("mixed singular and non singular range are not supported"); } - exec.push_back(get_read_executor(cmd, std::move(pr), cl, trace_state)); + + auto token_range = nonwrapping_range<dht::token>::make_singular(pr.start()->value().token()); + auto it = preferred_replicas.find(token_range); + const auto replicas = it == preferred_replicas.end() ? std::vector<gms::inet_address>{} : replica_ids_to_endpoints(it->second); + + exec.push_back(get_read_executor(cmd, std::move(pr), cl, trace_state, replicas)); } query::result_merger merger(cmd->row_limit, cmd->partition_limit); diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index c8c5a695e7..9b6f74eb94 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -254,7 +254,11 @@ private: std::vector<gms::inet_address> get_live_endpoints(keyspace& ks, const dht::token& token); std::vector<gms::inet_address> get_live_sorted_endpoints(keyspace& ks, const dht::token& token); db::read_repair_decision new_read_repair_decision(const schema& s); - ::shared_ptr<abstract_read_executor> get_read_executor(lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, tracing::trace_state_ptr trace_state); + ::shared_ptr<abstract_read_executor> get_read_executor(lw_shared_ptr<query::read_command> cmd, + dht::partition_range pr, + db::consistency_level cl, + tracing::trace_state_ptr trace_state, + const std::vector<gms::inet_address>& preferred_endpoints); future<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature> query_result_local(schema_ptr, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, query::result_options opts, tracing::trace_state_ptr trace_state,