diff --git a/weed/command/filer_sync_jobs_test.go b/weed/command/filer_sync_jobs_test.go index 7b532fc86..8066cca08 100644 --- a/weed/command/filer_sync_jobs_test.go +++ b/weed/command/filer_sync_jobs_test.go @@ -4,6 +4,7 @@ import ( "container/heap" "fmt" "testing" + "time" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" @@ -582,3 +583,30 @@ func BenchmarkConflictCheck(b *testing.B) { }) } } + +// TestMetadataProcessorEmptyMarkerKeepsWatermarkStale: the MaxUnsyncedEvents +// marker (empty EventNotification, fresh timestamp) is dropped by AddSyncJob and +// does NOT advance processedTsWatermark, so offsetFunc keeps publishing the stale +// offset. This is why the client must not drive sync_offset off the watermark +// for these markers. +func TestMetadataProcessorEmptyMarkerKeepsWatermarkStale(t *testing.T) { + const staleOffset = int64(1_000_000_000) + freshTs := staleOffset + int64(time.Hour) // a "now"-ish source timestamp + + p := NewMetadataProcessor(func(*filer_pb.SubscribeMetadataResponse) error { return nil }, 4, staleOffset) + + marker := &filer_pb.SubscribeMetadataResponse{ + TsNs: freshTs, + EventNotification: &filer_pb.EventNotification{}, + } + if !filer_pb.IsEmpty(marker) { + t.Fatal("marker should be IsEmpty") + } + + p.AddSyncJob(marker) + + if got := p.processedTsWatermark.Load(); got != staleOffset { + t.Fatalf("empty marker advanced watermark to %d; want it to stay stale at %d", got, staleOffset) + } + t.Logf("marker carried fresh ts %d but watermark stayed stale at %d", freshTs, staleOffset) +} diff --git a/weed/pb/filer_pb_tail.go b/weed/pb/filer_pb_tail.go index 9305b87e7..40c3b04a7 100644 --- a/weed/pb/filer_pb_tail.go +++ b/weed/pb/filer_pb_tail.go @@ -145,14 +145,19 @@ func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn Proc pendingRefs = nil } - // Idle heartbeat: the source is caught up and has no new events, so - // it sends an empty response carrying the current time. Surface it as - // a freshness signal but leave option.StartTsNs untouched so a restart - // still resumes from the last real event. - if resp.EventNotification == nil && len(resp.Events) == 0 && resp.TsNs > 0 { + // Freshness signals carry a timestamp but no entry: the idle heartbeat + // (nil EventNotification) and the MaxUnsyncedEvents marker (empty one). + // Route both to OnIdleHeartbeat, else the marker pins sync_offset to the + // stale processed watermark. + if len(resp.Events) == 0 && resp.TsNs > 0 && (resp.EventNotification == nil || filer_pb.IsEmpty(resp)) { if option.OnIdleHeartbeat != nil { option.OnIdleHeartbeat(resp.TsNs) } + // Marker advances the resume cursor past the filtered range; the + // heartbeat leaves StartTsNs put so a restart cannot outrun a straggler. + if resp.EventNotification != nil { + option.StartTsNs = resp.TsNs + } continue } diff --git a/weed/pb/filer_pb_tail_test.go b/weed/pb/filer_pb_tail_test.go new file mode 100644 index 000000000..e647165a6 --- /dev/null +++ b/weed/pb/filer_pb_tail_test.go @@ -0,0 +1,125 @@ +package pb + +import ( + "context" + "io" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "google.golang.org/grpc" +) + +// fakeSubscribeStream plays back a scripted sequence of responses, one per Recv. +type fakeSubscribeStream struct { + grpc.ClientStream + responses []*filer_pb.SubscribeMetadataResponse + delay time.Duration + idx int +} + +func (s *fakeSubscribeStream) Recv() (*filer_pb.SubscribeMetadataResponse, error) { + if s.idx >= len(s.responses) { + return nil, io.EOF + } + if s.idx > 0 && s.delay > 0 { + // advance the wall clock so AddOffsetFunc's interval gate opens + time.Sleep(s.delay) + } + r := s.responses[s.idx] + s.idx++ + return r, nil +} + +// fakeFilerClient only needs SubscribeMetadata; the embedded nil interface +// covers the rest, which makeSubscribeMetadataFunc never calls. +type fakeFilerClient struct { + filer_pb.SeaweedFilerClient + stream *fakeSubscribeStream +} + +func (c *fakeFilerClient) SubscribeMetadata(ctx context.Context, in *filer_pb.SubscribeMetadataRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[filer_pb.SubscribeMetadataResponse], error) { + return c.stream, nil +} + +// TestFilerSyncOffsetStaysFreshOnFilteredMarker: on a read-only watched path with +// a busy source, the MaxUnsyncedEvents marker (empty EventNotification, fresh +// timestamp) must be treated as a freshness signal, not fed to the offset path +// where it would pin the gauge to the stale watermark. Wiring mirrors filer.sync. +func TestFilerSyncOffsetStaysFreshOnFilteredMarker(t *testing.T) { + const oldEventTs = int64(1_000_000_000) // t0: last real synced event (stale) + nowTs := time.Now().UnixNano() // current source time + markerTs := nowTs + int64(time.Second) + + var watermark = oldEventTs // MetadataProcessor.processedTsWatermark + + type gaugeWrite struct { + src string + ts int64 + } + var timeline []gaugeWrite + var heartbeatCalls, markerToProcessFn int + + // AddSyncJob drops empty events and does not advance the watermark; the real + // processor is checked in command.TestMetadataProcessorEmptyMarkerKeepsWatermarkStale. + realProcessFn := func(resp *filer_pb.SubscribeMetadataResponse) error { + if filer_pb.IsEmpty(resp) { + markerToProcessFn++ + return nil + } + watermark = resp.TsNs + return nil + } + // offsetFunc publishes the watermark to the gauge (filer_sync.go). + processEventFn := AddOffsetFunc(realProcessFn, 0, func(counter, lastTsNs int64) error { + timeline = append(timeline, gaugeWrite{"offset", watermark}) + return nil + }) + + option := &MetadataFollowOption{ + ClientName: "syncFrom_A_To_B", + StartTsNs: oldEventTs, + EventErrorType: DontLogError, + OnIdleHeartbeat: func(tsNs int64) { + heartbeatCalls++ + timeline = append(timeline, gaugeWrite{"heartbeat", tsNs}) + }, + } + + stream := &fakeSubscribeStream{ + delay: 2 * time.Millisecond, + responses: []*filer_pb.SubscribeMetadataResponse{ + // real create on the watched path, long ago -> watermark = t0 + {Directory: "/watched", TsNs: oldEventTs, EventNotification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "file"}, + }}, + // genuine idle heartbeat + {TsNs: nowTs}, + // MaxUnsyncedEvents marker: empty EventNotification, fresh timestamp + {TsNs: markerTs, EventNotification: &filer_pb.EventNotification{}}, + }, + } + + if err := makeSubscribeMetadataFunc(option, processEventFn)(&fakeFilerClient{stream: stream}); err != nil { + t.Fatalf("follow: %v", err) + } + + t.Logf("gauge timeline: %+v", timeline) + + // the marker fires OnIdleHeartbeat instead of reaching processEventFn + if markerToProcessFn != 0 { + t.Errorf("empty marker must not reach processEventFn, got %d calls", markerToProcessFn) + } + if heartbeatCalls != 2 { + t.Errorf("expected OnIdleHeartbeat for both the heartbeat and the marker (2), got %d", heartbeatCalls) + } + if option.StartTsNs != markerTs { + t.Errorf("marker should advance StartTsNs to %d, got %d", markerTs, option.StartTsNs) + } + + // gauge stays fresh: last write is the marker's timestamp, not the stale watermark + last := timeline[len(timeline)-1] + if last.src != "heartbeat" || last.ts != markerTs { + t.Fatalf("expected final gauge write fresh at %d, got %+v (spike is back if stale %d)", markerTs, last, oldEventTs) + } +} diff --git a/weed/server/filer_grpc_server_sub_meta_test.go b/weed/server/filer_grpc_server_sub_meta_test.go index 45a776a22..74b0f908b 100644 --- a/weed/server/filer_grpc_server_sub_meta_test.go +++ b/weed/server/filer_grpc_server_sub_meta_test.go @@ -496,3 +496,47 @@ func TestMaybeSendIdleHeartbeat(t *testing.T) { } }) } + +// TestFilteredEventsEmitMaxUnsyncedMarker pins the source-side shape the client +// keys off: after MaxUnsyncedEvents filtered events, eachEventNotificationFn +// emits a marker with a fresh timestamp and a non-nil but empty EventNotification. +// Consumed by TestFilerSyncOffsetStaysFreshOnFilteredMarker. +func TestFilteredEventsEmitMaxUnsyncedMarker(t *testing.T) { + fs := &FilerServer{ + option: &FilerOption{Host: pb.ServerAddress("127.0.0.1:8888")}, + filer: &filer.Filer{Signature: 123}, + } + req := &filer_pb.SubscribeMetadataRequest{ClientName: "syncFrom_A_To_B", PathPrefix: "/watched/"} + + stream := &collectingStream{} + eachEventFn := fs.eachEventNotificationFn(req, stream, "client") + + base := time.Now().UnixNano() + var lastTsNs int64 + // Feed MaxUnsyncedEvents+1 events on a NON-watched path so every one is filtered. + total := int(MaxUnsyncedEvents) + 1 + for i := 0; i < total; i++ { + lastTsNs = base + int64(i) + err := eachEventFn("/other/dir", &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: fmt.Sprintf("file%d", i)}, + }, lastTsNs) + if err != nil { + t.Fatalf("eachEventFn: %v", err) + } + } + + if len(stream.messages) != 1 { + t.Fatalf("expected exactly 1 MaxUnsyncedEvents marker, got %d", len(stream.messages)) + } + marker := stream.messages[0] + if !filer_pb.IsEmpty(marker) { + t.Errorf("marker should have empty EventNotification (IsEmpty), got %+v", marker.EventNotification) + } + if marker.EventNotification == nil { + t.Error("marker EventNotification should be non-nil but empty (the shape the client keys off)") + } + if marker.TsNs != lastTsNs { + t.Errorf("marker TsNs = %d, want fresh source ts %d", marker.TsNs, lastTsNs) + } + t.Logf("source emits marker{EventNotification:&{}, TsNs:%d} after %d filtered events", marker.TsNs, total) +}