Convert ring_position and token ranges to be nonwrapping

Wrapping ranges are a pain, so we are moving wrap handling to the edges.

Since cql can't generate wrapping ranges, this means thrift and the ring
maintenance code; also range->ring transformations need to merge the first
and last ranges.

Message-Id: <1478105905-31613-1-git-send-email-avi@scylladb.com>
This commit is contained in:
Avi Kivity
2016-11-02 18:58:25 +02:00
parent 8c55c99353
commit a35136533d
40 changed files with 507 additions and 556 deletions

View File

@@ -572,11 +572,6 @@ column_family::make_reader(schema_ptr s,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state) const {
if (query::is_wrap_around(range, *s)) {
// make_combined_reader() can't handle streams that wrap around yet.
fail(unimplemented::cause::WRAP_AROUND);
}
std::vector<mutation_reader> readers;
readers.reserve(_memtables->size() + 1);
@@ -618,10 +613,6 @@ column_family::make_streaming_reader(schema_ptr s,
const query::partition_range& range) const {
auto& slice = query::full_slice;
auto& pc = service::get_local_streaming_read_priority();
if (query::is_wrap_around(range, *s)) {
// make_combined_reader() can't handle streams that wrap around yet.
fail(unimplemented::cause::WRAP_AROUND);
}
std::vector<mutation_reader> readers;
readers.reserve(_memtables->size() + 1);
@@ -1357,13 +1348,13 @@ column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool
}
static bool needs_cleanup(const lw_shared_ptr<sstables::sstable>& sst,
const lw_shared_ptr<std::vector<range<dht::token>>>& owned_ranges,
const lw_shared_ptr<std::vector<nonwrapping_range<dht::token>>>& owned_ranges,
schema_ptr s) {
auto first = sst->get_first_partition_key();
auto last = sst->get_last_partition_key();
auto first_token = dht::global_partitioner().get_token(*s, first);
auto last_token = dht::global_partitioner().get_token(*s, last);
range<dht::token> sst_token_range = range<dht::token>::make(first_token, last_token);
nonwrapping_range<dht::token> sst_token_range = nonwrapping_range<dht::token>::make(first_token, last_token);
// return true iff sst partition range isn't fully contained in any of the owned ranges.
for (auto& r : *owned_ranges) {
@@ -1375,8 +1366,8 @@ static bool needs_cleanup(const lw_shared_ptr<sstables::sstable>& sst,
}
future<> column_family::cleanup_sstables(sstables::compaction_descriptor descriptor) {
std::vector<range<dht::token>> r = service::get_local_storage_service().get_local_ranges(_schema->ks_name());
auto owned_ranges = make_lw_shared<std::vector<range<dht::token>>>(std::move(r));
std::vector<nonwrapping_range<dht::token>> r = service::get_local_storage_service().get_local_ranges(_schema->ks_name());
auto owned_ranges = make_lw_shared<std::vector<nonwrapping_range<dht::token>>>(std::move(r));
auto sstables_to_cleanup = make_lw_shared<std::vector<sstables::shared_sstable>>(std::move(descriptor.sstables));
return parallel_for_each(*sstables_to_cleanup, [this, owned_ranges = std::move(owned_ranges), sstables_to_cleanup] (auto& sst) {

View File

@@ -71,25 +71,16 @@ static std::vector<db::system_keyspace::range_estimates> estimates_for(const col
std::vector<db::system_keyspace::range_estimates> estimates;
estimates.reserve(local_ranges.size());
std::vector<query::partition_range> unwrapped;
// Each range defines both bounds.
for (auto& range : local_ranges) {
// Each range defines both bounds (with the call to compat::wrap)
for (auto& range : compat::wrap(local_ranges)) {
int64_t count{0};
utils::estimated_histogram hist{0};
unwrapped.clear();
if (range.is_wrap_around(dht::ring_position_comparator(*cf.schema()))) {
auto uw = range.unwrap();
unwrapped.push_back(std::move(uw.first));
unwrapped.push_back(std::move(uw.second));
} else {
unwrapped.push_back(range);
}
for (auto&& uwr : unwrapped) {
for (auto&& sstable : cf.select_sstables(uwr)) {
nonwrapping_range<dht::token> r(std::move(uwr).transform([](auto&& rp) { return rp.token(); }));
count += sstable->estimated_keys_for_range(r);
hist.merge(sstable->get_stats_metadata().estimated_row_size);
}
for (auto uwr : std::vector<query::partition_range>(compat::unwrap(range, *cf.schema()))) {
for (auto&& sstable : cf.select_sstables(uwr)) {
nonwrapping_range<dht::token> r(std::move(uwr).transform([](auto&& rp) { return rp.token(); }));
count += sstable->estimated_keys_for_range(r);
hist.merge(sstable->get_stats_metadata().estimated_row_size);
}
}
estimates.emplace_back(db::system_keyspace::range_estimates{
range.start()->value().token(),
@@ -97,7 +88,6 @@ static std::vector<db::system_keyspace::range_estimates> estimates_for(const col
count,
count > 0 ? hist.mean() : 0});
}
return estimates;
}

View File

@@ -54,7 +54,7 @@ future<> boot_strapper::bootstrap() {
for (const auto& keyspace_name : _db.local().get_non_system_keyspaces()) {
auto& ks = _db.local().find_keyspace(keyspace_name);
auto& strategy = ks.get_replication_strategy();
std::vector<range<token>> ranges = strategy.get_pending_address_ranges(_token_metadata, _tokens, _address);
std::vector<nonwrapping_range<token>> ranges = strategy.get_pending_address_ranges(_token_metadata, _tokens, _address);
logger.debug("Will stream keyspace={}, ranges={}", keyspace_name, ranges);
streamer->add_ranges(keyspace_name, ranges);
}

View File

@@ -297,9 +297,9 @@ int ring_position::tri_compare(const schema& s, const ring_position& o) const {
}
}
range<ring_position>
to_partition_range(range<dht::token> r) {
using bound_opt = std::experimental::optional<range<ring_position>::bound>;
nonwrapping_range<ring_position>
to_partition_range(nonwrapping_range<dht::token> r) {
using bound_opt = std::experimental::optional<nonwrapping_range<ring_position>::bound>;
auto start = r.start()
? bound_opt(dht::ring_position(r.start()->value(),
r.start()->is_inclusive()

View File

@@ -422,7 +422,7 @@ i_partitioner& global_partitioner();
unsigned shard_of(const token&);
range<ring_position> to_partition_range(range<dht::token>);
nonwrapping_range<ring_position> to_partition_range(nonwrapping_range<dht::token>);
} // dht

View File

@@ -53,22 +53,22 @@ logging::logger logger("range_streamer");
using inet_address = gms::inet_address;
static std::unordered_map<range<token>, std::unordered_set<inet_address>>
unordered_multimap_to_unordered_map(const std::unordered_multimap<range<token>, inet_address>& multimap) {
std::unordered_map<range<token>, std::unordered_set<inet_address>> ret;
static std::unordered_map<nonwrapping_range<token>, std::unordered_set<inet_address>>
unordered_multimap_to_unordered_map(const std::unordered_multimap<nonwrapping_range<token>, inet_address>& multimap) {
std::unordered_map<nonwrapping_range<token>, std::unordered_set<inet_address>> ret;
for (auto x : multimap) {
ret[x.first].emplace(x.second);
}
return ret;
}
std::unordered_multimap<inet_address, range<token>>
range_streamer::get_range_fetch_map(const std::unordered_multimap<range<token>, inet_address>& ranges_with_sources,
std::unordered_multimap<inet_address, nonwrapping_range<token>>
range_streamer::get_range_fetch_map(const std::unordered_multimap<nonwrapping_range<token>, inet_address>& ranges_with_sources,
const std::unordered_set<std::unique_ptr<i_source_filter>>& source_filters,
const sstring& keyspace) {
std::unordered_multimap<inet_address, range<token>> range_fetch_map_map;
std::unordered_multimap<inet_address, nonwrapping_range<token>> range_fetch_map_map;
for (auto x : unordered_multimap_to_unordered_map(ranges_with_sources)) {
const range<token>& range_ = x.first;
const nonwrapping_range<token>& range_ = x.first;
const std::unordered_set<inet_address>& addresses = x.second;
bool found_source = false;
for (auto address : addresses) {
@@ -103,8 +103,8 @@ range_streamer::get_range_fetch_map(const std::unordered_multimap<range<token>,
return range_fetch_map_map;
}
std::unordered_multimap<range<token>, inet_address>
range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, std::vector<range<token>> desired_ranges) {
std::unordered_multimap<nonwrapping_range<token>, inet_address>
range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, std::vector<nonwrapping_range<token>> desired_ranges) {
logger.debug("{} ks={}", __func__, keyspace_name);
auto& ks = _db.local().find_keyspace(keyspace_name);
@@ -113,7 +113,7 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, st
auto tm = _metadata.clone_only_token_map();
auto range_addresses = unordered_multimap_to_unordered_map(strat.get_range_addresses(tm));
std::unordered_multimap<range<token>, inet_address> range_sources;
std::unordered_multimap<nonwrapping_range<token>, inet_address> range_sources;
auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr();
for (auto& desired_range : desired_ranges) {
auto found = false;
@@ -137,8 +137,8 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, st
return range_sources;
}
std::unordered_multimap<range<token>, inet_address>
range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, std::vector<range<token>> desired_ranges) {
std::unordered_multimap<nonwrapping_range<token>, inet_address>
range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, std::vector<nonwrapping_range<token>> desired_ranges) {
logger.debug("{} ks={}", __func__, keyspace_name);
assert (_tokens.empty() == false);
@@ -154,7 +154,7 @@ range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_n
auto pending_range_addresses = unordered_multimap_to_unordered_map(strat.get_range_addresses(metadata_clone));
//Collects the source that will have its range moved to the new node
std::unordered_multimap<range<token>, inet_address> range_sources;
std::unordered_multimap<nonwrapping_range<token>, inet_address> range_sources;
for (auto& desired_range : desired_ranges) {
for (auto& x : range_addresses) {
@@ -211,7 +211,7 @@ bool range_streamer::use_strict_sources_for_ranges(const sstring& keyspace_name)
&& _metadata.get_all_endpoints().size() != strat.get_replication_factor();
}
void range_streamer::add_ranges(const sstring& keyspace_name, std::vector<range<token>> ranges) {
void range_streamer::add_ranges(const sstring& keyspace_name, std::vector<nonwrapping_range<token>> ranges) {
auto ranges_for_keyspace = use_strict_sources_for_ranges(keyspace_name)
? get_all_ranges_with_strict_sources_for(keyspace_name, ranges)
: get_all_ranges_with_sources_for(keyspace_name, ranges);
@@ -222,7 +222,7 @@ void range_streamer::add_ranges(const sstring& keyspace_name, std::vector<range<
}
}
std::unordered_map<inet_address, std::vector<range<token>>> range_fetch_map;
std::unordered_map<inet_address, std::vector<nonwrapping_range<token>>> range_fetch_map;
for (auto& x : get_range_fetch_map(ranges_for_keyspace, _source_filters, keyspace_name)) {
range_fetch_map[x.first].emplace_back(x.second);
}
@@ -252,8 +252,8 @@ future<streaming::stream_state> range_streamer::fetch_async() {
return _stream_plan.execute();
}
std::unordered_multimap<inet_address, range<token>>
range_streamer::get_work_map(const std::unordered_multimap<range<token>, inet_address>& ranges_with_source_target,
std::unordered_multimap<inet_address, nonwrapping_range<token>>
range_streamer::get_work_map(const std::unordered_multimap<nonwrapping_range<token>, inet_address>& ranges_with_source_target,
const sstring& keyspace) {
auto filter = std::make_unique<dht::range_streamer::failure_detector_source_filter>(gms::get_local_failure_detector());
std::unordered_set<std::unique_ptr<i_source_filter>> source_filters;

View File

@@ -118,22 +118,22 @@ public:
_source_filters.emplace(std::move(filter));
}
void add_ranges(const sstring& keyspace_name, std::vector<range<token>> ranges);
void add_ranges(const sstring& keyspace_name, std::vector<nonwrapping_range<token>> ranges);
private:
bool use_strict_sources_for_ranges(const sstring& keyspace_name);
/**
* Get a map of all ranges and their respective sources that are candidates for streaming the given ranges
* to us. For each range, the list of sources is sorted by proximity relative to the given destAddress.
*/
std::unordered_multimap<range<token>, inet_address>
get_all_ranges_with_sources_for(const sstring& keyspace_name, std::vector<range<token>> desired_ranges);
std::unordered_multimap<nonwrapping_range<token>, inet_address>
get_all_ranges_with_sources_for(const sstring& keyspace_name, std::vector<nonwrapping_range<token>> desired_ranges);
/**
* Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges.
* For each range, the list should only contain a single source. This allows us to consistently migrate data without violating
* consistency.
*/
std::unordered_multimap<range<token>, inet_address>
get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, std::vector<range<token>> desired_ranges);
std::unordered_multimap<nonwrapping_range<token>, inet_address>
get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, std::vector<nonwrapping_range<token>> desired_ranges);
private:
/**
* @param rangesWithSources The ranges we want to fetch (key) and their potential sources (value)
@@ -141,14 +141,14 @@ private:
* here, we always exclude ourselves.
* @return
*/
static std::unordered_multimap<inet_address, range<token>>
get_range_fetch_map(const std::unordered_multimap<range<token>, inet_address>& ranges_with_sources,
static std::unordered_multimap<inet_address, nonwrapping_range<token>>
get_range_fetch_map(const std::unordered_multimap<nonwrapping_range<token>, inet_address>& ranges_with_sources,
const std::unordered_set<std::unique_ptr<i_source_filter>>& source_filters,
const sstring& keyspace);
public:
static std::unordered_multimap<inet_address, range<token>>
get_work_map(const std::unordered_multimap<range<token>, inet_address>& ranges_with_source_target,
static std::unordered_multimap<inet_address, nonwrapping_range<token>>
get_work_map(const std::unordered_multimap<nonwrapping_range<token>, inet_address>& ranges_with_source_target,
const sstring& keyspace);
#if 0
@@ -166,7 +166,7 @@ private:
std::unordered_set<token> _tokens;
inet_address _address;
sstring _description;
std::unordered_multimap<sstring, std::unordered_map<inet_address, std::vector<range<token>>>> _to_fetch;
std::unordered_multimap<sstring, std::unordered_map<inet_address, std::vector<nonwrapping_range<token>>>> _to_fetch;
std::unordered_set<std::unique_ptr<i_source_filter>> _source_filters;
stream_plan _stream_plan;
};

View File

@@ -23,7 +23,9 @@ namespace streaming {
class stream_request {
sstring keyspace;
std::vector<query::range<dht::token>> ranges;
// For compatibility with <= 1.5, we use wrapping ranges
// (though we never send wraparounds; only allow receiving them)
std::vector<range<dht::token>> ranges_compat();
std::vector<sstring> column_families;
};

View File

@@ -22,6 +22,7 @@
#include "locator/abstract_replication_strategy.hh"
#include "utils/class_registrator.hh"
#include "exceptions/exceptions.hh"
#include "stdx.hh"
namespace locator {
@@ -39,19 +40,6 @@ abstract_replication_strategy::abstract_replication_strategy(
, _snitch(snitch)
, _my_type(my_type) {}
static void unwrap_first_range(std::vector<range<token>>& ret) {
if (ret.empty()) {
return;
}
// Make ret contain no wrap-around range by unwrapping the first element.
auto& r = ret.front();
if (r.is_wrap_around(dht::token_comparator())) {
auto split_ranges = r.unwrap();
r = std::move(split_ranges.first);
ret.push_back(std::move(split_ranges.second));
}
}
std::unique_ptr<abstract_replication_strategy> abstract_replication_strategy::create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, token_metadata& tk_metadata, const std::map<sstring, sstring>& config_options) {
assert(locator::i_endpoint_snitch::get_local_snitch_ptr());
try {
@@ -124,95 +112,96 @@ abstract_replication_strategy::get_cached_endpoints() {
return _cached_endpoints;
}
std::vector<range<token>>
static
void
insert_token_range_to_sorted_container_while_unwrapping(
const dht::token& prev_tok,
const dht::token& tok,
std::vector<nonwrapping_range<dht::token>>& ret) {
if (prev_tok < tok) {
ret.emplace_back(
nonwrapping_range<token>::bound(prev_tok, false),
nonwrapping_range<token>::bound(tok, true));
} else {
ret.emplace_back(
nonwrapping_range<token>::bound(prev_tok, false),
stdx::nullopt);
// Insert in front to maintain sorted order
ret.emplace(
ret.begin(),
stdx::nullopt,
nonwrapping_range<token>::bound(tok, true));
}
}
std::vector<nonwrapping_range<token>>
abstract_replication_strategy::get_ranges(inet_address ep) const {
std::vector<range<token>> ret;
std::vector<nonwrapping_range<token>> ret;
auto prev_tok = _token_metadata.sorted_tokens().back();
for (auto tok : _token_metadata.sorted_tokens()) {
for (inet_address a : calculate_natural_endpoints(tok, _token_metadata)) {
if (a == ep) {
ret.emplace_back(
range<token>::bound(prev_tok, false),
range<token>::bound(tok, true));
insert_token_range_to_sorted_container_while_unwrapping(prev_tok, tok, ret);
break;
}
}
prev_tok = tok;
}
unwrap_first_range(ret);
return ret;
}
std::vector<range<token>>
std::vector<nonwrapping_range<token>>
abstract_replication_strategy::get_primary_ranges(inet_address ep) {
std::vector<range<token>> ret;
std::vector<nonwrapping_range<token>> ret;
auto prev_tok = _token_metadata.sorted_tokens().back();
for (auto tok : _token_metadata.sorted_tokens()) {
auto&& eps = calculate_natural_endpoints(tok, _token_metadata);
if (eps.size() > 0 && eps[0] == ep) {
ret.emplace_back(
range<token>::bound(prev_tok, false),
range<token>::bound(tok, true));
insert_token_range_to_sorted_container_while_unwrapping(prev_tok, tok, ret);
}
prev_tok = tok;
}
unwrap_first_range(ret);
return ret;
}
std::unordered_multimap<inet_address, range<token>>
std::unordered_multimap<inet_address, nonwrapping_range<token>>
abstract_replication_strategy::get_address_ranges(token_metadata& tm) const {
std::unordered_multimap<inet_address, range<token>> ret;
std::unordered_multimap<inet_address, nonwrapping_range<token>> ret;
for (auto& t : tm.sorted_tokens()) {
range<token> r = tm.get_primary_range_for(t);
std::vector<nonwrapping_range<token>> r = tm.get_primary_ranges_for(t);
auto eps = calculate_natural_endpoints(t, tm);
logger.debug("token={}, primary_range={}, address={}", t, r, eps);
bool wrap = r.is_wrap_around(dht::token_comparator());
if (wrap) {
auto split_ranges = r.unwrap();
for (auto ep : eps) {
ret.emplace(ep, split_ranges.first);
ret.emplace(ep, split_ranges.second);
}
} else {
for (auto ep : eps) {
ret.emplace(ep, r);
for (auto ep : eps) {
for (auto&& rng : r) {
ret.emplace(ep, rng);
}
}
}
return ret;
}
std::unordered_multimap<range<token>, inet_address>
std::unordered_multimap<nonwrapping_range<token>, inet_address>
abstract_replication_strategy::get_range_addresses(token_metadata& tm) const {
std::unordered_multimap<range<token>, inet_address> ret;
std::unordered_multimap<nonwrapping_range<token>, inet_address> ret;
for (auto& t : tm.sorted_tokens()) {
range<token> r = tm.get_primary_range_for(t);
std::vector<nonwrapping_range<token>> r = tm.get_primary_ranges_for(t);
auto eps = calculate_natural_endpoints(t, tm);
bool wrap = r.is_wrap_around(dht::token_comparator());
if (wrap) {
auto split_ranges = r.unwrap();
for (auto ep : eps) {
ret.emplace(split_ranges.first, ep);
ret.emplace(split_ranges.second, ep);
}
} else {
for (auto ep : eps) {
ret.emplace(r, ep);
}
for (auto ep : eps) {
for (auto&& rng : r)
ret.emplace(rng, ep);
}
}
return ret;
}
std::vector<range<token>>
std::vector<nonwrapping_range<token>>
abstract_replication_strategy::get_pending_address_ranges(token_metadata& tm, token pending_token, inet_address pending_address) {
return get_pending_address_ranges(tm, std::unordered_set<token>{pending_token}, pending_address);
}
std::vector<range<token>>
std::vector<nonwrapping_range<token>>
abstract_replication_strategy::get_pending_address_ranges(token_metadata& tm, std::unordered_set<token> pending_tokens, inet_address pending_address) {
std::vector<range<token>> ret;
std::vector<nonwrapping_range<token>> ret;
auto temp = tm.clone_only_token_map();
temp.update_normal_tokens(pending_tokens, pending_address);
for (auto& x : get_address_ranges(temp)) {

View File

@@ -105,22 +105,22 @@ public:
// The list is sorted, and its elements are non-overlapping and non-wrap-around.
// It is the analogue of Origin's getAddressRanges().get(endpoint).
// This function is not efficient, and not meant for the fast path.
std::vector<range<token>> get_ranges(inet_address ep) const;
std::vector<nonwrapping_range<token>> get_ranges(inet_address ep) const;
// get_primary_ranges() returns the list of "primary ranges" for the given
// endpoint. "Primary ranges" are the ranges that the node is responsible
// for storing replica primarily, which means this is the first node
// returned calculate_natural_endpoints().
// This function is the analogue of Origin's
// StorageService.getPrimaryRangesForEndpoint().
std::vector<range<token>> get_primary_ranges(inet_address ep);
std::vector<nonwrapping_range<token>> get_primary_ranges(inet_address ep);
std::unordered_multimap<inet_address, range<token>> get_address_ranges(token_metadata& tm) const;
std::unordered_multimap<inet_address, nonwrapping_range<token>> get_address_ranges(token_metadata& tm) const;
std::unordered_multimap<range<token>, inet_address> get_range_addresses(token_metadata& tm) const;
std::unordered_multimap<nonwrapping_range<token>, inet_address> get_range_addresses(token_metadata& tm) const;
std::vector<range<token>> get_pending_address_ranges(token_metadata& tm, token pending_token, inet_address pending_address);
std::vector<nonwrapping_range<token>> get_pending_address_ranges(token_metadata& tm, token pending_token, inet_address pending_address);
std::vector<range<token>> get_pending_address_ranges(token_metadata& tm, std::unordered_set<token> pending_tokens, inet_address pending_address);
std::vector<nonwrapping_range<token>> get_pending_address_ranges(token_metadata& tm, std::unordered_set<token> pending_tokens, inet_address pending_address);
};
}

View File

@@ -25,6 +25,8 @@
#include "locator/snitch_base.hh"
#include "locator/abstract_replication_strategy.hh"
#include "log.hh"
#include "stdx.hh"
#include "partition_range_compat.hh"
#include <unordered_map>
#include <algorithm>
#include <boost/icl/interval.hpp>
@@ -327,18 +329,21 @@ token token_metadata::get_predecessor(token t) {
}
}
std::vector<range<token>> token_metadata::get_primary_ranges_for(std::unordered_set<token> tokens) {
std::vector<range<token>> ranges;
ranges.reserve(tokens.size());
std::vector<nonwrapping_range<token>> token_metadata::get_primary_ranges_for(std::unordered_set<token> tokens) {
std::vector<nonwrapping_range<token>> ranges;
ranges.reserve(tokens.size() + 1); // one of the ranges will wrap
for (auto right : tokens) {
ranges.emplace_back(range<token>::bound(get_predecessor(right), false),
range<token>::bound(right, true));
auto left = get_predecessor(right);
compat::unwrap_into(
wrapping_range<token>(range_bound<token>(left, false), range_bound<token>(right)),
dht::token_comparator(),
[&] (auto&& rng) { ranges.push_back(std::move(rng)); });
}
return ranges;
}
range<token> token_metadata::get_primary_range_for(token right) {
return get_primary_ranges_for({right}).front();
std::vector<nonwrapping_range<token>> token_metadata::get_primary_ranges_for(token right) {
return get_primary_ranges_for(std::unordered_set<token>{right});
}
boost::icl::interval<token>::interval_type
@@ -424,7 +429,7 @@ void token_metadata::calculate_pending_ranges(abstract_replication_strategy& str
return;
}
std::unordered_multimap<inet_address, range<token>> address_ranges = strategy.get_address_ranges(*this);
std::unordered_multimap<inet_address, nonwrapping_range<token>> address_ranges = strategy.get_address_ranges(*this);
// FIXME
// Copy of metadata reflecting the situation after all leave operations are finished.

View File

@@ -609,9 +609,9 @@ public:
}
#endif
public:
std::vector<range<token>> get_primary_ranges_for(std::unordered_set<token> tokens);
std::vector<nonwrapping_range<token>> get_primary_ranges_for(std::unordered_set<token> tokens);
range<token> get_primary_range_for(token right);
std::vector<nonwrapping_range<token>> get_primary_ranges_for(token right);
static boost::icl::interval<token>::interval_type range_to_interval(range<dht::token> r);
private:

View File

@@ -585,7 +585,7 @@ int main(int ac, char** av) {
api::set_server_stream_manager(ctx).get();
// Start handling REPAIR_CHECKSUM_RANGE messages
net::get_messaging_service().invoke_on_all([&db] (auto& ms) {
ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, query::range<dht::token> range, rpc::optional<repair_checksum> hash_version) {
ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, nonwrapping_range<dht::token> range, rpc::optional<repair_checksum> hash_version) {
auto hv = hash_version ? *hash_version : repair_checksum::legacy;
return do_with(std::move(keyspace), std::move(cf), std::move(range),
[&db, hv] (auto& keyspace, auto& cf, auto& range) {

View File

@@ -339,10 +339,6 @@ memtable::make_reader(schema_ptr s,
const query::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc) {
if (query::is_wrap_around(range, *s)) {
fail(unimplemented::cause::WRAP_AROUND);
}
if (query::is_single_partition(range)) {
const query::ring_position& pos = range.start()->value();
return _read_section(*this, [&] {

View File

@@ -69,6 +69,7 @@
#include "idl/partition_checksum.dist.impl.hh"
#include "rpc/lz4_compressor.hh"
#include "rpc/multi_algo_compressor_factory.hh"
#include "partition_range_compat.hh"
namespace net {
@@ -704,10 +705,15 @@ future<> messaging_service::send_stream_mutation(msg_addr id, UUID plan_id, froz
// STREAM_MUTATION_DONE
void messaging_service::register_stream_mutation_done(std::function<future<> (const rpc::client_info& cinfo,
UUID plan_id, std::vector<range<dht::token>> ranges, UUID cf_id, unsigned dst_cpu_id)>&& func) {
register_handler(this, messaging_verb::STREAM_MUTATION_DONE, std::move(func));
UUID plan_id, std::vector<nonwrapping_range<dht::token>> ranges, UUID cf_id, unsigned dst_cpu_id)>&& func) {
register_handler(this, messaging_verb::STREAM_MUTATION_DONE,
[func = std::move(func)] (const rpc::client_info& cinfo,
UUID plan_id, std::vector<wrapping_range<dht::token>> ranges,
UUID cf_id, unsigned dst_cpu_id) mutable {
return func(cinfo, plan_id, compat::unwrap(std::move(ranges)), cf_id, dst_cpu_id);
});
}
future<> messaging_service::send_stream_mutation_done(msg_addr id, UUID plan_id, std::vector<range<dht::token>> ranges, UUID cf_id, unsigned dst_cpu_id) {
future<> messaging_service::send_stream_mutation_done(msg_addr id, UUID plan_id, std::vector<nonwrapping_range<dht::token>> ranges, UUID cf_id, unsigned dst_cpu_id) {
return send_message_timeout_and_retry<void>(this, messaging_verb::STREAM_MUTATION_DONE, id,
streaming_timeout, streaming_nr_retry, streaming_wait_before_retry,
plan_id, std::move(ranges), cf_id, dst_cpu_id);
@@ -819,7 +825,7 @@ future<> messaging_service::send_mutation_done(msg_addr id, unsigned shard, resp
return send_message_oneway(this, messaging_verb::MUTATION_DONE, std::move(id), std::move(shard), std::move(response_id));
}
void messaging_service::register_read_data(std::function<future<foreign_ptr<lw_shared_ptr<query::result>>> (const rpc::client_info&, query::read_command cmd, query::partition_range pr)>&& func) {
void messaging_service::register_read_data(std::function<future<foreign_ptr<lw_shared_ptr<query::result>>> (const rpc::client_info&, query::read_command cmd, compat::wrapping_partition_range pr)>&& func) {
register_handler(this, net::messaging_verb::READ_DATA, std::move(func));
}
void messaging_service::unregister_read_data() {
@@ -849,7 +855,7 @@ future<utils::UUID> messaging_service::send_schema_check(msg_addr dst) {
return send_message<utils::UUID>(this, net::messaging_verb::SCHEMA_CHECK, dst);
}
void messaging_service::register_read_mutation_data(std::function<future<foreign_ptr<lw_shared_ptr<reconcilable_result>>> (const rpc::client_info&, query::read_command cmd, query::partition_range pr)>&& func) {
void messaging_service::register_read_mutation_data(std::function<future<foreign_ptr<lw_shared_ptr<reconcilable_result>>> (const rpc::client_info&, query::read_command cmd, compat::wrapping_partition_range pr)>&& func) {
register_handler(this, net::messaging_verb::READ_MUTATION_DATA, std::move(func));
}
void messaging_service::unregister_read_mutation_data() {
@@ -859,7 +865,7 @@ future<reconcilable_result> messaging_service::send_read_mutation_data(msg_addr
return send_message_timeout<reconcilable_result>(this, messaging_verb::READ_MUTATION_DATA, std::move(id), timeout, cmd, pr);
}
void messaging_service::register_read_digest(std::function<future<query::result_digest, api::timestamp_type> (const rpc::client_info&, query::read_command cmd, query::partition_range pr)>&& func) {
void messaging_service::register_read_digest(std::function<future<query::result_digest, api::timestamp_type> (const rpc::client_info&, query::read_command cmd, compat::wrapping_partition_range pr)>&& func) {
register_handler(this, net::messaging_verb::READ_DIGEST, std::move(func));
}
void messaging_service::unregister_read_digest() {
@@ -897,14 +903,14 @@ future<> messaging_service::send_replication_finished(msg_addr id, inet_address
// Wrapper for REPAIR_CHECKSUM_RANGE
void messaging_service::register_repair_checksum_range(
std::function<future<partition_checksum> (sstring keyspace,
sstring cf, query::range<dht::token> range, rpc::optional<repair_checksum> hash_version)>&& f) {
sstring cf, nonwrapping_range<dht::token> range, rpc::optional<repair_checksum> hash_version)>&& f) {
register_handler(this, messaging_verb::REPAIR_CHECKSUM_RANGE, std::move(f));
}
void messaging_service::unregister_repair_checksum_range() {
_rpc->unregister_handler(messaging_verb::REPAIR_CHECKSUM_RANGE);
}
future<partition_checksum> messaging_service::send_repair_checksum_range(
msg_addr id, sstring keyspace, sstring cf, ::range<dht::token> range, repair_checksum hash_version)
msg_addr id, sstring keyspace, sstring cf, ::nonwrapping_range<dht::token> range, repair_checksum hash_version)
{
return send_message<partition_checksum>(this,
messaging_verb::REPAIR_CHECKSUM_RANGE, std::move(id),

View File

@@ -64,11 +64,17 @@ namespace dht {
}
namespace query {
using partition_range = range<ring_position>;
using partition_range = nonwrapping_range<ring_position>;
class read_command;
class result;
}
namespace compat {
using wrapping_partition_range = wrapping_range<dht::ring_position>;
}
namespace net {
/* All verb handler identifiers */
@@ -230,16 +236,16 @@ public:
void register_stream_mutation(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional<bool>)>&& func);
future<> send_stream_mutation(msg_addr id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, bool fragmented);
void register_stream_mutation_done(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, std::vector<range<dht::token>> ranges, UUID cf_id, unsigned dst_cpu_id)>&& func);
future<> send_stream_mutation_done(msg_addr id, UUID plan_id, std::vector<range<dht::token>> ranges, UUID cf_id, unsigned dst_cpu_id);
void register_stream_mutation_done(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, std::vector<nonwrapping_range<dht::token>> ranges, UUID cf_id, unsigned dst_cpu_id)>&& func);
future<> send_stream_mutation_done(msg_addr id, UUID plan_id, std::vector<nonwrapping_range<dht::token>> ranges, UUID cf_id, unsigned dst_cpu_id);
void register_complete_message(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id)>&& func);
future<> send_complete_message(msg_addr id, UUID plan_id, unsigned dst_cpu_id);
// Wrapper for REPAIR_CHECKSUM_RANGE verb
void register_repair_checksum_range(std::function<future<partition_checksum> (sstring keyspace, sstring cf, range<dht::token> range, rpc::optional<repair_checksum> hash_version)>&& func);
void register_repair_checksum_range(std::function<future<partition_checksum> (sstring keyspace, sstring cf, nonwrapping_range<dht::token> range, rpc::optional<repair_checksum> hash_version)>&& func);
void unregister_repair_checksum_range();
future<partition_checksum> send_repair_checksum_range(msg_addr id, sstring keyspace, sstring cf, range<dht::token> range, repair_checksum hash_version);
future<partition_checksum> send_repair_checksum_range(msg_addr id, sstring keyspace, sstring cf, nonwrapping_range<dht::token> range, repair_checksum hash_version);
// Wrapper for GOSSIP_ECHO verb
void register_gossip_echo(std::function<future<> ()>&& func);
@@ -292,7 +298,7 @@ public:
// Wrapper for READ_DATA
// Note: WTH is future<foreign_ptr<lw_shared_ptr<query::result>>
void register_read_data(std::function<future<foreign_ptr<lw_shared_ptr<query::result>>> (const rpc::client_info&, query::read_command cmd, query::partition_range pr)>&& func);
void register_read_data(std::function<future<foreign_ptr<lw_shared_ptr<query::result>>> (const rpc::client_info&, query::read_command cmd, compat::wrapping_partition_range pr)>&& func);
void unregister_read_data();
future<query::result> send_read_data(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const query::partition_range& pr);
@@ -307,12 +313,12 @@ public:
future<utils::UUID> send_schema_check(msg_addr);
// Wrapper for READ_MUTATION_DATA
void register_read_mutation_data(std::function<future<foreign_ptr<lw_shared_ptr<reconcilable_result>>> (const rpc::client_info&, query::read_command cmd, query::partition_range pr)>&& func);
void register_read_mutation_data(std::function<future<foreign_ptr<lw_shared_ptr<reconcilable_result>>> (const rpc::client_info&, query::read_command cmd, compat::wrapping_partition_range pr)>&& func);
void unregister_read_mutation_data();
future<reconcilable_result> send_read_mutation_data(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const query::partition_range& pr);
// Wrapper for READ_DIGEST
void register_read_digest(std::function<future<query::result_digest, api::timestamp_type> (const rpc::client_info&, query::read_command cmd, query::partition_range pr)>&& func);
void register_read_digest(std::function<future<query::result_digest, api::timestamp_type> (const rpc::client_info&, query::read_command cmd, compat::wrapping_partition_range pr)>&& func);
void unregister_read_digest();
future<query::result_digest, rpc::optional<api::timestamp_type>> send_read_digest(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const query::partition_range& pr);

175
partition_range_compat.hh Normal file
View File

@@ -0,0 +1,175 @@
/*
* Copyright 2016 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <iterator>
#include <vector>

#include <boost/range/iterator_range_core.hpp>

#include "range.hh"
#include "dht/i_partitioner.hh"
#include "query-request.hh"
#include "stdx.hh"
namespace compat {
using wrapping_partition_range = wrapping_range<dht::ring_position>;
// Converts a vector of wrapping ranges into nonwrapping ranges, consuming the
// input. A wrap-around element is split into its two components; the low
// component is prepended, so a left-bound-sorted input stays sorted.
template <typename T, typename Comparator>
std::vector<nonwrapping_range<T>>
unwrap(std::vector<wrapping_range<T>>&& v, Comparator&& cmp) {
    std::vector<nonwrapping_range<T>> result;
    result.reserve(v.size() + 1);  // a split wrap-around range yields one extra element
    for (auto&& r : v) {
        if (!r.is_wrap_around(cmp)) {
            result.emplace_back(std::move(r));
            continue;
        }
        auto&& pieces = std::move(r).unwrap();
        result.insert(result.begin(), nonwrapping_range<T>(std::move(pieces.first)));
        result.emplace_back(std::move(pieces.second));
    }
    return result;
}
// Converts a vector of wrapping ranges into nonwrapping ranges, copying the
// elements. A wrap-around element is split into its two components; the low
// component is prepended, so a left-bound-sorted input stays sorted.
template <typename T, typename Comparator>
std::vector<nonwrapping_range<T>>
unwrap(const std::vector<wrapping_range<T>>& v, Comparator&& cmp) {
    std::vector<nonwrapping_range<T>> result;
    result.reserve(v.size() + 1);  // a split wrap-around range yields one extra element
    for (const auto& r : v) {
        if (r.is_wrap_around(cmp)) {
            auto&& pieces = r.unwrap();
            result.insert(result.begin(), nonwrapping_range<T>(pieces.first));
            result.emplace_back(pieces.second);
        } else {
            result.emplace_back(r);
        }
    }
    return result;
}
// Converts nonwrapping ranges back to wrapping form, copying the input.
// A leading (-inf, x) and trailing (y, +inf) pair is fused back into the
// single wrapping range (y, x); everything in between is carried verbatim.
template <typename T>
std::vector<wrapping_range<T>>
wrap(const std::vector<nonwrapping_range<T>>& v) {
    bool has_split_pair = v.size() >= 2 && !v.front().start() && !v.back().end();
    if (!has_split_pair) {
        return boost::copy_range<std::vector<wrapping_range<T>>>(v);
    }
    auto merged = std::vector<wrapping_range<T>>();
    merged.reserve(v.size() - 1);  // two input ranges collapse into one
    std::copy(v.begin() + 1, v.end() - 1, std::back_inserter(merged));
    merged.emplace_back(v.back().start(), v.front().end());
    return merged;
}
// Converts nonwrapping ranges back to wrapping form, consuming the input.
// A leading (-inf, x) and trailing (y, +inf) pair is fused back into the
// single wrapping range (y, x); everything in between is moved verbatim.
template <typename T>
std::vector<wrapping_range<T>>
wrap(std::vector<nonwrapping_range<T>>&& v) {
    // re-wrap (-inf,x) ... (y, +inf) into (y, x):
    if (v.size() >= 2 && !v.front().start() && !v.back().end()) {
        auto ret = std::vector<wrapping_range<T>>();
        ret.reserve(v.size() - 1);  // two input ranges collapse into one
        std::move(v.begin() + 1, v.end() - 1, std::back_inserter(ret));
        ret.emplace_back(std::move(v.back()).start(), std::move(v.front()).end());
        return ret;
    }
    // No wrap-around pair to reconstruct: convert element-wise, moving each
    // range through nonwrapping_range's rvalue conversion operator instead of
    // copying (boost::copy_range on an lvalue view would copy every element).
    return std::vector<wrapping_range<T>>(
            std::make_move_iterator(v.begin()),
            std::make_move_iterator(v.end()));
}
// Token ranges need no schema to compare, so the comparator is fixed here.
inline
std::vector<nonwrapping_range<dht::token>>
unwrap(const std::vector<wrapping_range<dht::token>>& v) {
    return unwrap(v, dht::token_comparator());
}
// Rvalue variant: consumes the input vector instead of copying it.
inline
std::vector<nonwrapping_range<dht::token>>
unwrap(std::vector<wrapping_range<dht::token>>&& v) {
    return unwrap(std::move(v), dht::token_comparator());
}
// Result of unwrapping a (possibly wrapping) partition range: always one
// nonwrapping range in `first`, plus a second one in `second` when the
// input wrapped around the ring. Convertible to a vector of ranges.
class one_or_two_partition_ranges : public std::pair<query::partition_range, stdx::optional<query::partition_range>> {
    using pair = std::pair<query::partition_range, stdx::optional<query::partition_range>>;
public:
    explicit one_or_two_partition_ranges(query::partition_range&& f)
        : pair(std::move(f), stdx::nullopt) {
    }
    explicit one_or_two_partition_ranges(query::partition_range&& f, query::partition_range&& s)
        : pair(std::move(f), std::move(s)) {
    }
    // Copying conversion, used when the object is an lvalue.
    operator std::vector<query::partition_range>() const & {
        auto ret = std::vector<query::partition_range>();
        // not reserving, since ret.size() is likely to be 1
        ret.push_back(first);
        if (second) {
            ret.push_back(*second);
        }
        return ret;
    }
    // Moving conversion, used when the object is an rvalue.
    operator std::vector<query::partition_range>() && {
        auto ret = std::vector<query::partition_range>();
        // not reserving, since ret.size() is likely to be 1
        ret.push_back(std::move(first));
        if (second) {
            ret.push_back(std::move(*second));
        }
        return ret;
    }
};
// Unwraps a possibly-wrapping partition range into one or two nonwrapping
// ranges, comparing positions under schema `s`.
inline
one_or_two_partition_ranges
unwrap(wrapping_partition_range pr, const schema& s) {
    if (!pr.is_wrap_around(dht::ring_position_comparator(s))) {
        // Already nonwrapping: pass it through as a single range.
        return one_or_two_partition_ranges(query::partition_range(std::move(pr)));
    }
    auto parts = std::move(pr).unwrap();
    // Preserve ring order: the component after the wrap point comes first.
    return one_or_two_partition_ranges(
            query::partition_range(std::move(parts.second)),
            query::partition_range(std::move(parts.first)));
}
// Unwraps `range` and calls `func` with its components, with an unwrapped
// range type, as a parameter (once for a regular range, twice for a
// wrap-around one).
template <typename T, typename Comparator, typename Func>
void
unwrap_into(wrapping_range<T>&& range, const Comparator& cmp, Func&& func) {
    if (range.is_wrap_around(cmp)) {
        // Consume the rvalue: unwrap() on an lvalue would copy the bounds.
        auto unw = std::move(range).unwrap();
        // Preserve ring order: the component after the wrap point comes first.
        func(nonwrapping_range<T>(std::move(unw.second)));
        func(nonwrapping_range<T>(std::move(unw.first)));
    } else {
        func(nonwrapping_range<T>(std::move(range)));
    }
}
}

View File

@@ -36,16 +36,11 @@ template <typename T>
using range = wrapping_range<T>;
using ring_position = dht::ring_position;
using partition_range = range<ring_position>;
using partition_range = nonwrapping_range<ring_position>;
using clustering_range = nonwrapping_range<clustering_key_prefix>;
extern const partition_range full_partition_range;
inline
bool is_wrap_around(const query::partition_range& range, const schema& s) {
return range.is_wrap_around(dht::ring_position_comparator(s));
}
inline
bool is_single_partition(const query::partition_range& range) {
return range.is_singular() && range.start()->value().has_key();

View File

@@ -22,6 +22,7 @@
#pragma once
#include "stdx.hh"
#include <list>
#include <experimental/optional>
#include <iostream>
#include <boost/range/algorithm/copy.hpp>
@@ -436,12 +437,17 @@ public:
explicit nonwrapping_range(wrapping_range<T>&& r)
: _range(std::move(r))
{ }
// Can only be called if !r.is_wrap_around().
explicit nonwrapping_range(const wrapping_range<T>& r)
: _range(r)
{ }
operator wrapping_range<T>() const & {
return _range;
}
operator wrapping_range<T>() && {
return std::move(_range);
}
// the point is before the range.
// Comparator must define a total ordering on T.
template<typename Comparator>
@@ -466,6 +472,9 @@ public:
return wrapping_range<T>::greater_than_or_equal(_range.end_bound(), other._range.start_bound(), cmp)
&& wrapping_range<T>::greater_than_or_equal(other._range.end_bound(), _range.start_bound(), cmp);
}
static nonwrapping_range make(bound start, bound end) {
return nonwrapping_range({std::move(start)}, {std::move(end)});
}
static nonwrapping_range make_open_ended_both_sides() {
return {{}, {}};
}

View File

@@ -358,7 +358,7 @@ std::ostream& operator<<(std::ostream& out, const partition_checksum& c) {
// data is coming in).
static future<partition_checksum> checksum_range_shard(database &db,
const sstring& keyspace_name, const sstring& cf_name,
const ::range<dht::token>& range, repair_checksum hash_version) {
const ::nonwrapping_range<dht::token>& range, repair_checksum hash_version) {
auto& cf = db.find_column_family(keyspace_name, cf_name);
return do_with(dht::to_partition_range(range), [&cf, hash_version] (const auto& partition_range) {
auto reader = cf.make_streaming_reader(cf.schema(), partition_range);
@@ -395,7 +395,7 @@ static future<partition_checksum> checksum_range_shard(database &db,
// function is not resolved.
future<partition_checksum> checksum_range(seastar::sharded<database> &db,
const sstring& keyspace, const sstring& cf,
const ::range<dht::token>& range, repair_checksum hash_version) {
const ::nonwrapping_range<dht::token>& range, repair_checksum hash_version) {
unsigned shard_begin = range.start() ?
dht::shard_of(range.start()->value()) : 0;
unsigned shard_end = range.end() ?
@@ -417,7 +417,7 @@ future<partition_checksum> checksum_range(seastar::sharded<database> &db,
static void request_transfer_ranges(seastar::sharded<database>& db,
const sstring& keyspace, const sstring& cf,
const ::range<dht::token>& range,
const ::nonwrapping_range<dht::token>& range,
const std::vector<gms::inet_address>& neighbors_in,
const std::vector<gms::inet_address>& neighbors_out,
streaming::stream_plan& sp_in,
@@ -430,8 +430,8 @@ static void request_transfer_ranges(seastar::sharded<database>& db,
}
}
static void split_and_add(std::vector<::range<dht::token>>& ranges,
const range<dht::token>& range,
static void split_and_add(std::vector<::nonwrapping_range<dht::token>>& ranges,
const nonwrapping_range<dht::token>& range,
uint64_t estimated_partitions, uint64_t target_partitions) {
if (estimated_partitions < target_partitions) {
// We're done, the range is small enough to not be split further
@@ -467,7 +467,7 @@ static thread_local semaphore parallelism_semaphore(parallelism);
// Repair a single cf in a single local range.
// Comparable to RepairJob in Origin.
static future<> repair_cf_range(seastar::sharded<database>& db,
sstring keyspace, sstring cf, ::range<dht::token> range,
sstring keyspace, sstring cf, ::nonwrapping_range<dht::token> range,
std::vector<gms::inet_address>& neighbors,
streaming::stream_plan& sp_in,
streaming::stream_plan& sp_out,
@@ -477,17 +477,9 @@ static future<> repair_cf_range(seastar::sharded<database>& db,
return make_ready_future<>();
}
// The partition iterating code inside checksum_range_shard does not
// support wrap-around ranges, so we need to break at least wrap-
// around ranges.
std::vector<::range<dht::token>> ranges;
if (range.is_wrap_around(dht::token_comparator())) {
auto unwrapped = range.unwrap();
ranges.push_back(unwrapped.first);
ranges.push_back(unwrapped.second);
} else {
ranges.push_back(range);
}
std::vector<::nonwrapping_range<dht::token>> ranges;
ranges.push_back(range);
// Additionally, we want to break up large ranges so they will have
// (approximately) a desired number of rows each.
// FIXME: column_family should have a method to estimate the number of
@@ -507,7 +499,7 @@ static future<> repair_cf_range(seastar::sharded<database>& db,
// fill a vector in advance.
// FIXME: this "100" needs to be a parameter.
uint64_t target_partitions = 100;
std::vector<::range<dht::token>> tosplit;
std::vector<::nonwrapping_range<dht::token>> tosplit;
while (estimated_partitions > target_partitions) {
tosplit.clear();
ranges.swap(tosplit);
@@ -637,7 +629,7 @@ static future<> repair_cf_range(seastar::sharded<database>& db,
// Repair a single local range, multiple column families.
// Comparable to RepairSession in Origin
static future<> repair_range(seastar::sharded<database>& db, sstring keyspace,
::range<dht::token> range, std::vector<sstring>& cfs,
::nonwrapping_range<dht::token> range, std::vector<sstring>& cfs,
const std::vector<sstring>& data_centers,
const std::vector<sstring>& hosts,
streaming::stream_plan& sp_in,
@@ -654,24 +646,24 @@ static future<> repair_range(seastar::sharded<database>& db, sstring keyspace,
});
}
static std::vector<query::range<dht::token>> get_ranges_for_endpoint(
static std::vector<nonwrapping_range<dht::token>> get_ranges_for_endpoint(
database& db, sstring keyspace, gms::inet_address ep) {
auto& rs = db.find_keyspace(keyspace).get_replication_strategy();
return rs.get_ranges(ep);
}
static std::vector<query::range<dht::token>> get_local_ranges(
static std::vector<nonwrapping_range<dht::token>> get_local_ranges(
database& db, sstring keyspace) {
return get_ranges_for_endpoint(db, keyspace, utils::fb_utilities::get_broadcast_address());
}
static std::vector<query::range<dht::token>> get_primary_ranges_for_endpoint(
static std::vector<nonwrapping_range<dht::token>> get_primary_ranges_for_endpoint(
database& db, sstring keyspace, gms::inet_address ep) {
auto& rs = db.find_keyspace(keyspace).get_replication_strategy();
return rs.get_primary_ranges(ep);
}
static std::vector<query::range<dht::token>> get_primary_ranges(
static std::vector<nonwrapping_range<dht::token>> get_primary_ranges(
database& db, sstring keyspace) {
return get_primary_ranges_for_endpoint(db, keyspace,
utils::fb_utilities::get_broadcast_address());
@@ -687,7 +679,7 @@ struct repair_options {
// If ranges is not empty, it overrides the repair's default heuristics
// for determining the list of ranges to repair. In particular, "ranges"
// overrides the setting of "primary_range".
std::vector<query::range<dht::token>> ranges;
std::vector<nonwrapping_range<dht::token>> ranges;
// If start_token and end_token are set, they define a range which is
// intersected with the ranges actually held by this node to decide what
// to repair.
@@ -800,7 +792,7 @@ private:
// A range is expressed as start_token:end token and multiple ranges can
// be given as comma separated ranges(e.g. aaa:bbb,ccc:ddd).
static void ranges_opt(std::vector<query::range<dht::token>>& var,
static void ranges_opt(std::vector<nonwrapping_range<dht::token>>& var,
std::unordered_map<sstring, sstring>& options,
const sstring& key) {
auto it = options.find(key);
@@ -818,9 +810,12 @@ private:
}
auto tok_start = dht::global_partitioner().from_sstring(token_strings[0]);
auto tok_end = dht::global_partitioner().from_sstring(token_strings[1]);
var.emplace_back(
auto rng = wrapping_range<dht::token>(
::range<dht::token>::bound(tok_start, false),
::range<dht::token>::bound(tok_end, true));
compat::unwrap_into(std::move(rng), dht::token_comparator(), [&] (nonwrapping_range<dht::token>&& x) {
var.push_back(std::move(x));
});
}
options.erase(it);
}
@@ -844,7 +839,7 @@ private:
// is assumed to be a indivisible in the sense that all the tokens in has the
// same nodes as replicas.
static future<> repair_ranges(seastar::sharded<database>& db, sstring keyspace,
std::vector<query::range<dht::token>> ranges,
std::vector<nonwrapping_range<dht::token>> ranges,
std::vector<sstring> cfs, int id,
std::vector<sstring> data_centers, std::vector<sstring> hosts) {
return do_with(streaming::stream_plan("repair-in"),
@@ -911,7 +906,7 @@ static int do_repair_start(seastar::sharded<database>& db, sstring keyspace,
// local ranges (the token ranges for which this node holds a replica of).
// Each of these ranges may have a different set of replicas, so the
// repair of each range is performed separately with repair_range().
std::vector<query::range<dht::token>> ranges;
std::vector<nonwrapping_range<dht::token>> ranges;
if (options.ranges.size()) {
ranges = options.ranges;
} else if (options.primary_range) {
@@ -954,8 +949,8 @@ static int do_repair_start(seastar::sharded<database>& db, sstring keyspace,
dht::global_partitioner().from_sstring(options.end_token),
false);
}
::range<dht::token> given_range_complement(tok_end, tok_start);
std::vector<query::range<dht::token>> intersections;
nonwrapping_range<dht::token> given_range_complement(tok_end, tok_start);
std::vector<nonwrapping_range<dht::token>> intersections;
for (const auto& range : ranges) {
auto rs = range.subtract(given_range_complement,
dht::token_comparator());

View File

@@ -107,4 +107,4 @@ public:
// not resolved.
future<partition_checksum> checksum_range(seastar::sharded<database> &db,
const sstring& keyspace, const sstring& cf,
const ::range<dht::token>& range, repair_checksum rt);
const ::nonwrapping_range<dht::token>& range, repair_checksum rt);

View File

@@ -628,10 +628,6 @@ row_cache::make_scanning_reader(schema_ptr s,
const io_priority_class& pc,
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state) {
if (range.is_wrap_around(dht::ring_position_comparator(*s))) {
warn(unimplemented::cause::WRAP_AROUND);
throw std::runtime_error("row_cache doesn't support wrap-around ranges");
}
return make_mutation_reader<scanning_and_populating_reader>(std::move(s), *this, range, slice, pc, std::move(trace_state));
}
@@ -881,13 +877,7 @@ return _populate_phaser.advance_and_await().then([this, &dk] {
future<> row_cache::invalidate(const query::partition_range& range) {
return _populate_phaser.advance_and_await().then([this, &range] {
with_linearized_managed_bytes([&] {
if (range.is_wrap_around(dht::ring_position_comparator(*_schema))) {
auto unwrapped = range.unwrap();
invalidate_unwrapped(unwrapped.first);
invalidate_unwrapped(unwrapped.second);
} else {
invalidate_unwrapped(range);
}
invalidate_unwrapped(range);
});
});
}

View File

@@ -39,6 +39,7 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "partition_range_compat.hh"
#include "db/consistency_level.hh"
#include "db/commitlog/commitlog.hh"
#include "storage_proxy.hh"
@@ -2905,7 +2906,7 @@ get_restricted_ranges(locator::token_metadata& tm, const schema& s, query::parti
dht::ring_position_comparator cmp(s);
// special case for bounds containing exactly 1 token
if (start_token(range) == end_token(range) && !range.is_wrap_around(cmp)) {
if (start_token(range) == end_token(range)) {
if (start_token(range).is_minimum()) {
return {};
}
@@ -2915,13 +2916,7 @@ get_restricted_ranges(locator::token_metadata& tm, const schema& s, query::parti
std::vector<query::partition_range> ranges;
auto add_range = [&ranges, &cmp] (query::partition_range&& r) {
if (r.is_wrap_around(cmp)) {
auto unwrapped = r.unwrap();
ranges.emplace_back(std::move(unwrapped.second)); // Append in split order
ranges.emplace_back(std::move(unwrapped.first));
} else {
ranges.emplace_back(std::move(r));
}
ranges.emplace_back(std::move(r));
};
// divide the queryRange into pieces delimited by the ring
@@ -3269,7 +3264,7 @@ void storage_proxy::init_messaging_service() {
return net::messaging_service::no_wait();
});
});
ms.register_read_data([] (const rpc::client_info& cinfo, query::read_command cmd, query::partition_range pr) {
ms.register_read_data([] (const rpc::client_info& cinfo, query::read_command cmd, compat::wrapping_partition_range pr) {
tracing::trace_state_ptr trace_state_ptr;
auto src_addr = net::messaging_service::get_source(cinfo);
if (cmd.trace_info) {
@@ -3278,16 +3273,21 @@ void storage_proxy::init_messaging_service() {
tracing::trace(trace_state_ptr, "read_data: message received from /{}", src_addr.addr);
}
return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr)] (const query::partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr)] (compat::wrapping_partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
auto src_ip = src_addr.addr;
return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, &pr, &p, &trace_state_ptr] (schema_ptr s) {
return p->query_singular_local(std::move(s), cmd, pr, query::result_request::result_and_digest, trace_state_ptr);
auto pr2 = compat::unwrap(std::move(pr), *s);
if (pr2.second) {
// this function assumes singular queries but doesn't validate
throw std::runtime_error("READ_DATA called with wrapping range");
}
return p->query_singular_local(std::move(s), cmd, std::move(pr2.first), query::result_request::result_and_digest, trace_state_ptr);
}).finally([&trace_state_ptr, src_ip] () mutable {
tracing::trace(trace_state_ptr, "read_data handling is done, sending a response to /{}", src_ip);
});
});
});
ms.register_read_mutation_data([] (const rpc::client_info& cinfo, query::read_command cmd, query::partition_range pr) {
ms.register_read_mutation_data([] (const rpc::client_info& cinfo, query::read_command cmd, compat::wrapping_partition_range pr) {
tracing::trace_state_ptr trace_state_ptr;
auto src_addr = net::messaging_service::get_source(cinfo);
if (cmd.trace_info) {
@@ -3295,16 +3295,16 @@ void storage_proxy::init_messaging_service() {
tracing::begin(trace_state_ptr);
tracing::trace(trace_state_ptr, "read_mutation_data: message received from /{}", src_addr.addr);
}
return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr)] (const query::partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr)] (compat::wrapping_partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
auto src_ip = src_addr.addr;
return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, &pr, &p, &trace_state_ptr] (schema_ptr s) {
return p->query_mutations_locally(std::move(s), cmd, pr, trace_state_ptr);
return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, &pr, &p, &trace_state_ptr] (schema_ptr s) mutable {
return p->query_mutations_locally(std::move(s), cmd, compat::unwrap(std::move(pr), *s), trace_state_ptr);
}).finally([&trace_state_ptr, src_ip] () mutable {
tracing::trace(trace_state_ptr, "read_mutation_data handling is done, sending a response to /{}", src_ip);
});
});
});
ms.register_read_digest([] (const rpc::client_info& cinfo, query::read_command cmd, query::partition_range pr) {
ms.register_read_digest([] (const rpc::client_info& cinfo, query::read_command cmd, compat::wrapping_partition_range pr) {
tracing::trace_state_ptr trace_state_ptr;
auto src_addr = net::messaging_service::get_source(cinfo);
if (cmd.trace_info) {
@@ -3312,10 +3312,15 @@ void storage_proxy::init_messaging_service() {
tracing::begin(trace_state_ptr);
tracing::trace(trace_state_ptr, "read_digest: message received from /{}", src_addr.addr);
}
return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr)] (const query::partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr)] (compat::wrapping_partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
auto src_ip = src_addr.addr;
return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, &pr, &p, &trace_state_ptr] (schema_ptr s) {
return p->query_singular_local_digest(std::move(s), cmd, pr, trace_state_ptr);
auto pr2 = compat::unwrap(std::move(pr), *s);
if (pr2.second) {
// this function assumes singular queries but doesn't validate
throw std::runtime_error("READ_DIGEST called with wrapping range");
}
return p->query_singular_local_digest(std::move(s), cmd, std::move(pr2.first), trace_state_ptr);
}).finally([&trace_state_ptr, src_ip] () mutable {
tracing::trace(trace_state_ptr, "read_digest handling is done, sending a response to /{}", src_ip);
});
@@ -3460,6 +3465,15 @@ storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_c
return make_foreign(make_lw_shared(std::move(result)));
});
});
} else {
return query_nonsingular_mutations_locally(std::move(s), std::move(cmd), {pr}, std::move(trace_state));
}
}
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const compat::one_or_two_partition_ranges& pr, tracing::trace_state_ptr trace_state) {
if (!pr.second) {
return query_mutations_locally(std::move(s), std::move(cmd), pr.first, std::move(trace_state));
} else {
return query_nonsingular_mutations_locally(std::move(s), std::move(cmd), pr, std::move(trace_state));
}
@@ -3467,7 +3481,7 @@ storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_c
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr, tracing::trace_state_ptr trace_state) {
storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const std::vector<query::partition_range>& prs, tracing::trace_state_ptr trace_state) {
struct part {
query::partition_range pr;
unsigned shard;
@@ -3479,14 +3493,10 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr<q
parts.push_back(part{pr, shard, priority});
}
};
if (pr.is_wrap_around(dht::ring_position_comparator(*s))) {
parts.reserve(smp::count * 2);
auto unw = pr.unwrap();
shard_nonwrapped_partition_range(unw.first, 0);
shard_nonwrapped_partition_range(unw.second, 1);
} else {
parts.reserve(smp::count);
shard_nonwrapped_partition_range(pr, 0);
parts.reserve(smp::count * prs.size());
auto priority = 0u;
for (auto&& pr : prs) {
shard_nonwrapped_partition_range(pr, priority++);
}
return do_with(std::move(parts), [this, s, cmd, trace_state] (std::vector<part>& parts) mutable {
auto query_part = [this, cmd, gs=global_schema_ptr(s), gt = tracing::global_trace_state_ptr(std::move(trace_state))] (part p) mutable {

View File

@@ -52,6 +52,12 @@
#include "utils/estimated_histogram.hh"
#include "tracing/trace_state.hh"
namespace compat {
class one_or_two_partition_ranges;
}
namespace service {
class abstract_write_response_handler;
@@ -243,7 +249,7 @@ private:
template<typename Range>
future<> mutate_internal(Range mutations, db::consistency_level cl, tracing::trace_state_ptr tr_state);
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>> query_nonsingular_mutations_locally(
schema_ptr s, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr, tracing::trace_state_ptr trace_state);
schema_ptr s, lw_shared_ptr<query::read_command> cmd, const std::vector<query::partition_range>& pr, tracing::trace_state_ptr trace_state);
public:
storage_proxy(distributed<database>& db);
@@ -314,6 +320,16 @@ public:
schema_ptr, lw_shared_ptr<query::read_command> cmd, const query::partition_range&,
tracing::trace_state_ptr trace_state = nullptr);
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>> query_mutations_locally(
schema_ptr, lw_shared_ptr<query::read_command> cmd, const compat::one_or_two_partition_ranges&,
tracing::trace_state_ptr trace_state = nullptr);
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>> query_mutations_locally(
schema_ptr s, lw_shared_ptr<query::read_command> cmd, const std::vector<query::partition_range>& pr,
tracing::trace_state_ptr trace_state = nullptr);
future<> stop();
const stats& get_stats() const {

View File

@@ -2068,7 +2068,7 @@ future<> storage_service::removenode(sstring host_id_string) {
// get all ranges that change ownership (that is, a node needs
// to take responsibility for new range)
std::unordered_multimap<range<token>, inet_address> changed_ranges =
std::unordered_multimap<nonwrapping_range<token>, inet_address> changed_ranges =
ss.get_changed_ranges_for_leaving(keyspace_name, endpoint);
auto& fd = gms::get_local_failure_detector();
for (auto& x: changed_ranges) {
@@ -2286,13 +2286,13 @@ future<bool> storage_service::is_initialized() {
});
}
std::unordered_multimap<range<token>, inet_address> storage_service::get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint) {
std::unordered_multimap<nonwrapping_range<token>, inet_address> storage_service::get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint) {
// First get all ranges the leaving endpoint is responsible for
auto ranges = get_ranges_for_endpoint(keyspace_name, endpoint);
logger.debug("Node {} ranges [{}]", endpoint, ranges);
std::unordered_map<range<token>, std::vector<inet_address>> current_replica_endpoints;
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>> current_replica_endpoints;
// Find (for each range) all nodes that store replicas for these ranges as well
auto metadata = _token_metadata.clone_only_token_map(); // don't do this in the loop! #7758
@@ -2311,7 +2311,7 @@ std::unordered_multimap<range<token>, inet_address> storage_service::get_changed
temp.remove_endpoint(endpoint);
}
std::unordered_multimap<range<token>, inet_address> changed_ranges;
std::unordered_multimap<nonwrapping_range<token>, inet_address> changed_ranges;
// Go through the ranges and for each range check who will be
// storing replicas for these ranges when the leaving endpoint
@@ -2325,7 +2325,7 @@ std::unordered_multimap<range<token>, inet_address> storage_service::get_changed
auto rg = current_replica_endpoints.equal_range(r);
for (auto it = rg.first; it != rg.second; it++) {
const range<token>& range_ = it->first;
const nonwrapping_range<token>& range_ = it->first;
std::vector<inet_address>& current_eps = it->second;
logger.debug("range={}, current_replica_endpoints={}, new_replica_endpoints={}", range_, current_eps, new_replica_endpoints);
for (auto ep : it->second) {
@@ -2352,7 +2352,7 @@ std::unordered_multimap<range<token>, inet_address> storage_service::get_changed
// Runs inside seastar::async context
void storage_service::unbootstrap() {
std::unordered_map<sstring, std::unordered_multimap<range<token>, inet_address>> ranges_to_stream;
std::unordered_map<sstring, std::unordered_multimap<nonwrapping_range<token>, inet_address>> ranges_to_stream;
auto non_system_keyspaces = _db.local().get_non_system_keyspaces();
for (const auto& keyspace_name : non_system_keyspaces) {
@@ -2393,21 +2393,21 @@ void storage_service::unbootstrap() {
}
future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
std::unordered_multimap<sstring, std::unordered_map<inet_address, std::vector<range<token>>>> ranges_to_fetch;
std::unordered_multimap<sstring, std::unordered_map<inet_address, std::vector<nonwrapping_range<token>>>> ranges_to_fetch;
auto my_address = get_broadcast_address();
auto non_system_keyspaces = _db.local().get_non_system_keyspaces();
for (const auto& keyspace_name : non_system_keyspaces) {
std::unordered_multimap<range<token>, inet_address> changed_ranges = get_changed_ranges_for_leaving(keyspace_name, endpoint);
std::vector<range<token>> my_new_ranges;
std::unordered_multimap<nonwrapping_range<token>, inet_address> changed_ranges = get_changed_ranges_for_leaving(keyspace_name, endpoint);
std::vector<nonwrapping_range<token>> my_new_ranges;
for (auto& x : changed_ranges) {
if (x.second == my_address) {
my_new_ranges.emplace_back(x.first);
}
}
std::unordered_multimap<inet_address, range<token>> source_ranges = get_new_source_ranges(keyspace_name, my_new_ranges);
std::unordered_map<inet_address, std::vector<range<token>>> tmp;
std::unordered_multimap<inet_address, nonwrapping_range<token>> source_ranges = get_new_source_ranges(keyspace_name, my_new_ranges);
std::unordered_map<inet_address, std::vector<nonwrapping_range<token>>> tmp;
for (auto& x : source_ranges) {
tmp[x.first].emplace_back(x.second);
}
@@ -2416,7 +2416,7 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr
auto sp = make_lw_shared<streaming::stream_plan>("Restore replica count");
for (auto& x: ranges_to_fetch) {
const sstring& keyspace_name = x.first;
std::unordered_map<inet_address, std::vector<range<token>>>& maps = x.second;
std::unordered_map<inet_address, std::vector<nonwrapping_range<token>>>& maps = x.second;
for (auto& m : maps) {
auto source = m.first;
auto ranges = m.second;
@@ -2517,9 +2517,9 @@ void storage_service::leave_ring() {
}
future<>
storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multimap<range<token>, inet_address>> ranges_to_stream_by_keyspace) {
storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multimap<nonwrapping_range<token>, inet_address>> ranges_to_stream_by_keyspace) {
// First, we build a list of ranges to stream to each host, per table
std::unordered_map<sstring, std::unordered_map<inet_address, std::vector<range<token>>>> sessions_to_stream_by_keyspace;
std::unordered_map<sstring, std::unordered_map<inet_address, std::vector<nonwrapping_range<token>>>> sessions_to_stream_by_keyspace;
for (auto& entry : ranges_to_stream_by_keyspace) {
const auto& keyspace = entry.first;
auto& ranges_with_endpoints = entry.second;
@@ -2528,9 +2528,9 @@ storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multim
continue;
}
std::unordered_map<inet_address, std::vector<range<token>>> ranges_per_endpoint;
std::unordered_map<inet_address, std::vector<nonwrapping_range<token>>> ranges_per_endpoint;
for (auto& end_point_entry : ranges_with_endpoints) {
range<token> r = end_point_entry.first;
nonwrapping_range<token> r = end_point_entry.first;
inet_address endpoint = end_point_entry.second;
ranges_per_endpoint[endpoint].emplace_back(r);
}
@@ -2585,7 +2585,7 @@ future<> storage_service::stream_hints() {
auto hints_destination_host = candidates.front();
// stream all hints -- range list will be a singleton of "the entire ring"
std::vector<range<token>> ranges = {range<token>::make_open_ended_both_sides()};
std::vector<nonwrapping_range<token>> ranges = {nonwrapping_range<token>::make_open_ended_both_sides()};
logger.debug("stream_hints: ranges={}", ranges);
auto sp = make_lw_shared<streaming::stream_plan>("Hints");
@@ -2755,15 +2755,15 @@ future<> storage_service::shutdown_client_servers() {
return do_stop_rpc_server().then([this] { return do_stop_native_transport(); });
}
std::unordered_multimap<inet_address, range<token>>
storage_service::get_new_source_ranges(const sstring& keyspace_name, const std::vector<range<token>>& ranges) {
std::unordered_multimap<inet_address, nonwrapping_range<token>>
storage_service::get_new_source_ranges(const sstring& keyspace_name, const std::vector<nonwrapping_range<token>>& ranges) {
auto my_address = get_broadcast_address();
auto& fd = gms::get_local_failure_detector();
auto& ks = _db.local().find_keyspace(keyspace_name);
auto& strat = ks.get_replication_strategy();
auto tm = _token_metadata.clone_only_token_map();
std::unordered_multimap<range<token>, inet_address> range_addresses = strat.get_range_addresses(tm);
std::unordered_multimap<inet_address, range<token>> source_ranges;
std::unordered_multimap<nonwrapping_range<token>, inet_address> range_addresses = strat.get_range_addresses(tm);
std::unordered_multimap<inet_address, nonwrapping_range<token>> source_ranges;
// find alive sources for our new ranges
for (auto r : ranges) {
@@ -2792,10 +2792,10 @@ storage_service::get_new_source_ranges(const sstring& keyspace_name, const std::
return source_ranges;
}
std::pair<std::unordered_set<range<token>>, std::unordered_set<range<token>>>
storage_service::calculate_stream_and_fetch_ranges(const std::vector<range<token>>& current, const std::vector<range<token>>& updated) {
std::unordered_set<range<token>> to_stream;
std::unordered_set<range<token>> to_fetch;
std::pair<std::unordered_set<nonwrapping_range<token>>, std::unordered_set<nonwrapping_range<token>>>
storage_service::calculate_stream_and_fetch_ranges(const std::vector<nonwrapping_range<token>>& current, const std::vector<nonwrapping_range<token>>& updated) {
std::unordered_set<nonwrapping_range<token>> to_stream;
std::unordered_set<nonwrapping_range<token>> to_fetch;
for (auto r1 : current) {
bool intersect = false;
@@ -2836,7 +2836,7 @@ storage_service::calculate_stream_and_fetch_ranges(const std::vector<range<token
logger.debug("to_fetch = {}", to_fetch);
}
return std::pair<std::unordered_set<range<token>>, std::unordered_set<range<token>>>(to_stream, to_fetch);
return std::pair<std::unordered_set<nonwrapping_range<token>>, std::unordered_set<nonwrapping_range<token>>>(to_stream, to_fetch);
}
void storage_service::range_relocator::calculate_to_from_streams(std::unordered_set<token> new_tokens, std::vector<sstring> keyspace_names) {
@@ -2856,30 +2856,30 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_
auto& ks = ss._db.local().find_keyspace(keyspace);
auto& strategy = ks.get_replication_strategy();
// getting collection of the currently used ranges by this keyspace
std::vector<range<token>> current_ranges = ss.get_ranges_for_endpoint(keyspace, local_address);
std::vector<nonwrapping_range<token>> current_ranges = ss.get_ranges_for_endpoint(keyspace, local_address);
// collection of ranges which this node will serve after move to the new token
std::vector<range<token>> updated_ranges = strategy.get_pending_address_ranges(token_meta_clone, new_token, local_address);
std::vector<nonwrapping_range<token>> updated_ranges = strategy.get_pending_address_ranges(token_meta_clone, new_token, local_address);
// ring ranges and endpoints associated with them
// this used to determine what nodes should we ping about range data
std::unordered_multimap<range<token>, inet_address> range_addresses = strategy.get_range_addresses(token_meta_clone);
std::unordered_map<range<token>, std::vector<inet_address>> range_addresses_map;
std::unordered_multimap<nonwrapping_range<token>, inet_address> range_addresses = strategy.get_range_addresses(token_meta_clone);
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>> range_addresses_map;
for (auto& x : range_addresses) {
range_addresses_map[x.first].emplace_back(x.second);
}
// calculated parts of the ranges to request/stream from/to nodes in the ring
// std::pair(to_stream, to_fetch)
std::pair<std::unordered_set<range<token>>, std::unordered_set<range<token>>> ranges_per_keyspace =
std::pair<std::unordered_set<nonwrapping_range<token>>, std::unordered_set<nonwrapping_range<token>>> ranges_per_keyspace =
ss.calculate_stream_and_fetch_ranges(current_ranges, updated_ranges);
/**
* In this loop we are going through all ranges "to fetch" and determining
* nodes in the ring responsible for data we are interested in
*/
std::unordered_multimap<range<token>, inet_address> ranges_to_fetch_with_preferred_endpoints;
for (range<token> to_fetch : ranges_per_keyspace.second) {
std::unordered_multimap<nonwrapping_range<token>, inet_address> ranges_to_fetch_with_preferred_endpoints;
for (nonwrapping_range<token> to_fetch : ranges_per_keyspace.second) {
for (auto& x : range_addresses_map) {
const range<token>& r = x.first;
const nonwrapping_range<token>& r = x.first;
std::vector<inet_address>& eps = x.second;
if (r.contains(to_fetch, dht::token_comparator())) {
std::vector<inet_address> endpoints;
@@ -2942,9 +2942,9 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_
}
// calculating endpoints to stream current ranges to if needed
// in some situations node will handle current ranges as part of the new ranges
std::unordered_multimap<inet_address, range<token>> endpoint_ranges;
std::unordered_map<inet_address, std::vector<range<token>>> endpoint_ranges_map;
for (range<token> to_stream : ranges_per_keyspace.first) {
std::unordered_multimap<inet_address, nonwrapping_range<token>> endpoint_ranges;
std::unordered_map<inet_address, std::vector<nonwrapping_range<token>>> endpoint_ranges_map;
for (nonwrapping_range<token> to_stream : ranges_per_keyspace.first) {
auto end_token = to_stream.end() ? to_stream.end()->value() : dht::maximum_token();
std::vector<inet_address> current_endpoints = strategy.calculate_natural_endpoints(end_token, token_meta_clone);
std::vector<inet_address> new_endpoints = strategy.calculate_natural_endpoints(end_token, token_meta_clone_all_settled);
@@ -2973,9 +2973,9 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_
}
// stream requests
std::unordered_multimap<inet_address, range<token>> work =
std::unordered_multimap<inet_address, nonwrapping_range<token>> work =
dht::range_streamer::get_work_map(ranges_to_fetch_with_preferred_endpoints, keyspace);
std::unordered_map<inet_address, std::vector<range<token>>> work_map;
std::unordered_map<inet_address, std::vector<nonwrapping_range<token>>> work_map;
for (auto& x : work) {
work_map[x.first].emplace_back(x.second);
}

View File

@@ -195,7 +195,7 @@ public:
}
#endif
public:
std::vector<range<token>> get_local_ranges(const sstring& keyspace_name) {
std::vector<nonwrapping_range<token>> get_local_ranges(const sstring& keyspace_name) {
return get_ranges_for_endpoint(keyspace_name, get_broadcast_address());
}
#if 0
@@ -548,18 +548,18 @@ public:
return map;
}
#endif
std::unordered_map<range<token>, std::vector<inet_address>> get_range_to_address_map(const sstring& keyspace) const {
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>> get_range_to_address_map(const sstring& keyspace) const {
return get_range_to_address_map(keyspace, _token_metadata.sorted_tokens());
}
std::unordered_map<range<token>, std::vector<inet_address>> get_range_to_address_map_in_local_dc(
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>> get_range_to_address_map_in_local_dc(
const sstring& keyspace) const {
std::function<bool(const inet_address&)> filter = [this](const inet_address& address) {
return is_local_dc(address);
};
auto orig_map = get_range_to_address_map(keyspace, get_tokens_in_local_dc());
std::unordered_map<range<token>, std::vector<inet_address>> filtered_map;
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>> filtered_map;
for (auto entry : orig_map) {
auto& addresses = filtered_map[entry.first];
addresses.reserve(entry.second.size());
@@ -585,7 +585,7 @@ public:
return remote_dc == local_dc;
}
std::unordered_map<range<token>, std::vector<inet_address>> get_range_to_address_map(const sstring& keyspace,
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>> get_range_to_address_map(const sstring& keyspace,
const std::vector<token>& sorted_tokens) const {
// some people just want to get a visual representation of things. Allow null and set it to the first
// non-system keyspace.
@@ -625,7 +625,7 @@ public:
std::vector<token_range> ranges;
//Token.TokenFactory tf = getPartitioner().getTokenFactory();
std::unordered_map<range<token>, std::vector<inet_address>> range_to_address_map =
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>> range_to_address_map =
include_only_local_dc
? get_range_to_address_map_in_local_dc(keyspace)
: get_range_to_address_map(keyspace);
@@ -650,6 +650,13 @@ public:
}
ranges.push_back(tr);
}
// Convert to wrapping ranges
auto& rf = ranges.front();
auto& rb = ranges.back();
if (rf._start_token.empty() && rb._end_token.empty() && rf._endpoints == rb._endpoints) {
rf._start_token = std::move(rb._start_token);
ranges.pop_back();
}
return ranges;
}
@@ -681,10 +688,10 @@ public:
* @param ranges
* @return mapping of ranges to the replicas responsible for them.
*/
std::unordered_map<range<token>, std::vector<inet_address>> construct_range_to_endpoint_map(
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>> construct_range_to_endpoint_map(
const sstring& keyspace,
const std::vector<range<token>>& ranges) const {
std::unordered_map<range<token>, std::vector<inet_address>> res;
const std::vector<nonwrapping_range<token>>& ranges) const {
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>> res;
for (auto r : ranges) {
res[r] = _db.local().find_keyspace(keyspace).get_replication_strategy().get_natural_endpoints(r.end()->value());
}
@@ -849,7 +856,7 @@ private:
* @param ranges the ranges to find sources for
* @return multimap of addresses to ranges the address is responsible for
*/
std::unordered_multimap<inet_address, range<token>> get_new_source_ranges(const sstring& keyspaceName, const std::vector<range<token>>& ranges);
std::unordered_multimap<inet_address, nonwrapping_range<token>> get_new_source_ranges(const sstring& keyspaceName, const std::vector<nonwrapping_range<token>>& ranges);
public:
future<> confirm_replication(inet_address node);
@@ -875,7 +882,7 @@ private:
future<> restore_replica_count(inet_address endpoint, inet_address notify_endpoint);
// needs to be modified to accept either a keyspace or ARS.
std::unordered_multimap<range<token>, inet_address> get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint);
std::unordered_multimap<nonwrapping_range<token>, inet_address> get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint);
public:
/** raw load value */
double get_load();
@@ -1692,7 +1699,7 @@ public:
* @param ep endpoint we are interested in.
* @return ranges for the specified endpoint.
*/
std::vector<range<token>> get_ranges_for_endpoint(const sstring& name, const gms::inet_address& ep) const {
std::vector<nonwrapping_range<token>> get_ranges_for_endpoint(const sstring& name, const gms::inet_address& ep) const {
return _db.local().find_keyspace(name).get_replication_strategy().get_ranges(ep);
}
@@ -1702,19 +1709,18 @@ public:
* ranges.
* @return ranges in sorted order
*/
std::vector<range<token>> get_all_ranges(const std::vector<token>& sorted_tokens) const{
std::vector<nonwrapping_range<token>> get_all_ranges(const std::vector<token>& sorted_tokens) const{
if (sorted_tokens.empty())
return std::vector<range<token>>();
return std::vector<nonwrapping_range<token>>();
int size = sorted_tokens.size();
std::vector<range<token>> ranges;
std::vector<nonwrapping_range<token>> ranges;
ranges.push_back(nonwrapping_range<token>::make_ending_with(range_bound<token>(sorted_tokens[0], true)));
for (int i = 1; i < size; ++i) {
range<token> r(range<token>::bound(sorted_tokens[i - 1], false), range<token>::bound(sorted_tokens[i], true));
nonwrapping_range<token> r(range<token>::bound(sorted_tokens[i - 1], false), range<token>::bound(sorted_tokens[i], true));
ranges.push_back(r);
}
range<token> r(range<token>::bound(sorted_tokens[size - 1], false),
range<token>::bound(sorted_tokens[0], true));
ranges.push_back(r);
ranges.push_back(nonwrapping_range<token>::make_starting_with(range_bound<token>(sorted_tokens[size-1], false)));
return ranges;
}
@@ -2098,7 +2104,7 @@ private:
* @param rangesToStreamByKeyspace keyspaces and data ranges with endpoints included for each
* @return async Future for whether stream was success
*/
future<> stream_ranges(std::unordered_map<sstring, std::unordered_multimap<range<token>, inet_address>> ranges_to_stream_by_keyspace);
future<> stream_ranges(std::unordered_map<sstring, std::unordered_multimap<nonwrapping_range<token>, inet_address>> ranges_to_stream_by_keyspace);
public:
/**
@@ -2109,8 +2115,8 @@ public:
* @param updated collection of the ranges after token is changed
* @return pair of ranges to stream/fetch for given current and updated range collections
*/
std::pair<std::unordered_set<range<token>>, std::unordered_set<range<token>>>
calculate_stream_and_fetch_ranges(const std::vector<range<token>>& current, const std::vector<range<token>>& updated);
std::pair<std::unordered_set<nonwrapping_range<token>>, std::unordered_set<nonwrapping_range<token>>>
calculate_stream_and_fetch_ranges(const std::vector<nonwrapping_range<token>>& current, const std::vector<nonwrapping_range<token>>& updated);
#if 0
public void bulkLoad(String directory)
{

View File

@@ -96,7 +96,7 @@ static api::timestamp_type get_max_purgeable_timestamp(schema_ptr schema,
return timestamp;
}
static bool belongs_to_current_node(const dht::token& t, const std::vector<range<dht::token>>& sorted_owned_ranges) {
static bool belongs_to_current_node(const dht::token& t, const std::vector<nonwrapping_range<dht::token>>& sorted_owned_ranges) {
auto low = std::lower_bound(sorted_owned_ranges.begin(), sorted_owned_ranges.end(), t,
[] (const range<dht::token>& a, const dht::token& b) {
// check that range a is before token b.
@@ -104,7 +104,7 @@ static bool belongs_to_current_node(const dht::token& t, const std::vector<range
});
if (low != sorted_owned_ranges.end()) {
const range<dht::token>& r = *low;
const nonwrapping_range<dht::token>& r = *low;
return r.contains(t, dht::token_comparator());
}
@@ -267,7 +267,7 @@ compact_sstables(std::vector<shared_sstable> sstables, column_family& cf, std::f
info->cf = schema->cf_name();
logger.info("{} {}", (!cleanup) ? "Compacting" : "Cleaning", sstable_logger_msg);
std::vector<range<dht::token>> owned_ranges;
std::vector<nonwrapping_range<dht::token>> owned_ranges;
if (cleanup) {
owned_ranges = service::get_local_storage_service().get_local_ranges(schema->ks_name());
}

View File

@@ -1101,9 +1101,6 @@ sstable::read_range_rows(schema_ptr schema,
const query::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc) {
if (query::is_wrap_around(range, *schema)) {
fail(unimplemented::cause::WRAP_AROUND);
}
return std::make_unique<mutation_reader::impl>(
shared_from_this(), std::move(schema), range, slice, pc);
}

View File

@@ -44,22 +44,22 @@ namespace streaming {
extern logging::logger sslog;
stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, std::vector<query::range<token>> ranges) {
stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, std::vector<nonwrapping_range<token>> ranges) {
return request_ranges(from, keyspace, std::move(ranges), {});
}
stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families) {
stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, std::vector<nonwrapping_range<token>> ranges, std::vector<sstring> column_families) {
_range_added = true;
auto session = _coordinator->get_or_create_session(from);
session->add_stream_request(keyspace, std::move(ranges), std::move(column_families));
return *this;
}
stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std::vector<query::range<token>> ranges) {
stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std::vector<nonwrapping_range<token>> ranges) {
return transfer_ranges(to, keyspace, std::move(ranges), {});
}
stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families) {
stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std::vector<nonwrapping_range<token>> ranges, std::vector<sstring> column_families) {
_range_added = true;
auto session = _coordinator->get_or_create_session(to);
session->add_transfer_ranges(keyspace, std::move(ranges), std::move(column_families));

View File

@@ -90,7 +90,7 @@ public:
* @param ranges ranges to fetch
* @return this object for chaining
*/
stream_plan& request_ranges(inet_address from, sstring keyspace, std::vector<query::range<token>> ranges);
stream_plan& request_ranges(inet_address from, sstring keyspace, std::vector<nonwrapping_range<token>> ranges);
/**
* Request data in {@code columnFamilies} under {@code keyspace} and {@code ranges} from specific node.
@@ -102,7 +102,7 @@ public:
* @param columnFamilies specific column families
* @return this object for chaining
*/
stream_plan& request_ranges(inet_address from, sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families);
stream_plan& request_ranges(inet_address from, sstring keyspace, std::vector<nonwrapping_range<token>> ranges, std::vector<sstring> column_families);
/**
* Add transfer task to send data of specific keyspace and ranges.
@@ -113,7 +113,7 @@ public:
* @param ranges ranges to send
* @return this object for chaining
*/
stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector<query::range<token>> ranges);
stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector<nonwrapping_range<token>> ranges);
/**
* Add transfer task to send data of specific {@code columnFamilies} under {@code keyspace} and {@code ranges}.
@@ -125,7 +125,7 @@ public:
* @param columnFamilies specific column families
* @return this object for chaining
*/
stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families);
stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector<nonwrapping_range<token>> ranges, std::vector<sstring> column_families);
stream_plan& listeners(std::vector<stream_event_handler*> handlers);
public:

View File

@@ -39,8 +39,9 @@
#pragma once
#include "core/sstring.hh"
#include "query-request.hh"
#include "range.hh"
#include "dht/i_partitioner.hh"
#include "partition_range_compat.hh"
#include <vector>
namespace streaming {
@@ -49,14 +50,21 @@ class stream_request {
public:
using token = dht::token;
sstring keyspace;
std::vector<query::range<token>> ranges;
std::vector<nonwrapping_range<token>> ranges;
// For compatibility with <= 1.5, we send wrapping ranges (though they will never wrap).
std::vector<wrapping_range<token>> ranges_compat() const {
return compat::wrap(ranges);
}
std::vector<sstring> column_families;
stream_request() = default;
stream_request(sstring _keyspace, std::vector<query::range<token>> _ranges, std::vector<sstring> _column_families)
stream_request(sstring _keyspace, std::vector<nonwrapping_range<token>> _ranges, std::vector<sstring> _column_families)
: keyspace(std::move(_keyspace))
, ranges(std::move(_ranges))
, column_families(std::move(_column_families)) {
}
stream_request(sstring _keyspace, std::vector<wrapping_range<token>> _ranges, std::vector<sstring> _column_families)
: stream_request(std::move(_keyspace), compat::unwrap(std::move(_ranges)), std::move(_column_families)) {
}
friend std::ostream& operator<<(std::ostream& os, const stream_request& r);
};

View File

@@ -146,7 +146,7 @@ void stream_session::init_messaging_service_handler() {
});
});
});
ms().register_stream_mutation_done([] (const rpc::client_info& cinfo, UUID plan_id, std::vector<range<dht::token>> ranges, UUID cf_id, unsigned dst_cpu_id) {
ms().register_stream_mutation_done([] (const rpc::client_info& cinfo, UUID plan_id, std::vector<nonwrapping_range<dht::token>> ranges, UUID cf_id, unsigned dst_cpu_id) {
const auto& from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return smp::submit_to(dst_cpu_id, [ranges = std::move(ranges), plan_id, cf_id, from] () mutable {
auto session = get_session(plan_id, from, "STREAM_MUTATION_DONE", cf_id);
@@ -415,7 +415,7 @@ std::vector<column_family*> stream_session::get_column_family_stores(const sstri
return stores;
}
void stream_session::add_transfer_ranges(sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families) {
void stream_session::add_transfer_ranges(sstring keyspace, std::vector<nonwrapping_range<token>> ranges, std::vector<sstring> column_families) {
auto cfs = get_column_family_stores(keyspace, column_families);
for (auto& cf : cfs) {
auto cf_id = cf->schema()->id();

View File

@@ -238,7 +238,7 @@ public:
* @param ranges Ranges to retrieve data
* @param columnFamilies ColumnFamily names. Can be empty if requesting all CF under the keyspace.
*/
void add_stream_request(sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families) {
void add_stream_request(sstring keyspace, std::vector<nonwrapping_range<token>> ranges, std::vector<sstring> column_families) {
_requests.emplace_back(std::move(keyspace), std::move(ranges), std::move(column_families));
}
@@ -253,7 +253,7 @@ public:
* @param flushTables flush tables?
* @param repairedAt the time the repair started.
*/
void add_transfer_ranges(sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families);
void add_transfer_ranges(sstring keyspace, std::vector<nonwrapping_range<token>> ranges, std::vector<sstring> column_families);
std::vector<column_family*> get_column_family_stores(const sstring& keyspace, const std::vector<sstring>& column_families);

View File

@@ -55,7 +55,7 @@ namespace streaming {
extern logging::logger sslog;
stream_transfer_task::stream_transfer_task(shared_ptr<stream_session> session, UUID cf_id, std::vector<range<dht::token>> ranges, long total_size)
stream_transfer_task::stream_transfer_task(shared_ptr<stream_session> session, UUID cf_id, std::vector<nonwrapping_range<dht::token>> ranges, long total_size)
: stream_task(session, cf_id)
, _ranges(std::move(ranges))
, _total_size(total_size) {
@@ -171,7 +171,7 @@ void stream_transfer_task::start() {
});
}
void stream_transfer_task::append_ranges(const std::vector<range<dht::token>>& ranges) {
void stream_transfer_task::append_ranges(const std::vector<nonwrapping_range<dht::token>>& ranges) {
_ranges.insert(_ranges.end(), ranges.begin(), ranges.end());
}

View File

@@ -57,12 +57,12 @@ private:
int32_t sequence_number = 0;
bool aborted = false;
// A stream_transfer_task always contains the same range to stream
std::vector<range<dht::token>> _ranges;
std::vector<nonwrapping_range<dht::token>> _ranges;
long _total_size;
public:
using UUID = utils::UUID;
stream_transfer_task(stream_transfer_task&&) = default;
stream_transfer_task(shared_ptr<stream_session> session, UUID cf_id, std::vector<range<dht::token>> ranges, long total_size = 0);
stream_transfer_task(shared_ptr<stream_session> session, UUID cf_id, std::vector<nonwrapping_range<dht::token>> ranges, long total_size = 0);
~stream_transfer_task();
public:
virtual void abort() override {
@@ -78,7 +78,7 @@ public:
void start();
void append_ranges(const std::vector<range<dht::token>>& ranges);
void append_ranges(const std::vector<nonwrapping_range<dht::token>>& ranges);
};
} // namespace streaming

View File

@@ -68,7 +68,7 @@ static void test_range_queries(populate_fn populate) {
auto ds = populate(s, partitions);
auto test_slice = [&] (query::range<dht::ring_position> r) {
auto test_slice = [&] (nonwrapping_range<dht::ring_position> r) {
BOOST_TEST_MESSAGE(sprint("Testing range %s", r));
assert_that(ds(s, r))
.produces(slice(partitions, r))

View File

@@ -49,12 +49,6 @@ static
bool includes_token(const schema& s, const query::partition_range& r, const dht::token& tok) {
dht::ring_position_comparator cmp(s);
if (r.is_wrap_around(cmp)) {
auto sub = r.unwrap();
return includes_token(s, sub.first, tok)
|| includes_token(s, sub.second, tok);
}
return !r.before(dht::ring_position(tok, dht::ring_position::token_bound::end), cmp)
&& !r.after(dht::ring_position(tok, dht::ring_position::token_bound::start), cmp);
}
@@ -73,17 +67,6 @@ BOOST_AUTO_TEST_CASE(test_range_with_positions_within_the_same_token) {
auto key2 = dht::decorated_key{tok,
partition_key::from_single_value(*s, bytes_type->decompose(data_value(bytes("key2"))))};
{
auto r = query::partition_range::make(
dht::ring_position(key2),
dht::ring_position(key1));
BOOST_REQUIRE(includes_token(*s, r, tok));
BOOST_REQUIRE(r.contains(key1, dht::ring_position_comparator(*s)));
BOOST_REQUIRE(r.contains(key2, dht::ring_position_comparator(*s)));
BOOST_REQUIRE(r.is_wrap_around(dht::ring_position_comparator(*s)));
}
{
auto r = query::partition_range::make(
dht::ring_position(key1),
@@ -92,18 +75,6 @@ BOOST_AUTO_TEST_CASE(test_range_with_positions_within_the_same_token) {
BOOST_REQUIRE(includes_token(*s, r, tok));
BOOST_REQUIRE(r.contains(key1, dht::ring_position_comparator(*s)));
BOOST_REQUIRE(r.contains(key2, dht::ring_position_comparator(*s)));
BOOST_REQUIRE(!r.is_wrap_around(dht::ring_position_comparator(*s)));
}
{
auto r = query::partition_range::make(
{dht::ring_position(key2), false},
{dht::ring_position(key1), false});
BOOST_REQUIRE(includes_token(*s, r, tok));
BOOST_REQUIRE(!r.contains(key1, dht::ring_position_comparator(*s)));
BOOST_REQUIRE(!r.contains(key2, dht::ring_position_comparator(*s)));
BOOST_REQUIRE(r.is_wrap_around(dht::ring_position_comparator(*s)));
}
{
@@ -114,7 +85,6 @@ BOOST_AUTO_TEST_CASE(test_range_with_positions_within_the_same_token) {
BOOST_REQUIRE(includes_token(*s, r, tok));
BOOST_REQUIRE(!r.contains(key1, dht::ring_position_comparator(*s)));
BOOST_REQUIRE(!r.contains(key2, dht::ring_position_comparator(*s)));
BOOST_REQUIRE(!r.is_wrap_around(dht::ring_position_comparator(*s)));
}
{
@@ -125,40 +95,6 @@ BOOST_AUTO_TEST_CASE(test_range_with_positions_within_the_same_token) {
BOOST_REQUIRE(includes_token(*s, r, tok));
BOOST_REQUIRE(r.contains(key1, dht::ring_position_comparator(*s)));
BOOST_REQUIRE(r.contains(key2, dht::ring_position_comparator(*s)));
BOOST_REQUIRE(!r.is_wrap_around(dht::ring_position_comparator(*s)));
}
{
auto r = query::partition_range::make(
{dht::ring_position::ending_at(tok), false},
{dht::ring_position(key2), true});
BOOST_REQUIRE(includes_token(*s, r, tok));
BOOST_REQUIRE(r.is_wrap_around(dht::ring_position_comparator(*s)));
BOOST_REQUIRE(r.contains(key1, dht::ring_position_comparator(*s)));
BOOST_REQUIRE(r.contains(key2, dht::ring_position_comparator(*s)));
}
{
auto r = query::partition_range::make(
{dht::ring_position::ending_at(tok), false},
{dht::ring_position(key1), true});
BOOST_REQUIRE(includes_token(*s, r, tok));
BOOST_REQUIRE(r.is_wrap_around(dht::ring_position_comparator(*s)));
BOOST_REQUIRE(r.contains(key1, dht::ring_position_comparator(*s)));
BOOST_REQUIRE(!r.contains(key2, dht::ring_position_comparator(*s)));
}
{
auto r = query::partition_range::make(
{dht::ring_position(key1), false},
{dht::ring_position::starting_at(tok), true});
BOOST_REQUIRE(includes_token(*s, r, tok));
BOOST_REQUIRE(r.is_wrap_around(dht::ring_position_comparator(*s)));
BOOST_REQUIRE(!r.contains(key1, dht::ring_position_comparator(*s)));
BOOST_REQUIRE(r.contains(key2, dht::ring_position_comparator(*s)));
}
{
@@ -167,124 +103,10 @@ BOOST_AUTO_TEST_CASE(test_range_with_positions_within_the_same_token) {
{dht::ring_position::ending_at(tok), true});
BOOST_REQUIRE(includes_token(*s, r, tok));
BOOST_REQUIRE(!r.is_wrap_around(dht::ring_position_comparator(*s)));
BOOST_REQUIRE(r.contains(key1, dht::ring_position_comparator(*s)));
BOOST_REQUIRE(r.contains(key2, dht::ring_position_comparator(*s)));
}
{
auto r = query::partition_range::make(
{dht::ring_position::ending_at(tok), true},
{dht::ring_position::starting_at(tok), true});
BOOST_REQUIRE(includes_token(*s, r, tok));
BOOST_REQUIRE(r.is_wrap_around(dht::ring_position_comparator(*s)));
BOOST_REQUIRE(!r.contains(key1, dht::ring_position_comparator(*s)));
BOOST_REQUIRE(!r.contains(key2, dht::ring_position_comparator(*s)));
}
}
BOOST_AUTO_TEST_CASE(test_range_with_equal_value_but_opposite_inclusiveness_is_a_full_wrap_around) {
    auto s = schema_builder("ks", "cf")
        .with_column("key", bytes_type, column_kind::partition_key)
        .with_column("v", bytes_type)
        .build();

    dht::token tok = dht::global_partitioner().get_random_token();

    // Two distinct partition keys that decorate to the very same token, so
    // both sit at the token's position on the ring.
    auto key1 = dht::decorated_key{
        tok, partition_key::from_single_value(*s, bytes_type->decompose(data_value(bytes("key1"))))};
    auto key2 = dht::decorated_key{
        tok, partition_key::from_single_value(*s, bytes_type->decompose(data_value(bytes("key2"))))};

    dht::ring_position_comparator cmp(*s);

    // Asserts that rng covers tok, is a full wrap-around, and that each key's
    // membership matches the expected flags.
    auto require_full_wrap = [&] (const query::partition_range& rng, bool has_key1, bool has_key2) {
        BOOST_REQUIRE(includes_token(*s, rng, tok));
        BOOST_REQUIRE(rng.is_wrap_around(cmp));
        BOOST_REQUIRE(rng.contains(key1, cmp) == has_key1);
        BOOST_REQUIRE(rng.contains(key2, cmp) == has_key2);
    };

    // Bounds anchored at the token's starting position, opposite inclusiveness.
    require_full_wrap(query::partition_range::make(
        {dht::ring_position::starting_at(tok), true},
        {dht::ring_position::starting_at(tok), false}), true, true);
    require_full_wrap(query::partition_range::make(
        {dht::ring_position::starting_at(tok), false},
        {dht::ring_position::starting_at(tok), true}), true, true);
    // Bounds anchored at the token's ending position.
    require_full_wrap(query::partition_range::make(
        {dht::ring_position::ending_at(tok), false},
        {dht::ring_position::ending_at(tok), true}), true, true);
    require_full_wrap(query::partition_range::make(
        {dht::ring_position::ending_at(tok), true},
        {dht::ring_position::ending_at(tok), false}), true, true);
    // Bounds anchored at a concrete key with opposite inclusiveness still
    // cover the whole ring, both keys included.
    require_full_wrap(query::partition_range::make(
        {dht::ring_position(key1), true},
        {dht::ring_position(key1), false}), true, true);
    require_full_wrap(query::partition_range::make(
        {dht::ring_position(key1), false},
        {dht::ring_position(key1), true}), true, true);
    // When both bounds exclude the same key, only that key itself falls
    // outside the range; its same-token sibling is still inside.
    require_full_wrap(query::partition_range::make(
        {dht::ring_position(key1), false},
        {dht::ring_position(key1), false}), false, true);
    require_full_wrap(query::partition_range::make(
        {dht::ring_position(key2), false},
        {dht::ring_position(key2), false}), true, false);
}
BOOST_AUTO_TEST_CASE(test_range_contains) {

View File

@@ -1141,50 +1141,6 @@ SEASTAR_TEST_CASE(test_cache_population_and_clear_race) {
}
SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) {
    return seastar::async([] {
        auto s = make_schema();
        auto mt = make_lw_shared<memtable>(s);
        cache_tracker tracker;
        row_cache cache(s, mt->as_data_source(), tracker);

        // Populate the cache with 8 entries spread around the token ring.
        std::vector<mutation> muts = make_ring(s, 8);
        for (auto& m : muts) {
            cache.populate(m);
        }

        // Asserts, for each ring entry in ring order, whether it is still
        // present in the cache.
        auto verify_state = [&] (std::initializer_list<bool> in_cache) {
            size_t i = 0;
            for (bool cached : in_cache) {
                if (cached) {
                    verify_has(cache, muts[i].decorated_key());
                } else {
                    verify_does_not_have(cache, muts[i].decorated_key());
                }
                ++i;
            }
        };

        // Everything populated above must be cached.
        verify_state({true, true, true, true, true, true, true, true});

        // Invalidate a wrap-around range [6, 1]: drops entries 6, 7, 0, 1.
        cache.invalidate(query::partition_range({muts[6].ring_position()}, {muts[1].ring_position()})).get();
        verify_state({false, false, true, true, true, true, false, false});

        // Invalidate a plain (non-wrapping) range [3, 4].
        cache.invalidate(query::partition_range({muts[3].ring_position()}, {muts[4].ring_position()})).get();
        verify_state({false, false, true, false, false, true, false, false});
    });
}
SEASTAR_TEST_CASE(test_mvcc) {
return seastar::async([] {
auto no_difference = [] (auto& m1, auto& m2) {

View File

@@ -310,7 +310,7 @@ future<> test_range_reads(const dht::token& min, const dht::token& max, std::vec
auto count = make_lw_shared<size_t>(0);
auto expected_size = expected.size();
auto stop = make_lw_shared<bool>(false);
return do_with(query::range<dht::ring_position>::make(dht::ring_position::starting_at(min),
return do_with(nonwrapping_range<dht::ring_position>::make(dht::ring_position::starting_at(min),
dht::ring_position::ending_at(max)), [&, sstp, s] (auto& pr) {
auto mutations = sstp->read_range_rows(s, pr);
return do_until([stop] { return *stop; },

View File

@@ -80,12 +80,6 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) {
check(tm, query::partition_range({ring[2]}, {ring[3]}), {
query::partition_range({ring[2]}, {ring[3]})
});
check(tm, query::partition_range({ring[4]}, {ring[2]}), {
query::partition_range({ring[4]}, {}),
query::partition_range({}, {dht::ring_position::ending_at(dht::minimum_token())}),
query::partition_range({{dht::ring_position::ending_at(dht::minimum_token()), false}}, {ring[2]})
});
}
{
@@ -118,13 +112,6 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) {
query::partition_range({{ring[2], false}}, {dht::ring_position::ending_at(ring[2].token())}),
query::partition_range({{dht::ring_position::ending_at(ring[2].token()), false}}, {ring[3]})
});
check(tm, query::partition_range({ring[4]}, {ring[3]}), {
query::partition_range({ring[4]}, {dht::ring_position::ending_at(ring[5].token())}),
query::partition_range({{dht::ring_position::ending_at(ring[5].token()), false}}, {}),
query::partition_range({}, {dht::ring_position::ending_at(ring[2].token())}),
query::partition_range({{dht::ring_position::ending_at(ring[2].token()), false}}, {ring[3]}),
});
}
});
});