fix(log_buffer): back off disk-poll cadence when caught up to disk head (#9161)

Idle subscribers parked in the ResumeFromDiskError path were re-probing
the in-memory buffer and disk every 250ms, emitting a "Notification
timeout after ResumeFromDiskError, rechecking state" log line per tick
even when nothing had changed. Once ReadFromDiskFn returns without
advancing lastReadPosition, we know the disk has no data past the
subscriber's current position. Switch to a 2s poll in that state so
external disk writers (e.g. Schema Registry) are still re-detected on a
bounded cadence, but idle CPU drops ~8x and per-tick log noise goes away. Any progress
(disk advance or notifyChan wakeup) clears the flag and restores the
responsive 250ms tick for active readers. The per-timeout V(4) log is
replaced by a single "Caught up to disk head" transition log.
This commit is contained in:
Chris Lu
2026-04-20 14:36:56 -07:00
committed by GitHub
parent 61c1735cdd
commit d2e64e85ce

View File

@@ -27,6 +27,14 @@ var (
// against client-disconnect detection latency.
const notificationHealthCheckInterval = 250 * time.Millisecond
// caughtUpDiskPollInterval is the (longer) timeout used once a subscriber has
// already proven via ReadFromDiskFn that there is no data past its current
// position on disk. In that state, notifyChan is the primary wakeup path for
// in-memory writes; disk may still gain data from external writers (e.g.
// Schema Registry's external disk-write path), so we still re-probe on
// timeout, just at a much lower cadence than the 250ms health tick.
//
// The read loops pass it to awaitNotificationOrTimeoutFor while their
// caughtUpToDiskHead flag is set, and fall back to
// notificationHealthCheckInterval once the flag is cleared.
const caughtUpDiskPollInterval = 2 * time.Second
type MessagePosition struct {
Time time.Time // timestamp of the message
Offset int64 // Kafka offset for offset-based positioning, or batch index for timestamp-based
@@ -65,7 +73,15 @@ func (mp MessagePosition) GetOffset() int64 {
// - notificationHealthCheckInterval elapses (returns false; caller
// re-checks client-disconnect and other state)
func (logBuffer *LogBuffer) awaitNotificationOrTimeout(notifyChan <-chan struct{}) bool {
timer := time.NewTimer(notificationHealthCheckInterval)
return logBuffer.awaitNotificationOrTimeoutFor(notifyChan, notificationHealthCheckInterval)
}
// awaitNotificationOrTimeoutFor is awaitNotificationOrTimeout with a caller-supplied
// timeout. Used by the ResumeFromDiskError path to back off to
// caughtUpDiskPollInterval once the disk has been proven empty past the
// subscriber's current position.
func (logBuffer *LogBuffer) awaitNotificationOrTimeoutFor(notifyChan <-chan struct{}, timeout time.Duration) bool {
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
@@ -90,6 +106,14 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
var batchIndex int64
lastReadPosition = startPosition
var entryCounter int64
// caughtUpToDiskHead is set when ReadFromDiskFn last returned without
// advancing lastReadPosition, i.e. the disk has no data past our current
// position. In that steady state, the loop still re-probes buffer and disk
// (external writers can land data on disk without notifying subscribers),
// but at caughtUpDiskPollInterval instead of the 250ms health-check tick.
// Any progress (disk advance or notification) clears the flag, so the
// responsive 250ms cadence resumes for active readers.
caughtUpToDiskHead := false
defer func() {
if bytesBuf != nil {
logBuffer.ReleaseMemory(bytesBuf)
@@ -115,8 +139,13 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
return lastReadPosition, isDone, nil
}
if lastReadPosition != prevReadPosition {
caughtUpToDiskHead = false
continue
}
if !caughtUpToDiskHead {
caughtUpToDiskHead = true
glog.V(4).Infof("%s: Caught up to disk head, backing off to %s poll", readerName, caughtUpDiskPollInterval)
}
} else if logBuffer.HasData() {
return lastReadPosition, isDone, ResumeFromDiskError
}
@@ -128,12 +157,19 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
return lastReadPosition, true, nil
}
// Wait for notification or timeout (instant wake-up when data arrives)
if logBuffer.awaitNotificationOrTimeout(notifyChan) {
glog.V(3).Infof("%s: Woke up from notification after ResumeFromDiskError", readerName)
} else {
glog.V(4).Infof("%s: Notification timeout after ResumeFromDiskError, rechecking state", readerName)
// Wait for notification or timeout. While caught up to disk head,
// use the longer poll interval so idle readers don't spin every
// 250ms re-probing an empty buffer and disk.
waitTimeout := notificationHealthCheckInterval
if caughtUpToDiskHead {
waitTimeout = caughtUpDiskPollInterval
}
if logBuffer.awaitNotificationOrTimeoutFor(notifyChan, waitTimeout) {
glog.V(3).Infof("%s: Woke up from notification after ResumeFromDiskError", readerName)
caughtUpToDiskHead = false
}
// Timeouts are silent at either cadence (250ms or 2s): the loop simply
// re-probes buffer and disk; no log line per tick.
// If the LogBuffer is shutting down, exit cleanly instead of looping
// on ResumeFromDiskError. awaitNotificationOrTimeout returns true on
@@ -273,6 +309,8 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, star
var offset int64
lastReadPosition = startPosition
var entryCounter int64
// See LoopProcessLogData for the caughtUpToDiskHead invariant.
caughtUpToDiskHead := false
defer func() {
if bytesBuf != nil {
logBuffer.ReleaseMemory(bytesBuf)
@@ -316,8 +354,13 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, star
return lastReadPosition, isDone, nil
}
if lastReadPosition != prevReadPosition {
caughtUpToDiskHead = false
continue
}
if !caughtUpToDiskHead {
caughtUpToDiskHead = true
glog.V(4).Infof("%s: Caught up to disk head, backing off to %s poll", readerName, caughtUpDiskPollInterval)
}
} else if logBuffer.HasData() {
return lastReadPosition, isDone, ResumeFromDiskError
}
@@ -329,12 +372,19 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, star
return lastReadPosition, true, nil
}
// Wait for notification or timeout (instant wake-up when data arrives)
if logBuffer.awaitNotificationOrTimeout(notifyChan) {
glog.V(3).Infof("%s: Woke up from notification after disk read", readerName)
} else {
glog.V(4).Infof("%s: Notification timeout, rechecking state", readerName)
// Wait for notification or timeout. While caught up to disk head,
// use the longer poll interval so idle readers don't spin every
// 250ms re-probing an empty buffer and disk.
waitTimeout := notificationHealthCheckInterval
if caughtUpToDiskHead {
waitTimeout = caughtUpDiskPollInterval
}
if logBuffer.awaitNotificationOrTimeoutFor(notifyChan, waitTimeout) {
glog.V(3).Infof("%s: Woke up from notification after disk read", readerName)
caughtUpToDiskHead = false
}
// Timeouts are silent at either cadence (250ms or 2s): the loop simply
// re-probes buffer and disk; no log line per tick.
// Exit cleanly on shutdown so we don't loop on ResumeFromDiskError.
if logBuffer.IsStopping() {