repair: move next_repair_command to repair_module

The number of the repair operation was tracked redundantly, both by
next_repair_command in tracker and by the sequence number
in task_manager::module.

To get rid of this redundancy, next_repair_command was deleted and all
methods using its value were moved to repair_module.
This commit is contained in:
Aleksandra Martyniuk
2022-09-20 10:39:52 +02:00
parent c81260fb8b
commit a5c05dcb60
3 changed files with 28 additions and 27 deletions

View File

@@ -370,32 +370,35 @@ void tracker::done(repair_uniq_id id, bool succeeded) {
}
_done_cond.broadcast();
}
repair_status tracker::get(int id) const {
if (id >= _next_repair_command) {
repair_status repair_module::get(int id) const {
if (id > _sequence_number) {
throw std::runtime_error(format("unknown repair id {}", id));
}
auto it = _status.find(id);
if (it == _status.end()) {
auto& tracker = _rs.repair_tracker();
auto it = tracker._status.find(id);
if (it == tracker._status.end()) {
return repair_status::SUCCESSFUL;
} else {
return it->second;
}
}
future<repair_status> tracker::repair_await_completion(int id, std::chrono::steady_clock::time_point timeout) {
return seastar::with_gate(_gate, [this, id, timeout] {
if (id >= _next_repair_command) {
future<repair_status> repair_module::repair_await_completion(int id, std::chrono::steady_clock::time_point timeout) {
return seastar::with_gate(_rs.repair_tracker()._gate, [this, id, timeout] {
if (id > _sequence_number) {
return make_exception_future<repair_status>(std::runtime_error(format("unknown repair id {}", id)));
}
return repeat_until_value([this, id, timeout] {
auto it = _status.find(id);
if (it == _status.end()) {
auto& tracker = _rs.repair_tracker();
auto it = tracker._status.find(id);
if (it == tracker._status.end()) {
return make_ready_future<std::optional<repair_status>>(repair_status::SUCCESSFUL);
} else {
if (it->second == repair_status::FAILED) {
return make_ready_future<std::optional<repair_status>>(repair_status::FAILED);
} else {
return _done_cond.wait(timeout).then([] {
return tracker._done_cond.wait(timeout).then([] {
return make_ready_future<std::optional<repair_status>>(std::nullopt);
}).handle_exception_type([] (condition_variable_timed_out&) {
return make_ready_future<std::optional<repair_status>>(repair_status::RUNNING);
@@ -406,10 +409,6 @@ future<repair_status> tracker::repair_await_completion(int id, std::chrono::stea
});
}
repair_uniq_id tracker::next_repair_command() {
return repair_uniq_id{_next_repair_command++, tasks::task_info{tasks::task_id::create_random_id(), this_shard_id()}};
}
future<> tracker::shutdown() {
_shutdown.store(true, std::memory_order_relaxed);
return _gate.close();
@@ -1031,7 +1030,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
// that "Nothing to repair for keyspace '...'". We don't have such a case
// yet. The id field of repair_uniq_ids returned by next_repair_command()
// will be >= 1.
auto id = repair_tracker().next_repair_command();
auto id = _repair_module->new_repair_uniq_id();
rlogger.info("repair[{}]: starting user-requested repair for keyspace {}, repair id {}, options {}", id.uuid(), keyspace, id.id, options_map);
if (!_gossiper.local().is_normal(utils::fb_utilities::get_broadcast_address())) {
@@ -1260,13 +1259,13 @@ future<std::vector<int>> repair_service::get_active_repairs() {
future<repair_status> repair_service::get_status(int id) {
return container().invoke_on(0, [id] (repair_service& rs) {
return rs.repair_tracker().get(id);
return rs.get_repair_module().get(id);
});
}
future<repair_status> repair_service::await_completion(int id, std::chrono::steady_clock::time_point timeout) {
return container().invoke_on(0, [id, timeout] (repair_service& rs) {
return rs.repair_tracker().repair_await_completion(id, timeout);
return rs.get_repair_module().repair_await_completion(id, timeout);
});
}
@@ -1303,7 +1302,7 @@ future<> repair_service::do_sync_data_using_repair(
shared_ptr<node_ops_info> ops_info) {
seastar::sharded<replica::database>& db = get_db();
repair_uniq_id id = repair_tracker().next_repair_command();
repair_uniq_id id = get_repair_module().new_repair_uniq_id();
rlogger.info("repair[{}]: sync data for keyspace={}, status=started", id.uuid(), keyspace);
return repair_tracker().run(id, [this, id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] () mutable {
auto cfs = list_column_families(db.local(), keyspace);

View File

@@ -30,6 +30,10 @@
#include "repair/sync_boundary.hh"
#include "tasks/types.hh"
namespace tasks {
class repair_module;
}
namespace replica {
class database;
}
@@ -234,15 +238,11 @@ public:
// be queried about more than once (FIXME: reconsider this. But note that
// failed repairs should be rare anwyay).
// This object is not thread safe, and must be used by only one cpu.
// TODO: track the repair tasks entirely by the repair module
class tracker {
private:
// Each repair_start() call returns a unique int which the user can later
// use to follow the status of this repair with repair_status().
// We can't use the number 0 - if repair_start() returns 0, it means it
// decide quickly that there is nothing to repair.
int _next_repair_command = 1;
// Note that there are no "SUCCESSFUL" entries in the "status" map:
// Successfully-finished repairs are those with id < _next_repair_command
// Successfully-finished repairs are those with id <= repair_module::_sequence_number
// but aren't listed as running or failed the status map.
std::unordered_map<int, repair_status> _status;
// Used to allow shutting down repairs in progress, and waiting for them.
@@ -262,8 +262,6 @@ private:
void done(repair_uniq_id id, bool succeeded);
public:
explicit tracker(size_t max_repair_memory);
repair_status get(int id) const;
repair_uniq_id next_repair_command();
future<> shutdown();
void check_in_shutdown();
void add_repair_info(int id, lw_shared_ptr<repair_info> ri);
@@ -275,9 +273,10 @@ public:
named_semaphore& range_parallelism_semaphore();
static size_t max_repair_memory_per_range() { return _max_repair_memory_per_range; }
future<> run(repair_uniq_id id, std::function<void ()> func);
future<repair_status> repair_await_completion(int id, std::chrono::steady_clock::time_point timeout);
float report_progress(streaming::stream_reason reason);
bool is_aborted(const tasks::task_id& uuid);
friend class repair_module;
};
future<uint64_t> estimate_partitions(seastar::sharded<replica::database>& db, const sstring& keyspace,

View File

@@ -44,4 +44,7 @@ public:
.task_info = tasks::task_info(tasks::task_id::create_random_id(), this_shard_id())
};
}
repair_status get(int id) const;
future<repair_status> repair_await_completion(int id, std::chrono::steady_clock::time_point timeout);
};