diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index b1625f9253..7ad6ed55ed 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -3784,6 +3784,11 @@ struct hash { namespace service { +struct partition_range_and_sort_key { + query::partition_range pr; + unsigned sort_key_shard_order; // for the same source partition range, we sort in shard order +}; + future>> storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr cmd, const dht::partition_range_vector& prs, tracing::trace_state_ptr trace_state, uint64_t max_size) { @@ -3795,7 +3800,7 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr{}, + std::unordered_map{}, mutation_result_merger{s, cmd}, dht::ring_position_range_vector_sharder{prs}, global_schema_ptr(s), @@ -3806,7 +3811,7 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr& shards_for_this_iteration, + std::unordered_map& shards_for_this_iteration, mutation_result_merger& mrm, dht::ring_position_range_vector_sharder& rprs, global_schema_ptr& gs, @@ -3832,37 +3837,46 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptrelement, now->shard}, now->ring_range); + auto&& rng_ok = shards_for_this_iteration.emplace(element_and_shard{now->element, now->shard}, partition_range_and_sort_key{now->ring_range, i}); if (!rng_ok.second) { // We saw this shard already, enlarge the range (we know now->ring_range came from the same partition range; // otherwise it would have had a unique now->element). - auto& rng = rng_ok.first->second; + auto& rng = rng_ok.first->second.pr; rng = nonwrapping_range(std::move(rng.start()), std::move(now->ring_range.end())); + // This range is no longer ordered with respect to the others, so: retain_shard_order = false; } } auto key_base = mutation_result_merger_key; // prepare for next iteration - mutation_result_merger_key += smp::count * partition_range_count; // worst case, each element queried on each shard + // Each iteration uses a merger key that is either i in the loop above (so in the range [0, shards_in_parallel), + // or, the element index in prs (so in the range [0, partition_range_count). Make room for sufficient keys. + mutation_result_merger_key += std::max(shards_in_parallel, partition_range_count); shards_in_parallel *= 2; shard_cmd->partition_limit = cmd->partition_limit - mrm.partition_count(); shard_cmd->row_limit = cmd->row_limit - mrm.row_count(); - return parallel_for_each(shards_for_this_iteration, [&, key_base, retain_shard_order] (const std::pair& elem_shard_range) { + return parallel_for_each(shards_for_this_iteration, [&, key_base, retain_shard_order] (const std::pair& elem_shard_range) { auto&& elem = elem_shard_range.first.element; auto&& shard = elem_shard_range.first.shard; - auto&& range = elem_shard_range.second; + auto&& range = elem_shard_range.second.pr; + auto sort_key_shard_order = elem_shard_range.second.sort_key_shard_order; return _db.invoke_on(shard, [&, range, gt, fstate = mrm.memory().state_for_another_shard()] (database& db) { query::result_memory_accounter accounter(db.get_result_memory_limiter(), std::move(fstate)); return db.query_mutations(gs, *shard_cmd, range, std::move(accounter), std::move(gt)).then([] (reconcilable_result&& rr) { return make_foreign(make_lw_shared(std::move(rr))); }); - }).then([&, key_base, retain_shard_order, elem, shard] (foreign_ptr> partial_result) { - auto key = key_base + elem * smp::count; + }).then([&, key_base, retain_shard_order, elem, sort_key_shard_order] (foreign_ptr> partial_result) { + // Each outer (sequential) iteration is in result order, so we pick increasing keys. + // Within the inner (parallel) iteration, the results can be in order (if retain_shard_order), or not (if !retain_shard_order). + // If the results are unordered, we still have to order them according to which element of prs they originated from. + auto key = key_base; // for outer loop if (retain_shard_order) { - key += shard; + key += sort_key_shard_order; // inner loop is ordered + } else { + key += elem; // inner loop ordered only by position within prs } mrm.add_result(key, std::move(partial_result)); });