seaweedfs/weed/util/log_buffer/log_buffer.go
Chris Lu edf7d2a074 fix(filer): eliminate redundant disk reads causing memory/CPU regression (#9039)
* fix(filer): eliminate redundant disk reads causing memory/CPU regression (#9035)

Since 4.18, LocalMetaLogBuffer's ReadFromDiskFn was set to
readPersistedLogBufferPosition, causing LoopProcessLogData to call
ReadPersistedLogBuffer on every 250ms health-check tick when a
subscriber encounters ResumeFromDiskError.  Each call creates an
OrderedLogVisitor (ListDirectoryEntries on the filer store), spawns a
readahead goroutine with a 1024-element channel, finds no data, and
returns — 4 times per second even on an idle filer.

This is redundant because SubscribeLocalMetadata already manages disk
reads explicitly with its own shouldReadFromDisk / lastCheckedFlushTsNs
tracking in the outer loop.

Set ReadFromDiskFn back to nil for LocalMetaLogBuffer.  When
LoopProcessLogData encounters ResumeFromDiskError with nil
ReadFromDiskFn, the HasData() guard returns ResumeFromDiskError to the
caller (SubscribeLocalMetadata), which blocks efficiently on
listenersCond.Wait() instead of polling.
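
In outline, the consumer side now looks like this (a minimal sketch: the
condition variable stands in for listenersCond, and its signaling is
elided; the real loop is SubscribeLocalMetadata):

    func subscribeSketch(lb *log_buffer.LogBuffer, pos log_buffer.MessagePosition) {
        cond := sync.NewCond(&sync.Mutex{})
        for {
            // shouldReadFromDisk / lastCheckedFlushTsNs bookkeeping elided:
            // the outer loop owns every disk read.
            buf, _, err := lb.ReadFromBuffer(pos)
            if err == log_buffer.ResumeFromDiskError {
                // ReadFromDiskFn is nil, so there is nothing to poll; block
                // until a write or flush signals new data.
                cond.L.Lock()
                cond.Wait()
                cond.L.Unlock()
                continue
            }
            _ = buf // ... decode entries, advance pos, release buf ...
        }
    }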

* fix(filer): add gap detection for slow consumers after disk-read stall

When a slow consumer falls behind and LoopProcessLogData returns
ResumeFromDiskError with no flush or read-position progress, there may
be a gap between persisted data and in-memory data (e.g. writes stopped
while the consumer was still catching up). Without gap detection, the
consumer would block on listenersCond.Wait() forever.

Skip forward to the earliest in-memory time to resume progress, matching
the gap-handling pattern already used in the shouldReadFromDisk path.
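
The skip itself is small; roughly (an illustrative sketch mirroring the
outer loop's variable names):

    // No flush and no read-position progress since the last iteration:
    // persisted data ends before in-memory data begins, so jump the
    // cursor to the earliest entry still held in memory.
    if earliest := lb.GetEarliestTime(); !earliest.IsZero() && earliest.After(lastReadTime.Time) {
        lastReadTime = log_buffer.MessagePosition{Time: earliest}
        continue
    }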

* fix(filer): clear stale ResumeFromDiskError after gap-skip to avoid stall

The gap-detection block added in the previous commit skips lastReadTime
forward to GetEarliestTime() and continues the outer loop.  On the next
iteration, shouldReadFromDisk becomes true (currentReadTsNs >
lastDiskReadTsNs), the disk read returns processedTsNs == 0, and the
existing gap handler at the top of the loop runs its own gap check.
That check uses readInMemoryLogErr == ResumeFromDiskError as the entry
condition — but readInMemoryLogErr is still the stale error from two
iterations ago.  GetEarliestTime() now equals lastReadTime.Time (we
already advanced to it), so earliestTime.After(lastReadTime.Time) is
false and the handler falls into listenersCond.Wait() — stuck.

Clear readInMemoryLogErr at the gap-skip point, matching the existing
pattern at the earlier gap handler that already clears it for the same
reason.
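
So the skip site now advances the cursor and resets the error together
(sketch, extending the snippet above):

    if earliest := lb.GetEarliestTime(); !earliest.IsZero() && earliest.After(lastReadTime.Time) {
        lastReadTime = log_buffer.MessagePosition{Time: earliest}
        // Clear the error from two iterations ago so the next iteration's
        // gap handler doesn't key off a stale ResumeFromDiskError.
        readInMemoryLogErr = nil
        continue
    }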

* fix(log_buffer): GetEarliestTime must include sealed prev buffers

GetEarliestTime previously returned only logBuffer.startTime (the active
buffer's first timestamp).  That is narrower than ReadFromBuffer's
tsMemory, which is the min across active + prev buffers.  Callers using
GetEarliestTime for gap detection after ResumeFromDiskError (the
SubscribeLocalMetadata outer loop's disk-read path, the new gap-skip in
the in-memory ResumeFromDiskError handler, and MQ HasData) saw a time
that was *newer* than the real earliest in-memory data.

Impact in SubscribeLocalMetadata's slow-consumer path:
  - tsMemory = earliest prev buffer time (T_prev)
  - GetEarliestTime() = active startTime (T_active, later than T_prev)
  - Consumer position = T1, with T_prev < T1 < T_active
  - ReadFromBuffer returns ResumeFromDiskError (T1 < tsMemory)
  - Gap detect: GetEarliestTime().After(T1) = T_active.After(T1) = true
  - Skip forward to T_active -- silently drops the prev-buffer data
  - And when T_active happens to equal the stuck position, gap detection
    evaluates to false and the subscriber stalls on listenersCond.Wait()

This reproduces the TestMetadataSubscribeSlowConsumerKeepsProgressing
failure in CI where the consumer stalled at 10220/20000 after writing
stopped -- the buffer still had data in prev[0..3], but gap detection
was comparing against the active buffer's startTime.

Fix: scan all sealed prev buffers under RLock, return the true minimum
startTime.  Matches the min-of-buffers logic in ReadFromBuffer.
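
Compressed into a scenario (hypothetical timestamps):

    tPrev   := time.Unix(0, 100) // earliest sealed prev buffer
    tActive := time.Unix(0, 500) // active buffer startTime
    t1      := time.Unix(0, 300) // stuck consumer position, tPrev < t1 < tActive
    // Before: GetEarliestTime() == tActive; skipping t1 forward to tActive
    //         silently drops everything in (tPrev, tActive).
    // After:  GetEarliestTime() == tPrev, the min over active + prev buffers;
    //         the consumer resumes at the true earliest in-memory entry.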

* test(log_buffer): make DiskReadRetry test deterministic

The previous test added the message via AddToBuffer + ForceFlush and
relied on a race: the second disk read had to happen before the data
was delivered through the in-memory path.  Under the race detector or
on a slow CI runner, the reader is woken by AddToBuffer's notification,
finds the data in the active buffer or its prev slot, and returns after
exactly one disk read — failing the >= 2 disk reads assertion even
though the loop behaved correctly.

Reproduced on master with the race detector (2 failures out of 5 runs).

Rewrite the test to deliver the data exclusively through the disk-read
path: no AddToBuffer, no ForceFlush.  The test waits until the reader
has issued at least one no-op disk read, then atomically flips a
"dataReady" flag.  The reader's next iteration through readFromDiskFn
returns the entry.  This deterministically exercises the retry-loop
behavior the test was originally written to protect, and removes the
in-memory delivery race entirely.
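
In outline (a sketch; the flag handshake is the point, the entry contents
and helper names are illustrative):

    var dataReady atomic.Bool
    var diskReads atomic.Int64
    entry := &filer_pb.LogEntry{TsNs: time.Now().UnixNano(), Key: []byte("k"), Data: []byte("v")}
    readFromDiskFn := func(start log_buffer.MessagePosition, stopTsNs int64,
        each log_buffer.EachLogEntryFuncType) (log_buffer.MessagePosition, bool, error) {
        diskReads.Add(1)
        if !dataReady.Load() {
            return start, false, nil // no-op read: nothing "persisted" yet
        }
        if _, err := each(entry); err != nil {
            return start, false, err
        }
        return log_buffer.MessagePosition{Time: time.Unix(0, entry.TsNs)}, false, nil
    }
    // Test body: poll until diskReads.Load() >= 1, then dataReady.Store(true).
    // The reader's next retry through readFromDiskFn delivers the entry, so
    // the >= 2 disk-reads assertion holds without any in-memory race.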
2026-04-11 23:12:54 -07:00

963 lines
31 KiB
Go

package log_buffer
import (
"bytes"
"fmt"
"math"
"sync"
"sync/atomic"
"time"
"google.golang.org/protobuf/proto"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
const BufferSize = 8 * 1024 * 1024
const PreviousBufferCount = 4
// Errors that can be returned by log buffer operations
var (
// ErrBufferCorrupted indicates the log buffer contains corrupted data
ErrBufferCorrupted = fmt.Errorf("log buffer is corrupted")
)
type dataToFlush struct {
startTime time.Time
stopTime time.Time
data *bytes.Buffer
minOffset int64
maxOffset int64
done chan struct{} // Signal when flush completes
}
type EachLogEntryFuncType func(logEntry *filer_pb.LogEntry) (isDone bool, err error)
type EachLogEntryWithOffsetFuncType func(logEntry *filer_pb.LogEntry, offset int64) (isDone bool, err error)
type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64)
type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
// DiskChunkCache caches chunks of historical data read from disk
type DiskChunkCache struct {
mu sync.RWMutex
chunks map[int64]*CachedDiskChunk // Key: chunk start offset (aligned to chunkSize)
maxChunks int // Maximum number of chunks to cache
}
// CachedDiskChunk represents a cached chunk of disk data
type CachedDiskChunk struct {
startOffset int64
endOffset int64
messages []*filer_pb.LogEntry
lastAccess time.Time
}
type LogBuffer struct {
// 8-byte aligned fields
LastTsNs atomic.Int64
lastFlushTsNs atomic.Int64
lastFlushedOffset atomic.Int64 // Highest offset that has been flushed to disk (-1 = nothing flushed yet)
offset int64
bufferStartOffset int64
minOffset int64
maxOffset int64
flushInterval time.Duration
startTime time.Time
stopTime time.Time
// Other fields
name string
prevBuffers *SealedBuffers
buf []byte
idx []int
pos int
sizeBuf []byte
flushFn LogFlushFuncType
ReadFromDiskFn LogReadFromDiskFuncType
notifyFn func()
// Per-subscriber notification channels for instant wake-up
subscribersMu sync.RWMutex
subscribers map[string]chan struct{} // subscriberID -> notification channel
isStopping *atomic.Bool
shutdownCh chan struct{} // closed by ShutdownLogBuffer to wake blocked subscribers
isAllFlushed bool
flushChan chan *dataToFlush
// Offset range tracking for Kafka integration
hasOffsets bool
// Disk chunk cache for historical data reads
diskChunkCache *DiskChunkCache
sync.RWMutex
}
func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFuncType,
readFromDiskFn LogReadFromDiskFuncType, notifyFn func()) *LogBuffer {
lb := &LogBuffer{
name: name,
prevBuffers: newSealedBuffers(PreviousBufferCount),
buf: make([]byte, BufferSize),
sizeBuf: make([]byte, 4),
flushInterval: flushInterval,
flushFn: flushFn,
ReadFromDiskFn: readFromDiskFn,
notifyFn: notifyFn,
subscribers: make(map[string]chan struct{}),
flushChan: make(chan *dataToFlush, 256),
isStopping: new(atomic.Bool),
shutdownCh: make(chan struct{}),
offset: 0, // Will be initialized from existing data if available
diskChunkCache: &DiskChunkCache{
chunks: make(map[int64]*CachedDiskChunk),
maxChunks: 16, // Cache up to 16 chunks (configurable)
},
}
lb.lastFlushedOffset.Store(-1) // Nothing flushed to disk yet
go lb.loopFlush()
go lb.loopInterval()
return lb
}
// RegisterSubscriber registers a subscriber for instant notifications when data is written
// Returns a channel that will receive notifications (<1ms latency)
func (logBuffer *LogBuffer) RegisterSubscriber(subscriberID string) chan struct{} {
logBuffer.subscribersMu.Lock()
defer logBuffer.subscribersMu.Unlock()
// Check if already registered
if existingChan, exists := logBuffer.subscribers[subscriberID]; exists {
return existingChan
}
// Create buffered channel (size 1) so notifications never block
notifyChan := make(chan struct{}, 1)
logBuffer.subscribers[subscriberID] = notifyChan
return notifyChan
}
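// Illustrative subscriber usage (a sketch, not a prescribed pattern):
//
//	ch := logBuffer.RegisterSubscriber("reader-1")
//	defer logBuffer.UnregisterSubscriber("reader-1")
//	for {
//		select {
//		case <-ch:
//			// data was appended; re-attempt ReadFromBuffer
//		case <-time.After(time.Second):
//			// periodic wake-up for shutdown / health checks
//		}
//	}
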
// UnregisterSubscriber removes a subscriber and closes its notification channel
func (logBuffer *LogBuffer) UnregisterSubscriber(subscriberID string) {
logBuffer.subscribersMu.Lock()
defer logBuffer.subscribersMu.Unlock()
if ch, exists := logBuffer.subscribers[subscriberID]; exists {
close(ch)
delete(logBuffer.subscribers, subscriberID)
}
}
// IsOffsetInMemory checks if the given offset is available in the in-memory buffer
// Returns true if:
// 1. Offset is newer than what's been flushed to disk (must be in memory)
// 2. Offset is in current buffer or previous buffers (may be flushed but still in memory)
// Returns false if offset is older than memory buffers (only on disk)
func (logBuffer *LogBuffer) IsOffsetInMemory(offset int64) bool {
logBuffer.RLock()
defer logBuffer.RUnlock()
// Check if we're tracking offsets at all
if !logBuffer.hasOffsets {
return false // No offsets tracked yet
}
// OPTIMIZATION: If offset is newer than what's been flushed to disk,
// it MUST be in memory (not written to disk yet)
lastFlushed := logBuffer.lastFlushedOffset.Load()
if lastFlushed >= 0 && offset > lastFlushed {
return true
}
// Check if offset is in current buffer range AND buffer has data
// (data can be both on disk AND in memory during flush window)
if offset >= logBuffer.bufferStartOffset && offset <= logBuffer.offset {
// CRITICAL: Check if buffer actually has data (pos > 0)
// After flush, pos=0 but range is still valid - data is on disk, not in memory
if logBuffer.pos > 0 {
return true
}
// Buffer is empty (just flushed) - data is on disk
return false
}
// Check if offset is in previous buffers AND they have data
for _, buf := range logBuffer.prevBuffers.buffers {
if offset >= buf.startOffset && offset <= buf.offset {
// Check if prevBuffer actually has data
if buf.size > 0 {
return true
}
// Buffer is empty (flushed) - data is on disk
return false
}
}
// Offset is older than memory buffers - only available on disk
return false
}
// notifySubscribers sends notifications to all registered subscribers
// Non-blocking: uses select with default to avoid blocking on full channels
func (logBuffer *LogBuffer) notifySubscribers() {
logBuffer.subscribersMu.RLock()
defer logBuffer.subscribersMu.RUnlock()
if len(logBuffer.subscribers) == 0 {
return // No subscribers, skip notification
}
for _, notifyChan := range logBuffer.subscribers {
select {
case notifyChan <- struct{}{}:
// Notification sent successfully
default:
// Channel full - subscriber hasn't consumed previous notification yet
// This is OK because one notification is sufficient to wake the subscriber
}
}
}
// InitializeOffsetFromExistingData initializes the offset counter from existing data on disk
// This should be called after LogBuffer creation to ensure offset continuity on restart
func (logBuffer *LogBuffer) InitializeOffsetFromExistingData(getHighestOffsetFn func() (int64, error)) error {
if getHighestOffsetFn == nil {
return nil // No initialization function provided
}
highestOffset, err := getHighestOffsetFn()
if err != nil {
return nil // Continue with offset 0 if we can't read existing data
}
if highestOffset >= 0 {
// Set the next offset to be one after the highest existing offset
nextOffset := highestOffset + 1
logBuffer.offset = nextOffset
// bufferStartOffset should match offset after initialization
// This ensures that reads for old offsets (0...highestOffset) will trigger disk reads
// New data written after this will start at nextOffset
logBuffer.bufferStartOffset = nextOffset
// CRITICAL: Track that data [0...highestOffset] is on disk
logBuffer.lastFlushedOffset.Store(highestOffset)
// Set lastFlushTsNs to current time (we know data up to highestOffset is on disk)
logBuffer.lastFlushTsNs.Store(time.Now().UnixNano())
} else {
logBuffer.bufferStartOffset = 0 // Start from offset 0
// No data on disk yet
}
return nil
}
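// Illustrative wiring (sketch; scanHighestPersistedOffset is a hypothetical
// stand-in for a lookup against persisted log files):
//
//	lb := NewLogBuffer("topic-p0", time.Second, flushFn, nil, notifyFn)
//	_ = lb.InitializeOffsetFromExistingData(func() (int64, error) {
//		return scanHighestPersistedOffset()
//	})
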
func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) error {
return logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs)
}
// AddLogEntryToBuffer directly adds a LogEntry to the buffer, preserving offset information
func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) error {
var toFlush *dataToFlush
var marshalErr error
logBuffer.Lock()
defer func() {
logBuffer.Unlock()
if toFlush != nil {
logBuffer.flushChan <- toFlush
}
// Only notify if there was no error
if marshalErr == nil {
if logBuffer.notifyFn != nil {
logBuffer.notifyFn()
}
// Notify all registered subscribers instantly (<1ms latency)
logBuffer.notifySubscribers()
}
}()
processingTsNs := logEntry.TsNs
ts := time.Unix(0, processingTsNs)
// Handle timestamp collision inside lock (rare case)
if logBuffer.LastTsNs.Load() >= processingTsNs {
processingTsNs = logBuffer.LastTsNs.Add(1)
ts = time.Unix(0, processingTsNs)
// Re-marshal with corrected timestamp
logEntry.TsNs = processingTsNs
} else {
logBuffer.LastTsNs.Store(processingTsNs)
}
logEntryData, err := proto.Marshal(logEntry)
if err != nil {
marshalErr = fmt.Errorf("failed to marshal LogEntry: %w", err)
glog.Errorf("%v", marshalErr)
return marshalErr
}
size := len(logEntryData)
if logBuffer.pos == 0 {
logBuffer.startTime = ts
// Reset offset tracking for new buffer
logBuffer.hasOffsets = false
}
// Track offset ranges for Kafka integration
// Use >= 0 to include offset 0 (first message in a topic)
if logEntry.Offset >= 0 {
if !logBuffer.hasOffsets {
logBuffer.minOffset = logEntry.Offset
logBuffer.maxOffset = logEntry.Offset
logBuffer.hasOffsets = true
} else {
if logEntry.Offset < logBuffer.minOffset {
logBuffer.minOffset = logEntry.Offset
}
if logEntry.Offset > logBuffer.maxOffset {
logBuffer.maxOffset = logEntry.Offset
}
}
}
if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 {
toFlush = logBuffer.copyToFlush()
logBuffer.startTime = ts
if len(logBuffer.buf) < size+4 {
// Validate size to prevent integer overflow in computation BEFORE allocation
const maxBufferSize = 1 << 30 // 1 GiB practical limit
// Ensure 2*size + 4 won't overflow int and stays within practical bounds
if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 {
marshalErr = fmt.Errorf("message size %d exceeds maximum allowed size", size)
glog.Errorf("%v", marshalErr)
return marshalErr
}
// Safe to compute now that we've validated size is in valid range
newSize := 2*size + 4
logBuffer.buf = make([]byte, newSize)
}
}
logBuffer.stopTime = ts
logBuffer.idx = append(logBuffer.idx, logBuffer.pos)
util.Uint32toBytes(logBuffer.sizeBuf, uint32(size))
copy(logBuffer.buf[logBuffer.pos:logBuffer.pos+4], logBuffer.sizeBuf)
copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData)
logBuffer.pos += size + 4
logBuffer.offset++
return nil
}
func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) error {
// PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock
var ts time.Time
if processingTsNs == 0 {
ts = time.Now()
processingTsNs = ts.UnixNano()
} else {
ts = time.Unix(0, processingTsNs)
}
logEntry := &filer_pb.LogEntry{
TsNs: processingTsNs, // Will be updated if needed
PartitionKeyHash: util.HashToInt32(partitionKey),
Data: data,
Key: partitionKey,
}
var toFlush *dataToFlush
var marshalErr error
logBuffer.Lock()
defer func() {
logBuffer.Unlock()
if toFlush != nil {
logBuffer.flushChan <- toFlush
}
// Only notify if there was no error
if marshalErr == nil {
if logBuffer.notifyFn != nil {
logBuffer.notifyFn()
}
// Notify all registered subscribers instantly (<1ms latency)
logBuffer.notifySubscribers()
}
}()
// Handle timestamp collision inside lock (rare case)
if logBuffer.LastTsNs.Load() >= processingTsNs {
processingTsNs = logBuffer.LastTsNs.Add(1)
ts = time.Unix(0, processingTsNs)
logEntry.TsNs = processingTsNs
} else {
logBuffer.LastTsNs.Store(processingTsNs)
}
// Set the offset in the LogEntry before marshaling
// This ensures the flushed data contains the correct offset information
// Note: This also enables AddToBuffer to work correctly with Kafka-style offset-based reads
logEntry.Offset = logBuffer.offset
// Marshal with correct timestamp and offset
logEntryData, err := proto.Marshal(logEntry)
if err != nil {
marshalErr = fmt.Errorf("failed to marshal LogEntry: %w", err)
glog.Errorf("%v", marshalErr)
return marshalErr
}
size := len(logEntryData)
if logBuffer.pos == 0 {
logBuffer.startTime = ts
// Reset offset tracking for new buffer
logBuffer.hasOffsets = false
}
// Track offset ranges for Kafka integration
// Track the current offset being written
if !logBuffer.hasOffsets {
logBuffer.minOffset = logBuffer.offset
logBuffer.maxOffset = logBuffer.offset
logBuffer.hasOffsets = true
} else {
if logBuffer.offset < logBuffer.minOffset {
logBuffer.minOffset = logBuffer.offset
}
if logBuffer.offset > logBuffer.maxOffset {
logBuffer.maxOffset = logBuffer.offset
}
}
if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 {
toFlush = logBuffer.copyToFlush()
logBuffer.startTime = ts
if len(logBuffer.buf) < size+4 {
// Validate size to prevent integer overflow in computation BEFORE allocation
const maxBufferSize = 1 << 30 // 1 GiB practical limit
// Ensure 2*size + 4 won't overflow int and stays within practical bounds
if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 {
marshalErr = fmt.Errorf("message size %d exceeds maximum allowed size", size)
glog.Errorf("%v", marshalErr)
return marshalErr
}
// Safe to compute now that we've validated size is in valid range
newSize := 2*size + 4
logBuffer.buf = make([]byte, newSize)
}
}
logBuffer.stopTime = ts
logBuffer.idx = append(logBuffer.idx, logBuffer.pos)
util.Uint32toBytes(logBuffer.sizeBuf, uint32(size))
copy(logBuffer.buf[logBuffer.pos:logBuffer.pos+4], logBuffer.sizeBuf)
copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData)
logBuffer.pos += size + 4
logBuffer.offset++
return nil
}
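// Illustrative producer call (sketch):
//
//	err := logBuffer.AddToBuffer(&mq_pb.DataMessage{
//		Key:   []byte("user-42"),
//		Value: payload,
//		TsNs:  0, // 0 = stamp with time.Now() inside AddDataToBuffer
//	})
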
func (logBuffer *LogBuffer) IsStopping() bool {
return logBuffer.isStopping.Load()
}
// ForceFlush immediately flushes the current buffer content and WAITS for completion
// This is useful for critical topics that need immediate persistence
// CRITICAL: This function is now SYNCHRONOUS - it blocks until the flush completes
func (logBuffer *LogBuffer) ForceFlush() {
if logBuffer.isStopping.Load() {
return // Don't flush if we're shutting down
}
logBuffer.Lock()
toFlush := logBuffer.copyToFlushWithCallback()
logBuffer.Unlock()
if toFlush != nil {
// Send to flush channel (with reasonable timeout)
select {
case logBuffer.flushChan <- toFlush:
// Successfully queued for flush - now WAIT for it to complete
select {
case <-toFlush.done:
// Flush completed successfully
case <-time.After(5 * time.Second):
// Timeout waiting for flush - this shouldn't happen
}
case <-time.After(2 * time.Second):
// If flush channel is still blocked after 2s, something is wrong
}
}
}
// ShutdownLogBuffer flushes the buffer and stops the log buffer
func (logBuffer *LogBuffer) ShutdownLogBuffer() {
isAlreadyStopped := logBuffer.isStopping.Swap(true)
if isAlreadyStopped {
return
}
// Wake any subscribers blocked in awaitNotificationOrTimeout so they can
// notice IsStopping() and exit promptly, even on an idle buffer where no
// flush notification would otherwise fire.
close(logBuffer.shutdownCh)
toFlush := logBuffer.copyToFlush()
logBuffer.flushChan <- toFlush
close(logBuffer.flushChan)
}
// IsAllFlushed returns true if all data in the buffer has been flushed, after calling ShutdownLogBuffer().
func (logBuffer *LogBuffer) IsAllFlushed() bool {
return logBuffer.isAllFlushed
}
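// Illustrative shutdown sequence (sketch; the polling interval is arbitrary):
//
//	logBuffer.ShutdownLogBuffer() // flushes remaining data, wakes waiters
//	for !logBuffer.IsAllFlushed() {
//		time.Sleep(10 * time.Millisecond) // wait for loopFlush to drain
//	}
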
func (logBuffer *LogBuffer) loopFlush() {
for d := range logBuffer.flushChan {
if d != nil {
logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes(), d.minOffset, d.maxOffset)
d.releaseMemory()
// local logbuffer is different from aggregate logbuffer here
if d.maxOffset >= 0 {
logBuffer.lastFlushedOffset.Store(d.maxOffset)
}
if !d.stopTime.IsZero() {
logBuffer.lastFlushTsNs.Store(d.stopTime.UnixNano())
}
// Wake readers that may be waiting to retry disk reads after the flush lands.
if logBuffer.notifyFn != nil {
logBuffer.notifyFn()
}
logBuffer.notifySubscribers()
// Signal completion if there's a callback channel
if d.done != nil {
close(d.done)
}
}
}
logBuffer.isAllFlushed = true
}
func (logBuffer *LogBuffer) loopInterval() {
for !logBuffer.IsStopping() {
time.Sleep(logBuffer.flushInterval)
if logBuffer.IsStopping() {
return
}
logBuffer.Lock()
toFlush := logBuffer.copyToFlush()
logBuffer.Unlock()
if toFlush != nil {
logBuffer.flushChan <- toFlush
}
}
}
func (logBuffer *LogBuffer) copyToFlush() *dataToFlush {
return logBuffer.copyToFlushInternal(false)
}
func (logBuffer *LogBuffer) copyToFlushWithCallback() *dataToFlush {
return logBuffer.copyToFlushInternal(true)
}
func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush {
if logBuffer.pos > 0 {
var d *dataToFlush
if logBuffer.flushFn != nil {
d = &dataToFlush{
startTime: logBuffer.startTime,
stopTime: logBuffer.stopTime,
data: copiedBytes(logBuffer.buf[:logBuffer.pos]),
minOffset: logBuffer.minOffset,
maxOffset: logBuffer.maxOffset,
}
// Add callback channel for synchronous ForceFlush
if withCallback {
d.done = make(chan struct{})
}
}
// CRITICAL: logBuffer.offset is the "next offset to assign", so last offset in buffer is offset-1
lastOffsetInBuffer := logBuffer.offset - 1
logBuffer.buf = logBuffer.prevBuffers.SealBuffer(logBuffer.startTime, logBuffer.stopTime, logBuffer.buf, logBuffer.pos, logBuffer.bufferStartOffset, lastOffsetInBuffer)
// Use zero time (time.Time{}) not epoch time (time.Unix(0,0))
// Epoch time (1970) breaks time-based reads after flush
logBuffer.startTime = time.Time{}
logBuffer.stopTime = time.Time{}
logBuffer.pos = 0
logBuffer.idx = logBuffer.idx[:0]
// DON'T increment offset - it's already pointing to the next offset!
// logBuffer.offset++ // REMOVED - this was causing offset gaps!
logBuffer.bufferStartOffset = logBuffer.offset // Next buffer starts at current offset (which is already the next one)
// Reset offset tracking
logBuffer.hasOffsets = false
logBuffer.minOffset = 0
logBuffer.maxOffset = 0
// Invalidate disk cache chunks after flush
// The cache may contain stale data from before this flush
// Invalidating ensures consumers will re-read fresh data from disk after flush
logBuffer.invalidateAllDiskCacheChunks()
return d
}
return nil
}
// invalidateAllDiskCacheChunks clears all cached disk chunks
// This should be called after a buffer flush to ensure consumers read fresh data from disk
func (logBuffer *LogBuffer) invalidateAllDiskCacheChunks() {
logBuffer.diskChunkCache.mu.Lock()
defer logBuffer.diskChunkCache.mu.Unlock()
if len(logBuffer.diskChunkCache.chunks) > 0 {
logBuffer.diskChunkCache.chunks = make(map[int64]*CachedDiskChunk)
}
}
// GetEarliestTime returns the oldest timestamp still resident in the buffer.
// It must consider the sealed prev buffers in addition to the active buffer,
// because ReadFromBuffer's tsMemory (and therefore ResumeFromDiskError) is
// computed from the min across both. Returning only the active startTime
// would cause gap-detection callers to skip past data still living in prev
// buffers, and can also silently equal the consumer's lastReadTime and
// stall on listenersCond.Wait().
func (logBuffer *LogBuffer) GetEarliestTime() time.Time {
logBuffer.RLock()
defer logBuffer.RUnlock()
earliest := logBuffer.startTime
for _, prevBuf := range logBuffer.prevBuffers.buffers {
if prevBuf.startTime.IsZero() {
continue
}
if earliest.IsZero() || prevBuf.startTime.Before(earliest) {
earliest = prevBuf.startTime
}
}
return earliest
}
func (logBuffer *LogBuffer) HasData() bool {
logBuffer.RLock()
defer logBuffer.RUnlock()
if logBuffer.pos > 0 {
return true
}
for _, buf := range logBuffer.prevBuffers.buffers {
if buf.size > 0 {
return true
}
}
return false
}
// GetEarliestPosition returns the active buffer's start time together with
// the next offset to assign, for callers tracking a resume position.
func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition {
	logBuffer.RLock()
	defer logBuffer.RUnlock()
	return MessagePosition{
		Time:   logBuffer.startTime,
		Offset: logBuffer.offset,
	}
}
// GetLastFlushTsNs returns the latest flushed timestamp in Unix nanoseconds.
// Returns 0 if nothing has been flushed yet.
func (logBuffer *LogBuffer) GetLastFlushTsNs() int64 {
return logBuffer.lastFlushTsNs.Load()
}
func (logBuffer *LogBuffer) SetLastFlushTsNs(ts int64) {
logBuffer.lastFlushTsNs.Store(ts)
}
func (d *dataToFlush) releaseMemory() {
d.data.Reset()
bufferPool.Put(d.data)
}
func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bufferCopy *bytes.Buffer, batchIndex int64, err error) {
logBuffer.RLock()
defer logBuffer.RUnlock()
isOffsetBased := lastReadPosition.IsOffsetBased
// For offset-based subscriptions, use offset comparisons, not time comparisons!
if isOffsetBased {
requestedOffset := lastReadPosition.Offset
// Check if the requested offset is in the current buffer range
if requestedOffset >= logBuffer.bufferStartOffset && requestedOffset <= logBuffer.offset {
// If current buffer is empty (pos=0), check if data is on disk or not yet written
if logBuffer.pos == 0 {
// If buffer is empty but offset range covers the request,
// it means data was in memory and has been flushed/moved out.
// The bufferStartOffset advancing to cover this offset proves data existed.
//
// Three cases:
// 1. requestedOffset < logBuffer.offset: Data was here, now flushed
// 2. requestedOffset == logBuffer.offset && bufferStartOffset > 0: Buffer advanced, data flushed
// 3. requestedOffset == logBuffer.offset && bufferStartOffset == 0: Initial state - try disk first!
//
// Cases 1 & 2: try disk read
// Case 3: try disk read (historical data might exist)
if requestedOffset < logBuffer.offset {
// Data was in the buffer range but buffer is now empty = flushed to disk
return nil, -2, ResumeFromDiskError
}
// requestedOffset == logBuffer.offset: Current position
// CRITICAL: For subscribers starting from offset 0, try disk read first
// (historical data might exist from previous runs)
if requestedOffset == 0 && logBuffer.bufferStartOffset == 0 && logBuffer.offset == 0 {
// Initial state: try disk read before waiting for new data
return nil, -2, ResumeFromDiskError
}
// Otherwise, wait for new data to arrive
return nil, logBuffer.offset, nil
}
return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil
}
// Check previous buffers for the requested offset
for _, buf := range logBuffer.prevBuffers.buffers {
if requestedOffset >= buf.startOffset && requestedOffset <= buf.offset {
// If prevBuffer is empty, it means the data was flushed to disk
// (prevBuffers are created when buffer is flushed)
if buf.size == 0 {
// Empty prevBuffer covering this offset means data was flushed
return nil, -2, ResumeFromDiskError
}
return copiedBytes(buf.buf[:buf.size]), buf.offset, nil
}
}
// Offset not found in any buffer
if requestedOffset < logBuffer.bufferStartOffset {
// Data not in current buffers - must be on disk (flushed or never existed)
// Return ResumeFromDiskError to trigger disk read
return nil, -2, ResumeFromDiskError
}
if requestedOffset > logBuffer.offset {
// Future data, not available yet
return nil, logBuffer.offset, nil
}
// Offset not found - return nil
return nil, logBuffer.offset, nil
}
// TIMESTAMP-BASED READ (original logic)
// Let td = the last timestamp read from disk, and tm = the earliest
// timestamp still in memory:
//   case 2.1: tm <= td    -> read from memory
//   case 2.2: tm is empty -> nothing in memory; read from disk
//                            (return ResumeFromDiskError)
//   case 2.3: td < tm     -> read from disk again
var tsMemory time.Time
if !logBuffer.startTime.IsZero() {
tsMemory = logBuffer.startTime
}
for _, prevBuf := range logBuffer.prevBuffers.buffers {
if !prevBuf.startTime.IsZero() {
// If tsMemory is zero, assign directly; otherwise compare
if tsMemory.IsZero() || prevBuf.startTime.Before(tsMemory) {
tsMemory = prevBuf.startTime
}
}
}
if tsMemory.IsZero() { // case 2.2
// Buffer is empty - return ResumeFromDiskError so caller can read from disk
// This fixes issue #4977 where SubscribeMetadata stalls because
// MetaAggregator.MetaLogBuffer is empty in single-filer setups
return nil, -2, ResumeFromDiskError
} else if lastReadPosition.Time.Before(tsMemory) { // case 2.3
// For time-based reads, only check timestamp for disk reads
// Don't use offset comparisons as they're not meaningful for time-based subscriptions
// Special case: If requested time is zero (Unix epoch), treat as "start from beginning"
// This handles queries that want to read all data without knowing the exact start time
if lastReadPosition.Time.IsZero() || lastReadPosition.Time.Unix() == 0 {
// Start from the beginning of memory
// Fall through to case 2.1 to read from earliest buffer
} else if lastReadPosition.Offset <= 0 && lastReadPosition.Time.Before(tsMemory) {
// Treat first read with sentinel/zero offset as inclusive of earliest in-memory data
} else {
// Data not in memory buffers - read from disk
return nil, -2, ResumeFromDiskError
}
}
// the following is case 2.1
if lastReadPosition.Time.Equal(logBuffer.stopTime) && !logBuffer.stopTime.IsZero() {
// For first-read sentinel/zero offset, allow inclusive read at the boundary
if lastReadPosition.Offset > 0 {
return nil, logBuffer.offset, nil
}
}
if lastReadPosition.Time.After(logBuffer.stopTime) && !logBuffer.stopTime.IsZero() {
return nil, logBuffer.offset, nil
}
// Also check prevBuffers when current buffer is empty (startTime is zero)
if lastReadPosition.Time.Before(logBuffer.startTime) || logBuffer.startTime.IsZero() {
for _, buf := range logBuffer.prevBuffers.buffers {
if buf.startTime.After(lastReadPosition.Time) {
return copiedBytes(buf.buf[:buf.size]), buf.offset, nil
}
if !buf.startTime.After(lastReadPosition.Time) && buf.stopTime.After(lastReadPosition.Time) {
searchTime := lastReadPosition.Time
if lastReadPosition.Offset <= 0 {
searchTime = searchTime.Add(-time.Nanosecond)
}
pos, err := buf.locateByTs(searchTime)
if err != nil {
// Buffer corruption detected - return error wrapped with ErrBufferCorrupted
glog.Errorf("ReadFromBuffer: buffer corruption in prevBuffer: %v", err)
return nil, -1, fmt.Errorf("%w: %v", ErrBufferCorrupted, err)
}
if pos < buf.size {
return copiedBytes(buf.buf[pos:buf.size]), buf.offset, nil
}
}
}
// If current buffer is not empty, return it
if logBuffer.pos > 0 {
return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil
}
// Buffer is empty and no data in prevBuffers - wait for new data
return nil, logBuffer.offset, nil
}
lastTs := lastReadPosition.Time.UnixNano()
// Inclusive boundary for first-read sentinel/zero offset
searchTs := lastTs
if lastReadPosition.Offset <= 0 {
if searchTs > math.MinInt64+1 { // prevent underflow
searchTs = searchTs - 1
}
}
l, h := 0, len(logBuffer.idx)-1
for l <= h {
mid := (l + h) / 2
pos := logBuffer.idx[mid]
_, t, err := readTs(logBuffer.buf, pos)
if err != nil {
// Buffer corruption detected in binary search
glog.Errorf("ReadFromBuffer: buffer corruption at idx[%d] pos %d: %v", mid, pos, err)
return nil, -1, fmt.Errorf("%w: %v", ErrBufferCorrupted, err)
}
if t <= searchTs {
l = mid + 1
} else if searchTs < t {
var prevT int64
if mid > 0 {
_, prevT, err = readTs(logBuffer.buf, logBuffer.idx[mid-1])
if err != nil {
// Buffer corruption detected in binary search (previous entry)
glog.Errorf("ReadFromBuffer: buffer corruption at idx[%d] pos %d: %v", mid-1, logBuffer.idx[mid-1], err)
return nil, -1, fmt.Errorf("%w: %v", ErrBufferCorrupted, err)
}
}
if prevT <= searchTs {
return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.offset, nil
}
h = mid
}
}
// Binary search didn't find the timestamp - data may have been flushed to disk already
// Returning -2 signals to caller that data is not available in memory
return nil, -2, nil
}
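// Caller-side handling of the (buf, batchIndex, err) triple, in sketch form
// (illustrative; the subscribe loops contain the real handling):
//
//	buf, batchIdx, err := logBuffer.ReadFromBuffer(pos)
//	switch {
//	case err == ResumeFromDiskError:
//		// data predates the in-memory buffers: read from disk, then retry
//	case errors.Is(err, ErrBufferCorrupted):
//		// unrecoverable corruption: surface the error
//	case buf == nil:
//		// nothing new in memory (batchIdx == -2 means it was flushed):
//		// wait for a notification, or fall back to a disk read
//	default:
//		// process buf, then return it via logBuffer.ReleaseMemory(buf)
//	}
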
// ReleaseMemory returns a buffer obtained from ReadFromBuffer to the shared pool.
func (logBuffer *LogBuffer) ReleaseMemory(b *bytes.Buffer) {
	bufferPool.Put(b)
}
// GetName returns the log buffer name for metadata tracking
func (logBuffer *LogBuffer) GetName() string {
logBuffer.RLock()
defer logBuffer.RUnlock()
return logBuffer.name
}
// GetOffset returns the current offset for metadata tracking
func (logBuffer *LogBuffer) GetOffset() int64 {
logBuffer.RLock()
defer logBuffer.RUnlock()
return logBuffer.offset
}
var bufferPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
// logEntryPool reduces allocations in readTs which is called frequently during binary search
var logEntryPool = sync.Pool{
New: func() interface{} {
return &filer_pb.LogEntry{}
},
}
// resetLogEntry clears a LogEntry for pool reuse
func resetLogEntry(e *filer_pb.LogEntry) {
proto.Reset(e)
}
func copiedBytes(buf []byte) (copied *bytes.Buffer) {
copied = bufferPool.Get().(*bytes.Buffer)
copied.Reset()
copied.Write(buf)
return
}
func readTs(buf []byte, pos int) (size int, ts int64, err error) {
// Bounds check for size field (overflow-safe)
if pos < 0 || pos > len(buf)-4 {
return 0, 0, fmt.Errorf("corrupted log buffer: cannot read size at pos %d, buffer length %d", pos, len(buf))
}
size = int(util.BytesToUint32(buf[pos : pos+4]))
// Bounds check for entry data (overflow-safe, protects against negative size)
if size < 0 || size > len(buf)-pos-4 {
return 0, 0, fmt.Errorf("corrupted log buffer: entry size %d at pos %d exceeds buffer length %d", size, pos, len(buf))
}
entryData := buf[pos+4 : pos+4+size]
// Use pooled LogEntry to avoid allocation on every call
logEntry := logEntryPool.Get().(*filer_pb.LogEntry)
defer func() {
resetLogEntry(logEntry)
logEntryPool.Put(logEntry)
}()
err = proto.Unmarshal(entryData, logEntry)
if err != nil {
// Return error instead of failing fast
// This allows caller to handle corruption gracefully
return 0, 0, fmt.Errorf("corrupted log buffer: failed to unmarshal LogEntry at pos %d, size %d: %w", pos, size, err)
}
return size, logEntry.TsNs, nil
}