mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-22 09:30:45 +00:00
Merge "retry policy for compaction" from Raphael
This commit is contained in:
66
database.cc
66
database.cc
@@ -440,22 +440,8 @@ column_family::stop() {
|
||||
|
||||
return _in_flight_seals.close().then([this] {
|
||||
_compaction_sem.broken();
|
||||
return _compaction_done.then_wrapped([this] (future<> f) {
|
||||
// NOTE: broken_semaphore and seastar::gate_closed_exception exceptions
|
||||
// are used for regular termination of compaction fiber.
|
||||
try {
|
||||
f.get();
|
||||
} catch (broken_semaphore& e) {
|
||||
dblog.info("compaction for column_family {}/{} not restarted due to shutdown", _schema->ks_name(), _schema->cf_name());
|
||||
} catch (seastar::gate_closed_exception& e) {
|
||||
dblog.info("compaction for column_family {}/{} not restarted due to shutdown", _schema->ks_name(), _schema->cf_name());
|
||||
} catch (std::exception& e) {
|
||||
dblog.error("compaction failed: {}", e.what());
|
||||
throw;
|
||||
} catch (...) {
|
||||
dblog.error("compaction failed: unknown error");
|
||||
throw;
|
||||
}
|
||||
return _compaction_done.then([] {
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -533,10 +519,56 @@ void column_family::start_compaction() {
|
||||
sstables::compaction_strategy strategy = _compaction_strategy;
|
||||
return do_with(std::move(strategy), [this] (sstables::compaction_strategy& cs) {
|
||||
dblog.info("started compaction for column_family {}/{}", _schema->ks_name(), _schema->cf_name());
|
||||
return cs.compact(*this);
|
||||
return cs.compact(*this).then([this] {
|
||||
// If compaction completed successfully, let's reset sleep time of _compaction_retry.
|
||||
_compaction_retry.reset();
|
||||
});
|
||||
});
|
||||
});
|
||||
}).then_wrapped([this] (future<> f) {
|
||||
bool retry = false;
|
||||
|
||||
// NOTE: broken_semaphore and seastar::gate_closed_exception exceptions
|
||||
// are used for regular termination of compaction fiber.
|
||||
try {
|
||||
f.get();
|
||||
} catch (broken_semaphore& e) {
|
||||
dblog.info("compaction for column_family {}/{} not restarted due to shutdown", _schema->ks_name(), _schema->cf_name());
|
||||
throw;
|
||||
} catch (seastar::gate_closed_exception& e) {
|
||||
dblog.info("compaction for column_family {}/{} not restarted due to shutdown", _schema->ks_name(), _schema->cf_name());
|
||||
throw;
|
||||
} catch (std::exception& e) {
|
||||
dblog.error("compaction for column_family {}/{} failed: {}", _schema->ks_name(), _schema->cf_name(), e.what());
|
||||
retry = true;
|
||||
} catch (...) {
|
||||
dblog.error("compaction for column_family {}/{} failed: unknown error", _schema->ks_name(), _schema->cf_name());
|
||||
retry = true;
|
||||
}
|
||||
|
||||
if (retry) {
|
||||
dblog.info("compaction task for column_family {}/{} sleeping for {} seconds",
|
||||
_schema->ks_name(), _schema->cf_name(), std::chrono::duration_cast<std::chrono::seconds>(_compaction_retry.sleep_time()).count());
|
||||
return _compaction_retry.retry().then([this] {
|
||||
// after sleeping, signal semaphore for the next compaction attempt.
|
||||
_compaction_sem.signal();
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).then_wrapped([this] (future<> f) {
|
||||
// here, we ignore both broken_semaphore and seastar::gate_closed_exception that
|
||||
// were used for regular termination of the compaction fiber.
|
||||
try {
|
||||
f.get();
|
||||
} catch (broken_semaphore& e) {
|
||||
// exception logged in keep_doing.
|
||||
} catch (seastar::gate_closed_exception& e) {
|
||||
// exception logged in keep_doing.
|
||||
} catch (...) {
|
||||
// this shouldn't happen, let's log it anyway.
|
||||
dblog.error("compaction for column_family {}/{} failed: unexpected error", _schema->ks_name(), _schema->cf_name());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -46,6 +46,7 @@
|
||||
#include "mutation_reader.hh"
|
||||
#include "row_cache.hh"
|
||||
#include "compaction_strategy.hh"
|
||||
#include "utils/exponential_backoff_retry.hh"
|
||||
|
||||
class frozen_mutation;
|
||||
class reconcilable_result;
|
||||
@@ -101,6 +102,7 @@ private:
|
||||
sstables::compaction_strategy _compaction_strategy;
|
||||
future<> _compaction_done = make_ready_future<>();
|
||||
semaphore _compaction_sem;
|
||||
exponential_backoff_retry _compaction_retry = exponential_backoff_retry(std::chrono::seconds(5), std::chrono::seconds(300));
|
||||
private:
|
||||
void add_sstable(sstables::sstable&& sstable);
|
||||
void add_memtable();
|
||||
|
||||
39
utils/exponential_backoff_retry.hh
Normal file
39
utils/exponential_backoff_retry.hh
Normal file
@@ -0,0 +1,39 @@
|
||||
|
||||
/*
|
||||
* Copyright 2015 Cloudius Systems.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "core/sleep.hh"
|
||||
#include <chrono>
|
||||
|
||||
// Implements retry policy that exponentially increases sleep time between retries.
|
||||
class exponential_backoff_retry {
|
||||
std::chrono::milliseconds _base_sleep_time;
|
||||
std::chrono::milliseconds _sleep_time;
|
||||
std::chrono::milliseconds _max_sleep_time;
|
||||
public:
|
||||
exponential_backoff_retry(std::chrono::milliseconds base_sleep_time, std::chrono::milliseconds max_sleep_time)
|
||||
: _base_sleep_time(std::min(base_sleep_time, max_sleep_time))
|
||||
, _sleep_time(_base_sleep_time)
|
||||
, _max_sleep_time(max_sleep_time) {}
|
||||
|
||||
future<> retry() {
|
||||
auto old_sleep_time = _sleep_time;
|
||||
// calculating sleep time seconds for the next retry.
|
||||
_sleep_time = std::min(_sleep_time * 2, _max_sleep_time);
|
||||
|
||||
return sleep(old_sleep_time);
|
||||
}
|
||||
|
||||
// Return sleep time in seconds to be used for next retry.
|
||||
std::chrono::milliseconds sleep_time() const {
|
||||
return _sleep_time;
|
||||
}
|
||||
|
||||
// Reset sleep time to base sleep time.
|
||||
void reset() {
|
||||
_sleep_time = _base_sleep_time;
|
||||
}
|
||||
};
|
||||
Reference in New Issue
Block a user