db: commitlog: use fmt::streamed() to print segment

before this change, we rely on the default-generated fmt::formatter
created from operator<<, but fmt v10 dropped the default-generated
formatter.

in this change:

* add `format_as()` for `segment` so we can use it as a fallback
  after upgrading to {fmt} v10
* use fmt::streamed() when formatting `segment`, this will be used
  the intermediate solution before {fmt} v10 after dropping
  `FMT_DEPRECATED_OSTREAM` macro

Refs #13245

Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>

Closes scylladb/scylladb#18019
This commit is contained in:
Kefu Chai
2024-03-21 18:00:52 +08:00
committed by Botond Dénes
parent cd9589ec78
commit f3532cbaa0

View File

@@ -747,6 +747,9 @@ class db::commitlog::segment : public enable_shared_from_this<segment>, public c
std::unordered_set<table_schema_version> _known_schema_versions;
// TODO: remove all explicit fmt::streamed(*this) calls with *this when using {fmt} v10,
// as v10 can fallback to use format_as()
friend sstring format_as(const segment&);
friend std::ostream& operator<<(std::ostream&, const segment&);
friend class segment_manager;
@@ -808,20 +811,20 @@ public:
_sync_time(clock_type::now()), _pending_ops(true) // want exception propagation
{
++_segment_manager->totals.segments_created;
clogger.debug("Created new segment {}", *this);
clogger.debug("Created new segment {}", fmt::streamed(*this));
}
~segment() {
dispose_mode mode = dispose_mode::Keep;
if (is_clean()) {
clogger.debug("Segment {} is no longer active and will submitted for delete now", *this);
clogger.debug("Segment {} is no longer active and will submitted for delete now", fmt::streamed(*this));
++_segment_manager->totals.segments_destroyed;
_segment_manager->totals.active_size_on_disk -= file_position();
_segment_manager->totals.bytes_released += file_position();
_segment_manager->totals.wasted_size_on_disk -= _waste;
mode = dispose_mode::Delete;
} else if (_segment_manager->cfg.warn_about_segments_left_on_disk_after_shutdown) {
clogger.warn("Segment {} is dirty and is left on disk.", *this);
clogger.warn("Segment {} is dirty and is left on disk.", fmt::streamed(*this));
}
_segment_manager->totals.buffer_list_bytes -= _buffer.size_bytes();
@@ -860,7 +863,7 @@ public:
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
now - _sync_time).count();
if ((_segment_manager->cfg.commitlog_sync_period_in_ms * 2) < uint64_t(ms)) {
clogger.debug("{} needs sync. {} ms elapsed", *this, ms);
clogger.debug("{} needs sync. {} ms elapsed", fmt::streamed(*this), ms);
return true;
}
return false;
@@ -919,7 +922,7 @@ public:
assert(me.use_count() > 1);
uint64_t pos = _file_pos;
clogger.trace("Syncing {} {} -> {}", *this, _flush_pos, pos);
clogger.trace("Syncing {} {} -> {}", fmt::streamed(*this), _flush_pos, pos);
// Only run the flush when all write ops at lower rp:s
// have completed.
@@ -940,7 +943,7 @@ public:
// we should only get here when all actual data is
// already flushed (see below, close()).
if (file_position() < _segment_manager->max_size) {
clogger.trace("{} is closed but not terminated.", *this);
clogger.trace("{} is closed but not terminated.", fmt::streamed(*this));
if (_buffer.empty()) {
new_buffer(0);
}
@@ -967,7 +970,7 @@ public:
});
if (pos <= _flush_pos) {
clogger.trace("{} already synced! ({} < {})", *this, pos, _flush_pos);
clogger.trace("{} already synced! ({} < {})", fmt::streamed(*this), pos, _flush_pos);
co_return me;
}
@@ -977,7 +980,7 @@ public:
// we fast-fail the whole commit.
_flush_pos = std::max(pos, _flush_pos);
++_segment_manager->totals.flush_count;
clogger.trace("{} synced to {}", *this, _flush_pos);
clogger.trace("{} synced to {}", fmt::streamed(*this), _flush_pos);
} catch (...) {
clogger.error("Failed to flush commits to disk: {}", std::current_exception());
throw;
@@ -1087,7 +1090,7 @@ public:
} else {
assert(num == 0);
assert(_closed);
clogger.trace("Terminating {} at pos {}", *this, _file_pos);
clogger.trace("Terminating {} at pos {}", fmt::streamed(*this), _file_pos);
write(out, uint64_t(0));
}
@@ -1148,7 +1151,7 @@ public:
_segment_manager->totals.active_size_on_disk += bytes;
++_segment_manager->totals.cycle_count;
if (bytes == view.size_bytes()) {
clogger.trace("Final write of {} to {}: {}/{} bytes at {}", bytes, *this, size, size, off);
clogger.trace("Final write of {} to {}: {}/{} bytes at {}", bytes, fmt::streamed(*this), size, size, off);
break;
}
// gah, partial write. should always get here with dma chunk sized
@@ -1156,12 +1159,12 @@ public:
bytes = align_down(bytes, _alignment);
off += bytes;
view.remove_prefix(bytes);
clogger.trace("Partial write of {} to {}: {}/{} bytes at at {}", bytes, *this, size - view.size_bytes(), size, off - bytes);
clogger.trace("Partial write of {} to {}: {}/{} bytes at at {}", bytes, fmt::streamed(*this), size - view.size_bytes(), size, off - bytes);
continue;
// TODO: retry/ignore/fail/stop - optional behaviour in origin.
// we fast-fail the whole commit.
} catch (...) {
clogger.error("Failed to persist commits to disk for {}: {}", *this, std::current_exception());
clogger.error("Failed to persist commits to disk for {}: {}", fmt::streamed(*this), std::current_exception());
throw;
}
}
@@ -2046,8 +2049,12 @@ future<> db::commitlog::segment_manager::wait_for_pending_deletes() noexcept {
namespace db {
sstring format_as(const db::commitlog::segment& s) {
return s._desc.filename();
}
std::ostream& operator<<(std::ostream& out, const db::commitlog::segment& s) {
return out << s._desc.filename();
return out << format_as(s);
}
}
@@ -2071,15 +2078,15 @@ void db::commitlog::segment_manager::discard_unused_segments() noexcept {
std::erase_if(_segments, [=](sseg_ptr s) {
if (s->can_delete()) {
clogger.debug("Segment {} is unused", *s);
clogger.debug("Segment {} is unused", fmt::streamed(*s));
return true;
}
if (s->is_still_allocating()) {
clogger.debug("Not safe to delete segment {}; still allocating.", s);
clogger.debug("Not safe to delete segment {}; still allocating.", fmt::streamed(*s));
} else if (!s->is_clean()) {
clogger.debug("Not safe to delete segment {}; dirty is {}", s, segment::cf_mark {*s});
clogger.debug("Not safe to delete segment {}; dirty is {}", fmt::streamed(*s), segment::cf_mark {*s});
} else {
clogger.debug("Not safe to delete segment {}; disk ops pending", s);
clogger.debug("Not safe to delete segment {}; disk ops pending", fmt::streamed(*s));
}
return false;
});
@@ -2121,7 +2128,7 @@ future<> db::commitlog::segment_manager::sync_all_segments() {
auto def_copy = _segments;
co_await coroutine::parallel_for_each(def_copy, [] (sseg_ptr s) -> future<> {
co_await s->sync();
clogger.debug("Synced segment {}", *s);
clogger.debug("Synced segment {}", fmt::streamed(*s));
});
}
@@ -2132,7 +2139,7 @@ future<> db::commitlog::segment_manager::shutdown_all_segments() {
auto def_copy = _segments;
co_await coroutine::parallel_for_each(def_copy, [] (sseg_ptr s) -> future<> {
co_await s->shutdown();
clogger.debug("Shutdown segment {}", *s);
clogger.debug("Shutdown segment {}", fmt::streamed(*s));
});
}