Merge "Add ByteOrderedPartitioner" from Paweł

This patch series adds partial implementation of ByteOrderedPartitioner
and allows choosing it in configuration file.
While ByteOrderedPartitioner is generally not recommended it is used by
some tests in DTEST due to its order guarantees.
This commit is contained in:
Avi Kivity
2015-07-12 16:20:19 +03:00
7 changed files with 92 additions and 3 deletions

View File

@@ -489,6 +489,7 @@ urchin_core = (['database.cc',
'gms/gms.cc',
'dht/i_partitioner.cc',
'dht/murmur3_partitioner.cc',
'dht/byte_ordered_partitioner.cc',
'unimplemented.cc',
'query.cc',
'query-result-set.cc',

View File

@@ -0,0 +1,32 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#include "byte_ordered_partitioner.hh"
#include "utils/class_registrator.hh"
namespace dht {
token byte_ordered_partitioner::get_random_token()
{
bytes b(bytes::initialized_later(), 16);
*unaligned_cast<uint64_t>(b.begin()) = dht::get_random_number<uint64_t>();
*unaligned_cast<uint64_t>(b.begin() + 8) = dht::get_random_number<uint64_t>();
return token(token::kind::key, std::move(b));
}
std::map<token, float> byte_ordered_partitioner::describe_ownership(const std::vector<token>& sorted_tokens)
{
throw std::runtime_error("not implemented");
}
token byte_ordered_partitioner::midpoint(const token& t1, const token& t2) const
{
throw std::runtime_error("not implemented");
}
using registry = class_registrator<i_partitioner, byte_ordered_partitioner>;
static registry registrator("org.apache.cassandra.dht.ByteOrderedPartitioner");
static registry registrator_short_name("ByteOrderedPartitioner");
}

View File

@@ -0,0 +1,44 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#pragma once
#include "i_partitioner.hh"
#include "bytes.hh"
#include "sstables/key.hh"
namespace dht {
class byte_ordered_partitioner final : public i_partitioner {
public:
virtual const sstring name() { return "org.apache.cassandra.dht.ByteOrderedPartitioner"; }
virtual token get_token(const schema& s, partition_key_view key) override {
auto&& legacy = key.legacy_form(s);
return token(token::kind::key, bytes(legacy.begin(), legacy.end()));
}
virtual token get_token(const sstables::key_view& key) override {
auto v = bytes_view(key);
if (v.empty()) {
return minimum_token();
}
return token(token::kind::key, bytes(v.begin(), v.end()));
}
virtual token get_random_token() override;
virtual bool preserves_order() override { return true; }
virtual std::map<token, float> describe_ownership(const std::vector<token>& sorted_tokens) override;
virtual data_type get_token_validator() override { return bytes_type; }
virtual bool is_equal(const token& t1, const token& t2) override {
return compare_unsigned(t1._data, t2._data) == 0;
}
virtual bool is_less(const token& t1, const token& t2) override {
return compare_unsigned(t1._data, t2._data) < 0;
}
virtual token midpoint(const token& t1, const token& t2) const;
virtual sstring to_sstring(const dht::token& t) const override {
return to_hex(t._data);
}
};
}

View File

@@ -5,6 +5,7 @@
#include "i_partitioner.hh"
#include "core/reactor.hh"
#include "murmur3_partitioner.hh"
#include "utils/class_registrator.hh"
namespace dht {
@@ -157,13 +158,17 @@ std::ostream& operator<<(std::ostream& out, const decorated_key& dk) {
return out << "{key: " << dk._key << ", token:" << dk._token << "}";
}
// FIXME: get from global config
// FIXME: make it per-keyspace
murmur3_partitioner default_partitioner;
std::unique_ptr<i_partitioner> default_partitioner { new murmur3_partitioner };
void set_global_partitioner(const sstring& class_name)
{
default_partitioner = create_object<i_partitioner>(class_name);
}
i_partitioner&
global_partitioner() {
return default_partitioner;
return *default_partitioner;
}
bool

View File

@@ -273,6 +273,7 @@ std::ostream& operator<<(std::ostream& out, const token& t);
std::ostream& operator<<(std::ostream& out, const decorated_key& t);
void set_global_partitioner(const sstring& class_name);
i_partitioner& global_partitioner();
unsigned shard_of(const token&);

View File

@@ -5,6 +5,7 @@
#include "murmur3_partitioner.hh"
#include "utils/murmur_hash.hh"
#include "sstables/key.hh"
#include "utils/class_registrator.hh"
namespace dht {
@@ -110,6 +111,10 @@ murmur3_partitioner::get_token_validator() {
abort();
}
using registry = class_registrator<i_partitioner, murmur3_partitioner>;
static registry registrator("org.apache.cassandra.dht.Murmur3Partitioner");
static registry registrator_short_name("Murmur3Partitioner");
}

View File

@@ -51,6 +51,7 @@ int main(int ac, char** av) {
auto&& opts = app.configuration();
return read_config(opts, *cfg).then([&cfg, &db, &qp, &proxy, &ctx, &opts]() {
dht::set_global_partitioner(cfg->partitioner());
uint16_t thrift_port = cfg->rpc_port();
uint16_t cql_port = cfg->native_transport_port();
uint16_t api_port = opts["api-port"].as<uint16_t>();