feat(s3/lifecycle): publish per-shard cursor + walker gauges and heartbeat (#9486)

Operator visibility was the last item on the daily-replay must-have
list. The `S3LifecycleCursorMinTsNs` gauge already existed but nothing
ever set it — leftover from the streaming worker that got deleted.
Wire it up and add a parallel one for the walker so a single PromQL
query answers "is this thing working?":

- `cursor_min_ts_ns{shard}` set after each cursor save. Operators read
  `now - cursor_min_ts_ns` as the per-shard replay lag.
- `daily_run_last_walked_ns{shard}` new — set in parallel so operators
  can confirm WalkerInterval is actually being honored. A stuck value
  means the scheduler isn't invoking the worker, the throttle is too
  long, or the walker is failing.
- saveCursorAndPublish wraps every Save call site in runShard so the
  gauges and the persisted state stay aligned (gauges only advance on
  successful saves).
- Enhance the `daily_run: status=... duration=...` heartbeat with
  `cursor_lag_max=` and `walked_max_age=` summary tokens for ops grep.
  Existing tokens stay positional-stable; new ones append at the end.
  Marker `cold` distinguishes "not started" from "0s caught up."

Tests pin the summary line: cold-start state, max-across-shards
selection, and partial-fill (some shards drained, others walked).

Stacked on #9485.
This commit is contained in:
Chris Lu
2026-05-13 14:18:35 -07:00
committed by GitHub
parent bbc075b353
commit d5e54f217d
3 changed files with 164 additions and 6 deletions

View File

@@ -0,0 +1,74 @@
package dailyrun
import (
"context"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestSummarizeShardCursorLag_AllColdStart pins the marker tokens for
// the worker's very first heartbeat — no cursor file exists yet, no
// walker has fired, the line must say "cold" rather than "0s" so
// operators can tell the difference between "I'm caught up" and
// "I haven't started." Both states would be 0s otherwise.
func TestSummarizeShardCursorLag_AllColdStart(t *testing.T) {
p := newMemPersister()
cfg := Config{Persister: p, Shards: []int{0, 1, 2}}
runNow := time.Unix(1_700_000_000, 0).UTC()
got := summarizeShardCursorLag(context.Background(), cfg, runNow)
assert.Equal(t, "cursor_lag_max=cold walked_max_age=cold", got)
}
// TestSummarizeShardCursorLag_PicksMaxAcrossShards confirms the worst-
// case lag wins, not the average. Operators alert on the worst shard;
// the cluster is only as healthy as its slowest one.
func TestSummarizeShardCursorLag_PicksMaxAcrossShards(t *testing.T) {
p := newMemPersister()
runNow := time.Unix(1_700_000_000, 0).UTC()
require.NoError(t, p.Save(context.Background(), 0, Cursor{
TsNs: runNow.Add(-5 * time.Minute).UnixNano(),
LastWalkedNs: runNow.Add(-30 * time.Minute).UnixNano(),
}))
require.NoError(t, p.Save(context.Background(), 1, Cursor{
TsNs: runNow.Add(-3 * time.Hour).UnixNano(), // worst cursor
LastWalkedNs: runNow.Add(-10 * time.Minute).UnixNano(),
}))
require.NoError(t, p.Save(context.Background(), 2, Cursor{
TsNs: runNow.Add(-30 * time.Minute).UnixNano(),
LastWalkedNs: runNow.Add(-2 * time.Hour).UnixNano(), // worst walker
}))
cfg := Config{Persister: p, Shards: []int{0, 1, 2}}
got := summarizeShardCursorLag(context.Background(), cfg, runNow)
assert.Contains(t, got, "cursor_lag_max=3h0m0s")
assert.Contains(t, got, "walked_max_age=2h0m0s")
}
// TestSummarizeShardCursorLag_PartialFill — some shards have cursors,
// some don't. Don't let the unwalked-yet shards contaminate the worst-
// case calculation; just compute the max over what we have.
func TestSummarizeShardCursorLag_PartialFill(t *testing.T) {
p := newMemPersister()
runNow := time.Unix(1_700_000_000, 0).UTC()
require.NoError(t, p.Save(context.Background(), 0, Cursor{
TsNs: runNow.Add(-15 * time.Minute).UnixNano(),
// LastWalkedNs not set — shard 0 hasn't walked yet
}))
require.NoError(t, p.Save(context.Background(), 1, Cursor{
// TsNs not set — shard 1 hasn't drained yet
LastWalkedNs: runNow.Add(-45 * time.Minute).UnixNano(),
}))
cfg := Config{Persister: p, Shards: []int{0, 1, 2}}
got := summarizeShardCursorLag(context.Background(), cfg, runNow)
// Only shard 0 contributes to cursor_lag_max; only shard 1 to walked_max_age.
assert.Contains(t, got, "cursor_lag_max=15m0s")
assert.Contains(t, got, "walked_max_age=45m0s")
// Shard 2 wasn't saved at all — it doesn't show up as either.
assert.False(t, strings.Contains(got, "cold"), "any-fill case should not emit cold markers")
}

View File

@@ -189,11 +189,60 @@ func Run(ctx context.Context, cfg Config) error {
if first != nil {
status = "error"
}
glog.V(0).Infof("daily_run: status=%s shards=%d errors=%d duration=%s",
status, len(cfg.Shards), errCount, time.Since(startedAt).Round(time.Millisecond))
// Summary line is parsed by operator scripts and CI log greps; keep
// the existing key=value tokens stable and append new ones at the
// end. cursor_lag_max / walked_max_age are the per-pass observability
// floor — for finer detail, scrape the per-shard gauges.
lagSummary := summarizeShardCursorLag(ctx, cfg, runNow)
glog.V(0).Infof("daily_run: status=%s shards=%d errors=%d duration=%s %s",
status, len(cfg.Shards), errCount, time.Since(startedAt).Round(time.Millisecond), lagSummary)
return first
}
// summarizeShardCursorLag walks the persisted cursors one more time
// after the run, computes the worst (largest) lag and walker-age across
// the cfg.Shards set, and renders them as key=value tokens. Returns
// a string suitable for appending to the heartbeat line. Load errors
// short-circuit to a marker token so the heartbeat doesn't lie about
// the state.
func summarizeShardCursorLag(ctx context.Context, cfg Config, runNow time.Time) string {
var (
maxLag time.Duration
maxAge time.Duration
anyCursor bool
anyWalked bool
)
for _, sh := range cfg.Shards {
c, found, err := cfg.Persister.Load(ctx, sh)
if err != nil || !found {
continue
}
if c.TsNs > 0 {
lag := runNow.Sub(time.Unix(0, c.TsNs))
if !anyCursor || lag > maxLag {
maxLag = lag
anyCursor = true
}
}
if c.LastWalkedNs > 0 {
age := runNow.Sub(time.Unix(0, c.LastWalkedNs))
if !anyWalked || age > maxAge {
maxAge = age
anyWalked = true
}
}
}
lagStr := "cursor_lag_max=cold"
if anyCursor {
lagStr = fmt.Sprintf("cursor_lag_max=%s", maxLag.Round(time.Second))
}
ageStr := "walked_max_age=cold"
if anyWalked {
ageStr = fmt.Sprintf("walked_max_age=%s", maxAge.Round(time.Second))
}
return lagStr + " " + ageStr
}
// computeGlobalStartTsNs scans every shard's persisted cursor and
// returns the minimum startTsNs the shared subscription must cover.
// For each shard the start point is the persisted cursor (steady state)
@@ -427,7 +476,7 @@ func runShard(ctx context.Context, cfg Config, snap *engine.Snapshot, runNow tim
}
}
_ = walkedThisPass // empty-replay branch returns; no downstream walker.
return cfg.Persister.Save(ctx, shardID, Cursor{
return saveCursorAndPublish(ctx, cfg.Persister, shardID, Cursor{
TsNs: 0,
RuleSetHash: rsh,
PromotedHash: promoted,
@@ -464,7 +513,7 @@ func runShard(ctx context.Context, cfg Config, snap *engine.Snapshot, runNow tim
PromotedHash: promoted,
LastWalkedNs: lastWalkedNs,
}
return cfg.Persister.Save(ctx, shardID, next)
return saveCursorAndPublish(ctx, cfg.Persister, shardID, next)
}
// Cold start: keep TsNs=0 so the drain below floors to
// runNow - maxTTL and the cursor is saved fresh after the run.
@@ -513,7 +562,7 @@ func runShard(ctx context.Context, cfg Config, snap *engine.Snapshot, runNow tim
saveCtx, saveCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer saveCancel()
if drainErr != nil {
_ = cfg.Persister.Save(saveCtx, shardID, Cursor{TsNs: lastOK, RuleSetHash: rsh, PromotedHash: promoted, LastWalkedNs: lastWalkedNs})
_ = saveCursorAndPublish(saveCtx, cfg.Persister, shardID, Cursor{TsNs: lastOK, RuleSetHash: rsh, PromotedHash: promoted, LastWalkedNs: lastWalkedNs})
// passCtx timeout is the expected end-of-pass for an idle
// subscription; not a real error. Other drain errors still
// propagate.
@@ -523,7 +572,7 @@ func runShard(ctx context.Context, cfg Config, snap *engine.Snapshot, runNow tim
return fmt.Errorf("shard=%d: drain: %w", shardID, drainErr)
}
return cfg.Persister.Save(saveCtx, shardID, Cursor{
return saveCursorAndPublish(saveCtx, cfg.Persister, shardID, Cursor{
TsNs: lastOK,
RuleSetHash: rsh,
PromotedHash: promoted,
@@ -531,6 +580,22 @@ func runShard(ctx context.Context, cfg Config, snap *engine.Snapshot, runNow tim
})
}
// saveCursorAndPublish persists the cursor and, on success, updates the
// per-shard Prometheus gauges so operators can read cursor lag
// (now - cursor_min_ts_ns) and walker freshness (now - last_walked_ns)
// without having to read the cursor file directly. On save failure the
// gauges are left untouched: their last successful-save value is more
// useful than a value that doesn't match what's on the filer.
func saveCursorAndPublish(ctx context.Context, p CursorPersister, shardID int, c Cursor) error {
if err := p.Save(ctx, shardID, c); err != nil {
return err
}
label := strconv.Itoa(shardID)
stats.S3LifecycleCursorMinTsNs.WithLabelValues(label).Set(float64(c.TsNs))
stats.S3LifecycleDailyRunLastWalkedNs.WithLabelValues(label).Set(float64(c.LastWalkedNs))
return nil
}
// walkerDue answers the persisted-state throttle: has enough time
// elapsed since the last walker fire on this shard? interval == 0
// means "fire every pass" (the prior, unconditional behavior).

View File

@@ -625,6 +625,24 @@ var (
Name: "daily_run_events_scanned_total",
Help: "Counter of meta-log events drainShardEvents processed on the daily_replay path, partitioned by shard.",
}, []string{"shard"})
// S3LifecycleDailyRunLastWalkedNs is the per-shard wall-clock
// timestamp (UnixNano) of the most recent successful steady-state /
// empty-replay walker fire. Set by dailyrun.runShard after each
// cursor save. Zero means the shard hasn't completed a walk yet
// (either cold start, or the walker never fired because the bucket
// has only replay-eligible rules and the throttle hasn't elapsed).
// Operators read (now - last_walked_ns) to confirm the walker
// cadence matches WalkerInterval; a stuck value means the
// scheduler isn't invoking the worker, the throttle is too long,
// or the walker is failing.
S3LifecycleDailyRunLastWalkedNs = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: "s3_lifecycle",
Name: "daily_run_last_walked_ns",
Help: "Per-shard timestamp (UnixNano) of the most recent successful walker fire. 0 means the shard hasn't completed a walk yet.",
}, []string{"shard"})
)
func init() {
@@ -704,6 +722,7 @@ func init() {
Gather.MustRegister(S3LifecycleDispatchLimiterWaitSeconds)
Gather.MustRegister(S3LifecycleDailyRunShardDurationSeconds)
Gather.MustRegister(S3LifecycleDailyRunEventsScanned)
Gather.MustRegister(S3LifecycleDailyRunLastWalkedNs)
Gather.MustRegister(UploadErrorCounter)