compaction: Make resharding go through compaction manager
Two reasons for this change: 1) every compaction should be multiplexed to manager which in turn will make decision when to schedule. improvements on it will immediately benefit every existing compaction type. 2) active tasks metric will now track ongoing reshard jobs. Fixes #2671. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com> Message-Id: <20170817224334.6402-1-raphaelsc@scylladb.com>
This commit is contained in:
committed by
Avi Kivity
parent
38b2ff617f
commit
10eaa2339e
18
database.cc
18
database.cc
@@ -1667,12 +1667,12 @@ template <typename Func>
|
||||
static future<> invoke_all_resharding_jobs(global_column_family_ptr cf, std::vector<sstables::resharding_descriptor> jobs, Func&& func) {
|
||||
return parallel_for_each(std::move(jobs), [cf, func] (sstables::resharding_descriptor& job) mutable {
|
||||
return forward_sstables_to(job.reshard_at, std::move(job.sstables), cf,
|
||||
[func, level = job.level, max_sstable_bytes = job.max_sstable_bytes] (auto sstables) {
|
||||
// used to ensure that only one reshard operation will run per shard.
|
||||
static thread_local semaphore sem(1);
|
||||
return with_semaphore(sem, 1, [func, sstables = std::move(sstables), level, max_sstable_bytes] () mutable {
|
||||
[cf, func, level = job.level, max_sstable_bytes = job.max_sstable_bytes] (auto sstables) {
|
||||
// compaction manager ensures that only one reshard operation will run per shard.
|
||||
auto job = [func, sstables = std::move(sstables), level, max_sstable_bytes] () mutable {
|
||||
return func(std::move(sstables), level, max_sstable_bytes);
|
||||
});
|
||||
};
|
||||
return cf->get_compaction_manager().submit_resharding_job(&*cf, std::move(job));
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -1770,14 +1770,6 @@ void distributed_loader::reshard(distributed<database>& db, sstring ks_name, sst
|
||||
});
|
||||
}
|
||||
});
|
||||
}).then_wrapped([] (future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
} catch (sstables::compaction_stop_exception& e) {
|
||||
dblog.info("resharding was abruptly stopped, reason: {}", e.what());
|
||||
} catch (...) {
|
||||
dblog.error("resharding failed: {}", std::current_exception());
|
||||
}
|
||||
});
|
||||
}).get();
|
||||
});
|
||||
|
||||
@@ -314,6 +314,42 @@ future<> compaction_manager::submit_major_compaction(column_family* cf) {
|
||||
return task->compaction_done.get_future().then([task] {});
|
||||
}
|
||||
|
||||
future<> compaction_manager::submit_resharding_job(column_family* cf, std::function<future<>()> job) {
|
||||
if (_stopped) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
auto task = make_lw_shared<compaction_manager::task>();
|
||||
task->compacting_cf = cf;
|
||||
_tasks.push_back(task);
|
||||
|
||||
task->compaction_done = with_semaphore(_resharding_sem, 1, [this, task, cf, job = std::move(job)] {
|
||||
// take read lock for cf, so major compaction and resharding can't proceed in parallel.
|
||||
return with_lock(_compaction_locks[cf].for_read(), [this, task, cf, job = std::move(job)] {
|
||||
_stats.active_tasks++;
|
||||
if (!can_proceed(task)) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
// NOTE:
|
||||
// no need to register shared sstables because they're excluded from non-resharding
|
||||
// compaction and some of them may not even belong to current shard.
|
||||
|
||||
return job();
|
||||
});
|
||||
}).then_wrapped([this, task] (future<> f) {
|
||||
_stats.active_tasks--;
|
||||
_tasks.remove(task);
|
||||
try {
|
||||
f.get();
|
||||
} catch (sstables::compaction_stop_exception& e) {
|
||||
cmlog.info("resharding was abruptly stopped, reason: {}", e.what());
|
||||
} catch (...) {
|
||||
cmlog.error("resharding failed: {}", std::current_exception());
|
||||
}
|
||||
});
|
||||
return task->compaction_done.get_future().then([task] {});
|
||||
}
|
||||
|
||||
future<> compaction_manager::task_stop(lw_shared_ptr<compaction_manager::task> task) {
|
||||
task->stopping = true;
|
||||
auto f = task->compaction_done.get_future();
|
||||
|
||||
@@ -83,6 +83,8 @@ private:
|
||||
// Prevents column family from running major and minor compaction at same time.
|
||||
std::unordered_map<column_family*, rwlock> _compaction_locks;
|
||||
|
||||
semaphore _resharding_sem{1};
|
||||
|
||||
std::function<void()> compaction_submission_callback();
|
||||
// all registered column families are submitted for compaction at a constant interval.
|
||||
// Submission is a NO-OP when there's nothing to do, so it's fine to call it regularly.
|
||||
@@ -153,6 +155,14 @@ public:
|
||||
// Submit a column family for major compaction.
|
||||
future<> submit_major_compaction(column_family* cf);
|
||||
|
||||
// Submit a resharding job for resharding compaction on behalf of a single
|
||||
// column family.
|
||||
// parameter job is a function that will carry the reshard operation on a set
|
||||
// of sstables that belong to different shards for this column family using
|
||||
// sstables::reshard_sstables(), and in the end, it will forward unshared
|
||||
// sstables created by the process to their owner shards.
|
||||
future<> submit_resharding_job(column_family* cf, std::function<future<>()> job);
|
||||
|
||||
// Remove a column family from the compaction manager.
|
||||
// Cancel requests on cf and wait for a possible ongoing compaction on cf.
|
||||
future<> remove(column_family* cf);
|
||||
|
||||
Reference in New Issue
Block a user