Convert to use dht::token_range

This commit is contained in:
Asias He
2016-12-14 16:11:27 +08:00
parent 1f06eedb58
commit d1178fa299
33 changed files with 216 additions and 216 deletions

View File

@@ -1362,13 +1362,13 @@ column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool
}
static bool needs_cleanup(const lw_shared_ptr<sstables::sstable>& sst,
const lw_shared_ptr<std::vector<nonwrapping_range<dht::token>>>& owned_ranges,
const lw_shared_ptr<std::vector<dht::token_range>>& owned_ranges,
schema_ptr s) {
auto first = sst->get_first_partition_key();
auto last = sst->get_last_partition_key();
auto first_token = dht::global_partitioner().get_token(*s, first);
auto last_token = dht::global_partitioner().get_token(*s, last);
nonwrapping_range<dht::token> sst_token_range = nonwrapping_range<dht::token>::make(first_token, last_token);
dht::token_range sst_token_range = dht::token_range::make(first_token, last_token);
// return true iff sst partition range isn't fully contained in any of the owned ranges.
for (auto& r : *owned_ranges) {
@@ -1380,8 +1380,8 @@ static bool needs_cleanup(const lw_shared_ptr<sstables::sstable>& sst,
}
future<> column_family::cleanup_sstables(sstables::compaction_descriptor descriptor) {
std::vector<nonwrapping_range<dht::token>> r = service::get_local_storage_service().get_local_ranges(_schema->ks_name());
auto owned_ranges = make_lw_shared<std::vector<nonwrapping_range<dht::token>>>(std::move(r));
std::vector<dht::token_range> r = service::get_local_storage_service().get_local_ranges(_schema->ks_name());
auto owned_ranges = make_lw_shared<std::vector<dht::token_range>>(std::move(r));
auto sstables_to_cleanup = make_lw_shared<std::vector<sstables::shared_sstable>>(std::move(descriptor.sstables));
return parallel_for_each(*sstables_to_cleanup, [this, owned_ranges = std::move(owned_ranges), sstables_to_cleanup] (auto& sst) {

View File

@@ -85,7 +85,7 @@ public:
return ss.get_local_tokens().then([&ss] (auto&& tokens) {
auto ranges = ss.get_token_metadata().get_primary_ranges_for(std::move(tokens));
std::vector<token_range> local_ranges;
auto to_bytes = [](const stdx::optional<nonwrapping_range<dht::token>::bound>& b) {
auto to_bytes = [](const stdx::optional<dht::token_range::bound>& b) {
assert(b);
return utf8_type->decompose(dht::global_partitioner().to_sstring(b->value()));
};
@@ -230,7 +230,7 @@ private:
/**
* Makes a nonwrapping range of ring_position from a nonwrapping range of token, used to select sstables.
*/
static nonwrapping_range<dht::ring_position> as_ring_position_range(nonwrapping_range<dht::token>& r) {
static nonwrapping_range<dht::ring_position> as_ring_position_range(dht::token_range& r) {
stdx::optional<range<dht::ring_position>::bound> start_bound, end_bound;
if (r.start()) {
start_bound = {{ dht::ring_position(r.start()->value(), dht::ring_position::token_bound::start), r.start()->is_inclusive() }};
@@ -250,7 +250,7 @@ private:
auto from_bytes = [] (auto& b) {
return dht::global_partitioner().from_sstring(utf8_type->to_string(b));
};
std::vector<nonwrapping_range<dht::token>> ranges;
std::vector<dht::token_range> ranges;
compat::unwrap_into(
wrapping_range<dht::token>({{ from_bytes(r.start) }}, {{ from_bytes(r.end) }}),
dht::token_comparator(),

View File

@@ -54,7 +54,7 @@ future<> boot_strapper::bootstrap() {
for (const auto& keyspace_name : _db.local().get_non_system_keyspaces()) {
auto& ks = _db.local().find_keyspace(keyspace_name);
auto& strategy = ks.get_replication_strategy();
std::vector<nonwrapping_range<token>> ranges = strategy.get_pending_address_ranges(_token_metadata, _tokens, _address);
std::vector<dht::token_range> ranges = strategy.get_pending_address_ranges(_token_metadata, _tokens, _address);
logger.debug("Will stream keyspace={}, ranges={}", keyspace_name, ranges);
streamer->add_ranges(keyspace_name, ranges);
}

View File

@@ -324,7 +324,7 @@ int ring_position::tri_compare(const schema& s, const ring_position& o) const {
}
nonwrapping_range<ring_position>
to_partition_range(nonwrapping_range<dht::token> r) {
to_partition_range(dht::token_range r) {
using bound_opt = std::experimental::optional<nonwrapping_range<ring_position>::bound>;
auto start = r.start()
? bound_opt(dht::ring_position(r.start()->value(),
@@ -356,7 +356,7 @@ split_range_to_shards(nonwrapping_range<ring_position> pr, const schema& s) {
}
std::map<unsigned, std::vector<nonwrapping_range<ring_position>>>
split_ranges_to_shards(const std::vector<nonwrapping_range<dht::token>>& ranges, const schema& s) {
split_ranges_to_shards(const std::vector<dht::token_range>& ranges, const schema& s) {
std::map<unsigned, std::vector<nonwrapping_range<ring_position>>> ret;
for (const auto& range : ranges) {
auto pr = dht::to_partition_range(range);

View File

@@ -471,7 +471,7 @@ public:
stdx::optional<ring_position_range_and_shard> next(const schema& s);
};
nonwrapping_range<ring_position> to_partition_range(nonwrapping_range<dht::token>);
nonwrapping_range<ring_position> to_partition_range(dht::token_range);
// Each shard gets a sorted, disjoint vector of ranges
std::map<unsigned, std::vector<nonwrapping_range<ring_position>>>
@@ -480,7 +480,7 @@ split_range_to_shards(nonwrapping_range<ring_position> pr, const schema& s);
// If input ranges are sorted and disjoint then the ranges for each shard
// are also sorted and disjoint.
std::map<unsigned, std::vector<nonwrapping_range<ring_position>>>
split_ranges_to_shards(const std::vector<nonwrapping_range<dht::token>>& ranges, const schema& s);
split_ranges_to_shards(const std::vector<dht::token_range>& ranges, const schema& s);
} // dht

View File

@@ -53,22 +53,22 @@ logging::logger logger("range_streamer");
using inet_address = gms::inet_address;
static std::unordered_map<nonwrapping_range<token>, std::unordered_set<inet_address>>
unordered_multimap_to_unordered_map(const std::unordered_multimap<nonwrapping_range<token>, inet_address>& multimap) {
std::unordered_map<nonwrapping_range<token>, std::unordered_set<inet_address>> ret;
static std::unordered_map<dht::token_range, std::unordered_set<inet_address>>
unordered_multimap_to_unordered_map(const std::unordered_multimap<dht::token_range, inet_address>& multimap) {
std::unordered_map<dht::token_range, std::unordered_set<inet_address>> ret;
for (auto x : multimap) {
ret[x.first].emplace(x.second);
}
return ret;
}
std::unordered_multimap<inet_address, nonwrapping_range<token>>
range_streamer::get_range_fetch_map(const std::unordered_multimap<nonwrapping_range<token>, inet_address>& ranges_with_sources,
std::unordered_multimap<inet_address, dht::token_range>
range_streamer::get_range_fetch_map(const std::unordered_multimap<dht::token_range, inet_address>& ranges_with_sources,
const std::unordered_set<std::unique_ptr<i_source_filter>>& source_filters,
const sstring& keyspace) {
std::unordered_multimap<inet_address, nonwrapping_range<token>> range_fetch_map_map;
std::unordered_multimap<inet_address, dht::token_range> range_fetch_map_map;
for (auto x : unordered_multimap_to_unordered_map(ranges_with_sources)) {
const nonwrapping_range<token>& range_ = x.first;
const dht::token_range& range_ = x.first;
const std::unordered_set<inet_address>& addresses = x.second;
bool found_source = false;
for (auto address : addresses) {
@@ -103,8 +103,8 @@ range_streamer::get_range_fetch_map(const std::unordered_multimap<nonwrapping_ra
return range_fetch_map_map;
}
std::unordered_multimap<nonwrapping_range<token>, inet_address>
range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, std::vector<nonwrapping_range<token>> desired_ranges) {
std::unordered_multimap<dht::token_range, inet_address>
range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, std::vector<dht::token_range> desired_ranges) {
logger.debug("{} ks={}", __func__, keyspace_name);
auto& ks = _db.local().find_keyspace(keyspace_name);
@@ -113,7 +113,7 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, st
auto tm = _metadata.clone_only_token_map();
auto range_addresses = unordered_multimap_to_unordered_map(strat.get_range_addresses(tm));
std::unordered_multimap<nonwrapping_range<token>, inet_address> range_sources;
std::unordered_multimap<dht::token_range, inet_address> range_sources;
auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr();
for (auto& desired_range : desired_ranges) {
auto found = false;
@@ -137,8 +137,8 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, st
return range_sources;
}
std::unordered_multimap<nonwrapping_range<token>, inet_address>
range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, std::vector<nonwrapping_range<token>> desired_ranges) {
std::unordered_multimap<dht::token_range, inet_address>
range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, std::vector<dht::token_range> desired_ranges) {
logger.debug("{} ks={}", __func__, keyspace_name);
assert (_tokens.empty() == false);
@@ -154,7 +154,7 @@ range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_n
auto pending_range_addresses = unordered_multimap_to_unordered_map(strat.get_range_addresses(metadata_clone));
//Collects the source that will have its range moved to the new node
std::unordered_multimap<nonwrapping_range<token>, inet_address> range_sources;
std::unordered_multimap<dht::token_range, inet_address> range_sources;
for (auto& desired_range : desired_ranges) {
for (auto& x : range_addresses) {
@@ -211,7 +211,7 @@ bool range_streamer::use_strict_sources_for_ranges(const sstring& keyspace_name)
&& _metadata.get_all_endpoints().size() != strat.get_replication_factor();
}
void range_streamer::add_ranges(const sstring& keyspace_name, std::vector<nonwrapping_range<token>> ranges) {
void range_streamer::add_ranges(const sstring& keyspace_name, std::vector<dht::token_range> ranges) {
auto ranges_for_keyspace = use_strict_sources_for_ranges(keyspace_name)
? get_all_ranges_with_strict_sources_for(keyspace_name, ranges)
: get_all_ranges_with_sources_for(keyspace_name, ranges);
@@ -222,7 +222,7 @@ void range_streamer::add_ranges(const sstring& keyspace_name, std::vector<nonwra
}
}
std::unordered_map<inet_address, std::vector<nonwrapping_range<token>>> range_fetch_map;
std::unordered_map<inet_address, std::vector<dht::token_range>> range_fetch_map;
for (auto& x : get_range_fetch_map(ranges_for_keyspace, _source_filters, keyspace_name)) {
range_fetch_map[x.first].emplace_back(x.second);
}
@@ -252,8 +252,8 @@ future<streaming::stream_state> range_streamer::fetch_async() {
return _stream_plan.execute();
}
std::unordered_multimap<inet_address, nonwrapping_range<token>>
range_streamer::get_work_map(const std::unordered_multimap<nonwrapping_range<token>, inet_address>& ranges_with_source_target,
std::unordered_multimap<inet_address, dht::token_range>
range_streamer::get_work_map(const std::unordered_multimap<dht::token_range, inet_address>& ranges_with_source_target,
const sstring& keyspace) {
auto filter = std::make_unique<dht::range_streamer::failure_detector_source_filter>(gms::get_local_failure_detector());
std::unordered_set<std::unique_ptr<i_source_filter>> source_filters;

View File

@@ -118,22 +118,22 @@ public:
_source_filters.emplace(std::move(filter));
}
void add_ranges(const sstring& keyspace_name, std::vector<nonwrapping_range<token>> ranges);
void add_ranges(const sstring& keyspace_name, std::vector<dht::token_range> ranges);
private:
bool use_strict_sources_for_ranges(const sstring& keyspace_name);
/**
* Get a map of all ranges and their respective sources that are candidates for streaming the given ranges
* to us. For each range, the list of sources is sorted by proximity relative to the given destAddress.
*/
std::unordered_multimap<nonwrapping_range<token>, inet_address>
get_all_ranges_with_sources_for(const sstring& keyspace_name, std::vector<nonwrapping_range<token>> desired_ranges);
std::unordered_multimap<dht::token_range, inet_address>
get_all_ranges_with_sources_for(const sstring& keyspace_name, std::vector<dht::token_range> desired_ranges);
/**
* Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges.
* For each range, the list should only contain a single source. This allows us to consistently migrate data without violating
* consistency.
*/
std::unordered_multimap<nonwrapping_range<token>, inet_address>
get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, std::vector<nonwrapping_range<token>> desired_ranges);
std::unordered_multimap<dht::token_range, inet_address>
get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, std::vector<dht::token_range> desired_ranges);
private:
/**
* @param rangesWithSources The ranges we want to fetch (key) and their potential sources (value)
@@ -141,14 +141,14 @@ private:
* here, we always exclude ourselves.
* @return
*/
static std::unordered_multimap<inet_address, nonwrapping_range<token>>
get_range_fetch_map(const std::unordered_multimap<nonwrapping_range<token>, inet_address>& ranges_with_sources,
static std::unordered_multimap<inet_address, dht::token_range>
get_range_fetch_map(const std::unordered_multimap<dht::token_range, inet_address>& ranges_with_sources,
const std::unordered_set<std::unique_ptr<i_source_filter>>& source_filters,
const sstring& keyspace);
public:
static std::unordered_multimap<inet_address, nonwrapping_range<token>>
get_work_map(const std::unordered_multimap<nonwrapping_range<token>, inet_address>& ranges_with_source_target,
static std::unordered_multimap<inet_address, dht::token_range>
get_work_map(const std::unordered_multimap<dht::token_range, inet_address>& ranges_with_source_target,
const sstring& keyspace);
#if 0
@@ -166,7 +166,7 @@ private:
std::unordered_set<token> _tokens;
inet_address _address;
sstring _description;
std::unordered_multimap<sstring, std::unordered_map<inet_address, std::vector<nonwrapping_range<token>>>> _to_fetch;
std::unordered_multimap<sstring, std::unordered_map<inet_address, std::vector<dht::token_range>>> _to_fetch;
std::unordered_set<std::unique_ptr<i_source_filter>> _source_filters;
stream_plan _stream_plan;
};

View File

@@ -117,26 +117,26 @@ void
insert_token_range_to_sorted_container_while_unwrapping(
const dht::token& prev_tok,
const dht::token& tok,
std::vector<nonwrapping_range<dht::token>>& ret) {
std::vector<dht::token_range>& ret) {
if (prev_tok < tok) {
ret.emplace_back(
nonwrapping_range<token>::bound(prev_tok, false),
nonwrapping_range<token>::bound(tok, true));
dht::token_range::bound(prev_tok, false),
dht::token_range::bound(tok, true));
} else {
ret.emplace_back(
nonwrapping_range<token>::bound(prev_tok, false),
dht::token_range::bound(prev_tok, false),
stdx::nullopt);
// Insert in front to maintain sorted order
ret.emplace(
ret.begin(),
stdx::nullopt,
nonwrapping_range<token>::bound(tok, true));
dht::token_range::bound(tok, true));
}
}
std::vector<nonwrapping_range<token>>
std::vector<dht::token_range>
abstract_replication_strategy::get_ranges(inet_address ep) const {
std::vector<nonwrapping_range<token>> ret;
std::vector<dht::token_range> ret;
auto prev_tok = _token_metadata.sorted_tokens().back();
for (auto tok : _token_metadata.sorted_tokens()) {
for (inet_address a : calculate_natural_endpoints(tok, _token_metadata)) {
@@ -150,9 +150,9 @@ abstract_replication_strategy::get_ranges(inet_address ep) const {
return ret;
}
std::vector<nonwrapping_range<token>>
std::vector<dht::token_range>
abstract_replication_strategy::get_primary_ranges(inet_address ep) {
std::vector<nonwrapping_range<token>> ret;
std::vector<dht::token_range> ret;
auto prev_tok = _token_metadata.sorted_tokens().back();
for (auto tok : _token_metadata.sorted_tokens()) {
auto&& eps = calculate_natural_endpoints(tok, _token_metadata);
@@ -164,11 +164,11 @@ abstract_replication_strategy::get_primary_ranges(inet_address ep) {
return ret;
}
std::unordered_multimap<inet_address, nonwrapping_range<token>>
std::unordered_multimap<inet_address, dht::token_range>
abstract_replication_strategy::get_address_ranges(token_metadata& tm) const {
std::unordered_multimap<inet_address, nonwrapping_range<token>> ret;
std::unordered_multimap<inet_address, dht::token_range> ret;
for (auto& t : tm.sorted_tokens()) {
std::vector<nonwrapping_range<token>> r = tm.get_primary_ranges_for(t);
std::vector<dht::token_range> r = tm.get_primary_ranges_for(t);
auto eps = calculate_natural_endpoints(t, tm);
logger.debug("token={}, primary_range={}, address={}", t, r, eps);
for (auto ep : eps) {
@@ -180,11 +180,11 @@ abstract_replication_strategy::get_address_ranges(token_metadata& tm) const {
return ret;
}
std::unordered_multimap<nonwrapping_range<token>, inet_address>
std::unordered_multimap<dht::token_range, inet_address>
abstract_replication_strategy::get_range_addresses(token_metadata& tm) const {
std::unordered_multimap<nonwrapping_range<token>, inet_address> ret;
std::unordered_multimap<dht::token_range, inet_address> ret;
for (auto& t : tm.sorted_tokens()) {
std::vector<nonwrapping_range<token>> r = tm.get_primary_ranges_for(t);
std::vector<dht::token_range> r = tm.get_primary_ranges_for(t);
auto eps = calculate_natural_endpoints(t, tm);
for (auto ep : eps) {
for (auto&& rng : r)
@@ -194,14 +194,14 @@ abstract_replication_strategy::get_range_addresses(token_metadata& tm) const {
return ret;
}
std::vector<nonwrapping_range<token>>
std::vector<dht::token_range>
abstract_replication_strategy::get_pending_address_ranges(token_metadata& tm, token pending_token, inet_address pending_address) {
return get_pending_address_ranges(tm, std::unordered_set<token>{pending_token}, pending_address);
}
std::vector<nonwrapping_range<token>>
std::vector<dht::token_range>
abstract_replication_strategy::get_pending_address_ranges(token_metadata& tm, std::unordered_set<token> pending_tokens, inet_address pending_address) {
std::vector<nonwrapping_range<token>> ret;
std::vector<dht::token_range> ret;
auto temp = tm.clone_only_token_map();
temp.update_normal_tokens(pending_tokens, pending_address);
for (auto& x : get_address_ranges(temp)) {

View File

@@ -105,22 +105,22 @@ public:
// The list is sorted, and its elements are non overlapping and non wrap-around.
// It is the analogue of Origin's getAddressRanges().get(endpoint).
// This function is not efficient, and not meant for the fast path.
std::vector<nonwrapping_range<token>> get_ranges(inet_address ep) const;
std::vector<dht::token_range> get_ranges(inet_address ep) const;
// get_primary_ranges() returns the list of "primary ranges" for the given
// endpoint. "Primary ranges" are the ranges that the node is responsible
// for storing replica primarily, which means this is the first node
// returned by calculate_natural_endpoints().
// This function is the analogue of Origin's
// StorageService.getPrimaryRangesForEndpoint().
std::vector<nonwrapping_range<token>> get_primary_ranges(inet_address ep);
std::vector<dht::token_range> get_primary_ranges(inet_address ep);
std::unordered_multimap<inet_address, nonwrapping_range<token>> get_address_ranges(token_metadata& tm) const;
std::unordered_multimap<inet_address, dht::token_range> get_address_ranges(token_metadata& tm) const;
std::unordered_multimap<nonwrapping_range<token>, inet_address> get_range_addresses(token_metadata& tm) const;
std::unordered_multimap<dht::token_range, inet_address> get_range_addresses(token_metadata& tm) const;
std::vector<nonwrapping_range<token>> get_pending_address_ranges(token_metadata& tm, token pending_token, inet_address pending_address);
std::vector<dht::token_range> get_pending_address_ranges(token_metadata& tm, token pending_token, inet_address pending_address);
std::vector<nonwrapping_range<token>> get_pending_address_ranges(token_metadata& tm, std::unordered_set<token> pending_tokens, inet_address pending_address);
std::vector<dht::token_range> get_pending_address_ranges(token_metadata& tm, std::unordered_set<token> pending_tokens, inet_address pending_address);
};
}

View File

@@ -329,8 +329,8 @@ token token_metadata::get_predecessor(token t) {
}
}
std::vector<nonwrapping_range<token>> token_metadata::get_primary_ranges_for(std::unordered_set<token> tokens) {
std::vector<nonwrapping_range<token>> ranges;
std::vector<dht::token_range> token_metadata::get_primary_ranges_for(std::unordered_set<token> tokens) {
std::vector<dht::token_range> ranges;
ranges.reserve(tokens.size() + 1); // one of the ranges will wrap
for (auto right : tokens) {
auto left = get_predecessor(right);
@@ -342,7 +342,7 @@ std::vector<nonwrapping_range<token>> token_metadata::get_primary_ranges_for(std
return ranges;
}
std::vector<nonwrapping_range<token>> token_metadata::get_primary_ranges_for(token right) {
std::vector<dht::token_range> token_metadata::get_primary_ranges_for(token right) {
return get_primary_ranges_for(std::unordered_set<token>{right});
}
@@ -452,7 +452,7 @@ void token_metadata::calculate_pending_ranges(abstract_replication_strategy& str
return;
}
std::unordered_multimap<inet_address, nonwrapping_range<token>> address_ranges = strategy.get_address_ranges(*this);
std::unordered_multimap<inet_address, dht::token_range> address_ranges = strategy.get_address_ranges(*this);
// FIXME
// Copy of metadata reflecting the situation after all leave operations are finished.

View File

@@ -609,9 +609,9 @@ public:
}
#endif
public:
std::vector<nonwrapping_range<token>> get_primary_ranges_for(std::unordered_set<token> tokens);
std::vector<dht::token_range> get_primary_ranges_for(std::unordered_set<token> tokens);
std::vector<nonwrapping_range<token>> get_primary_ranges_for(token right);
std::vector<dht::token_range> get_primary_ranges_for(token right);
static boost::icl::interval<token>::interval_type range_to_interval(range<dht::token> r);
static range<dht::token> interval_to_range(boost::icl::interval<token>::interval_type i);

View File

@@ -605,7 +605,7 @@ int main(int ac, char** av) {
api::set_server_stream_manager(ctx).get();
// Start handling REPAIR_CHECKSUM_RANGE messages
net::get_messaging_service().invoke_on_all([&db] (auto& ms) {
ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, nonwrapping_range<dht::token> range, rpc::optional<repair_checksum> hash_version) {
ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, dht::token_range range, rpc::optional<repair_checksum> hash_version) {
auto hv = hash_version ? *hash_version : repair_checksum::legacy;
return do_with(std::move(keyspace), std::move(cf), std::move(range),
[&db, hv] (auto& keyspace, auto& cf, auto& range) {

View File

@@ -708,7 +708,7 @@ future<> messaging_service::send_stream_mutation(msg_addr id, UUID plan_id, froz
// STREAM_MUTATION_DONE
void messaging_service::register_stream_mutation_done(std::function<future<> (const rpc::client_info& cinfo,
UUID plan_id, std::vector<nonwrapping_range<dht::token>> ranges, UUID cf_id, unsigned dst_cpu_id)>&& func) {
UUID plan_id, std::vector<dht::token_range> ranges, UUID cf_id, unsigned dst_cpu_id)>&& func) {
register_handler(this, messaging_verb::STREAM_MUTATION_DONE,
[func = std::move(func)] (const rpc::client_info& cinfo,
UUID plan_id, std::vector<wrapping_range<dht::token>> ranges,
@@ -716,7 +716,7 @@ void messaging_service::register_stream_mutation_done(std::function<future<> (co
return func(cinfo, plan_id, compat::unwrap(std::move(ranges)), cf_id, dst_cpu_id);
});
}
future<> messaging_service::send_stream_mutation_done(msg_addr id, UUID plan_id, std::vector<nonwrapping_range<dht::token>> ranges, UUID cf_id, unsigned dst_cpu_id) {
future<> messaging_service::send_stream_mutation_done(msg_addr id, UUID plan_id, std::vector<dht::token_range> ranges, UUID cf_id, unsigned dst_cpu_id) {
return send_message_timeout_and_retry<void>(this, messaging_verb::STREAM_MUTATION_DONE, id,
streaming_timeout, streaming_nr_retry, streaming_wait_before_retry,
plan_id, std::move(ranges), cf_id, dst_cpu_id);
@@ -906,14 +906,14 @@ future<> messaging_service::send_replication_finished(msg_addr id, inet_address
// Wrapper for REPAIR_CHECKSUM_RANGE
void messaging_service::register_repair_checksum_range(
std::function<future<partition_checksum> (sstring keyspace,
sstring cf, nonwrapping_range<dht::token> range, rpc::optional<repair_checksum> hash_version)>&& f) {
sstring cf, dht::token_range range, rpc::optional<repair_checksum> hash_version)>&& f) {
register_handler(this, messaging_verb::REPAIR_CHECKSUM_RANGE, std::move(f));
}
void messaging_service::unregister_repair_checksum_range() {
_rpc->unregister_handler(messaging_verb::REPAIR_CHECKSUM_RANGE);
}
future<partition_checksum> messaging_service::send_repair_checksum_range(
msg_addr id, sstring keyspace, sstring cf, ::nonwrapping_range<dht::token> range, repair_checksum hash_version)
msg_addr id, sstring keyspace, sstring cf, ::dht::token_range range, repair_checksum hash_version)
{
return send_message<partition_checksum>(this,
messaging_verb::REPAIR_CHECKSUM_RANGE, std::move(id),

View File

@@ -237,16 +237,16 @@ public:
void register_stream_mutation(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional<bool>)>&& func);
future<> send_stream_mutation(msg_addr id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, bool fragmented);
void register_stream_mutation_done(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, std::vector<nonwrapping_range<dht::token>> ranges, UUID cf_id, unsigned dst_cpu_id)>&& func);
future<> send_stream_mutation_done(msg_addr id, UUID plan_id, std::vector<nonwrapping_range<dht::token>> ranges, UUID cf_id, unsigned dst_cpu_id);
void register_stream_mutation_done(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, std::vector<dht::token_range> ranges, UUID cf_id, unsigned dst_cpu_id)>&& func);
future<> send_stream_mutation_done(msg_addr id, UUID plan_id, std::vector<dht::token_range> ranges, UUID cf_id, unsigned dst_cpu_id);
void register_complete_message(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id)>&& func);
future<> send_complete_message(msg_addr id, UUID plan_id, unsigned dst_cpu_id);
// Wrapper for REPAIR_CHECKSUM_RANGE verb
void register_repair_checksum_range(std::function<future<partition_checksum> (sstring keyspace, sstring cf, nonwrapping_range<dht::token> range, rpc::optional<repair_checksum> hash_version)>&& func);
void register_repair_checksum_range(std::function<future<partition_checksum> (sstring keyspace, sstring cf, dht::token_range range, rpc::optional<repair_checksum> hash_version)>&& func);
void unregister_repair_checksum_range();
future<partition_checksum> send_repair_checksum_range(msg_addr id, sstring keyspace, sstring cf, nonwrapping_range<dht::token> range, repair_checksum hash_version);
future<partition_checksum> send_repair_checksum_range(msg_addr id, sstring keyspace, sstring cf, dht::token_range range, repair_checksum hash_version);
// Wrapper for GOSSIP_ECHO verb
void register_gossip_echo(std::function<future<> ()>&& func);

View File

@@ -102,13 +102,13 @@ wrap(std::vector<nonwrapping_range<T>>&& v) {
}
inline
std::vector<nonwrapping_range<dht::token>>
std::vector<dht::token_range>
unwrap(const std::vector<wrapping_range<dht::token>>& v) {
return unwrap(v, dht::token_comparator());
}
inline
std::vector<nonwrapping_range<dht::token>>
std::vector<dht::token_range>
unwrap(std::vector<wrapping_range<dht::token>>&& v) {
return unwrap(std::move(v), dht::token_comparator());
}

View File

@@ -42,14 +42,14 @@ static logging::logger logger("repair");
struct failed_range {
sstring cf;
::nonwrapping_range<dht::token> range;
::dht::token_range range;
};
class repair_info {
public:
seastar::sharded<database>& db;
sstring keyspace;
std::vector<nonwrapping_range<dht::token>> ranges;
std::vector<dht::token_range> ranges;
std::vector<sstring> cfs;
int id;
std::vector<sstring> data_centers;
@@ -64,7 +64,7 @@ public:
public:
repair_info(seastar::sharded<database>& db_,
const sstring& keyspace_,
const std::vector<nonwrapping_range<dht::token>>& ranges_,
const std::vector<dht::token_range>& ranges_,
const std::vector<sstring>& cfs_,
int id_,
const std::vector<sstring>& data_centers_,
@@ -101,7 +101,7 @@ public:
}
}
void request_transfer_ranges(const sstring& cf,
const ::nonwrapping_range<dht::token>& range,
const ::dht::token_range& range,
const std::vector<gms::inet_address>& neighbors_in,
const std::vector<gms::inet_address>& neighbors_out) {
for (const auto& peer : neighbors_in) {
@@ -458,7 +458,7 @@ static future<partition_checksum> checksum_range_shard(database &db,
// function is not resolved.
future<partition_checksum> checksum_range(seastar::sharded<database> &db,
const sstring& keyspace, const sstring& cf,
const ::nonwrapping_range<dht::token>& range, repair_checksum hash_version) {
const ::dht::token_range& range, repair_checksum hash_version) {
auto& schema = db.local().find_column_family(keyspace, cf).schema();
auto shard_ranges = dht::split_range_to_shards(dht::to_partition_range(range), *schema);
return do_with(partition_checksum(), std::move(shard_ranges), [&db, &keyspace, &cf, hash_version] (auto& result, auto& shard_ranges) {
@@ -478,8 +478,8 @@ future<partition_checksum> checksum_range(seastar::sharded<database> &db,
});
}
static void split_and_add(std::vector<::nonwrapping_range<dht::token>>& ranges,
const nonwrapping_range<dht::token>& range,
static void split_and_add(std::vector<::dht::token_range>& ranges,
const dht::token_range& range,
uint64_t estimated_partitions, uint64_t target_partitions) {
if (estimated_partitions < target_partitions) {
// We're done, the range is small enough to not be split further
@@ -515,14 +515,14 @@ static thread_local semaphore parallelism_semaphore(parallelism);
// Repair a single cf in a single local range.
// Comparable to RepairJob in Origin.
static future<> repair_cf_range(repair_info& ri,
sstring cf, ::nonwrapping_range<dht::token> range,
sstring cf, ::dht::token_range range,
const std::vector<gms::inet_address>& neighbors) {
if (neighbors.empty()) {
// Nothing to do in this case...
return make_ready_future<>();
}
std::vector<::nonwrapping_range<dht::token>> ranges;
std::vector<::dht::token_range> ranges;
ranges.push_back(range);
// Additionally, we want to break up large ranges so they will have
@@ -538,7 +538,7 @@ static future<> repair_cf_range(repair_info& ri,
// FIXME: we should have an on-the-fly iterator generator here, not
// fill a vector in advance.
std::vector<::nonwrapping_range<dht::token>> tosplit;
std::vector<::dht::token_range> tosplit;
while (estimated_partitions > ri.target_partitions) {
tosplit.clear();
ranges.swap(tosplit);
@@ -741,24 +741,24 @@ static future<> repair_range(repair_info& ri, auto& range) {
});
}
static std::vector<nonwrapping_range<dht::token>> get_ranges_for_endpoint(
static std::vector<dht::token_range> get_ranges_for_endpoint(
database& db, sstring keyspace, gms::inet_address ep) {
auto& rs = db.find_keyspace(keyspace).get_replication_strategy();
return rs.get_ranges(ep);
}
static std::vector<nonwrapping_range<dht::token>> get_local_ranges(
static std::vector<dht::token_range> get_local_ranges(
database& db, sstring keyspace) {
return get_ranges_for_endpoint(db, keyspace, utils::fb_utilities::get_broadcast_address());
}
static std::vector<nonwrapping_range<dht::token>> get_primary_ranges_for_endpoint(
static std::vector<dht::token_range> get_primary_ranges_for_endpoint(
database& db, sstring keyspace, gms::inet_address ep) {
auto& rs = db.find_keyspace(keyspace).get_replication_strategy();
return rs.get_primary_ranges(ep);
}
static std::vector<nonwrapping_range<dht::token>> get_primary_ranges(
static std::vector<dht::token_range> get_primary_ranges(
database& db, sstring keyspace) {
return get_primary_ranges_for_endpoint(db, keyspace,
utils::fb_utilities::get_broadcast_address());
@@ -774,7 +774,7 @@ struct repair_options {
// If ranges is not empty, it overrides the repair's default heuristics
// for determining the list of ranges to repair. In particular, "ranges"
// overrides the setting of "primary_range".
std::vector<nonwrapping_range<dht::token>> ranges;
std::vector<dht::token_range> ranges;
// If start_token and end_token are set, they define a range which is
// intersected with the ranges actually held by this node to decide what
// to repair.
@@ -897,7 +897,7 @@ private:
// A range is expressed as start_token:end token and multiple ranges can
// be given as comma separated ranges(e.g. aaa:bbb,ccc:ddd).
static void ranges_opt(std::vector<nonwrapping_range<dht::token>>& var,
static void ranges_opt(std::vector<dht::token_range>& var,
std::unordered_map<sstring, sstring>& options,
const sstring& key) {
auto it = options.find(key);
@@ -918,7 +918,7 @@ private:
auto rng = wrapping_range<dht::token>(
::range<dht::token>::bound(tok_start, false),
::range<dht::token>::bound(tok_end, true));
compat::unwrap_into(std::move(rng), dht::token_comparator(), [&] (nonwrapping_range<dht::token>&& x) {
compat::unwrap_into(std::move(rng), dht::token_comparator(), [&] (dht::token_range&& x) {
var.push_back(std::move(x));
});
}
@@ -989,7 +989,7 @@ static int do_repair_start(seastar::sharded<database>& db, sstring keyspace,
// local ranges (the token ranges for which this node holds a replica of).
// Each of these ranges may have a different set of replicas, so the
// repair of each range is performed separately with repair_range().
std::vector<nonwrapping_range<dht::token>> ranges;
std::vector<dht::token_range> ranges;
if (options.ranges.size()) {
ranges = options.ranges;
} else if (options.primary_range) {
@@ -1032,8 +1032,8 @@ static int do_repair_start(seastar::sharded<database>& db, sstring keyspace,
dht::global_partitioner().from_sstring(options.end_token),
false);
}
nonwrapping_range<dht::token> given_range_complement(tok_end, tok_start);
std::vector<nonwrapping_range<dht::token>> intersections;
dht::token_range given_range_complement(tok_end, tok_start);
std::vector<dht::token_range> intersections;
for (const auto& range : ranges) {
auto rs = range.subtract(given_range_complement,
dht::token_comparator());

View File

@@ -107,7 +107,7 @@ public:
// not resolved.
future<partition_checksum> checksum_range(seastar::sharded<database> &db,
const sstring& keyspace, const sstring& cf,
const ::nonwrapping_range<dht::token>& range, repair_checksum rt);
const ::dht::token_range& range, repair_checksum rt);
namespace std {
template<>

View File

@@ -554,12 +554,12 @@ storage_service::get_rpc_address(const inet_address& endpoint) const {
return boost::lexical_cast<std::string>(endpoint);
}
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>>
std::unordered_map<dht::token_range, std::vector<inet_address>>
storage_service::get_range_to_address_map(const sstring& keyspace) const {
return get_range_to_address_map(keyspace, _token_metadata.sorted_tokens());
}
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>>
std::unordered_map<dht::token_range, std::vector<inet_address>>
storage_service::get_range_to_address_map_in_local_dc(
const sstring& keyspace) const {
std::function<bool(const inet_address&)> filter = [this](const inet_address& address) {
@@ -567,7 +567,7 @@ storage_service::get_range_to_address_map_in_local_dc(
};
auto orig_map = get_range_to_address_map(keyspace, get_tokens_in_local_dc());
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>> filtered_map;
std::unordered_map<dht::token_range, std::vector<inet_address>> filtered_map;
for (auto entry : orig_map) {
auto& addresses = filtered_map[entry.first];
addresses.reserve(entry.second.size());
@@ -595,7 +595,7 @@ storage_service::is_local_dc(const inet_address& targetHost) const {
return remote_dc == local_dc;
}
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>>
std::unordered_map<dht::token_range, std::vector<inet_address>>
storage_service::get_range_to_address_map(const sstring& keyspace,
const std::vector<token>& sorted_tokens) const {
// some people just want to get a visual representation of things. Allow null and set it to the first
@@ -2149,7 +2149,7 @@ future<> storage_service::removenode(sstring host_id_string) {
// get all ranges that change ownership (that is, a node needs
// to take responsibility for new range)
std::unordered_multimap<nonwrapping_range<token>, inet_address> changed_ranges =
std::unordered_multimap<dht::token_range, inet_address> changed_ranges =
ss.get_changed_ranges_for_leaving(keyspace_name, endpoint);
auto& fd = gms::get_local_failure_detector();
for (auto& x: changed_ranges) {
@@ -2367,13 +2367,13 @@ future<bool> storage_service::is_initialized() {
});
}
std::unordered_multimap<nonwrapping_range<token>, inet_address> storage_service::get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint) {
std::unordered_multimap<dht::token_range, inet_address> storage_service::get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint) {
// First get all ranges the leaving endpoint is responsible for
auto ranges = get_ranges_for_endpoint(keyspace_name, endpoint);
logger.debug("Node {} ranges [{}]", endpoint, ranges);
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>> current_replica_endpoints;
std::unordered_map<dht::token_range, std::vector<inet_address>> current_replica_endpoints;
// Find (for each range) all nodes that store replicas for these ranges as well
auto metadata = _token_metadata.clone_only_token_map(); // don't do this in the loop! #7758
@@ -2392,7 +2392,7 @@ std::unordered_multimap<nonwrapping_range<token>, inet_address> storage_service:
temp.remove_endpoint(endpoint);
}
std::unordered_multimap<nonwrapping_range<token>, inet_address> changed_ranges;
std::unordered_multimap<dht::token_range, inet_address> changed_ranges;
// Go through the ranges and for each range check who will be
// storing replicas for these ranges when the leaving endpoint
@@ -2406,7 +2406,7 @@ std::unordered_multimap<nonwrapping_range<token>, inet_address> storage_service:
auto rg = current_replica_endpoints.equal_range(r);
for (auto it = rg.first; it != rg.second; it++) {
const nonwrapping_range<token>& range_ = it->first;
const dht::token_range& range_ = it->first;
std::vector<inet_address>& current_eps = it->second;
logger.debug("range={}, current_replica_endpoints={}, new_replica_endpoints={}", range_, current_eps, new_replica_endpoints);
for (auto ep : it->second) {
@@ -2433,7 +2433,7 @@ std::unordered_multimap<nonwrapping_range<token>, inet_address> storage_service:
// Runs inside seastar::async context
void storage_service::unbootstrap() {
std::unordered_map<sstring, std::unordered_multimap<nonwrapping_range<token>, inet_address>> ranges_to_stream;
std::unordered_map<sstring, std::unordered_multimap<dht::token_range, inet_address>> ranges_to_stream;
auto non_system_keyspaces = _db.local().get_non_system_keyspaces();
for (const auto& keyspace_name : non_system_keyspaces) {
@@ -2474,21 +2474,21 @@ void storage_service::unbootstrap() {
}
future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
std::unordered_multimap<sstring, std::unordered_map<inet_address, std::vector<nonwrapping_range<token>>>> ranges_to_fetch;
std::unordered_multimap<sstring, std::unordered_map<inet_address, std::vector<dht::token_range>>> ranges_to_fetch;
auto my_address = get_broadcast_address();
auto non_system_keyspaces = _db.local().get_non_system_keyspaces();
for (const auto& keyspace_name : non_system_keyspaces) {
std::unordered_multimap<nonwrapping_range<token>, inet_address> changed_ranges = get_changed_ranges_for_leaving(keyspace_name, endpoint);
std::vector<nonwrapping_range<token>> my_new_ranges;
std::unordered_multimap<dht::token_range, inet_address> changed_ranges = get_changed_ranges_for_leaving(keyspace_name, endpoint);
std::vector<dht::token_range> my_new_ranges;
for (auto& x : changed_ranges) {
if (x.second == my_address) {
my_new_ranges.emplace_back(x.first);
}
}
std::unordered_multimap<inet_address, nonwrapping_range<token>> source_ranges = get_new_source_ranges(keyspace_name, my_new_ranges);
std::unordered_map<inet_address, std::vector<nonwrapping_range<token>>> tmp;
std::unordered_multimap<inet_address, dht::token_range> source_ranges = get_new_source_ranges(keyspace_name, my_new_ranges);
std::unordered_map<inet_address, std::vector<dht::token_range>> tmp;
for (auto& x : source_ranges) {
tmp[x.first].emplace_back(x.second);
}
@@ -2497,7 +2497,7 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr
auto sp = make_lw_shared<streaming::stream_plan>("Restore replica count");
for (auto& x: ranges_to_fetch) {
const sstring& keyspace_name = x.first;
std::unordered_map<inet_address, std::vector<nonwrapping_range<token>>>& maps = x.second;
std::unordered_map<inet_address, std::vector<dht::token_range>>& maps = x.second;
for (auto& m : maps) {
auto source = m.first;
auto ranges = m.second;
@@ -2598,9 +2598,9 @@ void storage_service::leave_ring() {
}
future<>
storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multimap<nonwrapping_range<token>, inet_address>> ranges_to_stream_by_keyspace) {
storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multimap<dht::token_range, inet_address>> ranges_to_stream_by_keyspace) {
// First, we build a list of ranges to stream to each host, per table
std::unordered_map<sstring, std::unordered_map<inet_address, std::vector<nonwrapping_range<token>>>> sessions_to_stream_by_keyspace;
std::unordered_map<sstring, std::unordered_map<inet_address, std::vector<dht::token_range>>> sessions_to_stream_by_keyspace;
for (auto& entry : ranges_to_stream_by_keyspace) {
const auto& keyspace = entry.first;
auto& ranges_with_endpoints = entry.second;
@@ -2609,9 +2609,9 @@ storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multim
continue;
}
std::unordered_map<inet_address, std::vector<nonwrapping_range<token>>> ranges_per_endpoint;
std::unordered_map<inet_address, std::vector<dht::token_range>> ranges_per_endpoint;
for (auto& end_point_entry : ranges_with_endpoints) {
nonwrapping_range<token> r = end_point_entry.first;
dht::token_range r = end_point_entry.first;
inet_address endpoint = end_point_entry.second;
ranges_per_endpoint[endpoint].emplace_back(r);
}
@@ -2666,7 +2666,7 @@ future<> storage_service::stream_hints() {
auto hints_destination_host = candidates.front();
// stream all hints -- range list will be a singleton of "the entire ring"
std::vector<nonwrapping_range<token>> ranges = {nonwrapping_range<token>::make_open_ended_both_sides()};
std::vector<dht::token_range> ranges = {dht::token_range::make_open_ended_both_sides()};
logger.debug("stream_hints: ranges={}", ranges);
auto sp = make_lw_shared<streaming::stream_plan>("Hints");
@@ -2836,15 +2836,15 @@ future<> storage_service::shutdown_client_servers() {
return do_stop_rpc_server().then([this] { return do_stop_native_transport(); });
}
std::unordered_multimap<inet_address, nonwrapping_range<token>>
storage_service::get_new_source_ranges(const sstring& keyspace_name, const std::vector<nonwrapping_range<token>>& ranges) {
std::unordered_multimap<inet_address, dht::token_range>
storage_service::get_new_source_ranges(const sstring& keyspace_name, const std::vector<dht::token_range>& ranges) {
auto my_address = get_broadcast_address();
auto& fd = gms::get_local_failure_detector();
auto& ks = _db.local().find_keyspace(keyspace_name);
auto& strat = ks.get_replication_strategy();
auto tm = _token_metadata.clone_only_token_map();
std::unordered_multimap<nonwrapping_range<token>, inet_address> range_addresses = strat.get_range_addresses(tm);
std::unordered_multimap<inet_address, nonwrapping_range<token>> source_ranges;
std::unordered_multimap<dht::token_range, inet_address> range_addresses = strat.get_range_addresses(tm);
std::unordered_multimap<inet_address, dht::token_range> source_ranges;
// find alive sources for our new ranges
for (auto r : ranges) {
@@ -2873,10 +2873,10 @@ storage_service::get_new_source_ranges(const sstring& keyspace_name, const std::
return source_ranges;
}
std::pair<std::unordered_set<nonwrapping_range<token>>, std::unordered_set<nonwrapping_range<token>>>
storage_service::calculate_stream_and_fetch_ranges(const std::vector<nonwrapping_range<token>>& current, const std::vector<nonwrapping_range<token>>& updated) {
std::unordered_set<nonwrapping_range<token>> to_stream;
std::unordered_set<nonwrapping_range<token>> to_fetch;
std::pair<std::unordered_set<dht::token_range>, std::unordered_set<dht::token_range>>
storage_service::calculate_stream_and_fetch_ranges(const std::vector<dht::token_range>& current, const std::vector<dht::token_range>& updated) {
std::unordered_set<dht::token_range> to_stream;
std::unordered_set<dht::token_range> to_fetch;
for (auto r1 : current) {
bool intersect = false;
@@ -2917,7 +2917,7 @@ storage_service::calculate_stream_and_fetch_ranges(const std::vector<nonwrapping
logger.debug("to_fetch = {}", to_fetch);
}
return std::pair<std::unordered_set<nonwrapping_range<token>>, std::unordered_set<nonwrapping_range<token>>>(to_stream, to_fetch);
return std::pair<std::unordered_set<dht::token_range>, std::unordered_set<dht::token_range>>(to_stream, to_fetch);
}
void storage_service::range_relocator::calculate_to_from_streams(std::unordered_set<token> new_tokens, std::vector<sstring> keyspace_names) {
@@ -2937,30 +2937,30 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_
auto& ks = ss._db.local().find_keyspace(keyspace);
auto& strategy = ks.get_replication_strategy();
// getting collection of the currently used ranges by this keyspace
std::vector<nonwrapping_range<token>> current_ranges = ss.get_ranges_for_endpoint(keyspace, local_address);
std::vector<dht::token_range> current_ranges = ss.get_ranges_for_endpoint(keyspace, local_address);
// collection of ranges which this node will serve after move to the new token
std::vector<nonwrapping_range<token>> updated_ranges = strategy.get_pending_address_ranges(token_meta_clone, new_token, local_address);
std::vector<dht::token_range> updated_ranges = strategy.get_pending_address_ranges(token_meta_clone, new_token, local_address);
// ring ranges and endpoints associated with them
// this used to determine what nodes should we ping about range data
std::unordered_multimap<nonwrapping_range<token>, inet_address> range_addresses = strategy.get_range_addresses(token_meta_clone);
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>> range_addresses_map;
std::unordered_multimap<dht::token_range, inet_address> range_addresses = strategy.get_range_addresses(token_meta_clone);
std::unordered_map<dht::token_range, std::vector<inet_address>> range_addresses_map;
for (auto& x : range_addresses) {
range_addresses_map[x.first].emplace_back(x.second);
}
// calculated parts of the ranges to request/stream from/to nodes in the ring
// std::pair(to_stream, to_fetch)
std::pair<std::unordered_set<nonwrapping_range<token>>, std::unordered_set<nonwrapping_range<token>>> ranges_per_keyspace =
std::pair<std::unordered_set<dht::token_range>, std::unordered_set<dht::token_range>> ranges_per_keyspace =
ss.calculate_stream_and_fetch_ranges(current_ranges, updated_ranges);
/**
* In this loop we are going through all ranges "to fetch" and determining
* nodes in the ring responsible for data we are interested in
*/
std::unordered_multimap<nonwrapping_range<token>, inet_address> ranges_to_fetch_with_preferred_endpoints;
for (nonwrapping_range<token> to_fetch : ranges_per_keyspace.second) {
std::unordered_multimap<dht::token_range, inet_address> ranges_to_fetch_with_preferred_endpoints;
for (dht::token_range to_fetch : ranges_per_keyspace.second) {
for (auto& x : range_addresses_map) {
const nonwrapping_range<token>& r = x.first;
const dht::token_range& r = x.first;
std::vector<inet_address>& eps = x.second;
if (r.contains(to_fetch, dht::token_comparator())) {
std::vector<inet_address> endpoints;
@@ -3023,9 +3023,9 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_
}
// calculating endpoints to stream current ranges to if needed
// in some situations node will handle current ranges as part of the new ranges
std::unordered_multimap<inet_address, nonwrapping_range<token>> endpoint_ranges;
std::unordered_map<inet_address, std::vector<nonwrapping_range<token>>> endpoint_ranges_map;
for (nonwrapping_range<token> to_stream : ranges_per_keyspace.first) {
std::unordered_multimap<inet_address, dht::token_range> endpoint_ranges;
std::unordered_map<inet_address, std::vector<dht::token_range>> endpoint_ranges_map;
for (dht::token_range to_stream : ranges_per_keyspace.first) {
auto end_token = to_stream.end() ? to_stream.end()->value() : dht::maximum_token();
std::vector<inet_address> current_endpoints = strategy.calculate_natural_endpoints(end_token, token_meta_clone);
std::vector<inet_address> new_endpoints = strategy.calculate_natural_endpoints(end_token, token_meta_clone_all_settled);
@@ -3054,9 +3054,9 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_
}
// stream requests
std::unordered_multimap<inet_address, nonwrapping_range<token>> work =
std::unordered_multimap<inet_address, dht::token_range> work =
dht::range_streamer::get_work_map(ranges_to_fetch_with_preferred_endpoints, keyspace);
std::unordered_map<inet_address, std::vector<nonwrapping_range<token>>> work_map;
std::unordered_map<inet_address, std::vector<dht::token_range>> work_map;
for (auto& x : work) {
work_map[x.first].emplace_back(x.second);
}
@@ -3135,7 +3135,7 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_
std::vector<token_range_endpoints> ranges;
//Token.TokenFactory tf = getPartitioner().getTokenFactory();
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>> range_to_address_map =
std::unordered_map<dht::token_range, std::vector<inet_address>> range_to_address_map =
include_only_local_dc
? get_range_to_address_map_in_local_dc(keyspace)
: get_range_to_address_map(keyspace);
@@ -3179,11 +3179,11 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_
return ranges;
}
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>>
std::unordered_map<dht::token_range, std::vector<inet_address>>
storage_service::construct_range_to_endpoint_map(
const sstring& keyspace,
const std::vector<nonwrapping_range<token>>& ranges) const {
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>> res;
const std::vector<dht::token_range>& ranges) const {
std::unordered_map<dht::token_range, std::vector<inet_address>> res;
for (auto r : ranges) {
res[r] = _db.local().find_keyspace(keyspace).get_replication_strategy().get_natural_endpoints(
r.end() ? r.end()->value() : dht::maximum_token());
@@ -3331,16 +3331,16 @@ future<> storage_service::force_remove_completion() {
/**
* Takes an ordered list of adjacent tokens and divides them in the specified number of ranges.
*/
static std::vector<std::pair<nonwrapping_range<dht::token>, uint64_t>>
static std::vector<std::pair<dht::token_range, uint64_t>>
calculate_splits(std::vector<dht::token> tokens, uint32_t split_count, column_family& cf) {
auto sstables = cf.get_sstables();
const double step = static_cast<double>(tokens.size() - 1) / split_count;
auto prev_token_idx = 0;
std::vector<std::pair<nonwrapping_range<dht::token>, uint64_t>> splits;
std::vector<std::pair<dht::token_range, uint64_t>> splits;
splits.reserve(split_count);
for (uint32_t i = 1; i <= split_count; ++i) {
auto index = static_cast<uint32_t>(std::round(i * step));
nonwrapping_range<dht::token> range({{ std::move(tokens[prev_token_idx]), false }}, {{ tokens[index], true }});
dht::token_range range({{ std::move(tokens[prev_token_idx]), false }}, {{ tokens[index], true }});
// always return an estimate > 0 (see CASSANDRA-7322)
uint64_t estimated_keys_for_range = 0;
for (auto&& sst : *sstables) {
@@ -3352,9 +3352,9 @@ calculate_splits(std::vector<dht::token> tokens, uint32_t split_count, column_fa
return splits;
};
std::vector<std::pair<nonwrapping_range<dht::token>, uint64_t>>
std::vector<std::pair<dht::token_range, uint64_t>>
storage_service::get_splits(const sstring& ks_name, const sstring& cf_name, range<dht::token> range, uint32_t keys_per_split) {
using range_type = nonwrapping_range<dht::token>;
using range_type = dht::token_range;
auto& cf = _db.local().find_column_family(ks_name, cf_name);
auto schema = cf.schema();
auto sstables = cf.get_sstables();
@@ -3389,23 +3389,23 @@ storage_service::get_splits(const sstring& ks_name, const sstring& cf_name, rang
return calculate_splits(std::move(tokens), split_count, cf);
};
std::vector<nonwrapping_range<token>>
std::vector<dht::token_range>
storage_service::get_ranges_for_endpoint(const sstring& name, const gms::inet_address& ep) const {
return _db.local().find_keyspace(name).get_replication_strategy().get_ranges(ep);
}
std::vector<nonwrapping_range<token>>
std::vector<dht::token_range>
storage_service::get_all_ranges(const std::vector<token>& sorted_tokens) const {
if (sorted_tokens.empty())
return std::vector<nonwrapping_range<token>>();
return std::vector<dht::token_range>();
int size = sorted_tokens.size();
std::vector<nonwrapping_range<token>> ranges;
ranges.push_back(nonwrapping_range<token>::make_ending_with(range_bound<token>(sorted_tokens[0], true)));
std::vector<dht::token_range> ranges;
ranges.push_back(dht::token_range::make_ending_with(range_bound<token>(sorted_tokens[0], true)));
for (int i = 1; i < size; ++i) {
nonwrapping_range<token> r(range<token>::bound(sorted_tokens[i - 1], false), range<token>::bound(sorted_tokens[i], true));
dht::token_range r(range<token>::bound(sorted_tokens[i - 1], false), range<token>::bound(sorted_tokens[i], true));
ranges.push_back(r);
}
ranges.push_back(nonwrapping_range<token>::make_starting_with(range_bound<token>(sorted_tokens[size-1], false)));
ranges.push_back(dht::token_range::make_starting_with(range_bound<token>(sorted_tokens[size-1], false)));
return ranges;
}

View File

@@ -185,7 +185,7 @@ public:
}
#endif
public:
std::vector<nonwrapping_range<token>> get_local_ranges(const sstring& keyspace_name) {
std::vector<dht::token_range> get_local_ranges(const sstring& keyspace_name) {
return get_ranges_for_endpoint(keyspace_name, get_broadcast_address());
}
#if 0
@@ -529,16 +529,16 @@ public:
return map;
}
#endif
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>> get_range_to_address_map(const sstring& keyspace) const;
std::unordered_map<dht::token_range, std::vector<inet_address>> get_range_to_address_map(const sstring& keyspace) const;
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>> get_range_to_address_map_in_local_dc(
std::unordered_map<dht::token_range, std::vector<inet_address>> get_range_to_address_map_in_local_dc(
const sstring& keyspace) const;
std::vector<token> get_tokens_in_local_dc() const;
bool is_local_dc(const inet_address& targetHost) const;
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>> get_range_to_address_map(const sstring& keyspace,
std::unordered_map<dht::token_range, std::vector<inet_address>> get_range_to_address_map(const sstring& keyspace,
const std::vector<token>& sorted_tokens) const;
/**
@@ -596,9 +596,9 @@ public:
* @param ranges
* @return mapping of ranges to the replicas responsible for them.
*/
std::unordered_map<nonwrapping_range<token>, std::vector<inet_address>> construct_range_to_endpoint_map(
std::unordered_map<dht::token_range, std::vector<inet_address>> construct_range_to_endpoint_map(
const sstring& keyspace,
const std::vector<nonwrapping_range<token>>& ranges) const;
const std::vector<dht::token_range>& ranges) const;
public:
virtual void on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override;
virtual void before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, const gms::versioned_value& new_value) override;
@@ -757,7 +757,7 @@ private:
* @param ranges the ranges to find sources for
* @return multimap of addresses to ranges the address is responsible for
*/
std::unordered_multimap<inet_address, nonwrapping_range<token>> get_new_source_ranges(const sstring& keyspaceName, const std::vector<nonwrapping_range<token>>& ranges);
std::unordered_multimap<inet_address, dht::token_range> get_new_source_ranges(const sstring& keyspaceName, const std::vector<dht::token_range>& ranges);
public:
future<> confirm_replication(inet_address node);
@@ -783,7 +783,7 @@ private:
future<> restore_replica_count(inet_address endpoint, inet_address notify_endpoint);
// needs to be modified to accept either a keyspace or ARS.
std::unordered_multimap<nonwrapping_range<token>, inet_address> get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint);
std::unordered_multimap<dht::token_range, inet_address> get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint);
public:
/** raw load value */
double get_load();
@@ -1600,7 +1600,7 @@ public:
* @param ep endpoint we are interested in.
* @return ranges for the specified endpoint.
*/
std::vector<nonwrapping_range<token>> get_ranges_for_endpoint(const sstring& name, const gms::inet_address& ep) const;
std::vector<dht::token_range> get_ranges_for_endpoint(const sstring& name, const gms::inet_address& ep) const;
/**
* Get all ranges that span the ring given a set
@@ -1608,7 +1608,7 @@ public:
* ranges.
* @return ranges in sorted order
*/
std::vector<nonwrapping_range<token>> get_all_ranges(const std::vector<token>& sorted_tokens) const;
std::vector<dht::token_range> get_all_ranges(const std::vector<token>& sorted_tokens) const;
/**
* This method returns the N endpoints that are responsible for storing the
* specified key i.e for replication.
@@ -1713,7 +1713,7 @@ public:
* @return Vector of Token ranges (_not_ keys!) together with estimated key count,
* breaking up the data this node is responsible for into pieces of roughly keys_per_split
*/
std::vector<std::pair<nonwrapping_range<dht::token>, uint64_t>> get_splits(const sstring& ks_name,
std::vector<std::pair<dht::token_range, uint64_t>> get_splits(const sstring& ks_name,
const sstring& cf_name,
range<dht::token> range,
uint32_t keys_per_split);
@@ -1982,7 +1982,7 @@ private:
* @param rangesToStreamByKeyspace keyspaces and data ranges with endpoints included for each
* @return async Future for whether stream was success
*/
future<> stream_ranges(std::unordered_map<sstring, std::unordered_multimap<nonwrapping_range<token>, inet_address>> ranges_to_stream_by_keyspace);
future<> stream_ranges(std::unordered_map<sstring, std::unordered_multimap<dht::token_range, inet_address>> ranges_to_stream_by_keyspace);
public:
/**
@@ -1993,8 +1993,8 @@ public:
* @param updated collection of the ranges after token is changed
* @return pair of ranges to stream/fetch for given current and updated range collections
*/
std::pair<std::unordered_set<nonwrapping_range<token>>, std::unordered_set<nonwrapping_range<token>>>
calculate_stream_and_fetch_ranges(const std::vector<nonwrapping_range<token>>& current, const std::vector<nonwrapping_range<token>>& updated);
std::pair<std::unordered_set<dht::token_range>, std::unordered_set<dht::token_range>>
calculate_stream_and_fetch_ranges(const std::vector<dht::token_range>& current, const std::vector<dht::token_range>& updated);
#if 0
public void bulkLoad(String directory)
{

View File

@@ -99,7 +99,7 @@ static api::timestamp_type get_max_purgeable_timestamp(const column_family& cf,
return timestamp;
}
static bool belongs_to_current_node(const dht::token& t, const std::vector<nonwrapping_range<dht::token>>& sorted_owned_ranges) {
static bool belongs_to_current_node(const dht::token& t, const std::vector<dht::token_range>& sorted_owned_ranges) {
auto low = std::lower_bound(sorted_owned_ranges.begin(), sorted_owned_ranges.end(), t,
[] (const range<dht::token>& a, const dht::token& b) {
// check that range a is before token b.
@@ -107,7 +107,7 @@ static bool belongs_to_current_node(const dht::token& t, const std::vector<nonwr
});
if (low != sorted_owned_ranges.end()) {
const nonwrapping_range<dht::token>& r = *low;
const dht::token_range& r = *low;
return r.contains(t, dht::token_comparator());
}
@@ -272,7 +272,7 @@ compact_sstables(std::vector<shared_sstable> sstables, column_family& cf, std::f
info->cf = schema->cf_name();
logger.info("{} {}", (!cleanup) ? "Compacting" : "Cleaning", sstable_logger_msg);
std::vector<nonwrapping_range<dht::token>> owned_ranges;
std::vector<dht::token_range> owned_ranges;
if (cleanup) {
owned_ranges = service::get_local_storage_service().get_local_ranges(schema->ks_name());
}

View File

@@ -62,7 +62,7 @@ extern logging::logger logger;
class incremental_selector_impl {
public:
virtual ~incremental_selector_impl() {}
virtual std::pair<nonwrapping_range<dht::token>, std::vector<shared_sstable>> select(const dht::token& token) = 0;
virtual std::pair<dht::token_range, std::vector<shared_sstable>> select(const dht::token& token) = 0;
};
class sstable_set_impl {
@@ -173,8 +173,8 @@ public:
incremental_selector(const std::vector<shared_sstable>& sstables)
: _sstables(sstables) {
}
virtual std::pair<nonwrapping_range<dht::token>, std::vector<shared_sstable>> select(const dht::token& token) override {
return std::make_pair(nonwrapping_range<dht::token>::make_open_ended_both_sides(), _sstables);
virtual std::pair<dht::token_range, std::vector<shared_sstable>> select(const dht::token& token) override {
return std::make_pair(dht::token_range::make_open_ended_both_sides(), _sstables);
}
};
@@ -267,8 +267,8 @@ class partitioned_sstable_set::incremental_selector : public incremental_selecto
map_iterator _it;
const map_iterator _end;
private:
static nonwrapping_range<dht::token> to_token_range(const interval_type& i) {
return nonwrapping_range<dht::token>::make({i.lower().token(), boost::icl::is_left_closed(i.bounds())},
static dht::token_range to_token_range(const interval_type& i) {
return dht::token_range::make({i.lower().token(), boost::icl::is_left_closed(i.bounds())},
{i.upper().token(), boost::icl::is_right_closed(i.bounds())});
}
public:
@@ -277,7 +277,7 @@ public:
, _it(sstables.begin())
, _end(sstables.end()) {
}
virtual std::pair<nonwrapping_range<dht::token>, std::vector<shared_sstable>> select(const dht::token& token) override {
virtual std::pair<dht::token_range, std::vector<shared_sstable>> select(const dht::token& token) override {
auto pr = query::partition_range::make(dht::ring_position::starting_at(token), dht::ring_position::ending_at(token));
auto interval = make_interval(*_schema, std::move(pr));
@@ -287,12 +287,12 @@ public:
}
// we don't want to skip current interval if token lies before it.
if (boost::icl::lower_less(interval, _it->first)) {
return std::make_pair(nonwrapping_range<dht::token>::make({token, true}, {_it->first.lower().token(), false}),
return std::make_pair(dht::token_range::make({token, true}, {_it->first.lower().token(), false}),
std::vector<shared_sstable>());
}
_it++;
}
return std::make_pair(nonwrapping_range<dht::token>::make_open_ended_both_sides(), std::vector<shared_sstable>());
return std::make_pair(dht::token_range::make_open_ended_both_sides(), std::vector<shared_sstable>());
}
};

View File

@@ -53,7 +53,7 @@ public:
// selector is used.
class incremental_selector {
std::unique_ptr<incremental_selector_impl> _impl;
mutable stdx::optional<nonwrapping_range<dht::token>> _current_token_range;
mutable stdx::optional<dht::token_range> _current_token_range;
mutable std::vector<shared_sstable> _current_sstables;
public:
~incremental_selector();

View File

@@ -2545,7 +2545,7 @@ void sstable::mark_sstable_for_deletion(const schema_ptr& schema, sstring dir, i
* Returns a pair of positions [p1, p2) in the summary file corresponding to entries
* covered by the specified range, or a disengaged optional if no such pair exists.
*/
stdx::optional<std::pair<uint64_t, uint64_t>> sstable::get_sample_indexes_for_range(const nonwrapping_range<dht::token>& range) {
stdx::optional<std::pair<uint64_t, uint64_t>> sstable::get_sample_indexes_for_range(const dht::token_range& range) {
auto entries_size = _summary.entries.size();
auto search = [this](bool before, const dht::token& token) {
auto kind = before ? key::kind::before_all_keys : key::kind::after_all_keys;
@@ -2575,7 +2575,7 @@ stdx::optional<std::pair<uint64_t, uint64_t>> sstable::get_sample_indexes_for_ra
return stdx::nullopt;
}
std::vector<dht::decorated_key> sstable::get_key_samples(const schema& s, const nonwrapping_range<dht::token>& range) {
std::vector<dht::decorated_key> sstable::get_key_samples(const schema& s, const dht::token_range& range) {
auto index_range = get_sample_indexes_for_range(range);
std::vector<dht::decorated_key> res;
if (index_range) {
@@ -2587,7 +2587,7 @@ std::vector<dht::decorated_key> sstable::get_key_samples(const schema& s, const
return res;
}
uint64_t sstable::estimated_keys_for_range(const nonwrapping_range<dht::token>& range) {
uint64_t sstable::estimated_keys_for_range(const dht::token_range& range) {
auto sample_index_range = get_sample_indexes_for_range(range);
uint64_t sample_key_count = sample_index_range ? sample_index_range->second - sample_index_range->first : 0;
// adjust for the current sampling level

View File

@@ -294,9 +294,9 @@ public:
_summary.header.min_index_interval;
}
uint64_t estimated_keys_for_range(const nonwrapping_range<dht::token>& range);
uint64_t estimated_keys_for_range(const dht::token_range& range);
std::vector<dht::decorated_key> get_key_samples(const schema& s, const nonwrapping_range<dht::token>& range);
std::vector<dht::decorated_key> get_key_samples(const schema& s, const dht::token_range& range);
// mark_for_deletion() specifies that a sstable isn't relevant to the
// current shard, and thus can be deleted by the deletion manager, if
@@ -599,7 +599,7 @@ private:
}
void write_collection(file_writer& out, const composite& clustering_key, const column_definition& cdef, collection_mutation_view collection);
stdx::optional<std::pair<uint64_t, uint64_t>> get_sample_indexes_for_range(const nonwrapping_range<dht::token>& range);
stdx::optional<std::pair<uint64_t, uint64_t>> get_sample_indexes_for_range(const dht::token_range& range);
public:
future<> read_toc();

View File

@@ -44,22 +44,22 @@ namespace streaming {
extern logging::logger sslog;
stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, std::vector<nonwrapping_range<token>> ranges) {
stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, std::vector<dht::token_range> ranges) {
return request_ranges(from, keyspace, std::move(ranges), {});
}
stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, std::vector<nonwrapping_range<token>> ranges, std::vector<sstring> column_families) {
stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, std::vector<dht::token_range> ranges, std::vector<sstring> column_families) {
_range_added = true;
auto session = _coordinator->get_or_create_session(from);
session->add_stream_request(keyspace, std::move(ranges), std::move(column_families));
return *this;
}
stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std::vector<nonwrapping_range<token>> ranges) {
stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std::vector<dht::token_range> ranges) {
return transfer_ranges(to, keyspace, std::move(ranges), {});
}
stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std::vector<nonwrapping_range<token>> ranges, std::vector<sstring> column_families) {
stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std::vector<dht::token_range> ranges, std::vector<sstring> column_families) {
_range_added = true;
auto session = _coordinator->get_or_create_session(to);
session->add_transfer_ranges(keyspace, std::move(ranges), std::move(column_families));

View File

@@ -90,7 +90,7 @@ public:
* @param ranges ranges to fetch
* @return this object for chaining
*/
stream_plan& request_ranges(inet_address from, sstring keyspace, std::vector<nonwrapping_range<token>> ranges);
stream_plan& request_ranges(inet_address from, sstring keyspace, std::vector<dht::token_range> ranges);
/**
* Request data in {@code columnFamilies} under {@code keyspace} and {@code ranges} from specific node.
@@ -102,7 +102,7 @@ public:
* @param columnFamilies specific column families
* @return this object for chaining
*/
stream_plan& request_ranges(inet_address from, sstring keyspace, std::vector<nonwrapping_range<token>> ranges, std::vector<sstring> column_families);
stream_plan& request_ranges(inet_address from, sstring keyspace, std::vector<dht::token_range> ranges, std::vector<sstring> column_families);
/**
* Add transfer task to send data of specific keyspace and ranges.
@@ -113,7 +113,7 @@ public:
* @param ranges ranges to send
* @return this object for chaining
*/
stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector<nonwrapping_range<token>> ranges);
stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector<dht::token_range> ranges);
/**
* Add transfer task to send data of specific {@code columnFamilies} under {@code keyspace} and {@code ranges}.
@@ -125,7 +125,7 @@ public:
* @param columnFamilies specific column families
* @return this object for chaining
*/
stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector<nonwrapping_range<token>> ranges, std::vector<sstring> column_families);
stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector<dht::token_range> ranges, std::vector<sstring> column_families);
stream_plan& listeners(std::vector<stream_event_handler*> handlers);
public:

View File

@@ -50,14 +50,14 @@ class stream_request {
public:
using token = dht::token;
sstring keyspace;
std::vector<nonwrapping_range<token>> ranges;
std::vector<dht::token_range> ranges;
// For compatibility with <= 1.5, we send wrapping ranges (though they will never wrap).
std::vector<wrapping_range<token>> ranges_compat() const {
return compat::wrap(ranges);
}
std::vector<sstring> column_families;
stream_request() = default;
stream_request(sstring _keyspace, std::vector<nonwrapping_range<token>> _ranges, std::vector<sstring> _column_families)
stream_request(sstring _keyspace, std::vector<dht::token_range> _ranges, std::vector<sstring> _column_families)
: keyspace(std::move(_keyspace))
, ranges(std::move(_ranges))
, column_families(std::move(_column_families)) {

View File

@@ -146,7 +146,7 @@ void stream_session::init_messaging_service_handler() {
});
});
});
ms().register_stream_mutation_done([] (const rpc::client_info& cinfo, UUID plan_id, std::vector<nonwrapping_range<dht::token>> ranges, UUID cf_id, unsigned dst_cpu_id) {
ms().register_stream_mutation_done([] (const rpc::client_info& cinfo, UUID plan_id, std::vector<dht::token_range> ranges, UUID cf_id, unsigned dst_cpu_id) {
const auto& from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return smp::submit_to(dst_cpu_id, [ranges = std::move(ranges), plan_id, cf_id, from] () mutable {
auto session = get_session(plan_id, from, "STREAM_MUTATION_DONE", cf_id);
@@ -415,7 +415,7 @@ std::vector<column_family*> stream_session::get_column_family_stores(const sstri
return stores;
}
void stream_session::add_transfer_ranges(sstring keyspace, std::vector<nonwrapping_range<token>> ranges, std::vector<sstring> column_families) {
void stream_session::add_transfer_ranges(sstring keyspace, std::vector<dht::token_range> ranges, std::vector<sstring> column_families) {
auto cfs = get_column_family_stores(keyspace, column_families);
for (auto& cf : cfs) {
auto cf_id = cf->schema()->id();

View File

@@ -238,7 +238,7 @@ public:
* @param ranges Ranges to retrieve data
* @param columnFamilies ColumnFamily names. Can be empty if requesting all CF under the keyspace.
*/
void add_stream_request(sstring keyspace, std::vector<nonwrapping_range<token>> ranges, std::vector<sstring> column_families) {
void add_stream_request(sstring keyspace, std::vector<dht::token_range> ranges, std::vector<sstring> column_families) {
_requests.emplace_back(std::move(keyspace), std::move(ranges), std::move(column_families));
}
@@ -253,7 +253,7 @@ public:
* @param flushTables flush tables?
* @param repairedAt the time the repair started.
*/
void add_transfer_ranges(sstring keyspace, std::vector<nonwrapping_range<token>> ranges, std::vector<sstring> column_families);
void add_transfer_ranges(sstring keyspace, std::vector<dht::token_range> ranges, std::vector<sstring> column_families);
std::vector<column_family*> get_column_family_stores(const sstring& keyspace, const std::vector<sstring>& column_families);

View File

@@ -57,7 +57,7 @@ namespace streaming {
extern logging::logger sslog;
stream_transfer_task::stream_transfer_task(shared_ptr<stream_session> session, UUID cf_id, std::vector<nonwrapping_range<dht::token>> ranges, long total_size)
stream_transfer_task::stream_transfer_task(shared_ptr<stream_session> session, UUID cf_id, std::vector<dht::token_range> ranges, long total_size)
: stream_task(session, cf_id)
, _ranges(std::move(ranges))
, _total_size(total_size) {
@@ -167,13 +167,13 @@ void stream_transfer_task::start() {
});
}
void stream_transfer_task::append_ranges(const std::vector<nonwrapping_range<dht::token>>& ranges) {
void stream_transfer_task::append_ranges(const std::vector<dht::token_range>& ranges) {
_ranges.insert(_ranges.end(), ranges.begin(), ranges.end());
}
void stream_transfer_task::sort_and_merge_ranges() {
boost::icl::interval_set<dht::token> myset;
std::vector<nonwrapping_range<dht::token>> ranges;
std::vector<dht::token_range> ranges;
sslog.debug("cf_id = {}, before ranges = {}, size={}", cf_id, _ranges, _ranges.size());
_ranges.swap(ranges);
for (auto& range : ranges) {
@@ -185,7 +185,7 @@ void stream_transfer_task::sort_and_merge_ranges() {
ranges.shrink_to_fit();
for (auto& i : myset) {
auto r = locator::token_metadata::interval_to_range(i);
_ranges.push_back(nonwrapping_range<dht::token>(r));
_ranges.push_back(dht::token_range(r));
}
sslog.debug("cf_id = {}, after ranges = {}, size={}", cf_id, _ranges, _ranges.size());
}

View File

@@ -58,13 +58,13 @@ private:
int32_t sequence_number = 0;
bool aborted = false;
// A stream_transfer_task always contains the same range to stream
std::vector<nonwrapping_range<dht::token>> _ranges;
std::vector<dht::token_range> _ranges;
std::map<unsigned, std::vector<query::partition_range>> _shard_ranges;
long _total_size;
public:
using UUID = utils::UUID;
stream_transfer_task(stream_transfer_task&&) = default;
stream_transfer_task(shared_ptr<stream_session> session, UUID cf_id, std::vector<nonwrapping_range<dht::token>> ranges, long total_size = 0);
stream_transfer_task(shared_ptr<stream_session> session, UUID cf_id, std::vector<dht::token_range> ranges, long total_size = 0);
~stream_transfer_task();
public:
virtual void abort() override {
@@ -80,7 +80,7 @@ public:
void start();
void append_ranges(const std::vector<nonwrapping_range<dht::token>>& ranges);
void append_ranges(const std::vector<dht::token_range>& ranges);
void sort_and_merge_ranges();
};

View File

@@ -251,7 +251,7 @@ auto get_item(std::string left, std::string right, std::string val) {
using value_type = std::unordered_set<std::string>;
auto l = dht::global_partitioner().from_sstring(left);
auto r = dht::global_partitioner().from_sstring(right);
auto rg = nonwrapping_range<dht::token>({{l, false}}, {r});
auto rg = dht::token_range({{l, false}}, {r});
value_type v{val};
return std::make_pair(locator::token_metadata::range_to_interval(rg), v);
}
@@ -277,7 +277,7 @@ BOOST_AUTO_TEST_CASE(test_range_interval_map) {
auto search_item = [&mymap] (std::string val) {
auto tok = dht::global_partitioner().from_sstring(val);
auto search = nonwrapping_range<token>(tok);
auto search = dht::token_range(tok);
auto it = mymap.find(locator::token_metadata::range_to_interval(search));
if (it != mymap.end()) {
std::cout << "Found OK:" << " token = " << tok << " in range: " << it->first << "\n";

View File

@@ -722,7 +722,7 @@ public:
void describe_splits_ex(tcxx::function<void(std::vector<CfSplit> const& _return)> cob, tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& cfName, const std::string& start_token, const std::string& end_token, const int32_t keys_per_split) {
with_cob(std::move(cob), std::move(exn_cob), [&]{
std::vector<nonwrapping_range<dht::token>> ranges;
std::vector<dht::token_range> ranges;
auto tstart = start_token.empty() ? dht::minimum_token() : dht::global_partitioner().from_sstring(sstring(start_token));
auto tend = end_token.empty() ? dht::maximum_token() : dht::global_partitioner().from_sstring(sstring(end_token));
range<dht::token> r({{ std::move(tstart), false }}, {{ std::move(tend), true }});