repair: Use more stream_plan

In the very beginning, we use a stream_plan for each checksum range.
Later, we changed to use a single stream_plan for all the checksum
ranges. It pushes memory pressure to streaming, e.g., millions of ranges
in a vector to send over RPC.

To fix, we do checksum and streaming in parallel, and limit the number
of checksum ranges stored in memory.

Fixes #2430
This commit is contained in:
Asias He
2017-05-31 11:35:21 +08:00
parent b3ff37e67f
commit 2043ffc064

View File

@@ -56,12 +56,20 @@ public:
std::vector<sstring> data_centers;
std::vector<sstring> hosts;
std::vector<failed_range> failed_ranges;
streaming::stream_plan sp_in;
streaming::stream_plan sp_out;
// Map of peer -> <cf, ranges>
std::unordered_map<gms::inet_address, std::unordered_map<sstring, dht::token_range_vector>> ranges_need_repair_in;
std::unordered_map<gms::inet_address, std::unordered_map<sstring, dht::token_range_vector>> ranges_need_repair_out;
// FIXME: this "100" needs to be a parameter.
uint64_t target_partitions = 100;
// FIXME: this "10 * 1024 * 1024" needs to be a parameter.
size_t sub_ranges_max = 10 * 1024 * 1024;
// This affects how many ranges we put in a stream plan. The more ranges
// we put in, the more memory we use to hold them in memory. However, a
// larger value reduces the total number of stream_plans used for the repair.
size_t sub_ranges_to_stream = 1 * 1024;
size_t sp_index = 0;
size_t current_sub_ranges_nr_in = 0;
size_t current_sub_ranges_nr_out = 0;
public:
repair_info(seastar::sharded<database>& db_,
const sstring& keyspace_,
@@ -76,14 +84,45 @@ public:
, cfs(cfs_)
, id(id_)
, data_centers(data_centers_)
, hosts(hosts_)
, sp_in(streaming::stream_plan(sprint("repair-in-%d", id)))
, sp_out(streaming::stream_plan(sprint("repair-out-%d", id))) {
, hosts(hosts_) {
}
future<> do_streaming() {
return sp_in.execute().discard_result().then([this] {
return sp_out.execute().discard_result();
size_t ranges_in = 0;
size_t ranges_out = 0;
auto sp_in = make_lw_shared<streaming::stream_plan>(sprint("repair-in-%d-index-%d", id, sp_index));
auto sp_out = make_lw_shared<streaming::stream_plan>(sprint("repair-out-%d-index-%d", id, sp_index));
for (auto& x : ranges_need_repair_in) {
auto& peer = x.first;
for (auto& y : x.second) {
auto& cf = y.first;
auto& stream_ranges = y.second;
ranges_in += stream_ranges.size();
sp_in->request_ranges(peer, keyspace, std::move(stream_ranges), {cf});
}
}
ranges_need_repair_in.clear();
current_sub_ranges_nr_in = 0;
for (auto& x : ranges_need_repair_out) {
auto& peer = x.first;
for (auto& y : x.second) {
auto& cf = y.first;
auto& stream_ranges = y.second;
ranges_out += stream_ranges.size();
sp_out->transfer_ranges(peer, keyspace, std::move(stream_ranges), {cf});
}
}
ranges_need_repair_out.clear();
current_sub_ranges_nr_out = 0;
if (ranges_in || ranges_out) {
rlogger.info("Start streaming for repair {} index {}, ranges_in={}, ranges_out={}", id, sp_index, ranges_in, ranges_out);
}
sp_index++;
return sp_in->execute().discard_result().then([sp_in, sp_out] {
return sp_out->execute().discard_result();
}).handle_exception([] (auto ep) {
rlogger.warn("repair's stream failed: {}", ep);
return make_exception_future(ep);
@@ -101,16 +140,22 @@ public:
return false;
}
}
void request_transfer_ranges(const sstring& cf,
future<> request_transfer_ranges(const sstring& cf,
const ::dht::token_range& range,
const std::vector<gms::inet_address>& neighbors_in,
const std::vector<gms::inet_address>& neighbors_out) {
for (const auto& peer : neighbors_in) {
sp_in.request_ranges(peer, keyspace, {range}, {cf});
ranges_need_repair_in[peer][cf].emplace_back(range);
current_sub_ranges_nr_in++;
}
for (const auto& peer : neighbors_out) {
sp_out.transfer_ranges(peer, keyspace, {range}, {cf});
ranges_need_repair_out[peer][cf].emplace_back(range);
current_sub_ranges_nr_out++;
}
if (current_sub_ranges_nr_in >= sub_ranges_to_stream || current_sub_ranges_nr_out >= sub_ranges_to_stream) {
return do_streaming();
}
return make_ready_future<>();
}
};
@@ -670,8 +715,7 @@ static future<> repair_cf_range(repair_info& ri,
if (!(live_neighbors_in.empty() && live_neighbors_out.empty())) {
rlogger.info("Found differing range {} on nodes {}, in = {}, out = {}", range,
live_neighbors, live_neighbors_in, live_neighbors_out);
ri.request_transfer_ranges(cf, range, live_neighbors_in, live_neighbors_out);
return make_ready_future<>();
return ri.request_transfer_ranges(cf, range, live_neighbors_in, live_neighbors_out);
}
return make_ready_future<>();
}).handle_exception([&ri, &success, &cf, &range] (std::exception_ptr eptr) {
@@ -930,6 +974,8 @@ static future<> repair_ranges(repair_info ri) {
check_in_shutdown();
return repair_range(ri, range);
}).then([&ri] {
// Stream the remaining ranges that were not already streamed in
// repair_cf_range.
return ri.do_streaming();
}).then([&ri] {
repair_tracker.done(ri.id, ri.check_failed_ranges());