feat: implement conditional writes

Closes #821

**Implements conditional operations across object APIs:**

* **PutObject** and **CompleteMultipartUpload**:
  Support conditional writes via the `If-Match` and `If-None-Match` headers (ETag comparisons).
  The preconditions are evaluated against the existing object with the same key in the bucket, and the operation proceeds only if they are satisfied. If no object exists for the key, these headers are ignored (see the sketch below).
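
  For illustration, a minimal client-side sketch of the optimistic-concurrency pattern this enables, written against the AWS SDK for Go v2 (assumes an SDK version whose `PutObjectInput` exposes `IfMatch`; bucket, key, and body values are placeholders):

  ```go
  package example

  import (
  	"context"
  	"strings"

  	"github.com/aws/aws-sdk-go-v2/aws"
  	"github.com/aws/aws-sdk-go-v2/service/s3"
  )

  // overwriteIfUnchanged overwrites an object only if it still carries the
  // ETag observed by a prior HeadObject; if a concurrent writer got there
  // first, the PutObject fails with 412 Precondition Failed instead of
  // silently clobbering the newer data.
  func overwriteIfUnchanged(ctx context.Context, client *s3.Client, bucket, key, body string) error {
  	head, err := client.HeadObject(ctx, &s3.HeadObjectInput{
  		Bucket: aws.String(bucket),
  		Key:    aws.String(key),
  	})
  	if err != nil {
  		return err
  	}
  	_, err = client.PutObject(ctx, &s3.PutObjectInput{
  		Bucket:  aws.String(bucket),
  		Key:     aws.String(key),
  		Body:    strings.NewReader(body),
  		IfMatch: head.ETag, // ETag comparison against the current object
  	})
  	return err
  }
  ```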

* **CopyObject** and **UploadPartCopy**:
  Add conditional reads on the copy source object via the following headers:

  * `x-amz-copy-source-if-match`
  * `x-amz-copy-source-if-none-match`
  * `x-amz-copy-source-if-modified-since`
  * `x-amz-copy-source-if-unmodified-since`
    The first two are ETag comparisons; the latter two compare against the copy source’s `LastModified` timestamp (see the sketch below).
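
  A minimal client-side sketch using the AWS SDK for Go v2, whose `CopyObjectInput` exposes these headers as `CopySourceIfMatch`, `CopySourceIfNoneMatch`, `CopySourceIfModifiedSince`, and `CopySourceIfUnmodifiedSince` (bucket and key names are placeholders):

  ```go
  package example

  import (
  	"context"
  	"time"

  	"github.com/aws/aws-sdk-go-v2/aws"
  	"github.com/aws/aws-sdk-go-v2/service/s3"
  )

  // copyIfStable copies the source object only if it still carries the
  // expected ETag and has not been modified since the cutoff; otherwise
  // the gateway rejects the copy with 412 Precondition Failed.
  func copyIfStable(ctx context.Context, client *s3.Client, srcEtag string, cutoff time.Time) error {
  	_, err := client.CopyObject(ctx, &s3.CopyObjectInput{
  		Bucket:                      aws.String("dst-bucket"),
  		Key:                         aws.String("dst-key"),
  		CopySource:                  aws.String("src-bucket/src-key"),
  		CopySourceIfMatch:           aws.String(srcEtag),
  		CopySourceIfUnmodifiedSince: aws.Time(cutoff),
  	})
  	return err
  }
  ```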

* **AbortMultipartUpload**:
  Supports the `x-amz-if-match-initiated-time` header: the abort succeeds only if the provided timestamp matches the multipart upload’s initiation time (see the sketch below).
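
  A minimal sketch pairing this header with the `Initiated` timestamp returned by `ListMultipartUploads` (AWS SDK for Go v2; assumes a version whose `AbortMultipartUploadInput` exposes `IfMatchInitiatedTime`):

  ```go
  package example

  import (
  	"context"

  	"github.com/aws/aws-sdk-go-v2/aws"
  	"github.com/aws/aws-sdk-go-v2/service/s3"
  )

  // abortListedUploads aborts every multipart upload found in a listing,
  // but only if each upload's initiation time still matches what the
  // listing reported; otherwise the abort fails with 412.
  func abortListedUploads(ctx context.Context, client *s3.Client, bucket string) error {
  	list, err := client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{
  		Bucket: aws.String(bucket),
  	})
  	if err != nil {
  		return err
  	}
  	for _, u := range list.Uploads {
  		if _, err := client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
  			Bucket:               aws.String(bucket),
  			Key:                  u.Key,
  			UploadId:             u.UploadId,
  			IfMatchInitiatedTime: u.Initiated,
  		}); err != nil {
  			return err
  		}
  	}
  	return nil
  }
  ```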

* **DeleteObject**:
  Adds support for the following headers (see the sketch after the list):

  * `If-Match` (ETag comparison)
  * `x-amz-if-match-last-modified-time` (LastModified comparison)
  * `x-amz-if-match-size` (object size comparison)
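
  A minimal client-side sketch that pins all three preconditions to the values observed by a prior `HeadObject` (AWS SDK for Go v2; bucket and key are placeholders):

  ```go
  package example

  import (
  	"context"

  	"github.com/aws/aws-sdk-go-v2/aws"
  	"github.com/aws/aws-sdk-go-v2/service/s3"
  )

  // deleteExactObject deletes the object only if its ETag, LastModified
  // time, and size are still the ones just observed, guarding against
  // deleting an object that was concurrently rewritten.
  func deleteExactObject(ctx context.Context, client *s3.Client, bucket, key string) error {
  	head, err := client.HeadObject(ctx, &s3.HeadObjectInput{
  		Bucket: aws.String(bucket),
  		Key:    aws.String(key),
  	})
  	if err != nil {
  		return err
  	}
  	_, err = client.DeleteObject(ctx, &s3.DeleteObjectInput{
  		Bucket:                  aws.String(bucket),
  		Key:                     aws.String(key),
  		IfMatch:                 head.ETag,
  		IfMatchLastModifiedTime: head.LastModified,
  		IfMatchSize:             head.ContentLength,
  	})
  	return err
  }
  ```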

Additionally, this PR updates the precondition date-parsing logic to support both **RFC1123** and **RFC3339** formats. Dates set in the future are ignored, matching AWS S3 behavior.
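
A sketch of the parsing rule described above (the helper name and signature are illustrative, not this PR's actual code):

```go
package example

import "time"

// parsePreconditionDate accepts a precondition timestamp in either
// RFC1123 or RFC3339 format. Unparsable values and dates in the future
// are treated as if the header were absent, mirroring AWS S3 behavior.
func parsePreconditionDate(v string, now time.Time) (time.Time, bool) {
	t, err := time.Parse(time.RFC1123, v)
	if err != nil {
		t, err = time.Parse(time.RFC3339, v)
		if err != nil {
			return time.Time{}, false
		}
	}
	if t.After(now) {
		return time.Time{}, false
	}
	return t, true
}
```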
niksis02 committed on 2025-09-08 23:36:05 +04:00 (commit 7a098b925f, parent 04fbe405ca; 12 changed files with 1111 additions and 160 deletions)

View File

@@ -301,6 +301,11 @@ func (az *Azure) PutObject(ctx context.Context, po s3response.PutObjectInput) (s
return s3response.PutObjectOutput{}, err
}
err = az.evaluateWritePreconditions(ctx, po.Bucket, po.Key, po.IfMatch, po.IfNoneMatch)
if err != nil {
return s3response.PutObjectOutput{}, err
}
metadata := parseMetadata(po.Metadata)
// Store the "Expires" property in the object metadata
@@ -850,6 +855,42 @@ func (az *Azure) ListObjectsV2(ctx context.Context, input *s3.ListObjectsV2Input
}
func (az *Azure) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) {
if input.IfMatch != nil || input.IfMatchLastModifiedTime != nil || input.IfMatchSize != nil {
// evaluate the preconditions before deleting the object
props, err := az.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: input.Bucket,
Key: input.Key,
})
if err != nil && !errors.Is(err, s3err.GetAPIError(s3err.ErrNoSuchKey)) {
// if object doesn't exist, skip preconditions
// if unexpected error shows up, return the error
return nil, err
}
if err == nil {
var etag string
if props.ETag != nil {
etag = *props.ETag
}
var lastMod time.Time
if props.LastModified != nil {
lastMod = *props.LastModified
}
var size int64
if props.ContentLength != nil {
size = *props.ContentLength
}
err := backend.EvaluateObjectDeletePreconditions(etag, lastMod, size,
backend.ObjectDeletePreconditions{
IfMatch: input.IfMatch,
IfMatchLastModTime: input.IfMatchLastModifiedTime,
IfMatchSize: input.IfMatchSize,
})
if err != nil {
return nil, err
}
}
}
_, err := az.client.DeleteBlob(ctx, *input.Bucket, *input.Key, nil)
if err != nil {
azerr, ok := err.(*azcore.ResponseError)
@@ -899,6 +940,26 @@ func (az *Azure) CopyObject(ctx context.Context, input s3response.CopyObjectInpu
if err != nil {
return s3response.CopyObjectOutput{}, err
}
srcBucket, srcObj, _, err := backend.ParseCopySource(*input.CopySource)
if err != nil {
return s3response.CopyObjectOutput{}, err
}
if !areNils(input.CopySourceIfMatch, input.CopySourceIfNoneMatch) || !areNils(input.CopySourceIfModifiedSince, input.CopySourceIfUnmodifiedSince) {
_, err = az.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: &srcBucket,
Key: &srcObj,
IfMatch: input.CopySourceIfMatch,
IfNoneMatch: input.CopySourceIfNoneMatch,
IfModifiedSince: input.CopySourceIfModifiedSince,
IfUnmodifiedSince: input.CopySourceIfUnmodifiedSince,
})
if err != nil {
return s3response.CopyObjectOutput{}, err
}
}
if strings.Join([]string{*input.Bucket, *input.Key}, "/") == *input.CopySource {
if input.MetadataDirective != types.MetadataDirectiveReplace {
return s3response.CopyObjectOutput{}, s3err.GetAPIError(s3err.ErrInvalidCopyDest)
@@ -977,11 +1038,6 @@ func (az *Azure) CopyObject(ctx context.Context, input s3response.CopyObjectInpu
}, nil
}
-srcBucket, srcObj, _, err := backend.ParseCopySource(*input.CopySource)
-if err != nil {
-return s3response.CopyObjectOutput{}, err
-}
// Get the source object
downloadResp, err := az.client.DownloadStream(ctx, srcBucket, srcObj, nil)
if err != nil {
@@ -1210,7 +1266,7 @@ func (az *Azure) UploadPart(ctx context.Context, input *s3.UploadPartInput) (*s3
func (az *Azure) UploadPartCopy(ctx context.Context, input *s3.UploadPartCopyInput) (s3response.CopyPartResult, error) {
client, err := az.getBlockBlobClient(*input.Bucket, *input.Key)
if err != nil {
-return s3response.CopyPartResult{}, nil
+return s3response.CopyPartResult{}, err
}
if err := az.checkIfMpExists(ctx, *input.Bucket, *input.Key, *input.UploadId); err != nil {
@@ -1413,6 +1469,22 @@ func (az *Azure) ListMultipartUploads(ctx context.Context, input *s3.ListMultipa
// Cleans up the initiated multipart upload in .sgwtmp namespace
func (az *Azure) AbortMultipartUpload(ctx context.Context, input *s3.AbortMultipartUploadInput) error {
tmpPath := createMetaTmpPath(*input.Key, *input.UploadId)
if input.IfMatchInitiatedTime != nil {
client, err := az.getBlobClient(*input.Bucket, tmpPath)
if err != nil {
return err
}
resp, err := client.GetProperties(ctx, nil)
if err != nil {
return azureErrToS3Err(err)
}
if resp.LastModified != nil && resp.LastModified.Unix() != input.IfMatchInitiatedTime.Unix() {
return s3err.GetAPIError(s3err.ErrPreconditionFailed)
}
}
_, err := az.client.DeleteBlob(ctx, *input.Bucket, tmpPath, nil)
if err != nil {
return parseMpError(err)
@@ -1440,6 +1512,11 @@ func (az *Azure) AbortMultipartUpload(ctx context.Context, input *s3.AbortMultip
func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (s3response.CompleteMultipartUploadResult, string, error) {
var res s3response.CompleteMultipartUploadResult
err := az.evaluateWritePreconditions(ctx, input.Bucket, input.Key, input.IfMatch, input.IfNoneMatch)
if err != nil {
return s3response.CompleteMultipartUploadResult{}, "", err
}
tmpPath := createMetaTmpPath(*input.Key, *input.UploadId)
blobClient, err := az.getBlobClient(*input.Bucket, tmpPath)
if err != nil {
@@ -2058,6 +2135,29 @@ func (az *Azure) deleteContainerMetaData(ctx context.Context, bucket, key string
return nil
}
func (az *Azure) evaluateWritePreconditions(ctx context.Context, bucket, object, ifMatch, ifNoneMatch *string) error {
if areNils(ifMatch, ifNoneMatch) {
return nil
}
// call HeadObject to evaluate preconditions
// if object doesn't exist, move forward with the object creation
// otherwise return the error
_, err := az.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: bucket,
Key: object,
IfMatch: ifMatch,
IfNoneMatch: ifNoneMatch,
})
if errors.Is(err, s3err.GetAPIError(s3err.ErrNotModified)) {
return s3err.GetAPIError(s3err.ErrPreconditionFailed)
}
if err != nil && !errors.Is(err, s3err.GetAPIError(s3err.ErrNoSuchKey)) {
return err
}
return nil
}
func getAclFromMetadata(meta map[string]*string, key key) (*auth.ACL, error) {
data, ok := meta[string(key)]
if !ok {
@@ -2105,3 +2205,13 @@ func convertAzureEtag(etag *azcore.ETag) string {
return *backend.TrimEtag(str) + "-1"
}
func areNils[T any](args ...*T) bool {
for _, arg := range args {
if arg != nil {
return false
}
}
return true
}

View File

@@ -532,3 +532,41 @@ func EvaluatePreconditions(etag string, modTime time.Time, preconditions PreCond
return nil
}
// EvaluateMatchPreconditions evaluates if-match and if-none-match preconditions
func EvaluateMatchPreconditions(etag string, ifMatch, ifNoneMatch *string) error {
if ifMatch != nil && *ifMatch != etag {
return errPreconditionFailed
}
if ifNoneMatch != nil && *ifNoneMatch == etag {
return errPreconditionFailed
}
return nil
}
type ObjectDeletePreconditions struct {
IfMatch *string
IfMatchLastModTime *time.Time
IfMatchSize *int64
}
// EvaluateObjectDeletePreconditions evaluates preconditions for DeleteObject
func EvaluateObjectDeletePreconditions(etag string, modTime time.Time, size int64, preconditions ObjectDeletePreconditions) error {
ifMatch := preconditions.IfMatch
if ifMatch != nil && *ifMatch != etag {
return errPreconditionFailed
}
ifMatchTime := preconditions.IfMatchLastModTime
if ifMatchTime != nil && ifMatchTime.Unix() != modTime.Unix() {
return errPreconditionFailed
}
ifMatchSize := preconditions.IfMatchSize
if ifMatchSize != nil && *ifMatchSize != size {
return errPreconditionFailed
}
return nil
}

View File

@@ -1401,6 +1401,14 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
return res, "", err
}
b, err := p.meta.RetrieveAttribute(nil, bucket, object, etagkey)
if err == nil {
err = backend.EvaluateMatchPreconditions(string(b), input.IfMatch, input.IfNoneMatch)
if err != nil {
return res, "", err
}
}
objdir := filepath.Join(MetaTmpMultipartDir, fmt.Sprintf("%x", sum))
checksums, err := p.retrieveChecksums(nil, bucket, filepath.Join(objdir, uploadID))
@@ -2009,11 +2017,17 @@ func (p *Posix) AbortMultipartUpload(_ context.Context, mpu *s3.AbortMultipartUp
sum := sha256.Sum256([]byte(object))
objdir := filepath.Join(bucket, MetaTmpMultipartDir, fmt.Sprintf("%x", sum))
-_, err = os.Stat(filepath.Join(objdir, uploadID))
+f, err := os.Stat(filepath.Join(objdir, uploadID))
if err != nil {
return s3err.GetAPIError(s3err.ErrNoSuchUpload)
}
if mpu.IfMatchInitiatedTime != nil {
if mpu.IfMatchInitiatedTime.Unix() != f.ModTime().Unix() {
return s3err.GetAPIError(s3err.ErrPreconditionFailed)
}
}
err = os.RemoveAll(filepath.Join(objdir, uploadID))
if err != nil {
return fmt.Errorf("remove multipart upload container: %w", err)
@@ -2600,6 +2614,32 @@ func (p *Posix) UploadPartCopy(ctx context.Context, upi *s3.UploadPartCopyInput)
return s3response.CopyPartResult{}, err
}
srcf, err := os.Open(objPath)
if errors.Is(err, fs.ErrNotExist) {
return s3response.CopyPartResult{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
if err != nil {
return s3response.CopyPartResult{}, fmt.Errorf("open object: %w", err)
}
defer srcf.Close()
// evaluate preconditions
b, err := p.meta.RetrieveAttribute(srcf, srcBucket, srcObject, etagkey)
srcEtag := string(b)
if err != nil {
srcEtag = ""
}
err = backend.EvaluatePreconditions(srcEtag, fi.ModTime(), backend.PreConditions{
IfMatch: upi.CopySourceIfMatch,
IfNoneMatch: upi.CopySourceIfNoneMatch,
IfModSince: upi.CopySourceIfModifiedSince,
IfUnmodeSince: upi.CopySourceIfUnmodifiedSince,
})
if err != nil {
return s3response.CopyPartResult{}, err
}
f, err := p.openTmpFile(filepath.Join(*upi.Bucket, objdir),
*upi.Bucket, partPath, length, acct, doFalloc, p.forceNoTmpFile)
if err != nil {
@@ -2610,15 +2650,6 @@ func (p *Posix) UploadPartCopy(ctx context.Context, upi *s3.UploadPartCopyInput)
}
defer f.cleanup()
-srcf, err := os.Open(objPath)
-if errors.Is(err, fs.ErrNotExist) {
-return s3response.CopyPartResult{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
-}
-if err != nil {
-return s3response.CopyPartResult{}, fmt.Errorf("open object: %w", err)
-}
-defer srcf.Close()
rdr := io.NewSectionReader(srcf, startOffset, length)
hash := md5.New()
tr := io.TeeReader(rdr, hash)
@@ -2748,6 +2779,15 @@ func (p *Posix) PutObject(ctx context.Context, po s3response.PutObjectInput) (s3
name := filepath.Join(*po.Bucket, *po.Key)
// evaluate preconditions
etagBytes, err := p.meta.RetrieveAttribute(nil, *po.Bucket, *po.Key, etagkey)
if err == nil {
err := backend.EvaluateMatchPreconditions(string(etagBytes), po.IfMatch, po.IfNoneMatch)
if err != nil {
return s3response.PutObjectOutput{}, err
}
}
uid, gid, doChown := p.getChownIDs(acct)
contentLength := int64(0)
@@ -3073,6 +3113,30 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
return nil, err
}
evalPreconditions := func(f os.FileInfo, bucket, object string) error {
var err error
if f == nil {
f, err = os.Stat(filepath.Join(bucket, object))
if err != nil {
return nil
}
}
b, err := p.meta.RetrieveAttribute(nil, bucket, object, etagkey)
etag := string(b)
if err != nil {
etag = ""
}
// evaluate preconditions
return backend.EvaluateObjectDeletePreconditions(etag, f.ModTime(), f.Size(),
backend.ObjectDeletePreconditions{
IfMatch: input.IfMatch,
IfMatchLastModTime: input.IfMatchLastModifiedTime,
IfMatchSize: input.IfMatchSize,
})
}
// Directory objects can't have versions
if !isDir && p.versioningEnabled() && vStatus != "" {
if getString(input.VersionId) == "" {
@@ -3089,6 +3153,11 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
err = evalPreconditions(fi, bucket, object)
if err != nil {
return nil, err
}
acct, ok := ctx.Value("account").(auth.Account)
if !ok {
acct = auth.Account{}
@@ -3148,6 +3217,11 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
}
if string(vId) == *input.VersionId {
// evaluate preconditions
err := evalPreconditions(nil, bucket, object)
if err != nil {
return nil, err
}
// if the specified VersionId is the same as in the latest version,
// remove the latest version, find the latest version from the versioning
// directory and move to the place of the deleted object, to make it the latest
@@ -3242,6 +3316,11 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
}, nil
}
err = evalPreconditions(nil, versionPath, *input.VersionId)
if err != nil {
return nil, err
}
isDelMarker, _ := p.isObjDeleteMarker(versionPath, *input.VersionId)
err = os.Remove(filepath.Join(versionPath, *input.VersionId))
@@ -3289,6 +3368,11 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
return &s3.DeleteObjectOutput{}, nil
}
err = evalPreconditions(fi, bucket, object)
if err != nil {
return nil, err
}
err = os.Remove(objpath)
if errors.Is(err, fs.ErrNotExist) {
return nil, s3err.GetAPIError(s3err.ErrNoSuchKey)
@@ -4031,6 +4115,22 @@ func (p *Posix) CopyObject(ctx context.Context, input s3response.CopyObjectInput
return s3response.CopyObjectOutput{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
}
b, err := p.meta.RetrieveAttribute(f, srcBucket, srcObject, etagkey)
srcEtag := string(b)
if err != nil {
srcEtag = ""
}
err = backend.EvaluatePreconditions(srcEtag, fi.ModTime(), backend.PreConditions{
IfMatch: input.CopySourceIfMatch,
IfNoneMatch: input.CopySourceIfNoneMatch,
IfModSince: input.CopySourceIfModifiedSince,
IfUnmodeSince: input.CopySourceIfUnmodifiedSince,
})
if err != nil {
return s3response.CopyObjectOutput{}, err
}
mdmap := make(map[string]string)
p.loadObjectMetaData(nil, srcBucket, srcObject, &fi, mdmap)