From 61db571a44e6ed8ec1257ea92e1b4e36b6dc6d7a Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Mon, 31 Jan 2022 12:00:56 +0000 Subject: [PATCH] commitlog: Ensure we never have more than one new_segment call at a time Refs #9896 Found by @eliransin. Call to new_segment was wrapped in with_timeout. This means that if primary caller timed out, we would leave new_segment calls running, but potentially issue new ones for next caller. This could lead to reserve segment queue being read simultanously. And it is not what we want. Change to always use the shared_future wait, all callers, and clear it only on result (exception or segment) Closes #10001 (cherry picked from commit 445e1d3e41872372035db2aa20b1d5a3623c9e4a) --- db/commitlog/commitlog.cc | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index b059a8d72c..6b59aa0536 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -1614,6 +1614,8 @@ future db::commitlog::segment_manager: } future db::commitlog::segment_manager::new_segment() { + gate::holder g(_gate); + if (_shutdown) { co_return coroutine::make_exception(std::runtime_error("Commitlog has been shut down. Cannot add data")); } @@ -1648,20 +1650,23 @@ future db::commitlog::segment_manager: co_return _segments.back(); } - if (_segment_allocating) { - co_await _segment_allocating->get_future(timeout); - continue; + // #9896 - we don't want to issue a new_segment call until + // the old one has terminated with either result or exception. + // Do all waiting through the shared_future + if (!_segment_allocating) { + _segment_allocating.emplace(new_segment().discard_result()); } - - promise<> p; - _segment_allocating.emplace(p.get_future()); - auto finally = defer([&] () noexcept { _segment_allocating = std::nullopt; }); try { - gate::holder g(_gate); - auto s = co_await with_timeout(timeout, new_segment()); - p.set_value(); - } catch (...) { - p.set_exception(std::current_exception()); + co_await _segment_allocating->get_future(timeout); + // once we've managed to get a result, any of us, the + // shared_future should be released. + _segment_allocating = std::nullopt; + } catch (timed_out_error&) { + throw; // not thrown by new_segment. Just no result yet. + } catch (...) { + // once we've managed to get a result, any of us, the + // shared_future should be released. + _segment_allocating = std::nullopt; throw; } }