diff --git a/api/storage_service.cc b/api/storage_service.cc index abcb2e53ea..cb18864642 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -366,11 +366,11 @@ void set_storage_service(http_context& ctx, routes& r) { }); }); - ss::move.set(r, [](std::unique_ptr req) { - //TBD - unimplemented(); + ss::move.set(r, [] (std::unique_ptr req) { auto new_token = req->get_query_param("new_token"); - return make_ready_future(json_void()); + return service::get_local_storage_service().move(new_token).then([] { + return make_ready_future(json_void()); + }); }); ss::remove_node.set(r, [](std::unique_ptr req) { diff --git a/db/query_context.hh b/db/query_context.hh index 8c4386bceb..ee69b22004 100644 --- a/db/query_context.hh +++ b/db/query_context.hh @@ -67,9 +67,8 @@ extern std::unique_ptr qctx; // we executed the query, and return an empty result template static future<::shared_ptr> execute_cql(sstring text, Args&&... args) { - if (qctx) { - return qctx->execute_cql(text, std::forward(args)...); - } - return make_ready_future>(::make_shared(cql3::untyped_result_set::make_empty())); + assert(qctx); + return qctx->execute_cql(text, std::forward(args)...); } + } diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 9024e51a31..5aa11cca15 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -486,9 +486,7 @@ future<> init_local_cache() { } void minimal_setup(distributed& db, distributed& qp) { - auto new_ctx = std::make_unique(db, qp); - qctx.swap(new_ctx); - assert(!new_ctx); + qctx = std::make_unique(db, qp); } future<> setup(distributed& db, distributed& qp) { @@ -817,10 +815,7 @@ future<> update_tokens(std::unordered_set tokens) { } future<> force_blocking_flush(sstring cfname) { - if (!qctx) { - return make_ready_future<>(); - } - + assert(qctx); return qctx->_db.invoke_on_all([cfname = std::move(cfname)](database& db) { // if (!Boolean.getBoolean("cassandra.unsafesystem")) column_family& cf = db.find_column_family(NAME, cfname); diff --git a/dht/range_streamer.cc b/dht/range_streamer.cc index 874d9e4cd5..8595a6a088 100644 --- a/dht/range_streamer.cc +++ b/dht/range_streamer.cc @@ -41,6 +41,7 @@ #include "locator/snitch_base.hh" #include "database.hh" #include "gms/gossiper.hh" +#include "gms/failure_detector.hh" #include "log.hh" #include "streaming/stream_plan.hh" #include "streaming/stream_state.hh" @@ -55,14 +56,7 @@ static std::unordered_map, std::unordered_set> unordered_multimap_to_unordered_map(const std::unordered_multimap, inet_address>& multimap) { std::unordered_map, std::unordered_set> ret; for (auto x : multimap) { - auto& range_token = x.first; - auto& ep = x.second; - auto it = ret.find(range_token); - if (it != ret.end()) { - it->second.emplace(ep); - } else { - ret.emplace(range_token, std::unordered_set{ep}); - } + ret[x.first].emplace(x.second); } return ret; } @@ -166,23 +160,24 @@ range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_n for (auto& x : range_addresses) { const range& src_range = x.first; if (src_range.contains(desired_range, dht::tri_compare)) { - auto old_endpoints = x.second; + std::vector old_endpoints(x.second.begin(), x.second.end()); auto it = pending_range_addresses.find(desired_range); - assert (it != pending_range_addresses.end()); - auto new_endpoints = it->second; + if (it == pending_range_addresses.end()) { + throw std::runtime_error(sprint("Can not find desired_range = {} in pending_range_addresses", desired_range)); + } + std::unordered_set new_endpoints = it->second; //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints. //So we need to be careful to only be strict when endpoints == RF if (old_endpoints.size() == strat.get_replication_factor()) { - std::unordered_set diff; - std::set_difference(old_endpoints.begin(), old_endpoints.end(), - new_endpoints.begin(), new_endpoints.end(), std::inserter(diff, diff.begin())); - old_endpoints = std::move(diff); + auto it = std::remove_if(old_endpoints.begin(), old_endpoints.end(), + [&new_endpoints] (inet_address ep) { return new_endpoints.count(ep); }); + old_endpoints.erase(it, old_endpoints.end()); if (old_endpoints.size() != 1) { - throw std::runtime_error(sprint("Expected 1 endpoint but found ", old_endpoints.size())); + throw std::runtime_error(sprint("Expected 1 endpoint but found %d", old_endpoints.size())); } } - range_sources.emplace(desired_range, *(old_endpoints.begin())); + range_sources.emplace(desired_range, old_endpoints.front()); } } @@ -268,4 +263,13 @@ future range_streamer::fetch_async() { return _stream_plan.execute(); } +std::unordered_multimap> +range_streamer::get_work_map(const std::unordered_multimap, inet_address>& ranges_with_source_target, + const sstring& keyspace) { + auto filter = std::make_unique(gms::get_local_failure_detector()); + std::unordered_set> source_filters; + source_filters.emplace(std::move(filter)); + return get_range_fetch_map(ranges_with_source_target, source_filters, keyspace); +} + } // dht diff --git a/dht/range_streamer.hh b/dht/range_streamer.hh index e747d9d23e..7bbebfbb5e 100644 --- a/dht/range_streamer.hh +++ b/dht/range_streamer.hh @@ -148,11 +148,11 @@ private: const std::unordered_set>& source_filters, const sstring& keyspace); +public: + static std::unordered_multimap> + get_work_map(const std::unordered_multimap, inet_address>& ranges_with_source_target, + const sstring& keyspace); #if 0 - public static Multimap> getWorkMap(Multimap, InetAddress> rangesWithSourceTarget, String keyspace) - { - return getRangeFetchMap(rangesWithSourceTarget, Collections.singleton(new FailureDetectorSourceFilter(FailureDetector.instance)), keyspace); - } // For testing purposes Multimap>>> toFetch() diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 3107d3a26e..ac575e8f6d 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -399,6 +399,8 @@ void token_metadata::calculate_pending_ranges(abstract_replication_strategy& str auto current_endpoints = strategy.calculate_natural_endpoints(t, metadata); auto new_endpoints = strategy.calculate_natural_endpoints(t, all_left_metadata); std::vector diff; + std::sort(current_endpoints.begin(), current_endpoints.end()); + std::sort(new_endpoints.begin(), new_endpoints.end()); std::set_difference(new_endpoints.begin(), new_endpoints.end(), current_endpoints.begin(), current_endpoints.end(), std::back_inserter(diff)); for (auto& ep : diff) { @@ -468,6 +470,25 @@ void token_metadata::add_leaving_endpoint(inet_address endpoint) { _leaving_endpoints.emplace(endpoint); } +token_metadata token_metadata::clone_after_all_settled() { + token_metadata metadata = clone_only_token_map(); + + for (auto endpoint : _leaving_endpoints) { + metadata.remove_endpoint(endpoint); + } + + + for (auto x : _moving_endpoints) { + metadata.update_normal_token(x.first, x.second); + } + + return metadata; +} + +void token_metadata::add_moving_endpoint(token t, inet_address endpoint) { + _moving_endpoints[t] = endpoint; +} + /////////////////// class topology ///////////////////////////////////////////// inline void topology::clear() { _dc_endpoints.clear(); diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 6bcc8f1c8c..382235fb12 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -473,29 +473,14 @@ public: void remove_bootstrap_tokens(std::unordered_set tokens); void add_leaving_endpoint(inet_address endpoint); +public: -#if 0 /** * Add a new moving endpoint * @param token token which is node moving to * @param endpoint address of the moving node */ - public void addMovingEndpoint(Token token, InetAddress endpoint) - { - assert endpoint != null; - - lock.writeLock().lock(); - - try - { - _moving_endpoints.add(Pair.create(token, endpoint)); - } - finally - { - lock.writeLock().unlock(); - } - } -#endif + void add_moving_endpoint(token t, inet_address endpoint); public: void remove_endpoint(inet_address endpoint); @@ -597,36 +582,15 @@ public: return all_left_metadata; } -#if 0 +public: /** * Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all * current leave, and move operations have finished. * * @return new token metadata */ - public TokenMetadata cloneAfterAllSettled() - { - lock.readLock().lock(); - - try - { - TokenMetadata metadata = cloneOnlyTokenMap(); - - for (InetAddress endpoint : _leaving_endpoints) - metadata.removeEndpoint(endpoint); - - - for (Pair pair : _moving_endpoints) - metadata.updateNormalToken(pair.left, pair.right); - - return metadata; - } - finally - { - lock.readLock().unlock(); - } - } - + token_metadata clone_after_all_settled(); +#if 0 public InetAddress getEndpoint(Token token) { lock.readLock().lock(); diff --git a/range.hh b/range.hh index 4025690a87..e64c7a193d 100644 --- a/range.hh +++ b/range.hh @@ -23,6 +23,7 @@ #include #include +#include // A range which can have inclusive, exclusive or open-ended bounds on each end. template @@ -61,6 +62,36 @@ public: , _singular(true) { } range() : range({}, {}) {} +private: + // Bound wrappers for compile-time dispatch and safety. + struct start_bound_ref { const optional& b; }; + struct end_bound_ref { const optional& b; }; + + start_bound_ref start_bound() const { return { start() }; } + end_bound_ref end_bound() const { return { end() }; } + + template + static bool greater_than_or_equal(end_bound_ref end, start_bound_ref start, Comparator&& cmp) { + return !end.b || !start.b || cmp(end.b->value(), start.b->value()) + >= (!end.b->is_inclusive() || !start.b->is_inclusive()); + } + + template + static bool less_than(end_bound_ref end, start_bound_ref start, Comparator&& cmp) { + return !greater_than_or_equal(end, start, cmp); + } + + template + static bool less_than_or_equal(start_bound_ref first, start_bound_ref second, Comparator&& cmp) { + return !first.b || (second.b && cmp(first.b->value(), second.b->value()) + <= -(!first.b->is_inclusive() && second.b->is_inclusive())); + } + + template + static bool greater_than_or_equal(end_bound_ref first, end_bound_ref second, Comparator&& cmp) { + return !first.b || (second.b && cmp(first.b->value(), second.b->value()) + >= (!first.b->is_inclusive() && second.b->is_inclusive())); + } public: // the point is before the range (works only for non wrapped ranges) // Comparator must define a total ordering on T. @@ -122,17 +153,8 @@ public: return true; } - // check if end is greater than or equal to start, taking into account if either is inclusive. - auto greater_than_or_equal = [cmp] (const optional& end, const optional& start) { - // !start means -inf, whereas !end means +inf - if (!end || !start) { - return true; - } - return cmp(end->value(), start->value()) - >= (!end->is_inclusive() || !start->is_inclusive()); - }; - - return greater_than_or_equal(end(), other.start()) && greater_than_or_equal(other.end(), start()); + return greater_than_or_equal(end_bound(), other.start_bound(), cmp) + && greater_than_or_equal(other.end_bound(), start_bound(), cmp); } static range make(bound start, bound end) { return range({std::move(start)}, {std::move(end)}); @@ -180,11 +202,12 @@ public: } } // Converts a wrap-around range to two non-wrap-around ranges. + // The returned ranges are not overlapping and ordered. // Call only when is_wrap_around(). std::pair unwrap() const { return { - { start(), {} }, - { {}, end() } + { {}, end() }, + { start(), {} } }; } // the point is inside the range @@ -214,12 +237,8 @@ public: } if (!this_wraps && !other_wraps) { - return (!start() || (other.start() - && cmp(start()->value(), other.start()->value()) - <= -(!start()->is_inclusive() && other.start()->is_inclusive()))) - && (!end() || (other.end() - && cmp(end()->value(), other.end()->value()) - >= (!end()->is_inclusive() && other.end()->is_inclusive()))); + return less_than_or_equal(start_bound(), other.start_bound(), cmp) + && greater_than_or_equal(end_bound(), other.end_bound(), cmp); } if (other_wraps) { // && !this_wraps @@ -232,6 +251,49 @@ public: || (other.end() && cmp(end()->value(), other.end()->value()) >= (!end()->is_inclusive() && other.end()->is_inclusive())); } + // Returns ranges which cover all values covered by this range but not covered by the other range. + // Ranges are not overlapping and ordered. + // Comparator must define a total ordering on T. + template + std::vector subtract(const range& other, Comparator&& cmp) const { + std::vector result; + + auto this_wraps = is_wrap_around(cmp); + auto other_wraps = other.is_wrap_around(cmp); + + if (this_wraps && other_wraps) { + auto this_unwrapped = unwrap(); + auto other_unwrapped = other.unwrap(); + boost::copy(this_unwrapped.first.subtract(other_unwrapped.first, cmp), std::back_inserter(result)); + boost::copy(this_unwrapped.second.subtract(other_unwrapped.second, cmp), std::back_inserter(result)); + } else if (this_wraps) { + auto this_unwrapped = unwrap(); + boost::copy(this_unwrapped.first.subtract(other, cmp), std::back_inserter(result)); + boost::copy(this_unwrapped.second.subtract(other, cmp), std::back_inserter(result)); + } else if (other_wraps) { + auto other_unwrapped = other.unwrap(); + for (auto &&r : subtract(other_unwrapped.first, cmp)) { + boost::copy(r.subtract(other_unwrapped.second, cmp), std::back_inserter(result)); + } + } else { + if (less_than(end_bound(), other.start_bound(), cmp) + || less_than(other.end_bound(), start_bound(), cmp)) { + // Not overlapping + result.push_back(*this); + } else { + // Overlapping + if (!less_than_or_equal(other.start_bound(), start_bound(), cmp)) { + result.push_back({start(), bound(other.start()->value(), !other.start()->is_inclusive())}); + } + if (!greater_than_or_equal(other.end_bound(), end_bound(), cmp)) { + result.push_back({bound(other.end()->value(), !other.end()->is_inclusive()), end()}); + } + } + } + + // TODO: Merge adjacent ranges (optimization) + return result; + } // split range in two around a split_point. split_point has to be inside the range // split_point will belong to first range // Comparator must define a total ordering on T. diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index e7c92fc444..34e3f44138 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -2837,8 +2837,8 @@ storage_proxy::make_local_reader(utils::UUID cf_id, const query::partition_range auto unwrapped = range.unwrap(); std::vector both; both.reserve(2); - both.push_back(make_local_reader(cf_id, unwrapped.second)); both.push_back(make_local_reader(cf_id, unwrapped.first)); + both.push_back(make_local_reader(cf_id, unwrapped.second)); return make_joining_reader(std::move(both)); } diff --git a/service/storage_service.cc b/service/storage_service.cc index 0b93fcdb0d..5aa67dc473 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -611,9 +611,7 @@ void storage_service::handle_state_moving(inet_address endpoint, std::vector= 2); auto token = dht::global_partitioner().from_sstring(pieces[1]); logger.debug("Node {} state moving, new token {}", endpoint, token); -#if 0 - _token_metadata.addMovingEndpoint(token, endpoint); -#endif + _token_metadata.add_moving_endpoint(token, endpoint); get_local_pending_range_calculator_service().update().get(); } @@ -741,7 +739,7 @@ void storage_service::on_remove(gms::inet_address endpoint) { } void storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state state) { - logger.debug("on_restart endpoint={}", endpoint); + logger.debug("on_dead endpoint={}", endpoint); #if 0 MessagingService.instance().convict(endpoint); #endif @@ -754,11 +752,10 @@ void storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state st void storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_state state) { logger.debug("on_restart endpoint={}", endpoint); -#if 0 // If we have restarted before the node was even marked down, we need to reset the connection pool - if (state.isAlive()) - onDead(endpoint, state); -#endif + if (state.is_alive()) { + on_dead(endpoint, state); + } } // Runs inside seastar::async context @@ -847,8 +844,7 @@ void storage_service::set_tokens(std::unordered_set tokens) { logger.debug("Setting tokens to {}", tokens); db::system_keyspace::update_tokens(tokens).get(); _token_metadata.update_normal_tokens(tokens, get_broadcast_address()); - // Collection localTokens = getLocalTokens(); - auto local_tokens = _bootstrap_tokens; + auto local_tokens = get_local_tokens(); auto& gossiper = gms::get_local_gossiper(); gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(local_tokens)).get(); gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.normal(local_tokens)).get(); @@ -1473,15 +1469,19 @@ future<> storage_service::start_rpc_server() { }); } +future<> storage_service::do_stop_rpc_server() { + auto tserver = _thrift_server; + _thrift_server = {}; + if (tserver) { + // FIXME: thrift_server::stop() doesn't kill existing connections and wait for them + return tserver->stop(); + } + return make_ready_future<>(); +} + future<> storage_service::stop_rpc_server() { return run_with_write_api_lock([] (storage_service& ss) { - auto tserver = ss._thrift_server; - ss._thrift_server = {}; - if (tserver) { - // FIXME: thrift_server::stop() doesn't kill existing connections and wait for them - return tserver->stop(); - } - return make_ready_future<>(); + return ss.do_stop_rpc_server(); }); } @@ -1518,15 +1518,19 @@ future<> storage_service::start_native_transport() { }); } +future<> storage_service::do_stop_native_transport() { + auto cserver = _cql_server; + _cql_server = {}; + if (cserver) { + // FIXME: cql_server::stop() doesn't kill existing connections and wait for them + return cserver->stop(); + } + return make_ready_future<>(); +} + future<> storage_service::stop_native_transport() { return run_with_write_api_lock([] (storage_service& ss) { - auto cserver = ss._cql_server; - ss._cql_server = {}; - if (cserver) { - // FIXME: cql_server::stop() doesn't kill existing connections and wait for them - return cserver->stop(); - } - return make_ready_future<>(); + return ss.do_stop_native_transport(); }); } @@ -1627,7 +1631,7 @@ future<> storage_service::remove_node(sstring host_id_string) { // to take responsibility for new range) std::unordered_multimap, inet_address> changed_ranges = ss.get_changed_ranges_for_leaving(keyspace_name, endpoint); - auto fd = gms::get_local_failure_detector(); + auto& fd = gms::get_local_failure_detector(); for (auto& x: changed_ranges) { auto ep = x.second; if (fd.is_alive(ep)) { @@ -2236,7 +2240,7 @@ shared_ptr& storage_service::get_load_broadcaster() { } future<> storage_service::shutdown_client_servers() { - return stop_rpc_server().then([this] { return stop_native_transport(); }); + return do_stop_rpc_server().then([this] { return do_stop_native_transport(); }); } std::unordered_multimap> @@ -2271,5 +2275,262 @@ storage_service::get_new_source_ranges(const sstring& keyspace_name, const std:: return source_ranges; } +std::pair>, std::unordered_set>> +storage_service::calculate_stream_and_fetch_ranges(const std::vector>& current, const std::vector>& updated) { + std::unordered_set> to_stream; + std::unordered_set> to_fetch; + + for (auto r1 : current) { + bool intersect = false; + for (auto r2 : updated) { + if (r1.overlaps(r2, dht::token_comparator())) { + // adding difference ranges to fetch from a ring + for (auto r : r1.subtract(r2, dht::token_comparator())) { + to_stream.emplace(r); + } + intersect = true; + } + } + if (!intersect) { + to_stream.emplace(r1); // should seed whole old range + } + } + + for (auto r2 : updated) { + bool intersect = false; + for (auto r1 : current) { + if (r2.overlaps(r1, dht::token_comparator())) { + // adding difference ranges to fetch from a ring + for (auto r : r2.subtract(r1, dht::token_comparator())) { + to_fetch.emplace(r); + } + intersect = true; + } + } + if (!intersect) { + to_fetch.emplace(r2); // should fetch whole old range + } + } + + if (logger.is_enabled(logging::log_level::debug)) { + logger.debug("current = {}", current); + logger.debug("updated = {}", updated); + logger.debug("to_stream = {}", to_stream); + logger.debug("to_fetch = {}", to_fetch); + } + + return std::pair>, std::unordered_set>>(to_stream, to_fetch); +} + +void storage_service::range_relocator::calculate_to_from_streams(std::unordered_set new_tokens, std::vector keyspace_names) { + auto& ss = get_local_storage_service(); + + auto local_address = ss.get_broadcast_address(); + auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr(); + + auto token_meta_clone_all_settled = ss._token_metadata.clone_after_all_settled(); + // clone to avoid concurrent modification in calculateNaturalEndpoints + auto token_meta_clone = ss._token_metadata.clone_only_token_map(); + + for (auto keyspace : keyspace_names) { + logger.debug("Calculating ranges to stream and request for keyspace {}", keyspace); + for (auto new_token : new_tokens) { + // replication strategy of the current keyspace (aka table) + auto& ks = ss._db.local().find_keyspace(keyspace); + auto& strategy = ks.get_replication_strategy(); + // getting collection of the currently used ranges by this keyspace + std::vector> current_ranges = ss.get_ranges_for_endpoint(keyspace, local_address); + // collection of ranges which this node will serve after move to the new token + std::vector> updated_ranges = strategy.get_pending_address_ranges(token_meta_clone, new_token, local_address); + + // ring ranges and endpoints associated with them + // this used to determine what nodes should we ping about range data + std::unordered_multimap, inet_address> range_addresses = strategy.get_range_addresses(token_meta_clone); + std::unordered_map, std::vector> range_addresses_map; + for (auto& x : range_addresses) { + range_addresses_map[x.first].emplace_back(x.second); + } + + // calculated parts of the ranges to request/stream from/to nodes in the ring + // std::pair(to_stream, to_fetch) + std::pair>, std::unordered_set>> ranges_per_keyspace = + ss.calculate_stream_and_fetch_ranges(current_ranges, updated_ranges); + /** + * In this loop we are going through all ranges "to fetch" and determining + * nodes in the ring responsible for data we are interested in + */ + std::unordered_multimap, inet_address> ranges_to_fetch_with_preferred_endpoints; + for (range to_fetch : ranges_per_keyspace.second) { + for (auto& x : range_addresses_map) { + const range& r = x.first; + std::vector& eps = x.second; + if (r.contains(to_fetch, dht::token_comparator())) { + std::vector endpoints; + if (dht::range_streamer::use_strict_consistency()) { + std::vector old_endpoints = eps; + std::vector new_endpoints = strategy.calculate_natural_endpoints(to_fetch.end()->value(), token_meta_clone_all_settled); + + //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints. + //So we need to be careful to only be strict when endpoints == RF + if (old_endpoints.size() == strategy.get_replication_factor()) { + for (auto n : new_endpoints) { + auto beg = old_endpoints.begin(); + auto end = old_endpoints.end(); + old_endpoints.erase(std::remove(beg, end, n), end); + } + //No relocation required + if (old_endpoints.empty()) { + continue; + } + + if (old_endpoints.size() != 1) { + throw std::runtime_error(sprint("Expected 1 endpoint but found %d", old_endpoints.size())); + } + } + endpoints.emplace_back(old_endpoints.front()); + } else { + std::unordered_set eps_set(eps.begin(), eps.end()); + endpoints = snitch->get_sorted_list_by_proximity(local_address, eps_set); + } + + // storing range and preferred endpoint set + for (auto ep : endpoints) { + ranges_to_fetch_with_preferred_endpoints.emplace(to_fetch, ep); + } + } + } + + std::vector address_list; + auto rg = ranges_to_fetch_with_preferred_endpoints.equal_range(to_fetch); + for (auto it = rg.first; it != rg.second; it++) { + address_list.push_back(it->second); + } + + if (address_list.empty()) { + continue; + } + + if (dht::range_streamer::use_strict_consistency()) { + if (address_list.size() > 1) { + throw std::runtime_error(sprint("Multiple strict sources found for %s", to_fetch)); + } + + auto source_ip = address_list.front(); + auto& gossiper = gms::get_local_gossiper(); + auto state = gossiper.get_endpoint_state_for_endpoint(source_ip); + if (gossiper.is_enabled() && state && !state->is_alive()) + throw std::runtime_error(sprint("A node required to move the data consistently is down (%s). If you wish to move the data from a potentially inconsistent replica, restart the node with -Dcassandra.consistent.rangemovement=false", source_ip)); + } + } + // calculating endpoints to stream current ranges to if needed + // in some situations node will handle current ranges as part of the new ranges + std::unordered_multimap> endpoint_ranges; + std::unordered_map>> endpoint_ranges_map; + for (range to_stream : ranges_per_keyspace.first) { + std::vector current_endpoints = strategy.calculate_natural_endpoints(to_stream.end()->value(), token_meta_clone); + std::vector new_endpoints = strategy.calculate_natural_endpoints(to_stream.end()->value(), token_meta_clone_all_settled); + logger.debug("Range: {} Current endpoints: {} New endpoints: {}", to_stream, current_endpoints, new_endpoints); + std::sort(current_endpoints.begin(), current_endpoints.end()); + std::sort(new_endpoints.begin(), new_endpoints.end()); + + std::vector diff; + std::set_difference(new_endpoints.begin(), new_endpoints.end(), + current_endpoints.begin(), current_endpoints.end(), std::back_inserter(diff)); + for (auto address : diff) { + logger.debug("Range {} has new owner {}", to_stream, address); + endpoint_ranges.emplace(address, to_stream); + } + } + for (auto& x : endpoint_ranges) { + endpoint_ranges_map[x.first].emplace_back(x.second); + } + + // stream ranges + for (auto& x : endpoint_ranges_map) { + auto& address = x.first; + auto& ranges = x.second; + logger.debug("Will stream range {} of keyspace {} to endpoint {}", ranges , keyspace, address); + auto preferred = net::get_local_messaging_service().get_preferred_ip(address); + _stream_plan.transfer_ranges(address, preferred, keyspace, ranges); + } + + // stream requests + std::unordered_multimap> work = + dht::range_streamer::get_work_map(ranges_to_fetch_with_preferred_endpoints, keyspace); + std::unordered_map>> work_map; + for (auto& x : work) { + work_map[x.first].emplace_back(x.second); + } + + for (auto& x : work_map) { + auto& address = x.first; + auto& ranges = x.second; + logger.debug("Will request range {} of keyspace {} from endpoint {}", ranges, keyspace, address); + auto preferred = net::get_local_messaging_service().get_preferred_ip(address); + _stream_plan.request_ranges(address, preferred, keyspace, ranges); + } + if (logger.is_enabled(logging::log_level::debug)) { + for (auto& x : work) { + logger.debug("Keyspace {}: work map ep = {} --> range = {}", keyspace, x.first, x.second); + } + } + } + } +} + +future<> storage_service::move(token new_token) { + return run_with_write_api_lock([new_token] (storage_service& ss) mutable { + return seastar::async([new_token, &ss] { + auto tokens = ss._token_metadata.sorted_tokens(); + if (std::find(tokens.begin(), tokens.end(), new_token) != tokens.end()) { + throw std::runtime_error(sprint("target token %s is already owned by another node.", new_token)); + } + + // address of the current node + auto local_address = ss.get_broadcast_address(); + + // This doesn't make any sense in a vnodes environment. + if (ss.get_token_metadata().get_tokens(local_address).size() > 1) { + logger.error("Invalid request to move(Token); This node has more than one token and cannot be moved thusly."); + throw std::runtime_error("This node has more than one token and cannot be moved thusly."); + } + + auto keyspaces_to_process = ss._db.local().get_non_system_keyspaces(); + + get_local_pending_range_calculator_service().block_until_finished().get(); + + // checking if data is moving to this node + for (auto keyspace_name : keyspaces_to_process) { + if (ss._token_metadata.get_pending_ranges(keyspace_name, local_address).size() > 0) { + throw std::runtime_error("data is currently moving to this node; unable to leave the ring"); + } + } + + gms::get_local_gossiper().add_local_application_state(application_state::STATUS, ss.value_factory.moving(new_token)).get(); + ss.set_mode(mode::MOVING, sprint("Moving %s from %s to %s.", local_address, *(ss.get_local_tokens().begin()), new_token), true); + + ss.set_mode(mode::MOVING, sprint("Sleeping %d ms before start streaming/fetching ranges", RING_DELAY), true); + sleep(std::chrono::milliseconds(RING_DELAY)).get(); + + storage_service::range_relocator relocator(std::unordered_set{new_token}, keyspaces_to_process); + + if (relocator.streams_needed()) { + ss.set_mode(mode::MOVING, "fetching new ranges and streaming old ranges", true); + try { + relocator.stream().get(); + } catch (...) { + throw std::runtime_error(sprint("Interrupted while waiting for stream/fetch ranges to finish: %s", std::current_exception())); + } + } else { + ss.set_mode(mode::MOVING, "No ranges to fetch/stream", true); + } + + ss.set_tokens(std::unordered_set{new_token}); // setting new token as we have everything settled + + logger.debug("Successfully moved to new token {}", *(ss.get_local_tokens().begin())); + }); + }); +} + } // namespace service diff --git a/service/storage_service.hh b/service/storage_service.hh index 3525aa43b0..9609cfdc8c 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -55,6 +55,7 @@ #include "utils/fb_utilities.hh" #include "database.hh" #include "streaming/stream_state.hh" +#include "streaming/stream_plan.hh" #include #include @@ -281,6 +282,9 @@ public: future is_native_transport_running(); +private: + future<> do_stop_rpc_server(); + future<> do_stop_native_transport(); #if 0 public void stopTransports() { @@ -1802,21 +1806,14 @@ private: void leave_ring(); void unbootstrap(); future<> stream_hints(); -#if 0 - public void move(String newToken) throws IOException - { - try - { - getPartitioner().getTokenFactory().validate(newToken); - } - catch (ConfigurationException e) - { - throw new IOException(e.getMessage()); - } - move(getPartitioner().getTokenFactory().fromString(newToken)); +public: + future<> move(sstring new_token) { + // FIXME: getPartitioner().getTokenFactory().validate(newToken); + return move(dht::global_partitioner().from_sstring(new_token)); } +private: /** * move the node to new token or find a new token to boot to according to load * @@ -1824,207 +1821,33 @@ private: * * @throws IOException on any I/O operation error */ - private void move(Token newToken) throws IOException - { - if (newToken == null) - throw new IOException("Can't move to the undefined (null) token."); + future<> move(token new_token); +public: - if (_token_metadata.sortedTokens().contains(newToken)) - throw new IOException("target token " + newToken + " is already owned by another node."); + class range_relocator { + private: + streaming::stream_plan _stream_plan; - // address of the current node - InetAddress localAddress = FBUtilities.getBroadcastAddress(); - - // This doesn't make any sense in a vnodes environment. - if (getTokenMetadata().getTokens(localAddress).size() > 1) - { - logger.error("Invalid request to move(Token); This node has more than one token and cannot be moved thusly."); - throw new UnsupportedOperationException("This node has more than one token and cannot be moved thusly."); + public: + range_relocator(std::unordered_set tokens, std::vector keyspace_names) + : _stream_plan("Relocation") { + calculate_to_from_streams(std::move(tokens), std::move(keyspace_names)); } - List keyspacesToProcess = Schema.instance.getNonSystemKeyspaces(); + private: + void calculate_to_from_streams(std::unordered_set new_tokens, std::vector keyspace_names); - PendingRangeCalculatorService.instance.blockUntilFinished(); - // checking if data is moving to this node - for (String keyspaceName : keyspacesToProcess) - { - if (_token_metadata.getPendingRanges(keyspaceName, localAddress).size() > 0) - throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring"); + public: + future<> stream() { + return _stream_plan.execute().discard_result(); } - Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.moving(newToken)); - setMode(Mode.MOVING, String.format("Moving %s from %s to %s.", localAddress, getLocalTokens().iterator().next(), newToken), true); - - setMode(Mode.MOVING, String.format("Sleeping %s ms before start streaming/fetching ranges", RING_DELAY), true); - Uninterruptibles.sleepUninterruptibly(RING_DELAY, TimeUnit.MILLISECONDS); - - RangeRelocator relocator = new RangeRelocator(Collections.singleton(newToken), keyspacesToProcess); - - if (relocator.streamsNeeded()) - { - setMode(Mode.MOVING, "fetching new ranges and streaming old ranges", true); - try - { - relocator.stream().get(); - } - catch (ExecutionException | InterruptedException e) - { - throw new RuntimeException("Interrupted while waiting for stream/fetch ranges to finish: " + e.getMessage()); - } - } - else - { - setMode(Mode.MOVING, "No ranges to fetch/stream", true); + bool streams_needed() { + return !_stream_plan.is_empty(); } + }; - set_tokens(Collections.singleton(newToken)); // setting new token as we have everything settled - - if (logger.isDebugEnabled()) - logger.debug("Successfully moved to new token {}", getLocalTokens().iterator().next()); - } - - private class RangeRelocator - { - private final StreamPlan streamPlan = new StreamPlan("Relocation"); - - private RangeRelocator(Collection tokens, List keyspaceNames) - { - calculateToFromStreams(tokens, keyspaceNames); - } - - private void calculateToFromStreams(Collection newTokens, List keyspaceNames) - { - InetAddress localAddress = FBUtilities.getBroadcastAddress(); - IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - TokenMetadata tokenMetaCloneAllSettled = _token_metadata.cloneAfterAllSettled(); - // clone to avoid concurrent modification in calculateNaturalEndpoints - TokenMetadata tokenMetaClone = _token_metadata.cloneOnlyTokenMap(); - - for (String keyspace : keyspaceNames) - { - logger.debug("Calculating ranges to stream and request for keyspace {}", keyspace); - for (Token newToken : newTokens) - { - // replication strategy of the current keyspace (aka table) - AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy(); - - // getting collection of the currently used ranges by this keyspace - Collection> currentRanges = getRangesForEndpoint(keyspace, localAddress); - // collection of ranges which this node will serve after move to the new token - Collection> updatedRanges = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress); - - // ring ranges and endpoints associated with them - // this used to determine what nodes should we ping about range data - Multimap, InetAddress> rangeAddresses = strategy.getRangeAddresses(tokenMetaClone); - - // calculated parts of the ranges to request/stream from/to nodes in the ring - Pair>, Set>> rangesPerKeyspace = calculateStreamAndFetchRanges(currentRanges, updatedRanges); - - /** - * In this loop we are going through all ranges "to fetch" and determining - * nodes in the ring responsible for data we are interested in - */ - Multimap, InetAddress> rangesToFetchWithPreferredEndpoints = ArrayListMultimap.create(); - for (Range toFetch : rangesPerKeyspace.right) - { - for (Range range : rangeAddresses.keySet()) - { - if (range.contains(toFetch)) - { - List endpoints = null; - - if (RangeStreamer.useStrictConsistency) - { - Set oldEndpoints = Sets.newHashSet(rangeAddresses.get(range)); - Set newEndpoints = Sets.newHashSet(strategy.calculateNaturalEndpoints(toFetch.right, tokenMetaCloneAllSettled)); - - //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints. - //So we need to be careful to only be strict when endpoints == RF - if (oldEndpoints.size() == strategy.getReplicationFactor()) - { - oldEndpoints.removeAll(newEndpoints); - - //No relocation required - if (oldEndpoints.isEmpty()) - continue; - - assert oldEndpoints.size() == 1 : "Expected 1 endpoint but found " + oldEndpoints.size(); - } - - endpoints = Lists.newArrayList(oldEndpoints.iterator().next()); - } - else - { - endpoints = snitch.getSortedListByProximity(localAddress, rangeAddresses.get(range)); - } - - // storing range and preferred endpoint set - rangesToFetchWithPreferredEndpoints.putAll(toFetch, endpoints); - } - } - - Collection addressList = rangesToFetchWithPreferredEndpoints.get(toFetch); - if (addressList == null || addressList.isEmpty()) - continue; - - if (RangeStreamer.useStrictConsistency) - { - if (addressList.size() > 1) - throw new IllegalStateException("Multiple strict sources found for " + toFetch); - - InetAddress sourceIp = addressList.iterator().next(); - if (Gossiper.instance.isEnabled() && !Gossiper.instance.getEndpointStateForEndpoint(sourceIp).isAlive()) - throw new RuntimeException("A node required to move the data consistently is down ("+sourceIp+"). If you wish to move the data from a potentially inconsistent replica, restart the node with -Dcassandra.consistent.rangemovement=false"); - } - } - - // calculating endpoints to stream current ranges to if needed - // in some situations node will handle current ranges as part of the new ranges - Multimap> endpointRanges = HashMultimap.create(); - for (Range toStream : rangesPerKeyspace.left) - { - Set currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetaClone)); - Set newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetaCloneAllSettled)); - logger.debug("Range: {} Current endpoints: {} New endpoints: {}", toStream, currentEndpoints, newEndpoints); - for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints)) - { - logger.debug("Range {} has new owner {}", toStream, address); - endpointRanges.put(address, toStream); - } - } - - // stream ranges - for (InetAddress address : endpointRanges.keySet()) - { - logger.debug("Will stream range {} of keyspace {} to endpoint {}", endpointRanges.get(address), keyspace, address); - InetAddress preferred = SystemKeyspace.getPreferredIP(address); - streamPlan.transferRanges(address, preferred, keyspace, endpointRanges.get(address)); - } - - // stream requests - Multimap> workMap = RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints, keyspace); - for (InetAddress address : workMap.keySet()) - { - logger.debug("Will request range {} of keyspace {} from endpoint {}", workMap.get(address), keyspace, address); - InetAddress preferred = SystemKeyspace.getPreferredIP(address); - streamPlan.requestRanges(address, preferred, keyspace, workMap.get(address)); - } - - logger.debug("Keyspace {}: work map {}.", keyspace, workMap); - } - } - } - - public Future stream() - { - return streamPlan.execute(); - } - - public boolean streamsNeeded() - { - return !streamPlan.isEmpty(); - } - } +#if 0 /** * Get the status of a token removal. @@ -2268,7 +2091,7 @@ private: */ future<> stream_ranges(std::unordered_map, inet_address>> ranges_to_stream_by_keyspace); -#if 0 +public: /** * Calculate pair of ranges to stream/fetch for given two range collections * (current ranges for keyspace and ranges after move to new token) @@ -2277,51 +2100,9 @@ private: * @param updated collection of the ranges after token is changed * @return pair of ranges to stream/fetch for given current and updated range collections */ - public Pair>, Set>> calculateStreamAndFetchRanges(Collection> current, Collection> updated) - { - Set> toStream = new HashSet<>(); - Set> toFetch = new HashSet<>(); - - - for (Range r1 : current) - { - boolean intersect = false; - for (Range r2 : updated) - { - if (r1.intersects(r2)) - { - // adding difference ranges to fetch from a ring - toStream.addAll(r1.subtract(r2)); - intersect = true; - } - } - if (!intersect) - { - toStream.add(r1); // should seed whole old range - } - } - - for (Range r2 : updated) - { - boolean intersect = false; - for (Range r1 : current) - { - if (r2.intersects(r1)) - { - // adding difference ranges to fetch from a ring - toFetch.addAll(r2.subtract(r1)); - intersect = true; - } - } - if (!intersect) - { - toFetch.add(r2); // should fetch whole old range - } - } - - return Pair.create(toStream, toFetch); - } - + std::pair>, std::unordered_set>> + calculate_stream_and_fetch_ranges(const std::vector>& current, const std::vector>& updated); +#if 0 public void bulkLoad(String directory) { try diff --git a/tests/batchlog_manager_test.cc b/tests/batchlog_manager_test.cc index f4803514fe..bdcc83a411 100644 --- a/tests/batchlog_manager_test.cc +++ b/tests/batchlog_manager_test.cc @@ -43,7 +43,6 @@ static atomic_cell make_atomic_cell(bytes value) { SEASTAR_TEST_CASE(test_execute_batch) { return do_with_cql_env([] (auto& e) { - db::system_keyspace::minimal_setup(e.db(), e.qp()); auto& qp = e.local_qp(); auto bp = make_lw_shared(qp); diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc index 67c3d548a4..4a198b472c 100644 --- a/tests/cql_test_env.cc +++ b/tests/cql_test_env.cc @@ -110,6 +110,7 @@ future<> init_once(shared_ptr> db) { class single_node_cql_env : public cql_test_env { public: static auto constexpr ks_name = "ks"; + static std::atomic active; private: ::shared_ptr> _db; ::shared_ptr> _qp; @@ -275,6 +276,11 @@ public: } future<> start() { + bool old_active = false; + if (!active.compare_exchange_strong(old_active, true)) { + throw std::runtime_error("Starting more than one cql_test_env at a time not supported " + "due to singletons."); + } return seastar::async([this] { utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost")); locator::i_endpoint_snitch::create_snitch("SimpleSnitch").get(); @@ -305,6 +311,8 @@ public: mm.start().get(); qp->start(std::ref(proxy), std::ref(*db)).get(); + db::system_keyspace::minimal_setup(*db, *qp); + auto& ss = service::get_local_storage_service(); static bool storage_service_started = false; if (!storage_service_started) { @@ -324,22 +332,22 @@ public: } virtual future<> stop() override { - return _core_local.stop().then([this] { - return db::get_batchlog_manager().stop().then([this] { - return _qp->stop().then([this] { - return service::get_migration_manager().stop().then([this] { - return service::get_storage_proxy().stop().then([this] { - return _db->stop().then([this] { - return locator::i_endpoint_snitch::stop_snitch(); - }); - }); - }); - }); - }); + return seastar::async([this] { + _core_local.stop().get(); + db::get_batchlog_manager().stop().get(); + _qp->stop().get(); + service::get_migration_manager().stop().get(); + service::get_storage_proxy().stop().get(); + _db->stop().get(); + locator::i_endpoint_snitch::stop_snitch().get(); + bool old_active = true; + assert(active.compare_exchange_strong(old_active, false)); }); } }; +std::atomic single_node_cql_env::active = { false }; + future<::shared_ptr> make_env_for_test() { return seastar::async([] { auto env = ::make_shared(); diff --git a/tests/range_test.cc b/tests/range_test.cc index 6f23e8b9b4..7daaa8dbe8 100644 --- a/tests/range_test.cc +++ b/tests/range_test.cc @@ -327,6 +327,53 @@ BOOST_AUTO_TEST_CASE(test_range_contains) { BOOST_REQUIRE(!range({3}, {1}).contains(range({{1, false}}, {3}), cmp)); } +BOOST_AUTO_TEST_CASE(test_range_subtract) { + auto cmp = [] (int i1, int i2) -> int { return i1 - i2; }; + using r = range; + using vec = std::vector; + + BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({0}, {1}), cmp), vec({r({2}, {4})})); + BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({}, {1}), cmp), vec({r({2}, {4})})); + BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({}, {2}), cmp), vec({r({{2, false}}, {4})})); + BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({}, {3}), cmp), vec({r({{3, false}}, {4})})); + BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({}, {4}), cmp), vec()); + BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({1}, {4}), cmp), vec()); + BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({1}, {3}), cmp), vec({r({{3, false}}, {4})})); + BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({1}, {{3, false}}), cmp), vec({r({3}, {4})})); + BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({2}, {4}), cmp), vec()); + BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({}, {{4, false}}), cmp), vec({r({4}, {4})})); + BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({}, {}), cmp), vec()); + BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({{2, false}}, {}), cmp), vec({r({2}, {2})})); + BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({{2, false}}, {4}), cmp), vec({r({2}, {2})})); + BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({3}, {5}), cmp), vec({r({2}, {{3, false}})})); + BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({3}, {1}), cmp), vec({r({2}, {{3, false}})})); + BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r::make_singular(3), cmp), vec({r({2}, {{3, false}}), r({{3, false}}, {4})})); + BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r::make_singular(4), cmp), vec({r({2}, {{4, false}})})); + BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r::make_singular(5), cmp), vec({r({2}, {4})})); + + BOOST_REQUIRE_EQUAL(r({}, {4}).subtract(r({3}, {5}), cmp), vec({r({}, {{3, false}})})); + BOOST_REQUIRE_EQUAL(r({}, {4}).subtract(r({5}, {}), cmp), vec({r({}, {4})})); + BOOST_REQUIRE_EQUAL(r({}, {4}).subtract(r({5}, {6}), cmp), vec({r({}, {4})})); + BOOST_REQUIRE_EQUAL(r({}, {4}).subtract(r({5}, {2}), cmp), vec({r({{2, false}}, {4})})); + BOOST_REQUIRE_EQUAL(r({4}, {}).subtract(r({3}, {5}), cmp), vec({r({{5, false}}, {})})); + BOOST_REQUIRE_EQUAL(r({4}, {}).subtract(r({1}, {3}), cmp), vec({r({4}, {})})); + BOOST_REQUIRE_EQUAL(r({4}, {}).subtract(r({}, {3}), cmp), vec({r({4}, {})})); + BOOST_REQUIRE_EQUAL(r({4}, {}).subtract(r({7}, {5}), cmp), vec({r({{5, false}}, {{7, false}})})); + + BOOST_REQUIRE_EQUAL(r({5}, {1}).subtract(r({6}, {}), cmp), vec({r({}, {1}), r({5}, {{6, false}})})); + BOOST_REQUIRE_EQUAL(r({5}, {1}).subtract(r({6}, {1}), cmp), vec({r({5}, {{6, false}})})); + BOOST_REQUIRE_EQUAL(r({5}, {1}).subtract(r({6}, {2}), cmp), vec({r({5}, {{6, false}})})); + + // FIXME: Also accept adjacent ranges merged + BOOST_REQUIRE_EQUAL(r({5}, {1}).subtract(r({4}, {7}), cmp), vec({r({}, {1}), r({{7, false}}, {})})); + + // FIXME: Also accept adjacent ranges merged + BOOST_REQUIRE_EQUAL(r({5}, {1}).subtract(r({6}, {7}), cmp), vec({r({}, {1}), r({5}, {{6, false}}), r({{7, false}}, {})})); + + BOOST_REQUIRE_EQUAL(r({5}, {1}).subtract(r({6}, {0}), cmp), vec({r({{0, false}}, {1}), r({5}, {{6, false}})})); + BOOST_REQUIRE_EQUAL(r({5}, {1}).subtract(r({}, {0}), cmp), vec({r({{0, false}}, {1}), r({5}, {})})); +} + struct unsigned_comparator { int operator()(unsigned u1, unsigned u2) const { return (u1 > u2 ? 1 : (u1 == u2 ? 0 : -1));