commitlog: Ensure we never have more than one new_segment call at a time

Refs #9896

Found by @eliransin. Call to new_segment was wrapped in with_timeout.
This means that if primary caller timed out, we would leave new_segment
calls running, but potentially issue new ones for next caller.

This could lead to reserve segment queue being read simultanously. And
it is not what we want.

Change to always use the shared_future wait, all callers, and clear it
only on result (exception or segment)

Closes #10001
This commit is contained in:
Calle Wilund
2022-01-31 12:00:56 +00:00
committed by Nadav Har'El
parent 8a745593a2
commit 445e1d3e41

View File

@@ -1586,6 +1586,8 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
}
future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager::new_segment() {
gate::holder g(_gate);
if (_shutdown) {
co_return coroutine::make_exception(std::runtime_error("Commitlog has been shut down. Cannot add data"));
}
@@ -1620,20 +1622,23 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
co_return _segments.back();
}
if (_segment_allocating) {
co_await _segment_allocating->get_future(timeout);
continue;
// #9896 - we don't want to issue a new_segment call until
// the old one has terminated with either result or exception.
// Do all waiting through the shared_future
if (!_segment_allocating) {
_segment_allocating.emplace(new_segment().discard_result());
}
promise<> p;
_segment_allocating.emplace(p.get_future());
auto finally = defer([&] () noexcept { _segment_allocating = std::nullopt; });
try {
gate::holder g(_gate);
auto s = co_await with_timeout(timeout, new_segment());
p.set_value();
} catch (...) {
p.set_exception(std::current_exception());
co_await _segment_allocating->get_future(timeout);
// once we've managed to get a result, any of us, the
// shared_future should be released.
_segment_allocating = std::nullopt;
} catch (timed_out_error&) {
throw; // not thrown by new_segment. Just no result yet.
} catch (...) {
// once we've managed to get a result, any of us, the
// shared_future should be released.
_segment_allocating = std::nullopt;
throw;
}
}