diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 5ad843905..d62eea428 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -27,6 +27,14 @@ var ( // against client-disconnect detection latency. const notificationHealthCheckInterval = 250 * time.Millisecond +// caughtUpDiskPollInterval is the (longer) timeout used once a subscriber has +// already proven via ReadFromDiskFn that there is no data past its current +// position on disk. In that state, notifyChan is the primary wakeup path for +// in-memory writes; disk may still gain data from external writers (e.g. +// Schema Registry's external disk-write path), so we still re-probe on +// timeout, just at a much lower cadence than the 250ms health tick. +const caughtUpDiskPollInterval = 2 * time.Second + type MessagePosition struct { Time time.Time // timestamp of the message Offset int64 // Kafka offset for offset-based positioning, or batch index for timestamp-based @@ -65,7 +73,15 @@ func (mp MessagePosition) GetOffset() int64 { // - notificationHealthCheckInterval elapses (returns false; caller // re-checks client-disconnect and other state) func (logBuffer *LogBuffer) awaitNotificationOrTimeout(notifyChan <-chan struct{}) bool { - timer := time.NewTimer(notificationHealthCheckInterval) + return logBuffer.awaitNotificationOrTimeoutFor(notifyChan, notificationHealthCheckInterval) +} + +// awaitNotificationOrTimeoutFor is awaitNotificationOrTimeout with a caller-supplied +// timeout. Used by the ResumeFromDiskError path to back off to +// caughtUpDiskPollInterval once the disk has been proven empty past the +// subscriber's current position. +func (logBuffer *LogBuffer) awaitNotificationOrTimeoutFor(notifyChan <-chan struct{}, timeout time.Duration) bool { + timer := time.NewTimer(timeout) defer timer.Stop() select { @@ -90,6 +106,14 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition var batchIndex int64 lastReadPosition = startPosition var entryCounter int64 + // caughtUpToDiskHead is set when ReadFromDiskFn last returned without + // advancing lastReadPosition, i.e. the disk has no data past our current + // position. In that steady state, the loop still re-probes buffer and disk + // (external writers can land data on disk without notifying subscribers), + // but at caughtUpDiskPollInterval instead of the 250ms health-check tick. + // Any progress (disk advance or notification) clears the flag, so the + // responsive 250ms cadence resumes for active readers. + caughtUpToDiskHead := false defer func() { if bytesBuf != nil { logBuffer.ReleaseMemory(bytesBuf) @@ -115,8 +139,13 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition return lastReadPosition, isDone, nil } if lastReadPosition != prevReadPosition { + caughtUpToDiskHead = false continue } + if !caughtUpToDiskHead { + caughtUpToDiskHead = true + glog.V(4).Infof("%s: Caught up to disk head, backing off to %s poll", readerName, caughtUpDiskPollInterval) + } } else if logBuffer.HasData() { return lastReadPosition, isDone, ResumeFromDiskError } @@ -128,12 +157,19 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition return lastReadPosition, true, nil } - // Wait for notification or timeout (instant wake-up when data arrives) - if logBuffer.awaitNotificationOrTimeout(notifyChan) { - glog.V(3).Infof("%s: Woke up from notification after ResumeFromDiskError", readerName) - } else { - glog.V(4).Infof("%s: Notification timeout after ResumeFromDiskError, rechecking state", readerName) + // Wait for notification or timeout. While caught up to disk head, + // use the longer poll interval so idle readers don't spin every + // 250ms re-probing an empty buffer and disk. + waitTimeout := notificationHealthCheckInterval + if caughtUpToDiskHead { + waitTimeout = caughtUpDiskPollInterval } + if logBuffer.awaitNotificationOrTimeoutFor(notifyChan, waitTimeout) { + glog.V(3).Infof("%s: Woke up from notification after ResumeFromDiskError", readerName) + caughtUpToDiskHead = false + } + // Silent on timeout while caught up: the re-probe cadence alone is + // the signal; no new log line per tick. // If the LogBuffer is shutting down, exit cleanly instead of looping // on ResumeFromDiskError. awaitNotificationOrTimeout returns true on @@ -273,6 +309,8 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, star var offset int64 lastReadPosition = startPosition var entryCounter int64 + // See LoopProcessLogData for the caughtUpToDiskHead invariant. + caughtUpToDiskHead := false defer func() { if bytesBuf != nil { logBuffer.ReleaseMemory(bytesBuf) @@ -316,8 +354,13 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, star return lastReadPosition, isDone, nil } if lastReadPosition != prevReadPosition { + caughtUpToDiskHead = false continue } + if !caughtUpToDiskHead { + caughtUpToDiskHead = true + glog.V(4).Infof("%s: Caught up to disk head, backing off to %s poll", readerName, caughtUpDiskPollInterval) + } } else if logBuffer.HasData() { return lastReadPosition, isDone, ResumeFromDiskError } @@ -329,12 +372,19 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, star return lastReadPosition, true, nil } - // Wait for notification or timeout (instant wake-up when data arrives) - if logBuffer.awaitNotificationOrTimeout(notifyChan) { - glog.V(3).Infof("%s: Woke up from notification after disk read", readerName) - } else { - glog.V(4).Infof("%s: Notification timeout, rechecking state", readerName) + // Wait for notification or timeout. While caught up to disk head, + // use the longer poll interval so idle readers don't spin every + // 250ms re-probing an empty buffer and disk. + waitTimeout := notificationHealthCheckInterval + if caughtUpToDiskHead { + waitTimeout = caughtUpDiskPollInterval } + if logBuffer.awaitNotificationOrTimeoutFor(notifyChan, waitTimeout) { + glog.V(3).Infof("%s: Woke up from notification after disk read", readerName) + caughtUpToDiskHead = false + } + // Silent on timeout while caught up: the re-probe cadence alone is + // the signal; no new log line per tick. // Exit cleanly on shutdown so we don't loop on ResumeFromDiskError. if logBuffer.IsStopping() {