Merge "Change token representation to int64_t" from Piotr

"
After deprecating partitioners other than Murmur3, we can change the representation of tokens to int64_t. This will allow setting a custom partitioner on each table. With this change, partitioners become just converters from partition keys to tokens (int64_t). The following operations are no longer dependent on the partitioner implementation:

 - Tokens comparison
 - Tokens serialization/deserialization to strings
 - Tokens serialization/deserialization to bytes
 - Sharding logic
 - Random token generation

This change will be followed by a PR that enables per table partitioner and then another PR that introduces a special partitioner for CDC tables.

Tests: unit(dev)

Results of memory footprint test:

Differences:

in cache: 992 vs 984
in memtable: 750 vs 742
sizeof(cache_entry) = 112 vs 104
-- sizeof(decorated_key) = 36 vs 32
MASTER:
mutation footprint:

in cache: 992
in memtable: 750
in sstable: 351
frozen: 540
canonical: 827
query result: 342
sizeof(cache_entry) = 112
-- sizeof(decorated_key) = 36
-- sizeof(cache_link_type) = 32
-- sizeof(mutation_partition) = 96
-- -- sizeof(_static_row) = 8
-- -- sizeof(_rows) = 24
-- -- sizeof(_row_tombstones) = 40

sizeof(rows_entry) = 232
sizeof(lru_link_type) = 16
sizeof(deletable_row) = 168
sizeof(row) = 112
sizeof(atomic_cell_or_collection) = 8

THIS PATCHSET:
mutation footprint:

in cache: 984
in memtable: 742
in sstable: 351
frozen: 540
canonical: 827
query result: 342
sizeof(cache_entry) = 104
-- sizeof(decorated_key) = 32
-- sizeof(cache_link_type) = 32
-- sizeof(mutation_partition) = 96
-- -- sizeof(_static_row) = 8
-- -- sizeof(_rows) = 24
-- -- sizeof(_row_tombstones) = 40

sizeof(rows_entry) = 232
sizeof(lru_link_type) = 16
sizeof(deletable_row) = 168
sizeof(row) = 112
sizeof(atomic_cell_or_collection) = 8
"

* 'fixed_token_representation' of https://github.com/haaawk/scylla: (21 commits)
  token: cast to int64_t not long in long_token
  murmur3: move sharding logic to token and i_partitioner
  partitioner: move shard_of_minimum_token to token
  partitioner: remove token_to_bytes
  partitioner: move get_token_validator to token
  partitioner: merge tri_compare into dht::tri_compare
  partitioner: move describe_ownership to token
  partitioner: move from_bytes to token
  partitioner: move from_string to token
  partitioner: move to_sstring to token
  partitioner: move get_random_token to token
  partitioner: move midpoint function to token
  token: remove token_view
  sstables: use copy constructor for tokens
  token: change _data to int64_t
  partitioner: remove hash_large_token
  token: change data to array<uint8_t, 8>
  partitioner: Extract token to separate .hh and .cc files
  partitioner: remove unused functions
  Revert "dht/murmur3_partitioner: take private methods out of the class"
  ...
This commit is contained in:
Avi Kivity
2020-02-05 12:21:02 +02:00
39 changed files with 607 additions and 757 deletions

View File

@@ -29,6 +29,7 @@
#include "db/system_keyspace.hh"
#include "db/system_distributed_keyspace.hh"
#include "dht/i_partitioner.hh"
#include "dht/token-sharding.hh"
#include "locator/token_metadata.hh"
#include "gms/application_state.hh"
#include "gms/inet_address.hh"
@@ -177,7 +178,7 @@ topology_description generate_topology_description(
auto it = std::lower_bound(tokens.begin(), tokens.end(), token);
auto& entry = entries[it != tokens.end() ? std::distance(tokens.begin(), it) : 0];
auto shard_id = partitioner.shard_of(token, entry.streams.size(), entry.sharding_ignore_msb);
auto shard_id = dht::shard_of(entry.streams.size(), entry.sharding_ignore_msb, token);
assert(shard_id < entry.streams.size());
if (!entry.streams[shard_id].is_set()) {

View File

@@ -431,7 +431,7 @@ public:
// more details like tombstones/ttl? Probably not but keep in mind.
mutation transform(const mutation& m, const cql3::untyped_result_set* rs = nullptr) const {
auto ts = find_timestamp(*_schema, m);
auto stream_id = _ctx._cdc_metadata.get_stream(ts, m.token(), _ctx._partitioner);
auto stream_id = _ctx._cdc_metadata.get_stream(ts, m.token());
mutation res(_log_schema, stream_id.to_partition_key(*_log_schema));
auto tuuid = timeuuid_type->decompose(generate_timeuuid(ts));

View File

@@ -19,7 +19,7 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dht/i_partitioner.hh"
#include "dht/token-sharding.hh"
#include "utils/exceptions.hh"
#include "cdc/generation.hh"
@@ -38,11 +38,10 @@ static api::timestamp_type to_ts(db_clock::time_point tp) {
static cdc::stream_id get_stream(
const cdc::token_range_description& entry,
dht::token tok,
const dht::i_partitioner& p) {
dht::token tok) {
// The ith stream is the stream for the ith shard.
auto shard_cnt = entry.streams.size();
auto shard_id = p.shard_of(tok, shard_cnt, entry.sharding_ignore_msb);
auto shard_id = dht::shard_of(shard_cnt, entry.sharding_ignore_msb, tok);
if (shard_id >= shard_cnt) {
on_internal_error(cdc_log, "get_stream: shard_id out of bounds");
@@ -53,8 +52,7 @@ static cdc::stream_id get_stream(
static cdc::stream_id get_stream(
const std::vector<cdc::token_range_description>& entries,
dht::token tok,
const dht::i_partitioner& p) {
dht::token tok) {
if (entries.empty()) {
on_internal_error(cdc_log, "get_stream: entries empty");
}
@@ -65,7 +63,7 @@ static cdc::stream_id get_stream(
it = entries.begin();
}
return get_stream(*it, tok, p);
return get_stream(*it, tok);
}
cdc::metadata::container_t::const_iterator cdc::metadata::gen_used_at(api::timestamp_type ts) const {
@@ -78,7 +76,7 @@ cdc::metadata::container_t::const_iterator cdc::metadata::gen_used_at(api::times
return std::prev(it);
}
cdc::stream_id cdc::metadata::get_stream(api::timestamp_type ts, dht::token tok, const dht::i_partitioner& p) {
cdc::stream_id cdc::metadata::get_stream(api::timestamp_type ts, dht::token tok) {
auto now = api::new_timestamp();
if (ts > now + generation_leeway.count()) {
throw exceptions::invalid_request_exception(format(
@@ -132,7 +130,7 @@ cdc::stream_id cdc::metadata::get_stream(api::timestamp_type ts, dht::token tok,
}
auto& gen = *it->second;
auto ret = ::get_stream(gen.entries(), tok, p);
auto ret = ::get_stream(gen.entries(), tok);
_last_stream_timestamp = ts;
return ret;
}

View File

@@ -28,7 +28,6 @@
namespace dht {
class token;
class i_partitioner;
}
namespace cdc {
@@ -66,7 +65,7 @@ public:
* yet know about. The amount of leeway (how much "into the future" we allow `ts` to be) is defined
* by the `cdc::generation_leeway` constant.
*/
stream_id get_stream(api::timestamp_type ts, dht::token tok, const dht::i_partitioner&);
stream_id get_stream(api::timestamp_type ts, dht::token tok);
/* Insert the generation given by `gen` with timestamp `ts` to be used by the `get_stream` function,
* if the generation is not already known or older than the currently known ones.

View File

@@ -680,6 +680,7 @@ scylla_core = (['database.cc',
'gms/application_state.cc',
'gms/inet_address.cc',
'dht/i_partitioner.cc',
'dht/token.cc',
'dht/murmur3_partitioner.cc',
'dht/boot_strapper.cc',
'dht/range_streamer.cc',

View File

@@ -56,7 +56,7 @@ private:
public:
token_fct(schema_ptr s)
: native_scalar_function("token",
dht::global_partitioner().get_token_validator(),
dht::token::get_token_validator(),
s->partition_key_type()->types())
, _schema(s) {
}
@@ -65,7 +65,7 @@ public:
auto key = partition_key::from_optional_exploded(*_schema, parameters);
auto tok = dht::global_partitioner().get_token(*_schema, key);
warn(unimplemented::cause::VALIDATION);
return dht::global_partitioner().token_to_bytes(tok);
return tok.data();
}
};

View File

@@ -94,7 +94,7 @@ public:
if (!buf) {
throw exceptions::invalid_request_exception("Invalid null token value");
}
auto tk = dht::global_partitioner().from_bytes(*buf);
auto tk = dht::token::from_bytes(*buf);
if (tk.is_minimum() && !is_start(b)) {
// The token was parsed as a minimum marker (token::kind::before_all_keys), but
// as it appears in the end bound position, it is actually the maximum marker

View File

@@ -861,7 +861,7 @@ static void append_base_key_to_index_ck(std::vector<bytes_view>& exploded_index_
exploded_index_ck.push_back(bytes_view(*indexed_column_value));
} else {
dht::i_partitioner& partitioner = dht::global_partitioner();
token_bytes = partitioner.token_to_bytes(partitioner.get_token(*_schema, last_base_pk));
token_bytes = partitioner.get_token(*_schema, last_base_pk).data();
exploded_index_ck.push_back(bytes_view(token_bytes));
append_base_key_to_index_ck<partition_key>(exploded_index_ck, last_base_pk, *cdef);
}
@@ -1040,7 +1040,7 @@ query::partition_slice indexed_table_select_statement::get_partition_slice_for_g
// Computed token column needs to be added to index view restrictions
const column_definition& token_cdef = *_view_schema->clustering_key_columns().begin();
auto base_pk = partition_key::from_optional_exploded(*_schema, _restrictions->get_partition_key_restrictions()->values(options));
bytes token_value = dht::global_partitioner().token_to_bytes(dht::global_partitioner().get_token(*_schema, base_pk));
bytes token_value = dht::global_partitioner().get_token(*_schema, base_pk).data();
auto token_restriction = ::make_shared<restrictions::single_column_restriction::EQ>(token_cdef, ::make_shared<cql3::constants::value>(cql3::raw_value::make_value(token_value)));
clustering_restrictions->merge_with(token_restriction);

View File

@@ -76,7 +76,7 @@ std::vector<::shared_ptr<cql3::column_specification>> cql3::token_relation::to_r
//auto* c = column_defs.front();
return {::make_shared<column_specification>(schema->ks_name(), schema->cf_name(),
::make_shared<column_identifier>("partition key token", true),
dht::global_partitioner().get_token_validator())};
dht::token::get_token_validator())};
}
::shared_ptr<cql3::restrictions::restriction> cql3::token_relation::new_EQ_restriction(

View File

@@ -169,7 +169,7 @@ static system_keyspace::range_estimates estimate(const column_family& cf, const
int64_t count{0};
utils::estimated_histogram hist{0};
auto from_bytes = [] (auto& b) {
return dht::global_partitioner().from_sstring(utf8_type->to_string(b));
return dht::token::from_sstring(utf8_type->to_string(b));
};
dht::token_range_vector ranges;
::compat::unwrap_into(
@@ -193,7 +193,7 @@ future<std::vector<token_range>> get_local_ranges() {
std::vector<token_range> local_ranges;
auto to_bytes = [](const std::optional<dht::token_range::bound>& b) {
assert(b);
return utf8_type->decompose(dht::global_partitioner().to_sstring(b->value()));
return utf8_type->decompose(b->value().to_sstring());
};
// We merge the ranges to be compatible with how Cassandra shows its size-estimates table.
// All queries will be on that table, where all entries are text and there's no notion of

View File

@@ -230,49 +230,6 @@ static db::consistency_level quorum_if_many(size_t num_token_owners) {
return num_token_owners > 1 ? db::consistency_level::QUORUM : db::consistency_level::ONE;
}
// --------------- TODO: copy-pasted from murmur3_partitioner; remove this after haaawk's change is merged
static int64_t long_token(dht::token_view t) {
if (t.is_minimum() || t.is_maximum()) {
return std::numeric_limits<long>::min();
}
if (t._data.size() != sizeof(int64_t)) {
throw runtime_exception(format("Invalid token. Should have size {:d}, has size {:d}\n", sizeof(int64_t), t._data.size()));
}
auto ptr = t._data.begin();
auto lp = unaligned_cast<const int64_t *>(ptr);
return net::ntoh(*lp);
}
static int64_t normalize(int64_t in) {
return in == std::numeric_limits<int64_t>::lowest()
? std::numeric_limits<int64_t>::max()
: in;
}
static dht::token get_token(uint64_t value) {
auto t = net::hton(normalize(value));
bytes b(bytes::initialized_later(), 8);
std::copy_n(reinterpret_cast<int8_t*>(&t), 8, b.begin());
return dht::token{dht::token::kind::key, std::move(b)};
}
static dht::token token_from_string(const sstring& t) {
auto lp = boost::lexical_cast<long>(t);
if (lp == std::numeric_limits<long>::min()) {
return dht::minimum_token();
} else {
return get_token(uint64_t(lp));
}
}
static sstring string_from_token(const dht::token& t) {
return seastar::to_sstring<sstring>(long_token(t));
}
// ---------------
static list_type_impl::native_type prepare_cdc_generation_description(const cdc::topology_description& description) {
list_type_impl::native_type ret;
for (auto& e: description.entries()) {
@@ -282,7 +239,7 @@ static list_type_impl::native_type prepare_cdc_generation_description(const cdc:
}
ret.push_back(make_tuple_value(cdc_token_range_description_type,
{ data_value(string_from_token(e.token_range_end))
{ data_value(e.token_range_end.to_sstring())
, make_list_value(cdc_streams_list_type, std::move(streams))
, data_value(int8_t(e.sharding_ignore_msb))
}));
@@ -313,7 +270,7 @@ static cdc::token_range_description get_token_range_description_from_value(const
on_internal_error(cdc_log, "get_token_range_description_from_value: stream tuple type size != 3");
}
auto token = token_from_string(value_cast<sstring>(tup[0]));
auto token = dht::token::from_sstring(value_cast<sstring>(tup[0]));
auto streams = get_streams_from_list_value(tup[1]);
auto sharding_ignore_msb = uint8_t(value_cast<int8_t>(tup[2]));

View File

@@ -1510,7 +1510,7 @@ future<db_clock::time_point> get_truncated_at(utils::UUID cf_id) {
static set_type_impl::native_type prepare_tokens(const std::unordered_set<dht::token>& tokens) {
set_type_impl::native_type tset;
for (auto& t: tokens) {
tset.push_back(dht::global_partitioner().to_sstring(t));
tset.push_back(t.to_sstring());
}
return tset;
}
@@ -1519,8 +1519,8 @@ std::unordered_set<dht::token> decode_tokens(set_type_impl::native_type& tokens)
std::unordered_set<dht::token> tset;
for (auto& t: tokens) {
auto str = value_cast<sstring>(t);
assert(str == dht::global_partitioner().to_sstring(dht::global_partitioner().from_sstring(str)));
tset.insert(dht::global_partitioner().from_sstring(str));
assert(str == dht::token::from_sstring(str).to_sstring());
tset.insert(dht::token::from_sstring(str));
}
return tset;
}
@@ -2073,7 +2073,7 @@ future<> register_view_for_building(sstring ks_name, sstring view_name, const dh
std::move(view_name),
0,
int32_t(engine().cpu_id()),
dht::global_partitioner().to_sstring(token)).discard_result();
token.to_sstring()).discard_result();
}
future<> update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token) {
@@ -2083,7 +2083,7 @@ future<> update_view_build_progress(sstring ks_name, sstring view_name, const dh
std::move(req),
std::move(ks_name),
std::move(view_name),
dht::global_partitioner().to_sstring(token),
token.to_sstring(),
int32_t(engine().cpu_id())).discard_result();
}
@@ -2134,11 +2134,11 @@ future<std::vector<view_build_progress>> load_view_build_progress() {
for (auto& row : *cql_result) {
auto ks_name = row.get_as<sstring>("keyspace_name");
auto cf_name = row.get_as<sstring>("view_name");
auto first_token = dht::global_partitioner().from_sstring(row.get_as<sstring>("first_token"));
auto first_token = dht::token::from_sstring(row.get_as<sstring>("first_token"));
auto next_token_sstring = row.get_opt<sstring>("next_token");
std::optional<dht::token> next_token;
if (next_token_sstring) {
next_token = dht::global_partitioner().from_sstring(std::move(next_token_sstring).value());
next_token = dht::token::from_sstring(std::move(next_token_sstring).value());
}
auto cpu_id = row.get_as<int32_t>("cpu_id");
progress.emplace_back(view_build_progress{

View File

@@ -79,7 +79,7 @@ std::unordered_set<token> boot_strapper::get_bootstrap_tokens(token_metadata met
blogger.debug("tokens manually specified as {}", initial_tokens);
std::unordered_set<token> tokens;
for (auto& token_string : initial_tokens) {
auto token = dht::global_partitioner().from_sstring(token_string);
auto token = dht::token::from_sstring(token_string);
if (metadata.get_endpoint(token)) {
throw std::runtime_error(format("Bootstrapping to existing token {} is not allowed (decommission/removenode the old node first).", token_string));
}
@@ -106,7 +106,7 @@ std::unordered_set<token> boot_strapper::get_bootstrap_tokens(token_metadata met
std::unordered_set<token> boot_strapper::get_random_tokens(token_metadata metadata, size_t num_tokens) {
std::unordered_set<token> tokens;
while (tokens.size() < num_tokens) {
auto token = global_partitioner().get_random_token();
auto token = dht::token::get_random_token();
auto ep = metadata.get_endpoint(token);
if (!ep) {
tokens.emplace(token);

View File

@@ -22,6 +22,7 @@
#include "i_partitioner.hh"
#include <seastar/core/reactor.hh>
#include "dht/murmur3_partitioner.hh"
#include "dht/token-sharding.hh"
#include "utils/class_registrator.hh"
#include "types.hh"
#include "utils/murmur_hash.hh"
@@ -35,106 +36,24 @@
namespace dht {
static const token min_token{ token::kind::before_all_keys, {} };
static const token max_token{ token::kind::after_all_keys, {} };
i_partitioner::i_partitioner(unsigned shard_count, unsigned sharding_ignore_msb_bits)
: _shard_count(shard_count)
// if one shard, ignore sharding_ignore_msb_bits as they will just cause needless
// range breaks
, _sharding_ignore_msb_bits(shard_count > 1 ? sharding_ignore_msb_bits : 0)
, _shard_start(init_zero_based_shard_start(_shard_count, _sharding_ignore_msb_bits))
{}
const token&
minimum_token() {
return min_token;
}
const token&
maximum_token() {
return max_token;
}
// result + overflow bit
std::pair<bytes, bool>
add_bytes(bytes_view b1, bytes_view b2, bool carry = false) {
auto sz = std::max(b1.size(), b2.size());
auto expand = [sz] (bytes_view b) {
bytes ret(bytes::initialized_later(), sz);
auto bsz = b.size();
auto p = std::copy(b.begin(), b.end(), ret.begin());
std::fill_n(p, sz - bsz, 0);
return ret;
};
auto eb1 = expand(b1);
auto eb2 = expand(b2);
auto p1 = eb1.begin();
auto p2 = eb2.begin();
unsigned tmp = carry;
for (size_t idx = 0; idx < sz; ++idx) {
tmp += uint8_t(p1[sz - idx - 1]);
tmp += uint8_t(p2[sz - idx - 1]);
p1[sz - idx - 1] = tmp;
tmp >>= std::numeric_limits<uint8_t>::digits;
}
return { std::move(eb1), bool(tmp) };
}
bytes
shift_right(bool carry, bytes b) {
unsigned tmp = carry;
auto sz = b.size();
auto p = b.begin();
for (size_t i = 0; i < sz; ++i) {
auto lsb = p[i] & 1;
p[i] = (tmp << std::numeric_limits<uint8_t>::digits) | uint8_t(p[i]) >> 1;
tmp = lsb;
}
return b;
unsigned
i_partitioner::shard_of(const token& t) const {
return dht::shard_of(_shard_count, _sharding_ignore_msb_bits, t);
}
token
midpoint_unsigned_tokens(const token& t1, const token& t2) {
// calculate the average of the two tokens.
// before_all_keys is implicit 0, after_all_keys is implicit 1.
bool c1 = t1._kind == token::kind::after_all_keys;
bool c2 = t1._kind == token::kind::after_all_keys;
if (c1 && c2) {
// both end-of-range tokens?
return t1;
}
// we can ignore beginning-of-range, since their representation is 0.0
auto sum_carry = add_bytes(t1._data, t2._data);
auto& sum = sum_carry.first;
// if either was end-of-range, we added 0.0, so pretend we added 1.0 and
// and got a carry:
bool carry = sum_carry.second || c1 || c2;
auto avg = shift_right(carry, std::move(sum));
if (t1 > t2) {
// wrap around the ring. We really want (t1 + (t2 + 1.0)) / 2, so add 0.5.
// example: midpoint(0.9, 0.2) == midpoint(0.9, 1.2) == 1.05 == 0.05
// == (0.9 + 0.2) / 2 + 0.5 (mod 1)
if (avg.size() > 0) {
avg[0] ^= 0x80;
}
}
return token{token::kind::key, std::move(avg)};
i_partitioner::token_for_next_shard(const token& t, shard_id shard, unsigned spans) const {
return dht::token_for_next_shard(_shard_start, _shard_count, _sharding_ignore_msb_bits, t, shard, spans);
}
int tri_compare(token_view t1, token_view t2) {
if (t1._kind < t2._kind) {
return -1;
} else if (t1._kind > t2._kind) {
return 1;
} else if (t1._kind == token_kind::key) {
return global_partitioner().tri_compare(t1, t2);
}
return 0;
}
std::ostream& operator<<(std::ostream& out, const token& t) {
if (t._kind == token::kind::after_all_keys) {
out << "maximum token";
} else if (t._kind == token::kind::before_all_keys) {
out << "minimum token";
} else {
out << global_partitioner().to_sstring(t);
}
return out;
}
std::ostream& operator<<(std::ostream& out, const decorated_key& dk) {
return out << "{key: " << dk._key << ", token:" << dk._token << "}";
@@ -294,7 +213,7 @@ ring_position_range_sharder::next(const schema& s) {
if (_done) {
return {};
}
auto shard = _range.start() ? _partitioner.shard_of(_range.start()->value().token()) : _partitioner.shard_of_minimum_token();
auto shard = _range.start() ? _partitioner.shard_of(_range.start()->value().token()) : token::shard_of_minimum_token();
auto next_shard = shard + 1 < _partitioner.shard_count() ? shard + 1 : 0;
auto shard_boundary_token = _partitioner.token_for_next_shard(_range.start() ? _range.start()->value().token() : minimum_token(), next_shard);
auto shard_boundary = ring_position::starting_at(shard_boundary_token);
@@ -569,16 +488,3 @@ split_ranges_to_shards(const dht::token_range_vector& ranges, const schema& s) {
}
}
namespace std {
size_t
hash<dht::token>::hash_large_token(const managed_bytes& b) const {
auto read_bytes = boost::irange<size_t>(0, b.size())
| boost::adaptors::transformed([&b] (size_t idx) { return b[idx]; });
std::array<uint64_t, 2> result;
utils::murmur_hash::hash3_x64_128(read_bytes.begin(), b.size(), 0, result);
return result[0];
}
}

View File

@@ -50,6 +50,8 @@
#include <utility>
#include <vector>
#include <range.hh>
#include <byteswap.h>
#include "dht/token.hh"
namespace sstables {
@@ -70,7 +72,6 @@ namespace dht {
// into its users.
class decorated_key;
class token;
class ring_position;
using partition_range = nonwrapping_range<ring_position>;
@@ -79,89 +80,6 @@ using token_range = nonwrapping_range<token>;
using partition_range_vector = std::vector<partition_range>;
using token_range_vector = std::vector<token_range>;
enum class token_kind {
before_all_keys,
key,
after_all_keys,
};
class token_view {
public:
token_kind _kind;
bytes_view _data;
token_view(token_kind kind, bytes_view data) : _kind(kind), _data(data) {}
explicit token_view(const token& token);
bool is_minimum() const {
return _kind == token_kind::before_all_keys;
}
bool is_maximum() const {
return _kind == token_kind::after_all_keys;
}
};
class token {
public:
using kind = token_kind;
kind _kind;
// _data can be interpreted as a big endian binary fraction
// in the range [0.0, 1.0).
//
// So, [] == 0.0
// [0x00] == 0.0
// [0x80] == 0.5
// [0x00, 0x80] == 1/512
// [0xff, 0x80] == 1 - 1/512
managed_bytes _data;
token() : _kind(kind::before_all_keys) {
}
token(kind k, managed_bytes d) : _kind(std::move(k)), _data(std::move(d)) {
}
bool is_minimum() const {
return _kind == kind::before_all_keys;
}
bool is_maximum() const {
return _kind == kind::after_all_keys;
}
size_t external_memory_usage() const {
return _data.external_memory_usage();
}
size_t memory_usage() const {
return sizeof(token) + external_memory_usage();
}
explicit token(token_view v) : _kind(v._kind), _data(v._data) {}
operator token_view() const {
return token_view(*this);
}
};
inline token_view::token_view(const token& token) : _kind(token._kind), _data(bytes_view(token._data)) {}
token midpoint_unsigned(const token& t1, const token& t2);
const token& minimum_token();
const token& maximum_token();
int tri_compare(token_view t1, token_view t2);
inline bool operator==(token_view t1, token_view t2) { return tri_compare(t1, t2) == 0; }
inline bool operator<(token_view t1, token_view t2) { return tri_compare(t1, t2) < 0; }
inline bool operator!=(const token& t1, const token& t2) { return std::rel_ops::operator!=(t1, t2); }
inline bool operator>(const token& t1, const token& t2) { return std::rel_ops::operator>(t1, t2); }
inline bool operator<=(const token& t1, const token& t2) { return std::rel_ops::operator<=(t1, t2); }
inline bool operator>=(const token& t1, const token& t2) { return std::rel_ops::operator>=(t1, t2); }
std::ostream& operator<<(std::ostream& out, const token& t);
template <typename T>
inline auto get_random_number() {
static thread_local std::default_random_engine re{std::random_device{}()};
@@ -232,8 +150,10 @@ using decorated_key_opt = std::optional<decorated_key>;
class i_partitioner {
protected:
unsigned _shard_count;
unsigned _sharding_ignore_msb_bits;
std::vector<uint64_t> _shard_start;
public:
explicit i_partitioner(unsigned shard_count) : _shard_count(shard_count) {}
i_partitioner(unsigned shard_count = smp::count, unsigned sharding_ignore_msb_bits = 0);
virtual ~i_partitioner() {}
/**
@@ -257,22 +177,6 @@ public:
return { std::move(token), std::move(key) };
}
/**
* Calculate a token representing the approximate "middle" of the given
* range.
*
* @return The approximate midpoint between left and right.
*/
virtual token midpoint(const token& left, const token& right) const = 0;
/**
* @return A token smaller than all others in the range that is being partitioned.
* Not legal to assign to a node or key. (But legal to use in range scans.)
*/
token get_minimum_token() {
return dht::minimum_token();
}
/**
* @return a token that can be used to route a given key
* (This is NOT a method to create a token from its string representation;
@@ -281,27 +185,6 @@ public:
virtual token get_token(const schema& s, partition_key_view key) const = 0;
virtual token get_token(const sstables::key_view& key) const = 0;
/**
* @return a partitioner-specific string representation of this token
*/
virtual sstring to_sstring(const dht::token& t) const = 0;
/**
* @return a token from its partitioner-specific string representation
*/
virtual dht::token from_sstring(const sstring& t) const = 0;
/**
* @return a token from its partitioner-specific byte representation
*/
virtual dht::token from_bytes(bytes_view bytes) const = 0;
/**
* @return a randomly generated token
*/
virtual token get_random_token() = 0;
// FIXME: token.tokenFactory
//virtual token.tokenFactory gettokenFactory() = 0;
@@ -311,17 +194,6 @@ public:
*/
virtual bool preserves_order() = 0;
/**
* Calculate the deltas between tokens in the ring in order to compare
* relative sizes.
*
* @param sortedtokens a sorted List of tokens
* @return the mapping from 'token' to 'percentage of the ring owned by that token'.
*/
virtual std::map<token, float> describe_ownership(const std::vector<token>& sorted_tokens) = 0;
virtual data_type get_token_validator() = 0;
/**
* @return name of partitioner.
*/
@@ -330,12 +202,7 @@ public:
/**
* Calculates the shard that handles a particular token.
*/
virtual unsigned shard_of(const token& t) const = 0;
/**
* Calculates the shard that handles a particular token using custom shard_count and sharding_ignore_msb.
*/
virtual unsigned shard_of(const token& t, unsigned shard_count, unsigned sharding_ignore_msb) const = 0;
virtual unsigned shard_of(const token& t) const;
/**
* Gets the first token greater than `t` that is in shard `shard`, and is a shard boundary (its first token).
@@ -348,38 +215,7 @@ public:
*
* On overflow, maximum_token() is returned.
*/
virtual token token_for_next_shard(const token& t, shard_id shard, unsigned spans = 1) const = 0;
/**
* Gets the first shard of the minimum token.
*/
unsigned shard_of_minimum_token() const {
return 0; // hardcoded for now; unlikely to change
}
/**
* @return bytes that represent the token as required by get_token_validator().
*/
virtual bytes token_to_bytes(const token& t) const {
return bytes(t._data.begin(), t._data.end());
}
/**
* @return < 0 if if t1's _data array is less, t2's. 0 if they are equal, and > 0 otherwise. _kind comparison should be done separately.
*/
virtual int tri_compare(token_view t1, token_view t2) const = 0;
/**
* @return true if t1's _data array is equal t2's. _kind comparison should be done separately.
*/
bool is_equal(token_view t1, token_view t2) const {
return tri_compare(t1, t2) == 0;
}
/**
* @return true if t1's _data array is less then t2's. _kind comparison should be done separately.
*/
bool is_less(token_view t1, token_view t2) const {
return tri_compare(t1, t2) < 0;
}
virtual token token_for_next_shard(const token& t, shard_id shard, unsigned spans = 1) const;
/**
* @return number of shards configured for this partitioner
@@ -392,13 +228,9 @@ public:
return "biased-token-round-robin";
}
virtual unsigned sharding_ignore_msb() const {
return 0;
unsigned sharding_ignore_msb() const {
return _sharding_ignore_msb_bits;
}
friend bool operator==(token_view t1, token_view t2);
friend bool operator<(token_view t1, token_view t2);
friend int tri_compare(token_view t1, token_view t2);
};
//
@@ -955,14 +787,11 @@ namespace std {
template<>
struct hash<dht::token> {
size_t operator()(const dht::token& t) const {
const auto& b = t._data;
if (b.size() == sizeof(size_t)) { // practically always
return read_le<size_t>(reinterpret_cast<const char*>(b.data()));
}
return hash_large_token(b);
// We have to reverse the bytes here to keep compatibility with
// the behaviour that was here when tokens were represented as
// sequence of bytes.
return bswap_64(t._data);
}
private:
size_t hash_large_token(const managed_bytes& b) const;
};
template <>

View File

@@ -26,127 +26,26 @@
#include <boost/lexical_cast.hpp>
#include <boost/range/irange.hpp>
using uint128_t = unsigned __int128;
namespace dht {
static inline
unsigned
zero_based_shard_of(uint64_t token, unsigned shards, unsigned sharding_ignore_msb_bits) {
// This is the master function, the inverses have to match it wrt. rounding errors.
token <<= sharding_ignore_msb_bits;
// Treat "token" as a fraction in the interval [0, 1); compute:
// shard = floor((0.token) * shards)
return (uint128_t(token) * shards) >> 64;
}
static
std::vector<uint64_t>
init_zero_based_shard_start(unsigned shards, unsigned sharding_ignore_msb_bits) {
// computes the inverse of zero_based_shard_of(). ret[s] will return the smallest token that belongs to s
if (shards == 1) {
// Avoid the while loops below getting confused finding the "edge" between two nonexistent shards
return std::vector<uint64_t>(1, uint64_t(0));
}
auto ret = std::vector<uint64_t>(shards);
for (auto s : boost::irange<unsigned>(0, shards)) {
uint64_t token = (uint128_t(s) << 64) / shards;
token >>= sharding_ignore_msb_bits; // leftmost bits are ignored by zero_based_shard_of
// token is the start of the next shard, and can be slightly before due to rounding errors; adjust
while (zero_based_shard_of(token, shards, sharding_ignore_msb_bits) != s) {
++token;
}
ret[s] = token;
}
return ret;
}
static inline
int64_t
normalize(int64_t in) {
return in == std::numeric_limits<int64_t>::lowest()
? std::numeric_limits<int64_t>::max()
: in;
}
static
dht::token
get_token(uint64_t value) {
// We don't normalize() the value, since token includes an is-before-everything
// indicator.
// FIXME: will this require a repair when importing a database?
auto t = net::hton(normalize(value));
bytes b(bytes::initialized_later(), 8);
std::copy_n(reinterpret_cast<int8_t*>(&t), 8, b.begin());
return dht::token{dht::token::kind::key, std::move(b)};
}
static
dht::token
get_token(bytes_view key) {
token
murmur3_partitioner::get_token(bytes_view key) const {
if (key.empty()) {
return dht::minimum_token();
return minimum_token();
}
std::array<uint64_t, 2> hash;
utils::murmur_hash::hash3_x64_128(key, 0, hash);
return get_token(hash[0]);
}
static inline
int64_t long_token(dht::token_view t) {
if (t.is_minimum() || t.is_maximum()) {
return std::numeric_limits<long>::min();
}
if (t._data.size() != sizeof(int64_t)) {
throw runtime_exception(format("Invalid token. Should have size {:d}, has size {:d}\n", sizeof(int64_t), t._data.size()));
}
auto ptr = t._data.begin();
auto lp = unaligned_cast<const int64_t *>(ptr);
return net::ntoh(*lp);
token
murmur3_partitioner::get_token(uint64_t value) const {
return token(token::kind::key, value);
}
// translate to a zero-based range
static inline
uint64_t
unbias(const dht::token& t) {
return uint64_t(long_token(t)) + uint64_t(std::numeric_limits<int64_t>::min());
}
// translate from a zero-based range
static inline
dht::token
bias(uint64_t n) {
return get_token(n - uint64_t(std::numeric_limits<int64_t>::min()));
}
// Owning shard for a token: the minimum/maximum sentinels are pinned to
// the first/last shard, and ordinary key tokens go through the
// zero-based sharding function.
static inline
unsigned
shard_of(const dht::token& t, unsigned shard_count, unsigned sharding_ignore_msb) {
    switch (t._kind) {
    case dht::token::kind::before_all_keys:
        return 0;
    case dht::token::kind::after_all_keys:
        return shard_count - 1;
    case dht::token::kind::key:
        uint64_t adjusted = unbias(t);
        return zero_based_shard_of(adjusted, shard_count, sharding_ignore_msb);
    }
    abort(); // unreachable: every token kind is handled above
}
namespace dht {
// shard_count: number of shards tokens are distributed over.
// sharding_ignore_msb_bits: count of leading token bits ignored by the
// sharding function (spreads neighboring tokens across shards).
murmur3_partitioner::murmur3_partitioner(unsigned shard_count, unsigned sharding_ignore_msb_bits)
        : i_partitioner(shard_count)
        // if one shard, ignore sharding_ignore_msb_bits as they will just cause needless
        // range breaks
        , _sharding_ignore_msb_bits(shard_count > 1 ? sharding_ignore_msb_bits : 0)
        , _shard_start(init_zero_based_shard_start(_shard_count, _sharding_ignore_msb_bits))
{ }
token
murmur3_partitioner::get_token(const sstables::key_view& key) const {
return ::get_token(bytes_view(key));
return get_token(bytes_view(key));
}
token
@@ -154,177 +53,9 @@ murmur3_partitioner::get_token(const schema& s, partition_key_view key) const {
std::array<uint64_t, 2> hash;
auto&& legacy = key.legacy_form(s);
utils::murmur_hash::hash3_x64_128(legacy.begin(), legacy.size(), 0, hash);
return ::get_token(hash[0]);
return get_token(hash[0]);
}
// A uniformly random key token built from a random 64-bit value.
token murmur3_partitioner::get_random_token() {
    return ::get_token(dht::get_random_number<uint64_t>());
}
// Decimal string form of the token's signed 64-bit value.
sstring murmur3_partitioner::to_sstring(const token& t) const {
    return seastar::to_sstring<sstring>(long_token(t));
}
// Parse a token from its decimal string form (inverse of to_sstring()).
// A value equal to int64_t min denotes the before-everything token.
// Throws boost::bad_lexical_cast on malformed input.
dht::token murmur3_partitioner::from_sstring(const sstring& t) const {
    // Parse as int64_t, not `long`: `long` is only 32 bits on LP32/LLP64
    // targets, which would truncate tokens and break the sentinel check.
    auto lp = boost::lexical_cast<int64_t>(t);
    if (lp == std::numeric_limits<int64_t>::min()) {
        return minimum_token();
    } else {
        return ::get_token(uint64_t(lp));
    }
}
// Reconstruct a token from its 8-byte big-endian serialized form.
// Throws runtime_exception if the input is not exactly 8 bytes; a
// decoded value of int64_t min denotes the before-everything token.
dht::token murmur3_partitioner::from_bytes(bytes_view bytes) const {
    if (bytes.size() != sizeof(int64_t)) {
        throw runtime_exception(format("Invalid token. Should have size {:d}, has size {:d}\n", sizeof(int64_t), bytes.size()));
    }
    int64_t v;
    std::copy_n(bytes.begin(), sizeof(v), reinterpret_cast<int8_t *>(&v));
    auto tok = net::ntoh(v);
    if (tok == std::numeric_limits<int64_t>::min()) {
        return minimum_token();
    } else {
        // Reuse the input bytes directly; they already encode `tok`.
        return dht::token(dht::token::kind::key, bytes);
    }
}
// Three-way comparison of two tokens by their decoded signed 64-bit
// values: negative, zero or positive for t1 <, ==, > t2.
int murmur3_partitioner::tri_compare(token_view t1, token_view t2) const {
    auto lhs = long_token(t1);
    auto rhs = long_token(t2);
    if (lhs < rhs) {
        return -1;
    }
    return lhs == rhs ? 0 : 1;
}
// Assuming x >= y, return the positive difference x - y as the unsigned
// counterpart of T: the difference can overflow the signed range (e.g.
// very positive x and very negative y), but always fits the unsigned one.
template <typename T>
static std::make_unsigned_t<T> positive_subtract(T x, T y) {
    using U = std::make_unsigned_t<T>;
    return U(x) - U(y);
}
// Midpoint of the range (t1, t2]. When t1 > t2 the range is treated as
// wrapping around the ring, so the result lands halfway through the
// wrap-around span. All arithmetic is overflow-safe (see comments).
token murmur3_partitioner::midpoint(const token& t1, const token& t2) const {
    auto l1 = long_token(t1);
    auto l2 = long_token(t2);
    int64_t mid;
    if (l1 <= l2) {
        // To find the midpoint, we cannot use the trivial formula (l1+l2)/2
        // because the addition can overflow the integer. To avoid this
        // overflow, we first notice that the above formula is equivalent to
        // l1 + (l2-l1)/2. Now, "l2-l1" can still overflow a signed integer
        // (e.g., think of a very positive l2 and very negative l1), but
        // because l1 <= l2 in this branch, we note that l2-l1 is positive
        // and fits an *unsigned* int's range. So,
        mid = l1 + positive_subtract(l2, l1)/2;
    } else {
        // When l2 < l1, we need to switch l1 and and l2 in the above
        // formula, because now l1 - l2 is positive.
        // Additionally, we consider this case is a "wrap around", so we need
        // to behave as if l2 + 2^64 was meant instead of l2, i.e., add 2^63
        // to the average.
        mid = l2 + positive_subtract(l1, l2)/2 + 0x8000'0000'0000'0000;
    }
    return ::get_token(mid);
}
// Fraction of the ring covered when walking forward from token b to
// token a, in [0, 1]. Equal tokens are treated as the whole ring.
static float ratio_helper(int64_t a, int64_t b) {
    uint64_t ua = static_cast<uint64_t>(a);
    uint64_t ub = static_cast<uint64_t>(b);
    uint64_t span = (a > b) ? (ua - ub) : (ua - ub - 1);
    return span / (float)std::numeric_limits<uint64_t>::max();
}
// Map each token to the fraction of the ring it owns, i.e. the span from
// its predecessor (wrapping for the first token). Throws if the input is
// empty; a single token owns the whole ring.
std::map<token, float>
murmur3_partitioner::describe_ownership(const std::vector<token>& sorted_tokens) {
    std::map<token, float> ownerships;
    auto i = sorted_tokens.begin();
    // 0-case
    if (i == sorted_tokens.end()) {
        throw runtime_exception("No nodes present in the cluster. Has this node finished starting up?");
    }
    // 1-case
    if (sorted_tokens.size() == 1) {
        ownerships[sorted_tokens[0]] = 1.0;
    // n-case
    } else {
        const token& start = sorted_tokens[0];
        int64_t ti = long_token(start); // The first token and its value
        int64_t start_long = ti;
        int64_t tim1 = ti; // The last token and its value (after loop)
        for (i++; i != sorted_tokens.end(); i++) {
            ti = long_token(*i); // The next token and its value
            ownerships[*i]= ratio_helper(ti, tim1); // save (T(i) -> %age)
            tim1 = ti;
        }
        // The start token's range extends backward to the last token, which is why both were saved above.
        ownerships[start] = ratio_helper(start_long, ti);
    }
    return ownerships;
}
// CQL type describing tokens: bigint, since tokens are signed 64-bit.
data_type
murmur3_partitioner::get_token_validator() {
    return long_type;
}
// Owning shard using this partitioner's own shard count / msb setting.
unsigned
murmur3_partitioner::shard_of(const token& t) const {
    return ::shard_of(t, _shard_count, _sharding_ignore_msb_bits);
}

// Owning shard for an explicit shard count / msb setting (used when
// sharding for a different topology than this partitioner's own).
unsigned
murmur3_partitioner::shard_of(const token& t, unsigned shard_count, unsigned sharding_ignore_msb) const {
    return ::shard_of(t, shard_count, sharding_ignore_msb);
}
// First token belonging to `shard` strictly after t, advancing `spans`
// shard-range boundaries; returns maximum_token() when that point would
// wrap past the end of the ring.
// NOTE(review): semantics inferred from the bit arithmetic below —
// confirm against the i_partitioner interface contract.
token
murmur3_partitioner::token_for_next_shard(const token& t, shard_id shard, unsigned spans) const {
    uint64_t n = 0;
    switch (t._kind) {
    case token::kind::before_all_keys:
        break; // n = 0: start of the zero-based ring
    case token::kind::after_all_keys:
        return maximum_token();
    case token::kind::key:
        n = unbias(t);
        break;
    }
    auto s = zero_based_shard_of(n, _shard_count, _sharding_ignore_msb_bits);
    if (!_sharding_ignore_msb_bits) {
        // This ought to be the same as the else branch, but avoids shifts by 64
        n = _shard_start[shard];
        if (spans > 1 || shard <= s) {
            // Without msb bits each shard owns one contiguous range, so a
            // target at or before the current shard cannot be reached.
            return maximum_token();
        }
    } else {
        // Split n into the ignored msb part and the sharded remainder.
        auto left_part = n >> (64 - _sharding_ignore_msb_bits);
        left_part += spans - unsigned(shard > s);
        if (left_part >= (1u << _sharding_ignore_msb_bits)) {
            return maximum_token(); // would wrap around the ring
        }
        left_part <<= (64 - _sharding_ignore_msb_bits);
        auto right_part = _shard_start[shard];
        n = left_part | right_part;
    }
    return bias(n);
}
// Accessor for the effective ignore-msb-bits setting (0 when single-shard).
unsigned
murmur3_partitioner::sharding_ignore_msb() const {
    return _sharding_ignore_msb_bits;
}
using registry = class_registrator<i_partitioner, murmur3_partitioner, const unsigned&, const unsigned&>;
static registry registrator("org.apache.cassandra.dht.Murmur3Partitioner");
static registry registrator_short_name("Murmur3Partitioner");

View File

@@ -28,27 +28,16 @@
namespace dht {
class murmur3_partitioner final : public i_partitioner {
unsigned _sharding_ignore_msb_bits;
std::vector<uint64_t> _shard_start;
public:
murmur3_partitioner(unsigned shard_count = smp::count, unsigned sharding_ignore_msb_bits = 0);
murmur3_partitioner(unsigned shard_count = smp::count, unsigned sharding_ignore_msb_bits = 0)
: i_partitioner(shard_count, sharding_ignore_msb_bits) {}
virtual const sstring name() const { return "org.apache.cassandra.dht.Murmur3Partitioner"; }
virtual token get_token(const schema& s, partition_key_view key) const override;
virtual token get_token(const sstables::key_view& key) const override;
virtual token get_random_token() override;
virtual bool preserves_order() override { return false; }
virtual std::map<token, float> describe_ownership(const std::vector<token>& sorted_tokens) override;
virtual data_type get_token_validator() override;
virtual int tri_compare(token_view t1, token_view t2) const override;
virtual token midpoint(const token& t1, const token& t2) const override;
virtual sstring to_sstring(const dht::token& t) const override;
virtual dht::token from_sstring(const sstring& t) const override;
virtual dht::token from_bytes(bytes_view bytes) const override;
virtual unsigned shard_of(const token& t) const override;
virtual unsigned shard_of(const token& t, unsigned shard_count, unsigned sharding_ignore_msb) const override;
virtual token token_for_next_shard(const token& t, shard_id shard, unsigned spans) const override;
virtual unsigned sharding_ignore_msb() const override;
private:
token get_token(bytes_view key) const;
token get_token(uint64_t value) const;
};

34
dht/token-sharding.hh Normal file
View File

@@ -0,0 +1,34 @@
/*
* Copyright (C) 2020 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "dht/token.hh"
namespace dht {

// Inverse of the sharding function: element s is the smallest zero-based
// (unbiased) token value owned by shard s.
std::vector<uint64_t> init_zero_based_shard_start(unsigned shards, unsigned sharding_ignore_msb_bits);

// Owning shard for token t; the min/max sentinels map to the first/last shard.
unsigned shard_of(unsigned shard_count, unsigned sharding_ignore_msb_bits, const token& t);

// First token belonging to `shard` strictly after t, advancing `spans`
// shard-range boundaries; maximum_token() when that would wrap the ring.
// NOTE(review): semantics inferred from the implementation — confirm.
token token_for_next_shard(const std::vector<uint64_t>& shard_start, unsigned shard_count, unsigned sharding_ignore_msb_bits, const token& t, shard_id shard, unsigned spans);

} //namespace dht

276
dht/token.cc Normal file
View File

@@ -0,0 +1,276 @@
/*
* Copyright (C) 2020 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <algorithm>
#include <limits>
#include <ostream>
#include <utility>
#include "dht/token.hh"
#include "dht/token-sharding.hh"
#include "dht/i_partitioner.hh"
namespace dht {
using uint128_t = unsigned __int128;
// Signed 64-bit value of a token; both sentinels collapse onto int64_t
// min (key tokens can never hold that value, see token::normalize()).
inline int64_t long_token(const token& t) {
    return (t.is_minimum() || t.is_maximum())
            ? std::numeric_limits<int64_t>::min()
            : t._data;
}
// Shared sentinel instances; the 0 passed as _data is unused for
// sentinel kinds (comparisons only look at _kind).
static const token min_token{ token::kind::before_all_keys, 0 };
static const token max_token{ token::kind::after_all_keys, 0 };

// Token ordered before every key token.
const token&
minimum_token() {
    return min_token;
}

// Token ordered after every key token.
const token&
maximum_token() {
    return max_token;
}
// Three-way token comparison. Kinds order first
// (before_all_keys < key < after_all_keys); two key tokens order by
// their signed 64-bit value; two equal-kind sentinels compare equal.
int tri_compare(const token& t1, const token& t2) {
    if (t1._kind != t2._kind) {
        return t1._kind < t2._kind ? -1 : 1;
    }
    if (t1._kind != token_kind::key) {
        return 0; // same sentinel kind
    }
    auto lhs = long_token(t1);
    auto rhs = long_token(t2);
    if (lhs == rhs) {
        return 0;
    }
    return lhs < rhs ? -1 : 1;
}
// Human-readable form: named sentinels, otherwise the decimal value
// (same text as to_sstring()).
std::ostream& operator<<(std::ostream& out, const token& t) {
    if (t._kind == token::kind::after_all_keys) {
        out << "maximum token";
    } else if (t._kind == token::kind::before_all_keys) {
        out << "minimum token";
    } else {
        out << t.to_sstring();
    }
    return out;
}
// Decimal string of the signed 64-bit token value; both sentinels print
// as int64_t min (see long_token()).
sstring token::to_sstring() const {
    return seastar::to_sstring<sstring>(long_token(*this));
}
// Assuming x >= y, return the positive difference x - y as the unsigned
// counterpart of T: the difference can overflow the signed range (e.g.
// very positive x and very negative y), but always fits the unsigned one.
template <typename T>
static std::make_unsigned_t<T> positive_subtract(T x, T y) {
    using U = std::make_unsigned_t<T>;
    return U(x) - U(y);
}
// Midpoint of the range (t1, t2]. When t1 > t2 the range is treated as
// wrapping around the ring, so the result lands halfway through the
// wrap-around span. All arithmetic is overflow-safe (see comments).
token token::midpoint(const token& t1, const token& t2) {
    auto l1 = long_token(t1);
    auto l2 = long_token(t2);
    int64_t mid;
    if (l1 <= l2) {
        // To find the midpoint, we cannot use the trivial formula (l1+l2)/2
        // because the addition can overflow the integer. To avoid this
        // overflow, we first notice that the above formula is equivalent to
        // l1 + (l2-l1)/2. Now, "l2-l1" can still overflow a signed integer
        // (e.g., think of a very positive l2 and very negative l1), but
        // because l1 <= l2 in this branch, we note that l2-l1 is positive
        // and fits an *unsigned* int's range. So,
        mid = l1 + positive_subtract(l2, l1)/2;
    } else {
        // When l2 < l1, we need to switch l1 and and l2 in the above
        // formula, because now l1 - l2 is positive.
        // Additionally, we consider this case is a "wrap around", so we need
        // to behave as if l2 + 2^64 was meant instead of l2, i.e., add 2^63
        // to the average.
        mid = l2 + positive_subtract(l1, l2)/2 + 0x8000'0000'0000'0000;
    }
    return token{kind::key, mid};
}
// A key token holding a random 64-bit value (normalized by the ctor).
token token::get_random_token() {
    return {kind::key, dht::get_random_number<int64_t>()};
}
// Parse a token from its decimal string form (inverse of to_sstring()).
// A value equal to int64_t min denotes the before-everything token.
// Throws boost::bad_lexical_cast on malformed input.
token token::from_sstring(const sstring& t) {
    // Parse as int64_t, not `long`: `long` is only 32 bits on LP32/LLP64
    // targets, which would truncate tokens and break the sentinel check.
    auto lp = boost::lexical_cast<int64_t>(t);
    if (lp == std::numeric_limits<int64_t>::min()) {
        return minimum_token();
    } else {
        return token(kind::key, lp);
    }
}
// Reconstruct a token from its 8-byte big-endian serialized form
// (inverse of token::data()). Throws runtime_exception on a size
// mismatch; a decoded int64_t min denotes the before-everything token.
token token::from_bytes(bytes_view bytes) {
    if (bytes.size() != sizeof(int64_t)) {
        throw runtime_exception(format("Invalid token. Should have size {:d}, has size {:d}\n", sizeof(int64_t), bytes.size()));
    }
    int64_t v;
    std::copy_n(bytes.begin(), sizeof(v), reinterpret_cast<int8_t *>(&v));
    auto tok = net::ntoh(v);
    if (tok == std::numeric_limits<int64_t>::min()) {
        return minimum_token();
    } else {
        return dht::token(dht::token::kind::key, tok);
    }
}
// Fraction of the ring covered when walking forward from token b to
// token a, in [0, 1]. Equal tokens are treated as the whole ring.
static float ratio_helper(int64_t a, int64_t b) {
    uint64_t ua = static_cast<uint64_t>(a);
    uint64_t ub = static_cast<uint64_t>(b);
    uint64_t span = (a > b) ? (ua - ub) : (ua - ub - 1);
    return span / (float)std::numeric_limits<uint64_t>::max();
}
// Map each token to the fraction of the ring it owns, i.e. the span from
// its predecessor (wrapping for the first token). Throws if the input is
// empty; a single token owns the whole ring.
std::map<token, float>
token::describe_ownership(const std::vector<token>& sorted_tokens) {
    std::map<token, float> ownerships;
    auto i = sorted_tokens.begin();
    // 0-case
    if (i == sorted_tokens.end()) {
        throw runtime_exception("No nodes present in the cluster. Has this node finished starting up?");
    }
    // 1-case
    if (sorted_tokens.size() == 1) {
        ownerships[sorted_tokens[0]] = 1.0;
    // n-case
    } else {
        const token& start = sorted_tokens[0];
        int64_t ti = long_token(start); // The first token and its value
        int64_t start_long = ti;
        int64_t tim1 = ti; // The last token and its value (after loop)
        for (i++; i != sorted_tokens.end(); i++) {
            ti = long_token(*i); // The next token and its value
            ownerships[*i]= ratio_helper(ti, tim1); // save (T(i) -> %age)
            tim1 = ti;
        }
        // The start token's range extends backward to the last token, which is why both were saved above.
        ownerships[start] = ratio_helper(start_long, ti);
    }
    return ownerships;
}
// CQL type describing tokens: bigint, since tokens are signed 64-bit.
data_type
token::get_token_validator() {
    return long_type;
}
// Shift the signed token value into a zero-based uint64_t range:
// int64_t min maps to 0, int64_t max to 2^64-1.
static uint64_t unbias(const token& t) {
    return uint64_t(long_token(t)) + uint64_t(std::numeric_limits<int64_t>::min());
}

// Inverse of unbias(): build a key token back from the zero-based value.
static token bias(uint64_t n) {
    return token(token::kind::key, n - uint64_t(std::numeric_limits<int64_t>::min()));
}
// Map an unbiased (zero-based) token value to its shard.
// This is the master function, the inverses have to match it wrt. rounding errors.
inline
unsigned
zero_based_shard_of(uint64_t token, unsigned shards, unsigned sharding_ignore_msb_bits) {
    // Drop the ignored most-significant bits, then treat the remaining
    // bits as a fraction in [0, 1) and compute floor(fraction * shards).
    token <<= sharding_ignore_msb_bits;
    return (static_cast<unsigned __int128>(token) * shards) >> 64;
}
std::vector<uint64_t>
init_zero_based_shard_start(unsigned shards, unsigned sharding_ignore_msb_bits) {
// computes the inverse of zero_based_shard_of(). ret[s] will return the smallest token that belongs to s
if (shards == 1) {
// Avoid the while loops below getting confused finding the "edge" between two nonexistent shards
return std::vector<uint64_t>(1, uint64_t(0));
}
auto ret = std::vector<uint64_t>(shards);
for (auto s : boost::irange<unsigned>(0, shards)) {
uint64_t token = (uint128_t(s) << 64) / shards;
token >>= sharding_ignore_msb_bits; // leftmost bits are ignored by zero_based_shard_of
// token is the start of the next shard, and can be slightly before due to rounding errors; adjust
while (zero_based_shard_of(token, shards, sharding_ignore_msb_bits) != s) {
++token;
}
ret[s] = token;
}
return ret;
}
// Owning shard for token t: the sentinels are pinned to the first/last
// shard, key tokens go through the zero-based sharding function.
unsigned
shard_of(unsigned shard_count, unsigned sharding_ignore_msb_bits, const token& t) {
    switch (t._kind) {
    case token::kind::before_all_keys:
        return 0;
    case token::kind::after_all_keys:
        return shard_count - 1;
    case token::kind::key:
        uint64_t adjusted = unbias(t);
        return zero_based_shard_of(adjusted, shard_count, sharding_ignore_msb_bits);
    }
    abort(); // unreachable: every token kind is handled above
}
// First token belonging to `shard` strictly after t, advancing `spans`
// shard-range boundaries; returns maximum_token() when that point would
// wrap past the end of the ring. `shard_start` must come from
// init_zero_based_shard_start() for the same parameters.
// NOTE(review): semantics inferred from the bit arithmetic below —
// confirm against the callers' expectations.
token
token_for_next_shard(const std::vector<uint64_t>& shard_start, unsigned shard_count, unsigned sharding_ignore_msb_bits, const token& t, shard_id shard, unsigned spans) {
    uint64_t n = 0;
    switch (t._kind) {
    case token::kind::before_all_keys:
        break; // n = 0: start of the zero-based ring
    case token::kind::after_all_keys:
        return maximum_token();
    case token::kind::key:
        n = unbias(t);
        break;
    }
    auto s = zero_based_shard_of(n, shard_count, sharding_ignore_msb_bits);
    if (!sharding_ignore_msb_bits) {
        // This ought to be the same as the else branch, but avoids shifts by 64
        n = shard_start[shard];
        if (spans > 1 || shard <= s) {
            // Without msb bits each shard owns one contiguous range, so a
            // target at or before the current shard cannot be reached.
            return maximum_token();
        }
    } else {
        // Split n into the ignored msb part and the sharded remainder.
        auto left_part = n >> (64 - sharding_ignore_msb_bits);
        left_part += spans - unsigned(shard > s);
        if (left_part >= (1u << sharding_ignore_msb_bits)) {
            return maximum_token(); // would wrap around the ring
        }
        left_part <<= (64 - sharding_ignore_msb_bits);
        auto right_part = shard_start[shard];
        n = left_part | right_part;
    }
    return bias(n);
}
} // namespace dht

159
dht/token.hh Normal file
View File

@@ -0,0 +1,159 @@
/*
* Copyright (C) 2020 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "bytes.hh"
#include "utils/managed_bytes.hh"
#include "types.hh"
#include <seastar/net/byteorder.hh>
#include <fmt/format.h>
#include <array>
#include <functional>
#include <utility>
namespace dht {
class token;
enum class token_kind {
before_all_keys,
key,
after_all_keys,
};
class token {
    // int64_t min is reserved for the before-everything sentinel, so raw
    // key values equal to it are folded onto int64_t max.
    static inline int64_t normalize(int64_t t) {
        return t == std::numeric_limits<int64_t>::min() ? std::numeric_limits<int64_t>::max() : t;
    }
public:
    using kind = token_kind;
    kind _kind;     // sentinel (before/after all keys) or an ordinary key token
    int64_t _data;  // token value; meaningful only when _kind == kind::key

    // Default-constructed token is the before-everything sentinel.
    // NOTE(review): _data is left uninitialized here; comparisons ignore it
    // for sentinels, but data() on a default token reads indeterminate bytes.
    token() : _kind(kind::before_all_keys) {
    }

    // Key token from a raw 64-bit value; the value is normalize()d.
    // (kind is a trivially-copyable enum — no std::move needed.)
    token(kind k, int64_t d)
        : _kind(k)
        , _data(normalize(d)) { }

    // Deserialize from the 8-byte big-endian representation (see data()).
    // Throws std::runtime_error on a size mismatch.
    token(kind k, bytes_view b) : _kind(k) {
        if (b.size() != sizeof(_data)) {
            throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
        }
        std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
        _data = net::ntoh(_data);
    }

    // Same as the bytes_view overload; delegates to avoid duplicating the
    // deserialization logic.
    token(kind k, const bytes& b) : token(k, bytes_view(b)) {
    }

    bool is_minimum() const {
        return _kind == kind::before_all_keys;
    }

    bool is_maximum() const {
        return _kind == kind::after_all_keys;
    }

    // Tokens own no heap memory; the value is stored inline.
    size_t external_memory_usage() const {
        return 0;
    }

    size_t memory_usage() const {
        return sizeof(token);
    }

    // 8-byte big-endian serialized form (inverse of the bytes constructors).
    bytes data() const {
        auto t = net::hton(_data);
        bytes b(bytes::initialized_later(), sizeof(_data));
        std::copy_n(reinterpret_cast<int8_t*>(&t), sizeof(_data), b.begin());
        return b;
    }

    /**
     * @return a string representation of this token
     */
    sstring to_sstring() const;

    /**
     * Calculate a token representing the approximate "middle" of the given
     * range.
     *
     * @return The approximate midpoint between left and right.
     */
    static token midpoint(const token& left, const token& right);

    /**
     * @return a randomly generated token
     */
    static token get_random_token();

    /**
     * @return a token from string representation
     */
    static dht::token from_sstring(const sstring& t);

    /**
     * @return a token from its byte representation
     */
    static dht::token from_bytes(bytes_view bytes);

    /**
     * Calculate the deltas between tokens in the ring in order to compare
     * relative sizes.
     *
     * @param sortedtokens a sorted List of tokens
     * @return the mapping from 'token' to 'percentage of the ring owned by that token'.
     */
    static std::map<token, float> describe_ownership(const std::vector<token>& sorted_tokens);

    static data_type get_token_validator();

    /**
     * Gets the first shard of the minimum token.
     */
    static unsigned shard_of_minimum_token() {
        return 0; // hardcoded for now; unlikely to change
    }
};
const token& minimum_token();
const token& maximum_token();
int tri_compare(const token& t1, const token& t2);

// All six comparisons are expressed through tri_compare() directly rather
// than std::rel_ops (deprecated since C++20). Results are identical since
// tri_compare() is a consistent three-way ordering.
inline bool operator==(const token& t1, const token& t2) { return tri_compare(t1, t2) == 0; }
inline bool operator!=(const token& t1, const token& t2) { return tri_compare(t1, t2) != 0; }
inline bool operator<(const token& t1, const token& t2) { return tri_compare(t1, t2) < 0; }
inline bool operator>(const token& t1, const token& t2) { return tri_compare(t1, t2) > 0; }
inline bool operator<=(const token& t1, const token& t2) { return tri_compare(t1, t2) <= 0; }
inline bool operator>=(const token& t1, const token& t2) { return tri_compare(t1, t2) >= 0; }
std::ostream& operator<<(std::ostream& out, const token& t);
} // namespace dht

View File

@@ -133,14 +133,14 @@ public:
public:
sstring make_full_token_string(const std::unordered_set<token>& tokens) {
return ::join(";", tokens | boost::adaptors::transformed([] (const token& t) {
return dht::global_partitioner().to_sstring(t); })
return t.to_sstring(); })
);
}
sstring make_token_string(const std::unordered_set<token>& tokens) {
if (tokens.empty()) {
return "";
}
return dht::global_partitioner().to_sstring(*tokens.begin());
return tokens.begin()->to_sstring();
}
sstring make_cdc_streams_timestamp_string(std::optional<db_clock::time_point> t) {

View File

@@ -6,7 +6,7 @@ class token {
after_all_keys,
};
dht::token::kind _kind;
bytes _data;
bytes data();
};
class decorated_key {

View File

@@ -57,7 +57,7 @@ public:
// The use of minimum_token() here twice is not a typo - because wrap-
// around token ranges are supported by midpoint(), the beyond-maximum
// token can also be represented by minimum_token().
auto midpoint = dht::global_partitioner().midpoint(
auto midpoint = dht::token::midpoint(
range.start() ? range.start()->value() : dht::minimum_token(),
range.end() ? range.end()->value() : dht::minimum_token());
// This shouldn't happen, but if the range included just one token, we

View File

@@ -1232,8 +1232,8 @@ private:
throw(std::runtime_error("range must have two components "
"separated by ':', got '" + range + "'"));
}
auto tok_start = dht::global_partitioner().from_sstring(token_strings[0]);
auto tok_end = dht::global_partitioner().from_sstring(token_strings[1]);
auto tok_start = dht::token::from_sstring(token_strings[0]);
auto tok_end = dht::token::from_sstring(token_strings[1]);
auto rng = wrapping_range<dht::token>(
::range<dht::token>::bound(tok_start, false),
::range<dht::token>::bound(tok_end, true));
@@ -1377,12 +1377,12 @@ static int do_repair_start(seastar::sharded<database>& db, sstring keyspace,
std::optional<::range<dht::token>::bound> tok_end;
if (!options.start_token.empty()) {
tok_start = ::range<dht::token>::bound(
dht::global_partitioner().from_sstring(options.start_token),
dht::token::from_sstring(options.start_token),
true);
}
if (!options.end_token.empty()) {
tok_end = ::range<dht::token>::bound(
dht::global_partitioner().from_sstring(options.end_token),
dht::token::from_sstring(options.end_token),
false);
}
dht::token_range given_range_complement(tok_end, tok_start);

View File

@@ -1457,7 +1457,7 @@ bytes token_column_computation::serialize() const {
bytes_opt token_column_computation::compute_value(const schema& schema, const partition_key& key, const clustering_row& row) const {
dht::i_partitioner& partitioner = dht::global_partitioner();
return partitioner.token_to_bytes(partitioner.get_token(schema, key));
return partitioner.get_token(schema, key).data();
}
bool operator==(const raw_view_info& x, const raw_view_info& y) {

View File

@@ -402,7 +402,7 @@ std::unordered_set<token> get_replace_tokens() {
}
tokens.erase("");
for (auto token_string : tokens) {
auto token = dht::global_partitioner().from_sstring(token_string);
auto token = dht::token::from_sstring(token_string);
ret.insert(token);
}
return ret;
@@ -804,7 +804,7 @@ void storage_service::join_token_ring(int delay) {
}
} else {
for (auto token_string : initial_tokens) {
auto token = dht::global_partitioner().from_sstring(token_string);
auto token = dht::token::from_sstring(token_string);
_bootstrap_tokens.insert(token);
}
slogger.info("Saved tokens not found. Using configuration value: {}", _bootstrap_tokens);
@@ -1645,7 +1645,7 @@ std::unordered_set<locator::token> storage_service::get_tokens_for(inet_address
std::unordered_set<token> ret;
boost::split(tokens, tokens_string, boost::is_any_of(";"));
for (auto str : tokens) {
auto t = dht::global_partitioner().from_sstring(str);
auto t = dht::token::from_sstring(str);
slogger.trace("endpoint={}, token_str={} token={}", endpoint, str, t);
ret.emplace(std::move(t));
}
@@ -2056,7 +2056,7 @@ storage_service::prepare_replacement_info(const std::unordered_map<gms::inet_add
future<std::map<gms::inet_address, float>> storage_service::get_ownership() {
return run_with_no_api_lock([] (storage_service& ss) {
auto token_map = dht::global_partitioner().describe_ownership(ss._token_metadata.sorted_tokens());
auto token_map = dht::token::describe_ownership(ss._token_metadata.sorted_tokens());
// describeOwnership returns tokens in an unspecified order, let's re-order them
std::map<gms::inet_address, float> ownership;
for (auto entry : token_map) {
@@ -2092,7 +2092,7 @@ future<std::map<gms::inet_address, float>> storage_service::effective_ownership(
}
keyspace_name = "system_traces";
}
auto token_ownership = dht::global_partitioner().describe_ownership(ss._token_metadata.sorted_tokens());
auto token_ownership = dht::token::describe_ownership(ss._token_metadata.sorted_tokens());
std::map<gms::inet_address, float> final_ownership;
@@ -3341,10 +3341,10 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_
auto addresses = entry.second;
token_range_endpoints tr;
if (range.start()) {
tr._start_token = dht::global_partitioner().to_sstring(range.start()->value());
tr._start_token = range.start()->value().to_sstring();
}
if (range.end()) {
tr._end_token = dht::global_partitioner().to_sstring(range.end()->value());
tr._end_token = range.end()->value().to_sstring();
}
for (auto endpoint : addresses) {
endpoint_details details;

View File

@@ -1894,7 +1894,7 @@ private:
public:
future<> move(sstring new_token) {
// FIXME: getPartitioner().getTokenFactory().validate(newToken);
return move(dht::global_partitioner().from_sstring(new_token));
return move(dht::token::from_sstring(new_token));
}
private:

View File

@@ -66,6 +66,7 @@
#include "mutation_compactor.hh"
#include "leveled_manifest.hh"
#include "utils/observable.hh"
#include "dht/token.hh"
namespace sstables {

View File

@@ -144,13 +144,13 @@ inline key maximum_key() {
};
class decorated_key_view {
dht::token_view _token;
dht::token _token;
key_view _partition_key;
public:
decorated_key_view(dht::token_view token, key_view partition_key) noexcept
decorated_key_view(dht::token token, key_view partition_key) noexcept
: _token(token), _partition_key(partition_key) { }
dht::token_view token() const {
dht::token token() const {
return _token;
}

View File

@@ -572,8 +572,8 @@ future<> parse(sstable_version_types v, random_access_reader& in, summary& s) {
// position is little-endian encoded
auto position = seastar::read_le<uint64_t>(buf.get());
auto token = dht::global_partitioner().get_token(key_view(key_data));
auto token_data = s.add_summary_data(bytes_view(token._data));
s.entries.push_back({ dht::token_view(dht::token::kind::key, token_data), key_data, position });
s.add_summary_data(token.data());
s.entries.push_back({ token, key_data, position });
return make_ready_future<>();
});
});
@@ -1886,8 +1886,8 @@ create_sharding_metadata(schema_ptr schema, const dht::decorated_key& first_key,
auto&& right_token = right.token();
auto right_exclusive = !right.has_key() && right.bound() == dht::ring_position::token_bound::start;
sm.token_ranges.elements.push_back(disk_token_range{
{left_exclusive, to_bytes(bytes_view(left_token._data))},
{right_exclusive, to_bytes(bytes_view(right_token._data))}});
{left_exclusive, left_token.data()},
{right_exclusive, right_token.data()}});
}
}
return sm;
@@ -1969,9 +1969,9 @@ void maybe_add_summary_entry(summary& s, const dht::token& token, bytes_view key
if (data_offset >= state.next_data_offset_to_write_summary) {
auto entry_size = 8 + 2 + key.size(); // offset + key_size.size + key.size
state.next_data_offset_to_write_summary += state.summary_byte_cost * entry_size;
auto token_data = s.add_summary_data(bytes_view(token._data));
s.add_summary_data(token.data());
auto key_data = s.add_summary_data(key);
s.entries.push_back({ dht::token_view(dht::token::kind::key, token_data), key_data, index_offset });
s.entries.push_back({ token, key_data, index_offset });
}
}
@@ -3207,8 +3207,8 @@ sstable::compute_shards_for_this_sstable() const {
dht::ring_position::ending_at(get_last_decorated_key().token())));
} else {
auto disk_token_range_to_ring_position_range = [] (const disk_token_range& dtr) {
auto t1 = dht::token(dht::token::kind::key, managed_bytes(bytes_view(dtr.left.token)));
auto t2 = dht::token(dht::token::kind::key, managed_bytes(bytes_view(dtr.right.token)));
auto t1 = dht::token(dht::token::kind::key, bytes_view(dtr.left.token));
auto t2 = dht::token(dht::token::kind::key, bytes_view(dtr.right.token));
return dht::partition_range::make(
(dtr.left.exclusive ? dht::ring_position::ending_at : dht::ring_position::starting_at)(std::move(t1)),
(dtr.right.exclusive ? dht::ring_position::starting_at : dht::ring_position::ending_at)(std::move(t2)));

View File

@@ -129,7 +129,7 @@ inline std::ostream& operator<<(std::ostream& o, indexable_element e) {
class summary_entry {
public:
dht::token_view token;
dht::token token;
bytes_view key;
uint64_t position;

View File

@@ -1966,31 +1966,18 @@ public:
, _tokens(boost::copy_range<std::vector<dht::token>>(something_by_token | boost::adaptors::map_keys)) {
}
virtual dht::token midpoint(const dht::token& left, const dht::token& right) const override { return _partitioner.midpoint(left, right); }
virtual dht::token get_token(const schema& s, partition_key_view key) const override { return _partitioner.get_token(s, key); }
virtual dht::token get_token(const sstables::key_view& key) const override { return _partitioner.get_token(key); }
virtual sstring to_sstring(const dht::token& t) const override { return _partitioner.to_sstring(t); }
virtual dht::token from_sstring(const sstring& t) const override { return _partitioner.from_sstring(t); }
virtual dht::token from_bytes(bytes_view bytes) const override { return _partitioner.from_bytes(bytes); }
virtual dht::token get_random_token() override { return _partitioner.get_random_token(); }
virtual bool preserves_order() override { return _partitioner.preserves_order(); }
virtual std::map<dht::token, float> describe_ownership(const std::vector<dht::token>& sorted_tokens) override { return _partitioner.describe_ownership(sorted_tokens); }
virtual data_type get_token_validator() override { return _partitioner.get_token_validator(); }
virtual const sstring name() const override { return _partitioner.name(); }
virtual unsigned shard_of(const dht::token& t) const override;
virtual unsigned shard_of(const dht::token& t, unsigned shard_count, unsigned sharding_ignore_msb) const override;
virtual dht::token token_for_next_shard(const dht::token& t, shard_id shard, unsigned spans = 1) const override;
virtual int tri_compare(dht::token_view t1, dht::token_view t2) const override { return _partitioner.tri_compare(t1, t2); }
};
// Single-argument overload: delegates to the three-argument overload,
// using the wrapped partitioner's shard count and ignoring no MSB bits.
unsigned dummy_partitioner::shard_of(const dht::token& t) const {
return shard_of(t, _partitioner.shard_count(), 0);
}
// Maps a token to a shard by its index in the pre-registered sorted token
// list. This is a test double: the shard_count/sharding_ignore_msb
// parameters are accepted only to satisfy the interface; the dummy always
// shards against the wrapped partitioner's shard count.
// NOTE(review): the diff residue left two consecutive return statements
// (old and new variants); the second was unreachable. Kept the new
// (post-commit) variant that uses _partitioner.shard_count().
unsigned dummy_partitioner::shard_of(const dht::token& t, unsigned shard_count, unsigned sharding_ignore_msb) const {
    auto it = boost::find(_tokens, t);
    // Unknown tokens are assigned to shard 0
    return it == _tokens.end() ? 0 : std::distance(_tokens.begin(), it) % _partitioner.shard_count();
}
dht::token dummy_partitioner::token_for_next_shard(const dht::token& t, shard_id shard, unsigned spans) const {

View File

@@ -133,11 +133,8 @@ void endpoints_check(
}
}
auto d2t = [](double d) {
unsigned long l = net::hton(static_cast<unsigned long>(d*(std::numeric_limits<unsigned long>::max())));
std::array<char, 8> a;
memcpy(a.data(), &l, 8);
return a;
auto d2t = [](double d) -> int64_t {
return static_cast<unsigned long>(d*(std::numeric_limits<unsigned long>::max()));
};
/**
@@ -154,8 +151,7 @@ void full_ring_check(const std::vector<ring_point>& ring_points,
for (auto& rp : ring_points) {
double cur_point1 = rp.point - 0.5;
token t1{dht::token::kind::key,
{(int8_t*)d2t(cur_point1 / ring_points.size()).data(), 8}};
token t1(dht::token::kind::key, d2t(cur_point1 / ring_points.size()));
uint64_t cache_hit_count = ars_ptr->get_cache_hits_count();
auto endpoints1 = ars_ptr->get_natural_endpoints(t1);
@@ -172,8 +168,7 @@ void full_ring_check(const std::vector<ring_point>& ring_points,
//
cache_hit_count = ars_ptr->get_cache_hits_count();
double cur_point2 = rp.point - 0.2;
token t2{dht::token::kind::key,
{(int8_t*)d2t(cur_point2 / ring_points.size()).data(), 8}};
token t2(dht::token::kind::key, d2t(cur_point2 / ring_points.size()));
auto endpoints2 = ars_ptr->get_natural_endpoints(t2);
endpoints_check(ars_ptr, endpoints2);
@@ -208,9 +203,7 @@ future<> simple_test() {
// Initialize the token_metadata
for (unsigned i = 0; i < ring_points.size(); i++) {
tm->update_normal_token(
{dht::token::kind::key,
{(int8_t*)d2t(ring_points[i].point / ring_points.size()).data(), 8}
},
{dht::token::kind::key, d2t(ring_points[i].point / ring_points.size())},
ring_points[i].host);
}
@@ -300,8 +293,7 @@ future<> heavy_origin_test() {
ring_point rp = {token_point, address};
ring_points.emplace_back(rp);
tokens[address].emplace(token{dht::token::kind::key,
{(int8_t*)d2t(token_point / total_eps).data(), 8}});
tokens[address].emplace(token{dht::token::kind::key, d2t(token_point / total_eps)});
nlogger.debug("adding node {} at {}", address, token_point);
@@ -477,7 +469,7 @@ static std::vector<inet_address> calculate_natural_endpoints(
return std::move(replicas.get_vector());
}
static void test_equivalence(token_metadata& tm, snitch_ptr& snitch, dht::murmur3_partitioner& partitioner, const std::unordered_map<sstring, size_t>& datacenters) {
static void test_equivalence(token_metadata& tm, snitch_ptr& snitch, const std::unordered_map<sstring, size_t>& datacenters) {
class my_network_topology_strategy : public network_topology_strategy {
public:
using network_topology_strategy::network_topology_strategy;
@@ -493,7 +485,7 @@ static void test_equivalence(token_metadata& tm, snitch_ptr& snitch, dht::murmur
})));
for (size_t i = 0; i < 1000; ++i) {
auto token = partitioner.get_random_token();
auto token = dht::token::get_random_token();
auto expected = calculate_natural_endpoints(token, tm, snitch, datacenters);
auto actual = nts.calculate_natural_endpoints(token, tm);
@@ -573,7 +565,6 @@ SEASTAR_TEST_CASE(testCalculateEndpoints) {
constexpr size_t VNODES = 64;
constexpr size_t RUNS = 10;
dht::murmur3_partitioner partitioner;
std::unordered_map<sstring, size_t> datacenters = {
{ "rf1", 1 },
{ "rf3", 3 },
@@ -597,10 +588,10 @@ SEASTAR_TEST_CASE(testCalculateEndpoints) {
for (auto& node : nodes) {
for (size_t i = 0; i < VNODES; ++i) {
tm.update_normal_token(partitioner.get_random_token(), node);
tm.update_normal_token(dht::token::get_random_token(), node);
}
}
test_equivalence(tm, snitch, partitioner, datacenters);
test_equivalence(tm, snitch, datacenters);
}
return i_endpoint_snitch::stop_snitch();

View File

@@ -47,7 +47,7 @@ BOOST_AUTO_TEST_CASE(test_range_with_positions_within_the_same_token) {
.with_column("v", bytes_type)
.build();
dht::token tok = dht::global_partitioner().get_random_token();
dht::token tok = dht::token::get_random_token();
auto key1 = dht::decorated_key{tok,
partition_key::from_single_value(*s, bytes_type->decompose(data_value(bytes("key1"))))};
@@ -245,8 +245,8 @@ BOOST_AUTO_TEST_CASE(range_overlap_tests) {
auto get_item(std::string left, std::string right, std::string val) {
using value_type = std::unordered_set<std::string>;
auto l = dht::global_partitioner().from_sstring(left);
auto r = dht::global_partitioner().from_sstring(right);
auto l = dht::token::from_sstring(left);
auto r = dht::token::from_sstring(right);
auto rg = dht::token_range({{l, false}}, {r});
value_type v{val};
return std::make_pair(locator::token_metadata::range_to_interval(rg), v);
@@ -272,7 +272,7 @@ BOOST_AUTO_TEST_CASE(test_range_interval_map) {
}
auto search_item = [&mymap] (std::string val) {
auto tok = dht::global_partitioner().from_sstring(val);
auto tok = dht::token::from_sstring(val);
auto search = dht::token_range(tok);
auto it = mymap.find(locator::token_metadata::range_to_interval(search));
if (it != mymap.end()) {

View File

@@ -43,16 +43,11 @@ debug(Args&&... args) {
}
// Builds a dht::token directly from a 64-bit value. Since tokens are now
// represented internally as int64_t (see commit "Change token representation
// to int64_t"), no hton/byte-buffer round trip is needed.
// NOTE(review): the diff residue had the old byte-serializing body followed
// by the new body, making the new return unreachable; kept only the new body.
static dht::token token_from_long(uint64_t value) {
    return dht::token(dht::token::kind::key, value);
}
// Extracts the raw 64-bit value from a token (inverse of token_from_long).
// With the int64_t representation this is a direct field read; the old
// copy_n/ntoh decoding is gone.
// NOTE(review): diff residue merged old and new bodies (code after the first
// return was unreachable); kept only the new single-statement body.
static int64_t long_from_token(dht::token token) {
    return token._data;
}
SEASTAR_THREAD_TEST_CASE(test_decorated_key_is_compatible_with_origin) {
@@ -78,7 +73,7 @@ SEASTAR_THREAD_TEST_CASE(test_token_wraparound_1) {
BOOST_REQUIRE(t1 > t2);
// Even without knowing what the midpoint is, it needs to be inside the
// wrapped range, i.e., between t1 and inf, OR between -inf and t2
auto midpoint = partitioner.midpoint(t1, t2);
auto midpoint = dht::token::midpoint(t1, t2);
BOOST_REQUIRE(midpoint > t1 || midpoint < t2);
// We can also calculate the actual value the midpoint should have:
BOOST_REQUIRE_EQUAL(midpoint, token_from_long(0x8800'0000'0000'0000));
@@ -89,7 +84,7 @@ SEASTAR_THREAD_TEST_CASE(test_token_wraparound_2) {
auto t2 = token_from_long(0x9000'0000'0000'0000);
dht::murmur3_partitioner partitioner;
BOOST_REQUIRE(t1 > t2);
auto midpoint = partitioner.midpoint(t1, t2);
auto midpoint = dht::token::midpoint(t1, t2);
BOOST_REQUIRE(midpoint > t1 || midpoint < t2);
BOOST_REQUIRE_EQUAL(midpoint, token_from_long(0x7800'0000'0000'0000));
}
@@ -224,9 +219,8 @@ SEASTAR_THREAD_TEST_CASE(test_ring_position_ordering) {
SEASTAR_THREAD_TEST_CASE(test_token_no_wraparound_1) {
auto t1 = token_from_long(0x5000'0000'0000'0000);
auto t2 = token_from_long(0x7000'0000'0000'0000);
dht::murmur3_partitioner partitioner;
BOOST_REQUIRE(t1 < t2);
auto midpoint = partitioner.midpoint(t1, t2);
auto midpoint = dht::token::midpoint(t1, t2);
BOOST_REQUIRE(midpoint > t1 && midpoint < t2);
BOOST_REQUIRE_EQUAL(midpoint, token_from_long(0x6000'0000'0000'0000));
}
@@ -243,7 +237,7 @@ void test_partitioner_sharding(const dht::i_partitioner& part, unsigned shards,
BOOST_REQUIRE_EQUAL(part.shard_of(lim), i % shards);
if (i != 0) {
BOOST_REQUIRE_EQUAL(part.shard_of(prev_token(part, lim)), (i - 1) % shards);
BOOST_REQUIRE(part.is_equal(lim, part.token_for_next_shard(prev_token(part, lim), i % shards)));
BOOST_REQUIRE_EQUAL(lim, part.token_for_next_shard(prev_token(part, lim), i % shards));
}
if (i != (shards << ignorebits) - 1) {
auto next_shard = (i + 1) % shards;
@@ -613,7 +607,7 @@ do_test_selective_token_range_sharder(const dht::i_partitioner& part, const sche
}
BOOST_REQUIRE(end_shard == shard);
}
auto midpoint = part.midpoint(
auto midpoint = dht::token::midpoint(
range_shard->start() ? range_shard->start()->value() : dht::minimum_token(),
range_shard->end() ? range_shard->end()->value() : dht::minimum_token());
auto mid_shard = part.shard_of(midpoint);

View File

@@ -55,7 +55,7 @@ BOOST_AUTO_TEST_CASE(test_range_with_positions_within_the_same_token) {
.with_column("v", bytes_type)
.build();
dht::token tok = dht::global_partitioner().get_random_token();
dht::token tok = dht::token::get_random_token();
auto key1 = dht::decorated_key{tok,
partition_key::from_single_value(*s, bytes_type->decompose(data_value(bytes("key1"))))};
@@ -282,8 +282,8 @@ BOOST_AUTO_TEST_CASE(range_overlap_tests) {
auto get_item(std::string left, std::string right, std::string val) {
using value_type = std::unordered_set<std::string>;
auto l = dht::global_partitioner().from_sstring(left);
auto r = dht::global_partitioner().from_sstring(right);
auto l = dht::token::from_sstring(left);
auto r = dht::token::from_sstring(right);
auto rg = range<dht::token>({{l, false}}, {r});
value_type v{val};
return std::make_pair(locator::token_metadata::range_to_interval(rg), v);
@@ -309,7 +309,7 @@ BOOST_AUTO_TEST_CASE(test_range_interval_map) {
}
auto search_item = [&mymap] (std::string val) {
auto tok = dht::global_partitioner().from_sstring(val);
auto tok = dht::token::from_sstring(val);
auto search = range<token>(tok);
auto it = mymap.find(locator::token_metadata::range_to_interval(search));
if (it != mymap.end()) {

View File

@@ -5019,10 +5019,7 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_stop_tracking_compaction) {
}
// Builds a dht::token from a signed 64-bit value. Tokens are stored as
// int64_t, so the value is passed straight to the token constructor.
// NOTE(review): diff residue left the old hton/byte-buffer body in front of
// the new return, making it unreachable; kept only the new body.
static dht::token token_from_long(int64_t value) {
    return { dht::token::kind::key, value };
}
SEASTAR_TEST_CASE(basic_interval_map_testing_for_sstable_set) {

View File

@@ -187,9 +187,9 @@ SEASTAR_TEST_CASE(test_query_size_estimates_virtual_table) {
SEASTAR_TEST_CASE(test_query_view_built_progress_virtual_table) {
return do_with_cql_env_thread([] (cql_test_env& e) {
auto rand = [] { return dht::global_partitioner().get_random_token(); };
auto rand = [] { return dht::token::get_random_token(); };
auto next_token = rand();
auto next_token_str = dht::global_partitioner().to_sstring(next_token);
auto next_token_str = next_token.to_sstring();
db::system_keyspace::register_view_for_building("ks", "v1", rand()).get();
db::system_keyspace::register_view_for_building("ks", "v2", rand()).get();
db::system_keyspace::register_view_for_building("ks", "v3", rand()).get();

View File

@@ -803,8 +803,8 @@ public:
void describe_splits_ex(thrift_fn::function<void(std::vector<CfSplit> const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& cfName, const std::string& start_token, const std::string& end_token, const int32_t keys_per_split) {
with_cob(std::move(cob), std::move(exn_cob), [&]{
dht::token_range_vector ranges;
auto tstart = start_token.empty() ? dht::minimum_token() : dht::global_partitioner().from_sstring(sstring(start_token));
auto tend = end_token.empty() ? dht::maximum_token() : dht::global_partitioner().from_sstring(sstring(end_token));
auto tstart = start_token.empty() ? dht::minimum_token() : dht::token::from_sstring(sstring(start_token));
auto tend = end_token.empty() ? dht::maximum_token() : dht::token::from_sstring(sstring(end_token));
range<dht::token> r({{ std::move(tstart), false }}, {{ std::move(tend), true }});
auto cf = sstring(cfName);
auto splits = service::get_local_storage_service().get_splits(current_keyspace(), cf, std::move(r), keys_per_split);
@@ -813,8 +813,8 @@ public:
for (auto&& s : splits) {
res.emplace_back();
assert(s.first.start() && s.first.end());
auto start_token = dht::global_partitioner().to_sstring(s.first.start()->value());
auto end_token = dht::global_partitioner().to_sstring(s.first.end()->value());
auto start_token = s.first.start()->value().to_sstring();
auto end_token = s.first.end()->value().to_sstring();
res.back().__set_start_token(bytes_to_string(to_bytes_view(start_token)));
res.back().__set_end_token(bytes_to_string(to_bytes_view(end_token)));
res.back().__set_row_count(s.second);
@@ -1714,7 +1714,7 @@ private:
auto start = range.start_key.empty()
? dht::ring_position::starting_at(dht::minimum_token())
: partitioner.decorate_key(s, key_from_thrift(s, to_bytes(range.start_key)));
auto end = dht::ring_position::ending_at(partitioner.from_sstring(sstring(range.end_token)));
auto end = dht::ring_position::ending_at(dht::token::from_sstring(sstring(range.end_token)));
if (end.token().is_minimum()) {
end = dht::ring_position::ending_at(dht::maximum_token());
} else if (end.less_compare(s, start)) {
@@ -1725,8 +1725,8 @@ private:
}
// Token range can wrap; the start token is exclusive.
auto start = dht::ring_position::ending_at(partitioner.from_sstring(sstring(range.start_token)));
auto end = dht::ring_position::ending_at(partitioner.from_sstring(sstring(range.end_token)));
auto start = dht::ring_position::ending_at(dht::token::from_sstring(sstring(range.start_token)));
auto end = dht::ring_position::ending_at(dht::token::from_sstring(sstring(range.end_token)));
if (end.token().is_minimum()) {
end = dht::ring_position::ending_at(dht::maximum_token());
}