mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-14 05:41:29 +00:00
fix(s3/lifecycle): trust persisted cursor; never bump past pending events
The drain freezes cursorAdvanceTo at the last pre-skip event so pending matches (DueTime > runNow) re-enter the subscription next pass. Combined with the new cursor persistence, the floor bump (runNow - maxTTL) then orphans the very events the drain stopped at. Concrete: a rule with TTL == maxTTL fires at runNow == PUT_TIME + maxTTL, so floor (= runNow - maxTTL) lands exactly on PUT_TIME. If the last advance saved a cursor right before the not-yet-due PUT (e.g., keep2/* between expire1/* and expire3/* on the same shard), the floor bump on pass 9 skips past the expire3 event itself — the worker never re-reads it. Test symptom: expire3/* never expires when worker shards include other earlier no-match events. Cold start (found=false) still subscribes from runNow - maxTTL. Steady state honors the cursor verbatim.
This commit is contained in:
@@ -272,11 +272,17 @@ func runShard(ctx context.Context, cfg Config, snap *engine.Snapshot, runNow tim
|
||||
}
|
||||
|
||||
// Cold start: scan from now-maxTTL so already-due objects within
|
||||
// meta-log retention still expire.
|
||||
// meta-log retention still expire. In steady state honor the
|
||||
// cursor as-is: the drain freezes the cursor at the last pre-skip
|
||||
// event so pending matches with DueTime == TsNs+maxTTL stay in
|
||||
// scope across passes. Bumping forward to runNow-maxTTL would
|
||||
// orphan exactly those events (the test_lifecyclev2_expiration
|
||||
// regression: cursor saved at the no-match event right before
|
||||
// the not-yet-due expire3 PUT, then floor at runNow=PUT+maxTTL
|
||||
// equals PUT — bumping past the expire3 event itself).
|
||||
startTsNs := persisted.TsNs
|
||||
floor := runNow.Add(-maxTTL).UnixNano()
|
||||
if startTsNs < floor {
|
||||
startTsNs = floor
|
||||
if !found {
|
||||
startTsNs = runNow.Add(-maxTTL).UnixNano()
|
||||
}
|
||||
|
||||
lastOK, _, drainErr := drainShardEvents(ctx, cfg, runNow, shardID, snap, startTsNs)
|
||||
|
||||
Reference in New Issue
Block a user