mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-25 03:01:47 +00:00
fix(filer.sync): keep sync_offset fresh while the source is read-only (#9589)
* fix(filer.sync): keep sync_offset fresh while the source is read-only

  sync_offset holds the timestamp of the last replicated source event, so monitoring derives lag from now-sync_offset. A read-only source emits no metadata events, so the gauge froze at the last write and the derived lag grew without bound, making thresholds unusable.

  The source filer now sends an idle heartbeat carrying its current time while a subscriber is caught up to the buffer head. filer.sync uses it to advance the gauge, so now-sync_offset reflects real lag.

  Heartbeats are opt-in (client_supports_idle_heartbeat), are never written to the metadata log, and do not move the resume checkpoint, so a restart still resumes from the last real event.

* fix(filer.sync): gate idle heartbeat on the read cursor, not SinceNs

  In metadata-chunks mode, persisted entries replay as log file refs and never reach eachLogEntryFn, so lastSeenTsNs stays put and a caught-up subscriber with an old SinceNs would never get a heartbeat. Use the read cursor (lastReadTime), which advances in that mode too, taking the max with lastSeenTsNs so the in-memory backlog-then-idle case still works while the cursor returned to the caller has not yet updated.
This commit is contained in:
@@ -439,6 +439,13 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, sourceGrpcDi
|
||||
StartTsNs: sourceFilerOffsetTsNs,
|
||||
StopTsNs: 0,
|
||||
EventErrorType: pb.RetryForeverOnError,
|
||||
// While the source has only read activity it emits no metadata events, so
|
||||
// the watermark above never advances and sync_offset would look stuck.
|
||||
// The idle heartbeat moves the gauge to the source's current time once we
|
||||
// are caught up, so now-sync_offset reflects real lag and stays alertable.
|
||||
OnIdleHeartbeat: func(tsNs int64) {
|
||||
statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(tsNs))
|
||||
},
|
||||
}
|
||||
|
||||
return pb.FollowMetadata(sourceFiler, sourceGrpcDialOption, metadataFollowOption, processEventFnWithOffset)
|
||||
|
||||
@@ -421,6 +421,7 @@ message SubscribeMetadataRequest {
|
||||
repeated string directories = 10; // exact directory to watch
|
||||
bool client_supports_batching = 11; // client can unpack SubscribeMetadataResponse.events
|
||||
bool client_supports_metadata_chunks = 12; // client can read log file chunks from volume servers
|
||||
bool client_supports_idle_heartbeat = 13; // server may send empty responses carrying the current time while the client is caught up
|
||||
}
|
||||
message SubscribeMetadataResponse {
|
||||
string directory = 1;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// versions:
|
||||
// protoc-gen-go v1.36.6
|
||||
// protoc v7.34.1
|
||||
// protoc v6.33.4
|
||||
// source: filer.proto
|
||||
|
||||
package filer_pb
|
||||
@@ -3079,6 +3079,7 @@ type SubscribeMetadataRequest struct {
|
||||
Directories []string `protobuf:"bytes,10,rep,name=directories,proto3" json:"directories,omitempty"` // exact directory to watch
|
||||
ClientSupportsBatching bool `protobuf:"varint,11,opt,name=client_supports_batching,json=clientSupportsBatching,proto3" json:"client_supports_batching,omitempty"` // client can unpack SubscribeMetadataResponse.events
|
||||
ClientSupportsMetadataChunks bool `protobuf:"varint,12,opt,name=client_supports_metadata_chunks,json=clientSupportsMetadataChunks,proto3" json:"client_supports_metadata_chunks,omitempty"` // client can read log file chunks from volume servers
|
||||
ClientSupportsIdleHeartbeat bool `protobuf:"varint,13,opt,name=client_supports_idle_heartbeat,json=clientSupportsIdleHeartbeat,proto3" json:"client_supports_idle_heartbeat,omitempty"` // server may send empty responses carrying the current time while the client is caught up
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
@@ -3190,6 +3191,13 @@ func (x *SubscribeMetadataRequest) GetClientSupportsMetadataChunks() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (x *SubscribeMetadataRequest) GetClientSupportsIdleHeartbeat() bool {
|
||||
if x != nil {
|
||||
return x.ClientSupportsIdleHeartbeat
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type SubscribeMetadataResponse struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
Directory string `protobuf:"bytes,1,opt,name=directory,proto3" json:"directory,omitempty"`
|
||||
@@ -5753,7 +5761,7 @@ const file_filer_proto_rawDesc = "" +
|
||||
"\vfiler_group\x18\r \x01(\tR\n" +
|
||||
"filerGroup\x12#\n" +
|
||||
"\rmajor_version\x18\x0e \x01(\x05R\fmajorVersion\x12#\n" +
|
||||
"\rminor_version\x18\x0f \x01(\x05R\fminorVersion\"\xb8\x03\n" +
|
||||
"\rminor_version\x18\x0f \x01(\x05R\fminorVersion\"\xfd\x03\n" +
|
||||
"\x18SubscribeMetadataRequest\x12\x1f\n" +
|
||||
"\vclient_name\x18\x01 \x01(\tR\n" +
|
||||
"clientName\x12\x1f\n" +
|
||||
@@ -5768,7 +5776,8 @@ const file_filer_proto_rawDesc = "" +
|
||||
"\vdirectories\x18\n" +
|
||||
" \x03(\tR\vdirectories\x128\n" +
|
||||
"\x18client_supports_batching\x18\v \x01(\bR\x16clientSupportsBatching\x12E\n" +
|
||||
"\x1fclient_supports_metadata_chunks\x18\f \x01(\bR\x1cclientSupportsMetadataChunks\"\x96\x02\n" +
|
||||
"\x1fclient_supports_metadata_chunks\x18\f \x01(\bR\x1cclientSupportsMetadataChunks\x12C\n" +
|
||||
"\x1eclient_supports_idle_heartbeat\x18\r \x01(\bR\x1bclientSupportsIdleHeartbeat\"\x96\x02\n" +
|
||||
"\x19SubscribeMetadataResponse\x12\x1c\n" +
|
||||
"\tdirectory\x18\x01 \x01(\tR\tdirectory\x12J\n" +
|
||||
"\x12event_notification\x18\x02 \x01(\v2\x1b.filer_pb.EventNotificationR\x11eventNotification\x12\x13\n" +
|
||||
|
||||
@@ -38,6 +38,12 @@ type MetadataFollowOption struct {
|
||||
// the server sends log file chunk fids instead of streaming events,
|
||||
// and the client reads directly from volume servers.
|
||||
LogFileReaderFn LogFileReaderFn
|
||||
// OnIdleHeartbeat, when non-nil, opts in to idle heartbeats: while the
|
||||
// subscriber is caught up the server periodically sends an empty response
|
||||
// carrying the current time, and this is called with that timestamp. It is
|
||||
// a freshness signal only and does not advance StartTsNs, so the resume
|
||||
// checkpoint stays on the last real event.
|
||||
OnIdleHeartbeat func(tsNs int64)
|
||||
}
|
||||
|
||||
type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error
|
||||
@@ -77,6 +83,7 @@ func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn Proc
|
||||
UntilNs: option.StopTsNs,
|
||||
ClientSupportsBatching: true,
|
||||
ClientSupportsMetadataChunks: option.LogFileReaderFn != nil,
|
||||
ClientSupportsIdleHeartbeat: option.OnIdleHeartbeat != nil,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("subscribe: %w", err)
|
||||
@@ -138,6 +145,17 @@ func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn Proc
|
||||
pendingRefs = nil
|
||||
}
|
||||
|
||||
// Idle heartbeat: the source is caught up and has no new events, so
|
||||
// it sends an empty response carrying the current time. Surface it as
|
||||
// a freshness signal but leave option.StartTsNs untouched so a restart
|
||||
// still resumes from the last real event.
|
||||
if resp.EventNotification == nil && len(resp.Events) == 0 && resp.TsNs > 0 {
|
||||
if option.OnIdleHeartbeat != nil {
|
||||
option.OnIdleHeartbeat(resp.TsNs)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Process the first event (always present in top-level fields)
|
||||
if resp.EventNotification != nil {
|
||||
if err := processEventFn(resp); err != nil {
|
||||
|
||||
@@ -22,6 +22,12 @@ import (
|
||||
const (
|
||||
// MaxUnsyncedEvents is the number of filtered events after which an empty notification carrying a timestamp is sent.
|
||||
MaxUnsyncedEvents = 1e3
|
||||
|
||||
// idleHeartbeatInterval bounds how often a caught-up subscriber that asked
|
||||
// for idle heartbeats is reminded that the source is alive and has nothing
|
||||
// newer. It keeps freshness signals such as filer.sync's sync_offset metric
|
||||
// from looking stuck during read-only periods on the source.
|
||||
idleHeartbeatInterval = 5 * time.Second
|
||||
)
|
||||
|
||||
// metadataStreamSender is satisfied by both gRPC stream types and pipelinedSender.
|
||||
@@ -186,7 +192,16 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
|
||||
|
||||
eachEventNotificationFn := fs.eachEventNotificationFn(req, sender, clientName)
|
||||
|
||||
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
|
||||
// lastSeenTsNs tracks how far the subscriber has read so idle heartbeats are
|
||||
// only emitted once it is caught up to the buffer head. It is read and
|
||||
// written from this single goroutine, so no synchronization is needed.
|
||||
var lastSeenTsNs int64
|
||||
var lastHeartbeatNs int64
|
||||
baseEachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
|
||||
eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (bool, error) {
|
||||
lastSeenTsNs = logEntry.TsNs
|
||||
return baseEachLogEntryFn(logEntry)
|
||||
}
|
||||
|
||||
var processedTsNs int64
|
||||
var readPersistedLogErr error
|
||||
@@ -248,7 +263,11 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
|
||||
return false
|
||||
default:
|
||||
}
|
||||
return fs.hasClient(req.ClientId, req.ClientEpoch)
|
||||
if !fs.hasClient(req.ClientId, req.ClientEpoch) {
|
||||
return false
|
||||
}
|
||||
lastHeartbeatNs = fs.maybeSendIdleHeartbeat(req, sender, fs.filer.MetaAggregator.MetaLogBuffer, lastReadTime.Time.UnixNano(), lastSeenTsNs, lastHeartbeatNs)
|
||||
return true
|
||||
}, eachLogEntryFn)
|
||||
if readInMemoryLogErr != nil {
|
||||
if errors.Is(readInMemoryLogErr, log_buffer.ResumeFromDiskError) {
|
||||
@@ -314,7 +333,16 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
|
||||
|
||||
eachEventNotificationFn := fs.eachEventNotificationFn(req, sender, clientName)
|
||||
|
||||
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
|
||||
// lastSeenTsNs tracks how far the subscriber has read so idle heartbeats are
|
||||
// only emitted once it is caught up to the buffer head. It is read and
|
||||
// written from this single goroutine, so no synchronization is needed.
|
||||
var lastSeenTsNs int64
|
||||
var lastHeartbeatNs int64
|
||||
baseEachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
|
||||
eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (bool, error) {
|
||||
lastSeenTsNs = logEntry.TsNs
|
||||
return baseEachLogEntryFn(logEntry)
|
||||
}
|
||||
|
||||
var processedTsNs int64
|
||||
var readPersistedLogErr error
|
||||
@@ -400,7 +428,11 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
|
||||
return false
|
||||
default:
|
||||
}
|
||||
return fs.hasClient(req.ClientId, req.ClientEpoch)
|
||||
if !fs.hasClient(req.ClientId, req.ClientEpoch) {
|
||||
return false
|
||||
}
|
||||
lastHeartbeatNs = fs.maybeSendIdleHeartbeat(req, sender, fs.filer.LocalMetaLogBuffer, lastReadTime.Time.UnixNano(), lastSeenTsNs, lastHeartbeatNs)
|
||||
return true
|
||||
}, eachLogEntryFn)
|
||||
if readInMemoryLogErr != nil {
|
||||
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
|
||||
@@ -474,6 +506,45 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati
|
||||
}
|
||||
}
|
||||
|
||||
// maybeSendIdleHeartbeat emits an empty response carrying the current time when
|
||||
// the subscriber has consumed everything up to the buffer head. The client uses
|
||||
// it to advance freshness signals (e.g. filer.sync's sync_offset) without moving
|
||||
// its resume checkpoint, so a restart still re-reads from the last real event.
|
||||
//
|
||||
// The catch-up floor is the max of two read-progress markers:
|
||||
// - readPositionTsNs: how far the read cursor has advanced. It starts at
|
||||
// SinceNs and also covers metadata-chunks mode, where persisted entries are
|
||||
// replayed as log file refs rather than through eachLogEntryFn.
|
||||
// - lastSeenTsNs: the timestamp of the most recent entry streamed in this
|
||||
// call. It advances live while reading the in-memory backlog, before the
|
||||
// read cursor returned by LoopProcessLogData has been updated.
|
||||
//
|
||||
// While the buffer head is past that floor the subscriber is still behind (e.g.
|
||||
// replaying a backlog) and no heartbeat is sent. Returns the (possibly advanced)
|
||||
// lastHeartbeatNs.
|
||||
func (fs *FilerServer) maybeSendIdleHeartbeat(req *filer_pb.SubscribeMetadataRequest, sender metadataStreamSender, logBuffer *log_buffer.LogBuffer, readPositionTsNs, lastSeenTsNs, lastHeartbeatNs int64) int64 {
|
||||
if !req.ClientSupportsIdleHeartbeat {
|
||||
return lastHeartbeatNs
|
||||
}
|
||||
floorTsNs := lastSeenTsNs
|
||||
if readPositionTsNs > floorTsNs {
|
||||
floorTsNs = readPositionTsNs
|
||||
}
|
||||
if logBuffer.LastTsNs.Load() > floorTsNs {
|
||||
// the buffer holds data the subscriber has not reached yet
|
||||
return lastHeartbeatNs
|
||||
}
|
||||
now := time.Now().UnixNano()
|
||||
if now-lastHeartbeatNs < int64(idleHeartbeatInterval) {
|
||||
return lastHeartbeatNs
|
||||
}
|
||||
if err := sender.Send(&filer_pb.SubscribeMetadataResponse{TsNs: now}); err != nil {
|
||||
glog.V(0).Infof("=> idle heartbeat to %s: %v", req.ClientName, err)
|
||||
return lastHeartbeatNs
|
||||
}
|
||||
return now
|
||||
}
|
||||
|
||||
// sendLogFileRefs collects persisted log file chunk references and sends them
|
||||
// to the client so it can read the data directly from volume servers.
|
||||
// This does zero volume server I/O — it only lists filer store directory entries.
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
|
||||
)
|
||||
|
||||
// slowStream simulates a gRPC stream with configurable per-Send latency.
|
||||
@@ -32,9 +33,13 @@ func (s *slowStream) Send(msg *filer_pb.SubscribeMetadataResponse) error {
|
||||
|
||||
type collectingStream struct {
|
||||
messages []*filer_pb.SubscribeMetadataResponse
|
||||
err error
|
||||
}
|
||||
|
||||
func (s *collectingStream) Send(msg *filer_pb.SubscribeMetadataResponse) error {
|
||||
if s.err != nil {
|
||||
return s.err
|
||||
}
|
||||
s.messages = append(s.messages, msg)
|
||||
return nil
|
||||
}
|
||||
@@ -409,3 +414,85 @@ func TestPipelinedSingleVsParallelStreams(t *testing.T) {
|
||||
t.Logf("Speedup: %.1fx (%d parallel pipelined streams vs 1)", parallelRate/singleRate, numDirs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMaybeSendIdleHeartbeat(t *testing.T) {
|
||||
lb := log_buffer.NewLogBuffer("test", time.Minute, nil, nil, nil)
|
||||
defer lb.ShutdownLogBuffer()
|
||||
|
||||
fs := &FilerServer{}
|
||||
const recentEvent = int64(1_000_000)
|
||||
|
||||
t.Run("not opted in", func(t *testing.T) {
|
||||
lb.LastTsNs.Store(recentEvent)
|
||||
s := &collectingStream{}
|
||||
req := &filer_pb.SubscribeMetadataRequest{ClientSupportsIdleHeartbeat: false}
|
||||
got := fs.maybeSendIdleHeartbeat(req, s, lb, recentEvent, recentEvent, 0)
|
||||
if got != 0 || len(s.messages) != 0 {
|
||||
t.Fatalf("expected no heartbeat, got lastHeartbeat=%d msgs=%d", got, len(s.messages))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("behind buffer head", func(t *testing.T) {
|
||||
lb.LastTsNs.Store(recentEvent)
|
||||
s := &collectingStream{}
|
||||
req := &filer_pb.SubscribeMetadataRequest{ClientSupportsIdleHeartbeat: true}
|
||||
// startTs and lastSeen both below the buffer head: still replaying.
|
||||
got := fs.maybeSendIdleHeartbeat(req, s, lb, 0, recentEvent-1, 0)
|
||||
if got != 0 || len(s.messages) != 0 {
|
||||
t.Fatalf("expected no heartbeat while behind, got lastHeartbeat=%d msgs=%d", got, len(s.messages))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("caught up via lastSeen", func(t *testing.T) {
|
||||
lb.LastTsNs.Store(recentEvent)
|
||||
s := &collectingStream{}
|
||||
req := &filer_pb.SubscribeMetadataRequest{ClientSupportsIdleHeartbeat: true}
|
||||
got := fs.maybeSendIdleHeartbeat(req, s, lb, 0, recentEvent, 0)
|
||||
if len(s.messages) != 1 {
|
||||
t.Fatalf("expected one heartbeat, got %d", len(s.messages))
|
||||
}
|
||||
hb := s.messages[0]
|
||||
if hb.EventNotification != nil || len(hb.Events) != 0 || hb.TsNs <= 0 {
|
||||
t.Fatalf("heartbeat should be an empty timestamped response, got %+v", hb)
|
||||
}
|
||||
if got != hb.TsNs {
|
||||
t.Fatalf("expected returned lastHeartbeat %d to equal sent ts %d", got, hb.TsNs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("caught up via read position floor", func(t *testing.T) {
|
||||
// The read cursor has advanced past the buffer head while lastSeen stayed
|
||||
// 0. This is the idle-source case (subscribed from "now", read nothing) and
|
||||
// also metadata-chunks mode, where persisted entries replay as log file
|
||||
// refs and never reach eachLogEntryFn.
|
||||
lb.LastTsNs.Store(recentEvent)
|
||||
s := &collectingStream{}
|
||||
req := &filer_pb.SubscribeMetadataRequest{ClientSupportsIdleHeartbeat: true}
|
||||
readPosition := time.Now().UnixNano()
|
||||
got := fs.maybeSendIdleHeartbeat(req, s, lb, readPosition, 0, 0)
|
||||
if len(s.messages) != 1 || got <= 0 {
|
||||
t.Fatalf("expected heartbeat for caught-up subscriber, got msgs=%d lastHeartbeat=%d", len(s.messages), got)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("throttled within interval", func(t *testing.T) {
|
||||
lb.LastTsNs.Store(recentEvent)
|
||||
s := &collectingStream{}
|
||||
req := &filer_pb.SubscribeMetadataRequest{ClientSupportsIdleHeartbeat: true}
|
||||
justSent := time.Now().UnixNano()
|
||||
got := fs.maybeSendIdleHeartbeat(req, s, lb, 0, recentEvent, justSent)
|
||||
if got != justSent || len(s.messages) != 0 {
|
||||
t.Fatalf("expected throttled (no send), got lastHeartbeat=%d msgs=%d", got, len(s.messages))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("send error keeps prior heartbeat time", func(t *testing.T) {
|
||||
lb.LastTsNs.Store(recentEvent)
|
||||
s := &collectingStream{err: fmt.Errorf("broken stream")}
|
||||
req := &filer_pb.SubscribeMetadataRequest{ClientSupportsIdleHeartbeat: true}
|
||||
got := fs.maybeSendIdleHeartbeat(req, s, lb, 0, recentEvent, 0)
|
||||
if got != 0 {
|
||||
t.Fatalf("expected lastHeartbeat unchanged on send error, got %d", got)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user