From e62aeae2dbb980fe4cb71574689eef62023510d3 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 31 Jul 2018 17:46:35 +0800 Subject: [PATCH] repair: Export repair_info It will be used by the row level repair soon. --- repair/repair.cc | 244 +++++++++++++++++++++-------------------------- repair/repair.hh | 49 +++++++++- 2 files changed, 157 insertions(+), 136 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index 5fa84e52c2..0c6f4e56a2 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -46,141 +46,6 @@ static logging::logger rlogger("repair"); -class repair_info { -public: - seastar::sharded& db; - sstring keyspace; - dht::token_range_vector ranges; - std::vector cfs; - int id; - shard_id shard; - std::vector data_centers; - std::vector hosts; - size_t nr_failed_ranges = 0; - bool aborted = false; - // Map of peer -> - std::unordered_map> ranges_need_repair_in; - std::unordered_map> ranges_need_repair_out; - // FIXME: this "100" needs to be a parameter. - uint64_t target_partitions = 100; - // This affects how many ranges we put in a stream plan. The more the more - // memory we use to store the ranges in memory. However, it can reduce the - // total number of stream_plan we use for the repair. 
- size_t sub_ranges_to_stream = 10 * 1024; - size_t sp_index = 0; - size_t current_sub_ranges_nr_in = 0; - size_t current_sub_ranges_nr_out = 0; - int ranges_index = 0; - // Only allow one stream_plan in flight - semaphore sp_parallelism_semaphore{1}; - lw_shared_ptr _sp_in; - lw_shared_ptr _sp_out; -public: - repair_info(seastar::sharded& db_, - const sstring& keyspace_, - const dht::token_range_vector& ranges_, - const std::vector& cfs_, - int id_, - const std::vector& data_centers_, - const std::vector& hosts_) - : db(db_) - , keyspace(keyspace_) - , ranges(ranges_) - , cfs(cfs_) - , id(id_) - , shard(engine().cpu_id()) - , data_centers(data_centers_) - , hosts(hosts_) { - } - future<> do_streaming() { - size_t ranges_in = 0; - size_t ranges_out = 0; - _sp_in = make_lw_shared(format("repair-in-id-{:d}-shard-{:d}-index-{:d}", id, shard, sp_index), streaming::stream_reason::repair); - _sp_out = make_lw_shared(format("repair-out-id-{:d}-shard-{:d}-index-{:d}", id, shard, sp_index), streaming::stream_reason::repair); - - for (auto& x : ranges_need_repair_in) { - auto& peer = x.first; - for (auto& y : x.second) { - auto& cf = y.first; - auto& stream_ranges = y.second; - ranges_in += stream_ranges.size(); - _sp_in->request_ranges(peer, keyspace, std::move(stream_ranges), {cf}); - } - } - ranges_need_repair_in.clear(); - current_sub_ranges_nr_in = 0; - - for (auto& x : ranges_need_repair_out) { - auto& peer = x.first; - for (auto& y : x.second) { - auto& cf = y.first; - auto& stream_ranges = y.second; - ranges_out += stream_ranges.size(); - _sp_out->transfer_ranges(peer, keyspace, std::move(stream_ranges), {cf}); - } - } - ranges_need_repair_out.clear(); - current_sub_ranges_nr_out = 0; - - if (ranges_in || ranges_out) { - rlogger.info("Start streaming for repair id={}, shard={}, index={}, ranges_in={}, ranges_out={}", id, shard, sp_index, ranges_in, ranges_out); - } - sp_index++; - - return _sp_in->execute().discard_result().then([this, sp_in = _sp_in, sp_out = 
_sp_out] { - return _sp_out->execute().discard_result(); - }).handle_exception([this] (auto ep) { - rlogger.warn("repair's stream failed: {}", ep); - return make_exception_future(ep); - }).finally([this] { - _sp_in = {}; - _sp_out = {}; - }); - } - void check_failed_ranges() { - if (nr_failed_ranges) { - rlogger.info("repair {} on shard {} failed - {} ranges failed", id, shard, nr_failed_ranges); - throw std::runtime_error(format("repair {:d} on shard {:d} failed to do checksum for {:d} sub ranges", id, shard, nr_failed_ranges)); - } else { - rlogger.info("repair {} on shard {} completed successfully", id, shard); - } - } - future<> request_transfer_ranges(const sstring& cf, - const ::dht::token_range& range, - const std::vector& neighbors_in, - const std::vector& neighbors_out) { - rlogger.debug("Add cf {}, range {}, current_sub_ranges_nr_in {}, current_sub_ranges_nr_out {}", cf, range, current_sub_ranges_nr_in, current_sub_ranges_nr_out); - return seastar::with_semaphore(sp_parallelism_semaphore, 1, [this, cf, range, neighbors_in, neighbors_out] { - for (const auto& peer : neighbors_in) { - ranges_need_repair_in[peer][cf].emplace_back(range); - current_sub_ranges_nr_in++; - } - for (const auto& peer : neighbors_out) { - ranges_need_repair_out[peer][cf].emplace_back(range); - current_sub_ranges_nr_out++; - } - if (current_sub_ranges_nr_in >= sub_ranges_to_stream || current_sub_ranges_nr_out >= sub_ranges_to_stream) { - return do_streaming(); - } - return make_ready_future<>(); - }); - } - void abort() { - if (_sp_in) { - _sp_in->abort(); - } - if (_sp_out) { - _sp_out->abort(); - } - aborted = true; - } - void check_in_abort() { - if (aborted) { - throw std::runtime_error(format("repair id {:d} is aborted on shard {:d}", id, shard)); - } - } -}; - template inline static std::ostream& operator<<(std::ostream& os, const std::unordered_map& v) { @@ -774,6 +639,115 @@ future estimate_partitions(seastar::sharded& db, const sstri ); } 
+repair_info::repair_info(seastar::sharded& db_, /* Copies each argument into the member of the same name; shard is the only initialized member that is not an argument. */ + const sstring& keyspace_, + const dht::token_range_vector& ranges_, + const std::vector& cfs_, + int id_, + const std::vector& data_centers_, + const std::vector& hosts_) + : db(db_) + , keyspace(keyspace_) + , ranges(ranges_) + , cfs(cfs_) + , id(id_) + , shard(engine().cpu_id()) /* read from engine().cpu_id() at construction time */ + , data_centers(data_centers_) + , hosts(hosts_) { +} + +future<> repair_info::do_streaming() { /* Drains both pending-range maps into a fresh pair of plans (_sp_in, _sp_out), executes the inbound plan and then the outbound one, and resets both plan pointers when done. The returned future carries any streaming failure. */ + size_t ranges_in = 0; + size_t ranges_out = 0; + _sp_in = make_lw_shared(format("repair-in-id-{:d}-shard-{:d}-index-{:d}", id, shard, sp_index), streaming::stream_reason::repair); /* plan name embeds id, shard and the current sp_index */ + _sp_out = make_lw_shared(format("repair-out-id-{:d}-shard-{:d}-index-{:d}", id, shard, sp_index), streaming::stream_reason::repair); + + for (auto& x : ranges_need_repair_in) { /* x is one (peer, per-cf map) entry */ + auto& peer = x.first; + for (auto& y : x.second) { /* y is one (cf, ranges) entry */ + auto& cf = y.first; + auto& stream_ranges = y.second; + ranges_in += stream_ranges.size(); /* counted before the vector is moved from on the next line */ + _sp_in->request_ranges(peer, keyspace, std::move(stream_ranges), {cf}); + } + } + ranges_need_repair_in.clear(); /* the moved-from entries are dropped here */ + current_sub_ranges_nr_in = 0; + + for (auto& x : ranges_need_repair_out) { /* same traversal as above, feeding transfer_ranges on the outbound plan */ + auto& peer = x.first; + for (auto& y : x.second) { + auto& cf = y.first; + auto& stream_ranges = y.second; + ranges_out += stream_ranges.size(); + _sp_out->transfer_ranges(peer, keyspace, std::move(stream_ranges), {cf}); + } + } + ranges_need_repair_out.clear(); + current_sub_ranges_nr_out = 0; + + if (ranges_in || ranges_out) { /* only the log line is conditional; both plans are executed below even when nothing was queued */ + rlogger.info("Start streaming for repair id={}, shard={}, index={}, ranges_in={}, ranges_out={}", id, shard, sp_index, ranges_in, ranges_out); + } + sp_index++; /* so the next call builds plans with a different index in their names */ + + return _sp_in->execute().discard_result().then([this, sp_in = _sp_in, sp_out = _sp_out] { /* sp_in and sp_out are captured copies of the plan pointers; the body itself dereferences the _sp_out member. NOTE(review): presumably the copies exist to keep the plans alive for the chain - confirm */ + return _sp_out->execute().discard_result(); /* outbound plan starts only after the inbound one resolved successfully */ + }).handle_exception([this] (auto ep) { /* logs a warning, then re-propagates the same exception to the caller */ + rlogger.warn("repair's stream failed: {}", ep); + return make_exception_future(ep); + }).finally([this] { /* runs on success and on failure; afterwards abort() finds no plan to abort */ + _sp_in = {}; + _sp_out = {}; + }); +} + +void repair_info::check_failed_ranges() { /* Logs the outcome of this repair on this shard; throws std::runtime_error when nr_failed_ranges is non-zero. */ 
+ if (nr_failed_ranges) { + rlogger.info("repair {} on shard {} failed - {} ranges failed", id, shard, nr_failed_ranges); + throw std::runtime_error(format("repair {:d} on shard {:d} failed to do checksum for {:d} sub ranges", id, shard, nr_failed_ranges)); + } else { + rlogger.info("repair {} on shard {} completed successfully", id, shard); + } +} + +future<> repair_info::request_transfer_ranges(const sstring& cf, /* Queues range of cf once per peer in neighbors_in (inbound map) and once per peer in neighbors_out (outbound map); calls do_streaming() when either counter reaches sub_ranges_to_stream. The whole step runs while holding one unit of sp_parallelism_semaphore. */ + const ::dht::token_range& range, + const std::vector& neighbors_in, + const std::vector& neighbors_out) { + rlogger.debug("Add cf {}, range {}, current_sub_ranges_nr_in {}, current_sub_ranges_nr_out {}", cf, range, current_sub_ranges_nr_in, current_sub_ranges_nr_out); /* logged before the semaphore is taken, so the counters shown are the values at call time */ + return seastar::with_semaphore(sp_parallelism_semaphore, 1, [this, cf, range, neighbors_in, neighbors_out] { /* cf, range and both neighbor lists are captured by value, so the lambda does not refer to the caller's arguments */ + for (const auto& peer : neighbors_in) { + ranges_need_repair_in[peer][cf].emplace_back(range); /* operator[] creates the peer and cf entries on first use */ + current_sub_ranges_nr_in++; /* counts one per (peer, range) pair */ + } + for (const auto& peer : neighbors_out) { + ranges_need_repair_out[peer][cf].emplace_back(range); + current_sub_ranges_nr_out++; + } + if (current_sub_ranges_nr_in >= sub_ranges_to_stream || current_sub_ranges_nr_out >= sub_ranges_to_stream) { + return do_streaming(); /* returned from inside the with_semaphore callback, so the unit is held until streaming resolves */ + } + return make_ready_future<>(); /* below the threshold: the range stays queued for a later do_streaming() */ + }); +} + +void repair_info::abort() { /* Calls abort() on whichever plan pointers are currently non-null, then sets the aborted flag. */ + if (_sp_in) { + _sp_in->abort(); + } + if (_sp_out) { + _sp_out->abort(); + } + aborted = true; /* set unconditionally, even when no plan existed */ +} + +void repair_info::check_in_abort() { /* Throws std::runtime_error when the aborted flag is set; otherwise does nothing. */ + if (aborted) { + throw std::runtime_error(format("repair id {:d} is aborted on shard {:d}", id, shard)); + } +} + // Repair a single cf in a single local range. // Comparable to RepairJob in Origin. 
static future<> repair_cf_range(repair_info& ri, diff --git a/repair/repair.hh b/repair/repair.hh index 37fa90513c..4bffdaadc2 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -30,7 +30,7 @@ #include "database.hh" #include "utils/UUID.hh" - +#include "streaming/stream_plan.hh" class repair_exception : public std::exception { private: @@ -115,6 +115,53 @@ future checksum_range(seastar::sharded &db, const sstring& keyspace, const sstring& cf, const ::dht::token_range& range, repair_checksum rt); +class repair_info { /* State of one repair id on one shard: the constructor arguments, the queues of ranges waiting to be streamed, and the plan pair currently executing. All members are public. */ +public: + seastar::sharded& db; + sstring keyspace; + dht::token_range_vector ranges; + std::vector cfs; + int id; /* appears in log messages, exception texts and plan names */ + shard_id shard; /* set from engine().cpu_id() by the constructor */ + std::vector data_centers; + std::vector hosts; + size_t nr_failed_ranges = 0; /* a non-zero value makes check_failed_ranges() throw */ + bool aborted = false; /* set by abort(), tested by check_in_abort() */ + // Map of peer -> + std::unordered_map> ranges_need_repair_in; /* filled by request_transfer_ranges(), drained into the inbound plan by do_streaming() */ + std::unordered_map> ranges_need_repair_out; /* same, for the outbound plan */ + // FIXME: this "100" needs to be a parameter. + uint64_t target_partitions = 100; + // This affects how many ranges we put in a stream plan. The more the more + // memory we use to store the ranges in memory. However, it can reduce the + // total number of stream_plan we use for the repair. 
+ size_t sub_ranges_to_stream = 10 * 1024; /* threshold that request_transfer_ranges() compares against both current_sub_ranges_nr_* counters */ + size_t sp_index = 0; /* incremented once per do_streaming() call; part of the plan names */ + size_t current_sub_ranges_nr_in = 0; /* entries queued since the last do_streaming(), which resets it to 0 */ + size_t current_sub_ranges_nr_out = 0; + int ranges_index = 0; /* not touched by any member function declared here. NOTE(review): presumably advanced by callers - confirm */ + // Only allow one stream_plan in flight + semaphore sp_parallelism_semaphore{1}; /* one unit; taken by request_transfer_ranges() around queueing and any do_streaming() it starts */ + lw_shared_ptr _sp_in; /* inbound plan; assigned at the start of do_streaming() and reset when that call finishes */ + lw_shared_ptr _sp_out; /* outbound plan; same lifetime as _sp_in */ +public: + repair_info(seastar::sharded& db_, /* stores the arguments in the members of the same name */ + const sstring& keyspace_, + const dht::token_range_vector& ranges_, + const std::vector& cfs_, + int id_, + const std::vector& data_centers_, + const std::vector& hosts_); + future<> do_streaming(); /* streams everything currently queued: inbound plan first, then outbound */ + void check_failed_ranges(); /* throws std::runtime_error if nr_failed_ranges is non-zero */ + future<> request_transfer_ranges(const sstring& cf, /* queues one range for the given peers and may trigger do_streaming() */ + const ::dht::token_range& range, + const std::vector& neighbors_in, + const std::vector& neighbors_out); + void abort(); /* aborts any existing plans and sets aborted */ + void check_in_abort(); /* throws std::runtime_error if aborted is set */ +}; + future estimate_partitions(seastar::sharded& db, const sstring& keyspace, const sstring& cf, const dht::token_range& range);