From 6486d6c8bd29bd28fa41dff2afb77df9296bb9bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 18 Apr 2018 16:23:16 +0300 Subject: [PATCH] storage_proxy: use preferred/last replicas --- service/storage_proxy.cc | 37 +++++++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index e098f3ba70..285cbc9eb0 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -3175,12 +3175,20 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t auto p = shared_from_this(); auto& cf= _db.local().find_column_family(schema); auto pcf = _db.local().get_config().cache_hit_rate_read_balancing() ? &cf : nullptr; + std::unordered_map> ranges_per_exec; + const auto preferred_replicas_for_range = [&preferred_replicas] (const dht::partition_range& r) { + auto it = preferred_replicas.find(r.transform(std::mem_fn(&dht::ring_position::token))); + return it == preferred_replicas.end() ? std::vector{} : replica_ids_to_endpoints(it->second); + }; + const auto to_token_range = [] (const dht::partition_range& r) { return r.transform(std::mem_fn(&dht::ring_position::token)); }; while (i != ranges.end() && std::distance(concurrent_fetch_starting_index, i) < concurrency_factor) { dht::partition_range& range = *i; std::vector live_endpoints = get_live_sorted_endpoints(ks, end_token(range)); - std::vector filtered_endpoints = filter_for_query(cl, ks, live_endpoints, {}, pcf); + std::vector merged_preferred_replicas = preferred_replicas_for_range(*i); + std::vector filtered_endpoints = filter_for_query(cl, ks, live_endpoints, merged_preferred_replicas, pcf); + std::vector merged_ranges{to_token_range(range)}; ++i; // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take @@ -3188,9 +3196,10 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand. while (i != ranges.end()) { + const auto current_range_preferred_replicas = preferred_replicas_for_range(*i); dht::partition_range& next_range = *i; std::vector next_endpoints = get_live_sorted_endpoints(ks, end_token(next_range)); - std::vector next_filtered_endpoints = filter_for_query(cl, ks, next_endpoints, {}, pcf); + std::vector next_filtered_endpoints = filter_for_query(cl, ks, next_endpoints, current_range_preferred_replicas, pcf); // Origin has this to say here: // * If the current range right is the min token, we should stop merging because CFS.getRangeSlice @@ -3204,13 +3213,14 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t } std::vector merged = intersection(live_endpoints, next_endpoints); + std::vector current_merged_preferred_replicas = intersection(merged_preferred_replicas, current_range_preferred_replicas); // Check if there is enough endpoint for the merge to be possible. if (!is_sufficient_live_nodes(cl, ks, merged)) { break; } - std::vector filtered_merged = filter_for_query(cl, ks, merged, {}, pcf); + std::vector filtered_merged = filter_for_query(cl, ks, merged, current_merged_preferred_replicas, pcf); // Estimate whether merging will be a win or not if (!locator::i_endpoint_snitch::get_local_snitch_ptr()->is_worth_merging_for_range_query(filtered_merged, filtered_endpoints, next_filtered_endpoints)) { @@ -3239,8 +3249,10 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t // If we get there, merge this range and the next one range = dht::partition_range(range.start(), next_range.end()); live_endpoints = std::move(merged); + merged_preferred_replicas = std::move(current_merged_preferred_replicas); filtered_endpoints = std::move(filtered_merged); ++i; + merged_ranges.push_back(to_token_range(next_range)); } slogger.trace("creating range read executor with targets {}", filtered_endpoints); try { @@ -3252,6 +3264,7 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t } exec.push_back(::make_shared(schema, cf.shared_from_this(), p, cmd, std::move(range), cl, std::move(filtered_endpoints), trace_state)); + ranges_per_exec.emplace(exec.back().get(), std::move(merged_ranges)); } query::result_merger merger(cmd->row_limit, cmd->partition_limit); @@ -3273,14 +3286,26 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t remaining_row_count, remaining_partition_count, trace_state = std::move(trace_state), - preferred_replicas = std::move(preferred_replicas)] (foreign_ptr>&& result) mutable { + preferred_replicas = std::move(preferred_replicas), + ranges_per_exec = std::move(ranges_per_exec)] (foreign_ptr>&& result) mutable { result->ensure_counts(); remaining_row_count -= result->row_count().value(); remaining_partition_count -= result->partition_count().value(); results.emplace_back(std::move(result)); if (i == ranges.end() || !remaining_row_count || !remaining_partition_count) { - return make_ready_future>>, replicas_per_token_range>(std::move(results), - replicas_per_token_range{}); + auto used_replicas = replicas_per_token_range(); + for (auto& e : exec) { + // We add used replicas in separate per-vnode entries even if + // they were merged, for two reasons: + // 1) The list of replicas is determined for each vnode + // separately and thus this makes lookups more convenient. + // 2) On the next page the ranges might not be merged. + auto replica_ids = endpoints_to_replica_ids(e->used_targets()); + for (auto& r : ranges_per_exec[e.get()]) { + used_replicas.emplace(std::move(r), replica_ids); + } + } + return make_ready_future>>, replicas_per_token_range>(std::move(results), std::move(used_replicas)); } else { cmd->row_limit = remaining_row_count; cmd->partition_limit = remaining_partition_count;