From 4bba0c403e28c2b812e7deff02ab2cfcc0420d25 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 2 Nov 2017 23:27:58 -0200 Subject: [PATCH] compaction: Make resharding go through compaction manager Two reasons for this change: 1) every compaction should be multiplexed to manager which in turn will make decision when to schedule. improvements on it will immediately benefit every existing compaction type. 2) active tasks metric will now track ongoing reshard jobs. Fixes #2671. Signed-off-by: Raphael S. Carvalho Message-Id: <20170817224334.6402-1-raphaelsc@scylladb.com> (cherry picked from commit 10eaa2339e4c767470fe151849f5d38b5f7dbcc0) Signed-off-by: Raphael S. Carvalho Message-Id: <20171103012758.19428-1-raphaelsc@scylladb.com> --- database.cc | 18 +++++------------ sstables/compaction_manager.cc | 36 ++++++++++++++++++++++++++++++++++ sstables/compaction_manager.hh | 12 ++++++++++++ 3 files changed, 53 insertions(+), 13 deletions(-) diff --git a/database.cc b/database.cc index 998131c495..0d3e2e3086 100644 --- a/database.cc +++ b/database.cc @@ -1663,12 +1663,12 @@ template static future<> invoke_all_resharding_jobs(global_column_family_ptr cf, std::vector jobs, Func&& func) { return parallel_for_each(std::move(jobs), [cf, func] (sstables::resharding_descriptor& job) mutable { return forward_sstables_to(job.reshard_at, std::move(job.sstables), cf, - [func, level = job.level, max_sstable_bytes = job.max_sstable_bytes] (auto sstables) { - // used to ensure that only one reshard operation will run per shard. - static thread_local semaphore sem(1); - return with_semaphore(sem, 1, [func, sstables = std::move(sstables), level, max_sstable_bytes] () mutable { + [cf, func, level = job.level, max_sstable_bytes = job.max_sstable_bytes] (auto sstables) { + // compaction manager ensures that only one reshard operation will run per shard. + auto job = [func, sstables = std::move(sstables), level, max_sstable_bytes] () mutable { return func(std::move(sstables), level, max_sstable_bytes); - }); + }; + return cf->get_compaction_manager().run_resharding_job(&*cf, std::move(job)); }); }); } @@ -1766,14 +1766,6 @@ void distributed_loader::reshard(distributed& db, sstring ks_name, sst }); } }); - }).then_wrapped([] (future<> f) { - try { - f.get(); - } catch (sstables::compaction_stop_exception& e) { - dblog.info("resharding was abruptly stopped, reason: {}", e.what()); - } catch (...) { - dblog.error("resharding failed: {}", std::current_exception()); - } }); }).get(); }); diff --git a/sstables/compaction_manager.cc b/sstables/compaction_manager.cc index 5d2ab939c7..56625e218c 100644 --- a/sstables/compaction_manager.cc +++ b/sstables/compaction_manager.cc @@ -314,6 +314,42 @@ future<> compaction_manager::submit_major_compaction(column_family* cf) { return task->compaction_done.get_future().then([task] {}); } +future<> compaction_manager::run_resharding_job(column_family* cf, std::function()> job) { + if (_stopped) { + return make_ready_future<>(); + } + auto task = make_lw_shared(); + task->compacting_cf = cf; + _tasks.push_back(task); + + task->compaction_done = with_semaphore(_resharding_sem, 1, [this, task, cf, job = std::move(job)] { + // take read lock for cf, so major compaction and resharding can't proceed in parallel. + return with_lock(_compaction_locks[cf].for_read(), [this, task, cf, job = std::move(job)] { + _stats.active_tasks++; + if (!can_proceed(task)) { + return make_ready_future<>(); + } + + // NOTE: + // no need to register shared sstables because they're excluded from non-resharding + // compaction and some of them may not even belong to current shard. + + return job(); + }); + }).then_wrapped([this, task] (future<> f) { + _stats.active_tasks--; + _tasks.remove(task); + try { + f.get(); + } catch (sstables::compaction_stop_exception& e) { + cmlog.info("resharding was abruptly stopped, reason: {}", e.what()); + } catch (...) { + cmlog.error("resharding failed: {}", std::current_exception()); + } + }); + return task->compaction_done.get_future().then([task] {}); +} + future<> compaction_manager::task_stop(lw_shared_ptr task) { task->stopping = true; auto f = task->compaction_done.get_future(); diff --git a/sstables/compaction_manager.hh b/sstables/compaction_manager.hh index d3b0889c81..72083fd9b5 100644 --- a/sstables/compaction_manager.hh +++ b/sstables/compaction_manager.hh @@ -82,6 +82,8 @@ private: semaphore _major_compaction_sem{1}; // Prevents column family from running major and minor compaction at same time. std::unordered_map _compaction_locks; + + semaphore _resharding_sem{1}; private: future<> task_stop(lw_shared_ptr task); @@ -147,6 +149,16 @@ public: // Submit a column family for major compaction. future<> submit_major_compaction(column_family* cf); + // Run a resharding job for a given column family. + // it completes when future returned by job is ready or returns immediately + // if manager was asked to stop. + // + // parameter job is a function that will carry the reshard operation on a set + // of sstables that belong to different shards for this column family using + // sstables::reshard_sstables(), and in the end, it will forward unshared + // sstables created by the process to their owner shards. + future<> run_resharding_job(column_family* cf, std::function()> job); + // Remove a column family from the compaction manager. // Cancel requests on cf and wait for a possible ongoing compaction on cf. future<> remove(column_family* cf);