Consider preferred replicas when choosing endpoints for query_singular()

Propagate the preferred_replicas to db::filter_for_query() and consider
them when selecting the endpoints. The algorithm for selecting the
endpoints is as follows:
* Compute the intersection of the endpoint candidates and the
preferred endpoints.
* If this yields a set of endpoints that already satisfies the CL
requirements, use this set.
* Otherwise select the remaining endpoints according to the
load-balancing strategy, just like before.
This commit is contained in:
Botond Dénes
2017-12-21 10:25:10 +02:00
parent eac597d726
commit aaf67bcbaa
4 changed files with 70 additions and 11 deletions

View File

@@ -157,7 +157,10 @@ std::vector<gms::inet_address>
filter_for_query(consistency_level cl,
keyspace& ks,
std::vector<gms::inet_address> live_endpoints,
read_repair_decision read_repair, gms::inet_address* extra, column_family* cf) {
const std::vector<gms::inet_address>& preferred_endpoints,
read_repair_decision read_repair,
gms::inet_address* extra,
column_family* cf) {
size_t local_count;
if (read_repair == read_repair_decision::GLOBAL) { // take RRD.GLOBAL out of the way
@@ -182,6 +185,30 @@ filter_for_query(consistency_level cl,
return std::move(live_endpoints);
}
std::vector<gms::inet_address> selected_endpoints;
// Pre-select endpoints based on client preference. If the endpoints
// selected this way aren't enough to satisfy CL requirements select the
// remaining ones according to the load-balancing strategy as before.
if (!preferred_endpoints.empty()) {
const auto it = boost::stable_partition(live_endpoints, [&preferred_endpoints] (const gms::inet_address& a) {
return std::find(preferred_endpoints.cbegin(), preferred_endpoints.cend(), a) == preferred_endpoints.end();
});
const size_t selected = std::distance(it, live_endpoints.end());
if (selected >= bf) {
if (extra) {
*extra = selected == bf ? live_endpoints.front() : *(it + bf);
}
return std::vector<gms::inet_address>(it, it + bf);
} else if (selected) {
selected_endpoints.reserve(bf);
std::move(it, live_endpoints.end(), std::back_inserter(selected_endpoints));
live_endpoints.erase(it, live_endpoints.end());
}
}
const auto remaining_bf = bf - selected_endpoints.size();
if (cf) {
auto get_hit_rate = [cf] (gms::inet_address ep) -> float {
constexpr float max_hit_rate = 0.999;
@@ -213,21 +240,21 @@ filter_for_query(consistency_level cl,
if (!old_node && ht_max - ht_min > 0.01) { // if there is old node or hit rates are close skip calculations
// local node is always first if present (see storage_proxy::get_live_sorted_endpoints)
unsigned local_idx = epi[0].first == utils::fb_utilities::get_broadcast_address() ? 0 : epi.size() + 1;
live_endpoints = miss_equalizing_combination(epi, local_idx, bf, bool(extra));
live_endpoints = miss_equalizing_combination(epi, local_idx, remaining_bf, bool(extra));
}
}
if (extra) {
*extra = live_endpoints[bf]; // extra replica for speculation
*extra = live_endpoints[remaining_bf]; // extra replica for speculation
}
live_endpoints.erase(live_endpoints.begin() + bf, live_endpoints.end());
std::move(live_endpoints.begin(), live_endpoints.begin() + remaining_bf, std::back_inserter(selected_endpoints));
return std::move(live_endpoints);
return selected_endpoints;
}
std::vector<gms::inet_address> filter_for_query(consistency_level cl, keyspace& ks, std::vector<gms::inet_address>& live_endpoints, column_family* cf) {
return filter_for_query(cl, ks, live_endpoints, read_repair_decision::NONE, nullptr, cf);
return filter_for_query(cl, ks, live_endpoints, {}, read_repair_decision::NONE, nullptr, cf);
}
bool

View File

@@ -79,7 +79,10 @@ std::vector<gms::inet_address>
filter_for_query(consistency_level cl,
keyspace& ks,
std::vector<gms::inet_address> live_endpoints,
read_repair_decision read_repair, gms::inet_address* extra, column_family* cf);
const std::vector<gms::inet_address>& preferred_endpoints,
read_repair_decision read_repair,
gms::inet_address* extra,
column_family* cf);
std::vector<gms::inet_address> filter_for_query(consistency_level cl, keyspace& ks, std::vector<gms::inet_address>& live_endpoints, column_family* cf);

View File

@@ -402,6 +402,22 @@ public:
}
};
// Translate a list of replica host IDs (UUIDs) into their current
// inet addresses using the local token metadata.
static std::vector<gms::inet_address>
replica_ids_to_endpoints(const std::vector<utils::UUID>& replica_ids) {
const auto& tm = get_local_storage_service().get_token_metadata();
std::vector<gms::inet_address> endpoints;
endpoints.reserve(replica_ids.size());
for (const auto& replica_id : replica_ids) {
// Host IDs with no known endpoint (e.g. a node no longer in the
// token metadata) are silently skipped, so the result may contain
// fewer entries than replica_ids.
if (auto endpoint_opt = tm.get_endpoint_for_host_id(replica_id)) {
endpoints.push_back(*endpoint_opt);
}
}
return endpoints;
}
// Decide whether incoming writes should be throttled: returns true when
// in-flight background writes exceed 10% of total memory, or when more
// than 6 MiB of writes are queued. Both thresholds are hard-coded
// heuristics.
bool storage_proxy::need_throttle_writes() const {
return _stats.background_write_bytes > memory::stats().total_memory() / 10 || _stats.queued_write_bytes > 6*1024*1024;
}
@@ -2845,7 +2861,11 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s
return db::read_repair_decision::NONE;
}
::shared_ptr<abstract_read_executor> storage_proxy::get_read_executor(lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, tracing::trace_state_ptr trace_state) {
::shared_ptr<abstract_read_executor> storage_proxy::get_read_executor(lw_shared_ptr<query::read_command> cmd,
dht::partition_range pr,
db::consistency_level cl,
tracing::trace_state_ptr trace_state,
const std::vector<gms::inet_address>& preferred_endpoints) {
const dht::token& token = pr.start()->value().token();
schema_ptr schema = local_schema_registry().get(cmd->schema_version);
keyspace& ks = _db.local().find_keyspace(schema->ks_name());
@@ -2855,7 +2875,7 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s
std::vector<gms::inet_address> all_replicas = get_live_sorted_endpoints(ks, token);
db::read_repair_decision repair_decision = new_read_repair_decision(*schema);
auto cf = _db.local().find_column_family(schema).shared_from_this();
std::vector<gms::inet_address> target_replicas = db::filter_for_query(cl, ks, all_replicas, repair_decision,
std::vector<gms::inet_address> target_replicas = db::filter_for_query(cl, ks, all_replicas, preferred_endpoints, repair_decision,
retry_type == speculative_retry::type::NONE ? nullptr : &extra_replica,
_db.local().get_config().cache_hit_rate_read_balancing() ? &*cf : nullptr);
@@ -2969,7 +2989,12 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd,
if (!pr.is_singular()) {
throw std::runtime_error("mixed singular and non singular range are not supported");
}
exec.push_back(get_read_executor(cmd, std::move(pr), cl, trace_state));
auto token_range = nonwrapping_range<dht::token>::make_singular(pr.start()->value().token());
auto it = preferred_replicas.find(token_range);
const auto replicas = it == preferred_replicas.end() ? std::vector<gms::inet_address>{} : replica_ids_to_endpoints(it->second);
exec.push_back(get_read_executor(cmd, std::move(pr), cl, trace_state, replicas));
}
query::result_merger merger(cmd->row_limit, cmd->partition_limit);

View File

@@ -254,7 +254,11 @@ private:
std::vector<gms::inet_address> get_live_endpoints(keyspace& ks, const dht::token& token);
std::vector<gms::inet_address> get_live_sorted_endpoints(keyspace& ks, const dht::token& token);
db::read_repair_decision new_read_repair_decision(const schema& s);
::shared_ptr<abstract_read_executor> get_read_executor(lw_shared_ptr<query::read_command> cmd, dht::partition_range pr, db::consistency_level cl, tracing::trace_state_ptr trace_state);
::shared_ptr<abstract_read_executor> get_read_executor(lw_shared_ptr<query::read_command> cmd,
dht::partition_range pr,
db::consistency_level cl,
tracing::trace_state_ptr trace_state,
const std::vector<gms::inet_address>& preferred_endpoints);
future<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature> query_result_local(schema_ptr, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr,
query::result_options opts,
tracing::trace_state_ptr trace_state,