mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
compaction: Make resharding go through compaction manager
Two reasons for this change:
1) every compaction should be multiplexed to manager which in turn
will make decision when to schedule. improvements on it will
immediately benefit every existing compaction type.
2) active tasks metric will now track ongoing reshard jobs.
Fixes #2671.
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20170817224334.6402-1-raphaelsc@scylladb.com>
(cherry picked from commit 10eaa2339e)
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20171103012758.19428-1-raphaelsc@scylladb.com>
This commit is contained in:
committed by
Pekka Enberg
parent
59aae504ae
commit
4bba0c403e
18
database.cc
18
database.cc
@@ -1663,12 +1663,12 @@ template <typename Func>
|
||||
static future<> invoke_all_resharding_jobs(global_column_family_ptr cf, std::vector<sstables::resharding_descriptor> jobs, Func&& func) {
|
||||
return parallel_for_each(std::move(jobs), [cf, func] (sstables::resharding_descriptor& job) mutable {
|
||||
return forward_sstables_to(job.reshard_at, std::move(job.sstables), cf,
|
||||
[func, level = job.level, max_sstable_bytes = job.max_sstable_bytes] (auto sstables) {
|
||||
// used to ensure that only one reshard operation will run per shard.
|
||||
static thread_local semaphore sem(1);
|
||||
return with_semaphore(sem, 1, [func, sstables = std::move(sstables), level, max_sstable_bytes] () mutable {
|
||||
[cf, func, level = job.level, max_sstable_bytes = job.max_sstable_bytes] (auto sstables) {
|
||||
// compaction manager ensures that only one reshard operation will run per shard.
|
||||
auto job = [func, sstables = std::move(sstables), level, max_sstable_bytes] () mutable {
|
||||
return func(std::move(sstables), level, max_sstable_bytes);
|
||||
});
|
||||
};
|
||||
return cf->get_compaction_manager().run_resharding_job(&*cf, std::move(job));
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -1766,14 +1766,6 @@ void distributed_loader::reshard(distributed<database>& db, sstring ks_name, sst
|
||||
});
|
||||
}
|
||||
});
|
||||
}).then_wrapped([] (future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
} catch (sstables::compaction_stop_exception& e) {
|
||||
dblog.info("resharding was abruptly stopped, reason: {}", e.what());
|
||||
} catch (...) {
|
||||
dblog.error("resharding failed: {}", std::current_exception());
|
||||
}
|
||||
});
|
||||
}).get();
|
||||
});
|
||||
|
||||
@@ -314,6 +314,42 @@ future<> compaction_manager::submit_major_compaction(column_family* cf) {
|
||||
return task->compaction_done.get_future().then([task] {});
|
||||
}
|
||||
|
||||
future<> compaction_manager::run_resharding_job(column_family* cf, std::function<future<>()> job) {
|
||||
if (_stopped) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
auto task = make_lw_shared<compaction_manager::task>();
|
||||
task->compacting_cf = cf;
|
||||
_tasks.push_back(task);
|
||||
|
||||
task->compaction_done = with_semaphore(_resharding_sem, 1, [this, task, cf, job = std::move(job)] {
|
||||
// take read lock for cf, so major compaction and resharding can't proceed in parallel.
|
||||
return with_lock(_compaction_locks[cf].for_read(), [this, task, cf, job = std::move(job)] {
|
||||
_stats.active_tasks++;
|
||||
if (!can_proceed(task)) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
// NOTE:
|
||||
// no need to register shared sstables because they're excluded from non-resharding
|
||||
// compaction and some of them may not even belong to current shard.
|
||||
|
||||
return job();
|
||||
});
|
||||
}).then_wrapped([this, task] (future<> f) {
|
||||
_stats.active_tasks--;
|
||||
_tasks.remove(task);
|
||||
try {
|
||||
f.get();
|
||||
} catch (sstables::compaction_stop_exception& e) {
|
||||
cmlog.info("resharding was abruptly stopped, reason: {}", e.what());
|
||||
} catch (...) {
|
||||
cmlog.error("resharding failed: {}", std::current_exception());
|
||||
}
|
||||
});
|
||||
return task->compaction_done.get_future().then([task] {});
|
||||
}
|
||||
|
||||
future<> compaction_manager::task_stop(lw_shared_ptr<compaction_manager::task> task) {
|
||||
task->stopping = true;
|
||||
auto f = task->compaction_done.get_future();
|
||||
|
||||
@@ -82,6 +82,8 @@ private:
|
||||
semaphore _major_compaction_sem{1};
|
||||
// Prevents column family from running major and minor compaction at same time.
|
||||
std::unordered_map<column_family*, rwlock> _compaction_locks;
|
||||
|
||||
semaphore _resharding_sem{1};
|
||||
private:
|
||||
future<> task_stop(lw_shared_ptr<task> task);
|
||||
|
||||
@@ -147,6 +149,16 @@ public:
|
||||
// Submit a column family for major compaction.
|
||||
future<> submit_major_compaction(column_family* cf);
|
||||
|
||||
// Run a resharding job for a given column family.
|
||||
// it completes when future returned by job is ready or returns immediately
|
||||
// if manager was asked to stop.
|
||||
//
|
||||
// parameter job is a function that will carry the reshard operation on a set
|
||||
// of sstables that belong to different shards for this column family using
|
||||
// sstables::reshard_sstables(), and in the end, it will forward unshared
|
||||
// sstables created by the process to their owner shards.
|
||||
future<> run_resharding_job(column_family* cf, std::function<future<>()> job);
|
||||
|
||||
// Remove a column family from the compaction manager.
|
||||
// Cancel requests on cf and wait for a possible ongoing compaction on cf.
|
||||
future<> remove(column_family* cf);
|
||||
|
||||
Reference in New Issue
Block a user