diff --git a/repair/repair.cc b/repair/repair.cc index 34c751eb4f..3304cffc73 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -370,32 +370,35 @@ void tracker::done(repair_uniq_id id, bool succeeded) { } _done_cond.broadcast(); } -repair_status tracker::get(int id) const { - if (id >= _next_repair_command) { + +repair_status repair_module::get(int id) const { + if (id > _sequence_number) { throw std::runtime_error(format("unknown repair id {}", id)); } - auto it = _status.find(id); - if (it == _status.end()) { + auto& tracker = _rs.repair_tracker(); + auto it = tracker._status.find(id); + if (it == tracker._status.end()) { return repair_status::SUCCESSFUL; } else { return it->second; } } -future tracker::repair_await_completion(int id, std::chrono::steady_clock::time_point timeout) { - return seastar::with_gate(_gate, [this, id, timeout] { - if (id >= _next_repair_command) { +future repair_module::repair_await_completion(int id, std::chrono::steady_clock::time_point timeout) { + return seastar::with_gate(_rs.repair_tracker()._gate, [this, id, timeout] { + if (id > _sequence_number) { return make_exception_future(std::runtime_error(format("unknown repair id {}", id))); } return repeat_until_value([this, id, timeout] { - auto it = _status.find(id); - if (it == _status.end()) { + auto& tracker = _rs.repair_tracker(); + auto it = tracker._status.find(id); + if (it == tracker._status.end()) { return make_ready_future>(repair_status::SUCCESSFUL); } else { if (it->second == repair_status::FAILED) { return make_ready_future>(repair_status::FAILED); } else { - return _done_cond.wait(timeout).then([] { + return tracker._done_cond.wait(timeout).then([] { return make_ready_future>(std::nullopt); }).handle_exception_type([] (condition_variable_timed_out&) { return make_ready_future>(repair_status::RUNNING); @@ -406,10 +409,6 @@ future tracker::repair_await_completion(int id, std::chrono::stea }); } -repair_uniq_id tracker::next_repair_command() { - return repair_uniq_id{_next_repair_command++, tasks::task_info{tasks::task_id::create_random_id(), this_shard_id()}}; -} - future<> tracker::shutdown() { _shutdown.store(true, std::memory_order_relaxed); return _gate.close(); @@ -1031,7 +1030,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map= 1. - auto id = repair_tracker().next_repair_command(); + auto id = _repair_module->new_repair_uniq_id(); rlogger.info("repair[{}]: starting user-requested repair for keyspace {}, repair id {}, options {}", id.uuid(), keyspace, id.id, options_map); if (!_gossiper.local().is_normal(utils::fb_utilities::get_broadcast_address())) { @@ -1260,13 +1259,13 @@ future> repair_service::get_active_repairs() { future repair_service::get_status(int id) { return container().invoke_on(0, [id] (repair_service& rs) { - return rs.repair_tracker().get(id); + return rs.get_repair_module().get(id); }); } future repair_service::await_completion(int id, std::chrono::steady_clock::time_point timeout) { return container().invoke_on(0, [id, timeout] (repair_service& rs) { - return rs.repair_tracker().repair_await_completion(id, timeout); + return rs.get_repair_module().repair_await_completion(id, timeout); }); } @@ -1303,7 +1302,7 @@ future<> repair_service::do_sync_data_using_repair( shared_ptr ops_info) { seastar::sharded& db = get_db(); - repair_uniq_id id = repair_tracker().next_repair_command(); + repair_uniq_id id = get_repair_module().new_repair_uniq_id(); rlogger.info("repair[{}]: sync data for keyspace={}, status=started", id.uuid(), keyspace); return repair_tracker().run(id, [this, id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] () mutable { auto cfs = list_column_families(db.local(), keyspace); diff --git a/repair/repair.hh b/repair/repair.hh index 1dec55dbd2..4a6b417c49 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -30,6 +30,10 @@ #include "repair/sync_boundary.hh" #include "tasks/types.hh" +namespace tasks { +class repair_module; +} + namespace replica { class database; } @@ -234,15 +238,11 @@ public: // be queried about more than once (FIXME: reconsider this. But note that // failed repairs should be rare anwyay). // This object is not thread safe, and must be used by only one cpu. +// TODO: track the repair tasks entirely by the repair module class tracker { private: - // Each repair_start() call returns a unique int which the user can later - // use to follow the status of this repair with repair_status(). - // We can't use the number 0 - if repair_start() returns 0, it means it - // decide quickly that there is nothing to repair. - int _next_repair_command = 1; // Note that there are no "SUCCESSFUL" entries in the "status" map: - // Successfully-finished repairs are those with id < _next_repair_command + // Successfully-finished repairs are those with id <= repair_module::_sequence_number // but aren't listed as running or failed the status map. std::unordered_map _status; // Used to allow shutting down repairs in progress, and waiting for them. @@ -262,8 +262,6 @@ private: void done(repair_uniq_id id, bool succeeded); public: explicit tracker(size_t max_repair_memory); - repair_status get(int id) const; - repair_uniq_id next_repair_command(); future<> shutdown(); void check_in_shutdown(); void add_repair_info(int id, lw_shared_ptr ri); @@ -275,9 +273,10 @@ public: named_semaphore& range_parallelism_semaphore(); static size_t max_repair_memory_per_range() { return _max_repair_memory_per_range; } future<> run(repair_uniq_id id, std::function func); - future repair_await_completion(int id, std::chrono::steady_clock::time_point timeout); float report_progress(streaming::stream_reason reason); bool is_aborted(const tasks::task_id& uuid); + + friend class repair_module; }; future estimate_partitions(seastar::sharded& db, const sstring& keyspace, diff --git a/repair/repair_task.hh b/repair/repair_task.hh index ab5fd3e2b0..6e0e61c925 100644 --- a/repair/repair_task.hh +++ b/repair/repair_task.hh @@ -44,4 +44,7 @@ public: .task_info = tasks::task_info(tasks::task_id::create_random_id(), this_shard_id()) }; } + + repair_status get(int id) const; + future repair_await_completion(int id, std::chrono::steady_clock::time_point timeout); };