repair: Add ranges_parallelism option
This patch adds the ranges_parallelism option to repair restful API. Users can use this option to optionally specify the number of ranges to repair in parallel per repair job to a smaller number than the Scylla core calculated default max_repair_ranges_in_parallel. Scylla manager can also use this option to provide more ranges (>N) in a single repair job but only repairing N ranges_parallelism in parallel, instead of providing N ranges in a repair job. To make it safer, unlike the PR #4848, this patch does not allow user to exceed the max_repair_ranges_in_parallel. Fixes #4847
This commit is contained in:
@@ -1114,6 +1114,14 @@
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"name":"ranges_parallelism",
|
||||
"description":"An integer specifying the number of ranges to repair in parallel by user request. If this number is bigger than the max_repair_ranges_in_parallel calculated by Scylla core, the smaller one will be used.",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
}
|
||||
]
|
||||
},
|
||||
|
||||
@@ -320,7 +320,7 @@ void set_repair(http_context& ctx, routes& r, sharded<repair_service>& repair) {
|
||||
ss::repair_async.set(r, [&ctx, &repair](std::unique_ptr<http::request> req) {
|
||||
static std::vector<sstring> options = {"primaryRange", "parallelism", "incremental",
|
||||
"jobThreads", "ranges", "columnFamilies", "dataCenters", "hosts", "ignore_nodes", "trace",
|
||||
"startToken", "endToken" };
|
||||
"startToken", "endToken", "ranges_parallelism"};
|
||||
std::unordered_map<sstring, sstring> options_map;
|
||||
for (auto o : options) {
|
||||
auto s = req->get_query_param(o);
|
||||
|
||||
@@ -563,7 +563,8 @@ repair::shard_repair_task_impl::shard_repair_task_impl(tasks::task_manager::modu
|
||||
const std::vector<sstring>& hosts_,
|
||||
const std::unordered_set<gms::inet_address>& ignore_nodes_,
|
||||
streaming::stream_reason reason_,
|
||||
bool hints_batchlog_flushed)
|
||||
bool hints_batchlog_flushed,
|
||||
std::optional<int> ranges_parallelism)
|
||||
: repair_task_impl(module, id, 0, keyspace, "", "", parent_id_.uuid(), reason_)
|
||||
, rs(repair)
|
||||
, db(repair.get_db())
|
||||
@@ -584,7 +585,11 @@ repair::shard_repair_task_impl::shard_repair_task_impl(tasks::task_manager::modu
|
||||
, total_rf(erm->get_replication_factor())
|
||||
, nr_ranges_total(ranges.size())
|
||||
, _hints_batchlog_flushed(std::move(hints_batchlog_flushed))
|
||||
{ }
|
||||
, _user_ranges_parallelism(ranges_parallelism ? std::optional<semaphore>(semaphore(*ranges_parallelism)) : std::nullopt)
|
||||
{
|
||||
rlogger.debug("repair[{}]: Setting user_ranges_parallelism to {}", global_repair_id.uuid(),
|
||||
_user_ranges_parallelism ? std::to_string(_user_ranges_parallelism->available_units()) : "unlimited");
|
||||
}
|
||||
|
||||
void repair::shard_repair_task_impl::check_failed_ranges() {
|
||||
rlogger.info("repair[{}]: stats: repair_reason={}, keyspace={}, tables={}, ranges_nr={}, {}",
|
||||
@@ -794,6 +799,8 @@ struct repair_options {
|
||||
// repair to a data center other than the named one returns an error.
|
||||
std::vector<sstring> data_centers;
|
||||
|
||||
int ranges_parallelism = -1;
|
||||
|
||||
repair_options(std::unordered_map<sstring, sstring> options) {
|
||||
bool_opt(primary_range, options, PRIMARY_RANGE_KEY);
|
||||
ranges_opt(ranges, options, RANGES_KEY);
|
||||
@@ -829,6 +836,8 @@ struct repair_options {
|
||||
int job_threads;
|
||||
int_opt(job_threads, options, JOB_THREADS_KEY);
|
||||
|
||||
int_opt(ranges_parallelism, options, RANGES_PARALLELISM_KEY);
|
||||
|
||||
// The parsing code above removed from the map options we have parsed.
|
||||
// If anything is left there in the end, it's an unsupported option.
|
||||
if (!options.empty()) {
|
||||
@@ -849,6 +858,7 @@ struct repair_options {
|
||||
static constexpr const char* TRACE_KEY = "trace";
|
||||
static constexpr const char* START_TOKEN = "startToken";
|
||||
static constexpr const char* END_TOKEN = "endToken";
|
||||
static constexpr const char* RANGES_PARALLELISM_KEY = "ranges_parallelism";
|
||||
|
||||
// Settings of "parallelism" option. Numbers must match Cassandra's
|
||||
// RepairParallelism enum, which is used by the caller.
|
||||
@@ -952,6 +962,8 @@ future<> repair::shard_repair_task_impl::do_repair_ranges() {
|
||||
co_await coroutine::parallel_for_each(ranges, [this, table_id] (auto&& range) -> future<> {
|
||||
// Get the system range parallelism
|
||||
auto permit = co_await seastar::get_units(rs.get_repair_module().range_parallelism_semaphore(), 1);
|
||||
// Get the range parallelism specified by user
|
||||
auto user_permit = _user_ranges_parallelism ? co_await seastar::get_units(*_user_ranges_parallelism, 1) : semaphore_units<>();
|
||||
co_await repair_range(range, table_id);
|
||||
if (_reason == streaming::stream_reason::bootstrap) {
|
||||
rs.get_metrics().bootstrap_finished_ranges++;
|
||||
@@ -1128,7 +1140,8 @@ future<int> repair_service::do_repair_start(sstring keyspace, std::unordered_map
|
||||
co_return id.id;
|
||||
}
|
||||
|
||||
auto task = co_await _repair_module->make_and_start_task<repair::user_requested_repair_task_impl>({}, id, std::move(keyspace), "", germs, std::move(cfs), std::move(ranges), std::move(options.hosts), std::move(options.data_centers), std::move(ignore_nodes));
|
||||
auto ranges_parallelism = options.ranges_parallelism == -1 ? std::nullopt : std::optional<int>(options.ranges_parallelism);
|
||||
auto task = co_await _repair_module->make_and_start_task<repair::user_requested_repair_task_impl>({}, id, std::move(keyspace), "", germs, std::move(cfs), std::move(ranges), std::move(options.hosts), std::move(options.data_centers), std::move(ignore_nodes), ranges_parallelism);
|
||||
co_return id.id;
|
||||
}
|
||||
|
||||
@@ -1235,13 +1248,14 @@ future<> repair::user_requested_repair_task_impl::run() {
|
||||
throw std::runtime_error("aborted by user request");
|
||||
}
|
||||
|
||||
auto ranges_parallelism = _ranges_parallelism;
|
||||
for (auto shard : boost::irange(unsigned(0), smp::count)) {
|
||||
auto f = rs.container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed,
|
||||
auto f = rs.container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed, ranges_parallelism,
|
||||
data_centers, hosts, ignore_nodes, parent_data = get_repair_uniq_id().task_info, germs] (repair_service& local_repair) mutable -> future<> {
|
||||
local_repair.get_metrics().repair_total_ranges_sum += ranges.size();
|
||||
auto task = co_await local_repair._repair_module->make_and_start_task<repair::shard_repair_task_impl>(parent_data, tasks::task_id::create_random_id(), keyspace,
|
||||
local_repair, germs->get().shared_from_this(), std::move(ranges), std::move(table_ids),
|
||||
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, hints_batchlog_flushed);
|
||||
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, hints_batchlog_flushed, ranges_parallelism);
|
||||
co_await task->done();
|
||||
});
|
||||
repair_results.push_back(std::move(f));
|
||||
@@ -1345,9 +1359,10 @@ future<> repair::data_sync_repair_task_impl::run() {
|
||||
auto hosts = std::vector<sstring>();
|
||||
auto ignore_nodes = std::unordered_set<gms::inet_address>();
|
||||
bool hints_batchlog_flushed = false;
|
||||
auto ranges_parallelism = std::nullopt;
|
||||
auto task_impl_ptr = seastar::make_shared<repair::shard_repair_task_impl>(local_repair._repair_module, tasks::task_id::create_random_id(), keyspace,
|
||||
local_repair, germs->get().shared_from_this(), std::move(ranges), std::move(table_ids),
|
||||
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, hints_batchlog_flushed);
|
||||
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, hints_batchlog_flushed, ranges_parallelism);
|
||||
task_impl_ptr->neighbors = std::move(neighbors);
|
||||
auto task = co_await local_repair._repair_module->make_task(std::move(task_impl_ptr), parent_data);
|
||||
task->start();
|
||||
|
||||
@@ -45,8 +45,9 @@ private:
|
||||
std::vector<sstring> _hosts;
|
||||
std::vector<sstring> _data_centers;
|
||||
std::unordered_set<gms::inet_address> _ignore_nodes;
|
||||
std::optional<int> _ranges_parallelism;
|
||||
public:
|
||||
user_requested_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string entity, lw_shared_ptr<locator::global_vnode_effective_replication_map> germs, std::vector<sstring> cfs, dht::token_range_vector ranges, std::vector<sstring> hosts, std::vector<sstring> data_centers, std::unordered_set<gms::inet_address> ignore_nodes) noexcept
|
||||
user_requested_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string entity, lw_shared_ptr<locator::global_vnode_effective_replication_map> germs, std::vector<sstring> cfs, dht::token_range_vector ranges, std::vector<sstring> hosts, std::vector<sstring> data_centers, std::unordered_set<gms::inet_address> ignore_nodes, std::optional<int> ranges_parallelism) noexcept
|
||||
: repair_task_impl(module, id.uuid(), id.id, std::move(keyspace), "", std::move(entity), tasks::task_id::create_null_id(), streaming::stream_reason::repair)
|
||||
, _germs(germs)
|
||||
, _cfs(std::move(cfs))
|
||||
@@ -54,6 +55,7 @@ public:
|
||||
, _hosts(std::move(hosts))
|
||||
, _data_centers(std::move(data_centers))
|
||||
, _ignore_nodes(std::move(ignore_nodes))
|
||||
, _ranges_parallelism(ranges_parallelism)
|
||||
{}
|
||||
|
||||
virtual tasks::is_abortable is_abortable() const noexcept override {
|
||||
@@ -123,6 +125,7 @@ public:
|
||||
private:
|
||||
bool _aborted = false;
|
||||
std::optional<sstring> _failed_because;
|
||||
std::optional<semaphore> _user_ranges_parallelism;
|
||||
public:
|
||||
shard_repair_task_impl(tasks::task_manager::module_ptr module,
|
||||
tasks::task_id id,
|
||||
@@ -136,7 +139,8 @@ public:
|
||||
const std::vector<sstring>& hosts_,
|
||||
const std::unordered_set<gms::inet_address>& ignore_nodes_,
|
||||
streaming::stream_reason reason_,
|
||||
bool hints_batchlog_flushed);
|
||||
bool hints_batchlog_flushed,
|
||||
std::optional<int> ranges_parallelism);
|
||||
virtual tasks::is_internal is_internal() const noexcept override {
|
||||
return tasks::is_internal::yes;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user