diff --git a/CMakeLists.txt b/CMakeLists.txt
index 161e366875..b07e5a5fd3 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -528,6 +528,7 @@ set(scylla_sources
   partition_version.cc
   querier.cc
   query.cc
+  query_ranges_to_vnodes.cc
   query-result-set.cc
   raft/fsm.cc
   raft/log.cc
diff --git a/configure.py b/configure.py
index ed7f97a8e7..06ccd5d458 100755
--- a/configure.py
+++ b/configure.py
@@ -824,6 +824,7 @@ scylla_core = (['replica/database.cc',
                 'service/priority_manager.cc',
                 'service/migration_manager.cc',
                 'service/storage_proxy.cc',
+                'query_ranges_to_vnodes.cc',
                 'service/paxos/proposal.cc',
                 'service/paxos/prepare_response.cc',
                 'service/paxos/paxos_state.cc',
diff --git a/cql3/statements/select_statement.cc b/cql3/statements/select_statement.cc
index b905767898..89e426b496 100644
--- a/cql3/statements/select_statement.cc
+++ b/cql3/statements/select_statement.cc
@@ -25,6 +25,7 @@
 #include "exceptions/unrecognized_entity_exception.hh"
 #include
 #include "query-result-reader.hh"
+#include "query_ranges_to_vnodes.hh"
 #include "query_result_merger.hh"
 #include "service/pager/query_pagers.hh"
 #include "service/storage_proxy.hh"
@@ -498,14 +499,14 @@ indexed_table_select_statement::do_execute_base_query(
     auto cmd = prepare_command_for_base_query(qp, options, state, now, bool(paging_state));
     auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
     uint32_t queried_ranges_count = partition_ranges.size();
-    service::query_ranges_to_vnodes_generator ranges_to_vnodes(qp.proxy().get_token_metadata_ptr(), _schema, std::move(partition_ranges));
+    query_ranges_to_vnodes_generator ranges_to_vnodes(qp.proxy().get_token_metadata_ptr(), _schema, std::move(partition_ranges));

     struct base_query_state {
         query::result_merger merger;
-        service::query_ranges_to_vnodes_generator ranges_to_vnodes;
+        query_ranges_to_vnodes_generator ranges_to_vnodes;
         size_t concurrency = 1;
         size_t previous_result_size = 0;
-        base_query_state(uint64_t row_limit, service::query_ranges_to_vnodes_generator&& ranges_to_vnodes_)
+        base_query_state(uint64_t row_limit, query_ranges_to_vnodes_generator&& ranges_to_vnodes_)
             : merger(row_limit, query::max_partitions)
             , ranges_to_vnodes(std::move(ranges_to_vnodes_))
             {}
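The base_query_state hunk above is the main consumer of the relocated generator: the struct pairs the generator with a concurrency counter and a previous_result_size, which suggests a loop that requests a batch of vnode-aligned ranges per round and grows the batch while results stay small. The sketch below is illustrative only and not part of the patch; it assumes it is compiled inside the Scylla tree, execute_one_round is a hypothetical stand-in for the per-batch storage-proxy read, and the doubling of concurrency stands in for whatever growth policy the real loop applies.

    #include "query_ranges_to_vnodes.hh"

    #include <cstddef>
    #include <functional>

    // Drive the generator round by round; the caller supplies the hypothetical
    // per-batch read so the sketch stays self-contained.
    void scan_base_table(query_ranges_to_vnodes_generator gen,
                         std::function<void(const dht::partition_range_vector&)> execute_one_round) {
        size_t concurrency = 1;
        while (!gen.empty()) {
            // operator() may hand back fewer than `concurrency` ranges near the end of the input.
            dht::partition_range_vector batch = gen(concurrency);
            execute_one_round(batch);
            concurrency *= 2; // illustrative ramp-up between rounds
        }
    }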
diff --git a/query_ranges_to_vnodes.cc b/query_ranges_to_vnodes.cc
new file mode 100644
index 0000000000..18cd60d28a
--- /dev/null
+++ b/query_ranges_to_vnodes.cc
@@ -0,0 +1,113 @@
+/*
+ * Copyright (C) 2021-present ScyllaDB
+ */
+
+/*
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+#include "query_ranges_to_vnodes.hh"
+
+static inline
+const dht::token& start_token(const dht::partition_range& r) {
+    static const dht::token min_token = dht::minimum_token();
+    return r.start() ? r.start()->value().token() : min_token;
+}
+
+static inline
+const dht::token& end_token(const dht::partition_range& r) {
+    static const dht::token max_token = dht::maximum_token();
+    return r.end() ? r.end()->value().token() : max_token;
+}
+
+query_ranges_to_vnodes_generator::query_ranges_to_vnodes_generator(const locator::token_metadata_ptr tmptr, schema_ptr s, dht::partition_range_vector ranges, bool local) :
+        _s(s), _ranges(std::move(ranges)), _i(_ranges.begin()), _local(local), _tmptr(std::move(tmptr)) {}
+
+dht::partition_range_vector query_ranges_to_vnodes_generator::operator()(size_t n) {
+    n = std::min(n, size_t(1024));
+
+    dht::partition_range_vector result;
+    result.reserve(n);
+    while (_i != _ranges.end() && result.size() != n) {
+        process_one_range(n, result);
+    }
+    return result;
+}
+
+bool query_ranges_to_vnodes_generator::empty() const {
+    return _ranges.end() == _i;
+}
+
+/**
+ * Compute all ranges we're going to query, in sorted order. Nodes can be replica destinations for many ranges,
+ * so we need to restrict each scan to the specific range we want, or else we'd get duplicate results.
+ */
+void query_ranges_to_vnodes_generator::process_one_range(size_t n, dht::partition_range_vector& ranges) {
+    dht::ring_position_comparator cmp(*_s);
+    dht::partition_range& cr = *_i;
+
+    auto get_remainder = [this, &cr] {
+        _i++;
+        return std::move(cr);
+    };
+
+    auto add_range = [&ranges] (dht::partition_range&& r) {
+        ranges.emplace_back(std::move(r));
+    };
+
+    if (_local) { // if the range is local no need to divide to vnodes
+        add_range(get_remainder());
+        return;
+    }
+
+    // special case for bounds containing exactly 1 token
+    if (start_token(cr) == end_token(cr)) {
+        if (start_token(cr).is_minimum()) {
+            _i++; // empty range? Move to the next one
+            return;
+        }
+        add_range(get_remainder());
+        return;
+    }
+
+    // divide the queryRange into pieces delimited by the ring
+    auto ring_iter = _tmptr->ring_range(cr.start());
+    for (const dht::token& upper_bound_token : ring_iter) {
+        /*
+         * remainder can be a range/bounds of token _or_ keys and we want to split it with a token:
+         *   - if remainder is tokens, then we'll just split using the provided token.
+         *   - if remainder is keys, we want to split using token.upperBoundKey. For instance, if remainder
+         *     is [DK(10, 'foo'), DK(20, 'bar')], and we have 3 nodes with tokens 0, 15, 30. We want to
+         *     split remainder to A=[DK(10, 'foo'), 15] and B=(15, DK(20, 'bar')]. But since we can't mix
+         *     tokens and keys at the same time in a range, we uses 15.upperBoundKey() to have A include all
+         *     keys having 15 as token and B include none of those (since that is what our node owns).
+         *     asSplitValue() abstracts that choice.
+         */
+
+        dht::ring_position split_point(upper_bound_token, dht::ring_position::token_bound::end);
+        if (!cr.contains(split_point, cmp)) {
+            break; // no more splits
+        }
+
+
+        // We shouldn't attempt to split on upper bound, because it may result in
+        // an ambiguous range of the form (x; x]
+        if (end_token(cr) == upper_bound_token) {
+            break;
+        }
+
+        std::pair<dht::partition_range, dht::partition_range> splits =
+            cr.split(split_point, cmp);
+
+        add_range(std::move(splits.first));
+        cr = std::move(splits.second);
+        if (ranges.size() == n) {
+            // we have enough ranges
+            break;
+        }
+    }
+
+    if (ranges.size() < n) {
+        add_range(get_remainder());
+    }
+}
diff --git a/query_ranges_to_vnodes.hh b/query_ranges_to_vnodes.hh
new file mode 100644
index 0000000000..e6c2ebe0e1
--- /dev/null
+++ b/query_ranges_to_vnodes.hh
@@ -0,0 +1,32 @@
+/*
+ * Copyright (C) 2021-present ScyllaDB
+ */
+
+/*
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+#pragma once
+
+#include "dht/i_partitioner.hh"
+#include "locator/token_metadata.hh"
+#include "schema.hh"
+
+class query_ranges_to_vnodes_generator {
+    schema_ptr _s;
+    dht::partition_range_vector _ranges;
+    dht::partition_range_vector::iterator _i; // iterator to current range in _ranges
+    bool _local;
+    const locator::token_metadata_ptr _tmptr;
+    void process_one_range(size_t n, dht::partition_range_vector& ranges);
+public:
+    query_ranges_to_vnodes_generator(const locator::token_metadata_ptr tmptr, schema_ptr s, dht::partition_range_vector ranges, bool local = false);
+    query_ranges_to_vnodes_generator(const query_ranges_to_vnodes_generator&) = delete;
+    query_ranges_to_vnodes_generator(query_ranges_to_vnodes_generator&&) = default;
+    // generate next 'n' vnodes, may return less than requested number of ranges
+    // which means either that there are no more ranges
+    // (in which case empty() == true), or too many ranges
+    // are requested
+    dht::partition_range_vector operator()(size_t n);
+    bool empty() const;
+};
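The header above is the entire public surface of the moved class: construct it with the ring metadata, the schema and the not-yet-split ranges, then keep calling it until empty() reports exhaustion. The following usage sketch is illustrative only, not part of the patch, and assumes it is built inside the Scylla tree; note that a single call is capped internally at 1024 ranges (see operator() in the .cc above), so a caller that wants every vnode-aligned range still has to loop.

    #include "query_ranges_to_vnodes.hh"

    #include <algorithm>
    #include <iterator>

    // Split arbitrary query ranges into vnode-aligned pieces by draining the generator.
    dht::partition_range_vector
    split_to_vnode_ranges(locator::token_metadata_ptr tmptr, schema_ptr s,
                          dht::partition_range_vector ranges) {
        query_ranges_to_vnodes_generator gen(tmptr, std::move(s), std::move(ranges));
        dht::partition_range_vector out;
        while (!gen.empty()) {
            auto batch = gen(128); // up to 128 vnode-aligned ranges per call
            std::move(batch.begin(), batch.end(), std::back_inserter(out));
        }
        return out;
    }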
diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc
index 2303d2217b..f4e723d3f0 100644
--- a/service/storage_proxy.cc
+++ b/service/storage_proxy.cc
@@ -123,12 +123,6 @@ query::digest_algorithm digest_algorithm(service::storage_proxy& proxy) {
             : query::digest_algorithm::legacy_xxHash_without_null_digest;
 }

-static inline
-const dht::token& start_token(const dht::partition_range& r) {
-    static const dht::token min_token = dht::minimum_token();
-    return r.start() ? r.start()->value().token() : min_token;
-}
-
 static inline
 const dht::token& end_token(const dht::partition_range& r) {
     static const dht::token max_token = dht::maximum_token();
@@ -4757,98 +4751,6 @@ inet_address_vector_replica_set storage_proxy::intersection(const inet_address_v
     return inter;
 }

-query_ranges_to_vnodes_generator::query_ranges_to_vnodes_generator(const locator::token_metadata_ptr tmptr, schema_ptr s, dht::partition_range_vector ranges, bool local) :
-        _s(s), _ranges(std::move(ranges)), _i(_ranges.begin()), _local(local), _tmptr(std::move(tmptr)) {}
-
-dht::partition_range_vector query_ranges_to_vnodes_generator::operator()(size_t n) {
-    n = std::min(n, size_t(1024));
-
-    dht::partition_range_vector result;
-    result.reserve(n);
-    while (_i != _ranges.end() && result.size() != n) {
-        process_one_range(n, result);
-    }
-    return result;
-}
-
-bool query_ranges_to_vnodes_generator::empty() const {
-    return _ranges.end() == _i;
-}
-
-/**
- * Compute all ranges we're going to query, in sorted order. Nodes can be replica destinations for many ranges,
- * so we need to restrict each scan to the specific range we want, or else we'd get duplicate results.
- */
-void query_ranges_to_vnodes_generator::process_one_range(size_t n, dht::partition_range_vector& ranges) {
-    dht::ring_position_comparator cmp(*_s);
-    dht::partition_range& cr = *_i;
-
-    auto get_remainder = [this, &cr] {
-        _i++;
-        return std::move(cr);
-    };
-
-    auto add_range = [&ranges] (dht::partition_range&& r) {
-        ranges.emplace_back(std::move(r));
-    };
-
-    if (_local) { // if the range is local no need to divide to vnodes
-        add_range(get_remainder());
-        return;
-    }
-
-    // special case for bounds containing exactly 1 token
-    if (start_token(cr) == end_token(cr)) {
-        if (start_token(cr).is_minimum()) {
-            _i++; // empty range? Move to the next one
-            return;
-        }
-        add_range(get_remainder());
-        return;
-    }
-
-    // divide the queryRange into pieces delimited by the ring
-    auto ring_iter = _tmptr->ring_range(cr.start());
-    for (const dht::token& upper_bound_token : ring_iter) {
-        /*
-         * remainder can be a range/bounds of token _or_ keys and we want to split it with a token:
-         *   - if remainder is tokens, then we'll just split using the provided token.
-         *   - if remainder is keys, we want to split using token.upperBoundKey. For instance, if remainder
-         *     is [DK(10, 'foo'), DK(20, 'bar')], and we have 3 nodes with tokens 0, 15, 30. We want to
-         *     split remainder to A=[DK(10, 'foo'), 15] and B=(15, DK(20, 'bar')]. But since we can't mix
-         *     tokens and keys at the same time in a range, we uses 15.upperBoundKey() to have A include all
-         *     keys having 15 as token and B include none of those (since that is what our node owns).
-         *     asSplitValue() abstracts that choice.
-         */
-
-        dht::ring_position split_point(upper_bound_token, dht::ring_position::token_bound::end);
-        if (!cr.contains(split_point, cmp)) {
-            break; // no more splits
-        }
-
-
-        // We shouldn't attempt to split on upper bound, because it may result in
-        // an ambiguous range of the form (x; x]
-        if (end_token(cr) == upper_bound_token) {
-            break;
-        }
-
-        std::pair<dht::partition_range, dht::partition_range> splits =
-            cr.split(split_point, cmp);
-
-        add_range(std::move(splits.first));
-        cr = std::move(splits.second);
-        if (ranges.size() == n) {
-            // we have enough ranges
-            break;
-        }
-    }
-
-    if (ranges.size() < n) {
-        add_range(get_remainder());
-    }
-}
-
 bool storage_proxy::hints_enabled(db::write_type type) const noexcept {
     return (!_hints_manager.is_disabled_for_all() && type != db::write_type::CAS) || type == db::write_type::VIEW;
 }
diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh
index c5cdb25563..fd1d3d8b9f 100644
--- a/service/storage_proxy.hh
+++ b/service/storage_proxy.hh
@@ -43,6 +43,7 @@
 #include "utils/small_vector.hh"
 #include "service/endpoint_lifecycle_subscriber.hh"
 #include
+#include "query_ranges_to_vnodes.hh"

 class reconcilable_result;
 class frozen_mutation_and_schema;
@@ -99,26 +100,6 @@ struct view_update_backlog_timestamped {

 struct allow_hints_tag {};
 using allow_hints = bool_class<allow_hints_tag>;
-
-class query_ranges_to_vnodes_generator {
-    schema_ptr _s;
-    dht::partition_range_vector _ranges;
-    dht::partition_range_vector::iterator _i; // iterator to current range in _ranges
-    bool _local;
-    const locator::token_metadata_ptr _tmptr;
-    void process_one_range(size_t n, dht::partition_range_vector& ranges);
-public:
-    query_ranges_to_vnodes_generator(const locator::token_metadata_ptr tmptr, schema_ptr s, dht::partition_range_vector ranges, bool local = false);
-    query_ranges_to_vnodes_generator(const query_ranges_to_vnodes_generator&) = delete;
-    query_ranges_to_vnodes_generator(query_ranges_to_vnodes_generator&&) = default;
-    // generate next 'n' vnodes, may return less than requested number of ranges
-    // which means either that there are no more ranges
-    // (in which case empty() == true), or too many ranges
-    // are requested
-    dht::partition_range_vector operator()(size_t n);
-    bool empty() const;
-};
-
 struct storage_proxy_coordinator_query_result {
     foreign_ptr<lw_shared_ptr<query::result>> query_result;
     replicas_per_token_range last_replicas;
diff --git a/test/boost/storage_proxy_test.cc b/test/boost/storage_proxy_test.cc
index c9f10906a8..7e1fede4bf 100644
--- a/test/boost/storage_proxy_test.cc
+++ b/test/boost/storage_proxy_test.cc
@@ -15,6 +15,7 @@
 #include "test/lib/mutation_source_test.hh"
 #include "test/lib/result_set_assertions.hh"
 #include "service/storage_proxy.hh"
+#include "query_ranges_to_vnodes.hh"
 #include "partition_slice_builder.hh"
 #include "schema_builder.hh"

@@ -42,7 +43,7 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) {

     auto check = [&s](locator::token_metadata_ptr tmptr, dht::partition_range input, dht::partition_range_vector expected) {
-        service::query_ranges_to_vnodes_generator ranges_to_vnodes(tmptr, s, {input});
+        query_ranges_to_vnodes_generator ranges_to_vnodes(tmptr, s, {input});
         auto actual = ranges_to_vnodes(expected.size());
         if (!std::equal(actual.begin(), actual.end(), expected.begin(), [&s](auto&& r1, auto&& r2) {
             return r1.equal(r2, dht::ring_position_comparator(*s));