Merge "Use range_streamer everywhere" from Asias
"With this series, all the following cluster operations: - bootstrap - rebuild - decommission - removenode will use the same code to do the streaming. The range_streamer is now extended to support both fetching from and pushing to a peer node. Another big change is that the range_streamer will now stream fewer ranges at a time, so less data, per stream_plan, and the range_streamer will remember which ranges failed to stream so it can retry them later. The retry policy is very simple at the moment: it retries at most 5 times and sleeps 1 minute, 1.5^2 minutes, 1.5^3 minutes, and so on. Later, we can introduce an API for the user to decide when to stop retrying and what the retry interval should be. The benefits: - All the cluster operations share the same code to stream - We can know the operation progress, e.g., we can know the total number of ranges that need to be streamed and the number of ranges finished in bootstrap, decommission, etc. - All the cluster operations can survive a peer node going down during the operation, which usually takes a long time to complete. E.g., when adding a new node, currently if any of the existing nodes which stream data to the new node has an issue sending data to it, the whole bootstrap process will fail. After this patch, we can fix the problematic node and restart it, and the joining node will retry streaming from that node again. - We can fail streaming early, time out early, and retry less, because all the operations that use streaming can survive the failure of a single stream_plan. It is not that important now to make a single stream_plan successful. Note, another user of streaming, repair, now uses small stream_plans as well and can rerun the repair for the failed ranges too. This is one step closer to supporting resumable add/remove node operations." 
* tag 'asias/use_range_streamer_everywhere_v4' of github.com:cloudius-systems/seastar-dev: storage_service: Use the new range_streamer interface for removenode storage_service: Use the new range_streamer interface for decommission storage_service: Use the new range_streamer interface for rebuild storage_service: Use the new range_streamer interface for bootstrap dht: Extend range_streamer interface
This commit is contained in:
@@ -59,14 +59,11 @@ future<> boot_strapper::bootstrap() {
|
||||
streamer->add_ranges(keyspace_name, ranges);
|
||||
}
|
||||
|
||||
return streamer->fetch_async().then_wrapped([streamer] (auto&& f) {
|
||||
try {
|
||||
auto state = f.get0();
|
||||
} catch (...) {
|
||||
throw std::runtime_error(sprint("Error during boostrap: %s", std::current_exception()));
|
||||
}
|
||||
return streamer->stream_async().then([streamer] () {
|
||||
service::get_local_storage_service().finish_bootstrapping();
|
||||
return make_ready_future<>();
|
||||
}).handle_exception([streamer] (std::exception_ptr eptr) {
|
||||
blogger.warn("Eror during bootstrap: {}", eptr);
|
||||
return make_exception_future<>(std::move(eptr));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -211,7 +211,36 @@ bool range_streamer::use_strict_sources_for_ranges(const sstring& keyspace_name)
|
||||
&& _metadata.get_all_endpoints().size() != strat.get_replication_factor();
|
||||
}
|
||||
|
||||
void range_streamer::add_tx_ranges(const sstring& keyspace_name, std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint, std::vector<sstring> column_families) {
|
||||
if (_nr_rx_added) {
|
||||
throw std::runtime_error("Mixed sending and receiving is not supported");
|
||||
}
|
||||
_nr_tx_added++;
|
||||
_to_stream.emplace(keyspace_name, std::move(ranges_per_endpoint));
|
||||
auto inserted = _column_families.emplace(keyspace_name, std::move(column_families)).second;
|
||||
if (!inserted) {
|
||||
throw std::runtime_error("Can not add column_families for the same keyspace more than once");
|
||||
}
|
||||
}
|
||||
|
||||
void range_streamer::add_rx_ranges(const sstring& keyspace_name, std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint, std::vector<sstring> column_families) {
|
||||
if (_nr_tx_added) {
|
||||
throw std::runtime_error("Mixed sending and receiving is not supported");
|
||||
}
|
||||
_nr_rx_added++;
|
||||
_to_stream.emplace(keyspace_name, std::move(ranges_per_endpoint));
|
||||
auto inserted = _column_families.emplace(keyspace_name, std::move(column_families)).second;
|
||||
if (!inserted) {
|
||||
throw std::runtime_error("Can not add column_families for the same keyspace more than once");
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: This is the legacy range_streamer interface, it is add_rx_ranges which adds rx ranges.
|
||||
void range_streamer::add_ranges(const sstring& keyspace_name, dht::token_range_vector ranges) {
|
||||
if (_nr_tx_added) {
|
||||
throw std::runtime_error("Mixed sending and receiving is not supported");
|
||||
}
|
||||
_nr_rx_added++;
|
||||
auto ranges_for_keyspace = use_strict_sources_for_ranges(keyspace_name)
|
||||
? get_all_ranges_with_strict_sources_for(keyspace_name, ranges)
|
||||
: get_all_ranges_with_sources_for(keyspace_name, ranges);
|
||||
@@ -232,26 +261,113 @@ void range_streamer::add_ranges(const sstring& keyspace_name, dht::token_range_v
|
||||
logger.debug("{} : range {} from source {} for keyspace {}", _description, x.second, x.first, keyspace_name);
|
||||
}
|
||||
}
|
||||
_to_fetch.emplace(keyspace_name, std::move(range_fetch_map));
|
||||
_to_stream.emplace(keyspace_name, std::move(range_fetch_map));
|
||||
}
|
||||
|
||||
future<streaming::stream_state> range_streamer::fetch_async() {
|
||||
for (auto& fetch : _to_fetch) {
|
||||
const auto& keyspace = fetch.first;
|
||||
for (auto& x : fetch.second) {
|
||||
auto& source = x.first;
|
||||
auto& ranges = x.second;
|
||||
/* Send messages to respective folks to stream data over to me */
|
||||
if (logger.is_enabled(logging::log_level::debug)) {
|
||||
logger.debug("{}ing from {} ranges {}", _description, source, ranges);
|
||||
// Drive do_stream_async() to completion, retrying failed runs with
// exponential backoff: 60s after the first failure, then *1.5 after each
// subsequent one. Gives up and rethrows after _nr_max_retry attempts.
//
// Fixes vs. the previous version:
//  - removed a stray statement left over from the deleted fetch_async()
//    path (`_stream_plan.request_ranges(source, keyspace, ranges);`) that
//    referenced names not in scope here and could not compile;
//  - the retry budget is now checked *before* sleeping, so the final
//    failure no longer wastes a full backoff interval before rethrowing.
future<> range_streamer::stream_async() {
    return seastar::async([this] {
        int sleep_time = 60;
        for (;;) {
            try {
                do_stream_async().get();
                break;
            } catch (...) {
                // Out of retries: propagate the current exception to the
                // caller immediately, without a pointless final sleep.
                if (++_nr_retried >= _nr_max_retry) {
                    throw;
                }
                logger.warn("{} failed to stream. Will retry in {} seconds ...", _description, sleep_time);
                sleep_abortable(std::chrono::seconds(sleep_time)).get();
                // Integer truncation of the 1.5x growth is acceptable here.
                sleep_time *= 1.5;
            }
        }
    });
}
|
||||
|
||||
// Stream every range registered in _to_stream, keyspace by keyspace,
// talking to each peer in parallel. Ranges are batched into stream_plans of
// at most _nr_ranges_per_stream_plan ranges each; ranges belonging to a
// failed plan are pushed back into _to_stream so that a later retry (see
// stream_async()) can pick them up again.
//
// Fixes vs. the previous version:
//  - the draining loop erased the element *before* dereferencing the
//    iterator (`it = range_vec.erase(it); ranges_to_stream.push_back(*it);`),
//    which skipped every other range and dereferenced end() on the last
//    element; the copy now happens before the erase;
//  - the loop condition uses `!=` instead of `<` (idiomatic and required
//    for non-random-access iterators);
//  - the failure/success log calls passed (keyspace, source) to a format
//    string that reads "with {} for keyspace={}" — the arguments are now
//    in (source, keyspace) order so the logs name the peer correctly.
future<> range_streamer::do_stream_async() {
    auto nr_ranges_remaining = nr_ranges_to_stream();
    logger.info("{} starts, nr_ranges_remaining={}", _description, nr_ranges_remaining);
    auto start = lowres_clock::now();
    return do_for_each(_to_stream, [this, start, description = _description] (auto& stream) {
        const auto& keyspace = stream.first;
        auto& ip_range_vec = stream.second;
        // Fetch from or send to the peer nodes in parallel.
        return parallel_for_each(ip_range_vec, [this, description, keyspace] (auto& ip_range) {
            auto& source = ip_range.first;
            auto& range_vec = ip_range.second;
            return seastar::async([this, description, keyspace, source, &range_vec] () mutable {
                // TODO: It is better to use fiber instead of thread here because
                // creating a thread per peer can be some memory in a large cluster.
                auto start_time = lowres_clock::now();
                unsigned sp_index = 0;
                unsigned nr_ranges_streamed = 0;
                size_t nr_ranges_total = range_vec.size();
                dht::token_range_vector ranges_to_stream;
                // Run one stream_plan for the currently batched ranges, then
                // clear the batch. Direction (rx vs tx) follows how ranges
                // were added to this streamer.
                auto do_streaming = [&] {
                    auto sp = stream_plan(sprint("%s-%s-index-%d", description, keyspace, sp_index++));
                    logger.info("{} with {} for keyspace={}, {} out of {} ranges: ranges = {}",
                            description, source, keyspace, nr_ranges_streamed, nr_ranges_total, ranges_to_stream.size());
                    if (_nr_rx_added) {
                        sp.request_ranges(source, keyspace, ranges_to_stream, _column_families[keyspace]);
                    } else if (_nr_tx_added) {
                        sp.transfer_ranges(source, keyspace, ranges_to_stream, _column_families[keyspace]);
                    }
                    sp.execute().discard_result().get();
                    ranges_to_stream.clear();
                };
                try {
                    for (auto it = range_vec.begin(); it != range_vec.end();) {
                        // Copy the range into the batch *before* erasing it:
                        // erase() invalidates the element the iterator points
                        // at, so dereferencing after erase read the wrong
                        // element (and past the end for the final one).
                        ranges_to_stream.push_back(*it);
                        it = range_vec.erase(it);
                        nr_ranges_streamed++;
                        if (ranges_to_stream.size() >= _nr_ranges_per_stream_plan) {
                            do_streaming();
                        }
                    }
                    // Flush the final, partially filled batch.
                    if (!ranges_to_stream.empty()) {
                        do_streaming();
                    }
                } catch (...) {
                    // Return the failed batch to range_vec so the caller's
                    // retry loop can stream these ranges again.
                    for (auto& range : ranges_to_stream) {
                        range_vec.push_back(range);
                    }
                    auto t = std::chrono::duration_cast<std::chrono::seconds>(lowres_clock::now() - start_time).count();
                    logger.warn("{} with {} for keyspace={} failed, took {} seconds: {}", description, source, keyspace, t, std::current_exception());
                    throw;
                }
                auto t = std::chrono::duration_cast<std::chrono::seconds>(lowres_clock::now() - start_time).count();
                logger.info("{} with {} for keyspace={} succeeded, took {} seconds", description, source, keyspace, t);
            });
        });
    }).finally([this, start] {
        auto t = std::chrono::duration_cast<std::chrono::seconds>(lowres_clock::now() - start).count();
        auto nr_ranges_remaining = nr_ranges_to_stream();
        if (nr_ranges_remaining) {
            logger.warn("{} failed, took {} seconds, nr_ranges_remaining={}", _description, t, nr_ranges_remaining);
        } else {
            logger.info("{} succeeded, took {} seconds, nr_ranges_remaining={}", _description, t, nr_ranges_remaining);
        }
    });
}
|
||||
|
||||
size_t range_streamer::nr_ranges_to_stream() {
|
||||
size_t nr_ranges_remaining = 0;
|
||||
for (auto& fetch : _to_stream) {
|
||||
const auto& keyspace = fetch.first;
|
||||
auto& ip_range_vec = fetch.second;
|
||||
for (auto& ip_range : ip_range_vec) {
|
||||
auto& source = ip_range.first;
|
||||
auto& range_vec = ip_range.second;
|
||||
nr_ranges_remaining += range_vec.size();
|
||||
logger.debug("Remaining: keyspace={}, source={}, ranges={}", keyspace, source, range_vec);
|
||||
}
|
||||
}
|
||||
|
||||
return _stream_plan.execute();
|
||||
return nr_ranges_remaining;
|
||||
}
|
||||
|
||||
|
||||
std::unordered_multimap<inet_address, dht::token_range>
|
||||
range_streamer::get_work_map(const std::unordered_multimap<dht::token_range, inet_address>& ranges_with_source_target,
|
||||
const sstring& keyspace) {
|
||||
|
||||
@@ -119,6 +119,8 @@ public:
|
||||
}
|
||||
|
||||
void add_ranges(const sstring& keyspace_name, dht::token_range_vector ranges);
|
||||
void add_tx_ranges(const sstring& keyspace_name, std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint, std::vector<sstring> column_families = {});
|
||||
void add_rx_ranges(const sstring& keyspace_name, std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint, std::vector<sstring> column_families = {});
|
||||
private:
|
||||
bool use_strict_sources_for_ranges(const sstring& keyspace_name);
|
||||
/**
|
||||
@@ -159,16 +161,27 @@ public:
|
||||
}
|
||||
#endif
|
||||
public:
|
||||
future<streaming::stream_state> fetch_async();
|
||||
future<> stream_async();
|
||||
future<> do_stream_async();
|
||||
size_t nr_ranges_to_stream();
|
||||
private:
|
||||
distributed<database>& _db;
|
||||
token_metadata& _metadata;
|
||||
std::unordered_set<token> _tokens;
|
||||
inet_address _address;
|
||||
sstring _description;
|
||||
std::unordered_multimap<sstring, std::unordered_map<inet_address, dht::token_range_vector>> _to_fetch;
|
||||
std::unordered_multimap<sstring, std::unordered_map<inet_address, dht::token_range_vector>> _to_stream;
|
||||
std::unordered_set<std::unique_ptr<i_source_filter>> _source_filters;
|
||||
stream_plan _stream_plan;
|
||||
std::unordered_map<sstring, std::vector<sstring>> _column_families;
|
||||
// Number of ranges to stream per stream plan
|
||||
unsigned _nr_ranges_per_stream_plan = 10;
|
||||
// Retry the stream plan _nr_max_retry times
|
||||
unsigned _nr_retried = 0;
|
||||
unsigned _nr_max_retry = 5;
|
||||
// Number of tx and rx ranges added
|
||||
unsigned _nr_tx_added = 0;
|
||||
unsigned _nr_rx_added = 0;
|
||||
};
|
||||
|
||||
} // dht
|
||||
|
||||
@@ -2399,15 +2399,12 @@ future<> storage_service::rebuild(sstring source_dc) {
|
||||
for (const auto& keyspace_name : ss._db.local().get_non_system_keyspaces()) {
|
||||
streamer->add_ranges(keyspace_name, ss.get_local_ranges(keyspace_name));
|
||||
}
|
||||
return streamer->fetch_async().then_wrapped([streamer] (auto&& f) {
|
||||
try {
|
||||
auto state = f.get0();
|
||||
} catch (...) {
|
||||
// This is used exclusively through JMX, so log the full trace but only throw a simple RTE
|
||||
slogger.error("Error while rebuilding node: {}", std::current_exception());
|
||||
throw std::runtime_error(sprint("Error while rebuilding node: %s", std::current_exception()));
|
||||
}
|
||||
return make_ready_future<>();
|
||||
return streamer->stream_async().then([streamer] {
|
||||
slogger.info("Streaming for rebuild successful");
|
||||
}).handle_exception([] (auto ep) {
|
||||
// This is used exclusively through JMX, so log the full trace but only throw a simple RTE
|
||||
slogger.warn("Error while rebuilding node: {}", std::current_exception());
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -2534,10 +2531,8 @@ void storage_service::unbootstrap() {
|
||||
}
|
||||
|
||||
future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
|
||||
std::unordered_multimap<sstring, std::unordered_map<inet_address, dht::token_range_vector>> ranges_to_fetch;
|
||||
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(_db, get_token_metadata(), get_broadcast_address(), "Restore_replica_count");
|
||||
auto my_address = get_broadcast_address();
|
||||
|
||||
auto non_system_keyspaces = _db.local().get_non_system_keyspaces();
|
||||
for (const auto& keyspace_name : non_system_keyspaces) {
|
||||
std::unordered_multimap<dht::token_range, inet_address> changed_ranges = get_changed_ranges_for_leaving(keyspace_name, endpoint);
|
||||
@@ -2548,26 +2543,15 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr
|
||||
}
|
||||
}
|
||||
std::unordered_multimap<inet_address, dht::token_range> source_ranges = get_new_source_ranges(keyspace_name, my_new_ranges);
|
||||
std::unordered_map<inet_address, dht::token_range_vector> tmp;
|
||||
std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint;
|
||||
for (auto& x : source_ranges) {
|
||||
tmp[x.first].emplace_back(x.second);
|
||||
ranges_per_endpoint[x.first].emplace_back(x.second);
|
||||
}
|
||||
ranges_to_fetch.emplace(keyspace_name, std::move(tmp));
|
||||
streamer->add_rx_ranges(keyspace_name, std::move(ranges_per_endpoint));
|
||||
}
|
||||
auto sp = make_lw_shared<streaming::stream_plan>("Restore replica count");
|
||||
for (auto& x: ranges_to_fetch) {
|
||||
const sstring& keyspace_name = x.first;
|
||||
std::unordered_map<inet_address, dht::token_range_vector>& maps = x.second;
|
||||
for (auto& m : maps) {
|
||||
auto source = m.first;
|
||||
auto ranges = m.second;
|
||||
slogger.debug("Requesting from {} ranges {}", source, ranges);
|
||||
sp->request_ranges(source, keyspace_name, ranges);
|
||||
}
|
||||
}
|
||||
return sp->execute().then_wrapped([this, sp, notify_endpoint] (auto&& f) {
|
||||
return streamer->stream_async().then_wrapped([this, streamer, notify_endpoint] (auto&& f) {
|
||||
try {
|
||||
auto state = f.get0();
|
||||
f.get();
|
||||
return this->send_replication_notification(notify_endpoint);
|
||||
} catch (...) {
|
||||
slogger.warn("Streaming to restore replica count failed: {}", std::current_exception());
|
||||
@@ -2659,8 +2643,7 @@ void storage_service::leave_ring() {
|
||||
|
||||
future<>
|
||||
storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multimap<dht::token_range, inet_address>> ranges_to_stream_by_keyspace) {
|
||||
// First, we build a list of ranges to stream to each host, per table
|
||||
std::unordered_map<sstring, std::unordered_map<inet_address, dht::token_range_vector>> sessions_to_stream_by_keyspace;
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(_db, get_token_metadata(), get_broadcast_address(), "Unbootstrap");
|
||||
for (auto& entry : ranges_to_stream_by_keyspace) {
|
||||
const auto& keyspace = entry.first;
|
||||
auto& ranges_with_endpoints = entry.second;
|
||||
@@ -2675,26 +2658,13 @@ storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multim
|
||||
inet_address endpoint = end_point_entry.second;
|
||||
ranges_per_endpoint[endpoint].emplace_back(r);
|
||||
}
|
||||
sessions_to_stream_by_keyspace.emplace(keyspace, std::move(ranges_per_endpoint));
|
||||
streamer->add_tx_ranges(keyspace, std::move(ranges_per_endpoint));
|
||||
}
|
||||
auto sp = make_lw_shared<streaming::stream_plan>("Unbootstrap");
|
||||
for (auto& entry : sessions_to_stream_by_keyspace) {
|
||||
const auto& keyspace_name = entry.first;
|
||||
// TODO: we can move to avoid copy of std::vector
|
||||
auto& ranges_per_endpoint = entry.second;
|
||||
|
||||
for (auto& ranges_entry : ranges_per_endpoint) {
|
||||
auto& ranges = ranges_entry.second;
|
||||
auto new_endpoint = ranges_entry.first;
|
||||
// TODO each call to transferRanges re-flushes, this is potentially a lot of waste
|
||||
sp->transfer_ranges(new_endpoint, keyspace_name, ranges);
|
||||
}
|
||||
}
|
||||
return sp->execute().discard_result().then([sp] {
|
||||
return streamer->stream_async().then([streamer] {
|
||||
slogger.info("stream_ranges successful");
|
||||
}).handle_exception([] (auto ep) {
|
||||
slogger.info("stream_ranges failed: {}", ep);
|
||||
return make_exception_future(std::runtime_error("stream_ranges failed"));
|
||||
slogger.warn("stream_ranges failed: {}", ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2728,16 +2698,18 @@ future<> storage_service::stream_hints() {
|
||||
// stream all hints -- range list will be a singleton of "the entire ring"
|
||||
dht::token_range_vector ranges = {dht::token_range::make_open_ended_both_sides()};
|
||||
slogger.debug("stream_hints: ranges={}", ranges);
|
||||
std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint;
|
||||
ranges_per_endpoint[hints_destination_host] = std::move(ranges);
|
||||
|
||||
auto sp = make_lw_shared<streaming::stream_plan>("Hints");
|
||||
std::vector<sstring> column_families = { db::system_keyspace::HINTS };
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(_db, get_token_metadata(), get_broadcast_address(), "Hints");
|
||||
auto keyspace = db::system_keyspace::NAME;
|
||||
sp->transfer_ranges(hints_destination_host, keyspace, ranges, column_families);
|
||||
return sp->execute().discard_result().then([sp] {
|
||||
std::vector<sstring> column_families = { db::system_keyspace::HINTS };
|
||||
streamer->add_tx_ranges(keyspace, std::move(ranges_per_endpoint), column_families);
|
||||
return streamer->stream_async().then([streamer] {
|
||||
slogger.info("stream_hints successful");
|
||||
}).handle_exception([] (auto ep) {
|
||||
slogger.info("stream_hints failed: {}", ep);
|
||||
return make_exception_future(std::runtime_error("stream_hints failed"));
|
||||
slogger.warn("stream_hints failed: {}", ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user