From e5feff5d7157de93e558ff7ed186fadd4e84b685 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 23 Jul 2015 18:28:23 +0200 Subject: [PATCH] dht: ring_position: Switch to total ordering range::is_wrap_around() and range::contains() rely on total ordering on values to work properly. Current ring_position_comparator was only imposing a weak ordering (token positions equal to all key positions with that token). range::before() and range::after() can't work for weak ordering. If the bound is exclusive, we don't know if user-provided token position is inside or outside. Also, is_wrap_around() can't properly detect wrap around in all cases. Consider this case: (1) ]A; B] (2) [A; B] For A = (tok1) and B = (tok1, key1), (1) is a wrap around and (2) is not. Without total ordering between A and B, range::is_wrap_around() can't tell that. I think the simplest solution is to define a total ordering on ring_position by placing token positions either before or after all keys with that token. --- cql3/restrictions/token_restriction.hh | 10 ++-- dht/i_partitioner.cc | 44 ++++++++++------- dht/i_partitioner.hh | 66 ++++++++++++++++++++++---- query-request.hh | 5 ++ service/storage_proxy.cc | 9 ++-- sstables/partition.cc | 6 ++- streaming/stream_session.cc | 25 ++++++++-- tests/urchin/mutation_source_test.cc | 22 +++++---- tests/urchin/partitioner_test.cc | 9 ++-- 9 files changed, 145 insertions(+), 51 deletions(-) diff --git a/cql3/restrictions/token_restriction.hh b/cql3/restrictions/token_restriction.hh index 4a634cefb1..f248aa5108 100644 --- a/cql3/restrictions/token_restriction.hh +++ b/cql3/restrictions/token_restriction.hh @@ -104,10 +104,14 @@ public: typedef typename bounds_range_type::bound bound; - auto start = bound(start_token, include_start); - auto end = bound(end_token, include_end); + auto start = bound(include_start + ? dht::ring_position::starting_at(start_token) + : dht::ring_position::ending_at(start_token)); + auto end = bound(include_end + ? 
dht::ring_position::ending_at(end_token) + : dht::ring_position::starting_at(end_token)); - return { bounds_range_type(start, end) }; + return { bounds_range_type(std::move(start), std::move(end)) }; } class EQ; diff --git a/dht/i_partitioner.cc b/dht/i_partitioner.cc index 663d165e52..74ef81ae08 100644 --- a/dht/i_partitioner.cc +++ b/dht/i_partitioner.cc @@ -203,7 +203,7 @@ decorated_key::tri_compare(const schema& s, const ring_position& other) const { if (other.has_key()) { return _key.legacy_tri_compare(s, *other.key()); } - return 0; + return -other.relation_to_keys(); } bool @@ -239,16 +239,20 @@ std::ostream& operator<<(std::ostream& out, const ring_position& pos) { out << "{" << pos.token(); if (pos.has_key()) { out << ", " << *pos.key(); + } else { + out << ", " << ((pos.relation_to_keys() < 0) ? "start" : "end"); } return out << "}"; } size_t ring_position::serialized_size() const { - size_t key_size = serialize_int32_size; + size_t size = serialize_int32_size; /* _key length */ if (_key) { - key_size += _key.value().representation().size(); + size += _key.value().representation().size(); + } else { + size += sizeof(int8_t); /* _token_bound */ } - return _token.serialized_size() + key_size; + return size + _token.serialized_size(); } void ring_position::serialize(bytes::iterator& out) const { @@ -259,6 +263,7 @@ void ring_position::serialize(bytes::iterator& out) const { out = std::copy(v.begin(), v.end(), out); } else { serialize_int32(out, 0); + serialize_int8(out, static_cast(_token_bound)); } } @@ -266,7 +271,8 @@ ring_position ring_position::deserialize(bytes_view& in) { auto token = token::deserialize(in); auto size = read_simple(in); if (size == 0) { - return ring_position(std::move(token)); + auto bound = dht::ring_position::token_bound(read_simple(in)); + return ring_position(std::move(token), bound); } else { return ring_position(std::move(token), partition_key::from_bytes(to_bytes(read_simple_bytes(in, size)))); } @@ -282,13 +288,7 @@ unsigned 
shard_of(const token& t) { } int ring_position_comparator::operator()(const ring_position& lh, const ring_position& rh) const { - if (lh.less_compare(s, rh)) { - return -1; - } else if (lh.equal(s, rh)) { - return 0; - } else { - return 1; - } + return lh.tri_compare(s, rh); } void token::serialize(bytes::iterator& out) const { @@ -322,13 +322,21 @@ bool ring_position::less_compare(const schema& s, const ring_position& other) co return tri_compare(s, other) < 0; } -int ring_position::tri_compare(const schema& s, const ring_position& other) const { - if (_token != other._token) { - return _token < other._token ? -1 : 1; - } else if (!_key || !other._key) { - return 0; +int ring_position::tri_compare(const schema& s, const ring_position& o) const { + if (_token != o._token) { + return _token < o._token ? -1 : 1; + } + + if (_key && o._key) { + return _key->legacy_tri_compare(s, *o._key); + } + + if (!_key && !o._key) { + return relation_to_keys() - o.relation_to_keys(); + } else if (!_key) { + return relation_to_keys(); } else { - return _key->legacy_tri_compare(s, *other._key); + return -o.relation_to_keys(); } } diff --git a/dht/i_partitioner.hh b/dht/i_partitioner.hh index 4282594d5c..b85673016f 100644 --- a/dht/i_partitioner.hh +++ b/dht/i_partitioner.hh @@ -126,6 +126,8 @@ public: bool less_compare(const schema& s, const decorated_key& other) const; bool less_compare(const schema& s, const ring_position& other) const; + // Trichotomic comparators defining total ordering on the union of + // decorated_key and ring_position objects. int tri_compare(const schema& s, const decorated_key& other) const; int tri_compare(const schema& s, const ring_position& other) const; @@ -237,23 +239,52 @@ protected: }; // -// Represents position in the ring of partitons, where partitions are ordered +// Represents position in the ring of partitions, where partitions are ordered // according to decorated_key ordering (first by token, then by key value). 
+// Intended to be used for defining partition ranges. // -// The 'key' part is optional. When it's absent, this object selects all -// partitions which share given token. When 'key' is present, position is -// further narrowed down to a partition with given key value among all -// partitions which share given token. +// The 'key' part is optional. When it's absent, this object represents a position +// which is either before or after all keys sharing given token. That's determined +// by relation_to_keys(). +// +// For example for the following data: +// +// tokens: | t1 | t2 | +// +----+----+----+ +// keys: | k1 | k2 | k3 | +// +// The ordering is: +// +// ring_position(t1, token_bound::start) < ring_position(k1) +// ring_position(k1) < ring_position(k2) +// ring_position(k1) == decorated_key(k1) +// ring_position(k2) == decorated_key(k2) +// ring_position(k2) < ring_position(t1, token_bound::end) +// ring_position(k2) < ring_position(k3) +// ring_position(t1, token_bound::end) < ring_position(t2, token_bound::start) // // Maps to org.apache.cassandra.db.RowPosition and its derivatives in Origin. -// Range bound intricacies are handled separately, by wrapping range<>. This -// allows us to devirtualize this. 
// class ring_position { +public: + enum class token_bound : int8_t { start = -1, end = 1 }; +private: dht::token _token; + token_bound _token_bound; // valid when !_key std::experimental::optional _key; public: - ring_position(dht::token token) : _token(std::move(token)) {} + static ring_position starting_at(dht::token token) { + return { std::move(token), token_bound::start }; + } + + static ring_position ending_at(dht::token token) { + return { std::move(token), token_bound::end }; + } + + ring_position(dht::token token, token_bound bound) + : _token(std::move(token)) + , _token_bound(bound) + { } ring_position(dht::token token, partition_key key) : _token(std::move(token)) @@ -269,6 +300,17 @@ public: return _token; } + // Valid when !has_key() + token_bound bound() const { + return _token_bound; + } + + // Returns -1 if smaller than keys with the same token, +1 if greater. + // Valid when !has_key(). + int relation_to_keys() const { + return static_cast(_token_bound); + } + const std::experimental::optional& key() const { return _key; } @@ -283,12 +325,16 @@ public: } bool equal(const schema&, const ring_position&) const; - bool less_compare(const schema&, const ring_position&) const; + + // Trichotomic comparator defining a total ordering on ring_position objects int tri_compare(const schema&, const ring_position&) const; + // "less" comparator corresponding to tri_compare() + bool less_compare(const schema&, const ring_position&) const; + + size_t serialized_size() const; void serialize(bytes::iterator& out) const; static ring_position deserialize(bytes_view& in); - size_t serialized_size() const; friend std::ostream& operator<<(std::ostream&, const ring_position&); }; diff --git a/query-request.hh b/query-request.hh index 1a2449112e..e66c85b6a6 100644 --- a/query-request.hh +++ b/query-request.hh @@ -43,6 +43,7 @@ public: range() : range({}, {}) {} private: // the point is before the range (works only for non wrapped ranges) + // Comparator must define a 
total ordering on T. template bool before(const T& point, Comparator&& cmp) const { if (!start()) { @@ -58,6 +59,7 @@ private: return false; } // the point is after the range (works only for non wrapped ranges) + // Comparator must define a total ordering on T. template bool after(const T& point, Comparator&& cmp) const { if (!end()) { @@ -106,6 +108,7 @@ public: return _singular ? _start : _end; } // end is smaller than start + // Comparator must define a total ordering on T. template bool is_wrap_around(Comparator&& cmp) const { if (_end && _start) { @@ -115,6 +118,7 @@ public: } } // the point is inside the range + // Comparator must define a total ordering on T. template bool contains(const T& point, Comparator&& cmp) const { if (is_wrap_around(cmp)) { @@ -126,6 +130,7 @@ public: } // split range in two around a split_point. split_point has to be inside the range // split_point will belong to first range + // Comparator must define a total ordering on T. template std::pair, range> split(const T& split_point, Comparator&& cmp) const { assert(contains(split_point, std::forward(cmp))); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index f93088e0da..b9dd1a2df5 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -2501,8 +2501,11 @@ float storage_proxy::estimate_result_rows_per_range(lw_shared_ptr storage_proxy::get_restricted_ranges(keyspace& ks, const schema& s, query::partition_range range) { - // special case for bounds containing exactly 1 (non-minimum) token - if (range.is_singular()) { + // special case for bounds containing exactly 1 token + if (start_token(range) == end_token(range) && !range.is_wrap_around(dht::ring_position_comparator(s))) { + if (start_token(range).is_minimum()) { + return {}; + } return std::vector({std::move(range)}); } @@ -2526,7 +2529,7 @@ storage_proxy::get_restricted_ranges(keyspace& ks, const schema& s, query::parti * asSplitValue() abstracts that choice. 
*/ - dht::ring_position split_point(upper_bound_token); + dht::ring_position split_point(upper_bound_token, dht::ring_position::token_bound::end); if (!remainder.contains(split_point, dht::ring_position_comparator(s))) { break; // no more splits } diff --git a/sstables/partition.cc b/sstables/partition.cc index c59fe2b3f0..9465c6091b 100644 --- a/sstables/partition.cc +++ b/sstables/partition.cc @@ -506,7 +506,7 @@ public: if (pos.has_key()) { return k2.tri_compare(_s, *pos.key()); } else { - return 0; + return -pos.relation_to_keys(); } } else { return k2_token < pos.token() ? -1 : 1; @@ -574,7 +574,9 @@ mutation_reader sstable::read_range_rows(schema_ptr schema, return std::make_unique(); } return read_range_rows(std::move(schema), - query::range::make({min_token}, {max_token})); + query::range::make( + dht::ring_position::starting_at(min_token), + dht::ring_position::ending_at(max_token))); } mutation_reader diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index ad7de30dc7..8a17d78d6d 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -489,6 +489,26 @@ std::vector stream_session::get_column_family_stores(const sstri return stores; } +static +query::partition_range to_partition_range(query::range r) { /* Token range -> partition range: an inclusive bound covers all keys of its token, an exclusive bound covers none; each bound consults its own inclusiveness flag. */ + using bound_opt = std::experimental::optional; + auto start = r.start() + ? bound_opt(dht::ring_position(r.start()->value(), + r.start()->is_inclusive() + ? dht::ring_position::token_bound::start + : dht::ring_position::token_bound::end)) + : bound_opt(); + + auto end = r.end() + ? bound_opt(dht::ring_position(r.end()->value(), + r.end()->is_inclusive() + ? 
dht::ring_position::token_bound::end + : dht::ring_position::token_bound::start)) + : bound_opt(); + + return { std::move(start), std::move(end) }; +} + void stream_session::add_transfer_ranges(sstring keyspace, std::vector> ranges, std::vector column_families, bool flush_tables, long repaired_at) { std::vector stream_details; auto cfs = get_column_family_stores(keyspace, column_families); @@ -500,10 +520,7 @@ void stream_session::add_transfer_ranges(sstring keyspace, std::vector>> prs; auto cf_id = cf->schema()->id(); for (auto& range : ranges) { - auto pr = make_shared>(std::move(range).transform( - [this] (token&& t) -> ring_position { - return { std::move(t) }; - })); + auto pr = make_shared>(to_partition_range(range)); prs.push_back(pr); auto mr = cf->make_reader(*pr); readers.push_back(std::move(mr)); diff --git a/tests/urchin/mutation_source_test.cc b/tests/urchin/mutation_source_test.cc index 3ebdb8ab13..8889955e9b 100644 --- a/tests/urchin/mutation_source_test.cc +++ b/tests/urchin/mutation_source_test.cc @@ -60,8 +60,8 @@ static void test_range_queries(populate_fn populate) { auto inclusive_token_range = [&] (size_t start, size_t end) { return query::partition_range::make( - {partitions[start].token(), true}, - {partitions[end].token(), true}); + {dht::ring_position::starting_at(partitions[start].token())}, + {dht::ring_position::ending_at(partitions[end].token())}); }; test_slice(query::partition_range::make( @@ -74,13 +74,16 @@ static void test_range_queries(populate_fn populate) { {key_before_all, false}, {partitions.front().decorated_key(), false})); test_slice(query::partition_range::make( - {key_before_all.token(), true}, {partitions.front().token(), true})); + {dht::ring_position::starting_at(key_before_all.token())}, + {dht::ring_position::ending_at(partitions.front().token())})); test_slice(query::partition_range::make( - {key_before_all.token(), false}, {partitions.front().token(), true})); + 
{dht::ring_position::ending_at(key_before_all.token())}, + {dht::ring_position::ending_at(partitions.front().token())})); test_slice(query::partition_range::make( - {key_before_all.token(), false}, {partitions.front().token(), false})); + {dht::ring_position::ending_at(key_before_all.token())}, + {dht::ring_position::starting_at(partitions.front().token())})); test_slice(query::partition_range::make( {partitions.back().decorated_key(), true}, {key_after_all, true})); @@ -92,13 +95,16 @@ static void test_range_queries(populate_fn populate) { {partitions.back().decorated_key(), false}, {key_after_all, false})); test_slice(query::partition_range::make( - {partitions.back().token(), true}, {key_after_all.token(), true})); + {dht::ring_position::starting_at(partitions.back().token())}, + {dht::ring_position::ending_at(key_after_all.token())})); test_slice(query::partition_range::make( - {partitions.back().token(), true}, {key_after_all.token(), false})); + {dht::ring_position::starting_at(partitions.back().token())}, + {dht::ring_position::starting_at(key_after_all.token())})); test_slice(query::partition_range::make( - {partitions.back().token(), false}, {key_after_all.token(), false})); + {dht::ring_position::ending_at(partitions.back().token())}, + {dht::ring_position::starting_at(key_after_all.token())})); test_slice(query::partition_range::make( {partitions[0].decorated_key(), false}, diff --git a/tests/urchin/partitioner_test.cc b/tests/urchin/partitioner_test.cc index 4213b502d2..47e1d5dc20 100644 --- a/tests/urchin/partitioner_test.cc +++ b/tests/urchin/partitioner_test.cc @@ -72,12 +72,15 @@ BOOST_AUTO_TEST_CASE(test_ring_position_is_comparable_with_decorated_key) { BOOST_REQUIRE(k1._token != k2._token); // The rest of the test assumes that. 
- BOOST_REQUIRE(k1.tri_compare(*s, dht::ring_position(k1._token)) == 0); + BOOST_REQUIRE(k1.tri_compare(*s, dht::ring_position::starting_at(k1._token)) > 0); + BOOST_REQUIRE(k1.tri_compare(*s, dht::ring_position::ending_at(k1._token)) < 0); BOOST_REQUIRE(k1.tri_compare(*s, dht::ring_position(k1)) == 0); - BOOST_REQUIRE(k1.tri_compare(*s, dht::ring_position(k2._token)) < 0); + BOOST_REQUIRE(k1.tri_compare(*s, dht::ring_position::starting_at(k2._token)) < 0); + BOOST_REQUIRE(k1.tri_compare(*s, dht::ring_position::ending_at(k2._token)) < 0); BOOST_REQUIRE(k1.tri_compare(*s, dht::ring_position(k2)) < 0); - BOOST_REQUIRE(k2.tri_compare(*s, dht::ring_position(k1._token)) > 0); + BOOST_REQUIRE(k2.tri_compare(*s, dht::ring_position::starting_at(k1._token)) > 0); + BOOST_REQUIRE(k2.tri_compare(*s, dht::ring_position::ending_at(k1._token)) > 0); BOOST_REQUIRE(k2.tri_compare(*s, dht::ring_position(k1)) > 0); }