From d1178fa299565aeca89a64eebf58d51af9f8635e Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 14 Dec 2016 16:11:27 +0800 Subject: [PATCH] Convert to use dht::token_range --- database.cc | 8 +- db/size_estimates_virtual_reader.hh | 6 +- dht/boot_strapper.cc | 2 +- dht/i_partitioner.cc | 4 +- dht/i_partitioner.hh | 4 +- dht/range_streamer.cc | 34 +++---- dht/range_streamer.hh | 20 ++-- locator/abstract_replication_strategy.cc | 36 +++---- locator/abstract_replication_strategy.hh | 12 +-- locator/token_metadata.cc | 8 +- locator/token_metadata.hh | 4 +- main.cc | 2 +- message/messaging_service.cc | 8 +- message/messaging_service.hh | 8 +- partition_range_compat.hh | 4 +- repair/repair.cc | 40 ++++---- repair/repair.hh | 2 +- service/storage_service.cc | 118 +++++++++++------------ service/storage_service.hh | 28 +++--- sstables/compaction.cc | 6 +- sstables/compaction_strategy.cc | 16 +-- sstables/sstable_set.hh | 2 +- sstables/sstables.cc | 6 +- sstables/sstables.hh | 6 +- streaming/stream_plan.cc | 8 +- streaming/stream_plan.hh | 8 +- streaming/stream_request.hh | 4 +- streaming/stream_session.cc | 4 +- streaming/stream_session.hh | 4 +- streaming/stream_transfer_task.cc | 8 +- streaming/stream_transfer_task.hh | 6 +- tests/nonwrapping_range_test.cc | 4 +- thrift/handler.cc | 2 +- 33 files changed, 216 insertions(+), 216 deletions(-) diff --git a/database.cc b/database.cc index e19da1e9d0..861887be66 100644 --- a/database.cc +++ b/database.cc @@ -1362,13 +1362,13 @@ column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool } static bool needs_cleanup(const lw_shared_ptr& sst, - const lw_shared_ptr>>& owned_ranges, + const lw_shared_ptr>& owned_ranges, schema_ptr s) { auto first = sst->get_first_partition_key(); auto last = sst->get_last_partition_key(); auto first_token = dht::global_partitioner().get_token(*s, first); auto last_token = dht::global_partitioner().get_token(*s, last); - nonwrapping_range sst_token_range = nonwrapping_range::make(first_token, last_token); + dht::token_range sst_token_range = dht::token_range::make(first_token, last_token); // return true iff sst partition range isn't fully contained in any of the owned ranges. for (auto& r : *owned_ranges) { @@ -1380,8 +1380,8 @@ static bool needs_cleanup(const lw_shared_ptr& sst, } future<> column_family::cleanup_sstables(sstables::compaction_descriptor descriptor) { - std::vector> r = service::get_local_storage_service().get_local_ranges(_schema->ks_name()); - auto owned_ranges = make_lw_shared>>(std::move(r)); + std::vector r = service::get_local_storage_service().get_local_ranges(_schema->ks_name()); + auto owned_ranges = make_lw_shared>(std::move(r)); auto sstables_to_cleanup = make_lw_shared>(std::move(descriptor.sstables)); return parallel_for_each(*sstables_to_cleanup, [this, owned_ranges = std::move(owned_ranges), sstables_to_cleanup] (auto& sst) { diff --git a/db/size_estimates_virtual_reader.hh b/db/size_estimates_virtual_reader.hh index 2d6e496981..2c87fd24fb 100644 --- a/db/size_estimates_virtual_reader.hh +++ b/db/size_estimates_virtual_reader.hh @@ -85,7 +85,7 @@ public: return ss.get_local_tokens().then([&ss] (auto&& tokens) { auto ranges = ss.get_token_metadata().get_primary_ranges_for(std::move(tokens)); std::vector local_ranges; - auto to_bytes = [](const stdx::optional::bound>& b) { + auto to_bytes = [](const stdx::optional& b) { assert(b); return utf8_type->decompose(dht::global_partitioner().to_sstring(b->value())); }; @@ -230,7 +230,7 @@ private: /** * Makes a wrapping range of ring_position from a nonwrapping range of token, used to select sstables. */ - static nonwrapping_range as_ring_position_range(nonwrapping_range& r) { + static nonwrapping_range as_ring_position_range(dht::token_range& r) { stdx::optional::bound> start_bound, end_bound; if (r.start()) { start_bound = {{ dht::ring_position(r.start()->value(), dht::ring_position::token_bound::start), r.start()->is_inclusive() }}; @@ -250,7 +250,7 @@ private: auto from_bytes = [] (auto& b) { return dht::global_partitioner().from_sstring(utf8_type->to_string(b)); }; - std::vector> ranges; + std::vector ranges; compat::unwrap_into( wrapping_range({{ from_bytes(r.start) }}, {{ from_bytes(r.end) }}), dht::token_comparator(), diff --git a/dht/boot_strapper.cc b/dht/boot_strapper.cc index fec9ca8a0d..b66c0a2aec 100644 --- a/dht/boot_strapper.cc +++ b/dht/boot_strapper.cc @@ -54,7 +54,7 @@ future<> boot_strapper::bootstrap() { for (const auto& keyspace_name : _db.local().get_non_system_keyspaces()) { auto& ks = _db.local().find_keyspace(keyspace_name); auto& strategy = ks.get_replication_strategy(); - std::vector> ranges = strategy.get_pending_address_ranges(_token_metadata, _tokens, _address); + std::vector ranges = strategy.get_pending_address_ranges(_token_metadata, _tokens, _address); logger.debug("Will stream keyspace={}, ranges={}", keyspace_name, ranges); streamer->add_ranges(keyspace_name, ranges); } diff --git a/dht/i_partitioner.cc b/dht/i_partitioner.cc index 4842bd1648..7519200d99 100644 --- a/dht/i_partitioner.cc +++ b/dht/i_partitioner.cc @@ -324,7 +324,7 @@ int ring_position::tri_compare(const schema& s, const ring_position& o) const { } nonwrapping_range -to_partition_range(nonwrapping_range r) { +to_partition_range(dht::token_range r) { using bound_opt = std::experimental::optional::bound>; auto start = r.start() ? bound_opt(dht::ring_position(r.start()->value(), @@ -356,7 +356,7 @@ split_range_to_shards(nonwrapping_range pr, const schema& s) { } std::map>> -split_ranges_to_shards(const std::vector>& ranges, const schema& s) { +split_ranges_to_shards(const std::vector& ranges, const schema& s) { std::map>> ret; for (const auto& range : ranges) { auto pr = dht::to_partition_range(range); diff --git a/dht/i_partitioner.hh b/dht/i_partitioner.hh index 9527b50b90..d36da77976 100644 --- a/dht/i_partitioner.hh +++ b/dht/i_partitioner.hh @@ -471,7 +471,7 @@ public: stdx::optional next(const schema& s); }; -nonwrapping_range to_partition_range(nonwrapping_range); +nonwrapping_range to_partition_range(dht::token_range); // Each shard gets a sorted, disjoint vector of ranges std::map>> @@ -480,7 +480,7 @@ split_range_to_shards(nonwrapping_range pr, const schema& s); // If input ranges are sorted and disjoint then the ranges for each shard // are also sorted and disjoint. std::map>> -split_ranges_to_shards(const std::vector>& ranges, const schema& s); +split_ranges_to_shards(const std::vector& ranges, const schema& s); } // dht diff --git a/dht/range_streamer.cc b/dht/range_streamer.cc index d8c79291d4..6cd24084a8 100644 --- a/dht/range_streamer.cc +++ b/dht/range_streamer.cc @@ -53,22 +53,22 @@ logging::logger logger("range_streamer"); using inet_address = gms::inet_address; -static std::unordered_map, std::unordered_set> -unordered_multimap_to_unordered_map(const std::unordered_multimap, inet_address>& multimap) { - std::unordered_map, std::unordered_set> ret; +static std::unordered_map> +unordered_multimap_to_unordered_map(const std::unordered_multimap& multimap) { + std::unordered_map> ret; for (auto x : multimap) { ret[x.first].emplace(x.second); } return ret; } -std::unordered_multimap> -range_streamer::get_range_fetch_map(const std::unordered_multimap, inet_address>& ranges_with_sources, +std::unordered_multimap +range_streamer::get_range_fetch_map(const std::unordered_multimap& ranges_with_sources, const std::unordered_set>& source_filters, const sstring& keyspace) { - std::unordered_multimap> range_fetch_map_map; + std::unordered_multimap range_fetch_map_map; for (auto x : unordered_multimap_to_unordered_map(ranges_with_sources)) { - const nonwrapping_range& range_ = x.first; + const dht::token_range& range_ = x.first; const std::unordered_set& addresses = x.second; bool found_source = false; for (auto address : addresses) { @@ -103,8 +103,8 @@ range_streamer::get_range_fetch_map(const std::unordered_multimap, inet_address> -range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, std::vector> desired_ranges) { +std::unordered_multimap +range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, std::vector desired_ranges) { logger.debug("{} ks={}", __func__, keyspace_name); auto& ks = _db.local().find_keyspace(keyspace_name); @@ -113,7 +113,7 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, st auto tm = _metadata.clone_only_token_map(); auto range_addresses = unordered_multimap_to_unordered_map(strat.get_range_addresses(tm)); - std::unordered_multimap, inet_address> range_sources; + std::unordered_multimap range_sources; auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr(); for (auto& desired_range : desired_ranges) { auto found = false; @@ -137,8 +137,8 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, st return range_sources; } -std::unordered_multimap, inet_address> -range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, std::vector> desired_ranges) { +std::unordered_multimap +range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, std::vector desired_ranges) { logger.debug("{} ks={}", __func__, keyspace_name); assert (_tokens.empty() == false); @@ -154,7 +154,7 @@ range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_n auto pending_range_addresses = unordered_multimap_to_unordered_map(strat.get_range_addresses(metadata_clone)); //Collects the source that will have its range moved to the new node - std::unordered_multimap, inet_address> range_sources; + std::unordered_multimap range_sources; for (auto& desired_range : desired_ranges) { for (auto& x : range_addresses) { @@ -211,7 +211,7 @@ bool range_streamer::use_strict_sources_for_ranges(const sstring& keyspace_name) && _metadata.get_all_endpoints().size() != strat.get_replication_factor(); } -void range_streamer::add_ranges(const sstring& keyspace_name, std::vector> ranges) { +void range_streamer::add_ranges(const sstring& keyspace_name, std::vector ranges) { auto ranges_for_keyspace = use_strict_sources_for_ranges(keyspace_name) ? get_all_ranges_with_strict_sources_for(keyspace_name, ranges) : get_all_ranges_with_sources_for(keyspace_name, ranges); @@ -222,7 +222,7 @@ void range_streamer::add_ranges(const sstring& keyspace_name, std::vector>> range_fetch_map; + std::unordered_map> range_fetch_map; for (auto& x : get_range_fetch_map(ranges_for_keyspace, _source_filters, keyspace_name)) { range_fetch_map[x.first].emplace_back(x.second); } @@ -252,8 +252,8 @@ future range_streamer::fetch_async() { return _stream_plan.execute(); } -std::unordered_multimap> -range_streamer::get_work_map(const std::unordered_multimap, inet_address>& ranges_with_source_target, +std::unordered_multimap +range_streamer::get_work_map(const std::unordered_multimap& ranges_with_source_target, const sstring& keyspace) { auto filter = std::make_unique(gms::get_local_failure_detector()); std::unordered_set> source_filters; diff --git a/dht/range_streamer.hh b/dht/range_streamer.hh index ee88c051ce..9a6555f5de 100644 --- a/dht/range_streamer.hh +++ b/dht/range_streamer.hh @@ -118,22 +118,22 @@ public: _source_filters.emplace(std::move(filter)); } - void add_ranges(const sstring& keyspace_name, std::vector> ranges); + void add_ranges(const sstring& keyspace_name, std::vector ranges); private: bool use_strict_sources_for_ranges(const sstring& keyspace_name); /** * Get a map of all ranges and their respective sources that are candidates for streaming the given ranges * to us. For each range, the list of sources is sorted by proximity relative to the given destAddress. */ - std::unordered_multimap, inet_address> - get_all_ranges_with_sources_for(const sstring& keyspace_name, std::vector> desired_ranges); + std::unordered_multimap + get_all_ranges_with_sources_for(const sstring& keyspace_name, std::vector desired_ranges); /** * Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges. * For each range, the list should only contain a single source. This allows us to consistently migrate data without violating * consistency. */ - std::unordered_multimap, inet_address> - get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, std::vector> desired_ranges); + std::unordered_multimap + get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, std::vector desired_ranges); private: /** * @param rangesWithSources The ranges we want to fetch (key) and their potential sources (value) @@ -141,14 +141,14 @@ private: * here, we always exclude ourselves. * @return */ - static std::unordered_multimap> - get_range_fetch_map(const std::unordered_multimap, inet_address>& ranges_with_sources, + static std::unordered_multimap + get_range_fetch_map(const std::unordered_multimap& ranges_with_sources, const std::unordered_set>& source_filters, const sstring& keyspace); public: - static std::unordered_multimap> - get_work_map(const std::unordered_multimap, inet_address>& ranges_with_source_target, + static std::unordered_multimap + get_work_map(const std::unordered_multimap& ranges_with_source_target, const sstring& keyspace); #if 0 @@ -166,7 +166,7 @@ private: std::unordered_set _tokens; inet_address _address; sstring _description; - std::unordered_multimap>>> _to_fetch; + std::unordered_multimap>> _to_fetch; std::unordered_set> _source_filters; stream_plan _stream_plan; }; diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index 72727be442..d9892c0b9a 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -117,26 +117,26 @@ void insert_token_range_to_sorted_container_while_unwrapping( const dht::token& prev_tok, const dht::token& tok, - std::vector>& ret) { + std::vector& ret) { if (prev_tok < tok) { ret.emplace_back( - nonwrapping_range::bound(prev_tok, false), - nonwrapping_range::bound(tok, true)); + dht::token_range::bound(prev_tok, false), + dht::token_range::bound(tok, true)); } else { ret.emplace_back( - nonwrapping_range::bound(prev_tok, false), + dht::token_range::bound(prev_tok, false), stdx::nullopt); // Insert in front to maintain sorded order ret.emplace( ret.begin(), stdx::nullopt, - nonwrapping_range::bound(tok, true)); + dht::token_range::bound(tok, true)); } } -std::vector> +std::vector abstract_replication_strategy::get_ranges(inet_address ep) const { - std::vector> ret; + std::vector ret; auto prev_tok = _token_metadata.sorted_tokens().back(); for (auto tok : _token_metadata.sorted_tokens()) { for (inet_address a : calculate_natural_endpoints(tok, _token_metadata)) { @@ -150,9 +150,9 @@ abstract_replication_strategy::get_ranges(inet_address ep) const { return ret; } -std::vector> +std::vector abstract_replication_strategy::get_primary_ranges(inet_address ep) { - std::vector> ret; + std::vector ret; auto prev_tok = _token_metadata.sorted_tokens().back(); for (auto tok : _token_metadata.sorted_tokens()) { auto&& eps = calculate_natural_endpoints(tok, _token_metadata); @@ -164,11 +164,11 @@ abstract_replication_strategy::get_primary_ranges(inet_address ep) { return ret; } -std::unordered_multimap> +std::unordered_multimap abstract_replication_strategy::get_address_ranges(token_metadata& tm) const { - std::unordered_multimap> ret; + std::unordered_multimap ret; for (auto& t : tm.sorted_tokens()) { - std::vector> r = tm.get_primary_ranges_for(t); + std::vector r = tm.get_primary_ranges_for(t); auto eps = calculate_natural_endpoints(t, tm); logger.debug("token={}, primary_range={}, address={}", t, r, eps); for (auto ep : eps) { @@ -180,11 +180,11 @@ abstract_replication_strategy::get_address_ranges(token_metadata& tm) const { return ret; } -std::unordered_multimap, inet_address> +std::unordered_multimap abstract_replication_strategy::get_range_addresses(token_metadata& tm) const { - std::unordered_multimap, inet_address> ret; + std::unordered_multimap ret; for (auto& t : tm.sorted_tokens()) { - std::vector> r = tm.get_primary_ranges_for(t); + std::vector r = tm.get_primary_ranges_for(t); auto eps = calculate_natural_endpoints(t, tm); for (auto ep : eps) { for (auto&& rng : r) @@ -194,14 +194,14 @@ abstract_replication_strategy::get_range_addresses(token_metadata& tm) const { return ret; } -std::vector> +std::vector abstract_replication_strategy::get_pending_address_ranges(token_metadata& tm, token pending_token, inet_address pending_address) { return get_pending_address_ranges(tm, std::unordered_set{pending_token}, pending_address); } -std::vector> +std::vector abstract_replication_strategy::get_pending_address_ranges(token_metadata& tm, std::unordered_set pending_tokens, inet_address pending_address) { - std::vector> ret; + std::vector ret; auto temp = tm.clone_only_token_map(); temp.update_normal_tokens(pending_tokens, pending_address); for (auto& x : get_address_ranges(temp)) { diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index 3c90c7edff..fb349969ef 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -105,22 +105,22 @@ public: // The list is sorted, and its elements are non overlapping and non wrap-around. // It the analogue of Origin's getAddressRanges().get(endpoint). // This function is not efficient, and not meant for the fast path. - std::vector> get_ranges(inet_address ep) const; + std::vector get_ranges(inet_address ep) const; // get_primary_ranges() returns the list of "primary ranges" for the given // endpoint. "Primary ranges" are the ranges that the node is responsible // for storing replica primarily, which means this is the first node // returned calculate_natural_endpoints(). // This function is the analogue of Origin's // StorageService.getPrimaryRangesForEndpoint(). - std::vector> get_primary_ranges(inet_address ep); + std::vector get_primary_ranges(inet_address ep); - std::unordered_multimap> get_address_ranges(token_metadata& tm) const; + std::unordered_multimap get_address_ranges(token_metadata& tm) const; - std::unordered_multimap, inet_address> get_range_addresses(token_metadata& tm) const; + std::unordered_multimap get_range_addresses(token_metadata& tm) const; - std::vector> get_pending_address_ranges(token_metadata& tm, token pending_token, inet_address pending_address); + std::vector get_pending_address_ranges(token_metadata& tm, token pending_token, inet_address pending_address); - std::vector> get_pending_address_ranges(token_metadata& tm, std::unordered_set pending_tokens, inet_address pending_address); + std::vector get_pending_address_ranges(token_metadata& tm, std::unordered_set pending_tokens, inet_address pending_address); }; } diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index ef0867d1ca..e3a136e5fd 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -329,8 +329,8 @@ token token_metadata::get_predecessor(token t) { } } -std::vector> token_metadata::get_primary_ranges_for(std::unordered_set tokens) { - std::vector> ranges; +std::vector token_metadata::get_primary_ranges_for(std::unordered_set tokens) { + std::vector ranges; ranges.reserve(tokens.size() + 1); // one of the ranges will wrap for (auto right : tokens) { auto left = get_predecessor(right); @@ -342,7 +342,7 @@ std::vector> token_metadata::get_primary_ranges_for(std return ranges; } -std::vector> token_metadata::get_primary_ranges_for(token right) { +std::vector token_metadata::get_primary_ranges_for(token right) { return get_primary_ranges_for(std::unordered_set{right}); } @@ -452,7 +452,7 @@ void token_metadata::calculate_pending_ranges(abstract_replication_strategy& str return; } - std::unordered_multimap> address_ranges = strategy.get_address_ranges(*this); + std::unordered_multimap address_ranges = strategy.get_address_ranges(*this); // FIMXE // Copy of metadata reflecting the situation after all leave operations are finished. diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 7e3fabbb11..fe7d81984a 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -609,9 +609,9 @@ public: } #endif public: - std::vector> get_primary_ranges_for(std::unordered_set tokens); + std::vector get_primary_ranges_for(std::unordered_set tokens); - std::vector> get_primary_ranges_for(token right); + std::vector get_primary_ranges_for(token right); static boost::icl::interval::interval_type range_to_interval(range r); static range interval_to_range(boost::icl::interval::interval_type i); diff --git a/main.cc b/main.cc index b910e3ad9b..b2682735b8 100644 --- a/main.cc +++ b/main.cc @@ -605,7 +605,7 @@ int main(int ac, char** av) { api::set_server_stream_manager(ctx).get(); // Start handling REPAIR_CHECKSUM_RANGE messages net::get_messaging_service().invoke_on_all([&db] (auto& ms) { - ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, nonwrapping_range range, rpc::optional hash_version) { + ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, dht::token_range range, rpc::optional hash_version) { auto hv = hash_version ? *hash_version : repair_checksum::legacy; return do_with(std::move(keyspace), std::move(cf), std::move(range), [&db, hv] (auto& keyspace, auto& cf, auto& range) { diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 001a7256be..fc2ce49618 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -708,7 +708,7 @@ future<> messaging_service::send_stream_mutation(msg_addr id, UUID plan_id, froz // STREAM_MUTATION_DONE void messaging_service::register_stream_mutation_done(std::function (const rpc::client_info& cinfo, - UUID plan_id, std::vector> ranges, UUID cf_id, unsigned dst_cpu_id)>&& func) { + UUID plan_id, std::vector ranges, UUID cf_id, unsigned dst_cpu_id)>&& func) { register_handler(this, messaging_verb::STREAM_MUTATION_DONE, [func = std::move(func)] (const rpc::client_info& cinfo, UUID plan_id, std::vector> ranges, @@ -716,7 +716,7 @@ void messaging_service::register_stream_mutation_done(std::function (co return func(cinfo, plan_id, compat::unwrap(std::move(ranges)), cf_id, dst_cpu_id); }); } -future<> messaging_service::send_stream_mutation_done(msg_addr id, UUID plan_id, std::vector> ranges, UUID cf_id, unsigned dst_cpu_id) { +future<> messaging_service::send_stream_mutation_done(msg_addr id, UUID plan_id, std::vector ranges, UUID cf_id, unsigned dst_cpu_id) { return send_message_timeout_and_retry(this, messaging_verb::STREAM_MUTATION_DONE, id, streaming_timeout, streaming_nr_retry, streaming_wait_before_retry, plan_id, std::move(ranges), cf_id, dst_cpu_id); @@ -906,14 +906,14 @@ future<> messaging_service::send_replication_finished(msg_addr id, inet_address // Wrapper for REPAIR_CHECKSUM_RANGE void messaging_service::register_repair_checksum_range( std::function (sstring keyspace, - sstring cf, nonwrapping_range range, rpc::optional hash_version)>&& f) { + sstring cf, dht::token_range range, rpc::optional hash_version)>&& f) { register_handler(this, messaging_verb::REPAIR_CHECKSUM_RANGE, std::move(f)); } void messaging_service::unregister_repair_checksum_range() { _rpc->unregister_handler(messaging_verb::REPAIR_CHECKSUM_RANGE); } future messaging_service::send_repair_checksum_range( - msg_addr id, sstring keyspace, sstring cf, ::nonwrapping_range range, repair_checksum hash_version) + msg_addr id, sstring keyspace, sstring cf, ::dht::token_range range, repair_checksum hash_version) { return send_message(this, messaging_verb::REPAIR_CHECKSUM_RANGE, std::move(id), diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 7d25485335..850435de99 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -237,16 +237,16 @@ public: void register_stream_mutation(std::function (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional)>&& func); future<> send_stream_mutation(msg_addr id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, bool fragmented); - void register_stream_mutation_done(std::function (const rpc::client_info& cinfo, UUID plan_id, std::vector> ranges, UUID cf_id, unsigned dst_cpu_id)>&& func); - future<> send_stream_mutation_done(msg_addr id, UUID plan_id, std::vector> ranges, UUID cf_id, unsigned dst_cpu_id); + void register_stream_mutation_done(std::function (const rpc::client_info& cinfo, UUID plan_id, std::vector ranges, UUID cf_id, unsigned dst_cpu_id)>&& func); + future<> send_stream_mutation_done(msg_addr id, UUID plan_id, std::vector ranges, UUID cf_id, unsigned dst_cpu_id); void register_complete_message(std::function (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id)>&& func); future<> send_complete_message(msg_addr id, UUID plan_id, unsigned dst_cpu_id); // Wrapper for REPAIR_CHECKSUM_RANGE verb - void register_repair_checksum_range(std::function (sstring keyspace, sstring cf, nonwrapping_range range, rpc::optional hash_version)>&& func); + void register_repair_checksum_range(std::function (sstring keyspace, sstring cf, dht::token_range range, rpc::optional hash_version)>&& func); void unregister_repair_checksum_range(); - future send_repair_checksum_range(msg_addr id, sstring keyspace, sstring cf, nonwrapping_range range, repair_checksum hash_version); + future send_repair_checksum_range(msg_addr id, sstring keyspace, sstring cf, dht::token_range range, repair_checksum hash_version); // Wrapper for GOSSIP_ECHO verb void register_gossip_echo(std::function ()>&& func); diff --git a/partition_range_compat.hh b/partition_range_compat.hh index 93e332b1b7..0f34a6bad7 100644 --- a/partition_range_compat.hh +++ b/partition_range_compat.hh @@ -102,13 +102,13 @@ wrap(std::vector>&& v) { } inline -std::vector> +std::vector unwrap(const std::vector>& v) { return unwrap(v, dht::token_comparator()); } inline -std::vector> +std::vector unwrap(std::vector>&& v) { return unwrap(std::move(v), dht::token_comparator()); } diff --git a/repair/repair.cc b/repair/repair.cc index 063e47b2dd..e4bcb6c1bf 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -42,14 +42,14 @@ static logging::logger logger("repair"); struct failed_range { sstring cf; - ::nonwrapping_range range; + ::dht::token_range range; }; class repair_info { public: seastar::sharded& db; sstring keyspace; - std::vector> ranges; + std::vector ranges; std::vector cfs; int id; std::vector data_centers; @@ -64,7 +64,7 @@ public: public: repair_info(seastar::sharded& db_, const sstring& keyspace_, - const std::vector>& ranges_, + const std::vector& ranges_, const std::vector& cfs_, int id_, const std::vector& data_centers_, @@ -101,7 +101,7 @@ public: } } void request_transfer_ranges(const sstring& cf, - const ::nonwrapping_range& range, + const ::dht::token_range& range, const std::vector& neighbors_in, const std::vector& neighbors_out) { for (const auto& peer : neighbors_in) { @@ -458,7 +458,7 @@ static future checksum_range_shard(database &db, // function is not resolved. future checksum_range(seastar::sharded &db, const sstring& keyspace, const sstring& cf, - const ::nonwrapping_range& range, repair_checksum hash_version) { + const ::dht::token_range& range, repair_checksum hash_version) { auto& schema = db.local().find_column_family(keyspace, cf).schema(); auto shard_ranges = dht::split_range_to_shards(dht::to_partition_range(range), *schema); return do_with(partition_checksum(), std::move(shard_ranges), [&db, &keyspace, &cf, hash_version] (auto& result, auto& shard_ranges) { @@ -478,8 +478,8 @@ future checksum_range(seastar::sharded &db, }); } -static void split_and_add(std::vector<::nonwrapping_range>& ranges, - const nonwrapping_range& range, +static void split_and_add(std::vector<::dht::token_range>& ranges, + const dht::token_range& range, uint64_t estimated_partitions, uint64_t target_partitions) { if (estimated_partitions < target_partitions) { // We're done, the range is small enough to not be split further @@ -515,14 +515,14 @@ static thread_local semaphore parallelism_semaphore(parallelism); // Repair a single cf in a single local range. // Comparable to RepairJob in Origin. static future<> repair_cf_range(repair_info& ri, - sstring cf, ::nonwrapping_range range, + sstring cf, ::dht::token_range range, const std::vector& neighbors) { if (neighbors.empty()) { // Nothing to do in this case... return make_ready_future<>(); } - std::vector<::nonwrapping_range> ranges; + std::vector<::dht::token_range> ranges; ranges.push_back(range); // Additionally, we want to break up large ranges so they will have @@ -538,7 +538,7 @@ static future<> repair_cf_range(repair_info& ri, // FIXME: we should have an on-the-fly iterator generator here, not // fill a vector in advance. - std::vector<::nonwrapping_range> tosplit; + std::vector<::dht::token_range> tosplit; while (estimated_partitions > ri.target_partitions) { tosplit.clear(); ranges.swap(tosplit); @@ -741,24 +741,24 @@ static future<> repair_range(repair_info& ri, auto& range) { }); } -static std::vector> get_ranges_for_endpoint( +static std::vector get_ranges_for_endpoint( database& db, sstring keyspace, gms::inet_address ep) { auto& rs = db.find_keyspace(keyspace).get_replication_strategy(); return rs.get_ranges(ep); } -static std::vector> get_local_ranges( +static std::vector get_local_ranges( database& db, sstring keyspace) { return get_ranges_for_endpoint(db, keyspace, utils::fb_utilities::get_broadcast_address()); } -static std::vector> get_primary_ranges_for_endpoint( +static std::vector get_primary_ranges_for_endpoint( database& db, sstring keyspace, gms::inet_address ep) { auto& rs = db.find_keyspace(keyspace).get_replication_strategy(); return rs.get_primary_ranges(ep); } -static std::vector> get_primary_ranges( +static std::vector get_primary_ranges( database& db, sstring keyspace) { return get_primary_ranges_for_endpoint(db, keyspace, utils::fb_utilities::get_broadcast_address()); @@ -774,7 +774,7 @@ struct repair_options { // If ranges is not empty, it overrides the repair's default heuristics // for determining the list of ranges to repair. In particular, "ranges" // overrides the setting of "primary_range". - std::vector> ranges; + std::vector ranges; // If start_token and end_token are set, they define a range which is // intersected with the ranges actually held by this node to decide what // to repair. @@ -897,7 +897,7 @@ private: // A range is expressed as start_token:end token and multiple ranges can // be given as comma separated ranges(e.g. aaa:bbb,ccc:ddd). - static void ranges_opt(std::vector>& var, + static void ranges_opt(std::vector& var, std::unordered_map& options, const sstring& key) { auto it = options.find(key); @@ -918,7 +918,7 @@ private: auto rng = wrapping_range( ::range::bound(tok_start, false), ::range::bound(tok_end, true)); - compat::unwrap_into(std::move(rng), dht::token_comparator(), [&] (nonwrapping_range&& x) { + compat::unwrap_into(std::move(rng), dht::token_comparator(), [&] (dht::token_range&& x) { var.push_back(std::move(x)); }); } @@ -989,7 +989,7 @@ static int do_repair_start(seastar::sharded& db, sstring keyspace, // local ranges (the token ranges for which this node holds a replica of). // Each of these ranges may have a different set of replicas, so the // repair of each range is performed separately with repair_range(). - std::vector> ranges; + std::vector ranges; if (options.ranges.size()) { ranges = options.ranges; } else if (options.primary_range) { @@ -1032,8 +1032,8 @@ static int do_repair_start(seastar::sharded& db, sstring keyspace, dht::global_partitioner().from_sstring(options.end_token), false); } - nonwrapping_range given_range_complement(tok_end, tok_start); - std::vector> intersections; + dht::token_range given_range_complement(tok_end, tok_start); + std::vector intersections; for (const auto& range : ranges) { auto rs = range.subtract(given_range_complement, dht::token_comparator()); diff --git a/repair/repair.hh b/repair/repair.hh index 7b42dc0308..7562acd2cf 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -107,7 +107,7 @@ public: // not resolved. future checksum_range(seastar::sharded &db, const sstring& keyspace, const sstring& cf, - const ::nonwrapping_range& range, repair_checksum rt); + const ::dht::token_range& range, repair_checksum rt); namespace std { template<> diff --git a/service/storage_service.cc b/service/storage_service.cc index c4d4bb3c69..c14b1d9cd8 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -554,12 +554,12 @@ storage_service::get_rpc_address(const inet_address& endpoint) const { return boost::lexical_cast(endpoint); } -std::unordered_map, std::vector> +std::unordered_map> storage_service::get_range_to_address_map(const sstring& keyspace) const { return get_range_to_address_map(keyspace, _token_metadata.sorted_tokens()); } -std::unordered_map, std::vector> +std::unordered_map> storage_service::get_range_to_address_map_in_local_dc( const sstring& keyspace) const { std::function filter = [this](const inet_address& address) { @@ -567,7 +567,7 @@ storage_service::get_range_to_address_map_in_local_dc( }; auto orig_map = get_range_to_address_map(keyspace, get_tokens_in_local_dc()); - std::unordered_map, std::vector> filtered_map; + std::unordered_map> filtered_map; for (auto entry : orig_map) { auto& addresses = filtered_map[entry.first]; addresses.reserve(entry.second.size()); @@ -595,7 +595,7 @@ storage_service::is_local_dc(const inet_address& targetHost) const { return remote_dc == local_dc; } -std::unordered_map, std::vector> +std::unordered_map> storage_service::get_range_to_address_map(const sstring& keyspace, const std::vector& sorted_tokens) const { // some people just want to get a visual representation of things. Allow null and set it to the first @@ -2149,7 +2149,7 @@ future<> storage_service::removenode(sstring host_id_string) { // get all ranges that change ownership (that is, a node needs // to take responsibility for new range) - std::unordered_multimap, inet_address> changed_ranges = + std::unordered_multimap changed_ranges = ss.get_changed_ranges_for_leaving(keyspace_name, endpoint); auto& fd = gms::get_local_failure_detector(); for (auto& x: changed_ranges) { @@ -2367,13 +2367,13 @@ future storage_service::is_initialized() { }); } -std::unordered_multimap, inet_address> storage_service::get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint) { +std::unordered_multimap storage_service::get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint) { // First get all ranges the leaving endpoint is responsible for auto ranges = get_ranges_for_endpoint(keyspace_name, endpoint); logger.debug("Node {} ranges [{}]", endpoint, ranges); - std::unordered_map, std::vector> current_replica_endpoints; + std::unordered_map> current_replica_endpoints; // Find (for each range) all nodes that store replicas for these ranges as well auto metadata = _token_metadata.clone_only_token_map(); // don't do this in the loop! #7758 @@ -2392,7 +2392,7 @@ std::unordered_multimap, inet_address> storage_service: temp.remove_endpoint(endpoint); } - std::unordered_multimap, inet_address> changed_ranges; + std::unordered_multimap changed_ranges; // Go through the ranges and for each range check who will be // storing replicas for these ranges when the leaving endpoint @@ -2406,7 +2406,7 @@ std::unordered_multimap, inet_address> storage_service: auto rg = current_replica_endpoints.equal_range(r); for (auto it = rg.first; it != rg.second; it++) { - const nonwrapping_range& range_ = it->first; + const dht::token_range& range_ = it->first; std::vector& current_eps = it->second; logger.debug("range={}, current_replica_endpoints={}, new_replica_endpoints={}", range_, current_eps, new_replica_endpoints); for (auto ep : it->second) { @@ -2433,7 +2433,7 @@ std::unordered_multimap, inet_address> storage_service: // Runs inside seastar::async context void storage_service::unbootstrap() { - std::unordered_map, inet_address>> ranges_to_stream; + std::unordered_map> ranges_to_stream; auto non_system_keyspaces = _db.local().get_non_system_keyspaces(); for (const auto& keyspace_name : non_system_keyspaces) { @@ -2474,21 +2474,21 @@ void storage_service::unbootstrap() { } future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) { - std::unordered_multimap>>> ranges_to_fetch; + std::unordered_multimap>> ranges_to_fetch; auto my_address = get_broadcast_address(); auto non_system_keyspaces = _db.local().get_non_system_keyspaces(); for (const auto& keyspace_name : non_system_keyspaces) { - std::unordered_multimap, inet_address> changed_ranges = get_changed_ranges_for_leaving(keyspace_name, endpoint); - std::vector> my_new_ranges; + std::unordered_multimap changed_ranges = get_changed_ranges_for_leaving(keyspace_name, endpoint); + std::vector my_new_ranges; for (auto& x : changed_ranges) { if (x.second == my_address) { my_new_ranges.emplace_back(x.first); } } - std::unordered_multimap> source_ranges = get_new_source_ranges(keyspace_name, my_new_ranges); - std::unordered_map>> tmp; + std::unordered_multimap source_ranges = get_new_source_ranges(keyspace_name, my_new_ranges); + std::unordered_map> tmp; for (auto& x : source_ranges) { tmp[x.first].emplace_back(x.second); } @@ -2497,7 +2497,7 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr auto sp = make_lw_shared("Restore replica count"); for (auto& x: ranges_to_fetch) { const sstring& keyspace_name = x.first; - std::unordered_map>>& maps = x.second; + std::unordered_map>& maps = x.second; for (auto& m : maps) { auto source = m.first; auto ranges = m.second; @@ -2598,9 +2598,9 @@ void storage_service::leave_ring() { } future<> -storage_service::stream_ranges(std::unordered_map, inet_address>> ranges_to_stream_by_keyspace) { +storage_service::stream_ranges(std::unordered_map> ranges_to_stream_by_keyspace) { // First, we build a list of ranges to stream to each host, per table - std::unordered_map>>> sessions_to_stream_by_keyspace; + std::unordered_map>> sessions_to_stream_by_keyspace; for (auto& entry : ranges_to_stream_by_keyspace) { const auto& keyspace = entry.first; auto& ranges_with_endpoints = entry.second; @@ -2609,9 +2609,9 @@ storage_service::stream_ranges(std::unordered_map>> ranges_per_endpoint; + std::unordered_map> ranges_per_endpoint; for (auto& end_point_entry : ranges_with_endpoints) { - nonwrapping_range r = end_point_entry.first; + dht::token_range r = end_point_entry.first; inet_address endpoint = end_point_entry.second; ranges_per_endpoint[endpoint].emplace_back(r); } @@ -2666,7 +2666,7 @@ future<> storage_service::stream_hints() { auto hints_destination_host = candidates.front(); // stream all hints -- range list will be a singleton of "the entire ring" - std::vector> ranges = {nonwrapping_range::make_open_ended_both_sides()}; + std::vector ranges = {dht::token_range::make_open_ended_both_sides()}; logger.debug("stream_hints: ranges={}", ranges); auto sp = make_lw_shared("Hints"); @@ -2836,15 +2836,15 @@ future<> storage_service::shutdown_client_servers() { return do_stop_rpc_server().then([this] { return do_stop_native_transport(); }); } -std::unordered_multimap> -storage_service::get_new_source_ranges(const sstring& keyspace_name, const std::vector>& ranges) { +std::unordered_multimap +storage_service::get_new_source_ranges(const sstring& keyspace_name, const std::vector& ranges) { auto my_address = get_broadcast_address(); auto& fd = gms::get_local_failure_detector(); auto& ks = _db.local().find_keyspace(keyspace_name); auto& strat = ks.get_replication_strategy(); auto tm = _token_metadata.clone_only_token_map(); - std::unordered_multimap, inet_address> range_addresses = strat.get_range_addresses(tm); - std::unordered_multimap> source_ranges; + std::unordered_multimap range_addresses = strat.get_range_addresses(tm); + std::unordered_multimap source_ranges; // find alive sources for our new ranges for (auto r : ranges) { @@ -2873,10 +2873,10 @@ storage_service::get_new_source_ranges(const sstring& keyspace_name, const std:: return source_ranges; } -std::pair>, std::unordered_set>> -storage_service::calculate_stream_and_fetch_ranges(const std::vector>& current, const std::vector>& updated) { - std::unordered_set> to_stream; - std::unordered_set> to_fetch; +std::pair, std::unordered_set> +storage_service::calculate_stream_and_fetch_ranges(const std::vector& current, const std::vector& updated) { + std::unordered_set to_stream; + std::unordered_set to_fetch; for (auto r1 : current) { bool intersect = false; @@ -2917,7 +2917,7 @@ storage_service::calculate_stream_and_fetch_ranges(const std::vector>, std::unordered_set>>(to_stream, to_fetch); + return std::pair, std::unordered_set>(to_stream, to_fetch); } void storage_service::range_relocator::calculate_to_from_streams(std::unordered_set new_tokens, std::vector keyspace_names) { @@ -2937,30 +2937,30 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_ auto& ks = ss._db.local().find_keyspace(keyspace); auto& strategy = ks.get_replication_strategy(); // getting collection of the currently used ranges by this keyspace - std::vector> current_ranges = ss.get_ranges_for_endpoint(keyspace, local_address); + std::vector current_ranges = ss.get_ranges_for_endpoint(keyspace, local_address); // collection of ranges which this node will serve after move to the new token - std::vector> updated_ranges = strategy.get_pending_address_ranges(token_meta_clone, new_token, local_address); + std::vector updated_ranges = strategy.get_pending_address_ranges(token_meta_clone, new_token, local_address); // ring ranges and endpoints associated with them // this used to determine what nodes should we ping about range data - std::unordered_multimap, inet_address> range_addresses = strategy.get_range_addresses(token_meta_clone); - std::unordered_map, std::vector> range_addresses_map; + std::unordered_multimap range_addresses = strategy.get_range_addresses(token_meta_clone); + std::unordered_map> range_addresses_map; for (auto& x : range_addresses) { range_addresses_map[x.first].emplace_back(x.second); } // calculated parts of the ranges to request/stream from/to nodes in the ring // std::pair(to_stream, to_fetch) - std::pair>, std::unordered_set>> ranges_per_keyspace = + std::pair, std::unordered_set> ranges_per_keyspace = ss.calculate_stream_and_fetch_ranges(current_ranges, updated_ranges); /** * In this loop we are going through all ranges "to fetch" and determining * nodes in the ring responsible for data we are interested in */ - std::unordered_multimap, inet_address> ranges_to_fetch_with_preferred_endpoints; - for (nonwrapping_range to_fetch : ranges_per_keyspace.second) { + std::unordered_multimap ranges_to_fetch_with_preferred_endpoints; + for (dht::token_range to_fetch : ranges_per_keyspace.second) { for (auto& x : range_addresses_map) { - const nonwrapping_range& r = x.first; + const dht::token_range& r = x.first; std::vector& eps = x.second; if (r.contains(to_fetch, dht::token_comparator())) { std::vector endpoints; @@ -3023,9 +3023,9 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_ } // calculating endpoints to stream current ranges to if needed // in some situations node will handle current ranges as part of the new ranges - std::unordered_multimap> endpoint_ranges; - std::unordered_map>> endpoint_ranges_map; - for (nonwrapping_range to_stream : ranges_per_keyspace.first) { + std::unordered_multimap endpoint_ranges; + std::unordered_map> endpoint_ranges_map; + for (dht::token_range to_stream : ranges_per_keyspace.first) { auto end_token = to_stream.end() ? to_stream.end()->value() : dht::maximum_token(); std::vector current_endpoints = strategy.calculate_natural_endpoints(end_token, token_meta_clone); std::vector new_endpoints = strategy.calculate_natural_endpoints(end_token, token_meta_clone_all_settled); @@ -3054,9 +3054,9 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_ } // stream requests - std::unordered_multimap> work = + std::unordered_multimap work = dht::range_streamer::get_work_map(ranges_to_fetch_with_preferred_endpoints, keyspace); - std::unordered_map>> work_map; + std::unordered_map> work_map; for (auto& x : work) { work_map[x.first].emplace_back(x.second); } @@ -3135,7 +3135,7 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_ std::vector ranges; //Token.TokenFactory tf = getPartitioner().getTokenFactory(); - std::unordered_map, std::vector> range_to_address_map = + std::unordered_map> range_to_address_map = include_only_local_dc ? get_range_to_address_map_in_local_dc(keyspace) : get_range_to_address_map(keyspace); @@ -3179,11 +3179,11 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_ return ranges; } -std::unordered_map, std::vector> +std::unordered_map> storage_service::construct_range_to_endpoint_map( const sstring& keyspace, - const std::vector>& ranges) const { - std::unordered_map, std::vector> res; + const std::vector& ranges) const { + std::unordered_map> res; for (auto r : ranges) { res[r] = _db.local().find_keyspace(keyspace).get_replication_strategy().get_natural_endpoints( r.end() ? r.end()->value() : dht::maximum_token()); @@ -3331,16 +3331,16 @@ future<> storage_service::force_remove_completion() { /** * Takes an ordered list of adjacent tokens and divides them in the specified number of ranges. */ -static std::vector, uint64_t>> +static std::vector> calculate_splits(std::vector tokens, uint32_t split_count, column_family& cf) { auto sstables = cf.get_sstables(); const double step = static_cast(tokens.size() - 1) / split_count; auto prev_token_idx = 0; - std::vector, uint64_t>> splits; + std::vector> splits; splits.reserve(split_count); for (uint32_t i = 1; i <= split_count; ++i) { auto index = static_cast(std::round(i * step)); - nonwrapping_range range({{ std::move(tokens[prev_token_idx]), false }}, {{ tokens[index], true }}); + dht::token_range range({{ std::move(tokens[prev_token_idx]), false }}, {{ tokens[index], true }}); // always return an estimate > 0 (see CASSANDRA-7322) uint64_t estimated_keys_for_range = 0; for (auto&& sst : *sstables) { @@ -3352,9 +3352,9 @@ calculate_splits(std::vector tokens, uint32_t split_count, column_fa return splits; }; -std::vector, uint64_t>> +std::vector> storage_service::get_splits(const sstring& ks_name, const sstring& cf_name, range range, uint32_t keys_per_split) { - using range_type = nonwrapping_range; + using range_type = dht::token_range; auto& cf = _db.local().find_column_family(ks_name, cf_name); auto schema = cf.schema(); auto sstables = cf.get_sstables(); @@ -3389,23 +3389,23 @@ storage_service::get_splits(const sstring& ks_name, const sstring& cf_name, rang return calculate_splits(std::move(tokens), split_count, cf); }; -std::vector> +std::vector storage_service::get_ranges_for_endpoint(const sstring& name, const gms::inet_address& ep) const { return _db.local().find_keyspace(name).get_replication_strategy().get_ranges(ep); } -std::vector> +std::vector storage_service::get_all_ranges(const std::vector& sorted_tokens) const { if (sorted_tokens.empty()) - return std::vector>(); + return std::vector(); int size = sorted_tokens.size(); - std::vector> ranges; - ranges.push_back(nonwrapping_range::make_ending_with(range_bound(sorted_tokens[0], true))); + std::vector ranges; + ranges.push_back(dht::token_range::make_ending_with(range_bound(sorted_tokens[0], true))); for (int i = 1; i < size; ++i) { - nonwrapping_range r(range::bound(sorted_tokens[i - 1], false), range::bound(sorted_tokens[i], true)); + dht::token_range r(range::bound(sorted_tokens[i - 1], false), range::bound(sorted_tokens[i], true)); ranges.push_back(r); } - ranges.push_back(nonwrapping_range::make_starting_with(range_bound(sorted_tokens[size-1], false))); + ranges.push_back(dht::token_range::make_starting_with(range_bound(sorted_tokens[size-1], false))); return ranges; } diff --git a/service/storage_service.hh b/service/storage_service.hh index 25366f3dc0..d97efd591a 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -185,7 +185,7 @@ public: } #endif public: - std::vector> get_local_ranges(const sstring& keyspace_name) { + std::vector get_local_ranges(const sstring& keyspace_name) { return get_ranges_for_endpoint(keyspace_name, get_broadcast_address()); } #if 0 @@ -529,16 +529,16 @@ public: return map; } #endif - std::unordered_map, std::vector> get_range_to_address_map(const sstring& keyspace) const; + std::unordered_map> get_range_to_address_map(const sstring& keyspace) const; - std::unordered_map, std::vector> get_range_to_address_map_in_local_dc( + std::unordered_map> get_range_to_address_map_in_local_dc( const sstring& keyspace) const; std::vector get_tokens_in_local_dc() const; bool is_local_dc(const inet_address& targetHost) const; - std::unordered_map, std::vector> get_range_to_address_map(const sstring& keyspace, + std::unordered_map> get_range_to_address_map(const sstring& keyspace, const std::vector& sorted_tokens) const; /** @@ -596,9 +596,9 @@ public: * @param ranges * @return mapping of ranges to the replicas responsible for them. */ - std::unordered_map, std::vector> construct_range_to_endpoint_map( + std::unordered_map> construct_range_to_endpoint_map( const sstring& keyspace, - const std::vector>& ranges) const; + const std::vector& ranges) const; public: virtual void on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override; virtual void before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, const gms::versioned_value& new_value) override; @@ -757,7 +757,7 @@ private: * @param ranges the ranges to find sources for * @return multimap of addresses to ranges the address is responsible for */ - std::unordered_multimap> get_new_source_ranges(const sstring& keyspaceName, const std::vector>& ranges); + std::unordered_multimap get_new_source_ranges(const sstring& keyspaceName, const std::vector& ranges); public: future<> confirm_replication(inet_address node); @@ -783,7 +783,7 @@ private: future<> restore_replica_count(inet_address endpoint, inet_address notify_endpoint); // needs to be modified to accept either a keyspace or ARS. - std::unordered_multimap, inet_address> get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint); + std::unordered_multimap get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint); public: /** raw load value */ double get_load(); @@ -1600,7 +1600,7 @@ public: * @param ep endpoint we are interested in. * @return ranges for the specified endpoint. */ - std::vector> get_ranges_for_endpoint(const sstring& name, const gms::inet_address& ep) const; + std::vector get_ranges_for_endpoint(const sstring& name, const gms::inet_address& ep) const; /** * Get all ranges that span the ring given a set @@ -1608,7 +1608,7 @@ public: * ranges. * @return ranges in sorted order */ - std::vector> get_all_ranges(const std::vector& sorted_tokens) const; + std::vector get_all_ranges(const std::vector& sorted_tokens) const; /** * This method returns the N endpoints that are responsible for storing the * specified key i.e for replication. @@ -1713,7 +1713,7 @@ public: * @return Vector of Token ranges (_not_ keys!) together with estimated key count, * breaking up the data this node is responsible for into pieces of roughly keys_per_split */ - std::vector, uint64_t>> get_splits(const sstring& ks_name, + std::vector> get_splits(const sstring& ks_name, const sstring& cf_name, range range, uint32_t keys_per_split); @@ -1982,7 +1982,7 @@ private: * @param rangesToStreamByKeyspace keyspaces and data ranges with endpoints included for each * @return async Future for whether stream was success */ - future<> stream_ranges(std::unordered_map, inet_address>> ranges_to_stream_by_keyspace); + future<> stream_ranges(std::unordered_map> ranges_to_stream_by_keyspace); public: /** @@ -1993,8 +1993,8 @@ public: * @param updated collection of the ranges after token is changed * @return pair of ranges to stream/fetch for given current and updated range collections */ - std::pair>, std::unordered_set>> - calculate_stream_and_fetch_ranges(const std::vector>& current, const std::vector>& updated); + std::pair, std::unordered_set> + calculate_stream_and_fetch_ranges(const std::vector& current, const std::vector& updated); #if 0 public void bulkLoad(String directory) { diff --git a/sstables/compaction.cc b/sstables/compaction.cc index f0e94f0c7f..65957d8123 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -99,7 +99,7 @@ static api::timestamp_type get_max_purgeable_timestamp(const column_family& cf, return timestamp; } -static bool belongs_to_current_node(const dht::token& t, const std::vector>& sorted_owned_ranges) { +static bool belongs_to_current_node(const dht::token& t, const std::vector& sorted_owned_ranges) { auto low = std::lower_bound(sorted_owned_ranges.begin(), sorted_owned_ranges.end(), t, [] (const range& a, const dht::token& b) { // check that range a is before token b. @@ -107,7 +107,7 @@ static bool belongs_to_current_node(const dht::token& t, const std::vector& r = *low; + const dht::token_range& r = *low; return r.contains(t, dht::token_comparator()); } @@ -272,7 +272,7 @@ compact_sstables(std::vector sstables, column_family& cf, std::f info->cf = schema->cf_name(); logger.info("{} {}", (!cleanup) ? "Compacting" : "Cleaning", sstable_logger_msg); - std::vector> owned_ranges; + std::vector owned_ranges; if (cleanup) { owned_ranges = service::get_local_storage_service().get_local_ranges(schema->ks_name()); } diff --git a/sstables/compaction_strategy.cc b/sstables/compaction_strategy.cc index 8d80dd4669..5885712325 100644 --- a/sstables/compaction_strategy.cc +++ b/sstables/compaction_strategy.cc @@ -62,7 +62,7 @@ extern logging::logger logger; class incremental_selector_impl { public: virtual ~incremental_selector_impl() {} - virtual std::pair, std::vector> select(const dht::token& token) = 0; + virtual std::pair> select(const dht::token& token) = 0; }; class sstable_set_impl { @@ -173,8 +173,8 @@ public: incremental_selector(const std::vector& sstables) : _sstables(sstables) { } - virtual std::pair, std::vector> select(const dht::token& token) override { - return std::make_pair(nonwrapping_range::make_open_ended_both_sides(), _sstables); + virtual std::pair> select(const dht::token& token) override { + return std::make_pair(dht::token_range::make_open_ended_both_sides(), _sstables); } }; @@ -267,8 +267,8 @@ class partitioned_sstable_set::incremental_selector : public incremental_selecto map_iterator _it; const map_iterator _end; private: - static nonwrapping_range to_token_range(const interval_type& i) { - return nonwrapping_range::make({i.lower().token(), boost::icl::is_left_closed(i.bounds())}, + static dht::token_range to_token_range(const interval_type& i) { + return dht::token_range::make({i.lower().token(), boost::icl::is_left_closed(i.bounds())}, {i.upper().token(), boost::icl::is_right_closed(i.bounds())}); } public: @@ -277,7 +277,7 @@ public: , _it(sstables.begin()) , _end(sstables.end()) { } - virtual std::pair, std::vector> select(const dht::token& token) override { + virtual std::pair> select(const dht::token& token) override { auto pr = query::partition_range::make(dht::ring_position::starting_at(token), dht::ring_position::ending_at(token)); auto interval = make_interval(*_schema, std::move(pr)); @@ -287,12 +287,12 @@ public: } // we don't want to skip current interval if token lies before it. if (boost::icl::lower_less(interval, _it->first)) { - return std::make_pair(nonwrapping_range::make({token, true}, {_it->first.lower().token(), false}), + return std::make_pair(dht::token_range::make({token, true}, {_it->first.lower().token(), false}), std::vector()); } _it++; } - return std::make_pair(nonwrapping_range::make_open_ended_both_sides(), std::vector()); + return std::make_pair(dht::token_range::make_open_ended_both_sides(), std::vector()); } }; diff --git a/sstables/sstable_set.hh b/sstables/sstable_set.hh index c0f39aacce..280fd929a7 100644 --- a/sstables/sstable_set.hh +++ b/sstables/sstable_set.hh @@ -53,7 +53,7 @@ public: // selector is used. class incremental_selector { std::unique_ptr _impl; - mutable stdx::optional> _current_token_range; + mutable stdx::optional _current_token_range; mutable std::vector _current_sstables; public: ~incremental_selector(); diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 0bf5813ff2..b6b7826484 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -2545,7 +2545,7 @@ void sstable::mark_sstable_for_deletion(const schema_ptr& schema, sstring dir, i * Returns a pair of positions [p1, p2) in the summary file corresponding to entries * covered by the specified range, or a disengaged optional if no such pair exists. */ -stdx::optional> sstable::get_sample_indexes_for_range(const nonwrapping_range& range) { +stdx::optional> sstable::get_sample_indexes_for_range(const dht::token_range& range) { auto entries_size = _summary.entries.size(); auto search = [this](bool before, const dht::token& token) { auto kind = before ? key::kind::before_all_keys : key::kind::after_all_keys; @@ -2575,7 +2575,7 @@ stdx::optional> sstable::get_sample_indexes_for_ra return stdx::nullopt; } -std::vector sstable::get_key_samples(const schema& s, const nonwrapping_range& range) { +std::vector sstable::get_key_samples(const schema& s, const dht::token_range& range) { auto index_range = get_sample_indexes_for_range(range); std::vector res; if (index_range) { @@ -2587,7 +2587,7 @@ std::vector sstable::get_key_samples(const schema& s, const return res; } -uint64_t sstable::estimated_keys_for_range(const nonwrapping_range& range) { +uint64_t sstable::estimated_keys_for_range(const dht::token_range& range) { auto sample_index_range = get_sample_indexes_for_range(range); uint64_t sample_key_count = sample_index_range ? sample_index_range->second - sample_index_range->first : 0; // adjust for the current sampling level diff --git a/sstables/sstables.hh b/sstables/sstables.hh index c2e1cf9280..ac457ded4e 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -294,9 +294,9 @@ public: _summary.header.min_index_interval; } - uint64_t estimated_keys_for_range(const nonwrapping_range& range); + uint64_t estimated_keys_for_range(const dht::token_range& range); - std::vector get_key_samples(const schema& s, const nonwrapping_range& range); + std::vector get_key_samples(const schema& s, const dht::token_range& range); // mark_for_deletion() specifies that a sstable isn't relevant to the // current shard, and thus can be deleted by the deletion manager, if @@ -599,7 +599,7 @@ private: } void write_collection(file_writer& out, const composite& clustering_key, const column_definition& cdef, collection_mutation_view collection); - stdx::optional> get_sample_indexes_for_range(const nonwrapping_range& range); + stdx::optional> get_sample_indexes_for_range(const dht::token_range& range); public: future<> read_toc(); diff --git a/streaming/stream_plan.cc b/streaming/stream_plan.cc index 05f8cdf45e..d5177729f3 100644 --- a/streaming/stream_plan.cc +++ b/streaming/stream_plan.cc @@ -44,22 +44,22 @@ namespace streaming { extern logging::logger sslog; -stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, std::vector> ranges) { +stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, std::vector ranges) { return request_ranges(from, keyspace, std::move(ranges), {}); } -stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, std::vector> ranges, std::vector column_families) { +stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, std::vector ranges, std::vector column_families) { _range_added = true; auto session = _coordinator->get_or_create_session(from); session->add_stream_request(keyspace, std::move(ranges), std::move(column_families)); return *this; } -stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std::vector> ranges) { +stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std::vector ranges) { return transfer_ranges(to, keyspace, std::move(ranges), {}); } -stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std::vector> ranges, std::vector column_families) { +stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std::vector ranges, std::vector column_families) { _range_added = true; auto session = _coordinator->get_or_create_session(to); session->add_transfer_ranges(keyspace, std::move(ranges), std::move(column_families)); diff --git a/streaming/stream_plan.hh b/streaming/stream_plan.hh index 90959e2942..a57ab9a001 100644 --- a/streaming/stream_plan.hh +++ b/streaming/stream_plan.hh @@ -90,7 +90,7 @@ public: * @param ranges ranges to fetch * @return this object for chaining */ - stream_plan& request_ranges(inet_address from, sstring keyspace, std::vector> ranges); + stream_plan& request_ranges(inet_address from, sstring keyspace, std::vector ranges); /** * Request data in {@code columnFamilies} under {@code keyspace} and {@code ranges} from specific node. @@ -102,7 +102,7 @@ public: * @param columnFamilies specific column families * @return this object for chaining */ - stream_plan& request_ranges(inet_address from, sstring keyspace, std::vector> ranges, std::vector column_families); + stream_plan& request_ranges(inet_address from, sstring keyspace, std::vector ranges, std::vector column_families); /** * Add transfer task to send data of specific keyspace and ranges. @@ -113,7 +113,7 @@ public: * @param ranges ranges to send * @return this object for chaining */ - stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector> ranges); + stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector ranges); /** * Add transfer task to send data of specific {@code columnFamilies} under {@code keyspace} and {@code ranges}. @@ -125,7 +125,7 @@ public: * @param columnFamilies specific column families * @return this object for chaining */ - stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector> ranges, std::vector column_families); + stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector ranges, std::vector column_families); stream_plan& listeners(std::vector handlers); public: diff --git a/streaming/stream_request.hh b/streaming/stream_request.hh index d5a608a2b4..b77e721344 100644 --- a/streaming/stream_request.hh +++ b/streaming/stream_request.hh @@ -50,14 +50,14 @@ class stream_request { public: using token = dht::token; sstring keyspace; - std::vector> ranges; + std::vector ranges; // For compatibility with <= 1.5, we send wrapping ranges (though they will never wrap). std::vector> ranges_compat() const { return compat::wrap(ranges); } std::vector column_families; stream_request() = default; - stream_request(sstring _keyspace, std::vector> _ranges, std::vector _column_families) + stream_request(sstring _keyspace, std::vector _ranges, std::vector _column_families) : keyspace(std::move(_keyspace)) , ranges(std::move(_ranges)) , column_families(std::move(_column_families)) { diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 991ea760be..af33ae016d 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -146,7 +146,7 @@ void stream_session::init_messaging_service_handler() { }); }); }); - ms().register_stream_mutation_done([] (const rpc::client_info& cinfo, UUID plan_id, std::vector> ranges, UUID cf_id, unsigned dst_cpu_id) { + ms().register_stream_mutation_done([] (const rpc::client_info& cinfo, UUID plan_id, std::vector ranges, UUID cf_id, unsigned dst_cpu_id) { const auto& from = cinfo.retrieve_auxiliary("baddr"); return smp::submit_to(dst_cpu_id, [ranges = std::move(ranges), plan_id, cf_id, from] () mutable { auto session = get_session(plan_id, from, "STREAM_MUTATION_DONE", cf_id); @@ -415,7 +415,7 @@ std::vector stream_session::get_column_family_stores(const sstri return stores; } -void stream_session::add_transfer_ranges(sstring keyspace, std::vector> ranges, std::vector column_families) { +void stream_session::add_transfer_ranges(sstring keyspace, std::vector ranges, std::vector column_families) { auto cfs = get_column_family_stores(keyspace, column_families); for (auto& cf : cfs) { auto cf_id = cf->schema()->id(); diff --git a/streaming/stream_session.hh b/streaming/stream_session.hh index fc253d58ac..286c00ef31 100644 --- a/streaming/stream_session.hh +++ b/streaming/stream_session.hh @@ -238,7 +238,7 @@ public: * @param ranges Ranges to retrieve data * @param columnFamilies ColumnFamily names. Can be empty if requesting all CF under the keyspace. */ - void add_stream_request(sstring keyspace, std::vector> ranges, std::vector column_families) { + void add_stream_request(sstring keyspace, std::vector ranges, std::vector column_families) { _requests.emplace_back(std::move(keyspace), std::move(ranges), std::move(column_families)); } @@ -253,7 +253,7 @@ public: * @param flushTables flush tables? * @param repairedAt the time the repair started. */ - void add_transfer_ranges(sstring keyspace, std::vector> ranges, std::vector column_families); + void add_transfer_ranges(sstring keyspace, std::vector ranges, std::vector column_families); std::vector get_column_family_stores(const sstring& keyspace, const std::vector& column_families); diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index 40001969c6..5cd59baff2 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -57,7 +57,7 @@ namespace streaming { extern logging::logger sslog; -stream_transfer_task::stream_transfer_task(shared_ptr session, UUID cf_id, std::vector> ranges, long total_size) +stream_transfer_task::stream_transfer_task(shared_ptr session, UUID cf_id, std::vector ranges, long total_size) : stream_task(session, cf_id) , _ranges(std::move(ranges)) , _total_size(total_size) { @@ -167,13 +167,13 @@ void stream_transfer_task::start() { }); } -void stream_transfer_task::append_ranges(const std::vector>& ranges) { +void stream_transfer_task::append_ranges(const std::vector& ranges) { _ranges.insert(_ranges.end(), ranges.begin(), ranges.end()); } void stream_transfer_task::sort_and_merge_ranges() { boost::icl::interval_set myset; - std::vector> ranges; + std::vector ranges; sslog.debug("cf_id = {}, before ranges = {}, size={}", cf_id, _ranges, _ranges.size()); _ranges.swap(ranges); for (auto& range : ranges) { @@ -185,7 +185,7 @@ void stream_transfer_task::sort_and_merge_ranges() { ranges.shrink_to_fit(); for (auto& i : myset) { auto r = locator::token_metadata::interval_to_range(i); - _ranges.push_back(nonwrapping_range(r)); + _ranges.push_back(dht::token_range(r)); } sslog.debug("cf_id = {}, after ranges = {}, size={}", cf_id, _ranges, _ranges.size()); } diff --git a/streaming/stream_transfer_task.hh b/streaming/stream_transfer_task.hh index 8fcc7660b3..76e9326255 100644 --- a/streaming/stream_transfer_task.hh +++ b/streaming/stream_transfer_task.hh @@ -58,13 +58,13 @@ private: int32_t sequence_number = 0; bool aborted = false; // A stream_transfer_task always contains the same range to stream - std::vector> _ranges; + std::vector _ranges; std::map> _shard_ranges; long _total_size; public: using UUID = utils::UUID; stream_transfer_task(stream_transfer_task&&) = default; - stream_transfer_task(shared_ptr session, UUID cf_id, std::vector> ranges, long total_size = 0); + stream_transfer_task(shared_ptr session, UUID cf_id, std::vector ranges, long total_size = 0); ~stream_transfer_task(); public: virtual void abort() override { @@ -80,7 +80,7 @@ public: void start(); - void append_ranges(const std::vector>& ranges); + void append_ranges(const std::vector& ranges); void sort_and_merge_ranges(); }; diff --git a/tests/nonwrapping_range_test.cc b/tests/nonwrapping_range_test.cc index 99007467b2..c4885ee924 100644 --- a/tests/nonwrapping_range_test.cc +++ b/tests/nonwrapping_range_test.cc @@ -251,7 +251,7 @@ auto get_item(std::string left, std::string right, std::string val) { using value_type = std::unordered_set; auto l = dht::global_partitioner().from_sstring(left); auto r = dht::global_partitioner().from_sstring(right); - auto rg = nonwrapping_range({{l, false}}, {r}); + auto rg = dht::token_range({{l, false}}, {r}); value_type v{val}; return std::make_pair(locator::token_metadata::range_to_interval(rg), v); } @@ -277,7 +277,7 @@ BOOST_AUTO_TEST_CASE(test_range_interval_map) { auto search_item = [&mymap] (std::string val) { auto tok = dht::global_partitioner().from_sstring(val); - auto search = nonwrapping_range(tok); + auto search = dht::token_range(tok); auto it = mymap.find(locator::token_metadata::range_to_interval(search)); if (it != mymap.end()) { std::cout << "Found OK:" << " token = " << tok << " in range: " << it->first << "\n"; diff --git a/thrift/handler.cc b/thrift/handler.cc index 5b613d5cfe..ca9b6ca4c5 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -722,7 +722,7 @@ public: void describe_splits_ex(tcxx::function const& _return)> cob, tcxx::function exn_cob, const std::string& cfName, const std::string& start_token, const std::string& end_token, const int32_t keys_per_split) { with_cob(std::move(cob), std::move(exn_cob), [&]{ - std::vector> ranges; + std::vector ranges; auto tstart = start_token.empty() ? dht::minimum_token() : dht::global_partitioner().from_sstring(sstring(start_token)); auto tend = end_token.empty() ? dht::maximum_token() : dht::global_partitioner().from_sstring(sstring(end_token)); range r({{ std::move(tstart), false }}, {{ std::move(tend), true }});