From a35136533d9fb080979f4cdacfde02935d63aad2 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 2 Nov 2016 18:58:25 +0200 Subject: [PATCH] Convert ring_position and token ranges to be nonwrapping Wrapping ranges are a pain, so we are moving wrap handling to the edges. Since cql can't generate wrapping ranges, this means thrift and the ring maintenance code; also range->ring transformations need to merge the first and last ranges. Message-Id: <1478105905-31613-1-git-send-email-avi@scylladb.com> --- database.cc | 17 +-- db/size_estimates_recorder.cc | 26 +--- dht/boot_strapper.cc | 2 +- dht/i_partitioner.cc | 6 +- dht/i_partitioner.hh | 2 +- dht/range_streamer.cc | 34 ++--- dht/range_streamer.hh | 20 +-- idl/streaming.idl.hh | 4 +- locator/abstract_replication_strategy.cc | 99 ++++++------- locator/abstract_replication_strategy.hh | 12 +- locator/token_metadata.cc | 21 ++- locator/token_metadata.hh | 4 +- main.cc | 2 +- memtable.cc | 4 - message/messaging_service.cc | 22 ++- message/messaging_service.hh | 22 ++- partition_range_compat.hh | 175 ++++++++++++++++++++++ query-request.hh | 7 +- range.hh | 9 ++ repair/repair.cc | 55 ++++--- repair/repair.hh | 2 +- row_cache.cc | 12 +- service/storage_proxy.cc | 64 ++++---- service/storage_proxy.hh | 18 ++- service/storage_service.cc | 78 +++++----- service/storage_service.hh | 50 ++++--- sstables/compaction.cc | 6 +- sstables/partition.cc | 3 - streaming/stream_plan.cc | 8 +- streaming/stream_plan.hh | 8 +- streaming/stream_request.hh | 14 +- streaming/stream_session.cc | 4 +- streaming/stream_session.hh | 4 +- streaming/stream_transfer_task.cc | 4 +- streaming/stream_transfer_task.hh | 6 +- tests/mutation_source_test.cc | 2 +- tests/range_test.cc | 178 ----------------------- tests/row_cache_test.cc | 44 ------ tests/sstable_mutation_test.cc | 2 +- tests/storage_proxy_test.cc | 13 -- 40 files changed, 507 insertions(+), 556 deletions(-) create mode 100644 partition_range_compat.hh diff --git a/database.cc b/database.cc index 7d677c8fc1..9dd008c463 100644 --- a/database.cc +++ b/database.cc @@ -572,11 +572,6 @@ column_family::make_reader(schema_ptr s, const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace_state) const { - if (query::is_wrap_around(range, *s)) { - // make_combined_reader() can't handle streams that wrap around yet. - fail(unimplemented::cause::WRAP_AROUND); - } - std::vector readers; readers.reserve(_memtables->size() + 1); @@ -618,10 +613,6 @@ column_family::make_streaming_reader(schema_ptr s, const query::partition_range& range) const { auto& slice = query::full_slice; auto& pc = service::get_local_streaming_read_priority(); - if (query::is_wrap_around(range, *s)) { - // make_combined_reader() can't handle streams that wrap around yet. - fail(unimplemented::cause::WRAP_AROUND); - } std::vector readers; readers.reserve(_memtables->size() + 1); @@ -1357,13 +1348,13 @@ column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool } static bool needs_cleanup(const lw_shared_ptr& sst, - const lw_shared_ptr>>& owned_ranges, + const lw_shared_ptr>>& owned_ranges, schema_ptr s) { auto first = sst->get_first_partition_key(); auto last = sst->get_last_partition_key(); auto first_token = dht::global_partitioner().get_token(*s, first); auto last_token = dht::global_partitioner().get_token(*s, last); - range sst_token_range = range::make(first_token, last_token); + nonwrapping_range sst_token_range = nonwrapping_range::make(first_token, last_token); // return true iff sst partition range isn't fully contained in any of the owned ranges. for (auto& r : *owned_ranges) { @@ -1375,8 +1366,8 @@ static bool needs_cleanup(const lw_shared_ptr& sst, } future<> column_family::cleanup_sstables(sstables::compaction_descriptor descriptor) { - std::vector> r = service::get_local_storage_service().get_local_ranges(_schema->ks_name()); - auto owned_ranges = make_lw_shared>>(std::move(r)); + std::vector> r = service::get_local_storage_service().get_local_ranges(_schema->ks_name()); + auto owned_ranges = make_lw_shared>>(std::move(r)); auto sstables_to_cleanup = make_lw_shared>(std::move(descriptor.sstables)); return parallel_for_each(*sstables_to_cleanup, [this, owned_ranges = std::move(owned_ranges), sstables_to_cleanup] (auto& sst) { diff --git a/db/size_estimates_recorder.cc b/db/size_estimates_recorder.cc index 1030d10555..de1a39601d 100644 --- a/db/size_estimates_recorder.cc +++ b/db/size_estimates_recorder.cc @@ -71,25 +71,16 @@ static std::vector estimates_for(const col std::vector estimates; estimates.reserve(local_ranges.size()); - std::vector unwrapped; - // Each range defines both bounds. - for (auto& range : local_ranges) { + // Each range defines both bounds (with the call to compat::wrap) + for (auto& range : compat::wrap(local_ranges)) { int64_t count{0}; utils::estimated_histogram hist{0}; - unwrapped.clear(); - if (range.is_wrap_around(dht::ring_position_comparator(*cf.schema()))) { - auto uw = range.unwrap(); - unwrapped.push_back(std::move(uw.first)); - unwrapped.push_back(std::move(uw.second)); - } else { - unwrapped.push_back(range); - } - for (auto&& uwr : unwrapped) { - for (auto&& sstable : cf.select_sstables(uwr)) { - nonwrapping_range r(std::move(uwr).transform([](auto&& rp) { return rp.token(); })); - count += sstable->estimated_keys_for_range(r); - hist.merge(sstable->get_stats_metadata().estimated_row_size); - } + for (auto uwr : std::vector(compat::unwrap(range, *cf.schema()))) { + for (auto&& sstable : cf.select_sstables(uwr)) { + nonwrapping_range r(std::move(uwr).transform([](auto&& rp) { return rp.token(); })); + count += sstable->estimated_keys_for_range(r); + hist.merge(sstable->get_stats_metadata().estimated_row_size); + } } estimates.emplace_back(db::system_keyspace::range_estimates{ range.start()->value().token(), @@ -97,7 +88,6 @@ static std::vector estimates_for(const col count, count > 0 ? hist.mean() : 0}); } - return estimates; } diff --git a/dht/boot_strapper.cc b/dht/boot_strapper.cc index 6fd5dfd7f2..fec9ca8a0d 100644 --- a/dht/boot_strapper.cc +++ b/dht/boot_strapper.cc @@ -54,7 +54,7 @@ future<> boot_strapper::bootstrap() { for (const auto& keyspace_name : _db.local().get_non_system_keyspaces()) { auto& ks = _db.local().find_keyspace(keyspace_name); auto& strategy = ks.get_replication_strategy(); - std::vector> ranges = strategy.get_pending_address_ranges(_token_metadata, _tokens, _address); + std::vector> ranges = strategy.get_pending_address_ranges(_token_metadata, _tokens, _address); logger.debug("Will stream keyspace={}, ranges={}", keyspace_name, ranges); streamer->add_ranges(keyspace_name, ranges); } diff --git a/dht/i_partitioner.cc b/dht/i_partitioner.cc index b76b777094..9602e9780a 100644 --- a/dht/i_partitioner.cc +++ b/dht/i_partitioner.cc @@ -297,9 +297,9 @@ int ring_position::tri_compare(const schema& s, const ring_position& o) const { } } -range -to_partition_range(range r) { - using bound_opt = std::experimental::optional::bound>; +nonwrapping_range +to_partition_range(nonwrapping_range r) { + using bound_opt = std::experimental::optional::bound>; auto start = r.start() ? bound_opt(dht::ring_position(r.start()->value(), r.start()->is_inclusive() diff --git a/dht/i_partitioner.hh b/dht/i_partitioner.hh index 41dc1553c6..19f603b1b0 100644 --- a/dht/i_partitioner.hh +++ b/dht/i_partitioner.hh @@ -422,7 +422,7 @@ i_partitioner& global_partitioner(); unsigned shard_of(const token&); -range to_partition_range(range); +nonwrapping_range to_partition_range(nonwrapping_range); } // dht diff --git a/dht/range_streamer.cc b/dht/range_streamer.cc index c1a3341624..d8c79291d4 100644 --- a/dht/range_streamer.cc +++ b/dht/range_streamer.cc @@ -53,22 +53,22 @@ logging::logger logger("range_streamer"); using inet_address = gms::inet_address; -static std::unordered_map, std::unordered_set> -unordered_multimap_to_unordered_map(const std::unordered_multimap, inet_address>& multimap) { - std::unordered_map, std::unordered_set> ret; +static std::unordered_map, std::unordered_set> +unordered_multimap_to_unordered_map(const std::unordered_multimap, inet_address>& multimap) { + std::unordered_map, std::unordered_set> ret; for (auto x : multimap) { ret[x.first].emplace(x.second); } return ret; } -std::unordered_multimap> -range_streamer::get_range_fetch_map(const std::unordered_multimap, inet_address>& ranges_with_sources, +std::unordered_multimap> +range_streamer::get_range_fetch_map(const std::unordered_multimap, inet_address>& ranges_with_sources, const std::unordered_set>& source_filters, const sstring& keyspace) { - std::unordered_multimap> range_fetch_map_map; + std::unordered_multimap> range_fetch_map_map; for (auto x : unordered_multimap_to_unordered_map(ranges_with_sources)) { - const range& range_ = x.first; + const nonwrapping_range& range_ = x.first; const std::unordered_set& addresses = x.second; bool found_source = false; for (auto address : addresses) { @@ -103,8 +103,8 @@ range_streamer::get_range_fetch_map(const std::unordered_multimap, return range_fetch_map_map; } -std::unordered_multimap, inet_address> -range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, std::vector> desired_ranges) { +std::unordered_multimap, inet_address> +range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, std::vector> desired_ranges) { logger.debug("{} ks={}", __func__, keyspace_name); auto& ks = _db.local().find_keyspace(keyspace_name); @@ -113,7 +113,7 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, st auto tm = _metadata.clone_only_token_map(); auto range_addresses = unordered_multimap_to_unordered_map(strat.get_range_addresses(tm)); - std::unordered_multimap, inet_address> range_sources; + std::unordered_multimap, inet_address> range_sources; auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr(); for (auto& desired_range : desired_ranges) { auto found = false; @@ -137,8 +137,8 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, st return range_sources; } -std::unordered_multimap, inet_address> -range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, std::vector> desired_ranges) { +std::unordered_multimap, inet_address> +range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, std::vector> desired_ranges) { logger.debug("{} ks={}", __func__, keyspace_name); assert (_tokens.empty() == false); @@ -154,7 +154,7 @@ range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_n auto pending_range_addresses = unordered_multimap_to_unordered_map(strat.get_range_addresses(metadata_clone)); //Collects the source that will have its range moved to the new node - std::unordered_multimap, inet_address> range_sources; + std::unordered_multimap, inet_address> range_sources; for (auto& desired_range : desired_ranges) { for (auto& x : range_addresses) { @@ -211,7 +211,7 @@ bool range_streamer::use_strict_sources_for_ranges(const sstring& keyspace_name) && _metadata.get_all_endpoints().size() != strat.get_replication_factor(); } -void range_streamer::add_ranges(const sstring& keyspace_name, std::vector> ranges) { +void range_streamer::add_ranges(const sstring& keyspace_name, std::vector> ranges) { auto ranges_for_keyspace = use_strict_sources_for_ranges(keyspace_name) ? get_all_ranges_with_strict_sources_for(keyspace_name, ranges) : get_all_ranges_with_sources_for(keyspace_name, ranges); @@ -222,7 +222,7 @@ void range_streamer::add_ranges(const sstring& keyspace_name, std::vector>> range_fetch_map; + std::unordered_map>> range_fetch_map; for (auto& x : get_range_fetch_map(ranges_for_keyspace, _source_filters, keyspace_name)) { range_fetch_map[x.first].emplace_back(x.second); } @@ -252,8 +252,8 @@ future range_streamer::fetch_async() { return _stream_plan.execute(); } -std::unordered_multimap> -range_streamer::get_work_map(const std::unordered_multimap, inet_address>& ranges_with_source_target, +std::unordered_multimap> +range_streamer::get_work_map(const std::unordered_multimap, inet_address>& ranges_with_source_target, const sstring& keyspace) { auto filter = std::make_unique(gms::get_local_failure_detector()); std::unordered_set> source_filters; diff --git a/dht/range_streamer.hh b/dht/range_streamer.hh index b35ac8a314..ee88c051ce 100644 --- a/dht/range_streamer.hh +++ b/dht/range_streamer.hh @@ -118,22 +118,22 @@ public: _source_filters.emplace(std::move(filter)); } - void add_ranges(const sstring& keyspace_name, std::vector> ranges); + void add_ranges(const sstring& keyspace_name, std::vector> ranges); private: bool use_strict_sources_for_ranges(const sstring& keyspace_name); /** * Get a map of all ranges and their respective sources that are candidates for streaming the given ranges * to us. For each range, the list of sources is sorted by proximity relative to the given destAddress. */ - std::unordered_multimap, inet_address> - get_all_ranges_with_sources_for(const sstring& keyspace_name, std::vector> desired_ranges); + std::unordered_multimap, inet_address> + get_all_ranges_with_sources_for(const sstring& keyspace_name, std::vector> desired_ranges); /** * Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges. * For each range, the list should only contain a single source. This allows us to consistently migrate data without violating * consistency. */ - std::unordered_multimap, inet_address> - get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, std::vector> desired_ranges); + std::unordered_multimap, inet_address> + get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, std::vector> desired_ranges); private: /** * @param rangesWithSources The ranges we want to fetch (key) and their potential sources (value) @@ -141,14 +141,14 @@ private: * here, we always exclude ourselves. * @return */ - static std::unordered_multimap> - get_range_fetch_map(const std::unordered_multimap, inet_address>& ranges_with_sources, + static std::unordered_multimap> + get_range_fetch_map(const std::unordered_multimap, inet_address>& ranges_with_sources, const std::unordered_set>& source_filters, const sstring& keyspace); public: - static std::unordered_multimap> - get_work_map(const std::unordered_multimap, inet_address>& ranges_with_source_target, + static std::unordered_multimap> + get_work_map(const std::unordered_multimap, inet_address>& ranges_with_source_target, const sstring& keyspace); #if 0 @@ -166,7 +166,7 @@ private: std::unordered_set _tokens; inet_address _address; sstring _description; - std::unordered_multimap>>> _to_fetch; + std::unordered_multimap>>> _to_fetch; std::unordered_set> _source_filters; stream_plan _stream_plan; }; diff --git a/idl/streaming.idl.hh b/idl/streaming.idl.hh index 46000cfcad..da3e1ebdcc 100644 --- a/idl/streaming.idl.hh +++ b/idl/streaming.idl.hh @@ -23,7 +23,9 @@ namespace streaming { class stream_request { sstring keyspace; - std::vector> ranges; + // For compatibility with <= 1.5, we use wrapping ranges + // (though we never send wraparounds; only allow receiving them) + std::vector> ranges_compat(); std::vector column_families; }; diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index 96af978503..72727be442 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -22,6 +22,7 @@ #include "locator/abstract_replication_strategy.hh" #include "utils/class_registrator.hh" #include "exceptions/exceptions.hh" +#include "stdx.hh" namespace locator { @@ -39,19 +40,6 @@ abstract_replication_strategy::abstract_replication_strategy( , _snitch(snitch) , _my_type(my_type) {} -static void unwrap_first_range(std::vector>& ret) { - if (ret.empty()) { - return; - } - // Make ret contain no wrap-around range by unwrapping the first element. - auto& r = ret.front(); - if (r.is_wrap_around(dht::token_comparator())) { - auto split_ranges = r.unwrap(); - r = std::move(split_ranges.first); - ret.push_back(std::move(split_ranges.second)); - } -} - std::unique_ptr abstract_replication_strategy::create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, token_metadata& tk_metadata, const std::map& config_options) { assert(locator::i_endpoint_snitch::get_local_snitch_ptr()); try { @@ -124,95 +112,96 @@ abstract_replication_strategy::get_cached_endpoints() { return _cached_endpoints; } -std::vector> +static +void +insert_token_range_to_sorted_container_while_unwrapping( + const dht::token& prev_tok, + const dht::token& tok, + std::vector>& ret) { + if (prev_tok < tok) { + ret.emplace_back( + nonwrapping_range::bound(prev_tok, false), + nonwrapping_range::bound(tok, true)); + } else { + ret.emplace_back( + nonwrapping_range::bound(prev_tok, false), + stdx::nullopt); + // Insert in front to maintain sorded order + ret.emplace( + ret.begin(), + stdx::nullopt, + nonwrapping_range::bound(tok, true)); + } +} + +std::vector> abstract_replication_strategy::get_ranges(inet_address ep) const { - std::vector> ret; + std::vector> ret; auto prev_tok = _token_metadata.sorted_tokens().back(); for (auto tok : _token_metadata.sorted_tokens()) { for (inet_address a : calculate_natural_endpoints(tok, _token_metadata)) { if (a == ep) { - ret.emplace_back( - range::bound(prev_tok, false), - range::bound(tok, true)); + insert_token_range_to_sorted_container_while_unwrapping(prev_tok, tok, ret); break; } } prev_tok = tok; } - unwrap_first_range(ret); return ret; } -std::vector> +std::vector> abstract_replication_strategy::get_primary_ranges(inet_address ep) { - std::vector> ret; + std::vector> ret; auto prev_tok = _token_metadata.sorted_tokens().back(); for (auto tok : _token_metadata.sorted_tokens()) { auto&& eps = calculate_natural_endpoints(tok, _token_metadata); if (eps.size() > 0 && eps[0] == ep) { - ret.emplace_back( - range::bound(prev_tok, false), - range::bound(tok, true)); + insert_token_range_to_sorted_container_while_unwrapping(prev_tok, tok, ret); } prev_tok = tok; } - unwrap_first_range(ret); return ret; } -std::unordered_multimap> +std::unordered_multimap> abstract_replication_strategy::get_address_ranges(token_metadata& tm) const { - std::unordered_multimap> ret; + std::unordered_multimap> ret; for (auto& t : tm.sorted_tokens()) { - range r = tm.get_primary_range_for(t); + std::vector> r = tm.get_primary_ranges_for(t); auto eps = calculate_natural_endpoints(t, tm); logger.debug("token={}, primary_range={}, address={}", t, r, eps); - bool wrap = r.is_wrap_around(dht::token_comparator()); - if (wrap) { - auto split_ranges = r.unwrap(); - for (auto ep : eps) { - ret.emplace(ep, split_ranges.first); - ret.emplace(ep, split_ranges.second); - } - } else { - for (auto ep : eps) { - ret.emplace(ep, r); + for (auto ep : eps) { + for (auto&& rng : r) { + ret.emplace(ep, rng); } } } return ret; } -std::unordered_multimap, inet_address> +std::unordered_multimap, inet_address> abstract_replication_strategy::get_range_addresses(token_metadata& tm) const { - std::unordered_multimap, inet_address> ret; + std::unordered_multimap, inet_address> ret; for (auto& t : tm.sorted_tokens()) { - range r = tm.get_primary_range_for(t); + std::vector> r = tm.get_primary_ranges_for(t); auto eps = calculate_natural_endpoints(t, tm); - bool wrap = r.is_wrap_around(dht::token_comparator()); - if (wrap) { - auto split_ranges = r.unwrap(); - for (auto ep : eps) { - ret.emplace(split_ranges.first, ep); - ret.emplace(split_ranges.second, ep); - } - } else { - for (auto ep : eps) { - ret.emplace(r, ep); - } + for (auto ep : eps) { + for (auto&& rng : r) + ret.emplace(rng, ep); } } return ret; } -std::vector> +std::vector> abstract_replication_strategy::get_pending_address_ranges(token_metadata& tm, token pending_token, inet_address pending_address) { return get_pending_address_ranges(tm, std::unordered_set{pending_token}, pending_address); } -std::vector> +std::vector> abstract_replication_strategy::get_pending_address_ranges(token_metadata& tm, std::unordered_set pending_tokens, inet_address pending_address) { - std::vector> ret; + std::vector> ret; auto temp = tm.clone_only_token_map(); temp.update_normal_tokens(pending_tokens, pending_address); for (auto& x : get_address_ranges(temp)) { diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index 31f412a966..3c90c7edff 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -105,22 +105,22 @@ public: // The list is sorted, and its elements are non overlapping and non wrap-around. // It the analogue of Origin's getAddressRanges().get(endpoint). // This function is not efficient, and not meant for the fast path. - std::vector> get_ranges(inet_address ep) const; + std::vector> get_ranges(inet_address ep) const; // get_primary_ranges() returns the list of "primary ranges" for the given // endpoint. "Primary ranges" are the ranges that the node is responsible // for storing replica primarily, which means this is the first node // returned calculate_natural_endpoints(). // This function is the analogue of Origin's // StorageService.getPrimaryRangesForEndpoint(). - std::vector> get_primary_ranges(inet_address ep); + std::vector> get_primary_ranges(inet_address ep); - std::unordered_multimap> get_address_ranges(token_metadata& tm) const; + std::unordered_multimap> get_address_ranges(token_metadata& tm) const; - std::unordered_multimap, inet_address> get_range_addresses(token_metadata& tm) const; + std::unordered_multimap, inet_address> get_range_addresses(token_metadata& tm) const; - std::vector> get_pending_address_ranges(token_metadata& tm, token pending_token, inet_address pending_address); + std::vector> get_pending_address_ranges(token_metadata& tm, token pending_token, inet_address pending_address); - std::vector> get_pending_address_ranges(token_metadata& tm, std::unordered_set pending_tokens, inet_address pending_address); + std::vector> get_pending_address_ranges(token_metadata& tm, std::unordered_set pending_tokens, inet_address pending_address); }; } diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 0590caf96d..8f2d5b3677 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -25,6 +25,8 @@ #include "locator/snitch_base.hh" #include "locator/abstract_replication_strategy.hh" #include "log.hh" +#include "stdx.hh" +#include "partition_range_compat.hh" #include #include #include @@ -327,18 +329,21 @@ token token_metadata::get_predecessor(token t) { } } -std::vector> token_metadata::get_primary_ranges_for(std::unordered_set tokens) { - std::vector> ranges; - ranges.reserve(tokens.size()); +std::vector> token_metadata::get_primary_ranges_for(std::unordered_set tokens) { + std::vector> ranges; + ranges.reserve(tokens.size() + 1); // one of the ranges will wrap for (auto right : tokens) { - ranges.emplace_back(range::bound(get_predecessor(right), false), - range::bound(right, true)); + auto left = get_predecessor(right); + compat::unwrap_into( + wrapping_range(range_bound(left, false), range_bound(right)), + dht::token_comparator(), + [&] (auto&& rng) { ranges.push_back(std::move(rng)); }); } return ranges; } -range token_metadata::get_primary_range_for(token right) { - return get_primary_ranges_for({right}).front(); +std::vector> token_metadata::get_primary_ranges_for(token right) { + return get_primary_ranges_for(std::unordered_set{right}); } boost::icl::interval::interval_type @@ -424,7 +429,7 @@ void token_metadata::calculate_pending_ranges(abstract_replication_strategy& str return; } - std::unordered_multimap> address_ranges = strategy.get_address_ranges(*this); + std::unordered_multimap> address_ranges = strategy.get_address_ranges(*this); // FIMXE // Copy of metadata reflecting the situation after all leave operations are finished. diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 191cf558df..0be54f31c7 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -609,9 +609,9 @@ public: } #endif public: - std::vector> get_primary_ranges_for(std::unordered_set tokens); + std::vector> get_primary_ranges_for(std::unordered_set tokens); - range get_primary_range_for(token right); + std::vector> get_primary_ranges_for(token right); static boost::icl::interval::interval_type range_to_interval(range r); private: diff --git a/main.cc b/main.cc index b2cecd6e60..7ebf9ee377 100644 --- a/main.cc +++ b/main.cc @@ -585,7 +585,7 @@ int main(int ac, char** av) { api::set_server_stream_manager(ctx).get(); // Start handling REPAIR_CHECKSUM_RANGE messages net::get_messaging_service().invoke_on_all([&db] (auto& ms) { - ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, query::range range, rpc::optional hash_version) { + ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, nonwrapping_range range, rpc::optional hash_version) { auto hv = hash_version ? *hash_version : repair_checksum::legacy; return do_with(std::move(keyspace), std::move(cf), std::move(range), [&db, hv] (auto& keyspace, auto& cf, auto& range) { diff --git a/memtable.cc b/memtable.cc index 7ca100d981..ea6ace8cc7 100644 --- a/memtable.cc +++ b/memtable.cc @@ -339,10 +339,6 @@ memtable::make_reader(schema_ptr s, const query::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc) { - if (query::is_wrap_around(range, *s)) { - fail(unimplemented::cause::WRAP_AROUND); - } - if (query::is_single_partition(range)) { const query::ring_position& pos = range.start()->value(); return _read_section(*this, [&] { diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 2f37f6f847..1d55ba12d4 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -69,6 +69,7 @@ #include "idl/partition_checksum.dist.impl.hh" #include "rpc/lz4_compressor.hh" #include "rpc/multi_algo_compressor_factory.hh" +#include "partition_range_compat.hh" namespace net { @@ -704,10 +705,15 @@ future<> messaging_service::send_stream_mutation(msg_addr id, UUID plan_id, froz // STREAM_MUTATION_DONE void messaging_service::register_stream_mutation_done(std::function (const rpc::client_info& cinfo, - UUID plan_id, std::vector> ranges, UUID cf_id, unsigned dst_cpu_id)>&& func) { - register_handler(this, messaging_verb::STREAM_MUTATION_DONE, std::move(func)); + UUID plan_id, std::vector> ranges, UUID cf_id, unsigned dst_cpu_id)>&& func) { + register_handler(this, messaging_verb::STREAM_MUTATION_DONE, + [func = std::move(func)] (const rpc::client_info& cinfo, + UUID plan_id, std::vector> ranges, + UUID cf_id, unsigned dst_cpu_id) mutable { + return func(cinfo, plan_id, compat::unwrap(std::move(ranges)), cf_id, dst_cpu_id); + }); } -future<> messaging_service::send_stream_mutation_done(msg_addr id, UUID plan_id, std::vector> ranges, UUID cf_id, unsigned dst_cpu_id) { +future<> messaging_service::send_stream_mutation_done(msg_addr id, UUID plan_id, std::vector> ranges, UUID cf_id, unsigned dst_cpu_id) { return send_message_timeout_and_retry(this, messaging_verb::STREAM_MUTATION_DONE, id, streaming_timeout, streaming_nr_retry, streaming_wait_before_retry, plan_id, std::move(ranges), cf_id, dst_cpu_id); @@ -819,7 +825,7 @@ future<> messaging_service::send_mutation_done(msg_addr id, unsigned shard, resp return send_message_oneway(this, messaging_verb::MUTATION_DONE, std::move(id), std::move(shard), std::move(response_id)); } -void messaging_service::register_read_data(std::function>> (const rpc::client_info&, query::read_command cmd, query::partition_range pr)>&& func) { +void messaging_service::register_read_data(std::function>> (const rpc::client_info&, query::read_command cmd, compat::wrapping_partition_range pr)>&& func) { register_handler(this, net::messaging_verb::READ_DATA, std::move(func)); } void messaging_service::unregister_read_data() { @@ -849,7 +855,7 @@ future messaging_service::send_schema_check(msg_addr dst) { return send_message(this, net::messaging_verb::SCHEMA_CHECK, dst); } -void messaging_service::register_read_mutation_data(std::function>> (const rpc::client_info&, query::read_command cmd, query::partition_range pr)>&& func) { +void messaging_service::register_read_mutation_data(std::function>> (const rpc::client_info&, query::read_command cmd, compat::wrapping_partition_range pr)>&& func) { register_handler(this, net::messaging_verb::READ_MUTATION_DATA, std::move(func)); } void messaging_service::unregister_read_mutation_data() { @@ -859,7 +865,7 @@ future messaging_service::send_read_mutation_data(msg_addr return send_message_timeout(this, messaging_verb::READ_MUTATION_DATA, std::move(id), timeout, cmd, pr); } -void messaging_service::register_read_digest(std::function (const rpc::client_info&, query::read_command cmd, query::partition_range pr)>&& func) { +void messaging_service::register_read_digest(std::function (const rpc::client_info&, query::read_command cmd, compat::wrapping_partition_range pr)>&& func) { register_handler(this, net::messaging_verb::READ_DIGEST, std::move(func)); } void messaging_service::unregister_read_digest() { @@ -897,14 +903,14 @@ future<> messaging_service::send_replication_finished(msg_addr id, inet_address // Wrapper for REPAIR_CHECKSUM_RANGE void messaging_service::register_repair_checksum_range( std::function (sstring keyspace, - sstring cf, query::range range, rpc::optional hash_version)>&& f) { + sstring cf, nonwrapping_range range, rpc::optional hash_version)>&& f) { register_handler(this, messaging_verb::REPAIR_CHECKSUM_RANGE, std::move(f)); } void messaging_service::unregister_repair_checksum_range() { _rpc->unregister_handler(messaging_verb::REPAIR_CHECKSUM_RANGE); } future messaging_service::send_repair_checksum_range( - msg_addr id, sstring keyspace, sstring cf, ::range range, repair_checksum hash_version) + msg_addr id, sstring keyspace, sstring cf, ::nonwrapping_range range, repair_checksum hash_version) { return send_message(this, messaging_verb::REPAIR_CHECKSUM_RANGE, std::move(id), diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 5973415c1d..d3f0cbc0ec 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -64,11 +64,17 @@ namespace dht { } namespace query { - using partition_range = range; + using partition_range = nonwrapping_range; class read_command; class result; } +namespace compat { + +using wrapping_partition_range = wrapping_range; + +} + namespace net { /* All verb handler identifiers */ @@ -230,16 +236,16 @@ public: void register_stream_mutation(std::function (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, rpc::optional)>&& func); future<> send_stream_mutation(msg_addr id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id, bool fragmented); - void register_stream_mutation_done(std::function (const rpc::client_info& cinfo, UUID plan_id, std::vector> ranges, UUID cf_id, unsigned dst_cpu_id)>&& func); - future<> send_stream_mutation_done(msg_addr id, UUID plan_id, std::vector> ranges, UUID cf_id, unsigned dst_cpu_id); + void register_stream_mutation_done(std::function (const rpc::client_info& cinfo, UUID plan_id, std::vector> ranges, UUID cf_id, unsigned dst_cpu_id)>&& func); + future<> send_stream_mutation_done(msg_addr id, UUID plan_id, std::vector> ranges, UUID cf_id, unsigned dst_cpu_id); void register_complete_message(std::function (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id)>&& func); future<> send_complete_message(msg_addr id, UUID plan_id, unsigned dst_cpu_id); // Wrapper for REPAIR_CHECKSUM_RANGE verb - void register_repair_checksum_range(std::function (sstring keyspace, sstring cf, range range, rpc::optional hash_version)>&& func); + void register_repair_checksum_range(std::function (sstring keyspace, sstring cf, nonwrapping_range range, rpc::optional hash_version)>&& func); void unregister_repair_checksum_range(); - future send_repair_checksum_range(msg_addr id, sstring keyspace, sstring cf, range range, repair_checksum hash_version); + future send_repair_checksum_range(msg_addr id, sstring keyspace, sstring cf, nonwrapping_range range, repair_checksum hash_version); // Wrapper for GOSSIP_ECHO verb void register_gossip_echo(std::function ()>&& func); @@ -292,7 +298,7 @@ public: // Wrapper for READ_DATA // Note: WTH is future> - void register_read_data(std::function>> (const rpc::client_info&, query::read_command cmd, query::partition_range pr)>&& func); + void register_read_data(std::function>> (const rpc::client_info&, query::read_command cmd, compat::wrapping_partition_range pr)>&& func); void unregister_read_data(); future send_read_data(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const query::partition_range& pr); @@ -307,12 +313,12 @@ public: future send_schema_check(msg_addr); // Wrapper for READ_MUTATION_DATA - void register_read_mutation_data(std::function>> (const rpc::client_info&, query::read_command cmd, query::partition_range pr)>&& func); + void register_read_mutation_data(std::function>> (const rpc::client_info&, query::read_command cmd, compat::wrapping_partition_range pr)>&& func); void unregister_read_mutation_data(); future send_read_mutation_data(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const query::partition_range& pr); // Wrapper for READ_DIGEST - void register_read_digest(std::function (const rpc::client_info&, query::read_command cmd, query::partition_range pr)>&& func); + void register_read_digest(std::function (const rpc::client_info&, query::read_command cmd, compat::wrapping_partition_range pr)>&& func); void unregister_read_digest(); future> send_read_digest(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const query::partition_range& pr); diff --git a/partition_range_compat.hh b/partition_range_compat.hh new file mode 100644 index 0000000000..93e332b1b7 --- /dev/null +++ b/partition_range_compat.hh @@ -0,0 +1,175 @@ +/* + * Copyright 2016 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + + +#pragma once + +#include +#include +#include "range.hh" +#include "dht/i_partitioner.hh" +#include "query-request.hh" +#include "stdx.hh" + +namespace compat { + +using wrapping_partition_range = wrapping_range; + + +// unwraps a vector of wrapping ranges into a vector of nonwrapping ranges +// if the vector happens to be sorted by the left bound, it remains sorted +template +std::vector> +unwrap(std::vector>&& v, Comparator&& cmp) { + std::vector> ret; + ret.reserve(v.size() + 1); + for (auto&& wr : v) { + if (wr.is_wrap_around(cmp)) { + auto&& p = std::move(wr).unwrap(); + ret.insert(ret.begin(), nonwrapping_range(std::move(p.first))); + ret.emplace_back(std::move(p.second)); + } else { + ret.emplace_back(std::move(wr)); + } + } + return ret; +} + +// unwraps a vector of wrapping ranges into a vector of nonwrapping ranges +// if the vector happens to be sorted by the left bound, it remains sorted +template +std::vector> +unwrap(const std::vector>& v, Comparator&& cmp) { + std::vector> ret; + ret.reserve(v.size() + 1); + for (auto&& wr : v) { + if (wr.is_wrap_around(cmp)) { + auto&& p = wr.unwrap(); + ret.insert(ret.begin(), nonwrapping_range(p.first)); + ret.emplace_back(p.second); + } else { + ret.emplace_back(wr); + } + } + return ret; +} + +template +std::vector> +wrap(const std::vector>& v) { + // re-wrap (-inf,x) ... (y, +inf) into (y, x): + if (v.size() >= 2 && !v.front().start() && !v.back().end()) { + auto ret = std::vector>(); + ret.reserve(v.size() - 1); + std::copy(v.begin() + 1, v.end() - 1, std::back_inserter(ret)); + ret.emplace_back(v.back().start(), v.front().end()); + return ret; + } + return boost::copy_range>>(v); +} + +template +std::vector> +wrap(std::vector>&& v) { + // re-wrap (-inf,x) ... (y, +inf) into (y, x): + if (v.size() >= 2 && !v.front().start() && !v.back().end()) { + auto ret = std::vector>(); + ret.reserve(v.size() - 1); + std::move(v.begin() + 1, v.end() - 1, std::back_inserter(ret)); + ret.emplace_back(std::move(v.back()).start(), std::move(v.front()).end()); + return ret; + } + // want boost::adaptor::moved ... + return boost::copy_range>>(v); +} + +inline +std::vector> +unwrap(const std::vector>& v) { + return unwrap(v, dht::token_comparator()); +} + +inline +std::vector> +unwrap(std::vector>&& v) { + return unwrap(std::move(v), dht::token_comparator()); +} + + +class one_or_two_partition_ranges : public std::pair> { + using pair = std::pair>; +public: + explicit one_or_two_partition_ranges(query::partition_range&& f) + : pair(std::move(f), stdx::nullopt) { + } + explicit one_or_two_partition_ranges(query::partition_range&& f, query::partition_range&& s) + : pair(std::move(f), std::move(s)) { + } + operator std::vector() const & { + auto ret = std::vector(); + // not reserving, since ret.size() is likely to be 1 + ret.push_back(first); + if (second) { + ret.push_back(*second); + } + return ret; + } + operator std::vector() && { + auto ret = std::vector(); + // not reserving, since ret.size() is likely to be 1 + ret.push_back(std::move(first)); + if (second) { + ret.push_back(std::move(*second)); + } + return ret; + } +}; + +inline +one_or_two_partition_ranges +unwrap(wrapping_partition_range pr, const schema& s) { + if (pr.is_wrap_around(dht::ring_position_comparator(s))) { + auto unw = std::move(pr).unwrap(); + // Preserve ring order + return one_or_two_partition_ranges( + query::partition_range(std::move(unw.second)), + query::partition_range(std::move(unw.first))); + } else { + return one_or_two_partition_ranges(query::partition_range(std::move(pr))); + } +} + +// Unwraps `range` and calls `func` with its components, with an unwrapped +// range type, as a parameter (once or twice) +template +void +unwrap_into(wrapping_range&& range, const Comparator& cmp, Func&& func) { + if (range.is_wrap_around(cmp)) { + auto&& unw = range.unwrap(); + // Preserve ring order + func(nonwrapping_range(std::move(unw.second))); + func(nonwrapping_range(std::move(unw.first))); + } else { + func(nonwrapping_range(std::move(range))); + } +} + +} diff --git a/query-request.hh b/query-request.hh index 49a81c736e..520a04c34f 100644 --- a/query-request.hh +++ b/query-request.hh @@ -36,16 +36,11 @@ template using range = wrapping_range; using ring_position = dht::ring_position; -using partition_range = range; +using partition_range = nonwrapping_range; using clustering_range = nonwrapping_range; extern const partition_range full_partition_range; -inline -bool is_wrap_around(const query::partition_range& range, const schema& s) { - return range.is_wrap_around(dht::ring_position_comparator(s)); -} - inline bool is_single_partition(const query::partition_range& range) { return range.is_singular() && range.start()->value().has_key(); diff --git a/range.hh b/range.hh index 6fc43c6e8b..803dda77e9 100644 --- a/range.hh +++ b/range.hh @@ -22,6 +22,7 @@ #pragma once #include "stdx.hh" +#include #include #include #include @@ -436,12 +437,17 @@ public: explicit nonwrapping_range(wrapping_range&& r) : _range(std::move(r)) { } + // Can only be called if !r.is_wrap_around(). + explicit nonwrapping_range(const wrapping_range& r) + : _range(r) + { } operator wrapping_range() const & { return _range; } operator wrapping_range() && { return std::move(_range); } + // the point is before the range. // Comparator must define a total ordering on T. template @@ -466,6 +472,9 @@ public: return wrapping_range::greater_than_or_equal(_range.end_bound(), other._range.start_bound(), cmp) && wrapping_range::greater_than_or_equal(other._range.end_bound(), _range.start_bound(), cmp); } + static nonwrapping_range make(bound start, bound end) { + return nonwrapping_range({std::move(start)}, {std::move(end)}); + } static nonwrapping_range make_open_ended_both_sides() { return {{}, {}}; } diff --git a/repair/repair.cc b/repair/repair.cc index 04ee2145b1..67c477db9c 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -358,7 +358,7 @@ std::ostream& operator<<(std::ostream& out, const partition_checksum& c) { // data is coming in). static future checksum_range_shard(database &db, const sstring& keyspace_name, const sstring& cf_name, - const ::range& range, repair_checksum hash_version) { + const ::nonwrapping_range& range, repair_checksum hash_version) { auto& cf = db.find_column_family(keyspace_name, cf_name); return do_with(dht::to_partition_range(range), [&cf, hash_version] (const auto& partition_range) { auto reader = cf.make_streaming_reader(cf.schema(), partition_range); @@ -395,7 +395,7 @@ static future checksum_range_shard(database &db, // function is not resolved. future checksum_range(seastar::sharded &db, const sstring& keyspace, const sstring& cf, - const ::range& range, repair_checksum hash_version) { + const ::nonwrapping_range& range, repair_checksum hash_version) { unsigned shard_begin = range.start() ? dht::shard_of(range.start()->value()) : 0; unsigned shard_end = range.end() ? @@ -417,7 +417,7 @@ future checksum_range(seastar::sharded &db, static void request_transfer_ranges(seastar::sharded& db, const sstring& keyspace, const sstring& cf, - const ::range& range, + const ::nonwrapping_range& range, const std::vector& neighbors_in, const std::vector& neighbors_out, streaming::stream_plan& sp_in, @@ -430,8 +430,8 @@ static void request_transfer_ranges(seastar::sharded& db, } } -static void split_and_add(std::vector<::range>& ranges, - const range& range, +static void split_and_add(std::vector<::nonwrapping_range>& ranges, + const nonwrapping_range& range, uint64_t estimated_partitions, uint64_t target_partitions) { if (estimated_partitions < target_partitions) { // We're done, the range is small enough to not be split further @@ -467,7 +467,7 @@ static thread_local semaphore parallelism_semaphore(parallelism); // Repair a single cf in a single local range. // Comparable to RepairJob in Origin. static future<> repair_cf_range(seastar::sharded& db, - sstring keyspace, sstring cf, ::range range, + sstring keyspace, sstring cf, ::nonwrapping_range range, std::vector& neighbors, streaming::stream_plan& sp_in, streaming::stream_plan& sp_out, @@ -477,17 +477,9 @@ static future<> repair_cf_range(seastar::sharded& db, return make_ready_future<>(); } - // The partition iterating code inside checksum_range_shard does not - // support wrap-around ranges, so we need to break at least wrap- - // around ranges. - std::vector<::range> ranges; - if (range.is_wrap_around(dht::token_comparator())) { - auto unwrapped = range.unwrap(); - ranges.push_back(unwrapped.first); - ranges.push_back(unwrapped.second); - } else { - ranges.push_back(range); - } + std::vector<::nonwrapping_range> ranges; + ranges.push_back(range); + // Additionally, we want to break up large ranges so they will have // (approximately) a desired number of rows each. // FIXME: column_family should have a method to estimate the number of @@ -507,7 +499,7 @@ static future<> repair_cf_range(seastar::sharded& db, // fill a vector in advance. // FIXME: this "100" needs to be a parameter. uint64_t target_partitions = 100; - std::vector<::range> tosplit; + std::vector<::nonwrapping_range> tosplit; while (estimated_partitions > target_partitions) { tosplit.clear(); ranges.swap(tosplit); @@ -637,7 +629,7 @@ static future<> repair_cf_range(seastar::sharded& db, // Repair a single local range, multiple column families. // Comparable to RepairSession in Origin static future<> repair_range(seastar::sharded& db, sstring keyspace, - ::range range, std::vector& cfs, + ::nonwrapping_range range, std::vector& cfs, const std::vector& data_centers, const std::vector& hosts, streaming::stream_plan& sp_in, @@ -654,24 +646,24 @@ static future<> repair_range(seastar::sharded& db, sstring keyspace, }); } -static std::vector> get_ranges_for_endpoint( +static std::vector> get_ranges_for_endpoint( database& db, sstring keyspace, gms::inet_address ep) { auto& rs = db.find_keyspace(keyspace).get_replication_strategy(); return rs.get_ranges(ep); } -static std::vector> get_local_ranges( +static std::vector> get_local_ranges( database& db, sstring keyspace) { return get_ranges_for_endpoint(db, keyspace, utils::fb_utilities::get_broadcast_address()); } -static std::vector> get_primary_ranges_for_endpoint( +static std::vector> get_primary_ranges_for_endpoint( database& db, sstring keyspace, gms::inet_address ep) { auto& rs = db.find_keyspace(keyspace).get_replication_strategy(); return rs.get_primary_ranges(ep); } -static std::vector> get_primary_ranges( +static std::vector> get_primary_ranges( database& db, sstring keyspace) { return get_primary_ranges_for_endpoint(db, keyspace, utils::fb_utilities::get_broadcast_address()); @@ -687,7 +679,7 @@ struct repair_options { // If ranges is not empty, it overrides the repair's default heuristics // for determining the list of ranges to repair. In particular, "ranges" // overrides the setting of "primary_range". - std::vector> ranges; + std::vector> ranges; // If start_token and end_token are set, they define a range which is // intersected with the ranges actually held by this node to decide what // to repair. @@ -800,7 +792,7 @@ private: // A range is expressed as start_token:end token and multiple ranges can // be given as comma separated ranges(e.g. aaa:bbb,ccc:ddd). - static void ranges_opt(std::vector>& var, + static void ranges_opt(std::vector>& var, std::unordered_map& options, const sstring& key) { auto it = options.find(key); @@ -818,9 +810,12 @@ private: } auto tok_start = dht::global_partitioner().from_sstring(token_strings[0]); auto tok_end = dht::global_partitioner().from_sstring(token_strings[1]); - var.emplace_back( + auto rng = wrapping_range( ::range::bound(tok_start, false), ::range::bound(tok_end, true)); + compat::unwrap_into(std::move(rng), dht::token_comparator(), [&] (nonwrapping_range&& x) { + var.push_back(std::move(x)); + }); } options.erase(it); } @@ -844,7 +839,7 @@ private: // is assumed to be a indivisible in the sense that all the tokens in has the // same nodes as replicas. static future<> repair_ranges(seastar::sharded& db, sstring keyspace, - std::vector> ranges, + std::vector> ranges, std::vector cfs, int id, std::vector data_centers, std::vector hosts) { return do_with(streaming::stream_plan("repair-in"), @@ -911,7 +906,7 @@ static int do_repair_start(seastar::sharded& db, sstring keyspace, // local ranges (the token ranges for which this node holds a replica of). // Each of these ranges may have a different set of replicas, so the // repair of each range is performed separately with repair_range(). - std::vector> ranges; + std::vector> ranges; if (options.ranges.size()) { ranges = options.ranges; } else if (options.primary_range) { @@ -954,8 +949,8 @@ static int do_repair_start(seastar::sharded& db, sstring keyspace, dht::global_partitioner().from_sstring(options.end_token), false); } - ::range given_range_complement(tok_end, tok_start); - std::vector> intersections; + nonwrapping_range given_range_complement(tok_end, tok_start); + std::vector> intersections; for (const auto& range : ranges) { auto rs = range.subtract(given_range_complement, dht::token_comparator()); diff --git a/repair/repair.hh b/repair/repair.hh index 992835b699..55b586a597 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -107,4 +107,4 @@ public: // not resolved. future checksum_range(seastar::sharded &db, const sstring& keyspace, const sstring& cf, - const ::range& range, repair_checksum rt); + const ::nonwrapping_range& range, repair_checksum rt); diff --git a/row_cache.cc b/row_cache.cc index 827dcb025c..41bb59fd76 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -628,10 +628,6 @@ row_cache::make_scanning_reader(schema_ptr s, const io_priority_class& pc, const query::partition_slice& slice, tracing::trace_state_ptr trace_state) { - if (range.is_wrap_around(dht::ring_position_comparator(*s))) { - warn(unimplemented::cause::WRAP_AROUND); - throw std::runtime_error("row_cache doesn't support wrap-around ranges"); - } return make_mutation_reader(std::move(s), *this, range, slice, pc, std::move(trace_state)); } @@ -881,13 +877,7 @@ return _populate_phaser.advance_and_await().then([this, &dk] { future<> row_cache::invalidate(const query::partition_range& range) { return _populate_phaser.advance_and_await().then([this, &range] { with_linearized_managed_bytes([&] { - if (range.is_wrap_around(dht::ring_position_comparator(*_schema))) { - auto unwrapped = range.unwrap(); - invalidate_unwrapped(unwrapped.first); - invalidate_unwrapped(unwrapped.second); - } else { - invalidate_unwrapped(range); - } + invalidate_unwrapped(range); }); }); } diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 3e7cf55538..4f214ad4b6 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -39,6 +39,7 @@ * along with Scylla. If not, see . */ +#include "partition_range_compat.hh" #include "db/consistency_level.hh" #include "db/commitlog/commitlog.hh" #include "storage_proxy.hh" @@ -2905,7 +2906,7 @@ get_restricted_ranges(locator::token_metadata& tm, const schema& s, query::parti dht::ring_position_comparator cmp(s); // special case for bounds containing exactly 1 token - if (start_token(range) == end_token(range) && !range.is_wrap_around(cmp)) { + if (start_token(range) == end_token(range)) { if (start_token(range).is_minimum()) { return {}; } @@ -2915,13 +2916,7 @@ get_restricted_ranges(locator::token_metadata& tm, const schema& s, query::parti std::vector ranges; auto add_range = [&ranges, &cmp] (query::partition_range&& r) { - if (r.is_wrap_around(cmp)) { - auto unwrapped = r.unwrap(); - ranges.emplace_back(std::move(unwrapped.second)); // Append in split order - ranges.emplace_back(std::move(unwrapped.first)); - } else { - ranges.emplace_back(std::move(r)); - } + ranges.emplace_back(std::move(r)); }; // divide the queryRange into pieces delimited by the ring @@ -3269,7 +3264,7 @@ void storage_proxy::init_messaging_service() { return net::messaging_service::no_wait(); }); }); - ms.register_read_data([] (const rpc::client_info& cinfo, query::read_command cmd, query::partition_range pr) { + ms.register_read_data([] (const rpc::client_info& cinfo, query::read_command cmd, compat::wrapping_partition_range pr) { tracing::trace_state_ptr trace_state_ptr; auto src_addr = net::messaging_service::get_source(cinfo); if (cmd.trace_info) { @@ -3278,16 +3273,21 @@ void storage_proxy::init_messaging_service() { tracing::trace(trace_state_ptr, "read_data: message received from /{}", src_addr.addr); } - return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared(std::move(cmd)), src_addr = std::move(src_addr)] (const query::partition_range& pr, shared_ptr& p, tracing::trace_state_ptr& trace_state_ptr) mutable { + return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared(std::move(cmd)), src_addr = std::move(src_addr)] (compat::wrapping_partition_range& pr, shared_ptr& p, tracing::trace_state_ptr& trace_state_ptr) mutable { auto src_ip = src_addr.addr; return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, &pr, &p, &trace_state_ptr] (schema_ptr s) { - return p->query_singular_local(std::move(s), cmd, pr, query::result_request::result_and_digest, trace_state_ptr); + auto pr2 = compat::unwrap(std::move(pr), *s); + if (pr2.second) { + // this function assumes singular queries but doesn't validate + throw std::runtime_error("READ_DATA called with wrapping range"); + } + return p->query_singular_local(std::move(s), cmd, std::move(pr2.first), query::result_request::result_and_digest, trace_state_ptr); }).finally([&trace_state_ptr, src_ip] () mutable { tracing::trace(trace_state_ptr, "read_data handling is done, sending a response to /{}", src_ip); }); }); }); - ms.register_read_mutation_data([] (const rpc::client_info& cinfo, query::read_command cmd, query::partition_range pr) { + ms.register_read_mutation_data([] (const rpc::client_info& cinfo, query::read_command cmd, compat::wrapping_partition_range pr) { tracing::trace_state_ptr trace_state_ptr; auto src_addr = net::messaging_service::get_source(cinfo); if (cmd.trace_info) { @@ -3295,16 +3295,16 @@ void storage_proxy::init_messaging_service() { tracing::begin(trace_state_ptr); tracing::trace(trace_state_ptr, "read_mutation_data: message received from /{}", src_addr.addr); } - return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared(std::move(cmd)), src_addr = std::move(src_addr)] (const query::partition_range& pr, shared_ptr& p, tracing::trace_state_ptr& trace_state_ptr) mutable { + return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared(std::move(cmd)), src_addr = std::move(src_addr)] (compat::wrapping_partition_range& pr, shared_ptr& p, tracing::trace_state_ptr& trace_state_ptr) mutable { auto src_ip = src_addr.addr; - return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, &pr, &p, &trace_state_ptr] (schema_ptr s) { - return p->query_mutations_locally(std::move(s), cmd, pr, trace_state_ptr); + return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, &pr, &p, &trace_state_ptr] (schema_ptr s) mutable { + return p->query_mutations_locally(std::move(s), cmd, compat::unwrap(std::move(pr), *s), trace_state_ptr); }).finally([&trace_state_ptr, src_ip] () mutable { tracing::trace(trace_state_ptr, "read_mutation_data handling is done, sending a response to /{}", src_ip); }); }); }); - ms.register_read_digest([] (const rpc::client_info& cinfo, query::read_command cmd, query::partition_range pr) { + ms.register_read_digest([] (const rpc::client_info& cinfo, query::read_command cmd, compat::wrapping_partition_range pr) { tracing::trace_state_ptr trace_state_ptr; auto src_addr = net::messaging_service::get_source(cinfo); if (cmd.trace_info) { @@ -3312,10 +3312,15 @@ void storage_proxy::init_messaging_service() { tracing::begin(trace_state_ptr); tracing::trace(trace_state_ptr, "read_digest: message received from /{}", src_addr.addr); } - return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared(std::move(cmd)), src_addr = std::move(src_addr)] (const query::partition_range& pr, shared_ptr& p, tracing::trace_state_ptr& trace_state_ptr) mutable { + return do_with(std::move(pr), get_local_shared_storage_proxy(), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared(std::move(cmd)), src_addr = std::move(src_addr)] (compat::wrapping_partition_range& pr, shared_ptr& p, tracing::trace_state_ptr& trace_state_ptr) mutable { auto src_ip = src_addr.addr; return get_schema_for_read(cmd->schema_version, std::move(src_addr)).then([cmd, &pr, &p, &trace_state_ptr] (schema_ptr s) { - return p->query_singular_local_digest(std::move(s), cmd, pr, trace_state_ptr); + auto pr2 = compat::unwrap(std::move(pr), *s); + if (pr2.second) { + // this function assumes singular queries but doesn't validate + throw std::runtime_error("READ_DIGEST called with wrapping range"); + } + return p->query_singular_local_digest(std::move(s), cmd, std::move(pr2.first), trace_state_ptr); }).finally([&trace_state_ptr, src_ip] () mutable { tracing::trace(trace_state_ptr, "read_digest handling is done, sending a response to /{}", src_ip); }); @@ -3460,6 +3465,15 @@ storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr>> +storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr cmd, const compat::one_or_two_partition_ranges& pr, tracing::trace_state_ptr trace_state) { + if (!pr.second) { + return query_mutations_locally(std::move(s), std::move(cmd), pr.first, std::move(trace_state)); } else { return query_nonsingular_mutations_locally(std::move(s), std::move(cmd), pr, std::move(trace_state)); } @@ -3467,7 +3481,7 @@ storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr>> -storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr cmd, const query::partition_range& pr, tracing::trace_state_ptr trace_state) { +storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr cmd, const std::vector& prs, tracing::trace_state_ptr trace_state) { struct part { query::partition_range pr; unsigned shard; @@ -3479,14 +3493,10 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr& parts) mutable { auto query_part = [this, cmd, gs=global_schema_ptr(s), gt = tracing::global_trace_state_ptr(std::move(trace_state))] (part p) mutable { diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index e9b3bdb69a..3af807dffa 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -52,6 +52,12 @@ #include "utils/estimated_histogram.hh" #include "tracing/trace_state.hh" +namespace compat { + +class one_or_two_partition_ranges; + +} + namespace service { class abstract_write_response_handler; @@ -243,7 +249,7 @@ private: template future<> mutate_internal(Range mutations, db::consistency_level cl, tracing::trace_state_ptr tr_state); future>> query_nonsingular_mutations_locally( - schema_ptr s, lw_shared_ptr cmd, const query::partition_range& pr, tracing::trace_state_ptr trace_state); + schema_ptr s, lw_shared_ptr cmd, const std::vector& pr, tracing::trace_state_ptr trace_state); public: storage_proxy(distributed& db); @@ -314,6 +320,16 @@ public: schema_ptr, lw_shared_ptr cmd, const query::partition_range&, tracing::trace_state_ptr trace_state = nullptr); + + future>> query_mutations_locally( + schema_ptr, lw_shared_ptr cmd, const compat::one_or_two_partition_ranges&, + tracing::trace_state_ptr trace_state = nullptr); + + future>> query_mutations_locally( + schema_ptr s, lw_shared_ptr cmd, const std::vector& pr, + tracing::trace_state_ptr trace_state = nullptr); + + future<> stop(); const stats& get_stats() const { diff --git a/service/storage_service.cc b/service/storage_service.cc index 737d8d2d48..31274eb025 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2068,7 +2068,7 @@ future<> storage_service::removenode(sstring host_id_string) { // get all ranges that change ownership (that is, a node needs // to take responsibility for new range) - std::unordered_multimap, inet_address> changed_ranges = + std::unordered_multimap, inet_address> changed_ranges = ss.get_changed_ranges_for_leaving(keyspace_name, endpoint); auto& fd = gms::get_local_failure_detector(); for (auto& x: changed_ranges) { @@ -2286,13 +2286,13 @@ future storage_service::is_initialized() { }); } -std::unordered_multimap, inet_address> storage_service::get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint) { +std::unordered_multimap, inet_address> storage_service::get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint) { // First get all ranges the leaving endpoint is responsible for auto ranges = get_ranges_for_endpoint(keyspace_name, endpoint); logger.debug("Node {} ranges [{}]", endpoint, ranges); - std::unordered_map, std::vector> current_replica_endpoints; + std::unordered_map, std::vector> current_replica_endpoints; // Find (for each range) all nodes that store replicas for these ranges as well auto metadata = _token_metadata.clone_only_token_map(); // don't do this in the loop! #7758 @@ -2311,7 +2311,7 @@ std::unordered_multimap, inet_address> storage_service::get_changed temp.remove_endpoint(endpoint); } - std::unordered_multimap, inet_address> changed_ranges; + std::unordered_multimap, inet_address> changed_ranges; // Go through the ranges and for each range check who will be // storing replicas for these ranges when the leaving endpoint @@ -2325,7 +2325,7 @@ std::unordered_multimap, inet_address> storage_service::get_changed auto rg = current_replica_endpoints.equal_range(r); for (auto it = rg.first; it != rg.second; it++) { - const range& range_ = it->first; + const nonwrapping_range& range_ = it->first; std::vector& current_eps = it->second; logger.debug("range={}, current_replica_endpoints={}, new_replica_endpoints={}", range_, current_eps, new_replica_endpoints); for (auto ep : it->second) { @@ -2352,7 +2352,7 @@ std::unordered_multimap, inet_address> storage_service::get_changed // Runs inside seastar::async context void storage_service::unbootstrap() { - std::unordered_map, inet_address>> ranges_to_stream; + std::unordered_map, inet_address>> ranges_to_stream; auto non_system_keyspaces = _db.local().get_non_system_keyspaces(); for (const auto& keyspace_name : non_system_keyspaces) { @@ -2393,21 +2393,21 @@ void storage_service::unbootstrap() { } future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) { - std::unordered_multimap>>> ranges_to_fetch; + std::unordered_multimap>>> ranges_to_fetch; auto my_address = get_broadcast_address(); auto non_system_keyspaces = _db.local().get_non_system_keyspaces(); for (const auto& keyspace_name : non_system_keyspaces) { - std::unordered_multimap, inet_address> changed_ranges = get_changed_ranges_for_leaving(keyspace_name, endpoint); - std::vector> my_new_ranges; + std::unordered_multimap, inet_address> changed_ranges = get_changed_ranges_for_leaving(keyspace_name, endpoint); + std::vector> my_new_ranges; for (auto& x : changed_ranges) { if (x.second == my_address) { my_new_ranges.emplace_back(x.first); } } - std::unordered_multimap> source_ranges = get_new_source_ranges(keyspace_name, my_new_ranges); - std::unordered_map>> tmp; + std::unordered_multimap> source_ranges = get_new_source_ranges(keyspace_name, my_new_ranges); + std::unordered_map>> tmp; for (auto& x : source_ranges) { tmp[x.first].emplace_back(x.second); } @@ -2416,7 +2416,7 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr auto sp = make_lw_shared("Restore replica count"); for (auto& x: ranges_to_fetch) { const sstring& keyspace_name = x.first; - std::unordered_map>>& maps = x.second; + std::unordered_map>>& maps = x.second; for (auto& m : maps) { auto source = m.first; auto ranges = m.second; @@ -2517,9 +2517,9 @@ void storage_service::leave_ring() { } future<> -storage_service::stream_ranges(std::unordered_map, inet_address>> ranges_to_stream_by_keyspace) { +storage_service::stream_ranges(std::unordered_map, inet_address>> ranges_to_stream_by_keyspace) { // First, we build a list of ranges to stream to each host, per table - std::unordered_map>>> sessions_to_stream_by_keyspace; + std::unordered_map>>> sessions_to_stream_by_keyspace; for (auto& entry : ranges_to_stream_by_keyspace) { const auto& keyspace = entry.first; auto& ranges_with_endpoints = entry.second; @@ -2528,9 +2528,9 @@ storage_service::stream_ranges(std::unordered_map>> ranges_per_endpoint; + std::unordered_map>> ranges_per_endpoint; for (auto& end_point_entry : ranges_with_endpoints) { - range r = end_point_entry.first; + nonwrapping_range r = end_point_entry.first; inet_address endpoint = end_point_entry.second; ranges_per_endpoint[endpoint].emplace_back(r); } @@ -2585,7 +2585,7 @@ future<> storage_service::stream_hints() { auto hints_destination_host = candidates.front(); // stream all hints -- range list will be a singleton of "the entire ring" - std::vector> ranges = {range::make_open_ended_both_sides()}; + std::vector> ranges = {nonwrapping_range::make_open_ended_both_sides()}; logger.debug("stream_hints: ranges={}", ranges); auto sp = make_lw_shared("Hints"); @@ -2755,15 +2755,15 @@ future<> storage_service::shutdown_client_servers() { return do_stop_rpc_server().then([this] { return do_stop_native_transport(); }); } -std::unordered_multimap> -storage_service::get_new_source_ranges(const sstring& keyspace_name, const std::vector>& ranges) { +std::unordered_multimap> +storage_service::get_new_source_ranges(const sstring& keyspace_name, const std::vector>& ranges) { auto my_address = get_broadcast_address(); auto& fd = gms::get_local_failure_detector(); auto& ks = _db.local().find_keyspace(keyspace_name); auto& strat = ks.get_replication_strategy(); auto tm = _token_metadata.clone_only_token_map(); - std::unordered_multimap, inet_address> range_addresses = strat.get_range_addresses(tm); - std::unordered_multimap> source_ranges; + std::unordered_multimap, inet_address> range_addresses = strat.get_range_addresses(tm); + std::unordered_multimap> source_ranges; // find alive sources for our new ranges for (auto r : ranges) { @@ -2792,10 +2792,10 @@ storage_service::get_new_source_ranges(const sstring& keyspace_name, const std:: return source_ranges; } -std::pair>, std::unordered_set>> -storage_service::calculate_stream_and_fetch_ranges(const std::vector>& current, const std::vector>& updated) { - std::unordered_set> to_stream; - std::unordered_set> to_fetch; +std::pair>, std::unordered_set>> +storage_service::calculate_stream_and_fetch_ranges(const std::vector>& current, const std::vector>& updated) { + std::unordered_set> to_stream; + std::unordered_set> to_fetch; for (auto r1 : current) { bool intersect = false; @@ -2836,7 +2836,7 @@ storage_service::calculate_stream_and_fetch_ranges(const std::vector>, std::unordered_set>>(to_stream, to_fetch); + return std::pair>, std::unordered_set>>(to_stream, to_fetch); } void storage_service::range_relocator::calculate_to_from_streams(std::unordered_set new_tokens, std::vector keyspace_names) { @@ -2856,30 +2856,30 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_ auto& ks = ss._db.local().find_keyspace(keyspace); auto& strategy = ks.get_replication_strategy(); // getting collection of the currently used ranges by this keyspace - std::vector> current_ranges = ss.get_ranges_for_endpoint(keyspace, local_address); + std::vector> current_ranges = ss.get_ranges_for_endpoint(keyspace, local_address); // collection of ranges which this node will serve after move to the new token - std::vector> updated_ranges = strategy.get_pending_address_ranges(token_meta_clone, new_token, local_address); + std::vector> updated_ranges = strategy.get_pending_address_ranges(token_meta_clone, new_token, local_address); // ring ranges and endpoints associated with them // this used to determine what nodes should we ping about range data - std::unordered_multimap, inet_address> range_addresses = strategy.get_range_addresses(token_meta_clone); - std::unordered_map, std::vector> range_addresses_map; + std::unordered_multimap, inet_address> range_addresses = strategy.get_range_addresses(token_meta_clone); + std::unordered_map, std::vector> range_addresses_map; for (auto& x : range_addresses) { range_addresses_map[x.first].emplace_back(x.second); } // calculated parts of the ranges to request/stream from/to nodes in the ring // std::pair(to_stream, to_fetch) - std::pair>, std::unordered_set>> ranges_per_keyspace = + std::pair>, std::unordered_set>> ranges_per_keyspace = ss.calculate_stream_and_fetch_ranges(current_ranges, updated_ranges); /** * In this loop we are going through all ranges "to fetch" and determining * nodes in the ring responsible for data we are interested in */ - std::unordered_multimap, inet_address> ranges_to_fetch_with_preferred_endpoints; - for (range to_fetch : ranges_per_keyspace.second) { + std::unordered_multimap, inet_address> ranges_to_fetch_with_preferred_endpoints; + for (nonwrapping_range to_fetch : ranges_per_keyspace.second) { for (auto& x : range_addresses_map) { - const range& r = x.first; + const nonwrapping_range& r = x.first; std::vector& eps = x.second; if (r.contains(to_fetch, dht::token_comparator())) { std::vector endpoints; @@ -2942,9 +2942,9 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_ } // calculating endpoints to stream current ranges to if needed // in some situations node will handle current ranges as part of the new ranges - std::unordered_multimap> endpoint_ranges; - std::unordered_map>> endpoint_ranges_map; - for (range to_stream : ranges_per_keyspace.first) { + std::unordered_multimap> endpoint_ranges; + std::unordered_map>> endpoint_ranges_map; + for (nonwrapping_range to_stream : ranges_per_keyspace.first) { auto end_token = to_stream.end() ? to_stream.end()->value() : dht::maximum_token(); std::vector current_endpoints = strategy.calculate_natural_endpoints(end_token, token_meta_clone); std::vector new_endpoints = strategy.calculate_natural_endpoints(end_token, token_meta_clone_all_settled); @@ -2973,9 +2973,9 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_ } // stream requests - std::unordered_multimap> work = + std::unordered_multimap> work = dht::range_streamer::get_work_map(ranges_to_fetch_with_preferred_endpoints, keyspace); - std::unordered_map>> work_map; + std::unordered_map>> work_map; for (auto& x : work) { work_map[x.first].emplace_back(x.second); } diff --git a/service/storage_service.hh b/service/storage_service.hh index da0bdcd41a..d0da200734 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -195,7 +195,7 @@ public: } #endif public: - std::vector> get_local_ranges(const sstring& keyspace_name) { + std::vector> get_local_ranges(const sstring& keyspace_name) { return get_ranges_for_endpoint(keyspace_name, get_broadcast_address()); } #if 0 @@ -548,18 +548,18 @@ public: return map; } #endif - std::unordered_map, std::vector> get_range_to_address_map(const sstring& keyspace) const { + std::unordered_map, std::vector> get_range_to_address_map(const sstring& keyspace) const { return get_range_to_address_map(keyspace, _token_metadata.sorted_tokens()); } - std::unordered_map, std::vector> get_range_to_address_map_in_local_dc( + std::unordered_map, std::vector> get_range_to_address_map_in_local_dc( const sstring& keyspace) const { std::function filter = [this](const inet_address& address) { return is_local_dc(address); }; auto orig_map = get_range_to_address_map(keyspace, get_tokens_in_local_dc()); - std::unordered_map, std::vector> filtered_map; + std::unordered_map, std::vector> filtered_map; for (auto entry : orig_map) { auto& addresses = filtered_map[entry.first]; addresses.reserve(entry.second.size()); @@ -585,7 +585,7 @@ public: return remote_dc == local_dc; } - std::unordered_map, std::vector> get_range_to_address_map(const sstring& keyspace, + std::unordered_map, std::vector> get_range_to_address_map(const sstring& keyspace, const std::vector& sorted_tokens) const { // some people just want to get a visual representation of things. Allow null and set it to the first // non-system keyspace. @@ -625,7 +625,7 @@ public: std::vector ranges; //Token.TokenFactory tf = getPartitioner().getTokenFactory(); - std::unordered_map, std::vector> range_to_address_map = + std::unordered_map, std::vector> range_to_address_map = include_only_local_dc ? get_range_to_address_map_in_local_dc(keyspace) : get_range_to_address_map(keyspace); @@ -650,6 +650,13 @@ public: } ranges.push_back(tr); } + // Convert to wrapping ranges + auto& rf = ranges.front(); + auto& rb = ranges.back(); + if (rf._start_token.empty() && rb._end_token.empty() && rf._endpoints == rb._endpoints) { + rf._start_token = std::move(rb._start_token); + ranges.pop_back(); + } return ranges; } @@ -681,10 +688,10 @@ public: * @param ranges * @return mapping of ranges to the replicas responsible for them. */ - std::unordered_map, std::vector> construct_range_to_endpoint_map( + std::unordered_map, std::vector> construct_range_to_endpoint_map( const sstring& keyspace, - const std::vector>& ranges) const { - std::unordered_map, std::vector> res; + const std::vector>& ranges) const { + std::unordered_map, std::vector> res; for (auto r : ranges) { res[r] = _db.local().find_keyspace(keyspace).get_replication_strategy().get_natural_endpoints(r.end()->value()); } @@ -849,7 +856,7 @@ private: * @param ranges the ranges to find sources for * @return multimap of addresses to ranges the address is responsible for */ - std::unordered_multimap> get_new_source_ranges(const sstring& keyspaceName, const std::vector>& ranges); + std::unordered_multimap> get_new_source_ranges(const sstring& keyspaceName, const std::vector>& ranges); public: future<> confirm_replication(inet_address node); @@ -875,7 +882,7 @@ private: future<> restore_replica_count(inet_address endpoint, inet_address notify_endpoint); // needs to be modified to accept either a keyspace or ARS. - std::unordered_multimap, inet_address> get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint); + std::unordered_multimap, inet_address> get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint); public: /** raw load value */ double get_load(); @@ -1692,7 +1699,7 @@ public: * @param ep endpoint we are interested in. * @return ranges for the specified endpoint. */ - std::vector> get_ranges_for_endpoint(const sstring& name, const gms::inet_address& ep) const { + std::vector> get_ranges_for_endpoint(const sstring& name, const gms::inet_address& ep) const { return _db.local().find_keyspace(name).get_replication_strategy().get_ranges(ep); } @@ -1702,19 +1709,18 @@ public: * ranges. * @return ranges in sorted order */ - std::vector> get_all_ranges(const std::vector& sorted_tokens) const{ + std::vector> get_all_ranges(const std::vector& sorted_tokens) const{ if (sorted_tokens.empty()) - return std::vector>(); + return std::vector>(); int size = sorted_tokens.size(); - std::vector> ranges; + std::vector> ranges; + ranges.push_back(nonwrapping_range::make_ending_with(range_bound(sorted_tokens[0], true))); for (int i = 1; i < size; ++i) { - range r(range::bound(sorted_tokens[i - 1], false), range::bound(sorted_tokens[i], true)); + nonwrapping_range r(range::bound(sorted_tokens[i - 1], false), range::bound(sorted_tokens[i], true)); ranges.push_back(r); } - range r(range::bound(sorted_tokens[size - 1], false), - range::bound(sorted_tokens[0], true)); - ranges.push_back(r); + ranges.push_back(nonwrapping_range::make_starting_with(range_bound(sorted_tokens[size-1], false))); return ranges; } @@ -2098,7 +2104,7 @@ private: * @param rangesToStreamByKeyspace keyspaces and data ranges with endpoints included for each * @return async Future for whether stream was success */ - future<> stream_ranges(std::unordered_map, inet_address>> ranges_to_stream_by_keyspace); + future<> stream_ranges(std::unordered_map, inet_address>> ranges_to_stream_by_keyspace); public: /** @@ -2109,8 +2115,8 @@ public: * @param updated collection of the ranges after token is changed * @return pair of ranges to stream/fetch for given current and updated range collections */ - std::pair>, std::unordered_set>> - calculate_stream_and_fetch_ranges(const std::vector>& current, const std::vector>& updated); + std::pair>, std::unordered_set>> + calculate_stream_and_fetch_ranges(const std::vector>& current, const std::vector>& updated); #if 0 public void bulkLoad(String directory) { diff --git a/sstables/compaction.cc b/sstables/compaction.cc index e8de2d0995..c403526b25 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -96,7 +96,7 @@ static api::timestamp_type get_max_purgeable_timestamp(schema_ptr schema, return timestamp; } -static bool belongs_to_current_node(const dht::token& t, const std::vector>& sorted_owned_ranges) { +static bool belongs_to_current_node(const dht::token& t, const std::vector>& sorted_owned_ranges) { auto low = std::lower_bound(sorted_owned_ranges.begin(), sorted_owned_ranges.end(), t, [] (const range& a, const dht::token& b) { // check that range a is before token b. @@ -104,7 +104,7 @@ static bool belongs_to_current_node(const dht::token& t, const std::vector& r = *low; + const nonwrapping_range& r = *low; return r.contains(t, dht::token_comparator()); } @@ -267,7 +267,7 @@ compact_sstables(std::vector sstables, column_family& cf, std::f info->cf = schema->cf_name(); logger.info("{} {}", (!cleanup) ? "Compacting" : "Cleaning", sstable_logger_msg); - std::vector> owned_ranges; + std::vector> owned_ranges; if (cleanup) { owned_ranges = service::get_local_storage_service().get_local_ranges(schema->ks_name()); } diff --git a/sstables/partition.cc b/sstables/partition.cc index 800e694ed0..baf22e1ffc 100644 --- a/sstables/partition.cc +++ b/sstables/partition.cc @@ -1101,9 +1101,6 @@ sstable::read_range_rows(schema_ptr schema, const query::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc) { - if (query::is_wrap_around(range, *schema)) { - fail(unimplemented::cause::WRAP_AROUND); - } return std::make_unique( shared_from_this(), std::move(schema), range, slice, pc); } diff --git a/streaming/stream_plan.cc b/streaming/stream_plan.cc index 8432834af3..05f8cdf45e 100644 --- a/streaming/stream_plan.cc +++ b/streaming/stream_plan.cc @@ -44,22 +44,22 @@ namespace streaming { extern logging::logger sslog; -stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, std::vector> ranges) { +stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, std::vector> ranges) { return request_ranges(from, keyspace, std::move(ranges), {}); } -stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, std::vector> ranges, std::vector column_families) { +stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, std::vector> ranges, std::vector column_families) { _range_added = true; auto session = _coordinator->get_or_create_session(from); session->add_stream_request(keyspace, std::move(ranges), std::move(column_families)); return *this; } -stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std::vector> ranges) { +stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std::vector> ranges) { return transfer_ranges(to, keyspace, std::move(ranges), {}); } -stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std::vector> ranges, std::vector column_families) { +stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std::vector> ranges, std::vector column_families) { _range_added = true; auto session = _coordinator->get_or_create_session(to); session->add_transfer_ranges(keyspace, std::move(ranges), std::move(column_families)); diff --git a/streaming/stream_plan.hh b/streaming/stream_plan.hh index 4ded649574..90959e2942 100644 --- a/streaming/stream_plan.hh +++ b/streaming/stream_plan.hh @@ -90,7 +90,7 @@ public: * @param ranges ranges to fetch * @return this object for chaining */ - stream_plan& request_ranges(inet_address from, sstring keyspace, std::vector> ranges); + stream_plan& request_ranges(inet_address from, sstring keyspace, std::vector> ranges); /** * Request data in {@code columnFamilies} under {@code keyspace} and {@code ranges} from specific node. @@ -102,7 +102,7 @@ public: * @param columnFamilies specific column families * @return this object for chaining */ - stream_plan& request_ranges(inet_address from, sstring keyspace, std::vector> ranges, std::vector column_families); + stream_plan& request_ranges(inet_address from, sstring keyspace, std::vector> ranges, std::vector column_families); /** * Add transfer task to send data of specific keyspace and ranges. @@ -113,7 +113,7 @@ public: * @param ranges ranges to send * @return this object for chaining */ - stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector> ranges); + stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector> ranges); /** * Add transfer task to send data of specific {@code columnFamilies} under {@code keyspace} and {@code ranges}. @@ -125,7 +125,7 @@ public: * @param columnFamilies specific column families * @return this object for chaining */ - stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector> ranges, std::vector column_families); + stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector> ranges, std::vector column_families); stream_plan& listeners(std::vector handlers); public: diff --git a/streaming/stream_request.hh b/streaming/stream_request.hh index 5f28404f01..d5a608a2b4 100644 --- a/streaming/stream_request.hh +++ b/streaming/stream_request.hh @@ -39,8 +39,9 @@ #pragma once #include "core/sstring.hh" -#include "query-request.hh" +#include "range.hh" #include "dht/i_partitioner.hh" +#include "partition_range_compat.hh" #include namespace streaming { @@ -49,14 +50,21 @@ class stream_request { public: using token = dht::token; sstring keyspace; - std::vector> ranges; + std::vector> ranges; + // For compatibility with <= 1.5, we send wrapping ranges (though they will never wrap). + std::vector> ranges_compat() const { + return compat::wrap(ranges); + } std::vector column_families; stream_request() = default; - stream_request(sstring _keyspace, std::vector> _ranges, std::vector _column_families) + stream_request(sstring _keyspace, std::vector> _ranges, std::vector _column_families) : keyspace(std::move(_keyspace)) , ranges(std::move(_ranges)) , column_families(std::move(_column_families)) { } + stream_request(sstring _keyspace, std::vector> _ranges, std::vector _column_families) + : stream_request(std::move(_keyspace), compat::unwrap(std::move(_ranges)), std::move(_column_families)) { + } friend std::ostream& operator<<(std::ostream& os, const stream_request& r); }; diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index ed72823e34..991ea760be 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -146,7 +146,7 @@ void stream_session::init_messaging_service_handler() { }); }); }); - ms().register_stream_mutation_done([] (const rpc::client_info& cinfo, UUID plan_id, std::vector> ranges, UUID cf_id, unsigned dst_cpu_id) { + ms().register_stream_mutation_done([] (const rpc::client_info& cinfo, UUID plan_id, std::vector> ranges, UUID cf_id, unsigned dst_cpu_id) { const auto& from = cinfo.retrieve_auxiliary("baddr"); return smp::submit_to(dst_cpu_id, [ranges = std::move(ranges), plan_id, cf_id, from] () mutable { auto session = get_session(plan_id, from, "STREAM_MUTATION_DONE", cf_id); @@ -415,7 +415,7 @@ std::vector stream_session::get_column_family_stores(const sstri return stores; } -void stream_session::add_transfer_ranges(sstring keyspace, std::vector> ranges, std::vector column_families) { +void stream_session::add_transfer_ranges(sstring keyspace, std::vector> ranges, std::vector column_families) { auto cfs = get_column_family_stores(keyspace, column_families); for (auto& cf : cfs) { auto cf_id = cf->schema()->id(); diff --git a/streaming/stream_session.hh b/streaming/stream_session.hh index 085ebc14e6..fc253d58ac 100644 --- a/streaming/stream_session.hh +++ b/streaming/stream_session.hh @@ -238,7 +238,7 @@ public: * @param ranges Ranges to retrieve data * @param columnFamilies ColumnFamily names. Can be empty if requesting all CF under the keyspace. */ - void add_stream_request(sstring keyspace, std::vector> ranges, std::vector column_families) { + void add_stream_request(sstring keyspace, std::vector> ranges, std::vector column_families) { _requests.emplace_back(std::move(keyspace), std::move(ranges), std::move(column_families)); } @@ -253,7 +253,7 @@ public: * @param flushTables flush tables? * @param repairedAt the time the repair started. */ - void add_transfer_ranges(sstring keyspace, std::vector> ranges, std::vector column_families); + void add_transfer_ranges(sstring keyspace, std::vector> ranges, std::vector column_families); std::vector get_column_family_stores(const sstring& keyspace, const std::vector& column_families); diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index a4bf0c5ab5..bdbfc6eb2a 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -55,7 +55,7 @@ namespace streaming { extern logging::logger sslog; -stream_transfer_task::stream_transfer_task(shared_ptr session, UUID cf_id, std::vector> ranges, long total_size) +stream_transfer_task::stream_transfer_task(shared_ptr session, UUID cf_id, std::vector> ranges, long total_size) : stream_task(session, cf_id) , _ranges(std::move(ranges)) , _total_size(total_size) { @@ -171,7 +171,7 @@ void stream_transfer_task::start() { }); } -void stream_transfer_task::append_ranges(const std::vector>& ranges) { +void stream_transfer_task::append_ranges(const std::vector>& ranges) { _ranges.insert(_ranges.end(), ranges.begin(), ranges.end()); } diff --git a/streaming/stream_transfer_task.hh b/streaming/stream_transfer_task.hh index 9aff7d23ec..c1f3712346 100644 --- a/streaming/stream_transfer_task.hh +++ b/streaming/stream_transfer_task.hh @@ -57,12 +57,12 @@ private: int32_t sequence_number = 0; bool aborted = false; // A stream_transfer_task always contains the same range to stream - std::vector> _ranges; + std::vector> _ranges; long _total_size; public: using UUID = utils::UUID; stream_transfer_task(stream_transfer_task&&) = default; - stream_transfer_task(shared_ptr session, UUID cf_id, std::vector> ranges, long total_size = 0); + stream_transfer_task(shared_ptr session, UUID cf_id, std::vector> ranges, long total_size = 0); ~stream_transfer_task(); public: virtual void abort() override { @@ -78,7 +78,7 @@ public: void start(); - void append_ranges(const std::vector>& ranges); + void append_ranges(const std::vector>& ranges); }; } // namespace streaming diff --git a/tests/mutation_source_test.cc b/tests/mutation_source_test.cc index 57748c5f8e..7c88af3c8f 100644 --- a/tests/mutation_source_test.cc +++ b/tests/mutation_source_test.cc @@ -68,7 +68,7 @@ static void test_range_queries(populate_fn populate) { auto ds = populate(s, partitions); - auto test_slice = [&] (query::range r) { + auto test_slice = [&] (nonwrapping_range r) { BOOST_TEST_MESSAGE(sprint("Testing range %s", r)); assert_that(ds(s, r)) .produces(slice(partitions, r)) diff --git a/tests/range_test.cc b/tests/range_test.cc index fb5d8a9b9d..c2eeff50b1 100644 --- a/tests/range_test.cc +++ b/tests/range_test.cc @@ -49,12 +49,6 @@ static bool includes_token(const schema& s, const query::partition_range& r, const dht::token& tok) { dht::ring_position_comparator cmp(s); - if (r.is_wrap_around(cmp)) { - auto sub = r.unwrap(); - return includes_token(s, sub.first, tok) - || includes_token(s, sub.second, tok); - } - return !r.before(dht::ring_position(tok, dht::ring_position::token_bound::end), cmp) && !r.after(dht::ring_position(tok, dht::ring_position::token_bound::start), cmp); } @@ -73,17 +67,6 @@ BOOST_AUTO_TEST_CASE(test_range_with_positions_within_the_same_token) { auto key2 = dht::decorated_key{tok, partition_key::from_single_value(*s, bytes_type->decompose(data_value(bytes("key2"))))}; - { - auto r = query::partition_range::make( - dht::ring_position(key2), - dht::ring_position(key1)); - - BOOST_REQUIRE(includes_token(*s, r, tok)); - BOOST_REQUIRE(r.contains(key1, dht::ring_position_comparator(*s))); - BOOST_REQUIRE(r.contains(key2, dht::ring_position_comparator(*s))); - BOOST_REQUIRE(r.is_wrap_around(dht::ring_position_comparator(*s))); - } - { auto r = query::partition_range::make( dht::ring_position(key1), @@ -92,18 +75,6 @@ BOOST_AUTO_TEST_CASE(test_range_with_positions_within_the_same_token) { BOOST_REQUIRE(includes_token(*s, r, tok)); BOOST_REQUIRE(r.contains(key1, dht::ring_position_comparator(*s))); BOOST_REQUIRE(r.contains(key2, dht::ring_position_comparator(*s))); - BOOST_REQUIRE(!r.is_wrap_around(dht::ring_position_comparator(*s))); - } - - { - auto r = query::partition_range::make( - {dht::ring_position(key2), false}, - {dht::ring_position(key1), false}); - - BOOST_REQUIRE(includes_token(*s, r, tok)); - BOOST_REQUIRE(!r.contains(key1, dht::ring_position_comparator(*s))); - BOOST_REQUIRE(!r.contains(key2, dht::ring_position_comparator(*s))); - BOOST_REQUIRE(r.is_wrap_around(dht::ring_position_comparator(*s))); } { @@ -114,7 +85,6 @@ BOOST_AUTO_TEST_CASE(test_range_with_positions_within_the_same_token) { BOOST_REQUIRE(includes_token(*s, r, tok)); BOOST_REQUIRE(!r.contains(key1, dht::ring_position_comparator(*s))); BOOST_REQUIRE(!r.contains(key2, dht::ring_position_comparator(*s))); - BOOST_REQUIRE(!r.is_wrap_around(dht::ring_position_comparator(*s))); } { @@ -125,40 +95,6 @@ BOOST_AUTO_TEST_CASE(test_range_with_positions_within_the_same_token) { BOOST_REQUIRE(includes_token(*s, r, tok)); BOOST_REQUIRE(r.contains(key1, dht::ring_position_comparator(*s))); BOOST_REQUIRE(r.contains(key2, dht::ring_position_comparator(*s))); - BOOST_REQUIRE(!r.is_wrap_around(dht::ring_position_comparator(*s))); - } - - { - auto r = query::partition_range::make( - {dht::ring_position::ending_at(tok), false}, - {dht::ring_position(key2), true}); - - BOOST_REQUIRE(includes_token(*s, r, tok)); - BOOST_REQUIRE(r.is_wrap_around(dht::ring_position_comparator(*s))); - BOOST_REQUIRE(r.contains(key1, dht::ring_position_comparator(*s))); - BOOST_REQUIRE(r.contains(key2, dht::ring_position_comparator(*s))); - } - - { - auto r = query::partition_range::make( - {dht::ring_position::ending_at(tok), false}, - {dht::ring_position(key1), true}); - - BOOST_REQUIRE(includes_token(*s, r, tok)); - BOOST_REQUIRE(r.is_wrap_around(dht::ring_position_comparator(*s))); - BOOST_REQUIRE(r.contains(key1, dht::ring_position_comparator(*s))); - BOOST_REQUIRE(!r.contains(key2, dht::ring_position_comparator(*s))); - } - - { - auto r = query::partition_range::make( - {dht::ring_position(key1), false}, - {dht::ring_position::starting_at(tok), true}); - - BOOST_REQUIRE(includes_token(*s, r, tok)); - BOOST_REQUIRE(r.is_wrap_around(dht::ring_position_comparator(*s))); - BOOST_REQUIRE(!r.contains(key1, dht::ring_position_comparator(*s))); - BOOST_REQUIRE(r.contains(key2, dht::ring_position_comparator(*s))); } { @@ -167,124 +103,10 @@ BOOST_AUTO_TEST_CASE(test_range_with_positions_within_the_same_token) { {dht::ring_position::ending_at(tok), true}); BOOST_REQUIRE(includes_token(*s, r, tok)); - BOOST_REQUIRE(!r.is_wrap_around(dht::ring_position_comparator(*s))); BOOST_REQUIRE(r.contains(key1, dht::ring_position_comparator(*s))); BOOST_REQUIRE(r.contains(key2, dht::ring_position_comparator(*s))); } - { - auto r = query::partition_range::make( - {dht::ring_position::ending_at(tok), true}, - {dht::ring_position::starting_at(tok), true}); - - BOOST_REQUIRE(includes_token(*s, r, tok)); - BOOST_REQUIRE(r.is_wrap_around(dht::ring_position_comparator(*s))); - BOOST_REQUIRE(!r.contains(key1, dht::ring_position_comparator(*s))); - BOOST_REQUIRE(!r.contains(key2, dht::ring_position_comparator(*s))); - } -} - -BOOST_AUTO_TEST_CASE(test_range_with_equal_value_but_opposite_inclusiveness_is_a_full_wrap_around) { - auto s = schema_builder("ks", "cf") - .with_column("key", bytes_type, column_kind::partition_key) - .with_column("v", bytes_type) - .build(); - - dht::token tok = dht::global_partitioner().get_random_token(); - - auto key1 = dht::decorated_key{ - tok, partition_key::from_single_value(*s, bytes_type->decompose(data_value(bytes("key1"))))}; - - auto key2 = dht::decorated_key{ - tok, partition_key::from_single_value(*s, bytes_type->decompose(data_value(bytes("key2"))))}; - - { - auto r = query::partition_range::make( - {dht::ring_position::starting_at(tok), true}, - {dht::ring_position::starting_at(tok), false}); - - BOOST_REQUIRE(includes_token(*s, r, tok)); - BOOST_REQUIRE(r.is_wrap_around(dht::ring_position_comparator(*s))); - BOOST_REQUIRE(r.contains(key1, dht::ring_position_comparator(*s))); - BOOST_REQUIRE(r.contains(key2, dht::ring_position_comparator(*s))); - } - - { - auto r = query::partition_range::make( - {dht::ring_position::starting_at(tok), false}, - {dht::ring_position::starting_at(tok), true}); - - BOOST_REQUIRE(includes_token(*s, r, tok)); - BOOST_REQUIRE(r.is_wrap_around(dht::ring_position_comparator(*s))); - BOOST_REQUIRE(r.contains(key1, dht::ring_position_comparator(*s))); - BOOST_REQUIRE(r.contains(key2, dht::ring_position_comparator(*s))); - } - - { - auto r = query::partition_range::make( - {dht::ring_position::ending_at(tok), false}, - {dht::ring_position::ending_at(tok), true}); - - BOOST_REQUIRE(includes_token(*s, r, tok)); - BOOST_REQUIRE(r.is_wrap_around(dht::ring_position_comparator(*s))); - BOOST_REQUIRE(r.contains(key1, dht::ring_position_comparator(*s))); - BOOST_REQUIRE(r.contains(key2, dht::ring_position_comparator(*s))); - } - - { - auto r = query::partition_range::make( - {dht::ring_position::ending_at(tok), true}, - {dht::ring_position::ending_at(tok), false}); - - BOOST_REQUIRE(includes_token(*s, r, tok)); - BOOST_REQUIRE(r.is_wrap_around(dht::ring_position_comparator(*s))); - BOOST_REQUIRE(r.contains(key1, dht::ring_position_comparator(*s))); - BOOST_REQUIRE(r.contains(key2, dht::ring_position_comparator(*s))); - } - - { - auto r = query::partition_range::make( - {dht::ring_position(key1), true}, - {dht::ring_position(key1), false}); - - BOOST_REQUIRE(includes_token(*s, r, tok)); - BOOST_REQUIRE(r.is_wrap_around(dht::ring_position_comparator(*s))); - BOOST_REQUIRE(r.contains(key1, dht::ring_position_comparator(*s))); - BOOST_REQUIRE(r.contains(key2, dht::ring_position_comparator(*s))); - } - - { - auto r = query::partition_range::make( - {dht::ring_position(key1), false}, - {dht::ring_position(key1), true}); - - BOOST_REQUIRE(includes_token(*s, r, tok)); - BOOST_REQUIRE(r.is_wrap_around(dht::ring_position_comparator(*s))); - BOOST_REQUIRE(r.contains(key1, dht::ring_position_comparator(*s))); - BOOST_REQUIRE(r.contains(key2, dht::ring_position_comparator(*s))); - } - - { - auto r = query::partition_range::make( - {dht::ring_position(key1), false}, - {dht::ring_position(key1), false}); - - BOOST_REQUIRE(includes_token(*s, r, tok)); - BOOST_REQUIRE(r.is_wrap_around(dht::ring_position_comparator(*s))); - BOOST_REQUIRE(!r.contains(key1, dht::ring_position_comparator(*s))); - BOOST_REQUIRE(r.contains(key2, dht::ring_position_comparator(*s))); - } - - { - auto r = query::partition_range::make( - {dht::ring_position(key2), false}, - {dht::ring_position(key2), false}); - - BOOST_REQUIRE(includes_token(*s, r, tok)); - BOOST_REQUIRE(r.is_wrap_around(dht::ring_position_comparator(*s))); - BOOST_REQUIRE(r.contains(key1, dht::ring_position_comparator(*s))); - BOOST_REQUIRE(!r.contains(key2, dht::ring_position_comparator(*s))); - } } BOOST_AUTO_TEST_CASE(test_range_contains) { diff --git a/tests/row_cache_test.cc b/tests/row_cache_test.cc index 7a47fd0ded..50517489eb 100644 --- a/tests/row_cache_test.cc +++ b/tests/row_cache_test.cc @@ -1141,50 +1141,6 @@ SEASTAR_TEST_CASE(test_cache_population_and_clear_race) { } -SEASTAR_TEST_CASE(test_invalidate_works_with_wrap_arounds) { - return seastar::async([] { - auto s = make_schema(); - auto mt = make_lw_shared(s); - - cache_tracker tracker; - row_cache cache(s, mt->as_data_source(), tracker); - - std::vector ring = make_ring(s, 8); - - for (auto& m : ring) { - cache.populate(m); - } - - for (auto& m : ring) { - verify_has(cache, m.decorated_key()); - } - - // wrap-around - cache.invalidate(query::partition_range({ring[6].ring_position()}, {ring[1].ring_position()})).get(); - - verify_does_not_have(cache, ring[0].decorated_key()); - verify_does_not_have(cache, ring[1].decorated_key()); - verify_has(cache, ring[2].decorated_key()); - verify_has(cache, ring[3].decorated_key()); - verify_has(cache, ring[4].decorated_key()); - verify_has(cache, ring[5].decorated_key()); - verify_does_not_have(cache, ring[6].decorated_key()); - verify_does_not_have(cache, ring[7].decorated_key()); - - // not wrap-around - cache.invalidate(query::partition_range({ring[3].ring_position()}, {ring[4].ring_position()})).get(); - - verify_does_not_have(cache, ring[0].decorated_key()); - verify_does_not_have(cache, ring[1].decorated_key()); - verify_has(cache, ring[2].decorated_key()); - verify_does_not_have(cache, ring[3].decorated_key()); - verify_does_not_have(cache, ring[4].decorated_key()); - verify_has(cache, ring[5].decorated_key()); - verify_does_not_have(cache, ring[6].decorated_key()); - verify_does_not_have(cache, ring[7].decorated_key()); - }); -} - SEASTAR_TEST_CASE(test_mvcc) { return seastar::async([] { auto no_difference = [] (auto& m1, auto& m2) { diff --git a/tests/sstable_mutation_test.cc b/tests/sstable_mutation_test.cc index c8e188f82b..d2a473ff78 100644 --- a/tests/sstable_mutation_test.cc +++ b/tests/sstable_mutation_test.cc @@ -310,7 +310,7 @@ future<> test_range_reads(const dht::token& min, const dht::token& max, std::vec auto count = make_lw_shared(0); auto expected_size = expected.size(); auto stop = make_lw_shared(false); - return do_with(query::range::make(dht::ring_position::starting_at(min), + return do_with(nonwrapping_range::make(dht::ring_position::starting_at(min), dht::ring_position::ending_at(max)), [&, sstp, s] (auto& pr) { auto mutations = sstp->read_range_rows(s, pr); return do_until([stop] { return *stop; }, diff --git a/tests/storage_proxy_test.cc b/tests/storage_proxy_test.cc index 8fd5d83ee7..483b325611 100644 --- a/tests/storage_proxy_test.cc +++ b/tests/storage_proxy_test.cc @@ -80,12 +80,6 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) { check(tm, query::partition_range({ring[2]}, {ring[3]}), { query::partition_range({ring[2]}, {ring[3]}) }); - - check(tm, query::partition_range({ring[4]}, {ring[2]}), { - query::partition_range({ring[4]}, {}), - query::partition_range({}, {dht::ring_position::ending_at(dht::minimum_token())}), - query::partition_range({{dht::ring_position::ending_at(dht::minimum_token()), false}}, {ring[2]}) - }); } { @@ -118,13 +112,6 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) { query::partition_range({{ring[2], false}}, {dht::ring_position::ending_at(ring[2].token())}), query::partition_range({{dht::ring_position::ending_at(ring[2].token()), false}}, {ring[3]}) }); - - check(tm, query::partition_range({ring[4]}, {ring[3]}), { - query::partition_range({ring[4]}, {dht::ring_position::ending_at(ring[5].token())}), - query::partition_range({{dht::ring_position::ending_at(ring[5].token()), false}}, {}), - query::partition_range({}, {dht::ring_position::ending_at(ring[2].token())}), - query::partition_range({{dht::ring_position::ending_at(ring[2].token()), false}}, {ring[3]}), - }); } }); });