mirror of
https://github.com/versity/versitygw.git
synced 2026-04-23 06:00:30 +00:00
Merge pull request #2076 from versity/ben/sidecar-fixes
fix: add explicit sidecar metadata cleanup on object/bucket deletion
This commit is contained in:
@@ -145,15 +145,24 @@ func (s SideCar) ListAttributes(bucket, object string) ([]string, error) {
|
||||
}
|
||||
|
||||
// DeleteAttributes removes all attributes for an object or a bucket.
|
||||
// When object is empty the entire bucket sidecar directory is removed,
|
||||
// cleaning up any orphaned object or multipart metadata within it.
|
||||
func (s SideCar) DeleteAttributes(bucket, object string) error {
|
||||
metadir := filepath.Join(s.dir, bucket, object, sidecarmeta)
|
||||
if object == "" {
|
||||
metadir = filepath.Join(s.dir, bucket, sidecarmeta)
|
||||
// Remove the entire bucket sidecar directory so that orphaned
|
||||
// object/multipart metadata does not accumulate after DeleteBucket.
|
||||
bucketDir := filepath.Join(s.dir, bucket)
|
||||
err := os.RemoveAll(bucketDir)
|
||||
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
return fmt.Errorf("failed to remove bucket attributes: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
metadir := filepath.Join(s.dir, bucket, object, sidecarmeta)
|
||||
err := os.RemoveAll(metadir)
|
||||
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
return fmt.Errorf("failed to remove attributes: %v", err)
|
||||
return fmt.Errorf("failed to remove attributes: %w", err)
|
||||
}
|
||||
s.cleanupEmptyDirs(metadir, bucket, object)
|
||||
return nil
|
||||
|
||||
@@ -641,6 +641,13 @@ func (p *Posix) DeleteBucket(ctx context.Context, bucket string) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("remove bucket: %w", err)
|
||||
}
|
||||
// Bucket data is already removed; a metadata cleanup failure orphans only
|
||||
// the sidecar directory, which is not user-visible. Log and continue rather
|
||||
// than returning an error that would mislead callers into thinking the
|
||||
// bucket still exists.
|
||||
if err = p.meta.DeleteAttributes(bucket, ""); err != nil {
|
||||
debuglogger.Logf("failed to delete bucket sidecar attributes (%q): %v", bucket, err)
|
||||
}
|
||||
// Remove the bucket from versioning directory
|
||||
if p.versioningEnabled() {
|
||||
err = os.RemoveAll(filepath.Join(p.versioningDir, bucket))
|
||||
@@ -882,8 +889,12 @@ func (p *Posix) deleteNullVersionIdObject(bucket, key string) error {
|
||||
if errors.Is(err, fs.ErrNotExist) {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return err
|
||||
_ = p.meta.DeleteAttributes(versionPath, "")
|
||||
return nil
|
||||
}
|
||||
|
||||
func isRemovableAttr(attr string) bool {
|
||||
@@ -1042,6 +1053,18 @@ func (p *Posix) ensureNotDeleteMarker(bucket, object, versionId string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// With path-based metadata backends (e.g. sidecar), RetrieveAttribute
|
||||
// returns ErrNoSuchKey whether the sidecar attribute is absent OR the
|
||||
// data file simply doesn't exist — the two cases are indistinguishable
|
||||
// from metadata alone. Verify the data file directly so callers
|
||||
// receive the correct NoSuchVersion / NoSuchKey error.
|
||||
if _, statErr := os.Stat(filepath.Join(bucket, object)); errors.Is(statErr, fs.ErrNotExist) || errors.Is(statErr, syscall.ENOTDIR) {
|
||||
if versionId != "" {
|
||||
return s3err.GetAPIError(s3err.ErrNoSuchVersion)
|
||||
}
|
||||
return s3err.GetAPIError(s3err.ErrNoSuchKey)
|
||||
}
|
||||
|
||||
_, err := p.meta.RetrieveAttribute(nil, bucket, object, deleteMarkerKey)
|
||||
if errors.Is(err, fs.ErrNotExist) || errors.Is(err, syscall.ENOTDIR) {
|
||||
if versionId != "" {
|
||||
@@ -1518,6 +1541,7 @@ func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu s3response.Create
|
||||
// cleanup object if returning error
|
||||
os.RemoveAll(filepath.Join(tmppath, uploadID))
|
||||
os.Remove(tmppath)
|
||||
_ = p.meta.DeleteAttributes(bucket, filepath.Join(objdir, uploadID))
|
||||
return s3response.InitiateMultipartUploadResult{}, err
|
||||
}
|
||||
}
|
||||
@@ -1536,6 +1560,7 @@ func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu s3response.Create
|
||||
// cleanup object if returning error
|
||||
os.RemoveAll(filepath.Join(tmppath, uploadID))
|
||||
os.Remove(tmppath)
|
||||
_ = p.meta.DeleteAttributes(bucket, filepath.Join(objdir, uploadID))
|
||||
return s3response.InitiateMultipartUploadResult{}, err
|
||||
}
|
||||
|
||||
@@ -1549,6 +1574,7 @@ func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu s3response.Create
|
||||
// cleanup object if returning error
|
||||
os.RemoveAll(filepath.Join(tmppath, uploadID))
|
||||
os.Remove(tmppath)
|
||||
_ = p.meta.DeleteAttributes(bucket, filepath.Join(objdir, uploadID))
|
||||
return s3response.InitiateMultipartUploadResult{}, err
|
||||
}
|
||||
}
|
||||
@@ -1564,6 +1590,7 @@ func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu s3response.Create
|
||||
// cleanup object if returning error
|
||||
os.RemoveAll(filepath.Join(tmppath, uploadID))
|
||||
os.Remove(tmppath)
|
||||
_ = p.meta.DeleteAttributes(bucket, filepath.Join(objdir, uploadID))
|
||||
return s3response.InitiateMultipartUploadResult{}, fmt.Errorf("parse object lock retention: %w", err)
|
||||
}
|
||||
err = p.PutObjectRetention(withCtxNoSlot(ctx), bucket, filepath.Join(objdir, uploadID), "", retParsed)
|
||||
@@ -1574,6 +1601,7 @@ func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu s3response.Create
|
||||
// cleanup object if returning error
|
||||
os.RemoveAll(filepath.Join(tmppath, uploadID))
|
||||
os.Remove(tmppath)
|
||||
_ = p.meta.DeleteAttributes(bucket, filepath.Join(objdir, uploadID))
|
||||
return s3response.InitiateMultipartUploadResult{}, err
|
||||
}
|
||||
}
|
||||
@@ -1588,6 +1616,7 @@ func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu s3response.Create
|
||||
// cleanup object if returning error
|
||||
_ = os.RemoveAll(filepath.Join(tmppath, uploadID))
|
||||
_ = os.Remove(tmppath)
|
||||
_ = p.meta.DeleteAttributes(bucket, filepath.Join(objdir, uploadID))
|
||||
return s3response.InitiateMultipartUploadResult{}, fmt.Errorf("store mp checksum algorithm: %w", err)
|
||||
}
|
||||
}
|
||||
@@ -2051,6 +2080,10 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
|
||||
if err != nil {
|
||||
return res, "", fmt.Errorf("create object version: %w", err)
|
||||
}
|
||||
// Clean up object-lock attrs that may have leaked from the previous
|
||||
// version's path-based metadata into this (new) version's sidecar.
|
||||
_ = p.meta.DeleteAttribute(bucket, object, objectLegalHoldKey)
|
||||
_ = p.meta.DeleteAttribute(bucket, object, objectRetentionKey)
|
||||
}
|
||||
|
||||
// if the versioning is enabled, generate a new versionID for the object
|
||||
@@ -2488,6 +2521,11 @@ func (p *Posix) AbortMultipartUpload(ctx context.Context, mpu *s3.AbortMultipart
|
||||
}
|
||||
os.Remove(objdir)
|
||||
|
||||
// Clean up sidecar metadata for the aborted upload. With xattr this is
|
||||
// a no-op; with sidecar the metadata directory would otherwise be orphaned.
|
||||
uploadMetaPath := filepath.Join(MetaTmpMultipartDir, fmt.Sprintf("%x", sum), uploadID)
|
||||
_ = p.meta.DeleteAttributes(bucket, uploadMetaPath)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -3556,6 +3594,13 @@ func (p *Posix) PutObjectWithPostFunc(ctx context.Context, po s3response.PutObje
|
||||
if err != nil {
|
||||
return s3response.PutObjectOutput{}, fmt.Errorf("create object version: %w", err)
|
||||
}
|
||||
// With path-based metadata backends (e.g. sidecar), object-lock
|
||||
// attributes written on the previous version persist at this path
|
||||
// after createObjVersion because metadata is not replaced atomically
|
||||
// the way xattrs are on file rename. Delete them so they do not
|
||||
// bleed into the new version.
|
||||
_ = p.meta.DeleteAttribute(*po.Bucket, *po.Key, objectLegalHoldKey)
|
||||
_ = p.meta.DeleteAttribute(*po.Bucket, *po.Key, objectRetentionKey)
|
||||
}
|
||||
}
|
||||
if errors.Is(err, syscall.ENAMETOOLONG) {
|
||||
@@ -3644,6 +3689,11 @@ func (p *Posix) PutObjectWithPostFunc(ctx context.Context, po s3response.PutObje
|
||||
return s3response.PutObjectOutput{}, err
|
||||
}
|
||||
versionID = nullVersionId
|
||||
// Clear any stale versionId sidecar attribute left from a previous
|
||||
// versioned object at this path. With xattr this is implicit (the
|
||||
// new file carries only the attrs set on the tmpfile), but with
|
||||
// path-based metadata the old attr persists until explicitly deleted.
|
||||
_ = p.meta.DeleteAttribute(*po.Bucket, *po.Key, versionIdKey)
|
||||
}
|
||||
|
||||
var sum string
|
||||
@@ -3921,6 +3971,16 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
|
||||
return nil, fmt.Errorf("get obj versionId: %w", err)
|
||||
}
|
||||
if errors.Is(err, meta.ErrNoSuchKey) {
|
||||
// With sidecar, ErrNoSuchKey means "attribute absent" regardless of
|
||||
// whether the data file exists. If the file is absent the object
|
||||
// does not exist at all → AWS returns success for DeleteObject.
|
||||
// Also handle ENOTDIR: when a key such as "foo/bar" is requested
|
||||
// but "foo" is a regular file (not a directory), the path cannot
|
||||
// contain any object.
|
||||
_, statErr := os.Stat(filepath.Join(bucket, object))
|
||||
if errors.Is(statErr, fs.ErrNotExist) || errors.Is(statErr, syscall.ENOTDIR) {
|
||||
return &s3.DeleteObjectOutput{VersionId: input.VersionId}, nil
|
||||
}
|
||||
vId = []byte(nullVersionId)
|
||||
}
|
||||
|
||||
@@ -3994,6 +4054,15 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
|
||||
return nil, fmt.Errorf("link tmp file: %w", err)
|
||||
}
|
||||
|
||||
// With path-based metadata (sidecar) the live object's attrs are
|
||||
// not replaced atomically. The restored version may not have all
|
||||
// the attrs that the deleted version had (e.g. a null version has
|
||||
// no versionIdKey). Clear attrs that belong to the deleted version
|
||||
// before copying the restored version's attrs so that the restored
|
||||
// version presents a clean state.
|
||||
_ = p.meta.DeleteAttribute(bucket, object, versionIdKey)
|
||||
_ = p.meta.DeleteAttribute(bucket, object, deleteMarkerKey)
|
||||
|
||||
attrs, err := p.meta.ListAttributes(versionPath, srcVersionId)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list object attributes: %w", err)
|
||||
@@ -4016,6 +4085,7 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
|
||||
return nil, fmt.Errorf("remove obj version %w", err)
|
||||
}
|
||||
|
||||
_ = p.meta.DeleteAttributes(versionPath, srcVersionId)
|
||||
p.removeParents(filepath.Join(p.versioningDir, bucket), filepath.Join(genObjVersionKey(object), *input.VersionId))
|
||||
|
||||
return &s3.DeleteObjectOutput{
|
||||
@@ -4042,6 +4112,7 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
|
||||
return nil, fmt.Errorf("delete object: %w", err)
|
||||
}
|
||||
|
||||
_ = p.meta.DeleteAttributes(versionPath, *input.VersionId)
|
||||
p.removeParents(filepath.Join(p.versioningDir, bucket), filepath.Join(genObjVersionKey(object), *input.VersionId))
|
||||
|
||||
return &s3.DeleteObjectOutput{
|
||||
@@ -5553,6 +5624,19 @@ func (p *Posix) GetObjectTagging(ctx context.Context, bucket, object, versionId
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if versionId == "" {
|
||||
_, err = os.Stat(filepath.Join(bucket, object))
|
||||
if errors.Is(err, fs.ErrNotExist) || errors.Is(err, syscall.ENOTDIR) {
|
||||
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
|
||||
}
|
||||
if errors.Is(err, syscall.ENAMETOOLONG) {
|
||||
return nil, s3err.GetAPIError(s3err.ErrKeyTooLong)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("stat object: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if versionId != "" {
|
||||
if !p.versioningEnabled() {
|
||||
//TODO: Maybe we need to return our custom error here?
|
||||
@@ -5630,6 +5714,19 @@ func (p *Posix) PutObjectTagging(ctx context.Context, bucket, object, versionId
|
||||
return err
|
||||
}
|
||||
|
||||
if versionId == "" {
|
||||
_, err = os.Stat(filepath.Join(bucket, object))
|
||||
if errors.Is(err, fs.ErrNotExist) || errors.Is(err, syscall.ENOTDIR) {
|
||||
return s3err.GetAPIError(s3err.ErrNoSuchKey)
|
||||
}
|
||||
if errors.Is(err, syscall.ENAMETOOLONG) {
|
||||
return s3err.GetAPIError(s3err.ErrKeyTooLong)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("stat object: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if versionId != "" {
|
||||
if !p.versioningEnabled() {
|
||||
//TODO: Maybe we need to return our custom error here?
|
||||
|
||||
Reference in New Issue
Block a user