repair: Export repair_info

It will be used by the row level repair soon.
This commit is contained in:
Asias He
2018-07-31 17:46:35 +08:00
parent 6be3b35d52
commit e62aeae2db
2 changed files with 157 additions and 136 deletions

View File

@@ -46,141 +46,6 @@
static logging::logger rlogger("repair");
class repair_info {
public:
seastar::sharded<database>& db;
sstring keyspace;
dht::token_range_vector ranges;
std::vector<sstring> cfs;
int id;
shard_id shard;
std::vector<sstring> data_centers;
std::vector<sstring> hosts;
size_t nr_failed_ranges = 0;
bool aborted = false;
// Map of peer -> <cf, ranges>
std::unordered_map<gms::inet_address, std::unordered_map<sstring, dht::token_range_vector>> ranges_need_repair_in;
std::unordered_map<gms::inet_address, std::unordered_map<sstring, dht::token_range_vector>> ranges_need_repair_out;
// FIXME: this "100" needs to be a parameter.
uint64_t target_partitions = 100;
// This affects how many ranges we put in a stream plan. The more the more
// memory we use to store the ranges in memory. However, it can reduce the
// total number of stream_plan we use for the repair.
size_t sub_ranges_to_stream = 10 * 1024;
size_t sp_index = 0;
size_t current_sub_ranges_nr_in = 0;
size_t current_sub_ranges_nr_out = 0;
int ranges_index = 0;
// Only allow one stream_plan in flight
semaphore sp_parallelism_semaphore{1};
lw_shared_ptr<streaming::stream_plan> _sp_in;
lw_shared_ptr<streaming::stream_plan> _sp_out;
public:
repair_info(seastar::sharded<database>& db_,
const sstring& keyspace_,
const dht::token_range_vector& ranges_,
const std::vector<sstring>& cfs_,
int id_,
const std::vector<sstring>& data_centers_,
const std::vector<sstring>& hosts_)
: db(db_)
, keyspace(keyspace_)
, ranges(ranges_)
, cfs(cfs_)
, id(id_)
, shard(engine().cpu_id())
, data_centers(data_centers_)
, hosts(hosts_) {
}
future<> do_streaming() {
size_t ranges_in = 0;
size_t ranges_out = 0;
_sp_in = make_lw_shared<streaming::stream_plan>(format("repair-in-id-{:d}-shard-{:d}-index-{:d}", id, shard, sp_index), streaming::stream_reason::repair);
_sp_out = make_lw_shared<streaming::stream_plan>(format("repair-out-id-{:d}-shard-{:d}-index-{:d}", id, shard, sp_index), streaming::stream_reason::repair);
for (auto& x : ranges_need_repair_in) {
auto& peer = x.first;
for (auto& y : x.second) {
auto& cf = y.first;
auto& stream_ranges = y.second;
ranges_in += stream_ranges.size();
_sp_in->request_ranges(peer, keyspace, std::move(stream_ranges), {cf});
}
}
ranges_need_repair_in.clear();
current_sub_ranges_nr_in = 0;
for (auto& x : ranges_need_repair_out) {
auto& peer = x.first;
for (auto& y : x.second) {
auto& cf = y.first;
auto& stream_ranges = y.second;
ranges_out += stream_ranges.size();
_sp_out->transfer_ranges(peer, keyspace, std::move(stream_ranges), {cf});
}
}
ranges_need_repair_out.clear();
current_sub_ranges_nr_out = 0;
if (ranges_in || ranges_out) {
rlogger.info("Start streaming for repair id={}, shard={}, index={}, ranges_in={}, ranges_out={}", id, shard, sp_index, ranges_in, ranges_out);
}
sp_index++;
return _sp_in->execute().discard_result().then([this, sp_in = _sp_in, sp_out = _sp_out] {
return _sp_out->execute().discard_result();
}).handle_exception([this] (auto ep) {
rlogger.warn("repair's stream failed: {}", ep);
return make_exception_future(ep);
}).finally([this] {
_sp_in = {};
_sp_out = {};
});
}
void check_failed_ranges() {
if (nr_failed_ranges) {
rlogger.info("repair {} on shard {} failed - {} ranges failed", id, shard, nr_failed_ranges);
throw std::runtime_error(format("repair {:d} on shard {:d} failed to do checksum for {:d} sub ranges", id, shard, nr_failed_ranges));
} else {
rlogger.info("repair {} on shard {} completed successfully", id, shard);
}
}
future<> request_transfer_ranges(const sstring& cf,
const ::dht::token_range& range,
const std::vector<gms::inet_address>& neighbors_in,
const std::vector<gms::inet_address>& neighbors_out) {
rlogger.debug("Add cf {}, range {}, current_sub_ranges_nr_in {}, current_sub_ranges_nr_out {}", cf, range, current_sub_ranges_nr_in, current_sub_ranges_nr_out);
return seastar::with_semaphore(sp_parallelism_semaphore, 1, [this, cf, range, neighbors_in, neighbors_out] {
for (const auto& peer : neighbors_in) {
ranges_need_repair_in[peer][cf].emplace_back(range);
current_sub_ranges_nr_in++;
}
for (const auto& peer : neighbors_out) {
ranges_need_repair_out[peer][cf].emplace_back(range);
current_sub_ranges_nr_out++;
}
if (current_sub_ranges_nr_in >= sub_ranges_to_stream || current_sub_ranges_nr_out >= sub_ranges_to_stream) {
return do_streaming();
}
return make_ready_future<>();
});
}
void abort() {
if (_sp_in) {
_sp_in->abort();
}
if (_sp_out) {
_sp_out->abort();
}
aborted = true;
}
void check_in_abort() {
if (aborted) {
throw std::runtime_error(format("repair id {:d} is aborted on shard {:d}", id, shard));
}
}
};
template <typename T1, typename T2>
inline
static std::ostream& operator<<(std::ostream& os, const std::unordered_map<T1, T2>& v) {
@@ -774,6 +639,115 @@ future<uint64_t> estimate_partitions(seastar::sharded<database>& db, const sstri
);
}
repair_info::repair_info(seastar::sharded<database>& db_,
const sstring& keyspace_,
const dht::token_range_vector& ranges_,
const std::vector<sstring>& cfs_,
int id_,
const std::vector<sstring>& data_centers_,
const std::vector<sstring>& hosts_)
: db(db_)
, keyspace(keyspace_)
, ranges(ranges_)
, cfs(cfs_)
, id(id_)
, shard(engine().cpu_id())
, data_centers(data_centers_)
, hosts(hosts_) {
}
future<> repair_info::do_streaming() {
size_t ranges_in = 0;
size_t ranges_out = 0;
_sp_in = make_lw_shared<streaming::stream_plan>(format("repair-in-id-{:d}-shard-{:d}-index-{:d}", id, shard, sp_index), streaming::stream_reason::repair);
_sp_out = make_lw_shared<streaming::stream_plan>(format("repair-out-id-{:d}-shard-{:d}-index-{:d}", id, shard, sp_index), streaming::stream_reason::repair);
for (auto& x : ranges_need_repair_in) {
auto& peer = x.first;
for (auto& y : x.second) {
auto& cf = y.first;
auto& stream_ranges = y.second;
ranges_in += stream_ranges.size();
_sp_in->request_ranges(peer, keyspace, std::move(stream_ranges), {cf});
}
}
ranges_need_repair_in.clear();
current_sub_ranges_nr_in = 0;
for (auto& x : ranges_need_repair_out) {
auto& peer = x.first;
for (auto& y : x.second) {
auto& cf = y.first;
auto& stream_ranges = y.second;
ranges_out += stream_ranges.size();
_sp_out->transfer_ranges(peer, keyspace, std::move(stream_ranges), {cf});
}
}
ranges_need_repair_out.clear();
current_sub_ranges_nr_out = 0;
if (ranges_in || ranges_out) {
rlogger.info("Start streaming for repair id={}, shard={}, index={}, ranges_in={}, ranges_out={}", id, shard, sp_index, ranges_in, ranges_out);
}
sp_index++;
return _sp_in->execute().discard_result().then([this, sp_in = _sp_in, sp_out = _sp_out] {
return _sp_out->execute().discard_result();
}).handle_exception([this] (auto ep) {
rlogger.warn("repair's stream failed: {}", ep);
return make_exception_future(ep);
}).finally([this] {
_sp_in = {};
_sp_out = {};
});
}
void repair_info::check_failed_ranges() {
if (nr_failed_ranges) {
rlogger.info("repair {} on shard {} failed - {} ranges failed", id, shard, nr_failed_ranges);
throw std::runtime_error(format("repair {:d} on shard {:d} failed to do checksum for {:d} sub ranges", id, shard, nr_failed_ranges));
} else {
rlogger.info("repair {} on shard {} completed successfully", id, shard);
}
}
future<> repair_info::request_transfer_ranges(const sstring& cf,
const ::dht::token_range& range,
const std::vector<gms::inet_address>& neighbors_in,
const std::vector<gms::inet_address>& neighbors_out) {
rlogger.debug("Add cf {}, range {}, current_sub_ranges_nr_in {}, current_sub_ranges_nr_out {}", cf, range, current_sub_ranges_nr_in, current_sub_ranges_nr_out);
return seastar::with_semaphore(sp_parallelism_semaphore, 1, [this, cf, range, neighbors_in, neighbors_out] {
for (const auto& peer : neighbors_in) {
ranges_need_repair_in[peer][cf].emplace_back(range);
current_sub_ranges_nr_in++;
}
for (const auto& peer : neighbors_out) {
ranges_need_repair_out[peer][cf].emplace_back(range);
current_sub_ranges_nr_out++;
}
if (current_sub_ranges_nr_in >= sub_ranges_to_stream || current_sub_ranges_nr_out >= sub_ranges_to_stream) {
return do_streaming();
}
return make_ready_future<>();
});
}
void repair_info::abort() {
if (_sp_in) {
_sp_in->abort();
}
if (_sp_out) {
_sp_out->abort();
}
aborted = true;
}
void repair_info::check_in_abort() {
if (aborted) {
throw std::runtime_error(format("repair id {:d} is aborted on shard {:d}", id, shard));
}
}
// Repair a single cf in a single local range.
// Comparable to RepairJob in Origin.
static future<> repair_cf_range(repair_info& ri,

View File

@@ -30,7 +30,7 @@
#include "database.hh"
#include "utils/UUID.hh"
#include "streaming/stream_plan.hh"
class repair_exception : public std::exception {
private:
@@ -115,6 +115,53 @@ future<partition_checksum> checksum_range(seastar::sharded<database> &db,
const sstring& keyspace, const sstring& cf,
const ::dht::token_range& range, repair_checksum rt);
class repair_info {
public:
seastar::sharded<database>& db;
sstring keyspace;
dht::token_range_vector ranges;
std::vector<sstring> cfs;
int id;
shard_id shard;
std::vector<sstring> data_centers;
std::vector<sstring> hosts;
size_t nr_failed_ranges = 0;
bool aborted = false;
// Map of peer -> <cf, ranges>
std::unordered_map<gms::inet_address, std::unordered_map<sstring, dht::token_range_vector>> ranges_need_repair_in;
std::unordered_map<gms::inet_address, std::unordered_map<sstring, dht::token_range_vector>> ranges_need_repair_out;
// FIXME: this "100" needs to be a parameter.
uint64_t target_partitions = 100;
// This affects how many ranges we put in a stream plan. The more the more
// memory we use to store the ranges in memory. However, it can reduce the
// total number of stream_plan we use for the repair.
size_t sub_ranges_to_stream = 10 * 1024;
size_t sp_index = 0;
size_t current_sub_ranges_nr_in = 0;
size_t current_sub_ranges_nr_out = 0;
int ranges_index = 0;
// Only allow one stream_plan in flight
semaphore sp_parallelism_semaphore{1};
lw_shared_ptr<streaming::stream_plan> _sp_in;
lw_shared_ptr<streaming::stream_plan> _sp_out;
public:
repair_info(seastar::sharded<database>& db_,
const sstring& keyspace_,
const dht::token_range_vector& ranges_,
const std::vector<sstring>& cfs_,
int id_,
const std::vector<sstring>& data_centers_,
const std::vector<sstring>& hosts_);
future<> do_streaming();
void check_failed_ranges();
future<> request_transfer_ranges(const sstring& cf,
const ::dht::token_range& range,
const std::vector<gms::inet_address>& neighbors_in,
const std::vector<gms::inet_address>& neighbors_out);
void abort();
void check_in_abort();
};
future<uint64_t> estimate_partitions(seastar::sharded<database>& db, const sstring& keyspace,
const sstring& cf, const dht::token_range& range);