diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 06547cd02..29fbd2262 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -439,6 +439,13 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, sourceGrpcDi StartTsNs: sourceFilerOffsetTsNs, StopTsNs: 0, EventErrorType: pb.RetryForeverOnError, + // While the source has only read activity it emits no metadata events, so + // the watermark above never advances and sync_offset would look stuck. + // The idle heartbeat moves the gauge to the source's current time once we + // are caught up, so now-sync_offset reflects real lag and stays alertable. + OnIdleHeartbeat: func(tsNs int64) { + statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(tsNs)) + }, } return pb.FollowMetadata(sourceFiler, sourceGrpcDialOption, metadataFollowOption, processEventFnWithOffset) diff --git a/weed/pb/filer.proto b/weed/pb/filer.proto index 15c2aa429..2dbb32888 100644 --- a/weed/pb/filer.proto +++ b/weed/pb/filer.proto @@ -421,6 +421,7 @@ message SubscribeMetadataRequest { repeated string directories = 10; // exact directory to watch bool client_supports_batching = 11; // client can unpack SubscribeMetadataResponse.events bool client_supports_metadata_chunks = 12; // client can read log file chunks from volume servers + bool client_supports_idle_heartbeat = 13; // server may send empty responses carrying the current time while the client is caught up } message SubscribeMetadataResponse { string directory = 1; diff --git a/weed/pb/filer_pb/filer.pb.go b/weed/pb/filer_pb/filer.pb.go index f9d3dc6c7..2610a2f8f 100644 --- a/weed/pb/filer_pb/filer.pb.go +++ b/weed/pb/filer_pb/filer.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.6 -// protoc v7.34.1 +// protoc v6.33.4 // source: filer.proto package filer_pb @@ -3079,6 +3079,7 @@ type SubscribeMetadataRequest struct { Directories []string `protobuf:"bytes,10,rep,name=directories,proto3" json:"directories,omitempty"` // exact directory to watch ClientSupportsBatching bool `protobuf:"varint,11,opt,name=client_supports_batching,json=clientSupportsBatching,proto3" json:"client_supports_batching,omitempty"` // client can unpack SubscribeMetadataResponse.events ClientSupportsMetadataChunks bool `protobuf:"varint,12,opt,name=client_supports_metadata_chunks,json=clientSupportsMetadataChunks,proto3" json:"client_supports_metadata_chunks,omitempty"` // client can read log file chunks from volume servers + ClientSupportsIdleHeartbeat bool `protobuf:"varint,13,opt,name=client_supports_idle_heartbeat,json=clientSupportsIdleHeartbeat,proto3" json:"client_supports_idle_heartbeat,omitempty"` // server may send empty responses carrying the current time while the client is caught up unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -3190,6 +3191,13 @@ func (x *SubscribeMetadataRequest) GetClientSupportsMetadataChunks() bool { return false } +func (x *SubscribeMetadataRequest) GetClientSupportsIdleHeartbeat() bool { + if x != nil { + return x.ClientSupportsIdleHeartbeat + } + return false +} + type SubscribeMetadataResponse struct { state protoimpl.MessageState `protogen:"open.v1"` Directory string `protobuf:"bytes,1,opt,name=directory,proto3" json:"directory,omitempty"` @@ -5753,7 +5761,7 @@ const file_filer_proto_rawDesc = "" + "\vfiler_group\x18\r \x01(\tR\n" + "filerGroup\x12#\n" + "\rmajor_version\x18\x0e \x01(\x05R\fmajorVersion\x12#\n" + - "\rminor_version\x18\x0f \x01(\x05R\fminorVersion\"\xb8\x03\n" + + "\rminor_version\x18\x0f \x01(\x05R\fminorVersion\"\xfd\x03\n" + "\x18SubscribeMetadataRequest\x12\x1f\n" + "\vclient_name\x18\x01 \x01(\tR\n" + "clientName\x12\x1f\n" + @@ -5768,7 +5776,8 @@ const file_filer_proto_rawDesc = "" + "\vdirectories\x18\n" + " \x03(\tR\vdirectories\x128\n" + "\x18client_supports_batching\x18\v \x01(\bR\x16clientSupportsBatching\x12E\n" + - "\x1fclient_supports_metadata_chunks\x18\f \x01(\bR\x1cclientSupportsMetadataChunks\"\x96\x02\n" + + "\x1fclient_supports_metadata_chunks\x18\f \x01(\bR\x1cclientSupportsMetadataChunks\x12C\n" + + "\x1eclient_supports_idle_heartbeat\x18\r \x01(\bR\x1bclientSupportsIdleHeartbeat\"\x96\x02\n" + "\x19SubscribeMetadataResponse\x12\x1c\n" + "\tdirectory\x18\x01 \x01(\tR\tdirectory\x12J\n" + "\x12event_notification\x18\x02 \x01(\v2\x1b.filer_pb.EventNotificationR\x11eventNotification\x12\x13\n" + diff --git a/weed/pb/filer_pb_tail.go b/weed/pb/filer_pb_tail.go index 8b49b6d11..9305b87e7 100644 --- a/weed/pb/filer_pb_tail.go +++ b/weed/pb/filer_pb_tail.go @@ -38,6 +38,12 @@ type MetadataFollowOption struct { // the server sends log file chunk fids instead of streaming events, // and the client reads directly from volume servers. LogFileReaderFn LogFileReaderFn + // OnIdleHeartbeat, when non-nil, opts in to idle heartbeats: while the + // subscriber is caught up the server periodically sends an empty response + // carrying the current time, and this is called with that timestamp. It is + // a freshness signal only and does not advance StartTsNs, so the resume + // checkpoint stays on the last real event. + OnIdleHeartbeat func(tsNs int64) } type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error @@ -77,6 +83,7 @@ func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn Proc UntilNs: option.StopTsNs, ClientSupportsBatching: true, ClientSupportsMetadataChunks: option.LogFileReaderFn != nil, + ClientSupportsIdleHeartbeat: option.OnIdleHeartbeat != nil, }) if err != nil { return fmt.Errorf("subscribe: %w", err) @@ -138,6 +145,17 @@ func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn Proc pendingRefs = nil } + // Idle heartbeat: the source is caught up and has no new events, so + // it sends an empty response carrying the current time. Surface it as + // a freshness signal but leave option.StartTsNs untouched so a restart + // still resumes from the last real event. + if resp.EventNotification == nil && len(resp.Events) == 0 && resp.TsNs > 0 { + if option.OnIdleHeartbeat != nil { + option.OnIdleHeartbeat(resp.TsNs) + } + continue + } + // Process the first event (always present in top-level fields) if resp.EventNotification != nil { if err := processEventFn(resp); err != nil { diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 89c5f2324..6bec4e2c7 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -22,6 +22,12 @@ import ( const ( // MaxUnsyncedEvents send empty notification with timestamp when certain amount of events have been filtered MaxUnsyncedEvents = 1e3 + + // idleHeartbeatInterval bounds how often a caught-up subscriber that asked + // for idle heartbeats is reminded that the source is alive and has nothing + // newer. It keeps freshness signals such as filer.sync's sync_offset metric + // from looking stuck during read-only periods on the source. + idleHeartbeatInterval = 5 * time.Second ) // metadataStreamSender is satisfied by both gRPC stream types and pipelinedSender. @@ -186,7 +192,16 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, eachEventNotificationFn := fs.eachEventNotificationFn(req, sender, clientName) - eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) + // lastSeenTsNs tracks how far the subscriber has read so idle heartbeats are + // only emitted once it is caught up to the buffer head. It is read and + // written from this single goroutine, so no synchronization is needed. + var lastSeenTsNs int64 + var lastHeartbeatNs int64 + baseEachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) + eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (bool, error) { + lastSeenTsNs = logEntry.TsNs + return baseEachLogEntryFn(logEntry) + } var processedTsNs int64 var readPersistedLogErr error @@ -248,7 +263,11 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, return false default: } - return fs.hasClient(req.ClientId, req.ClientEpoch) + if !fs.hasClient(req.ClientId, req.ClientEpoch) { + return false + } + lastHeartbeatNs = fs.maybeSendIdleHeartbeat(req, sender, fs.filer.MetaAggregator.MetaLogBuffer, lastReadTime.Time.UnixNano(), lastSeenTsNs, lastHeartbeatNs) + return true }, eachLogEntryFn) if readInMemoryLogErr != nil { if errors.Is(readInMemoryLogErr, log_buffer.ResumeFromDiskError) { @@ -314,7 +333,16 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq eachEventNotificationFn := fs.eachEventNotificationFn(req, sender, clientName) - eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) + // lastSeenTsNs tracks how far the subscriber has read so idle heartbeats are + // only emitted once it is caught up to the buffer head. It is read and + // written from this single goroutine, so no synchronization is needed. + var lastSeenTsNs int64 + var lastHeartbeatNs int64 + baseEachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) + eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (bool, error) { + lastSeenTsNs = logEntry.TsNs + return baseEachLogEntryFn(logEntry) + } var processedTsNs int64 var readPersistedLogErr error @@ -400,7 +428,11 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq return false default: } - return fs.hasClient(req.ClientId, req.ClientEpoch) + if !fs.hasClient(req.ClientId, req.ClientEpoch) { + return false + } + lastHeartbeatNs = fs.maybeSendIdleHeartbeat(req, sender, fs.filer.LocalMetaLogBuffer, lastReadTime.Time.UnixNano(), lastSeenTsNs, lastHeartbeatNs) + return true }, eachLogEntryFn) if readInMemoryLogErr != nil { if readInMemoryLogErr == log_buffer.ResumeFromDiskError { @@ -474,6 +506,45 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati } } +// maybeSendIdleHeartbeat emits an empty response carrying the current time when +// the subscriber has consumed everything up to the buffer head. The client uses +// it to advance freshness signals (e.g. filer.sync's sync_offset) without moving +// its resume checkpoint, so a restart still re-reads from the last real event. +// +// The catch-up floor is the max of two read-progress markers: +// - readPositionTsNs: how far the read cursor has advanced. It starts at +// SinceNs and also covers metadata-chunks mode, where persisted entries are +// replayed as log file refs rather than through eachLogEntryFn. +// - lastSeenTsNs: the timestamp of the most recent entry streamed in this +// call. It advances live while reading the in-memory backlog, before the +// read cursor returned by LoopProcessLogData has been updated. +// +// While the buffer head is past that floor the subscriber is still behind (e.g. +// replaying a backlog) and no heartbeat is sent. Returns the (possibly advanced) +// lastHeartbeatNs. +func (fs *FilerServer) maybeSendIdleHeartbeat(req *filer_pb.SubscribeMetadataRequest, sender metadataStreamSender, logBuffer *log_buffer.LogBuffer, readPositionTsNs, lastSeenTsNs, lastHeartbeatNs int64) int64 { + if !req.ClientSupportsIdleHeartbeat { + return lastHeartbeatNs + } + floorTsNs := lastSeenTsNs + if readPositionTsNs > floorTsNs { + floorTsNs = readPositionTsNs + } + if logBuffer.LastTsNs.Load() > floorTsNs { + // the buffer holds data the subscriber has not reached yet + return lastHeartbeatNs + } + now := time.Now().UnixNano() + if now-lastHeartbeatNs < int64(idleHeartbeatInterval) { + return lastHeartbeatNs + } + if err := sender.Send(&filer_pb.SubscribeMetadataResponse{TsNs: now}); err != nil { + glog.V(0).Infof("=> idle heartbeat to %s: %v", req.ClientName, err) + return lastHeartbeatNs + } + return now +} + // sendLogFileRefs collects persisted log file chunk references and sends them // to the client so it can read the data directly from volume servers. // This does zero volume server I/O — it only lists filer store directory entries. diff --git a/weed/server/filer_grpc_server_sub_meta_test.go b/weed/server/filer_grpc_server_sub_meta_test.go index 5a60c5761..45a776a22 100644 --- a/weed/server/filer_grpc_server_sub_meta_test.go +++ b/weed/server/filer_grpc_server_sub_meta_test.go @@ -10,6 +10,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" ) // slowStream simulates a gRPC stream with configurable per-Send latency. @@ -32,9 +33,13 @@ func (s *slowStream) Send(msg *filer_pb.SubscribeMetadataResponse) error { type collectingStream struct { messages []*filer_pb.SubscribeMetadataResponse + err error } func (s *collectingStream) Send(msg *filer_pb.SubscribeMetadataResponse) error { + if s.err != nil { + return s.err + } s.messages = append(s.messages, msg) return nil } @@ -409,3 +414,85 @@ func TestPipelinedSingleVsParallelStreams(t *testing.T) { t.Logf("Speedup: %.1fx (%d parallel pipelined streams vs 1)", parallelRate/singleRate, numDirs) } } + +func TestMaybeSendIdleHeartbeat(t *testing.T) { + lb := log_buffer.NewLogBuffer("test", time.Minute, nil, nil, nil) + defer lb.ShutdownLogBuffer() + + fs := &FilerServer{} + const recentEvent = int64(1_000_000) + + t.Run("not opted in", func(t *testing.T) { + lb.LastTsNs.Store(recentEvent) + s := &collectingStream{} + req := &filer_pb.SubscribeMetadataRequest{ClientSupportsIdleHeartbeat: false} + got := fs.maybeSendIdleHeartbeat(req, s, lb, recentEvent, recentEvent, 0) + if got != 0 || len(s.messages) != 0 { + t.Fatalf("expected no heartbeat, got lastHeartbeat=%d msgs=%d", got, len(s.messages)) + } + }) + + t.Run("behind buffer head", func(t *testing.T) { + lb.LastTsNs.Store(recentEvent) + s := &collectingStream{} + req := &filer_pb.SubscribeMetadataRequest{ClientSupportsIdleHeartbeat: true} + // startTs and lastSeen both below the buffer head: still replaying. + got := fs.maybeSendIdleHeartbeat(req, s, lb, 0, recentEvent-1, 0) + if got != 0 || len(s.messages) != 0 { + t.Fatalf("expected no heartbeat while behind, got lastHeartbeat=%d msgs=%d", got, len(s.messages)) + } + }) + + t.Run("caught up via lastSeen", func(t *testing.T) { + lb.LastTsNs.Store(recentEvent) + s := &collectingStream{} + req := &filer_pb.SubscribeMetadataRequest{ClientSupportsIdleHeartbeat: true} + got := fs.maybeSendIdleHeartbeat(req, s, lb, 0, recentEvent, 0) + if len(s.messages) != 1 { + t.Fatalf("expected one heartbeat, got %d", len(s.messages)) + } + hb := s.messages[0] + if hb.EventNotification != nil || len(hb.Events) != 0 || hb.TsNs <= 0 { + t.Fatalf("heartbeat should be an empty timestamped response, got %+v", hb) + } + if got != hb.TsNs { + t.Fatalf("expected returned lastHeartbeat %d to equal sent ts %d", got, hb.TsNs) + } + }) + + t.Run("caught up via read position floor", func(t *testing.T) { + // The read cursor has advanced past the buffer head while lastSeen stayed + // 0. This is the idle-source case (subscribed from "now", read nothing) and + // also metadata-chunks mode, where persisted entries replay as log file + // refs and never reach eachLogEntryFn. + lb.LastTsNs.Store(recentEvent) + s := &collectingStream{} + req := &filer_pb.SubscribeMetadataRequest{ClientSupportsIdleHeartbeat: true} + readPosition := time.Now().UnixNano() + got := fs.maybeSendIdleHeartbeat(req, s, lb, readPosition, 0, 0) + if len(s.messages) != 1 || got <= 0 { + t.Fatalf("expected heartbeat for caught-up subscriber, got msgs=%d lastHeartbeat=%d", len(s.messages), got) + } + }) + + t.Run("throttled within interval", func(t *testing.T) { + lb.LastTsNs.Store(recentEvent) + s := &collectingStream{} + req := &filer_pb.SubscribeMetadataRequest{ClientSupportsIdleHeartbeat: true} + justSent := time.Now().UnixNano() + got := fs.maybeSendIdleHeartbeat(req, s, lb, 0, recentEvent, justSent) + if got != justSent || len(s.messages) != 0 { + t.Fatalf("expected throttled (no send), got lastHeartbeat=%d msgs=%d", got, len(s.messages)) + } + }) + + t.Run("send error keeps prior heartbeat time", func(t *testing.T) { + lb.LastTsNs.Store(recentEvent) + s := &collectingStream{err: fmt.Errorf("broken stream")} + req := &filer_pb.SubscribeMetadataRequest{ClientSupportsIdleHeartbeat: true} + got := fs.maybeSendIdleHeartbeat(req, s, lb, 0, recentEvent, 0) + if got != 0 { + t.Fatalf("expected lastHeartbeat unchanged on send error, got %d", got) + } + }) +}