mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
storage_proxy: fix result ordering for parallel partition range scans
During a range scan, we try to avoid sorting according to partition range when we can do so. This is when we scan fewer than smp::count shards -- each shard's range is strictly ordered with respect to the others. However, we use the wrong key for the sort -- we use the shard number. But if we started at shard s > 0 and wrapped around to shard 0, then shard 0's range will be after the range belonging to shard s, but will sort before it. Fix by storing the iteration order as the sort key. We use that when we know that shards do not overlap (shards < smp::count) and the index within the source partition range vector when they do. Fixes #1998. Message-Id: <20170105114253.17492-1-avi@scylladb.com>
This commit is contained in:
committed by
Tomasz Grabiec
parent
8014adc2a1
commit
eb520e7352
@@ -3784,6 +3784,11 @@ struct hash<element_and_shard> {
|
||||
|
||||
namespace service {
|
||||
|
||||
struct partition_range_and_sort_key {
|
||||
query::partition_range pr;
|
||||
unsigned sort_key_shard_order; // for the same source partition range, we sort in shard order
|
||||
};
|
||||
|
||||
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
|
||||
storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range_vector& prs,
|
||||
tracing::trace_state_ptr trace_state, uint64_t max_size) {
|
||||
@@ -3795,7 +3800,7 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr<q
|
||||
0u,
|
||||
false,
|
||||
prs.size(),
|
||||
std::unordered_map<element_and_shard, query::partition_range>{},
|
||||
std::unordered_map<element_and_shard, partition_range_and_sort_key>{},
|
||||
mutation_result_merger{s, cmd},
|
||||
dht::ring_position_range_vector_sharder{prs},
|
||||
global_schema_ptr(s),
|
||||
@@ -3806,7 +3811,7 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr<q
|
||||
unsigned& mutation_result_merger_key,
|
||||
bool& no_more_ranges,
|
||||
unsigned partition_range_count,
|
||||
std::unordered_map<element_and_shard, query::partition_range>& shards_for_this_iteration,
|
||||
std::unordered_map<element_and_shard, partition_range_and_sort_key>& shards_for_this_iteration,
|
||||
mutation_result_merger& mrm,
|
||||
dht::ring_position_range_vector_sharder& rprs,
|
||||
global_schema_ptr& gs,
|
||||
@@ -3832,37 +3837,46 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr<q
|
||||
break;
|
||||
}
|
||||
// Let's see if this is a new shard, or if we can expand an existing range
|
||||
auto&& rng_ok = shards_for_this_iteration.emplace(element_and_shard{now->element, now->shard}, now->ring_range);
|
||||
auto&& rng_ok = shards_for_this_iteration.emplace(element_and_shard{now->element, now->shard}, partition_range_and_sort_key{now->ring_range, i});
|
||||
if (!rng_ok.second) {
|
||||
// We saw this shard already, enlarge the range (we know now->ring_range came from the same partition range;
|
||||
// otherwise it would have had a unique now->element).
|
||||
auto& rng = rng_ok.first->second;
|
||||
auto& rng = rng_ok.first->second.pr;
|
||||
rng = nonwrapping_range<dht::ring_position>(std::move(rng.start()), std::move(now->ring_range.end()));
|
||||
// This range is no longer ordered with respect to the others, so:
|
||||
retain_shard_order = false;
|
||||
}
|
||||
}
|
||||
auto key_base = mutation_result_merger_key;
|
||||
|
||||
// prepare for next iteration
|
||||
mutation_result_merger_key += smp::count * partition_range_count; // worst case, each element queried on each shard
|
||||
// Each iteration uses a merger key that is either i in the loop above (so in the range [0, shards_in_parallel),
|
||||
// or, the element index in prs (so in the range [0, partition_range_count). Make room for sufficient keys.
|
||||
mutation_result_merger_key += std::max(shards_in_parallel, partition_range_count);
|
||||
shards_in_parallel *= 2;
|
||||
|
||||
shard_cmd->partition_limit = cmd->partition_limit - mrm.partition_count();
|
||||
shard_cmd->row_limit = cmd->row_limit - mrm.row_count();
|
||||
|
||||
return parallel_for_each(shards_for_this_iteration, [&, key_base, retain_shard_order] (const std::pair<const element_and_shard, query::partition_range>& elem_shard_range) {
|
||||
return parallel_for_each(shards_for_this_iteration, [&, key_base, retain_shard_order] (const std::pair<const element_and_shard, partition_range_and_sort_key>& elem_shard_range) {
|
||||
auto&& elem = elem_shard_range.first.element;
|
||||
auto&& shard = elem_shard_range.first.shard;
|
||||
auto&& range = elem_shard_range.second;
|
||||
auto&& range = elem_shard_range.second.pr;
|
||||
auto sort_key_shard_order = elem_shard_range.second.sort_key_shard_order;
|
||||
return _db.invoke_on(shard, [&, range, gt, fstate = mrm.memory().state_for_another_shard()] (database& db) {
|
||||
query::result_memory_accounter accounter(db.get_result_memory_limiter(), std::move(fstate));
|
||||
return db.query_mutations(gs, *shard_cmd, range, std::move(accounter), std::move(gt)).then([] (reconcilable_result&& rr) {
|
||||
return make_foreign(make_lw_shared(std::move(rr)));
|
||||
});
|
||||
}).then([&, key_base, retain_shard_order, elem, shard] (foreign_ptr<lw_shared_ptr<reconcilable_result>> partial_result) {
|
||||
auto key = key_base + elem * smp::count;
|
||||
}).then([&, key_base, retain_shard_order, elem, sort_key_shard_order] (foreign_ptr<lw_shared_ptr<reconcilable_result>> partial_result) {
|
||||
// Each outer (sequential) iteration is in result order, so we pick increasing keys.
|
||||
// Within the inner (parallel) iteration, the results can be in order (if retain_shard_order), or not (if !retain_shard_order).
|
||||
// If the results are unordered, we still have to order them according to which element of prs they originated from.
|
||||
auto key = key_base; // for outer loop
|
||||
if (retain_shard_order) {
|
||||
key += shard;
|
||||
key += sort_key_shard_order; // inner loop is ordered
|
||||
} else {
|
||||
key += elem; // inner loop ordered only by position within prs
|
||||
}
|
||||
mrm.add_result(key, std::move(partial_result));
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user