From f915ff1fcd9a5a5102c377766ace8d320ed76cfb Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 3 Aug 2015 17:29:30 +0300 Subject: [PATCH 1/4] dht: introduce i_partitioner::shard_of() and implement msb sharding Make sharding partitioner-specific, since different partitioners interpret the byte content differently. Implement it by extracting the shard from the most significant bits, which can be used to minimize cross shard traffic for range queries, and reduces sstable sharing. --- dht/byte_ordered_partitioner.cc | 9 +++++++++ dht/byte_ordered_partitioner.hh | 1 + dht/i_partitioner.cc | 7 +------ dht/i_partitioner.hh | 5 +++++ dht/murmur3_partitioner.cc | 9 +++++++++ dht/murmur3_partitioner.hh | 1 + 6 files changed, 26 insertions(+), 6 deletions(-) diff --git a/dht/byte_ordered_partitioner.cc b/dht/byte_ordered_partitioner.cc index c9c9650cb3..c84cb3bf57 100644 --- a/dht/byte_ordered_partitioner.cc +++ b/dht/byte_ordered_partitioner.cc @@ -25,6 +25,15 @@ token byte_ordered_partitioner::midpoint(const token& t1, const token& t2) const throw std::runtime_error("not implemented"); } +unsigned +byte_ordered_partitioner::shard_of(const token& t) const { + if (t._data.empty()) { + return 0; + } + // treat first byte as a fraction in the range [0, 1) and divide it evenly: + return (uint8_t(t._data[0]) * smp::count) >> 8; +} + using registry = class_registrator; static registry registrator("org.apache.cassandra.dht.ByteOrderedPartitioner"); static registry registrator_short_name("ByteOrderedPartitioner"); diff --git a/dht/byte_ordered_partitioner.hh b/dht/byte_ordered_partitioner.hh index 906431036a..06d223e87e 100644 --- a/dht/byte_ordered_partitioner.hh +++ b/dht/byte_ordered_partitioner.hh @@ -39,6 +39,7 @@ public: virtual sstring to_sstring(const dht::token& t) const override { return to_hex(t._data); } + virtual unsigned shard_of(const token& t) const override; }; } diff --git a/dht/i_partitioner.cc b/dht/i_partitioner.cc index 74ef81ae08..a590bf6827 100644 --- 
a/dht/i_partitioner.cc +++ b/dht/i_partitioner.cc @@ -279,12 +279,7 @@ ring_position ring_position::deserialize(bytes_view& in) { } unsigned shard_of(const token& t) { - if (t._data.size() < 2) { - return 0; - } - uint16_t v = uint8_t(t._data[t._data.size() - 1]) - | (uint8_t(t._data[t._data.size() - 2]) << 8); - return v % smp::count; + return global_partitioner().shard_of(t); } int ring_position_comparator::operator()(const ring_position& lh, const ring_position& rh) const { diff --git a/dht/i_partitioner.hh b/dht/i_partitioner.hh index b85673016f..fd1bd72dbe 100644 --- a/dht/i_partitioner.hh +++ b/dht/i_partitioner.hh @@ -224,6 +224,11 @@ public: * @return name of partitioner. */ virtual const sstring name() = 0; + + /** + * Calculates the shard that handles a particular token. + */ + virtual unsigned shard_of(const token& t) const = 0; protected: /** * @return true if t1's _data array is equal t2's. _kind comparison should be done separately. diff --git a/dht/murmur3_partitioner.cc b/dht/murmur3_partitioner.cc index d587901d0a..66f20c2d19 100644 --- a/dht/murmur3_partitioner.cc +++ b/dht/murmur3_partitioner.cc @@ -111,6 +111,15 @@ murmur3_partitioner::get_token_validator() { abort(); } +unsigned +murmur3_partitioner::shard_of(const token& t) const { + int64_t l = long_token(t); + // treat l as a fraction between 0 and 1 and use 128-bit arithmetic to + // divide that range evenly among shards: + uint64_t adjusted = uint64_t(l) + uint64_t(std::numeric_limits<int64_t>::min()); // shifts the signed range [INT64_MIN, INT64_MAX] onto [0, 2^64) + return (__int128(adjusted) * smp::count) >> 64; +} + using registry = class_registrator; static registry registrator("org.apache.cassandra.dht.Murmur3Partitioner"); static registry registrator_short_name("Murmur3Partitioner"); diff --git a/dht/murmur3_partitioner.hh b/dht/murmur3_partitioner.hh index fcb5c9364c..db640a8349 100644 --- a/dht/murmur3_partitioner.hh +++ b/dht/murmur3_partitioner.hh @@ -22,6 +22,7 @@ public: virtual bool is_less(const token& t1, const token& t2) override; virtual token 
midpoint(const token& t1, const token& t2) const override; virtual sstring to_sstring(const dht::token& t) const override; + virtual unsigned shard_of(const token& t) const override; private: static int64_t normalize(int64_t in); token get_token(bytes_view key); From 6ca6f0c3a4f2ced6ae7cadd327b286070c3b56cf Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 3 Aug 2015 19:17:47 +0300 Subject: [PATCH 2/4] sstables: add conversion function from sstable key to partition key --- sstables/key.cc | 5 +++++ sstables/key.hh | 1 + 2 files changed, 6 insertions(+) diff --git a/sstables/key.cc b/sstables/key.cc index e2e788d72f..02ce7cd211 100644 --- a/sstables/key.cc +++ b/sstables/key.cc @@ -124,6 +124,11 @@ key key::from_partition_key(const schema& s, const partition_key& pk) { return from_components(pk.begin(s), pk.end(s), sstable_serializer(), composite); } +partition_key +key::to_partition_key(const schema& s) { + return partition_key::from_exploded(s, explode(s)); +} + template composite composite::from_clustering_element(const schema& s, const ClusteringElement& ce) { return from_components(ce.begin(s), ce.end(s), sstable_serializer()); diff --git a/sstables/key.hh b/sstables/key.hh index 2ccd8a6549..66997c6127 100644 --- a/sstables/key.hh +++ b/sstables/key.hh @@ -79,6 +79,7 @@ public: static key from_exploded(const schema& s, std::vector&& v); // Unfortunately, the _bytes field for the partition_key are not public. We can't move. 
static key from_partition_key(const schema& s, const partition_key& pk); + partition_key to_partition_key(const schema& s); std::vector explode(const schema& s) const; From ad443e4771aa6aab7491f829157b2adb5a886e6a Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 3 Aug 2015 19:18:16 +0300 Subject: [PATCH 3/4] sstable: add accessor for first/last partition keys --- sstables/sstables.cc | 10 ++++++++++ sstables/sstables.hh | 3 +++ 2 files changed, 13 insertions(+) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 227d3008f6..f8d939860a 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1441,6 +1441,16 @@ future> sstable::data_read(uint64_t pos, size_t len) { }); } +partition_key +sstable::get_first_partition_key(const schema& s) const { + return key::from_bytes(_summary.first_key.value).to_partition_key(s); +} + +partition_key +sstable::get_last_partition_key(const schema& s) const { + return key::from_bytes(_summary.last_key.value).to_partition_key(s); +} + sstable::~sstable() { if (_index_file) { _index_file.close().handle_exception([save = _index_file] (auto ep) { diff --git a/sstables/sstables.hh b/sstables/sstables.hh index a5923ab546..2d23e53b7b 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -235,6 +235,9 @@ public: // Returns the total bytes of all components. 
future bytes_on_disk(); + partition_key get_first_partition_key(const schema& s) const; + partition_key get_last_partition_key(const schema& s) const; + const sstring get_filename() { return filename(component_type::Data); } From c1a2831d41f0ba61ba80e1101cdec3f51e8f06ac Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 3 Aug 2015 19:18:37 +0300 Subject: [PATCH 4/4] db: ignore sstables that clearly don't belong to this shard --- database.cc | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/database.cc b/database.cc index a483dcab7b..795c5e49b6 100644 --- a/database.cc +++ b/database.cc @@ -349,6 +349,17 @@ future<> column_family::probe_file(sstring sstdir, sstring fname) { } void column_family::add_sstable(sstables::sstable&& sstable) { + auto key_shard = [this] (const partition_key& pk) { + auto token = dht::global_partitioner().get_token(*_schema, pk); + return dht::shard_of(token); + }; + auto s1 = key_shard(sstable.get_first_partition_key(*_schema)); + auto s2 = key_shard(sstable.get_last_partition_key(*_schema)); + // s1/s2 are the shards of the first/last partition key; skip the sstable only when this shard lies outside [s1, s2] (assumes s1 <= s2) + if (s1 > engine().cpu_id() || engine().cpu_id() > s2) { + dblog.info("sstable {} not relevant for this shard, ignoring", sstable.get_filename()); + return; + } auto generation = sstable.generation(); // allow in-progress reads to continue using old list _sstables = make_lw_shared(*_sstables);