diff --git a/weed/s3api/s3lifecycle/dailyrun/cursor_summary_test.go b/weed/s3api/s3lifecycle/dailyrun/cursor_summary_test.go new file mode 100644 index 000000000..4ac97c6b1 --- /dev/null +++ b/weed/s3api/s3lifecycle/dailyrun/cursor_summary_test.go @@ -0,0 +1,74 @@ +package dailyrun + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestSummarizeShardCursorLag_AllColdStart pins the marker tokens for +// the worker's very first heartbeat — no cursor file exists yet, no +// walker has fired, the line must say "cold" rather than "0s" so +// operators can tell the difference between "I'm caught up" and +// "I haven't started." Both states would be 0s otherwise. +func TestSummarizeShardCursorLag_AllColdStart(t *testing.T) { + p := newMemPersister() + cfg := Config{Persister: p, Shards: []int{0, 1, 2}} + runNow := time.Unix(1_700_000_000, 0).UTC() + + got := summarizeShardCursorLag(context.Background(), cfg, runNow) + assert.Equal(t, "cursor_lag_max=cold walked_max_age=cold", got) +} + +// TestSummarizeShardCursorLag_PicksMaxAcrossShards confirms the worst- +// case lag wins, not the average. Operators alert on the worst shard; +// the cluster is only as healthy as its slowest one. +func TestSummarizeShardCursorLag_PicksMaxAcrossShards(t *testing.T) { + p := newMemPersister() + runNow := time.Unix(1_700_000_000, 0).UTC() + require.NoError(t, p.Save(context.Background(), 0, Cursor{ + TsNs: runNow.Add(-5 * time.Minute).UnixNano(), + LastWalkedNs: runNow.Add(-30 * time.Minute).UnixNano(), + })) + require.NoError(t, p.Save(context.Background(), 1, Cursor{ + TsNs: runNow.Add(-3 * time.Hour).UnixNano(), // worst cursor + LastWalkedNs: runNow.Add(-10 * time.Minute).UnixNano(), + })) + require.NoError(t, p.Save(context.Background(), 2, Cursor{ + TsNs: runNow.Add(-30 * time.Minute).UnixNano(), + LastWalkedNs: runNow.Add(-2 * time.Hour).UnixNano(), // worst walker + })) + cfg := Config{Persister: p, Shards: []int{0, 1, 2}} + + got := summarizeShardCursorLag(context.Background(), cfg, runNow) + assert.Contains(t, got, "cursor_lag_max=3h0m0s") + assert.Contains(t, got, "walked_max_age=2h0m0s") +} + +// TestSummarizeShardCursorLag_PartialFill — some shards have cursors, +// some don't. Don't let the unwalked-yet shards contaminate the worst- +// case calculation; just compute the max over what we have. +func TestSummarizeShardCursorLag_PartialFill(t *testing.T) { + p := newMemPersister() + runNow := time.Unix(1_700_000_000, 0).UTC() + require.NoError(t, p.Save(context.Background(), 0, Cursor{ + TsNs: runNow.Add(-15 * time.Minute).UnixNano(), + // LastWalkedNs not set — shard 0 hasn't walked yet + })) + require.NoError(t, p.Save(context.Background(), 1, Cursor{ + // TsNs not set — shard 1 hasn't drained yet + LastWalkedNs: runNow.Add(-45 * time.Minute).UnixNano(), + })) + cfg := Config{Persister: p, Shards: []int{0, 1, 2}} + + got := summarizeShardCursorLag(context.Background(), cfg, runNow) + // Only shard 0 contributes to cursor_lag_max; only shard 1 to walked_max_age. + assert.Contains(t, got, "cursor_lag_max=15m0s") + assert.Contains(t, got, "walked_max_age=45m0s") + // Shard 2 wasn't saved at all — it doesn't show up as either. + assert.False(t, strings.Contains(got, "cold"), "any-fill case should not emit cold markers") +} diff --git a/weed/s3api/s3lifecycle/dailyrun/run.go b/weed/s3api/s3lifecycle/dailyrun/run.go index 61bd6496b..330ad48d6 100644 --- a/weed/s3api/s3lifecycle/dailyrun/run.go +++ b/weed/s3api/s3lifecycle/dailyrun/run.go @@ -189,11 +189,60 @@ func Run(ctx context.Context, cfg Config) error { if first != nil { status = "error" } - glog.V(0).Infof("daily_run: status=%s shards=%d errors=%d duration=%s", - status, len(cfg.Shards), errCount, time.Since(startedAt).Round(time.Millisecond)) + // Summary line is parsed by operator scripts and CI log greps; keep + // the existing key=value tokens stable and append new ones at the + // end. cursor_lag_max / walked_max_age are the per-pass observability + // floor — for finer detail, scrape the per-shard gauges. + lagSummary := summarizeShardCursorLag(ctx, cfg, runNow) + glog.V(0).Infof("daily_run: status=%s shards=%d errors=%d duration=%s %s", + status, len(cfg.Shards), errCount, time.Since(startedAt).Round(time.Millisecond), lagSummary) return first } +// summarizeShardCursorLag walks the persisted cursors one more time +// after the run, computes the worst (largest) lag and walker-age across +// the cfg.Shards set, and renders them as key=value tokens. Returns +// a string suitable for appending to the heartbeat line. Load errors +// short-circuit to a marker token so the heartbeat doesn't lie about +// the state. +func summarizeShardCursorLag(ctx context.Context, cfg Config, runNow time.Time) string { + var ( + maxLag time.Duration + maxAge time.Duration + anyCursor bool + anyWalked bool + ) + for _, sh := range cfg.Shards { + c, found, err := cfg.Persister.Load(ctx, sh) + if err != nil || !found { + continue + } + if c.TsNs > 0 { + lag := runNow.Sub(time.Unix(0, c.TsNs)) + if !anyCursor || lag > maxLag { + maxLag = lag + anyCursor = true + } + } + if c.LastWalkedNs > 0 { + age := runNow.Sub(time.Unix(0, c.LastWalkedNs)) + if !anyWalked || age > maxAge { + maxAge = age + anyWalked = true + } + } + } + lagStr := "cursor_lag_max=cold" + if anyCursor { + lagStr = fmt.Sprintf("cursor_lag_max=%s", maxLag.Round(time.Second)) + } + ageStr := "walked_max_age=cold" + if anyWalked { + ageStr = fmt.Sprintf("walked_max_age=%s", maxAge.Round(time.Second)) + } + return lagStr + " " + ageStr +} + // computeGlobalStartTsNs scans every shard's persisted cursor and // returns the minimum startTsNs the shared subscription must cover. // For each shard the start point is the persisted cursor (steady state) @@ -427,7 +476,7 @@ func runShard(ctx context.Context, cfg Config, snap *engine.Snapshot, runNow tim } } _ = walkedThisPass // empty-replay branch returns; no downstream walker. - return cfg.Persister.Save(ctx, shardID, Cursor{ + return saveCursorAndPublish(ctx, cfg.Persister, shardID, Cursor{ TsNs: 0, RuleSetHash: rsh, PromotedHash: promoted, @@ -464,7 +513,7 @@ func runShard(ctx context.Context, cfg Config, snap *engine.Snapshot, runNow tim PromotedHash: promoted, LastWalkedNs: lastWalkedNs, } - return cfg.Persister.Save(ctx, shardID, next) + return saveCursorAndPublish(ctx, cfg.Persister, shardID, next) } // Cold start: keep TsNs=0 so the drain below floors to // runNow - maxTTL and the cursor is saved fresh after the run. @@ -513,7 +562,7 @@ func runShard(ctx context.Context, cfg Config, snap *engine.Snapshot, runNow tim saveCtx, saveCancel := context.WithTimeout(context.Background(), 5*time.Second) defer saveCancel() if drainErr != nil { - _ = cfg.Persister.Save(saveCtx, shardID, Cursor{TsNs: lastOK, RuleSetHash: rsh, PromotedHash: promoted, LastWalkedNs: lastWalkedNs}) + _ = saveCursorAndPublish(saveCtx, cfg.Persister, shardID, Cursor{TsNs: lastOK, RuleSetHash: rsh, PromotedHash: promoted, LastWalkedNs: lastWalkedNs}) // passCtx timeout is the expected end-of-pass for an idle // subscription; not a real error. Other drain errors still // propagate. @@ -523,7 +572,7 @@ func runShard(ctx context.Context, cfg Config, snap *engine.Snapshot, runNow tim return fmt.Errorf("shard=%d: drain: %w", shardID, drainErr) } - return cfg.Persister.Save(saveCtx, shardID, Cursor{ + return saveCursorAndPublish(saveCtx, cfg.Persister, shardID, Cursor{ TsNs: lastOK, RuleSetHash: rsh, PromotedHash: promoted, @@ -531,6 +580,22 @@ func runShard(ctx context.Context, cfg Config, snap *engine.Snapshot, runNow tim }) } +// saveCursorAndPublish persists the cursor and, on success, updates the +// per-shard Prometheus gauges so operators can read cursor lag +// (now - cursor_min_ts_ns) and walker freshness (now - last_walked_ns) +// without having to read the cursor file directly. On save failure the +// gauges are left untouched: their last successful-save value is more +// useful than a value that doesn't match what's on the filer. +func saveCursorAndPublish(ctx context.Context, p CursorPersister, shardID int, c Cursor) error { + if err := p.Save(ctx, shardID, c); err != nil { + return err + } + label := strconv.Itoa(shardID) + stats.S3LifecycleCursorMinTsNs.WithLabelValues(label).Set(float64(c.TsNs)) + stats.S3LifecycleDailyRunLastWalkedNs.WithLabelValues(label).Set(float64(c.LastWalkedNs)) + return nil +} + // walkerDue answers the persisted-state throttle: has enough time // elapsed since the last walker fire on this shard? interval == 0 // means "fire every pass" (the prior, unconditional behavior). diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go index 2083d78f3..2d6b2d576 100644 --- a/weed/stats/metrics.go +++ b/weed/stats/metrics.go @@ -625,6 +625,24 @@ var ( Name: "daily_run_events_scanned_total", Help: "Counter of meta-log events drainShardEvents processed on the daily_replay path, partitioned by shard.", }, []string{"shard"}) + + // S3LifecycleDailyRunLastWalkedNs is the per-shard wall-clock + // timestamp (UnixNano) of the most recent successful steady-state / + // empty-replay walker fire. Set by dailyrun.runShard after each + // cursor save. Zero means the shard hasn't completed a walk yet + // (either cold start, or the walker never fired because the bucket + // has only replay-eligible rules and the throttle hasn't elapsed). + // Operators read (now - last_walked_ns) to confirm the walker + // cadence matches WalkerInterval; a stuck value means the + // scheduler isn't invoking the worker, the throttle is too long, + // or the walker is failing. + S3LifecycleDailyRunLastWalkedNs = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: "s3_lifecycle", + Name: "daily_run_last_walked_ns", + Help: "Per-shard timestamp (UnixNano) of the most recent successful walker fire. 0 means the shard hasn't completed a walk yet.", + }, []string{"shard"}) ) func init() { @@ -704,6 +722,7 @@ func init() { Gather.MustRegister(S3LifecycleDispatchLimiterWaitSeconds) Gather.MustRegister(S3LifecycleDailyRunShardDurationSeconds) Gather.MustRegister(S3LifecycleDailyRunEventsScanned) + Gather.MustRegister(S3LifecycleDailyRunLastWalkedNs) Gather.MustRegister(UploadErrorCounter)