compaction: Make resharding go through compaction manager

Two reasons for this change:
1) every compaction should be multiplexed to manager which in turn
will make decision when to schedule. improvements on it will
immediately benefit every existing compaction type.
2) active tasks metric will now track ongoing reshard jobs.

Fixes #2671.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20170817224334.6402-1-raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2017-08-17 19:43:34 -03:00
committed by Avi Kivity
parent 38b2ff617f
commit 10eaa2339e
3 changed files with 51 additions and 13 deletions

View File

@@ -1667,12 +1667,12 @@ template <typename Func>
static future<> invoke_all_resharding_jobs(global_column_family_ptr cf, std::vector<sstables::resharding_descriptor> jobs, Func&& func) {
return parallel_for_each(std::move(jobs), [cf, func] (sstables::resharding_descriptor& job) mutable {
return forward_sstables_to(job.reshard_at, std::move(job.sstables), cf,
[func, level = job.level, max_sstable_bytes = job.max_sstable_bytes] (auto sstables) {
// used to ensure that only one reshard operation will run per shard.
static thread_local semaphore sem(1);
return with_semaphore(sem, 1, [func, sstables = std::move(sstables), level, max_sstable_bytes] () mutable {
[cf, func, level = job.level, max_sstable_bytes = job.max_sstable_bytes] (auto sstables) {
// compaction manager ensures that only one reshard operation will run per shard.
auto job = [func, sstables = std::move(sstables), level, max_sstable_bytes] () mutable {
return func(std::move(sstables), level, max_sstable_bytes);
});
};
return cf->get_compaction_manager().submit_resharding_job(&*cf, std::move(job));
});
});
}
@@ -1770,14 +1770,6 @@ void distributed_loader::reshard(distributed<database>& db, sstring ks_name, sst
});
}
});
}).then_wrapped([] (future<> f) {
try {
f.get();
} catch (sstables::compaction_stop_exception& e) {
dblog.info("resharding was abruptly stopped, reason: {}", e.what());
} catch (...) {
dblog.error("resharding failed: {}", std::current_exception());
}
});
}).get();
});

View File

@@ -314,6 +314,42 @@ future<> compaction_manager::submit_major_compaction(column_family* cf) {
return task->compaction_done.get_future().then([task] {});
}
future<> compaction_manager::submit_resharding_job(column_family* cf, std::function<future<>()> job) {
if (_stopped) {
return make_ready_future<>();
}
auto task = make_lw_shared<compaction_manager::task>();
task->compacting_cf = cf;
_tasks.push_back(task);
task->compaction_done = with_semaphore(_resharding_sem, 1, [this, task, cf, job = std::move(job)] {
// take read lock for cf, so major compaction and resharding can't proceed in parallel.
return with_lock(_compaction_locks[cf].for_read(), [this, task, cf, job = std::move(job)] {
_stats.active_tasks++;
if (!can_proceed(task)) {
return make_ready_future<>();
}
// NOTE:
// no need to register shared sstables because they're excluded from non-resharding
// compaction and some of them may not even belong to current shard.
return job();
});
}).then_wrapped([this, task] (future<> f) {
_stats.active_tasks--;
_tasks.remove(task);
try {
f.get();
} catch (sstables::compaction_stop_exception& e) {
cmlog.info("resharding was abruptly stopped, reason: {}", e.what());
} catch (...) {
cmlog.error("resharding failed: {}", std::current_exception());
}
});
return task->compaction_done.get_future().then([task] {});
}
future<> compaction_manager::task_stop(lw_shared_ptr<compaction_manager::task> task) {
task->stopping = true;
auto f = task->compaction_done.get_future();

View File

@@ -83,6 +83,8 @@ private:
// Prevents column family from running major and minor compaction at same time.
std::unordered_map<column_family*, rwlock> _compaction_locks;
semaphore _resharding_sem{1};
std::function<void()> compaction_submission_callback();
// all registered column families are submitted for compaction at a constant interval.
// Submission is a NO-OP when there's nothing to do, so it's fine to call it regularly.
@@ -153,6 +155,14 @@ public:
// Submit a column family for major compaction.
future<> submit_major_compaction(column_family* cf);
// Submit a resharding job for resharding compaction on behalf of a single
// column family.
// parameter job is a function that will carry the reshard operation on a set
// of sstables that belong to different shards for this column family using
// sstables::reshard_sstables(), and in the end, it will forward unshared
// sstables created by the process to their owner shards.
future<> submit_resharding_job(column_family* cf, std::function<future<>()> job);
// Remove a column family from the compaction manager.
// Cancel requests on cf and wait for a possible ongoing compaction on cf.
future<> remove(column_family* cf);