repair: Use more stream_plan
In the very beginning, we use a stream_plan for each checksum range. Later, we changed to use a single stream_plan for all the checksum ranges. It pushes memory pressure to streaming, e.g., millions of ranges in a vector to send over RPC. To fix, we do checksum and streaming in parallel, and limit the number of checksum ranges stored in memory. Fixes #2430
This commit is contained in:
@@ -56,12 +56,20 @@ public:
|
||||
std::vector<sstring> data_centers;
|
||||
std::vector<sstring> hosts;
|
||||
std::vector<failed_range> failed_ranges;
|
||||
streaming::stream_plan sp_in;
|
||||
streaming::stream_plan sp_out;
|
||||
// Map of peer -> <cf, ranges>
|
||||
std::unordered_map<gms::inet_address, std::unordered_map<sstring, dht::token_range_vector>> ranges_need_repair_in;
|
||||
std::unordered_map<gms::inet_address, std::unordered_map<sstring, dht::token_range_vector>> ranges_need_repair_out;
|
||||
// FIXME: this "100" needs to be a parameter.
|
||||
uint64_t target_partitions = 100;
|
||||
// FIXME: this "10 * 1024 * 1024" needs to be a parameter.
|
||||
size_t sub_ranges_max = 10 * 1024 * 1024;
|
||||
// This affects how many ranges we put in a stream plan. The more ranges
|
||||
// we put in, the more memory we use to store them. However, it can
|
||||
// reduce the total number of stream_plans we use for the repair.
|
||||
size_t sub_ranges_to_stream = 1 * 1024;
|
||||
size_t sp_index = 0;
|
||||
size_t current_sub_ranges_nr_in = 0;
|
||||
size_t current_sub_ranges_nr_out = 0;
|
||||
public:
|
||||
repair_info(seastar::sharded<database>& db_,
|
||||
const sstring& keyspace_,
|
||||
@@ -76,14 +84,45 @@ public:
|
||||
, cfs(cfs_)
|
||||
, id(id_)
|
||||
, data_centers(data_centers_)
|
||||
, hosts(hosts_)
|
||||
, sp_in(streaming::stream_plan(sprint("repair-in-%d", id)))
|
||||
, sp_out(streaming::stream_plan(sprint("repair-out-%d", id))) {
|
||||
|
||||
, hosts(hosts_) {
|
||||
}
|
||||
future<> do_streaming() {
|
||||
return sp_in.execute().discard_result().then([this] {
|
||||
return sp_out.execute().discard_result();
|
||||
size_t ranges_in = 0;
|
||||
size_t ranges_out = 0;
|
||||
auto sp_in = make_lw_shared<streaming::stream_plan>(sprint("repair-in-%d-index-%d", id, sp_index));
|
||||
auto sp_out = make_lw_shared<streaming::stream_plan>(sprint("repair-out-%d-index-%d", id, sp_index));
|
||||
|
||||
for (auto& x : ranges_need_repair_in) {
|
||||
auto& peer = x.first;
|
||||
for (auto& y : x.second) {
|
||||
auto& cf = y.first;
|
||||
auto& stream_ranges = y.second;
|
||||
ranges_in += stream_ranges.size();
|
||||
sp_in->request_ranges(peer, keyspace, std::move(stream_ranges), {cf});
|
||||
}
|
||||
}
|
||||
ranges_need_repair_in.clear();
|
||||
current_sub_ranges_nr_in = 0;
|
||||
|
||||
for (auto& x : ranges_need_repair_out) {
|
||||
auto& peer = x.first;
|
||||
for (auto& y : x.second) {
|
||||
auto& cf = y.first;
|
||||
auto& stream_ranges = y.second;
|
||||
ranges_out += stream_ranges.size();
|
||||
sp_out->transfer_ranges(peer, keyspace, std::move(stream_ranges), {cf});
|
||||
}
|
||||
}
|
||||
ranges_need_repair_out.clear();
|
||||
current_sub_ranges_nr_out = 0;
|
||||
|
||||
if (ranges_in || ranges_out) {
|
||||
rlogger.info("Start streaming for repair {} index {}, ranges_in={}, ranges_out={}", id, sp_index, ranges_in, ranges_out);
|
||||
}
|
||||
sp_index++;
|
||||
|
||||
return sp_in->execute().discard_result().then([sp_in, sp_out] {
|
||||
return sp_out->execute().discard_result();
|
||||
}).handle_exception([] (auto ep) {
|
||||
rlogger.warn("repair's stream failed: {}", ep);
|
||||
return make_exception_future(ep);
|
||||
@@ -101,16 +140,22 @@ public:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
void request_transfer_ranges(const sstring& cf,
|
||||
future<> request_transfer_ranges(const sstring& cf,
|
||||
const ::dht::token_range& range,
|
||||
const std::vector<gms::inet_address>& neighbors_in,
|
||||
const std::vector<gms::inet_address>& neighbors_out) {
|
||||
for (const auto& peer : neighbors_in) {
|
||||
sp_in.request_ranges(peer, keyspace, {range}, {cf});
|
||||
ranges_need_repair_in[peer][cf].emplace_back(range);
|
||||
current_sub_ranges_nr_in++;
|
||||
}
|
||||
for (const auto& peer : neighbors_out) {
|
||||
sp_out.transfer_ranges(peer, keyspace, {range}, {cf});
|
||||
ranges_need_repair_out[peer][cf].emplace_back(range);
|
||||
current_sub_ranges_nr_out++;
|
||||
}
|
||||
if (current_sub_ranges_nr_in >= sub_ranges_to_stream || current_sub_ranges_nr_out >= sub_ranges_to_stream) {
|
||||
return do_streaming();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -670,8 +715,7 @@ static future<> repair_cf_range(repair_info& ri,
|
||||
if (!(live_neighbors_in.empty() && live_neighbors_out.empty())) {
|
||||
rlogger.info("Found differing range {} on nodes {}, in = {}, out = {}", range,
|
||||
live_neighbors, live_neighbors_in, live_neighbors_out);
|
||||
ri.request_transfer_ranges(cf, range, live_neighbors_in, live_neighbors_out);
|
||||
return make_ready_future<>();
|
||||
return ri.request_transfer_ranges(cf, range, live_neighbors_in, live_neighbors_out);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}).handle_exception([&ri, &success, &cf, &range] (std::exception_ptr eptr) {
|
||||
@@ -930,6 +974,8 @@ static future<> repair_ranges(repair_info ri) {
|
||||
check_in_shutdown();
|
||||
return repair_range(ri, range);
|
||||
}).then([&ri] {
|
||||
// Do streaming for the remaining ranges we do not stream in
|
||||
// repair_cf_range
|
||||
return ri.do_streaming();
|
||||
}).then([&ri] {
|
||||
repair_tracker.done(ri.id, ri.check_failed_ranges());
|
||||
|
||||
Reference in New Issue
Block a user