Route per-shard resharding jobs through the compaction manager.

database.cc
  invoke_all_resharding_jobs() no longer serializes reshard work with a
  function-local `static thread_local semaphore`. It wraps the call to `func`
  in a closure and passes it to compaction_manager::run_resharding_job().
  The then_wrapped() handler in distributed_loader::reshard() that logged
  resharding failures is removed; the same two log messages are now emitted
  from run_resharding_job() via cmlog.

sstables/compaction_manager.{cc,hh}
  Adds run_resharding_job(cf, job) and the `_resharding_sem{1}` member.
  run_resharding_job():
    - returns a ready future without running `job` when `_stopped` is set;
    - otherwise pushes a task onto `_tasks`, waits on `_resharding_sem`, then
      takes `_compaction_locks[cf].for_read()`;
    - increments `_stats.active_tasks`, and runs `job()` unless
      can_proceed(task) is false;
    - in then_wrapped(): decrements `_stats.active_tasks`, removes the task,
      logs compaction_stop_exception at info level and any other exception at
      error level. Exceptions are consumed there, not rethrown.

NOTE(review): this patch text was recovered from a copy in which line breaks
and indentation were collapsed and non-empty template argument lists were
stripped. Indentation below is reconstructed conventionally, so apply with
whitespace tolerance (e.g. `git apply --ignore-whitespace`) or re-export the
commit. Restored template arguments, to be confirmed against the tree:
  template <typename Func>, std::vector<sstables::resharding_descriptor>,
  distributed<database>, std::function<future<>()>,
  make_lw_shared<compaction_manager::task>(), lw_shared_ptr<...task>,
  std::unordered_map<column_family*, rwlock>.

diff --git a/database.cc b/database.cc
index 998131c495..0d3e2e3086 100644
--- a/database.cc
+++ b/database.cc
@@ -1663,12 +1663,12 @@ template <typename Func>
 static future<> invoke_all_resharding_jobs(global_column_family_ptr cf, std::vector<sstables::resharding_descriptor> jobs, Func&& func) {
     return parallel_for_each(std::move(jobs), [cf, func] (sstables::resharding_descriptor& job) mutable {
         return forward_sstables_to(job.reshard_at, std::move(job.sstables), cf,
-                [func, level = job.level, max_sstable_bytes = job.max_sstable_bytes] (auto sstables) {
-            // used to ensure that only one reshard operation will run per shard.
-            static thread_local semaphore sem(1);
-            return with_semaphore(sem, 1, [func, sstables = std::move(sstables), level, max_sstable_bytes] () mutable {
+                [cf, func, level = job.level, max_sstable_bytes = job.max_sstable_bytes] (auto sstables) {
+            // compaction manager ensures that only one reshard operation will run per shard.
+            auto job = [func, sstables = std::move(sstables), level, max_sstable_bytes] () mutable {
                 return func(std::move(sstables), level, max_sstable_bytes);
-            });
+            };
+            return cf->get_compaction_manager().run_resharding_job(&*cf, std::move(job));
         });
     });
 }
@@ -1766,14 +1766,6 @@ void distributed_loader::reshard(distributed<database>& db, sstring ks_name, sst
                         });
                     }
                 });
-            }).then_wrapped([] (future<> f) {
-                try {
-                    f.get();
-                } catch (sstables::compaction_stop_exception& e) {
-                    dblog.info("resharding was abruptly stopped, reason: {}", e.what());
-                } catch (...) {
-                    dblog.error("resharding failed: {}", std::current_exception());
-                }
             });
         }).get();
     });
diff --git a/sstables/compaction_manager.cc b/sstables/compaction_manager.cc
index 5d2ab939c7..56625e218c 100644
--- a/sstables/compaction_manager.cc
+++ b/sstables/compaction_manager.cc
@@ -314,6 +314,42 @@ future<> compaction_manager::submit_major_compaction(column_family* cf) {
     return task->compaction_done.get_future().then([task] {});
 }
 
+future<> compaction_manager::run_resharding_job(column_family* cf, std::function<future<>()> job) {
+    if (_stopped) {
+        return make_ready_future<>();
+    }
+    auto task = make_lw_shared<compaction_manager::task>();
+    task->compacting_cf = cf;
+    _tasks.push_back(task);
+
+    task->compaction_done = with_semaphore(_resharding_sem, 1, [this, task, cf, job = std::move(job)] {
+        // take read lock for cf, so major compaction and resharding can't proceed in parallel.
+        return with_lock(_compaction_locks[cf].for_read(), [this, task, cf, job = std::move(job)] {
+            _stats.active_tasks++;
+            if (!can_proceed(task)) {
+                return make_ready_future<>();
+            }
+
+            // NOTE:
+            // no need to register shared sstables because they're excluded from non-resharding
+            // compaction and some of them may not even belong to current shard.
+
+            return job();
+        });
+    }).then_wrapped([this, task] (future<> f) {
+        _stats.active_tasks--;
+        _tasks.remove(task);
+        try {
+            f.get();
+        } catch (sstables::compaction_stop_exception& e) {
+            cmlog.info("resharding was abruptly stopped, reason: {}", e.what());
+        } catch (...) {
+            cmlog.error("resharding failed: {}", std::current_exception());
+        }
+    });
+    return task->compaction_done.get_future().then([task] {});
+}
+
 future<> compaction_manager::task_stop(lw_shared_ptr<compaction_manager::task> task) {
     task->stopping = true;
     auto f = task->compaction_done.get_future();
diff --git a/sstables/compaction_manager.hh b/sstables/compaction_manager.hh
index d3b0889c81..72083fd9b5 100644
--- a/sstables/compaction_manager.hh
+++ b/sstables/compaction_manager.hh
@@ -82,6 +82,8 @@ private:
     semaphore _major_compaction_sem{1};
     // Prevents column family from running major and minor compaction at same time.
     std::unordered_map<column_family*, rwlock> _compaction_locks;
+
+    semaphore _resharding_sem{1};
 private:
     future<> task_stop(lw_shared_ptr<task> task);
 
@@ -147,6 +149,16 @@ public:
     // Submit a column family for major compaction.
     future<> submit_major_compaction(column_family* cf);
 
+    // Run a resharding job for a given column family.
+    // it completes when future returned by job is ready or returns immediately
+    // if manager was asked to stop.
+    //
+    // parameter job is a function that will carry the reshard operation on a set
+    // of sstables that belong to different shards for this column family using
+    // sstables::reshard_sstables(), and in the end, it will forward unshared
+    // sstables created by the process to their owner shards.
+    future<> run_resharding_job(column_family* cf, std::function<future<>()> job);
+
     // Remove a column family from the compaction manager.
     // Cancel requests on cf and wait for a possible ongoing compaction on cf.
     future<> remove(column_family* cf);