Files
scylladb/dht/token.cc
Lakshmi Narayanan Sreethar 0c5fa8e154 locator/token_metadata.cc: use chunked_vector to store _sorted_tokens
The `token_metadata_impl` stores the sorted tokens in an `std::vector`.
With a large number of nodes, the size of this vector can grow quickly,
and updating it might lead to oversized allocations.

This commit changes `_sorted_tokens` to a `chunked_vector` to avoid such
issues. It also updates all related code to use `chunked_vector` instead
of `std::vector`.

Fixes #24876

Signed-off-by: Lakshmi Narayanan Sreethar <lakshmi.sreethar@scylladb.com>

Closes scylladb/scylladb#25027
2025-07-27 11:29:22 +03:00

261 lines
8.5 KiB
C++

/*
* Copyright (C) 2020-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <algorithm>
#include <limits>
#include <ostream>
#include <random>
#include <ranges>
#include <boost/lexical_cast.hpp>
#include "dht/token.hh"
#include "dht/token-sharding.hh"
namespace dht {
using uint128_t = unsigned __int128;
// Returns the raw signed 64-bit value backing a token.
inline int64_t long_token(const token& t) {
return t._data;
}
std::ostream& operator<<(std::ostream& out, const token& t) {
    // Render through the fmt formatter specialization for dht::token.
    return out << fmt::format("{}", t);
}
sstring token::to_sstring() const {
    // Decimal rendering of the raw signed 64-bit token value.
    const auto raw = long_token(*this);
    return seastar::to_sstring<sstring>(raw);
}
token token::midpoint(const token& t1, const token& t2) {
    // Do the arithmetic on uint64_t so the subtraction wraps around the ring
    // instead of overflowing; the result is converted back to a signed value.
    const uint64_t left = long_token(t1);
    const uint64_t right = long_token(t2);
    const int64_t mid = left + (right - left) / 2;
    return token{kind::key, mid};
}
token token::get_random_token() {
    // One engine per thread, seeded from the system entropy source.
    static thread_local std::default_random_engine engine{std::random_device{}()};
    // std::numeric_limits<int64_t>::min() is reserved and must never be
    // produced as a regular token, so the distribution starts at min() + 1.
    static thread_local std::uniform_int_distribution<int64_t> distribution(
            std::numeric_limits<int64_t>::min() + 1);
    return token(kind::key, distribution(engine));
}
token token::from_sstring(const sstring& t) {
    // Parse the decimal representation produced by to_sstring().
    const auto parsed = boost::lexical_cast<long>(t);
    // The minimum value encodes the special minimum token.
    if (parsed == std::numeric_limits<long>::min()) {
        return minimum_token();
    }
    return token(kind::key, uint64_t(parsed));
}
token token::from_bytes(bytes_view bytes) {
    // A serialized token is exactly one big-endian int64.
    if (bytes.size() != sizeof(int64_t)) {
        throw runtime_exception(format("Invalid token. Should have size {:d}, has size {:d}\n", sizeof(int64_t), bytes.size()));
    }
    const auto raw = net::ntoh(read_unaligned<int64_t>(bytes.begin()));
    // The minimum value is reserved for the special minimum token.
    if (raw == std::numeric_limits<int64_t>::min()) {
        return minimum_token();
    }
    return dht::token(dht::token::kind::key, raw);
}
// Fraction of the full 64-bit token space spanned when going from b up to a,
// computed on the unsigned wrap-around difference of the two values.
static float ratio_helper(int64_t a, int64_t b) {
    const uint64_t ua = static_cast<uint64_t>(a);
    const uint64_t ub = static_cast<uint64_t>(b);
    // When a <= b the difference wraps; subtract one so that a == b yields
    // (2^64 - 1), i.e. (almost) the whole ring, rather than zero.
    const uint64_t width = (a > b) ? (ua - ub) : (ua - ub - 1);
    return width / static_cast<float>(std::numeric_limits<uint64_t>::max());
}
std::map<token, float>
token::describe_ownership(const utils::chunked_vector<token>& sorted_tokens) {
    // Computes, per token, the fraction of the ring it owns: each token owns
    // the range extending backwards to its predecessor, and the first token's
    // range wraps around past the last one.
    std::map<token, float> ownership;
    if (sorted_tokens.empty()) {
        throw runtime_exception("No nodes present in the cluster. Has this node finished starting up?");
    }
    if (sorted_tokens.size() == 1) {
        // A single token owns the entire ring.
        ownership[sorted_tokens[0]] = 1.0;
        return ownership;
    }
    const int64_t first_value = long_token(sorted_tokens[0]);
    int64_t prev_value = first_value;
    int64_t curr_value = first_value;
    auto it = sorted_tokens.begin();
    for (++it; it != sorted_tokens.end(); ++it) {
        curr_value = long_token(*it);
        // Each token owns the span back to its predecessor.
        ownership[*it] = ratio_helper(curr_value, prev_value);
        prev_value = curr_value;
    }
    // The first token's range extends backwards to the last token, which is
    // why both values were kept around.
    ownership[sorted_tokens[0]] = ratio_helper(first_value, curr_value);
    return ownership;
}
// Type used to validate/serialize token values: tokens are 64-bit signed
// integers, hence long_type.
data_type
token::get_token_validator() {
return long_type;
}
inline
unsigned
zero_based_shard_of(uint64_t token, unsigned shards, unsigned sharding_ignore_msb_bits) {
    // This is the master token -> shard mapping; all inverse functions must
    // match its rounding behavior exactly.
    // Drop the ignored most-significant bits, then treat the remaining bits
    // as a binary fraction in [0, 1) and scale by the shard count:
    //   shard = floor((0.token) * shards)
    const uint64_t significant = token << sharding_ignore_msb_bits;
    return (static_cast<unsigned __int128>(significant) * shards) >> 64;
}
std::vector<uint64_t>
init_zero_based_shard_start(unsigned shards, unsigned sharding_ignore_msb_bits) {
    // Computes the inverse of zero_based_shard_of(): ret[s] is the smallest
    // zero-based token value that belongs to shard s.
    if (shards == 1) {
        // With one shard there is no boundary between shards for the search
        // loop below to find; shard 0 starts at token 0.
        return std::vector<uint64_t>(1, uint64_t(0));
    }
    // The master mapping (see zero_based_shard_of()), inlined so the rounding
    // here matches it exactly.
    const auto map_to_shard = [&] (uint64_t token) -> unsigned {
        token <<= sharding_ignore_msb_bits;
        return (static_cast<unsigned __int128>(token) * shards) >> 64;
    };
    std::vector<uint64_t> ret(shards);
    for (unsigned s = 0; s != shards; ++s) {
        // First approximation of the shard's start token.
        uint64_t token = (static_cast<unsigned __int128>(s) << 64) / shards;
        token >>= sharding_ignore_msb_bits; // leftmost bits are ignored by the mapping
        // Rounding can land the approximation slightly before shard s; walk
        // forward until the master mapping agrees.
        while (map_to_shard(token) != s) {
            ++token;
        }
        ret[s] = token;
    }
    return ret;
}
unsigned
shard_of(unsigned shard_count, unsigned sharding_ignore_msb_bits, const token& t) {
    // The two sentinel kinds map to the edge shards; regular keys go through
    // the master zero-based mapping.
    if (t._kind == token::kind::before_all_keys) {
        return token::shard_of_minimum_token();
    }
    if (t._kind == token::kind::after_all_keys) {
        return shard_count - 1;
    }
    if (t._kind == token::kind::key) {
        return zero_based_shard_of(unbias(t), shard_count, sharding_ignore_msb_bits);
    }
    // A token always carries one of the three kinds handled above.
    abort();
}
// Computes the token at which shard `shard` next begins, `spans` shard-spans
// after token `t`, using the `shard_start` table built by
// init_zero_based_shard_start(); returns maximum_token() when that point
// lies past the end of the token space.
// NOTE(review): intent inferred from the arithmetic below — confirm against
// the sharder's token_for_next_shard() contract.
token
token_for_next_shard(const std::vector<uint64_t>& shard_start, unsigned shard_count, unsigned sharding_ignore_msb_bits, const token& t, shard_id shard, unsigned spans) {
// Zero-based (unbiased) value of t; before_all_keys maps to 0.
uint64_t n = 0;
switch (t._kind) {
case token::kind::before_all_keys:
break;
case token::kind::after_all_keys:
// Nothing follows the last token.
return maximum_token();
case token::kind::key:
n = unbias(t);
break;
}
// Shard that currently owns n, per the master mapping.
auto s = zero_based_shard_of(n, shard_count, sharding_ignore_msb_bits);
if (!sharding_ignore_msb_bits) {
// This ought to be the same as the else branch, but avoids shifts by 64
// With no ignored msb bits there is exactly one span, so the target
// exists only if it is a later shard within that single span.
n = shard_start[shard];
if (spans > 1 || shard <= s) {
return maximum_token();
}
} else {
// The ignored msb bits of n select the current "span"; the remaining
// bits locate the position inside a span.
auto left_part = n >> (64 - sharding_ignore_msb_bits);
// Advance by `spans` spans; when the target shard still lies ahead of
// s inside the current span, one fewer span crossing is needed.
left_part += spans - unsigned(shard > s);
if (left_part >= (1u << sharding_ignore_msb_bits)) {
// Ran past the end of the token space.
return maximum_token();
}
left_part <<= (64 - sharding_ignore_msb_bits);
// Offset of the shard's first token within a span.
auto right_part = shard_start[shard];
n = left_part | right_part;
}
return bias(n);
}
int64_t token::to_int64(token t) {
    // Expose the raw signed 64-bit representation (same as long_token()).
    return t._data;
}
dht::token token::from_int64(int64_t i) {
    // Wrap a raw signed value as a regular (key-kind) token.
    return token(kind::key, i);
}
static
dht::token find_first_token_for_shard_in_not_wrap_around_range(const dht::static_sharder& sharder, dht::token start, dht::token end, size_t shard_idx) {
    // Invariant: start < end, which also guarantees start is not MAX_INT64,
    // so start + 1 cannot overflow.
    // Probe the token right after start; if it does not land on the wanted
    // shard, jump directly to that shard's next range start.
    auto candidate = dht::token::from_int64(dht::token::to_int64(start) + 1);
    if (sharder.shard_of(candidate) != shard_idx) {
        candidate = sharder.token_for_next_shard(candidate, shard_idx);
    }
    // Never report a token past the end of the searched range.
    return std::min(candidate, end);
}
dht::token find_first_token_for_shard(
        const dht::static_sharder& sharder, dht::token start, dht::token end, size_t shard_idx) {
    // Non-wrap-around range: a single straightforward search suffices.
    if (start < end) {
        return find_first_token_for_shard_in_not_wrap_around_range(sharder, start, end, shard_idx);
    }
    // Wrap-around range. First search the tail (start, 2^63 - 1], unless
    // start is already the largest possible token.
    if (dht::token::to_int64(start) != std::numeric_limits<int64_t>::max()) {
        auto found = find_first_token_for_shard_in_not_wrap_around_range(sharder, start, dht::maximum_token(), shard_idx);
        if (!found.is_maximum()) {
            // A token for shard_idx exists before 2^63.
            return found;
        }
    }
    // No token owned by shard_idx in (start, 2^63 - 1]; continue in the
    // wrapped part (-2^63, end].
    return find_first_token_for_shard_in_not_wrap_around_range(
            sharder, dht::minimum_token(), end, shard_idx);
}
// Maps a token to its compaction group: the group index is the token's
// `most_significant_bits` top bits after unbiasing. Sentinel tokens map to
// the first/last group; with most_significant_bits == 0 there is a single
// group 0.
size_t
compaction_group_of(unsigned most_significant_bits, const token& t) {
    if (!most_significant_bits) {
        // Grouping disabled: everything belongs to group 0.
        return 0;
    }
    switch (t._kind) {
    case token::kind::before_all_keys:
        return 0;
    case token::kind::after_all_keys:
        // Index of the last group. Shift an unsigned 64-bit one (matching
        // last_token_of_compaction_group()) so most_significant_bits >= 31
        // does not overflow a signed int, which would be UB.
        return (uint64_t(1) << most_significant_bits) - 1;
    case token::kind::key:
        uint64_t adjusted = unbias(t);
        return adjusted >> (64 - most_significant_bits);
    }
    __builtin_unreachable();
}
token last_token_of_compaction_group(unsigned most_significant_bits, size_t group) {
    // The last group extends to the very end of the unsigned token space;
    // every other group ends one position before the next group begins.
    const bool is_last_group = group == ((1ul << most_significant_bits) - 1);
    const uint64_t raw = is_last_group
            ? std::numeric_limits<uint64_t>::max()
            : ((uint64_t(group) + 1) << (64 - most_significant_bits)) - 1;
    return bias(raw);
}
} // namespace dht