From f89f4e69a0d209b0646e9804db281166fc679081 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 23 Jun 2021 15:43:57 +0300 Subject: [PATCH] =?UTF-8?q?Merge=20'Commitlog:=20Handle=20disk=20usage=20a?= =?UTF-8?q?nd=20disk=20footprint=20discrepancies,=20ensuring=20we=20flush?= =?UTF-8?q?=20when=20needed=20(#8695)=E2=80=8B=20(v3)'=20from=20Calle=20Wi?= =?UTF-8?q?lund?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #8270 If we have an allocation pattern where we leave large parts of segments "wasted" (typically because the segment has empty space, but cannot hold the mutation being added), we can have a disk usage that is below threshold, yet still get a disk footprint that is over limit causing new segment allocation to stall. We need to take a few things into account: 1.) Need to include wasted space in the threshold check. Whether or not disk is actually used does not matter here. 2.) If we stall a segment alloc, we should just flush immediately. No point in waiting for the timer task. 3.) Need to adjust the thresholds a bit. Depending on sizes, we should probably consider start flushing once we've used up space enough to be in the last available segment, so a new one is hopefully available by the time we hit the limit. 4.) (v2) Must ensure discard/delete routines are executed. Because we can race with background disk syncs, we may need to issue segment prunes from end_flush() so we wake up actual file deletion/recycling 5.) (v2) Shutdown must ensure discard/delete is run after we've disabled background task etc, otherwise we might fail waking up replenish and get stuck in gate 6.) (v2) Recycling or deleting segments must be consistent, regardless of shutdown. For same reason as above. 7.) (v3) Signal recycle/delete queues/promise on shutdown (with recognized marker) to handle edge case where we only have a single (allocating) segment in the list, and cannot wake up replenisher in any more civilized way. Also fix edge case (for tests), when we have too few segment to have an active one (i.e. need flush everything). New attempt at this, should fix intermittent shutdown deadlocks in commitlog_test. Closes #8764 * github.com:scylladb/scylla: commitlog_test: Add test case for usage/disk size threshold mismatch commitlog_test: Improve test assertion commitlog: Add waitable future for background sync/flush commitlog: abort queues on shutdown commitlog: break out "abort" calls into member functions commitlog: Do explicit discard+delete in shutdown commitlog: Recycle or not should not depend on shutdown state commitlog: Issue discard_unused_segments on segment::flush end IFF deletable commitlog: Flush all segments if we only have one. commitlog: Always force flush if segment allocation is waiting commitlog: Include segment wasted (slack) size in footprint check commitlog: Adjust (lower) usage threshold (cherry picked from commit 14252c8b71d746d02e6af9d0aeaa2c8ad32fe1bb) --- db/commitlog/commitlog.cc | 107 +++++++++++++++++++++++++++++------ test/boost/commitlog_test.cc | 56 +++++++++++++++++- 2 files changed, 145 insertions(+), 18 deletions(-) diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index fb6f6dab26..ad1fffeca5 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -316,6 +316,7 @@ public: uint64_t buffer_list_bytes = 0; // size on disk, actually used - i.e. containing data (allocate+cycle) uint64_t active_size_on_disk = 0; + uint64_t wasted_size_on_disk = 0; // size allocated on disk - i.e. files created (new, reserve, recycled) uint64_t total_size_on_disk = 0; uint64_t requests_blocked_memory = 0; @@ -419,7 +420,11 @@ public: void flush_segments(uint64_t size_to_remove); private: + class shutdown_marker{}; + future<> clear_reserve_segments(); + void abort_recycled_list(std::exception_ptr); + void abort_deletion_promise(std::exception_ptr); future<> rename_file(sstring, sstring) const; size_t max_request_controller_units() const; @@ -433,6 +438,7 @@ private: timer _timer; future<> replenish_reserve(); future<> _reserve_replenisher; + future<> _background_sync; seastar::gate _gate; uint64_t _new_counter = 0; }; @@ -541,6 +547,9 @@ class db::commitlog::segment : public enable_shared_from_this, public c void end_flush() { _segment_manager->end_flush(); + if (can_delete()) { + _segment_manager->discard_unused_segments(); + } } public: @@ -584,6 +593,7 @@ public: clogger.debug("Segment {} is no longer active and will submitted for delete now", *this); ++_segment_manager->totals.segments_destroyed; _segment_manager->totals.active_size_on_disk -= file_position(); + _segment_manager->totals.wasted_size_on_disk -= (_size_on_disk - file_position()); _segment_manager->add_file_to_delete(_file_name, _desc); } else if (_segment_manager->cfg.warn_about_segments_left_on_disk_after_shutdown) { clogger.warn("Segment {} is dirty and is left on disk.", *this); @@ -695,7 +705,14 @@ public: } future close() { _closed = true; - return sync().then([] (sseg_ptr s) { return s->flush(); }).then([] (sseg_ptr s) { return s->terminate(); }); + return sync().then([] (sseg_ptr s) { + return s->flush(); + }).then([](sseg_ptr s) { + return s->terminate(); + }).then([](sseg_ptr s) { + s->_segment_manager->totals.wasted_size_on_disk += (s->_size_on_disk - s->file_position()); + return s; + }); } future do_flush(uint64_t pos) { auto me = shared_from_this(); @@ -1143,7 +1160,9 @@ db::commitlog::segment_manager::segment_manager(config c) // our threshold for trying to force a flush. needs heristics, for now max - segment_size/2. , disk_usage_threshold(cfg.commitlog_flush_threshold_in_mb.has_value() ? size_t(std::ceil(*cfg.commitlog_flush_threshold_in_mb / double(smp::count))) * 1024 * 1024 - : (max_disk_size - (max_disk_size > (max_size/2) ? (max_size/2) : 0))) + : (max_disk_size - + (max_disk_size >= (max_size*2) ? max_size + : (max_disk_size > (max_size/2) ? (max_size/2) : max_disk_size/3)))) , _flush_semaphore(cfg.max_active_flushes) // That is enough concurrency to allow for our largest mutation (max_mutation_size), plus // an existing in-flight buffer. Since we'll force the cycling() of any buffer that is bigger @@ -1153,6 +1172,7 @@ db::commitlog::segment_manager::segment_manager(config c) , _reserve_segments(1) , _recycled_segments(std::numeric_limits::max()) , _reserve_replenisher(make_ready_future<>()) + , _background_sync(make_ready_future<>()) { assert(max_size > 0); assert(max_mutation_size < segment::multi_entry_size_magic); @@ -1190,6 +1210,12 @@ future<> db::commitlog::segment_manager::replenish_reserve() { return make_ready_future<>(); }); }).handle_exception([](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (shutdown_marker&) { + return make_ready_future<>(); + } catch (...) { + } clogger.warn("Exception in segment reservation: {}", ep); return sleep(100ms); }); @@ -1334,6 +1360,10 @@ void db::commitlog::segment_manager::create_counters(const sstring& metrics_cate sm::description("Holds a size of disk space in bytes used for data so far. " "A too high value indicates that we have some bottleneck in the writing to sstables path.")), + sm::make_gauge("disk_slack_end_bytes", totals.wasted_size_on_disk, + sm::description("Holds a size of disk space in bytes unused because of segment switching (end slack). " + "A too high value indicates that we do not write enough data to each segment.")), + sm::make_gauge("memory_buffer_bytes", totals.buffer_list_bytes, sm::description("Holds the total number of bytes in internal memory buffers.")), }); @@ -1370,7 +1400,8 @@ void db::commitlog::segment_manager::flush_segments(uint64_t size_to_remove) { // Now get a set of used CF ids: std::unordered_set ids; - std::for_each(_segments.begin(), _segments.end() - 1, [&ids](sseg_ptr& s) { + auto e = std::find_if(_segments.begin(), _segments.end(), std::mem_fn(&segment::is_still_allocating)); + std::for_each(_segments.begin(), e, [&ids](sseg_ptr& s) { for (auto& id : s->_cf_dirty | boost::adaptors::map_keys) { ids.insert(id); } @@ -1516,7 +1547,17 @@ future db::commitlog::segment_manager: if (!cfg.allow_going_over_size_limit && max_disk_size != 0 && totals.total_size_on_disk >= max_disk_size) { clogger.debug("Disk usage ({} MB) exceeds maximum ({} MB) - allocation will wait...", totals.total_size_on_disk/(1024*1024), max_disk_size/(1024*1024)); auto f = cfg.reuse_segments ? _recycled_segments.not_empty() : _disk_deletions.get_shared_future(); + if (!f.available()) { + _new_counter = 0; // zero this so timer task does not duplicate the below flush + flush_segments(0); // force memtable flush already + } return f.handle_exception([this](auto ep) { + try { + std::rethrow_exception(ep); + } catch (shutdown_marker&) { + throw; + } catch (...) { + } clogger.warn("Exception while waiting for segments {}. Will retry allocation...", ep); }).then([this] { return allocate_segment(); @@ -1540,7 +1581,8 @@ future db::commitlog::segment_manager: clogger.debug("Increased segment reserve count to {}", _reserve_segments.max_size()); } // if we have no reserve and we're above/at limits, make background task a little more eager. - if (!_shutdown && totals.total_size_on_disk >= disk_usage_threshold) { + auto cur = totals.active_size_on_disk + totals.wasted_size_on_disk; + if (!_shutdown && cur >= disk_usage_threshold) { _timer.cancel(); _timer.arm(std::chrono::milliseconds(0)); } @@ -1708,14 +1750,28 @@ future<> db::commitlog::segment_manager::shutdown() { return std::move(block_new_requests).then([this] (auto permits) { _timer.cancel(); // no more timer calls _shutdown = true; // no re-arm, no create new segments. + + // do a discard + delete sweep to force + // gate holder (i.e. replenish) to wake up + discard_unused_segments(); + auto f = do_pending_deletes().then([this] { + auto ep = std::make_exception_ptr(shutdown_marker{}); + if (_recycled_segments.empty()) { + abort_recycled_list(ep); + } + abort_deletion_promise(ep); + return std::exchange(_background_sync, make_ready_future<>()); + }); + + // Now first wait for periodic task to finish, then sync and close all // segments, flushing out any remaining data. - return _gate.close().finally([this, permits = std::move(permits)] () mutable { - return shutdown_all_segments().handle_exception([permits = std::move(permits)] (std::exception_ptr ex) { + return _gate.close().then([this, f = std::move(f)]() mutable { + return std::move(f).then(std::bind(&segment_manager::shutdown_all_segments, this)).handle_exception([](std::exception_ptr ex) { clogger.error("Shutting down all segments failed during shutdown: {}. Aborting.", ex); abort(); }); - }); + }).finally([permits = std::move(permits)] { }); }); }).finally([this] { discard_unused_segments(); @@ -1778,7 +1834,7 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector fi } // We allow reuse of the segment if the current disk size is less than shard max. - if (!_shutdown && cfg.reuse_segments) { + if (cfg.reuse_segments) { auto usage = totals.total_size_on_disk; auto recycle = usage <= max_disk_size; @@ -1826,12 +1882,22 @@ future<> db::commitlog::segment_manager::delete_segments(std::vector fi // for new allocs at least. Or more likely, everything is broken, but // we will at least make more noise. if (recycle_error && _recycled_segments.empty()) { - _recycled_segments.abort(recycle_error); - // and ensure next lap(s) still has a queue - _recycled_segments = queue(std::numeric_limits::max()); + abort_recycled_list(recycle_error); } } +void db::commitlog::segment_manager::abort_recycled_list(std::exception_ptr ep) { + // may not call here with elements in list. that would leak files. + assert(_recycled_segments.empty()); + _recycled_segments.abort(ep); + // and ensure next lap(s) still has a queue + _recycled_segments = queue(std::numeric_limits::max()); +} + +void db::commitlog::segment_manager::abort_deletion_promise(std::exception_ptr ep) { + std::exchange(_disk_deletions, {}).set_exception(ep); +} + future<> db::commitlog::segment_manager::do_pending_deletes() { auto ftc = std::exchange(_files_to_close, {}); auto ftd = std::exchange(_files_to_delete, {}); @@ -1868,9 +1934,12 @@ future<> db::commitlog::segment_manager::clear() { * Called by timer in periodic mode. */ void db::commitlog::segment_manager::sync() { - for (auto s : _segments) { - (void)s->sync(); // we do not care about waiting... - } + auto f = std::exchange(_background_sync, make_ready_future<>()); + _background_sync = parallel_for_each(_segments, [](sseg_ptr s) { + return s->sync().discard_result(); + }).then([f = std::move(f)]() mutable { + return std::move(f); + }); } void db::commitlog::segment_manager::on_timer() { @@ -1885,10 +1954,11 @@ void db::commitlog::segment_manager::on_timer() { // above threshold, request flush. if (_new_counter > 0) { auto max = disk_usage_threshold; - auto cur = totals.active_size_on_disk; + auto cur = totals.active_size_on_disk + totals.wasted_size_on_disk; + if (max != 0 && cur >= max) { - _new_counter = 0; clogger.debug("Used size on disk {} MB exceeds local threshold {} MB", cur / (1024 * 1024), max / (1024 * 1024)); + _new_counter = 0; flush_segments(cur - max); } } @@ -2512,7 +2582,10 @@ uint64_t db::commitlog::disk_footprint() const { } uint64_t db::commitlog::get_total_size() const { - return _segment_manager->totals.active_size_on_disk + _segment_manager->totals.buffer_list_bytes; + return _segment_manager->totals.active_size_on_disk + + _segment_manager->totals.wasted_size_on_disk + + _segment_manager->totals.buffer_list_bytes + ; } uint64_t db::commitlog::get_completed_tasks() const { diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index f86a992b18..650a2731bc 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -250,7 +250,7 @@ SEASTAR_TEST_CASE(test_commitlog_discard_completed_segments){ }).then([&log] { return log.shutdown().then([&log] { return log.list_existing_segments().then([] (auto descs) { - BOOST_REQUIRE(descs.empty()); + BOOST_CHECK_EQUAL(descs, decltype(descs){}); }); }); }); @@ -821,3 +821,57 @@ SEASTAR_TEST_CASE(test_commitlog_deadlock_in_recycle) { co_await log.clear(); } } + +SEASTAR_TEST_CASE(test_commitlog_deadlock_with_flush_threshold) { + commitlog::config cfg; + + constexpr auto max_size_mb = 1; + + cfg.commitlog_segment_size_in_mb = max_size_mb; + cfg.commitlog_total_space_in_mb = 2 * max_size_mb * smp::count; + cfg.commitlog_sync_period_in_ms = 10; + cfg.reuse_segments = true; + cfg.allow_going_over_size_limit = false; + cfg.use_o_dsync = true; // make sure we pre-allocate. + + // not using cl_test, because we need to be able to abandon + // the log. + + tmpdir tmp; + cfg.commit_log_location = tmp.path().string(); + auto log = co_await commitlog::create_commitlog(cfg); + + rp_set rps; + // uncomment for verbosity + // logging::logger_registry().set_logger_level("commitlog", logging::log_level::debug); + + auto uuid = utils::UUID_gen::get_time_UUID(); + auto size = log.max_record_size(); + + bool done = false; + + auto r = log.add_flush_handler([&](cf_id_type id, replay_position pos) { + log.discard_completed_segments(id, rps); + done = true; + }); + + bool release = true; + + try { + while (!done) { + auto now = timeout_clock::now(); + rp_handle h = co_await with_timeout(now + 30s, log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) { + dst.fill('1', size); + })); + rps.put(std::move(h)); + } + } catch (timed_out_error&) { + BOOST_FAIL("log write timed out. maybe it is deadlocked... Will not free log. ASAN errors and leaks will follow..."); + release = false; + } + + if (release) { + co_await log.shutdown(); + co_await log.clear(); + } +}