Files
seaweedfs/weed/s3api/filer_util.go
Chris Lu 8b87ceb0d1 refactor(s3api): strip back-stamp from PutBucketLifecycleConfiguration (Phase 4) (#9367)
* refactor(s3api): strip back-stamp from PutBucketLifecycleConfiguration

The handler used to walk every existing entry under the rule's prefix
and stamp entry.Attributes.TtlSec + the SeaweedFSExpiresS3 flag so that
the filer's compaction filter would expire them. With the event-driven
lifecycle worker live, that retroactive walk is redundant — the worker
drives expiration off the meta-log and a one-time bootstrap scan, so a
PUT lifecycle stays O(rules) instead of O(objects).

New writes still inherit TTL from the filer.conf location entry above;
that volume-routing path is unchanged here and will move to an explicit
operator command later (Phase 11).

Drops updateEntriesTTL + processDirectoryTTL + processTTLBatch +
updateEntryTTL from filer_util.go.

* fix(s3api): clear stale lifecycle TTL entries on PUT

PutBucketLifecycleConfiguration only ever appended/updated filer.conf
entries — it never cleared ones the operator removed, renamed-prefix on,
disabled, retagged with a tag filter, or bucket-versioned out of the
fast path. The stale day-TTL kept routing new writes (and would expire
old ones if any landed under the prefix) after the policy was updated.

Treat PUT as a full replacement: walk this bucket's existing day-TTL
entries, clear them, then add fresh entries from the new rule set.

* test(command): bump mini default plugin job-type count to 7

The s3_lifecycle plugin handler registered in #9362 is the seventh
default; the test still asserted six.

* fix(s3api): delete stale lifecycle PathConf instead of blanking Ttl

Just clearing pathConf.Ttl leaves the rule's Collection, Replication,
and VolumeGrowthCount in place, so new writes still match the stale
prefix and inherit outdated routing/placement. Use
fc.DeleteLocationConf so the lifecycle-owned PathConf goes away
entirely. Same fix in DeleteBucketLifecycleHandler, which had the
same bug.
2026-05-08 11:03:03 -07:00

205 lines
5.9 KiB
Go

package s3api
import (
"context"
"errors"
"fmt"
"strings"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func (s3a *S3ApiServer) mkdir(parentDirectoryPath string, dirName string, fn func(entry *filer_pb.Entry)) error {
return filer_pb.Mkdir(context.Background(), s3a, parentDirectoryPath, dirName, fn)
}
func (s3a *S3ApiServer) mkFile(parentDirectoryPath string, fileName string, chunks []*filer_pb.FileChunk, fn func(entry *filer_pb.Entry)) error {
return filer_pb.MkFile(context.Background(), s3a, parentDirectoryPath, fileName, chunks, fn)
}
func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, inclusive bool, limit uint32) (entries []*filer_pb.Entry, isLast bool, err error) {
err = filer_pb.List(context.Background(), s3a, parentDirectoryPath, prefix, func(entry *filer_pb.Entry, isLastEntry bool) error {
entries = append(entries, entry)
if isLastEntry {
isLast = true
}
return nil
}, startFrom, inclusive, limit)
if len(entries) == 0 {
isLast = true
}
return
}
func (s3a *S3ApiServer) rm(parentDirectoryPath, entryName string, isDeleteData, isRecursive bool) error {
return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
})
}
func (s3a *S3ApiServer) rmObject(parentDirectoryPath, entryName string, isDeleteData, isRecursive bool) error {
return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return deleteObjectEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
})
}
func deleteObjectEntry(client filer_pb.SeaweedFilerClient, parentDirectoryPath, entryName string, isDeleteData, isRecursive bool) error {
err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
if err == nil {
return nil
}
if !strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
return err
}
return demoteDirectoryMarkerToImplicitDirectory(client, parentDirectoryPath, entryName)
}
func doDeleteEntry(client filer_pb.SeaweedFilerClient, parentDirectoryPath string, entryName string, isDeleteData bool, isRecursive bool) error {
request := &filer_pb.DeleteEntryRequest{
Directory: parentDirectoryPath,
Name: entryName,
IsDeleteData: isDeleteData,
IsRecursive: isRecursive,
IgnoreRecursiveError: true,
}
glog.V(1).Infof("delete entry %v/%v: %v", parentDirectoryPath, entryName, request)
if resp, err := client.DeleteEntry(context.Background(), request); err != nil {
glog.V(1).Infof("delete entry %v: %v", request, err)
return fmt.Errorf("delete entry %s/%s: %v", parentDirectoryPath, entryName, err)
} else {
if resp.Error != "" {
return fmt.Errorf("delete entry %s/%s: %v", parentDirectoryPath, entryName, resp.Error)
}
}
return nil
}
func demoteDirectoryMarkerToImplicitDirectory(client filer_pb.SeaweedFilerClient, parentDirectoryPath, entryName string) error {
resp, err := filer_pb.LookupEntry(context.Background(), client, &filer_pb.LookupDirectoryEntryRequest{
Directory: parentDirectoryPath,
Name: entryName,
})
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
return nil
}
return fmt.Errorf("lookup entry %s/%s: %w", parentDirectoryPath, entryName, err)
}
if resp.Entry == nil || !resp.Entry.IsDirectory {
return nil
}
if !resp.Entry.IsDirectoryKeyObject() {
return nil
}
clearDirectoryMarkerMetadata(resp.Entry)
if err := filer_pb.UpdateEntry(context.Background(), client, &filer_pb.UpdateEntryRequest{
Directory: parentDirectoryPath,
Entry: resp.Entry,
}); err != nil {
if errors.Is(err, filer_pb.ErrNotFound) || status.Code(err) == codes.NotFound {
return nil
}
return fmt.Errorf("update entry %s/%s: %w", parentDirectoryPath, entryName, err)
}
return nil
}
func clearDirectoryMarkerMetadata(entry *filer_pb.Entry) {
if entry == nil {
return
}
if entry.Attributes == nil {
entry.Attributes = &filer_pb.FuseAttributes{}
}
entry.Attributes.Mime = ""
entry.Attributes.Md5 = nil
entry.Attributes.FileSize = 0
entry.Content = nil
entry.Chunks = nil
if len(entry.Extended) == 0 {
return
}
filtered := make(map[string][]byte)
for k, v := range entry.Extended {
lowerKey := strings.ToLower(k)
if strings.HasPrefix(lowerKey, "xattr-") || strings.HasPrefix(lowerKey, s3_constants.SeaweedFSInternalPrefix) {
filtered[k] = v
}
}
if len(filtered) == 0 {
entry.Extended = nil
return
}
entry.Extended = filtered
}
func (s3a *S3ApiServer) exists(parentDirectoryPath string, entryName string, isDirectory bool) (exists bool, err error) {
return filer_pb.Exists(context.Background(), s3a, parentDirectoryPath, entryName, isDirectory)
}
func (s3a *S3ApiServer) getEntry(parentDirectoryPath, entryName string) (entry *filer_pb.Entry, err error) {
fullPath := util.NewFullPath(parentDirectoryPath, entryName)
return filer_pb.GetEntry(context.Background(), s3a, fullPath)
}
func (s3a *S3ApiServer) updateEntry(parentDirectoryPath string, newEntry *filer_pb.Entry) error {
updateEntryRequest := &filer_pb.UpdateEntryRequest{
Directory: parentDirectoryPath,
Entry: newEntry,
}
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
err := filer_pb.UpdateEntry(context.Background(), client, updateEntryRequest)
if err != nil {
return err
}
return nil
})
return err
}
func (s3a *S3ApiServer) getCollectionName(bucket string) string {
if s3a.option.FilerGroup != "" {
return fmt.Sprintf("%s_%s", s3a.option.FilerGroup, bucket)
}
return bucket
}
func objectKey(key *string) *string {
if strings.HasPrefix(*key, "/") {
t := (*key)[1:]
return &t
}
return key
}