mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-22 09:30:45 +00:00
Instead of dht::partition_ranges_vector, which is a std::vector<> and
has been seen to cause large allocations when calculating ranges to be
invalidated after compaction:
seastar_memory - oversized allocation: 147456 bytes. This is non-fatal, but could lead to latency and/or fragmentation issues. Please report: at
[Backtrace #0]
void seastar::backtrace<seastar::current_backtrace_tasklocal()::$_0>(seastar::current_backtrace_tasklocal()::$_0&&, bool) at ./build/release/seastar/./seastar/include/seastar/util/backtrace.hh:89
(inlined by) seastar::current_backtrace_tasklocal() at ./build/release/seastar/./seastar/src/util/backtrace.cc:99
seastar::current_tasktrace() at ./build/release/seastar/./seastar/src/util/backtrace.cc:136
seastar::current_backtrace() at ./build/release/seastar/./seastar/src/util/backtrace.cc:169
seastar::memory::cpu_pages::warn_large_allocation(unsigned long) at ./build/release/seastar/./seastar/src/core/memory.cc:840
seastar::memory::cpu_pages::check_large_allocation(unsigned long) at ./build/release/seastar/./seastar/src/core/memory.cc:903
(inlined by) seastar::memory::cpu_pages::allocate_large(unsigned int, bool) at ./build/release/seastar/./seastar/src/core/memory.cc:910
(inlined by) seastar::memory::allocate_large(unsigned long, bool) at ./build/release/seastar/./seastar/src/core/memory.cc:1533
(inlined by) seastar::memory::allocate_slowpath(unsigned long) at ./build/release/seastar/./seastar/src/core/memory.cc:1679
seastar::memory::allocate(unsigned long) at ././seastar/src/core/memory.cc:1698
(inlined by) operator new(unsigned long) at ././seastar/src/core/memory.cc:2440
(inlined by) std::__new_allocator<interval<dht::ring_position>>::allocate(unsigned long, void const*) at /usr/lib/gcc/x86_64-redhat-linux/15/../../../../include/c++/15/bits/new_allocator.h:151
(inlined by) std::allocator<interval<dht::ring_position>>::allocate(unsigned long) at /usr/lib/gcc/x86_64-redhat-linux/15/../../../../include/c++/15/bits/allocator.h:203
(inlined by) std::allocator_traits<std::allocator<interval<dht::ring_position>>>::allocate(std::allocator<interval<dht::ring_position>>&, unsigned long) at /usr/lib/gcc/x86_64-redhat-linux/15/../../../../include/c++/15/bits/alloc_traits.h:614
(inlined by) std::_Vector_base<interval<dht::ring_position>, std::allocator<interval<dht::ring_position>>>::_M_allocate(unsigned long) at /usr/lib/gcc/x86_64-redhat-linux/15/../../../../include/c++/15/bits/stl_vector.h:387
(inlined by) std::vector<interval<dht::ring_position>, std::allocator<interval<dht::ring_position>>>::reserve(unsigned long) at /usr/lib/gcc/x86_64-redhat-linux/15/../../../../include/c++/15/bits/vector.tcc:79
dht::to_partition_ranges(utils::chunked_vector<interval<dht::token>, 131072ul> const&, seastar::bool_class<utils::can_yield_tag>) at ./dht/i_partitioner.cc:347
compaction::compaction::get_ranges_for_invalidation(std::vector<seastar::lw_shared_ptr<sstables::sstable>, std::allocator<seastar::lw_shared_ptr<sstables::sstable>>> const&) at ./compaction/compaction.cc:619
(inlined by) compaction::compaction::get_compaction_completion_desc(std::vector<seastar::lw_shared_ptr<sstables::sstable>, std::allocator<seastar::lw_shared_ptr<sstables::sstable>>>, std::vector<seastar::lw_shared_ptr<sstables::sstable>, std::allocator<seastar::lw_shared_ptr<sstables::sstable>>>) at ./compaction/compaction.cc:719
(inlined by) compaction::regular_compaction::replace_remaining_exhausted_sstables() at ./compaction/compaction.cc:1362
compaction::compaction::finish(std::chrono::time_point<db_clock, std::chrono::duration<long, std::ratio<1l, 1000l>>>, std::chrono::time_point<db_clock, std::chrono::duration<long, std::ratio<1l, 1000l>>>) at ./compaction/compaction.cc:1021
compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0::operator()() at ./compaction/compaction.cc:1960
(inlined by) compaction::compaction_result std::__invoke_impl<compaction::compaction_result, compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0>(std::__invoke_other, compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0&&) at /usr/lib/gcc/x86_64-redhat-linux/15/../../../../include/c++/15/bits/invoke.h:63
(inlined by) std::__invoke_result<compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0>::type std::__invoke<compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0>(compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0&&) at /usr/lib/gcc/x86_64-redhat-linux/15/../../../../include/c++/15/bits/invoke.h:98
(inlined by) decltype(auto) std::__apply_impl<compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0, std::tuple<>>(compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0&&, std::tuple<>&&, std::integer_sequence<unsigned long, ...>) at /usr/lib/gcc/x86_64-redhat-linux/15/../../../../include/c++/15/tuple:2920
(inlined by) decltype(auto) std::apply<compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0, std::tuple<>>(compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0&&, std::tuple<>&&) at /usr/lib/gcc/x86_64-redhat-linux/15/../../../../include/c++/15/tuple:2935
(inlined by) seastar::future<compaction::compaction_result> seastar::futurize<compaction::compaction_result>::apply<compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0>(compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0&&, std::tuple<>&&) at ././seastar/include/seastar/core/future.hh:1930
(inlined by) seastar::futurize<std::invoke_result<compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0>::type>::type seastar::async<compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0>(seastar::thread_attributes, compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0&&)::'lambda'()::operator()() const at ././seastar/include/seastar/core/thread.hh:267
(inlined by) seastar::noncopyable_function<void ()>::direct_vtable_for<seastar::futurize<std::invoke_result<compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0>::type>::type seastar::async<compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0>(seastar::thread_attributes, compaction::compaction::run(std::unique_ptr<compaction::compaction, std::default_delete<compaction::compaction>>)::$_0&&)::'lambda'()>::call(seastar::noncopyable_function<void ()> const*) at ././seastar/include/seastar/util/noncopyable_function.hh:138
seastar::noncopyable_function<void ()>::operator()() const at ./build/release/seastar/./seastar/include/seastar/util/noncopyable_function.hh:224
(inlined by) seastar::thread_context::main() at ./build/release/seastar/./seastar/src/core/thread.cc:318
dht::partition_ranges_vector is used on the hot path, so just convert
the problematic user -- cache invalidation -- to use
utils::chunked_vector<dht::partition_range> instead.
Fixes: SCYLLADB-121
Closes scylladb/scylladb#28855
604 lines
22 KiB
C++
604 lines
22 KiB
C++
/*
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include "i_partitioner.hh"
|
|
#include "sharder.hh"
|
|
#include "auto_refreshing_sharder.hh"
|
|
#include <seastar/core/loop.hh>
|
|
#include <seastar/core/seastar.hh>
|
|
#include <seastar/coroutine/maybe_yield.hh>
|
|
#include "dht/ring_position.hh"
|
|
#include "dht/token-sharding.hh"
|
|
#include "utils/assert.hh"
|
|
#include "utils/class_registrator.hh"
|
|
#include "sstables/key.hh"
|
|
#include "replica/database.hh"
|
|
#include <seastar/core/thread.hh>
|
|
#include <seastar/core/on_internal_error.hh>
|
|
#include "utils/log.hh"
|
|
|
|
namespace dht {
|
|
|
|
// File-scope logger for partitioner/sharder diagnostics in this translation unit.
static logging::logger logger("i_partitioner");
|
|
|
|
// Base sharder: records the shard count and how many most-significant token
// bits are ignored when mapping tokens to shards.
sharder::sharder(unsigned shard_count, unsigned sharding_ignore_msb_bits)
        : _shard_count(shard_count)
        // if one shard, ignore sharding_ignore_msb_bits as they will just cause needless
        // range breaks
        , _sharding_ignore_msb_bits(shard_count > 1 ? sharding_ignore_msb_bits : 0)
{}
|
|
|
|
// Sharder with a fixed shard layout; precomputes the zero-based shard-start
// table used by token_for_next_shard().
static_sharder::static_sharder(unsigned shard_count, unsigned sharding_ignore_msb_bits)
        : sharder(shard_count, sharding_ignore_msb_bits)
        , _shard_start(init_zero_based_shard_start(_shard_count, _sharding_ignore_msb_bits))
{}
|
|
|
|
// Maps a token to its owning shard using the fixed shard count and
// msb-ignore configuration.
unsigned
static_sharder::shard_of(const token& t) const {
    return dht::shard_of(_shard_count, _sharding_ignore_msb_bits, t);
}
|
|
|
|
// A static sharder always knows the owning shard, so this never returns
// a disengaged optional.
std::optional<unsigned>
static_sharder::try_get_shard_for_reads(const token& t) const {
    return shard_of(t);
}
|
|
|
|
// Under static sharding writes always target exactly one shard; the
// write-replica-set selector is irrelevant and ignored.
shard_replica_set
static_sharder::shard_for_writes(const token& t, std::optional<write_replica_set_selector> sel) const {
    return {shard_of(t)};
}
|
|
|
|
// Returns the first token after `t` at which ownership switches to `shard`,
// skipping `spans` - 1 earlier such switch points.
token
static_sharder::token_for_next_shard(const token& t, shard_id shard, unsigned spans) const {
    return dht::token_for_next_shard(_shard_start, _shard_count, _sharding_ignore_msb_bits, t, shard, spans);
}
|
|
|
|
// For a static sharder the read-path layout is the static layout, so this
// simply delegates.
token
static_sharder::token_for_next_shard_for_reads(const token& t, shard_id shard, unsigned spans) const {
    return token_for_next_shard(t, shard, spans);
}
|
|
|
|
std::optional<shard_and_token>
|
|
static_sharder::next_shard(const token& t) const {
|
|
auto shard = shard_for_reads(t);
|
|
auto next_shard = shard + 1 == _shard_count ? 0 : shard + 1;
|
|
auto next_token = token_for_next_shard_for_reads(t, next_shard);
|
|
if (next_token.is_maximum()) {
|
|
return std::nullopt;
|
|
}
|
|
return shard_and_token{next_shard, next_token};
|
|
}
|
|
|
|
// Read-path variant; identical to next_shard() for a static sharder.
std::optional<shard_and_token>
static_sharder::next_shard_for_reads(const token& t) const {
    return next_shard(t);
}
|
|
|
|
// Instantiates the partitioner registered under `partitioner_name`.
// Throws std::runtime_error listing the registered partitioners when the
// name is unknown (or construction fails).
std::unique_ptr<dht::i_partitioner> make_partitioner(sstring partitioner_name) {
    try {
        return create_object<i_partitioner>(partitioner_name);
    } catch (std::exception& ex) {
        // Enumerate every registered partitioner class for the error message.
        auto known = class_registry<i_partitioner>::classes() | std::views::keys;
        throw std::runtime_error(fmt::format("Partitioner {} is not supported, supported partitioners = {{ {} }} : {}",
                partitioner_name,
                fmt::join(known, ", "),
                ex.what()));
    }
}
|
|
|
|
bool
|
|
decorated_key::equal(const schema& s, const decorated_key& other) const {
|
|
if (_token == other._token) {
|
|
return _key.legacy_equal(s, other._key);
|
|
}
|
|
return false;
|
|
}
|
|
|
|
// Three-way comparison of decorated keys: token order dominates; keys break
// token ties using the legacy comparison.
std::strong_ordering
decorated_key::tri_compare(const schema& s, const decorated_key& other) const {
    if (auto cmp = _token <=> other._token; cmp != 0) {
        return cmp;
    }
    return _key.legacy_tri_compare(s, other._key);
}
|
|
|
|
// Three-way comparison against a ring_position. Token order dominates; on a
// tie, compare keys when the position carries one, otherwise order by the
// position's relation to keys (before-all / after-all marker).
std::strong_ordering
decorated_key::tri_compare(const schema& s, const ring_position& other) const {
    if (auto cmp = _token <=> other.token(); cmp != 0) {
        return cmp;
    }
    if (other.has_key()) {
        return _key.legacy_tri_compare(s, *other.key());
    }
    return 0 <=> other.relation_to_keys();
}
|
|
|
|
// Strict-weak-ordering helper over tri_compare (ring_position overload).
bool
decorated_key::less_compare(const schema& s, const ring_position& other) const {
    return tri_compare(s, other) < 0;
}
|
|
|
|
// Strict-weak-ordering helper over tri_compare (decorated_key overload).
bool
decorated_key::less_compare(const schema& s, const decorated_key& other) const {
    return tri_compare(s, other) < 0;
}
|
|
|
|
// Comparator functor bound to a schema, usable with ordered containers and
// standard algorithms over decorated keys / ring positions.
decorated_key::less_comparator::less_comparator(schema_ptr s)
    : s(std::move(s))
{ }
|
|
|
|
// decorated_key < decorated_key, under the bound schema.
bool
decorated_key::less_comparator::operator()(const decorated_key& lhs, const decorated_key& rhs) const {
    return lhs.less_compare(*s, rhs);
}
|
|
|
|
// ring_position < decorated_key: only decorated_key implements the mixed
// comparison, so compare rhs vs lhs and invert the result.
bool
decorated_key::less_comparator::operator()(const ring_position& lhs, const decorated_key& rhs) const {
    return rhs.tri_compare(*s, lhs) > 0;
}
|
|
|
|
// decorated_key < ring_position, under the bound schema.
bool
decorated_key::less_comparator::operator()(const decorated_key& lhs, const ring_position& rhs) const {
    return lhs.tri_compare(*s, rhs) < 0;
}
|
|
|
|
// Convenience: maps a token to its read shard using the schema's own sharder.
unsigned static_shard_of(const schema& s, const token& t) {
    return s.get_sharder().shard_for_reads(t);
}
|
|
|
|
// Returns the next sub-range of _range that is owned by the target shard
// (_shard), advancing internal state; returns a disengaged optional once the
// range is exhausted.
std::optional<dht::token_range>
selective_token_range_sharder::next() {
    if (_done) {
        return {};
    }
    // Walk shard-sized slices starting at _start_boundary until we run past
    // _range or reach the maximum token.
    while (_range.overlaps(dht::token_range(_start_boundary, {}), dht::token_comparator())
            && !(_start_boundary && _start_boundary->value() == maximum_token())) {
        // End of the current slice: first token owned by the next shard.
        auto end_token = _sharder.token_for_next_shard_for_reads(_start_token, _next_shard);
        auto candidate = dht::token_range(std::move(_start_boundary), interval_bound<dht::token>(end_token, false));
        auto intersection = _range.intersection(std::move(candidate), dht::token_comparator());
        // Advance to the start of the next slice owned by our shard before
        // possibly returning, so state is consistent for the next call.
        _start_token = _sharder.token_for_next_shard_for_reads(end_token, _shard);
        _start_boundary = interval_bound<dht::token>(_start_token);
        if (intersection) {
            return *intersection;
        }
        // Empty intersection: the slice fell outside _range; keep scanning.
    }

    _done = true;
    return {};
}
|
|
|
|
// Emits the next (sub-range, shard) pair by splitting _range at the first
// shard boundary after its start; returns a disengaged optional once the
// whole range has been emitted.
std::optional<ring_position_range_and_shard>
ring_position_range_sharder::next(const schema& s) {
    if (_done) {
        return {};
    }
    auto token = _range.start() ? _range.start()->value().token() : dht::minimum_token();
    auto shard = _sharder.shard_for_reads(token);
    auto next_shard_and_token = _sharder.next_shard_for_reads(token);
    if (!next_shard_and_token) {
        // No further boundary on the ring: the remainder is all on `shard`.
        _done = true;
        return ring_position_range_and_shard{std::move(_range), shard};
    }
    auto shard_boundary_token = next_shard_and_token->token;
    auto shard_boundary = ring_position::starting_at(shard_boundary_token);
    if ((!_range.end() || shard_boundary.less_compare(s, _range.end()->value()))
            && !shard_boundary_token.is_maximum()) {
        // split the range at end_of_shard
        auto start = _range.start_copy();
        auto end = interval_bound<ring_position>(shard_boundary, false);
        // Keep the suffix (starting at the boundary, inclusive) for the next
        // call; return the prefix, which lives entirely on `shard`.
        _range = dht::partition_range(
                interval_bound<ring_position>(std::move(shard_boundary), true),
                _range.end());
        return ring_position_range_and_shard{dht::partition_range(std::move(start), std::move(end)), shard};
    }
    // Boundary is at/after the range end: the remainder is on a single shard.
    _done = true;
    return ring_position_range_and_shard{std::move(_range), shard};
}
|
|
|
|
// Takes ownership of `ranges` and prepares to emit per-shard sub-ranges,
// positioning the internal per-range sharder on the first range.
ring_position_range_vector_sharder::ring_position_range_vector_sharder(const sharder& sharder, utils::chunked_vector<dht::partition_range> ranges)
        : _ranges(std::move(ranges))
        , _sharder(sharder)
        , _current_range(_ranges.begin()) {
    next_range();
}
|
|
|
|
// Returns the next (sub-range, shard, source-range-index) triple, moving to
// the following input range when the current one is exhausted; returns
// std::nullopt after all input ranges are consumed.
std::optional<ring_position_range_and_shard_and_element>
ring_position_range_vector_sharder::next(const schema& s) {
    if (!_current_sharder) {
        return std::nullopt;
    }
    auto range_and_shard = _current_sharder->next(s);
    while (!range_and_shard && _current_range != _ranges.end()) {
        next_range();
        range_and_shard = _current_sharder->next(s);
    }
    auto ret = std::optional<ring_position_range_and_shard_and_element>();
    if (range_and_shard) {
        // next_range() has already advanced _current_range past the range
        // currently being sharded, hence the -1 to recover its index.
        ret.emplace(std::move(*range_and_shard), _current_range - _ranges.begin() - 1);
    }
    return ret;
}
|
|
|
|
// Collects the sub-ranges of `pr` that are owned by `shard` under the given
// static sharder, yielding between iterations via repeat_until_value.
// NOTE(review): `pr` and `sharder` are captured by reference, so the caller
// must keep them alive until the returned future resolves.
future<utils::chunked_vector<partition_range>>
split_range_to_single_shard(const schema& s, const static_sharder& sharder, const partition_range& pr, shard_id shard) {
    auto start_token = pr.start() ? pr.start()->value().token() : minimum_token();
    auto start_shard = sharder.shard_of(start_token);
    // If the range doesn't begin on our shard, fast-forward to the first
    // token owned by `shard`.
    auto start_boundary = start_shard == shard ? pr.start_copy() : interval_bound<ring_position>(ring_position::starting_at(sharder.token_for_next_shard(start_token, shard)));
    start_token = start_shard == shard ? start_token : sharder.token_for_next_shard(start_token, shard);
    return repeat_until_value([&sharder,
            &pr,
            cmp = ring_position_comparator(s),
            ret = utils::chunked_vector<partition_range>(),
            start_token,
            start_boundary,
            shard] () mutable {
        if (pr.overlaps(partition_range(start_boundary, {}), cmp)
                && !(start_boundary && start_boundary->value().token().is_maximum())) {
            dht::token end_token = maximum_token();
            auto s_a_t = sharder.next_shard(start_token);
            if (s_a_t) {
                end_token = s_a_t->token;
            }
            // Intersect the current shard-owned slice [start_boundary,
            // end_token) with the requested range.
            auto candidate = partition_range(std::move(start_boundary), interval_bound<ring_position>(ring_position::starting_at(end_token), false));
            auto intersection = pr.intersection(std::move(candidate), cmp);
            if (intersection) {
                ret.push_back(std::move(*intersection));
            }
            if (!s_a_t) {
                // Reached the end of the ring; nothing more to scan.
                return make_ready_future<std::optional<utils::chunked_vector<partition_range>>>(std::move(ret));
            }
            if (s_a_t->shard == shard) {
                // The slice starting at end_token is also ours; continue there.
                start_token = end_token;
            } else {
                // Skip ahead to the next slice owned by our shard.
                start_token = sharder.token_for_next_shard(end_token, shard);
            }
            start_boundary = interval_bound<ring_position>(ring_position::starting_at(start_token));
            // Disengaged optional => repeat_until_value iterates again.
            return make_ready_future<std::optional<utils::chunked_vector<partition_range>>>();
        }
        return make_ready_future<std::optional<utils::chunked_vector<partition_range>>>(std::move(ret));
    });
}
|
|
|
|
// Three-way comparison via the schema-bound ring_position comparator.
std::strong_ordering ring_position::tri_compare(const schema& s, const ring_position& o) const {
    return ring_position_comparator(s)(*this, o);
}
|
|
|
|
// Equality in terms of tri_compare.
bool ring_position::equal(const schema& s, const ring_position& other) const {
    return tri_compare(s, other) == 0;
}
|
|
|
|
// Strict ordering in terms of tri_compare.
bool ring_position::less_compare(const schema& s, const ring_position& other) const {
    return tri_compare(s, other) < 0;
}
|
|
|
|
// Three-way comparison of two ring_position_views: tokens first, then keys,
// then the weight, which orders key-less positions relative to keys sharing
// the same token (negative weight sorts before all keys, positive after).
std::strong_ordering ring_position_tri_compare(const schema& s, ring_position_view lh, ring_position_view rh) {
    auto token_cmp = *lh._token <=> *rh._token;
    if (token_cmp != 0) {
        return token_cmp;
    }
    if (lh._key && rh._key) {
        auto c = lh._key->legacy_tri_compare(s, *rh._key);
        if (c != 0) {
            return c;
        }
        // Same token and key: weights decide.
        return (lh._weight - rh._weight) <=> 0;
    }
    if (!lh._key && !rh._key) {
        // Both key-less: compare weights directly.
        return lh._weight - rh._weight <=> 0;
    } else if (!lh._key) {
        // lh is key-less; its weight says whether it sorts after (>0) or
        // before all keys with this token.
        return lh._weight > 0 ? std::strong_ordering::greater : std::strong_ordering::less;
    } else {
        // rh is key-less; mirror of the previous case.
        return rh._weight > 0 ? std::strong_ordering::less : std::strong_ordering::greater;
    }
}
|
|
|
|
// Compares a ring_position_view with an sstable decorated key view: token
// first, then key, with the view's weight breaking exact-key ties.
std::strong_ordering ring_position_comparator_for_sstables::operator()(ring_position_view lh, sstables::decorated_key_view rh) const {
    auto token_cmp = *lh._token <=> rh.token();
    if (token_cmp != 0) {
        return token_cmp;
    }
    if (lh._key) {
        // rh.key().tri_compare() compares rh against lh, so the result is
        // flipped back with `0 <=> rel`.
        auto rel = rh.key().tri_compare(s, *lh._key);
        if (rel != std::strong_ordering::equal) {
            return 0 <=> rel;
        }
    }
    // Key-less (or equal-key) case: the view's weight positions it relative
    // to the sstable key.
    return lh._weight <=> 0;
}
|
|
|
|
// Reversed-argument overload: delegates to the main overload and flips the
// ordering.
std::strong_ordering ring_position_comparator_for_sstables::operator()(sstables::decorated_key_view a, ring_position_view b) const {
    return 0 <=> (*this)(b, a);
}
|
|
|
|
// Converts a token range into the equivalent partition (ring position)
// range. An inclusive token bound must cover every key sharing that token,
// so an inclusive start snaps to the position before all such keys and an
// inclusive end to the position after them; exclusive bounds do the reverse.
dht::partition_range
to_partition_range(dht::token_range r) {
    using bound = dht::partition_range::bound;
    using bound_opt = std::optional<dht::partition_range::bound>;
    using token_bound = dht::ring_position::token_bound;

    // Shared conversion for either end of the interval; the two callers just
    // swap which token_bound corresponds to "inclusive".
    auto convert = [] (const std::optional<dht::token_range::bound>& b,
                       token_bound if_inclusive,
                       token_bound if_exclusive) -> bound_opt {
        if (!b) {
            return bound_opt();
        }
        auto tb = b->is_inclusive() ? if_inclusive : if_exclusive;
        return bound_opt(bound(dht::ring_position(b->value(), tb), b->is_inclusive()));
    };

    auto start = convert(r.start(), token_bound::start, token_bound::end);
    auto end = convert(r.end(), token_bound::end, token_bound::start);
    return { std::move(start), std::move(end) };
}
|
|
|
|
// Converts a vector of token ranges into the equivalent partition ranges,
// optionally yielding between elements (for use inside seastar threads).
dht::partition_range_vector to_partition_ranges(const dht::token_range_vector& ranges, utils::can_yield can_yield) {
    dht::partition_range_vector result;
    // One output range per input range; reserve up front.
    result.reserve(ranges.size());
    for (const auto& tr : ranges) {
        result.push_back(dht::to_partition_range(tr));
        utils::maybe_yield(can_yield);
    }
    return result;
}
|
|
|
|
// Coroutine variant of to_partition_ranges() that builds a chunked_vector,
// avoiding the single oversized contiguous allocation a std::vector would
// need for very large range sets; yields between elements.
future<utils::chunked_vector<dht::partition_range>> to_partition_ranges_chunked(const dht::token_range_vector& ranges) {
    utils::chunked_vector<dht::partition_range> result;
    result.reserve(ranges.size());
    for (const auto& tr : ranges) {
        result.push_back(dht::to_partition_range(tr));
        co_await coroutine::maybe_yield();
    }
    co_return result;
}
|
|
|
|
std::map<unsigned, dht::partition_range_vector>
|
|
split_range_to_shards(dht::partition_range pr, const schema& s, const sharder& raw_sharder) {
|
|
std::map<unsigned, dht::partition_range_vector> ret;
|
|
auto sharder = dht::ring_position_range_sharder(raw_sharder, std::move(pr));
|
|
auto rprs = sharder.next(s);
|
|
while (rprs) {
|
|
ret[rprs->shard].emplace_back(rprs->ring_range);
|
|
rprs = sharder.next(s);
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
// Subtracts `ranges_to_subtract` from `source_ranges` and returns what
// remains, yielding between iterations. `source_ranges` may overlap (it is
// deoverlapped first).
// NOTE(review): the merge-style walk assumes `ranges_to_subtract` is already
// sorted and non-overlapping -- confirm against callers.
future<utils::chunked_vector<dht::partition_range>> subtract_ranges(const schema& schema, utils::chunked_vector<dht::partition_range> source_ranges, utils::chunked_vector<dht::partition_range> ranges_to_subtract) {
    auto cmp = dht::ring_position_comparator(schema);
    // optimize set of potentially overlapping ranges by deoverlapping them.
    auto ranges = dht::partition_range::deoverlap(std::move(source_ranges), cmp);
    utils::chunked_vector<dht::partition_range> res;
    // Each subtraction can split a range in two, at most doubling the count.
    res.reserve(ranges.size() * 2);

    auto range = ranges.begin();
    auto range_end = ranges.end();
    auto range_to_subtract = ranges_to_subtract.begin();
    auto range_to_subtract_end = ranges_to_subtract.end();
    while (range != range_end) {
        if (range_to_subtract == range_to_subtract_end) {
            // We're done with ranges_to_subtract; keep the rest verbatim.
            res.emplace_back(std::move(*range));
            ++range;
            continue;
        }

        auto diff = range->subtract(*range_to_subtract, cmp);
        auto size = diff.size();
        switch (size) {
        case 0:
            // current range is fully covered by range_to_subtract, done with it
            // range_to_subtract.start <= range.start &&
            // range_to_subtract.end >= range.end
            ++range;
            break;
        case 1:
            // Possible cases:
            // a. range and range_to_subtract are disjoint (so diff == range)
            // a.i range_to_subtract.end < range.start
            // a.ii range_to_subtract.start > range.end
            // b. range_to_subtract.start > range.start, so it removes the range suffix
            // c. range_to_subtract.start < range.start, so it removes the range prefix

            // Does range_to_subtract sort after range?
            if (range_to_subtract->start() && (!range->start() || cmp(range_to_subtract->start()->value(), range->start()->value()) > 0)) {
                // save range prefix in the result
                // (note that diff[0] == range in the disjoint case)
                res.emplace_back(std::move(diff[0]));
                // done with current range
                ++range;
            } else {
                // set the current range to the remaining suffix
                *range = std::move(diff[0]);
                // done with current range_to_subtract
                ++range_to_subtract;
            }
            break;
        case 2:
            // range contains range_to_subtract

            // save range prefix in the result
            res.emplace_back(std::move(diff[0]));
            // set the current range to the remaining suffix
            *range = std::move(diff[1]);
            // done with current range_to_subtract
            ++range_to_subtract;
            break;
        default:
            // subtract() can produce at most two pieces.
            SCYLLA_ASSERT(size <= 2);
        }
        co_await coroutine::maybe_yield();
    }

    co_return res;
}
|
|
|
|
// Splits the full token ring into 2^most_significant_bits contiguous
// ranges, one per compaction group, in ring order. With zero bits the whole
// ring is returned as a single range.
dht::token_range_vector split_token_range_msb(unsigned most_significant_bits) {
    dht::token_range_vector ret;
    // Avoid shift left 64
    if (!most_significant_bits) {
        auto&& start_bound = dht::token_range::bound(dht::minimum_token(), true);
        auto&& end_bound = dht::token_range::bound(dht::maximum_token(), true);
        ret.emplace_back(std::move(start_bound), std::move(end_bound));
        return ret;
    }
    SCYLLA_ASSERT(most_significant_bits < 64);
    // Shift a 64-bit one: `1 << most_significant_bits` would shift a signed
    // int, which is undefined behavior for shifts >= 31 even though the
    // assert above permits up to 63 bits.
    const uint64_t number_of_ranges = uint64_t(1) << most_significant_bits;
    ret.reserve(number_of_ranges);
    dht::token prev_last_token;
    for (uint64_t i = 0; i < number_of_ranges; i++) {
        std::optional<dht::token_range::bound> start_bound;
        std::optional<dht::token_range::bound> end_bound;
        if (i == 0) {
            // First group starts at the very beginning of the ring.
            start_bound = dht::token_range::bound(dht::minimum_token(), true);
        } else {
            // Sanity-check that the token right after the previous group's
            // last token indeed belongs to group i.
            auto token = dht::next_token(prev_last_token);
            if (compaction_group_of(most_significant_bits, token) != i) {
                on_fatal_internal_error(logger, format("split_token_range_msb: inconsistent end_bound compaction group: index={} msbits={} token={} compaction_group_of={}",
                        i, most_significant_bits, token, compaction_group_of(most_significant_bits, token)));
            }
            // Exclusive start at the previous group's last token makes the
            // ranges contiguous and non-overlapping.
            start_bound = dht::token_range::bound(prev_last_token, false);
        }
        prev_last_token = dht::last_token_of_compaction_group(most_significant_bits, i);
        end_bound = dht::token_range::bound(prev_last_token, true);
        ret.emplace_back(std::move(start_bound), std::move(end_bound));
    }
    return ret;
}
|
|
|
|
// Returns the first token that can possibly have keys included in `pr`.
dht::token first_token(const dht::partition_range& pr) {
    auto start = dht::ring_position_view::for_range_start(pr);
    auto token = start.token();
    // Check if the range excludes "token".
    // A key-less start positioned after all keys of `token` means no key
    // with that token is inside the range, so the first candidate is the
    // next token -- unless `token` is already the last token on the ring.
    if (!start.key()
            && start.get_token_bound() == dht::ring_position::token_bound::end
            && token._kind == dht::token::kind::key
            && !token.is_last()) {
        token = dht::next_token(token);
    }
    return token;
}
|
|
|
|
// Returns the shard owning `pr` (for reads) when the whole range falls on a
// single shard, or std::nullopt when it spans more than one shard.
std::optional<shard_id> is_single_shard(const dht::sharder& sharder, const schema& s, const dht::partition_range& pr) {
    auto token = first_token(pr);
    auto shard = sharder.shard_for_reads(token);
    if (pr.is_singular()) {
        // Single-position range: trivially on one shard.
        return shard;
    }
    if (auto s_a_t = sharder.next_shard_for_reads(token)) {
        dht::ring_position_comparator cmp(s);
        auto end = dht::ring_position_view::for_range_end(pr);
        // If the range's end reaches past the next shard boundary, the
        // range crosses shards.
        if (cmp(end, dht::ring_position_view::starting_at(s_a_t->token)) > 0) {
            return std::nullopt;
        }
    }
    return shard;
}
|
|
|
|
// Captures the table and an optional default write-replica-set selector,
// then snapshots the table's current sharder via refresh().
auto_refreshing_sharder::auto_refreshing_sharder(lw_shared_ptr<replica::table> table, std::optional<write_replica_set_selector> sel)
        : _table(std::move(table))
        , _sel(sel)
{
    refresh();
}
|
|
|
|
// Defined out-of-line so the header does not need the members' complete types.
auto_refreshing_sharder::~auto_refreshing_sharder() = default;
|
|
|
|
// (Re)captures the table's current effective replication map and its
// sharder, and re-subscribes so we refresh again when the erm is
// invalidated.
void
auto_refreshing_sharder::refresh() {
    _erm = _table->get_effective_replication_map();
    _sharder = &_erm->get_sharder(*_table->schema());
    // The subscription is tied to the (new) erm's validity abort source, so
    // it must be renewed on every refresh.
    _callback = _erm->get_validity_abort_source().subscribe([this] () noexcept {
        refresh();
    });
}
|
|
|
|
// Delegates to the currently-captured sharder (kept fresh by refresh()).
std::optional<unsigned> auto_refreshing_sharder::try_get_shard_for_reads(const token& t) const {
    return _sharder->try_get_shard_for_reads(t);
}
|
|
|
|
// Delegates to the currently-captured sharder, substituting the selector
// captured at construction time when the caller passes none.
dht::shard_replica_set
auto_refreshing_sharder::shard_for_writes(const token& t, std::optional<write_replica_set_selector> sel) const {
    return _sharder->shard_for_writes(t, sel ? sel : _sel);
}
|
|
|
|
// Delegates to the currently-captured sharder.
std::optional<dht::shard_and_token>
auto_refreshing_sharder::next_shard_for_reads(const dht::token& t) const {
    return _sharder->next_shard_for_reads(t);
}
|
|
|
|
// Delegates to the currently-captured sharder.
dht::token
auto_refreshing_sharder::token_for_next_shard_for_reads(const dht::token& t, shard_id shard, unsigned spans) const {
    return _sharder->token_for_next_shard_for_reads(t, shard, spans);
}
|
|
|
|
// Returns the fraction of `base` covered by `other`, computed on token
// ranges clamped to [first_token(), last_token()] so that range sizes are
// well-defined. Returns 0.0 when there is no overlap.
double overlap_ratio(const dht::token_range& base, const dht::token_range& other) {
    // Clamp a possibly-unbounded range to the full bounded token span.
    auto bound_range = [] (const token_range& tr) {
        auto full_range = dht::token_range::make(first_token(), last_token());
        return full_range.intersection(tr, token_comparator());
    };
    auto bounded_base = bound_range(base);
    auto bounded_other = bound_range(other);
    if (!bounded_base || !bounded_other) {
        return 0.0;
    }

    // intersection of two bounded intervals should never yield an interval with unbounded range.
    auto intersection = bounded_base->intersection(*bounded_other, token_comparator());
    if (!intersection) {
        return 0.0;
    }
    auto size_of_bounded_range = [] (const token_range& tr) {
        // uses unbiased token (uint64_t) to avoid overflow when calculating size
        return tr.end()->value().unbias() - tr.start()->value().unbias();
    };

    return double(size_of_bounded_range(*intersection)) / size_of_bounded_range(*bounded_base);
}
|
|
|
|
}
|
|
|
|
// Renders a ring_position_view as "{<token>[, <key>], w=<weight>}".
auto fmt::formatter<dht::ring_position_view>::format(const dht::ring_position_view& pos, fmt::format_context& ctx) const
        -> decltype(ctx.out()) {
    auto out = fmt::format_to(ctx.out(), "{{{}", *pos._token);
    if (pos._key) {
        out = fmt::format_to(out, ", {}", *pos._key);
    }
    // The weight is a signed int8-like value; widen so it prints numerically.
    return fmt::format_to(out, ", w={}}}", static_cast<int>(pos._weight));
}
|
|
|
|
// Renders a ring_position as "{<token>, <key>}" when it carries a key, or
// "{<token>, start|end}" for a key-less position.
auto fmt::formatter<dht::ring_position>::format(const dht::ring_position& pos, fmt::format_context& ctx) const
        -> decltype(ctx.out()) {
    auto out = fmt::format_to(ctx.out(), "{{{}", pos.token());
    if (pos.has_key()) {
        out = fmt::format_to(out, ", {}", *pos.key());
    } else {
        // Negative relation_to_keys() means "before all keys of this token".
        out = fmt::format_to(out, ", {}", (pos.relation_to_keys() < 0) ? "start" : "end");
    }
    return fmt::format_to(out, "}}");
}
|
|
|
|
// Renders a view over partition ranges as "{<r1><r2>...}" (the ranges are
// concatenated without separators).
auto fmt::formatter<dht::partition_ranges_view>::format(const dht::partition_ranges_view& v, fmt::format_context& ctx) const
        -> decltype(ctx.out()) {
    auto out = fmt::format_to(ctx.out(), "{{");
    for (const auto& r : v) {
        out = fmt::format_to(out, "{}", r);
    }
    return fmt::format_to(out, "}}");
}
|