dirty_memory_manager: flush_when_needed: move error handling to flush_one/seal_active_memtable
Currently flush is retried both by dirty_memory_manager::flush_when_needed and table::seal_active_memtable, which may be called by other paths like table::flush. Unify the retry logic into seal_active_memtable so that we have similar error handling semantics on all paths. Refs #4174 Refs #10498 Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -1752,12 +1752,11 @@ future<> dirty_memory_manager::flush_when_needed() {
|
||||
if (!_db) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
auto r = make_lw_shared<exponential_backoff_retry>(100ms, 10s);
|
||||
// If there are explicit flushes requested, we must wait for them to finish before we stop.
|
||||
return do_until([this] { return _db_shutdown_requested; }, [this, r] {
|
||||
return do_until([this] { return _db_shutdown_requested; }, [this] {
|
||||
auto has_work = [this] { return has_pressure() || _db_shutdown_requested; };
|
||||
return _should_flush.wait(std::move(has_work)).then([this, r] {
|
||||
return get_flush_permit().then([this, r] (auto permit) {
|
||||
return _should_flush.wait(std::move(has_work)).then([this] {
|
||||
return get_flush_permit().then([this] (auto permit) {
|
||||
// We give priority to explicit flushes. They are mainly user-initiated flushes,
|
||||
// flushes coming from a DROP statement, or commitlog flushes.
|
||||
if (_flush_serializer.waiters()) {
|
||||
@@ -1788,31 +1787,8 @@ future<> dirty_memory_manager::flush_when_needed() {
|
||||
// Do not wait. The semaphore will protect us against a concurrent flush. But we
|
||||
// want to start a new one as soon as the permits are destroyed and the semaphore is
|
||||
// made ready again, not when we are done with the current one.
|
||||
(void)this->flush_one(mtlist, std::move(permit)).then_wrapped([this, r](future<> f) {
|
||||
if (f.failed()) {
|
||||
auto e = f.get_exception();
|
||||
_db->cf_stats()->failed_memtables_flushes_count++;
|
||||
try {
|
||||
std::rethrow_exception(e);
|
||||
} catch (const std::bad_alloc& e) {
|
||||
// There is a chance something else will free the memory, so we can try again
|
||||
dblog.error("Flush failed due to low memory. Retrying again in {}ms", r->sleep_time().count());
|
||||
} catch (...) {
|
||||
try {
|
||||
// At this point we don't know what has happened and it's better to potentially
|
||||
// take the node down and rely on commitlog to replay.
|
||||
on_internal_error(dblog, e);
|
||||
} catch (const std::exception& ex) {
|
||||
// If the node is configured to not abort on internal error,
|
||||
// but propagate it up the chain, we can't do anything reasonable
|
||||
// at this point. The error is logged and we can try again later
|
||||
}
|
||||
}
|
||||
return r->retry();
|
||||
}
|
||||
// Clear the retry timer if the flush succeeds
|
||||
r->reset();
|
||||
return make_ready_future<>();
|
||||
(void)this->flush_one(mtlist, std::move(permit)).handle_exception([this] (std::exception_ptr ex) {
|
||||
dblog.error("Flushing memtable returned unexpected error: {}", ex);
|
||||
});
|
||||
return make_ready_future<>();
|
||||
});
|
||||
|
||||
@@ -1080,6 +1080,9 @@ private:
|
||||
// But it is possible to synchronously wait for the seal to complete by
|
||||
// waiting on this future. This is useful in situations where we want to
|
||||
// synchronously flush data to disk.
|
||||
//
|
||||
// The function never fails.
|
||||
// It either succeeds eventually after retrying or aborts.
|
||||
future<> seal_active_memtable(flush_permit&&) noexcept;
|
||||
|
||||
void check_valid_rp(const db::replay_position&) const;
|
||||
|
||||
110
replica/table.cc
110
replica/table.cc
@@ -11,6 +11,7 @@
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include <seastar/coroutine/exception.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include <seastar/coroutine/as_future.hh>
|
||||
#include <seastar/util/closeable.hh>
|
||||
|
||||
#include "replica/database.hh"
|
||||
@@ -569,6 +570,8 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
// The function never fails.
|
||||
// It either succeeds eventually after retrying or aborts.
|
||||
future<>
|
||||
table::seal_active_memtable(flush_permit&& flush_permit) noexcept {
|
||||
auto old = _memtables->back();
|
||||
@@ -580,49 +583,94 @@ table::seal_active_memtable(flush_permit&& flush_permit) noexcept {
|
||||
}
|
||||
|
||||
auto permit = std::move(flush_permit);
|
||||
auto r = exponential_backoff_retry(100ms, 10s);
|
||||
std::optional<utils::phased_barrier::operation> op;
|
||||
size_t memtable_size;
|
||||
future<> previous_flush = make_ready_future<>();
|
||||
|
||||
utils::get_local_injector().inject("table_seal_active_memtable_pre_flush", []() {
|
||||
throw std::bad_alloc();
|
||||
// FIXME: the retried func returns a stop_iteration value for now,
|
||||
// since try_flush_memtable_to_sstable is still doing some error handling on its own.
|
||||
auto with_retry = [&] (std::function<future<stop_iteration>()> func) -> future<> {
|
||||
for (;;) {
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
if ((co_await func()) == stop_iteration::yes) {
|
||||
co_return;
|
||||
}
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
_config.cf_stats->failed_memtables_flushes_count++;
|
||||
if (try_catch<std::bad_alloc>(ex)) {
|
||||
// There is a chance something else will free the memory, so we can try again
|
||||
// FIXME: limit the number of retries
|
||||
} else {
|
||||
// FIXME: we should just abort on unexpected errors.
|
||||
// This is a temporary measure just to make this patch that
|
||||
// moved this error handling code from flush_when_needed
|
||||
// easier to review
|
||||
try {
|
||||
// At this point we don't know what has happened and it's better to potentially
|
||||
// take the node down and rely on commitlog to replay.
|
||||
on_internal_error(tlogger, ex);
|
||||
} catch (const std::exception& ex) {
|
||||
// If the node is configured to not abort on internal error,
|
||||
// but propagate it up the chain, we can't do anything reasonable
|
||||
// at this point. The error is logged and we can try again later
|
||||
}
|
||||
}
|
||||
}
|
||||
tlogger.warn("Memtable flush failed due to: {}. Will retry in {}ms", ex, r.sleep_time().count());
|
||||
co_await r.retry();
|
||||
}
|
||||
};
|
||||
|
||||
co_await with_retry([&] {
|
||||
tlogger.debug("seal_active_memtable: adding memtable");
|
||||
utils::get_local_injector().inject("table_seal_active_memtable_pre_flush", []() {
|
||||
throw std::bad_alloc();
|
||||
});
|
||||
|
||||
_memtables->add_memtable();
|
||||
|
||||
// no exceptions allowed (nor expected) from this point on
|
||||
_stats.memtable_switch_count++;
|
||||
[&] () noexcept {
|
||||
// This will set evictable occupancy of the old memtable region to zero, so that
|
||||
// this region is considered last for flushing by dirty_memory_manager::flush_when_needed().
|
||||
// If we don't do that, the flusher may keep picking up this memtable list for flushing after
|
||||
// the permit is released even though there is not much to flush in the active memtable of this list.
|
||||
old->region().ground_evictable_occupancy();
|
||||
memtable_size = old->occupancy().total_space();
|
||||
}();
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
|
||||
_memtables->add_memtable();
|
||||
_stats.memtable_switch_count++;
|
||||
// This will set evictable occupancy of the old memtable region to zero, so that
|
||||
// this region is considered last for flushing by dirty_memory_manager::flush_when_needed().
|
||||
// If we don't do that, the flusher may keep picking up this memtable list for flushing after
|
||||
// the permit is released even though there is not much to flush in the active memtable of this list.
|
||||
old->region().ground_evictable_occupancy();
|
||||
auto previous_flush = _flush_barrier.advance_and_await();
|
||||
auto op = _flush_barrier.start();
|
||||
co_await with_retry([&] {
|
||||
previous_flush = _flush_barrier.advance_and_await();
|
||||
op = _flush_barrier.start();
|
||||
|
||||
auto memtable_size = old->occupancy().total_space();
|
||||
// no exceptions allowed (nor expected) from this point on
|
||||
_stats.pending_flushes++;
|
||||
_config.cf_stats->pending_memtables_flushes_count++;
|
||||
_config.cf_stats->pending_memtables_flushes_bytes += memtable_size;
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
});
|
||||
|
||||
_stats.pending_flushes++;
|
||||
_config.cf_stats->pending_memtables_flushes_count++;
|
||||
_config.cf_stats->pending_memtables_flushes_bytes += memtable_size;
|
||||
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
for (;;) {
|
||||
auto sstable_write_permit = permit.release_sstable_write_permit();
|
||||
if (co_await this->try_flush_memtable_to_sstable(old, std::move(sstable_write_permit)) == stop_iteration::yes) {
|
||||
break;
|
||||
}
|
||||
co_await sleep(10s);
|
||||
co_await with_retry([&] () -> future<stop_iteration> {
|
||||
// Reacquiring the write permit might be needed if retrying flush
|
||||
if (!permit.has_sstable_write_permit()) {
|
||||
tlogger.debug("seal_active_memtable: reacquiring write permit");
|
||||
permit = co_await std::move(permit).reacquire_sstable_write_permit();
|
||||
}
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
auto write_permit = permit.release_sstable_write_permit();
|
||||
|
||||
co_return co_await this->try_flush_memtable_to_sstable(old, std::move(write_permit));
|
||||
});
|
||||
|
||||
_stats.pending_flushes--;
|
||||
_config.cf_stats->pending_memtables_flushes_count--;
|
||||
_config.cf_stats->pending_memtables_flushes_bytes -= memtable_size;
|
||||
|
||||
if (ex) {
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
|
||||
if (_commitlog) {
|
||||
_commitlog->discard_completed_segments(_schema->id(), old->get_and_discard_rp_set());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user