Files
versitygw/backend/posix/posix.go
Ben McClelland 9610ef8a4e fix: prevent connection errors in space/quotas error paths
When uploads hit ENOSPC or EDQUOT, the server was returning the
correct S3 error but could close the connection while unread
request bytes were still in flight, which caused TCP resets and
surfaced as broken pipe/connection reset errors in SDKs instead
of a clean Insufficient Storage response. This change drains the
remaining upload body before returning the error so the response
can be delivered and the connection can close gracefully,
preserving correct client-visible behavior under disk-full and
quota-exceeded conditions.

Fixes #2209
2026-06-24 09:18:25 -07:00

6793 lines
203 KiB
Go

// Copyright 2023 Versity Software
// This file is licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package posix
import (
"context"
"crypto/md5"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"syscall"
"time"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/google/uuid"
"github.com/oklog/ulid/v2"
"github.com/versity/versitygw/auth"
"github.com/versity/versitygw/backend"
"github.com/versity/versitygw/backend/meta"
"github.com/versity/versitygw/debuglogger"
"github.com/versity/versitygw/s3api/middlewares"
"github.com/versity/versitygw/s3api/utils"
"github.com/versity/versitygw/s3err"
"github.com/versity/versitygw/s3response"
"golang.org/x/sync/semaphore"
)
// Posix is the filesystem-backed gateway backend. It embeds
// backend.BackendUnsupported and carries the configuration captured by New.
type Posix struct {
backend.BackendUnsupported
// bucket/object metadata storage facility
meta meta.MetadataStorer
// rootfd is the open handle on the gateway root directory; it is opened
// by New and closed by Shutdown.
rootfd *os.File
// rootdir is the gateway root directory path exactly as passed to New.
rootdir string
// chownuid/gid enable chowning of files to the account uid/gid
// when objects are uploaded
chownuid bool
chowngid bool
// euid/egid are the effective uid/gid of the running versitygw process
// used to determine if chowning is needed
euid int
egid int
// bucketlinks is a flag to enable symlinks to directories at the top
// level gateway directory to be treated as buckets the same as directories
bucketlinks bool
// versioningDir is the absolute path of the bucket versioning directory.
// An empty value means versioning is not configured.
versioningDir string
// newDirPerm is the permission to set on newly created directories
newDirPerm fs.FileMode
// forceNoTmpFile is a flag to disable the use of O_TMPFILE even
// if the filesystem supports it. This is needed for cases where
// there are different filesystems mounted below the bucket level.
forceNoTmpFile bool
// forceNoCopyFileRange is a flag to disable the use of io.Copy to
// reassemble multipart upload parts, which uses copy_file_range on
// linux. This is needed for cases where a filesystem that does not
// support copy_file_range is mounted over NFSv4.2.
forceNoCopyFileRange bool
// enable posix level bucket name validations, not needed if the
// frontend handlers are already validating bucket names
validateBucketName bool
// actionLimiter limits the number of concurrently running POSIX actions.
// The primary goal is to bound OS thread growth: when goroutines block in
// filesystem syscalls, the Go scheduler may spawn additional OS threads to
// keep other goroutines runnable. Since nearly all POSIX actions eventually
// execute blocking syscalls (stat, readdir, xattr, open, etc.), this limiter
// constrains parallelism to prevent excessive thread creation under load.
actionLimiter *semaphore.Weighted
// copyObjectThreshold is the maximum allowed size (in bytes) for a copy
// source object. Requests to copy objects larger than this value are
// rejected with an 'InvalidRequest' to comply with the S3 limit
// of 5 GiB.
copyObjectThreshold int64
}
// Compile-time check that *Posix satisfies backend.Backend.
var _ backend.Backend = &Posix{}
// Reserved directory names, metadata attribute keys and other package-wide
// constants used by the posix backend.
const (
// MetaTmpDir is the reserved directory name used for temporary files.
// A bucket whose only entry is this directory is still considered empty,
// and the directory is excluded from object version walks.
MetaTmpDir = ".sgwtmp"
// MetaTmpMultipartDir is the multipart subdirectory below MetaTmpDir.
MetaTmpMultipartDir = MetaTmpDir + "/multipart"
onameAttr = "objname"
inProgressSuffix = ".inprogress"
// Attribute/header names under which object properties are stored.
tagHdr = "X-Amz-Tagging"
oldMetaHdr = "X-Amz-Meta"
metadataHdr = "metadata"
contentTypeHdr = "content-type"
contentEncHdr = "content-encoding"
contentLangHdr = "content-language"
contentDispHdr = "content-disposition"
cacheCtrlHdr = "cache-control"
expiresHdr = "expires"
websiteRedirectHdr = "website-redirect-location"
// emptyMD5 is the double-quoted MD5 hex digest of zero-length input.
emptyMD5 = "\"d41d8cd98f00b204e9800998ecf8427e\""
// Metadata attribute keys for bucket and object state.
aclkey = "acl"
ownershipkey = "ownership"
etagkey = "etag"
checksumsKey = "checksums"
policykey = "policy"
bucketLockKey = "bucket-lock"
objectRetentionKey = "object-retention"
objectLegalHoldKey = "object-legal-hold"
corskey = "cors"
websitekey = "website"
// versioningKey holds a single byte: 1 for 'Enabled', 0 for 'Suspended'.
versioningKey = "versioning"
deleteMarkerKey = "delete-marker"
versionIdKey = "version-id"
partCrc64nvme = "part-crc64nvme"
mpMetaKey = "mp-metadata"
// nullVersionId is the version id used for objects that have no
// version-id attribute.
nullVersionId = "null"
// doFalloc/skipFalloc are named boolean arguments for openTmpFile.
// NOTE(review): presumably they select whether space is preallocated for
// the temporary file; confirm against openTmpFile.
doFalloc = true
skipFalloc = false
// defaultConcurrency is the default limit for concurrent POSIX actions.
defaultConcurrency = 5000
)
// PosixOpts are the options for the Posix backend. They are consumed by New.
type PosixOpts struct {
// ChownUID sets the UID of the object to the UID of the user on PUT
ChownUID bool
// ChownGID sets the GID of the object to the GID of the user on PUT
ChownGID bool
// BucketLinks enables symlinks to directories to be treated as buckets
BucketLinks bool
// VersioningDir sets the version directory to enable object versioning.
// New rejects a directory that lies inside the gateway root directory,
// does not exist, or is not a directory.
VersioningDir string
// NewDirPerm specifies the permission to set on newly created directories
NewDirPerm fs.FileMode
// SideCarDir sets the directory to store sidecar metadata. New rejects a
// directory that lies inside the gateway root directory, does not exist,
// or is not a directory.
SideCarDir string
// ForceNoTmpFile disables the use of O_TMPFILE even if the filesystem
// supports it
ForceNoTmpFile bool
// ForceNoCopyFileRange disables the use of io.Copy for multipart uploads parts
ForceNoCopyFileRange bool
// ValidateBucketNames enables minimal bucket name validation to prevent
// incorrect access to the filesystem. This is only needed if the
// frontend is not already validating bucket names.
ValidateBucketNames bool
// Concurrency sets the maximum number of concurrently running POSIX actions.
// It is intended to bound OS thread growth caused by goroutines blocking in
// filesystem-heavy syscalls (e.g., stat, readdir, xattr), which can otherwise
// trigger additional thread creation under load.
// This acts as a backpressure mechanism: once the limit is reached, new
// actions are queued until capacity becomes available. As a result, if the
// queue depth grows under sustained load, request latency increases and
// upstream timeouts may occur.
// A value that is not positive selects defaultConcurrency.
Concurrency int
// CopyObjectThreshold sets the maximum allowed source object size (in bytes)
// for CopyObject and UploadPartCopy operations. Requests exceeding this
// threshold are rejected with an 'InvalidRequest' error. Defaults to the
// S3 specification limit of 5 GiB.
// NOTE(review): New stores this value as given, so the 5 GiB default must
// be applied by the caller; confirm.
CopyObjectThreshold int64
}
// New creates a Posix backend rooted at rootdir that stores bucket and object
// metadata through meta.
//
// The process working directory is changed to rootdir, and a handle on the
// root directory is kept open until Shutdown is called. When configured, the
// versioning and sidecar directories must exist, must be directories, and
// must not be located inside the root directory.
//
// An error is returned when the root directory cannot be entered or opened,
// or when one of the optional directories fails validation. The root handle
// is closed again on every error path.
func New(rootdir string, meta meta.MetadataStorer, opts PosixOpts) (*Posix, error) {
	// Cheap early rejection before the working directory is changed. Both
	// paths are compared with a trailing separator so that a sibling such as
	// "/data-sidecar" is not mistaken for a child of "/data". The
	// authoritative check on absolute paths happens in validateSubDir below.
	if opts.SideCarDir != "" {
		sep := string(filepath.Separator)
		root, side := rootdir, opts.SideCarDir
		if !strings.HasSuffix(root, sep) {
			root += sep
		}
		if !strings.HasSuffix(side, sep) {
			side += sep
		}
		if strings.HasPrefix(side, root) {
			return nil, fmt.Errorf("sidecar directory cannot be inside the gateway root directory")
		}
	}

	err := os.Chdir(rootdir)
	if err != nil {
		return nil, fmt.Errorf("chdir %v: %w", rootdir, err)
	}

	f, err := os.Open(rootdir)
	if err != nil {
		return nil, fmt.Errorf("open %v: %w", rootdir, err)
	}
	// Do not leak the root handle when a later validation step fails.
	keepOpen := false
	defer func() {
		if !keepOpen {
			f.Close()
		}
	}()

	rootdirAbs, err := filepath.Abs(rootdir)
	if err != nil {
		return nil, fmt.Errorf("get absolute path of %v: %w", rootdir, err)
	}

	var versioningdirAbs string
	// Ensure the versioning directory isn't within the root directory
	if opts.VersioningDir != "" {
		versioningdirAbs, err = validateSubDir(rootdirAbs, opts.VersioningDir)
		if err != nil {
			return nil, err
		}
	}

	var sidecardirAbs string
	// Ensure the sidecar directory isn't within the root directory
	if opts.SideCarDir != "" {
		sidecardirAbs, err = validateSubDir(rootdirAbs, opts.SideCarDir)
		if err != nil {
			return nil, err
		}
	}

	if versioningdirAbs != "" {
		fmt.Println("Bucket versioning enabled with directory:", versioningdirAbs)
	}

	if sidecardirAbs != "" {
		fmt.Println("Using sidecar directory for metadata:", sidecardirAbs)
	}

	p := &Posix{
		meta:                 meta,
		rootfd:               f,
		rootdir:              rootdir,
		euid:                 os.Geteuid(),
		egid:                 os.Getegid(),
		chownuid:             opts.ChownUID,
		chowngid:             opts.ChownGID,
		bucketlinks:          opts.BucketLinks,
		versioningDir:        versioningdirAbs,
		newDirPerm:           opts.NewDirPerm,
		forceNoTmpFile:       opts.ForceNoTmpFile,
		forceNoCopyFileRange: opts.ForceNoCopyFileRange,
		validateBucketName:   opts.ValidateBucketNames,
		actionLimiter:        semaphore.NewWeighted(int64(concurrencyOrDefault(opts.Concurrency))),
		copyObjectThreshold:  opts.CopyObjectThreshold,
	}
	keepOpen = true
	return p, nil
}
// concurrencyOrDefault yields defaultConcurrency for zero or negative n and
// passes every positive n through unchanged.
func concurrencyOrDefault(n int) int {
	if n <= 0 {
		return defaultConcurrency
	}
	return n
}
// validateSubDir resolves dir to an absolute path and verifies that it is an
// existing directory located outside of root. The resolved absolute path is
// returned on success; on failure the returned path is empty.
func validateSubDir(root, dir string) (string, error) {
	resolved, err := filepath.Abs(dir)
	if err != nil {
		return "", fmt.Errorf("get absolute path of %v: %w", dir, err)
	}

	// The directory must not live underneath the root directory.
	if isDirBelowRoot(root, resolved) {
		return "", fmt.Errorf("the root directory %v contains the directory %v", root, dir)
	}

	info, err := os.Stat(resolved)
	switch {
	case err != nil:
		return "", fmt.Errorf("stat %q: %w", resolved, err)
	case !info.IsDir():
		return "", fmt.Errorf("path %q is not a directory", resolved)
	}

	return resolved, nil
}
// isDirBelowRoot reports whether dir is root itself or a path underneath it.
// Both paths are compared textually after a trailing separator has been
// appended where missing, so "/data2" is not considered below "/data".
func isDirBelowRoot(root, dir string) bool {
	const sep = string(filepath.Separator)

	// terminated returns p with exactly the trailing separator it needs.
	terminated := func(p string) string {
		if strings.HasSuffix(p, sep) {
			return p
		}
		return p + sep
	}

	return strings.HasPrefix(terminated(dir), terminated(root))
}
// Shutdown closes the open handle on the gateway root directory.
// The error returned by Close is not checked.
func (p *Posix) Shutdown() {
p.rootfd.Close()
}
// String returns the fixed display name of this backend.
func (p *Posix) String() string {
return "Posix Gateway"
}
// ctxKey is a private type for context keys set by this package, so the keys
// cannot collide with context keys defined by other packages.
type ctxKey int
const (
// ctxKeyNoAcquireSlot marks a context whose actions must bypass the
// action limiter (see withCtxNoSlot and acquireActionSlot).
ctxKeyNoAcquireSlot ctxKey = iota
)
// withCtxNoSlot derives a context carrying the ctxKeyNoAcquireSlot marker.
// acquireActionSlot skips the limiter for such a context, which lets an
// action that already holds a slot call another action without taking a
// second one.
func withCtxNoSlot(ctx context.Context) context.Context {
	var marker struct{}
	return context.WithValue(ctx, ctxKeyNoAcquireSlot, marker)
}
// acquireActionSlot reserves one concurrency slot for an action.
// It blocks until a slot becomes available or ctx is canceled, in which case
// the limiter's error is returned together with a nil releaser.
// On success the returned function must be called to give the slot back.
// A context carrying ctxKeyNoAcquireSlot bypasses the limiter entirely and
// receives a no-op releaser.
func (p *Posix) acquireActionSlot(ctx context.Context) (func(), error) {
	// marked contexts never take a slot
	if ctx.Value(ctxKeyNoAcquireSlot) != nil {
		noop := func() {}
		return noop, nil
	}

	err := p.actionLimiter.Acquire(ctx, 1)
	if err != nil {
		return nil, err
	}

	releaser := func() {
		p.actionLimiter.Release(1)
	}
	return releaser, nil
}
// versioningEnabled reports whether a versioning directory has been
// configured for this backend.
func (p *Posix) versioningEnabled() bool {
	return len(p.versioningDir) > 0
}
// validateVersionId verifies that versionId can be parsed as a ULID.
// The empty string and the literal "null" are always accepted. Any other
// value that fails to parse yields an invalid-argument error.
func (p *Posix) validateVersionId(versionId string) error {
	switch versionId {
	case "", "null":
		return nil
	}

	if _, parseErr := ulid.Parse(versionId); parseErr != nil {
		return s3err.GetInvalidArgumentErr(s3err.InvalidArgVersionId, versionId)
	}

	return nil
}
// doesBucketAndObjectExist stats the bucket and then the object inside it.
// A missing bucket yields a NoSuchBucket error, a missing object (or a
// non-directory path component) yields NoSuchKey, and any other stat failure
// is returned wrapped. nil means both exist.
func (p *Posix) doesBucketAndObjectExist(bucket, object string) error {
	_, bucketErr := os.Stat(bucket)
	switch {
	case errors.Is(bucketErr, fs.ErrNotExist):
		return s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	case bucketErr != nil:
		return fmt.Errorf("stat bucket: %w", bucketErr)
	}

	objPath := filepath.Join(bucket, object)
	_, objErr := os.Stat(objPath)
	switch {
	case errors.Is(objErr, fs.ErrNotExist), isErrNotDir(objErr):
		return s3err.GetAPIError(s3err.ErrNoSuchKey)
	case objErr != nil:
		return fmt.Errorf("stat object: %w", objErr)
	}

	return nil
}
// ListBuckets returns the buckets below the gateway root that match
// input.Prefix and have a valid bucket name.
//
// Admin callers (input.IsAdmin) receive every matching bucket. All other
// callers only receive buckets whose stored ACL owner equals input.Owner;
// buckets without an ACL attribute are skipped.
//
// Pagination: names less than or equal to input.ContinuationToken are
// skipped. Once input.MaxBuckets entries have been collected and another
// matching candidate is seen, the name of the last collected bucket is
// returned as the continuation token. A MaxBuckets value that is not positive
// disables the limit instead of indexing into an empty result.
// NOTE(review): pagination relies on listBucketFileInfos returning entries
// sorted by name; confirm.
func (p *Posix) ListBuckets(ctx context.Context, input s3response.ListBucketsInput) (s3response.ListAllMyBucketsResult, error) {
	release, err := p.acquireActionSlot(ctx)
	if err != nil {
		return s3response.ListAllMyBucketsResult{}, err
	}
	defer release()

	fis, err := listBucketFileInfos(p.bucketlinks)
	if err != nil {
		return s3response.ListAllMyBucketsResult{}, fmt.Errorf("listBucketFileInfos : %w", err)
	}

	limit := int(input.MaxBuckets)

	var cToken string
	var buckets []s3response.ListAllMyBucketsEntry
	for _, fi := range fis {
		name := fi.Name()

		if !strings.HasPrefix(name, input.Prefix) {
			continue
		}

		if !utils.IsValidBucketName(name) {
			continue
		}

		// limit > 0 guarantees buckets is non-empty here, so taking the
		// last element cannot go out of range.
		if limit > 0 && len(buckets) == limit {
			cToken = buckets[len(buckets)-1].Name
			break
		}

		if name <= input.ContinuationToken {
			continue
		}

		// return all the buckets for admin users
		if input.IsAdmin {
			buckets = append(buckets, s3response.ListAllMyBucketsEntry{
				Name:         name,
				CreationDate: fi.ModTime(),
			})
			continue
		}

		aclJSON, err := p.meta.RetrieveAttribute(nil, name, "", aclkey)
		if errors.Is(err, meta.ErrNoSuchKey) {
			// skip buckets without acl tag
			continue
		}
		if err != nil {
			return s3response.ListAllMyBucketsResult{}, fmt.Errorf("get acl tag: %w", err)
		}

		acl, err := auth.ParseACL(aclJSON)
		if err != nil {
			return s3response.ListAllMyBucketsResult{}, err
		}

		if acl.Owner == input.Owner {
			buckets = append(buckets, s3response.ListAllMyBucketsEntry{
				Name:         name,
				CreationDate: fi.ModTime(),
			})
		}
	}

	return s3response.ListAllMyBucketsResult{
		Buckets: s3response.ListAllMyBucketsList{
			Bucket: buckets,
		},
		Owner: s3response.CanonicalUser{
			ID: input.Owner,
		},
		Prefix:            input.Prefix,
		ContinuationToken: cToken,
	}, nil
}
// isBucketValid reports whether bucket passes the posix level name check.
// The check is only performed when bucket name validation is enabled on the
// backend; otherwise every name is accepted.
func (p *Posix) isBucketValid(bucket string) bool {
	return !p.validateBucketName || backend.IsValidDirectoryName(bucket)
}
// HeadBucket checks that the bucket named in input exists.
// It returns InvalidBucketName for a name that fails validation and
// NoSuchBucket when the bucket path does not exist. The bucket path itself is
// examined with Lstat, so a symlink is not followed.
func (p *Posix) HeadBucket(ctx context.Context, input *s3.HeadBucketInput) (*s3.HeadBucketOutput, error) {
	release, err := p.acquireActionSlot(ctx)
	if err != nil {
		return nil, err
	}
	defer release()

	if valid := p.isBucketValid(*input.Bucket); !valid {
		return nil, s3err.GetBucketErr(s3err.ErrInvalidBucketName, *input.Bucket)
	}

	switch _, statErr := os.Lstat(*input.Bucket); {
	case errors.Is(statErr, fs.ErrNotExist):
		return nil, s3err.GetBucketErr(s3err.ErrNoSuchBucket, *input.Bucket)
	case statErr != nil:
		return nil, fmt.Errorf("stat bucket: %w", statErr)
	}

	out := &s3.HeadBucketOutput{}
	return out, nil
}
// CreateBucket creates the bucket directory named in input and stores its
// initial metadata: the ACL, the object ownership setting, optional tags and,
// when object lock is requested, the default bucket lock configuration.
//
// The bucket owner account is read from the "bucket-owner" context value; a
// zero account is used when it is absent. When chowning applies, the new
// directory is chowned to the ids returned by getChownIDs.
//
// Errors:
//   - InvalidBucketName when the name fails validation
//   - BucketAlreadyOwnedByYou / BucketAlreadyExists when the directory exists
//   - MethodNotAllowed when the filesystem is read-only
//   - a wrapped error for any other filesystem or metadata failure
func (p *Posix) CreateBucket(ctx context.Context, input *s3.CreateBucketInput, acl []byte) error {
	release, err := p.acquireActionSlot(ctx)
	if err != nil {
		return err
	}
	defer release()

	acct, ok := ctx.Value("bucket-owner").(auth.Account)
	if !ok {
		acct = auth.Account{}
	}

	uid, gid, doChown := p.getChownIDs(acct)

	bucket := *input.Bucket

	if !p.isBucketValid(bucket) {
		return s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	// CreateBucketConfiguration is an optional pointer in the request; fall
	// back to an empty configuration (no tags) rather than dereferencing nil.
	cfg := input.CreateBucketConfiguration
	if cfg == nil {
		cfg = &types.CreateBucketConfiguration{}
	}

	tagging, err := backend.ParseCreateBucketTags(cfg.Tags)
	if err != nil {
		return err
	}

	err = os.Mkdir(bucket, p.newDirPerm)
	if err != nil && os.IsExist(err) {
		// The directory is already there: report whether the caller owns it.
		aclJSON, err := p.meta.RetrieveAttribute(nil, bucket, "", aclkey)
		if err != nil {
			return fmt.Errorf("get bucket acl: %w", err)
		}

		existingACL, err := auth.ParseACL(aclJSON)
		if err != nil {
			return err
		}

		if existingACL.Owner == acct.Access {
			return s3err.GetBucketErr(s3err.ErrBucketAlreadyOwnedByYou, bucket)
		}
		return s3err.GetBucketErr(s3err.ErrBucketAlreadyExists, bucket)
	}
	if err != nil {
		if errors.Is(err, syscall.EROFS) {
			return s3err.GetAPIError(s3err.ErrMethodNotAllowed)
		}
		return fmt.Errorf("mkdir bucket: %w", err)
	}

	if doChown {
		err := os.Chown(bucket, uid, gid)
		if err != nil {
			return fmt.Errorf("chown bucket: %w", err)
		}
	}

	err = p.meta.StoreAttribute(nil, bucket, "", aclkey, acl)
	if err != nil {
		return fmt.Errorf("set acl: %w", err)
	}
	err = p.meta.StoreAttribute(nil, bucket, "", ownershipkey, []byte(input.ObjectOwnership))
	if err != nil {
		return fmt.Errorf("set ownership: %w", err)
	}

	if tagging != nil {
		tags, err := json.Marshal(tagging)
		if err != nil {
			return fmt.Errorf("marshal tags: %w", err)
		}

		err = p.meta.StoreAttribute(nil, bucket, "", tagHdr, tags)
		if err != nil {
			return fmt.Errorf("set tags: %w", err)
		}
	}

	if input.ObjectLockEnabledForBucket != nil && *input.ObjectLockEnabledForBucket {
		// First enable bucket versioning
		// Bucket versioning is enabled automatically with object lock
		if p.versioningEnabled() {
			// The action slot is already held, so the nested call must not
			// acquire another one.
			err = p.PutBucketVersioning(withCtxNoSlot(ctx), bucket, types.BucketVersioningStatusEnabled)
			if err != nil {
				return err
			}
		}

		now := time.Now()
		defaultLock := auth.BucketLockConfig{
			Enabled:   true,
			CreatedAt: &now,
		}

		defaultLockParsed, err := json.Marshal(defaultLock)
		if err != nil {
			return fmt.Errorf("parse default bucket lock state: %w", err)
		}

		err = p.meta.StoreAttribute(nil, bucket, "", bucketLockKey, defaultLockParsed)
		if err != nil {
			return fmt.Errorf("set default bucket lock: %w", err)
		}
	}

	return nil
}
// isBucketEmpty returns nil when the bucket holds no objects.
// A directory counts as empty when it has no entries or when its only entry
// is MetaTmpDir. With versioning configured, the bucket's directory below
// the versioning directory is checked first (a missing one is fine) and
// yields VersionedBucketNotEmpty; the bucket directory itself yields
// BucketNotEmpty, or NoSuchBucket when it does not exist.
func (p *Posix) isBucketEmpty(bucket string) error {
	// occupied reports whether the listing holds anything besides MetaTmpDir.
	occupied := func(entries []fs.DirEntry) bool {
		switch len(entries) {
		case 0:
			return false
		case 1:
			return entries[0].Name() != MetaTmpDir
		default:
			return true
		}
	}

	if p.versioningEnabled() {
		versionEnts, err := os.ReadDir(filepath.Join(p.versioningDir, bucket))
		switch {
		case err == nil:
			if occupied(versionEnts) {
				return s3err.GetBucketErr(s3err.ErrVersionedBucketNotEmpty, bucket)
			}
		case !errors.Is(err, fs.ErrNotExist):
			return fmt.Errorf("readdir bucket: %w", err)
		}
	}

	bucketEnts, err := os.ReadDir(bucket)
	if err != nil {
		if errors.Is(err, fs.ErrNotExist) {
			return s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
		}
		return fmt.Errorf("readdir bucket: %w", err)
	}

	if occupied(bucketEnts) {
		return s3err.GetBucketErr(s3err.ErrBucketNotEmpty, bucket)
	}

	return nil
}
// DeleteBucket removes an empty bucket: first the bucket directory, then its
// metadata attributes, and finally (with versioning configured) the bucket's
// directory below the versioning directory.
// A bucket that still contains objects is rejected by isBucketEmpty.
func (p *Posix) DeleteBucket(ctx context.Context, bucket string) error {
	release, err := p.acquireActionSlot(ctx)
	if err != nil {
		return err
	}
	defer release()

	if !p.isBucketValid(bucket) {
		return s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	// Only empty buckets may be deleted.
	if emptyErr := p.isBucketEmpty(bucket); emptyErr != nil {
		return emptyErr
	}

	if rmErr := os.RemoveAll(bucket); rmErr != nil {
		return fmt.Errorf("remove bucket: %w", rmErr)
	}

	// The bucket data is gone at this point. Failing to clean up metadata
	// only leaves an orphaned sidecar directory that users cannot see, so
	// the failure is logged instead of being reported as if the bucket
	// still existed.
	if metaErr := p.meta.DeleteAttributes(bucket, ""); metaErr != nil {
		debuglogger.Logf("failed to delete bucket sidecar attributes (%q): %v", bucket, metaErr)
	}

	if !p.versioningEnabled() {
		return nil
	}

	// Drop the bucket's versions as well; a missing directory is fine.
	versionDir := filepath.Join(p.versioningDir, bucket)
	rmErr := os.RemoveAll(versionDir)
	if rmErr == nil || errors.Is(rmErr, fs.ErrNotExist) {
		return nil
	}
	return fmt.Errorf("remove bucket version: %w", rmErr)
}
// PutBucketOwnershipControls stores the object ownership setting for an
// existing bucket. It returns InvalidBucketName for a name that fails
// validation and NoSuchBucket when the bucket does not exist.
func (p *Posix) PutBucketOwnershipControls(ctx context.Context, bucket string, ownership types.ObjectOwnership) error {
	release, err := p.acquireActionSlot(ctx)
	if err != nil {
		return err
	}
	defer release()

	if valid := p.isBucketValid(bucket); !valid {
		return s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	switch _, statErr := os.Stat(bucket); {
	case errors.Is(statErr, fs.ErrNotExist):
		return s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	case statErr != nil:
		return fmt.Errorf("stat bucket: %w", statErr)
	}

	value := []byte(ownership)
	if storeErr := p.meta.StoreAttribute(nil, bucket, "", ownershipkey, value); storeErr != nil {
		return fmt.Errorf("set ownership: %w", storeErr)
	}

	return nil
}
// GetBucketOwnershipControls returns the object ownership setting stored for
// the bucket. It returns InvalidBucketName for a name that fails validation,
// NoSuchBucket when the bucket does not exist and
// OwnershipControlsNotFound when no setting has been stored. The returned
// ownership value is empty on every error.
func (p *Posix) GetBucketOwnershipControls(ctx context.Context, bucket string) (types.ObjectOwnership, error) {
	const none = types.ObjectOwnership("")

	release, err := p.acquireActionSlot(ctx)
	if err != nil {
		return none, err
	}
	defer release()

	if !p.isBucketValid(bucket) {
		return none, s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	if _, statErr := os.Stat(bucket); statErr != nil {
		if errors.Is(statErr, fs.ErrNotExist) {
			return none, s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
		}
		return none, fmt.Errorf("stat bucket: %w", statErr)
	}

	stored, err := p.meta.RetrieveAttribute(nil, bucket, "", ownershipkey)
	switch {
	case errors.Is(err, meta.ErrNoSuchKey):
		return none, s3err.GetBucketErr(s3err.ErrOwnershipControlsNotFound, bucket)
	case err != nil:
		return none, fmt.Errorf("get bucket ownership status: %w", err)
	}

	return types.ObjectOwnership(stored), nil
}
// DeleteBucketOwnershipControls removes the stored object ownership setting
// of an existing bucket. Deleting a setting that was never stored is not an
// error. It returns InvalidBucketName for a name that fails validation and
// NoSuchBucket when the bucket does not exist.
func (p *Posix) DeleteBucketOwnershipControls(ctx context.Context, bucket string) error {
	release, err := p.acquireActionSlot(ctx)
	if err != nil {
		return err
	}
	defer release()

	if !p.isBucketValid(bucket) {
		return s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	_, statErr := os.Stat(bucket)
	switch {
	case errors.Is(statErr, fs.ErrNotExist):
		return s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	case statErr != nil:
		return fmt.Errorf("stat bucket: %w", statErr)
	}

	delErr := p.meta.DeleteAttribute(bucket, "", ownershipkey)
	if delErr != nil && !errors.Is(delErr, meta.ErrNoSuchKey) {
		return fmt.Errorf("delete ownership: %w", delErr)
	}

	return nil
}
// PutBucketVersioning stores the versioning state of an existing bucket as a
// single byte: 1 for 'Enabled' and 0 for 'Suspended'. Any other status
// stores an empty value.
//
// Suspending is refused with SuspendedVersioningNotAllowed while object lock
// is enabled on the bucket. VersioningNotConfigured is returned when the
// backend has no versioning directory, InvalidBucketName for a name that
// fails validation and NoSuchBucket when the bucket does not exist.
func (p *Posix) PutBucketVersioning(ctx context.Context, bucket string, status types.BucketVersioningStatus) error {
	release, err := p.acquireActionSlot(ctx)
	if err != nil {
		return err
	}
	defer release()

	if !p.isBucketValid(bucket) {
		return s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}
	if !p.versioningEnabled() {
		return s3err.GetAPIError(s3err.ErrVersioningNotConfigured)
	}

	if _, statErr := os.Stat(bucket); statErr != nil {
		if errors.Is(statErr, fs.ErrNotExist) {
			return s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
		}
		return fmt.Errorf("stat bucket: %w", statErr)
	}

	// encoded state; stays nil for a status that is neither of the two below
	var state []byte
	switch status {
	case types.BucketVersioningStatusEnabled:
		// '1' maps to 'Enabled'
		state = []byte{1}
	case types.BucketVersioningStatusSuspended:
		// The slot is already held, so the nested lookup must bypass the limiter.
		lockRaw, lockErr := p.GetObjectLockConfiguration(withCtxNoSlot(ctx), bucket)
		if lockErr != nil {
			// a bucket without lock configuration may be suspended freely
			if !errors.Is(lockErr, s3err.GetAPIError(s3err.ErrObjectLockConfigurationNotFound)) {
				return lockErr
			}
		} else {
			lockStatus, parseErr := auth.ParseBucketLockConfigurationOutput(lockRaw)
			if parseErr != nil {
				return parseErr
			}
			if lockStatus.ObjectLockEnabled == types.ObjectLockEnabledEnabled {
				return s3err.GetAPIError(s3err.ErrSuspendedVersioningNotAllowed)
			}
		}

		// '0' maps to 'Suspended'
		state = []byte{0}
	}

	if storeErr := p.meta.StoreAttribute(nil, bucket, "", versioningKey, state); storeErr != nil {
		return fmt.Errorf("set versioning: %w", storeErr)
	}

	return nil
}
// GetBucketVersioning returns the versioning state stored for the bucket.
//
// The stored attribute is a single byte: 1 maps to 'Enabled' and 0 maps to
// 'Suspended'. A missing attribute, an empty attribute value or an unknown
// byte yields an output without a status.
//
// VersioningNotConfigured is returned when the backend has no versioning
// directory, InvalidBucketName for a name that fails validation and
// NoSuchBucket when the bucket does not exist.
func (p *Posix) GetBucketVersioning(ctx context.Context, bucket string) (s3response.GetBucketVersioningOutput, error) {
	release, err := p.acquireActionSlot(ctx)
	if err != nil {
		return s3response.GetBucketVersioningOutput{}, err
	}
	defer release()

	if !p.versioningEnabled() {
		return s3response.GetBucketVersioningOutput{}, s3err.GetAPIError(s3err.ErrVersioningNotConfigured)
	}

	if !p.isBucketValid(bucket) {
		return s3response.GetBucketVersioningOutput{}, s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	_, err = os.Stat(bucket)
	if errors.Is(err, fs.ErrNotExist) {
		return s3response.GetBucketVersioningOutput{}, s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	}
	if err != nil {
		return s3response.GetBucketVersioningOutput{}, fmt.Errorf("stat bucket: %w", err)
	}

	vData, err := p.meta.RetrieveAttribute(nil, bucket, "", versioningKey)
	if errors.Is(err, meta.ErrNoSuchKey) {
		return s3response.GetBucketVersioningOutput{}, nil
	}
	if err != nil {
		return s3response.GetBucketVersioningOutput{}, fmt.Errorf("get bucket versioning config: %w", err)
	}

	// An attribute that exists but holds no bytes carries no state; indexing
	// it would panic, so treat it like an unset attribute.
	if len(vData) == 0 {
		return s3response.GetBucketVersioningOutput{}, nil
	}

	enabled, suspended := types.BucketVersioningStatusEnabled, types.BucketVersioningStatusSuspended
	switch vData[0] {
	case 1:
		return s3response.GetBucketVersioningOutput{
			Status: &enabled,
		}, nil
	case 0:
		return s3response.GetBucketVersioningOutput{
			Status: &suspended,
		}, nil
	}

	return s3response.GetBucketVersioningOutput{}, nil
}
// getBucketVersioningStatus looks up the versioning status of the bucket
// without acquiring an additional action slot. An empty status is returned
// when versioning is not configured on the backend or when no status has
// been stored for the bucket.
func (p *Posix) getBucketVersioningStatus(ctx context.Context, bucket string) (types.BucketVersioningStatus, error) {
	out, err := p.GetBucketVersioning(withCtxNoSlot(ctx), bucket)
	switch {
	case errors.Is(err, s3err.GetAPIError(s3err.ErrVersioningNotConfigured)):
		return "", nil
	case err != nil:
		return "", err
	case out.Status == nil:
		return "", nil
	}

	return *out.Status, nil
}
// isBucketVersioningEnabled reports whether the given bucket versioning
// status is 'Enabled'. It only compares the argument and does not read any
// backend state.
func (p *Posix) isBucketVersioningEnabled(s types.BucketVersioningStatus) bool {
return s == types.BucketVersioningStatusEnabled
}
// isBucketVersioningSuspended reports whether the given bucket versioning
// status is 'Suspended'. It only compares the argument and does not read any
// backend state.
func (p *Posix) isBucketVersioningSuspended(s types.BucketVersioningStatus) bool {
return s == types.BucketVersioningStatusSuspended
}
// genObjVersionPath builds the directory path that holds all versions of the
// object: the versioning directory, the bucket name and the hashed key path
// produced by genObjVersionKey.
func (p *Posix) genObjVersionPath(bucket, key string) string {
	hashedKey := genObjVersionKey(key)
	return filepath.Join(p.versioningDir, bucket, hashedKey)
}
// genObjVersionKey maps an object key to its relative path inside the
// versioning directory. The path is built from the hex encoded SHA-256 digest
// of the key: three directory levels made of the first three digest bytes
// (two hex characters each), followed by the full digest.
func genObjVersionKey(key string) string {
	digest := sha256.Sum256([]byte(key))
	hexSum := fmt.Sprintf("%x", digest)

	levels := []string{hexSum[0:2], hexSum[2:4], hexSum[4:6], hexSum}
	return filepath.Join(levels...)
}
// deleteNullVersionIdObject removes the "null" version of the object from
// the versioning directory together with its metadata attributes.
// A missing version file is not an error.
func (p *Posix) deleteNullVersionIdObject(bucket, key string) error {
	versionPath := filepath.Join(p.genObjVersionPath(bucket, key), nullVersionId)

	if rmErr := os.Remove(versionPath); rmErr != nil {
		if errors.Is(rmErr, fs.ErrNotExist) {
			return nil
		}
		return rmErr
	}

	// The file is gone; metadata removal is best effort and its error is
	// deliberately dropped.
	_ = p.meta.DeleteAttributes(versionPath, "")

	return nil
}
// isRemovableAttr reports whether attr is one of the object lock attributes
// (legal hold or retention). createObjVersion deletes these from the source
// object when asked to remove attributes.
func isRemovableAttr(attr string) bool {
	return attr == objectLegalHoldKey || attr == objectRetentionKey
}
// createObjVersion creates a new copy (version) of the object bucket/key in
// the versioning directory and returns the path of that version.
//
// The version is named after the object's version-id attribute, or "null"
// when the object has none. The object data is copied into a temporary file,
// every metadata attribute of the object is copied onto it, and the file is
// then linked into place. When removeAttributes is set, the object lock
// attributes (see isRemovableAttr) are additionally deleted from the source
// object.
//
// size and acc are passed through to openTmpFile.
// NOTE(review): versionPath may be non-empty on error returns that occur
// after the path has been computed; callers should rely on err.
func (p *Posix) createObjVersion(bucket, key string, size int64, acc auth.Account, removeAttributes bool) (versionPath string, err error) {
sf, err := os.Open(filepath.Join(bucket, key))
if err != nil {
return "", err
}
defer sf.Close()
// Determine the version id of the current object; objects without the
// attribute become the "null" version.
var versionId string
data, err := p.meta.RetrieveAttribute(sf, bucket, key, versionIdKey)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return versionPath, fmt.Errorf("get object versionId: %w", err)
}
if err == nil {
versionId = string(data)
} else {
versionId = nullVersionId
}
// Collect the attribute names up front; they are copied once the data
// has been written.
attrs, err := p.meta.ListAttributes(bucket, key)
if err != nil {
return versionPath, fmt.Errorf("load object attributes: %w", err)
}
versionBucketPath := filepath.Join(p.versioningDir, bucket)
// NOTE: this local shadows the package-level versioningKey constant.
versioningKey := filepath.Join(genObjVersionKey(key), versionId)
versionTmpPath := filepath.Join(versionBucketPath, MetaTmpDir)
f, err := p.openTmpFile(versionTmpPath, versionBucketPath, versioningKey,
size, acc, doFalloc, p.forceNoTmpFile)
if err != nil {
return versionPath, err
}
defer f.cleanup()
// Copy the object data into the temporary file.
_, err = io.Copy(f.File(), sf)
if err != nil {
return versionPath, err
}
versionPath = filepath.Join(versionBucketPath, versioningKey)
// Make sure the hashed key directory exists before attributes are stored
// and the file is linked.
err = os.MkdirAll(filepath.Join(versionBucketPath, genObjVersionKey(key)), p.newDirPerm)
if err != nil {
return versionPath, err
}
// Copy the object attributes(metadata)
for _, attr := range attrs {
data, err := p.meta.RetrieveAttribute(sf, bucket, key, attr)
if err != nil {
// NOTE(review): the message says "list" although the failing call
// retrieves a single attribute.
return versionPath, fmt.Errorf("list %v attribute: %w", attr, err)
}
err = p.meta.StoreAttribute(f.File(), versionPath, "", attr, data)
if err != nil {
return versionPath, fmt.Errorf("store %v attribute: %w", attr, err)
}
// remove object lock attributes in delete marker
if removeAttributes && isRemovableAttr(attr) {
err := p.meta.DeleteAttribute(bucket, key, attr)
if err != nil {
return versionPath, fmt.Errorf("remove %s attribute: %w", attr, err)
}
}
}
// Publish the temporary file at its final version path.
if err := f.link(); err != nil {
return versionPath, err
}
return versionPath, nil
}
// ListObjectVersions walks the bucket and returns its object versions,
// delete markers and common prefixes. Optional request fields that are unset
// default to the empty string (prefix, delimiter, markers) or zero (max
// keys). MetaTmpDir is excluded from the walk.
//
// It returns InvalidBucketName for a name that fails validation and
// NoSuchBucket when the bucket does not exist.
func (p *Posix) ListObjectVersions(ctx context.Context, input *s3.ListObjectVersionsInput) (s3response.ListVersionsResult, error) {
	var none s3response.ListVersionsResult

	release, err := p.acquireActionSlot(ctx)
	if err != nil {
		return none, err
	}
	defer release()

	bucket := *input.Bucket
	if !p.isBucketValid(bucket) {
		return none, s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	// strOrEmpty dereferences an optional request string.
	strOrEmpty := func(s *string) string {
		if s == nil {
			return ""
		}
		return *s
	}

	prefix := strOrEmpty(input.Prefix)
	delim := strOrEmpty(input.Delimiter)
	keyMarker := strOrEmpty(input.KeyMarker)
	versionIdMarker := strOrEmpty(input.VersionIdMarker)

	maxKeys := 0
	if input.MaxKeys != nil {
		maxKeys = int(*input.MaxKeys)
	}

	if _, statErr := os.Stat(bucket); statErr != nil {
		if errors.Is(statErr, fs.ErrNotExist) {
			return none, s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
		}
		return none, fmt.Errorf("stat bucket: %w", statErr)
	}

	skipDirs := []string{MetaTmpDir}
	results, err := backend.WalkVersions(ctx, os.DirFS(bucket), prefix, delim, keyMarker,
		versionIdMarker, maxKeys, p.fileToObjVersions(bucket), skipDirs)
	if err != nil {
		return none, fmt.Errorf("walk %v: %w", bucket, err)
	}

	listing := s3response.ListVersionsResult{
		CommonPrefixes:      results.CommonPrefixes,
		DeleteMarkers:       results.DelMarkers,
		Delimiter:           &delim,
		IsTruncated:         &results.Truncated,
		KeyMarker:           &keyMarker,
		MaxKeys:             input.MaxKeys,
		Name:                input.Bucket,
		NextKeyMarker:       &results.NextMarker,
		NextVersionIdMarker: &results.NextVersionIdMarker,
		Prefix:              &prefix,
		VersionIdMarker:     &versionIdMarker,
		Versions:            results.ObjectVersions,
	}
	return listing, nil
}
// getBoolPtr returns a pointer to a fresh bool holding the value of b.
func getBoolPtr(b bool) *bool {
	ptr := new(bool)
	*ptr = b
	return ptr
}
// ensureNotDeleteMarker returns a `MethodNotAllowed` error if the provided
// object (version) is a delete marker.
//
// It is a no-op when versioning is not configured on the backend. When the
// object does not exist, a NoSuchVersion error is returned if versionId is
// set and NoSuchKey otherwise. Both existence checks below report a missing
// version with the same error, built from the object key and version id.
func (p *Posix) ensureNotDeleteMarker(bucket, object, versionId string) error {
	if !p.versioningEnabled() {
		return nil
	}

	// With path-based metadata backends (e.g. sidecar), RetrieveAttribute
	// returns ErrNoSuchKey whether the sidecar attribute is absent OR the
	// data file simply doesn't exist — the two cases are indistinguishable
	// from metadata alone. Verify the data file directly so callers
	// receive the correct NoSuchVersion / NoSuchKey error.
	if _, statErr := os.Stat(filepath.Join(bucket, object)); errors.Is(statErr, fs.ErrNotExist) || isErrNotDir(statErr) {
		if versionId != "" {
			return s3err.GetNoSuchVersionErr(object, versionId)
		}
		return s3err.GetAPIError(s3err.ErrNoSuchKey)
	}

	_, err := p.meta.RetrieveAttribute(nil, bucket, object, deleteMarkerKey)
	if errors.Is(err, fs.ErrNotExist) || isErrNotDir(err) {
		if versionId != "" {
			return s3err.GetNoSuchVersionErr(object, versionId)
		}
		return s3err.GetAPIError(s3err.ErrNoSuchKey)
	}
	if errors.Is(err, meta.ErrNoSuchKey) {
		// no delete-marker attribute: a regular object
		return nil
	}
	if err != nil {
		return fmt.Errorf("get delete marker attr: %w", err)
	}

	// the attribute exists, so the object is a delete marker
	return s3err.GetAPIError(s3err.ErrMethodNotAllowed)
}
// isObjDeleteMarker reports whether the object carries the delete-marker
// attribute. A missing object (or a non-directory path component) yields a
// NoSuchKey error; an object without the attribute is simply not a delete
// marker.
func (p *Posix) isObjDeleteMarker(bucket, object string) (bool, error) {
	_, err := p.meta.RetrieveAttribute(nil, bucket, object, deleteMarkerKey)
	switch {
	case errors.Is(err, fs.ErrNotExist), isErrNotDir(err):
		return false, s3err.GetAPIError(s3err.ErrNoSuchKey)
	case errors.Is(err, meta.ErrNoSuchKey):
		return false, nil
	case err != nil:
		return false, fmt.Errorf("get object delete-marker: %w", err)
	}

	return true, nil
}
// fileToObjVersions returns a backend.GetVersionsFunc bound to bucket. The
// returned function converts one walked directory entry (path, d) into the
// object versions and delete markers that should be listed for it.
//
// Behavior of the returned function:
//   - When availableObjCount is not positive it returns an empty result with
//     Truncated set.
//   - A directory entry is reported only if it has an etag attribute; it is
//     listed as a single "null" version with key path+"/" and size 0.
//     Directories without an etag (or that vanished) yield backend.ErrSkipObj.
//   - For a file entry, the current file is listed (as an object version or
//     as a delete marker, both with IsLatest=true) once the version id marker
//     has been passed. If p.versioningEnabled() is true, the entries found in
//     the object's versioning directory (p.genObjVersionPath) are appended
//     afterwards with IsLatest=false.
//
// pastVersionIdMarker is shared state owned by the caller: it is read to
// decide whether listing has started and is set to true here as soon as a
// version id equal to versionIdMarker is seen.
//
// Whenever availableObjCount reaches 0 the function returns early with
// Truncated set and NextVersionIdMarker set to the last version id emitted.
func (p *Posix) fileToObjVersions(bucket string) backend.GetVersionsFunc {
	return func(path, versionIdMarker string, pastVersionIdMarker *bool, availableObjCount int, d fs.DirEntry) (*backend.ObjVersionFuncResult, error) {
		var objects []s3response.ObjectVersion
		var delMarkers []types.DeleteMarkerEntry
		// if the number of available objects is 0, return truncated response
		if availableObjCount <= 0 {
			return &backend.ObjVersionFuncResult{
				ObjectVersions: objects,
				DelMarkers: delMarkers,
				Truncated: true,
			}, nil
		}
		if d.IsDir() {
			// directory object only happens if directory empty
			// check to see if this is a directory object by checking etag
			etagBytes, err := p.meta.RetrieveAttribute(nil, bucket, path, etagkey)
			if errors.Is(err, meta.ErrNoSuchKey) || errors.Is(err, fs.ErrNotExist) {
				return nil, backend.ErrSkipObj
			}
			if err != nil {
				return nil, fmt.Errorf("get etag: %w", err)
			}
			etag := string(etagBytes)
			fi, err := d.Info()
			if errors.Is(err, fs.ErrNotExist) {
				// the directory was removed between the walk and this call
				return nil, backend.ErrSkipObj
			}
			if err != nil {
				return nil, fmt.Errorf("get fileinfo: %w", err)
			}
			key := path + "/"
			// Directory objects don't contain data
			size := int64(0)
			versionId := "null"
			objects = append(objects, s3response.ObjectVersion{
				ETag: &etag,
				Key: &key,
				LastModified: backend.GetTimePtr(fi.ModTime()),
				IsLatest: getBoolPtr(true),
				Size: &size,
				VersionId: &versionId,
				StorageClass: types.ObjectVersionStorageClassStandard,
			})
			// the directory object consumes one slot; if that was the
			// last one, the listing is reported as truncated
			return &backend.ObjVersionFuncResult{
				ObjectVersions: objects,
				DelMarkers: delMarkers,
				Truncated: availableObjCount == 1,
			}, nil
		}
		// file object, get object info and fill out object data
		etagBytes, err := p.meta.RetrieveAttribute(nil, bucket, path, etagkey)
		if errors.Is(err, fs.ErrNotExist) {
			return nil, backend.ErrSkipObj
		}
		if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
			return nil, fmt.Errorf("get etag: %w", err)
		}
		// note: meta.ErrNoSuchKey will return etagBytes = []byte{}
		// so this will just set etag to "" if its not already set
		etag := string(etagBytes)
		// If the object doesn't have versionId, it's 'null'
		// (any error from the attribute lookup is treated the same way)
		versionId := "null"
		versionIdBytes, err := p.meta.RetrieveAttribute(nil, bucket, path, versionIdKey)
		if err == nil {
			versionId = string(versionIdBytes)
		}
		// When the current version is the marker, listing starts with
		// this version itself (it is included below).
		if versionId == versionIdMarker {
			*pastVersionIdMarker = true
		}
		if *pastVersionIdMarker {
			fi, err := d.Info()
			if errors.Is(err, fs.ErrNotExist) {
				return nil, backend.ErrSkipObj
			}
			if err != nil {
				return nil, fmt.Errorf("get fileinfo: %w", err)
			}
			size := fi.Size()
			isDel, err := p.isObjDeleteMarker(bucket, path)
			if err != nil {
				return nil, err
			}
			if isDel {
				delMarkers = append(delMarkers, types.DeleteMarkerEntry{
					IsLatest: getBoolPtr(true),
					VersionId: &versionId,
					LastModified: backend.GetTimePtr(fi.ModTime()),
					Key: &path,
				})
			} else {
				// Retrieve checksum
				// (a missing checksum attribute is not an error; the
				// returned value is used as-is in that case)
				checksum, err := p.retrieveChecksums(nil, bucket, path)
				if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
					return nil, fmt.Errorf("get checksum: %w", err)
				}
				objects = append(objects, s3response.ObjectVersion{
					ETag: &etag,
					Key: &path,
					LastModified: backend.GetTimePtr(fi.ModTime()),
					Size: &size,
					VersionId: &versionId,
					IsLatest: getBoolPtr(true),
					StorageClass: types.ObjectVersionStorageClassStandard,
					ChecksumAlgorithm: []types.ChecksumAlgorithm{checksum.Algorithm},
					ChecksumType: checksum.Type,
				})
			}
			availableObjCount--
			if availableObjCount == 0 {
				return &backend.ObjVersionFuncResult{
					ObjectVersions: objects,
					DelMarkers: delMarkers,
					Truncated: true,
					NextVersionIdMarker: versionId,
				}, nil
			}
		}
		// without versioning support there is no versioning directory
		// to inspect, so only the current object is reported
		if !p.versioningEnabled() {
			return &backend.ObjVersionFuncResult{
				ObjectVersions: objects,
				DelMarkers: delMarkers,
			}, nil
		}
		// List all the versions of the object in the versioning directory
		versionPath := p.genObjVersionPath(bucket, path)
		dirEnts, err := os.ReadDir(versionPath)
		if errors.Is(err, fs.ErrNotExist) {
			// no versioning directory: the object has no older versions
			return &backend.ObjVersionFuncResult{
				ObjectVersions: objects,
				DelMarkers: delMarkers,
			}, nil
		}
		if err != nil {
			return nil, fmt.Errorf("read version dir: %w", err)
		}
		if len(dirEnts) == 0 {
			return &backend.ObjVersionFuncResult{
				ObjectVersions: objects,
				DelMarkers: delMarkers,
			}, nil
		}
		// First find the null versionId object(if exists)
		// before starting the object versions listing
		var nullVersionIdObj *s3response.ObjectVersion
		var nullObjDelMarker *types.DeleteMarkerEntry
		nf, err := os.Stat(filepath.Join(versionPath, nullVersionId))
		if err != nil && !errors.Is(err, fs.ErrNotExist) {
			return nil, err
		}
		if err == nil {
			// versionPath is passed in the bucket position here so the
			// attribute lookup resolves inside the versioning directory
			isDel, err := p.isObjDeleteMarker(versionPath, nullVersionId)
			if err != nil {
				return nil, err
			}
			// Check to see if the null versionId object is delete marker or not
			if isDel {
				nullObjDelMarker = &types.DeleteMarkerEntry{
					VersionId: backend.GetPtrFromString("null"),
					LastModified: backend.GetTimePtr(nf.ModTime()),
					Key: &path,
					IsLatest: getBoolPtr(false),
				}
			} else {
				etagBytes, err := p.meta.RetrieveAttribute(nil, versionPath, nullVersionId, etagkey)
				if errors.Is(err, fs.ErrNotExist) {
					return nil, backend.ErrSkipObj
				}
				if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
					return nil, fmt.Errorf("get etag: %w", err)
				}
				// note: meta.ErrNoSuchKey will return etagBytes = []byte{}
				// so this will just set etag to "" if its not already set
				etag := string(etagBytes)
				size := nf.Size()
				// Retrieve checksum
				checksum, err := p.retrieveChecksums(nil, versionPath, nullVersionId)
				if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
					return nil, fmt.Errorf("get checksum: %w", err)
				}
				nullVersionIdObj = &s3response.ObjectVersion{
					ETag: &etag,
					Key: &path,
					LastModified: backend.GetTimePtr(nf.ModTime()),
					Size: &size,
					VersionId: backend.GetPtrFromString("null"),
					IsLatest: getBoolPtr(false),
					StorageClass: types.ObjectVersionStorageClassStandard,
					ChecksumAlgorithm: []types.ChecksumAlgorithm{
						checksum.Algorithm,
					},
					ChecksumType: checksum.Type,
				}
			}
		}
		isNullVersionIdObjFound := nullVersionIdObj != nil || nullObjDelMarker != nil
		// Fast path: the null version is the only entry in the
		// versioning directory, so there is nothing to order it against.
		if len(dirEnts) == 1 && (isNullVersionIdObjFound) {
			if nullObjDelMarker != nil {
				delMarkers = append(delMarkers, *nullObjDelMarker)
			}
			if nullVersionIdObj != nil {
				objects = append(objects, *nullVersionIdObj)
			}
			if availableObjCount == 1 {
				return &backend.ObjVersionFuncResult{
					ObjectVersions: objects,
					DelMarkers: delMarkers,
					Truncated: true,
					NextVersionIdMarker: nullVersionId,
				}, nil
			} else {
				return &backend.ObjVersionFuncResult{
					ObjectVersions: objects,
					DelMarkers: delMarkers,
				}, nil
			}
		}
		isNullVersionIdObjAdded := false
		// Walk the versioning directory from the last entry to the first.
		// NOTE(review): os.ReadDir sorts by filename, so this presumably
		// visits versions newest-first if version ids sort
		// chronologically - confirm.
		for i := len(dirEnts) - 1; i >= 0; i-- {
			dEntry := dirEnts[i]
			// Skip the null versionId object to not
			// break the object versions list
			if dEntry.Name() == nullVersionId {
				continue
			}
			f, err := dEntry.Info()
			if errors.Is(err, fs.ErrNotExist) {
				// the version file disappeared after ReadDir; ignore it
				continue
			}
			if err != nil {
				return nil, fmt.Errorf("get fileinfo: %w", err)
			}
			// If the null versionId object is found, first push it
			// by checking its creation date, then continue the adding
			// NOTE(review): this insertion does not consult
			// pastVersionIdMarker - confirm that is intended.
			if isNullVersionIdObjFound && !isNullVersionIdObjAdded {
				if nf.ModTime().After(f.ModTime()) {
					if nullVersionIdObj != nil {
						objects = append(objects, *nullVersionIdObj)
					}
					if nullObjDelMarker != nil {
						delMarkers = append(delMarkers, *nullObjDelMarker)
					}
					isNullVersionIdObjAdded = true
					if availableObjCount--; availableObjCount == 0 {
						return &backend.ObjVersionFuncResult{
							ObjectVersions: objects,
							DelMarkers: delMarkers,
							Truncated: true,
							NextVersionIdMarker: nullVersionId,
						}, nil
					}
				}
			}
			// the file name inside the versioning directory is the version id
			versionId := f.Name()
			size := f.Size()
			// Entries up to and including the marker are skipped; listing
			// resumes with the entry that follows the marker.
			if !*pastVersionIdMarker {
				if versionId == versionIdMarker {
					*pastVersionIdMarker = true
				}
				continue
			}
			etagBytes, err := p.meta.RetrieveAttribute(nil, versionPath, versionId, etagkey)
			if errors.Is(err, fs.ErrNotExist) {
				return nil, backend.ErrSkipObj
			}
			if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
				return nil, fmt.Errorf("get etag: %w", err)
			}
			// note: meta.ErrNoSuchKey will return etagBytes = []byte{}
			// so this will just set etag to "" if its not already set
			etag := string(etagBytes)
			isDel, err := p.isObjDeleteMarker(versionPath, versionId)
			if err != nil {
				return nil, err
			}
			if isDel {
				delMarkers = append(delMarkers, types.DeleteMarkerEntry{
					VersionId: &versionId,
					LastModified: backend.GetTimePtr(f.ModTime()),
					Key: &path,
					IsLatest: getBoolPtr(false),
				})
			} else {
				// Retrieve checksum
				checksum, err := p.retrieveChecksums(nil, versionPath, versionId)
				if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
					return nil, fmt.Errorf("get checksum: %w", err)
				}
				objects = append(objects, s3response.ObjectVersion{
					ETag: &etag,
					Key: &path,
					LastModified: backend.GetTimePtr(f.ModTime()),
					Size: &size,
					VersionId: &versionId,
					IsLatest: getBoolPtr(false),
					StorageClass: types.ObjectVersionStorageClassStandard,
					ChecksumAlgorithm: []types.ChecksumAlgorithm{checksum.Algorithm},
					ChecksumType: checksum.Type,
				})
			}
			// if the available object count reaches to 0, return truncated response with nextVersionIdMarker
			availableObjCount--
			if availableObjCount == 0 {
				return &backend.ObjVersionFuncResult{
					ObjectVersions: objects,
					DelMarkers: delMarkers,
					Truncated: true,
					NextVersionIdMarker: versionId,
				}, nil
			}
		}
		// If null versionId object is found but not yet pushed,
		// push it after the listing, as it's the oldest object version
		if isNullVersionIdObjFound && !isNullVersionIdObjAdded {
			if nullVersionIdObj != nil {
				objects = append(objects, *nullVersionIdObj)
			}
			if nullObjDelMarker != nil {
				delMarkers = append(delMarkers, *nullObjDelMarker)
			}
			if availableObjCount--; availableObjCount == 0 {
				return &backend.ObjVersionFuncResult{
					ObjectVersions: objects,
					DelMarkers: delMarkers,
					Truncated: true,
					NextVersionIdMarker: nullVersionId,
				}, nil
			}
		}
		// everything for this entry fit into the remaining budget
		return &backend.ObjVersionFuncResult{
			ObjectVersions: objects,
			DelMarkers: delMarkers,
		}, nil
	}
}
// CreateMultipartUpload initiates a multipart upload for mpu.Key in
// mpu.Bucket and returns the generated upload id.
//
// The upload is represented on disk by the directory
// <bucket>/<MetaTmpMultipartDir>/<sha256(key)>/<uploadID>. The hashed
// container directory is shared by all outstanding uploads for the same key
// and records the original object name in the onameAttr attribute. Tags,
// object metadata properties, legal hold, retention and the checksum
// algorithm/type supplied in mpu are stored in the metadata store under the
// upload directory's path.
//
// Errors:
//   - the error from acquireActionSlot when no action slot can be acquired
//   - s3err.ErrNoSuchKey when mpu.Key is nil
//   - s3err.ErrInvalidBucketName when mpu.Bucket is nil or fails isBucketValid
//   - s3err.ErrNoSuchBucket when the bucket directory does not exist
//   - s3err.ErrDirectoryObjectContainsData when the key ends in "/"
//   - errors from tag parsing and from any of the attribute stores
//
// If a step fails after the upload directory has been created, the upload
// directory and its attributes are removed again on a best-effort basis
// before the error is returned.
func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu s3response.CreateMultipartUploadInput) (s3response.InitiateMultipartUploadResult, error) {
	// zero value returned alongside every error
	var noResult s3response.InitiateMultipartUploadResult

	release, err := p.acquireActionSlot(ctx)
	if err != nil {
		return noResult, err
	}
	defer release()

	if mpu.Key == nil {
		return noResult, s3err.GetAPIError(s3err.ErrNoSuchKey)
	}
	// A missing bucket name is reported as an invalid bucket name rather
	// than panicking on the dereference below.
	if mpu.Bucket == nil {
		return noResult, s3err.GetBucketErr(s3err.ErrInvalidBucketName, "")
	}

	bucket := *mpu.Bucket
	object := *mpu.Key

	if !p.isBucketValid(bucket) {
		return noResult, s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	_, err = os.Stat(bucket)
	if errors.Is(err, fs.ErrNotExist) {
		return noResult, s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	}
	if err != nil {
		return noResult, fmt.Errorf("stat bucket: %w", err)
	}

	if strings.HasSuffix(object, "/") {
		// directory objects can't be uploaded with multipart uploads
		// because posix directories can't contain data
		return noResult, s3err.GetAPIError(s3err.ErrDirectoryObjectContainsData)
	}

	// parse object tags
	tags, err := backend.ParseObjectTags(getString(mpu.Tagging))
	if err != nil {
		return noResult, err
	}

	// generate random uuid for upload id
	uploadID := uuid.New().String()

	// hash object name for multipart container
	objNameSum := sha256.Sum256([]byte(object))

	// multiple uploads for same object name allowed,
	// they will all go into the same hashed name directory
	objdir := filepath.Join(MetaTmpMultipartDir, fmt.Sprintf("%x", objNameSum))
	tmppath := filepath.Join(bucket, objdir)

	// uploadDir is the on-disk directory of this upload, uploadObj is the
	// same location expressed relative to the bucket for the metadata store
	uploadDir := filepath.Join(tmppath, uploadID)
	uploadObj := filepath.Join(objdir, uploadID)

	// removeUploadDirs removes the upload directory and, if it is now
	// empty, the hashed container. Errors are ignored on purpose: the
	// container may still hold other uploads for the same object name.
	removeUploadDirs := func() {
		_ = os.RemoveAll(uploadDir)
		_ = os.Remove(tmppath)
	}
	// discardUpload rolls back a partially initialized upload: directories
	// first, then whatever attributes were already stored for it.
	discardUpload := func() {
		removeUploadDirs()
		_ = p.meta.DeleteAttributes(bucket, uploadObj)
	}
	// lockConfigErr swaps the "missing object lock configuration" error
	// for its NoSpaces variant and passes every other error through.
	lockConfigErr := func(err error) error {
		if errors.Is(err, s3err.GetAPIError(s3err.ErrMissingObjectLockConfiguration)) {
			return s3err.GetAPIError(s3err.ErrMissingObjectLockConfigurationNoSpaces)
		}
		return err
	}

	// the unique upload id is a directory for all of the parts
	// associated with this specific multipart upload
	err = os.MkdirAll(uploadDir, 0755)
	if err != nil {
		return noResult, fmt.Errorf("create upload temp dir: %w", err)
	}

	// set an attribute with the original object name so that we can
	// map the hashed name back to the original object name
	err = p.meta.StoreAttribute(nil, bucket, objdir, onameAttr, []byte(object))
	if err != nil {
		// nothing has been stored for the upload itself yet, so only the
		// directories need to be cleaned up
		removeUploadDirs()
		return noResult, fmt.Errorf("set name attr for upload: %w", err)
	}

	// set object tagging
	if tags != nil {
		if err := p.PutObjectTagging(withCtxNoSlot(ctx), bucket, uploadObj, "", tags); err != nil {
			discardUpload()
			return noResult, err
		}
	}

	err = p.storeObjectMetaProperties(nil, bucket, uploadObj,
		metaProperties{
			ContentType:             mpu.ContentType,
			ContentEncoding:         mpu.ContentEncoding,
			ContentDisposition:      mpu.ContentDisposition,
			ContentLanguage:         mpu.ContentLanguage,
			CacheControl:            mpu.CacheControl,
			Expires:                 mpu.Expires,
			WebsiteRedirectLocation: mpu.WebsiteRedirectLocation,
			Metadata:                mpu.Metadata,
		})
	if err != nil {
		discardUpload()
		return noResult, err
	}

	// set object legal hold
	if mpu.ObjectLockLegalHoldStatus == types.ObjectLockLegalHoldStatusOn {
		if err := p.PutObjectLegalHold(withCtxNoSlot(ctx), bucket, uploadObj, "", true); err != nil {
			discardUpload()
			return noResult, lockConfigErr(err)
		}
	}

	// Set object retention
	if mpu.ObjectLockMode != "" {
		retention := types.ObjectLockRetention{
			Mode:            types.ObjectLockRetentionMode(mpu.ObjectLockMode),
			RetainUntilDate: mpu.ObjectLockRetainUntilDate,
		}
		retParsed, err := json.Marshal(retention)
		if err != nil {
			discardUpload()
			return noResult, fmt.Errorf("parse object lock retention: %w", err)
		}
		if err := p.PutObjectRetention(withCtxNoSlot(ctx), bucket, uploadObj, "", retParsed); err != nil {
			discardUpload()
			return noResult, lockConfigErr(err)
		}
	}

	// Set object checksum algorithm
	if mpu.ChecksumAlgorithm != "" {
		err := p.storeChecksums(nil, bucket, uploadObj, s3response.Checksum{
			Algorithm: mpu.ChecksumAlgorithm,
			Type:      mpu.ChecksumType,
		})
		if err != nil {
			discardUpload()
			return noResult, fmt.Errorf("store mp checksum algorithm: %w", err)
		}
	}

	return s3response.InitiateMultipartUploadResult{
		Bucket:   bucket,
		Key:      object,
		UploadId: uploadID,
	}, nil
}
// getChownIDs picks the uid and gid an object should be owned by for the
// given account. The backend's effective ids (p.euid / p.egid) are the
// defaults; the account's UserID or GroupID replaces the respective default
// only when the matching chown option (p.chownuid / p.chowngid) is enabled
// and the account id differs from the effective id. The returned bool is
// true when at least one of the two ids was replaced, i.e. a chown is needed.
func (p *Posix) getChownIDs(acct auth.Account) (int, int, bool) {
	ownerUID, ownerGID := p.euid, p.egid

	switchUID := p.chownuid && acct.UserID != p.euid
	switchGID := p.chowngid && acct.GroupID != p.egid

	if switchUID {
		ownerUID = acct.UserID
	}
	if switchGID {
		ownerGID = acct.GroupID
	}

	return ownerUID, ownerGID, switchUID || switchGID
}
// getPartChecksum returns the checksum value of part that corresponds to
// algo, converted with backend.GetStringFromPtr. An algorithm that is not
// handled here yields the empty string.
func getPartChecksum(algo types.ChecksumAlgorithm, part types.CompletedPart) string {
	// pick the field first, convert once at the end
	var selected *string

	switch algo {
	case types.ChecksumAlgorithmCrc32:
		selected = part.ChecksumCRC32
	case types.ChecksumAlgorithmCrc32c:
		selected = part.ChecksumCRC32C
	case types.ChecksumAlgorithmSha1:
		selected = part.ChecksumSHA1
	case types.ChecksumAlgorithmSha256:
		selected = part.ChecksumSHA256
	case types.ChecksumAlgorithmCrc64nvme:
		selected = part.ChecksumCRC64NVME
	case types.ChecksumAlgorithmSha512:
		selected = part.ChecksumSHA512
	case types.ChecksumAlgorithmMd5:
		selected = part.ChecksumMD5
	case types.ChecksumAlgorithmXxhash64:
		selected = part.ChecksumXXHASH64
	case types.ChecksumAlgorithmXxhash3:
		selected = part.ChecksumXXHASH3
	case types.ChecksumAlgorithmXxhash128:
		selected = part.ChecksumXXHASH128
	default:
		// unknown algorithm: nothing to look up
		return ""
	}

	return backend.GetStringFromPtr(selected)
}
// setStoredChecksum stores sum in the field of checksum that belongs to
// algo. Nothing is changed when sum is nil or when algo is not one of the
// algorithms handled here.
func setStoredChecksum(checksum *s3response.Checksum, algo types.ChecksumAlgorithm, sum *string) {
	if sum == nil {
		return
	}

	// resolve the destination field, then assign in a single place
	var field **string
	switch algo {
	case types.ChecksumAlgorithmCrc32:
		field = &checksum.CRC32
	case types.ChecksumAlgorithmCrc32c:
		field = &checksum.CRC32C
	case types.ChecksumAlgorithmSha1:
		field = &checksum.SHA1
	case types.ChecksumAlgorithmSha256:
		field = &checksum.SHA256
	case types.ChecksumAlgorithmCrc64nvme:
		field = &checksum.CRC64NVME
	case types.ChecksumAlgorithmSha512:
		field = &checksum.SHA512
	case types.ChecksumAlgorithmMd5:
		field = &checksum.MD5
	case types.ChecksumAlgorithmXxhash64:
		field = &checksum.XXHASH64
	case types.ChecksumAlgorithmXxhash3:
		field = &checksum.XXHASH3
	case types.ChecksumAlgorithmXxhash128:
		field = &checksum.XXHASH128
	default:
		return
	}

	*field = sum
}
// setUploadPartChecksum stores sum in the checksum field of res that belongs
// to algo. Nothing is changed when sum is nil or when algo is not one of the
// algorithms handled here.
func setUploadPartChecksum(res *s3.UploadPartOutput, algo types.ChecksumAlgorithm, sum *string) {
	if sum == nil {
		return
	}

	// resolve the destination field, then assign in a single place
	var field **string
	switch algo {
	case types.ChecksumAlgorithmCrc32:
		field = &res.ChecksumCRC32
	case types.ChecksumAlgorithmCrc32c:
		field = &res.ChecksumCRC32C
	case types.ChecksumAlgorithmSha1:
		field = &res.ChecksumSHA1
	case types.ChecksumAlgorithmSha256:
		field = &res.ChecksumSHA256
	case types.ChecksumAlgorithmCrc64nvme:
		field = &res.ChecksumCRC64NVME
	case types.ChecksumAlgorithmSha512:
		field = &res.ChecksumSHA512
	case types.ChecksumAlgorithmMd5:
		field = &res.ChecksumMD5
	case types.ChecksumAlgorithmXxhash64:
		field = &res.ChecksumXXHASH64
	case types.ChecksumAlgorithmXxhash3:
		field = &res.ChecksumXXHASH3
	case types.ChecksumAlgorithmXxhash128:
		field = &res.ChecksumXXHASH128
	default:
		return
	}

	*field = sum
}
// CompleteMultipartUpload acquires an action slot and then delegates to
// CompleteMultipartUploadWithCopy without a custom copy function. The slot
// is released when the call returns. If no slot can be acquired, the error
// from acquireActionSlot is returned together with an empty result.
func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
	releaseSlot, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		var empty s3response.CompleteMultipartUploadResult
		return empty, "", slotErr
	}
	defer releaseSlot()

	// nil selects the default copy path
	return p.CompleteMultipartUploadWithCopy(ctx, input, nil)
}
// onlyRead wraps an io.Reader so that only the methods of the io.Reader
// interface are reachable through the wrapper; any additional methods of the
// underlying value are hidden from the consumer. It is used as the io.Copy
// source for part files when p.forceNoCopyFileRange is set.
// NOTE(review): presumably this keeps io.Copy from taking a file-to-file
// fast path such as copy_file_range - confirm.
type onlyRead struct {
	io.Reader
}
// CustomCopyFunc implements copying/moving data from one file descriptor
// to another. CompleteMultipartUploadWithCopy calls it with a part file as
// from and the destination temp file as to.
//
// The returned bool reports whether the function is idempotent. If true,
// the system will allow clients to retry CompleteMultipartUpload. If false,
// then once a copy function has been called successfully the upload will
// be completely aborted on any subsequent failure.
type CustomCopyFunc func(from *os.File, to *os.File) (bool, error)
func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.CompleteMultipartUploadInput, customCopy CustomCopyFunc) (s3response.CompleteMultipartUploadResult, string, error) {
acct, ok := ctx.Value("account").(auth.Account)
if !ok {
acct = auth.Account{}
}
res := s3response.CompleteMultipartUploadResult{}
if input.Key == nil {
return res, "", s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if input.UploadId == nil {
return res, "", s3err.GetNoSuchUploadErr("")
}
if input.MultipartUpload == nil {
return res, "", s3err.GetAPIError(s3err.ErrInvalidRequest)
}
bucket := *input.Bucket
object := *input.Key
uploadID := *input.UploadId
parts := input.MultipartUpload.Parts
if !p.isBucketValid(bucket) {
return res, "", s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
}
_, err := os.Stat(bucket)
if errors.Is(err, fs.ErrNotExist) {
return res, "", s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
}
if err != nil {
return res, "", fmt.Errorf("stat bucket: %w", err)
}
// Rename the upload directory to <uploadId><ETag> to atomically claim
// the processing slot. A concurrent call with the same uploadId will compute
// the same ETag, so it will either find the directory still present (still
// processing) or gone (already completed) and react accordingly.
sum := sha256.Sum256([]byte(object))
objdirFull := filepath.Join(bucket, MetaTmpMultipartDir, fmt.Sprintf("%x", sum))
uploadIDDir := filepath.Join(objdirFull, uploadID)
// Calculate s3 compatible md5sum for complete multipart.
s3MD5, err := backend.GetMultipartMD5(parts)
if err != nil {
return res, "", err
}
activeUploadName := fmt.Sprintf("%s.%s%s", uploadID, strings.Trim(s3MD5, "\""), inProgressSuffix)
uploadIDInProgress := filepath.Join(objdirFull, activeUploadName)
err = os.Rename(uploadIDDir, uploadIDInProgress)
if errors.Is(err, fs.ErrNotExist) {
// Another call already claimed this slot and is still assembling the object.
if _, statErr := os.Stat(uploadIDInProgress); statErr == nil {
// Still in progress — treat as success for idempotency.
return s3response.CompleteMultipartUploadResult{
Bucket: &bucket,
ETag: &s3MD5,
Key: &object,
}, "", nil
}
// Directory is gone: the concurrent call already completed and cleaned up.
if _, statErr := os.Stat(filepath.Join(bucket, object)); statErr == nil {
return s3response.CompleteMultipartUploadResult{
Bucket: &bucket,
ETag: &s3MD5,
Key: &object,
}, "", nil
}
// Last resort: the object stat above may have lost a race with the
// concurrent call's link step. Check the mp-metadata xattr, as this
// multipart upload may have been finalized and the final object has been created
// before or by the racing request
if mpMetaBytes, statErr := p.meta.RetrieveAttribute(nil, bucket, object, mpMetaKey); statErr == nil {
mpMeta, err := backend.UnmarshalMpUploadMetadata(mpMetaBytes, false)
if err != nil {
return res, "", fmt.Errorf("parse object multipart metadata: %w", err)
}
// The object may have been overwritten by a newer upload or
// it's the result of a completely different multipart upload; only
// treat it as our completion if the upload IDs match.
if mpMeta.UploadID != uploadID {
return res, "", s3err.GetAPIError(s3err.ErrNoSuchUpload)
}
return s3response.CompleteMultipartUploadResult{
Bucket: &bucket,
ETag: &s3MD5,
Key: &object,
}, "", nil
}
return res, "", s3err.GetAPIError(s3err.ErrNoSuchUpload)
}
if err != nil {
return res, "", fmt.Errorf("rename upload to etag dir: %w", err)
}
// Rename sidecar metadata to match the new data directory path.
// For xattr this is a no-op since attributes follow the inode.
metaObjDir := filepath.Join(MetaTmpMultipartDir, fmt.Sprintf("%x", sum))
oldMetaObj := filepath.Join(metaObjDir, uploadID)
newMetaObj := filepath.Join(metaObjDir, activeUploadName)
if err := p.meta.RenameObject(bucket, oldMetaObj, newMetaObj); err != nil {
// Roll back the data directory rename so a future retry can succeed.
os.Rename(uploadIDInProgress, uploadIDDir)
return res, "", fmt.Errorf("rename metadata for in-progress: %w", err)
}
// Best-effort rename back on failure so a future retry can still complete.
// On success, os.RemoveAll below removes uploadIDInProgress so this is a no-op.
defer os.Rename(uploadIDInProgress, uploadIDDir)
defer p.meta.RenameObject(bucket, newMetaObj, oldMetaObj)
b, err := p.meta.RetrieveAttribute(nil, bucket, object, etagkey)
if err == nil || errors.Is(err, fs.ErrNotExist) || errors.Is(err, meta.ErrNoSuchKey) {
err = backend.EvaluateObjectPutPreconditions(string(b), input.IfMatch, input.IfNoneMatch, err == nil)
if err != nil {
return res, "", err
}
}
objdir := filepath.Join(MetaTmpMultipartDir, fmt.Sprintf("%x", sum))
checksums, err := p.retrieveChecksums(nil, bucket, filepath.Join(objdir, activeUploadName))
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return res, "", fmt.Errorf("get mp checksums: %w", err)
}
// ChecksumType should be the same as specified on CreateMultipartUpload
if input.ChecksumType != "" && checksums.Type != input.ChecksumType {
checksumType := checksums.Type
if checksumType == "" {
checksumType = types.ChecksumType("null")
}
return res, "", s3err.GetChecksumTypeMismatchOnMpErr(checksumType)
}
// mpChecksumType holds the multipart upload checksum type
mpChecksumType := checksums.Type
// The checksum type/algorithm should default to FULL_OBJECT(crc64nvme)
if checksums.Type == "" {
checksums.Type = types.ChecksumTypeFullObject
checksums.Algorithm = types.ChecksumAlgorithmCrc64nvme
}
// Initialize composite checksum reader
var compositeChecksumRdr *utils.CompositeChecksumReader
if checksums.Type == types.ChecksumTypeComposite {
compositeChecksumRdr, err = utils.NewCompositeChecksumReader(utils.HashType(strings.ToLower(string(checksums.Algorithm))))
if err != nil {
return res, "", fmt.Errorf("initialize composite checksum reader: %w", err)
}
}
// check all parts ok
last := len(parts) - 1
var totalsize int64
// cumulative byte offsets: partSizes[i] = sum of sizes of parts 1..i+1
var partSizes []int64
var composableCsum string
// The initial value is the lower limit of partNumber: 0
var partNumber int32
for i, part := range parts {
if part.PartNumber == nil {
return res, "", s3err.GetAPIError(s3err.ErrMalformedXML)
}
if *part.PartNumber < 1 {
return res, "", s3err.GetInvalidArgumentErr(s3err.InvalidArgCompleteMpPartNumber, fmt.Sprint(*part.PartNumber))
}
if *part.PartNumber <= partNumber {
return res, "", s3err.GetAPIError(s3err.ErrInvalidPartOrder)
}
partNumber = *part.PartNumber
partObjPath := filepath.Join(objdir, activeUploadName, fmt.Sprintf("%v", *part.PartNumber))
fullPartPath := filepath.Join(bucket, partObjPath)
fi, err := os.Lstat(fullPartPath)
if err != nil {
return res, "", s3err.GetInvalidPartErr(uploadID, *part.PartNumber, backend.GetStringFromPtr(part.ETag))
}
totalsize += fi.Size()
partSizes = append(partSizes, totalsize)
// all parts except the last need to be greater, than or equal to
// the minimum allowed size (5 Mib)
if i < last && fi.Size() < backend.MinPartSize {
return res, "", s3err.GetEntityTooSmallErr(fi.Size(), backend.MinPartSize)
}
b, err := p.meta.RetrieveAttribute(nil, bucket, partObjPath, etagkey)
etag := string(b)
if err != nil {
etag = ""
}
if parts[i].ETag == nil {
return res, "", s3err.GetAPIError(s3err.ErrMalformedXML)
}
if !backend.AreEtagsSame(etag, *parts[i].ETag) {
return res, "", s3err.GetInvalidPartErr(uploadID, *part.PartNumber, etag)
}
partChecksum, err := p.retrieveChecksums(nil, bucket, partObjPath)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return res, "", fmt.Errorf("get part checksum: %w", err)
}
// If checksum has been provided on mp initialization
err = validatePartChecksum(partChecksum, part, uploadID)
if err != nil {
return res, "", err
}
// Accumulate checksum state.
switch checksums.Type {
case types.ChecksumTypeFullObject:
var pcs string
if mpChecksumType != "" {
pcs = getPartChecksum(checksums.Algorithm, part)
} else {
crc64nvme, err := p.meta.RetrieveAttribute(nil, bucket, partObjPath, partCrc64nvme)
if err != nil {
return res, "", fmt.Errorf("retrieve part internal crc64nvme: %w", err)
}
pcs = string(crc64nvme)
}
if i == 0 {
composableCsum = pcs
} else {
composableCsum, err = utils.AddCRCChecksum(checksums.Algorithm, composableCsum, pcs, fi.Size())
if err != nil {
return res, "", fmt.Errorf("add part %v checksum: %w", *part.PartNumber, err)
}
}
case types.ChecksumTypeComposite:
if err := compositeChecksumRdr.Process(getPartChecksum(checksums.Algorithm, part)); err != nil {
return res, "", fmt.Errorf("process %v part checksum: %w", *part.PartNumber, err)
}
}
}
if input.MpuObjectSize != nil && totalsize != *input.MpuObjectSize {
return res, "", s3err.GetIncorrectMpObjectSizeErr(totalsize, *input.MpuObjectSize)
}
// Compute the final checksum value.
var value string
switch checksums.Type {
case types.ChecksumTypeComposite:
value = fmt.Sprintf("%s-%v", compositeChecksumRdr.Sum(), len(parts))
case types.ChecksumTypeFullObject:
value = composableCsum
}
var crc32 *string
var crc32c *string
var sha1 *string
var sha256 *string
var crc64nvme *string
var sha512 *string
var md5sum *string
var xxhash64 *string
var xxhash3 *string
var xxhash128 *string
var gotSum *string
switch checksums.Algorithm {
case types.ChecksumAlgorithmCrc32:
gotSum = input.ChecksumCRC32
checksums.CRC32 = &value
crc32 = &value
case types.ChecksumAlgorithmCrc32c:
gotSum = input.ChecksumCRC32C
checksums.CRC32C = &value
crc32c = &value
case types.ChecksumAlgorithmSha1:
gotSum = input.ChecksumSHA1
checksums.SHA1 = &value
sha1 = &value
case types.ChecksumAlgorithmSha256:
gotSum = input.ChecksumSHA256
checksums.SHA256 = &value
sha256 = &value
case types.ChecksumAlgorithmCrc64nvme:
gotSum = input.ChecksumCRC64NVME
checksums.CRC64NVME = &value
crc64nvme = &value
case types.ChecksumAlgorithmSha512:
gotSum = input.ChecksumSHA512
checksums.SHA512 = &value
sha512 = &value
case types.ChecksumAlgorithmMd5:
gotSum = input.ChecksumMD5
checksums.MD5 = &value
md5sum = &value
case types.ChecksumAlgorithmXxhash64:
gotSum = input.ChecksumXXHASH64
checksums.XXHASH64 = &value
xxhash64 = &value
case types.ChecksumAlgorithmXxhash3:
gotSum = input.ChecksumXXHASH3
checksums.XXHASH3 = &value
xxhash3 = &value
case types.ChecksumAlgorithmXxhash128:
gotSum = input.ChecksumXXHASH128
checksums.XXHASH128 = &value
xxhash128 = &value
}
// Check if the provided checksum and the calculated one are the same.
if mpChecksumType != "" && gotSum != nil {
s := *gotSum
if checksums.Type == types.ChecksumTypeComposite && !strings.Contains(s, "-") {
s = fmt.Sprintf("%s-%v", s, len(parts))
}
if s != value {
return res, "", s3err.GetChecksumBadDigestErr(checksums.Algorithm)
}
}
f, err := p.openTmpFile(filepath.Join(bucket, MetaTmpDir), bucket, object,
totalsize, acct, skipFalloc, p.forceNoTmpFile)
if err != nil {
if errors.Is(err, syscall.EDQUOT) {
return res, "", s3err.GetAPIError(s3err.ErrQuotaExceeded)
}
if errors.Is(err, syscall.ENOSPC) {
return res, "", s3err.GetAPIError(s3err.ErrNoSpaceLeftOnDevice)
}
return res, "", fmt.Errorf("open temp file: %w", err)
}
defer f.cleanup()
var abortOnErrSet bool
for _, part := range parts {
partObjPath := filepath.Join(objdir, activeUploadName, fmt.Sprintf("%v", *part.PartNumber))
fullPartPath := filepath.Join(bucket, partObjPath)
pf, err := os.Open(fullPartPath)
if err != nil {
return res, "", fmt.Errorf("open part %v: %v", *part.PartNumber, err)
}
if customCopy != nil {
idemp, err := customCopy(pf, f.File())
if err != nil {
// Fail back to standard copy
debuglogger.Logf("custom data block move failed (%q/%q): %v, failing back to io.Copy()",
bucket, object, err)
fw := f.File()
fw.Seek(0, io.SeekEnd)
if p.forceNoCopyFileRange {
_, err = io.Copy(fw, &onlyRead{pf})
} else {
_, err = io.Copy(fw, pf)
}
}
if !idemp && err == nil {
// a successful non-idempotent call means we can no longer
// retry this upload. any failure needs to abort the complete
// upload.
if !abortOnErrSet {
defer func() {
// cleanup tmp dirs
os.RemoveAll(filepath.Join(bucket, objdir, activeUploadName))
// use Remove for objdir in case there are still other
// uploads for same object name outstanding, this will
// fail if there are any
os.Remove(filepath.Join(bucket, objdir))
}()
}
abortOnErrSet = true
}
} else {
if p.forceNoCopyFileRange {
_, err = io.Copy(f.File(), &onlyRead{pf})
} else {
_, err = io.Copy(f.File(), pf)
}
}
pf.Close()
if err != nil {
if errors.Is(err, syscall.EDQUOT) {
return res, "", s3err.GetAPIError(s3err.ErrQuotaExceeded)
}
if errors.Is(err, syscall.ENOSPC) {
return res, "", s3err.GetAPIError(s3err.ErrNoSpaceLeftOnDevice)
}
return res, "", fmt.Errorf("copy part %v: %v", part.PartNumber, err)
}
}
upiddir := filepath.Join(objdir, activeUploadName)
objMeta := p.loadObjectMetaProperties(nil, bucket, upiddir, nil)
err = p.storeObjectMetaProperties(f.File(), bucket, object, objMeta)
if err != nil {
return res, "", err
}
objname := filepath.Join(bucket, object)
dir := filepath.Dir(objname)
if dir != "" {
uid, gid, doChown := p.getChownIDs(acct)
err = backend.MkdirAll(dir, uid, gid, doChown, p.newDirPerm)
if err != nil {
return res, "", err
}
}
vStatus, err := p.getBucketVersioningStatus(ctx, bucket)
if err != nil {
return res, "", err
}
vEnabled := p.isBucketVersioningEnabled(vStatus)
d, err := os.Stat(objname)
// if the versioning is enabled first create the file object version
if p.versioningEnabled() && vEnabled && err == nil && !d.IsDir() {
_, err := p.createObjVersion(bucket, object, d.Size(), acct, false)
if err != nil {
return res, "", fmt.Errorf("create object version: %w", err)
}
// Clean up object-lock attrs that may have leaked from the previous
// version's path-based metadata into this (new) version's sidecar.
_ = p.meta.DeleteAttribute(bucket, object, objectLegalHoldKey)
_ = p.meta.DeleteAttribute(bucket, object, objectRetentionKey)
}
// if the versioning is enabled, generate a new versionID for the object
var versionID string
if p.versioningEnabled() && vEnabled {
versionID = ulid.Make().String()
err := p.meta.StoreAttribute(f.File(), bucket, object, versionIdKey, []byte(versionID))
if err != nil {
return res, "", fmt.Errorf("set versionId attr: %w", err)
}
}
// load and set tagging
tagging, err := p.meta.RetrieveAttribute(nil, bucket, upiddir, tagHdr)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return res, "", fmt.Errorf("get object tagging: %w", err)
}
if err == nil {
err := p.meta.StoreAttribute(f.File(), bucket, object, tagHdr, tagging)
if err != nil {
return res, "", fmt.Errorf("set object tagging: %w", err)
}
}
// load and set legal hold
lHold, err := p.meta.RetrieveAttribute(nil, bucket, upiddir, objectLegalHoldKey)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return res, "", fmt.Errorf("get object legal hold: %w", err)
}
if err == nil {
err := p.meta.StoreAttribute(f.File(), bucket, object, objectLegalHoldKey, lHold)
if err != nil {
return res, "", fmt.Errorf("set object legal hold: %w", err)
}
}
err = p.storeChecksums(f.File(), bucket, object, checksums)
if err != nil {
return res, "", fmt.Errorf("store object checksum: %w", err)
}
// load and set retention
ret, err := p.meta.RetrieveAttribute(nil, bucket, upiddir, objectRetentionKey)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return res, "", fmt.Errorf("get object retention: %w", err)
}
if err == nil {
err := p.meta.StoreAttribute(f.File(), bucket, object, objectRetentionKey, ret)
if err != nil {
return res, "", fmt.Errorf("set object retention: %w", err)
}
}
err = p.meta.StoreAttribute(f.File(), bucket, object, etagkey, []byte(s3MD5))
if err != nil {
return res, "", fmt.Errorf("set etag attr: %w", err)
}
// Store multipart upload metadata on the final object so that GetObject /
// HeadObject can serve individual parts by part-number.
mpMeta := backend.MpUploadMetadata{UploadID: uploadID, Parts: partSizes}
mpMetaBytes, err := backend.MarshalMpUploadMetadata(mpMeta, false)
if err != nil {
return res, "", fmt.Errorf("marshal object multipart metadata: %w", err)
}
err = p.meta.StoreAttribute(f.File(), bucket, object, mpMetaKey, mpMetaBytes)
if err != nil {
return res, "", fmt.Errorf("set object multipart metadata: %w", err)
}
err = f.link()
if err != nil {
return res, "", fmt.Errorf("link object in namespace: %w", err)
}
// cleanup tmp dirs
os.RemoveAll(filepath.Join(bucket, objdir, activeUploadName))
// use Remove for objdir in case there are still other uploads
// for same object name outstanding, this will fail if there are any
os.Remove(filepath.Join(bucket, objdir))
return s3response.CompleteMultipartUploadResult{
Bucket: &bucket,
ETag: &s3MD5,
Key: &object,
ChecksumCRC32: crc32,
ChecksumCRC32C: crc32c,
ChecksumSHA1: sha1,
ChecksumSHA256: sha256,
ChecksumCRC64NVME: crc64nvme,
ChecksumSHA512: sha512,
ChecksumMD5: md5sum,
ChecksumXXHASH64: xxhash64,
ChecksumXXHASH3: xxhash3,
ChecksumXXHASH128: xxhash128,
ChecksumType: &checksums.Type,
}, versionID, nil
}
// validatePartChecksum checks the checksum supplied for a completed part
// against the checksum that was recorded for that part.
//
// checksum is the stored part checksum, part is the part entry from the
// complete request and uploadId is only used to build InvalidPart errors.
//
// The rules, in order:
//   - more than one checksum on the part is an invalid argument;
//   - if no algorithm was recorded, the part must not carry a checksum;
//   - if an algorithm was recorded, the part must carry a checksum;
//   - every supplied checksum must be well formed and equal to the stored
//     value; a mismatch on the recorded algorithm is reported as an invalid
//     part, a mismatch on any other algorithm as BadDigest.
func validatePartChecksum(checksum s3response.Checksum, part types.CompletedPart, uploadId string) error {
	count, offending := numberOfChecksums(part)
	mpAlgo := checksum.Algorithm

	switch {
	case count > 1:
		return s3err.GetInvalidArgumentErr(s3err.InvalidArgChecksumPart, offending)
	case mpAlgo == "" && count != 0:
		return s3err.GetInvalidPartErr(uploadId, *part.PartNumber, *part.ETag)
	case mpAlgo == "":
		return nil
	case count == 0:
		return s3err.APIError{
			Code:           "InvalidRequest",
			Description:    fmt.Sprintf("The upload was created using a %v checksum. The complete request must include the checksum for each part. It was missing for part %v in the request.", strings.ToLower(string(mpAlgo)), *part.PartNumber),
			HTTPStatusCode: http.StatusBadRequest,
		}
	}

	// candidate pairs a checksum from the request with the stored value
	// for the same algorithm.
	type candidate struct {
		got  *string
		want string
		algo types.ChecksumAlgorithm
	}
	candidates := []candidate{
		{part.ChecksumCRC32, getString(checksum.CRC32), types.ChecksumAlgorithmCrc32},
		{part.ChecksumCRC32C, getString(checksum.CRC32C), types.ChecksumAlgorithmCrc32c},
		{part.ChecksumSHA1, getString(checksum.SHA1), types.ChecksumAlgorithmSha1},
		{part.ChecksumSHA256, getString(checksum.SHA256), types.ChecksumAlgorithmSha256},
		{part.ChecksumCRC64NVME, getString(checksum.CRC64NVME), types.ChecksumAlgorithmCrc64nvme},
		{part.ChecksumSHA512, getString(checksum.SHA512), types.ChecksumAlgorithmSha512},
		{part.ChecksumMD5, getString(checksum.MD5), types.ChecksumAlgorithmMd5},
		{part.ChecksumXXHASH64, getString(checksum.XXHASH64), types.ChecksumAlgorithmXxhash64},
		{part.ChecksumXXHASH3, getString(checksum.XXHASH3), types.ChecksumAlgorithmXxhash3},
		{part.ChecksumXXHASH128, getString(checksum.XXHASH128), types.ChecksumAlgorithmXxhash128},
	}

	for _, c := range candidates {
		if c.got == nil {
			continue
		}
		got := *c.got
		if !utils.IsValidChecksum(got, c.algo) {
			return s3err.GetInvalidArgumentErr(s3err.InvalidArgChecksumPart, got)
		}
		if got == c.want {
			continue
		}
		if c.algo == mpAlgo {
			return s3err.GetInvalidPartErr(uploadId, *part.PartNumber, *part.ETag)
		}
		return s3err.APIError{
			Code:           "BadDigest",
			Description:    fmt.Sprintf("The %v you specified for part %v did not match what we received.", strings.ToLower(string(c.algo)), *part.PartNumber),
			HTTPStatusCode: http.StatusBadRequest,
		}
	}

	return nil
}
// numberOfChecksums counts the non-empty checksum fields of a completed
// part and builds a description of them.
//
// The returned string has the form "ALGO:value;" for each non-empty
// CRC32, CRC32C, CRC64NVME, SHA1 and SHA256 checksum, in that order.
// SHA512, MD5 and the XXHASH variants are counted but do not contribute
// to the description.
func numberOfChecksums(part types.CompletedPart) (int, string) {
	var (
		count int
		desc  strings.Builder
	)

	// checksums that are both counted and described
	described := []struct {
		algo  types.ChecksumAlgorithm
		value *string
	}{
		{types.ChecksumAlgorithmCrc32, part.ChecksumCRC32},
		{types.ChecksumAlgorithmCrc32c, part.ChecksumCRC32C},
		{types.ChecksumAlgorithmCrc64nvme, part.ChecksumCRC64NVME},
		{types.ChecksumAlgorithmSha1, part.ChecksumSHA1},
		{types.ChecksumAlgorithmSha256, part.ChecksumSHA256},
	}
	for _, c := range described {
		v := getString(c.value)
		if v == "" {
			continue
		}
		count++
		fmt.Fprintf(&desc, "%s:%s;", string(c.algo), v)
	}

	// checksums that are only counted
	countedOnly := []*string{
		part.ChecksumSHA512,
		part.ChecksumMD5,
		part.ChecksumXXHASH64,
		part.ChecksumXXHASH3,
		part.ChecksumXXHASH128,
	}
	for _, v := range countedOnly {
		if getString(v) != "" {
			count++
		}
	}

	return count, desc.String()
}
// checkUploadIDExists verifies that the multipart upload directory for the
// given bucket, object and upload ID exists.
//
// On success it returns the SHA-256 sum of the object name, which names the
// per-object directory below MetaTmpMultipartDir. A missing directory yields
// a NoSuchUpload error; any other stat failure is returned wrapped.
func (p *Posix) checkUploadIDExists(bucket, object, uploadID string) ([32]byte, error) {
	keyHash := sha256.Sum256([]byte(object))
	objectDir := filepath.Join(bucket, MetaTmpMultipartDir, fmt.Sprintf("%x", keyHash))
	uploadDir := filepath.Join(objectDir, uploadID)

	switch _, err := os.Stat(uploadDir); {
	case err == nil:
		return keyHash, nil
	case errors.Is(err, fs.ErrNotExist):
		return [32]byte{}, s3err.GetNoSuchUploadErr(uploadID)
	default:
		return [32]byte{}, fmt.Errorf("stat upload: %w", err)
	}
}
// metaProperties groups the object properties that are persisted as
// individual metadata attributes, together with the user metadata map.
// loadObjectMetaProperties leaves a pointer field nil when the matching
// attribute cannot be retrieved; storeObjectMetaProperties only writes
// fields for which getString returns a non-empty value.
type metaProperties struct {
// ContentType is stored under the contentTypeHdr attribute.
ContentType *string
// ContentEncoding is stored under the contentEncHdr attribute.
ContentEncoding *string
// ContentDisposition is stored under the contentDispHdr attribute.
ContentDisposition *string
// ContentLanguage is stored under the contentLangHdr attribute.
ContentLanguage *string
// CacheControl is stored under the cacheCtrlHdr attribute.
CacheControl *string
// Expires is stored under the expiresHdr attribute.
Expires *string
// WebsiteRedirectLocation is stored under the websiteRedirectHdr attribute.
WebsiteRedirectLocation *string
// Metadata is the user metadata map, stored as JSON under metadataHdr.
Metadata map[string]string
}
// loadObjectMetadata returns the user metadata of the given object.
//
// The metadata is normally stored as a JSON object under the metadataHdr
// attribute. When that attribute is absent, the deprecated scheme is tried:
// every attribute accepted by isValidMeta is read, its name is stripped of
// the "<oldMetaHdr>." prefix and used as the metadata key. If any legacy
// entries are found they are rewritten in the current JSON form and, when
// that succeeds, the legacy attributes are deleted.
//
// nil is returned when the metadata cannot be loaded at all. A JSON decode
// failure is not reported; whatever was decoded is returned.
func (p *Posix) loadObjectMetadata(f *os.File, bucket, object string) map[string]string {
	raw, err := p.meta.RetrieveAttribute(f, bucket, object, metadataHdr)
	if err == nil {
		var metadata map[string]string
		_ = json.Unmarshal(raw, &metadata)
		return metadata
	}
	if !errors.Is(err, meta.ErrNoSuchKey) {
		return nil
	}

	// fall back to the deprecated one-attribute-per-key mechanism
	attrs, err := p.meta.ListAttributes(bucket, object)
	if err != nil || len(attrs) == 0 {
		return nil
	}

	legacyPrefix := fmt.Sprintf("%v.", oldMetaHdr)
	converted := map[string]string{}
	legacyAttrs := make([]string, 0, len(attrs))
	for _, name := range attrs {
		if !isValidMeta(name) {
			continue
		}
		val, err := p.meta.RetrieveAttribute(f, bucket, object, name)
		if err != nil {
			continue
		}
		legacyAttrs = append(legacyAttrs, name)
		converted[strings.TrimPrefix(name, legacyPrefix)] = string(val)
	}

	if len(converted) == 0 {
		return converted
	}
	if err := p.storeObjectMetadata(f, bucket, object, converted); err != nil {
		// keep the legacy attributes when the conversion could not be stored
		return converted
	}
	// the metadata now lives in the JSON attribute: drop the legacy entries
	for _, name := range legacyAttrs {
		_ = p.meta.DeleteAttribute(bucket, object, name)
	}
	return converted
}
// loadObjectMetaProperties reads the stored meta properties of an object:
// Content-Type, Content-Encoding, Content-Disposition, Content-Language,
// Cache-Control, Expires, the website redirect location and the user
// metadata map.
//
// A property whose attribute cannot be retrieved is left nil. When fi is
// non-nil and describes a directory, a missing or empty Content-Type is
// replaced with "application/x-directory".
func (p *Posix) loadObjectMetaProperties(f *os.File, bucket, object string, fi *os.FileInfo) metaProperties {
	// attr returns the value stored under key, or nil if retrieval fails
	attr := func(key string) *string {
		val, err := p.meta.RetrieveAttribute(f, bucket, object, key)
		if err != nil {
			return nil
		}
		return backend.GetPtrFromString(string(val))
	}

	var props metaProperties

	props.ContentType = attr(contentTypeHdr)
	noContentType := props.ContentType == nil || *props.ContentType == ""
	if noContentType && fi != nil && (*fi).IsDir() {
		// this is the media type for directories in AWS and Nextcloud
		props.ContentType = backend.GetPtrFromString("application/x-directory")
	}

	props.ContentEncoding = attr(contentEncHdr)
	props.ContentDisposition = attr(contentDispHdr)
	props.ContentLanguage = attr(contentLangHdr)
	props.CacheControl = attr(cacheCtrlHdr)
	props.Expires = attr(expiresHdr)
	props.WebsiteRedirectLocation = attr(websiteRedirectHdr)
	props.Metadata = p.loadObjectMetadata(f, bucket, object)

	return props
}
// storeObjectMetadata serializes the given metadata map to JSON and stores
// it under the metadataHdr attribute of the object. An empty map is a no-op.
//
// After a successful store, any legacy metadata attributes (those accepted
// by isValidMeta) still present on the object are removed. This cleanup is
// best effort: listing or deletion failures are not reported.
func (p *Posix) storeObjectMetadata(f *os.File, bucket, object string, meta map[string]string) error {
	if len(meta) == 0 {
		return nil
	}

	payload, err := json.Marshal(meta)
	if err != nil {
		return fmt.Errorf("marshal metadata: %w", err)
	}
	if err := p.meta.StoreAttribute(f, bucket, object, metadataHdr, payload); err != nil {
		return err
	}

	// clean up any previously set legacy metadata
	attrs, err := p.meta.ListAttributes(bucket, object)
	if err != nil {
		return nil
	}
	for _, name := range attrs {
		if isValidMeta(name) {
			_ = p.meta.DeleteAttribute(bucket, object, name)
		}
	}

	return nil
}
// storeObjectMetaProperties persists the object meta properties
// (Content-Type, Content-Encoding, Content-Disposition, Content-Language,
// Cache-Control, Expires, website redirect location) as individual
// attributes, followed by the user metadata map.
//
// Properties for which getString yields an empty value are skipped, and a
// nil Metadata map is not stored. The first failure aborts the operation
// and is returned wrapped with the name of the property being written.
func (p *Posix) storeObjectMetaProperties(f *os.File, bucket, object string, m metaProperties) error {
	properties := []struct {
		errFormat string
		key       string
		value     *string
	}{
		{"set content-type: %w", contentTypeHdr, m.ContentType},
		{"set content-encoding: %w", contentEncHdr, m.ContentEncoding},
		{"set content-disposition: %w", contentDispHdr, m.ContentDisposition},
		{"set content-language: %w", contentLangHdr, m.ContentLanguage},
		{"set cache-control: %w", cacheCtrlHdr, m.CacheControl},
		{"set expires: %w", expiresHdr, m.Expires},
		{"set website-redirect-location: %w", websiteRedirectHdr, m.WebsiteRedirectLocation},
	}

	for _, prop := range properties {
		if getString(prop.value) == "" {
			continue
		}
		err := p.meta.StoreAttribute(f, bucket, object, prop.key, []byte(*prop.value))
		if err != nil {
			return fmt.Errorf(prop.errFormat, err)
		}
	}

	if m.Metadata == nil {
		return nil
	}
	if err := p.storeObjectMetadata(f, bucket, object, m.Metadata); err != nil {
		return fmt.Errorf("set metadata: %w", err)
	}

	return nil
}
// isValidMeta reports whether val starts with oldMetaHdr. It is used to
// pick out the legacy per-key metadata attributes among all attributes
// of an object. Only the prefix is checked; no separator is required.
func isValidMeta(val string) bool {
return strings.HasPrefix(val, oldMetaHdr)
}
// AbortMultipartUpload removes an in-progress multipart upload and all of
// its uploaded parts.
//
// It fails with NoSuchKey / NoSuchUpload when the key or upload ID is not
// set, with a bucket error when the bucket name is invalid or the bucket is
// missing, and with NoSuchUpload when the upload directory cannot be
// stat'ed. When IfMatchInitiatedTime is set it must equal, at one second
// resolution, the modification time of the upload directory.
func (p *Posix) AbortMultipartUpload(ctx context.Context, mpu *s3.AbortMultipartUploadInput) error {
	release, err := p.acquireActionSlot(ctx)
	if err != nil {
		return err
	}
	defer release()

	switch {
	case mpu.Key == nil:
		return s3err.GetAPIError(s3err.ErrNoSuchKey)
	case mpu.UploadId == nil:
		return s3err.GetNoSuchUploadErr("")
	}

	bucket, object, uploadID := *mpu.Bucket, *mpu.Key, *mpu.UploadId

	if !p.isBucketValid(bucket) {
		return s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	if _, err := os.Stat(bucket); err != nil {
		if errors.Is(err, fs.ErrNotExist) {
			return s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
		}
		return fmt.Errorf("stat bucket: %w", err)
	}

	// uploads are kept below a directory named after the object name hash
	keyHash := fmt.Sprintf("%x", sha256.Sum256([]byte(object)))
	objectDir := filepath.Join(bucket, MetaTmpMultipartDir, keyHash)
	uploadDir := filepath.Join(objectDir, uploadID)

	info, err := os.Stat(uploadDir)
	if err != nil {
		return s3err.GetNoSuchUploadErr(uploadID)
	}

	if want := mpu.IfMatchInitiatedTime; want != nil && want.Unix() != info.ModTime().Unix() {
		return s3err.GetPreconditionFailedErr(s3err.ConditionIfMatchInitiatedTime)
	}

	if err := os.RemoveAll(uploadDir); err != nil {
		return fmt.Errorf("remove multipart upload container: %w", err)
	}
	// only succeeds when no other upload for the same object is left
	os.Remove(objectDir)

	// Clean up sidecar metadata for the aborted upload. With xattr this is
	// a no-op; with sidecar the metadata directory would otherwise be orphaned.
	_ = p.meta.DeleteAttributes(bucket, filepath.Join(MetaTmpMultipartDir, keyHash, uploadID))

	return nil
}
// ListMultipartUploads lists the in-progress multipart uploads of a bucket.
//
// The multipart temp directory of the bucket is scanned: every
// sub-directory represents one object name (recorded in the onameAttr
// attribute) and contains one directory per upload ID. Uploads are filtered
// by prefix and key marker, sorted by key and initiation time, and then
// handed to backend.ListMultipartUploads together with the delimiter,
// markers and max-uploads limit.
//
// Uploads that disappear while the listing is in progress (completed or
// aborted concurrently) are skipped instead of failing the whole request.
//
// mpu.Bucket and mpu.MaxUploads are dereferenced unconditionally and must
// be set by the caller.
func (p *Posix) ListMultipartUploads(ctx context.Context, mpu *s3.ListMultipartUploadsInput) (s3response.ListMultipartUploadsResult, error) {
	release, err := p.acquireActionSlot(ctx)
	if err != nil {
		return s3response.ListMultipartUploadsResult{}, err
	}
	defer release()

	var lmu s3response.ListMultipartUploadsResult

	bucket := *mpu.Bucket
	if !p.isBucketValid(bucket) {
		return lmu, s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	var delimiter string
	if mpu.Delimiter != nil {
		delimiter = *mpu.Delimiter
	}
	var prefix string
	if mpu.Prefix != nil {
		prefix = *mpu.Prefix
	}
	var keyMarker string
	if mpu.KeyMarker != nil {
		keyMarker = *mpu.KeyMarker
	}
	var uploadIDMarker string
	if mpu.UploadIdMarker != nil {
		uploadIDMarker = *mpu.UploadIdMarker
	}
	maxUploads := int(*mpu.MaxUploads)

	_, err = os.Stat(bucket)
	if errors.Is(err, fs.ErrNotExist) {
		return lmu, s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	}
	if err != nil {
		return lmu, fmt.Errorf("stat bucket: %w", err)
	}

	// ignore readdir error and use the empty list returned
	objs, _ := os.ReadDir(filepath.Join(bucket, MetaTmpMultipartDir))

	var uploads []s3response.Upload
	for _, obj := range objs {
		if !obj.IsDir() {
			continue
		}

		b, err := p.meta.RetrieveAttribute(nil, bucket, filepath.Join(MetaTmpMultipartDir, obj.Name()), onameAttr)
		if err != nil {
			continue
		}
		objectName := string(b)

		// filter by prefix
		if prefix != "" && !strings.HasPrefix(objectName, prefix) {
			continue
		}
		// filter by keyMarker
		if keyMarker != "" && objectName <= keyMarker {
			continue
		}

		upids, err := os.ReadDir(filepath.Join(bucket, MetaTmpMultipartDir, obj.Name()))
		if err != nil {
			continue
		}

		for _, upid := range upids {
			if !upid.IsDir() {
				continue
			}
			// skip directories that are currently being completed
			if strings.HasSuffix(upid.Name(), inProgressSuffix) {
				continue
			}

			fi, err := upid.Info()
			if errors.Is(err, fs.ErrNotExist) {
				// the upload was completed or aborted after the directory
				// was read: it is no longer in progress, so leave it out
				continue
			}
			if err != nil {
				return lmu, fmt.Errorf("stat %q: %w", upid.Name(), err)
			}

			uploadID := upid.Name()
			checksum, err := p.retrieveChecksums(nil, bucket, filepath.Join(MetaTmpMultipartDir, obj.Name(), uploadID))
			if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
				if errors.Is(err, fs.ErrNotExist) {
					// same race as above, hit while reading the attributes
					continue
				}
				return lmu, fmt.Errorf("get mp checksum: %w", err)
			}

			uploads = append(uploads, s3response.Upload{
				Key:               objectName,
				UploadID:          uploadID,
				StorageClass:      types.StorageClassStandard,
				Initiated:         fi.ModTime(),
				ChecksumAlgorithm: checksum.Algorithm,
				ChecksumType:      checksum.Type,
			})
		}
	}

	// Sort once: Key asc, Initiated asc
	sort.SliceStable(uploads, func(i, j int) bool {
		if uploads[i].Key != uploads[j].Key {
			return uploads[i].Key < uploads[j].Key
		}
		return uploads[i].Initiated.Before(uploads[j].Initiated)
	})

	result, err := backend.ListMultipartUploads(uploads, prefix, delimiter, keyMarker, uploadIDMarker, maxUploads)
	if err != nil {
		return lmu, err
	}

	return s3response.ListMultipartUploadsResult{
		Bucket:             bucket,
		Delimiter:          delimiter,
		KeyMarker:          keyMarker,
		MaxUploads:         maxUploads,
		Prefix:             prefix,
		NextKeyMarker:      result.NextKeyMarker,
		NextUploadIDMarker: result.NextUploadIDMarker,
		UploadIDMarker:     uploadIDMarker,
		IsTruncated:        result.IsTruncated,
		Uploads:            result.Uploads,
		CommonPrefixes:     result.CommonPrefixes,
	}, nil
}
// ListParts lists the parts uploaded so far for a multipart upload.
//
// Entries of the upload directory whose name is an integer are treated as
// parts. Parts with a number not greater than the part-number marker are
// skipped, the remainder is sorted by part number and truncated to MaxParts
// when MaxParts is positive. Parts whose checksum or file information
// cannot be read are left out of the listing.
//
// input.Bucket and input.MaxParts are dereferenced unconditionally and must
// be set by the caller. A missing key yields NoSuchKey and a missing or
// unknown upload ID yields NoSuchUpload.
func (p *Posix) ListParts(ctx context.Context, input *s3.ListPartsInput) (s3response.ListPartsResult, error) {
	release, err := p.acquireActionSlot(ctx)
	if err != nil {
		return s3response.ListPartsResult{}, err
	}
	defer release()

	var lpr s3response.ListPartsResult

	if input.Key == nil {
		return lpr, s3err.GetAPIError(s3err.ErrNoSuchKey)
	}
	if input.UploadId == nil {
		return lpr, s3err.GetNoSuchUploadErr("")
	}

	bucket := *input.Bucket
	object := *input.Key
	uploadID := *input.UploadId

	if !p.isBucketValid(bucket) {
		return lpr, s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	stringMarker := ""
	if input.PartNumberMarker != nil {
		stringMarker = *input.PartNumberMarker
	}
	maxParts := int(*input.MaxParts)

	var partNumberMarker int
	if stringMarker != "" {
		var err error
		partNumberMarker, err = strconv.Atoi(stringMarker)
		if err != nil {
			return lpr, s3err.GetInvalidArgMaxLimiter("part-number-marker", *input.PartNumberMarker)
		}
	}

	_, err = os.Stat(bucket)
	if errors.Is(err, fs.ErrNotExist) {
		return lpr, s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	}
	if err != nil {
		return lpr, fmt.Errorf("stat bucket: %w", err)
	}

	sum, err := p.checkUploadIDExists(bucket, object, uploadID)
	if err != nil {
		return lpr, err
	}

	// objdir is relative to the bucket, tmpdir is the same directory
	// including the bucket
	objdir := filepath.Join(MetaTmpMultipartDir, fmt.Sprintf("%x", sum))
	tmpdir := filepath.Join(bucket, objdir)

	ents, err := os.ReadDir(filepath.Join(tmpdir, uploadID))
	if errors.Is(err, fs.ErrNotExist) {
		return lpr, s3err.GetNoSuchUploadErr(uploadID)
	}
	if err != nil {
		return lpr, fmt.Errorf("readdir upload: %w", err)
	}

	// upload-level checksum settings; addressed as bucket + relative path
	// like every other metadata lookup for multipart uploads
	checksum, err := p.retrieveChecksums(nil, bucket, filepath.Join(objdir, uploadID))
	if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
		return lpr, fmt.Errorf("get mp checksum: %w", err)
	}
	if checksum.Algorithm == "" {
		checksum.Algorithm = types.ChecksumAlgorithm("null")
	}
	if checksum.Type == "" {
		checksum.Type = types.ChecksumType("null")
	}

	parts := make([]s3response.Part, 0, len(ents))
	for i, e := range ents {
		// periodically check for a cancelled request
		if i%128 == 0 {
			select {
			case <-ctx.Done():
				return s3response.ListPartsResult{}, ctx.Err()
			default:
			}
		}

		pn, err := strconv.Atoi(e.Name())
		if err != nil {
			// file is not a valid part file
			continue
		}
		if pn <= partNumberMarker {
			continue
		}

		partPath := filepath.Join(objdir, uploadID, e.Name())

		// a part without a readable etag is listed with an empty etag
		b, err := p.meta.RetrieveAttribute(nil, bucket, partPath, etagkey)
		etag := string(b)
		if err != nil {
			etag = ""
		}

		partChecksum, err := p.retrieveChecksums(nil, bucket, partPath)
		if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
			continue
		}

		fi, err := os.Lstat(filepath.Join(bucket, partPath))
		if err != nil {
			continue
		}

		parts = append(parts, s3response.Part{
			PartNumber:        pn,
			ETag:              etag,
			LastModified:      fi.ModTime(),
			Size:              fi.Size(),
			ChecksumCRC32:     partChecksum.CRC32,
			ChecksumCRC32C:    partChecksum.CRC32C,
			ChecksumSHA1:      partChecksum.SHA1,
			ChecksumSHA256:    partChecksum.SHA256,
			ChecksumCRC64NVME: partChecksum.CRC64NVME,
			ChecksumSHA512:    partChecksum.SHA512,
			ChecksumMD5:       partChecksum.MD5,
			ChecksumXXHASH64:  partChecksum.XXHASH64,
			ChecksumXXHASH3:   partChecksum.XXHASH3,
			ChecksumXXHASH128: partChecksum.XXHASH128,
		})
	}

	sort.Slice(parts,
		func(i int, j int) bool { return parts[i].PartNumber < parts[j].PartNumber })

	oldLen := len(parts)
	if maxParts > 0 && len(parts) > maxParts {
		parts = parts[:maxParts]
	}
	newLen := len(parts)

	nextpart := 0
	if len(parts) != 0 {
		nextpart = parts[len(parts)-1].PartNumber
	}

	return s3response.ListPartsResult{
		Bucket:               bucket,
		IsTruncated:          oldLen != newLen,
		Key:                  object,
		MaxParts:             maxParts,
		NextPartNumberMarker: nextpart,
		PartNumberMarker:     partNumberMarker,
		Parts:                parts,
		UploadID:             uploadID,
		StorageClass:         types.StorageClassStandard,
		ChecksumAlgorithm:    checksum.Algorithm,
		ChecksumType:         checksum.Type,
	}, nil
}
// hashConfig pairs an optional checksum value with the hash type it
// belongs to. UploadPartWithPostFunc builds a list of these from the
// request checksum fields and uses the first entry whose value is set.
type hashConfig struct {
// value is the checksum supplied in the request, nil when absent
value *string
// hashType is the algorithm the value was computed with
hashType utils.HashType
}
// UploadPart stores one part of a multipart upload. It acquires an action
// slot for the duration of the call and delegates to
// UploadPartWithPostFunc with a post-processing function that does nothing.
func (p *Posix) UploadPart(ctx context.Context, input *s3.UploadPartInput) (*s3.UploadPartOutput, error) {
release, err := p.acquireActionSlot(ctx)
if err != nil {
return nil, err
}
defer release()
return p.UploadPartWithPostFunc(ctx, input, func(*os.File) error { return nil })
}
// UploadPartWithPostFunc stores one part of a multipart upload and calls
// postprocess on the part file before it is linked into place.
//
// The request body is streamed into a temp file while its MD5 (used for the
// ETag) and the relevant checksums are computed:
//   - when the upload was initiated with a checksum type, the part checksum
//     for the upload's algorithm is computed (or taken from the trailing
//     checksum reader) and stored with the part;
//   - otherwise a crc64nvme checksum is always stored for internal use, and
//     a checksum requested by the client is only returned in the response.
//
// A missing key yields NoSuchKey and a missing or unknown upload ID yields
// NoSuchUpload. When the temp file cannot be created or written because of
// EDQUOT / ENOSPC, the remaining body is drained before the matching S3
// error is returned. input.Bucket and input.PartNumber are dereferenced
// unconditionally and must be set by the caller.
func (p *Posix) UploadPartWithPostFunc(ctx context.Context, input *s3.UploadPartInput, postprocess func(f *os.File) error) (*s3.UploadPartOutput, error) {
	acct, ok := ctx.Value("account").(auth.Account)
	if !ok {
		acct = auth.Account{}
	}

	if input.Key == nil {
		return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
	}
	// same guard as ListParts / AbortMultipartUpload: avoid a nil
	// dereference when no upload ID was supplied
	if input.UploadId == nil {
		return nil, s3err.GetNoSuchUploadErr("")
	}

	bucket := *input.Bucket
	object := *input.Key
	uploadID := *input.UploadId

	if !p.isBucketValid(bucket) {
		return nil, s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	part := input.PartNumber
	length := int64(0)
	if input.ContentLength != nil {
		length = *input.ContentLength
	}
	r := input.Body

	_, err := os.Stat(bucket)
	if errors.Is(err, fs.ErrNotExist) {
		return nil, s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	}
	if err != nil {
		return nil, fmt.Errorf("stat bucket: %w", err)
	}

	// paths below are relative to the bucket
	sum := sha256.Sum256([]byte(object))
	objdir := filepath.Join(MetaTmpMultipartDir, fmt.Sprintf("%x", sum))
	mpPath := filepath.Join(objdir, uploadID)
	_, err = os.Stat(filepath.Join(bucket, mpPath))
	if errors.Is(err, fs.ErrNotExist) {
		return nil, s3err.GetNoSuchUploadErr(uploadID)
	}
	if err != nil {
		return nil, fmt.Errorf("stat uploadid: %w", err)
	}

	partPath := filepath.Join(mpPath, fmt.Sprintf("%v", *part))

	f, err := p.openTmpFile(filepath.Join(bucket, objdir),
		bucket, partPath, length, acct, doFalloc, p.forceNoTmpFile)
	if err != nil {
		// drain the unread body so the error response can be delivered
		if errors.Is(err, syscall.EDQUOT) {
			drainBody(r)
			return nil, s3err.GetAPIError(s3err.ErrQuotaExceeded)
		}
		if errors.Is(err, syscall.ENOSPC) {
			drainBody(r)
			return nil, s3err.GetAPIError(s3err.ErrNoSpaceLeftOnDevice)
		}
		return nil, fmt.Errorf("open temp file: %w", err)
	}
	defer f.cleanup()

	// tr is the head of the reader chain; every hash reader added below
	// wraps the previous head so all of them see the same bytes
	hash := md5.New()
	tr := io.TeeReader(r, hash)

	chRdr, chunkUpload := input.Body.(middlewares.ChecksumReader)
	isTrailingChecksum := chunkUpload && chRdr.Algorithm() != ""

	// user input checksum algorithm: either with chunk uploads or with request headers
	var inputChAlgo utils.HashType
	// user input checksum value specified with request headers
	var inputSum string

	if !isTrailingChecksum {
		hashConfigs := []hashConfig{
			{input.ChecksumCRC32, utils.HashTypeCRC32},
			{input.ChecksumCRC32C, utils.HashTypeCRC32C},
			{input.ChecksumSHA1, utils.HashTypeSha1},
			{input.ChecksumSHA256, utils.HashTypeSha256},
			{input.ChecksumCRC64NVME, utils.HashTypeCRC64NVME},
			{input.ChecksumSHA512, utils.HashTypeSha512},
			{input.ChecksumMD5, utils.HashTypeMd5},
			{input.ChecksumXXHASH64, utils.HashTypeXXHASH64},
			{input.ChecksumXXHASH3, utils.HashTypeXXHASH3},
			{input.ChecksumXXHASH128, utils.HashTypeXXHASH128},
		}

		// the first checksum header that is set wins
		for _, config := range hashConfigs {
			if config.value != nil {
				inputChAlgo = config.hashType
				inputSum = *config.value
				break
			}
		}
	} else {
		inputChAlgo = utils.HashType(chRdr.Algorithm())
	}

	// remember whether the client asked for a checksum before the
	// crc64nvme default is applied below
	exposeChecksum := inputChAlgo != ""

	checksums, err := p.retrieveChecksums(nil, bucket, mpPath)
	if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
		return nil, fmt.Errorf("retrieve mp checksum: %w", err)
	}

	// If checksum isn't provided for the part,
	// but it has been provided on mp initialization
	// and checksum type is 'COMPOSITE', return mismatch error
	if inputChAlgo == "" && checksums.Type == types.ChecksumTypeComposite {
		return nil, s3err.GetChecksumTypeMismatchErr(checksums.Algorithm, "null")
	}

	// Check if the provided checksum algorithm match
	// the one specified on mp initialization
	if inputChAlgo != "" && checksums.Type != "" {
		algo := types.ChecksumAlgorithm(strings.ToUpper(string(inputChAlgo)))
		if checksums.Algorithm != algo {
			return nil, s3err.GetChecksumTypeMismatchErr(checksums.Algorithm, algo)
		}
	}

	if inputChAlgo == "" {
		// default to crc64nvme
		inputChAlgo = utils.HashTypeCRC64NVME
	}

	// hashreader is responsible to calculate and validate
	// user input checksums
	var hashRdr *utils.HashReader
	// crc64nvmeRdr is used to calculate the object crc64nvme
	// only for internal usage
	var crc64nvmeRdr *utils.HashReader

	if checksums.Type == "" {
		if inputChAlgo != utils.HashTypeCRC64NVME {
			// if the input checksum algorithm isn't crc64nvme, create a
			// crc64nvme reader no matter if checksum is
			// received from chunk reader or request headers.
			crc64nvmeRdr, err = utils.NewHashReader(tr, "", utils.HashTypeCRC64NVME)
			if err != nil {
				return nil, fmt.Errorf("initialize crc64nvme hash reader: %w", err)
			}

			tr = crc64nvmeRdr
		}

		if !isTrailingChecksum {
			// create a new hash reader for the user input checksum for calculation
			// if the checksum doesn't come from chunk readers
			hashRdr, err = utils.NewHashReader(tr, inputSum, inputChAlgo)
			if err != nil {
				return nil, fmt.Errorf("initialize hash reader: %w", err)
			}

			tr = hashRdr
		}
	} else {
		// if no checksum algorithm or precalculated checksum is
		// provided, but one has been on multipart upload initialization,
		// anyways calculate and store the uploaded part checksum
		if !isTrailingChecksum {
			chAlgo := utils.HashType(strings.ToLower(string(checksums.Algorithm)))
			hashRdr, err = utils.NewHashReader(tr, inputSum, chAlgo)
			if err != nil {
				return nil, fmt.Errorf("initialize hash reader: %w", err)
			}

			tr = hashRdr
		}
	}

	_, err = io.Copy(f, tr)
	if err != nil {
		// drain the unread body so the error response can be delivered
		if errors.Is(err, syscall.EDQUOT) {
			drainBody(tr)
			return nil, s3err.GetAPIError(s3err.ErrQuotaExceeded)
		}
		if errors.Is(err, syscall.ENOSPC) {
			drainBody(tr)
			return nil, s3err.GetAPIError(s3err.ErrNoSpaceLeftOnDevice)
		}
		// Return the error itself, if it implements the s3err.S3Error interface
		if _, ok := err.(s3err.S3Error); ok {
			return nil, err
		}
		return nil, fmt.Errorf("write part data: %w", err)
	}

	etag := backend.GenerateEtag(hash)
	err = p.meta.StoreAttribute(f.File(), bucket, partPath, etagkey, []byte(etag))
	if err != nil {
		return nil, fmt.Errorf("set etag attr: %w", err)
	}

	res := &s3.UploadPartOutput{
		ETag: &etag,
	}

	// if a checksum algorithm has been provided on mp initiation
	// the checksums should be stored, otherwise only returned
	// in the response without storing
	storeChecksum := checksums.Type != ""

	if storeChecksum {
		checksum := s3response.Checksum{
			Algorithm: checksums.Algorithm,
		}

		// hashRdr is only created when the checksum is not trailing, so
		// at most one of the two assignments below takes effect
		var sum string
		if isTrailingChecksum {
			sum = chRdr.Checksum()
		}
		if hashRdr != nil {
			sum = hashRdr.Sum()
		}

		setStoredChecksum(&checksum, checksums.Algorithm, &sum)
		setUploadPartChecksum(res, checksums.Algorithm, &sum)

		err := p.storeChecksums(f.File(), bucket, partPath, checksum)
		if err != nil {
			return nil, fmt.Errorf("store checksum: %w", err)
		}
	} else {
		// pick the reader that computed crc64nvme for this request
		var internalCrc64NvmeSum string
		if inputChAlgo == utils.HashTypeCRC64NVME {
			if isTrailingChecksum {
				internalCrc64NvmeSum = chRdr.Checksum()
			} else {
				internalCrc64NvmeSum = hashRdr.Sum()
			}
		} else {
			internalCrc64NvmeSum = crc64nvmeRdr.Sum()
		}

		err := p.meta.StoreAttribute(f.File(), bucket, partPath, partCrc64nvme, []byte(internalCrc64NvmeSum))
		if err != nil {
			return nil, fmt.Errorf("store part internal crc64nvme: %w", err)
		}

		if exposeChecksum {
			var sumToReturn string
			if isTrailingChecksum {
				sumToReturn = chRdr.Checksum()
			} else {
				sumToReturn = hashRdr.Sum()
			}

			switch inputChAlgo {
			case utils.HashTypeCRC32:
				res.ChecksumCRC32 = &sumToReturn
			case utils.HashTypeCRC32C:
				res.ChecksumCRC32C = &sumToReturn
			case utils.HashTypeSha1:
				res.ChecksumSHA1 = &sumToReturn
			case utils.HashTypeSha256:
				res.ChecksumSHA256 = &sumToReturn
			case utils.HashTypeCRC64NVME:
				res.ChecksumCRC64NVME = &sumToReturn
			case utils.HashTypeSha512:
				res.ChecksumSHA512 = &sumToReturn
			case utils.HashTypeMd5:
				res.ChecksumMD5 = &sumToReturn
			case utils.HashTypeXXHASH64:
				res.ChecksumXXHASH64 = &sumToReturn
			case utils.HashTypeXXHASH3:
				res.ChecksumXXHASH3 = &sumToReturn
			case utils.HashTypeXXHASH128:
				res.ChecksumXXHASH128 = &sumToReturn
			}
		}
	}

	err = postprocess(f.File())
	if err != nil {
		return nil, fmt.Errorf("upload part post process failed: %w", err)
	}

	err = f.link()
	if err != nil {
		return nil, fmt.Errorf("link object in namespace: %w", err)
	}

	return res, nil
}
func (p *Posix) UploadPartCopy(ctx context.Context, upi *s3.UploadPartCopyInput) (s3response.CopyPartResult, error) {
release, err := p.acquireActionSlot(ctx)
if err != nil {
return s3response.CopyPartResult{}, err
}
defer release()
acct, ok := ctx.Value("account").(auth.Account)
if !ok {
acct = auth.Account{}
}
if upi.Key == nil {
return s3response.CopyPartResult{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if !p.isBucketValid(*upi.Bucket) {
return s3response.CopyPartResult{}, s3err.GetBucketErr(s3err.ErrInvalidBucketName, *upi.Bucket)
}
_, err = os.Stat(*upi.Bucket)
if errors.Is(err, fs.ErrNotExist) {
return s3response.CopyPartResult{}, s3err.GetBucketErr(s3err.ErrNoSuchBucket, *upi.Bucket)
}
if err != nil {
return s3response.CopyPartResult{}, fmt.Errorf("stat bucket: %w", err)
}
sum := sha256.Sum256([]byte(*upi.Key))
objdir := filepath.Join(MetaTmpMultipartDir, fmt.Sprintf("%x", sum))
_, err = os.Stat(filepath.Join(*upi.Bucket, objdir, *upi.UploadId))
if errors.Is(err, fs.ErrNotExist) {
return s3response.CopyPartResult{}, s3err.GetNoSuchUploadErr(*upi.UploadId)
}
if isErrNameTooLong(err) {
return s3response.CopyPartResult{}, s3err.GetKeyTooLongErr(int64(len(*upi.Key)), 1024)
}
if err != nil {
return s3response.CopyPartResult{}, fmt.Errorf("stat uploadid: %w", err)
}
partPath := filepath.Join(objdir, *upi.UploadId, fmt.Sprintf("%v", *upi.PartNumber))
srcBucket, srcObject, srcVersionId, err := backend.ParseCopySource(*upi.CopySource)
if err != nil {
return s3response.CopyPartResult{}, err
}
if err := p.validateVersionId(srcVersionId); err != nil {
return s3response.CopyPartResult{}, err
}
_, err = os.Stat(srcBucket)
if errors.Is(err, fs.ErrNotExist) {
return s3response.CopyPartResult{}, s3err.GetBucketErr(s3err.ErrNoSuchBucket, srcBucket)
}
if err != nil {
return s3response.CopyPartResult{}, fmt.Errorf("stat bucket: %w", err)
}
if upi.ExpectedSourceBucketOwner != nil && *upi.ExpectedSourceBucketOwner != "" {
aclData, err := p.meta.RetrieveAttribute(nil, srcBucket, "", aclkey)
if err != nil {
return s3response.CopyPartResult{}, fmt.Errorf("get src bucket acl: %w", err)
}
srcAcl, err := auth.ParseACL(aclData)
if err != nil {
return s3response.CopyPartResult{}, err
}
if srcAcl.Owner != *upi.ExpectedSourceBucketOwner {
return s3response.CopyPartResult{}, s3err.GetAPIError(s3err.ErrAccessDenied)
}
}
vStatus, err := p.getBucketVersioningStatus(ctx, srcBucket)
if err != nil {
return s3response.CopyPartResult{}, err
}
vEnabled := p.isBucketVersioningEnabled(vStatus)
if srcVersionId != "" {
if !p.versioningEnabled() || !vEnabled {
return s3response.CopyPartResult{}, s3err.GetInvalidArgumentErr(s3err.InvalidArgVersionId, srcVersionId)
}
vId, err := p.meta.RetrieveAttribute(nil, srcBucket, srcObject, versionIdKey)
if errors.Is(err, fs.ErrNotExist) {
return s3response.CopyPartResult{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return s3response.CopyPartResult{}, fmt.Errorf("get src object version id: %w", err)
}
if string(vId) != srcVersionId {
srcBucket = filepath.Join(p.versioningDir, srcBucket)
srcObject = filepath.Join(genObjVersionKey(srcObject), srcVersionId)
}
}
objPath := filepath.Join(srcBucket, srcObject)
fi, err := os.Stat(objPath)
if errors.Is(err, fs.ErrNotExist) {
if p.versioningEnabled() && vEnabled {
return s3response.CopyPartResult{}, s3err.GetNoSuchVersionErr(srcObject, srcVersionId)
}
return s3response.CopyPartResult{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if isErrNameTooLong(err) {
return s3response.CopyPartResult{}, s3err.GetKeyTooLongErr(int64(len(srcObject)), 1024)
}
if err != nil {
return s3response.CopyPartResult{}, fmt.Errorf("stat object: %w", err)
}
startOffset, length, err := backend.ParseCopySourceRange(fi.Size(), *upi.CopySourceRange)
if err != nil {
return s3response.CopyPartResult{}, err
}
if length > p.copyObjectThreshold {
return s3response.CopyPartResult{}, s3err.GetCopySourceObjectTooLargeErr(p.copyObjectThreshold)
}
srcf, err := os.Open(objPath)
if errors.Is(err, fs.ErrNotExist) {
return s3response.CopyPartResult{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if err != nil {
return s3response.CopyPartResult{}, fmt.Errorf("open object: %w", err)
}
defer srcf.Close()
// evaluate preconditions
b, err := p.meta.RetrieveAttribute(srcf, srcBucket, srcObject, etagkey)
srcEtag := string(b)
if err != nil {
srcEtag = ""
}
err = backend.EvaluatePreconditions(srcEtag, fi.ModTime(), backend.PreConditions{
IfMatch: upi.CopySourceIfMatch,
IfNoneMatch: upi.CopySourceIfNoneMatch,
IfModSince: upi.CopySourceIfModifiedSince,
IfUnmodeSince: upi.CopySourceIfUnmodifiedSince,
})
if err != nil {
return s3response.CopyPartResult{}, err
}
f, err := p.openTmpFile(filepath.Join(*upi.Bucket, objdir),
*upi.Bucket, partPath, length, acct, doFalloc, p.forceNoTmpFile)
if err != nil {
if errors.Is(err, syscall.EDQUOT) {
return s3response.CopyPartResult{}, s3err.GetAPIError(s3err.ErrQuotaExceeded)
}
if errors.Is(err, syscall.ENOSPC) {
return s3response.CopyPartResult{}, s3err.GetAPIError(s3err.ErrNoSpaceLeftOnDevice)
}
return s3response.CopyPartResult{}, fmt.Errorf("open temp file: %w", err)
}
defer f.cleanup()
rdr := io.NewSectionReader(srcf, startOffset, length)
hash := md5.New()
tr := io.TeeReader(rdr, hash)
mpChecksums, err := p.retrieveChecksums(nil, *upi.Bucket, filepath.Join(objdir, *upi.UploadId))
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return s3response.CopyPartResult{}, fmt.Errorf("retrieve mp checksums: %w", err)
}
checksums, err := p.retrieveChecksums(nil, objPath, "")
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return s3response.CopyPartResult{}, fmt.Errorf("retrieve object part checksums: %w", err)
}
// TODO: Should the checksum be recalculated or just copied ?
var hashRdr *utils.HashReader
var crc64nvmeRdr *utils.HashReader
if mpChecksums.Algorithm != "" {
if checksums.Algorithm == "" || mpChecksums.Algorithm != checksums.Algorithm {
hashRdr, err = utils.NewHashReader(tr, "", utils.HashType(strings.ToLower(string(mpChecksums.Algorithm))))
if err != nil {
return s3response.CopyPartResult{}, fmt.Errorf("initialize hash reader: %w", err)
}
tr = hashRdr
}
} else {
// if no checksum has been specified on multipart upload initiation
// create an internal crc64nvme reader to calculate and store the internal crc64nvme
crc64nvmeRdr, err = utils.NewHashReader(tr, "", utils.HashTypeCRC64NVME)
if err != nil {
return s3response.CopyPartResult{}, fmt.Errorf("initialize internal crc64nvme reader: %w", err)
}
tr = crc64nvmeRdr
}
_, err = io.Copy(f, tr)
if err != nil {
if errors.Is(err, syscall.EDQUOT) {
return s3response.CopyPartResult{}, s3err.GetAPIError(s3err.ErrQuotaExceeded)
}
if errors.Is(err, syscall.ENOSPC) {
return s3response.CopyPartResult{}, s3err.GetAPIError(s3err.ErrNoSpaceLeftOnDevice)
}
return s3response.CopyPartResult{}, fmt.Errorf("copy part data: %w", err)
}
if checksums.Algorithm != "" {
if mpChecksums.Algorithm == "" {
checksums = s3response.Checksum{}
} else {
if hashRdr == nil {
err := p.storeChecksums(f.File(), *upi.Bucket, partPath, checksums)
if err != nil {
return s3response.CopyPartResult{}, fmt.Errorf("store part checksum: %w", err)
}
}
}
}
if hashRdr != nil {
algo := types.ChecksumAlgorithm(strings.ToUpper(string(hashRdr.Type())))
checksums = s3response.Checksum{
Algorithm: algo,
}
sum := hashRdr.Sum()
setStoredChecksum(&checksums, algo, &sum)
err := p.storeChecksums(f.File(), *upi.Bucket, partPath, checksums)
if err != nil {
return s3response.CopyPartResult{}, fmt.Errorf("store part checksum: %w", err)
}
}
if crc64nvmeRdr != nil {
// store the internal crc64nvme
internalCrc64NvmeSum := crc64nvmeRdr.Sum()
err := p.meta.StoreAttribute(f.File(), *upi.Bucket, partPath, partCrc64nvme, []byte(internalCrc64NvmeSum))
if err != nil {
return s3response.CopyPartResult{}, fmt.Errorf("store part internal crc64nvme: %w", err)
}
}
etag := backend.GenerateEtag(hash)
err = p.meta.StoreAttribute(f.File(), *upi.Bucket, partPath, etagkey, []byte(etag))
if err != nil {
return s3response.CopyPartResult{}, fmt.Errorf("set etag attr: %w", err)
}
err = f.link()
if err != nil {
return s3response.CopyPartResult{}, fmt.Errorf("link object in namespace: %w", err)
}
fi, err = os.Stat(filepath.Join(*upi.Bucket, partPath))
if err != nil {
return s3response.CopyPartResult{}, fmt.Errorf("stat part path: %w", err)
}
return s3response.CopyPartResult{
ETag: &etag,
LastModified: fi.ModTime(),
CopySourceVersionId: srcVersionId,
ChecksumCRC32: checksums.CRC32,
ChecksumCRC32C: checksums.CRC32C,
ChecksumSHA1: checksums.SHA1,
ChecksumSHA256: checksums.SHA256,
ChecksumCRC64NVME: checksums.CRC64NVME,
ChecksumSHA512: checksums.SHA512,
ChecksumMD5: checksums.MD5,
ChecksumXXHASH64: checksums.XXHASH64,
ChecksumXXHASH3: checksums.XXHASH3,
ChecksumXXHASH128: checksums.XXHASH128,
}, nil
}
// getEmptyChecksumValue maps a checksum algorithm to the base64-encoded
// checksum of an empty payload. Any algorithm that is not listed
// explicitly falls back to the crc64nvme value.
func getEmptyChecksumValue(algo types.ChecksumAlgorithm) string {
	// Base64-encoded checksums of an empty payload, one per algorithm.
	// crc32 and crc32c share the same value.
	const (
		noDataCRC32     = "AAAAAA=="
		noDataSHA1      = "2jmj7l5rSw0yVb/vlWAYkK/YBwk="
		noDataSHA256    = "47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU="
		noDataSHA512    = "z4PhNX7vuL3xVChQ1m2AB9Yg5AULVxXcg/SpIdNs6c5H0NE8XYXysP+DGNKHfuwvY7kxvUdBeoGlODJ6+SfaPg=="
		noDataMD5       = "1B2M2Y8AsgTpgAmY7PhCfg=="
		noDataXXHASH64  = "70bbN1HY6Zk="
		noDataXXHASH3   = "LQaABTjTlMI="
		noDataXXHASH128 = "maoG0wFHmNhgAcMkRo1Jfw=="
		noDataCRC64NVME = "AAAAAAAAAAA="
	)

	switch algo {
	case types.ChecksumAlgorithmCrc32, types.ChecksumAlgorithmCrc32c:
		return noDataCRC32
	case types.ChecksumAlgorithmSha1:
		return noDataSHA1
	case types.ChecksumAlgorithmSha256:
		return noDataSHA256
	case types.ChecksumAlgorithmSha512:
		return noDataSHA512
	case types.ChecksumAlgorithmMd5:
		return noDataMD5
	case types.ChecksumAlgorithmXxhash64:
		return noDataXXHASH64
	case types.ChecksumAlgorithmXxhash3:
		return noDataXXHASH3
	case types.ChecksumAlgorithmXxhash128:
		return noDataXXHASH128
	}

	// default to crc64nvme
	return noDataCRC64NVME
}
// PutObject stores an object while holding an action slot. The actual work
// is delegated to PutObjectWithPostFunc with a post-processing hook that
// does nothing.
func (p *Posix) PutObject(ctx context.Context, po s3response.PutObjectInput) (s3response.PutObjectOutput, error) {
	done, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		return s3response.PutObjectOutput{}, slotErr
	}
	// Give the slot back once the put has finished, whatever the outcome.
	defer done()

	noPostProcess := func(*os.File) error { return nil }
	return p.PutObjectWithPostFunc(ctx, po, noPostProcess)
}
// PutObjectWithPostFunc writes the object described by po into its bucket
// directory and returns the resulting ETag, checksum, size and version id.
//
// Keys ending in "/" are stored as directories: the directory is created,
// metadata, tags, an empty-payload etag, the directory content type and an
// empty-payload checksum are attached to it, and no version is created.
//
// Any other key is streamed from po.Body into a temporary file. The etag,
// checksum, metadata properties and (when versioning is enabled) the version
// id are stored on that temporary file, postprocess is called with it, and
// only then is it linked into the namespace. Tags, legal hold and retention
// are applied after the link.
//
// postprocess receives the temporary file before it is linked; a non-nil
// error aborts the put. It is not called for directory objects.
//
// This method does not acquire an action slot itself; PutObject does that
// before delegating here.
//
// Space and quota exhaustion (ENOSPC / EDQUOT) are reported as
// ErrNoSpaceLeftOnDevice / ErrQuotaExceeded. Where the request body may not
// have been fully read yet, it is drained first.
func (p *Posix) PutObjectWithPostFunc(ctx context.Context, po s3response.PutObjectInput, postprocess func(f *os.File) error) (s3response.PutObjectOutput, error) {
	// Fall back to an empty account when the context carries none.
	acct, ok := ctx.Value("account").(auth.Account)
	if !ok {
		acct = auth.Account{}
	}

	if po.Key == nil {
		return s3response.PutObjectOutput{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
	}
	if !p.isBucketValid(*po.Bucket) {
		return s3response.PutObjectOutput{}, s3err.GetBucketErr(s3err.ErrInvalidBucketName, *po.Bucket)
	}

	_, err := os.Stat(*po.Bucket)
	if errors.Is(err, fs.ErrNotExist) {
		return s3response.PutObjectOutput{}, s3err.GetBucketErr(s3err.ErrNoSuchBucket, *po.Bucket)
	}
	if err != nil {
		return s3response.PutObjectOutput{}, fmt.Errorf("stat bucket: %w", err)
	}

	tags, err := backend.ParseObjectTags(getString(po.Tagging))
	if err != nil {
		return s3response.PutObjectOutput{}, err
	}

	name := filepath.Join(*po.Bucket, *po.Key)

	// evaluate preconditions
	etagBytes, err := p.meta.RetrieveAttribute(nil, *po.Bucket, *po.Key, etagkey)
	if err == nil || errors.Is(err, fs.ErrNotExist) || errors.Is(err, meta.ErrNoSuchKey) {
		err = backend.EvaluateObjectPutPreconditions(string(etagBytes), po.IfMatch, po.IfNoneMatch, err == nil)
		if err != nil {
			return s3response.PutObjectOutput{}, err
		}
	}

	uid, gid, doChown := p.getChownIDs(acct)

	contentLength := int64(0)
	if po.ContentLength != nil {
		contentLength = *po.ContentLength
	}

	// Work out which checksum algorithm applies and which value (if any)
	// the client supplied. A trailing checksum comes from the body reader;
	// otherwise the first checksum field set on the input wins.
	chRdr, chunkUpload := po.Body.(middlewares.ChecksumReader)
	isTrailingChecksum := chunkUpload && chRdr.Algorithm() != ""
	checksumAlgorithm := po.ChecksumAlgorithm
	checksumValue := ""

	if isTrailingChecksum {
		checksumAlgorithm = types.ChecksumAlgorithm(strings.ToUpper(chRdr.Algorithm()))
		checksumValue = chRdr.Checksum()
	} else {
		hashConfigs := []hashConfig{
			{po.ChecksumCRC32, utils.HashTypeCRC32},
			{po.ChecksumCRC32C, utils.HashTypeCRC32C},
			{po.ChecksumSHA1, utils.HashTypeSha1},
			{po.ChecksumSHA256, utils.HashTypeSha256},
			{po.ChecksumCRC64NVME, utils.HashTypeCRC64NVME},
			{po.ChecksumSHA512, utils.HashTypeSha512},
			{po.ChecksumMD5, utils.HashTypeMd5},
			{po.ChecksumXXHASH64, utils.HashTypeXXHASH64},
			{po.ChecksumXXHASH3, utils.HashTypeXXHASH3},
			{po.ChecksumXXHASH128, utils.HashTypeXXHASH128},
		}

		for _, config := range hashConfigs {
			if config.value != nil {
				checksumAlgorithm = types.ChecksumAlgorithm(strings.ToUpper(string(config.hashType)))
				checksumValue = *config.value
				break
			}
		}

		if checksumAlgorithm == "" {
			// default to crc64nvme
			checksumAlgorithm = types.ChecksumAlgorithmCrc64nvme
		}
	}

	if strings.HasSuffix(*po.Key, "/") {
		// object is directory
		if contentLength != 0 {
			// posix directories can't contain data, send error
			// if the request has a data payload associated with a
			// directory object
			return s3response.PutObjectOutput{}, s3err.GetAPIError(s3err.ErrDirectoryObjectContainsData)
		}

		err = backend.MkdirAll(name, uid, gid, doChown, p.newDirPerm)
		if err != nil {
			if errors.Is(err, syscall.EDQUOT) {
				return s3response.PutObjectOutput{}, s3err.GetAPIError(s3err.ErrQuotaExceeded)
			}
			if errors.Is(err, syscall.ENOSPC) {
				return s3response.PutObjectOutput{}, s3err.GetAPIError(s3err.ErrNoSpaceLeftOnDevice)
			}
			return s3response.PutObjectOutput{}, err
		}

		err = p.storeObjectMetadata(nil, *po.Bucket, *po.Key, po.Metadata)
		if err != nil {
			return s3response.PutObjectOutput{}, fmt.Errorf("set object metadata: %w", err)
		}

		// Set object tagging
		if tags != nil {
			err := p.PutObjectTagging(withCtxNoSlot(ctx), *po.Bucket, *po.Key, "", tags)
			if err != nil {
				return s3response.PutObjectOutput{}, err
			}
		}

		// set etag attribute to signify this dir was specifically put
		err = p.meta.StoreAttribute(nil, *po.Bucket, *po.Key, etagkey,
			[]byte(emptyMD5))
		if err != nil {
			return s3response.PutObjectOutput{}, fmt.Errorf("set etag attr: %w", err)
		}

		// set "application/x-directory" content-type
		err = p.meta.StoreAttribute(nil, *po.Bucket, *po.Key, contentTypeHdr,
			[]byte(backend.DirContentType))
		if err != nil {
			return s3response.PutObjectOutput{}, fmt.Errorf("set content-type attr: %w", err)
		}

		if getString(po.WebsiteRedirectLocation) != "" {
			err = p.meta.StoreAttribute(nil, *po.Bucket, *po.Key, websiteRedirectHdr,
				[]byte(*po.WebsiteRedirectLocation))
			if err != nil {
				return s3response.PutObjectOutput{}, fmt.Errorf("set website-redirect-location attr: %w", err)
			}
		}

		// A directory has no payload, so a client-supplied checksum must
		// equal the empty-payload checksum for the chosen algorithm.
		expectedSum := getEmptyChecksumValue(checksumAlgorithm)
		if checksumValue != "" && expectedSum != checksumValue {
			return s3response.PutObjectOutput{}, s3err.GetChecksumBadDigestErr(checksumAlgorithm)
		}

		// set empty checksum
		checksum := s3response.Checksum{
			Type:      types.ChecksumTypeFullObject,
			Algorithm: checksumAlgorithm,
		}
		setStoredChecksum(&checksum, checksumAlgorithm, &expectedSum)

		err = p.storeChecksums(nil, *po.Bucket, *po.Key, checksum)
		if err != nil {
			return s3response.PutObjectOutput{}, fmt.Errorf("store checksum: %w", err)
		}

		// for directory object no version is created
		return s3response.PutObjectOutput{
			ETag:              emptyMD5,
			Size:              &contentLength,
			ChecksumType:      checksum.Type,
			ChecksumCRC32:     checksum.CRC32,
			ChecksumCRC32C:    checksum.CRC32C,
			ChecksumCRC64NVME: checksum.CRC64NVME,
			ChecksumSHA1:      checksum.SHA1,
			ChecksumSHA256:    checksum.SHA256,
			ChecksumSHA512:    checksum.SHA512,
			ChecksumMD5:       checksum.MD5,
			ChecksumXXHASH64:  checksum.XXHASH64,
			ChecksumXXHASH3:   checksum.XXHASH3,
			ChecksumXXHASH128: checksum.XXHASH128,
		}, nil
	}

	vStatus, err := p.getBucketVersioningStatus(ctx, *po.Bucket)
	if err != nil {
		return s3response.PutObjectOutput{}, err
	}
	vEnabled := p.isBucketVersioningEnabled(vStatus)

	// object is file
	d, err := os.Stat(name)
	if err == nil && d.IsDir() {
		return s3response.PutObjectOutput{}, s3err.GetAPIError(s3err.ErrExistingObjectIsDirectory)
	}

	// if the versioning is enabled first create the file object version
	if p.versioningEnabled() && vStatus != "" && err == nil {
		var isVersionIdMissing bool
		if p.isBucketVersioningSuspended(vStatus) {
			vIdBytes, err := p.meta.RetrieveAttribute(nil, *po.Bucket, *po.Key, versionIdKey)
			if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
				return s3response.PutObjectOutput{}, fmt.Errorf("get object versionId: %w", err)
			}
			isVersionIdMissing = len(vIdBytes) == 0
		}
		if !isVersionIdMissing {
			_, err := p.createObjVersion(*po.Bucket, *po.Key, d.Size(), acct, false)
			if err != nil {
				return s3response.PutObjectOutput{}, fmt.Errorf("create object version: %w", err)
			}
			// With path-based metadata backends (e.g. sidecar), object-lock
			// attributes written on the previous version persist at this path
			// after createObjVersion because metadata is not replaced atomically
			// the way xattrs are on file rename. Delete them so they do not
			// bleed into the new version.
			_ = p.meta.DeleteAttribute(*po.Bucket, *po.Key, objectLegalHoldKey)
			_ = p.meta.DeleteAttribute(*po.Bucket, *po.Key, objectRetentionKey)
		}
	}

	// The checks below inspect the error from the os.Stat(name) call above;
	// the err variables declared inside the versioning block are scoped to it.
	if isErrNameTooLong(err) {
		return s3response.PutObjectOutput{}, s3err.GetKeyTooLongErr(int64(len(*po.Key)), 1024)
	}
	if isErrNotDir(err) {
		parentErr := handleParentDirError(name)
		if parentErr != nil {
			return s3response.PutObjectOutput{}, parentErr
		}
	}
	if err != nil && !errors.Is(err, fs.ErrNotExist) {
		return s3response.PutObjectOutput{}, fmt.Errorf("stat object: %w", err)
	}

	f, err := p.openTmpFile(filepath.Join(*po.Bucket, MetaTmpDir),
		*po.Bucket, *po.Key, contentLength, acct, doFalloc, p.forceNoTmpFile)
	if err != nil {
		// Drain the unread body before answering with a space/quota error.
		if errors.Is(err, syscall.EDQUOT) {
			drainBody(po.Body)
			return s3response.PutObjectOutput{}, s3err.GetAPIError(s3err.ErrQuotaExceeded)
		}
		if errors.Is(err, syscall.ENOSPC) {
			drainBody(po.Body)
			return s3response.PutObjectOutput{}, s3err.GetAPIError(s3err.ErrNoSpaceLeftOnDevice)
		}
		return s3response.PutObjectOutput{}, fmt.Errorf("open temp file: %w", err)
	}
	defer f.cleanup()

	// Remember the temp file's size value before any data is written; it is
	// compared against f.size again after the copy.
	objsize := f.size

	// The md5 of the payload becomes the etag; the optional hash reader
	// computes (and is given the expected value of) the object checksum.
	hash := md5.New()
	rdr := io.TeeReader(po.Body, hash)

	var hashRdr *utils.HashReader
	if !isTrailingChecksum {
		hashRdr, err = utils.NewHashReader(rdr, checksumValue, utils.HashType(strings.ToLower(string(checksumAlgorithm))))
		if err != nil {
			return s3response.PutObjectOutput{}, fmt.Errorf("initialize hash reader: %w", err)
		}

		rdr = hashRdr
	}

	_, err = io.Copy(f, rdr)
	if err != nil {
		if errors.Is(err, syscall.EDQUOT) {
			drainBody(rdr)
			return s3response.PutObjectOutput{}, s3err.GetAPIError(s3err.ErrQuotaExceeded)
		}
		if errors.Is(err, syscall.ENOSPC) {
			drainBody(rdr)
			return s3response.PutObjectOutput{}, s3err.GetAPIError(s3err.ErrNoSpaceLeftOnDevice)
		}
		// Return the error itself, if it implements the s3err.S3Error interface
		if _, ok := err.(s3err.S3Error); ok {
			return s3response.PutObjectOutput{}, err
		}
		return s3response.PutObjectOutput{}, fmt.Errorf("write object data: %w", err)
	}

	// If the file was pre-allocated (via fallocate) to a size larger than the
	// bytes actually written, truncate it to the real content size. This can
	// happen when Content-Length includes epilogue bytes after the multipart
	// final boundary (e.g. browser-based POST Object with trailing data).
	if f.size > 0 {
		objsize -= f.size
		if err := f.File().Truncate(objsize); err != nil {
			return s3response.PutObjectOutput{}, fmt.Errorf("truncate object to actual size: %w", err)
		}
	}

	dir := filepath.Dir(name)
	if dir != "" {
		err = backend.MkdirAll(dir, uid, gid, doChown, p.newDirPerm)
		if err != nil {
			// Report quota/space exhaustion as such, the same way the
			// directory-object branch above does, instead of folding it
			// into the "existing object is a directory" error. The body
			// was read to EOF by the copy above, so nothing needs draining.
			if errors.Is(err, syscall.EDQUOT) {
				return s3response.PutObjectOutput{}, s3err.GetAPIError(s3err.ErrQuotaExceeded)
			}
			if errors.Is(err, syscall.ENOSPC) {
				return s3response.PutObjectOutput{}, s3err.GetAPIError(s3err.ErrNoSpaceLeftOnDevice)
			}
			return s3response.PutObjectOutput{}, s3err.GetAPIError(s3err.ErrExistingObjectIsDirectory)
		}
	}

	etag := backend.GenerateEtag(hash)

	// if the versioning is enabled, generate a new versionID for the object
	var versionID string
	if p.versioningEnabled() && vEnabled {
		versionID = ulid.Make().String()
	}

	// Before finalizing the object creation remove
	// null versionId object from versioning directory
	// if it exists and the versioning status is Suspended
	if p.isBucketVersioningSuspended(vStatus) {
		err = p.deleteNullVersionIdObject(*po.Bucket, *po.Key)
		if err != nil {
			return s3response.PutObjectOutput{}, err
		}
		versionID = nullVersionId
		// Clear any stale versionId sidecar attribute left from a previous
		// versioned object at this path. With xattr this is implicit (the
		// new file carries only the attrs set on the tmpfile), but with
		// path-based metadata the old attr persists until explicitly deleted.
		_ = p.meta.DeleteAttribute(*po.Bucket, *po.Key, versionIdKey)
	}

	var sum string
	if isTrailingChecksum {
		sum = chRdr.Checksum()
	} else {
		sum = hashRdr.Sum()
	}

	checksum := s3response.Checksum{
		Type:      types.ChecksumTypeFullObject,
		Algorithm: checksumAlgorithm,
	}
	setStoredChecksum(&checksum, checksumAlgorithm, &sum)

	err = p.storeChecksums(f.File(), *po.Bucket, *po.Key, checksum)
	if err != nil {
		return s3response.PutObjectOutput{}, fmt.Errorf("store checksum: %w", err)
	}

	err = p.meta.StoreAttribute(f.File(), *po.Bucket, *po.Key, etagkey, []byte(etag))
	if err != nil {
		return s3response.PutObjectOutput{}, fmt.Errorf("set etag attr: %w", err)
	}

	err = p.storeObjectMetaProperties(f.File(), *po.Bucket, *po.Key,
		metaProperties{
			ContentType:             po.ContentType,
			ContentEncoding:         po.ContentEncoding,
			ContentLanguage:         po.ContentLanguage,
			ContentDisposition:      po.ContentDisposition,
			CacheControl:            po.CacheControl,
			Expires:                 po.Expires,
			WebsiteRedirectLocation: po.WebsiteRedirectLocation,
			Metadata:                po.Metadata,
		})
	if err != nil {
		return s3response.PutObjectOutput{}, err
	}

	// The null version id is never stored as an attribute.
	if versionID != "" && versionID != nullVersionId {
		err := p.meta.StoreAttribute(f.File(), *po.Bucket, *po.Key, versionIdKey, []byte(versionID))
		if err != nil {
			return s3response.PutObjectOutput{}, fmt.Errorf("set versionId attr: %w", err)
		}
	}

	err = postprocess(f.File())
	if err != nil {
		return s3response.PutObjectOutput{},
			fmt.Errorf("put object post process failed: %w", err)
	}

	err = f.link()
	if errors.Is(err, syscall.EEXIST) {
		// EEXIST from the link step is reported as a successful put.
		return s3response.PutObjectOutput{
			ETag:      etag,
			VersionID: versionID,
		}, nil
	}
	if err != nil {
		return s3response.PutObjectOutput{}, s3err.GetAPIError(s3err.ErrExistingObjectIsDirectory)
	}

	// Set object tagging
	if tags != nil {
		err := p.PutObjectTagging(withCtxNoSlot(ctx), *po.Bucket, *po.Key, "", tags)
		if errors.Is(err, fs.ErrNotExist) {
			// The object is gone by the time tags are applied; the put
			// is still reported as successful.
			return s3response.PutObjectOutput{
				ETag:      etag,
				VersionID: versionID,
			}, nil
		}
		if err != nil {
			return s3response.PutObjectOutput{}, err
		}
	}

	// Set object legal hold
	if po.ObjectLockLegalHoldStatus == types.ObjectLockLegalHoldStatusOn {
		err := p.PutObjectLegalHold(withCtxNoSlot(ctx), *po.Bucket, *po.Key, "", true)
		if err != nil {
			if errors.Is(err, s3err.GetAPIError(s3err.ErrMissingObjectLockConfiguration)) {
				err = s3err.GetAPIError(s3err.ErrMissingObjectLockConfigurationNoSpaces)
			}
			return s3response.PutObjectOutput{}, err
		}
	}

	// Set object retention
	if po.ObjectLockMode != "" {
		retention := types.ObjectLockRetention{
			Mode:            types.ObjectLockRetentionMode(po.ObjectLockMode),
			RetainUntilDate: po.ObjectLockRetainUntilDate,
		}
		retParsed, err := json.Marshal(retention)
		if err != nil {
			return s3response.PutObjectOutput{}, fmt.Errorf("parse object lock retention: %w", err)
		}
		err = p.PutObjectRetention(withCtxNoSlot(ctx), *po.Bucket, *po.Key, "", retParsed)
		if err != nil {
			if errors.Is(err, s3err.GetAPIError(s3err.ErrMissingObjectLockConfiguration)) {
				err = s3err.GetAPIError(s3err.ErrMissingObjectLockConfigurationNoSpaces)
			}
			return s3response.PutObjectOutput{}, err
		}
	}

	return s3response.PutObjectOutput{
		ETag:              etag,
		VersionID:         versionID,
		ChecksumCRC32:     checksum.CRC32,
		ChecksumCRC32C:    checksum.CRC32C,
		ChecksumSHA1:      checksum.SHA1,
		ChecksumSHA256:    checksum.SHA256,
		ChecksumCRC64NVME: checksum.CRC64NVME,
		ChecksumSHA512:    checksum.SHA512,
		ChecksumMD5:       checksum.MD5,
		ChecksumXXHASH64:  checksum.XXHASH64,
		ChecksumXXHASH3:   checksum.XXHASH3,
		ChecksumXXHASH128: checksum.XXHASH128,
		Size:              &objsize,
		ChecksumType:      checksum.Type,
	}, nil
}
// DeleteObject removes an object, or one specific version of it, from a
// bucket while holding an action slot.
//
// What happens depends on the key and on versioning:
//   - Non-directory key, versioning configured, no VersionId: the current
//     object is archived into the versioning directory (when versioning is
//     enabled or the object carries a non-null version id) and is then
//     flagged as a delete marker.
//   - Non-directory key, versioning configured, VersionId given: if the id
//     matches the current object, that object is removed and the last entry
//     of its version directory, if any, is copied back as the new current
//     object together with its attributes. Otherwise the named entry is
//     removed from the version directory.
//   - Everything else: the file or directory and its attributes are removed
//     and parents that were not explicitly put are pruned.
//
// Deleting an object that does not exist is reported as success. The
// IfMatch, IfMatchLastModifiedTime and IfMatchSize preconditions of input
// are evaluated before anything is removed.
func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) {
	release, err := p.acquireActionSlot(ctx)
	if err != nil {
		return nil, err
	}
	defer release()

	if input.Key == nil {
		return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
	}

	bucket := *input.Bucket
	object := *input.Key
	isDir := strings.HasSuffix(object, "/")

	if !p.isBucketValid(bucket) {
		return nil, s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	if err := p.validateVersionId(backend.GetStringFromPtr(input.VersionId)); err != nil {
		return nil, err
	}

	_, err = os.Stat(bucket)
	if errors.Is(err, fs.ErrNotExist) {
		return nil, s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	}
	if err != nil {
		return nil, fmt.Errorf("stat bucket: %w", err)
	}

	objpath := filepath.Join(bucket, object)

	vStatus, err := p.getBucketVersioningStatus(ctx, bucket)
	if err != nil {
		return nil, err
	}

	// evalPreconditions checks the request's delete preconditions against
	// the object at bucket/object. When f is nil the object is stat'ed
	// first; if that stat fails the preconditions are treated as satisfied.
	evalPreconditions := func(f os.FileInfo, bucket, object string) error {
		var err error
		if f == nil {
			f, err = os.Stat(filepath.Join(bucket, object))
			if err != nil {
				return nil
			}
		}

		// A missing or unreadable etag is evaluated as an empty etag.
		b, err := p.meta.RetrieveAttribute(nil, bucket, object, etagkey)
		etag := string(b)
		if err != nil {
			etag = ""
		}

		// evaluate preconditions
		return backend.EvaluateObjectDeletePreconditions(etag, f.ModTime(), f.Size(),
			backend.ObjectDeletePreconditions{
				IfMatch:            input.IfMatch,
				IfMatchLastModTime: input.IfMatchLastModifiedTime,
				IfMatchSize:        input.IfMatchSize,
			})
	}

	// Directory objects can't have versions
	if !isDir && p.versioningEnabled() && vStatus != "" {
		if getString(input.VersionId) == "" {
			// if the versionId is not specified, make the current version a delete marker
			fi, err := os.Stat(objpath)
			if errors.Is(err, fs.ErrNotExist) || isErrNotDir(err) {
				// AWS returns success if the object does not exist
				return &s3.DeleteObjectOutput{}, nil
			}
			if isErrNameTooLong(err) {
				return nil, s3err.GetKeyTooLongErr(int64(len(object)), 1024)
			}
			if err != nil {
				return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
			}

			err = evalPreconditions(fi, bucket, object)
			if err != nil {
				return nil, err
			}

			acct, ok := ctx.Value("account").(auth.Account)
			if !ok {
				acct = auth.Account{}
			}

			// Get object versionId
			vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey)
			if err != nil && !errors.Is(err, meta.ErrNoSuchKey) && !errors.Is(err, fs.ErrNotExist) {
				return nil, fmt.Errorf("get obj versionId: %w", err)
			}
			if errors.Is(err, meta.ErrNoSuchKey) {
				vId = []byte(nullVersionId)
			}

			// Creates a new object version in the versioning directory
			if p.isBucketVersioningEnabled(vStatus) || string(vId) != nullVersionId {
				_, err = p.createObjVersion(bucket, object, fi.Size(), acct, true)
				if err != nil {
					return nil, err
				}
			}

			// Mark the object as a delete marker
			err = p.meta.StoreAttribute(nil, bucket, object, deleteMarkerKey, []byte{})
			if err != nil {
				return nil, fmt.Errorf("set delete marker: %w", err)
			}

			versionId := nullVersionId
			if p.isBucketVersioningEnabled(vStatus) {
				// Generate & set a unique versionId for the delete marker
				versionId = ulid.Make().String()
				err = p.meta.StoreAttribute(nil, bucket, object, versionIdKey, []byte(versionId))
				if err != nil {
					return nil, fmt.Errorf("set versionId: %w", err)
				}
			} else {
				// Versioning is not enabled: the delete marker keeps the
				// null version id, so drop any stored versionId attribute.
				err = p.meta.DeleteAttribute(bucket, object, versionIdKey)
				if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
					return nil, fmt.Errorf("delete versionId: %w", err)
				}
			}

			return &s3.DeleteObjectOutput{
				DeleteMarker: getBoolPtr(true),
				VersionId:    &versionId,
			}, nil
		} else {
			versionPath := p.genObjVersionPath(bucket, object)

			vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey)
			if errors.Is(err, fs.ErrNotExist) || isErrNotDir(err) {
				// AWS returns success if the object does not exist
				return &s3.DeleteObjectOutput{
					VersionId: input.VersionId,
				}, nil
			}
			if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
				return nil, fmt.Errorf("get obj versionId: %w", err)
			}
			if errors.Is(err, meta.ErrNoSuchKey) {
				// With sidecar, ErrNoSuchKey means "attribute absent" regardless of
				// whether the data file exists. If the file is absent the object
				// does not exist at all → AWS returns success for DeleteObject.
				// Also handle ENOTDIR: when a key such as "foo/bar" is requested
				// but "foo" is a regular file (not a directory), the path cannot
				// contain any object.
				_, statErr := os.Stat(filepath.Join(bucket, object))
				if errors.Is(statErr, fs.ErrNotExist) || isErrNotDir(statErr) {
					return &s3.DeleteObjectOutput{VersionId: input.VersionId}, nil
				}
				vId = []byte(nullVersionId)
			}

			if string(vId) == *input.VersionId {
				// evaluate preconditions
				err := evalPreconditions(nil, bucket, object)
				if err != nil {
					return nil, err
				}

				// if the specified VersionId is the same as in the latest version,
				// remove the latest version, find the latest version from the versioning
				// directory and move to the place of the deleted object, to make it the latest
				isDelMarker, err := p.isObjDeleteMarker(bucket, object)
				if err != nil {
					return nil, err
				}

				err = os.Remove(objpath)
				if err != nil {
					return nil, fmt.Errorf("remove obj version: %w", err)
				}

				ents, err := os.ReadDir(versionPath)
				if errors.Is(err, fs.ErrNotExist) {
					// No version directory: nothing to promote.
					p.removeParents(bucket, object)
					return &s3.DeleteObjectOutput{
						DeleteMarker: &isDelMarker,
						VersionId:    input.VersionId,
					}, nil
				}
				if err != nil {
					return nil, fmt.Errorf("read version dir: %w", err)
				}

				if len(ents) == 0 {
					p.removeParents(bucket, object)
					return &s3.DeleteObjectOutput{
						DeleteMarker: &isDelMarker,
						VersionId:    input.VersionId,
					}, nil
				}

				// os.ReadDir returns entries sorted by filename; the last
				// one is the version that gets promoted.
				srcObjVersion, err := ents[len(ents)-1].Info()
				if err != nil {
					return nil, fmt.Errorf("get file info: %w", err)
				}
				srcVersionId := srcObjVersion.Name()
				sf, err := os.Open(filepath.Join(versionPath, srcVersionId))
				if err != nil {
					return nil, fmt.Errorf("open obj version: %w", err)
				}

				acct, ok := ctx.Value("account").(auth.Account)
				if !ok {
					acct = auth.Account{}
				}

				f, err := p.openTmpFile(filepath.Join(bucket, MetaTmpDir),
					bucket, object, srcObjVersion.Size(), acct, doFalloc,
					p.forceNoTmpFile)
				if err != nil {
					// The source version is already open; close it so the
					// descriptor is not leaked on this error path. The
					// close error is ignored because the open failure is
					// the one being reported.
					_ = sf.Close()
					return nil, fmt.Errorf("open tmp file: %w", err)
				}
				defer f.cleanup()

				_, err = io.Copy(f, sf)
				if err != nil {
					_ = sf.Close()
					return nil, fmt.Errorf("copy object %w", err)
				}
				// sf is closed explicitly (not deferred) so that it is
				// closed before the link and the removal of the source below.
				if err := sf.Close(); err != nil {
					return nil, fmt.Errorf("close obj version: %w", err)
				}

				if err := f.link(); err != nil {
					return nil, fmt.Errorf("link tmp file: %w", err)
				}

				// With path-based metadata (sidecar) the live object's attrs are
				// not replaced atomically. The restored version may not have all
				// the attrs that the deleted version had (e.g. a null version has
				// no versionIdKey). Clear attrs that belong to the deleted version
				// before copying the restored version's attrs so that the restored
				// version presents a clean state.
				_ = p.meta.DeleteAttribute(bucket, object, versionIdKey)
				_ = p.meta.DeleteAttribute(bucket, object, deleteMarkerKey)

				attrs, err := p.meta.ListAttributes(versionPath, srcVersionId)
				if err != nil {
					return nil, fmt.Errorf("list object attributes: %w", err)
				}

				for _, attr := range attrs {
					data, err := p.meta.RetrieveAttribute(nil, versionPath, srcVersionId, attr)
					if err != nil {
						return nil, fmt.Errorf("load %v attribute: %w", attr, err)
					}

					err = p.meta.StoreAttribute(nil, bucket, object, attr, data)
					if err != nil {
						return nil, fmt.Errorf("store %v attribute: %w", attr, err)
					}
				}

				err = os.Remove(filepath.Join(versionPath, srcVersionId))
				if err != nil {
					return nil, fmt.Errorf("remove obj version %w", err)
				}
				_ = p.meta.DeleteAttributes(versionPath, srcVersionId)

				p.removeParents(filepath.Join(p.versioningDir, bucket), filepath.Join(genObjVersionKey(object), *input.VersionId))

				return &s3.DeleteObjectOutput{
					DeleteMarker: &isDelMarker,
					VersionId:    input.VersionId,
				}, nil
			}

			// The requested version is not the current object: remove it
			// from the versioning directory.
			err = evalPreconditions(nil, versionPath, *input.VersionId)
			if err != nil {
				return nil, err
			}

			isDelMarker, _ := p.isObjDeleteMarker(versionPath, *input.VersionId)

			err = os.Remove(filepath.Join(versionPath, *input.VersionId))
			if isErrNameTooLong(err) {
				return nil, s3err.GetKeyTooLongErr(int64(len(object)), 1024)
			}
			if errors.Is(err, fs.ErrNotExist) || isErrNotDir(err) {
				return nil, s3err.GetInvalidArgumentErr(s3err.InvalidArgVersionId, *input.VersionId)
			}
			if err != nil {
				return nil, fmt.Errorf("delete object: %w", err)
			}
			_ = p.meta.DeleteAttributes(versionPath, *input.VersionId)

			p.removeParents(filepath.Join(p.versioningDir, bucket), filepath.Join(genObjVersionKey(object), *input.VersionId))

			return &s3.DeleteObjectOutput{
				DeleteMarker: &isDelMarker,
				VersionId:    input.VersionId,
			}, nil
		}
	}

	fi, err := os.Stat(objpath)
	if isErrNameTooLong(err) {
		return nil, s3err.GetKeyTooLongErr(int64(len(object)), 1024)
	}
	if errors.Is(err, fs.ErrNotExist) || isErrNotDir(err) {
		// AWS returns success if the object does not exist
		return &s3.DeleteObjectOutput{}, nil
	}
	if err != nil {
		return &s3.DeleteObjectOutput{}, fmt.Errorf("stat object: %w", err)
	}
	if strings.HasSuffix(object, "/") && !fi.IsDir() {
		// requested object is expecting a directory with a trailing
		// slash, but the object is not a directory. treat this as
		// a non-existent object.
		// AWS returns success if the object does not exist
		return &s3.DeleteObjectOutput{}, nil
	}
	if !strings.HasSuffix(object, "/") && fi.IsDir() {
		// requested object is expecting a file, but the object is a
		// directory. treat this as a non-existent object.
		// AWS returns success if the object does not exist
		return &s3.DeleteObjectOutput{}, nil
	}

	err = evalPreconditions(fi, bucket, object)
	if err != nil {
		return nil, err
	}

	err = os.Remove(objpath)
	if errors.Is(err, fs.ErrNotExist) {
		return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
	}
	if isErrDirNotEmpty(err) {
		// If the directory object has been uploaded explicitly
		// remove the directory object (remove the ETag)
		_, err = p.meta.RetrieveAttribute(nil, objpath, "", etagkey)
		if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
			return nil, fmt.Errorf("get object etag: %w", err)
		}
		if errors.Is(err, meta.ErrNoSuchKey) {
			return nil, s3err.GetAPIError(s3err.ErrDirectoryNotEmpty)
		}

		err = p.meta.DeleteAttribute(objpath, "", etagkey)
		if err != nil {
			return nil, fmt.Errorf("delete object etag: %w", err)
		}

		return &s3.DeleteObjectOutput{}, nil
	}
	if err != nil {
		return nil, fmt.Errorf("delete object: %w", err)
	}

	err = p.meta.DeleteAttributes(bucket, object)
	if err != nil {
		return nil, fmt.Errorf("delete object attributes: %w", err)
	}

	p.removeParents(bucket, object)

	return &s3.DeleteObjectOutput{}, nil
}
// removeParents walks upward from object and removes each parent directory
// that was not explicitly uploaded with a put object. An explicitly put
// directory is recognised by its etag attribute. The walk ends at the bucket
// directory, at the first parent carrying that attribute, or at the first
// directory that cannot be removed, whichever comes first.
func (p *Posix) removeParents(bucket, object string) {
	// Drop the trailing separator of directory objects so that the first
	// filepath.Dir call yields the real parent.
	start := strings.TrimSuffix(object, "/")

	for parent := filepath.Dir(start); parent != "." && parent != string(filepath.Separator); parent = filepath.Dir(parent) {
		if _, err := p.meta.RetrieveAttribute(nil, bucket, parent, etagkey); err == nil {
			// a directory with a valid etag was specifically uploaded
			// with a put object, so leave it (and everything above it)
			// in place.
			return
		}

		if err := os.Remove(filepath.Join(bucket, parent)); err != nil {
			return
		}
	}
}
// DeleteObjects deletes every object listed in input.Delete.Objects, one
// after another, while holding a single action slot for the whole batch.
//
// A failure to delete one object does not abort the batch: it is recorded
// in the Error list of the result and processing continues. The returned
// error is non-nil only when the action slot cannot be acquired.
func (p *Posix) DeleteObjects(ctx context.Context, input *s3.DeleteObjectsInput) (s3response.DeleteResult, error) {
	release, err := p.acquireActionSlot(ctx)
	if err != nil {
		return s3response.DeleteResult{}, err
	}
	defer release()

	// delete object already checks bucket
	delResult, errs := []types.DeletedObject{}, []types.Error{}
	for _, obj := range input.Delete.Objects {
		//TODO: Make the delete operation concurrent
		// once concurrency is implemented, the posix rate limiter should
		// be taken into account.
		// The slot is already held by this call, so the per-object delete
		// runs with a context that does not take another one.
		res, err := p.DeleteObject(withCtxNoSlot(ctx), &s3.DeleteObjectInput{
			Bucket:    input.Bucket,
			Key:       obj.Key,
			VersionId: obj.VersionId,
		})
		if err == nil {
			delEntity := types.DeletedObject{
				Key:          obj.Key,
				DeleteMarker: res.DeleteMarker,
				VersionId:    obj.VersionId,
			}
			if delEntity.DeleteMarker != nil && *delEntity.DeleteMarker {
				delEntity.DeleteMarkerVersionId = res.VersionId
			}

			delResult = append(delResult, delEntity)
		} else {
			serr, ok := err.(s3err.S3Error)
			if ok {
				// Report the error code and its human-readable
				// description; the message must not repeat the code.
				base := serr.BaseError()
				errCode := base.Code
				errMessage := base.Description
				errs = append(errs, types.Error{
					Key:     obj.Key,
					Code:    &errCode,
					Message: &errMessage,
				})
			} else {
				// Anything that is not an S3 error is reported as an
				// internal error carrying the raw error text.
				errs = append(errs, types.Error{
					Key:     obj.Key,
					Code:    backend.GetPtrFromString("InternalError"),
					Message: backend.GetPtrFromString(err.Error()),
				})
			}
		}
	}

	return s3response.DeleteResult{
		Deleted: delResult,
		Error:   errs,
	}, nil
}
// GetObject returns the metadata of an object together with a reader for its
// content.
//
// Behaviour implemented here:
//   - a nil key, a missing object, a trailing-slash/directory mismatch and a
//     directory without an etag attribute are all reported as NoSuchKey;
//   - a version id is rejected unless versioning is enabled; a version other
//     than the current one is read from the versioning directory;
//   - a delete marker yields an output with DeleteMarker set, together with
//     MethodNotAllowed (explicit version id) or NoSuchKey (no version id);
//   - the If-Match / If-None-Match / If-Modified-Since / If-Unmodified-Since
//     preconditions are evaluated against the stored etag and the mtime;
//   - input.PartNumber, when set, selects the served byte range and
//     input.Range is not consulted; otherwise input.Range is parsed.
//
// For regular files the returned Body wraps the opened file and must be
// closed by the caller. Directory objects are returned without a Body and
// with a ContentLength of zero.
func (p *Posix) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
	release, err := p.acquireActionSlot(ctx)
	if err != nil {
		return nil, err
	}
	defer release()

	if input.Key == nil {
		return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
	}

	var versionId string
	if input.VersionId != nil {
		versionId = *input.VersionId
	}
	if err := p.validateVersionId(versionId); err != nil {
		return nil, err
	}

	if !p.versioningEnabled() && versionId != "" {
		//TODO: Maybe we need to return our custom error here?
		return nil, s3err.GetInvalidArgumentErr(s3err.InvalidArgVersionId, versionId)
	}

	bucket := *input.Bucket
	if !p.isBucketValid(bucket) {
		return nil, s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	_, err = os.Stat(bucket)
	if errors.Is(err, fs.ErrNotExist) {
		return nil, s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	}
	if err != nil {
		return nil, fmt.Errorf("stat bucket: %w", err)
	}

	object := *input.Key
	if versionId != "" {
		vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey)
		if errors.Is(err, fs.ErrNotExist) || isErrNotDir(err) {
			return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
		}
		if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
			return nil, fmt.Errorf("get obj versionId: %w", err)
		}
		// An object without a version id attribute is treated as the
		// null version.
		if errors.Is(err, meta.ErrNoSuchKey) {
			vId = []byte(nullVersionId)
		}

		// The requested version is not the current one: read it from the
		// versioning directory instead of the bucket itself.
		if string(vId) != versionId {
			bucket = filepath.Join(p.versioningDir, bucket)
			object = filepath.Join(genObjVersionKey(object), versionId)
		}
	}

	objPath := filepath.Join(bucket, object)

	fid, err := os.Stat(objPath)
	if errors.Is(err, fs.ErrNotExist) || isErrNotDir(err) {
		if versionId != "" {
			return nil, s3err.GetNoSuchVersionErr(*input.Key, versionId)
		}
		return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
	}
	if isErrNameTooLong(err) {
		return nil, s3err.GetKeyTooLongErr(int64(len(*input.Key)), 1024)
	}
	if err != nil {
		return nil, fmt.Errorf("stat object: %w", err)
	}

	// A key ending in '/' must be a directory and vice versa.
	if strings.HasSuffix(object, "/") && !fid.IsDir() {
		return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
	}
	if !strings.HasSuffix(object, "/") && fid.IsDir() {
		return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
	}

	if fid.IsDir() {
		// Only directories explicitly created via S3 (put-object with key ending
		// in '/') have an etag attribute. Directories created incidentally on the
		// filesystem or as parent directories during object upload should not be
		// accessible via get-object.
		_, derr := p.meta.RetrieveAttribute(nil, bucket, object, etagkey)
		if errors.Is(derr, meta.ErrNoSuchKey) || errors.Is(derr, fs.ErrNotExist) {
			return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
		}
		if derr != nil {
			return nil, fmt.Errorf("get dir etag: %w", derr)
		}
	}

	if p.versioningEnabled() {
		isDelMarker, err := p.isObjDeleteMarker(bucket, object)
		if err != nil {
			return nil, err
		}

		// if the specified object version is a delete marker, return MethodNotAllowed
		if isDelMarker {
			if versionId != "" {
				err = s3err.GetAPIError(s3err.ErrMethodNotAllowed)
			} else {
				err = s3err.GetAPIError(s3err.ErrNoSuchKey)
			}

			return &s3.GetObjectOutput{
				DeleteMarker: getBoolPtr(true),
				LastModified: backend.GetTimePtr(fid.ModTime()),
			}, err
		}
	}

	// A missing or unreadable etag is not fatal; it is reported as empty.
	b, err := p.meta.RetrieveAttribute(nil, bucket, object, etagkey)
	etag := string(b)
	if err != nil {
		etag = ""
	}

	// evaluate preconditions
	err = backend.EvaluatePreconditions(etag, fid.ModTime(), backend.PreConditions{
		IfMatch:       input.IfMatch,
		IfNoneMatch:   input.IfNoneMatch,
		IfModSince:    input.IfModifiedSince,
		IfUnmodeSince: input.IfUnmodifiedSince,
	})
	if err != nil {
		return nil, err
	}

	if fid.IsDir() {
		// Directory objects have no content; the range is only validated
		// against a size of zero. input.Range is optional, so it must not be
		// dereferenced directly.
		_, _, _, err := backend.ParseObjectRange(0, getString(input.Range))
		if err != nil {
			return nil, err
		}

		objMeta := p.loadObjectMetaProperties(nil, bucket, object, &fid)

		var tagCount *int32
		tags, err := p.getAttrTags(bucket, object, versionId)
		if err != nil {
			return nil, err
		}
		if len(tags) != 0 {
			tgCount := int32(len(tags))
			tagCount = &tgCount
		}

		var checksums s3response.Checksum
		if input.ChecksumMode == types.ChecksumModeEnabled {
			checksums, err = p.retrieveChecksums(nil, bucket, object)
			if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
				return nil, fmt.Errorf("get object checksums: %w", err)
			}
		}

		var length int64 = 0
		return &s3.GetObjectOutput{
			ChecksumCRC32:           checksums.CRC32,
			ChecksumCRC32C:          checksums.CRC32C,
			ChecksumSHA1:            checksums.SHA1,
			ChecksumSHA256:          checksums.SHA256,
			ChecksumCRC64NVME:       checksums.CRC64NVME,
			ChecksumSHA512:          checksums.SHA512,
			ChecksumMD5:             checksums.MD5,
			ChecksumXXHASH64:        checksums.XXHASH64,
			ChecksumXXHASH3:         checksums.XXHASH3,
			ChecksumXXHASH128:       checksums.XXHASH128,
			ChecksumType:            checksums.Type,
			AcceptRanges:            backend.GetPtrFromString("bytes"),
			ContentLength:           &length,
			ContentEncoding:         objMeta.ContentEncoding,
			ContentType:             objMeta.ContentType,
			ContentLanguage:         objMeta.ContentLanguage,
			ContentDisposition:      objMeta.ContentDisposition,
			CacheControl:            objMeta.CacheControl,
			ExpiresString:           objMeta.Expires,
			WebsiteRedirectLocation: objMeta.WebsiteRedirectLocation,
			ETag:                    &etag,
			LastModified:            backend.GetTimePtr(fid.ModTime()),
			Metadata:                objMeta.Metadata,
			TagCount:                tagCount,
			ContentRange:            nil,
			StorageClass:            types.StorageClassStandard,
			VersionId:               &versionId,
		}, nil
	}

	// If versioning is configured get the object versionId
	if p.versioningEnabled() && versionId == "" {
		vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey)
		if errors.Is(err, meta.ErrNoSuchKey) {
			versionId = nullVersionId
		} else if err != nil {
			return nil, err
		}
		// NOTE(review): this assignment also runs after the ErrNoSuchKey
		// branch above and overwrites nullVersionId with whatever
		// RetrieveAttribute returned alongside the error - confirm whether
		// that is intended before changing it.
		versionId = string(vId)
	}

	// openForRead opens with FILE_SHARE_DELETE on Windows so that a concurrent
	// DeleteObject can call os.Remove on this file while the GET response body
	// is still being streamed. On POSIX, os.Open is sufficient.
	f, err := openForRead(objPath)
	if errors.Is(err, fs.ErrNotExist) {
		return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
	}
	if err != nil {
		return nil, fmt.Errorf("open object: %w", err)
	}

	// Ownership of f passes to the caller only on success (as the response
	// body). Every error return below must close it, otherwise the file
	// descriptor leaks.
	handedOff := false
	defer func() {
		if !handedOff {
			// best effort: the request already failed with another error
			_ = f.Close()
		}
	}()

	fi, err := f.Stat()
	if errors.Is(err, fs.ErrNotExist) || isErrNotDir(err) {
		if versionId != "" {
			return nil, s3err.GetInvalidArgumentErr(s3err.InvalidArgVersionId, versionId)
		}
		return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
	}
	if isErrNameTooLong(err) {
		return nil, s3err.GetKeyTooLongErr(int64(len(*input.Key)), 1024)
	}
	if err != nil {
		return nil, fmt.Errorf("stat object: %w", err)
	}

	objSize := fi.Size()
	if fi.IsDir() {
		objSize = 0
	}

	var contentRange *string
	var startOffset, length int64
	var partsCount *int32

	// If partNumber is requested and mp-metadata exists, serve that specific part.
	// For non-multipart objects (no mp-metadata), partNumber=1 returns the full
	// object with no Content-Range; any other partNumber is out of range.
	// Both range read and partNumber can't be used together.
	if input.PartNumber != nil {
		mpMetaBytes, metaErr := p.meta.RetrieveAttribute(nil, bucket, object, mpMetaKey)
		if metaErr == nil {
			mpMeta, err := backend.UnmarshalMpUploadMetadata(mpMetaBytes, false)
			if err != nil {
				return nil, fmt.Errorf("parse object multipart metadata: %w", err)
			}

			partNum := *input.PartNumber
			totalParts := int32(len(mpMeta.Parts))
			partsCount = &totalParts
			if partNum > totalParts {
				return nil, s3err.GetInvalidPartNumberRangeErr(totalParts, partNum)
			}

			// Parts holds cumulative sizes: Parts[i] = sum of sizes 1..i+1
			if partNum > 1 {
				startOffset = mpMeta.Parts[partNum-2]
			}
			length = mpMeta.Parts[partNum-1] - startOffset
			contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %d-%d/%d", startOffset, startOffset+length-1, objSize))
		} else if errors.Is(metaErr, meta.ErrNoSuchKey) {
			// Non-multipart object: partNumber=1 means the whole object; anything
			// higher is out of range
			if *input.PartNumber > 1 {
				return nil, s3err.GetInvalidPartNumberRangeErr(1, *input.PartNumber)
			}
			length = objSize
			if objSize != 0 {
				// if object size is 0, the whole object is served, no content range should be set
				contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes 0-%d/%d", objSize-1, objSize))
			}
		} else {
			return nil, fmt.Errorf("retrieve mp metadata: %w", metaErr)
		}
	} else {
		start, lgth, isValid, err := backend.ParseObjectRange(objSize, getString(input.Range))
		if err != nil {
			return nil, err
		}
		startOffset, length = start, lgth
		if isValid {
			contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %v-%v/%v", start, start+lgth-1, objSize))
		}
	}

	objMeta := p.loadObjectMetaProperties(f, bucket, object, &fi)

	var tagCount *int32
	tags, err := p.getAttrTags(bucket, object, versionId)
	if err != nil {
		return nil, err
	}
	if len(tags) != 0 {
		tgCount := int32(len(tags))
		tagCount = &tgCount
	}

	var checksums s3response.Checksum
	// Return checksums only when the full object is requested
	if input.ChecksumMode == types.ChecksumModeEnabled && length-startOffset == objSize {
		checksums, err = p.retrieveChecksums(f, bucket, object)
		if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
			return nil, fmt.Errorf("get object checksums: %w", err)
		}
	}

	// using an os.File allows zero-copy sendfile via io.Copy(os.File, net.Conn)
	var body io.ReadCloser = f
	if startOffset != 0 || length != objSize {
		rdr := io.NewSectionReader(f, startOffset, length)
		body = &backend.FileSectionReadCloser{R: rdr, F: f}
	}

	// From here on the caller owns the file through Body.
	handedOff = true

	return &s3.GetObjectOutput{
		AcceptRanges:            backend.GetPtrFromString("bytes"),
		ContentLength:           &length,
		ContentEncoding:         objMeta.ContentEncoding,
		ContentType:             objMeta.ContentType,
		ContentDisposition:      objMeta.ContentDisposition,
		ContentLanguage:         objMeta.ContentLanguage,
		CacheControl:            objMeta.CacheControl,
		ExpiresString:           objMeta.Expires,
		WebsiteRedirectLocation: objMeta.WebsiteRedirectLocation,
		ETag:                    &etag,
		LastModified:            backend.GetTimePtr(fi.ModTime()),
		Metadata:                objMeta.Metadata,
		TagCount:                tagCount,
		ContentRange:            contentRange,
		StorageClass:            types.StorageClassStandard,
		VersionId:               &versionId,
		Body:                    body,
		ChecksumCRC32:           checksums.CRC32,
		ChecksumCRC32C:          checksums.CRC32C,
		ChecksumSHA1:            checksums.SHA1,
		ChecksumSHA256:          checksums.SHA256,
		ChecksumCRC64NVME:       checksums.CRC64NVME,
		ChecksumSHA512:          checksums.SHA512,
		ChecksumMD5:             checksums.MD5,
		ChecksumXXHASH64:        checksums.XXHASH64,
		ChecksumXXHASH3:         checksums.XXHASH3,
		ChecksumXXHASH128:       checksums.XXHASH128,
		ChecksumType:            checksums.Type,
		PartsCount:              partsCount,
	}, nil
}
// HeadObject returns the metadata of an object without opening its content.
//
// Behaviour implemented here:
//   - a nil key, a missing object, a trailing-slash/directory mismatch and a
//     directory without an etag attribute are all reported as NoSuchKey;
//   - a version id is rejected unless versioning is enabled; a version other
//     than the current one is looked up in the versioning directory;
//   - a delete marker yields MethodNotAllowed together with an output that has
//     DeleteMarker set (explicit version id), or NoSuchKey (no version id);
//   - the If-Match / If-None-Match / If-Modified-Since / If-Unmodified-Since
//     preconditions are evaluated against the stored etag and the mtime;
//   - input.PartNumber, when set, selects the reported byte range and
//     input.Range is not consulted; otherwise input.Range is parsed;
//   - legal hold and retention are reported when they can be read; failures
//     to read them are ignored and the fields are left empty.
func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
	release, err := p.acquireActionSlot(ctx)
	if err != nil {
		return nil, err
	}
	defer release()

	if input.Key == nil {
		return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
	}

	versionId := backend.GetStringFromPtr(input.VersionId)
	if err := p.validateVersionId(versionId); err != nil {
		return nil, err
	}

	if !p.versioningEnabled() && versionId != "" {
		//TODO: Maybe we need to return our custom error here?
		return nil, s3err.GetInvalidArgumentErr(s3err.InvalidArgVersionId, versionId)
	}

	bucket := *input.Bucket
	object := *input.Key
	if !p.isBucketValid(bucket) {
		return nil, s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	_, err = os.Stat(bucket)
	if errors.Is(err, fs.ErrNotExist) {
		return nil, s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	}
	if err != nil {
		return nil, fmt.Errorf("stat bucket: %w", err)
	}

	if versionId != "" {
		vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey)
		if errors.Is(err, fs.ErrNotExist) || isErrNotDir(err) {
			return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
		}
		if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
			return nil, fmt.Errorf("get obj versionId: %w", err)
		}
		// An object without a version id attribute is treated as the null
		// version, matching GetObject. Rewriting bucket/object here as well
		// as below would apply the versioning path transformation twice.
		if errors.Is(err, meta.ErrNoSuchKey) {
			vId = []byte(nullVersionId)
		}

		// The requested version is not the current one: look it up in the
		// versioning directory instead of the bucket itself.
		if string(vId) != versionId {
			bucket = filepath.Join(p.versioningDir, bucket)
			object = filepath.Join(genObjVersionKey(object), versionId)
		}
	}

	objPath := filepath.Join(bucket, object)

	fi, err := os.Stat(objPath)
	if errors.Is(err, fs.ErrNotExist) || isErrNotDir(err) {
		if versionId != "" {
			return nil, s3err.GetNoSuchVersionErr(*input.Key, versionId)
		}
		return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
	}
	if isErrNameTooLong(err) {
		return nil, s3err.GetKeyTooLongErr(int64(len(*input.Key)), 1024)
	}
	if err != nil {
		return nil, fmt.Errorf("stat object: %w", err)
	}

	// A key ending in '/' must be a directory and vice versa.
	if strings.HasSuffix(object, "/") && !fi.IsDir() {
		return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
	}
	if !strings.HasSuffix(object, "/") && fi.IsDir() {
		return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
	}

	if fi.IsDir() {
		// Only directories explicitly created via S3 (put-object with key ending
		// in '/') have an etag attribute. Directories created incidentally on the
		// filesystem or as parent directories during object upload should not be
		// accessible via head-object.
		_, derr := p.meta.RetrieveAttribute(nil, bucket, object, etagkey)
		if errors.Is(derr, meta.ErrNoSuchKey) || errors.Is(derr, fs.ErrNotExist) {
			return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
		}
		if derr != nil {
			return nil, fmt.Errorf("get dir etag: %w", derr)
		}
	}

	if p.versioningEnabled() {
		isDelMarker, err := p.isObjDeleteMarker(bucket, object)
		if err != nil {
			return nil, err
		}

		// if the specified object version is a delete marker, return MethodNotAllowed
		if isDelMarker {
			if versionId != "" {
				return &s3.HeadObjectOutput{
					DeleteMarker: getBoolPtr(true),
					LastModified: backend.GetTimePtr(fi.ModTime()),
				}, s3err.GetAPIError(s3err.ErrMethodNotAllowed)
			}
			return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
		}
	}

	// When no version was requested, report the version id stored on the
	// object; a missing attribute leaves the version id empty.
	if p.versioningEnabled() && versionId == "" {
		vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey)
		if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
			return nil, fmt.Errorf("get object versionId: %w", err)
		}
		versionId = string(vId)
	}

	objMeta := p.loadObjectMetaProperties(nil, bucket, object, &fi)

	// A missing or unreadable etag is not fatal; it is reported as empty.
	b, err := p.meta.RetrieveAttribute(nil, bucket, object, etagkey)
	etag := string(b)
	if err != nil {
		etag = ""
	}

	// evaluate preconditions
	err = backend.EvaluatePreconditions(etag, fi.ModTime(), backend.PreConditions{
		IfMatch:       input.IfMatch,
		IfNoneMatch:   input.IfNoneMatch,
		IfModSince:    input.IfModifiedSince,
		IfUnmodeSince: input.IfUnmodifiedSince,
	})
	if err != nil {
		return nil, err
	}

	size := fi.Size()
	if fi.IsDir() {
		size = 0
	}

	var contentRange *string
	var startOffset, length int64
	var partsCount *int32

	// If partNumber is requested and mp-metadata exists, serve that specific part.
	// For non-multipart objects (no mp-metadata), partNumber=1 returns the full
	// object with no Content-Range; any other partNumber is out of range.
	// Both range read and partNumber can't be used together.
	if input.PartNumber != nil {
		mpMetaBytes, metaErr := p.meta.RetrieveAttribute(nil, bucket, object, mpMetaKey)
		if metaErr == nil {
			mpMeta, err := backend.UnmarshalMpUploadMetadata(mpMetaBytes, false)
			if err != nil {
				return nil, fmt.Errorf("parse object multipart metadata: %w", err)
			}

			partNum := *input.PartNumber
			totalParts := int32(len(mpMeta.Parts))
			partsCount = &totalParts
			if partNum > totalParts {
				return nil, s3err.GetInvalidPartNumberRangeErr(totalParts, partNum)
			}

			// Parts holds cumulative sizes: Parts[i] = sum of sizes 1..i+1
			if partNum > 1 {
				startOffset = mpMeta.Parts[partNum-2]
			}
			length = mpMeta.Parts[partNum-1] - startOffset
			contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %d-%d/%d", startOffset, startOffset+length-1, size))
		} else if errors.Is(metaErr, meta.ErrNoSuchKey) {
			// Non-multipart object: partNumber=1 means the whole object; anything
			// higher is out of range
			if *input.PartNumber > 1 {
				return nil, s3err.GetInvalidPartNumberRangeErr(1, *input.PartNumber)
			}
			length = size
			if length != 0 {
				// if object size is 0, the whole object is served, no content range should be set
				contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes 0-%d/%d", length-1, length))
			}
		} else {
			return nil, fmt.Errorf("retrieve mp metadata: %w", metaErr)
		}
	} else {
		start, lgth, isValid, err := backend.ParseObjectRange(size, getString(input.Range))
		if err != nil {
			return nil, err
		}
		startOffset, length = start, lgth
		if isValid {
			contentRange = backend.GetPtrFromString(fmt.Sprintf("bytes %v-%v/%v", start, start+lgth-1, size))
		}
	}

	// Object lock information is optional: any error while reading it simply
	// leaves the corresponding fields unset.
	var objectLockLegalHoldStatus types.ObjectLockLegalHoldStatus
	status, err := p.GetObjectLegalHold(withCtxNoSlot(ctx), bucket, object, versionId)
	if err == nil && status != nil {
		if *status {
			objectLockLegalHoldStatus = types.ObjectLockLegalHoldStatusOn
		} else {
			objectLockLegalHoldStatus = types.ObjectLockLegalHoldStatusOff
		}
	}

	var objectLockMode types.ObjectLockMode
	var objectLockRetainUntilDate *time.Time
	retention, err := p.GetObjectRetention(withCtxNoSlot(ctx), bucket, object, versionId)
	if err == nil {
		var config types.ObjectLockRetention
		if err := json.Unmarshal(retention, &config); err == nil {
			objectLockMode = types.ObjectLockMode(config.Mode)
			objectLockRetainUntilDate = config.RetainUntilDate
		}
	}

	var checksums s3response.Checksum
	// Return checksums only when the full object is requested
	if input.ChecksumMode == types.ChecksumModeEnabled && length-startOffset == size {
		checksums, err = p.retrieveChecksums(nil, bucket, object)
		if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
			return nil, fmt.Errorf("get object checksums: %w", err)
		}
	}

	var tagCount *int32
	tags, err := p.getAttrTags(bucket, object, versionId)
	if err != nil {
		return nil, err
	}
	if len(tags) != 0 {
		tc := int32(len(tags))
		tagCount = &tc
	}

	return &s3.HeadObjectOutput{
		ContentLength:             &length,
		AcceptRanges:              backend.GetPtrFromString("bytes"),
		ContentRange:              contentRange,
		ContentType:               objMeta.ContentType,
		ContentEncoding:           objMeta.ContentEncoding,
		ContentDisposition:        objMeta.ContentDisposition,
		ContentLanguage:           objMeta.ContentLanguage,
		CacheControl:              objMeta.CacheControl,
		ExpiresString:             objMeta.Expires,
		WebsiteRedirectLocation:   objMeta.WebsiteRedirectLocation,
		ETag:                      &etag,
		LastModified:              backend.GetTimePtr(fi.ModTime()),
		Metadata:                  objMeta.Metadata,
		ObjectLockLegalHoldStatus: objectLockLegalHoldStatus,
		ObjectLockMode:            objectLockMode,
		ObjectLockRetainUntilDate: objectLockRetainUntilDate,
		StorageClass:              types.StorageClassStandard,
		VersionId:                 &versionId,
		ChecksumCRC32:             checksums.CRC32,
		ChecksumCRC32C:            checksums.CRC32C,
		ChecksumSHA1:              checksums.SHA1,
		ChecksumSHA256:            checksums.SHA256,
		ChecksumCRC64NVME:         checksums.CRC64NVME,
		ChecksumSHA512:            checksums.SHA512,
		ChecksumMD5:               checksums.MD5,
		ChecksumXXHASH64:          checksums.XXHASH64,
		ChecksumXXHASH3:           checksums.XXHASH3,
		ChecksumXXHASH128:         checksums.XXHASH128,
		ChecksumType:              checksums.Type,
		TagCount:                  tagCount,
		PartsCount:                partsCount,
	}, nil
}
// GetObjectAttributes builds the object attributes response from the result
// of a HeadObject call made with checksum mode enabled.
//
// When HeadObject fails with MethodNotAllowed and still returns an output,
// the delete marker flag and version id from that output are returned
// together with a NoSuchKey error. Any other HeadObject error is returned
// unchanged with an empty response.
func (p *Posix) GetObjectAttributes(ctx context.Context, input *s3.GetObjectAttributesInput) (s3response.GetObjectAttributesResponse, error) {
	var empty s3response.GetObjectAttributesResponse

	done, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		return empty, slotErr
	}
	defer done()

	headInput := s3.HeadObjectInput{
		Bucket:       input.Bucket,
		Key:          input.Key,
		VersionId:    input.VersionId,
		ChecksumMode: types.ChecksumModeEnabled,
	}
	head, headErr := p.HeadObject(withCtxNoSlot(ctx), &headInput)
	if headErr != nil {
		isDeleteMarker := errors.Is(headErr, s3err.GetAPIError(s3err.ErrMethodNotAllowed)) && head != nil
		if !isDeleteMarker {
			return empty, headErr
		}

		resp := s3response.GetObjectAttributesResponse{
			DeleteMarker: head.DeleteMarker,
			VersionId:    head.VersionId,
		}
		return resp, s3err.GetAPIError(s3err.ErrNoSuchKey)
	}

	// Checksums are copied over one to one from the head output.
	checksum := types.Checksum{
		ChecksumCRC32:     head.ChecksumCRC32,
		ChecksumCRC32C:    head.ChecksumCRC32C,
		ChecksumSHA1:      head.ChecksumSHA1,
		ChecksumSHA256:    head.ChecksumSHA256,
		ChecksumCRC64NVME: head.ChecksumCRC64NVME,
		ChecksumSHA512:    head.ChecksumSHA512,
		ChecksumMD5:       head.ChecksumMD5,
		ChecksumXXHASH64:  head.ChecksumXXHASH64,
		ChecksumXXHASH3:   head.ChecksumXXHASH3,
		ChecksumXXHASH128: head.ChecksumXXHASH128,
		ChecksumType:      head.ChecksumType,
	}

	return s3response.GetObjectAttributesResponse{
		ETag:         backend.TrimEtag(head.ETag),
		ObjectSize:   head.ContentLength,
		StorageClass: head.StorageClass,
		LastModified: head.LastModified,
		VersionId:    head.VersionId,
		DeleteMarker: head.DeleteMarker,
		Checksum:     &checksum,
	}, nil
}
// CopyObject copies the object named by input.CopySource to
// input.Bucket/input.Key.
//
// Two cases are handled:
//
//   - Source and destination resolve to the same path. No data is copied;
//     only the attributes are rewritten (metadata, optionally a newly computed
//     checksum, optionally the tag set). This is rejected with InvalidCopyDest
//     when the metadata directive is COPY.
//   - Source and destination differ. The source file is streamed into
//     PutObject, with metadata taken from the source unless the metadata
//     directive is REPLACE, and tags either replaced (through PutObject) or
//     copied from the source afterwards.
//
// Source preconditions (CopySourceIf*) are evaluated against the source etag
// and mtime. Sources larger than p.copyObjectThreshold are rejected.
func (p *Posix) CopyObject(ctx context.Context, input s3response.CopyObjectInput) (s3response.CopyObjectOutput, error) {
	release, err := p.acquireActionSlot(ctx)
	if err != nil {
		return s3response.CopyObjectOutput{}, err
	}
	defer release()

	if input.Key == nil {
		return s3response.CopyObjectOutput{}, s3err.GetAPIError(s3err.ErrInvalidCopyDest)
	}
	if input.ExpectedBucketOwner == nil {
		return s3response.CopyObjectOutput{}, s3err.GetAPIError(s3err.ErrInvalidRequest)
	}

	srcBucket, srcObject, srcVersionId, err := backend.ParseCopySource(*input.CopySource)
	if err != nil {
		return s3response.CopyObjectOutput{}, err
	}
	if err := p.validateVersionId(srcVersionId); err != nil {
		return s3response.CopyObjectOutput{}, err
	}
	if !p.isBucketValid(srcBucket) {
		return s3response.CopyObjectOutput{}, s3err.GetBucketErr(s3err.ErrInvalidBucketName, srcBucket)
	}

	dstBucket := *input.Bucket
	dstObject := *input.Key
	if !p.isBucketValid(dstBucket) {
		return s3response.CopyObjectOutput{}, s3err.GetBucketErr(s3err.ErrInvalidBucketName, dstBucket)
	}

	_, err = os.Stat(srcBucket)
	if errors.Is(err, fs.ErrNotExist) {
		return s3response.CopyObjectOutput{}, s3err.GetBucketErr(s3err.ErrNoSuchBucket, srcBucket)
	}
	if err != nil {
		return s3response.CopyObjectOutput{}, fmt.Errorf("stat bucket: %w", err)
	}

	// When an expected source bucket owner is given, it must match the owner
	// recorded in the source bucket ACL.
	if input.ExpectedSourceBucketOwner != nil && *input.ExpectedSourceBucketOwner != "" {
		aclData, err := p.meta.RetrieveAttribute(nil, srcBucket, "", aclkey)
		if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
			return s3response.CopyObjectOutput{}, fmt.Errorf("get src bucket acl: %w", err)
		}
		srcAcl, err := auth.ParseACL(aclData)
		if err != nil {
			return s3response.CopyObjectOutput{}, err
		}
		if srcAcl.Owner != *input.ExpectedSourceBucketOwner {
			return s3response.CopyObjectOutput{}, s3err.GetAPIError(s3err.ErrAccessDenied)
		}
	}

	vStatus, err := p.getBucketVersioningStatus(ctx, srcBucket)
	if err != nil {
		return s3response.CopyObjectOutput{}, err
	}
	vEnabled := p.isBucketVersioningEnabled(vStatus)

	// srcObject may be rewritten to a versioning path below; keep the key as
	// given by the client for error reporting.
	origSrcObject := srcObject
	if srcVersionId != "" {
		if !p.versioningEnabled() || !vEnabled {
			return s3response.CopyObjectOutput{}, s3err.GetInvalidArgumentErr(s3err.InvalidArgVersionId, srcVersionId)
		}
		vId, err := p.meta.RetrieveAttribute(nil, srcBucket, srcObject, versionIdKey)
		if errors.Is(err, fs.ErrNotExist) || isErrNotDir(err) {
			return s3response.CopyObjectOutput{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
		}
		if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
			return s3response.CopyObjectOutput{}, fmt.Errorf("get src object version id: %w", err)
		}
		// The requested version is not the current one: read it from the
		// versioning directory.
		if string(vId) != srcVersionId {
			srcBucket = joinPathWithTrailer(p.versioningDir, srcBucket)
			srcObject = joinPathWithTrailer(genObjVersionKey(srcObject), srcVersionId)
		}
	}

	_, err = os.Stat(dstBucket)
	if errors.Is(err, fs.ErrNotExist) {
		return s3response.CopyObjectOutput{}, s3err.GetBucketErr(s3err.ErrNoSuchBucket, dstBucket)
	}
	if err != nil {
		return s3response.CopyObjectOutput{}, fmt.Errorf("stat bucket: %w", err)
	}

	objPath := joinPathWithTrailer(srcBucket, srcObject)
	f, err := os.Open(objPath)
	if errors.Is(err, fs.ErrNotExist) || isErrNotDir(err) {
		if p.versioningEnabled() && vEnabled {
			return s3response.CopyObjectOutput{}, s3err.GetNoSuchVersionErr(origSrcObject, srcVersionId)
		}
		return s3response.CopyObjectOutput{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
	}
	if isErrNameTooLong(err) {
		return s3response.CopyObjectOutput{}, s3err.GetKeyTooLongErr(int64(len(origSrcObject)), 1024)
	}
	if err != nil {
		return s3response.CopyObjectOutput{}, fmt.Errorf("open object: %w", err)
	}
	defer f.Close()

	fi, err := f.Stat()
	if err != nil {
		return s3response.CopyObjectOutput{}, fmt.Errorf("stat object: %w", err)
	}

	// A key ending in '/' must be a directory and vice versa.
	if strings.HasSuffix(srcObject, "/") && !fi.IsDir() {
		return s3response.CopyObjectOutput{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
	}
	if !strings.HasSuffix(srcObject, "/") && fi.IsDir() {
		return s3response.CopyObjectOutput{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
	}

	if fi.Size() > p.copyObjectThreshold {
		return s3response.CopyObjectOutput{}, s3err.GetCopySourceObjectTooLargeErr(p.copyObjectThreshold)
	}

	// A missing or unreadable source etag is not fatal; it is treated as empty.
	b, err := p.meta.RetrieveAttribute(f, srcBucket, srcObject, etagkey)
	srcEtag := string(b)
	if err != nil {
		srcEtag = ""
	}

	err = backend.EvaluatePreconditions(srcEtag, fi.ModTime(), backend.PreConditions{
		IfMatch:       input.CopySourceIfMatch,
		IfNoneMatch:   input.CopySourceIfNoneMatch,
		IfModSince:    input.CopySourceIfModifiedSince,
		IfUnmodeSince: input.CopySourceIfUnmodifiedSince,
	})
	if err != nil {
		return s3response.CopyObjectOutput{}, err
	}

	var etag string
	var version *string
	var crc32 *string
	var crc32c *string
	var sha1 *string
	var sha256 *string
	var crc64nvme *string
	var sha512 *string
	var md5sum *string
	var xxhash64 *string
	var xxhash3 *string
	var xxhash128 *string
	var chType types.ChecksumType

	dstObjdPath := joinPathWithTrailer(dstBucket, dstObject)
	if dstObjdPath == objPath {
		// Copy onto itself: only attributes change, so there must be
		// something to replace.
		if input.MetadataDirective == types.MetadataDirectiveCopy {
			return s3response.CopyObjectOutput{}, s3err.GetAPIError(s3err.ErrInvalidCopyDest)
		}

		// Delete the object metadata
		err = p.meta.DeleteAttribute(dstBucket, dstObject, metadataHdr)
		if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
			return s3response.CopyObjectOutput{}, fmt.Errorf("delete object metadata: %w", err)
		}

		checksums, err := p.retrieveChecksums(nil, dstBucket, dstObject)
		if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
			return s3response.CopyObjectOutput{}, fmt.Errorf("get obj checksums: %w", err)
		}
		chType = checksums.Type

		if input.ChecksumAlgorithm != "" {
			// If a different checksum algorithm is specified
			// first calculate and store the checksum
			if checksums.Algorithm != input.ChecksumAlgorithm {
				dstFile, err := os.Open(dstObjdPath)
				if err != nil {
					return s3response.CopyObjectOutput{}, fmt.Errorf("open obj file: %w", err)
				}
				defer dstFile.Close()

				hashReader, err := utils.NewHashReader(dstFile, "", utils.HashType(strings.ToLower(string(input.ChecksumAlgorithm))))
				if err != nil {
					return s3response.CopyObjectOutput{}, fmt.Errorf("initialize hash reader: %w", err)
				}

				// Drain the whole file through the hash reader so the
				// checksum covers the object content. A single Read with a
				// nil buffer reads zero bytes and would produce the
				// checksum of empty input.
				if _, err := io.Copy(io.Discard, hashReader); err != nil {
					return s3response.CopyObjectOutput{}, fmt.Errorf("read err: %w", err)
				}

				checksums = s3response.Checksum{}
				sum := hashReader.Sum()
				switch hashReader.Type() {
				case utils.HashTypeCRC32:
					checksums.CRC32 = &sum
					crc32 = &sum
				case utils.HashTypeCRC32C:
					checksums.CRC32C = &sum
					crc32c = &sum
				case utils.HashTypeSha1:
					checksums.SHA1 = &sum
					sha1 = &sum
				case utils.HashTypeSha256:
					checksums.SHA256 = &sum
					sha256 = &sum
				case utils.HashTypeCRC64NVME:
					checksums.CRC64NVME = &sum
					crc64nvme = &sum
				case utils.HashTypeSha512:
					checksums.SHA512 = &sum
					sha512 = &sum
				case utils.HashTypeMd5:
					checksums.MD5 = &sum
					md5sum = &sum
				case utils.HashTypeXXHASH64:
					checksums.XXHASH64 = &sum
					xxhash64 = &sum
				case utils.HashTypeXXHASH3:
					checksums.XXHASH3 = &sum
					xxhash3 = &sum
				case utils.HashTypeXXHASH128:
					checksums.XXHASH128 = &sum
					xxhash128 = &sum
				}

				// If a new checksum is calculated, the checksum type
				// should be FULL_OBJECT
				chType = types.ChecksumTypeFullObject

				err = p.storeChecksums(dstFile, dstBucket, dstObject, checksums)
				if err != nil {
					return s3response.CopyObjectOutput{}, fmt.Errorf("store checksum: %w", err)
				}
			}
		}

		// The etag is best effort: a missing attribute yields an empty etag.
		b, _ := p.meta.RetrieveAttribute(nil, dstBucket, dstObject, etagkey)
		etag = string(b)

		// Check the error of this lookup itself: the object may have been
		// removed in the meantime. Other errors (including a missing version
		// id attribute) leave the version id empty.
		vId, vErr := p.meta.RetrieveAttribute(nil, dstBucket, dstObject, versionIdKey)
		if errors.Is(vErr, fs.ErrNotExist) || isErrNotDir(vErr) {
			return s3response.CopyObjectOutput{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
		}
		version = backend.GetPtrFromString(string(vId))

		// Store the provided object meta properties
		err = p.storeObjectMetaProperties(nil, dstBucket, dstObject,
			metaProperties{
				ContentType:             input.ContentType,
				ContentEncoding:         input.ContentEncoding,
				ContentLanguage:         input.ContentLanguage,
				ContentDisposition:      input.ContentDisposition,
				CacheControl:            input.CacheControl,
				Expires:                 input.Expires,
				WebsiteRedirectLocation: input.WebsiteRedirectLocation,
				Metadata:                input.Metadata,
			})
		if err != nil {
			return s3response.CopyObjectOutput{}, err
		}

		// explicitly delete the website redirect location, as if it's not
		// provided as CopyObject input, it should not be copied
		if getString(input.WebsiteRedirectLocation) == "" {
			err := p.meta.DeleteAttribute(dstBucket, dstObject, websiteRedirectHdr)
			if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
				return s3response.CopyObjectOutput{}, fmt.Errorf("delete website-redirect-location: %w", err)
			}
		}

		if input.TaggingDirective == types.TaggingDirectiveReplace {
			tags, err := backend.ParseObjectTags(getString(input.Tagging))
			if err != nil {
				return s3response.CopyObjectOutput{}, err
			}

			err = p.PutObjectTagging(withCtxNoSlot(ctx), dstBucket, dstObject, "", tags)
			if err != nil {
				return s3response.CopyObjectOutput{}, err
			}
		}
	} else {
		contentLength := fi.Size()

		checksums, err := p.retrieveChecksums(f, srcBucket, srcObject)
		if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
			return s3response.CopyObjectOutput{}, fmt.Errorf("get obj checksum: %w", err)
		}

		// If any checksum algorithm is provided, replace, otherwise
		// use the existing one
		if input.ChecksumAlgorithm != "" {
			checksums.Algorithm = input.ChecksumAlgorithm
		}

		putObjectInput := s3response.PutObjectInput{
			Bucket:                    &dstBucket,
			Key:                       &dstObject,
			Body:                      f,
			ContentLength:             &contentLength,
			ChecksumAlgorithm:         checksums.Algorithm,
			ContentType:               input.ContentType,
			ContentEncoding:           input.ContentEncoding,
			ContentDisposition:        input.ContentDisposition,
			ContentLanguage:           input.ContentLanguage,
			CacheControl:              input.CacheControl,
			Expires:                   input.Expires,
			WebsiteRedirectLocation:   input.WebsiteRedirectLocation,
			Metadata:                  input.Metadata,
			ObjectLockRetainUntilDate: input.ObjectLockRetainUntilDate,
			ObjectLockMode:            input.ObjectLockMode,
			ObjectLockLegalHoldStatus: input.ObjectLockLegalHoldStatus,
		}

		// load and pass the source object meta properties, if metadata directive is "COPY"
		if input.MetadataDirective != types.MetadataDirectiveReplace {
			metaProps := p.loadObjectMetaProperties(nil, srcBucket, srcObject, &fi)
			putObjectInput.ContentEncoding = metaProps.ContentEncoding
			putObjectInput.ContentDisposition = metaProps.ContentDisposition
			putObjectInput.ContentLanguage = metaProps.ContentLanguage
			putObjectInput.ContentType = metaProps.ContentType
			putObjectInput.CacheControl = metaProps.CacheControl
			putObjectInput.Expires = metaProps.Expires
			putObjectInput.Metadata = metaProps.Metadata
		}

		// pass the input tagging to PutObject, if tagging directive is "REPLACE"
		if input.TaggingDirective == types.TaggingDirectiveReplace {
			putObjectInput.Tagging = input.Tagging
		}

		res, err := p.PutObject(withCtxNoSlot(ctx), putObjectInput)
		if err != nil {
			return s3response.CopyObjectOutput{}, err
		}

		// copy the source object tagging after the destination object
		// creation, if tagging directive is "COPY"
		if input.TaggingDirective == types.TaggingDirectiveCopy {
			tagging, err := p.meta.RetrieveAttribute(nil, srcBucket, srcObject, tagHdr)
			if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
				return s3response.CopyObjectOutput{}, fmt.Errorf("get source object tagging: %w", err)
			}
			if err == nil {
				err := p.meta.StoreAttribute(nil, dstBucket, dstObject, tagHdr, tagging)
				if err != nil {
					return s3response.CopyObjectOutput{}, fmt.Errorf("set destination object tagging: %w", err)
				}
			}
		}

		etag = res.ETag
		version = &res.VersionID
		crc32 = res.ChecksumCRC32
		crc32c = res.ChecksumCRC32C
		sha1 = res.ChecksumSHA1
		sha256 = res.ChecksumSHA256
		crc64nvme = res.ChecksumCRC64NVME
		sha512 = res.ChecksumSHA512
		md5sum = res.ChecksumMD5
		xxhash64 = res.ChecksumXXHASH64
		xxhash3 = res.ChecksumXXHASH3
		xxhash128 = res.ChecksumXXHASH128
		chType = res.ChecksumType
	}

	// Report the modification time of the destination as it is now on disk.
	fi, err = os.Stat(dstObjdPath)
	if err != nil {
		return s3response.CopyObjectOutput{}, fmt.Errorf("stat dst object: %w", err)
	}

	return s3response.CopyObjectOutput{
		CopyObjectResult: &s3response.CopyObjectResult{
			ETag:              &etag,
			LastModified:      backend.GetTimePtr(fi.ModTime()),
			ChecksumCRC32:     crc32,
			ChecksumCRC32C:    crc32c,
			ChecksumSHA1:      sha1,
			ChecksumSHA256:    sha256,
			ChecksumCRC64NVME: crc64nvme,
			ChecksumSHA512:    sha512,
			ChecksumMD5:       md5sum,
			ChecksumXXHASH64:  xxhash64,
			ChecksumXXHASH3:   xxhash3,
			ChecksumXXHASH128: xxhash128,
			ChecksumType:      chType,
		},
		VersionId:           version,
		CopySourceVersionId: &srcVersionId,
	}, nil
}
// ListObjects lists the objects of a bucket. It acquires an action slot and
// then delegates to ListObjectsParametrized, using p.FileToObj to build the
// listed entries.
func (p *Posix) ListObjects(ctx context.Context, input *s3.ListObjectsInput) (s3response.ListObjectsResult, error) {
	done, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		var empty s3response.ListObjectsResult
		return empty, slotErr
	}
	defer done()

	return p.ListObjectsParametrized(ctx, input, p.FileToObj)
}
// ListObjectsParametrized implements ListObjects with a caller supplied
// file-to-object converter factory. Optional request fields (prefix, marker,
// delimiter, max keys) default to their zero values when unset. The bucket
// name is validated and the bucket directory must exist before the walk
// starts; the MetaTmpDir entry is passed to the walk as a path to skip.
// Owner information is always requested from the converter.
func (p *Posix) ListObjectsParametrized(ctx context.Context, input *s3.ListObjectsInput, customFileToObj func(string, bool) backend.GetObjFunc) (s3response.ListObjectsResult, error) {
	var (
		bucket  = *input.Bucket
		prefix  string
		marker  string
		delim   string
		maxkeys int32
	)
	if v := input.Prefix; v != nil {
		prefix = *v
	}
	if v := input.Marker; v != nil {
		marker = *v
	}
	if v := input.Delimiter; v != nil {
		delim = *v
	}
	if v := input.MaxKeys; v != nil {
		maxkeys = *v
	}

	if !p.isBucketValid(bucket) {
		return s3response.ListObjectsResult{}, s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	_, statErr := os.Stat(bucket)
	switch {
	case errors.Is(statErr, fs.ErrNotExist):
		return s3response.ListObjectsResult{}, s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	case statErr != nil:
		return s3response.ListObjectsResult{}, fmt.Errorf("stat bucket: %w", statErr)
	}

	results, walkErr := backend.Walk(ctx, os.DirFS(bucket), prefix, delim, marker, maxkeys,
		customFileToObj(bucket, true), []string{MetaTmpDir})
	if walkErr != nil {
		return s3response.ListObjectsResult{}, fmt.Errorf("walk %v: %w", bucket, walkErr)
	}

	out := s3response.ListObjectsResult{
		Name:           &bucket,
		Prefix:         backend.GetPtrFromString(prefix),
		Marker:         backend.GetPtrFromString(marker),
		NextMarker:     backend.GetPtrFromString(results.NextMarker),
		Delimiter:      backend.GetPtrFromString(delim),
		MaxKeys:        &maxkeys,
		IsTruncated:    &results.Truncated,
		CommonPrefixes: results.CommonPrefixes,
		Contents:       results.Objects,
	}
	return out, nil
}
// FileToObj returns the converter used while walking a bucket to turn a
// directory entry into an s3response.Object.
//
// When the bucket name is invalid the returned converter always fails with
// ErrInvalidBucketName. Otherwise, for every entry:
//   - if fetchOwner is set, the owner is read from the bucket ACL attribute
//     (all objects in the bucket are owned by the bucket owner);
//   - a directory is reported only when it carries an etag attribute (a
//     directory object), with size 0; otherwise it is skipped;
//   - delete markers, entries whose checksums cannot be read, and entries
//     that disappeared in the meantime are skipped via backend.ErrSkipObj.
func (p *Posix) FileToObj(bucket string, fetchOwner bool) backend.GetObjFunc {
	if !p.isBucketValid(bucket) {
		return func(string, fs.DirEntry) (s3response.Object, error) {
			return s3response.Object{}, s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
		}
	}

	return func(path string, d fs.DirEntry) (s3response.Object, error) {
		var owner *types.Owner

		// Resolve the object owner from the bucket ACL only when requested.
		if fetchOwner {
			rawACL, aclErr := p.meta.RetrieveAttribute(nil, bucket, "", aclkey)
			if aclErr != nil && !errors.Is(aclErr, meta.ErrNoSuchKey) {
				return s3response.Object{}, fmt.Errorf("get bucket acl: %w", aclErr)
			}

			acl, parseErr := auth.ParseACL(rawACL)
			if parseErr != nil {
				return s3response.Object{}, parseErr
			}

			owner = &types.Owner{
				ID: &acl.Owner,
			}
		}

		if d.IsDir() {
			// A directory is only an object when it is empty and carries an
			// etag attribute; anything else is a plain intermediate directory.
			rawEtag, etagErr := p.meta.RetrieveAttribute(nil, bucket, path, etagkey)
			switch {
			case errors.Is(etagErr, meta.ErrNoSuchKey), errors.Is(etagErr, fs.ErrNotExist):
				return s3response.Object{}, backend.ErrSkipObj
			case etagErr != nil:
				return s3response.Object{}, fmt.Errorf("get etag: %w", etagErr)
			}
			dirEtag := string(rawEtag)

			info, infoErr := d.Info()
			switch {
			case errors.Is(infoErr, fs.ErrNotExist):
				return s3response.Object{}, backend.ErrSkipObj
			case infoErr != nil:
				return s3response.Object{}, fmt.Errorf("get fileinfo: %w", infoErr)
			}

			var dirSize int64
			modified := info.ModTime()

			return s3response.Object{
				ETag:         &dirEtag,
				Key:          &path,
				LastModified: &modified,
				Size:         &dirSize,
				StorageClass: types.ObjectStorageClassStandard,
				Owner:        owner,
			}, nil
		}

		// Delete markers are never listed; the lookup error is deliberately
		// ignored, so a failed lookup is treated as "not a delete marker".
		if deleted, _ := p.isObjDeleteMarker(bucket, path); deleted {
			return s3response.Object{}, backend.ErrSkipObj
		}

		// Missing checksums are fine; any other failure skips the entry.
		checksums, csErr := p.retrieveChecksums(nil, bucket, path)
		if csErr != nil && !errors.Is(csErr, meta.ErrNoSuchKey) {
			return s3response.Object{}, backend.ErrSkipObj
		}

		// Regular file object: collect the etag and file info.
		rawEtag, etagErr := p.meta.RetrieveAttribute(nil, bucket, path, etagkey)
		switch {
		case errors.Is(etagErr, fs.ErrNotExist):
			return s3response.Object{}, backend.ErrSkipObj
		case etagErr != nil && !errors.Is(etagErr, meta.ErrNoSuchKey):
			return s3response.Object{}, fmt.Errorf("get etag: %w", etagErr)
		}
		// note: meta.ErrNoSuchKey will return an empty byte slice, so the
		// etag is simply "" when it was never set.
		fileEtag := string(rawEtag)

		info, infoErr := d.Info()
		switch {
		case errors.Is(infoErr, fs.ErrNotExist):
			return s3response.Object{}, backend.ErrSkipObj
		case infoErr != nil:
			return s3response.Object{}, fmt.Errorf("get fileinfo: %w", infoErr)
		}

		fileSize := info.Size()
		modified := info.ModTime()

		return s3response.Object{
			ETag:              &fileEtag,
			Key:               &path,
			LastModified:      &modified,
			Size:              &fileSize,
			StorageClass:      types.ObjectStorageClassStandard,
			ChecksumAlgorithm: []types.ChecksumAlgorithm{checksums.Algorithm},
			ChecksumType:      checksums.Type,
			Owner:             owner,
		}, nil
	}
}
// ListObjectsV2 lists the objects of a bucket (V2 semantics) using the
// default FileToObj converter. An action slot is held for the duration of
// the call; a failure to acquire the slot is returned unchanged.
func (p *Posix) ListObjectsV2(ctx context.Context, input *s3.ListObjectsV2Input) (s3response.ListObjectsV2Result, error) {
	done, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		return s3response.ListObjectsV2Result{}, slotErr
	}
	defer done()

	return p.ListObjectsV2Parametrized(ctx, input, p.FileToObj)
}
// ListObjectsV2Parametrized implements ListObjectsV2 with a caller supplied
// file-to-object converter factory.
//
// Optional request fields (prefix, continuation token, start-after,
// delimiter, max keys, fetch-owner) default to their zero values when unset.
// The walk marker is only set when a continuation token is supplied, in which
// case it is the lexically greater of the continuation token and start-after.
// The bucket name is validated and the bucket directory must exist before the
// walk starts; the MetaTmpDir entry is passed to the walk as a path to skip.
//
// Returns ErrInvalidBucketName / ErrNoSuchBucket API errors for bad or
// missing buckets, and wrapped errors for stat or walk failures.
func (p *Posix) ListObjectsV2Parametrized(ctx context.Context, input *s3.ListObjectsV2Input, customFileToObj func(string, bool) backend.GetObjFunc) (s3response.ListObjectsV2Result, error) {
	bucket := *input.Bucket
	prefix := ""
	if input.Prefix != nil {
		prefix = *input.Prefix
	}
	// StartAfter is optional: resolve it once so that a nil pointer is
	// treated as empty everywhere instead of being dereferenced when the
	// response is built.
	startAfter := ""
	if input.StartAfter != nil {
		startAfter = *input.StartAfter
	}
	marker := ""
	if input.ContinuationToken != nil {
		// max with an empty startAfter yields the continuation token itself
		marker = max(startAfter, *input.ContinuationToken)
	}
	delim := ""
	if input.Delimiter != nil {
		delim = *input.Delimiter
	}
	maxkeys := int32(0)
	if input.MaxKeys != nil {
		maxkeys = *input.MaxKeys
	}
	var fetchOwner bool
	if input.FetchOwner != nil {
		fetchOwner = *input.FetchOwner
	}

	if !p.isBucketValid(bucket) {
		return s3response.ListObjectsV2Result{}, s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	_, err := os.Stat(bucket)
	if errors.Is(err, fs.ErrNotExist) {
		return s3response.ListObjectsV2Result{}, s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	}
	if err != nil {
		return s3response.ListObjectsV2Result{}, fmt.Errorf("stat bucket: %w", err)
	}

	fileSystem := os.DirFS(bucket)
	results, err := backend.Walk(ctx, fileSystem, prefix, delim, marker, maxkeys,
		customFileToObj(bucket, fetchOwner), []string{MetaTmpDir})
	if err != nil {
		return s3response.ListObjectsV2Result{}, fmt.Errorf("walk %v: %w", bucket, err)
	}

	count := int32(len(results.Objects))

	return s3response.ListObjectsV2Result{
		CommonPrefixes:        results.CommonPrefixes,
		Contents:              results.Objects,
		IsTruncated:           &results.Truncated,
		MaxKeys:               &maxkeys,
		Name:                  &bucket,
		KeyCount:              &count,
		Delimiter:             backend.GetPtrFromString(delim),
		ContinuationToken:     backend.GetPtrFromString(marker),
		NextContinuationToken: backend.GetPtrFromString(results.NextMarker),
		Prefix:                backend.GetPtrFromString(prefix),
		StartAfter:            backend.GetPtrFromString(startAfter),
	}, nil
}
// PutBucketAcl stores data as the ACL attribute of the bucket. The bucket
// name must be valid and the bucket directory must exist. An action slot is
// held for the duration of the call.
func (p *Posix) PutBucketAcl(ctx context.Context, bucket string, data []byte) error {
	done, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		return slotErr
	}
	defer done()

	if !p.isBucketValid(bucket) {
		return s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	_, statErr := os.Stat(bucket)
	switch {
	case errors.Is(statErr, fs.ErrNotExist):
		return s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	case statErr != nil:
		return fmt.Errorf("stat bucket: %w", statErr)
	}

	if storeErr := p.meta.StoreAttribute(nil, bucket, "", aclkey, data); storeErr != nil {
		return fmt.Errorf("set acl: %w", storeErr)
	}
	return nil
}
// GetBucketAcl returns the raw ACL attribute of the bucket named in input.
// A bucket without a stored ACL yields an empty, non-nil byte slice. The
// bucket name must be valid and the bucket directory must exist. An action
// slot is held for the duration of the call.
func (p *Posix) GetBucketAcl(ctx context.Context, input *s3.GetBucketAclInput) ([]byte, error) {
	done, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		return nil, slotErr
	}
	defer done()

	name := *input.Bucket
	if !p.isBucketValid(name) {
		return nil, s3err.GetBucketErr(s3err.ErrInvalidBucketName, name)
	}

	_, statErr := os.Stat(name)
	switch {
	case errors.Is(statErr, fs.ErrNotExist):
		return nil, s3err.GetBucketErr(s3err.ErrNoSuchBucket, name)
	case statErr != nil:
		return nil, fmt.Errorf("stat bucket: %w", statErr)
	}

	acl, attrErr := p.meta.RetrieveAttribute(nil, name, "", aclkey)
	switch {
	case errors.Is(attrErr, meta.ErrNoSuchKey):
		// no ACL stored yet
		return []byte{}, nil
	case attrErr != nil:
		return nil, fmt.Errorf("get acl: %w", attrErr)
	}
	return acl, nil
}
// PutBucketTagging replaces the tag set of a bucket. A nil tags map removes
// the tagging attribute (a missing attribute is not an error); otherwise the
// map is JSON encoded and stored. The bucket name must be valid and the
// bucket directory must exist. An action slot is held for the duration of
// the call.
func (p *Posix) PutBucketTagging(ctx context.Context, bucket string, tags map[string]string) error {
	done, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		return slotErr
	}
	defer done()

	if !p.isBucketValid(bucket) {
		return s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	_, statErr := os.Stat(bucket)
	switch {
	case errors.Is(statErr, fs.ErrNotExist):
		return s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	case statErr != nil:
		return fmt.Errorf("stat bucket: %w", statErr)
	}

	// nil means "delete the tag set"
	if tags == nil {
		delErr := p.meta.DeleteAttribute(bucket, "", tagHdr)
		if delErr == nil || errors.Is(delErr, meta.ErrNoSuchKey) {
			return nil
		}
		return fmt.Errorf("remove tags: %w", delErr)
	}

	encoded, encErr := json.Marshal(tags)
	if encErr != nil {
		return fmt.Errorf("marshal tags: %w", encErr)
	}

	if storeErr := p.meta.StoreAttribute(nil, bucket, "", tagHdr, encoded); storeErr != nil {
		return fmt.Errorf("set tags: %w", storeErr)
	}
	return nil
}
// GetBucketTagging returns the tag set stored on a bucket. The bucket name
// must be valid and the bucket directory must exist; the lookup itself is
// delegated to getAttrTags with an empty object and version. An action slot
// is held for the duration of the call.
func (p *Posix) GetBucketTagging(ctx context.Context, bucket string) (map[string]string, error) {
	done, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		return nil, slotErr
	}
	defer done()

	if !p.isBucketValid(bucket) {
		return nil, s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	_, statErr := os.Stat(bucket)
	switch {
	case errors.Is(statErr, fs.ErrNotExist):
		return nil, s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	case statErr != nil:
		return nil, fmt.Errorf("stat bucket: %w", statErr)
	}

	bucketTags, tagErr := p.getAttrTags(bucket, "", "")
	if tagErr != nil {
		return nil, tagErr
	}
	return bucketTags, nil
}
// DeleteBucketTagging removes the tag set of a bucket by calling
// PutBucketTagging with a nil tag map. An invalid bucket name is rejected
// before delegating.
func (p *Posix) DeleteBucketTagging(ctx context.Context, bucket string) error {
	if p.isBucketValid(bucket) {
		return p.PutBucketTagging(ctx, bucket, nil)
	}
	return s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
}
// GetObjectTagging returns the tag set of an object or of a specific object
// version.
//
// Without a versionId the object must exist in the bucket. With a versionId,
// gateway versioning must be enabled; when the requested version is not the
// current one, the lookup is redirected into the versioning directory. Delete
// markers are rejected via ensureNotDeleteMarker. An action slot is held for
// the duration of the call.
func (p *Posix) GetObjectTagging(ctx context.Context, bucket, object, versionId string) (map[string]string, error) {
	done, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		return nil, slotErr
	}
	defer done()

	if !p.isBucketValid(bucket) {
		return nil, s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	_, bucketErr := os.Stat(bucket)
	switch {
	case errors.Is(bucketErr, fs.ErrNotExist):
		return nil, s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	case bucketErr != nil:
		return nil, fmt.Errorf("stat bucket: %w", bucketErr)
	}

	if verr := p.validateVersionId(versionId); verr != nil {
		return nil, verr
	}

	if versionId == "" {
		// current object: it has to exist on disk
		_, objErr := os.Stat(filepath.Join(bucket, object))
		switch {
		case errors.Is(objErr, fs.ErrNotExist), isErrNotDir(objErr):
			return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
		case isErrNameTooLong(objErr):
			return nil, s3err.GetAPIError(s3err.ErrKeyTooLong)
		case objErr != nil:
			return nil, fmt.Errorf("stat object: %w", objErr)
		}
	} else {
		if !p.versioningEnabled() {
			//TODO: Maybe we need to return our custom error here?
			return nil, s3err.GetInvalidArgumentErr(s3err.InvalidArgVersionId, versionId)
		}

		currentVersion, attrErr := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey)
		switch {
		case errors.Is(attrErr, fs.ErrNotExist), isErrNotDir(attrErr):
			return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
		case attrErr != nil && !errors.Is(attrErr, meta.ErrNoSuchKey):
			return nil, fmt.Errorf("get obj versionId: %w", attrErr)
		}

		// a non-current version lives under the versioning directory
		if string(currentVersion) != versionId {
			bucket = filepath.Join(p.versioningDir, bucket)
			object = filepath.Join(genObjVersionKey(object), versionId)
		}
	}

	if markerErr := p.ensureNotDeleteMarker(bucket, object, versionId); markerErr != nil {
		return nil, markerErr
	}

	return p.getAttrTags(bucket, object, versionId)
}
// getAttrTags reads and decodes the JSON tag set stored in the tagging
// attribute of bucket/object.
//
// A missing file maps to a NoSuchVersion error when versionId is set and to
// ErrNoSuchKey otherwise. A missing tagging attribute yields an empty tag set
// for objects and ErrBucketTaggingNotFound for buckets (empty object).
func (p *Posix) getAttrTags(bucket, object, versionId string) (map[string]string, error) {
	raw, attrErr := p.meta.RetrieveAttribute(nil, bucket, object, tagHdr)
	switch {
	case errors.Is(attrErr, fs.ErrNotExist), isErrNotDir(attrErr):
		if versionId != "" {
			return nil, s3err.GetNoSuchVersionErr(object, versionId)
		}
		return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
	case errors.Is(attrErr, meta.ErrNoSuchKey):
		if object == "" {
			return nil, s3err.GetBucketErr(s3err.ErrBucketTaggingNotFound, bucket)
		}
		// return empty tag set for object tagging
		return map[string]string{}, nil
	case attrErr != nil:
		return nil, fmt.Errorf("get tags: %w", attrErr)
	}

	// decode into a pre-allocated map so the result is never nil
	decoded := make(map[string]string)
	if jsonErr := json.Unmarshal(raw, &decoded); jsonErr != nil {
		return nil, fmt.Errorf("unmarshal tags: %w", jsonErr)
	}
	return decoded, nil
}
// PutObjectTagging replaces the tag set of an object or of a specific object
// version. A nil tags map removes the tagging attribute (a missing attribute
// is not an error); otherwise the map is JSON encoded and stored.
//
// Without a versionId the object must exist in the bucket. With a versionId,
// gateway versioning must be enabled; when the requested version is not the
// current one, the operation is redirected into the versioning directory.
// Delete markers are rejected via ensureNotDeleteMarker. An action slot is
// held for the duration of the call.
func (p *Posix) PutObjectTagging(ctx context.Context, bucket, object, versionId string, tags map[string]string) error {
	done, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		return slotErr
	}
	defer done()

	if !p.isBucketValid(bucket) {
		return s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	_, bucketErr := os.Stat(bucket)
	switch {
	case errors.Is(bucketErr, fs.ErrNotExist):
		return s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	case bucketErr != nil:
		return fmt.Errorf("stat bucket: %w", bucketErr)
	}

	if verr := p.validateVersionId(versionId); verr != nil {
		return verr
	}

	if versionId == "" {
		// current object: it has to exist on disk
		_, objErr := os.Stat(filepath.Join(bucket, object))
		switch {
		case errors.Is(objErr, fs.ErrNotExist), isErrNotDir(objErr):
			return s3err.GetAPIError(s3err.ErrNoSuchKey)
		case isErrNameTooLong(objErr):
			return s3err.GetAPIError(s3err.ErrKeyTooLong)
		case objErr != nil:
			return fmt.Errorf("stat object: %w", objErr)
		}
	} else {
		if !p.versioningEnabled() {
			//TODO: Maybe we need to return our custom error here?
			return s3err.GetInvalidArgumentErr(s3err.InvalidArgVersionId, versionId)
		}

		currentVersion, attrErr := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey)
		switch {
		case errors.Is(attrErr, fs.ErrNotExist), isErrNotDir(attrErr):
			return s3err.GetAPIError(s3err.ErrNoSuchKey)
		case attrErr != nil && !errors.Is(attrErr, meta.ErrNoSuchKey):
			return fmt.Errorf("get obj versionId: %w", attrErr)
		}

		// a non-current version lives under the versioning directory
		if string(currentVersion) != versionId {
			bucket = filepath.Join(p.versioningDir, bucket)
			object = filepath.Join(genObjVersionKey(object), versionId)
		}
	}

	if markerErr := p.ensureNotDeleteMarker(bucket, object, versionId); markerErr != nil {
		return markerErr
	}

	// nil means "delete the tag set"
	if tags == nil {
		delErr := p.meta.DeleteAttribute(bucket, object, tagHdr)
		switch {
		case errors.Is(delErr, fs.ErrNotExist), isErrNotDir(delErr):
			if versionId != "" {
				return s3err.GetNoSuchVersionErr(object, versionId)
			}
			return s3err.GetAPIError(s3err.ErrNoSuchKey)
		case errors.Is(delErr, meta.ErrNoSuchKey):
			return nil
		case delErr != nil:
			return fmt.Errorf("remove tags: %w", delErr)
		}
		return nil
	}

	encoded, encErr := json.Marshal(tags)
	if encErr != nil {
		return fmt.Errorf("marshal tags: %w", encErr)
	}

	storeErr := p.meta.StoreAttribute(nil, bucket, object, tagHdr, encoded)
	switch {
	case errors.Is(storeErr, fs.ErrNotExist), isErrNotDir(storeErr):
		if versionId != "" {
			return s3err.GetNoSuchVersionErr(object, versionId)
		}
		return s3err.GetAPIError(s3err.ErrNoSuchKey)
	case storeErr != nil:
		return fmt.Errorf("set tags: %w", storeErr)
	}
	return nil
}
// DeleteObjectTagging removes the tag set of an object (or object version)
// by calling PutObjectTagging with a nil tag map. An invalid bucket name is
// rejected before delegating.
func (p *Posix) DeleteObjectTagging(ctx context.Context, bucket, object, versionId string) error {
	if p.isBucketValid(bucket) {
		return p.PutObjectTagging(ctx, bucket, object, versionId, nil)
	}
	return s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
}
// PutBucketPolicy stores policy as the policy attribute of the bucket. A nil
// policy removes the attribute (a missing attribute is not an error). The
// bucket name must be valid and the bucket directory must exist. An action
// slot is held for the duration of the call.
func (p *Posix) PutBucketPolicy(ctx context.Context, bucket string, policy []byte) error {
	done, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		return slotErr
	}
	defer done()

	if !p.isBucketValid(bucket) {
		return s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	_, statErr := os.Stat(bucket)
	switch {
	case errors.Is(statErr, fs.ErrNotExist):
		return s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	case statErr != nil:
		return fmt.Errorf("stat bucket: %w", statErr)
	}

	// nil means "delete the policy"
	if policy == nil {
		delErr := p.meta.DeleteAttribute(bucket, "", policykey)
		if delErr != nil && !errors.Is(delErr, meta.ErrNoSuchKey) {
			return fmt.Errorf("remove policy: %w", delErr)
		}
		return nil
	}

	if storeErr := p.meta.StoreAttribute(nil, bucket, "", policykey, policy); storeErr != nil {
		return fmt.Errorf("set policy: %w", storeErr)
	}
	return nil
}
// GetBucketPolicy returns the raw policy attribute of the bucket. A bucket
// without a stored policy yields ErrNoSuchBucketPolicy. The bucket name must
// be valid and the bucket directory must exist. An action slot is held for
// the duration of the call.
func (p *Posix) GetBucketPolicy(ctx context.Context, bucket string) ([]byte, error) {
	done, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		return nil, slotErr
	}
	defer done()

	if !p.isBucketValid(bucket) {
		return nil, s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	_, statErr := os.Stat(bucket)
	switch {
	case errors.Is(statErr, fs.ErrNotExist):
		return nil, s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	case statErr != nil:
		return nil, fmt.Errorf("stat bucket: %w", statErr)
	}

	doc, attrErr := p.meta.RetrieveAttribute(nil, bucket, "", policykey)
	switch {
	case errors.Is(attrErr, meta.ErrNoSuchKey):
		return nil, s3err.GetBucketErr(s3err.ErrNoSuchBucketPolicy, bucket)
	case errors.Is(attrErr, fs.ErrNotExist):
		// the bucket vanished between the stat and the attribute read
		return nil, s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	case attrErr != nil:
		return nil, fmt.Errorf("get bucket policy: %w", attrErr)
	}
	return doc, nil
}
// DeleteBucketPolicy removes the bucket policy by calling PutBucketPolicy
// with a nil policy. An invalid bucket name is rejected before delegating.
func (p *Posix) DeleteBucketPolicy(ctx context.Context, bucket string) error {
	if p.isBucketValid(bucket) {
		return p.PutBucketPolicy(ctx, bucket, nil)
	}
	return s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
}
// PutBucketCors stores cors as the CORS attribute of the bucket. A nil value
// removes the attribute (a missing attribute is not an error). The bucket
// name must be valid and the bucket directory must exist. An action slot is
// held for the duration of the call.
func (p *Posix) PutBucketCors(ctx context.Context, bucket string, cors []byte) error {
	done, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		return slotErr
	}
	defer done()

	if !p.isBucketValid(bucket) {
		return s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	_, statErr := os.Stat(bucket)
	switch {
	case errors.Is(statErr, fs.ErrNotExist):
		return s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	case statErr != nil:
		return fmt.Errorf("stat bucket: %w", statErr)
	}

	// nil means "delete the CORS configuration"
	if cors == nil {
		delErr := p.meta.DeleteAttribute(bucket, "", corskey)
		if delErr == nil || errors.Is(delErr, meta.ErrNoSuchKey) {
			return nil
		}
		return fmt.Errorf("remove cors: %w", delErr)
	}

	if storeErr := p.meta.StoreAttribute(nil, bucket, "", corskey, cors); storeErr != nil {
		return fmt.Errorf("set cors: %w", storeErr)
	}
	return nil
}
// GetBucketCors returns the raw CORS attribute of the bucket. A bucket
// without a stored configuration yields ErrNoSuchCORSConfiguration; any other
// attribute error is returned as is. The bucket name must be valid and the
// bucket directory must exist. An action slot is held for the duration of
// the call.
func (p *Posix) GetBucketCors(ctx context.Context, bucket string) ([]byte, error) {
	done, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		return nil, slotErr
	}
	defer done()

	if !p.isBucketValid(bucket) {
		return nil, s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	_, statErr := os.Stat(bucket)
	switch {
	case errors.Is(statErr, fs.ErrNotExist):
		return nil, s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	case statErr != nil:
		return nil, fmt.Errorf("stat bucket: %w", statErr)
	}

	rules, attrErr := p.meta.RetrieveAttribute(nil, bucket, "", corskey)
	switch {
	case errors.Is(attrErr, meta.ErrNoSuchKey):
		return nil, s3err.GetBucketErr(s3err.ErrNoSuchCORSConfiguration, bucket)
	case attrErr != nil:
		return nil, attrErr
	}
	return rules, nil
}
// DeleteBucketCors removes the bucket CORS configuration by calling
// PutBucketCors with a nil value. An invalid bucket name is rejected before
// delegating.
func (p *Posix) DeleteBucketCors(ctx context.Context, bucket string) error {
	if p.isBucketValid(bucket) {
		return p.PutBucketCors(ctx, bucket, nil)
	}
	return s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
}
// PutBucketWebsite stores the website configuration of the bucket. A nil
// value removes the attribute (a missing attribute is not an error);
// otherwise the configuration is passed through backend.MarshalWebsiteConfig
// before being stored. The bucket name must be valid and the bucket
// directory must exist. An action slot is held for the duration of the call.
func (p *Posix) PutBucketWebsite(ctx context.Context, bucket string, website []byte) error {
	done, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		return slotErr
	}
	defer done()

	if !p.isBucketValid(bucket) {
		return s3err.GetAPIError(s3err.ErrInvalidBucketName)
	}

	_, statErr := os.Stat(bucket)
	switch {
	case errors.Is(statErr, fs.ErrNotExist):
		return s3err.GetAPIError(s3err.ErrNoSuchBucket)
	case statErr != nil:
		return fmt.Errorf("stat bucket: %w", statErr)
	}

	// nil means "delete the website configuration"
	if website == nil {
		delErr := p.meta.DeleteAttribute(bucket, "", websitekey)
		if delErr == nil || errors.Is(delErr, meta.ErrNoSuchKey) {
			return nil
		}
		return fmt.Errorf("remove website: %w", delErr)
	}

	// The website configuration can be up to 128KB
	// compress the data to fit in 64KB xattr limits
	packed, packErr := backend.MarshalWebsiteConfig(website, false)
	if packErr != nil {
		return packErr
	}

	if storeErr := p.meta.StoreAttribute(nil, bucket, "", websitekey, packed); storeErr != nil {
		return fmt.Errorf("set website: %w", storeErr)
	}
	return nil
}
// GetBucketWebsite returns the website configuration of the bucket after
// passing the stored attribute through backend.UnmarshalWebsiteConfig. A
// bucket without a stored configuration yields
// ErrNoSuchWebsiteConfiguration. The bucket name must be valid and the bucket
// directory must exist. An action slot is held for the duration of the call.
func (p *Posix) GetBucketWebsite(ctx context.Context, bucket string) ([]byte, error) {
	done, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		return nil, slotErr
	}
	defer done()

	if !p.isBucketValid(bucket) {
		return nil, s3err.GetAPIError(s3err.ErrInvalidBucketName)
	}

	_, statErr := os.Stat(bucket)
	switch {
	case errors.Is(statErr, fs.ErrNotExist):
		return nil, s3err.GetAPIError(s3err.ErrNoSuchBucket)
	case statErr != nil:
		return nil, fmt.Errorf("stat bucket: %w", statErr)
	}

	packed, attrErr := p.meta.RetrieveAttribute(nil, bucket, "", websitekey)
	switch {
	case errors.Is(attrErr, meta.ErrNoSuchKey):
		return nil, s3err.GetBucketErr(s3err.ErrNoSuchWebsiteConfiguration, bucket)
	case attrErr != nil:
		return nil, attrErr
	}

	plain, unpackErr := backend.UnmarshalWebsiteConfig(packed, false)
	if unpackErr != nil {
		return nil, unpackErr
	}
	return plain, nil
}
// DeleteBucketWebsite removes the bucket website configuration by calling
// PutBucketWebsite with a nil value. An invalid bucket name is rejected
// before delegating.
func (p *Posix) DeleteBucketWebsite(ctx context.Context, bucket string) error {
	if p.isBucketValid(bucket) {
		return p.PutBucketWebsite(ctx, bucket, nil)
	}
	return s3err.GetAPIError(s3err.ErrInvalidBucketName)
}
// isBucketObjectLockEnabled returns nil when the bucket has a stored object
// lock configuration whose Enabled flag is set. A missing bucket yields
// ErrNoSuchBucket; a missing or disabled configuration yields
// ErrMissingObjectLockConfiguration.
func (p *Posix) isBucketObjectLockEnabled(bucket string) error {
	raw, attrErr := p.meta.RetrieveAttribute(nil, bucket, "", bucketLockKey)
	switch {
	case errors.Is(attrErr, fs.ErrNotExist):
		return s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	case errors.Is(attrErr, meta.ErrNoSuchKey):
		return s3err.GetAPIError(s3err.ErrMissingObjectLockConfiguration)
	case attrErr != nil:
		return fmt.Errorf("get object lock config: %w", attrErr)
	}

	var lockCfg auth.BucketLockConfig
	if jsonErr := json.Unmarshal(raw, &lockCfg); jsonErr != nil {
		return fmt.Errorf("parse bucket lock config: %w", jsonErr)
	}

	if lockCfg.Enabled {
		return nil
	}
	return s3err.GetAPIError(s3err.ErrMissingObjectLockConfiguration)
}
// PutObjectLockConfiguration stores config as the object lock configuration
// of the bucket. When versioning is enabled on the gateway, the bucket's own
// versioning status must be Enabled, otherwise
// ErrObjectLockConfigurationNotAllowed is returned. The bucket name must be
// valid and the bucket directory must exist. An action slot is held for the
// duration of the call.
func (p *Posix) PutObjectLockConfiguration(ctx context.Context, bucket string, config []byte) error {
	done, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		return slotErr
	}
	defer done()

	if !p.isBucketValid(bucket) {
		return s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	_, statErr := os.Stat(bucket)
	switch {
	case errors.Is(statErr, fs.ErrNotExist):
		return s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	case statErr != nil:
		return fmt.Errorf("stat bucket: %w", statErr)
	}

	if p.versioningEnabled() {
		// if versioning is enabled on gateway level and bucket versioning
		// status is not `Enabled`, object lock can't be enabled.
		// if object lock has been enabled on bucket creation
		// it means the versioning has been enabled alongside with object lock
		// and it can't be suspended ever again
		versioning, vErr := p.getBucketVersioningStatus(ctx, bucket)
		if vErr != nil {
			return vErr
		}
		if versioning != types.BucketVersioningStatusEnabled {
			return s3err.GetAPIError(s3err.ErrObjectLockConfigurationNotAllowed)
		}
	}

	if storeErr := p.meta.StoreAttribute(nil, bucket, "", bucketLockKey, config); storeErr != nil {
		return fmt.Errorf("set object lock config: %w", storeErr)
	}
	return nil
}
// GetObjectLockConfiguration returns the raw object lock configuration
// attribute of the bucket. A bucket without a stored configuration yields
// ErrObjectLockConfigurationNotFound. The bucket name must be valid and the
// bucket directory must exist. An action slot is held for the duration of
// the call.
func (p *Posix) GetObjectLockConfiguration(ctx context.Context, bucket string) ([]byte, error) {
	done, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		return nil, slotErr
	}
	defer done()

	if !p.isBucketValid(bucket) {
		return nil, s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}

	_, statErr := os.Stat(bucket)
	switch {
	case errors.Is(statErr, fs.ErrNotExist):
		return nil, s3err.GetBucketErr(s3err.ErrNoSuchBucket, bucket)
	case statErr != nil:
		return nil, fmt.Errorf("stat bucket: %w", statErr)
	}

	lockCfg, attrErr := p.meta.RetrieveAttribute(nil, bucket, "", bucketLockKey)
	switch {
	case errors.Is(attrErr, meta.ErrNoSuchKey):
		return nil, s3err.GetBucketErr(s3err.ErrObjectLockConfigurationNotFound, bucket)
	case attrErr != nil:
		return nil, fmt.Errorf("get object lock config: %w", attrErr)
	}
	return lockCfg, nil
}
// PutObjectLegalHold stores the legal hold flag of an object or of a
// specific object version as a single byte (1 for on, 0 for off).
//
// The bucket and object must exist and the bucket must have object lock
// enabled. With a versionId, gateway versioning must be enabled; when the
// requested version is not the current one, the attribute is written in the
// versioning directory. Delete markers are rejected via
// ensureNotDeleteMarker. An action slot is held for the duration of the call.
func (p *Posix) PutObjectLegalHold(ctx context.Context, bucket, object, versionId string, status bool) error {
	done, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		return slotErr
	}
	defer done()

	if !p.isBucketValid(bucket) {
		return s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}
	if existErr := p.doesBucketAndObjectExist(bucket, object); existErr != nil {
		return existErr
	}
	if lockErr := p.isBucketObjectLockEnabled(bucket); lockErr != nil {
		return lockErr
	}
	if verr := p.validateVersionId(versionId); verr != nil {
		return verr
	}

	// encode the flag as one byte
	flag := []byte{0}
	if status {
		flag = []byte{1}
	}

	if versionId != "" {
		if !p.versioningEnabled() {
			//TODO: Maybe we need to return our custom error here?
			return s3err.GetInvalidArgumentErr(s3err.InvalidArgVersionId, versionId)
		}

		currentVersion, attrErr := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey)
		switch {
		case errors.Is(attrErr, fs.ErrNotExist), isErrNotDir(attrErr):
			return s3err.GetAPIError(s3err.ErrNoSuchKey)
		case attrErr != nil && !errors.Is(attrErr, meta.ErrNoSuchKey):
			return fmt.Errorf("get obj versionId: %w", attrErr)
		}

		// a non-current version lives under the versioning directory
		if string(currentVersion) != versionId {
			bucket = filepath.Join(p.versioningDir, bucket)
			object = filepath.Join(genObjVersionKey(object), versionId)
		}
	}

	if markerErr := p.ensureNotDeleteMarker(bucket, object, versionId); markerErr != nil {
		return markerErr
	}

	storeErr := p.meta.StoreAttribute(nil, bucket, object, objectLegalHoldKey, flag)
	switch {
	case errors.Is(storeErr, fs.ErrNotExist), isErrNotDir(storeErr):
		if versionId != "" {
			return s3err.GetNoSuchVersionErr(object, versionId)
		}
		return s3err.GetAPIError(s3err.ErrNoSuchKey)
	case storeErr != nil:
		return fmt.Errorf("set object lock config: %w", storeErr)
	}
	return nil
}
// GetObjectLegalHold reports the legal hold flag of an object or of a
// specific object version.
//
// The bucket and object must exist and the bucket must have object lock
// enabled. With a versionId, gateway versioning must be enabled; when the
// requested version is not the current one, the attribute is read from the
// versioning directory. Delete markers are rejected via
// ensureNotDeleteMarker. An action slot is held for the duration of the call.
//
// The returned pointer is true only when the stored attribute's first byte
// is 1. An attribute that exists but is empty is reported as false rather
// than indexed, so a truncated or externally modified attribute cannot panic
// the request. A missing attribute yields ErrNoSuchObjectLockConfiguration.
func (p *Posix) GetObjectLegalHold(ctx context.Context, bucket, object, versionId string) (*bool, error) {
	release, err := p.acquireActionSlot(ctx)
	if err != nil {
		return nil, err
	}
	defer release()

	if !p.isBucketValid(bucket) {
		return nil, s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}
	err = p.doesBucketAndObjectExist(bucket, object)
	if err != nil {
		return nil, err
	}
	err = p.isBucketObjectLockEnabled(bucket)
	if err != nil {
		return nil, err
	}

	if err := p.validateVersionId(versionId); err != nil {
		return nil, err
	}

	if versionId != "" {
		if !p.versioningEnabled() {
			//TODO: Maybe we need to return our custom error here?
			return nil, s3err.GetInvalidArgumentErr(s3err.InvalidArgVersionId, versionId)
		}
		vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey)
		if errors.Is(err, fs.ErrNotExist) || isErrNotDir(err) {
			return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
		}
		if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
			return nil, fmt.Errorf("get obj versionId: %w", err)
		}

		// a non-current version lives under the versioning directory
		if string(vId) != versionId {
			bucket = filepath.Join(p.versioningDir, bucket)
			object = filepath.Join(genObjVersionKey(object), versionId)
		}
	}

	err = p.ensureNotDeleteMarker(bucket, object, versionId)
	if err != nil {
		return nil, err
	}

	data, err := p.meta.RetrieveAttribute(nil, bucket, object, objectLegalHoldKey)
	if errors.Is(err, fs.ErrNotExist) || isErrNotDir(err) {
		if versionId != "" {
			return nil, s3err.GetNoSuchVersionErr(object, versionId)
		}
		return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
	}
	if errors.Is(err, meta.ErrNoSuchKey) {
		return nil, s3err.GetAPIError(s3err.ErrNoSuchObjectLockConfiguration)
	}
	if err != nil {
		return nil, fmt.Errorf("get object lock config: %w", err)
	}

	// guard the index: an empty attribute value must not panic
	result := len(data) > 0 && data[0] == 1

	return &result, nil
}
// PutObjectRetention stores retention as the retention attribute of an
// object or of a specific object version.
//
// The bucket and object must exist and the bucket must have object lock
// enabled. With a versionId, gateway versioning must be enabled; when the
// requested version is not the current one, the attribute is written in the
// versioning directory. Delete markers are rejected via
// ensureNotDeleteMarker. An action slot is held for the duration of the call.
//
// If the target file is missing when the attribute is stored, the error is
// mapped to a NoSuchVersion error (versionId set) or ErrNoSuchKey, matching
// the handling in PutObjectLegalHold and GetObjectRetention, instead of being
// returned as a generic wrapped error.
func (p *Posix) PutObjectRetention(ctx context.Context, bucket, object, versionId string, retention []byte) error {
	release, err := p.acquireActionSlot(ctx)
	if err != nil {
		return err
	}
	defer release()

	if !p.isBucketValid(bucket) {
		return s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}
	err = p.doesBucketAndObjectExist(bucket, object)
	if err != nil {
		return err
	}
	err = p.isBucketObjectLockEnabled(bucket)
	if err != nil {
		return err
	}

	if err := p.validateVersionId(versionId); err != nil {
		return err
	}

	if versionId != "" {
		if !p.versioningEnabled() {
			//TODO: Maybe we need to return our custom error here?
			return s3err.GetInvalidArgumentErr(s3err.InvalidArgVersionId, versionId)
		}
		vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey)
		if errors.Is(err, fs.ErrNotExist) || isErrNotDir(err) {
			return s3err.GetAPIError(s3err.ErrNoSuchKey)
		}
		if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
			return fmt.Errorf("get obj versionId: %w", err)
		}

		// a non-current version lives under the versioning directory
		if string(vId) != versionId {
			bucket = filepath.Join(p.versioningDir, bucket)
			object = filepath.Join(genObjVersionKey(object), versionId)
		}
	}

	err = p.ensureNotDeleteMarker(bucket, object, versionId)
	if err != nil {
		return err
	}

	err = p.meta.StoreAttribute(nil, bucket, object, objectRetentionKey, retention)
	// a missing target is a client-visible "not found", not an internal error
	if errors.Is(err, fs.ErrNotExist) || isErrNotDir(err) {
		if versionId != "" {
			return s3err.GetNoSuchVersionErr(object, versionId)
		}
		return s3err.GetAPIError(s3err.ErrNoSuchKey)
	}
	if err != nil {
		return fmt.Errorf("set object lock config: %w", err)
	}

	return nil
}
// GetObjectRetention returns the raw retention attribute of an object or of
// a specific object version.
//
// The bucket and object must exist and the bucket must have object lock
// enabled. With a versionId, gateway versioning must be enabled; when the
// requested version is not the current one, the attribute is read from the
// versioning directory. Delete markers are rejected via
// ensureNotDeleteMarker. A missing attribute yields
// ErrNoSuchObjectLockConfiguration. An action slot is held for the duration
// of the call.
func (p *Posix) GetObjectRetention(ctx context.Context, bucket, object, versionId string) ([]byte, error) {
	done, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		return nil, slotErr
	}
	defer done()

	if !p.isBucketValid(bucket) {
		return nil, s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
	}
	if existErr := p.doesBucketAndObjectExist(bucket, object); existErr != nil {
		return nil, existErr
	}
	if lockErr := p.isBucketObjectLockEnabled(bucket); lockErr != nil {
		return nil, lockErr
	}
	if verr := p.validateVersionId(versionId); verr != nil {
		return nil, verr
	}

	if versionId != "" {
		if !p.versioningEnabled() {
			//TODO: Maybe we need to return our custom error here?
			return nil, s3err.GetInvalidArgumentErr(s3err.InvalidArgVersionId, versionId)
		}

		currentVersion, attrErr := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey)
		switch {
		case errors.Is(attrErr, fs.ErrNotExist), isErrNotDir(attrErr):
			return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
		case attrErr != nil && !errors.Is(attrErr, meta.ErrNoSuchKey):
			return nil, fmt.Errorf("get obj versionId: %w", attrErr)
		}

		// a non-current version lives under the versioning directory
		if string(currentVersion) != versionId {
			bucket = filepath.Join(p.versioningDir, bucket)
			object = filepath.Join(genObjVersionKey(object), versionId)
		}
	}

	if markerErr := p.ensureNotDeleteMarker(bucket, object, versionId); markerErr != nil {
		return nil, markerErr
	}

	retention, readErr := p.meta.RetrieveAttribute(nil, bucket, object, objectRetentionKey)
	switch {
	case errors.Is(readErr, fs.ErrNotExist), isErrNotDir(readErr):
		if versionId != "" {
			return nil, s3err.GetNoSuchVersionErr(object, versionId)
		}
		return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
	case errors.Is(readErr, meta.ErrNoSuchKey):
		return nil, s3err.GetAPIError(s3err.ErrNoSuchObjectLockConfiguration)
	case readErr != nil:
		return nil, fmt.Errorf("get object lock config: %w", readErr)
	}
	return retention, nil
}
// ChangeBucketOwner sets owner as the owner of the bucket by delegating to
// auth.UpdateBucketACLOwner. An invalid bucket name is rejected first. An
// action slot is held for the duration of the call.
func (p *Posix) ChangeBucketOwner(ctx context.Context, bucket, owner string) error {
	done, slotErr := p.acquireActionSlot(ctx)
	if slotErr != nil {
		return slotErr
	}
	defer done()

	if p.isBucketValid(bucket) {
		return auth.UpdateBucketACLOwner(ctx, p, bucket, owner)
	}
	return s3err.GetBucketErr(s3err.ErrInvalidBucketName, bucket)
}
// listBucketFileInfos returns the file info of every directory found in the
// current working directory, in the order os.ReadDir reports them. When
// bucketlinks is set, symbolic links are followed and included if they
// resolve to a directory. Entries whose info cannot be read are silently
// skipped; only a failure to read the directory itself is reported.
func listBucketFileInfos(bucketlinks bool) ([]fs.FileInfo, error) {
	dirents, err := os.ReadDir(".")
	if err != nil {
		return nil, fmt.Errorf("readdir buckets: %w", err)
	}

	var infos []fs.FileInfo
	for _, de := range dirents {
		info, infoErr := de.Info()
		if infoErr != nil {
			// skip entries returning errors
			continue
		}

		// resolve symlinks to their target only when bucket links are allowed
		if bucketlinks && de.Type() == fs.ModeSymlink {
			target, statErr := os.Stat(de.Name())
			if statErr != nil {
				// skip entries returning errors
				continue
			}
			info = target
		}

		// buckets must be a directory
		if info.IsDir() {
			infos = append(infos, info)
		}
	}

	return infos, nil
}
// ListBucketsAndOwners returns every bucket directory together with the
// owner recorded in its ACL attribute, sorted by bucket name. A bucket
// without an ACL attribute is still listed, with whatever owner
// auth.ParseACL yields for the empty input. On an ACL read or parse failure
// the buckets collected so far are returned alongside the error. An action
// slot is held for the duration of the call.
func (p *Posix) ListBucketsAndOwners(ctx context.Context) (buckets []s3response.Bucket, err error) {
	done, err := p.acquireActionSlot(ctx)
	if err != nil {
		return nil, err
	}
	defer done()

	infos, err := listBucketFileInfos(p.bucketlinks)
	if err != nil {
		return buckets, fmt.Errorf("listBucketFileInfos: %w", err)
	}

	for _, info := range infos {
		name := info.Name()

		rawACL, attrErr := p.meta.RetrieveAttribute(nil, name, "", aclkey)
		if attrErr != nil && !errors.Is(attrErr, meta.ErrNoSuchKey) {
			return buckets, fmt.Errorf("get acl tag: %w", attrErr)
		}

		acl, parseErr := auth.ParseACL(rawACL)
		if parseErr != nil {
			return buckets, fmt.Errorf("parse acl tag: %w", parseErr)
		}

		buckets = append(buckets, s3response.Bucket{
			Name:  name,
			Owner: acl.Owner,
		})
	}

	sort.SliceStable(buckets, func(a, b int) bool {
		return buckets[a].Name < buckets[b].Name
	})

	return buckets, nil
}
// NormalizeObjectKey returns the object key as the cleaned path of object
// relative to bucket. A key that resolves to the bucket itself is returned
// as the empty string. If the relative path cannot be computed, the joined
// bucket/object path is returned instead.
func (p *Posix) NormalizeObjectKey(bucket, object string) string {
	joined := filepath.Join(bucket, object)

	rel, err := filepath.Rel(filepath.Clean(bucket), joined)
	switch {
	case err != nil:
		return joined
	case rel == ".":
		return ""
	default:
		return rel
	}
}
// storeChecksums JSON encodes chs and stores it as the checksums attribute
// of bucket/object, passing f through to the metadata store. The store
// error, if any, is returned unwrapped.
func (p *Posix) storeChecksums(f *os.File, bucket, object string, chs s3response.Checksum) error {
	encoded, encErr := json.Marshal(chs)
	if encErr != nil {
		return fmt.Errorf("parse checksum: %w", encErr)
	}

	return p.meta.StoreAttribute(f, bucket, object, checksumsKey, encoded)
}
// retrieveChecksums reads the checksums attribute of bucket/object, passing
// f through to the metadata store, and JSON decodes it. Both the retrieval
// error and the decode error are returned unwrapped.
func (p *Posix) retrieveChecksums(f *os.File, bucket, object string) (checksums s3response.Checksum, err error) {
	raw, err := p.meta.RetrieveAttribute(f, bucket, object, checksumsKey)
	if err == nil {
		err = json.Unmarshal(raw, &checksums)
	}
	return checksums, err
}
// getString dereferences str, returning the empty string for a nil pointer.
func getString(str *string) string {
	if str != nil {
		return *str
	}
	return ""
}
// drainBody reads r to completion and throws the data away; a nil reader is
// a no-op. It is used once a server-side error has been detected in the
// middle of an upload, so that the remaining request bytes are consumed and
// the client gets a chance to read the error response before the connection
// is shut down. Any read error is intentionally ignored: the caller is
// already on an error path and the drain is best effort.
func drainBody(r io.Reader) {
	if r != nil {
		_, _ = io.Copy(io.Discard, r)
	}
}
// joinPathWithTrailer joins paths with filepath.Join and re-appends a
// trailing "/" when the last element had one, since filepath.Join cleans the
// result and would otherwise drop it. Called with no elements it returns the
// empty string (the same value filepath.Join yields) instead of indexing an
// empty slice.
func joinPathWithTrailer(paths ...string) string {
	if len(paths) == 0 {
		return ""
	}
	joined := filepath.Join(paths...)
	if strings.HasSuffix(paths[len(paths)-1], "/") {
		joined += "/"
	}
	return joined
}