mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-24 02:20:37 +00:00
compaction_manager: fix use-after-free in postponed_compactions_reevaluation()
drain() signals the postponed_reevaluation condition variable to terminate
the postponed_compactions_reevaluation() coroutine but does not await its
completion. When enable() is called afterwards, it overwrites
_waiting_reevalution with a new coroutine, orphaning the old one. During
shutdown, really_do_stop() only awaits the latest coroutine via
_waiting_reevalution, leaving the orphaned coroutine still alive. After
sharded::stop() destroys the compaction_manager, the orphaned coroutine
resumes and reads freed memory (is_disabled() accesses _state).
Fix by introducing stop_postponed_compactions(), awaiting the reevaluation coroutine in
both drain() and stop() after signaling it, if postponed_compactions_reevaluation() is running.
It uses an std::optional<future<>> for _waiting_reevalution and std::exchange to leave
_waiting_reevalution disengaged when postponed_compactions_reevaluation() is not running.
This prevents a race between drain() and stop().
While at it, fix typo in _waiting_reevalution -> _waiting_reevaluation.
Fixes: SCYLLADB-1463
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Closes scylladb/scylladb#29443
(cherry picked from commit 05a00fe140)
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Closes scylladb/scylladb#29527
This commit is contained in:
committed by
Patryk Jędrzejczak
parent
270ae28c00
commit
cdbf53e9d7
@@ -1128,7 +1128,10 @@ void compaction_manager::enable() {
|
||||
|
||||
_compaction_submission_timer.cancel();
|
||||
_compaction_submission_timer.arm_periodic(periodic_compaction_submission_interval());
|
||||
_waiting_reevalution = postponed_compactions_reevaluation();
|
||||
if (_waiting_reevaluation) {
|
||||
on_internal_error(cmlog, "postponed compactions reevaluation is already running when enabling compaction manager");
|
||||
}
|
||||
_waiting_reevaluation.emplace(postponed_compactions_reevaluation());
|
||||
cmlog.info("Enabled");
|
||||
}
|
||||
|
||||
@@ -1176,6 +1179,16 @@ void compaction_manager::reevaluate_postponed_compactions() noexcept {
|
||||
_postponed_reevaluation.signal();
|
||||
}
|
||||
|
||||
future<> compaction_manager::stop_postponed_compactions() noexcept {
|
||||
auto waiting_reevaluation = std::exchange(_waiting_reevaluation, std::nullopt);
|
||||
if (!waiting_reevaluation) {
|
||||
return make_ready_future();
|
||||
}
|
||||
// Trigger a signal to properly exit from postponed_compactions_reevaluation() fiber
|
||||
reevaluate_postponed_compactions();
|
||||
return std::move(*waiting_reevaluation);
|
||||
}
|
||||
|
||||
void compaction_manager::postpone_compaction_for_table(compaction_group_view* t) {
|
||||
_postponed.insert(t);
|
||||
}
|
||||
@@ -1259,8 +1272,7 @@ future<> compaction_manager::drain() {
|
||||
_compaction_submission_timer.cancel();
|
||||
// Stop ongoing compactions, if the request has not been sent already and wait for them to stop.
|
||||
co_await stop_ongoing_compactions("drain");
|
||||
// Trigger a signal to properly exit from postponed_compactions_reevaluation() fiber
|
||||
reevaluate_postponed_compactions();
|
||||
co_await stop_postponed_compactions();
|
||||
cmlog.info("Drained");
|
||||
}
|
||||
|
||||
@@ -1304,8 +1316,7 @@ future<> compaction_manager::really_do_stop() noexcept {
|
||||
if (!_tasks.empty()) {
|
||||
on_fatal_internal_error(cmlog, format("{} tasks still exist after being stopped", _tasks.size()));
|
||||
}
|
||||
reevaluate_postponed_compactions();
|
||||
co_await std::move(_waiting_reevalution);
|
||||
co_await stop_postponed_compactions();
|
||||
co_await _sys_ks.close();
|
||||
_weight_tracker.clear();
|
||||
_compaction_submission_timer.cancel();
|
||||
|
||||
@@ -127,7 +127,7 @@ private:
|
||||
// a sstable from being compacted twice.
|
||||
std::unordered_set<sstables::shared_sstable> _compacting_sstables;
|
||||
|
||||
future<> _waiting_reevalution = make_ready_future<>();
|
||||
std::optional<future<>> _waiting_reevaluation;
|
||||
condition_variable _postponed_reevaluation;
|
||||
// tables that wait for compaction but had its submission postponed due to ongoing compaction.
|
||||
std::unordered_set<compaction::compaction_group_view*> _postponed;
|
||||
@@ -237,6 +237,7 @@ private:
|
||||
|
||||
future<> postponed_compactions_reevaluation();
|
||||
void reevaluate_postponed_compactions() noexcept;
|
||||
future<> stop_postponed_compactions() noexcept;
|
||||
// Postpone compaction for a table that couldn't be executed due to ongoing
|
||||
// similar-sized compaction.
|
||||
void postpone_compaction_for_table(compaction::compaction_group_view* t);
|
||||
|
||||
Reference in New Issue
Block a user