Merge "Change token representation to int64_t" from Piotr
" After deprecating partitioners other than Murmur3 we can change the representation of tokens to int64_t. This will allow setting custom partitioner on each table. With this change partitioners become just converters from partition keys to tokens (int64_t). Following operations are no longer dependant on partitioner implementation: - Tokens comparison - Tokens serialization/deserialization to strings - Tokens serialization/deserialization to bytes - Sharding logic - Random token generation This change will be followed by a PR that enables per table partitioner and then another PR that introduces a special partitioner for CDC tables. Tests: unit(dev) Results of memory footprint test: Differences: in cache: 992 vs 984 in memtable: 750 vs 742 sizeof(cache_entry) = 112 vs 104 -- sizeof(decorated_key) = 36 vs 32 MASTER: mutation footprint: in cache: 992 in memtable: 750 in sstable: 351 frozen: 540 canonical: 827 query result: 342 sizeof(cache_entry) = 112 -- sizeof(decorated_key) = 36 -- sizeof(cache_link_type) = 32 -- sizeof(mutation_partition) = 96 -- -- sizeof(_static_row) = 8 -- -- sizeof(_rows) = 24 -- -- sizeof(_row_tombstones) = 40 sizeof(rows_entry) = 232 sizeof(lru_link_type) = 16 sizeof(deletable_row) = 168 sizeof(row) = 112 sizeof(atomic_cell_or_collection) = 8 THIS PATCHSET: mutation footprint: in cache: 984 in memtable: 742 in sstable: 351 frozen: 540 canonical: 827 query result: 342 sizeof(cache_entry) = 104 -- sizeof(decorated_key) = 32 -- sizeof(cache_link_type) = 32 -- sizeof(mutation_partition) = 96 -- -- sizeof(_static_row) = 8 -- -- sizeof(_rows) = 24 -- -- sizeof(_row_tombstones) = 40 sizeof(rows_entry) = 232 sizeof(lru_link_type) = 16 sizeof(deletable_row) = 168 sizeof(row) = 112 sizeof(atomic_cell_or_collection) = 8 " * 'fixed_token_representation' of https://github.com/haaawk/scylla: (21 commits) token: cast to int64_t not long in long_token murmur3: move sharding logic to token and i_partitioner partitioner: move shard_of_minimum_token to token partitioner: remove token_to_bytes partitioner: move get_token_validator to token partitioner: merge tri_compare into dht::tri_compare partitioner: move describe_ownership to token partitioner: move from_bytes to token partitioner: move from_string to token partitioner: move to_sstring to token partitioner: move get_random_token to token partitioner: move midpoint function to token token: remove token_view sstables: use copy constructor for tokens token: change _data to int64_t partitioner: remove hash_large_token token: change data to array<uint8_t, 8> partitioner: Extract token to separate .hh and .cc files partitioner: remove unused functions Revert "dht/murmur3_partitioner: take private methods out of the class" ...
This commit is contained in:
@@ -29,6 +29,7 @@
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "dht/token-sharding.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
#include "gms/application_state.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
@@ -177,7 +178,7 @@ topology_description generate_topology_description(
|
||||
auto it = std::lower_bound(tokens.begin(), tokens.end(), token);
|
||||
auto& entry = entries[it != tokens.end() ? std::distance(tokens.begin(), it) : 0];
|
||||
|
||||
auto shard_id = partitioner.shard_of(token, entry.streams.size(), entry.sharding_ignore_msb);
|
||||
auto shard_id = dht::shard_of(entry.streams.size(), entry.sharding_ignore_msb, token);
|
||||
assert(shard_id < entry.streams.size());
|
||||
|
||||
if (!entry.streams[shard_id].is_set()) {
|
||||
|
||||
@@ -431,7 +431,7 @@ public:
|
||||
// more details like tombstones/ttl? Probably not but keep in mind.
|
||||
mutation transform(const mutation& m, const cql3::untyped_result_set* rs = nullptr) const {
|
||||
auto ts = find_timestamp(*_schema, m);
|
||||
auto stream_id = _ctx._cdc_metadata.get_stream(ts, m.token(), _ctx._partitioner);
|
||||
auto stream_id = _ctx._cdc_metadata.get_stream(ts, m.token());
|
||||
mutation res(_log_schema, stream_id.to_partition_key(*_log_schema));
|
||||
auto tuuid = timeuuid_type->decompose(generate_timeuuid(ts));
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "dht/token-sharding.hh"
|
||||
#include "utils/exceptions.hh"
|
||||
|
||||
#include "cdc/generation.hh"
|
||||
@@ -38,11 +38,10 @@ static api::timestamp_type to_ts(db_clock::time_point tp) {
|
||||
|
||||
static cdc::stream_id get_stream(
|
||||
const cdc::token_range_description& entry,
|
||||
dht::token tok,
|
||||
const dht::i_partitioner& p) {
|
||||
dht::token tok) {
|
||||
// The ith stream is the stream for the ith shard.
|
||||
auto shard_cnt = entry.streams.size();
|
||||
auto shard_id = p.shard_of(tok, shard_cnt, entry.sharding_ignore_msb);
|
||||
auto shard_id = dht::shard_of(shard_cnt, entry.sharding_ignore_msb, tok);
|
||||
|
||||
if (shard_id >= shard_cnt) {
|
||||
on_internal_error(cdc_log, "get_stream: shard_id out of bounds");
|
||||
@@ -53,8 +52,7 @@ static cdc::stream_id get_stream(
|
||||
|
||||
static cdc::stream_id get_stream(
|
||||
const std::vector<cdc::token_range_description>& entries,
|
||||
dht::token tok,
|
||||
const dht::i_partitioner& p) {
|
||||
dht::token tok) {
|
||||
if (entries.empty()) {
|
||||
on_internal_error(cdc_log, "get_stream: entries empty");
|
||||
}
|
||||
@@ -65,7 +63,7 @@ static cdc::stream_id get_stream(
|
||||
it = entries.begin();
|
||||
}
|
||||
|
||||
return get_stream(*it, tok, p);
|
||||
return get_stream(*it, tok);
|
||||
}
|
||||
|
||||
cdc::metadata::container_t::const_iterator cdc::metadata::gen_used_at(api::timestamp_type ts) const {
|
||||
@@ -78,7 +76,7 @@ cdc::metadata::container_t::const_iterator cdc::metadata::gen_used_at(api::times
|
||||
return std::prev(it);
|
||||
}
|
||||
|
||||
cdc::stream_id cdc::metadata::get_stream(api::timestamp_type ts, dht::token tok, const dht::i_partitioner& p) {
|
||||
cdc::stream_id cdc::metadata::get_stream(api::timestamp_type ts, dht::token tok) {
|
||||
auto now = api::new_timestamp();
|
||||
if (ts > now + generation_leeway.count()) {
|
||||
throw exceptions::invalid_request_exception(format(
|
||||
@@ -132,7 +130,7 @@ cdc::stream_id cdc::metadata::get_stream(api::timestamp_type ts, dht::token tok,
|
||||
}
|
||||
|
||||
auto& gen = *it->second;
|
||||
auto ret = ::get_stream(gen.entries(), tok, p);
|
||||
auto ret = ::get_stream(gen.entries(), tok);
|
||||
_last_stream_timestamp = ts;
|
||||
return ret;
|
||||
}
|
||||
|
||||
@@ -28,7 +28,6 @@
|
||||
|
||||
namespace dht {
|
||||
class token;
|
||||
class i_partitioner;
|
||||
}
|
||||
|
||||
namespace cdc {
|
||||
@@ -66,7 +65,7 @@ public:
|
||||
* yet know about. The amount of leeway (how much "into the future" we allow `ts` to be) is defined
|
||||
* by the `cdc::generation_leeway` constant.
|
||||
*/
|
||||
stream_id get_stream(api::timestamp_type ts, dht::token tok, const dht::i_partitioner&);
|
||||
stream_id get_stream(api::timestamp_type ts, dht::token tok);
|
||||
|
||||
/* Insert the generation given by `gen` with timestamp `ts` to be used by the `get_stream` function,
|
||||
* if the generation is not already known or older than the currently known ones.
|
||||
|
||||
@@ -680,6 +680,7 @@ scylla_core = (['database.cc',
|
||||
'gms/application_state.cc',
|
||||
'gms/inet_address.cc',
|
||||
'dht/i_partitioner.cc',
|
||||
'dht/token.cc',
|
||||
'dht/murmur3_partitioner.cc',
|
||||
'dht/boot_strapper.cc',
|
||||
'dht/range_streamer.cc',
|
||||
|
||||
@@ -56,7 +56,7 @@ private:
|
||||
public:
|
||||
token_fct(schema_ptr s)
|
||||
: native_scalar_function("token",
|
||||
dht::global_partitioner().get_token_validator(),
|
||||
dht::token::get_token_validator(),
|
||||
s->partition_key_type()->types())
|
||||
, _schema(s) {
|
||||
}
|
||||
@@ -65,7 +65,7 @@ public:
|
||||
auto key = partition_key::from_optional_exploded(*_schema, parameters);
|
||||
auto tok = dht::global_partitioner().get_token(*_schema, key);
|
||||
warn(unimplemented::cause::VALIDATION);
|
||||
return dht::global_partitioner().token_to_bytes(tok);
|
||||
return tok.data();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -94,7 +94,7 @@ public:
|
||||
if (!buf) {
|
||||
throw exceptions::invalid_request_exception("Invalid null token value");
|
||||
}
|
||||
auto tk = dht::global_partitioner().from_bytes(*buf);
|
||||
auto tk = dht::token::from_bytes(*buf);
|
||||
if (tk.is_minimum() && !is_start(b)) {
|
||||
// The token was parsed as a minimum marker (token::kind::before_all_keys), but
|
||||
// as it appears in the end bound position, it is actually the maximum marker
|
||||
|
||||
@@ -861,7 +861,7 @@ static void append_base_key_to_index_ck(std::vector<bytes_view>& exploded_index_
|
||||
exploded_index_ck.push_back(bytes_view(*indexed_column_value));
|
||||
} else {
|
||||
dht::i_partitioner& partitioner = dht::global_partitioner();
|
||||
token_bytes = partitioner.token_to_bytes(partitioner.get_token(*_schema, last_base_pk));
|
||||
token_bytes = partitioner.get_token(*_schema, last_base_pk).data();
|
||||
exploded_index_ck.push_back(bytes_view(token_bytes));
|
||||
append_base_key_to_index_ck<partition_key>(exploded_index_ck, last_base_pk, *cdef);
|
||||
}
|
||||
@@ -1040,7 +1040,7 @@ query::partition_slice indexed_table_select_statement::get_partition_slice_for_g
|
||||
// Computed token column needs to be added to index view restrictions
|
||||
const column_definition& token_cdef = *_view_schema->clustering_key_columns().begin();
|
||||
auto base_pk = partition_key::from_optional_exploded(*_schema, _restrictions->get_partition_key_restrictions()->values(options));
|
||||
bytes token_value = dht::global_partitioner().token_to_bytes(dht::global_partitioner().get_token(*_schema, base_pk));
|
||||
bytes token_value = dht::global_partitioner().get_token(*_schema, base_pk).data();
|
||||
auto token_restriction = ::make_shared<restrictions::single_column_restriction::EQ>(token_cdef, ::make_shared<cql3::constants::value>(cql3::raw_value::make_value(token_value)));
|
||||
clustering_restrictions->merge_with(token_restriction);
|
||||
|
||||
|
||||
@@ -76,7 +76,7 @@ std::vector<::shared_ptr<cql3::column_specification>> cql3::token_relation::to_r
|
||||
//auto* c = column_defs.front();
|
||||
return {::make_shared<column_specification>(schema->ks_name(), schema->cf_name(),
|
||||
::make_shared<column_identifier>("partition key token", true),
|
||||
dht::global_partitioner().get_token_validator())};
|
||||
dht::token::get_token_validator())};
|
||||
}
|
||||
|
||||
::shared_ptr<cql3::restrictions::restriction> cql3::token_relation::new_EQ_restriction(
|
||||
|
||||
@@ -169,7 +169,7 @@ static system_keyspace::range_estimates estimate(const column_family& cf, const
|
||||
int64_t count{0};
|
||||
utils::estimated_histogram hist{0};
|
||||
auto from_bytes = [] (auto& b) {
|
||||
return dht::global_partitioner().from_sstring(utf8_type->to_string(b));
|
||||
return dht::token::from_sstring(utf8_type->to_string(b));
|
||||
};
|
||||
dht::token_range_vector ranges;
|
||||
::compat::unwrap_into(
|
||||
@@ -193,7 +193,7 @@ future<std::vector<token_range>> get_local_ranges() {
|
||||
std::vector<token_range> local_ranges;
|
||||
auto to_bytes = [](const std::optional<dht::token_range::bound>& b) {
|
||||
assert(b);
|
||||
return utf8_type->decompose(dht::global_partitioner().to_sstring(b->value()));
|
||||
return utf8_type->decompose(b->value().to_sstring());
|
||||
};
|
||||
// We merge the ranges to be compatible with how Cassandra shows it's size estimates table.
|
||||
// All queries will be on that table, where all entries are text and there's no notion of
|
||||
|
||||
@@ -230,49 +230,6 @@ static db::consistency_level quorum_if_many(size_t num_token_owners) {
|
||||
return num_token_owners > 1 ? db::consistency_level::QUORUM : db::consistency_level::ONE;
|
||||
}
|
||||
|
||||
// --------------- TODO: copy-pasted from murmur3_partitioner; remove this after haaawk's change is merged
|
||||
static int64_t long_token(dht::token_view t) {
|
||||
if (t.is_minimum() || t.is_maximum()) {
|
||||
return std::numeric_limits<long>::min();
|
||||
}
|
||||
|
||||
if (t._data.size() != sizeof(int64_t)) {
|
||||
throw runtime_exception(format("Invalid token. Should have size {:d}, has size {:d}\n", sizeof(int64_t), t._data.size()));
|
||||
}
|
||||
|
||||
auto ptr = t._data.begin();
|
||||
auto lp = unaligned_cast<const int64_t *>(ptr);
|
||||
return net::ntoh(*lp);
|
||||
}
|
||||
|
||||
static int64_t normalize(int64_t in) {
|
||||
return in == std::numeric_limits<int64_t>::lowest()
|
||||
? std::numeric_limits<int64_t>::max()
|
||||
: in;
|
||||
}
|
||||
|
||||
static dht::token get_token(uint64_t value) {
|
||||
auto t = net::hton(normalize(value));
|
||||
bytes b(bytes::initialized_later(), 8);
|
||||
std::copy_n(reinterpret_cast<int8_t*>(&t), 8, b.begin());
|
||||
return dht::token{dht::token::kind::key, std::move(b)};
|
||||
}
|
||||
|
||||
static dht::token token_from_string(const sstring& t) {
|
||||
auto lp = boost::lexical_cast<long>(t);
|
||||
if (lp == std::numeric_limits<long>::min()) {
|
||||
return dht::minimum_token();
|
||||
} else {
|
||||
return get_token(uint64_t(lp));
|
||||
}
|
||||
}
|
||||
|
||||
static sstring string_from_token(const dht::token& t) {
|
||||
return seastar::to_sstring<sstring>(long_token(t));
|
||||
}
|
||||
|
||||
// ---------------
|
||||
|
||||
static list_type_impl::native_type prepare_cdc_generation_description(const cdc::topology_description& description) {
|
||||
list_type_impl::native_type ret;
|
||||
for (auto& e: description.entries()) {
|
||||
@@ -282,7 +239,7 @@ static list_type_impl::native_type prepare_cdc_generation_description(const cdc:
|
||||
}
|
||||
|
||||
ret.push_back(make_tuple_value(cdc_token_range_description_type,
|
||||
{ data_value(string_from_token(e.token_range_end))
|
||||
{ data_value(e.token_range_end.to_sstring())
|
||||
, make_list_value(cdc_streams_list_type, std::move(streams))
|
||||
, data_value(int8_t(e.sharding_ignore_msb))
|
||||
}));
|
||||
@@ -313,7 +270,7 @@ static cdc::token_range_description get_token_range_description_from_value(const
|
||||
on_internal_error(cdc_log, "get_token_range_description_from_value: stream tuple type size != 3");
|
||||
}
|
||||
|
||||
auto token = token_from_string(value_cast<sstring>(tup[0]));
|
||||
auto token = dht::token::from_sstring(value_cast<sstring>(tup[0]));
|
||||
auto streams = get_streams_from_list_value(tup[1]);
|
||||
auto sharding_ignore_msb = uint8_t(value_cast<int8_t>(tup[2]));
|
||||
|
||||
|
||||
@@ -1510,7 +1510,7 @@ future<db_clock::time_point> get_truncated_at(utils::UUID cf_id) {
|
||||
static set_type_impl::native_type prepare_tokens(const std::unordered_set<dht::token>& tokens) {
|
||||
set_type_impl::native_type tset;
|
||||
for (auto& t: tokens) {
|
||||
tset.push_back(dht::global_partitioner().to_sstring(t));
|
||||
tset.push_back(t.to_sstring());
|
||||
}
|
||||
return tset;
|
||||
}
|
||||
@@ -1519,8 +1519,8 @@ std::unordered_set<dht::token> decode_tokens(set_type_impl::native_type& tokens)
|
||||
std::unordered_set<dht::token> tset;
|
||||
for (auto& t: tokens) {
|
||||
auto str = value_cast<sstring>(t);
|
||||
assert(str == dht::global_partitioner().to_sstring(dht::global_partitioner().from_sstring(str)));
|
||||
tset.insert(dht::global_partitioner().from_sstring(str));
|
||||
assert(str == dht::token::from_sstring(str).to_sstring());
|
||||
tset.insert(dht::token::from_sstring(str));
|
||||
}
|
||||
return tset;
|
||||
}
|
||||
@@ -2073,7 +2073,7 @@ future<> register_view_for_building(sstring ks_name, sstring view_name, const dh
|
||||
std::move(view_name),
|
||||
0,
|
||||
int32_t(engine().cpu_id()),
|
||||
dht::global_partitioner().to_sstring(token)).discard_result();
|
||||
token.to_sstring()).discard_result();
|
||||
}
|
||||
|
||||
future<> update_view_build_progress(sstring ks_name, sstring view_name, const dht::token& token) {
|
||||
@@ -2083,7 +2083,7 @@ future<> update_view_build_progress(sstring ks_name, sstring view_name, const dh
|
||||
std::move(req),
|
||||
std::move(ks_name),
|
||||
std::move(view_name),
|
||||
dht::global_partitioner().to_sstring(token),
|
||||
token.to_sstring(),
|
||||
int32_t(engine().cpu_id())).discard_result();
|
||||
}
|
||||
|
||||
@@ -2134,11 +2134,11 @@ future<std::vector<view_build_progress>> load_view_build_progress() {
|
||||
for (auto& row : *cql_result) {
|
||||
auto ks_name = row.get_as<sstring>("keyspace_name");
|
||||
auto cf_name = row.get_as<sstring>("view_name");
|
||||
auto first_token = dht::global_partitioner().from_sstring(row.get_as<sstring>("first_token"));
|
||||
auto first_token = dht::token::from_sstring(row.get_as<sstring>("first_token"));
|
||||
auto next_token_sstring = row.get_opt<sstring>("next_token");
|
||||
std::optional<dht::token> next_token;
|
||||
if (next_token_sstring) {
|
||||
next_token = dht::global_partitioner().from_sstring(std::move(next_token_sstring).value());
|
||||
next_token = dht::token::from_sstring(std::move(next_token_sstring).value());
|
||||
}
|
||||
auto cpu_id = row.get_as<int32_t>("cpu_id");
|
||||
progress.emplace_back(view_build_progress{
|
||||
|
||||
@@ -79,7 +79,7 @@ std::unordered_set<token> boot_strapper::get_bootstrap_tokens(token_metadata met
|
||||
blogger.debug("tokens manually specified as {}", initial_tokens);
|
||||
std::unordered_set<token> tokens;
|
||||
for (auto& token_string : initial_tokens) {
|
||||
auto token = dht::global_partitioner().from_sstring(token_string);
|
||||
auto token = dht::token::from_sstring(token_string);
|
||||
if (metadata.get_endpoint(token)) {
|
||||
throw std::runtime_error(format("Bootstrapping to existing token {} is not allowed (decommission/removenode the old node first).", token_string));
|
||||
}
|
||||
@@ -106,7 +106,7 @@ std::unordered_set<token> boot_strapper::get_bootstrap_tokens(token_metadata met
|
||||
std::unordered_set<token> boot_strapper::get_random_tokens(token_metadata metadata, size_t num_tokens) {
|
||||
std::unordered_set<token> tokens;
|
||||
while (tokens.size() < num_tokens) {
|
||||
auto token = global_partitioner().get_random_token();
|
||||
auto token = dht::token::get_random_token();
|
||||
auto ep = metadata.get_endpoint(token);
|
||||
if (!ep) {
|
||||
tokens.emplace(token);
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
#include "i_partitioner.hh"
|
||||
#include <seastar/core/reactor.hh>
|
||||
#include "dht/murmur3_partitioner.hh"
|
||||
#include "dht/token-sharding.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
#include "types.hh"
|
||||
#include "utils/murmur_hash.hh"
|
||||
@@ -35,106 +36,24 @@
|
||||
|
||||
namespace dht {
|
||||
|
||||
static const token min_token{ token::kind::before_all_keys, {} };
|
||||
static const token max_token{ token::kind::after_all_keys, {} };
|
||||
i_partitioner::i_partitioner(unsigned shard_count, unsigned sharding_ignore_msb_bits)
|
||||
: _shard_count(shard_count)
|
||||
// if one shard, ignore sharding_ignore_msb_bits as they will just cause needless
|
||||
// range breaks
|
||||
, _sharding_ignore_msb_bits(shard_count > 1 ? sharding_ignore_msb_bits : 0)
|
||||
, _shard_start(init_zero_based_shard_start(_shard_count, _sharding_ignore_msb_bits))
|
||||
{}
|
||||
|
||||
const token&
|
||||
minimum_token() {
|
||||
return min_token;
|
||||
}
|
||||
|
||||
const token&
|
||||
maximum_token() {
|
||||
return max_token;
|
||||
}
|
||||
|
||||
// result + overflow bit
|
||||
std::pair<bytes, bool>
|
||||
add_bytes(bytes_view b1, bytes_view b2, bool carry = false) {
|
||||
auto sz = std::max(b1.size(), b2.size());
|
||||
auto expand = [sz] (bytes_view b) {
|
||||
bytes ret(bytes::initialized_later(), sz);
|
||||
auto bsz = b.size();
|
||||
auto p = std::copy(b.begin(), b.end(), ret.begin());
|
||||
std::fill_n(p, sz - bsz, 0);
|
||||
return ret;
|
||||
};
|
||||
auto eb1 = expand(b1);
|
||||
auto eb2 = expand(b2);
|
||||
auto p1 = eb1.begin();
|
||||
auto p2 = eb2.begin();
|
||||
unsigned tmp = carry;
|
||||
for (size_t idx = 0; idx < sz; ++idx) {
|
||||
tmp += uint8_t(p1[sz - idx - 1]);
|
||||
tmp += uint8_t(p2[sz - idx - 1]);
|
||||
p1[sz - idx - 1] = tmp;
|
||||
tmp >>= std::numeric_limits<uint8_t>::digits;
|
||||
}
|
||||
return { std::move(eb1), bool(tmp) };
|
||||
}
|
||||
|
||||
bytes
|
||||
shift_right(bool carry, bytes b) {
|
||||
unsigned tmp = carry;
|
||||
auto sz = b.size();
|
||||
auto p = b.begin();
|
||||
for (size_t i = 0; i < sz; ++i) {
|
||||
auto lsb = p[i] & 1;
|
||||
p[i] = (tmp << std::numeric_limits<uint8_t>::digits) | uint8_t(p[i]) >> 1;
|
||||
tmp = lsb;
|
||||
}
|
||||
return b;
|
||||
unsigned
|
||||
i_partitioner::shard_of(const token& t) const {
|
||||
return dht::shard_of(_shard_count, _sharding_ignore_msb_bits, t);
|
||||
}
|
||||
|
||||
token
|
||||
midpoint_unsigned_tokens(const token& t1, const token& t2) {
|
||||
// calculate the average of the two tokens.
|
||||
// before_all_keys is implicit 0, after_all_keys is implicit 1.
|
||||
bool c1 = t1._kind == token::kind::after_all_keys;
|
||||
bool c2 = t1._kind == token::kind::after_all_keys;
|
||||
if (c1 && c2) {
|
||||
// both end-of-range tokens?
|
||||
return t1;
|
||||
}
|
||||
// we can ignore beginning-of-range, since their representation is 0.0
|
||||
auto sum_carry = add_bytes(t1._data, t2._data);
|
||||
auto& sum = sum_carry.first;
|
||||
// if either was end-of-range, we added 0.0, so pretend we added 1.0 and
|
||||
// and got a carry:
|
||||
bool carry = sum_carry.second || c1 || c2;
|
||||
auto avg = shift_right(carry, std::move(sum));
|
||||
if (t1 > t2) {
|
||||
// wrap around the ring. We really want (t1 + (t2 + 1.0)) / 2, so add 0.5.
|
||||
// example: midpoint(0.9, 0.2) == midpoint(0.9, 1.2) == 1.05 == 0.05
|
||||
// == (0.9 + 0.2) / 2 + 0.5 (mod 1)
|
||||
if (avg.size() > 0) {
|
||||
avg[0] ^= 0x80;
|
||||
}
|
||||
}
|
||||
return token{token::kind::key, std::move(avg)};
|
||||
i_partitioner::token_for_next_shard(const token& t, shard_id shard, unsigned spans) const {
|
||||
return dht::token_for_next_shard(_shard_start, _shard_count, _sharding_ignore_msb_bits, t, shard, spans);
|
||||
}
|
||||
|
||||
int tri_compare(token_view t1, token_view t2) {
|
||||
if (t1._kind < t2._kind) {
|
||||
return -1;
|
||||
} else if (t1._kind > t2._kind) {
|
||||
return 1;
|
||||
} else if (t1._kind == token_kind::key) {
|
||||
return global_partitioner().tri_compare(t1, t2);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, const token& t) {
|
||||
if (t._kind == token::kind::after_all_keys) {
|
||||
out << "maximum token";
|
||||
} else if (t._kind == token::kind::before_all_keys) {
|
||||
out << "minimum token";
|
||||
} else {
|
||||
out << global_partitioner().to_sstring(t);
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, const decorated_key& dk) {
|
||||
return out << "{key: " << dk._key << ", token:" << dk._token << "}";
|
||||
@@ -294,7 +213,7 @@ ring_position_range_sharder::next(const schema& s) {
|
||||
if (_done) {
|
||||
return {};
|
||||
}
|
||||
auto shard = _range.start() ? _partitioner.shard_of(_range.start()->value().token()) : _partitioner.shard_of_minimum_token();
|
||||
auto shard = _range.start() ? _partitioner.shard_of(_range.start()->value().token()) : token::shard_of_minimum_token();
|
||||
auto next_shard = shard + 1 < _partitioner.shard_count() ? shard + 1 : 0;
|
||||
auto shard_boundary_token = _partitioner.token_for_next_shard(_range.start() ? _range.start()->value().token() : minimum_token(), next_shard);
|
||||
auto shard_boundary = ring_position::starting_at(shard_boundary_token);
|
||||
@@ -569,16 +488,3 @@ split_ranges_to_shards(const dht::token_range_vector& ranges, const schema& s) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
namespace std {
|
||||
|
||||
size_t
|
||||
hash<dht::token>::hash_large_token(const managed_bytes& b) const {
|
||||
auto read_bytes = boost::irange<size_t>(0, b.size())
|
||||
| boost::adaptors::transformed([&b] (size_t idx) { return b[idx]; });
|
||||
std::array<uint64_t, 2> result;
|
||||
utils::murmur_hash::hash3_x64_128(read_bytes.begin(), b.size(), 0, result);
|
||||
return result[0];
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -50,6 +50,8 @@
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
#include <range.hh>
|
||||
#include <byteswap.h>
|
||||
#include "dht/token.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -70,7 +72,6 @@ namespace dht {
|
||||
// into its users.
|
||||
|
||||
class decorated_key;
|
||||
class token;
|
||||
class ring_position;
|
||||
|
||||
using partition_range = nonwrapping_range<ring_position>;
|
||||
@@ -79,89 +80,6 @@ using token_range = nonwrapping_range<token>;
|
||||
using partition_range_vector = std::vector<partition_range>;
|
||||
using token_range_vector = std::vector<token_range>;
|
||||
|
||||
enum class token_kind {
|
||||
before_all_keys,
|
||||
key,
|
||||
after_all_keys,
|
||||
};
|
||||
|
||||
|
||||
class token_view {
|
||||
public:
|
||||
token_kind _kind;
|
||||
bytes_view _data;
|
||||
|
||||
token_view(token_kind kind, bytes_view data) : _kind(kind), _data(data) {}
|
||||
explicit token_view(const token& token);
|
||||
|
||||
bool is_minimum() const {
|
||||
return _kind == token_kind::before_all_keys;
|
||||
}
|
||||
|
||||
bool is_maximum() const {
|
||||
return _kind == token_kind::after_all_keys;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
class token {
|
||||
public:
|
||||
using kind = token_kind;
|
||||
kind _kind;
|
||||
// _data can be interpreted as a big endian binary fraction
|
||||
// in the range [0.0, 1.0).
|
||||
//
|
||||
// So, [] == 0.0
|
||||
// [0x00] == 0.0
|
||||
// [0x80] == 0.5
|
||||
// [0x00, 0x80] == 1/512
|
||||
// [0xff, 0x80] == 1 - 1/512
|
||||
managed_bytes _data;
|
||||
|
||||
token() : _kind(kind::before_all_keys) {
|
||||
}
|
||||
|
||||
token(kind k, managed_bytes d) : _kind(std::move(k)), _data(std::move(d)) {
|
||||
}
|
||||
|
||||
bool is_minimum() const {
|
||||
return _kind == kind::before_all_keys;
|
||||
}
|
||||
|
||||
bool is_maximum() const {
|
||||
return _kind == kind::after_all_keys;
|
||||
}
|
||||
|
||||
size_t external_memory_usage() const {
|
||||
return _data.external_memory_usage();
|
||||
}
|
||||
|
||||
size_t memory_usage() const {
|
||||
return sizeof(token) + external_memory_usage();
|
||||
}
|
||||
|
||||
explicit token(token_view v) : _kind(v._kind), _data(v._data) {}
|
||||
|
||||
operator token_view() const {
|
||||
return token_view(*this);
|
||||
}
|
||||
};
|
||||
|
||||
inline token_view::token_view(const token& token) : _kind(token._kind), _data(bytes_view(token._data)) {}
|
||||
|
||||
token midpoint_unsigned(const token& t1, const token& t2);
|
||||
const token& minimum_token();
|
||||
const token& maximum_token();
|
||||
int tri_compare(token_view t1, token_view t2);
|
||||
inline bool operator==(token_view t1, token_view t2) { return tri_compare(t1, t2) == 0; }
|
||||
inline bool operator<(token_view t1, token_view t2) { return tri_compare(t1, t2) < 0; }
|
||||
|
||||
inline bool operator!=(const token& t1, const token& t2) { return std::rel_ops::operator!=(t1, t2); }
|
||||
inline bool operator>(const token& t1, const token& t2) { return std::rel_ops::operator>(t1, t2); }
|
||||
inline bool operator<=(const token& t1, const token& t2) { return std::rel_ops::operator<=(t1, t2); }
|
||||
inline bool operator>=(const token& t1, const token& t2) { return std::rel_ops::operator>=(t1, t2); }
|
||||
std::ostream& operator<<(std::ostream& out, const token& t);
|
||||
|
||||
template <typename T>
|
||||
inline auto get_random_number() {
|
||||
static thread_local std::default_random_engine re{std::random_device{}()};
|
||||
@@ -232,8 +150,10 @@ using decorated_key_opt = std::optional<decorated_key>;
|
||||
class i_partitioner {
|
||||
protected:
|
||||
unsigned _shard_count;
|
||||
unsigned _sharding_ignore_msb_bits;
|
||||
std::vector<uint64_t> _shard_start;
|
||||
public:
|
||||
explicit i_partitioner(unsigned shard_count) : _shard_count(shard_count) {}
|
||||
i_partitioner(unsigned shard_count = smp::count, unsigned sharding_ignore_msb_bits = 0);
|
||||
virtual ~i_partitioner() {}
|
||||
|
||||
/**
|
||||
@@ -257,22 +177,6 @@ public:
|
||||
return { std::move(token), std::move(key) };
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate a token representing the approximate "middle" of the given
|
||||
* range.
|
||||
*
|
||||
* @return The approximate midpoint between left and right.
|
||||
*/
|
||||
virtual token midpoint(const token& left, const token& right) const = 0;
|
||||
|
||||
/**
|
||||
* @return A token smaller than all others in the range that is being partitioned.
|
||||
* Not legal to assign to a node or key. (But legal to use in range scans.)
|
||||
*/
|
||||
token get_minimum_token() {
|
||||
return dht::minimum_token();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a token that can be used to route a given key
|
||||
* (This is NOT a method to create a token from its string representation;
|
||||
@@ -281,27 +185,6 @@ public:
|
||||
virtual token get_token(const schema& s, partition_key_view key) const = 0;
|
||||
virtual token get_token(const sstables::key_view& key) const = 0;
|
||||
|
||||
|
||||
/**
|
||||
* @return a partitioner-specific string representation of this token
|
||||
*/
|
||||
virtual sstring to_sstring(const dht::token& t) const = 0;
|
||||
|
||||
/**
|
||||
* @return a token from its partitioner-specific string representation
|
||||
*/
|
||||
virtual dht::token from_sstring(const sstring& t) const = 0;
|
||||
|
||||
/**
|
||||
* @return a token from its partitioner-specific byte representation
|
||||
*/
|
||||
virtual dht::token from_bytes(bytes_view bytes) const = 0;
|
||||
|
||||
/**
|
||||
* @return a randomly generated token
|
||||
*/
|
||||
virtual token get_random_token() = 0;
|
||||
|
||||
// FIXME: token.tokenFactory
|
||||
//virtual token.tokenFactory gettokenFactory() = 0;
|
||||
|
||||
@@ -311,17 +194,6 @@ public:
|
||||
*/
|
||||
virtual bool preserves_order() = 0;
|
||||
|
||||
/**
|
||||
* Calculate the deltas between tokens in the ring in order to compare
|
||||
* relative sizes.
|
||||
*
|
||||
* @param sortedtokens a sorted List of tokens
|
||||
* @return the mapping from 'token' to 'percentage of the ring owned by that token'.
|
||||
*/
|
||||
virtual std::map<token, float> describe_ownership(const std::vector<token>& sorted_tokens) = 0;
|
||||
|
||||
virtual data_type get_token_validator() = 0;
|
||||
|
||||
/**
|
||||
* @return name of partitioner.
|
||||
*/
|
||||
@@ -330,12 +202,7 @@ public:
|
||||
/**
|
||||
* Calculates the shard that handles a particular token.
|
||||
*/
|
||||
virtual unsigned shard_of(const token& t) const = 0;
|
||||
|
||||
/**
|
||||
* Calculates the shard that handles a particular token using custom shard_count and sharding_ignore_msb.
|
||||
*/
|
||||
virtual unsigned shard_of(const token& t, unsigned shard_count, unsigned sharding_ignore_msb) const = 0;
|
||||
virtual unsigned shard_of(const token& t) const;
|
||||
|
||||
/**
|
||||
* Gets the first token greater than `t` that is in shard `shard`, and is a shard boundary (its first token).
|
||||
@@ -348,38 +215,7 @@ public:
|
||||
*
|
||||
* On overflow, maximum_token() is returned.
|
||||
*/
|
||||
virtual token token_for_next_shard(const token& t, shard_id shard, unsigned spans = 1) const = 0;
|
||||
|
||||
/**
|
||||
* Gets the first shard of the minimum token.
|
||||
*/
|
||||
unsigned shard_of_minimum_token() const {
|
||||
return 0; // hardcoded for now; unlikely to change
|
||||
}
|
||||
|
||||
/**
|
||||
* @return bytes that represent the token as required by get_token_validator().
|
||||
*/
|
||||
virtual bytes token_to_bytes(const token& t) const {
|
||||
return bytes(t._data.begin(), t._data.end());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return < 0 if if t1's _data array is less, t2's. 0 if they are equal, and > 0 otherwise. _kind comparison should be done separately.
|
||||
*/
|
||||
virtual int tri_compare(token_view t1, token_view t2) const = 0;
|
||||
/**
|
||||
* @return true if t1's _data array is equal t2's. _kind comparison should be done separately.
|
||||
*/
|
||||
bool is_equal(token_view t1, token_view t2) const {
|
||||
return tri_compare(t1, t2) == 0;
|
||||
}
|
||||
/**
|
||||
* @return true if t1's _data array is less then t2's. _kind comparison should be done separately.
|
||||
*/
|
||||
bool is_less(token_view t1, token_view t2) const {
|
||||
return tri_compare(t1, t2) < 0;
|
||||
}
|
||||
virtual token token_for_next_shard(const token& t, shard_id shard, unsigned spans = 1) const;
|
||||
|
||||
/**
|
||||
* @return number of shards configured for this partitioner
|
||||
@@ -392,13 +228,9 @@ public:
|
||||
return "biased-token-round-robin";
|
||||
}
|
||||
|
||||
virtual unsigned sharding_ignore_msb() const {
|
||||
return 0;
|
||||
unsigned sharding_ignore_msb() const {
|
||||
return _sharding_ignore_msb_bits;
|
||||
}
|
||||
|
||||
friend bool operator==(token_view t1, token_view t2);
|
||||
friend bool operator<(token_view t1, token_view t2);
|
||||
friend int tri_compare(token_view t1, token_view t2);
|
||||
};
|
||||
|
||||
//
|
||||
@@ -955,14 +787,11 @@ namespace std {
|
||||
template<>
|
||||
struct hash<dht::token> {
|
||||
size_t operator()(const dht::token& t) const {
|
||||
const auto& b = t._data;
|
||||
if (b.size() == sizeof(size_t)) { // practically always
|
||||
return read_le<size_t>(reinterpret_cast<const char*>(b.data()));
|
||||
}
|
||||
return hash_large_token(b);
|
||||
// We have to reverse the bytes here to keep compatibility with
|
||||
// the behaviour that was here when tokens were represented as
|
||||
// sequence of bytes.
|
||||
return bswap_64(t._data);
|
||||
}
|
||||
private:
|
||||
size_t hash_large_token(const managed_bytes& b) const;
|
||||
};
|
||||
|
||||
template <>
|
||||
|
||||
@@ -26,127 +26,26 @@
|
||||
#include <boost/lexical_cast.hpp>
|
||||
#include <boost/range/irange.hpp>
|
||||
|
||||
using uint128_t = unsigned __int128;
|
||||
namespace dht {
|
||||
|
||||
static inline
|
||||
unsigned
|
||||
zero_based_shard_of(uint64_t token, unsigned shards, unsigned sharding_ignore_msb_bits) {
|
||||
// This is the master function, the inverses have to match it wrt. rounding errors.
|
||||
token <<= sharding_ignore_msb_bits;
|
||||
// Treat "token" as a fraction in the interval [0, 1); compute:
|
||||
// shard = floor((0.token) * shards)
|
||||
return (uint128_t(token) * shards) >> 64;
|
||||
}
|
||||
|
||||
static
|
||||
std::vector<uint64_t>
|
||||
init_zero_based_shard_start(unsigned shards, unsigned sharding_ignore_msb_bits) {
|
||||
// computes the inverse of zero_based_shard_of(). ret[s] will return the smallest token that belongs to s
|
||||
if (shards == 1) {
|
||||
// Avoid the while loops below getting confused finding the "edge" between two nonexistent shards
|
||||
return std::vector<uint64_t>(1, uint64_t(0));
|
||||
}
|
||||
auto ret = std::vector<uint64_t>(shards);
|
||||
for (auto s : boost::irange<unsigned>(0, shards)) {
|
||||
uint64_t token = (uint128_t(s) << 64) / shards;
|
||||
token >>= sharding_ignore_msb_bits; // leftmost bits are ignored by zero_based_shard_of
|
||||
// token is the start of the next shard, and can be slightly before due to rounding errors; adjust
|
||||
while (zero_based_shard_of(token, shards, sharding_ignore_msb_bits) != s) {
|
||||
++token;
|
||||
}
|
||||
ret[s] = token;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
static inline
|
||||
int64_t
|
||||
normalize(int64_t in) {
|
||||
return in == std::numeric_limits<int64_t>::lowest()
|
||||
? std::numeric_limits<int64_t>::max()
|
||||
: in;
|
||||
}
|
||||
|
||||
static
|
||||
dht::token
|
||||
get_token(uint64_t value) {
|
||||
// We don't normalize() the value, since token includes an is-before-everything
|
||||
// indicator.
|
||||
// FIXME: will this require a repair when importing a database?
|
||||
auto t = net::hton(normalize(value));
|
||||
bytes b(bytes::initialized_later(), 8);
|
||||
std::copy_n(reinterpret_cast<int8_t*>(&t), 8, b.begin());
|
||||
return dht::token{dht::token::kind::key, std::move(b)};
|
||||
}
|
||||
|
||||
static
|
||||
dht::token
|
||||
get_token(bytes_view key) {
|
||||
token
|
||||
murmur3_partitioner::get_token(bytes_view key) const {
|
||||
if (key.empty()) {
|
||||
return dht::minimum_token();
|
||||
return minimum_token();
|
||||
}
|
||||
std::array<uint64_t, 2> hash;
|
||||
utils::murmur_hash::hash3_x64_128(key, 0, hash);
|
||||
return get_token(hash[0]);
|
||||
}
|
||||
|
||||
static inline
|
||||
int64_t long_token(dht::token_view t) {
|
||||
if (t.is_minimum() || t.is_maximum()) {
|
||||
return std::numeric_limits<long>::min();
|
||||
}
|
||||
|
||||
if (t._data.size() != sizeof(int64_t)) {
|
||||
throw runtime_exception(format("Invalid token. Should have size {:d}, has size {:d}\n", sizeof(int64_t), t._data.size()));
|
||||
}
|
||||
|
||||
auto ptr = t._data.begin();
|
||||
auto lp = unaligned_cast<const int64_t *>(ptr);
|
||||
return net::ntoh(*lp);
|
||||
token
|
||||
murmur3_partitioner::get_token(uint64_t value) const {
|
||||
return token(token::kind::key, value);
|
||||
}
|
||||
|
||||
// translate to a zero-based range
|
||||
static inline
|
||||
uint64_t
|
||||
unbias(const dht::token& t) {
|
||||
return uint64_t(long_token(t)) + uint64_t(std::numeric_limits<int64_t>::min());
|
||||
}
|
||||
|
||||
// translate from a zero-based range
|
||||
static inline
|
||||
dht::token
|
||||
bias(uint64_t n) {
|
||||
return get_token(n - uint64_t(std::numeric_limits<int64_t>::min()));
|
||||
}
|
||||
|
||||
static inline
|
||||
unsigned
|
||||
shard_of(const dht::token& t, unsigned shard_count, unsigned sharding_ignore_msb) {
|
||||
switch (t._kind) {
|
||||
case dht::token::kind::before_all_keys:
|
||||
return 0;
|
||||
case dht::token::kind::after_all_keys:
|
||||
return shard_count - 1;
|
||||
case dht::token::kind::key:
|
||||
uint64_t adjusted = unbias(t);
|
||||
return zero_based_shard_of(adjusted, shard_count, sharding_ignore_msb);
|
||||
}
|
||||
abort();
|
||||
}
|
||||
|
||||
namespace dht {
|
||||
|
||||
murmur3_partitioner::murmur3_partitioner(unsigned shard_count, unsigned sharding_ignore_msb_bits)
|
||||
: i_partitioner(shard_count)
|
||||
// if one shard, ignore sharding_ignore_msb_bits as they will just cause needless
|
||||
// range breaks
|
||||
, _sharding_ignore_msb_bits(shard_count > 1 ? sharding_ignore_msb_bits : 0)
|
||||
, _shard_start(init_zero_based_shard_start(_shard_count, _sharding_ignore_msb_bits))
|
||||
{ }
|
||||
|
||||
token
|
||||
murmur3_partitioner::get_token(const sstables::key_view& key) const {
|
||||
return ::get_token(bytes_view(key));
|
||||
return get_token(bytes_view(key));
|
||||
}
|
||||
|
||||
token
|
||||
@@ -154,177 +53,9 @@ murmur3_partitioner::get_token(const schema& s, partition_key_view key) const {
|
||||
std::array<uint64_t, 2> hash;
|
||||
auto&& legacy = key.legacy_form(s);
|
||||
utils::murmur_hash::hash3_x64_128(legacy.begin(), legacy.size(), 0, hash);
|
||||
return ::get_token(hash[0]);
|
||||
return get_token(hash[0]);
|
||||
}
|
||||
|
||||
token murmur3_partitioner::get_random_token() {
|
||||
auto rand = dht::get_random_number<uint64_t>();
|
||||
return ::get_token(rand);
|
||||
}
|
||||
|
||||
sstring murmur3_partitioner::to_sstring(const token& t) const {
|
||||
return seastar::to_sstring<sstring>(long_token(t));
|
||||
}
|
||||
|
||||
dht::token murmur3_partitioner::from_sstring(const sstring& t) const {
|
||||
auto lp = boost::lexical_cast<long>(t);
|
||||
if (lp == std::numeric_limits<long>::min()) {
|
||||
return minimum_token();
|
||||
} else {
|
||||
return ::get_token(uint64_t(lp));
|
||||
}
|
||||
}
|
||||
|
||||
dht::token murmur3_partitioner::from_bytes(bytes_view bytes) const {
|
||||
if (bytes.size() != sizeof(int64_t)) {
|
||||
throw runtime_exception(format("Invalid token. Should have size {:d}, has size {:d}\n", sizeof(int64_t), bytes.size()));
|
||||
}
|
||||
|
||||
int64_t v;
|
||||
std::copy_n(bytes.begin(), sizeof(v), reinterpret_cast<int8_t *>(&v));
|
||||
auto tok = net::ntoh(v);
|
||||
if (tok == std::numeric_limits<int64_t>::min()) {
|
||||
return minimum_token();
|
||||
} else {
|
||||
return dht::token(dht::token::kind::key, bytes);
|
||||
}
|
||||
}
|
||||
|
||||
int murmur3_partitioner::tri_compare(token_view t1, token_view t2) const {
|
||||
auto l1 = long_token(t1);
|
||||
auto l2 = long_token(t2);
|
||||
|
||||
if (l1 == l2) {
|
||||
return 0;
|
||||
} else {
|
||||
return l1 < l2 ? -1 : 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Assuming that x>=y, return the positive difference x-y.
|
||||
// The return type is an unsigned type, as the difference may overflow
|
||||
// a signed type (e.g., consider very positive x and very negative y).
|
||||
template <typename T>
|
||||
static std::make_unsigned_t<T> positive_subtract(T x, T y) {
|
||||
return std::make_unsigned_t<T>(x) - std::make_unsigned_t<T>(y);
|
||||
}
|
||||
|
||||
token murmur3_partitioner::midpoint(const token& t1, const token& t2) const {
|
||||
auto l1 = long_token(t1);
|
||||
auto l2 = long_token(t2);
|
||||
int64_t mid;
|
||||
if (l1 <= l2) {
|
||||
// To find the midpoint, we cannot use the trivial formula (l1+l2)/2
|
||||
// because the addition can overflow the integer. To avoid this
|
||||
// overflow, we first notice that the above formula is equivalent to
|
||||
// l1 + (l2-l1)/2. Now, "l2-l1" can still overflow a signed integer
|
||||
// (e.g., think of a very positive l2 and very negative l1), but
|
||||
// because l1 <= l2 in this branch, we note that l2-l1 is positive
|
||||
// and fits an *unsigned* int's range. So,
|
||||
mid = l1 + positive_subtract(l2, l1)/2;
|
||||
} else {
|
||||
// When l2 < l1, we need to switch l1 and and l2 in the above
|
||||
// formula, because now l1 - l2 is positive.
|
||||
// Additionally, we consider this case is a "wrap around", so we need
|
||||
// to behave as if l2 + 2^64 was meant instead of l2, i.e., add 2^63
|
||||
// to the average.
|
||||
mid = l2 + positive_subtract(l1, l2)/2 + 0x8000'0000'0000'0000;
|
||||
}
|
||||
return ::get_token(mid);
|
||||
}
|
||||
|
||||
static float ratio_helper(int64_t a, int64_t b) {
|
||||
|
||||
uint64_t val = (a > b)? static_cast<uint64_t>(a) - static_cast<uint64_t>(b) : (static_cast<uint64_t>(a) - static_cast<uint64_t>(b) - 1);
|
||||
return val/(float)std::numeric_limits<uint64_t>::max();
|
||||
}
|
||||
|
||||
std::map<token, float>
|
||||
murmur3_partitioner::describe_ownership(const std::vector<token>& sorted_tokens) {
|
||||
std::map<token, float> ownerships;
|
||||
auto i = sorted_tokens.begin();
|
||||
|
||||
// 0-case
|
||||
if (i == sorted_tokens.end()) {
|
||||
throw runtime_exception("No nodes present in the cluster. Has this node finished starting up?");
|
||||
}
|
||||
// 1-case
|
||||
if (sorted_tokens.size() == 1) {
|
||||
ownerships[sorted_tokens[0]] = 1.0;
|
||||
// n-case
|
||||
} else {
|
||||
const token& start = sorted_tokens[0];
|
||||
|
||||
int64_t ti = long_token(start); // The first token and its value
|
||||
int64_t start_long = ti;
|
||||
int64_t tim1 = ti; // The last token and its value (after loop)
|
||||
for (i++; i != sorted_tokens.end(); i++) {
|
||||
ti = long_token(*i); // The next token and its value
|
||||
ownerships[*i]= ratio_helper(ti, tim1); // save (T(i) -> %age)
|
||||
tim1 = ti;
|
||||
}
|
||||
|
||||
// The start token's range extends backward to the last token, which is why both were saved above.
|
||||
ownerships[start] = ratio_helper(start_long, ti);
|
||||
}
|
||||
|
||||
return ownerships;
|
||||
}
|
||||
|
||||
data_type
|
||||
murmur3_partitioner::get_token_validator() {
|
||||
return long_type;
|
||||
}
|
||||
|
||||
unsigned
|
||||
murmur3_partitioner::shard_of(const token& t) const {
|
||||
return ::shard_of(t, _shard_count, _sharding_ignore_msb_bits);
|
||||
}
|
||||
|
||||
unsigned
|
||||
murmur3_partitioner::shard_of(const token& t, unsigned shard_count, unsigned sharding_ignore_msb) const {
|
||||
return ::shard_of(t, shard_count, sharding_ignore_msb);
|
||||
}
|
||||
|
||||
token
|
||||
murmur3_partitioner::token_for_next_shard(const token& t, shard_id shard, unsigned spans) const {
|
||||
uint64_t n = 0;
|
||||
switch (t._kind) {
|
||||
case token::kind::before_all_keys:
|
||||
break;
|
||||
case token::kind::after_all_keys:
|
||||
return maximum_token();
|
||||
case token::kind::key:
|
||||
n = unbias(t);
|
||||
break;
|
||||
}
|
||||
auto s = zero_based_shard_of(n, _shard_count, _sharding_ignore_msb_bits);
|
||||
|
||||
if (!_sharding_ignore_msb_bits) {
|
||||
// This ought to be the same as the else branch, but avoids shifts by 64
|
||||
n = _shard_start[shard];
|
||||
if (spans > 1 || shard <= s) {
|
||||
return maximum_token();
|
||||
}
|
||||
} else {
|
||||
auto left_part = n >> (64 - _sharding_ignore_msb_bits);
|
||||
left_part += spans - unsigned(shard > s);
|
||||
if (left_part >= (1u << _sharding_ignore_msb_bits)) {
|
||||
return maximum_token();
|
||||
}
|
||||
left_part <<= (64 - _sharding_ignore_msb_bits);
|
||||
auto right_part = _shard_start[shard];
|
||||
n = left_part | right_part;
|
||||
}
|
||||
return bias(n);
|
||||
}
|
||||
|
||||
unsigned
|
||||
murmur3_partitioner::sharding_ignore_msb() const {
|
||||
return _sharding_ignore_msb_bits;
|
||||
}
|
||||
|
||||
|
||||
using registry = class_registrator<i_partitioner, murmur3_partitioner, const unsigned&, const unsigned&>;
|
||||
static registry registrator("org.apache.cassandra.dht.Murmur3Partitioner");
|
||||
static registry registrator_short_name("Murmur3Partitioner");
|
||||
|
||||
@@ -28,27 +28,16 @@
|
||||
namespace dht {
|
||||
|
||||
class murmur3_partitioner final : public i_partitioner {
|
||||
unsigned _sharding_ignore_msb_bits;
|
||||
std::vector<uint64_t> _shard_start;
|
||||
public:
|
||||
murmur3_partitioner(unsigned shard_count = smp::count, unsigned sharding_ignore_msb_bits = 0);
|
||||
murmur3_partitioner(unsigned shard_count = smp::count, unsigned sharding_ignore_msb_bits = 0)
|
||||
: i_partitioner(shard_count, sharding_ignore_msb_bits) {}
|
||||
virtual const sstring name() const { return "org.apache.cassandra.dht.Murmur3Partitioner"; }
|
||||
virtual token get_token(const schema& s, partition_key_view key) const override;
|
||||
virtual token get_token(const sstables::key_view& key) const override;
|
||||
virtual token get_random_token() override;
|
||||
virtual bool preserves_order() override { return false; }
|
||||
virtual std::map<token, float> describe_ownership(const std::vector<token>& sorted_tokens) override;
|
||||
virtual data_type get_token_validator() override;
|
||||
virtual int tri_compare(token_view t1, token_view t2) const override;
|
||||
virtual token midpoint(const token& t1, const token& t2) const override;
|
||||
virtual sstring to_sstring(const dht::token& t) const override;
|
||||
virtual dht::token from_sstring(const sstring& t) const override;
|
||||
virtual dht::token from_bytes(bytes_view bytes) const override;
|
||||
|
||||
virtual unsigned shard_of(const token& t) const override;
|
||||
virtual unsigned shard_of(const token& t, unsigned shard_count, unsigned sharding_ignore_msb) const override;
|
||||
virtual token token_for_next_shard(const token& t, shard_id shard, unsigned spans) const override;
|
||||
virtual unsigned sharding_ignore_msb() const override;
|
||||
private:
|
||||
token get_token(bytes_view key) const;
|
||||
token get_token(uint64_t value) const;
|
||||
};
|
||||
|
||||
|
||||
|
||||
34
dht/token-sharding.hh
Normal file
34
dht/token-sharding.hh
Normal file
@@ -0,0 +1,34 @@
|
||||
/*
|
||||
* Copyright (C) 2020 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "dht/token.hh"
|
||||
|
||||
namespace dht {
|
||||
|
||||
std::vector<uint64_t> init_zero_based_shard_start(unsigned shards, unsigned sharding_ignore_msb_bits);
|
||||
|
||||
unsigned shard_of(unsigned shard_count, unsigned sharding_ignore_msb_bits, const token& t);
|
||||
|
||||
token token_for_next_shard(const std::vector<uint64_t>& shard_start, unsigned shard_count, unsigned sharding_ignore_msb_bits, const token& t, shard_id shard, unsigned spans);
|
||||
|
||||
} //namespace dht
|
||||
276
dht/token.cc
Normal file
276
dht/token.cc
Normal file
@@ -0,0 +1,276 @@
|
||||
/*
|
||||
* Copyright (C) 2020 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <algorithm>
|
||||
#include <limits>
|
||||
#include <ostream>
|
||||
#include <utility>
|
||||
|
||||
#include "dht/token.hh"
|
||||
#include "dht/token-sharding.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
|
||||
namespace dht {
|
||||
|
||||
using uint128_t = unsigned __int128;
|
||||
|
||||
inline int64_t long_token(const token& t) {
|
||||
if (t.is_minimum() || t.is_maximum()) {
|
||||
return std::numeric_limits<int64_t>::min();
|
||||
}
|
||||
|
||||
return t._data;
|
||||
}
|
||||
|
||||
static const token min_token{ token::kind::before_all_keys, 0 };
|
||||
static const token max_token{ token::kind::after_all_keys, 0 };
|
||||
|
||||
const token&
|
||||
minimum_token() {
|
||||
return min_token;
|
||||
}
|
||||
|
||||
const token&
|
||||
maximum_token() {
|
||||
return max_token;
|
||||
}
|
||||
|
||||
int tri_compare(const token& t1, const token& t2) {
|
||||
if (t1._kind < t2._kind) {
|
||||
return -1;
|
||||
} else if (t1._kind > t2._kind) {
|
||||
return 1;
|
||||
} else if (t1._kind == token_kind::key) {
|
||||
auto l1 = long_token(t1);
|
||||
auto l2 = long_token(t2);
|
||||
if (l1 == l2) {
|
||||
return 0;
|
||||
} else {
|
||||
return l1 < l2 ? -1 : 1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, const token& t) {
|
||||
if (t._kind == token::kind::after_all_keys) {
|
||||
out << "maximum token";
|
||||
} else if (t._kind == token::kind::before_all_keys) {
|
||||
out << "minimum token";
|
||||
} else {
|
||||
out << t.to_sstring();
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
sstring token::to_sstring() const {
|
||||
return seastar::to_sstring<sstring>(long_token(*this));
|
||||
}
|
||||
|
||||
// Assuming that x>=y, return the positive difference x-y.
|
||||
// The return type is an unsigned type, as the difference may overflow
|
||||
// a signed type (e.g., consider very positive x and very negative y).
|
||||
template <typename T>
|
||||
static std::make_unsigned_t<T> positive_subtract(T x, T y) {
|
||||
return std::make_unsigned_t<T>(x) - std::make_unsigned_t<T>(y);
|
||||
}
|
||||
|
||||
token token::midpoint(const token& t1, const token& t2) {
|
||||
auto l1 = long_token(t1);
|
||||
auto l2 = long_token(t2);
|
||||
int64_t mid;
|
||||
if (l1 <= l2) {
|
||||
// To find the midpoint, we cannot use the trivial formula (l1+l2)/2
|
||||
// because the addition can overflow the integer. To avoid this
|
||||
// overflow, we first notice that the above formula is equivalent to
|
||||
// l1 + (l2-l1)/2. Now, "l2-l1" can still overflow a signed integer
|
||||
// (e.g., think of a very positive l2 and very negative l1), but
|
||||
// because l1 <= l2 in this branch, we note that l2-l1 is positive
|
||||
// and fits an *unsigned* int's range. So,
|
||||
mid = l1 + positive_subtract(l2, l1)/2;
|
||||
} else {
|
||||
// When l2 < l1, we need to switch l1 and and l2 in the above
|
||||
// formula, because now l1 - l2 is positive.
|
||||
// Additionally, we consider this case is a "wrap around", so we need
|
||||
// to behave as if l2 + 2^64 was meant instead of l2, i.e., add 2^63
|
||||
// to the average.
|
||||
mid = l2 + positive_subtract(l1, l2)/2 + 0x8000'0000'0000'0000;
|
||||
}
|
||||
return token{kind::key, mid};
|
||||
}
|
||||
|
||||
token token::get_random_token() {
|
||||
return {kind::key, dht::get_random_number<int64_t>()};
|
||||
}
|
||||
|
||||
token token::from_sstring(const sstring& t) {
|
||||
auto lp = boost::lexical_cast<long>(t);
|
||||
if (lp == std::numeric_limits<long>::min()) {
|
||||
return minimum_token();
|
||||
} else {
|
||||
return token(kind::key, uint64_t(lp));
|
||||
}
|
||||
}
|
||||
|
||||
token token::from_bytes(bytes_view bytes) {
|
||||
if (bytes.size() != sizeof(int64_t)) {
|
||||
throw runtime_exception(format("Invalid token. Should have size {:d}, has size {:d}\n", sizeof(int64_t), bytes.size()));
|
||||
}
|
||||
|
||||
int64_t v;
|
||||
std::copy_n(bytes.begin(), sizeof(v), reinterpret_cast<int8_t *>(&v));
|
||||
auto tok = net::ntoh(v);
|
||||
if (tok == std::numeric_limits<int64_t>::min()) {
|
||||
return minimum_token();
|
||||
} else {
|
||||
return dht::token(dht::token::kind::key, tok);
|
||||
}
|
||||
}
|
||||
|
||||
static float ratio_helper(int64_t a, int64_t b) {
|
||||
|
||||
uint64_t val = (a > b)? static_cast<uint64_t>(a) - static_cast<uint64_t>(b) : (static_cast<uint64_t>(a) - static_cast<uint64_t>(b) - 1);
|
||||
return val/(float)std::numeric_limits<uint64_t>::max();
|
||||
}
|
||||
|
||||
std::map<token, float>
|
||||
token::describe_ownership(const std::vector<token>& sorted_tokens) {
|
||||
std::map<token, float> ownerships;
|
||||
auto i = sorted_tokens.begin();
|
||||
|
||||
// 0-case
|
||||
if (i == sorted_tokens.end()) {
|
||||
throw runtime_exception("No nodes present in the cluster. Has this node finished starting up?");
|
||||
}
|
||||
// 1-case
|
||||
if (sorted_tokens.size() == 1) {
|
||||
ownerships[sorted_tokens[0]] = 1.0;
|
||||
// n-case
|
||||
} else {
|
||||
const token& start = sorted_tokens[0];
|
||||
|
||||
int64_t ti = long_token(start); // The first token and its value
|
||||
int64_t start_long = ti;
|
||||
int64_t tim1 = ti; // The last token and its value (after loop)
|
||||
for (i++; i != sorted_tokens.end(); i++) {
|
||||
ti = long_token(*i); // The next token and its value
|
||||
ownerships[*i]= ratio_helper(ti, tim1); // save (T(i) -> %age)
|
||||
tim1 = ti;
|
||||
}
|
||||
|
||||
// The start token's range extends backward to the last token, which is why both were saved above.
|
||||
ownerships[start] = ratio_helper(start_long, ti);
|
||||
}
|
||||
|
||||
return ownerships;
|
||||
}
|
||||
|
||||
data_type
|
||||
token::get_token_validator() {
|
||||
return long_type;
|
||||
}
|
||||
|
||||
static uint64_t unbias(const token& t) {
|
||||
return uint64_t(long_token(t)) + uint64_t(std::numeric_limits<int64_t>::min());
|
||||
}
|
||||
|
||||
static token bias(uint64_t n) {
|
||||
return token(token::kind::key, n - uint64_t(std::numeric_limits<int64_t>::min()));
|
||||
}
|
||||
|
||||
inline
|
||||
unsigned
|
||||
zero_based_shard_of(uint64_t token, unsigned shards, unsigned sharding_ignore_msb_bits) {
|
||||
// This is the master function, the inverses have to match it wrt. rounding errors.
|
||||
token <<= sharding_ignore_msb_bits;
|
||||
// Treat "token" as a fraction in the interval [0, 1); compute:
|
||||
// shard = floor((0.token) * shards)
|
||||
return (uint128_t(token) * shards) >> 64;
|
||||
}
|
||||
|
||||
std::vector<uint64_t>
|
||||
init_zero_based_shard_start(unsigned shards, unsigned sharding_ignore_msb_bits) {
|
||||
// computes the inverse of zero_based_shard_of(). ret[s] will return the smallest token that belongs to s
|
||||
if (shards == 1) {
|
||||
// Avoid the while loops below getting confused finding the "edge" between two nonexistent shards
|
||||
return std::vector<uint64_t>(1, uint64_t(0));
|
||||
}
|
||||
auto ret = std::vector<uint64_t>(shards);
|
||||
for (auto s : boost::irange<unsigned>(0, shards)) {
|
||||
uint64_t token = (uint128_t(s) << 64) / shards;
|
||||
token >>= sharding_ignore_msb_bits; // leftmost bits are ignored by zero_based_shard_of
|
||||
// token is the start of the next shard, and can be slightly before due to rounding errors; adjust
|
||||
while (zero_based_shard_of(token, shards, sharding_ignore_msb_bits) != s) {
|
||||
++token;
|
||||
}
|
||||
ret[s] = token;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
unsigned
|
||||
shard_of(unsigned shard_count, unsigned sharding_ignore_msb_bits, const token& t) {
|
||||
switch (t._kind) {
|
||||
case token::kind::before_all_keys:
|
||||
return 0;
|
||||
case token::kind::after_all_keys:
|
||||
return shard_count - 1;
|
||||
case token::kind::key:
|
||||
uint64_t adjusted = unbias(t);
|
||||
return zero_based_shard_of(adjusted, shard_count, sharding_ignore_msb_bits);
|
||||
}
|
||||
abort();
|
||||
}
|
||||
|
||||
token
|
||||
token_for_next_shard(const std::vector<uint64_t>& shard_start, unsigned shard_count, unsigned sharding_ignore_msb_bits, const token& t, shard_id shard, unsigned spans) {
|
||||
uint64_t n = 0;
|
||||
switch (t._kind) {
|
||||
case token::kind::before_all_keys:
|
||||
break;
|
||||
case token::kind::after_all_keys:
|
||||
return maximum_token();
|
||||
case token::kind::key:
|
||||
n = unbias(t);
|
||||
break;
|
||||
}
|
||||
auto s = zero_based_shard_of(n, shard_count, sharding_ignore_msb_bits);
|
||||
|
||||
if (!sharding_ignore_msb_bits) {
|
||||
// This ought to be the same as the else branch, but avoids shifts by 64
|
||||
n = shard_start[shard];
|
||||
if (spans > 1 || shard <= s) {
|
||||
return maximum_token();
|
||||
}
|
||||
} else {
|
||||
auto left_part = n >> (64 - sharding_ignore_msb_bits);
|
||||
left_part += spans - unsigned(shard > s);
|
||||
if (left_part >= (1u << sharding_ignore_msb_bits)) {
|
||||
return maximum_token();
|
||||
}
|
||||
left_part <<= (64 - sharding_ignore_msb_bits);
|
||||
auto right_part = shard_start[shard];
|
||||
n = left_part | right_part;
|
||||
}
|
||||
return bias(n);
|
||||
}
|
||||
|
||||
} // namespace dht
|
||||
159
dht/token.hh
Normal file
159
dht/token.hh
Normal file
@@ -0,0 +1,159 @@
|
||||
/*
|
||||
* Copyright (C) 2020 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "bytes.hh"
|
||||
#include "utils/managed_bytes.hh"
|
||||
#include "types.hh"
|
||||
|
||||
#include <seastar/net/byteorder.hh>
|
||||
#include <fmt/format.h>
|
||||
#include <array>
|
||||
#include <functional>
|
||||
#include <utility>
|
||||
|
||||
namespace dht {
|
||||
|
||||
class token;
|
||||
|
||||
enum class token_kind {
|
||||
before_all_keys,
|
||||
key,
|
||||
after_all_keys,
|
||||
};
|
||||
|
||||
class token {
|
||||
static inline int64_t normalize(int64_t t) {
|
||||
return t == std::numeric_limits<int64_t>::min() ? std::numeric_limits<int64_t>::max() : t;
|
||||
}
|
||||
public:
|
||||
using kind = token_kind;
|
||||
kind _kind;
|
||||
int64_t _data;
|
||||
|
||||
token() : _kind(kind::before_all_keys) {
|
||||
}
|
||||
|
||||
token(kind k, int64_t d)
|
||||
: _kind(std::move(k))
|
||||
, _data(normalize(d)) { }
|
||||
|
||||
token(kind k, const bytes& b) : _kind(std::move(k)) {
|
||||
if (b.size() != sizeof(_data)) {
|
||||
throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
|
||||
}
|
||||
std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
|
||||
_data = net::ntoh(_data);
|
||||
}
|
||||
|
||||
token(kind k, bytes_view b) : _kind(std::move(k)) {
|
||||
if (b.size() != sizeof(_data)) {
|
||||
throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
|
||||
}
|
||||
std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
|
||||
_data = net::ntoh(_data);
|
||||
}
|
||||
|
||||
bool is_minimum() const {
|
||||
return _kind == kind::before_all_keys;
|
||||
}
|
||||
|
||||
bool is_maximum() const {
|
||||
return _kind == kind::after_all_keys;
|
||||
}
|
||||
|
||||
size_t external_memory_usage() const {
|
||||
return 0;
|
||||
}
|
||||
|
||||
size_t memory_usage() const {
|
||||
return sizeof(token);
|
||||
}
|
||||
|
||||
bytes data() const {
|
||||
auto t = net::hton(_data);
|
||||
bytes b(bytes::initialized_later(), sizeof(_data));
|
||||
std::copy_n(reinterpret_cast<int8_t*>(&t), sizeof(_data), b.begin());
|
||||
return b;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a string representation of this token
|
||||
*/
|
||||
sstring to_sstring() const;
|
||||
|
||||
/**
|
||||
* Calculate a token representing the approximate "middle" of the given
|
||||
* range.
|
||||
*
|
||||
* @return The approximate midpoint between left and right.
|
||||
*/
|
||||
static token midpoint(const token& left, const token& right);
|
||||
|
||||
/**
|
||||
* @return a randomly generated token
|
||||
*/
|
||||
static token get_random_token();
|
||||
|
||||
/**
|
||||
* @return a token from string representation
|
||||
*/
|
||||
static dht::token from_sstring(const sstring& t);
|
||||
|
||||
/**
|
||||
* @return a token from its byte representation
|
||||
*/
|
||||
static dht::token from_bytes(bytes_view bytes);
|
||||
|
||||
/**
|
||||
* Calculate the deltas between tokens in the ring in order to compare
|
||||
* relative sizes.
|
||||
*
|
||||
* @param sortedtokens a sorted List of tokens
|
||||
* @return the mapping from 'token' to 'percentage of the ring owned by that token'.
|
||||
*/
|
||||
static std::map<token, float> describe_ownership(const std::vector<token>& sorted_tokens);
|
||||
|
||||
static data_type get_token_validator();
|
||||
|
||||
/**
|
||||
* Gets the first shard of the minimum token.
|
||||
*/
|
||||
static unsigned shard_of_minimum_token() {
|
||||
return 0; // hardcoded for now; unlikely to change
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
const token& minimum_token();
|
||||
const token& maximum_token();
|
||||
int tri_compare(const token& t1, const token& t2);
|
||||
inline bool operator==(const token& t1, const token& t2) { return tri_compare(t1, t2) == 0; }
|
||||
inline bool operator<(const token& t1, const token& t2) { return tri_compare(t1, t2) < 0; }
|
||||
|
||||
inline bool operator!=(const token& t1, const token& t2) { return std::rel_ops::operator!=(t1, t2); }
|
||||
inline bool operator>(const token& t1, const token& t2) { return std::rel_ops::operator>(t1, t2); }
|
||||
inline bool operator<=(const token& t1, const token& t2) { return std::rel_ops::operator<=(t1, t2); }
|
||||
inline bool operator>=(const token& t1, const token& t2) { return std::rel_ops::operator>=(t1, t2); }
|
||||
std::ostream& operator<<(std::ostream& out, const token& t);
|
||||
|
||||
} // namespace dht
|
||||
@@ -133,14 +133,14 @@ public:
|
||||
public:
|
||||
sstring make_full_token_string(const std::unordered_set<token>& tokens) {
|
||||
return ::join(";", tokens | boost::adaptors::transformed([] (const token& t) {
|
||||
return dht::global_partitioner().to_sstring(t); })
|
||||
return t.to_sstring(); })
|
||||
);
|
||||
}
|
||||
sstring make_token_string(const std::unordered_set<token>& tokens) {
|
||||
if (tokens.empty()) {
|
||||
return "";
|
||||
}
|
||||
return dht::global_partitioner().to_sstring(*tokens.begin());
|
||||
return tokens.begin()->to_sstring();
|
||||
}
|
||||
|
||||
sstring make_cdc_streams_timestamp_string(std::optional<db_clock::time_point> t) {
|
||||
|
||||
@@ -6,7 +6,7 @@ class token {
|
||||
after_all_keys,
|
||||
};
|
||||
dht::token::kind _kind;
|
||||
bytes _data;
|
||||
bytes data();
|
||||
};
|
||||
|
||||
class decorated_key {
|
||||
|
||||
@@ -57,7 +57,7 @@ public:
|
||||
// The use of minimum_token() here twice is not a typo - because wrap-
|
||||
// around token ranges are supported by midpoint(), the beyond-maximum
|
||||
// token can also be represented by minimum_token().
|
||||
auto midpoint = dht::global_partitioner().midpoint(
|
||||
auto midpoint = dht::token::midpoint(
|
||||
range.start() ? range.start()->value() : dht::minimum_token(),
|
||||
range.end() ? range.end()->value() : dht::minimum_token());
|
||||
// This shouldn't happen, but if the range included just one token, we
|
||||
|
||||
@@ -1232,8 +1232,8 @@ private:
|
||||
throw(std::runtime_error("range must have two components "
|
||||
"separated by ':', got '" + range + "'"));
|
||||
}
|
||||
auto tok_start = dht::global_partitioner().from_sstring(token_strings[0]);
|
||||
auto tok_end = dht::global_partitioner().from_sstring(token_strings[1]);
|
||||
auto tok_start = dht::token::from_sstring(token_strings[0]);
|
||||
auto tok_end = dht::token::from_sstring(token_strings[1]);
|
||||
auto rng = wrapping_range<dht::token>(
|
||||
::range<dht::token>::bound(tok_start, false),
|
||||
::range<dht::token>::bound(tok_end, true));
|
||||
@@ -1377,12 +1377,12 @@ static int do_repair_start(seastar::sharded<database>& db, sstring keyspace,
|
||||
std::optional<::range<dht::token>::bound> tok_end;
|
||||
if (!options.start_token.empty()) {
|
||||
tok_start = ::range<dht::token>::bound(
|
||||
dht::global_partitioner().from_sstring(options.start_token),
|
||||
dht::token::from_sstring(options.start_token),
|
||||
true);
|
||||
}
|
||||
if (!options.end_token.empty()) {
|
||||
tok_end = ::range<dht::token>::bound(
|
||||
dht::global_partitioner().from_sstring(options.end_token),
|
||||
dht::token::from_sstring(options.end_token),
|
||||
false);
|
||||
}
|
||||
dht::token_range given_range_complement(tok_end, tok_start);
|
||||
|
||||
@@ -1457,7 +1457,7 @@ bytes token_column_computation::serialize() const {
|
||||
|
||||
bytes_opt token_column_computation::compute_value(const schema& schema, const partition_key& key, const clustering_row& row) const {
|
||||
dht::i_partitioner& partitioner = dht::global_partitioner();
|
||||
return partitioner.token_to_bytes(partitioner.get_token(schema, key));
|
||||
return partitioner.get_token(schema, key).data();
|
||||
}
|
||||
|
||||
bool operator==(const raw_view_info& x, const raw_view_info& y) {
|
||||
|
||||
@@ -402,7 +402,7 @@ std::unordered_set<token> get_replace_tokens() {
|
||||
}
|
||||
tokens.erase("");
|
||||
for (auto token_string : tokens) {
|
||||
auto token = dht::global_partitioner().from_sstring(token_string);
|
||||
auto token = dht::token::from_sstring(token_string);
|
||||
ret.insert(token);
|
||||
}
|
||||
return ret;
|
||||
@@ -804,7 +804,7 @@ void storage_service::join_token_ring(int delay) {
|
||||
}
|
||||
} else {
|
||||
for (auto token_string : initial_tokens) {
|
||||
auto token = dht::global_partitioner().from_sstring(token_string);
|
||||
auto token = dht::token::from_sstring(token_string);
|
||||
_bootstrap_tokens.insert(token);
|
||||
}
|
||||
slogger.info("Saved tokens not found. Using configuration value: {}", _bootstrap_tokens);
|
||||
@@ -1645,7 +1645,7 @@ std::unordered_set<locator::token> storage_service::get_tokens_for(inet_address
|
||||
std::unordered_set<token> ret;
|
||||
boost::split(tokens, tokens_string, boost::is_any_of(";"));
|
||||
for (auto str : tokens) {
|
||||
auto t = dht::global_partitioner().from_sstring(str);
|
||||
auto t = dht::token::from_sstring(str);
|
||||
slogger.trace("endpoint={}, token_str={} token={}", endpoint, str, t);
|
||||
ret.emplace(std::move(t));
|
||||
}
|
||||
@@ -2056,7 +2056,7 @@ storage_service::prepare_replacement_info(const std::unordered_map<gms::inet_add
|
||||
|
||||
future<std::map<gms::inet_address, float>> storage_service::get_ownership() {
|
||||
return run_with_no_api_lock([] (storage_service& ss) {
|
||||
auto token_map = dht::global_partitioner().describe_ownership(ss._token_metadata.sorted_tokens());
|
||||
auto token_map = dht::token::describe_ownership(ss._token_metadata.sorted_tokens());
|
||||
// describeOwnership returns tokens in an unspecified order, let's re-order them
|
||||
std::map<gms::inet_address, float> ownership;
|
||||
for (auto entry : token_map) {
|
||||
@@ -2092,7 +2092,7 @@ future<std::map<gms::inet_address, float>> storage_service::effective_ownership(
|
||||
}
|
||||
keyspace_name = "system_traces";
|
||||
}
|
||||
auto token_ownership = dht::global_partitioner().describe_ownership(ss._token_metadata.sorted_tokens());
|
||||
auto token_ownership = dht::token::describe_ownership(ss._token_metadata.sorted_tokens());
|
||||
|
||||
std::map<gms::inet_address, float> final_ownership;
|
||||
|
||||
@@ -3341,10 +3341,10 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_
|
||||
auto addresses = entry.second;
|
||||
token_range_endpoints tr;
|
||||
if (range.start()) {
|
||||
tr._start_token = dht::global_partitioner().to_sstring(range.start()->value());
|
||||
tr._start_token = range.start()->value().to_sstring();
|
||||
}
|
||||
if (range.end()) {
|
||||
tr._end_token = dht::global_partitioner().to_sstring(range.end()->value());
|
||||
tr._end_token = range.end()->value().to_sstring();
|
||||
}
|
||||
for (auto endpoint : addresses) {
|
||||
endpoint_details details;
|
||||
|
||||
@@ -1894,7 +1894,7 @@ private:
|
||||
public:
|
||||
future<> move(sstring new_token) {
|
||||
// FIXME: getPartitioner().getTokenFactory().validate(newToken);
|
||||
return move(dht::global_partitioner().from_sstring(new_token));
|
||||
return move(dht::token::from_sstring(new_token));
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
@@ -66,6 +66,7 @@
|
||||
#include "mutation_compactor.hh"
|
||||
#include "leveled_manifest.hh"
|
||||
#include "utils/observable.hh"
|
||||
#include "dht/token.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
|
||||
@@ -144,13 +144,13 @@ inline key maximum_key() {
|
||||
};
|
||||
|
||||
class decorated_key_view {
|
||||
dht::token_view _token;
|
||||
dht::token _token;
|
||||
key_view _partition_key;
|
||||
public:
|
||||
decorated_key_view(dht::token_view token, key_view partition_key) noexcept
|
||||
decorated_key_view(dht::token token, key_view partition_key) noexcept
|
||||
: _token(token), _partition_key(partition_key) { }
|
||||
|
||||
dht::token_view token() const {
|
||||
dht::token token() const {
|
||||
return _token;
|
||||
}
|
||||
|
||||
|
||||
@@ -572,8 +572,8 @@ future<> parse(sstable_version_types v, random_access_reader& in, summary& s) {
|
||||
// position is little-endian encoded
|
||||
auto position = seastar::read_le<uint64_t>(buf.get());
|
||||
auto token = dht::global_partitioner().get_token(key_view(key_data));
|
||||
auto token_data = s.add_summary_data(bytes_view(token._data));
|
||||
s.entries.push_back({ dht::token_view(dht::token::kind::key, token_data), key_data, position });
|
||||
s.add_summary_data(token.data());
|
||||
s.entries.push_back({ token, key_data, position });
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
@@ -1886,8 +1886,8 @@ create_sharding_metadata(schema_ptr schema, const dht::decorated_key& first_key,
|
||||
auto&& right_token = right.token();
|
||||
auto right_exclusive = !right.has_key() && right.bound() == dht::ring_position::token_bound::start;
|
||||
sm.token_ranges.elements.push_back(disk_token_range{
|
||||
{left_exclusive, to_bytes(bytes_view(left_token._data))},
|
||||
{right_exclusive, to_bytes(bytes_view(right_token._data))}});
|
||||
{left_exclusive, left_token.data()},
|
||||
{right_exclusive, right_token.data()}});
|
||||
}
|
||||
}
|
||||
return sm;
|
||||
@@ -1969,9 +1969,9 @@ void maybe_add_summary_entry(summary& s, const dht::token& token, bytes_view key
|
||||
if (data_offset >= state.next_data_offset_to_write_summary) {
|
||||
auto entry_size = 8 + 2 + key.size(); // offset + key_size.size + key.size
|
||||
state.next_data_offset_to_write_summary += state.summary_byte_cost * entry_size;
|
||||
auto token_data = s.add_summary_data(bytes_view(token._data));
|
||||
s.add_summary_data(token.data());
|
||||
auto key_data = s.add_summary_data(key);
|
||||
s.entries.push_back({ dht::token_view(dht::token::kind::key, token_data), key_data, index_offset });
|
||||
s.entries.push_back({ token, key_data, index_offset });
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3207,8 +3207,8 @@ sstable::compute_shards_for_this_sstable() const {
|
||||
dht::ring_position::ending_at(get_last_decorated_key().token())));
|
||||
} else {
|
||||
auto disk_token_range_to_ring_position_range = [] (const disk_token_range& dtr) {
|
||||
auto t1 = dht::token(dht::token::kind::key, managed_bytes(bytes_view(dtr.left.token)));
|
||||
auto t2 = dht::token(dht::token::kind::key, managed_bytes(bytes_view(dtr.right.token)));
|
||||
auto t1 = dht::token(dht::token::kind::key, bytes_view(dtr.left.token));
|
||||
auto t2 = dht::token(dht::token::kind::key, bytes_view(dtr.right.token));
|
||||
return dht::partition_range::make(
|
||||
(dtr.left.exclusive ? dht::ring_position::ending_at : dht::ring_position::starting_at)(std::move(t1)),
|
||||
(dtr.right.exclusive ? dht::ring_position::starting_at : dht::ring_position::ending_at)(std::move(t2)));
|
||||
|
||||
@@ -129,7 +129,7 @@ inline std::ostream& operator<<(std::ostream& o, indexable_element e) {
|
||||
|
||||
class summary_entry {
|
||||
public:
|
||||
dht::token_view token;
|
||||
dht::token token;
|
||||
bytes_view key;
|
||||
uint64_t position;
|
||||
|
||||
|
||||
@@ -1966,31 +1966,18 @@ public:
|
||||
, _tokens(boost::copy_range<std::vector<dht::token>>(something_by_token | boost::adaptors::map_keys)) {
|
||||
}
|
||||
|
||||
virtual dht::token midpoint(const dht::token& left, const dht::token& right) const override { return _partitioner.midpoint(left, right); }
|
||||
virtual dht::token get_token(const schema& s, partition_key_view key) const override { return _partitioner.get_token(s, key); }
|
||||
virtual dht::token get_token(const sstables::key_view& key) const override { return _partitioner.get_token(key); }
|
||||
virtual sstring to_sstring(const dht::token& t) const override { return _partitioner.to_sstring(t); }
|
||||
virtual dht::token from_sstring(const sstring& t) const override { return _partitioner.from_sstring(t); }
|
||||
virtual dht::token from_bytes(bytes_view bytes) const override { return _partitioner.from_bytes(bytes); }
|
||||
virtual dht::token get_random_token() override { return _partitioner.get_random_token(); }
|
||||
virtual bool preserves_order() override { return _partitioner.preserves_order(); }
|
||||
virtual std::map<dht::token, float> describe_ownership(const std::vector<dht::token>& sorted_tokens) override { return _partitioner.describe_ownership(sorted_tokens); }
|
||||
virtual data_type get_token_validator() override { return _partitioner.get_token_validator(); }
|
||||
virtual const sstring name() const override { return _partitioner.name(); }
|
||||
virtual unsigned shard_of(const dht::token& t) const override;
|
||||
virtual unsigned shard_of(const dht::token& t, unsigned shard_count, unsigned sharding_ignore_msb) const override;
|
||||
virtual dht::token token_for_next_shard(const dht::token& t, shard_id shard, unsigned spans = 1) const override;
|
||||
virtual int tri_compare(dht::token_view t1, dht::token_view t2) const override { return _partitioner.tri_compare(t1, t2); }
|
||||
};
|
||||
|
||||
unsigned dummy_partitioner::shard_of(const dht::token& t) const {
|
||||
return shard_of(t, _partitioner.shard_count(), 0);
|
||||
}
|
||||
|
||||
unsigned dummy_partitioner::shard_of(const dht::token& t, unsigned shard_count, unsigned sharding_ignore_msb) const {
|
||||
auto it = boost::find(_tokens, t);
|
||||
// Unknown tokens are assigned to shard 0
|
||||
return it == _tokens.end() ? 0 : std::distance(_tokens.begin(), it) % shard_count;
|
||||
return it == _tokens.end() ? 0 : std::distance(_tokens.begin(), it) % _partitioner.shard_count();
|
||||
}
|
||||
|
||||
dht::token dummy_partitioner::token_for_next_shard(const dht::token& t, shard_id shard, unsigned spans) const {
|
||||
|
||||
@@ -133,11 +133,8 @@ void endpoints_check(
|
||||
}
|
||||
}
|
||||
|
||||
auto d2t = [](double d) {
|
||||
unsigned long l = net::hton(static_cast<unsigned long>(d*(std::numeric_limits<unsigned long>::max())));
|
||||
std::array<char, 8> a;
|
||||
memcpy(a.data(), &l, 8);
|
||||
return a;
|
||||
auto d2t = [](double d) -> int64_t {
|
||||
return static_cast<unsigned long>(d*(std::numeric_limits<unsigned long>::max()));
|
||||
};
|
||||
|
||||
/**
|
||||
@@ -154,8 +151,7 @@ void full_ring_check(const std::vector<ring_point>& ring_points,
|
||||
|
||||
for (auto& rp : ring_points) {
|
||||
double cur_point1 = rp.point - 0.5;
|
||||
token t1{dht::token::kind::key,
|
||||
{(int8_t*)d2t(cur_point1 / ring_points.size()).data(), 8}};
|
||||
token t1(dht::token::kind::key, d2t(cur_point1 / ring_points.size()));
|
||||
uint64_t cache_hit_count = ars_ptr->get_cache_hits_count();
|
||||
auto endpoints1 = ars_ptr->get_natural_endpoints(t1);
|
||||
|
||||
@@ -172,8 +168,7 @@ void full_ring_check(const std::vector<ring_point>& ring_points,
|
||||
//
|
||||
cache_hit_count = ars_ptr->get_cache_hits_count();
|
||||
double cur_point2 = rp.point - 0.2;
|
||||
token t2{dht::token::kind::key,
|
||||
{(int8_t*)d2t(cur_point2 / ring_points.size()).data(), 8}};
|
||||
token t2(dht::token::kind::key, d2t(cur_point2 / ring_points.size()));
|
||||
auto endpoints2 = ars_ptr->get_natural_endpoints(t2);
|
||||
|
||||
endpoints_check(ars_ptr, endpoints2);
|
||||
@@ -208,9 +203,7 @@ future<> simple_test() {
|
||||
// Initialize the token_metadata
|
||||
for (unsigned i = 0; i < ring_points.size(); i++) {
|
||||
tm->update_normal_token(
|
||||
{dht::token::kind::key,
|
||||
{(int8_t*)d2t(ring_points[i].point / ring_points.size()).data(), 8}
|
||||
},
|
||||
{dht::token::kind::key, d2t(ring_points[i].point / ring_points.size())},
|
||||
ring_points[i].host);
|
||||
}
|
||||
|
||||
@@ -300,8 +293,7 @@ future<> heavy_origin_test() {
|
||||
ring_point rp = {token_point, address};
|
||||
|
||||
ring_points.emplace_back(rp);
|
||||
tokens[address].emplace(token{dht::token::kind::key,
|
||||
{(int8_t*)d2t(token_point / total_eps).data(), 8}});
|
||||
tokens[address].emplace(token{dht::token::kind::key, d2t(token_point / total_eps)});
|
||||
|
||||
nlogger.debug("adding node {} at {}", address, token_point);
|
||||
|
||||
@@ -477,7 +469,7 @@ static std::vector<inet_address> calculate_natural_endpoints(
|
||||
return std::move(replicas.get_vector());
|
||||
}
|
||||
|
||||
static void test_equivalence(token_metadata& tm, snitch_ptr& snitch, dht::murmur3_partitioner& partitioner, const std::unordered_map<sstring, size_t>& datacenters) {
|
||||
static void test_equivalence(token_metadata& tm, snitch_ptr& snitch, const std::unordered_map<sstring, size_t>& datacenters) {
|
||||
class my_network_topology_strategy : public network_topology_strategy {
|
||||
public:
|
||||
using network_topology_strategy::network_topology_strategy;
|
||||
@@ -493,7 +485,7 @@ static void test_equivalence(token_metadata& tm, snitch_ptr& snitch, dht::murmur
|
||||
})));
|
||||
|
||||
for (size_t i = 0; i < 1000; ++i) {
|
||||
auto token = partitioner.get_random_token();
|
||||
auto token = dht::token::get_random_token();
|
||||
auto expected = calculate_natural_endpoints(token, tm, snitch, datacenters);
|
||||
auto actual = nts.calculate_natural_endpoints(token, tm);
|
||||
|
||||
@@ -573,7 +565,6 @@ SEASTAR_TEST_CASE(testCalculateEndpoints) {
|
||||
constexpr size_t VNODES = 64;
|
||||
constexpr size_t RUNS = 10;
|
||||
|
||||
dht::murmur3_partitioner partitioner;
|
||||
std::unordered_map<sstring, size_t> datacenters = {
|
||||
{ "rf1", 1 },
|
||||
{ "rf3", 3 },
|
||||
@@ -597,10 +588,10 @@ SEASTAR_TEST_CASE(testCalculateEndpoints) {
|
||||
|
||||
for (auto& node : nodes) {
|
||||
for (size_t i = 0; i < VNODES; ++i) {
|
||||
tm.update_normal_token(partitioner.get_random_token(), node);
|
||||
tm.update_normal_token(dht::token::get_random_token(), node);
|
||||
}
|
||||
}
|
||||
test_equivalence(tm, snitch, partitioner, datacenters);
|
||||
test_equivalence(tm, snitch, datacenters);
|
||||
}
|
||||
|
||||
return i_endpoint_snitch::stop_snitch();
|
||||
|
||||
@@ -47,7 +47,7 @@ BOOST_AUTO_TEST_CASE(test_range_with_positions_within_the_same_token) {
|
||||
.with_column("v", bytes_type)
|
||||
.build();
|
||||
|
||||
dht::token tok = dht::global_partitioner().get_random_token();
|
||||
dht::token tok = dht::token::get_random_token();
|
||||
|
||||
auto key1 = dht::decorated_key{tok,
|
||||
partition_key::from_single_value(*s, bytes_type->decompose(data_value(bytes("key1"))))};
|
||||
@@ -245,8 +245,8 @@ BOOST_AUTO_TEST_CASE(range_overlap_tests) {
|
||||
|
||||
auto get_item(std::string left, std::string right, std::string val) {
|
||||
using value_type = std::unordered_set<std::string>;
|
||||
auto l = dht::global_partitioner().from_sstring(left);
|
||||
auto r = dht::global_partitioner().from_sstring(right);
|
||||
auto l = dht::token::from_sstring(left);
|
||||
auto r = dht::token::from_sstring(right);
|
||||
auto rg = dht::token_range({{l, false}}, {r});
|
||||
value_type v{val};
|
||||
return std::make_pair(locator::token_metadata::range_to_interval(rg), v);
|
||||
@@ -272,7 +272,7 @@ BOOST_AUTO_TEST_CASE(test_range_interval_map) {
|
||||
}
|
||||
|
||||
auto search_item = [&mymap] (std::string val) {
|
||||
auto tok = dht::global_partitioner().from_sstring(val);
|
||||
auto tok = dht::token::from_sstring(val);
|
||||
auto search = dht::token_range(tok);
|
||||
auto it = mymap.find(locator::token_metadata::range_to_interval(search));
|
||||
if (it != mymap.end()) {
|
||||
|
||||
@@ -43,16 +43,11 @@ debug(Args&&... args) {
|
||||
}
|
||||
|
||||
static dht::token token_from_long(uint64_t value) {
|
||||
auto t = net::hton(value);
|
||||
bytes b(bytes::initialized_later(), 8);
|
||||
std::copy_n(reinterpret_cast<int8_t*>(&t), 8, b.begin());
|
||||
return { dht::token::kind::key, std::move(b) };
|
||||
return dht::token(dht::token::kind::key, value);
|
||||
}
|
||||
|
||||
static int64_t long_from_token(dht::token token) {
|
||||
int64_t data;
|
||||
std::copy_n(token._data.data(), 8, reinterpret_cast<char*>(&data));
|
||||
return net::ntoh(data);
|
||||
return token._data;
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_decorated_key_is_compatible_with_origin) {
|
||||
@@ -78,7 +73,7 @@ SEASTAR_THREAD_TEST_CASE(test_token_wraparound_1) {
|
||||
BOOST_REQUIRE(t1 > t2);
|
||||
// Even without knowing what the midpoint is, it needs to be inside the
|
||||
// wrapped range, i.e., between t1 and inf, OR between -inf and t2
|
||||
auto midpoint = partitioner.midpoint(t1, t2);
|
||||
auto midpoint = dht::token::midpoint(t1, t2);
|
||||
BOOST_REQUIRE(midpoint > t1 || midpoint < t2);
|
||||
// We can also calculate the actual value the midpoint should have:
|
||||
BOOST_REQUIRE_EQUAL(midpoint, token_from_long(0x8800'0000'0000'0000));
|
||||
@@ -89,7 +84,7 @@ SEASTAR_THREAD_TEST_CASE(test_token_wraparound_2) {
|
||||
auto t2 = token_from_long(0x9000'0000'0000'0000);
|
||||
dht::murmur3_partitioner partitioner;
|
||||
BOOST_REQUIRE(t1 > t2);
|
||||
auto midpoint = partitioner.midpoint(t1, t2);
|
||||
auto midpoint = dht::token::midpoint(t1, t2);
|
||||
BOOST_REQUIRE(midpoint > t1 || midpoint < t2);
|
||||
BOOST_REQUIRE_EQUAL(midpoint, token_from_long(0x7800'0000'0000'0000));
|
||||
}
|
||||
@@ -224,9 +219,8 @@ SEASTAR_THREAD_TEST_CASE(test_ring_position_ordering) {
|
||||
SEASTAR_THREAD_TEST_CASE(test_token_no_wraparound_1) {
|
||||
auto t1 = token_from_long(0x5000'0000'0000'0000);
|
||||
auto t2 = token_from_long(0x7000'0000'0000'0000);
|
||||
dht::murmur3_partitioner partitioner;
|
||||
BOOST_REQUIRE(t1 < t2);
|
||||
auto midpoint = partitioner.midpoint(t1, t2);
|
||||
auto midpoint = dht::token::midpoint(t1, t2);
|
||||
BOOST_REQUIRE(midpoint > t1 && midpoint < t2);
|
||||
BOOST_REQUIRE_EQUAL(midpoint, token_from_long(0x6000'0000'0000'0000));
|
||||
}
|
||||
@@ -243,7 +237,7 @@ void test_partitioner_sharding(const dht::i_partitioner& part, unsigned shards,
|
||||
BOOST_REQUIRE_EQUAL(part.shard_of(lim), i % shards);
|
||||
if (i != 0) {
|
||||
BOOST_REQUIRE_EQUAL(part.shard_of(prev_token(part, lim)), (i - 1) % shards);
|
||||
BOOST_REQUIRE(part.is_equal(lim, part.token_for_next_shard(prev_token(part, lim), i % shards)));
|
||||
BOOST_REQUIRE_EQUAL(lim, part.token_for_next_shard(prev_token(part, lim), i % shards));
|
||||
}
|
||||
if (i != (shards << ignorebits) - 1) {
|
||||
auto next_shard = (i + 1) % shards;
|
||||
@@ -613,7 +607,7 @@ do_test_selective_token_range_sharder(const dht::i_partitioner& part, const sche
|
||||
}
|
||||
BOOST_REQUIRE(end_shard == shard);
|
||||
}
|
||||
auto midpoint = part.midpoint(
|
||||
auto midpoint = dht::token::midpoint(
|
||||
range_shard->start() ? range_shard->start()->value() : dht::minimum_token(),
|
||||
range_shard->end() ? range_shard->end()->value() : dht::minimum_token());
|
||||
auto mid_shard = part.shard_of(midpoint);
|
||||
|
||||
@@ -55,7 +55,7 @@ BOOST_AUTO_TEST_CASE(test_range_with_positions_within_the_same_token) {
|
||||
.with_column("v", bytes_type)
|
||||
.build();
|
||||
|
||||
dht::token tok = dht::global_partitioner().get_random_token();
|
||||
dht::token tok = dht::token::get_random_token();
|
||||
|
||||
auto key1 = dht::decorated_key{tok,
|
||||
partition_key::from_single_value(*s, bytes_type->decompose(data_value(bytes("key1"))))};
|
||||
@@ -282,8 +282,8 @@ BOOST_AUTO_TEST_CASE(range_overlap_tests) {
|
||||
|
||||
auto get_item(std::string left, std::string right, std::string val) {
|
||||
using value_type = std::unordered_set<std::string>;
|
||||
auto l = dht::global_partitioner().from_sstring(left);
|
||||
auto r = dht::global_partitioner().from_sstring(right);
|
||||
auto l = dht::token::from_sstring(left);
|
||||
auto r = dht::token::from_sstring(right);
|
||||
auto rg = range<dht::token>({{l, false}}, {r});
|
||||
value_type v{val};
|
||||
return std::make_pair(locator::token_metadata::range_to_interval(rg), v);
|
||||
@@ -309,7 +309,7 @@ BOOST_AUTO_TEST_CASE(test_range_interval_map) {
|
||||
}
|
||||
|
||||
auto search_item = [&mymap] (std::string val) {
|
||||
auto tok = dht::global_partitioner().from_sstring(val);
|
||||
auto tok = dht::token::from_sstring(val);
|
||||
auto search = range<token>(tok);
|
||||
auto it = mymap.find(locator::token_metadata::range_to_interval(search));
|
||||
if (it != mymap.end()) {
|
||||
|
||||
@@ -5019,10 +5019,7 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_stop_tracking_compaction) {
|
||||
}
|
||||
|
||||
static dht::token token_from_long(int64_t value) {
|
||||
auto t = net::hton(value);
|
||||
bytes b(bytes::initialized_later(), 8);
|
||||
std::copy_n(reinterpret_cast<int8_t*>(&t), 8, b.begin());
|
||||
return { dht::token::kind::key, std::move(b) };
|
||||
return { dht::token::kind::key, value };
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(basic_interval_map_testing_for_sstable_set) {
|
||||
|
||||
@@ -187,9 +187,9 @@ SEASTAR_TEST_CASE(test_query_size_estimates_virtual_table) {
|
||||
|
||||
SEASTAR_TEST_CASE(test_query_view_built_progress_virtual_table) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
auto rand = [] { return dht::global_partitioner().get_random_token(); };
|
||||
auto rand = [] { return dht::token::get_random_token(); };
|
||||
auto next_token = rand();
|
||||
auto next_token_str = dht::global_partitioner().to_sstring(next_token);
|
||||
auto next_token_str = next_token.to_sstring();
|
||||
db::system_keyspace::register_view_for_building("ks", "v1", rand()).get();
|
||||
db::system_keyspace::register_view_for_building("ks", "v2", rand()).get();
|
||||
db::system_keyspace::register_view_for_building("ks", "v3", rand()).get();
|
||||
|
||||
@@ -803,8 +803,8 @@ public:
|
||||
void describe_splits_ex(thrift_fn::function<void(std::vector<CfSplit> const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& cfName, const std::string& start_token, const std::string& end_token, const int32_t keys_per_split) {
|
||||
with_cob(std::move(cob), std::move(exn_cob), [&]{
|
||||
dht::token_range_vector ranges;
|
||||
auto tstart = start_token.empty() ? dht::minimum_token() : dht::global_partitioner().from_sstring(sstring(start_token));
|
||||
auto tend = end_token.empty() ? dht::maximum_token() : dht::global_partitioner().from_sstring(sstring(end_token));
|
||||
auto tstart = start_token.empty() ? dht::minimum_token() : dht::token::from_sstring(sstring(start_token));
|
||||
auto tend = end_token.empty() ? dht::maximum_token() : dht::token::from_sstring(sstring(end_token));
|
||||
range<dht::token> r({{ std::move(tstart), false }}, {{ std::move(tend), true }});
|
||||
auto cf = sstring(cfName);
|
||||
auto splits = service::get_local_storage_service().get_splits(current_keyspace(), cf, std::move(r), keys_per_split);
|
||||
@@ -813,8 +813,8 @@ public:
|
||||
for (auto&& s : splits) {
|
||||
res.emplace_back();
|
||||
assert(s.first.start() && s.first.end());
|
||||
auto start_token = dht::global_partitioner().to_sstring(s.first.start()->value());
|
||||
auto end_token = dht::global_partitioner().to_sstring(s.first.end()->value());
|
||||
auto start_token = s.first.start()->value().to_sstring();
|
||||
auto end_token = s.first.end()->value().to_sstring();
|
||||
res.back().__set_start_token(bytes_to_string(to_bytes_view(start_token)));
|
||||
res.back().__set_end_token(bytes_to_string(to_bytes_view(end_token)));
|
||||
res.back().__set_row_count(s.second);
|
||||
@@ -1714,7 +1714,7 @@ private:
|
||||
auto start = range.start_key.empty()
|
||||
? dht::ring_position::starting_at(dht::minimum_token())
|
||||
: partitioner.decorate_key(s, key_from_thrift(s, to_bytes(range.start_key)));
|
||||
auto end = dht::ring_position::ending_at(partitioner.from_sstring(sstring(range.end_token)));
|
||||
auto end = dht::ring_position::ending_at(dht::token::from_sstring(sstring(range.end_token)));
|
||||
if (end.token().is_minimum()) {
|
||||
end = dht::ring_position::ending_at(dht::maximum_token());
|
||||
} else if (end.less_compare(s, start)) {
|
||||
@@ -1725,8 +1725,8 @@ private:
|
||||
}
|
||||
|
||||
// Token range can wrap; the start token is exclusive.
|
||||
auto start = dht::ring_position::ending_at(partitioner.from_sstring(sstring(range.start_token)));
|
||||
auto end = dht::ring_position::ending_at(partitioner.from_sstring(sstring(range.end_token)));
|
||||
auto start = dht::ring_position::ending_at(dht::token::from_sstring(sstring(range.start_token)));
|
||||
auto end = dht::ring_position::ending_at(dht::token::from_sstring(sstring(range.end_token)));
|
||||
if (end.token().is_minimum()) {
|
||||
end = dht::ring_position::ending_at(dht::maximum_token());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user