From f3532cbaa0bf03ba3f97f89ca943dbdecfc9cf5a Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Thu, 21 Mar 2024 18:00:52 +0800 Subject: [PATCH] db: commitlog: use fmt::streamed() to print segment before this change, we rely on the default-generated fmt::formatter created from operator<<, but fmt v10 dropped the default-generated formatter. in this change: * add `format_as()` for `segment` so we can use it as a fallback after upgrading to {fmt} v10 * use fmt::streamed() when formatting `segment`, this will be used the intermediate solution before {fmt} v10 after dropping `FMT_DEPRECATED_OSTREAM` macro Refs #13245 Signed-off-by: Kefu Chai Closes scylladb/scylladb#18019 --- db/commitlog/commitlog.cc | 45 ++++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 66ebfc3ba5..159172509c 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -747,6 +747,9 @@ class db::commitlog::segment : public enable_shared_from_this, public c std::unordered_set _known_schema_versions; + // TODO: remove all explicit fmt::streamed(*this) calls with *this when using {fmt} v10, + // as v10 can fallback to use format_as() + friend sstring format_as(const segment&); friend std::ostream& operator<<(std::ostream&, const segment&); friend class segment_manager; @@ -808,20 +811,20 @@ public: _sync_time(clock_type::now()), _pending_ops(true) // want exception propagation { ++_segment_manager->totals.segments_created; - clogger.debug("Created new segment {}", *this); + clogger.debug("Created new segment {}", fmt::streamed(*this)); } ~segment() { dispose_mode mode = dispose_mode::Keep; if (is_clean()) { - clogger.debug("Segment {} is no longer active and will submitted for delete now", *this); + clogger.debug("Segment {} is no longer active and will submitted for delete now", fmt::streamed(*this)); ++_segment_manager->totals.segments_destroyed; _segment_manager->totals.active_size_on_disk -= file_position(); _segment_manager->totals.bytes_released += file_position(); _segment_manager->totals.wasted_size_on_disk -= _waste; mode = dispose_mode::Delete; } else if (_segment_manager->cfg.warn_about_segments_left_on_disk_after_shutdown) { - clogger.warn("Segment {} is dirty and is left on disk.", *this); + clogger.warn("Segment {} is dirty and is left on disk.", fmt::streamed(*this)); } _segment_manager->totals.buffer_list_bytes -= _buffer.size_bytes(); @@ -860,7 +863,7 @@ public: auto ms = std::chrono::duration_cast( now - _sync_time).count(); if ((_segment_manager->cfg.commitlog_sync_period_in_ms * 2) < uint64_t(ms)) { - clogger.debug("{} needs sync. {} ms elapsed", *this, ms); + clogger.debug("{} needs sync. {} ms elapsed", fmt::streamed(*this), ms); return true; } return false; @@ -919,7 +922,7 @@ public: assert(me.use_count() > 1); uint64_t pos = _file_pos; - clogger.trace("Syncing {} {} -> {}", *this, _flush_pos, pos); + clogger.trace("Syncing {} {} -> {}", fmt::streamed(*this), _flush_pos, pos); // Only run the flush when all write ops at lower rp:s // have completed. @@ -940,7 +943,7 @@ public: // we should only get here when all actual data is // already flushed (see below, close()). if (file_position() < _segment_manager->max_size) { - clogger.trace("{} is closed but not terminated.", *this); + clogger.trace("{} is closed but not terminated.", fmt::streamed(*this)); if (_buffer.empty()) { new_buffer(0); } @@ -967,7 +970,7 @@ public: }); if (pos <= _flush_pos) { - clogger.trace("{} already synced! ({} < {})", *this, pos, _flush_pos); + clogger.trace("{} already synced! ({} < {})", fmt::streamed(*this), pos, _flush_pos); co_return me; } @@ -977,7 +980,7 @@ public: // we fast-fail the whole commit. _flush_pos = std::max(pos, _flush_pos); ++_segment_manager->totals.flush_count; - clogger.trace("{} synced to {}", *this, _flush_pos); + clogger.trace("{} synced to {}", fmt::streamed(*this), _flush_pos); } catch (...) { clogger.error("Failed to flush commits to disk: {}", std::current_exception()); throw; @@ -1087,7 +1090,7 @@ public: } else { assert(num == 0); assert(_closed); - clogger.trace("Terminating {} at pos {}", *this, _file_pos); + clogger.trace("Terminating {} at pos {}", fmt::streamed(*this), _file_pos); write(out, uint64_t(0)); } @@ -1148,7 +1151,7 @@ public: _segment_manager->totals.active_size_on_disk += bytes; ++_segment_manager->totals.cycle_count; if (bytes == view.size_bytes()) { - clogger.trace("Final write of {} to {}: {}/{} bytes at {}", bytes, *this, size, size, off); + clogger.trace("Final write of {} to {}: {}/{} bytes at {}", bytes, fmt::streamed(*this), size, size, off); break; } // gah, partial write. should always get here with dma chunk sized @@ -1156,12 +1159,12 @@ public: bytes = align_down(bytes, _alignment); off += bytes; view.remove_prefix(bytes); - clogger.trace("Partial write of {} to {}: {}/{} bytes at at {}", bytes, *this, size - view.size_bytes(), size, off - bytes); + clogger.trace("Partial write of {} to {}: {}/{} bytes at at {}", bytes, fmt::streamed(*this), size - view.size_bytes(), size, off - bytes); continue; // TODO: retry/ignore/fail/stop - optional behaviour in origin. // we fast-fail the whole commit. } catch (...) { - clogger.error("Failed to persist commits to disk for {}: {}", *this, std::current_exception()); + clogger.error("Failed to persist commits to disk for {}: {}", fmt::streamed(*this), std::current_exception()); throw; } } @@ -2046,8 +2049,12 @@ future<> db::commitlog::segment_manager::wait_for_pending_deletes() noexcept { namespace db { +sstring format_as(const db::commitlog::segment& s) { + return s._desc.filename(); +} + std::ostream& operator<<(std::ostream& out, const db::commitlog::segment& s) { - return out << s._desc.filename(); + return out << format_as(s); } } @@ -2071,15 +2078,15 @@ void db::commitlog::segment_manager::discard_unused_segments() noexcept { std::erase_if(_segments, [=](sseg_ptr s) { if (s->can_delete()) { - clogger.debug("Segment {} is unused", *s); + clogger.debug("Segment {} is unused", fmt::streamed(*s)); return true; } if (s->is_still_allocating()) { - clogger.debug("Not safe to delete segment {}; still allocating.", s); + clogger.debug("Not safe to delete segment {}; still allocating.", fmt::streamed(*s)); } else if (!s->is_clean()) { - clogger.debug("Not safe to delete segment {}; dirty is {}", s, segment::cf_mark {*s}); + clogger.debug("Not safe to delete segment {}; dirty is {}", fmt::streamed(*s), segment::cf_mark {*s}); } else { - clogger.debug("Not safe to delete segment {}; disk ops pending", s); + clogger.debug("Not safe to delete segment {}; disk ops pending", fmt::streamed(*s)); } return false; }); @@ -2121,7 +2128,7 @@ future<> db::commitlog::segment_manager::sync_all_segments() { auto def_copy = _segments; co_await coroutine::parallel_for_each(def_copy, [] (sseg_ptr s) -> future<> { co_await s->sync(); - clogger.debug("Synced segment {}", *s); + clogger.debug("Synced segment {}", fmt::streamed(*s)); }); } @@ -2132,7 +2139,7 @@ future<> db::commitlog::segment_manager::shutdown_all_segments() { auto def_copy = _segments; co_await coroutine::parallel_for_each(def_copy, [] (sseg_ptr s) -> future<> { co_await s->shutdown(); - clogger.debug("Shutdown segment {}", *s); + clogger.debug("Shutdown segment {}", fmt::streamed(*s)); }); }