diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 4cd4f579bf..2e456a442a 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -288,6 +288,7 @@ public: void flush_segments(bool = false); private: + size_t max_request_controller_units() const; segment_id_type _ids = 0; std::vector _segments; std::deque _reserve_segments; @@ -911,7 +912,7 @@ db::commitlog::segment_manager::segment_manager(config c) // an existing in-flight buffer. Since we'll force the cycling() of any buffer that is bigger // than default_size at the end of the allocation, that allows for every valid mutation to // always be admitted for processing. - , _request_controller(max_mutation_size + db::commitlog::segment::default_size) + , _request_controller(max_request_controller_units()) { assert(max_size > 0); @@ -922,6 +923,10 @@ db::commitlog::segment_manager::segment_manager(config c) _regs = create_counters(); } +size_t db::commitlog::segment_manager::max_request_controller_units() const { + return max_mutation_size + db::commitlog::segment::default_size; +} + future> db::commitlog::segment_manager::list_descriptors(sstring dirname) { struct helper { @@ -1235,9 +1240,12 @@ future<> db::commitlog::segment_manager::shutdown() { if (!_shutdown) { _shutdown = true; // no re-arm, no create new segments. _timer.cancel(); // no more timer calls - // Now first wait for periodic task to finish, then sync and close all - // segments, flushing out any remaining data. - return _gate.close().then(std::bind(&segment_manager::sync_all_segments, this, true)); + // Wait for all pending requests to finish + return get_units(_request_controller, max_request_controller_units()).then([this] (auto permits) { + // Now first wait for periodic task to finish, then sync and close all + // segments, flushing out any remaining data. + return _gate.close().then(std::bind(&segment_manager::sync_all_segments, this, true)); + }); } return make_ready_future<>(); }