Merge "Range friendly sharding"

Switch sharding to work on the most significant bits of the token,
rather than the least significant bits.  This is more friendly to
range operations, since adjacent partitions will reside on the same shard.

This is not exploited yet, except for ignoring sstables whose token range
does not belong to the local shard.
This commit is contained in:
Avi Kivity
2015-08-03 20:32:58 +03:00
11 changed files with 55 additions and 6 deletions

View File

@@ -349,6 +349,16 @@ future<> column_family::probe_file(sstring sstdir, sstring fname) {
}
// Registers an sstable with this column family, skipping sstables whose
// partition range is owned entirely by other shards.
void column_family::add_sstable(sstables::sstable&& sstable) {
// Maps a partition key to the shard that owns its token.
auto key_shard = [this] (const partition_key& pk) {
auto token = dht::global_partitioner().get_token(*_schema, pk);
return dht::shard_of(token);
};
// Shards owning the sstable's first and last partitions; because sharding
// follows the most significant token bits, the sstable spans [s1, s2].
auto s1 = key_shard(sstable.get_first_partition_key(*_schema));
auto s2 = key_shard(sstable.get_last_partition_key(*_schema));
// NOTE(review): this condition looks inverted. To skip sstables whose
// shard range [s1, s2] excludes this cpu, the test should be
//   s1 > engine().cpu_id() || s2 < engine().cpu_id()
// As written, "engine().cpu_id() < s2" discards the sstable on every shard
// numbered below s2, including shards inside [s1, s2] that own part of the
// range. TODO: confirm and fix.
if (s1 > engine().cpu_id() || engine().cpu_id() < s2) {
dblog.info("sstable {} not relevant for this shard, ignoring", sstable.get_filename());
return;
}
auto generation = sstable.generation();
// allow in-progress reads to continue using old list
_sstables = make_lw_shared<sstable_list>(*_sstables);

View File

@@ -25,6 +25,15 @@ token byte_ordered_partitioner::midpoint(const token& t1, const token& t2) const
throw std::runtime_error("not implemented");
}
unsigned
byte_ordered_partitioner::shard_of(const token& t) const {
    // An empty token sorts before everything else; it belongs to shard 0.
    if (t._data.empty()) {
        return 0;
    }
    // Interpret the leading byte as a fixed-point fraction in [0, 1) and
    // scale by the shard count, splitting the byte range evenly and keeping
    // adjacent tokens on the same shard.
    auto leading = unsigned(uint8_t(t._data[0]));
    return (leading * smp::count) >> 8;
}
using registry = class_registrator<i_partitioner, byte_ordered_partitioner>;
static registry registrator("org.apache.cassandra.dht.ByteOrderedPartitioner");
static registry registrator_short_name("ByteOrderedPartitioner");

View File

@@ -39,6 +39,7 @@ public:
// Textual form of a byte-ordered token: hex encoding of its raw bytes.
virtual sstring to_sstring(const dht::token& t) const override {
return to_hex(t._data);
}
virtual unsigned shard_of(const token& t) const override;
};
}

View File

@@ -279,12 +279,7 @@ ring_position ring_position::deserialize(bytes_view& in) {
}
// Maps a token to the shard that owns it.
//
// Delegates to the active partitioner so each partitioner type can map its
// own token space to shards using the most significant bits, which keeps
// adjacent partitions on the same shard (range-friendly sharding).
//
// The previous inline implementation (hashing the two least significant
// token bytes modulo smp::count) was unreachable dead code left above the
// delegation and has been removed.
unsigned shard_of(const token& t) {
    return global_partitioner().shard_of(t);
}
int ring_position_comparator::operator()(const ring_position& lh, const ring_position& rh) const {

View File

@@ -224,6 +224,11 @@ public:
* @return name of partitioner.
*/
virtual const sstring name() = 0;
/**
* Calculates the shard that handles a particular token.
*/
virtual unsigned shard_of(const token& t) const = 0;
protected:
/**
* @return true if t1's _data array is equal t2's. _kind comparison should be done separately.

View File

@@ -111,6 +111,15 @@ murmur3_partitioner::get_token_validator() {
abort();
}
unsigned
murmur3_partitioner::shard_of(const token& t) const {
int64_t l = long_token(t);
// treat l as a fraction between 0 and 1 and use 128-bit arithmetic to
// divide that range evenly among shards:
uint64_t adjusted = uint64_t(l) + uint64_t(std::numeric_limits<int64_t>::min());
return (__int128(adjusted) * smp::count) >> 64;
}
using registry = class_registrator<i_partitioner, murmur3_partitioner>;
static registry registrator("org.apache.cassandra.dht.Murmur3Partitioner");
static registry registrator_short_name("Murmur3Partitioner");

View File

@@ -22,6 +22,7 @@ public:
virtual bool is_less(const token& t1, const token& t2) override;
virtual token midpoint(const token& t1, const token& t2) const override;
virtual sstring to_sstring(const dht::token& t) const override;
virtual unsigned shard_of(const token& t) const override;
private:
static int64_t normalize(int64_t in);
token get_token(bytes_view key);

View File

@@ -124,6 +124,11 @@ key key::from_partition_key(const schema& s, const partition_key& pk) {
return from_components(pk.begin(s), pk.end(s), sstable_serializer(), composite);
}
// Rebuilds a schema-aware partition_key from this key's raw components.
partition_key
key::to_partition_key(const schema& s) {
    auto components = explode(s);
    return partition_key::from_exploded(s, std::move(components));
}
template <typename ClusteringElement>
composite composite::from_clustering_element(const schema& s, const ClusteringElement& ce) {
return from_components(ce.begin(s), ce.end(s), sstable_serializer());

View File

@@ -79,6 +79,7 @@ public:
static key from_exploded(const schema& s, std::vector<bytes>&& v);
// Unfortunately, the _bytes field for the partition_key are not public. We can't move.
static key from_partition_key(const schema& s, const partition_key& pk);
partition_key to_partition_key(const schema& s);
std::vector<bytes> explode(const schema& s) const;

View File

@@ -1441,6 +1441,16 @@ future<temporary_buffer<char>> sstable::data_read(uint64_t pos, size_t len) {
});
}
// Decodes the summary's first key into a partition_key under schema s.
partition_key
sstable::get_first_partition_key(const schema& s) const {
    auto first = key::from_bytes(_summary.first_key.value);
    return first.to_partition_key(s);
}
// Decodes the summary's last key into a partition_key under schema s.
partition_key
sstable::get_last_partition_key(const schema& s) const {
    auto last = key::from_bytes(_summary.last_key.value);
    return last.to_partition_key(s);
}
sstable::~sstable() {
if (_index_file) {
_index_file.close().handle_exception([save = _index_file] (auto ep) {

View File

@@ -235,6 +235,9 @@ public:
// Returns the total bytes of all components.
future<uint64_t> bytes_on_disk();
partition_key get_first_partition_key(const schema& s) const;
partition_key get_last_partition_key(const schema& s) const;
// Path of this sstable's Data component on disk.
// The top-level const on the by-value return was dropped: it adds nothing
// for callers and inhibits move semantics on the returned string.
sstring get_filename() {
    return filename(component_type::Data);
}