mirror of
https://github.com/versity/versitygw.git
synced 2026-04-21 21:20:29 +00:00
fix: fix azure multipart upload objects masking
As multipart uploads are translated to blobs in azure blob storage, they were visible in ListObjects(V2) as complete objects. Now the blobs with multipart prefix are filtered out during listing. The listing logic is rewritten client-side to implement proper S3 semantics: flat blob enumeration with manual delimiter handling, correct truncation (IsTruncated only set when more items genuinely exist beyond maxKeys), and StartAfter/Marker/ContinuationToken applied via the lexicographic max of both constraints in ListObjectsV2. For the same reason bucket deletion was not allowed. Now multipart objects are explicitly checked on bucket deletion and any pending multipart upload doesn't block the bucket deletion anymore.
This commit is contained in:
@@ -304,15 +304,21 @@ func (az *Azure) HeadBucket(ctx context.Context, input *s3.HeadBucketInput) (*s3
|
||||
func (az *Azure) DeleteBucket(ctx context.Context, bucket string) error {
|
||||
pager := az.client.NewListBlobsFlatPager(bucket, nil)
|
||||
|
||||
pg, err := pager.NextPage(ctx)
|
||||
if err != nil {
|
||||
return azureErrToS3Err(err)
|
||||
for pager.More() {
|
||||
pg, err := pager.NextPage(ctx)
|
||||
if err != nil {
|
||||
return azureErrToS3Err(err)
|
||||
}
|
||||
|
||||
for _, item := range pg.Segment.BlobItems {
|
||||
// ignore temp multipart objects when determining if bucket non-empty
|
||||
if !strings.HasPrefix(backend.GetStringFromPtr(item.Name), string(metaTmpMultipartPrefix)) {
|
||||
return s3err.GetAPIError(s3err.ErrBucketNotEmpty)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(pg.Segment.BlobItems) > 0 {
|
||||
return s3err.GetAPIError(s3err.ErrBucketNotEmpty)
|
||||
}
|
||||
_, err = az.client.DeleteContainer(ctx, bucket, nil)
|
||||
_, err := az.client.DeleteContainer(ctx, bucket, nil)
|
||||
return azureErrToS3Err(err)
|
||||
}
|
||||
|
||||
@@ -659,92 +665,124 @@ func (az *Azure) ListObjects(ctx context.Context, input *s3.ListObjectsInput) (s
|
||||
maxKeys = *input.MaxKeys
|
||||
}
|
||||
|
||||
pager := client.NewListBlobsHierarchyPager(*input.Delimiter, &container.ListBlobsHierarchyOptions{
|
||||
MaxResults: &maxKeys,
|
||||
Prefix: input.Prefix,
|
||||
delimiter := backend.GetStringFromPtr(input.Delimiter)
|
||||
prefix := backend.GetStringFromPtr(input.Prefix)
|
||||
effectiveMarker := backend.GetStringFromPtr(input.Marker)
|
||||
|
||||
if maxKeys == 0 {
|
||||
isFalse := false
|
||||
return s3response.ListObjectsResult{
|
||||
IsTruncated: &isFalse,
|
||||
MaxKeys: &maxKeys,
|
||||
Name: input.Bucket,
|
||||
Prefix: backend.GetPtrFromString(prefix),
|
||||
Marker: backend.GetPtrFromString(effectiveMarker),
|
||||
Delimiter: backend.GetPtrFromString(delimiter),
|
||||
CommonPrefixes: []types.CommonPrefix{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Use flat listing (empty delimiter) and handle delimiter logic client-side,
|
||||
// matching S3 semantics. Only pass Prefix and Marker to Azure.
|
||||
pager := client.NewListBlobsHierarchyPager("", &container.ListBlobsHierarchyOptions{
|
||||
Prefix: input.Prefix,
|
||||
Marker: input.Marker,
|
||||
})
|
||||
|
||||
var objects []s3response.Object
|
||||
var cPrefixes []types.CommonPrefix
|
||||
var nextMarker *string
|
||||
var isTruncated bool
|
||||
cpSet := make(map[string]struct{})
|
||||
var pastMax, isTruncated bool
|
||||
var candidateMarker string
|
||||
var totalFound int32
|
||||
|
||||
// Convert marker to filter criteria
|
||||
var markerFilter string
|
||||
if input.Marker != nil && *input.Marker != "" {
|
||||
markerFilter = *input.Marker
|
||||
}
|
||||
|
||||
// Loop through pages until we have enough objects or no more pages
|
||||
objectsFound := int32(0)
|
||||
for pager.More() && objectsFound < maxKeys {
|
||||
loop:
|
||||
for pager.More() {
|
||||
resp, err := pager.NextPage(ctx)
|
||||
if err != nil {
|
||||
return s3response.ListObjectsResult{}, azureErrToS3Err(err)
|
||||
}
|
||||
|
||||
// Process objects from this page
|
||||
var pageObjects []s3response.Object
|
||||
for _, v := range resp.Segment.BlobItems {
|
||||
// Skip objects that come before or equal to marker
|
||||
if markerFilter != "" && *v.Name <= markerFilter {
|
||||
name := backend.GetStringFromPtr(v.Name)
|
||||
|
||||
// Filter out multipart upload blobs
|
||||
if strings.HasPrefix(name, string(metaTmpMultipartPrefix)) {
|
||||
continue
|
||||
}
|
||||
|
||||
pageObjects = append(pageObjects, s3response.Object{
|
||||
ETag: backend.GetPtrFromString(convertAzureEtag(v.Properties.ETag)),
|
||||
Key: v.Name,
|
||||
LastModified: v.Properties.LastModified,
|
||||
Size: v.Properties.ContentLength,
|
||||
StorageClass: types.ObjectStorageClassStandard,
|
||||
Owner: &types.Owner{
|
||||
ID: &acl.Owner,
|
||||
},
|
||||
})
|
||||
|
||||
objectsFound++
|
||||
if objectsFound >= maxKeys {
|
||||
// Set next marker to the current object name for pagination
|
||||
nextMarker = v.Name
|
||||
isTruncated = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
objects = append(objects, pageObjects...)
|
||||
|
||||
// Process common prefixes from this page
|
||||
for _, v := range resp.Segment.BlobPrefixes {
|
||||
// Skip prefixes that come before or equal to marker
|
||||
if markerFilter != "" && *v.Name <= markerFilter {
|
||||
continue
|
||||
// Apply delimiter logic to determine if this blob contributes to
|
||||
// a common prefix or is a regular object
|
||||
isCP := false
|
||||
cpKey := ""
|
||||
if delimiter != "" {
|
||||
suffix := strings.TrimPrefix(name, prefix)
|
||||
before, _, found := strings.Cut(suffix, delimiter)
|
||||
if found {
|
||||
isCP = true
|
||||
cpKey = prefix + before + delimiter
|
||||
}
|
||||
}
|
||||
|
||||
cPrefixes = append(cPrefixes, types.CommonPrefix{
|
||||
Prefix: v.Name,
|
||||
})
|
||||
if isCP {
|
||||
// Skip common prefixes at or before the marker
|
||||
if cpKey <= effectiveMarker {
|
||||
continue
|
||||
}
|
||||
// Deduplicate: multiple blobs can map to the same common prefix
|
||||
if _, exists := cpSet[cpKey]; exists {
|
||||
continue
|
||||
}
|
||||
// If we already reached maxKeys, this new unique CP means truncation
|
||||
if pastMax {
|
||||
isTruncated = true
|
||||
break loop
|
||||
}
|
||||
cp := cpKey
|
||||
cPrefixes = append(cPrefixes, types.CommonPrefix{Prefix: &cp})
|
||||
cpSet[cpKey] = struct{}{}
|
||||
candidateMarker = cpKey
|
||||
totalFound++
|
||||
if totalFound == maxKeys {
|
||||
pastMax = true
|
||||
}
|
||||
} else {
|
||||
if pastMax {
|
||||
isTruncated = true
|
||||
break loop
|
||||
}
|
||||
objects = append(objects, s3response.Object{
|
||||
ETag: backend.GetPtrFromString(convertAzureEtag(v.Properties.ETag)),
|
||||
Key: v.Name,
|
||||
LastModified: v.Properties.LastModified,
|
||||
Size: v.Properties.ContentLength,
|
||||
StorageClass: types.ObjectStorageClassStandard,
|
||||
Owner: &types.Owner{
|
||||
ID: &acl.Owner,
|
||||
},
|
||||
})
|
||||
candidateMarker = name
|
||||
totalFound++
|
||||
if totalFound == maxKeys {
|
||||
pastMax = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we've reached maxKeys, break
|
||||
if objectsFound >= maxKeys {
|
||||
break
|
||||
}
|
||||
|
||||
// If Azure indicates more pages but we need to continue for more objects
|
||||
if resp.NextMarker != nil && *resp.NextMarker != "" && objectsFound < maxKeys {
|
||||
continue
|
||||
}
|
||||
if !isTruncated {
|
||||
candidateMarker = ""
|
||||
}
|
||||
|
||||
return s3response.ListObjectsResult{
|
||||
Contents: objects,
|
||||
Marker: backend.GetPtrFromString(*input.Marker),
|
||||
MaxKeys: input.MaxKeys,
|
||||
Marker: backend.GetPtrFromString(effectiveMarker),
|
||||
MaxKeys: &maxKeys,
|
||||
Name: input.Bucket,
|
||||
NextMarker: nextMarker,
|
||||
Prefix: backend.GetPtrFromString(*input.Prefix),
|
||||
NextMarker: backend.GetPtrFromString(candidateMarker),
|
||||
Prefix: backend.GetPtrFromString(prefix),
|
||||
IsTruncated: &isTruncated,
|
||||
Delimiter: backend.GetPtrFromString(*input.Delimiter),
|
||||
Delimiter: backend.GetPtrFromString(delimiter),
|
||||
CommonPrefixes: cPrefixes,
|
||||
}, nil
|
||||
}
|
||||
@@ -772,95 +810,139 @@ func (az *Azure) ListObjectsV2(ctx context.Context, input *s3.ListObjectsV2Input
|
||||
maxKeys = *input.MaxKeys
|
||||
}
|
||||
|
||||
pager := client.NewListBlobsHierarchyPager(*input.Delimiter, &container.ListBlobsHierarchyOptions{
|
||||
Marker: input.ContinuationToken,
|
||||
MaxResults: &maxKeys,
|
||||
Prefix: input.Prefix,
|
||||
delimiter := backend.GetStringFromPtr(input.Delimiter)
|
||||
prefix := backend.GetStringFromPtr(input.Prefix)
|
||||
startAfterVal := backend.GetStringFromPtr(input.StartAfter)
|
||||
continuationTokenVal := backend.GetStringFromPtr(input.ContinuationToken)
|
||||
|
||||
// Take the lexicographically larger of startAfter and continuationToken so
|
||||
// listing starts strictly after both constraints.
|
||||
effectiveMarker := startAfterVal
|
||||
if continuationTokenVal > effectiveMarker {
|
||||
effectiveMarker = continuationTokenVal
|
||||
}
|
||||
|
||||
if maxKeys == 0 {
|
||||
isFalse := false
|
||||
return s3response.ListObjectsV2Result{
|
||||
IsTruncated: &isFalse,
|
||||
MaxKeys: &maxKeys,
|
||||
Name: input.Bucket,
|
||||
Prefix: backend.GetPtrFromString(prefix),
|
||||
ContinuationToken: backend.GetPtrFromString(continuationTokenVal),
|
||||
Delimiter: backend.GetPtrFromString(delimiter),
|
||||
StartAfter: backend.GetPtrFromString(startAfterVal),
|
||||
CommonPrefixes: []types.CommonPrefix{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Use flat listing (empty delimiter) and handle delimiter logic client-side,
|
||||
// matching S3 semantics. Only pass Prefix and Marker to Azure.
|
||||
// effectiveMarker is passed as Marker so Azure skips blobs before it.
|
||||
pager := client.NewListBlobsHierarchyPager("", &container.ListBlobsHierarchyOptions{
|
||||
Prefix: input.Prefix,
|
||||
Marker: backend.GetPtrFromString(effectiveMarker),
|
||||
})
|
||||
|
||||
var objects []s3response.Object
|
||||
var resp container.ListBlobsHierarchyResponse
|
||||
var cPrefixes []types.CommonPrefix
|
||||
cpSet := make(map[string]struct{})
|
||||
var pastMax, isTruncated bool
|
||||
var candidateMarker string
|
||||
var totalFound int32
|
||||
|
||||
// Loop through pages until we find objects or no more pages
|
||||
for {
|
||||
resp, err = pager.NextPage(ctx)
|
||||
loop:
|
||||
for pager.More() {
|
||||
resp, err := pager.NextPage(ctx)
|
||||
if err != nil {
|
||||
return s3response.ListObjectsV2Result{}, azureErrToS3Err(err)
|
||||
}
|
||||
|
||||
// Convert Azure objects to S3 objects
|
||||
var pageObjects []s3response.Object
|
||||
for _, v := range resp.Segment.BlobItems {
|
||||
pageObjects = append(pageObjects, s3response.Object{
|
||||
ETag: backend.GetPtrFromString(convertAzureEtag(v.Properties.ETag)),
|
||||
Key: v.Name,
|
||||
LastModified: v.Properties.LastModified,
|
||||
Size: v.Properties.ContentLength,
|
||||
StorageClass: types.ObjectStorageClassStandard,
|
||||
Owner: &types.Owner{
|
||||
ID: &acl.Owner,
|
||||
},
|
||||
})
|
||||
}
|
||||
name := backend.GetStringFromPtr(v.Name)
|
||||
|
||||
// If StartAfter is specified, filter objects
|
||||
if input.StartAfter != nil && *input.StartAfter != "" {
|
||||
startAfter := *input.StartAfter
|
||||
startIndex := -1
|
||||
for i, obj := range pageObjects {
|
||||
if *obj.Key > startAfter {
|
||||
startIndex = i
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if startIndex != -1 {
|
||||
// Found objects after StartAfter in this page
|
||||
objects = append(objects, pageObjects[startIndex:]...)
|
||||
break
|
||||
} else {
|
||||
// No objects after StartAfter in this page
|
||||
// Check if there are more pages to examine
|
||||
if resp.NextMarker == nil || *resp.NextMarker == "" {
|
||||
// No more pages, so no objects after StartAfter
|
||||
break
|
||||
}
|
||||
// Continue to next page without adding any objects
|
||||
// Filter out multipart upload blobs
|
||||
if strings.HasPrefix(name, string(metaTmpMultipartPrefix)) {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
// No StartAfter specified, add all objects from this page
|
||||
objects = append(objects, pageObjects...)
|
||||
break
|
||||
|
||||
// Apply delimiter logic to determine if this blob contributes to
|
||||
// a common prefix or is a regular object
|
||||
isCP := false
|
||||
cpKey := ""
|
||||
if delimiter != "" {
|
||||
suffix := strings.TrimPrefix(name, prefix)
|
||||
before, _, found := strings.Cut(suffix, delimiter)
|
||||
if found {
|
||||
isCP = true
|
||||
cpKey = prefix + before + delimiter
|
||||
}
|
||||
}
|
||||
|
||||
if isCP {
|
||||
// Skip common prefixes at or before the effective marker
|
||||
if cpKey <= effectiveMarker {
|
||||
continue
|
||||
}
|
||||
// Deduplicate: multiple blobs can map to the same common prefix
|
||||
if _, exists := cpSet[cpKey]; exists {
|
||||
continue
|
||||
}
|
||||
// If we already reached maxKeys, this new unique CP means truncation
|
||||
if pastMax {
|
||||
isTruncated = true
|
||||
break loop
|
||||
}
|
||||
cp := cpKey
|
||||
cPrefixes = append(cPrefixes, types.CommonPrefix{Prefix: &cp})
|
||||
cpSet[cpKey] = struct{}{}
|
||||
candidateMarker = cpKey
|
||||
totalFound++
|
||||
if totalFound == maxKeys {
|
||||
pastMax = true
|
||||
}
|
||||
} else {
|
||||
if pastMax {
|
||||
isTruncated = true
|
||||
break loop
|
||||
}
|
||||
objects = append(objects, s3response.Object{
|
||||
ETag: backend.GetPtrFromString(convertAzureEtag(v.Properties.ETag)),
|
||||
Key: v.Name,
|
||||
LastModified: v.Properties.LastModified,
|
||||
Size: v.Properties.ContentLength,
|
||||
StorageClass: types.ObjectStorageClassStandard,
|
||||
Owner: &types.Owner{
|
||||
ID: &acl.Owner,
|
||||
},
|
||||
})
|
||||
candidateMarker = name
|
||||
totalFound++
|
||||
if totalFound == maxKeys {
|
||||
pastMax = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var cPrefixes []types.CommonPrefix
|
||||
for _, v := range resp.Segment.BlobPrefixes {
|
||||
cPrefixes = append(cPrefixes, types.CommonPrefix{
|
||||
Prefix: v.Name,
|
||||
})
|
||||
if !isTruncated {
|
||||
candidateMarker = ""
|
||||
}
|
||||
|
||||
var isTruncated bool
|
||||
var nextMarker *string
|
||||
// If Azure returned a NextMarker, set it for the next request
|
||||
if resp.NextMarker != nil && *resp.NextMarker != "" {
|
||||
nextMarker = resp.NextMarker
|
||||
isTruncated = true
|
||||
}
|
||||
keyCount := int32(len(objects) + len(cPrefixes))
|
||||
|
||||
return s3response.ListObjectsV2Result{
|
||||
Contents: objects,
|
||||
ContinuationToken: backend.GetPtrFromString(*input.ContinuationToken),
|
||||
MaxKeys: input.MaxKeys,
|
||||
ContinuationToken: backend.GetPtrFromString(continuationTokenVal),
|
||||
KeyCount: &keyCount,
|
||||
MaxKeys: &maxKeys,
|
||||
Name: input.Bucket,
|
||||
NextContinuationToken: nextMarker,
|
||||
Prefix: backend.GetPtrFromString(*input.Prefix),
|
||||
NextContinuationToken: backend.GetPtrFromString(candidateMarker),
|
||||
Prefix: backend.GetPtrFromString(prefix),
|
||||
IsTruncated: &isTruncated,
|
||||
Delimiter: backend.GetPtrFromString(*input.Delimiter),
|
||||
Delimiter: backend.GetPtrFromString(delimiter),
|
||||
CommonPrefixes: cPrefixes,
|
||||
StartAfter: backend.GetPtrFromString(*input.StartAfter),
|
||||
StartAfter: backend.GetPtrFromString(startAfterVal),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -585,3 +585,205 @@ func ListObjects_non_truncated_common_prefixes(s *S3Conf) error {
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// ListObjects should not list any pending multipart uploads
|
||||
// and no pending mp should block the bucket from deletion
|
||||
func ListObjects_should_not_list_pending_mps(s *S3Conf) error {
|
||||
testName := "ListObjects_should_not_list_pending_mps"
|
||||
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
|
||||
for i := range 5 {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
_, err := s3client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
|
||||
Bucket: &bucket,
|
||||
Key: getPtr(fmt.Sprintf("obj-%d", i)),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
res, err := s3client.ListObjects(ctx, &s3.ListObjectsInput{
|
||||
Bucket: &bucket,
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(res.Contents) != 0 {
|
||||
return fmt.Errorf("expected empty object list result, instead got %v", res.Contents)
|
||||
}
|
||||
if len(res.CommonPrefixes) != 0 {
|
||||
return fmt.Errorf("expected empty object common prefixes result, instead got %v", res.CommonPrefixes)
|
||||
}
|
||||
|
||||
ctx, cancel = context.WithTimeout(context.Background(), shortTimeout)
|
||||
_, err = s3client.DeleteBucket(ctx, &s3.DeleteBucketInput{
|
||||
Bucket: &bucket,
|
||||
})
|
||||
cancel()
|
||||
return err
|
||||
}, withSkipTearDown())
|
||||
}
|
||||
|
||||
// ListObjects with a marker should not surface pending multipart uploads
|
||||
// even when real objects are interleaved with the marker boundary.
|
||||
func ListObjects_mp_masking_with_marker(s *S3Conf) error {
|
||||
testName := "ListObjects_mp_masking_with_marker"
|
||||
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
|
||||
// Create pending multipart uploads with keys that sort after all real objects
|
||||
for i := range 3 {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
_, err := s3client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
|
||||
Bucket: &bucket,
|
||||
Key: getPtr(fmt.Sprintf("zzz-mp-%d", i+1)),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
contents, err := putObjects(s3client, []string{"aaa", "bbb", "ccc"}, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
out, err := s3client.ListObjects(ctx, &s3.ListObjectsInput{
|
||||
Bucket: &bucket,
|
||||
Marker: getPtr("aaa"),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Expect only bbb and ccc (after marker "aaa"), no multipart upload objects
|
||||
if !compareObjects(contents[1:], out.Contents) {
|
||||
return fmt.Errorf("expected objects %v, instead got %v",
|
||||
contents[1:], out.Contents)
|
||||
}
|
||||
if out.IsTruncated == nil || *out.IsTruncated {
|
||||
return fmt.Errorf("expected non-truncated result")
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// ListObjects truncation should count only real objects, not pending multipart uploads.
|
||||
func ListObjects_mp_masking_truncation(s *S3Conf) error {
|
||||
testName := "ListObjects_mp_masking_truncation"
|
||||
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
|
||||
// Create pending multipart uploads with keys that sort after real objects
|
||||
for i := range 2 {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
_, err := s3client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
|
||||
Bucket: &bucket,
|
||||
Key: getPtr(fmt.Sprintf("zzz-mp-%d", i+1)),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
contents, err := putObjects(s3client, []string{"obj-a", "obj-b", "obj-c", "obj-d"}, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
maxKeys := int32(2)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
out1, err := s3client.ListObjects(ctx, &s3.ListObjectsInput{
|
||||
Bucket: &bucket,
|
||||
MaxKeys: &maxKeys,
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if out1.IsTruncated == nil || !*out1.IsTruncated {
|
||||
return fmt.Errorf("expected first page to be truncated")
|
||||
}
|
||||
if !compareObjects(contents[:2], out1.Contents) {
|
||||
return fmt.Errorf("expected first page objects %v, instead got %v",
|
||||
contents[:2], out1.Contents)
|
||||
}
|
||||
if out1.NextMarker == nil || *out1.NextMarker == "" {
|
||||
return fmt.Errorf("expected non-empty NextMarker")
|
||||
}
|
||||
if *out1.NextMarker != "obj-b" {
|
||||
return fmt.Errorf("expected NextMarker to be obj-b, instead got %v", *out1.NextMarker)
|
||||
}
|
||||
|
||||
ctx, cancel = context.WithTimeout(context.Background(), shortTimeout)
|
||||
out2, err := s3client.ListObjects(ctx, &s3.ListObjectsInput{
|
||||
Bucket: &bucket,
|
||||
Marker: out1.NextMarker,
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if out2.IsTruncated == nil || *out2.IsTruncated {
|
||||
return fmt.Errorf("expected second page to not be truncated")
|
||||
}
|
||||
if !compareObjects(contents[2:], out2.Contents) {
|
||||
return fmt.Errorf("expected second page objects %v, instead got %v",
|
||||
contents[2:], out2.Contents)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// ListObjects with a delimiter should not include the .sgwtmp/ multipart prefix
|
||||
// in common prefixes, even when pending multipart uploads exist.
|
||||
func ListObjects_mp_masking_delimiter(s *S3Conf) error {
|
||||
testName := "ListObjects_mp_masking_delimiter"
|
||||
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
|
||||
// Create pending multipart uploads
|
||||
for i := range 2 {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
_, err := s3client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
|
||||
Bucket: &bucket,
|
||||
Key: getPtr(fmt.Sprintf("zzz-mp-%d", i+1)),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
_, err := putObjects(s3client, []string{"dir1/file1", "dir2/file2"}, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
out, err := s3client.ListObjects(ctx, &s3.ListObjectsInput{
|
||||
Bucket: &bucket,
|
||||
Delimiter: getPtr("/"),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(out.Contents) != 0 {
|
||||
return fmt.Errorf("expected empty Contents, instead got %v", out.Contents)
|
||||
}
|
||||
if !comparePrefixes([]string{"dir1/", "dir2/"}, out.CommonPrefixes) {
|
||||
return fmt.Errorf("expected common prefixes [dir1/ dir2/], instead got %v",
|
||||
sprintPrefixes(out.CommonPrefixes))
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -658,3 +658,203 @@ func ListObjectsV2_invalid_parent_prefix(s *S3Conf) error {
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// ListObjects should not list any pending multipart uploads
|
||||
// and no pending mp should block the bucket from deletion
|
||||
func ListObjectsV2_should_not_list_pending_mps(s *S3Conf) error {
|
||||
testName := "ListObjectsV2_should_not_list_pending_mps"
|
||||
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
|
||||
for i := range 5 {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
_, err := s3client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
|
||||
Bucket: &bucket,
|
||||
Key: getPtr(fmt.Sprintf("obj-%d", i)),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
res, err := s3client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: &bucket,
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(res.Contents) != 0 {
|
||||
return fmt.Errorf("expected empty object list result, instead got %v", res.Contents)
|
||||
}
|
||||
if len(res.CommonPrefixes) != 0 {
|
||||
return fmt.Errorf("expected empty object common prefixes result, instead got %v", res.CommonPrefixes)
|
||||
}
|
||||
|
||||
ctx, cancel = context.WithTimeout(context.Background(), shortTimeout)
|
||||
_, err = s3client.DeleteBucket(ctx, &s3.DeleteBucketInput{
|
||||
Bucket: &bucket,
|
||||
})
|
||||
cancel()
|
||||
return err
|
||||
}, withSkipTearDown())
|
||||
}
|
||||
|
||||
// ListObjectsV2 with startAfter should not surface pending multipart uploads
|
||||
// even when real objects are interleaved with the startAfter boundary.
|
||||
func ListObjectsV2_mp_masking_start_after(s *S3Conf) error {
|
||||
testName := "ListObjectsV2_mp_masking_start_after"
|
||||
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
|
||||
// Create pending multipart uploads with keys that sort after all real objects
|
||||
for i := range 2 {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
_, err := s3client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
|
||||
Bucket: &bucket,
|
||||
Key: getPtr(fmt.Sprintf("zzz-mp-%d", i+1)),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
contents, err := putObjects(s3client, []string{"alpha", "beta", "gamma"}, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
out, err := s3client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: &bucket,
|
||||
StartAfter: getPtr("alpha"),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Expect only beta and gamma (after startAfter "alpha"), no multipart upload objects
|
||||
if !compareObjects(contents[1:], out.Contents) {
|
||||
return fmt.Errorf("expected objects %v, instead got %v",
|
||||
contents[1:], out.Contents)
|
||||
}
|
||||
if out.IsTruncated == nil || *out.IsTruncated {
|
||||
return fmt.Errorf("expected non-truncated result")
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// ListObjectsV2 truncation should count only real objects, not pending multipart uploads,
|
||||
// and the continuation token should allow correct pagination.
|
||||
func ListObjectsV2_mp_masking_truncation(s *S3Conf) error {
|
||||
testName := "ListObjectsV2_mp_masking_truncation"
|
||||
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
|
||||
// Create pending multipart uploads with keys that sort after real objects
|
||||
for i := range 2 {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
_, err := s3client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
|
||||
Bucket: &bucket,
|
||||
Key: getPtr(fmt.Sprintf("zzz-mp-%d", i+1)),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
contents, err := putObjects(s3client, []string{"obj-a", "obj-b", "obj-c", "obj-d"}, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
maxKeys := int32(2)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
out1, err := s3client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: &bucket,
|
||||
MaxKeys: &maxKeys,
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if out1.IsTruncated == nil || !*out1.IsTruncated {
|
||||
return fmt.Errorf("expected first page to be truncated")
|
||||
}
|
||||
if !compareObjects(contents[:2], out1.Contents) {
|
||||
return fmt.Errorf("expected first page objects %v, instead got %v",
|
||||
contents[:2], out1.Contents)
|
||||
}
|
||||
if out1.NextContinuationToken == nil || *out1.NextContinuationToken == "" {
|
||||
return fmt.Errorf("expected non-empty NextContinuationToken")
|
||||
}
|
||||
|
||||
ctx, cancel = context.WithTimeout(context.Background(), shortTimeout)
|
||||
out2, err := s3client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: &bucket,
|
||||
ContinuationToken: out1.NextContinuationToken,
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if out2.IsTruncated == nil || *out2.IsTruncated {
|
||||
return fmt.Errorf("expected second page to not be truncated")
|
||||
}
|
||||
if !compareObjects(contents[2:], out2.Contents) {
|
||||
return fmt.Errorf("expected second page objects %v, instead got %v",
|
||||
contents[2:], out2.Contents)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// ListObjectsV2 with a delimiter should not include the .sgwtmp/ multipart prefix
|
||||
// in common prefixes, even when pending multipart uploads exist.
|
||||
func ListObjectsV2_mp_masking_delimiter(s *S3Conf) error {
|
||||
testName := "ListObjectsV2_mp_masking_delimiter"
|
||||
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
|
||||
// Create pending multipart uploads
|
||||
for i := range 2 {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
_, err := s3client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
|
||||
Bucket: &bucket,
|
||||
Key: getPtr(fmt.Sprintf("zzz-mp-%d", i+1)),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
_, err := putObjects(s3client, []string{"dir1/file1", "dir2/file2"}, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
out, err := s3client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: &bucket,
|
||||
Delimiter: getPtr("/"),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(out.Contents) != 0 {
|
||||
return fmt.Errorf("expected empty Contents, instead got %v", out.Contents)
|
||||
}
|
||||
if !comparePrefixes([]string{"dir1/", "dir2/"}, out.CommonPrefixes) {
|
||||
return fmt.Errorf("expected common prefixes [dir1/ dir2/], instead got %v",
|
||||
sprintPrefixes(out.CommonPrefixes))
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -274,6 +274,10 @@ func TestListObjects(ts *TestState) {
|
||||
ts.Run(ListObjects_nested_dir_file_objs)
|
||||
ts.Run(ListObjects_check_owner)
|
||||
ts.Run(ListObjects_non_truncated_common_prefixes)
|
||||
ts.Run(ListObjects_should_not_list_pending_mps)
|
||||
ts.Run(ListObjects_mp_masking_with_marker)
|
||||
ts.Run(ListObjects_mp_masking_truncation)
|
||||
ts.Run(ListObjects_mp_masking_delimiter)
|
||||
//TODO: remove the condition after implementing checksums in azure
|
||||
if !ts.conf.azureTests {
|
||||
ts.Run(ListObjects_with_checksum)
|
||||
@@ -282,10 +286,7 @@ func TestListObjects(ts *TestState) {
|
||||
|
||||
func TestListObjectsV2(ts *TestState) {
|
||||
ts.Run(ListObjectsV2_start_after)
|
||||
// posix continuation token not compatible with azure
|
||||
if !ts.conf.azureTests {
|
||||
ts.Run(ListObjectsV2_both_start_after_and_continuation_token)
|
||||
}
|
||||
ts.Run(ListObjectsV2_both_start_after_and_continuation_token)
|
||||
ts.Run(ListObjectsV2_start_after_not_in_list)
|
||||
ts.Run(ListObjectsV2_start_after_empty_result)
|
||||
ts.Run(ListObjectsV2_both_delimiter_and_prefix)
|
||||
@@ -301,6 +302,10 @@ func TestListObjectsV2(ts *TestState) {
|
||||
ts.Run(ListObjectsV2_with_checksum)
|
||||
}
|
||||
ts.Run(ListObjectsV2_invalid_parent_prefix)
|
||||
ts.Run(ListObjectsV2_should_not_list_pending_mps)
|
||||
ts.Run(ListObjectsV2_mp_masking_start_after)
|
||||
ts.Run(ListObjectsV2_mp_masking_truncation)
|
||||
ts.Run(ListObjectsV2_mp_masking_delimiter)
|
||||
}
|
||||
|
||||
// VD stands for Versioning Disabled
|
||||
@@ -1433,7 +1438,16 @@ func GetIntTests() IntTests {
|
||||
"ListObjects_nested_dir_file_objs": ListObjects_nested_dir_file_objs,
|
||||
"ListObjects_check_owner": ListObjects_check_owner,
|
||||
"ListObjects_non_truncated_common_prefixes": ListObjects_non_truncated_common_prefixes,
|
||||
"ListObjects_should_not_list_pending_mps": ListObjects_should_not_list_pending_mps,
|
||||
"ListObjects_mp_masking_with_marker": ListObjects_mp_masking_with_marker,
|
||||
"ListObjects_mp_masking_truncation": ListObjects_mp_masking_truncation,
|
||||
"ListObjects_mp_masking_delimiter": ListObjects_mp_masking_delimiter,
|
||||
"ListObjectsV2_non_truncated_common_prefixes": ListObjectsV2_non_truncated_common_prefixes,
|
||||
"ListObjectsV2_invalid_parent_prefix": ListObjectsV2_invalid_parent_prefix,
|
||||
"ListObjectsV2_should_not_list_pending_mps": ListObjectsV2_should_not_list_pending_mps,
|
||||
"ListObjectsV2_mp_masking_start_after": ListObjectsV2_mp_masking_start_after,
|
||||
"ListObjectsV2_mp_masking_truncation": ListObjectsV2_mp_masking_truncation,
|
||||
"ListObjectsV2_mp_masking_delimiter": ListObjectsV2_mp_masking_delimiter,
|
||||
"ListObjects_with_checksum": ListObjects_with_checksum,
|
||||
"ListObjectsV2_start_after": ListObjectsV2_start_after,
|
||||
"ListObjectsV2_both_start_after_and_continuation_token": ListObjectsV2_both_start_after_and_continuation_token,
|
||||
|
||||
Reference in New Issue
Block a user