Merge "Use range_streamer everywhere" from Asias

"With this series, all the following cluster operations:

- bootstrap
- rebuild
- decommission
- removenode

will use the same code to do the streaming.

The range_streamer is now extended to support both fetching from and
pushing to a peer node. Another big change is that the range_streamer now
streams fewer ranges at a time, so less data per stream_plan, and the
range_streamer remembers which ranges failed to stream so it can retry them later.

The retry policy is very simple at the moment: it retries at most 5 times,
sleeping 1 minute, 1.5 minutes, 1.5^2 minutes, 1.5^3 minutes, and so on.

Later, we can introduce api for user to decide when to stop retrying and
the retry interval.

The benefits:

 - All the cluster operations share the same code to stream
 - We can track the operation progress, e.g., we can know the total number
   of ranges that need to be streamed and the number of ranges finished in
   bootstrap, decommission, etc.
 - All the cluster operations can survive a peer node going down during the
   operation, which usually takes a long time to complete. E.g., when adding
   a new node, currently if any of the existing nodes which stream data to
   the new node has an issue sending data to it, the whole bootstrap
   process will fail. After this patch, we can fix the problematic node
   and restart it, and the joining node will retry streaming from that node
   again.
 - We can fail streaming early, time out early, and retry less because
   all the operations that use streaming can survive the failure of a single
   stream_plan. It is no longer important to make every single
   stream_plan successful. Note that another user of streaming, repair, now
   uses small stream_plans as well and can rerun the repair for the
   failed ranges too.

This is one step closer to supporting the resumable add/remove node
operations."

* tag 'asias/use_range_streamer_everywhere_v4' of github.com:cloudius-systems/seastar-dev:
  storage_service: Use the new range_streamer interface for removenode
  storage_service: Use the new range_streamer interface for decommission
  storage_service: Use the new range_streamer interface for rebuild
  storage_service: Use the new range_streamer interface for bootstrap
  dht: Extend range_streamer interface
This commit is contained in:
Avi Kivity
2017-08-09 10:00:25 +03:00
4 changed files with 173 additions and 75 deletions

View File

@@ -59,14 +59,11 @@ future<> boot_strapper::bootstrap() {
streamer->add_ranges(keyspace_name, ranges);
}
return streamer->fetch_async().then_wrapped([streamer] (auto&& f) {
try {
auto state = f.get0();
} catch (...) {
throw std::runtime_error(sprint("Error during boostrap: %s", std::current_exception()));
}
return streamer->stream_async().then([streamer] () {
service::get_local_storage_service().finish_bootstrapping();
return make_ready_future<>();
}).handle_exception([streamer] (std::exception_ptr eptr) {
blogger.warn("Eror during bootstrap: {}", eptr);
return make_exception_future<>(std::move(eptr));
});
}

View File

@@ -211,7 +211,36 @@ bool range_streamer::use_strict_sources_for_ranges(const sstring& keyspace_name)
&& _metadata.get_all_endpoints().size() != strat.get_replication_factor();
}
void range_streamer::add_tx_ranges(const sstring& keyspace_name, std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint, std::vector<sstring> column_families) {
if (_nr_rx_added) {
throw std::runtime_error("Mixed sending and receiving is not supported");
}
_nr_tx_added++;
_to_stream.emplace(keyspace_name, std::move(ranges_per_endpoint));
auto inserted = _column_families.emplace(keyspace_name, std::move(column_families)).second;
if (!inserted) {
throw std::runtime_error("Can not add column_families for the same keyspace more than once");
}
}
void range_streamer::add_rx_ranges(const sstring& keyspace_name, std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint, std::vector<sstring> column_families) {
if (_nr_tx_added) {
throw std::runtime_error("Mixed sending and receiving is not supported");
}
_nr_rx_added++;
_to_stream.emplace(keyspace_name, std::move(ranges_per_endpoint));
auto inserted = _column_families.emplace(keyspace_name, std::move(column_families)).second;
if (!inserted) {
throw std::runtime_error("Can not add column_families for the same keyspace more than once");
}
}
// TODO: This is the legacy range_streamer interface, it is add_rx_ranges which adds rx ranges.
void range_streamer::add_ranges(const sstring& keyspace_name, dht::token_range_vector ranges) {
if (_nr_tx_added) {
throw std::runtime_error("Mixed sending and receiving is not supported");
}
_nr_rx_added++;
auto ranges_for_keyspace = use_strict_sources_for_ranges(keyspace_name)
? get_all_ranges_with_strict_sources_for(keyspace_name, ranges)
: get_all_ranges_with_sources_for(keyspace_name, ranges);
@@ -232,26 +261,113 @@ void range_streamer::add_ranges(const sstring& keyspace_name, dht::token_range_v
logger.debug("{} : range {} from source {} for keyspace {}", _description, x.second, x.first, keyspace_name);
}
}
_to_fetch.emplace(keyspace_name, std::move(range_fetch_map));
_to_stream.emplace(keyspace_name, std::move(range_fetch_map));
}
future<streaming::stream_state> range_streamer::fetch_async() {
for (auto& fetch : _to_fetch) {
const auto& keyspace = fetch.first;
for (auto& x : fetch.second) {
auto& source = x.first;
auto& ranges = x.second;
/* Send messages to respective folks to stream data over to me */
if (logger.is_enabled(logging::log_level::debug)) {
logger.debug("{}ing from {} ranges {}", _description, source, ranges);
future<> range_streamer::stream_async() {
return seastar::async([this] {
int sleep_time = 60;
for (;;) {
try {
do_stream_async().get();
break;
} catch (...) {
logger.warn("{} failed to stream. Will retry in {} seconds ...", _description, sleep_time);
sleep_abortable(std::chrono::seconds(sleep_time)).get();
sleep_time *= 1.5;
if (++_nr_retried >= _nr_max_retry) {
throw;
}
}
_stream_plan.request_ranges(source, keyspace, ranges);
}
});
}
// Streams every range previously added to _to_stream, one peer at a time per
// keyspace (peers in parallel), in chunks of _nr_ranges_per_stream_plan per
// stream_plan. Ranges are moved out of _to_stream as they are picked up; on
// failure the not-yet-streamed chunk is pushed back so that a later retry
// (see stream_async()) only re-streams what is still pending.
future<> range_streamer::do_stream_async() {
    auto nr_ranges_remaining = nr_ranges_to_stream();
    logger.info("{} starts, nr_ranges_remaining={}", _description, nr_ranges_remaining);
    auto start = lowres_clock::now();
    return do_for_each(_to_stream, [this, start, description = _description] (auto& stream) {
        const auto& keyspace = stream.first;
        auto& ip_range_vec = stream.second;
        // Fetch from or send to peer node in parallel
        return parallel_for_each(ip_range_vec, [this, description, keyspace] (auto& ip_range) {
            auto& source = ip_range.first;
            auto& range_vec = ip_range.second;
            return seastar::async([this, description, keyspace, source, &range_vec] () mutable {
                // TODO: It is better to use fiber instead of thread here because
                // creating a thread per peer can be some memory in a large cluster.
                auto start_time = lowres_clock::now();
                unsigned sp_index = 0;
                unsigned nr_ranges_streamed = 0;
                size_t nr_ranges_total = range_vec.size();
                dht::token_range_vector ranges_to_stream;
                // Executes one stream_plan for the currently accumulated chunk,
                // then clears the chunk. Blocks (we are inside seastar::async).
                auto do_streaming = [&] {
                    auto sp = stream_plan(sprint("%s-%s-index-%d", description, keyspace, sp_index++));
                    logger.info("{} with {} for keyspace={}, {} out of {} ranges: ranges = {}",
                            description, source, keyspace, nr_ranges_streamed, nr_ranges_total, ranges_to_stream.size());
                    if (_nr_rx_added) {
                        sp.request_ranges(source, keyspace, ranges_to_stream, _column_families[keyspace]);
                    } else if (_nr_tx_added) {
                        sp.transfer_ranges(source, keyspace, ranges_to_stream, _column_families[keyspace]);
                    }
                    sp.execute().discard_result().get();
                    ranges_to_stream.clear();
                };
                try {
                    // Move ranges from range_vec into the current chunk. The range
                    // must be copied into the chunk *before* it is erased, and the
                    // iterator returned by erase() must not be dereferenced (it may
                    // be end()) -- the previous code erased first and then pushed
                    // *it, streaming the wrong range and invoking UB on the last one.
                    for (auto it = range_vec.begin(); it != range_vec.end();) {
                        ranges_to_stream.push_back(*it);
                        it = range_vec.erase(it);
                        nr_ranges_streamed++;
                        if (ranges_to_stream.size() >= _nr_ranges_per_stream_plan) {
                            do_streaming();
                        }
                    }
                    // Flush the final, partial chunk if any ranges remain in it.
                    if (!ranges_to_stream.empty()) {
                        do_streaming();
                    }
                } catch (...) {
                    // Return the unstreamed chunk to range_vec so a retry can pick it up.
                    for (auto& range : ranges_to_stream) {
                        range_vec.push_back(range);
                    }
                    auto t = std::chrono::duration_cast<std::chrono::seconds>(lowres_clock::now() - start_time).count();
                    // Argument order matches the format string (and the success log
                    // below): description, source, keyspace -- they were swapped.
                    logger.warn("{} with {} for keyspace={} failed, took {} seconds: {}", description, source, keyspace, t, std::current_exception());
                    throw;
                }
                auto t = std::chrono::duration_cast<std::chrono::seconds>(lowres_clock::now() - start_time).count();
                logger.info("{} with {} for keyspace={} succeeded, took {} seconds", description, source, keyspace, t);
            });
        });
    }).finally([this, start] {
        auto t = std::chrono::duration_cast<std::chrono::seconds>(lowres_clock::now() - start).count();
        auto nr_ranges_remaining = nr_ranges_to_stream();
        if (nr_ranges_remaining) {
            logger.warn("{} failed, took {} seconds, nr_ranges_remaining={}", _description, t, nr_ranges_remaining);
        } else {
            logger.info("{} succeeded, took {} seconds, nr_ranges_remaining={}", _description, t, nr_ranges_remaining);
        }
    });
}
// Returns the total number of ranges still pending in _to_stream, summed over
// all keyspaces and peers. Used for progress logging and by do_stream_async()
// to decide whether a streaming pass completed fully.
size_t range_streamer::nr_ranges_to_stream() {
    size_t nr_ranges_remaining = 0;
    for (auto& fetch : _to_stream) {
        const auto& keyspace = fetch.first;
        auto& ip_range_vec = fetch.second;
        for (auto& ip_range : ip_range_vec) {
            auto& source = ip_range.first;
            auto& range_vec = ip_range.second;
            nr_ranges_remaining += range_vec.size();
            logger.debug("Remaining: keyspace={}, source={}, ranges={}", keyspace, source, range_vec);
        }
    }
    // The stray "return _stream_plan.execute();" line (leftover from the old
    // fetch_async() implementation) was removed: it returned the wrong type
    // and made the real return below unreachable.
    return nr_ranges_remaining;
}
std::unordered_multimap<inet_address, dht::token_range>
range_streamer::get_work_map(const std::unordered_multimap<dht::token_range, inet_address>& ranges_with_source_target,
const sstring& keyspace) {

View File

@@ -119,6 +119,8 @@ public:
}
void add_ranges(const sstring& keyspace_name, dht::token_range_vector ranges);
void add_tx_ranges(const sstring& keyspace_name, std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint, std::vector<sstring> column_families = {});
void add_rx_ranges(const sstring& keyspace_name, std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint, std::vector<sstring> column_families = {});
private:
bool use_strict_sources_for_ranges(const sstring& keyspace_name);
/**
@@ -159,16 +161,27 @@ public:
}
#endif
public:
future<streaming::stream_state> fetch_async();
future<> stream_async();
future<> do_stream_async();
size_t nr_ranges_to_stream();
private:
distributed<database>& _db;
token_metadata& _metadata;
std::unordered_set<token> _tokens;
inet_address _address;
sstring _description;
std::unordered_multimap<sstring, std::unordered_map<inet_address, dht::token_range_vector>> _to_fetch;
std::unordered_multimap<sstring, std::unordered_map<inet_address, dht::token_range_vector>> _to_stream;
std::unordered_set<std::unique_ptr<i_source_filter>> _source_filters;
stream_plan _stream_plan;
std::unordered_map<sstring, std::vector<sstring>> _column_families;
// Number of ranges to stream per stream plan
unsigned _nr_ranges_per_stream_plan = 10;
// Retry the stream plan _nr_max_retry times
unsigned _nr_retried = 0;
unsigned _nr_max_retry = 5;
// Number of tx and rx ranges added
unsigned _nr_tx_added = 0;
unsigned _nr_rx_added = 0;
};
} // dht

View File

@@ -2399,15 +2399,12 @@ future<> storage_service::rebuild(sstring source_dc) {
for (const auto& keyspace_name : ss._db.local().get_non_system_keyspaces()) {
streamer->add_ranges(keyspace_name, ss.get_local_ranges(keyspace_name));
}
return streamer->fetch_async().then_wrapped([streamer] (auto&& f) {
try {
auto state = f.get0();
} catch (...) {
// This is used exclusively through JMX, so log the full trace but only throw a simple RTE
slogger.error("Error while rebuilding node: {}", std::current_exception());
throw std::runtime_error(sprint("Error while rebuilding node: %s", std::current_exception()));
}
return make_ready_future<>();
return streamer->stream_async().then([streamer] {
slogger.info("Streaming for rebuild successful");
}).handle_exception([] (auto ep) {
// This is used exclusively through JMX, so log the full trace but only throw a simple RTE
slogger.warn("Error while rebuilding node: {}", std::current_exception());
return make_exception_future<>(std::move(ep));
});
});
}
@@ -2534,10 +2531,8 @@ void storage_service::unbootstrap() {
}
future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
std::unordered_multimap<sstring, std::unordered_map<inet_address, dht::token_range_vector>> ranges_to_fetch;
auto streamer = make_lw_shared<dht::range_streamer>(_db, get_token_metadata(), get_broadcast_address(), "Restore_replica_count");
auto my_address = get_broadcast_address();
auto non_system_keyspaces = _db.local().get_non_system_keyspaces();
for (const auto& keyspace_name : non_system_keyspaces) {
std::unordered_multimap<dht::token_range, inet_address> changed_ranges = get_changed_ranges_for_leaving(keyspace_name, endpoint);
@@ -2548,26 +2543,15 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr
}
}
std::unordered_multimap<inet_address, dht::token_range> source_ranges = get_new_source_ranges(keyspace_name, my_new_ranges);
std::unordered_map<inet_address, dht::token_range_vector> tmp;
std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint;
for (auto& x : source_ranges) {
tmp[x.first].emplace_back(x.second);
ranges_per_endpoint[x.first].emplace_back(x.second);
}
ranges_to_fetch.emplace(keyspace_name, std::move(tmp));
streamer->add_rx_ranges(keyspace_name, std::move(ranges_per_endpoint));
}
auto sp = make_lw_shared<streaming::stream_plan>("Restore replica count");
for (auto& x: ranges_to_fetch) {
const sstring& keyspace_name = x.first;
std::unordered_map<inet_address, dht::token_range_vector>& maps = x.second;
for (auto& m : maps) {
auto source = m.first;
auto ranges = m.second;
slogger.debug("Requesting from {} ranges {}", source, ranges);
sp->request_ranges(source, keyspace_name, ranges);
}
}
return sp->execute().then_wrapped([this, sp, notify_endpoint] (auto&& f) {
return streamer->stream_async().then_wrapped([this, streamer, notify_endpoint] (auto&& f) {
try {
auto state = f.get0();
f.get();
return this->send_replication_notification(notify_endpoint);
} catch (...) {
slogger.warn("Streaming to restore replica count failed: {}", std::current_exception());
@@ -2659,8 +2643,7 @@ void storage_service::leave_ring() {
future<>
storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multimap<dht::token_range, inet_address>> ranges_to_stream_by_keyspace) {
// First, we build a list of ranges to stream to each host, per table
std::unordered_map<sstring, std::unordered_map<inet_address, dht::token_range_vector>> sessions_to_stream_by_keyspace;
auto streamer = make_lw_shared<dht::range_streamer>(_db, get_token_metadata(), get_broadcast_address(), "Unbootstrap");
for (auto& entry : ranges_to_stream_by_keyspace) {
const auto& keyspace = entry.first;
auto& ranges_with_endpoints = entry.second;
@@ -2675,26 +2658,13 @@ storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multim
inet_address endpoint = end_point_entry.second;
ranges_per_endpoint[endpoint].emplace_back(r);
}
sessions_to_stream_by_keyspace.emplace(keyspace, std::move(ranges_per_endpoint));
streamer->add_tx_ranges(keyspace, std::move(ranges_per_endpoint));
}
auto sp = make_lw_shared<streaming::stream_plan>("Unbootstrap");
for (auto& entry : sessions_to_stream_by_keyspace) {
const auto& keyspace_name = entry.first;
// TODO: we can move to avoid copy of std::vector
auto& ranges_per_endpoint = entry.second;
for (auto& ranges_entry : ranges_per_endpoint) {
auto& ranges = ranges_entry.second;
auto new_endpoint = ranges_entry.first;
// TODO each call to transferRanges re-flushes, this is potentially a lot of waste
sp->transfer_ranges(new_endpoint, keyspace_name, ranges);
}
}
return sp->execute().discard_result().then([sp] {
return streamer->stream_async().then([streamer] {
slogger.info("stream_ranges successful");
}).handle_exception([] (auto ep) {
slogger.info("stream_ranges failed: {}", ep);
return make_exception_future(std::runtime_error("stream_ranges failed"));
slogger.warn("stream_ranges failed: {}", ep);
return make_exception_future<>(std::move(ep));
});
}
@@ -2728,16 +2698,18 @@ future<> storage_service::stream_hints() {
// stream all hints -- range list will be a singleton of "the entire ring"
dht::token_range_vector ranges = {dht::token_range::make_open_ended_both_sides()};
slogger.debug("stream_hints: ranges={}", ranges);
std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint;
ranges_per_endpoint[hints_destination_host] = std::move(ranges);
auto sp = make_lw_shared<streaming::stream_plan>("Hints");
std::vector<sstring> column_families = { db::system_keyspace::HINTS };
auto streamer = make_lw_shared<dht::range_streamer>(_db, get_token_metadata(), get_broadcast_address(), "Hints");
auto keyspace = db::system_keyspace::NAME;
sp->transfer_ranges(hints_destination_host, keyspace, ranges, column_families);
return sp->execute().discard_result().then([sp] {
std::vector<sstring> column_families = { db::system_keyspace::HINTS };
streamer->add_tx_ranges(keyspace, std::move(ranges_per_endpoint), column_families);
return streamer->stream_async().then([streamer] {
slogger.info("stream_hints successful");
}).handle_exception([] (auto ep) {
slogger.info("stream_hints failed: {}", ep);
return make_exception_future(std::runtime_error("stream_hints failed"));
slogger.warn("stream_hints failed: {}", ep);
return make_exception_future<>(std::move(ep));
});
}
}