diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index fb6f6dab26..ad1fffeca5 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -316,6 +316,7 @@ public: uint64_t buffer_list_bytes = 0; // size on disk, actually used - i.e. containing data (allocate+cycle) uint64_t active_size_on_disk = 0; + uint64_t wasted_size_on_disk = 0; // size allocated on disk - i.e. files created (new, reserve, recycled) uint64_t total_size_on_disk = 0; uint64_t requests_blocked_memory = 0; @@ -419,7 +420,11 @@ public: void flush_segments(uint64_t size_to_remove); private: + class shutdown_marker{}; + future<> clear_reserve_segments(); + void abort_recycled_list(std::exception_ptr); + void abort_deletion_promise(std::exception_ptr); future<> rename_file(sstring, sstring) const; size_t max_request_controller_units() const; @@ -433,6 +438,7 @@ private: timer _timer; future<> replenish_reserve(); future<> _reserve_replenisher; + future<> _background_sync; seastar::gate _gate; uint64_t _new_counter = 0; }; @@ -541,6 +547,9 @@ class db::commitlog::segment : public enable_shared_from_this, public c void end_flush() { _segment_manager->end_flush(); + if (can_delete()) { + _segment_manager->discard_unused_segments(); + } } public: @@ -584,6 +593,7 @@ public: clogger.debug("Segment {} is no longer active and will submitted for delete now", *this); ++_segment_manager->totals.segments_destroyed; _segment_manager->totals.active_size_on_disk -= file_position(); + _segment_manager->totals.wasted_size_on_disk -= (_size_on_disk - file_position()); _segment_manager->add_file_to_delete(_file_name, _desc); } else if (_segment_manager->cfg.warn_about_segments_left_on_disk_after_shutdown) { clogger.warn("Segment {} is dirty and is left on disk.", *this); @@ -695,7 +705,14 @@ public: } future close() { _closed = true; - return sync().then([] (sseg_ptr s) { return s->flush(); }).then([] (sseg_ptr s) { return s->terminate(); }); + return sync().then([] (sseg_ptr s) { + return s->flush(); + }).then([](sseg_ptr s) { + return s->terminate(); + }).then([](sseg_ptr s) { + s->_segment_manager->totals.wasted_size_on_disk += (s->_size_on_disk - s->file_position()); + return s; + }); } future do_flush(uint64_t pos) { auto me = shared_from_this(); @@ -1143,7 +1160,9 @@ db::commitlog::segment_manager::segment_manager(config c) // our threshold for trying to force a flush. needs heristics, for now max - segment_size/2. , disk_usage_threshold(cfg.commitlog_flush_threshold_in_mb.has_value() ? size_t(std::ceil(*cfg.commitlog_flush_threshold_in_mb / double(smp::count))) * 1024 * 1024 - : (max_disk_size - (max_disk_size > (max_size/2) ? (max_size/2) : 0))) + : (max_disk_size - + (max_disk_size >= (max_size*2) ? max_size + : (max_disk_size > (max_size/2) ? (max_size/2) : max_disk_size/3)))) , _flush_semaphore(cfg.max_active_flushes) // That is enough concurrency to allow for our largest mutation (max_mutation_size), plus // an existing in-flight buffer. Since we'll force the cycling() of any buffer that is bigger @@ -1153,6 +1172,7 @@ db::commitlog::segment_manager::segment_manager(config c) , _reserve_segments(1) , _recycled_segments(std::numeric_limits::max()) , _reserve_replenisher(make_ready_future<>()) + , _background_sync(make_ready_future<>()) { assert(max_size > 0); assert(max_mutation_size < segment::multi_entry_size_magic); @@ -1190,6 +1210,12 @@ future<> db::commitlog::segment_manager::replenish_reserve() { return make_ready_future<>(); }); }).handle_exception([](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (shutdown_marker&) { + return make_ready_future<>(); + } catch (...) { + } clogger.warn("Exception in segment reservation: {}", ep); return sleep(100ms); }); @@ -1334,6 +1360,10 @@ void db::commitlog::segment_manager::create_counters(const sstring& metrics_cate sm::description("Holds a size of disk space in bytes used for data so far. " "A too high value indicates that we have some bottleneck in the writing to sstables path.")), + sm::make_gauge("disk_slack_end_bytes", totals.wasted_size_on_disk, + sm::description("Holds a size of disk space in bytes unused because of segment switching (end slack). " + "A too high value indicates that we do not write enough data to each segment.")), + sm::make_gauge("memory_buffer_bytes", totals.buffer_list_bytes, sm::description("Holds the total number of bytes in internal memory buffers.")), }); @@ -1370,7 +1400,8 @@ void db::commitlog::segment_manager::flush_segments(uint64_t size_to_remove) { // Now get a set of used CF ids: std::unordered_set ids; - std::for_each(_segments.begin(), _segments.end() - 1, [&ids](sseg_ptr& s) { + auto e = std::find_if(_segments.begin(), _segments.end(), std::mem_fn(&segment::is_still_allocating)); + std::for_each(_segments.begin(), e, [&ids](sseg_ptr& s) { for (auto& id : s->_cf_dirty | boost::adaptors::map_keys) { ids.insert(id); } @@ -1516,7 +1547,17 @@ future db::commitlog::segment_manager: if (!cfg.allow_going_over_size_limit && max_disk_size != 0 && totals.total_size_on_disk >= max_disk_size) { clogger.debug("Disk usage ({} MB) exceeds maximum ({} MB) - allocation will wait...", totals.total_size_on_disk/(1024*1024), max_disk_size/(1024*1024)); auto f = cfg.reuse_segments ? _recycled_segments.not_empty() : _disk_deletions.get_shared_future(); + if (!f.available()) { + _new_counter = 0; // zero this so timer task does not duplicate the below flush + flush_segments(0); // force memtable flush already + } return f.handle_exception([this](auto ep) { + try { + std::rethrow_exception(ep); + } catch (shutdown_marker&) { + throw; + } catch (...) { + } clogger.warn("Exception while waiting for segments {}. Will retry allocation...", ep); }).then([this] { return allocate_segment(); @@ -1540,7 +1581,8 @@ future db::commitlog::segment_manager: clogger.debug("Increased segment reserve count to {}", _reserve_segments.max_size()); } // if we have no reserve and we're above/at limits, make background task a little more eager. - if (!_shutdown && totals.total_size_on_disk >= disk_usage_threshold) { + auto cur = totals.active_size_on_disk + totals.wasted_size_on_disk; + if (!_shutdown && cur >= disk_usage_threshold) { _timer.cancel(); _timer.arm(std::chrono::milliseconds(0)); } @@ -1708,14 +1750,28 @@ future<> db::commitlog::segment_manager::shutdown() { return std::move(block_new_requests).then([this] (auto permits) { _timer.cancel(); // no more timer calls _shutdown = true; // no re-arm, no create new segments. + + // do a discard + delete sweep to force + // gate holder (i.e. replenish) to wake up + discard_unused_segments(); + auto f = do_pending_deletes().then([this] { + auto ep = std::make_exception_ptr(shutdown_marker{}); + if (_recycled_segments.empty()) { + abort_recycled_list(ep); + } + abort_deletion_promise(ep); + return std::exchange(_background_sync, make_ready_future<>()); + }); + + // Now first wait for periodic task to finish, then sync and close all // segments, flushing out any remaining data. - return _gate.close().finally([this, permits = std::move(permits)] () mutable { - return shutdown_all_segments().handle_exception([permits = std::move(permits)] (std::exception_ptr ex) { + return _gate.close().then([this, f = std::move(f)]() mutable { + return std::move(f).then(std::bind(&segment_manager::shutdown_all_segments, this)).handle_exception([](std::exception_ptr ex) { clogger.error("Shutting down all segments failed during shutdown: {}. Aborting.", ex); abort(); }); - }); + }).finally([permits = std::move(permits)] { }); }); }).finally([this] { discard_unused_segments(); @@ -1778,7 +1834,7 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector fi } // We allow reuse of the segment if the current disk size is less than shard max. - if (!_shutdown && cfg.reuse_segments) { + if (cfg.reuse_segments) { auto usage = totals.total_size_on_disk; auto recycle = usage <= max_disk_size; @@ -1826,12 +1882,22 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector fi // for new allocs at least. Or more likely, everything is broken, but // we will at least make more noise. if (recycle_error && _recycled_segments.empty()) { - _recycled_segments.abort(recycle_error); - // and ensure next lap(s) still has a queue - _recycled_segments = queue(std::numeric_limits::max()); + abort_recycled_list(recycle_error); } } +void db::commitlog::segment_manager::abort_recycled_list(std::exception_ptr ep) { + // may not call here with elements in list. that would leak files. + assert(_recycled_segments.empty()); + _recycled_segments.abort(ep); + // and ensure next lap(s) still has a queue + _recycled_segments = queue(std::numeric_limits::max()); +} + +void db::commitlog::segment_manager::abort_deletion_promise(std::exception_ptr ep) { + std::exchange(_disk_deletions, {}).set_exception(ep); +} + future<> db::commitlog::segment_manager::do_pending_deletes() { auto ftc = std::exchange(_files_to_close, {}); auto ftd = std::exchange(_files_to_delete, {}); @@ -1868,9 +1934,12 @@ future<> db::commitlog::segment_manager::clear() { * Called by timer in periodic mode. */ void db::commitlog::segment_manager::sync() { - for (auto s : _segments) { - (void)s->sync(); // we do not care about waiting... - } + auto f = std::exchange(_background_sync, make_ready_future<>()); + _background_sync = parallel_for_each(_segments, [](sseg_ptr s) { + return s->sync().discard_result(); + }).then([f = std::move(f)]() mutable { + return std::move(f); + }); } void db::commitlog::segment_manager::on_timer() { @@ -1885,10 +1954,11 @@ void db::commitlog::segment_manager::on_timer() { // above threshold, request flush. if (_new_counter > 0) { auto max = disk_usage_threshold; - auto cur = totals.active_size_on_disk; + auto cur = totals.active_size_on_disk + totals.wasted_size_on_disk; + if (max != 0 && cur >= max) { - _new_counter = 0; clogger.debug("Used size on disk {} MB exceeds local threshold {} MB", cur / (1024 * 1024), max / (1024 * 1024)); + _new_counter = 0; flush_segments(cur - max); } } @@ -2512,7 +2582,10 @@ uint64_t db::commitlog::disk_footprint() const { } uint64_t db::commitlog::get_total_size() const { - return _segment_manager->totals.active_size_on_disk + _segment_manager->totals.buffer_list_bytes; + return _segment_manager->totals.active_size_on_disk + + _segment_manager->totals.wasted_size_on_disk + + _segment_manager->totals.buffer_list_bytes + ; } uint64_t db::commitlog::get_completed_tasks() const { diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index f86a992b18..650a2731bc 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -250,7 +250,7 @@ SEASTAR_TEST_CASE(test_commitlog_discard_completed_segments){ }).then([&log] { return log.shutdown().then([&log] { return log.list_existing_segments().then([] (auto descs) { - BOOST_REQUIRE(descs.empty()); + BOOST_CHECK_EQUAL(descs, decltype(descs){}); }); }); }); @@ -821,3 +821,57 @@ SEASTAR_TEST_CASE(test_commitlog_deadlock_in_recycle) { co_await log.clear(); } } + +SEASTAR_TEST_CASE(test_commitlog_deadlock_with_flush_threshold) { + commitlog::config cfg; + + constexpr auto max_size_mb = 1; + + cfg.commitlog_segment_size_in_mb = max_size_mb; + cfg.commitlog_total_space_in_mb = 2 * max_size_mb * smp::count; + cfg.commitlog_sync_period_in_ms = 10; + cfg.reuse_segments = true; + cfg.allow_going_over_size_limit = false; + cfg.use_o_dsync = true; // make sure we pre-allocate. + + // not using cl_test, because we need to be able to abandon + // the log. + + tmpdir tmp; + cfg.commit_log_location = tmp.path().string(); + auto log = co_await commitlog::create_commitlog(cfg); + + rp_set rps; + // uncomment for verbosity + // logging::logger_registry().set_logger_level("commitlog", logging::log_level::debug); + + auto uuid = utils::UUID_gen::get_time_UUID(); + auto size = log.max_record_size(); + + bool done = false; + + auto r = log.add_flush_handler([&](cf_id_type id, replay_position pos) { + log.discard_completed_segments(id, rps); + done = true; + }); + + bool release = true; + + try { + while (!done) { + auto now = timeout_clock::now(); + rp_handle h = co_await with_timeout(now + 30s, log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) { + dst.fill('1', size); + })); + rps.put(std::move(h)); + } + } catch (timed_out_error&) { + BOOST_FAIL("log write timed out. maybe it is deadlocked... Will not free log. ASAN errors and leaks will follow..."); + release = false; + } + + if (release) { + co_await log.shutdown(); + co_await log.clear(); + } +}