storage_proxy: extract query_ranges_to_vnodes_generator to a separate file

This separation allows other services to use query_ranges_to_vnodes_generator
without depending on storage_proxy.
This commit is contained in:
Michał Sala
2021-11-07 17:01:43 +01:00
parent fff454761a
commit 0fe59082ec
8 changed files with 154 additions and 122 deletions

View File

@@ -528,6 +528,7 @@ set(scylla_sources
partition_version.cc
querier.cc
query.cc
query_ranges_to_vnodes.cc
query-result-set.cc
raft/fsm.cc
raft/log.cc

View File

@@ -824,6 +824,7 @@ scylla_core = (['replica/database.cc',
'service/priority_manager.cc',
'service/migration_manager.cc',
'service/storage_proxy.cc',
'query_ranges_to_vnodes.cc',
'service/paxos/proposal.cc',
'service/paxos/prepare_response.cc',
'service/paxos/paxos_state.cc',

View File

@@ -25,6 +25,7 @@
#include "exceptions/unrecognized_entity_exception.hh"
#include <seastar/core/shared_ptr.hh>
#include "query-result-reader.hh"
#include "query_ranges_to_vnodes.hh"
#include "query_result_merger.hh"
#include "service/pager/query_pagers.hh"
#include "service/storage_proxy.hh"
@@ -498,14 +499,14 @@ indexed_table_select_statement::do_execute_base_query(
auto cmd = prepare_command_for_base_query(qp, options, state, now, bool(paging_state));
auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
uint32_t queried_ranges_count = partition_ranges.size();
service::query_ranges_to_vnodes_generator ranges_to_vnodes(qp.proxy().get_token_metadata_ptr(), _schema, std::move(partition_ranges));
query_ranges_to_vnodes_generator ranges_to_vnodes(qp.proxy().get_token_metadata_ptr(), _schema, std::move(partition_ranges));
struct base_query_state {
query::result_merger merger;
service::query_ranges_to_vnodes_generator ranges_to_vnodes;
query_ranges_to_vnodes_generator ranges_to_vnodes;
size_t concurrency = 1;
size_t previous_result_size = 0;
base_query_state(uint64_t row_limit, service::query_ranges_to_vnodes_generator&& ranges_to_vnodes_)
base_query_state(uint64_t row_limit, query_ranges_to_vnodes_generator&& ranges_to_vnodes_)
: merger(row_limit, query::max_partitions)
, ranges_to_vnodes(std::move(ranges_to_vnodes_))
{}

113
query_ranges_to_vnodes.cc Normal file
View File

@@ -0,0 +1,113 @@
/*
* Copyright (C) 2021-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "query_ranges_to_vnodes.hh"
// Lower-bound token of a partition range; the ring's minimum token when the
// range has no explicit start bound.
static inline
const dht::token& start_token(const dht::partition_range& r) {
    static const dht::token min_token = dht::minimum_token();
    if (const auto& b = r.start()) {
        return b->value().token();
    }
    return min_token;
}
// Upper-bound token of a partition range; the ring's maximum token when the
// range has no explicit end bound.
static inline
const dht::token& end_token(const dht::partition_range& r) {
    static const dht::token max_token = dht::maximum_token();
    if (const auto& b = r.end()) {
        return b->value().token();
    }
    return max_token;
}
// Constructs a generator over 'ranges'. 'local' requests that ranges be
// returned unsplit (no vnode decomposition).
// Note: 'tmptr' is taken by value WITHOUT top-level const — a const by-value
// parameter would make std::move() below select the copy constructor (an
// extra smart-pointer copy) instead of a real move. Dropping top-level const
// on a by-value parameter does not change the declared signature, so this
// still matches the header. 's' is likewise moved into _s instead of copied.
query_ranges_to_vnodes_generator::query_ranges_to_vnodes_generator(locator::token_metadata_ptr tmptr, schema_ptr s, dht::partition_range_vector ranges, bool local) :
    _s(std::move(s)), _ranges(std::move(ranges)), _i(_ranges.begin()), _local(local), _tmptr(std::move(tmptr)) {}
// Produces up to n vnode-aligned ranges (hard-capped at 1024 per call),
// consuming the input ranges lazily; may return fewer than requested.
dht::partition_range_vector query_ranges_to_vnodes_generator::operator()(size_t n) {
    const size_t limit = std::min(n, size_t(1024)); // per-invocation cap
    dht::partition_range_vector out;
    out.reserve(limit);
    while (out.size() != limit && _i != _ranges.end()) {
        process_one_range(limit, out);
    }
    return out;
}
// True once every input range has been fully consumed.
bool query_ranges_to_vnodes_generator::empty() const {
    return _i == _ranges.end();
}
/**
 * Compute all ranges we're going to query, in sorted order. Nodes can be replica destinations for many ranges,
 * so we need to restrict each scan to the specific range we want, or else we'd get duplicate results.
 *
 * Splits the current input range (*_i) at vnode boundaries, appending at most
 * n results to 'ranges'. Advances _i only when the current range is fully consumed.
 */
void query_ranges_to_vnodes_generator::process_one_range(size_t n, dht::partition_range_vector& ranges) {
    dht::ring_position_comparator cmp(*_s);
    dht::partition_range& cr = *_i; // the range currently being decomposed
    // Take what is left of the current range and advance to the next one.
    auto get_remainder = [this, &cr] {
        _i++;
        return std::move(cr);
    };
    auto add_range = [&ranges] (dht::partition_range&& r) {
        ranges.emplace_back(std::move(r));
    };
    if (_local) { // if the range is local no need to divide to vnodes
        add_range(get_remainder());
        return;
    }
    // special case for bounds containing exactly 1 token
    if (start_token(cr) == end_token(cr)) {
        if (start_token(cr).is_minimum()) {
            _i++; // empty range? Move to the next one
            return;
        }
        add_range(get_remainder());
        return;
    }
    // divide the queryRange into pieces delimited by the ring
    auto ring_iter = _tmptr->ring_range(cr.start());
    for (const dht::token& upper_bound_token : ring_iter) {
        /*
         * remainder can be a range/bounds of token _or_ keys and we want to split it with a token:
         * - if remainder is tokens, then we'll just split using the provided token.
         * - if remainder is keys, we want to split using token.upperBoundKey. For instance, if remainder
         *   is [DK(10, 'foo'), DK(20, 'bar')], and we have 3 nodes with tokens 0, 15, 30. We want to
         *   split remainder to A=[DK(10, 'foo'), 15] and B=(15, DK(20, 'bar')]. But since we can't mix
         *   tokens and keys at the same time in a range, we use 15.upperBoundKey() to have A include all
         *   keys having 15 as token and B include none of those (since that is what our node owns).
         *   asSplitValue() abstracts that choice.
         */
        dht::ring_position split_point(upper_bound_token, dht::ring_position::token_bound::end);
        if (!cr.contains(split_point, cmp)) {
            break; // no more splits
        }
        // We shouldn't attempt to split on upper bound, because it may result in
        // an ambiguous range of the form (x; x]
        if (end_token(cr) == upper_bound_token) {
            break;
        }
        std::pair<dht::partition_range, dht::partition_range> splits =
            cr.split(split_point, cmp);
        add_range(std::move(splits.first));
        cr = std::move(splits.second); // keep decomposing the remainder on the next iteration
        if (ranges.size() == n) {
            // we have enough ranges
            break;
        }
    }
    if (ranges.size() < n) {
        // What remains of the current range produced no further splits above,
        // so emit it whole and move on to the next input range.
        add_range(get_remainder());
    }
}

32
query_ranges_to_vnodes.hh Normal file
View File

@@ -0,0 +1,32 @@
/*
* Copyright (C) 2021-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "dht/i_partitioner.hh"
#include "locator/token_metadata.hh"
#include "schema.hh"
// Lazily decomposes a set of partition ranges into sub-ranges, each delimited
// by the token ring (vnode boundaries), so callers can query one vnode-sized
// range at a time. Move-only: it holds an iterator into its own range vector.
class query_ranges_to_vnodes_generator {
    schema_ptr _s;
    dht::partition_range_vector _ranges;
    dht::partition_range_vector::iterator _i; // iterator to current range in _ranges
    bool _local; // when true, ranges are emitted as-is, without splitting by vnode
    const locator::token_metadata_ptr _tmptr; // ring topology used to locate vnode boundaries
    // Splits the current range at vnode boundaries, appending at most n results to 'ranges'.
    void process_one_range(size_t n, dht::partition_range_vector& ranges);
public:
    query_ranges_to_vnodes_generator(const locator::token_metadata_ptr tmptr, schema_ptr s, dht::partition_range_vector ranges, bool local = false);
    query_ranges_to_vnodes_generator(const query_ranges_to_vnodes_generator&) = delete;
    query_ranges_to_vnodes_generator(query_ranges_to_vnodes_generator&&) = default;
    // generate next 'n' vnodes; may return fewer than the requested number of
    // ranges, which means either that there are no more ranges
    // (in which case empty() == true), or that more ranges were requested
    // than the per-call cap allows
    dht::partition_range_vector operator()(size_t n);
    bool empty() const;
};

View File

@@ -123,12 +123,6 @@ query::digest_algorithm digest_algorithm(service::storage_proxy& proxy) {
: query::digest_algorithm::legacy_xxHash_without_null_digest;
}
static inline
const dht::token& start_token(const dht::partition_range& r) {
static const dht::token min_token = dht::minimum_token();
return r.start() ? r.start()->value().token() : min_token;
}
static inline
const dht::token& end_token(const dht::partition_range& r) {
static const dht::token max_token = dht::maximum_token();
@@ -4757,98 +4751,6 @@ inet_address_vector_replica_set storage_proxy::intersection(const inet_address_v
return inter;
}
query_ranges_to_vnodes_generator::query_ranges_to_vnodes_generator(const locator::token_metadata_ptr tmptr, schema_ptr s, dht::partition_range_vector ranges, bool local) :
_s(s), _ranges(std::move(ranges)), _i(_ranges.begin()), _local(local), _tmptr(std::move(tmptr)) {}
dht::partition_range_vector query_ranges_to_vnodes_generator::operator()(size_t n) {
n = std::min(n, size_t(1024));
dht::partition_range_vector result;
result.reserve(n);
while (_i != _ranges.end() && result.size() != n) {
process_one_range(n, result);
}
return result;
}
bool query_ranges_to_vnodes_generator::empty() const {
return _ranges.end() == _i;
}
/**
* Compute all ranges we're going to query, in sorted order. Nodes can be replica destinations for many ranges,
* so we need to restrict each scan to the specific range we want, or else we'd get duplicate results.
*/
void query_ranges_to_vnodes_generator::process_one_range(size_t n, dht::partition_range_vector& ranges) {
dht::ring_position_comparator cmp(*_s);
dht::partition_range& cr = *_i;
auto get_remainder = [this, &cr] {
_i++;
return std::move(cr);
};
auto add_range = [&ranges] (dht::partition_range&& r) {
ranges.emplace_back(std::move(r));
};
if (_local) { // if the range is local no need to divide to vnodes
add_range(get_remainder());
return;
}
// special case for bounds containing exactly 1 token
if (start_token(cr) == end_token(cr)) {
if (start_token(cr).is_minimum()) {
_i++; // empty range? Move to the next one
return;
}
add_range(get_remainder());
return;
}
// divide the queryRange into pieces delimited by the ring
auto ring_iter = _tmptr->ring_range(cr.start());
for (const dht::token& upper_bound_token : ring_iter) {
/*
* remainder can be a range/bounds of token _or_ keys and we want to split it with a token:
* - if remainder is tokens, then we'll just split using the provided token.
* - if remainder is keys, we want to split using token.upperBoundKey. For instance, if remainder
* is [DK(10, 'foo'), DK(20, 'bar')], and we have 3 nodes with tokens 0, 15, 30. We want to
* split remainder to A=[DK(10, 'foo'), 15] and B=(15, DK(20, 'bar')]. But since we can't mix
* tokens and keys at the same time in a range, we uses 15.upperBoundKey() to have A include all
* keys having 15 as token and B include none of those (since that is what our node owns).
* asSplitValue() abstracts that choice.
*/
dht::ring_position split_point(upper_bound_token, dht::ring_position::token_bound::end);
if (!cr.contains(split_point, cmp)) {
break; // no more splits
}
// We shouldn't attempt to split on upper bound, because it may result in
// an ambiguous range of the form (x; x]
if (end_token(cr) == upper_bound_token) {
break;
}
std::pair<dht::partition_range, dht::partition_range> splits =
cr.split(split_point, cmp);
add_range(std::move(splits.first));
cr = std::move(splits.second);
if (ranges.size() == n) {
// we have enough ranges
break;
}
}
if (ranges.size() < n) {
add_range(get_remainder());
}
}
// Hinted handoff applies to a write when hints are not globally disabled and
// the write is not CAS; VIEW writes allow hints unconditionally.
bool storage_proxy::hints_enabled(db::write_type type) const noexcept {
    return (!_hints_manager.is_disabled_for_all() && type != db::write_type::CAS) || type == db::write_type::VIEW;
}

View File

@@ -43,6 +43,7 @@
#include "utils/small_vector.hh"
#include "service/endpoint_lifecycle_subscriber.hh"
#include <seastar/core/circular_buffer.hh>
#include "query_ranges_to_vnodes.hh"
class reconcilable_result;
class frozen_mutation_and_schema;
@@ -99,26 +100,6 @@ struct view_update_backlog_timestamped {
struct allow_hints_tag {};
using allow_hints = bool_class<allow_hints_tag>;
class query_ranges_to_vnodes_generator {
schema_ptr _s;
dht::partition_range_vector _ranges;
dht::partition_range_vector::iterator _i; // iterator to current range in _ranges
bool _local;
const locator::token_metadata_ptr _tmptr;
void process_one_range(size_t n, dht::partition_range_vector& ranges);
public:
query_ranges_to_vnodes_generator(const locator::token_metadata_ptr tmptr, schema_ptr s, dht::partition_range_vector ranges, bool local = false);
query_ranges_to_vnodes_generator(const query_ranges_to_vnodes_generator&) = delete;
query_ranges_to_vnodes_generator(query_ranges_to_vnodes_generator&&) = default;
// generate next 'n' vnodes, may return less than requested number of ranges
// which means either that there are no more ranges
// (in which case empty() == true), or too many ranges
// are requested
dht::partition_range_vector operator()(size_t n);
bool empty() const;
};
struct storage_proxy_coordinator_query_result {
foreign_ptr<lw_shared_ptr<query::result>> query_result;
replicas_per_token_range last_replicas;

View File

@@ -15,6 +15,7 @@
#include "test/lib/mutation_source_test.hh"
#include "test/lib/result_set_assertions.hh"
#include "service/storage_proxy.hh"
#include "query_ranges_to_vnodes.hh"
#include "partition_slice_builder.hh"
#include "schema_builder.hh"
@@ -42,7 +43,7 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) {
auto check = [&s](locator::token_metadata_ptr tmptr, dht::partition_range input,
dht::partition_range_vector expected) {
service::query_ranges_to_vnodes_generator ranges_to_vnodes(tmptr, s, {input});
query_ranges_to_vnodes_generator ranges_to_vnodes(tmptr, s, {input});
auto actual = ranges_to_vnodes(expected.size());
if (!std::equal(actual.begin(), actual.end(), expected.begin(), [&s](auto&& r1, auto&& r2) {
return r1.equal(r2, dht::ring_position_comparator(*s));