Merge 'Task manager top level repair tasks' from Aleksandra Martyniuk

The PR introduces top level repair tasks representing repair and node operations
performed with repair. The actions performed as a part of these operations are
moved to corresponding tasks' run methods.

Also a small change to repair module is added.

Closes #11869

* github.com:scylladb/scylladb:
  repair: define run for data_sync_repair_task_impl
  repair: add data_sync_repair_task_impl
  tasks: repair: add noexcept to task impl constructor
  repair: define run for user_requested_repair_task_impl
  repair: add user_requested_repair_task_impl
  repair: allow direct access to max_repair_memory_per_range
This commit is contained in:
Botond Dénes
2022-11-23 14:02:30 +02:00
6 changed files with 102 additions and 30 deletions

View File

@@ -346,12 +346,12 @@ float node_ops_metrics::repair_finished_percentage() {
repair_module::repair_module(tasks::task_manager& tm, repair_service& rs, size_t max_repair_memory) noexcept
: tasks::task_manager::module(tm, "repair")
, _rs(rs)
, _range_parallelism_semaphore(std::max(size_t(1), size_t(max_repair_memory / max_repair_memory_per_range() / 4)),
, _range_parallelism_semaphore(std::max(size_t(1), size_t(max_repair_memory / max_repair_memory_per_range / 4)),
named_semaphore_exception_factory{"repair range parallelism"})
{
auto nr = _range_parallelism_semaphore.available_units();
rlogger.info("Setting max_repair_memory={}, max_repair_memory_per_range={}, max_repair_ranges_in_parallel={}",
max_repair_memory, max_repair_memory_per_range(), nr);
max_repair_memory, max_repair_memory_per_range, nr);
}
void repair_module::start(repair_uniq_id id) {
@@ -916,6 +916,11 @@ private:
}
};
static future<tasks::task_manager::task_ptr> start_repair_task(tasks::task_manager::task::task_impl_ptr task_impl_ptr, shared_ptr<repair_module> module, tasks::task_info pd = {}) {
auto task = co_await module->make_task(std::move(task_impl_ptr), pd);
task->start();
co_return task;
}
static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
// Repair tables in the keyspace one after another
@@ -1097,9 +1102,20 @@ future<int> repair_service::do_repair_start(sstring keyspace, std::unordered_map
co_return id.id;
}
// Do it in the background.
(void)get_repair_module().run(id, [this, &db, id, keyspace = std::move(keyspace), germs = std::move(germs),
cfs = std::move(cfs), ranges = std::move(ranges), options = std::move(options), ignore_nodes = std::move(ignore_nodes)] () mutable {
auto task_impl_ptr = std::make_unique<user_requested_repair_task_impl>(_repair_module, id, std::move(keyspace), format("{}", streaming::stream_reason::repair), "", germs, std::move(cfs), std::move(ranges), std::move(options.hosts), std::move(options.data_centers), std::move(ignore_nodes));
auto task = co_await start_repair_task(std::move(task_impl_ptr), _repair_module);
co_return id.id;
}
future<> user_requested_repair_task_impl::run() {
auto module = dynamic_pointer_cast<repair_module>(_module);
auto& rs = module->get_repair_service();
auto& sharded_db = rs.get_db();
auto& db = sharded_db.local();
auto id = get_repair_uniq_id();
return module->run(id, [this, &rs, &db, id, keyspace = _status.keyspace, germs = std::move(_germs),
cfs = std::move(_cfs), ranges = std::move(_ranges), hosts = std::move(_hosts), data_centers = std::move(_data_centers), ignore_nodes = std::move(_ignore_nodes)] () mutable {
auto uuid = id.uuid();
bool needs_flush_before_repair = false;
@@ -1114,7 +1130,7 @@ future<int> repair_service::do_repair_start(sstring keyspace, std::unordered_map
}
bool hints_batchlog_flushed = false;
auto participants = get_hosts_participating_in_repair(germs->get(), keyspace, ranges, options.data_centers, options.hosts, ignore_nodes).get();
auto participants = get_hosts_participating_in_repair(germs->get(), keyspace, ranges, data_centers, hosts, ignore_nodes).get();
if (needs_flush_before_repair) {
auto waiting_nodes = db.get_token_metadata().get_all_endpoints();
std::erase_if(waiting_nodes, [&] (const auto& addr) {
@@ -1125,11 +1141,11 @@ future<int> repair_service::do_repair_start(sstring keyspace, std::unordered_map
repair_flush_hints_batchlog_request req{id.uuid(), participants, hints_timeout, batchlog_timeout};
try {
parallel_for_each(waiting_nodes, [this, uuid, &req, &participants] (gms::inet_address node) -> future<> {
parallel_for_each(waiting_nodes, [this, &rs, uuid, &req, &participants] (gms::inet_address node) -> future<> {
rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started",
uuid, node, participants);
try {
auto& ms = get_messaging();
auto& ms = rs.get_messaging();
auto resp = co_await ser::partition_checksum_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req);
(void)resp; // nothing to do with response yet
} catch (...) {
@@ -1151,14 +1167,14 @@ future<int> repair_service::do_repair_start(sstring keyspace, std::unordered_map
repair_results.reserve(smp::count);
auto table_ids = get_table_ids(db, keyspace, cfs);
abort_source as;
auto off_strategy_updater = seastar::async([this, uuid = uuid.uuid(), &table_ids, &participants, &as] {
auto off_strategy_updater = seastar::async([this, &rs, uuid = uuid.uuid(), &table_ids, &participants, &as] {
auto tables = std::list<table_id>(table_ids.begin(), table_ids.end());
auto req = node_ops_cmd_request(node_ops_cmd::repair_updater, uuid, {}, {}, {}, {}, std::move(tables));
auto update_interval = std::chrono::seconds(30);
while (!as.abort_requested()) {
sleep_abortable(update_interval, as).get();
parallel_for_each(participants, [this, uuid, &req] (gms::inet_address node) {
return _messaging.send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
parallel_for_each(participants, [this, &rs, uuid, &req] (gms::inet_address node) {
return rs._messaging.send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
rlogger.debug("repair[{}]: Got node_ops_cmd::repair_updater response from node={}", uuid, node);
}).handle_exception([uuid, node] (std::exception_ptr ep) {
rlogger.warn("repair[{}]: Failed to send node_ops_cmd::repair_updater to node={}", uuid, node);
@@ -1180,21 +1196,21 @@ future<int> repair_service::do_repair_start(sstring keyspace, std::unordered_map
rlogger.info("repair[{}]: Finished to shutdown off-strategy compaction updater", uuid);
});
auto cleanup_repair_range_history = defer([this, uuid] () mutable {
auto cleanup_repair_range_history = defer([this, &rs, uuid] () mutable {
try {
this->cleanup_history(uuid).get();
rs.cleanup_history(uuid).get();
} catch (...) {
rlogger.warn("repair[{}]: Failed to cleanup history: {}", uuid, std::current_exception());
}
});
if (get_repair_module().is_aborted(id.uuid())) {
if (rs.get_repair_module().is_aborted(id.uuid())) {
throw std::runtime_error("aborted by user request");
}
for (auto shard : boost::irange(unsigned(0), smp::count)) {
auto f = container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed,
data_centers = options.data_centers, hosts = options.hosts, ignore_nodes, germs] (repair_service& local_repair) mutable {
auto f = rs.container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed,
data_centers, hosts, ignore_nodes, germs] (repair_service& local_repair) mutable {
local_repair.get_metrics().repair_total_ranges_sum += ranges.size();
auto ri = make_lw_shared<repair_info>(local_repair,
std::move(keyspace), germs->get().shared_from_this(), std::move(ranges), std::move(table_ids),
@@ -1219,9 +1235,8 @@ future<int> repair_service::do_repair_start(sstring keyspace, std::unordered_map
}).get();
}).handle_exception([id] (std::exception_ptr ep) {
rlogger.warn("repair[{}]: repair_tracker run failed: {}", id.uuid(), ep);
return make_exception_future<>(ep);
});
co_return id.id;
}
future<int> repair_start(seastar::sharded<repair_service>& repair,
@@ -1271,13 +1286,23 @@ future<> repair_service::sync_data_using_repair(
}
assert(this_shard_id() == 0);
auto& sharded_db = get_db();
auto task_impl_ptr = std::make_unique<data_sync_repair_task_impl>(_repair_module, _repair_module->new_repair_uniq_id(), std::move(keyspace), format("{}", reason), "", std::move(ranges), std::move(neighbors), reason, ops_info);
auto task = co_await start_repair_task(std::move(task_impl_ptr), _repair_module);
task->start();
co_await task->done();
}
future<> data_sync_repair_task_impl::run() {
auto module = dynamic_pointer_cast<repair_module>(_module);
auto& rs = module->get_repair_service();
auto& keyspace = _status.keyspace;
auto& sharded_db = rs.get_db();
auto& db = sharded_db.local();
auto germs = make_lw_shared(co_await locator::make_global_effective_replication_map(sharded_db, keyspace));
repair_uniq_id id = get_repair_module().new_repair_uniq_id();
auto id = get_repair_uniq_id();
rlogger.info("repair[{}]: sync data for keyspace={}, status=started", id.uuid(), keyspace);
co_await get_repair_module().run(id, [this, id, &db, keyspace, germs = std::move(germs), ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] () mutable {
co_await module->run(id, [this, &rs, id, &db, keyspace, germs = std::move(germs), &ranges = _ranges, &neighbors = _neighbors, reason = _reason, ops_info = _ops_info] () mutable {
auto cfs = list_column_families(db, keyspace);
if (cfs.empty()) {
rlogger.warn("repair[{}]: sync data for keyspace={}, no table in this keyspace", id.uuid(), keyspace);
@@ -1286,11 +1311,11 @@ future<> repair_service::sync_data_using_repair(
auto table_ids = get_table_ids(db, keyspace, cfs);
std::vector<future<>> repair_results;
repair_results.reserve(smp::count);
if (get_repair_module().is_aborted(id.uuid())) {
if (rs.get_repair_module().is_aborted(id.uuid())) {
throw std::runtime_error("aborted by user request");
}
for (auto shard : boost::irange(unsigned(0), smp::count)) {
auto f = container().invoke_on(shard, [keyspace, table_ids, id, ranges, neighbors, reason, ops_info, germs] (repair_service& local_repair) mutable {
auto f = rs.container().invoke_on(shard, [keyspace, table_ids, id, ranges, neighbors, reason, ops_info, germs] (repair_service& local_repair) mutable {
auto data_centers = std::vector<sstring>();
auto hosts = std::vector<sstring>();
auto ignore_nodes = std::unordered_set<gms::inet_address>();

View File

@@ -13,7 +13,7 @@
class repair_task_impl : public tasks::task_manager::task::impl {
public:
repair_task_impl(tasks::task_manager::module_ptr module, tasks::task_id id, unsigned sequence_number, std::string keyspace, std::string table, std::string type, std::string entity, tasks::task_id parent_id)
repair_task_impl(tasks::task_manager::module_ptr module, tasks::task_id id, unsigned sequence_number, std::string keyspace, std::string table, std::string type, std::string entity, tasks::task_id parent_id) noexcept
: tasks::task_manager::task::impl(module, id, sequence_number, std::move(keyspace), std::move(table), std::move(type), std::move(entity), parent_id) {
_status.progress_units = "ranges";
}
@@ -28,6 +28,50 @@ protected:
virtual future<> run() override = 0;
};
class user_requested_repair_task_impl : public repair_task_impl {
private:
lw_shared_ptr<locator::global_effective_replication_map> _germs;
std::vector<sstring> _cfs;
dht::token_range_vector _ranges;
std::vector<sstring> _hosts;
std::vector<sstring> _data_centers;
std::unordered_set<gms::inet_address> _ignore_nodes;
public:
user_requested_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string type, std::string entity, lw_shared_ptr<locator::global_effective_replication_map> germs, std::vector<sstring> cfs, dht::token_range_vector ranges, std::vector<sstring> hosts, std::vector<sstring> data_centers, std::unordered_set<gms::inet_address> ignore_nodes) noexcept
: repair_task_impl(module, id.uuid(), id.id, std::move(keyspace), "", std::move(type), std::move(entity), tasks::task_id::create_null_id())
, _germs(germs)
, _cfs(std::move(cfs))
, _ranges(std::move(ranges))
, _hosts(std::move(hosts))
, _data_centers(std::move(data_centers))
, _ignore_nodes(std::move(ignore_nodes))
{}
protected:
future<> run() override;
// TODO: implement progress for user-requested repairs
};
class data_sync_repair_task_impl : public repair_task_impl {
private:
dht::token_range_vector _ranges;
std::unordered_map<dht::token_range, repair_neighbors> _neighbors;
streaming::stream_reason _reason;
shared_ptr<node_ops_info> _ops_info;
public:
data_sync_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string type, std::string entity, dht::token_range_vector ranges, std::unordered_map<dht::token_range, repair_neighbors> neighbors, streaming::stream_reason reason, shared_ptr<node_ops_info> ops_info)
: repair_task_impl(module, id.uuid(), id.id, std::move(keyspace), "", std::move(type), std::move(entity), tasks::task_id::create_null_id())
, _ranges(std::move(ranges))
, _neighbors(std::move(neighbors))
, _reason(reason)
, _ops_info(ops_info)
{}
protected:
future<> run() override;
// TODO: implement progress for data-sync repairs
};
// The repair_module tracks ongoing repair operations and their progress.
// A repair which has already finished successfully is dropped from this
// table, but a failed repair will remain in the table forever so it can
@@ -47,11 +91,12 @@ private:
// The semaphore used to control the maximum
// ranges that can be repaired in parallel.
named_semaphore _range_parallelism_semaphore;
static constexpr size_t _max_repair_memory_per_range = 32 * 1024 * 1024;
seastar::condition_variable _done_cond;
void start(repair_uniq_id id);
void done(repair_uniq_id id, bool succeeded);
public:
static constexpr size_t max_repair_memory_per_range = 32 * 1024 * 1024;
repair_module(tasks::task_manager& tm, repair_service& rs, size_t max_repair_memory) noexcept;
repair_service& get_repair_service() noexcept {
@@ -74,7 +119,6 @@ public:
size_t nr_running_repair_jobs();
void abort_all_repairs();
named_semaphore& range_parallelism_semaphore();
static size_t max_repair_memory_per_range() { return _max_repair_memory_per_range; }
future<> run(repair_uniq_id id, std::function<void ()> func);
future<repair_status> repair_await_completion(int id, std::chrono::steady_clock::time_point timeout);
float report_progress(streaming::stream_reason reason);

View File

@@ -2449,7 +2449,7 @@ private:
size_t get_max_row_buf_size(row_level_diff_detect_algorithm algo) {
// Max buffer size per repair round
return is_rpc_stream_supported(algo) ? repair_module::max_repair_memory_per_range() : 256 * 1024;
return is_rpc_stream_supported(algo) ? repair_module::max_repair_memory_per_range : 256 * 1024;
}
// Step A: Negotiate sync boundary to use
@@ -2733,7 +2733,7 @@ public:
auto& mem_sem = _ri.rs.memory_sem();
auto max = _ri.rs.max_repair_memory();
auto wanted = (_all_live_peer_nodes.size() + 1) * repair_module::max_repair_memory_per_range();
auto wanted = (_all_live_peer_nodes.size() + 1) * repair_module::max_repair_memory_per_range;
wanted = std::min(max, wanted);
rlogger.trace("repair[{}]: Started to get memory budget, wanted={}, available={}, max_repair_memory={}",
_ri.id.uuid(), wanted, mem_sem.current(), max);

View File

@@ -227,6 +227,9 @@ public:
future<> remove_repair_meta();
future<uint32_t> get_next_repair_meta_id();
friend class user_requested_repair_task_impl;
friend class data_sync_repair_task_impl;
};
class repair_info;

View File

@@ -98,7 +98,7 @@ public:
module_ptr _module;
abort_source _as;
public:
impl(module_ptr module, task_id id, uint64_t sequence_number, std::string keyspace, std::string table, std::string type, std::string entity, task_id parent_id)
impl(module_ptr module, task_id id, uint64_t sequence_number, std::string keyspace, std::string table, std::string type, std::string entity, task_id parent_id) noexcept
: _status({
.id = id,
.type = std::move(type),

View File

@@ -35,7 +35,7 @@ private:
promise<> _finish_run;
bool _finished = false;
public:
test_task_impl(task_manager::module_ptr module, task_id id, uint64_t sequence_number = 0, std::string keyspace = "", std::string table = "", std::string type = "", std::string entity = "", task_id parent_id = task_id::create_null_id())
test_task_impl(task_manager::module_ptr module, task_id id, uint64_t sequence_number = 0, std::string keyspace = "", std::string table = "", std::string type = "", std::string entity = "", task_id parent_id = task_id::create_null_id()) noexcept
: task_manager::task::impl(module, id, sequence_number, std::move(keyspace), std::move(table), std::move(type), std::move(entity), parent_id)
{}