From eb520e7352e750c0fc74761cee3d88072aa6f1ae Mon Sep 17 00:00:00 2001
From: Avi Kivity <avi@scylladb.com>
Date: Thu, 5 Jan 2017 13:42:53 +0200
Subject: [PATCH] storage_proxy: fix result ordering for parallel partition
 range scans

During a range scan, we try to avoid sorting according to partition range
when we can do so. This is when we scan fewer than smp::count shards -- each
shard's range is strictly ordered with respect to the others.

However, we use the wrong key for the sort -- we use the shard number. But
if we started at shard s > 0 and wrapped around to shard 0, then shard 0's
range will be after the range belonging to shard s, but will sort before it.

Fix by storing the iteration order as the sort key. We use that when we know
that shards do not overlap (shards < smp::count) and the index within the
source partition range vector when they do.

Fixes #1998.

Message-Id: <20170105114253.17492-1-avi@scylladb.com>
---
 service/storage_proxy.cc | 34 ++++++++++++++++++++++++----------
 1 file changed, 24 insertions(+), 10 deletions(-)

diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc
index b1625f9253..7ad6ed55ed 100644
--- a/service/storage_proxy.cc
+++ b/service/storage_proxy.cc
@@ -3784,6 +3784,11 @@ struct hash<service::element_and_shard> {
 
 namespace service {
 
+struct partition_range_and_sort_key {
+    query::partition_range pr;
+    unsigned sort_key_shard_order; // for the same source partition range, we sort in shard order
+};
+
 future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
 storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_command> cmd,
         const dht::partition_range_vector& prs, tracing::trace_state_ptr trace_state, uint64_t max_size) {
@@ -3795,7 +3800,7 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr<
-            std::unordered_map<element_and_shard, query::partition_range>{},
+            std::unordered_map<element_and_shard, partition_range_and_sort_key>{},
             mutation_result_merger{s, cmd},
             dht::ring_position_range_vector_sharder{prs},
             global_schema_ptr(s),
@@ -3806,7 +3811,7 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr<
-            std::unordered_map<element_and_shard, query::partition_range>& shards_for_this_iteration,
+            std::unordered_map<element_and_shard, partition_range_and_sort_key>& shards_for_this_iteration,
             mutation_result_merger& mrm,
             dht::ring_position_range_vector_sharder& rprs,
             global_schema_ptr& gs,
@@ -3832,37 +3837,46 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr<
-            auto&& rng_ok = shards_for_this_iteration.emplace(element_and_shard{now->element, now->shard}, now->ring_range);
+            auto&& rng_ok = shards_for_this_iteration.emplace(element_and_shard{now->element, now->shard}, partition_range_and_sort_key{now->ring_range, i});
             if (!rng_ok.second) {
                 // We saw this shard already, enlarge the range (we know now->ring_range came from the same partition range;
                 // otherwise it would have had a unique now->element).
-                auto& rng = rng_ok.first->second;
+                auto& rng = rng_ok.first->second.pr;
                 rng = nonwrapping_range(std::move(rng.start()), std::move(now->ring_range.end()));
+                // This range is no longer ordered with respect to the others, so:
                 retain_shard_order = false;
             }
         }
         auto key_base = mutation_result_merger_key;
         // prepare for next iteration
-        mutation_result_merger_key += smp::count * partition_range_count; // worst case, each element queried on each shard
+        // Each iteration uses a merger key that is either i in the loop above (so in the range [0, shards_in_parallel),
+        // or, the element index in prs (so in the range [0, partition_range_count). Make room for sufficient keys.
+        mutation_result_merger_key += std::max(shards_in_parallel, partition_range_count);
         shards_in_parallel *= 2;
         shard_cmd->partition_limit = cmd->partition_limit - mrm.partition_count();
         shard_cmd->row_limit = cmd->row_limit - mrm.row_count();
-        return parallel_for_each(shards_for_this_iteration, [&, key_base, retain_shard_order] (const std::pair<const element_and_shard, query::partition_range>& elem_shard_range) {
+        return parallel_for_each(shards_for_this_iteration, [&, key_base, retain_shard_order] (const std::pair<const element_and_shard, partition_range_and_sort_key>& elem_shard_range) {
             auto&& elem = elem_shard_range.first.element;
             auto&& shard = elem_shard_range.first.shard;
-            auto&& range = elem_shard_range.second;
+            auto&& range = elem_shard_range.second.pr;
+            auto sort_key_shard_order = elem_shard_range.second.sort_key_shard_order;
             return _db.invoke_on(shard, [&, range, gt, fstate = mrm.memory().state_for_another_shard()] (database& db) {
                 query::result_memory_accounter accounter(db.get_result_memory_limiter(), std::move(fstate));
                 return db.query_mutations(gs, *shard_cmd, range, std::move(accounter), std::move(gt)).then([] (reconcilable_result&& rr) {
                     return make_foreign(make_lw_shared(std::move(rr)));
                 });
-            }).then([&, key_base, retain_shard_order, elem, shard] (foreign_ptr<lw_shared_ptr<reconcilable_result>> partial_result) {
-                auto key = key_base + elem * smp::count;
+            }).then([&, key_base, retain_shard_order, elem, sort_key_shard_order] (foreign_ptr<lw_shared_ptr<reconcilable_result>> partial_result) {
+                // Each outer (sequential) iteration is in result order, so we pick increasing keys.
+                // Within the inner (parallel) iteration, the results can be in order (if retain_shard_order), or not (if !retain_shard_order).
+                // If the results are unordered, we still have to order them according to which element of prs they originated from.
+                auto key = key_base; // for outer loop
                 if (retain_shard_order) {
-                    key += shard;
+                    key += sort_key_shard_order; // inner loop is ordered
+                } else {
+                    key += elem; // inner loop ordered only by position within prs
                 }
                 mrm.add_result(key, std::move(partial_result));
             });
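
Note (not part of the patch): below is a minimal standalone C++ sketch of the ordering
problem the commit message describes. The names (smp_count, start_shard) and the ring
layout are illustrative assumptions, not Scylla code; it only shows why sorting partial
results by shard number misorders a scan that wraps past shard 0, while sorting by the
iteration order does not.

// Standalone illustration, assuming 4 shards and a scan that starts at shard 2.
// The scan visits shards 2, 3, 0, 1; each visited shard owns a strictly later
// slice of the ring than the previously visited one.
#include <algorithm>
#include <cstdio>
#include <utility>
#include <vector>

int main() {
    const unsigned smp_count = 4;     // assumed shard count
    const unsigned start_shard = 2;   // scan begins here and wraps past shard 0

    std::vector<unsigned> scan_order; // shards in ring order of their slices: 2 3 0 1
    for (unsigned i = 0; i < smp_count; ++i) {
        scan_order.push_back((start_shard + i) % smp_count);
    }

    // Buggy key: the shard number. Shard 0's slice sorts first even though it
    // is the third slice of the scan.
    auto by_shard = scan_order;
    std::sort(by_shard.begin(), by_shard.end());

    // Fixed key: the iteration index, which already reflects ring order.
    std::vector<std::pair<unsigned, unsigned>> by_order; // {iteration index, shard}
    for (unsigned i = 0; i < scan_order.size(); ++i) {
        by_order.emplace_back(i, scan_order[i]);
    }
    std::sort(by_order.begin(), by_order.end());

    std::printf("ring order of slices: ");
    for (auto s : scan_order) { std::printf("shard %u  ", s); }
    std::printf("\nsorted by shard:      ");
    for (auto s : by_shard) { std::printf("shard %u  ", s); }
    std::printf("\nsorted by iteration:  ");
    for (auto& p : by_order) { std::printf("shard %u  ", p.second); }
    std::printf("\n");
}

Sorting by shard number only coincides with ring order when the scan starts at shard 0;
keying results by their position in the scan (or, when a shard is visited for more than
one source range, by the index within prs) is what the patch switches to.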