Merge "move node support + decommission fix" from Asias

"Usage example:

$ nodetool -p 7200 move -- "-9223372036854775000"

or

$ curl -X POST --header "Content-Type: application/json" --header "Accept: application/json"
  "http://127.0.0.2:10000/storage_service/move?new_token=8000000"

Note: Tomek's range subtract and cql_test_env fix for system_keyspace series are included."
This commit is contained in:
Avi Kivity
2015-11-09 11:08:51 +02:00
14 changed files with 527 additions and 386 deletions

View File

@@ -366,11 +366,11 @@ void set_storage_service(http_context& ctx, routes& r) {
});
});
ss::move.set(r, [](std::unique_ptr<request> req) {
//TBD
unimplemented();
ss::move.set(r, [] (std::unique_ptr<request> req) {
auto new_token = req->get_query_param("new_token");
return make_ready_future<json::json_return_type>(json_void());
return service::get_local_storage_service().move(new_token).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
ss::remove_node.set(r, [](std::unique_ptr<request> req) {

View File

@@ -67,9 +67,8 @@ extern std::unique_ptr<query_context> qctx;
// we executed the query, and return an empty result
template <typename... Args>
static future<::shared_ptr<cql3::untyped_result_set>> execute_cql(sstring text, Args&&... args) {
if (qctx) {
return qctx->execute_cql(text, std::forward<Args>(args)...);
}
return make_ready_future<shared_ptr<cql3::untyped_result_set>>(::make_shared<cql3::untyped_result_set>(cql3::untyped_result_set::make_empty()));
assert(qctx);
return qctx->execute_cql(text, std::forward<Args>(args)...);
}
}

View File

@@ -486,9 +486,7 @@ future<> init_local_cache() {
}
void minimal_setup(distributed<database>& db, distributed<cql3::query_processor>& qp) {
auto new_ctx = std::make_unique<query_context>(db, qp);
qctx.swap(new_ctx);
assert(!new_ctx);
qctx = std::make_unique<query_context>(db, qp);
}
future<> setup(distributed<database>& db, distributed<cql3::query_processor>& qp) {
@@ -817,10 +815,7 @@ future<> update_tokens(std::unordered_set<dht::token> tokens) {
}
future<> force_blocking_flush(sstring cfname) {
if (!qctx) {
return make_ready_future<>();
}
assert(qctx);
return qctx->_db.invoke_on_all([cfname = std::move(cfname)](database& db) {
// if (!Boolean.getBoolean("cassandra.unsafesystem"))
column_family& cf = db.find_column_family(NAME, cfname);

View File

@@ -41,6 +41,7 @@
#include "locator/snitch_base.hh"
#include "database.hh"
#include "gms/gossiper.hh"
#include "gms/failure_detector.hh"
#include "log.hh"
#include "streaming/stream_plan.hh"
#include "streaming/stream_state.hh"
@@ -55,14 +56,7 @@ static std::unordered_map<range<token>, std::unordered_set<inet_address>>
unordered_multimap_to_unordered_map(const std::unordered_multimap<range<token>, inet_address>& multimap) {
std::unordered_map<range<token>, std::unordered_set<inet_address>> ret;
for (auto x : multimap) {
auto& range_token = x.first;
auto& ep = x.second;
auto it = ret.find(range_token);
if (it != ret.end()) {
it->second.emplace(ep);
} else {
ret.emplace(range_token, std::unordered_set<inet_address>{ep});
}
ret[x.first].emplace(x.second);
}
return ret;
}
@@ -166,23 +160,24 @@ range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_n
for (auto& x : range_addresses) {
const range<token>& src_range = x.first;
if (src_range.contains(desired_range, dht::tri_compare)) {
auto old_endpoints = x.second;
std::vector<inet_address> old_endpoints(x.second.begin(), x.second.end());
auto it = pending_range_addresses.find(desired_range);
assert (it != pending_range_addresses.end());
auto new_endpoints = it->second;
if (it == pending_range_addresses.end()) {
throw std::runtime_error(sprint("Can not find desired_range = {} in pending_range_addresses", desired_range));
}
std::unordered_set<inet_address> new_endpoints = it->second;
//Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
//So we need to be careful to only be strict when endpoints == RF
if (old_endpoints.size() == strat.get_replication_factor()) {
std::unordered_set<inet_address> diff;
std::set_difference(old_endpoints.begin(), old_endpoints.end(),
new_endpoints.begin(), new_endpoints.end(), std::inserter(diff, diff.begin()));
old_endpoints = std::move(diff);
auto it = std::remove_if(old_endpoints.begin(), old_endpoints.end(),
[&new_endpoints] (inet_address ep) { return new_endpoints.count(ep); });
old_endpoints.erase(it, old_endpoints.end());
if (old_endpoints.size() != 1) {
throw std::runtime_error(sprint("Expected 1 endpoint but found ", old_endpoints.size()));
throw std::runtime_error(sprint("Expected 1 endpoint but found %d", old_endpoints.size()));
}
}
range_sources.emplace(desired_range, *(old_endpoints.begin()));
range_sources.emplace(desired_range, old_endpoints.front());
}
}
@@ -268,4 +263,13 @@ future<streaming::stream_state> range_streamer::fetch_async() {
return _stream_plan.execute();
}
// Builds the "work map" for a range relocation: for each source endpoint,
// the set of ranges this node should request from it.
// Candidate sources in ranges_with_source_target are filtered through a
// failure-detector source filter, so only endpoints the local failure
// detector considers alive are used (mirrors origin's getWorkMap(), see the
// commented-out Java in the header).
std::unordered_multimap<inet_address, range<token>>
range_streamer::get_work_map(const std::unordered_multimap<range<token>, inet_address>& ranges_with_source_target,
const sstring& keyspace) {
// The filter object is owned by the set; get_range_fetch_map only borrows it.
auto filter = std::make_unique<dht::range_streamer::failure_detector_source_filter>(gms::get_local_failure_detector());
std::unordered_set<std::unique_ptr<i_source_filter>> source_filters;
source_filters.emplace(std::move(filter));
return get_range_fetch_map(ranges_with_source_target, source_filters, keyspace);
}
} // dht

View File

@@ -148,11 +148,11 @@ private:
const std::unordered_set<std::unique_ptr<i_source_filter>>& source_filters,
const sstring& keyspace);
public:
static std::unordered_multimap<inet_address, range<token>>
get_work_map(const std::unordered_multimap<range<token>, inet_address>& ranges_with_source_target,
const sstring& keyspace);
#if 0
public static Multimap<InetAddress, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddress> rangesWithSourceTarget, String keyspace)
{
return getRangeFetchMap(rangesWithSourceTarget, Collections.<ISourceFilter>singleton(new FailureDetectorSourceFilter(FailureDetector.instance)), keyspace);
}
// For testing purposes
Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch()

View File

@@ -399,6 +399,8 @@ void token_metadata::calculate_pending_ranges(abstract_replication_strategy& str
auto current_endpoints = strategy.calculate_natural_endpoints(t, metadata);
auto new_endpoints = strategy.calculate_natural_endpoints(t, all_left_metadata);
std::vector<inet_address> diff;
std::sort(current_endpoints.begin(), current_endpoints.end());
std::sort(new_endpoints.begin(), new_endpoints.end());
std::set_difference(new_endpoints.begin(), new_endpoints.end(),
current_endpoints.begin(), current_endpoints.end(), std::back_inserter(diff));
for (auto& ep : diff) {
@@ -468,6 +470,25 @@ void token_metadata::add_leaving_endpoint(inet_address endpoint) {
_leaving_endpoints.emplace(endpoint);
}
/**
 * Create a copy of this token metadata reflecting the ring after all
 * currently-pending leave and move operations have completed.
 *
 * Mirrors origin's cloneAfterAllSettled(): start from a token-map-only
 * clone, drop all leaving endpoints, then apply the destination token of
 * every in-flight move.
 *
 * @return the settled token metadata copy
 */
token_metadata token_metadata::clone_after_all_settled() {
    token_metadata metadata = clone_only_token_map();
    // Leaving nodes will own nothing once settled.
    for (const auto& endpoint : _leaving_endpoints) {
        metadata.remove_endpoint(endpoint);
    }
    // _moving_endpoints maps destination token -> moving endpoint; iterate by
    // const reference to avoid copying each pair.
    for (const auto& x : _moving_endpoints) {
        metadata.update_normal_token(x.first, x.second);
    }
    return metadata;
}
// Record that `endpoint` is moving to token `t`. Consumed by the pending
// range calculation and by clone_after_all_settled(); a later registration
// for the same token overwrites the earlier one.
void token_metadata::add_moving_endpoint(token t, inet_address endpoint) {
_moving_endpoints[t] = endpoint;
}
/////////////////// class topology /////////////////////////////////////////////
inline void topology::clear() {
_dc_endpoints.clear();

View File

@@ -473,29 +473,14 @@ public:
void remove_bootstrap_tokens(std::unordered_set<token> tokens);
void add_leaving_endpoint(inet_address endpoint);
public:
#if 0
/**
* Add a new moving endpoint
* @param token token which is node moving to
* @param endpoint address of the moving node
*/
public void addMovingEndpoint(Token token, InetAddress endpoint)
{
assert endpoint != null;
lock.writeLock().lock();
try
{
_moving_endpoints.add(Pair.create(token, endpoint));
}
finally
{
lock.writeLock().unlock();
}
}
#endif
void add_moving_endpoint(token t, inet_address endpoint);
public:
void remove_endpoint(inet_address endpoint);
@@ -597,36 +582,15 @@ public:
return all_left_metadata;
}
#if 0
public:
/**
* Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all
* current leave, and move operations have finished.
*
* @return new token metadata
*/
public TokenMetadata cloneAfterAllSettled()
{
lock.readLock().lock();
try
{
TokenMetadata metadata = cloneOnlyTokenMap();
for (InetAddress endpoint : _leaving_endpoints)
metadata.removeEndpoint(endpoint);
for (Pair<Token, InetAddress> pair : _moving_endpoints)
metadata.updateNormalToken(pair.left, pair.right);
return metadata;
}
finally
{
lock.readLock().unlock();
}
}
token_metadata clone_after_all_settled();
#if 0
public InetAddress getEndpoint(Token token)
{
lock.readLock().lock();

100
range.hh
View File

@@ -23,6 +23,7 @@
#include <experimental/optional>
#include <iostream>
#include <boost/range/algorithm/copy.hpp>
// A range which can have inclusive, exclusive or open-ended bounds on each end.
template<typename T>
@@ -61,6 +62,36 @@ public:
, _singular(true)
{ }
range() : range({}, {}) {}
private:
// Bound wrappers for compile-time dispatch and safety.
// Wrapping the optional<bound> in distinct types makes it impossible to pass
// a start bound where an end bound is expected (and vice versa), and lets the
// helpers below encode the infinity meaning of a disengaged optional:
// a missing start bound reads as -inf, a missing end bound as +inf.
struct start_bound_ref { const optional<bound>& b; };
struct end_bound_ref { const optional<bound>& b; };
start_bound_ref start_bound() const { return { start() }; }
end_bound_ref end_bound() const { return { end() }; }
// True iff `end` reaches `start`, i.e. a range ending at `end` touches or
// passes one starting at `start`. Equal values count only when both bounds
// are inclusive — hence the boolean arithmetic on is_inclusive().
template<typename Comparator>
static bool greater_than_or_equal(end_bound_ref end, start_bound_ref start, Comparator&& cmp) {
return !end.b || !start.b || cmp(end.b->value(), start.b->value())
>= (!end.b->is_inclusive() || !start.b->is_inclusive());
}
// True iff the range ending at `end` lies entirely before one starting at `start`.
template<typename Comparator>
static bool less_than(end_bound_ref end, start_bound_ref start, Comparator&& cmp) {
return !greater_than_or_equal(end, start, cmp);
}
// True iff start bound `first` is at or before start bound `second`; at equal
// values an inclusive bound counts as "before" an exclusive one.
template<typename Comparator>
static bool less_than_or_equal(start_bound_ref first, start_bound_ref second, Comparator&& cmp) {
return !first.b || (second.b && cmp(first.b->value(), second.b->value())
<= -(!first.b->is_inclusive() && second.b->is_inclusive()));
}
// True iff end bound `first` is at or after end bound `second`.
template<typename Comparator>
static bool greater_than_or_equal(end_bound_ref first, end_bound_ref second, Comparator&& cmp) {
return !first.b || (second.b && cmp(first.b->value(), second.b->value())
>= (!first.b->is_inclusive() && second.b->is_inclusive()));
}
public:
// the point is before the range (works only for non wrapped ranges)
// Comparator must define a total ordering on T.
@@ -122,17 +153,8 @@ public:
return true;
}
// check if end is greater than or equal to start, taking into account if either is inclusive.
auto greater_than_or_equal = [cmp] (const optional<bound>& end, const optional<bound>& start) {
// !start means -inf, whereas !end means +inf
if (!end || !start) {
return true;
}
return cmp(end->value(), start->value())
>= (!end->is_inclusive() || !start->is_inclusive());
};
return greater_than_or_equal(end(), other.start()) && greater_than_or_equal(other.end(), start());
return greater_than_or_equal(end_bound(), other.start_bound(), cmp)
&& greater_than_or_equal(other.end_bound(), start_bound(), cmp);
}
static range make(bound start, bound end) {
return range({std::move(start)}, {std::move(end)});
@@ -180,11 +202,12 @@ public:
}
}
// Converts a wrap-around range to two non-wrap-around ranges.
// The returned ranges are not overlapping and ordered.
// Call only when is_wrap_around().
std::pair<range, range> unwrap() const {
return {
{ start(), {} },
{ {}, end() }
{ {}, end() },
{ start(), {} }
};
}
// the point is inside the range
@@ -214,12 +237,8 @@ public:
}
if (!this_wraps && !other_wraps) {
return (!start() || (other.start()
&& cmp(start()->value(), other.start()->value())
<= -(!start()->is_inclusive() && other.start()->is_inclusive())))
&& (!end() || (other.end()
&& cmp(end()->value(), other.end()->value())
>= (!end()->is_inclusive() && other.end()->is_inclusive())));
return less_than_or_equal(start_bound(), other.start_bound(), cmp)
&& greater_than_or_equal(end_bound(), other.end_bound(), cmp);
}
if (other_wraps) { // && !this_wraps
@@ -232,6 +251,49 @@ public:
|| (other.end() && cmp(end()->value(), other.end()->value())
>= (!end()->is_inclusive() && other.end()->is_inclusive()));
}
// Returns ranges which cover all values covered by this range but not covered by the other range.
// The returned ranges are not overlapping and are ordered.
// Comparator must define a total ordering on T.
template<typename Comparator>
std::vector<range> subtract(const range& other, Comparator&& cmp) const {
std::vector<range> result;
auto this_wraps = is_wrap_around(cmp);
auto other_wraps = other.is_wrap_around(cmp);
if (this_wraps && other_wraps) {
// Both wrap: unwrap() splits each into two non-wrapping halves on the same
// side of the wrap point, so matching halves can be subtracted pairwise.
auto this_unwrapped = unwrap();
auto other_unwrapped = other.unwrap();
boost::copy(this_unwrapped.first.subtract(other_unwrapped.first, cmp), std::back_inserter(result));
boost::copy(this_unwrapped.second.subtract(other_unwrapped.second, cmp), std::back_inserter(result));
} else if (this_wraps) {
// Only we wrap: subtract the non-wrapping `other` from each of our halves.
auto this_unwrapped = unwrap();
boost::copy(this_unwrapped.first.subtract(other, cmp), std::back_inserter(result));
boost::copy(this_unwrapped.second.subtract(other, cmp), std::back_inserter(result));
} else if (other_wraps) {
// Only `other` wraps: remove its halves one after the other.
auto other_unwrapped = other.unwrap();
for (auto &&r : subtract(other_unwrapped.first, cmp)) {
boost::copy(r.subtract(other_unwrapped.second, cmp), std::back_inserter(result));
}
} else {
// Neither wraps: plain interval subtraction.
if (less_than(end_bound(), other.start_bound(), cmp)
|| less_than(other.end_bound(), start_bound(), cmp)) {
// Not overlapping: nothing is removed.
result.push_back(*this);
} else {
// Overlapping: keep the piece before other's start (if any)...
if (!less_than_or_equal(other.start_bound(), start_bound(), cmp)) {
result.push_back({start(), bound(other.start()->value(), !other.start()->is_inclusive())});
}
// ...and the piece after other's end (if any). Note the bound
// inclusivity is flipped so the cut points are excluded exactly once.
if (!greater_than_or_equal(other.end_bound(), end_bound(), cmp)) {
result.push_back({bound(other.end()->value(), !other.end()->is_inclusive()), end()});
}
}
}
// TODO: Merge adjacent ranges (optimization)
return result;
}
// split range in two around a split_point. split_point has to be inside the range
// split_point will belong to first range
// Comparator must define a total ordering on T.

View File

@@ -2837,8 +2837,8 @@ storage_proxy::make_local_reader(utils::UUID cf_id, const query::partition_range
auto unwrapped = range.unwrap();
std::vector<mutation_reader> both;
both.reserve(2);
both.push_back(make_local_reader(cf_id, unwrapped.second));
both.push_back(make_local_reader(cf_id, unwrapped.first));
both.push_back(make_local_reader(cf_id, unwrapped.second));
return make_joining_reader(std::move(both));
}

View File

@@ -611,9 +611,7 @@ void storage_service::handle_state_moving(inet_address endpoint, std::vector<sst
assert(pieces.size() >= 2);
auto token = dht::global_partitioner().from_sstring(pieces[1]);
logger.debug("Node {} state moving, new token {}", endpoint, token);
#if 0
_token_metadata.addMovingEndpoint(token, endpoint);
#endif
_token_metadata.add_moving_endpoint(token, endpoint);
get_local_pending_range_calculator_service().update().get();
}
@@ -741,7 +739,7 @@ void storage_service::on_remove(gms::inet_address endpoint) {
}
void storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state state) {
logger.debug("on_restart endpoint={}", endpoint);
logger.debug("on_dead endpoint={}", endpoint);
#if 0
MessagingService.instance().convict(endpoint);
#endif
@@ -754,11 +752,10 @@ void storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state st
void storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_state state) {
logger.debug("on_restart endpoint={}", endpoint);
#if 0
// If we have restarted before the node was even marked down, we need to reset the connection pool
if (state.isAlive())
onDead(endpoint, state);
#endif
if (state.is_alive()) {
on_dead(endpoint, state);
}
}
// Runs inside seastar::async context
@@ -847,8 +844,7 @@ void storage_service::set_tokens(std::unordered_set<token> tokens) {
logger.debug("Setting tokens to {}", tokens);
db::system_keyspace::update_tokens(tokens).get();
_token_metadata.update_normal_tokens(tokens, get_broadcast_address());
// Collection<Token> localTokens = getLocalTokens();
auto local_tokens = _bootstrap_tokens;
auto local_tokens = get_local_tokens();
auto& gossiper = gms::get_local_gossiper();
gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(local_tokens)).get();
gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.normal(local_tokens)).get();
@@ -1473,15 +1469,19 @@ future<> storage_service::start_rpc_server() {
});
}
future<> storage_service::do_stop_rpc_server() {
    // Detach the current Thrift server instance first, so the transport is
    // no longer considered running while it shuts down.
    auto server = _thrift_server;
    _thrift_server = {};
    if (!server) {
        // Nothing was running; nothing to stop.
        return make_ready_future<>();
    }
    // FIXME: thrift_server::stop() doesn't kill existing connections and wait for them
    return server->stop();
}
future<> storage_service::stop_rpc_server() {
return run_with_write_api_lock([] (storage_service& ss) {
auto tserver = ss._thrift_server;
ss._thrift_server = {};
if (tserver) {
// FIXME: thrift_server::stop() doesn't kill existing connections and wait for them
return tserver->stop();
}
return make_ready_future<>();
return ss.do_stop_rpc_server();
});
}
@@ -1518,15 +1518,19 @@ future<> storage_service::start_native_transport() {
});
}
future<> storage_service::do_stop_native_transport() {
    // Detach the current CQL server instance first, so the transport is no
    // longer considered running while it shuts down.
    auto server = _cql_server;
    _cql_server = {};
    if (!server) {
        // Nothing was running; nothing to stop.
        return make_ready_future<>();
    }
    // FIXME: cql_server::stop() doesn't kill existing connections and wait for them
    return server->stop();
}
future<> storage_service::stop_native_transport() {
return run_with_write_api_lock([] (storage_service& ss) {
auto cserver = ss._cql_server;
ss._cql_server = {};
if (cserver) {
// FIXME: cql_server::stop() doesn't kill existing connections and wait for them
return cserver->stop();
}
return make_ready_future<>();
return ss.do_stop_native_transport();
});
}
@@ -1627,7 +1631,7 @@ future<> storage_service::remove_node(sstring host_id_string) {
// to take responsibility for new range)
std::unordered_multimap<range<token>, inet_address> changed_ranges =
ss.get_changed_ranges_for_leaving(keyspace_name, endpoint);
auto fd = gms::get_local_failure_detector();
auto& fd = gms::get_local_failure_detector();
for (auto& x: changed_ranges) {
auto ep = x.second;
if (fd.is_alive(ep)) {
@@ -2236,7 +2240,7 @@ shared_ptr<load_broadcaster>& storage_service::get_load_broadcaster() {
}
future<> storage_service::shutdown_client_servers() {
return stop_rpc_server().then([this] { return stop_native_transport(); });
return do_stop_rpc_server().then([this] { return do_stop_native_transport(); });
}
std::unordered_multimap<inet_address, range<token>>
@@ -2271,5 +2275,262 @@ storage_service::get_new_source_ranges(const sstring& keyspace_name, const std::
return source_ranges;
}
/**
 * For a pending token move, compute which ranges this node must push to other
 * nodes (to_stream) and which it must pull from them (to_fetch).
 *
 * @param current ranges this node serves before the move
 * @param updated ranges this node will serve after the move
 * @return pair of (to_stream, to_fetch)
 */
std::pair<std::unordered_set<range<token>>, std::unordered_set<range<token>>>
storage_service::calculate_stream_and_fetch_ranges(const std::vector<range<token>>& current, const std::vector<range<token>>& updated) {
    // For every range in `from`, subtract each overlapping range of `to`
    // individually and collect the leftovers; a range overlapping nothing in
    // `to` is taken whole. (Both directions use the same computation, so the
    // original duplicated loops are factored into one helper.)
    auto difference = [] (const std::vector<range<token>>& from, const std::vector<range<token>>& to) {
        std::unordered_set<range<token>> result;
        for (const auto& r1 : from) {
            bool intersect = false;
            for (const auto& r2 : to) {
                if (r1.overlaps(r2, dht::token_comparator())) {
                    // Only the part of r1 not covered by r2 changes hands.
                    for (auto&& r : r1.subtract(r2, dht::token_comparator())) {
                        result.emplace(r);
                    }
                    intersect = true;
                }
            }
            if (!intersect) {
                // No overlap with anything: the whole range changes hands.
                result.emplace(r1);
            }
        }
        return result;
    };
    // Served now but not after the move: stream out.
    std::unordered_set<range<token>> to_stream = difference(current, updated);
    // Served after the move but not now: fetch in.
    std::unordered_set<range<token>> to_fetch = difference(updated, current);
    if (logger.is_enabled(logging::log_level::debug)) {
        logger.debug("current = {}", current);
        logger.debug("updated = {}", updated);
        logger.debug("to_stream = {}", to_stream);
        logger.debug("to_fetch = {}", to_fetch);
    }
    return {to_stream, to_fetch};
}
// Populate _stream_plan with all transfers (push) and requests (pull) needed
// for this node to relocate to new_tokens, across the given keyspaces.
// Mirrors origin's RangeRelocator.calculateToFromStreams(). Runs against
// clones of the token metadata so concurrent updates cannot interfere.
void storage_service::range_relocator::calculate_to_from_streams(std::unordered_set<token> new_tokens, std::vector<sstring> keyspace_names) {
auto& ss = get_local_storage_service();
auto local_address = ss.get_broadcast_address();
auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr();
// Ring layout after all pending leaves/moves complete.
auto token_meta_clone_all_settled = ss._token_metadata.clone_after_all_settled();
// clone to avoid concurrent modification in calculateNaturalEndpoints
auto token_meta_clone = ss._token_metadata.clone_only_token_map();
for (auto keyspace : keyspace_names) {
logger.debug("Calculating ranges to stream and request for keyspace {}", keyspace);
for (auto new_token : new_tokens) {
// replication strategy of the current keyspace (aka table)
auto& ks = ss._db.local().find_keyspace(keyspace);
auto& strategy = ks.get_replication_strategy();
// getting collection of the currently used ranges by this keyspace
std::vector<range<token>> current_ranges = ss.get_ranges_for_endpoint(keyspace, local_address);
// collection of ranges which this node will serve after move to the new token
std::vector<range<token>> updated_ranges = strategy.get_pending_address_ranges(token_meta_clone, new_token, local_address);
// ring ranges and endpoints associated with them
// this used to determine what nodes should we ping about range data
std::unordered_multimap<range<token>, inet_address> range_addresses = strategy.get_range_addresses(token_meta_clone);
// Flatten the multimap: range -> all replicas of that range.
std::unordered_map<range<token>, std::vector<inet_address>> range_addresses_map;
for (auto& x : range_addresses) {
range_addresses_map[x.first].emplace_back(x.second);
}
// calculated parts of the ranges to request/stream from/to nodes in the ring
// std::pair(to_stream, to_fetch)
std::pair<std::unordered_set<range<token>>, std::unordered_set<range<token>>> ranges_per_keyspace =
ss.calculate_stream_and_fetch_ranges(current_ranges, updated_ranges);
/**
* In this loop we are going through all ranges "to fetch" and determining
* nodes in the ring responsible for data we are interested in
*/
std::unordered_multimap<range<token>, inet_address> ranges_to_fetch_with_preferred_endpoints;
for (range<token> to_fetch : ranges_per_keyspace.second) {
for (auto& x : range_addresses_map) {
const range<token>& r = x.first;
std::vector<inet_address>& eps = x.second;
if (r.contains(to_fetch, dht::token_comparator())) {
std::vector<inet_address> endpoints;
if (dht::range_streamer::use_strict_consistency()) {
// Strict consistency: the single replica losing this range is the
// only acceptable source.
std::vector<inet_address> old_endpoints = eps;
std::vector<inet_address> new_endpoints = strategy.calculate_natural_endpoints(to_fetch.end()->value(), token_meta_clone_all_settled);
// Due to CASSANDRA-5953 we can have a higher RF than we have endpoints.
// So we need to be careful to only be strict when endpoints == RF
if (old_endpoints.size() == strategy.get_replication_factor()) {
for (auto n : new_endpoints) {
auto beg = old_endpoints.begin();
auto end = old_endpoints.end();
old_endpoints.erase(std::remove(beg, end, n), end);
}
//No relocation required
if (old_endpoints.empty()) {
continue;
}
if (old_endpoints.size() != 1) {
throw std::runtime_error(sprint("Expected 1 endpoint but found %d", old_endpoints.size()));
}
}
endpoints.emplace_back(old_endpoints.front());
} else {
// Best effort: prefer the closest replicas per the snitch.
std::unordered_set<inet_address> eps_set(eps.begin(), eps.end());
endpoints = snitch->get_sorted_list_by_proximity(local_address, eps_set);
}
// storing range and preferred endpoint set
for (auto ep : endpoints) {
ranges_to_fetch_with_preferred_endpoints.emplace(to_fetch, ep);
}
}
}
// Sanity-check the candidate sources collected for this range.
std::vector<inet_address> address_list;
auto rg = ranges_to_fetch_with_preferred_endpoints.equal_range(to_fetch);
for (auto it = rg.first; it != rg.second; it++) {
address_list.push_back(it->second);
}
if (address_list.empty()) {
continue;
}
if (dht::range_streamer::use_strict_consistency()) {
if (address_list.size() > 1) {
throw std::runtime_error(sprint("Multiple strict sources found for %s", to_fetch));
}
// A consistent move requires the single source to be alive.
auto source_ip = address_list.front();
auto& gossiper = gms::get_local_gossiper();
auto state = gossiper.get_endpoint_state_for_endpoint(source_ip);
if (gossiper.is_enabled() && state && !state->is_alive())
throw std::runtime_error(sprint("A node required to move the data consistently is down (%s). If you wish to move the data from a potentially inconsistent replica, restart the node with -Dcassandra.consistent.rangemovement=false", source_ip));
}
}
// calculating endpoints to stream current ranges to if needed
// in some situations node will handle current ranges as part of the new ranges
std::unordered_multimap<inet_address, range<token>> endpoint_ranges;
std::unordered_map<inet_address, std::vector<range<token>>> endpoint_ranges_map;
for (range<token> to_stream : ranges_per_keyspace.first) {
std::vector<inet_address> current_endpoints = strategy.calculate_natural_endpoints(to_stream.end()->value(), token_meta_clone);
std::vector<inet_address> new_endpoints = strategy.calculate_natural_endpoints(to_stream.end()->value(), token_meta_clone_all_settled);
logger.debug("Range: {} Current endpoints: {} New endpoints: {}", to_stream, current_endpoints, new_endpoints);
// set_difference requires sorted inputs; the diff is replicas that gain
// this range once everything settles.
std::sort(current_endpoints.begin(), current_endpoints.end());
std::sort(new_endpoints.begin(), new_endpoints.end());
std::vector<inet_address> diff;
std::set_difference(new_endpoints.begin(), new_endpoints.end(),
current_endpoints.begin(), current_endpoints.end(), std::back_inserter(diff));
for (auto address : diff) {
logger.debug("Range {} has new owner {}", to_stream, address);
endpoint_ranges.emplace(address, to_stream);
}
}
for (auto& x : endpoint_ranges) {
endpoint_ranges_map[x.first].emplace_back(x.second);
}
// stream ranges
for (auto& x : endpoint_ranges_map) {
auto& address = x.first;
auto& ranges = x.second;
logger.debug("Will stream range {} of keyspace {} to endpoint {}", ranges , keyspace, address);
auto preferred = net::get_local_messaging_service().get_preferred_ip(address);
_stream_plan.transfer_ranges(address, preferred, keyspace, ranges);
}
// stream requests
std::unordered_multimap<inet_address, range<token>> work =
dht::range_streamer::get_work_map(ranges_to_fetch_with_preferred_endpoints, keyspace);
std::unordered_map<inet_address, std::vector<range<token>>> work_map;
for (auto& x : work) {
work_map[x.first].emplace_back(x.second);
}
for (auto& x : work_map) {
auto& address = x.first;
auto& ranges = x.second;
logger.debug("Will request range {} of keyspace {} from endpoint {}", ranges, keyspace, address);
auto preferred = net::get_local_messaging_service().get_preferred_ip(address);
_stream_plan.request_ranges(address, preferred, keyspace, ranges);
}
if (logger.is_enabled(logging::log_level::debug)) {
for (auto& x : work) {
logger.debug("Keyspace {}: work map ep = {} --> range = {}", keyspace, x.first, x.second);
}
}
}
}
}
/**
 * Move this node to a new token, streaming and fetching data as needed.
 *
 * Runs under the write API lock inside a seastar::async context, so the
 * blocking .get() calls below are safe. Mirrors origin's
 * StorageService.move(Token).
 *
 * Throws std::runtime_error if the token is already owned, the node has
 * multiple tokens (vnodes), data is still moving to this node, or the
 * stream/fetch fails.
 */
future<> storage_service::move(token new_token) {
return run_with_write_api_lock([new_token] (storage_service& ss) mutable {
return seastar::async([new_token, &ss] {
// Reject tokens already present in the ring.
auto tokens = ss._token_metadata.sorted_tokens();
if (std::find(tokens.begin(), tokens.end(), new_token) != tokens.end()) {
throw std::runtime_error(sprint("target token %s is already owned by another node.", new_token));
}
// address of the current node
auto local_address = ss.get_broadcast_address();
// This doesn't make any sense in a vnodes environment.
if (ss.get_token_metadata().get_tokens(local_address).size() > 1) {
logger.error("Invalid request to move(Token); This node has more than one token and cannot be moved thusly.");
throw std::runtime_error("This node has more than one token and cannot be moved thusly.");
}
auto keyspaces_to_process = ss._db.local().get_non_system_keyspaces();
// Wait for any in-flight pending-range calculation before inspecting state.
get_local_pending_range_calculator_service().block_until_finished().get();
// checking if data is moving to this node
for (auto keyspace_name : keyspaces_to_process) {
if (ss._token_metadata.get_pending_ranges(keyspace_name, local_address).size() > 0) {
throw std::runtime_error("data is currently moving to this node; unable to leave the ring");
}
}
// Announce MOVING via gossip, then give the ring time to notice.
gms::get_local_gossiper().add_local_application_state(application_state::STATUS, ss.value_factory.moving(new_token)).get();
ss.set_mode(mode::MOVING, sprint("Moving %s from %s to %s.", local_address, *(ss.get_local_tokens().begin()), new_token), true);
ss.set_mode(mode::MOVING, sprint("Sleeping %d ms before start streaming/fetching ranges", RING_DELAY), true);
sleep(std::chrono::milliseconds(RING_DELAY)).get();
// The relocator computes and executes the required stream plan.
storage_service::range_relocator relocator(std::unordered_set<token>{new_token}, keyspaces_to_process);
if (relocator.streams_needed()) {
ss.set_mode(mode::MOVING, "fetching new ranges and streaming old ranges", true);
try {
relocator.stream().get();
} catch (...) {
throw std::runtime_error(sprint("Interrupted while waiting for stream/fetch ranges to finish: %s", std::current_exception()));
}
} else {
ss.set_mode(mode::MOVING, "No ranges to fetch/stream", true);
}
ss.set_tokens(std::unordered_set<token>{new_token}); // setting new token as we have everything settled
logger.debug("Successfully moved to new token {}", *(ss.get_local_tokens().begin()));
});
});
}
} // namespace service

View File

@@ -55,6 +55,7 @@
#include "utils/fb_utilities.hh"
#include "database.hh"
#include "streaming/stream_state.hh"
#include "streaming/stream_plan.hh"
#include <seastar/core/distributed.hh>
#include <seastar/core/rwlock.hh>
@@ -281,6 +282,9 @@ public:
future<bool> is_native_transport_running();
private:
future<> do_stop_rpc_server();
future<> do_stop_native_transport();
#if 0
public void stopTransports()
{
@@ -1802,21 +1806,14 @@ private:
void leave_ring();
void unbootstrap();
future<> stream_hints();
#if 0
public void move(String newToken) throws IOException
{
try
{
getPartitioner().getTokenFactory().validate(newToken);
}
catch (ConfigurationException e)
{
throw new IOException(e.getMessage());
}
move(getPartitioner().getTokenFactory().fromString(newToken));
public:
// Parse the textual token and delegate to the token-typed move() overload.
future<> move(sstring new_token) {
// FIXME: getPartitioner().getTokenFactory().validate(newToken);
return move(dht::global_partitioner().from_sstring(new_token));
}
private:
/**
* move the node to new token or find a new token to boot to according to load
*
@@ -1824,207 +1821,33 @@ private:
*
* @throws IOException on any I/O operation error
*/
private void move(Token newToken) throws IOException
{
if (newToken == null)
throw new IOException("Can't move to the undefined (null) token.");
future<> move(token new_token);
public:
if (_token_metadata.sortedTokens().contains(newToken))
throw new IOException("target token " + newToken + " is already owned by another node.");
class range_relocator {
private:
streaming::stream_plan _stream_plan;
// address of the current node
InetAddress localAddress = FBUtilities.getBroadcastAddress();
// This doesn't make any sense in a vnodes environment.
if (getTokenMetadata().getTokens(localAddress).size() > 1)
{
logger.error("Invalid request to move(Token); This node has more than one token and cannot be moved thusly.");
throw new UnsupportedOperationException("This node has more than one token and cannot be moved thusly.");
public:
range_relocator(std::unordered_set<token> tokens, std::vector<sstring> keyspace_names)
: _stream_plan("Relocation") {
calculate_to_from_streams(std::move(tokens), std::move(keyspace_names));
}
List<String> keyspacesToProcess = Schema.instance.getNonSystemKeyspaces();
private:
void calculate_to_from_streams(std::unordered_set<token> new_tokens, std::vector<sstring> keyspace_names);
PendingRangeCalculatorService.instance.blockUntilFinished();
// checking if data is moving to this node
for (String keyspaceName : keyspacesToProcess)
{
if (_token_metadata.getPendingRanges(keyspaceName, localAddress).size() > 0)
throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
public:
// Executes the prepared relocation stream plan. The resulting stream
// state is discarded -- callers only care about completion/failure.
future<> stream() {
return _stream_plan.execute().discard_result();
}
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.moving(newToken));
setMode(Mode.MOVING, String.format("Moving %s from %s to %s.", localAddress, getLocalTokens().iterator().next(), newToken), true);
setMode(Mode.MOVING, String.format("Sleeping %s ms before start streaming/fetching ranges", RING_DELAY), true);
Uninterruptibles.sleepUninterruptibly(RING_DELAY, TimeUnit.MILLISECONDS);
RangeRelocator relocator = new RangeRelocator(Collections.singleton(newToken), keyspacesToProcess);
if (relocator.streamsNeeded())
{
setMode(Mode.MOVING, "fetching new ranges and streaming old ranges", true);
try
{
relocator.stream().get();
}
catch (ExecutionException | InterruptedException e)
{
throw new RuntimeException("Interrupted while waiting for stream/fetch ranges to finish: " + e.getMessage());
}
}
else
{
setMode(Mode.MOVING, "No ranges to fetch/stream", true);
// True if the computed plan contains any ranges to transfer, i.e.
// whether calling stream() would actually move data. Lets the caller
// skip the streaming phase (and its status reporting) entirely.
bool streams_needed() {
return !_stream_plan.is_empty();
}
};
set_tokens(Collections.singleton(newToken)); // setting new token as we have everything settled
if (logger.isDebugEnabled())
logger.debug("Successfully moved to new token {}", getLocalTokens().iterator().next());
}
private class RangeRelocator
{
private final StreamPlan streamPlan = new StreamPlan("Relocation");
private RangeRelocator(Collection<Token> tokens, List<String> keyspaceNames)
{
calculateToFromStreams(tokens, keyspaceNames);
}
private void calculateToFromStreams(Collection<Token> newTokens, List<String> keyspaceNames)
{
InetAddress localAddress = FBUtilities.getBroadcastAddress();
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
TokenMetadata tokenMetaCloneAllSettled = _token_metadata.cloneAfterAllSettled();
// clone to avoid concurrent modification in calculateNaturalEndpoints
TokenMetadata tokenMetaClone = _token_metadata.cloneOnlyTokenMap();
for (String keyspace : keyspaceNames)
{
logger.debug("Calculating ranges to stream and request for keyspace {}", keyspace);
for (Token newToken : newTokens)
{
// replication strategy of the current keyspace (aka table)
AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
// getting collection of the currently used ranges by this keyspace
Collection<Range<Token>> currentRanges = getRangesForEndpoint(keyspace, localAddress);
// collection of ranges which this node will serve after move to the new token
Collection<Range<Token>> updatedRanges = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
// ring ranges and endpoints associated with them
// this used to determine what nodes should we ping about range data
Multimap<Range<Token>, InetAddress> rangeAddresses = strategy.getRangeAddresses(tokenMetaClone);
// calculated parts of the ranges to request/stream from/to nodes in the ring
Pair<Set<Range<Token>>, Set<Range<Token>>> rangesPerKeyspace = calculateStreamAndFetchRanges(currentRanges, updatedRanges);
/**
* In this loop we are going through all ranges "to fetch" and determining
* nodes in the ring responsible for data we are interested in
*/
Multimap<Range<Token>, InetAddress> rangesToFetchWithPreferredEndpoints = ArrayListMultimap.create();
for (Range<Token> toFetch : rangesPerKeyspace.right)
{
for (Range<Token> range : rangeAddresses.keySet())
{
if (range.contains(toFetch))
{
List<InetAddress> endpoints = null;
if (RangeStreamer.useStrictConsistency)
{
Set<InetAddress> oldEndpoints = Sets.newHashSet(rangeAddresses.get(range));
Set<InetAddress> newEndpoints = Sets.newHashSet(strategy.calculateNaturalEndpoints(toFetch.right, tokenMetaCloneAllSettled));
//Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
//So we need to be careful to only be strict when endpoints == RF
if (oldEndpoints.size() == strategy.getReplicationFactor())
{
oldEndpoints.removeAll(newEndpoints);
//No relocation required
if (oldEndpoints.isEmpty())
continue;
assert oldEndpoints.size() == 1 : "Expected 1 endpoint but found " + oldEndpoints.size();
}
endpoints = Lists.newArrayList(oldEndpoints.iterator().next());
}
else
{
endpoints = snitch.getSortedListByProximity(localAddress, rangeAddresses.get(range));
}
// storing range and preferred endpoint set
rangesToFetchWithPreferredEndpoints.putAll(toFetch, endpoints);
}
}
Collection<InetAddress> addressList = rangesToFetchWithPreferredEndpoints.get(toFetch);
if (addressList == null || addressList.isEmpty())
continue;
if (RangeStreamer.useStrictConsistency)
{
if (addressList.size() > 1)
throw new IllegalStateException("Multiple strict sources found for " + toFetch);
InetAddress sourceIp = addressList.iterator().next();
if (Gossiper.instance.isEnabled() && !Gossiper.instance.getEndpointStateForEndpoint(sourceIp).isAlive())
throw new RuntimeException("A node required to move the data consistently is down ("+sourceIp+"). If you wish to move the data from a potentially inconsistent replica, restart the node with -Dcassandra.consistent.rangemovement=false");
}
}
// calculating endpoints to stream current ranges to if needed
// in some situations node will handle current ranges as part of the new ranges
Multimap<InetAddress, Range<Token>> endpointRanges = HashMultimap.create();
for (Range<Token> toStream : rangesPerKeyspace.left)
{
Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetaClone));
Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetaCloneAllSettled));
logger.debug("Range: {} Current endpoints: {} New endpoints: {}", toStream, currentEndpoints, newEndpoints);
for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints))
{
logger.debug("Range {} has new owner {}", toStream, address);
endpointRanges.put(address, toStream);
}
}
// stream ranges
for (InetAddress address : endpointRanges.keySet())
{
logger.debug("Will stream range {} of keyspace {} to endpoint {}", endpointRanges.get(address), keyspace, address);
InetAddress preferred = SystemKeyspace.getPreferredIP(address);
streamPlan.transferRanges(address, preferred, keyspace, endpointRanges.get(address));
}
// stream requests
Multimap<InetAddress, Range<Token>> workMap = RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints, keyspace);
for (InetAddress address : workMap.keySet())
{
logger.debug("Will request range {} of keyspace {} from endpoint {}", workMap.get(address), keyspace, address);
InetAddress preferred = SystemKeyspace.getPreferredIP(address);
streamPlan.requestRanges(address, preferred, keyspace, workMap.get(address));
}
logger.debug("Keyspace {}: work map {}.", keyspace, workMap);
}
}
}
public Future<StreamState> stream()
{
return streamPlan.execute();
}
public boolean streamsNeeded()
{
return !streamPlan.isEmpty();
}
}
#if 0
/**
* Get the status of a token removal.
@@ -2268,7 +2091,7 @@ private:
*/
future<> stream_ranges(std::unordered_map<sstring, std::unordered_multimap<range<token>, inet_address>> ranges_to_stream_by_keyspace);
#if 0
public:
/**
* Calculate pair of ranges to stream/fetch for given two range collections
* (current ranges for keyspace and ranges after move to new token)
@@ -2277,51 +2100,9 @@ private:
* @param updated collection of the ranges after token is changed
* @return pair of ranges to stream/fetch for given current and updated range collections
*/
public Pair<Set<Range<Token>>, Set<Range<Token>>> calculateStreamAndFetchRanges(Collection<Range<Token>> current, Collection<Range<Token>> updated)
{
Set<Range<Token>> toStream = new HashSet<>();
Set<Range<Token>> toFetch = new HashSet<>();
for (Range r1 : current)
{
boolean intersect = false;
for (Range r2 : updated)
{
if (r1.intersects(r2))
{
// adding difference ranges to fetch from a ring
toStream.addAll(r1.subtract(r2));
intersect = true;
}
}
if (!intersect)
{
toStream.add(r1); // should seed whole old range
}
}
for (Range r2 : updated)
{
boolean intersect = false;
for (Range r1 : current)
{
if (r2.intersects(r1))
{
// adding difference ranges to fetch from a ring
toFetch.addAll(r2.subtract(r1));
intersect = true;
}
}
if (!intersect)
{
toFetch.add(r2); // should fetch whole old range
}
}
return Pair.create(toStream, toFetch);
}
std::pair<std::unordered_set<range<token>>, std::unordered_set<range<token>>>
calculate_stream_and_fetch_ranges(const std::vector<range<token>>& current, const std::vector<range<token>>& updated);
#if 0
public void bulkLoad(String directory)
{
try

View File

@@ -43,7 +43,6 @@ static atomic_cell make_atomic_cell(bytes value) {
SEASTAR_TEST_CASE(test_execute_batch) {
return do_with_cql_env([] (auto& e) {
db::system_keyspace::minimal_setup(e.db(), e.qp());
auto& qp = e.local_qp();
auto bp = make_lw_shared<db::batchlog_manager>(qp);

View File

@@ -110,6 +110,7 @@ future<> init_once(shared_ptr<distributed<database>> db) {
class single_node_cql_env : public cql_test_env {
public:
static auto constexpr ks_name = "ks";
static std::atomic<bool> active;
private:
::shared_ptr<distributed<database>> _db;
::shared_ptr<distributed<cql3::query_processor>> _qp;
@@ -275,6 +276,11 @@ public:
}
future<> start() {
bool old_active = false;
if (!active.compare_exchange_strong(old_active, true)) {
throw std::runtime_error("Starting more than one cql_test_env at a time not supported "
"due to singletons.");
}
return seastar::async([this] {
utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost"));
locator::i_endpoint_snitch::create_snitch("SimpleSnitch").get();
@@ -305,6 +311,8 @@ public:
mm.start().get();
qp->start(std::ref(proxy), std::ref(*db)).get();
db::system_keyspace::minimal_setup(*db, *qp);
auto& ss = service::get_local_storage_service();
static bool storage_service_started = false;
if (!storage_service_started) {
@@ -324,22 +332,22 @@ public:
}
virtual future<> stop() override {
return _core_local.stop().then([this] {
return db::get_batchlog_manager().stop().then([this] {
return _qp->stop().then([this] {
return service::get_migration_manager().stop().then([this] {
return service::get_storage_proxy().stop().then([this] {
return _db->stop().then([this] {
return locator::i_endpoint_snitch::stop_snitch();
});
});
});
});
});
return seastar::async([this] {
_core_local.stop().get();
db::get_batchlog_manager().stop().get();
_qp->stop().get();
service::get_migration_manager().stop().get();
service::get_storage_proxy().stop().get();
_db->stop().get();
locator::i_endpoint_snitch::stop_snitch().get();
bool old_active = true;
assert(active.compare_exchange_strong(old_active, false));
});
}
};
std::atomic<bool> single_node_cql_env::active = { false };
future<::shared_ptr<cql_test_env>> make_env_for_test() {
return seastar::async([] {
auto env = ::make_shared<single_node_cql_env>();

View File

@@ -327,6 +327,53 @@ BOOST_AUTO_TEST_CASE(test_range_contains) {
BOOST_REQUIRE(!range<int>({3}, {1}).contains(range<int>({{1, false}}, {3}), cmp));
}
// Exercises range<T>::subtract(), which returns the (possibly empty,
// possibly split) remainder of *this after removing the overlap with the
// argument. Bound notation used below: {} is an open (unbounded) end,
// {x} an inclusive bound, {{x, false}} an exclusive bound. Ranges whose
// start compares greater than their end (e.g. ({5}, {1})) wrap around.
BOOST_AUTO_TEST_CASE(test_range_subtract) {
auto cmp = [] (int i1, int i2) -> int { return i1 - i2; };
using r = range<int>;
using vec = std::vector<r>;
// Disjoint or touching subtrahend: result is unchanged, or trimmed at
// one end when the subtrahend overlaps the start.
BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({0}, {1}), cmp), vec({r({2}, {4})}));
BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({}, {1}), cmp), vec({r({2}, {4})}));
BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({}, {2}), cmp), vec({r({{2, false}}, {4})}));
BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({}, {3}), cmp), vec({r({{3, false}}, {4})}));
// Subtrahend fully covers [2,4]: nothing remains.
BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({}, {4}), cmp), vec());
BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({1}, {4}), cmp), vec());
BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({1}, {3}), cmp), vec({r({{3, false}}, {4})}));
BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({1}, {{3, false}}), cmp), vec({r({3}, {4})}));
BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({2}, {4}), cmp), vec());
// Exclusive end {{4, false}} leaves the single point [4,4].
BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({}, {{4, false}}), cmp), vec({r({4}, {4})}));
BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({}, {}), cmp), vec());
BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({{2, false}}, {}), cmp), vec({r({2}, {2})}));
BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({{2, false}}, {4}), cmp), vec({r({2}, {2})}));
BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({3}, {5}), cmp), vec({r({2}, {{3, false}})}));
BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r({3}, {1}), cmp), vec({r({2}, {{3, false}})}));
// Singular (single-point) subtrahend: punches a hole, splitting the range.
BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r::make_singular(3), cmp), vec({r({2}, {{3, false}}), r({{3, false}}, {4})}));
BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r::make_singular(4), cmp), vec({r({2}, {{4, false}})}));
BOOST_REQUIRE_EQUAL(r({2}, {4}).subtract(r::make_singular(5), cmp), vec({r({2}, {4})}));
// Half-open (unbounded at one end) minuends.
BOOST_REQUIRE_EQUAL(r({}, {4}).subtract(r({3}, {5}), cmp), vec({r({}, {{3, false}})}));
BOOST_REQUIRE_EQUAL(r({}, {4}).subtract(r({5}, {}), cmp), vec({r({}, {4})}));
BOOST_REQUIRE_EQUAL(r({}, {4}).subtract(r({5}, {6}), cmp), vec({r({}, {4})}));
BOOST_REQUIRE_EQUAL(r({}, {4}).subtract(r({5}, {2}), cmp), vec({r({{2, false}}, {4})}));
BOOST_REQUIRE_EQUAL(r({4}, {}).subtract(r({3}, {5}), cmp), vec({r({{5, false}}, {})}));
BOOST_REQUIRE_EQUAL(r({4}, {}).subtract(r({1}, {3}), cmp), vec({r({4}, {})}));
BOOST_REQUIRE_EQUAL(r({4}, {}).subtract(r({}, {3}), cmp), vec({r({4}, {})}));
// Wrap-around subtrahend ({7},{5}) covers everything except (5,7).
BOOST_REQUIRE_EQUAL(r({4}, {}).subtract(r({7}, {5}), cmp), vec({r({{5, false}}, {{7, false}})}));
// Wrap-around minuend ({5},{1}): remainders may come back as two pieces.
BOOST_REQUIRE_EQUAL(r({5}, {1}).subtract(r({6}, {}), cmp), vec({r({}, {1}), r({5}, {{6, false}})}));
BOOST_REQUIRE_EQUAL(r({5}, {1}).subtract(r({6}, {1}), cmp), vec({r({5}, {{6, false}})}));
BOOST_REQUIRE_EQUAL(r({5}, {1}).subtract(r({6}, {2}), cmp), vec({r({5}, {{6, false}})}));
// FIXME: Also accept adjacent ranges merged
BOOST_REQUIRE_EQUAL(r({5}, {1}).subtract(r({4}, {7}), cmp), vec({r({}, {1}), r({{7, false}}, {})}));
// FIXME: Also accept adjacent ranges merged
BOOST_REQUIRE_EQUAL(r({5}, {1}).subtract(r({6}, {7}), cmp), vec({r({}, {1}), r({5}, {{6, false}}), r({{7, false}}, {})}));
BOOST_REQUIRE_EQUAL(r({5}, {1}).subtract(r({6}, {0}), cmp), vec({r({{0, false}}, {1}), r({5}, {{6, false}})}));
BOOST_REQUIRE_EQUAL(r({5}, {1}).subtract(r({}, {0}), cmp), vec({r({{0, false}}, {1}), r({5}, {})}));
}
struct unsigned_comparator {
int operator()(unsigned u1, unsigned u2) const {
return (u1 > u2 ? 1 : (u1 == u2 ? 0 : -1));