mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-25 11:10:20 +00:00
feat(s3/lifecycle): classify versioned events by storage path (Phase 5b/1) (#9373)
* feat(s3/lifecycle/router): classify versioned events by storage path

  Phase 5b first slice. Pass the bucket's Versioned flag from the engine snapshot into buildObjectInfo and:

  - Recognize <key>.versions/<vid> events as noncurrent versions. IsLatest=false, info.Key strips the .versions/<vid> suffix so a rule's Filter.Prefix matches the user's logical key, and the AWS-visible version_id rides on Match.VersionID for the dispatcher to target a single version on the server.
  - Read IsDeleteMarker from Extended unconditionally — the engine rejects ExpiredObjectDeleteMarker when NumVersions != 1, so without sibling listing the marker case stays correctly suppressed (a separate PR will add the listing).
  - Non-versioned buckets keep the existing behavior even when an object literally named "*.versions/v1" exists; Versioned=false short-circuits the path classification.

  Time-based NoncurrentDays now fires on noncurrent events. NewerNoncurrent and ExpiredObjectDeleteMarker still need sibling listing — left for a follow-up.

* fix(s3/lifecycle/router): require ExtVersionIdKey to confirm noncurrent

  Path classification alone misclassifies a literal-key collision: a versioned bucket holding an object with key "logs/backup.versions/2023" would be flagged noncurrent and have its key stripped to "logs/backup", losing the user's actual rule-prefix-matching path. SeaweedFS doesn't reserve the .versions/ segment, so the path shape is necessary but not sufficient. Add an authoritative confirmation: the entry must declare the same version_id via ExtVersionIdKey (the field SeaweedFS sets when storing a tracked version). Also reject idx==0 paths so ".versions/<vid>" can't yield an empty logical key.

  Tests:

  - collision: versioned bucket + .versions/ in literal key + no metadata (and the mismatched-vid variant) → still classified as a current-version object;
  - root-versions: .versions/v1 (idx==0) → treated as a regular key;
  - existing noncurrent test now sets ExtVersionIdKey to mirror the storage shape.

* fix(s3/lifecycle/router): skip versioned-bucket version-folder events

  The previous attempt tried to classify <key>.versions/<vid> events as noncurrent versions by storage path. That's broken on three counts:

  - SeaweedFS stores version files as v_<id> (getVersionFileName), so comparing the path suffix to the raw ExtVersionIdKey never matches.
  - The "current latest" version on a versioned bucket lives at the same .versions/v_<id> path shape as noncurrent versions; the latest pointer is on the parent .versions/ directory's Extended[ExtLatestVersionIdKey], which the router doesn't see.
  - Even with a correct vid match, IsLatest=false plus the storage path as ObjectKey would have the dispatcher recompose <storagepath>.versions/v_<id> and no-op (or worse, target the wrong file).

  Until we route from .versions/ directory pointer-transition events (or supply IsLatest/SuccessorModTime/index from sibling listing), skip every event under a *.versions/ folder. Bare-key events (null versions) still route normally; bootstrap walking covers the versioned-storage cases. Tests assert the skip across tracked, literal-collision, and bucket-root .versions paths.

* feat(s3api): refuse noncurrent-kind delete on the current latest version

  Defense-in-depth for the noncurrent kinds: even when bootstrap (or a future event-driven path) thinks a version is noncurrent, the server must verify against the .versions/ directory's Extended[ExtLatestVersionIdKey] before deleting. If the target version matches the latest pointer the action is silently dropped as NOOP_RESOLVED:VERSION_IS_LATEST instead of deleting the live data.

* refactor(s3/lifecycle): tidy versioning gates per review

  - router: skip directory entries (other than MPU init) in buildObjectInfo so .versions/ folder events never become ObjectInfo. Subtest "versions dir itself" added.
  - s3api: switch isCurrentLatestVersion's path split from filepath.Split (OS-dependent) to path.Split so filer paths always use '/'.
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
@@ -102,6 +103,23 @@ func (s3a *S3ApiServer) lifecycleDispatch(ctx context.Context, req *s3_lifecycle
|
||||
if req.VersionId == "" {
|
||||
return blocked("FATAL_EVENT_ERROR: version_id required for noncurrent / delete-marker delete"), nil
|
||||
}
|
||||
// Latest-pointer guard for noncurrent kinds: refuse to delete
|
||||
// the version that the .versions/ directory currently points
|
||||
// to. The router can't always tell current from noncurrent
|
||||
// without sibling state, so the server checks here.
|
||||
if req.ActionKind == s3_lifecycle_pb.ActionKind_NONCURRENT_DAYS ||
|
||||
req.ActionKind == s3_lifecycle_pb.ActionKind_NEWER_NONCURRENT {
|
||||
isLatest, lookupErr := s3a.isCurrentLatestVersion(req.Bucket, req.ObjectPath, req.VersionId)
|
||||
if lookupErr != nil {
|
||||
if errors.Is(lookupErr, filer_pb.ErrNotFound) || errors.Is(lookupErr, ErrObjectNotFound) {
|
||||
return noopResolved("NOT_FOUND"), nil
|
||||
}
|
||||
return retryLater("TRANSPORT_ERROR: latest-pointer lookup: " + lookupErr.Error()), nil
|
||||
}
|
||||
if isLatest {
|
||||
return noopResolved("VERSION_IS_LATEST"), nil
|
||||
}
|
||||
}
|
||||
if err := s3a.deleteSpecificObjectVersion(req.Bucket, req.ObjectPath, req.VersionId); err != nil {
|
||||
if errors.Is(err, filer_pb.ErrNotFound) || errors.Is(err, ErrVersionNotFound) || errors.Is(err, ErrObjectNotFound) {
|
||||
return noopResolved("NOT_FOUND_AT_DELETE"), nil
|
||||
@@ -157,6 +175,34 @@ func (s3a *S3ApiServer) lifecycleAbortMPU(ctx context.Context, req *s3_lifecycle
|
||||
return done(), nil
|
||||
}
|
||||
|
||||
// isCurrentLatestVersion reports whether versionId is the version the
|
||||
// .versions/ directory currently points to. SeaweedFS records the latest
|
||||
// version on the parent directory's Extended map; without consulting it,
|
||||
// a noncurrent-kind dispatch can't safely distinguish current from
|
||||
// noncurrent and would risk deleting the live version. Returns
|
||||
// (false, nil) when the directory has no latest pointer (e.g., the
|
||||
// bucket isn't versioned in this object's history).
|
||||
func (s3a *S3ApiServer) isCurrentLatestVersion(bucket, object, versionId string) (bool, error) {
|
||||
versionsDir := s3a.bucketDir(bucket) + "/" + object + s3_constants.VersionsFolder
|
||||
parent, name := path.Split(versionsDir)
|
||||
parent = strings.TrimRight(parent, "/")
|
||||
if parent == "" {
|
||||
parent = "/"
|
||||
}
|
||||
entry, err := s3a.getEntry(parent, name)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if entry == nil || len(entry.Extended) == 0 {
|
||||
return false, nil
|
||||
}
|
||||
latest, ok := entry.Extended[s3_constants.ExtLatestVersionIdKey]
|
||||
if !ok {
|
||||
return false, nil
|
||||
}
|
||||
return string(latest) == versionId, nil
|
||||
}
|
||||
|
||||
// computeEntryIdentity captures (mtime, size, head fid, sorted-Extended hash):
|
||||
// an overwrite changes mtime/size/fid; a metadata edit changes Extended; a
|
||||
// snapshot-restore that preserves mtime+size still differs in head_fid.
|
||||
|
||||
@@ -14,6 +14,11 @@ import (
|
||||
// Match is one (event, action) pair where EvaluateAction fired. The
|
||||
// dispatcher runs `LifecycleDelete` at DueTime; identity-CAS in the RPC
|
||||
// guards against drift between schedule time and dispatch time.
|
||||
//
|
||||
// For NoncurrentDays / NewerNoncurrent on a versioned bucket, ObjectKey
// is the logical object key and VersionID carries the AWS-visible
// version ID; the dispatcher passes both to the server, which locates
// the stored version file (<key>.versions/v_<id>) from that pair.
// ObjectKey must not be the storage path itself, or the server would
// recompose a wrong path from it.
|
||||
type Match struct {
|
||||
Key s3lifecycle.ActionKey
|
||||
Action *engine.CompiledAction
|
||||
@@ -56,7 +61,7 @@ func Route(snap *engine.Snapshot, ev *reader.Event, now time.Time) []Match {
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
info := buildObjectInfo(ev)
|
||||
info := buildObjectInfo(ev, snap.BucketVersioned(ev.Bucket))
|
||||
if info == nil {
|
||||
return nil
|
||||
}
|
||||
@@ -107,19 +112,25 @@ func Route(snap *engine.Snapshot, ev *reader.Event, now time.Time) []Match {
|
||||
return matches
|
||||
}
|
||||
|
||||
// buildObjectInfo derives a non-versioned ObjectInfo from a meta-log event.
|
||||
// Versioned-bucket semantics (IsLatest, NumVersions, NoncurrentIndex,
|
||||
// IsDeleteMarker for noncurrent versions) require listing siblings and land
|
||||
// in Phase 5; for now any non-MPU event is treated as IsLatest=true with the
|
||||
// LifecycleDelete RPC's identity-CAS catching stale schedules.
|
||||
// buildObjectInfo derives an ObjectInfo from a meta-log event and the
|
||||
// bucket's versioning state. Returns nil when the event has no usable
|
||||
// shape (missing attributes, hard delete already handled upstream).
|
||||
//
|
||||
// MPU init directories at .uploads/<upload_id> populate IsMPUInit and use the
|
||||
// destination object key from the entry's Extended map for filter matching,
|
||||
// so a rule with Filter.Prefix=foo/ matches an MPU uploading to foo/bar.txt.
|
||||
// On a versioned bucket the storage layout (<key>.versions/v_<id>) is
|
||||
// shared between the current latest and the noncurrent versions; the
|
||||
// latest pointer lives in the .versions/ directory's Extended map and
|
||||
// is updated separately. Without that pointer-transition signal here,
|
||||
// the router conservatively classifies every event as if it were the
|
||||
// current version (IsLatest=true) so it never deletes the live
|
||||
// latest; bootstrap walking + the server-side dispatch guard handle
|
||||
// noncurrent retention. NumVersions=0 keeps ExpiredObjectDeleteMarker
|
||||
// (which requires sole-survivor) suppressed.
|
||||
//
|
||||
// Returns nil when Attributes are missing — without ModTime, EvaluateAction
|
||||
// would compute due against year-0001 and fire immediately.
|
||||
func buildObjectInfo(ev *reader.Event) *s3lifecycle.ObjectInfo {
|
||||
// MPU init directories at .uploads/<upload_id> populate IsMPUInit and
|
||||
// use the destination object key from the entry's Extended map for
|
||||
// filter matching, so a rule with Filter.Prefix=foo/ matches an MPU
|
||||
// uploading to foo/bar.txt.
|
||||
func buildObjectInfo(ev *reader.Event, versioned bool) *s3lifecycle.ObjectInfo {
|
||||
entry := ev.NewEntry
|
||||
if entry == nil || entry.Attributes == nil {
|
||||
return nil
|
||||
@@ -135,6 +146,11 @@ func buildObjectInfo(ev *reader.Event) *s3lifecycle.ObjectInfo {
|
||||
IsMPUInit: true,
|
||||
}
|
||||
}
|
||||
// Directory entries that aren't MPU inits aren't lifecycle subjects:
|
||||
// the .versions/ folder itself, prefix dirs, etc. — emit nothing.
|
||||
if entry.IsDirectory {
|
||||
return nil
|
||||
}
|
||||
info := &s3lifecycle.ObjectInfo{
|
||||
Key: ev.Key,
|
||||
ModTime: time.Unix(entry.Attributes.Mtime, int64(entry.Attributes.MtimeNs)),
|
||||
@@ -142,6 +158,25 @@ func buildObjectInfo(ev *reader.Event) *s3lifecycle.ObjectInfo {
|
||||
IsLatest: true,
|
||||
NumVersions: 1,
|
||||
}
|
||||
if versioned {
|
||||
// On a versioned bucket the actual file path doesn't tell us
|
||||
// whether the entry is the current latest or a noncurrent
|
||||
// version — the latest pointer lives in the .versions/
|
||||
// directory's Extended map and isn't part of this event. We
|
||||
// also can't compute NumVersions / NoncurrentIndex here. Skip
|
||||
// any version-folder file event for now; bootstrap walking
|
||||
// drives noncurrent retention and current-version expiration
|
||||
// for versioned buckets until pointer-transition routing
|
||||
// lands. The bare-key path (null-version, pre-versioning
|
||||
// objects) keeps the regular routing.
|
||||
if isVersionFolderPath(ev.Key) {
|
||||
return nil
|
||||
}
|
||||
// NumVersions=0 keeps ExpiredObjectDeleteMarker (sole-survivor
|
||||
// gate) suppressed for the bare-key delete-marker case until
|
||||
// sibling listing lands.
|
||||
info.NumVersions = 0
|
||||
}
|
||||
if tags := extractTags(entry.Extended); len(tags) > 0 {
|
||||
info.Tags = tags
|
||||
}
|
||||
@@ -151,6 +186,22 @@ func buildObjectInfo(ev *reader.Event) *s3lifecycle.ObjectInfo {
|
||||
return info
|
||||
}
|
||||
|
||||
// isVersionFolderPath reports whether the bucket-relative key sits inside a
|
||||
// .versions/ folder — i.e. the path's parent segment ends with the
|
||||
// VersionsFolder suffix. Used by the versioned-bucket gate so the router
|
||||
// skips version-file events that need sibling state to be classified
|
||||
// safely.
|
||||
func isVersionFolderPath(key string) bool {
|
||||
idx := strings.LastIndex(key, "/")
|
||||
if idx <= 0 {
|
||||
return false
|
||||
}
|
||||
parent := key[:idx]
|
||||
parentIdx := strings.LastIndex(parent, "/")
|
||||
leaf := parent[parentIdx+1:]
|
||||
return strings.HasSuffix(leaf, s3_constants.VersionsFolder)
|
||||
}
|
||||
|
||||
// mpuInitInfo recognizes a multipart-upload init: a directory entry at
|
||||
// `.uploads/<upload_id>` carrying the destination key in Extended. Sub-events
|
||||
// for part uploads (deeper paths under the upload directory) are deliberately
|
||||
|
||||
@@ -398,3 +398,167 @@ func TestRouteRegularObjectUnderDualRuleSkipsAbortMPU(t *testing.T) {
|
||||
t.Fatalf("ActionKind=%v, want ExpirationDays", got)
|
||||
}
|
||||
}
|
||||
|
||||
func compileWithVersioned(rule *s3lifecycle.Rule, prior map[s3lifecycle.ActionKey]engine.PriorState) *engine.Snapshot {
|
||||
e := engine.New()
|
||||
return e.Compile([]engine.CompileInput{{Bucket: "bk", Rules: []*s3lifecycle.Rule{rule}, Versioned: true}},
|
||||
engine.CompileOptions{PriorStates: prior})
|
||||
}
|
||||
|
||||
func TestRouteVersionedNoncurrentEventDoesNotFireFromRouter(t *testing.T) {
|
||||
// Versioned bucket: the storage layout <key>.versions/v_<id> is
|
||||
// shared between the current latest and noncurrent versions, and
|
||||
// the latest pointer lives in the parent directory's metadata —
|
||||
// not on the version file itself. The router cannot distinguish
|
||||
// without consulting the .versions/ directory, so it must not
|
||||
// emit NONCURRENT_* matches; bootstrap (with sibling listing) is
|
||||
// responsible for those.
|
||||
rule := &s3lifecycle.Rule{
|
||||
ID: "r",
|
||||
Status: s3lifecycle.StatusEnabled,
|
||||
NoncurrentVersionExpirationDays: 7,
|
||||
}
|
||||
snap := compileWithVersioned(rule, activatedPrior(rule))
|
||||
|
||||
now := time.Now()
|
||||
old := now.AddDate(0, 0, -30)
|
||||
ev := eventCreate("bk", "logs/foo.versions/v_v1", old.Unix(), 1, old.UnixNano())
|
||||
ev.NewEntry.Extended = map[string][]byte{
|
||||
s3_constants.ExtVersionIdKey: []byte("v1"),
|
||||
}
|
||||
|
||||
if got := Route(snap, ev, now); len(got) != 0 {
|
||||
t.Fatalf("router must not emit noncurrent matches yet, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouteVersionedCurrentEventStaysLatest(t *testing.T) {
|
||||
// Versioned bucket, ExpirationDays rule. The current-version event
|
||||
// arrives at the bare <key>; nothing about its path looks like a
|
||||
// .versions/ entry, so IsLatest stays true and the rule fires.
|
||||
rule := &s3lifecycle.Rule{ID: "r", Status: s3lifecycle.StatusEnabled, ExpirationDays: 1}
|
||||
snap := compileWithVersioned(rule, activatedPrior(rule))
|
||||
|
||||
now := time.Now()
|
||||
old := now.AddDate(0, 0, -2)
|
||||
ev := eventCreate("bk", "logs/foo", old.Unix(), 1, old.UnixNano())
|
||||
|
||||
matches := Route(snap, ev, now)
|
||||
if len(matches) != 1 {
|
||||
t.Fatalf("expected 1 match (EXPIRATION_DAYS), got %v", matches)
|
||||
}
|
||||
if matches[0].Key.ActionKind != s3lifecycle.ActionKindExpirationDays {
|
||||
t.Fatalf("ActionKind=%v, want ExpirationDays", matches[0].Key.ActionKind)
|
||||
}
|
||||
if matches[0].VersionID != "" {
|
||||
t.Fatalf("VersionID=%q, want empty for current-version path", matches[0].VersionID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouteNonVersionedBucketIgnoresVersionsSuffix(t *testing.T) {
|
||||
// A non-versioned bucket happens to have an object literally named
|
||||
// "logs/foo.versions/v1" — that's just a regular path. The router
|
||||
// must NOT classify it as noncurrent or set IsLatest=false; the
|
||||
// rule fires as a normal current-object expiration.
|
||||
rule := &s3lifecycle.Rule{ID: "r", Status: s3lifecycle.StatusEnabled, ExpirationDays: 1}
|
||||
snap := compileWith(rule, activatedPrior(rule))
|
||||
|
||||
now := time.Now()
|
||||
old := now.AddDate(0, 0, -2)
|
||||
ev := eventCreate("bk", "logs/foo.versions/v1", old.Unix(), 1, old.UnixNano())
|
||||
|
||||
matches := Route(snap, ev, now)
|
||||
if len(matches) != 1 {
|
||||
t.Fatalf("expected 1 match, got %v", matches)
|
||||
}
|
||||
if matches[0].VersionID != "" {
|
||||
t.Fatalf("VersionID=%q, want empty for non-versioned bucket", matches[0].VersionID)
|
||||
}
|
||||
if matches[0].ObjectKey != "logs/foo.versions/v1" {
|
||||
t.Fatalf("ObjectKey=%q, want unchanged for non-versioned bucket", matches[0].ObjectKey)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouteVersionedExpiredDeleteMarkerSuppressedWithoutSiblings(t *testing.T) {
|
||||
// ExpiredObjectDeleteMarker requires NumVersions==1 — the marker is
|
||||
// the sole-survivor. Without sibling listing the router can't
|
||||
// confirm that, so the rule must NOT fire just because the latest
|
||||
// is a delete marker. A future PR adds sibling listing.
|
||||
rule := &s3lifecycle.Rule{
|
||||
ID: "r",
|
||||
Status: s3lifecycle.StatusEnabled,
|
||||
ExpiredObjectDeleteMarker: true,
|
||||
}
|
||||
snap := compileWithVersioned(rule, activatedPrior(rule))
|
||||
|
||||
now := time.Now()
|
||||
old := now.AddDate(0, 0, -1)
|
||||
ev := eventCreate("bk", "logs/gone", old.Unix(), 0, old.UnixNano())
|
||||
ev.NewEntry.Extended = map[string][]byte{
|
||||
s3_constants.ExtDeleteMarkerKey: {1},
|
||||
}
|
||||
|
||||
if got := Route(snap, ev, now); len(got) != 0 {
|
||||
t.Fatalf("ExpiredDeleteMarker without sibling count must not fire, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouteVersionedAllVersionFolderPathsSkipped(t *testing.T) {
|
||||
// On a versioned bucket the router skips every event whose parent
|
||||
// directory name ends with ".versions" — both the version files
|
||||
// SeaweedFS itself writes (logs/foo.versions/v_v1) and any literal
|
||||
// key the user happens to put under such a parent — because the
|
||||
// current-vs-noncurrent classification needs the .versions/
|
||||
// directory's latest pointer, which isn't carried by these events.
|
||||
// Bootstrap covers retention for those entries.
|
||||
rule := &s3lifecycle.Rule{ID: "r", Status: s3lifecycle.StatusEnabled, ExpirationDays: 1}
|
||||
snap := compileWithVersioned(rule, activatedPrior(rule))
|
||||
|
||||
now := time.Now()
|
||||
old := now.AddDate(0, 0, -2)
|
||||
cases := []struct {
|
||||
name string
|
||||
key string
|
||||
isDir bool
|
||||
ext map[string][]byte
|
||||
}{
|
||||
{
|
||||
name: "tracked version file",
|
||||
key: "logs/foo.versions/v_v1",
|
||||
ext: map[string][]byte{s3_constants.ExtVersionIdKey: []byte("v1")},
|
||||
},
|
||||
{
|
||||
name: "literal-key collision",
|
||||
key: "logs/backup.versions/2023",
|
||||
},
|
||||
{
|
||||
name: "bucket-root .versions",
|
||||
key: ".versions/v_v1",
|
||||
ext: map[string][]byte{s3_constants.ExtVersionIdKey: []byte("v1")},
|
||||
},
|
||||
{
|
||||
// The .versions/ folder itself is a directory entry; the
|
||||
// router must not emit ObjectInfo for it. Without the
|
||||
// directory short-circuit it would route as a regular
|
||||
// object and the dispatcher would target a directory path.
|
||||
name: "versions dir itself",
|
||||
key: "logs/foo.versions",
|
||||
isDir: true,
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ev := eventCreate("bk", tc.key, old.Unix(), 1, old.UnixNano())
|
||||
if tc.ext != nil {
|
||||
ev.NewEntry.Extended = tc.ext
|
||||
}
|
||||
if tc.isDir {
|
||||
ev.NewEntry.IsDirectory = true
|
||||
}
|
||||
if got := Route(snap, ev, now); len(got) != 0 {
|
||||
t.Fatalf("version-folder event should be skipped, got %v", got)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user