Chris Lu 2f7ac1d664 feat(s3/lifecycle): NoncurrentVersionExpiration via bootstrap (Phase 5b/3) (#9383)
* feat(s3/lifecycle): NoncurrentVersionExpiration via bootstrap (Phase 5b/3)

Bootstrap now expands every <key>.versions/ directory into one event
per version with sibling state pre-computed. The router fires
NoncurrentDays / NewerNoncurrent off these events using
SuccessorModTime as the noncurrent clock; previously these rules
never ran on a versioned bucket because buildObjectInfo couldn't
classify version-folder events without the latest pointer.

Mechanics

walkBucketDir treats a directory ending in .versions and carrying
ExtLatestVersionIdKey as a SeaweedFS .versions container — emit it
once and skip the recursion. Coincidentally-named directories without
the latest pointer recurse normally.

BucketBootstrapper.expandVersionsDir lists the children, sorts
newest-first by mtime, resolves the latest position from the pointer,
and injects a synthesized reader.Event per version with
BootstrapVersion populated. NoncurrentIndex is 0-based among
noncurrents in newest-first order; SuccessorModTime is the immediate
newer sibling's mtime (zero for the latest). A pointer naming a
missing version falls back to the newest-by-mtime sibling, so a
race window can't flag every entry as noncurrent.

routeBootstrapVersion uses BootstrapVersion to build ObjectInfo
directly (bypassing the version-folder skip in buildObjectInfo) and
runs the standard match loop. ABORT_MPU is excluded by the
kind-shape gate. The schedule clock uses SuccessorModTime for
noncurrents and
ModTime for the latest, so the dispatcher fires when the rule's days
threshold is met. Match.ObjectKey is the LOGICAL key,
Match.VersionID is the marker's stored version_id — the dispatcher
reaches deleteSpecificObjectVersion or createDeleteMarker correctly.

Layer 2 tests cover both sides. Router: latest fires ExpirationDays;
noncurrent fires NoncurrentDays; NewerNoncurrentVersions retains the
N newest noncurrents; ABORT_MPU never matches. Bootstrap: .versions
dir emitted once and not recursed; missing latest pointer falls back
to newest; backdated PUT (latest pointer is older by mtime) keeps
the right noncurrent index; delete-marker flag propagates.

* fix(s3/lifecycle): no VersionID for latest expirations, child-based dir disambig

Two correctness gaps in Phase 5b/3.

Bootstrap was pinning the version_id on every Match. For
EXPIRATION_DAYS / EXPIRATION_DATE on the latest version this is
unsafe: between schedule and dispatch a fresh PUT can land, the
dispatcher would still identity-match against the original version's
bytes (it still exists at that path) and the resulting delete marker
would hide the new latest. Drop VersionID for those kinds; an empty
VersionID makes the dispatcher fetch the current latest, where
identity-CAS resolves to STALE_IDENTITY and bootstrap re-schedules
with the new latest's identity. NoncurrentDays / NewerNoncurrent /
EXPIRED_DELETE_MARKER still pin the version_id since those are
version-targeted.
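The pin/no-pin decision reduces to a switch on the action kind; a sketch with made-up kind names mirroring the commit message, not the package's real identifiers:

```go
package main

import "fmt"

// actionKind enumerates rule kinds; names follow the commit message.
type actionKind int

const (
	ExpirationDays actionKind = iota
	ExpirationDate
	NoncurrentDays
	NewerNoncurrent
	ExpiredDeleteMarker
)

// pinVersionID reports whether a scheduled match should carry the
// version_id. Latest-version expirations leave it empty so the
// dispatcher fetches the current latest at dispatch time, and
// identity-CAS turns a fresh PUT into STALE_IDENTITY instead of a
// delete marker that hides it; version-targeted kinds keep the pin.
func pinVersionID(kind actionKind) bool {
	switch kind {
	case ExpirationDays, ExpirationDate:
		return false
	default:
		return true
	}
}

func main() {
	fmt.Println(pinVersionID(ExpirationDays), pinVersionID(NoncurrentDays)) // prints: false true
}
```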

isVersionsDir gating on ExtLatestVersionIdKey lost a race window:
createDeleteMarker writes the version file before updating the
parent's Extended pointer, so a walk between those two steps would
see a .versions/ dir without the pointer, recurse into it, and emit
raw version files that the router drops. Match the suffix only and
let expandVersionsDir disambiguate by child inspection: if any child
carries ExtVersionIdKey it's a real .versions container and we expand;
otherwise it's a coincidentally-named user folder and we recurse via
the bucket-walk's own callback so nested entries still flow through.
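The child-inspection disambiguation can be sketched as follows, with a plain "version-id" key standing in for ExtVersionIdKey and a hypothetical child shape:

```go
package main

import (
	"fmt"
	"strings"
)

// child models one listed entry; extended mirrors Entry.Extended.
type child struct {
	name     string
	extended map[string][]byte
}

// shouldExpand disambiguates by child inspection: a directory with the
// .versions suffix is a real version container only if some child
// carries the version-id marker; otherwise it's a coincidentally-named
// user folder and the caller recurses into it instead.
func shouldExpand(dirName string, children []child) bool {
	if !strings.HasSuffix(dirName, ".versions") {
		return false
	}
	for _, c := range children {
		if _, ok := c.extended["version-id"]; ok {
			return true
		}
	}
	return false
}

func main() {
	container := []child{{name: "v1", extended: map[string][]byte{"version-id": []byte("v1")}}}
	fmt.Println(shouldExpand("photo.jpg.versions", container), shouldExpand("notes.versions", nil)) // prints: true false
}
```

Note that the race-window directory (version files present, parent pointer not yet written) still expands here, because the children carry the marker even before the pointer update lands.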

Tests: latest-expiration assertion flipped to expect empty VersionID;
new tests cover the coincidentally-named-folder recursion and the
race-window expansion (children present, pointer absent).

* fix(s3/lifecycle): filter directory + missing-version-id children at listing

expandVersionsDir's listing callback collected every child with
attributes; subdirectories or entries without ExtVersionIdKey would
make it past the empty-id skip in the inner loop but still inflate
NumVersions and skew NoncurrentIndex (the rank should derive from
position in the filtered slice, but was computed off the unfiltered
sorted slice). Drop directories at listing time and partition the
file children into a versions slice that's the actual rank source.

Test cleanups: out-of-order-mtime test now sets v1 older than v2 so
latestPos > 0 actually exercises the rank-skip branch in
expandVersionsDir; bootstrapVersionEntry preserves nanosecond
precision via MtimeNs to match markerLoneEntry's pattern; drop a
leftover unused idx variable.

* fix(s3/lifecycle): null version + canonical version-id tiebreak

Two correctness gaps in Phase 5b/3 bootstrap.

Null versions live at the bare logical path, not under .versions/.
Bootstrap previously expanded only .versions/<key>/ children, so:
  - pre-versioning objects with newer .versions/ history never had
    their null version expired by NoncurrentDays
  - suspended-bucket writes (which clear the .versions/ latest pointer
    so null becomes current) had every .versions/ child wrongly
    classified as latest by the buildObjectInfo fallback

expandVersionsDir now looks up the bare key via NewFullPath +
LookupEntry, accepts a regular file or an explicit S3 directory-key
marker (Mime set), and folds it into the sibling set with
VersionID="null". Latest resolution: pointer present + names a real
id wins; pointer absent + null exists makes null latest; otherwise
falls back to the newest sibling. The walker's regular emission of the
bare entry would otherwise duplicate it, so walkBucketDir now does a
two-pass walk per directory level — .versions/ first, then everything
else with a per-walk skipBare set keyed by bucket-relative path that
expandVersionsDir populates when it claims a bare null sibling.

Sort tiebreak: PUTs only set second-precision Mtime, so two versions
written in the same second tied. The unstable secondary order let
old-format version filenames sort oldest-first and corrupt
NoncurrentIndex under NewerNoncurrentVersions retention. Add
CompareVersionIds to s3lifecycle/version_time.go (mirrors the
canonical comparator in s3api/s3api_version_id.go to avoid the
import cycle) and use it as a secondary key after mtime equality.

Tests: pre-versioning null-as-noncurrent, suspended null-as-current,
directory-key marker as null version, end-to-end claim through
walkBucketDir's two-pass ordering, and same-second tiebreak via
canonical version-id ordering. fakeFilerClient grows a
LookupDirectoryEntry implementation backed by the same in-memory tree.

* fix(s3/lifecycle): only treat explicit-null bare entries as current

The pointer-missing branch in expandVersionsDir made null latest as
soon as a bare object was found. That's correct for suspended-bucket
writes (s3api_object_handlers_put.go writes the bare entry with
ExtVersionIdKey="null") but wrong for the pre-versioning race window:
a brand-new version under .versions/<file> exists before the parent's
ExtLatestVersionIdKey update lands, and a pre-versioning bare object
has no version-id marker. Marking that older bare object latest hides
the real new version and skips noncurrent expiration of the null
until the next process restart/bootstrap.

Distinguish the two: lookupNullVersion now returns whether the bare
entry's Extended map carries ExtVersionIdKey="null" (the suspended
write marker). expandVersionsDir's pointer-missing branch only
promotes null to latest when explicit; otherwise it falls back to
newest-sibling, which is safe for the race window since the new
version's mtime is fresher than the bare object's.

The existing suspended-null test now uses a new helper that adds the
explicit marker. New regression test covers the race window: bare
entry without the marker + a fresh .versions/<v1> file + missing
parent pointer must keep v1 as latest and the null as noncurrent.

* fix(s3/lifecycle): only the newest item can be the explicit-null latest

The pointer-missing branch in expandVersionsDir scanned every item for
an explicit null and promoted it to latest. After a suspended->enabled
transition that's the wrong call: createVersion writes the version
file before updating ExtLatestVersionIdKey, so a bootstrap that lands
in the race window sees an older bare null with ExtVersionIdKey="null"
plus a newer .versions/<v-new> child and no parent pointer. Promoting
the null misclassifies v-new as noncurrent and skips both the new
version's current-version expiration and the null's noncurrent
scheduling until the next bootstrap.

Constrain the explicit-null branch to items[0]: if the suspended-null
write is genuinely current it'll be the newest by mtime AND tagged.
Anything else falls through to the newest-sibling default.

Adds a regression test for the suspended->re-enabled race.

* fix(s3/lifecycle): paginate bootstrap directory listings

SeaweedList(..., limit=0) is a single-page request: the filer caps
limit=0 at DirListingLimit (1000 by default) and returns whatever fits
in one round trip. expandVersionsDir and walkBucketDir both relied on
that, so any directory bigger than the cap silently truncated. For
noncurrent retention this is correctness, not just scale — a hot key
with more versions than the cap had its rank/sort math computed off
the first page only, NumVersions, NoncurrentIndex, SuccessorModTime,
and the latest-fallback all wrong, with the older versions never
scheduled until a future bootstrap.

Add a listAll helper that drives pagination via StartFromFileName +
inclusive=false, looping until a page returns fewer entries than the
configured page size. Use it in both call sites. Page size is a var
(listPageSize, default 1024) so tests can shrink it without
generating thousands of entries.
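A sketch of the pagination loop, with a lister type standing in for the SeaweedList round trip:

```go
package main

import "fmt"

// lister is a hypothetical single-page listing call standing in for
// SeaweedList: children sorted by name, starting strictly after
// startFrom (inclusive=false), at most limit entries.
type lister func(startFrom string, limit int) []string

// listPageSize is a var so tests can shrink it without generating
// thousands of entries.
var listPageSize = 1024

// listAll drives pagination: resume each page from the last returned
// name, and stop when a page comes back shorter than the page size.
func listAll(list lister) []string {
	var all []string
	startFrom := ""
	for {
		page := list(startFrom, listPageSize)
		all = append(all, page...)
		if len(page) < listPageSize {
			return all
		}
		startFrom = page[len(page)-1]
	}
}

func main() {
	names := []string{"a", "b", "c", "d", "e"}
	fake := func(startFrom string, limit int) []string {
		var out []string
		for _, n := range names {
			if n > startFrom && len(out) < limit {
				out = append(out, n)
			}
		}
		return out
	}
	listPageSize = 2
	fmt.Println(len(listAll(fake))) // prints: 5
}
```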

The fake filer client now mirrors the real semantics: sort children
by name, honor StartFromFileName/InclusiveStartFrom, cap at Limit.
New regression tests force a small page size and assert the full
result set is processed and the call count matches what pagination
should drive.

* perf(s3/lifecycle): stream bucket walk in two passes instead of buffering

walkBucketDir was paginating into a children slice and then iterating
twice (pass 1: .versions/, pass 2: everything else). For flat buckets
with millions of entries the buffer is a real memory spike. Drop the
materialization: each pass now drives its own listAll over the same
directory and acts on entries as they stream in. The skipBare ordering
contract is preserved — pass 2 still runs after pass 1 finishes — and
the per-pass paging keeps memory bounded by listPageSize.

Tradeoff: each directory level is listed twice. For workloads where
that matters more than the memory headroom, we can revisit; the
correctness/scale dial here is what the noncurrent rules need.
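The two-pass streaming contract can be sketched as follows, with hypothetical callback shapes (the real passes each paginate through the filer rather than taking a listDir callback):

```go
package main

import (
	"fmt"
	"strings"
)

// walkTwoPasses streams one directory level twice instead of buffering
// it: each pass drives its own listing. Pass 1 expands .versions/
// containers; a container that claims its bare null sibling returns
// the sibling's name, which goes into skipBare so pass 2 won't emit
// it a second time.
func walkTwoPasses(
	listDir func() []string,
	expandVersions func(name string) (claimedBare string),
	visit func(name string),
) {
	skipBare := map[string]bool{}
	for _, name := range listDir() { // pass 1: .versions/ only
		if strings.HasSuffix(name, ".versions") {
			if bare := expandVersions(name); bare != "" {
				skipBare[bare] = true
			}
		}
	}
	for _, name := range listDir() { // pass 2: everything else
		if strings.HasSuffix(name, ".versions") || skipBare[name] {
			continue
		}
		visit(name)
	}
}

func main() {
	var visited []string
	walkTwoPasses(
		func() []string { return []string{"a", "a.versions", "b"} },
		func(name string) string { return strings.TrimSuffix(name, ".versions") }, // claims "a"
		func(name string) { visited = append(visited, name) },
	)
	fmt.Println(visited) // prints: [b]
}
```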

Updated three tests for the new call count: each walk now records 2
listings per directory (pass 1 + pass 2). The KickOffNew dedup tests
expect 2 calls per bucket; the pagination test expects 6 instead of 3.
2026-05-09 10:48:32 -07:00


package reader

import (
	"context"
	"errors"
	"fmt"
	"io"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle"
)
// Event is one in-shard meta-log event delivered to the router.
//
// BootstrapVersion is set only by the bucket bootstrapper when it
// expands a .versions/ directory; the meta-log path leaves it nil.
// Carries pre-computed sibling state so the router can fire
// NoncurrentDays / NewerNoncurrent without listing again.
type Event struct {
	TsNs             int64
	Bucket           string
	Key              string
	ShardID          int
	OldEntry         *filer_pb.Entry
	NewEntry         *filer_pb.Entry
	NewParent        string
	BootstrapVersion *BootstrapVersion
}

// BootstrapVersion is the per-version state computed once per
// .versions/<key>/ directory at bootstrap time. Key fields shape
// EvaluateAction: IsLatest gates current vs. noncurrent rules,
// NoncurrentIndex gates NewerNoncurrentVersions retention,
// SuccessorModTime sets the noncurrent clock (when this version was
// replaced).
type BootstrapVersion struct {
	LogicalKey       string
	VersionID        string
	IsLatest         bool
	IsDeleteMarker   bool
	NumVersions      int
	NoncurrentIndex  int // 0 = newest noncurrent
	SuccessorModTime time.Time
}
// IsDelete reports whether this event removes an entry.
func (e *Event) IsDelete() bool {
	return e.NewEntry == nil && e.OldEntry != nil
}

// IsCreate reports whether this event creates an entry.
func (e *Event) IsCreate() bool {
	return e.OldEntry == nil && e.NewEntry != nil
}

// Reader subscribes to the filer meta-log and emits in-range Events to a
// channel. One subscription handles a contiguous span (or arbitrary set)
// of shards via ShardPredicate; the downstream router/dispatcher consume
// events and ack-advance the per-shard cursor for matched ActionKeys
// when their actions complete.
type Reader struct {
	// ShardID and ShardPredicate are alternatives — set at most one.
	// ShardPredicate wins if both are populated.
	ShardID        int            // [0, s3lifecycle.ShardCount); used when ShardPredicate is nil
	ShardPredicate func(int) bool // accepts an event when true; nil falls back to ShardID equality

	BucketsPath string // e.g. "/buckets"

	// Cursor is the single-shard cursor used for SinceNs when StartTsNs is 0.
	// Range callers pass StartTsNs directly and leave Cursor nil; SinceNs=0
	// then means "subscribe from the start of the meta-log".
	Cursor    *Cursor
	StartTsNs int64

	Events chan<- *Event

	// EventBudget caps how many events Run processes before returning nil.
	// Zero = unbounded; the run continues until ctx cancellation or stream
	// error. Used by the worker scheduler to bound a single READ task.
	EventBudget int

	// bucketsPathSlash is BucketsPath with a guaranteed trailing slash,
	// computed once in Run and reused per event to avoid recomputing the
	// normalized prefix in extractBucketKey.
	bucketsPathSlash string
}
// Run subscribes via SubscribeMetadata starting at the configured position,
// filters to the configured shard set, and emits Events. Returns on
// ctx.Done(), io.EOF, or stream error. Caller is responsible for closing
// Events if it owns it.
func (r *Reader) Run(ctx context.Context, client filer_pb.SeaweedFilerClient, clientName string, clientID int32) error {
	if r.ShardPredicate == nil {
		if r.ShardID < 0 || r.ShardID >= s3lifecycle.ShardCount {
			return fmt.Errorf("reader: shard_id %d out of range and no ShardPredicate", r.ShardID)
		}
	}
	if r.Events == nil {
		return errors.New("reader: nil Events channel")
	}
	if r.BucketsPath == "" {
		return errors.New("reader: empty BucketsPath")
	}

	r.bucketsPathSlash = r.BucketsPath
	if !strings.HasSuffix(r.bucketsPathSlash, "/") {
		r.bucketsPathSlash += "/"
	}

	sinceNs := r.StartTsNs
	if sinceNs == 0 && r.Cursor != nil {
		sinceNs = r.Cursor.MinTsNs()
	}

	stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
		ClientName:             clientName,
		PathPrefix:             r.BucketsPath,
		SinceNs:                sinceNs,
		ClientId:               clientID,
		ClientSupportsBatching: true,
	})
	if err != nil {
		return fmt.Errorf("subscribe: %w", err)
	}

	processed := 0
	for {
		resp, recvErr := stream.Recv()
		if recvErr == io.EOF {
			return nil
		}
		if recvErr != nil {
			return recvErr
		}
		// First event in resp is the primary; resp.Events is the batched tail.
		if err := r.dispatchOne(ctx, resp, &processed); err != nil {
			return err
		}
		for _, ev := range resp.Events {
			if err := r.dispatchOne(ctx, ev, &processed); err != nil {
				return err
			}
		}
		if r.EventBudget > 0 && processed >= r.EventBudget {
			return nil
		}
	}
}
func (r *Reader) dispatchOne(ctx context.Context, resp *filer_pb.SubscribeMetadataResponse, processed *int) error {
	if resp == nil || resp.EventNotification == nil {
		return nil
	}
	bucket, key, ok := r.extractBucketKey(resp)
	if !ok {
		return nil
	}
	shardID := s3lifecycle.ShardID(bucket, key)
	if r.ShardPredicate != nil {
		if !r.ShardPredicate(shardID) {
			return nil
		}
	} else if shardID != r.ShardID {
		return nil
	}
	ev := &Event{
		TsNs:      resp.TsNs,
		Bucket:    bucket,
		Key:       key,
		ShardID:   shardID,
		OldEntry:  resp.EventNotification.OldEntry,
		NewEntry:  resp.EventNotification.NewEntry,
		NewParent: resp.EventNotification.NewParentPath,
	}
	select {
	case <-ctx.Done():
		return ctx.Err()
	case r.Events <- ev:
		*processed++
		return nil
	}
}
// extractBucketKey turns a meta-log event's path into (bucket, key) when the
// event lies under BucketsPath. Returns ok=false for events outside that
// subtree (cluster admin entries, system files, etc.) so the reader can skip
// them without engaging the routing index.
//
// The path is reconstructed as DirectoryPath/Name, where DirectoryPath comes
// from the entry context and Name from old_entry/new_entry. We prefer
// new_entry on creates/updates and old_entry on deletes; both carry the same
// Name on renames where new_parent_path differs.
func (r *Reader) extractBucketKey(resp *filer_pb.SubscribeMetadataResponse) (string, string, bool) {
	notif := resp.EventNotification
	dir := notif.NewParentPath
	if dir == "" {
		// On deletes, NewParentPath may be empty; the directory is encoded
		// in resp.Directory.
		dir = resp.Directory
	}
	var name string
	switch {
	case notif.NewEntry != nil:
		name = notif.NewEntry.Name
	case notif.OldEntry != nil:
		name = notif.OldEntry.Name
	default:
		return "", "", false
	}
	// Pre-normalized prefix (BucketsPath with trailing slash) is computed
	// once in Run; bucket-root events arrive as either "/buckets" or
	// "/buckets/", so accept both. The fallback path mirrors Run's
	// normalization for tests that call extractBucketKey directly.
	prefix := r.bucketsPathSlash
	if prefix == "" {
		prefix = r.BucketsPath
		if !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}
	}
	bare := strings.TrimSuffix(prefix, "/")
	var rest string
	switch {
	case dir == bare || dir == prefix:
		// Bucket create/delete at /buckets root: bucket name is the entry name.
		if name == "" {
			return "", "", false
		}
		return name, "", true
	case strings.HasPrefix(dir, prefix):
		rest = dir[len(prefix):]
	default:
		return "", "", false
	}
	// rest = "<bucket>" or "<bucket>/<sub>/<sub>..."
	slash := strings.IndexByte(rest, '/')
	var bucket, parentInBucket string
	if slash < 0 {
		bucket = rest
	} else {
		bucket = rest[:slash]
		parentInBucket = rest[slash+1:]
	}
	if bucket == "" {
		return "", "", false
	}
	if parentInBucket != "" {
		return bucket, parentInBucket + "/" + name, true
	}
	return bucket, name, true
}
// LogStartup is a small helper for callers that want a one-line readable
// description of where the reader is starting.
func (r *Reader) LogStartup() {
	sinceNs := r.StartTsNs
	if sinceNs == 0 && r.Cursor != nil {
		sinceNs = r.Cursor.MinTsNs()
	}
	if r.ShardPredicate != nil {
		glog.V(1).Infof("lifecycle reader: shard=range sinceNs=%d budget=%d", sinceNs, r.EventBudget)
		return
	}
	glog.V(1).Infof("lifecycle reader: shard=%d sinceNs=%d budget=%d",
		r.ShardID, sinceNs, r.EventBudget)
}