diff --git a/database.cc b/database.cc index a483dcab7b..795c5e49b6 100644 --- a/database.cc +++ b/database.cc @@ -349,6 +349,16 @@ future<> column_family::probe_file(sstring sstdir, sstring fname) { } void column_family::add_sstable(sstables::sstable&& sstable) { + auto key_shard = [this] (const partition_key& pk) { + auto token = dht::global_partitioner().get_token(*_schema, pk); + return dht::shard_of(token); + }; + auto s1 = key_shard(sstable.get_first_partition_key(*_schema)); + auto s2 = key_shard(sstable.get_last_partition_key(*_schema)); + if (s1 > engine().cpu_id() || engine().cpu_id() < s2) { + dblog.info("sstable {} not relevant for this shard, ignoring", sstable.get_filename()); + return; + } auto generation = sstable.generation(); // allow in-progress reads to continue using old list _sstables = make_lw_shared(*_sstables); diff --git a/dht/byte_ordered_partitioner.cc b/dht/byte_ordered_partitioner.cc index c9c9650cb3..c84cb3bf57 100644 --- a/dht/byte_ordered_partitioner.cc +++ b/dht/byte_ordered_partitioner.cc @@ -25,6 +25,15 @@ token byte_ordered_partitioner::midpoint(const token& t1, const token& t2) const throw std::runtime_error("not implemented"); } +unsigned +byte_ordered_partitioner::shard_of(const token& t) const { + if (t._data.empty()) { + return 0; + } + // treat first byte as a fraction in the range [0, 1) and divide it evenly: + return (uint8_t(t._data[0]) * smp::count) >> 8; +} + using registry = class_registrator; static registry registrator("org.apache.cassandra.dht.ByteOrderedPartitioner"); static registry registrator_short_name("ByteOrderedPartitioner"); diff --git a/dht/byte_ordered_partitioner.hh b/dht/byte_ordered_partitioner.hh index 906431036a..06d223e87e 100644 --- a/dht/byte_ordered_partitioner.hh +++ b/dht/byte_ordered_partitioner.hh @@ -39,6 +39,7 @@ public: virtual sstring to_sstring(const dht::token& t) const override { return to_hex(t._data); } + virtual unsigned shard_of(const token& t) const override; }; } diff --git a/dht/i_partitioner.cc b/dht/i_partitioner.cc index 74ef81ae08..a590bf6827 100644 --- a/dht/i_partitioner.cc +++ b/dht/i_partitioner.cc @@ -279,12 +279,7 @@ ring_position ring_position::deserialize(bytes_view& in) { } unsigned shard_of(const token& t) { - if (t._data.size() < 2) { - return 0; - } - uint16_t v = uint8_t(t._data[t._data.size() - 1]) - | (uint8_t(t._data[t._data.size() - 2]) << 8); - return v % smp::count; + return global_partitioner().shard_of(t); } int ring_position_comparator::operator()(const ring_position& lh, const ring_position& rh) const { diff --git a/dht/i_partitioner.hh b/dht/i_partitioner.hh index b85673016f..fd1bd72dbe 100644 --- a/dht/i_partitioner.hh +++ b/dht/i_partitioner.hh @@ -224,6 +224,11 @@ public: * @return name of partitioner. */ virtual const sstring name() = 0; + + /** + * Calculates the shard that handles a particular token. + */ + virtual unsigned shard_of(const token& t) const = 0; protected: /** * @return true if t1's _data array is equal t2's. _kind comparison should be done separately. diff --git a/dht/murmur3_partitioner.cc b/dht/murmur3_partitioner.cc index d587901d0a..66f20c2d19 100644 --- a/dht/murmur3_partitioner.cc +++ b/dht/murmur3_partitioner.cc @@ -111,6 +111,15 @@ murmur3_partitioner::get_token_validator() { abort(); } +unsigned +murmur3_partitioner::shard_of(const token& t) const { + int64_t l = long_token(t); + // treat l as a fraction between 0 and 1 and use 128-bit arithmetic to + // divide that range evenly among shards: + uint64_t adjusted = uint64_t(l) + uint64_t(std::numeric_limits::min()); + return (__int128(adjusted) * smp::count) >> 64; +} + using registry = class_registrator; static registry registrator("org.apache.cassandra.dht.Murmur3Partitioner"); static registry registrator_short_name("Murmur3Partitioner"); diff --git a/dht/murmur3_partitioner.hh b/dht/murmur3_partitioner.hh index fcb5c9364c..db640a8349 100644 --- a/dht/murmur3_partitioner.hh +++ b/dht/murmur3_partitioner.hh @@ -22,6 +22,7 @@ public: virtual bool is_less(const token& t1, const token& t2) override; virtual token midpoint(const token& t1, const token& t2) const override; virtual sstring to_sstring(const dht::token& t) const override; + virtual unsigned shard_of(const token& t) const override; private: static int64_t normalize(int64_t in); token get_token(bytes_view key); diff --git a/sstables/key.cc b/sstables/key.cc index e2e788d72f..02ce7cd211 100644 --- a/sstables/key.cc +++ b/sstables/key.cc @@ -124,6 +124,11 @@ key key::from_partition_key(const schema& s, const partition_key& pk) { return from_components(pk.begin(s), pk.end(s), sstable_serializer(), composite); } +partition_key +key::to_partition_key(const schema& s) { + return partition_key::from_exploded(s, explode(s)); +} + template composite composite::from_clustering_element(const schema& s, const ClusteringElement& ce) { return from_components(ce.begin(s), ce.end(s), sstable_serializer()); diff --git a/sstables/key.hh b/sstables/key.hh index 2ccd8a6549..66997c6127 100644 --- a/sstables/key.hh +++ b/sstables/key.hh @@ -79,6 +79,7 @@ public: static key from_exploded(const schema& s, std::vector&& v); // Unfortunately, the _bytes field for the partition_key are not public. We can't move. static key from_partition_key(const schema& s, const partition_key& pk); + partition_key to_partition_key(const schema& s); std::vector explode(const schema& s) const; diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 227d3008f6..f8d939860a 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1441,6 +1441,16 @@ future> sstable::data_read(uint64_t pos, size_t len) { }); } +partition_key +sstable::get_first_partition_key(const schema& s) const { + return key::from_bytes(_summary.first_key.value).to_partition_key(s); +} + +partition_key +sstable::get_last_partition_key(const schema& s) const { + return key::from_bytes(_summary.last_key.value).to_partition_key(s); +} + sstable::~sstable() { if (_index_file) { _index_file.close().handle_exception([save = _index_file] (auto ep) { diff --git a/sstables/sstables.hh b/sstables/sstables.hh index a5923ab546..2d23e53b7b 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -235,6 +235,9 @@ public: // Returns the total bytes of all components. future bytes_on_disk(); + partition_key get_first_partition_key(const schema& s) const; + partition_key get_last_partition_key(const schema& s) const; + const sstring get_filename() { return filename(component_type::Data); }