dirty_memory_manager: flush_when_needed: move error handling to flush_one/seal_active_memtable

Currently flushes are retried both by dirty_memory_manager::flush_when_needed
and by table::seal_active_memtable; the latter may also be called from
other paths, such as table::flush.

Unify the retry logic into seal_active_memtable so that
all flush paths share consistent error-handling semantics.

Refs #4174
Refs #10498

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2022-05-29 22:54:14 +03:00
parent 93f835a2dd
commit d55a2ac762
3 changed files with 87 additions and 60 deletions

View File

@@ -1752,12 +1752,11 @@ future<> dirty_memory_manager::flush_when_needed() {
if (!_db) {
return make_ready_future<>();
}
auto r = make_lw_shared<exponential_backoff_retry>(100ms, 10s);
// If there are explicit flushes requested, we must wait for them to finish before we stop.
return do_until([this] { return _db_shutdown_requested; }, [this, r] {
return do_until([this] { return _db_shutdown_requested; }, [this] {
auto has_work = [this] { return has_pressure() || _db_shutdown_requested; };
return _should_flush.wait(std::move(has_work)).then([this, r] {
return get_flush_permit().then([this, r] (auto permit) {
return _should_flush.wait(std::move(has_work)).then([this] {
return get_flush_permit().then([this] (auto permit) {
// We give priority to explicit flushes. They are mainly user-initiated flushes,
// flushes coming from a DROP statement, or commitlog flushes.
if (_flush_serializer.waiters()) {
@@ -1788,31 +1787,8 @@ future<> dirty_memory_manager::flush_when_needed() {
// Do not wait. The semaphore will protect us against a concurrent flush. But we
// want to start a new one as soon as the permits are destroyed and the semaphore is
// made ready again, not when we are done with the current one.
(void)this->flush_one(mtlist, std::move(permit)).then_wrapped([this, r](future<> f) {
if (f.failed()) {
auto e = f.get_exception();
_db->cf_stats()->failed_memtables_flushes_count++;
try {
std::rethrow_exception(e);
} catch (const std::bad_alloc& e) {
// There is a chance something else will free the memory, so we can try again
dblog.error("Flush failed due to low memory. Retrying again in {}ms", r->sleep_time().count());
} catch (...) {
try {
// At this point we don't know what has happened and it's better to potentially
// take the node down and rely on commitlog to replay.
on_internal_error(dblog, e);
} catch (const std::exception& ex) {
// If the node is configured to not abort on internal error,
// but propagate it up the chain, we can't do anything reasonable
// at this point. The error is logged and we can try again later
}
}
return r->retry();
}
// Clear the retry timer if the flush succeeds
r->reset();
return make_ready_future<>();
(void)this->flush_one(mtlist, std::move(permit)).handle_exception([this] (std::exception_ptr ex) {
dblog.error("Flushing memtable returned unexpected error: {}", ex);
});
return make_ready_future<>();
});

View File

@@ -1080,6 +1080,9 @@ private:
// But it is possible to synchronously wait for the seal to complete by
// waiting on this future. This is useful in situations where we want to
// synchronously flush data to disk.
//
// The function never fails.
// It either succeeds eventually after retrying or aborts.
future<> seal_active_memtable(flush_permit&&) noexcept;
void check_valid_rp(const db::replay_position&) const;

View File

@@ -11,6 +11,7 @@
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/exception.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/util/closeable.hh>
#include "replica/database.hh"
@@ -569,6 +570,8 @@ public:
}
};
// The function never fails.
// It either succeeds eventually after retrying or aborts.
future<>
table::seal_active_memtable(flush_permit&& flush_permit) noexcept {
auto old = _memtables->back();
@@ -580,49 +583,94 @@ table::seal_active_memtable(flush_permit&& flush_permit) noexcept {
}
auto permit = std::move(flush_permit);
auto r = exponential_backoff_retry(100ms, 10s);
std::optional<utils::phased_barrier::operation> op;
size_t memtable_size;
future<> previous_flush = make_ready_future<>();
utils::get_local_injector().inject("table_seal_active_memtable_pre_flush", []() {
throw std::bad_alloc();
// FIXME: the retried func returns a stop_iteration value for now,
// since try_flush_memtable_to_sstable is still doing some error handling on its own.
auto with_retry = [&] (std::function<future<stop_iteration>()> func) -> future<> {
for (;;) {
std::exception_ptr ex;
try {
if ((co_await func()) == stop_iteration::yes) {
co_return;
}
} catch (...) {
ex = std::current_exception();
_config.cf_stats->failed_memtables_flushes_count++;
if (try_catch<std::bad_alloc>(ex)) {
// There is a chance something else will free the memory, so we can try again
// FIXME: limit the number of retries
} else {
// FIXME: we should just abort on unexpected errors.
// This is a temporary measure just to make this patch that
// moved this error handling code from flush_when_needed
// easier to review
try {
// At this point we don't know what has happened and it's better to potentially
// take the node down and rely on commitlog to replay.
on_internal_error(tlogger, ex);
} catch (const std::exception& ex) {
// If the node is configured to not abort on internal error,
// but propagate it up the chain, we can't do anything reasonable
// at this point. The error is logged and we can try again later
}
}
}
tlogger.warn("Memtable flush failed due to: {}. Will retry in {}ms", ex, r.sleep_time().count());
co_await r.retry();
}
};
co_await with_retry([&] {
tlogger.debug("seal_active_memtable: adding memtable");
utils::get_local_injector().inject("table_seal_active_memtable_pre_flush", []() {
throw std::bad_alloc();
});
_memtables->add_memtable();
// no exceptions allowed (nor expected) from this point on
_stats.memtable_switch_count++;
[&] () noexcept {
// This will set evictable occupancy of the old memtable region to zero, so that
// this region is considered last for flushing by dirty_memory_manager::flush_when_needed().
// If we don't do that, the flusher may keep picking up this memtable list for flushing after
// the permit is released even though there is not much to flush in the active memtable of this list.
old->region().ground_evictable_occupancy();
memtable_size = old->occupancy().total_space();
}();
return make_ready_future<stop_iteration>(stop_iteration::yes);
});
_memtables->add_memtable();
_stats.memtable_switch_count++;
// This will set evictable occupancy of the old memtable region to zero, so that
// this region is considered last for flushing by dirty_memory_manager::flush_when_needed().
// If we don't do that, the flusher may keep picking up this memtable list for flushing after
// the permit is released even though there is not much to flush in the active memtable of this list.
old->region().ground_evictable_occupancy();
auto previous_flush = _flush_barrier.advance_and_await();
auto op = _flush_barrier.start();
co_await with_retry([&] {
previous_flush = _flush_barrier.advance_and_await();
op = _flush_barrier.start();
auto memtable_size = old->occupancy().total_space();
// no exceptions allowed (nor expected) from this point on
_stats.pending_flushes++;
_config.cf_stats->pending_memtables_flushes_count++;
_config.cf_stats->pending_memtables_flushes_bytes += memtable_size;
return make_ready_future<stop_iteration>(stop_iteration::yes);
});
_stats.pending_flushes++;
_config.cf_stats->pending_memtables_flushes_count++;
_config.cf_stats->pending_memtables_flushes_bytes += memtable_size;
std::exception_ptr ex;
try {
for (;;) {
auto sstable_write_permit = permit.release_sstable_write_permit();
if (co_await this->try_flush_memtable_to_sstable(old, std::move(sstable_write_permit)) == stop_iteration::yes) {
break;
}
co_await sleep(10s);
co_await with_retry([&] () -> future<stop_iteration> {
// Reacquiring the write permit might be needed if retrying flush
if (!permit.has_sstable_write_permit()) {
tlogger.debug("seal_active_memtable: reacquiring write permit");
permit = co_await std::move(permit).reacquire_sstable_write_permit();
}
} catch (...) {
ex = std::current_exception();
}
auto write_permit = permit.release_sstable_write_permit();
co_return co_await this->try_flush_memtable_to_sstable(old, std::move(write_permit));
});
_stats.pending_flushes--;
_config.cf_stats->pending_memtables_flushes_count--;
_config.cf_stats->pending_memtables_flushes_bytes -= memtable_size;
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
}
if (_commitlog) {
_commitlog->discard_completed_segments(_schema->id(), old->get_and_discard_rp_set());
}