|
|
|
|
@@ -254,10 +254,32 @@ compaction_manager::compaction_state& compaction_manager::get_compaction_state(r
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
shared_ptr<compaction_manager::task> compaction_manager::register_task(shared_ptr<compaction_manager::task> task) {
|
|
|
|
|
future<> compaction_manager::perform_task(shared_ptr<compaction_manager::task> task) {
|
|
|
|
|
_tasks.push_back(task);
|
|
|
|
|
auto unregister_task = defer([this, task] {
|
|
|
|
|
_tasks.remove(task);
|
|
|
|
|
});
|
|
|
|
|
cmlog.debug("{}: started", *task);
|
|
|
|
|
return task;
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
co_await task->run();
|
|
|
|
|
cmlog.debug("{}: done", *task);
|
|
|
|
|
} catch (sstables::compaction_stopped_exception& e) {
|
|
|
|
|
cmlog.info("{}: stopped, reason: {}", *task, e.what());
|
|
|
|
|
} catch (sstables::compaction_aborted_exception& e) {
|
|
|
|
|
cmlog.error("{}: aborted, reason: {}", *task, e.what());
|
|
|
|
|
_stats.errors++;
|
|
|
|
|
throw;
|
|
|
|
|
} catch (storage_io_error& e) {
|
|
|
|
|
_stats.errors++;
|
|
|
|
|
cmlog.error("{}: failed due to storage io error: {}: stopping", *task, e.what());
|
|
|
|
|
do_stop();
|
|
|
|
|
throw;
|
|
|
|
|
} catch (...) {
|
|
|
|
|
cmlog.error("{}: failed, reason {}: stopping", *task, std::current_exception());
|
|
|
|
|
_stats.errors++;
|
|
|
|
|
throw;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class compaction_manager::major_compaction_task : public compaction_manager::task {
|
|
|
|
|
@@ -265,66 +287,76 @@ public:
|
|
|
|
|
major_compaction_task(compaction_manager& mgr, replica::table* t)
|
|
|
|
|
: task(mgr, t, sstables::compaction_type::Compaction, "Major compaction")
|
|
|
|
|
{}
|
|
|
|
|
|
|
|
|
|
protected:
|
|
|
|
|
// first take major compaction semaphore, then exclusely take compaction lock for table.
|
|
|
|
|
// it cannot be the other way around, or minor compaction for this table would be
|
|
|
|
|
// prevented while an ongoing major compaction doesn't release the semaphore.
|
|
|
|
|
virtual future<> do_run() override {
|
|
|
|
|
switch_state(state::pending);
|
|
|
|
|
auto units = co_await get_units(_cm._maintenance_ops_sem, 1);
|
|
|
|
|
auto lock_holder = co_await _compaction_state.lock.hold_write_lock();
|
|
|
|
|
if (!can_proceed()) {
|
|
|
|
|
co_return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// candidates are sstables that aren't being operated on by other compaction types.
|
|
|
|
|
// those are eligible for major compaction.
|
|
|
|
|
auto* t = _compacting_table;
|
|
|
|
|
sstables::compaction_strategy cs = t->get_compaction_strategy();
|
|
|
|
|
sstables::compaction_descriptor descriptor = cs.get_major_compaction_job(t->as_table_state(), _cm.get_candidates(*t));
|
|
|
|
|
auto compacting = compacting_sstable_registration(_cm, descriptor.sstables);
|
|
|
|
|
descriptor.release_exhausted = [&compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
|
|
|
|
|
compacting.release_compacting(exhausted_sstables);
|
|
|
|
|
};
|
|
|
|
|
setup_new_compaction(descriptor.run_identifier);
|
|
|
|
|
|
|
|
|
|
cmlog.info0("User initiated compaction started on behalf of {}.{}", t->schema()->ks_name(), t->schema()->cf_name());
|
|
|
|
|
compaction_backlog_tracker bt(std::make_unique<user_initiated_backlog_tracker>(_cm._compaction_controller.backlog_of_shares(200), _cm._available_memory));
|
|
|
|
|
_cm.register_backlog_tracker(bt);
|
|
|
|
|
// FIXME: co_await coroutine::switch_to
|
|
|
|
|
co_await with_scheduling_group(_cm._compaction_controller.sg(), [this, t, descriptor = std::move(descriptor)] () mutable {
|
|
|
|
|
return t->compact_sstables(std::move(descriptor), _compaction_data);
|
|
|
|
|
});
|
|
|
|
|
finish_compaction();
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
future<> compaction_manager::perform_major_compaction(replica::table* t) {
|
|
|
|
|
if (_state != state::enabled) {
|
|
|
|
|
return make_ready_future<>();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto task = register_task(make_shared<major_compaction_task>(*this, t));
|
|
|
|
|
|
|
|
|
|
// first take major compaction semaphore, then exclusely take compaction lock for table.
|
|
|
|
|
// it cannot be the other way around, or minor compaction for this table would be
|
|
|
|
|
// prevented while an ongoing major compaction doesn't release the semaphore.
|
|
|
|
|
task->_compaction_done = with_semaphore(_maintenance_ops_sem, 1, [this, task, t] {
|
|
|
|
|
return with_lock(task->compaction_state().lock.for_write(), [this, task, t] {
|
|
|
|
|
_stats.active_tasks++;
|
|
|
|
|
if (!task->can_proceed()) {
|
|
|
|
|
return make_ready_future<>();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// candidates are sstables that aren't being operated on by other compaction types.
|
|
|
|
|
// those are eligible for major compaction.
|
|
|
|
|
sstables::compaction_strategy cs = t->get_compaction_strategy();
|
|
|
|
|
sstables::compaction_descriptor descriptor = cs.get_major_compaction_job(t->as_table_state(), get_candidates(*t));
|
|
|
|
|
auto compacting = make_lw_shared<compacting_sstable_registration>(*this, descriptor.sstables);
|
|
|
|
|
descriptor.release_exhausted = [compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
|
|
|
|
|
compacting->release_compacting(exhausted_sstables);
|
|
|
|
|
};
|
|
|
|
|
task->setup_new_compaction(descriptor.run_identifier);
|
|
|
|
|
|
|
|
|
|
cmlog.info0("User initiated compaction started on behalf of {}.{}", t->schema()->ks_name(), t->schema()->cf_name());
|
|
|
|
|
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_compaction_controller.backlog_of_shares(200), _available_memory));
|
|
|
|
|
return do_with(std::move(user_initiated), [this, t, descriptor = std::move(descriptor), task] (compaction_backlog_tracker& bt) mutable {
|
|
|
|
|
register_backlog_tracker(bt);
|
|
|
|
|
return with_scheduling_group(_compaction_controller.sg(), [this, t, descriptor = std::move(descriptor), task] () mutable {
|
|
|
|
|
return t->compact_sstables(std::move(descriptor), task->compaction_data());
|
|
|
|
|
});
|
|
|
|
|
}).then([compacting = std::move(compacting)] {});
|
|
|
|
|
});
|
|
|
|
|
}).then_wrapped([this, task] (future<> f) {
|
|
|
|
|
_stats.active_tasks--;
|
|
|
|
|
_tasks.remove(task);
|
|
|
|
|
cmlog.debug("Major compaction {}: done", *task);
|
|
|
|
|
try {
|
|
|
|
|
f.get();
|
|
|
|
|
_stats.completed_tasks++;
|
|
|
|
|
} catch (sstables::compaction_stopped_exception& e) {
|
|
|
|
|
cmlog.info("major compaction stopped, reason: {}", e.what());
|
|
|
|
|
} catch (...) {
|
|
|
|
|
cmlog.error("major compaction failed, reason: {}", std::current_exception());
|
|
|
|
|
_stats.errors++;
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
return task->compaction_done().then([task] {});
|
|
|
|
|
return perform_task(make_shared<major_compaction_task>(*this, t));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class compaction_manager::custom_compaction_task : public compaction_manager::task {
|
|
|
|
|
noncopyable_function<future<>(sstables::compaction_data&)> _job;
|
|
|
|
|
|
|
|
|
|
public:
|
|
|
|
|
custom_compaction_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type type, sstring desc)
|
|
|
|
|
custom_compaction_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type type, sstring desc, noncopyable_function<future<>(sstables::compaction_data&)> job)
|
|
|
|
|
: task(mgr, t, type, std::move(desc))
|
|
|
|
|
, _job(std::move(job))
|
|
|
|
|
{}
|
|
|
|
|
|
|
|
|
|
protected:
|
|
|
|
|
virtual future<> do_run() override {
|
|
|
|
|
if (!can_proceed(throw_if_stopping::yes)) {
|
|
|
|
|
co_return;
|
|
|
|
|
}
|
|
|
|
|
switch_state(state::pending);
|
|
|
|
|
auto units = co_await get_units(_cm._maintenance_ops_sem, 1);
|
|
|
|
|
|
|
|
|
|
if (!can_proceed(throw_if_stopping::yes)) {
|
|
|
|
|
co_return;
|
|
|
|
|
}
|
|
|
|
|
setup_new_compaction();
|
|
|
|
|
|
|
|
|
|
// NOTE:
|
|
|
|
|
// no need to register shared sstables because they're excluded from non-resharding
|
|
|
|
|
// compaction and some of them may not even belong to current shard.
|
|
|
|
|
co_await _job(compaction_data());
|
|
|
|
|
finish_compaction();
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
future<> compaction_manager::run_custom_job(replica::table* t, sstables::compaction_type type, const char* desc, noncopyable_function<future<>(sstables::compaction_data&)> job) {
|
|
|
|
|
@@ -332,40 +364,7 @@ future<> compaction_manager::run_custom_job(replica::table* t, sstables::compact
|
|
|
|
|
return make_ready_future<>();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto task = register_task(make_shared<custom_compaction_task>(*this, t, type, desc));
|
|
|
|
|
|
|
|
|
|
auto job_ptr = std::make_unique<noncopyable_function<future<>(sstables::compaction_data&)>>(std::move(job));
|
|
|
|
|
|
|
|
|
|
task->_compaction_done = with_semaphore(_maintenance_ops_sem, 1, [this, task, &job = *job_ptr] () mutable {
|
|
|
|
|
// Allow caller to know that task (e.g. reshape) was asked to stop while waiting for a chance to run.
|
|
|
|
|
if (task->stopping()) {
|
|
|
|
|
return make_exception_future<>(task->make_compaction_stopped_exception());
|
|
|
|
|
}
|
|
|
|
|
_stats.active_tasks++;
|
|
|
|
|
if (!task->can_proceed()) {
|
|
|
|
|
return make_ready_future<>();
|
|
|
|
|
}
|
|
|
|
|
task->setup_new_compaction();
|
|
|
|
|
|
|
|
|
|
// NOTE:
|
|
|
|
|
// no need to register shared sstables because they're excluded from non-resharding
|
|
|
|
|
// compaction and some of them may not even belong to current shard.
|
|
|
|
|
return job(task->compaction_data());
|
|
|
|
|
}).then_wrapped([this, task, job_ptr = std::move(job_ptr), type] (future<> f) {
|
|
|
|
|
_stats.active_tasks--;
|
|
|
|
|
_tasks.remove(task);
|
|
|
|
|
cmlog.debug("{} {}: done", type, *task);
|
|
|
|
|
try {
|
|
|
|
|
f.get();
|
|
|
|
|
} catch (sstables::compaction_stopped_exception& e) {
|
|
|
|
|
cmlog.info("{} was abruptly stopped, reason: {}", *task, e.what());
|
|
|
|
|
throw;
|
|
|
|
|
} catch (...) {
|
|
|
|
|
cmlog.error("{} failed: {}", *task, std::current_exception());
|
|
|
|
|
throw;
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
return task->compaction_done().then([task] {});
|
|
|
|
|
return perform_task(make_shared<custom_compaction_task>(*this, t, type, desc, std::move(job)));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
future<>
|
|
|
|
|
@@ -426,6 +425,19 @@ compaction_manager::task::~task() {
|
|
|
|
|
switch_state(state::none);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
compaction_manager::sstables_task::~sstables_task() {
|
|
|
|
|
_cm._stats.pending_tasks -= _sstables.size() - (_state == state::pending);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
future<> compaction_manager::task::run() noexcept {
|
|
|
|
|
try {
|
|
|
|
|
_compaction_done = do_run();
|
|
|
|
|
return compaction_done();
|
|
|
|
|
} catch (...) {
|
|
|
|
|
return current_exception_as_future();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
compaction_manager::task::state compaction_manager::task::switch_state(state new_state) {
|
|
|
|
|
auto old_state = std::exchange(_state, new_state);
|
|
|
|
|
switch (old_state) {
|
|
|
|
|
@@ -461,15 +473,40 @@ compaction_manager::task::state compaction_manager::task::switch_state(state new
|
|
|
|
|
return old_state;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void compaction_manager::sstables_task::set_sstables(std::vector<sstables::shared_sstable> new_sstables) {
|
|
|
|
|
if (!_sstables.empty()) {
|
|
|
|
|
on_internal_error(cmlog, format("sstables were already set"));
|
|
|
|
|
}
|
|
|
|
|
_sstables = std::move(new_sstables);
|
|
|
|
|
cmlog.debug("{}: set_sstables: {} sstable{}", *this, _sstables.size(), _sstables.size() > 1 ? "s" : "");
|
|
|
|
|
_cm._stats.pending_tasks += _sstables.size() - (_state == state::pending);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sstables::shared_sstable compaction_manager::sstables_task::consume_sstable() {
|
|
|
|
|
if (_sstables.empty()) {
|
|
|
|
|
on_internal_error(cmlog, format("no more sstables"));
|
|
|
|
|
}
|
|
|
|
|
auto sst = _sstables.back();
|
|
|
|
|
_sstables.pop_back();
|
|
|
|
|
--_cm._stats.pending_tasks; // from this point on, switch_state(pending|active) works the same way as any other task
|
|
|
|
|
cmlog.debug("{}", format("consumed {}", sst->get_filename()));
|
|
|
|
|
return sst;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void compaction_manager::task::setup_new_compaction(utils::UUID output_run_id) {
|
|
|
|
|
_compaction_data = create_compaction_data();
|
|
|
|
|
_compaction_running = true;
|
|
|
|
|
_output_run_identifier = output_run_id;
|
|
|
|
|
switch_state(state::active);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void compaction_manager::task::finish_compaction() noexcept {
|
|
|
|
|
void compaction_manager::task::finish_compaction(state finish_state) noexcept {
|
|
|
|
|
switch_state(finish_state);
|
|
|
|
|
_compaction_running = false;
|
|
|
|
|
_output_run_identifier = utils::null_uuid();
|
|
|
|
|
if (finish_state != state::failed) {
|
|
|
|
|
_compaction_retry.reset();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void compaction_manager::task::stop(sstring reason) noexcept {
|
|
|
|
|
@@ -581,6 +618,8 @@ void compaction_manager::postponed_compactions_reevaluation() {
|
|
|
|
|
auto postponed = std::move(_postponed);
|
|
|
|
|
try {
|
|
|
|
|
for (auto& t : postponed) {
|
|
|
|
|
auto s = t->schema();
|
|
|
|
|
cmlog.debug("resubmitting postponed compaction for table {}.{} [{}]", s->ks_name(), s->cf_name(), fmt::ptr(t));
|
|
|
|
|
submit(t);
|
|
|
|
|
}
|
|
|
|
|
} catch (...) {
|
|
|
|
|
@@ -694,9 +733,12 @@ inline bool compaction_manager::can_proceed(replica::table* t) const {
|
|
|
|
|
return (_state == state::enabled) && _compaction_state.contains(t) && !_compaction_state.at(t).compaction_disabled();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
inline bool compaction_manager::task::can_proceed() const {
|
|
|
|
|
// Allow caller to know that task (e.g. reshape) was asked to stop while waiting for a chance to run.
|
|
|
|
|
inline bool compaction_manager::task::can_proceed(throw_if_stopping do_throw_if_stopping) const {
|
|
|
|
|
if (stopping()) {
|
|
|
|
|
// Allow caller to know that task (e.g. reshape) was asked to stop while waiting for a chance to run.
|
|
|
|
|
if (do_throw_if_stopping) {
|
|
|
|
|
throw make_compaction_stopped_exception();
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
return _cm.can_proceed(_compacting_table);
|
|
|
|
|
@@ -720,7 +762,7 @@ future<stop_iteration> compaction_manager::task::maybe_retry(std::exception_ptr
|
|
|
|
|
_cm._stats.errors++;
|
|
|
|
|
cmlog.error("{}: failed: {}. Will retry in {} seconds", *this, std::current_exception(),
|
|
|
|
|
std::chrono::duration_cast<std::chrono::seconds>(_compaction_retry.sleep_time()).count());
|
|
|
|
|
_cm._stats.pending_tasks++;
|
|
|
|
|
switch_state(state::pending);
|
|
|
|
|
return _compaction_retry.retry(_compaction_data.abort).handle_exception_type([this] (sleep_aborted&) {
|
|
|
|
|
return make_exception_future<>(make_compaction_stopped_exception());
|
|
|
|
|
}).then([] {
|
|
|
|
|
@@ -737,70 +779,72 @@ public:
|
|
|
|
|
regular_compaction_task(compaction_manager& mgr, replica::table* t)
|
|
|
|
|
: task(mgr, t, sstables::compaction_type::Compaction, "Compaction")
|
|
|
|
|
{}
|
|
|
|
|
|
|
|
|
|
protected:
|
|
|
|
|
virtual future<> do_run() override {
|
|
|
|
|
for (auto completed = stop_iteration::no; !completed; ) {
|
|
|
|
|
if (!can_proceed()) {
|
|
|
|
|
co_return;
|
|
|
|
|
}
|
|
|
|
|
switch_state(state::pending);
|
|
|
|
|
// take read lock for table, so major and regular compaction can't proceed in parallel.
|
|
|
|
|
auto lock_holder = co_await _compaction_state.lock.hold_read_lock();
|
|
|
|
|
if (!can_proceed()) {
|
|
|
|
|
co_return;
|
|
|
|
|
}
|
|
|
|
|
// FIXME: co_await coroutine::switch_to
|
|
|
|
|
completed = co_await with_scheduling_group(_cm._compaction_controller.sg(), [this] () mutable -> future<stop_iteration> {
|
|
|
|
|
replica::table& t = *_compacting_table;
|
|
|
|
|
sstables::compaction_strategy cs = t.get_compaction_strategy();
|
|
|
|
|
sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(t.as_table_state(), _cm.get_strategy_control(), _cm.get_candidates(t));
|
|
|
|
|
int weight = calculate_weight(descriptor);
|
|
|
|
|
|
|
|
|
|
if (descriptor.sstables.empty() || !can_proceed() || t.is_auto_compaction_disabled_by_user()) {
|
|
|
|
|
cmlog.debug("{}: sstables={} can_proceed={} auto_compaction={}", *this, descriptor.sstables.size(), can_proceed(), t.is_auto_compaction_disabled_by_user());
|
|
|
|
|
co_return stop_iteration::yes;
|
|
|
|
|
}
|
|
|
|
|
if (!_cm.can_register_compaction(&t, weight, descriptor.fan_in())) {
|
|
|
|
|
cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}, postponing it...",
|
|
|
|
|
descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name());
|
|
|
|
|
finish_compaction(state::postponed);
|
|
|
|
|
_cm.postpone_compaction_for_table(&t);
|
|
|
|
|
co_return stop_iteration::yes;
|
|
|
|
|
}
|
|
|
|
|
auto compacting = compacting_sstable_registration(_cm, descriptor.sstables);
|
|
|
|
|
auto weight_r = compaction_weight_registration(&_cm, weight);
|
|
|
|
|
descriptor.release_exhausted = [&compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
|
|
|
|
|
compacting.release_compacting(exhausted_sstables);
|
|
|
|
|
};
|
|
|
|
|
cmlog.debug("Accepted compaction job: task={} ({} sstable(s)) of weight {} for {}.{}",
|
|
|
|
|
fmt::ptr(this), descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name());
|
|
|
|
|
|
|
|
|
|
setup_new_compaction(descriptor.run_identifier);
|
|
|
|
|
std::exception_ptr ex;
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
co_await t.compact_sstables(std::move(descriptor), _compaction_data);
|
|
|
|
|
finish_compaction();
|
|
|
|
|
_cm.reevaluate_postponed_compactions();
|
|
|
|
|
co_return stop_iteration::no;
|
|
|
|
|
} catch (...) {
|
|
|
|
|
ex = std::current_exception();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
finish_compaction(state::failed);
|
|
|
|
|
co_return co_await maybe_retry(std::move(ex));
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
void compaction_manager::submit(replica::table* t) {
|
|
|
|
|
if (t->is_auto_compaction_disabled_by_user()) {
|
|
|
|
|
if (_state != state::enabled || t->is_auto_compaction_disabled_by_user()) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto task = register_task(make_shared<regular_compaction_task>(*this, t));
|
|
|
|
|
_stats.pending_tasks++;
|
|
|
|
|
|
|
|
|
|
task->_compaction_done = repeat([this, task, t] () mutable {
|
|
|
|
|
if (!task->can_proceed()) {
|
|
|
|
|
_stats.pending_tasks--;
|
|
|
|
|
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
|
|
|
|
}
|
|
|
|
|
// take read lock for table, so major and regular compaction can't proceed in parallel.
|
|
|
|
|
return with_lock(task->compaction_state().lock.for_read(), [this, task] () mutable {
|
|
|
|
|
return with_scheduling_group(_compaction_controller.sg(), [this, task = std::move(task)] () mutable {
|
|
|
|
|
replica::table& t = *task->_compacting_table;
|
|
|
|
|
sstables::compaction_strategy cs = t.get_compaction_strategy();
|
|
|
|
|
sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(t.as_table_state(), get_strategy_control(), get_candidates(t));
|
|
|
|
|
int weight = calculate_weight(descriptor);
|
|
|
|
|
|
|
|
|
|
if (descriptor.sstables.empty() || !task->can_proceed() || t.is_auto_compaction_disabled_by_user()) {
|
|
|
|
|
_stats.pending_tasks--;
|
|
|
|
|
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
|
|
|
|
}
|
|
|
|
|
if (!can_register_compaction(&t, weight, descriptor.fan_in())) {
|
|
|
|
|
_stats.pending_tasks--;
|
|
|
|
|
cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}, postponing it...",
|
|
|
|
|
descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name());
|
|
|
|
|
postpone_compaction_for_table(&t);
|
|
|
|
|
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
|
|
|
|
}
|
|
|
|
|
auto compacting = make_lw_shared<compacting_sstable_registration>(*this, descriptor.sstables);
|
|
|
|
|
auto weight_r = compaction_weight_registration(this, weight);
|
|
|
|
|
descriptor.release_exhausted = [compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
|
|
|
|
|
compacting->release_compacting(exhausted_sstables);
|
|
|
|
|
};
|
|
|
|
|
cmlog.debug("Accepted compaction job ({} sstable(s)) of weight {} for {}.{}",
|
|
|
|
|
descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name());
|
|
|
|
|
|
|
|
|
|
_stats.pending_tasks--;
|
|
|
|
|
_stats.active_tasks++;
|
|
|
|
|
task->setup_new_compaction(descriptor.run_identifier);
|
|
|
|
|
return t.compact_sstables(std::move(descriptor), task->compaction_data()).then_wrapped([this, task, compacting = std::move(compacting), weight_r = std::move(weight_r)] (future<> f) mutable {
|
|
|
|
|
_stats.active_tasks--;
|
|
|
|
|
task->finish_compaction();
|
|
|
|
|
|
|
|
|
|
if (f.failed()) {
|
|
|
|
|
return task->maybe_retry(f.get_exception());
|
|
|
|
|
}
|
|
|
|
|
_stats.pending_tasks++;
|
|
|
|
|
_stats.completed_tasks++;
|
|
|
|
|
task->_compaction_retry.reset();
|
|
|
|
|
reevaluate_postponed_compactions();
|
|
|
|
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
}).finally([this, task] {
|
|
|
|
|
_tasks.remove(task);
|
|
|
|
|
cmlog.debug("Compaction {}: done", *task);
|
|
|
|
|
});
|
|
|
|
|
// OK to drop future.
|
|
|
|
|
// waited via task->stop()
|
|
|
|
|
(void)perform_task(make_shared<regular_compaction_task>(*this, t));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class compaction_manager::offstrategy_compaction_task : public compaction_manager::task {
|
|
|
|
|
@@ -808,189 +852,187 @@ public:
|
|
|
|
|
offstrategy_compaction_task(compaction_manager& mgr, replica::table* t)
|
|
|
|
|
: task(mgr, t, sstables::compaction_type::Reshape, "Offstrategy compaction")
|
|
|
|
|
{}
|
|
|
|
|
|
|
|
|
|
protected:
|
|
|
|
|
virtual future<> do_run() override {
|
|
|
|
|
for (;;) {
|
|
|
|
|
if (!can_proceed()) {
|
|
|
|
|
co_return;
|
|
|
|
|
}
|
|
|
|
|
switch_state(state::pending);
|
|
|
|
|
auto units = co_await get_units(_cm._maintenance_ops_sem, 1);
|
|
|
|
|
if (!can_proceed()) {
|
|
|
|
|
co_return;
|
|
|
|
|
}
|
|
|
|
|
setup_new_compaction();
|
|
|
|
|
|
|
|
|
|
std::exception_ptr ex;
|
|
|
|
|
try {
|
|
|
|
|
co_await with_scheduling_group(_cm._maintenance_sg.cpu, [this] {
|
|
|
|
|
return _compacting_table->run_offstrategy_compaction(_compaction_data);
|
|
|
|
|
});
|
|
|
|
|
finish_compaction();
|
|
|
|
|
co_return;
|
|
|
|
|
} catch (...) {
|
|
|
|
|
ex = std::current_exception();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
finish_compaction(state::failed);
|
|
|
|
|
if ((co_await maybe_retry(std::move(ex))) == stop_iteration::yes) {
|
|
|
|
|
co_return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
future<> compaction_manager::perform_offstrategy(replica::table* t) {
|
|
|
|
|
auto task = register_task(make_shared<offstrategy_compaction_task>(*this, t));
|
|
|
|
|
_stats.pending_tasks++;
|
|
|
|
|
|
|
|
|
|
task->_compaction_done = repeat([this, task, t] () mutable {
|
|
|
|
|
if (!task->can_proceed()) {
|
|
|
|
|
_stats.pending_tasks--;
|
|
|
|
|
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
|
|
|
|
}
|
|
|
|
|
return with_semaphore(_maintenance_ops_sem, 1, [this, task, t] () mutable {
|
|
|
|
|
_stats.pending_tasks--;
|
|
|
|
|
if (!task->can_proceed()) {
|
|
|
|
|
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
|
|
|
|
}
|
|
|
|
|
_stats.active_tasks++;
|
|
|
|
|
task->setup_new_compaction();
|
|
|
|
|
|
|
|
|
|
return with_scheduling_group(_maintenance_sg.cpu, [this, task, t] {
|
|
|
|
|
return t->run_offstrategy_compaction(task->compaction_data()).then_wrapped([this, task, schema = t->schema()] (future<> f) mutable {
|
|
|
|
|
_stats.active_tasks--;
|
|
|
|
|
task->finish_compaction();
|
|
|
|
|
if (!f.failed()) {
|
|
|
|
|
_stats.completed_tasks++;
|
|
|
|
|
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
|
|
|
|
}
|
|
|
|
|
return task->maybe_retry(f.get_exception());
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
}).finally([this, task] {
|
|
|
|
|
_tasks.remove(task);
|
|
|
|
|
cmlog.debug("Offstrategy compaction {}: done", *task);
|
|
|
|
|
});
|
|
|
|
|
return task->compaction_done().finally([task] {});
|
|
|
|
|
if (_state != state::enabled) {
|
|
|
|
|
return make_ready_future<>();
|
|
|
|
|
}
|
|
|
|
|
return perform_task(make_shared<offstrategy_compaction_task>(*this, t));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class compaction_manager::rewrite_sstables_compaction_task : public compaction_manager::task {
|
|
|
|
|
class compaction_manager::rewrite_sstables_compaction_task : public compaction_manager::sstables_task {
|
|
|
|
|
sstables::compaction_type_options _options;
|
|
|
|
|
compacting_sstable_registration _compacting;
|
|
|
|
|
can_purge_tombstones _can_purge;
|
|
|
|
|
|
|
|
|
|
public:
|
|
|
|
|
rewrite_sstables_compaction_task(compaction_manager& mgr, replica::table* t, const sstables::compaction_type_options& options)
|
|
|
|
|
: task(mgr, t, options.type(), sstring(sstables::to_string(options.type())))
|
|
|
|
|
rewrite_sstables_compaction_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type_options options, std::vector<sstables::shared_sstable> sstables, can_purge_tombstones can_purge)
|
|
|
|
|
: sstables_task(mgr, t, options.type(), sstring(sstables::to_string(options.type())), std::move(sstables))
|
|
|
|
|
, _options(std::move(options))
|
|
|
|
|
, _compacting(mgr, _sstables)
|
|
|
|
|
, _can_purge(can_purge)
|
|
|
|
|
{}
|
|
|
|
|
|
|
|
|
|
protected:
|
|
|
|
|
virtual future<> do_run() override {
|
|
|
|
|
switch_state(state::pending);
|
|
|
|
|
auto maintenance_permit = co_await seastar::get_units(_cm._maintenance_ops_sem, 1);
|
|
|
|
|
|
|
|
|
|
while (!_sstables.empty() && can_proceed()) {
|
|
|
|
|
auto sst = consume_sstable();
|
|
|
|
|
co_await rewrite_sstable(std::move(sst));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
future<> rewrite_sstable(const sstables::shared_sstable& sst) {
|
|
|
|
|
switch_state(state::active);
|
|
|
|
|
for (auto completed = stop_iteration::no; !completed; ) {
|
|
|
|
|
replica::table& t = *_compacting_table;
|
|
|
|
|
auto sstable_level = sst->get_sstable_level();
|
|
|
|
|
auto run_identifier = sst->run_identifier();
|
|
|
|
|
auto sstable_set_snapshot = _can_purge ? std::make_optional(t.get_sstable_set()) : std::nullopt;
|
|
|
|
|
// FIXME: this compaction should run with maintenance priority.
|
|
|
|
|
auto descriptor = sstables::compaction_descriptor({ sst }, std::move(sstable_set_snapshot), service::get_local_compaction_priority(),
|
|
|
|
|
sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, _options);
|
|
|
|
|
|
|
|
|
|
// Releases reference to cleaned sstable such that respective used disk space can be freed.
|
|
|
|
|
descriptor.release_exhausted = [this] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
|
|
|
|
|
_compacting.release_compacting(exhausted_sstables);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
setup_new_compaction(descriptor.run_identifier);
|
|
|
|
|
|
|
|
|
|
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_cm._compaction_controller.backlog_of_shares(200), _cm._available_memory));
|
|
|
|
|
_cm.register_backlog_tracker(user_initiated);
|
|
|
|
|
// FIXME: use coroutine::switch_to()
|
|
|
|
|
completed = co_await with_scheduling_group(_cm._compaction_controller.sg(), [this, descriptor = std::move(descriptor)] () mutable -> future<stop_iteration> {
|
|
|
|
|
std::exception_ptr ex;
|
|
|
|
|
try {
|
|
|
|
|
co_await _compacting_table->compact_sstables(std::move(descriptor), _compaction_data);
|
|
|
|
|
finish_compaction();
|
|
|
|
|
_cm.reevaluate_postponed_compactions();
|
|
|
|
|
co_return stop_iteration::yes; // done with current sstable
|
|
|
|
|
} catch (...) {
|
|
|
|
|
ex = std::current_exception();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
finish_compaction(state::failed);
|
|
|
|
|
co_return co_await maybe_retry(std::move(ex)); // retry current sstable or rethrows exception
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {
|
|
|
|
|
std::vector<sstables::shared_sstable> sstables;
|
|
|
|
|
compacting_sstable_registration compacting(*this);
|
|
|
|
|
if (_state != state::enabled) {
|
|
|
|
|
co_return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// since we might potentially have ongoing compactions, and we
|
|
|
|
|
// must ensure that all sstables created before we run are included
|
|
|
|
|
// in the re-write, we need to barrier out any previously running
|
|
|
|
|
// compaction.
|
|
|
|
|
co_await run_with_compaction_disabled(t, [&] () mutable -> future<> {
|
|
|
|
|
std::vector<sstables::shared_sstable> sstables;
|
|
|
|
|
co_await run_with_compaction_disabled(t, [this, &sstables, get_func = std::move(get_func)] () -> future<> {
|
|
|
|
|
sstables = co_await get_func();
|
|
|
|
|
compacting.register_compacting(sstables);
|
|
|
|
|
|
|
|
|
|
// sort sstables by size in descending order, such that the smallest files will be rewritten first
|
|
|
|
|
// (as sstable to be rewritten is popped off from the back of container), so rewrite will have higher
|
|
|
|
|
// chance to succeed when the biggest files are reached.
|
|
|
|
|
std::sort(sstables.begin(), sstables.end(), [](sstables::shared_sstable& a, sstables::shared_sstable& b) {
|
|
|
|
|
return a->data_size() > b->data_size();
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
// sort sstables by size in descending order, such that the smallest files will be rewritten first
|
|
|
|
|
// (as sstable to be rewritten is popped off from the back of container), so rewrite will have higher
|
|
|
|
|
// chance to succeed when the biggest files are reached.
|
|
|
|
|
std::sort(sstables.begin(), sstables.end(), [](sstables::shared_sstable& a, sstables::shared_sstable& b) {
|
|
|
|
|
return a->data_size() > b->data_size();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
auto task = register_task(make_shared<rewrite_sstables_compaction_task>(*this, t, options));
|
|
|
|
|
|
|
|
|
|
_stats.pending_tasks += sstables.size();
|
|
|
|
|
|
|
|
|
|
auto task_completion = defer([this, &task, &sstables, &options] {
|
|
|
|
|
_stats.pending_tasks -= sstables.size();
|
|
|
|
|
_tasks.remove(task);
|
|
|
|
|
cmlog.debug("{} {}: done", options.type(), *task);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
auto maintenance_permit = co_await seastar::get_units(_maintenance_ops_sem, 1);
|
|
|
|
|
|
|
|
|
|
auto rewrite_sstable = [this, &task, &options, &compacting, can_purge] (const sstables::shared_sstable& sst) mutable -> future<> {
|
|
|
|
|
stop_iteration completed = stop_iteration::no;
|
|
|
|
|
do {
|
|
|
|
|
replica::table& t = *task->_compacting_table;
|
|
|
|
|
auto sstable_level = sst->get_sstable_level();
|
|
|
|
|
auto run_identifier = sst->run_identifier();
|
|
|
|
|
auto sstable_set_snapshot = can_purge ? std::make_optional(t.get_sstable_set()) : std::nullopt;
|
|
|
|
|
// FIXME: this compaction should run with maintenance priority.
|
|
|
|
|
auto descriptor = sstables::compaction_descriptor({ sst }, std::move(sstable_set_snapshot), service::get_local_compaction_priority(),
|
|
|
|
|
sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, options);
|
|
|
|
|
|
|
|
|
|
// Releases reference to cleaned sstable such that respective used disk space can be freed.
|
|
|
|
|
descriptor.release_exhausted = [&compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
|
|
|
|
|
compacting.release_compacting(exhausted_sstables);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
_stats.pending_tasks--;
|
|
|
|
|
_stats.active_tasks++;
|
|
|
|
|
task->setup_new_compaction(descriptor.run_identifier);
|
|
|
|
|
|
|
|
|
|
auto perform_rewrite = [this, &t, &descriptor, &task] () mutable -> future<stop_iteration> {
|
|
|
|
|
std::exception_ptr ex;
|
|
|
|
|
try {
|
|
|
|
|
auto compaction_completion = defer([&task, this] {
|
|
|
|
|
task->finish_compaction();
|
|
|
|
|
_stats.active_tasks--;
|
|
|
|
|
});
|
|
|
|
|
co_await t.compact_sstables(std::move(descriptor), task->compaction_data());
|
|
|
|
|
} catch (...) {
|
|
|
|
|
ex = std::current_exception();
|
|
|
|
|
}
|
|
|
|
|
if (ex) {
|
|
|
|
|
co_return co_await task->maybe_retry(std::move(ex));
|
|
|
|
|
}
|
|
|
|
|
_stats.completed_tasks++;
|
|
|
|
|
reevaluate_postponed_compactions();
|
|
|
|
|
co_return stop_iteration::yes;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_compaction_controller.backlog_of_shares(200), _available_memory));
|
|
|
|
|
completed = co_await with_scheduling_group(_compaction_controller.sg(), std::ref(perform_rewrite));
|
|
|
|
|
} while (!completed);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
shared_promise<> p;
|
|
|
|
|
task->_compaction_done = p.get_shared_future();
|
|
|
|
|
try {
|
|
|
|
|
while (!sstables.empty() && task->can_proceed()) {
|
|
|
|
|
auto sst = sstables.back();
|
|
|
|
|
sstables.pop_back();
|
|
|
|
|
co_await rewrite_sstable(sst);
|
|
|
|
|
}
|
|
|
|
|
p.set_value();
|
|
|
|
|
} catch (...) {
|
|
|
|
|
p.set_exception(std::current_exception());
|
|
|
|
|
throw;
|
|
|
|
|
}
|
|
|
|
|
co_await perform_task(seastar::make_shared<rewrite_sstables_compaction_task>(*this, t, std::move(options), std::move(sstables), can_purge));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class compaction_manager::validate_sstables_compaction_task : public compaction_manager::sstables_task {
|
|
|
|
|
public:
|
|
|
|
|
validate_sstables_compaction_task(compaction_manager& mgr, replica::table* t, std::vector<sstables::shared_sstable> sstables)
|
|
|
|
|
: sstables_task(mgr, t, sstables::compaction_type::Scrub, "Scrub compaction in validate mode", std::move(sstables))
|
|
|
|
|
{}
|
|
|
|
|
|
|
|
|
|
protected:
|
|
|
|
|
virtual future<> do_run() override {
|
|
|
|
|
while (!_sstables.empty() && can_proceed()) {
|
|
|
|
|
auto sst = consume_sstable();
|
|
|
|
|
co_await validate_sstable(std::move(sst));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
future<> validate_sstable(const sstables::shared_sstable& sst) {
|
|
|
|
|
switch_state(state::active);
|
|
|
|
|
try {
|
|
|
|
|
// FIXME: co_await coroutine::switch_to
|
|
|
|
|
co_await with_scheduling_group(_cm._maintenance_sg.cpu, [this, sst = std::move(sst)] () {
|
|
|
|
|
auto desc = sstables::compaction_descriptor(
|
|
|
|
|
{ sst },
|
|
|
|
|
{},
|
|
|
|
|
_cm._maintenance_sg.io,
|
|
|
|
|
sst->get_sstable_level(),
|
|
|
|
|
sstables::compaction_descriptor::default_max_sstable_bytes,
|
|
|
|
|
sst->run_identifier(),
|
|
|
|
|
sstables::compaction_type_options::make_scrub(sstables::compaction_type_options::scrub::mode::validate));
|
|
|
|
|
return compact_sstables(std::move(desc), _compaction_data, _compacting_table->as_table_state());
|
|
|
|
|
});
|
|
|
|
|
} catch (sstables::compaction_stopped_exception&) {
|
|
|
|
|
// ignore, will be handled by can_proceed()
|
|
|
|
|
} catch (storage_io_error& e) {
|
|
|
|
|
cmlog.error("{}: failed due to storage io error: {}: stopping", *this, e.what());
|
|
|
|
|
_cm._stats.errors++;
|
|
|
|
|
_cm.do_stop();
|
|
|
|
|
throw;
|
|
|
|
|
} catch (...) {
|
|
|
|
|
// We are validating potentially corrupt sstables, errors are
|
|
|
|
|
// expected, just continue with the other sstables when seeing
|
|
|
|
|
// one.
|
|
|
|
|
_cm._stats.errors++;
|
|
|
|
|
cmlog.error("Scrubbing in validate mode {} failed due to {}, continuing.", sst->get_filename(), std::current_exception());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
future<> compaction_manager::perform_sstable_scrub_validate_mode(replica::table* t) {
|
|
|
|
|
if (_state != state::enabled) {
|
|
|
|
|
return make_ready_future<>();
|
|
|
|
|
}
|
|
|
|
|
// All sstables must be included, even the ones being compacted, such that everything in table is validated.
|
|
|
|
|
auto all_sstables = boost::copy_range<std::vector<sstables::shared_sstable>>(*t->get_sstables());
|
|
|
|
|
return run_custom_job(t, sstables::compaction_type::Scrub, "Scrub compaction in validate mode", [this, &t = *t, sstables = std::move(all_sstables)] (sstables::compaction_data& info) mutable -> future<> {
|
|
|
|
|
class pending_tasks {
|
|
|
|
|
compaction_manager::stats& _stats;
|
|
|
|
|
size_t _n;
|
|
|
|
|
public:
|
|
|
|
|
pending_tasks(compaction_manager::stats& stats, size_t n) : _stats(stats), _n(n) { _stats.pending_tasks += _n; }
|
|
|
|
|
~pending_tasks() { _stats.pending_tasks -= _n; }
|
|
|
|
|
void operator--(int) {
|
|
|
|
|
--_stats.pending_tasks;
|
|
|
|
|
--_n;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
pending_tasks pending(_stats, sstables.size());
|
|
|
|
|
|
|
|
|
|
while (!sstables.empty()) {
|
|
|
|
|
auto sst = sstables.back();
|
|
|
|
|
sstables.pop_back();
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
co_await with_scheduling_group(_maintenance_sg.cpu, [&] () {
|
|
|
|
|
auto desc = sstables::compaction_descriptor(
|
|
|
|
|
{ sst },
|
|
|
|
|
{},
|
|
|
|
|
_maintenance_sg.io,
|
|
|
|
|
sst->get_sstable_level(),
|
|
|
|
|
sstables::compaction_descriptor::default_max_sstable_bytes,
|
|
|
|
|
sst->run_identifier(),
|
|
|
|
|
sstables::compaction_type_options::make_scrub(sstables::compaction_type_options::scrub::mode::validate));
|
|
|
|
|
return compact_sstables(std::move(desc), info, t.as_table_state());
|
|
|
|
|
});
|
|
|
|
|
} catch (sstables::compaction_stopped_exception&) {
|
|
|
|
|
throw; // let run_custom_job() handle this
|
|
|
|
|
} catch (storage_io_error&) {
|
|
|
|
|
throw; // let run_custom_job() handle this
|
|
|
|
|
} catch (...) {
|
|
|
|
|
// We are validating potentially corrupt sstables, errors are
|
|
|
|
|
// expected, just continue with the other sstables when seeing
|
|
|
|
|
// one.
|
|
|
|
|
_stats.errors++;
|
|
|
|
|
cmlog.error("Scrubbing in validate mode {} failed due to {}, continuing.", sst->get_filename(), std::current_exception());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pending--;
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
return perform_task(seastar::make_shared<validate_sstables_compaction_task>(*this, t, std::move(all_sstables)));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool needs_cleanup(const sstables::shared_sstable& sst,
|
|
|
|
|
|