diff --git a/weed/s3api/s3api_internal_lifecycle.go b/weed/s3api/s3api_internal_lifecycle.go index 6dd562f16..dff87bafe 100644 --- a/weed/s3api/s3api_internal_lifecycle.go +++ b/weed/s3api/s3api_internal_lifecycle.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "errors" + "path" "strings" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -102,6 +103,23 @@ func (s3a *S3ApiServer) lifecycleDispatch(ctx context.Context, req *s3_lifecycle if req.VersionId == "" { return blocked("FATAL_EVENT_ERROR: version_id required for noncurrent / delete-marker delete"), nil } + // Latest-pointer guard for noncurrent kinds: refuse to delete + // the version that the .versions/ directory currently points + // to. The router can't always tell current from noncurrent + // without sibling state, so the server checks here. + if req.ActionKind == s3_lifecycle_pb.ActionKind_NONCURRENT_DAYS || + req.ActionKind == s3_lifecycle_pb.ActionKind_NEWER_NONCURRENT { + isLatest, lookupErr := s3a.isCurrentLatestVersion(req.Bucket, req.ObjectPath, req.VersionId) + if lookupErr != nil { + if errors.Is(lookupErr, filer_pb.ErrNotFound) || errors.Is(lookupErr, ErrObjectNotFound) { + return noopResolved("NOT_FOUND"), nil + } + return retryLater("TRANSPORT_ERROR: latest-pointer lookup: " + lookupErr.Error()), nil + } + if isLatest { + return noopResolved("VERSION_IS_LATEST"), nil + } + } if err := s3a.deleteSpecificObjectVersion(req.Bucket, req.ObjectPath, req.VersionId); err != nil { if errors.Is(err, filer_pb.ErrNotFound) || errors.Is(err, ErrVersionNotFound) || errors.Is(err, ErrObjectNotFound) { return noopResolved("NOT_FOUND_AT_DELETE"), nil @@ -157,6 +175,34 @@ func (s3a *S3ApiServer) lifecycleAbortMPU(ctx context.Context, req *s3_lifecycle return done(), nil } +// isCurrentLatestVersion reports whether versionId is the version the +// .versions/ directory currently points to. SeaweedFS records the latest +// version on the parent directory's Extended map; without consulting it, +// a noncurrent-kind dispatch can't safely distinguish current from +// noncurrent and would risk deleting the live version. Returns +// (false, nil) when the directory has no latest pointer (e.g., the +// bucket isn't versioned in this object's history). +func (s3a *S3ApiServer) isCurrentLatestVersion(bucket, object, versionId string) (bool, error) { + versionsDir := s3a.bucketDir(bucket) + "/" + object + s3_constants.VersionsFolder + parent, name := path.Split(versionsDir) + parent = strings.TrimRight(parent, "/") + if parent == "" { + parent = "/" + } + entry, err := s3a.getEntry(parent, name) + if err != nil { + return false, err + } + if entry == nil || len(entry.Extended) == 0 { + return false, nil + } + latest, ok := entry.Extended[s3_constants.ExtLatestVersionIdKey] + if !ok { + return false, nil + } + return string(latest) == versionId, nil +} + // computeEntryIdentity captures (mtime, size, head fid, sorted-Extended hash): // an overwrite changes mtime/size/fid; a metadata edit changes Extended; a // snapshot-restore that preserves mtime+size still differs in head_fid. diff --git a/weed/s3api/s3lifecycle/router/router.go b/weed/s3api/s3lifecycle/router/router.go index 569d80b0b..0c61da313 100644 --- a/weed/s3api/s3lifecycle/router/router.go +++ b/weed/s3api/s3lifecycle/router/router.go @@ -14,6 +14,11 @@ import ( // Match is one (event, action) pair where EvaluateAction fired. The // dispatcher runs `LifecycleDelete` at DueTime; identity-CAS in the RPC // guards against drift between schedule time and dispatch time. +// +// For NoncurrentDays / NewerNoncurrent on a versioned bucket, ObjectKey +// is the seaweedfs storage path (logical-key + ".versions/" + version_id) +// so the dispatcher can locate the specific version, and VersionID +// carries the AWS-visible version ID separately. type Match struct { Key s3lifecycle.ActionKey Action *engine.CompiledAction @@ -56,7 +61,7 @@ func Route(snap *engine.Snapshot, ev *reader.Event, now time.Time) []Match { if len(keys) == 0 { return nil } - info := buildObjectInfo(ev) + info := buildObjectInfo(ev, snap.BucketVersioned(ev.Bucket)) if info == nil { return nil } @@ -107,19 +112,25 @@ func Route(snap *engine.Snapshot, ev *reader.Event, now time.Time) []Match { return matches } -// buildObjectInfo derives a non-versioned ObjectInfo from a meta-log event. -// Versioned-bucket semantics (IsLatest, NumVersions, NoncurrentIndex, -// IsDeleteMarker for noncurrent versions) require listing siblings and land -// in Phase 5; for now any non-MPU event is treated as IsLatest=true with the -// LifecycleDelete RPC's identity-CAS catching stale schedules. +// buildObjectInfo derives an ObjectInfo from a meta-log event and the +// bucket's versioning state. Returns nil when the event has no usable +// shape (missing attributes, hard delete already handled upstream). // -// MPU init directories at .uploads/ populate IsMPUInit and use the -// destination object key from the entry's Extended map for filter matching, -// so a rule with Filter.Prefix=foo/ matches an MPU uploading to foo/bar.txt. +// On a versioned bucket the storage layout (.versions/v_) is +// shared between the current latest and the noncurrent versions; the +// latest pointer lives in the .versions/ directory's Extended map and +// is updated separately. Without that pointer-transition signal here, +// the router conservatively classifies every event as if it were the +// current version (IsLatest=true) so it never deletes the live +// latest; bootstrap walking + the server-side dispatch guard handle +// noncurrent retention. NumVersions=0 keeps ExpiredObjectDeleteMarker +// (which requires sole-survivor) suppressed. // -// Returns nil when Attributes are missing — without ModTime, EvaluateAction -// would compute due against year-0001 and fire immediately. -func buildObjectInfo(ev *reader.Event) *s3lifecycle.ObjectInfo { +// MPU init directories at .uploads/ populate IsMPUInit and +// use the destination object key from the entry's Extended map for +// filter matching, so a rule with Filter.Prefix=foo/ matches an MPU +// uploading to foo/bar.txt. +func buildObjectInfo(ev *reader.Event, versioned bool) *s3lifecycle.ObjectInfo { entry := ev.NewEntry if entry == nil || entry.Attributes == nil { return nil @@ -135,6 +146,11 @@ func buildObjectInfo(ev *reader.Event) *s3lifecycle.ObjectInfo { IsMPUInit: true, } } + // Directory entries that aren't MPU inits aren't lifecycle subjects: + // the .versions/ folder itself, prefix dirs, etc. — emit nothing. + if entry.IsDirectory { + return nil + } info := &s3lifecycle.ObjectInfo{ Key: ev.Key, ModTime: time.Unix(entry.Attributes.Mtime, int64(entry.Attributes.MtimeNs)), @@ -142,6 +158,25 @@ func buildObjectInfo(ev *reader.Event) *s3lifecycle.ObjectInfo { IsLatest: true, NumVersions: 1, } + if versioned { + // On a versioned bucket the actual file path doesn't tell us + // whether the entry is the current latest or a noncurrent + // version — the latest pointer lives in the .versions/ + // directory's Extended map and isn't part of this event. We + // also can't compute NumVersions / NoncurrentIndex here. Skip + // any version-folder file event for now; bootstrap walking + // drives noncurrent retention and current-version expiration + // for versioned buckets until pointer-transition routing + // lands. The bare-key path (null-version, pre-versioning + // objects) keeps the regular routing. + if isVersionFolderPath(ev.Key) { + return nil + } + // NumVersions=0 keeps ExpiredObjectDeleteMarker (sole-survivor + // gate) suppressed for the bare-key delete-marker case until + // sibling listing lands. + info.NumVersions = 0 + } if tags := extractTags(entry.Extended); len(tags) > 0 { info.Tags = tags } @@ -151,6 +186,22 @@ func buildObjectInfo(ev *reader.Event) *s3lifecycle.ObjectInfo { return info } +// isVersionFolderPath reports whether the bucket-relative key sits inside a +// .versions/ folder — i.e. the path's parent segment ends with the +// VersionsFolder suffix. Used by the versioned-bucket gate so the router +// skips version-file events that need sibling state to be classified +// safely. +func isVersionFolderPath(key string) bool { + idx := strings.LastIndex(key, "/") + if idx <= 0 { + return false + } + parent := key[:idx] + parentIdx := strings.LastIndex(parent, "/") + leaf := parent[parentIdx+1:] + return strings.HasSuffix(leaf, s3_constants.VersionsFolder) +} + // mpuInitInfo recognizes a multipart-upload init: a directory entry at // `.uploads/` carrying the destination key in Extended. Sub-events // for part uploads (deeper paths under the upload directory) are deliberately diff --git a/weed/s3api/s3lifecycle/router/router_test.go b/weed/s3api/s3lifecycle/router/router_test.go index 0b243a7e3..592485484 100644 --- a/weed/s3api/s3lifecycle/router/router_test.go +++ b/weed/s3api/s3lifecycle/router/router_test.go @@ -398,3 +398,167 @@ func TestRouteRegularObjectUnderDualRuleSkipsAbortMPU(t *testing.T) { t.Fatalf("ActionKind=%v, want ExpirationDays", got) } } + +func compileWithVersioned(rule *s3lifecycle.Rule, prior map[s3lifecycle.ActionKey]engine.PriorState) *engine.Snapshot { + e := engine.New() + return e.Compile([]engine.CompileInput{{Bucket: "bk", Rules: []*s3lifecycle.Rule{rule}, Versioned: true}}, + engine.CompileOptions{PriorStates: prior}) +} + +func TestRouteVersionedNoncurrentEventDoesNotFireFromRouter(t *testing.T) { + // Versioned bucket: the storage layout .versions/v_ is + // shared between the current latest and noncurrent versions, and + // the latest pointer lives in the parent directory's metadata — + // not on the version file itself. The router cannot distinguish + // without consulting the .versions/ directory, so it must not + // emit NONCURRENT_* matches; bootstrap (with sibling listing) is + // responsible for those. + rule := &s3lifecycle.Rule{ + ID: "r", + Status: s3lifecycle.StatusEnabled, + NoncurrentVersionExpirationDays: 7, + } + snap := compileWithVersioned(rule, activatedPrior(rule)) + + now := time.Now() + old := now.AddDate(0, 0, -30) + ev := eventCreate("bk", "logs/foo.versions/v_v1", old.Unix(), 1, old.UnixNano()) + ev.NewEntry.Extended = map[string][]byte{ + s3_constants.ExtVersionIdKey: []byte("v1"), + } + + if got := Route(snap, ev, now); len(got) != 0 { + t.Fatalf("router must not emit noncurrent matches yet, got %v", got) + } +} + +func TestRouteVersionedCurrentEventStaysLatest(t *testing.T) { + // Versioned bucket, ExpirationDays rule. The current-version event + // arrives at the bare ; nothing about its path looks like a + // .versions/ entry, so IsLatest stays true and the rule fires. + rule := &s3lifecycle.Rule{ID: "r", Status: s3lifecycle.StatusEnabled, ExpirationDays: 1} + snap := compileWithVersioned(rule, activatedPrior(rule)) + + now := time.Now() + old := now.AddDate(0, 0, -2) + ev := eventCreate("bk", "logs/foo", old.Unix(), 1, old.UnixNano()) + + matches := Route(snap, ev, now) + if len(matches) != 1 { + t.Fatalf("expected 1 match (EXPIRATION_DAYS), got %v", matches) + } + if matches[0].Key.ActionKind != s3lifecycle.ActionKindExpirationDays { + t.Fatalf("ActionKind=%v, want ExpirationDays", matches[0].Key.ActionKind) + } + if matches[0].VersionID != "" { + t.Fatalf("VersionID=%q, want empty for current-version path", matches[0].VersionID) + } +} + +func TestRouteNonVersionedBucketIgnoresVersionsSuffix(t *testing.T) { + // A non-versioned bucket happens to have an object literally named + // "logs/foo.versions/v1" — that's just a regular path. The router + // must NOT classify it as noncurrent or set IsLatest=false; the + // rule fires as a normal current-object expiration. + rule := &s3lifecycle.Rule{ID: "r", Status: s3lifecycle.StatusEnabled, ExpirationDays: 1} + snap := compileWith(rule, activatedPrior(rule)) + + now := time.Now() + old := now.AddDate(0, 0, -2) + ev := eventCreate("bk", "logs/foo.versions/v1", old.Unix(), 1, old.UnixNano()) + + matches := Route(snap, ev, now) + if len(matches) != 1 { + t.Fatalf("expected 1 match, got %v", matches) + } + if matches[0].VersionID != "" { + t.Fatalf("VersionID=%q, want empty for non-versioned bucket", matches[0].VersionID) + } + if matches[0].ObjectKey != "logs/foo.versions/v1" { + t.Fatalf("ObjectKey=%q, want unchanged for non-versioned bucket", matches[0].ObjectKey) + } +} + +func TestRouteVersionedExpiredDeleteMarkerSuppressedWithoutSiblings(t *testing.T) { + // ExpiredObjectDeleteMarker requires NumVersions==1 — the marker is + // the sole-survivor. Without sibling listing the router can't + // confirm that, so the rule must NOT fire just because the latest + // is a delete marker. A future PR adds sibling listing. + rule := &s3lifecycle.Rule{ + ID: "r", + Status: s3lifecycle.StatusEnabled, + ExpiredObjectDeleteMarker: true, + } + snap := compileWithVersioned(rule, activatedPrior(rule)) + + now := time.Now() + old := now.AddDate(0, 0, -1) + ev := eventCreate("bk", "logs/gone", old.Unix(), 0, old.UnixNano()) + ev.NewEntry.Extended = map[string][]byte{ + s3_constants.ExtDeleteMarkerKey: {1}, + } + + if got := Route(snap, ev, now); len(got) != 0 { + t.Fatalf("ExpiredDeleteMarker without sibling count must not fire, got %v", got) + } +} + +func TestRouteVersionedAllVersionFolderPathsSkipped(t *testing.T) { + // On a versioned bucket the router skips every event whose parent + // directory name ends with ".versions" — both the version files + // SeaweedFS itself writes (logs/foo.versions/v_v1) and any literal + // key the user happens to put under such a parent — because the + // current-vs-noncurrent classification needs the .versions/ + // directory's latest pointer, which isn't carried by these events. + // Bootstrap covers retention for those entries. + rule := &s3lifecycle.Rule{ID: "r", Status: s3lifecycle.StatusEnabled, ExpirationDays: 1} + snap := compileWithVersioned(rule, activatedPrior(rule)) + + now := time.Now() + old := now.AddDate(0, 0, -2) + cases := []struct { + name string + key string + isDir bool + ext map[string][]byte + }{ + { + name: "tracked version file", + key: "logs/foo.versions/v_v1", + ext: map[string][]byte{s3_constants.ExtVersionIdKey: []byte("v1")}, + }, + { + name: "literal-key collision", + key: "logs/backup.versions/2023", + }, + { + name: "bucket-root .versions", + key: ".versions/v_v1", + ext: map[string][]byte{s3_constants.ExtVersionIdKey: []byte("v1")}, + }, + { + // The .versions/ folder itself is a directory entry; the + // router must not emit ObjectInfo for it. Without the + // directory short-circuit it would route as a regular + // object and the dispatcher would target a directory path. + name: "versions dir itself", + key: "logs/foo.versions", + isDir: true, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + ev := eventCreate("bk", tc.key, old.Unix(), 1, old.UnixNano()) + if tc.ext != nil { + ev.NewEntry.Extended = tc.ext + } + if tc.isDir { + ev.NewEntry.IsDirectory = true + } + if got := Route(snap, ev, now); len(got) != 0 { + t.Fatalf("version-folder event should be skipped, got %v", got) + } + }) + } +} +