dht: ring_position: Switch to total ordering
range::is_wrap_around() and range::contains() rely on total ordering on values to work properly. Current ring_position_comparator was only imposing a weak ordering (token positions equal to all key positions with that token). range::before() and range::after() can't work for weak ordering. If the bound is exclusive, we don't know if user-provided token position is inside or outside. Also, is_wrap_around() can't properly detect wrap around in all cases. Consider this case: (1) ]A; B] (2) [A; B] For A = (tok1) and B = (tok1, key1), (1) is a wrap around and (2) is not. Without total ordering between A and B, range::is_wrap_around() can't tell that. I think the simplest soution is to define a total ordering on ring_position by making token positions positioned either before or after all keys with that token.
This commit is contained in:
@@ -104,10 +104,14 @@ public:
|
||||
|
||||
typedef typename bounds_range_type::bound bound;
|
||||
|
||||
auto start = bound(start_token, include_start);
|
||||
auto end = bound(end_token, include_end);
|
||||
auto start = bound(include_start
|
||||
? dht::ring_position::starting_at(start_token)
|
||||
: dht::ring_position::ending_at(start_token));
|
||||
auto end = bound(include_end
|
||||
? dht::ring_position::ending_at(end_token)
|
||||
: dht::ring_position::starting_at(end_token));
|
||||
|
||||
return { bounds_range_type(start, end) };
|
||||
return { bounds_range_type(std::move(start), std::move(end)) };
|
||||
}
|
||||
|
||||
class EQ;
|
||||
|
||||
@@ -203,7 +203,7 @@ decorated_key::tri_compare(const schema& s, const ring_position& other) const {
|
||||
if (other.has_key()) {
|
||||
return _key.legacy_tri_compare(s, *other.key());
|
||||
}
|
||||
return 0;
|
||||
return -other.relation_to_keys();
|
||||
}
|
||||
|
||||
bool
|
||||
@@ -239,16 +239,20 @@ std::ostream& operator<<(std::ostream& out, const ring_position& pos) {
|
||||
out << "{" << pos.token();
|
||||
if (pos.has_key()) {
|
||||
out << ", " << *pos.key();
|
||||
} else {
|
||||
out << ", " << ((pos.relation_to_keys() < 0) ? "start" : "end");
|
||||
}
|
||||
return out << "}";
|
||||
}
|
||||
|
||||
size_t ring_position::serialized_size() const {
|
||||
size_t key_size = serialize_int32_size;
|
||||
size_t size = serialize_int32_size; /* _key length */
|
||||
if (_key) {
|
||||
key_size += _key.value().representation().size();
|
||||
size += _key.value().representation().size();
|
||||
} else {
|
||||
size += sizeof(int8_t); /* _token_bund */
|
||||
}
|
||||
return _token.serialized_size() + key_size;
|
||||
return size + _token.serialized_size();
|
||||
}
|
||||
|
||||
void ring_position::serialize(bytes::iterator& out) const {
|
||||
@@ -259,6 +263,7 @@ void ring_position::serialize(bytes::iterator& out) const {
|
||||
out = std::copy(v.begin(), v.end(), out);
|
||||
} else {
|
||||
serialize_int32(out, 0);
|
||||
serialize_int8(out, static_cast<int8_t>(_token_bound));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -266,7 +271,8 @@ ring_position ring_position::deserialize(bytes_view& in) {
|
||||
auto token = token::deserialize(in);
|
||||
auto size = read_simple<uint32_t>(in);
|
||||
if (size == 0) {
|
||||
return ring_position(std::move(token));
|
||||
auto bound = dht::ring_position::token_bound(read_simple<int8_t>(in));
|
||||
return ring_position(std::move(token), bound);
|
||||
} else {
|
||||
return ring_position(std::move(token), partition_key::from_bytes(to_bytes(read_simple_bytes(in, size))));
|
||||
}
|
||||
@@ -282,13 +288,7 @@ unsigned shard_of(const token& t) {
|
||||
}
|
||||
|
||||
int ring_position_comparator::operator()(const ring_position& lh, const ring_position& rh) const {
|
||||
if (lh.less_compare(s, rh)) {
|
||||
return -1;
|
||||
} else if (lh.equal(s, rh)) {
|
||||
return 0;
|
||||
} else {
|
||||
return 1;
|
||||
}
|
||||
return lh.tri_compare(s, rh);
|
||||
}
|
||||
|
||||
void token::serialize(bytes::iterator& out) const {
|
||||
@@ -322,13 +322,21 @@ bool ring_position::less_compare(const schema& s, const ring_position& other) co
|
||||
return tri_compare(s, other) < 0;
|
||||
}
|
||||
|
||||
int ring_position::tri_compare(const schema& s, const ring_position& other) const {
|
||||
if (_token != other._token) {
|
||||
return _token < other._token ? -1 : 1;
|
||||
} else if (!_key || !other._key) {
|
||||
return 0;
|
||||
int ring_position::tri_compare(const schema& s, const ring_position& o) const {
|
||||
if (_token != o._token) {
|
||||
return _token < o._token ? -1 : 1;
|
||||
}
|
||||
|
||||
if (_key && o._key) {
|
||||
return _key->legacy_tri_compare(s, *o._key);
|
||||
}
|
||||
|
||||
if (!_key && !o._key) {
|
||||
return relation_to_keys() - o.relation_to_keys();
|
||||
} else if (!_key) {
|
||||
return relation_to_keys();
|
||||
} else {
|
||||
return _key->legacy_tri_compare(s, *other._key);
|
||||
return -o.relation_to_keys();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -126,6 +126,8 @@ public:
|
||||
bool less_compare(const schema& s, const decorated_key& other) const;
|
||||
bool less_compare(const schema& s, const ring_position& other) const;
|
||||
|
||||
// Trichotomic comparators defining total ordering on the union of
|
||||
// decorated_key and ring_position objects.
|
||||
int tri_compare(const schema& s, const decorated_key& other) const;
|
||||
int tri_compare(const schema& s, const ring_position& other) const;
|
||||
|
||||
@@ -237,23 +239,52 @@ protected:
|
||||
};
|
||||
|
||||
//
|
||||
// Represents position in the ring of partitons, where partitions are ordered
|
||||
// Represents position in the ring of partitions, where partitions are ordered
|
||||
// according to decorated_key ordering (first by token, then by key value).
|
||||
// Intended to be used for defining partition ranges.
|
||||
//
|
||||
// The 'key' part is optional. When it's absent, this object selects all
|
||||
// partitions which share given token. When 'key' is present, position is
|
||||
// further narrowed down to a partition with given key value among all
|
||||
// partitions which share given token.
|
||||
// The 'key' part is optional. When it's absent, this object represents a position
|
||||
// which is either before or after all keys sharing given token. That's determined
|
||||
// by relation_to_keys().
|
||||
//
|
||||
// For example for the following data:
|
||||
//
|
||||
// tokens: | t1 | t2 |
|
||||
// +----+----+----+
|
||||
// keys: | k1 | k2 | k3 |
|
||||
//
|
||||
// The ordering is:
|
||||
//
|
||||
// ring_position(t1, token_bound::start) < ring_position(k1)
|
||||
// ring_position(k1) < ring_position(k2)
|
||||
// ring_position(k1) == decorated_key(k1)
|
||||
// ring_position(k2) == decorated_key(k2)
|
||||
// ring_position(k2) < ring_position(t1, token_bound::end)
|
||||
// ring_position(k2) < ring_position(k3)
|
||||
// ring_position(t1, token_bound::end) < ring_position(t2, token_bound::start)
|
||||
//
|
||||
// Maps to org.apache.cassandra.db.RowPosition and its derivatives in Origin.
|
||||
// Range bound intricacies are handled separately, by wrapping range<>. This
|
||||
// allows us to devirtualize this.
|
||||
//
|
||||
class ring_position {
|
||||
public:
|
||||
enum class token_bound : int8_t { start = -1, end = 1 };
|
||||
private:
|
||||
dht::token _token;
|
||||
token_bound _token_bound; // valid when !_key
|
||||
std::experimental::optional<partition_key> _key;
|
||||
public:
|
||||
ring_position(dht::token token) : _token(std::move(token)) {}
|
||||
static ring_position starting_at(dht::token token) {
|
||||
return { std::move(token), token_bound::start };
|
||||
}
|
||||
|
||||
static ring_position ending_at(dht::token token) {
|
||||
return { std::move(token), token_bound::end };
|
||||
}
|
||||
|
||||
ring_position(dht::token token, token_bound bound)
|
||||
: _token(std::move(token))
|
||||
, _token_bound(bound)
|
||||
{ }
|
||||
|
||||
ring_position(dht::token token, partition_key key)
|
||||
: _token(std::move(token))
|
||||
@@ -269,6 +300,17 @@ public:
|
||||
return _token;
|
||||
}
|
||||
|
||||
// Valid when !has_key()
|
||||
token_bound bound() const {
|
||||
return _token_bound;
|
||||
}
|
||||
|
||||
// Returns -1 if smaller than keys with the same token, +1 if greater.
|
||||
// Valid when !has_key().
|
||||
int relation_to_keys() const {
|
||||
return static_cast<int>(_token_bound);
|
||||
}
|
||||
|
||||
const std::experimental::optional<partition_key>& key() const {
|
||||
return _key;
|
||||
}
|
||||
@@ -283,12 +325,16 @@ public:
|
||||
}
|
||||
|
||||
bool equal(const schema&, const ring_position&) const;
|
||||
bool less_compare(const schema&, const ring_position&) const;
|
||||
|
||||
// Trichotomic comparator defining a total ordering on ring_position objects
|
||||
int tri_compare(const schema&, const ring_position&) const;
|
||||
|
||||
// "less" comparator corresponding to tri_compare()
|
||||
bool less_compare(const schema&, const ring_position&) const;
|
||||
|
||||
size_t serialized_size() const;
|
||||
void serialize(bytes::iterator& out) const;
|
||||
static ring_position deserialize(bytes_view& in);
|
||||
size_t serialized_size() const;
|
||||
|
||||
friend std::ostream& operator<<(std::ostream&, const ring_position&);
|
||||
};
|
||||
|
||||
@@ -43,6 +43,7 @@ public:
|
||||
range() : range({}, {}) {}
|
||||
private:
|
||||
// the point is before the range (works only for non wrapped ranges)
|
||||
// Comparator must define a total ordering on T.
|
||||
template<typename Comparator>
|
||||
bool before(const T& point, Comparator&& cmp) const {
|
||||
if (!start()) {
|
||||
@@ -58,6 +59,7 @@ private:
|
||||
return false;
|
||||
}
|
||||
// the point is after the range (works only for non wrapped ranges)
|
||||
// Comparator must define a total ordering on T.
|
||||
template<typename Comparator>
|
||||
bool after(const T& point, Comparator&& cmp) const {
|
||||
if (!end()) {
|
||||
@@ -106,6 +108,7 @@ public:
|
||||
return _singular ? _start : _end;
|
||||
}
|
||||
// end is smaller than start
|
||||
// Comparator must define a total ordering on T.
|
||||
template<typename Comparator>
|
||||
bool is_wrap_around(Comparator&& cmp) const {
|
||||
if (_end && _start) {
|
||||
@@ -115,6 +118,7 @@ public:
|
||||
}
|
||||
}
|
||||
// the point is inside the range
|
||||
// Comparator must define a total ordering on T.
|
||||
template<typename Comparator>
|
||||
bool contains(const T& point, Comparator&& cmp) const {
|
||||
if (is_wrap_around(cmp)) {
|
||||
@@ -126,6 +130,7 @@ public:
|
||||
}
|
||||
// split range in two around a split_point. split_point has to be inside the range
|
||||
// split_point will belong to first range
|
||||
// Comparator must define a total ordering on T.
|
||||
template<typename Comparator>
|
||||
std::pair<range<T>, range<T>> split(const T& split_point, Comparator&& cmp) const {
|
||||
assert(contains(split_point, std::forward<Comparator>(cmp)));
|
||||
|
||||
@@ -2501,8 +2501,11 @@ float storage_proxy::estimate_result_rows_per_range(lw_shared_ptr<query::read_co
|
||||
*/
|
||||
std::vector<query::partition_range>
|
||||
storage_proxy::get_restricted_ranges(keyspace& ks, const schema& s, query::partition_range range) {
|
||||
// special case for bounds containing exactly 1 (non-minimum) token
|
||||
if (range.is_singular()) {
|
||||
// special case for bounds containing exactly 1 token
|
||||
if (start_token(range) == end_token(range) && !range.is_wrap_around(dht::ring_position_comparator(s))) {
|
||||
if (start_token(range).is_minimum()) {
|
||||
return {};
|
||||
}
|
||||
return std::vector<query::partition_range>({std::move(range)});
|
||||
}
|
||||
|
||||
@@ -2526,7 +2529,7 @@ storage_proxy::get_restricted_ranges(keyspace& ks, const schema& s, query::parti
|
||||
* asSplitValue() abstracts that choice.
|
||||
*/
|
||||
|
||||
dht::ring_position split_point(upper_bound_token);
|
||||
dht::ring_position split_point(upper_bound_token, dht::ring_position::token_bound::end);
|
||||
if (!remainder.contains(split_point, dht::ring_position_comparator(s))) {
|
||||
break; // no more splits
|
||||
}
|
||||
|
||||
@@ -506,7 +506,7 @@ public:
|
||||
if (pos.has_key()) {
|
||||
return k2.tri_compare(_s, *pos.key());
|
||||
} else {
|
||||
return 0;
|
||||
return -pos.relation_to_keys();
|
||||
}
|
||||
} else {
|
||||
return k2_token < pos.token() ? -1 : 1;
|
||||
@@ -574,7 +574,9 @@ mutation_reader sstable::read_range_rows(schema_ptr schema,
|
||||
return std::make_unique<mutation_reader::impl>();
|
||||
}
|
||||
return read_range_rows(std::move(schema),
|
||||
query::range<dht::ring_position>::make({min_token}, {max_token}));
|
||||
query::range<dht::ring_position>::make(
|
||||
dht::ring_position::starting_at(min_token),
|
||||
dht::ring_position::ending_at(max_token)));
|
||||
}
|
||||
|
||||
mutation_reader
|
||||
|
||||
@@ -489,6 +489,26 @@ std::vector<column_family*> stream_session::get_column_family_stores(const sstri
|
||||
return stores;
|
||||
}
|
||||
|
||||
static
|
||||
query::partition_range to_partition_range(query::range<dht::token> r) {
|
||||
using bound_opt = std::experimental::optional<query::partition_range::bound>;
|
||||
auto start = r.start()
|
||||
? bound_opt(dht::ring_position(r.start()->value(),
|
||||
r.start()->is_inclusive()
|
||||
? dht::ring_position::token_bound::start
|
||||
: dht::ring_position::token_bound::end))
|
||||
: bound_opt();
|
||||
|
||||
auto end = r.end()
|
||||
? bound_opt(dht::ring_position(r.end()->value(),
|
||||
r.start()->is_inclusive()
|
||||
? dht::ring_position::token_bound::end
|
||||
: dht::ring_position::token_bound::start))
|
||||
: bound_opt();
|
||||
|
||||
return { std::move(start), std::move(end) };
|
||||
}
|
||||
|
||||
void stream_session::add_transfer_ranges(sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families, bool flush_tables, long repaired_at) {
|
||||
std::vector<stream_detail> stream_details;
|
||||
auto cfs = get_column_family_stores(keyspace, column_families);
|
||||
@@ -500,10 +520,7 @@ void stream_session::add_transfer_ranges(sstring keyspace, std::vector<query::ra
|
||||
std::vector<shared_ptr<query::range<ring_position>>> prs;
|
||||
auto cf_id = cf->schema()->id();
|
||||
for (auto& range : ranges) {
|
||||
auto pr = make_shared<query::range<ring_position>>(std::move(range).transform<ring_position>(
|
||||
[this] (token&& t) -> ring_position {
|
||||
return { std::move(t) };
|
||||
}));
|
||||
auto pr = make_shared<query::range<ring_position>>(to_partition_range(range));
|
||||
prs.push_back(pr);
|
||||
auto mr = cf->make_reader(*pr);
|
||||
readers.push_back(std::move(mr));
|
||||
|
||||
@@ -60,8 +60,8 @@ static void test_range_queries(populate_fn populate) {
|
||||
|
||||
auto inclusive_token_range = [&] (size_t start, size_t end) {
|
||||
return query::partition_range::make(
|
||||
{partitions[start].token(), true},
|
||||
{partitions[end].token(), true});
|
||||
{dht::ring_position::starting_at(partitions[start].token())},
|
||||
{dht::ring_position::ending_at(partitions[end].token())});
|
||||
};
|
||||
|
||||
test_slice(query::partition_range::make(
|
||||
@@ -74,13 +74,16 @@ static void test_range_queries(populate_fn populate) {
|
||||
{key_before_all, false}, {partitions.front().decorated_key(), false}));
|
||||
|
||||
test_slice(query::partition_range::make(
|
||||
{key_before_all.token(), true}, {partitions.front().token(), true}));
|
||||
{dht::ring_position::starting_at(key_before_all.token())},
|
||||
{dht::ring_position::ending_at(partitions.front().token())}));
|
||||
|
||||
test_slice(query::partition_range::make(
|
||||
{key_before_all.token(), false}, {partitions.front().token(), true}));
|
||||
{dht::ring_position::ending_at(key_before_all.token())},
|
||||
{dht::ring_position::ending_at(partitions.front().token())}));
|
||||
|
||||
test_slice(query::partition_range::make(
|
||||
{key_before_all.token(), false}, {partitions.front().token(), false}));
|
||||
{dht::ring_position::ending_at(key_before_all.token())},
|
||||
{dht::ring_position::starting_at(partitions.front().token())}));
|
||||
|
||||
test_slice(query::partition_range::make(
|
||||
{partitions.back().decorated_key(), true}, {key_after_all, true}));
|
||||
@@ -92,13 +95,16 @@ static void test_range_queries(populate_fn populate) {
|
||||
{partitions.back().decorated_key(), false}, {key_after_all, false}));
|
||||
|
||||
test_slice(query::partition_range::make(
|
||||
{partitions.back().token(), true}, {key_after_all.token(), true}));
|
||||
{dht::ring_position::starting_at(partitions.back().token())},
|
||||
{dht::ring_position::ending_at(key_after_all.token())}));
|
||||
|
||||
test_slice(query::partition_range::make(
|
||||
{partitions.back().token(), true}, {key_after_all.token(), false}));
|
||||
{dht::ring_position::starting_at(partitions.back().token())},
|
||||
{dht::ring_position::starting_at(key_after_all.token())}));
|
||||
|
||||
test_slice(query::partition_range::make(
|
||||
{partitions.back().token(), false}, {key_after_all.token(), false}));
|
||||
{dht::ring_position::ending_at(partitions.back().token())},
|
||||
{dht::ring_position::starting_at(key_after_all.token())}));
|
||||
|
||||
test_slice(query::partition_range::make(
|
||||
{partitions[0].decorated_key(), false},
|
||||
|
||||
@@ -72,12 +72,15 @@ BOOST_AUTO_TEST_CASE(test_ring_position_is_comparable_with_decorated_key) {
|
||||
|
||||
BOOST_REQUIRE(k1._token != k2._token); // The rest of the test assumes that.
|
||||
|
||||
BOOST_REQUIRE(k1.tri_compare(*s, dht::ring_position(k1._token)) == 0);
|
||||
BOOST_REQUIRE(k1.tri_compare(*s, dht::ring_position::starting_at(k1._token)) > 0);
|
||||
BOOST_REQUIRE(k1.tri_compare(*s, dht::ring_position::ending_at(k1._token)) < 0);
|
||||
BOOST_REQUIRE(k1.tri_compare(*s, dht::ring_position(k1)) == 0);
|
||||
|
||||
BOOST_REQUIRE(k1.tri_compare(*s, dht::ring_position(k2._token)) < 0);
|
||||
BOOST_REQUIRE(k1.tri_compare(*s, dht::ring_position::starting_at(k2._token)) < 0);
|
||||
BOOST_REQUIRE(k1.tri_compare(*s, dht::ring_position::ending_at(k2._token)) < 0);
|
||||
BOOST_REQUIRE(k1.tri_compare(*s, dht::ring_position(k2)) < 0);
|
||||
|
||||
BOOST_REQUIRE(k2.tri_compare(*s, dht::ring_position(k1._token)) > 0);
|
||||
BOOST_REQUIRE(k2.tri_compare(*s, dht::ring_position::starting_at(k1._token)) > 0);
|
||||
BOOST_REQUIRE(k2.tri_compare(*s, dht::ring_position::ending_at(k1._token)) > 0);
|
||||
BOOST_REQUIRE(k2.tri_compare(*s, dht::ring_position(k1)) > 0);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user