mirror of
https://github.com/versity/versitygw.git
synced 2026-04-22 05:30:29 +00:00
Merge pull request #2012 from versity/sis/azure-mp-blobs
fix: fix azure multipart upload objects masking
This commit is contained in:
@@ -304,15 +304,21 @@ func (az *Azure) HeadBucket(ctx context.Context, input *s3.HeadBucketInput) (*s3
|
||||
func (az *Azure) DeleteBucket(ctx context.Context, bucket string) error {
|
||||
pager := az.client.NewListBlobsFlatPager(bucket, nil)
|
||||
|
||||
pg, err := pager.NextPage(ctx)
|
||||
if err != nil {
|
||||
return azureErrToS3Err(err)
|
||||
for pager.More() {
|
||||
pg, err := pager.NextPage(ctx)
|
||||
if err != nil {
|
||||
return azureErrToS3Err(err)
|
||||
}
|
||||
|
||||
for _, item := range pg.Segment.BlobItems {
|
||||
// ignore temp multipart objects when determining if bucket non-empty
|
||||
if !strings.HasPrefix(backend.GetStringFromPtr(item.Name), string(metaTmpMultipartPrefix)) {
|
||||
return s3err.GetAPIError(s3err.ErrBucketNotEmpty)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(pg.Segment.BlobItems) > 0 {
|
||||
return s3err.GetAPIError(s3err.ErrBucketNotEmpty)
|
||||
}
|
||||
_, err = az.client.DeleteContainer(ctx, bucket, nil)
|
||||
_, err := az.client.DeleteContainer(ctx, bucket, nil)
|
||||
return azureErrToS3Err(err)
|
||||
}
|
||||
|
||||
@@ -659,92 +665,124 @@ func (az *Azure) ListObjects(ctx context.Context, input *s3.ListObjectsInput) (s
|
||||
maxKeys = *input.MaxKeys
|
||||
}
|
||||
|
||||
pager := client.NewListBlobsHierarchyPager(*input.Delimiter, &container.ListBlobsHierarchyOptions{
|
||||
MaxResults: &maxKeys,
|
||||
Prefix: input.Prefix,
|
||||
delimiter := backend.GetStringFromPtr(input.Delimiter)
|
||||
prefix := backend.GetStringFromPtr(input.Prefix)
|
||||
effectiveMarker := backend.GetStringFromPtr(input.Marker)
|
||||
|
||||
if maxKeys == 0 {
|
||||
isFalse := false
|
||||
return s3response.ListObjectsResult{
|
||||
IsTruncated: &isFalse,
|
||||
MaxKeys: &maxKeys,
|
||||
Name: input.Bucket,
|
||||
Prefix: backend.GetPtrFromString(prefix),
|
||||
Marker: backend.GetPtrFromString(effectiveMarker),
|
||||
Delimiter: backend.GetPtrFromString(delimiter),
|
||||
CommonPrefixes: []types.CommonPrefix{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Use flat listing (empty delimiter) and handle delimiter logic client-side,
|
||||
// matching S3 semantics. Only pass Prefix and Marker to Azure.
|
||||
pager := client.NewListBlobsHierarchyPager("", &container.ListBlobsHierarchyOptions{
|
||||
Prefix: input.Prefix,
|
||||
Marker: input.Marker,
|
||||
})
|
||||
|
||||
var objects []s3response.Object
|
||||
var cPrefixes []types.CommonPrefix
|
||||
var nextMarker *string
|
||||
var isTruncated bool
|
||||
cpSet := make(map[string]struct{})
|
||||
var pastMax, isTruncated bool
|
||||
var candidateMarker string
|
||||
var totalFound int32
|
||||
|
||||
// Convert marker to filter criteria
|
||||
var markerFilter string
|
||||
if input.Marker != nil && *input.Marker != "" {
|
||||
markerFilter = *input.Marker
|
||||
}
|
||||
|
||||
// Loop through pages until we have enough objects or no more pages
|
||||
objectsFound := int32(0)
|
||||
for pager.More() && objectsFound < maxKeys {
|
||||
loop:
|
||||
for pager.More() {
|
||||
resp, err := pager.NextPage(ctx)
|
||||
if err != nil {
|
||||
return s3response.ListObjectsResult{}, azureErrToS3Err(err)
|
||||
}
|
||||
|
||||
// Process objects from this page
|
||||
var pageObjects []s3response.Object
|
||||
for _, v := range resp.Segment.BlobItems {
|
||||
// Skip objects that come before or equal to marker
|
||||
if markerFilter != "" && *v.Name <= markerFilter {
|
||||
name := backend.GetStringFromPtr(v.Name)
|
||||
|
||||
// Filter out multipart upload blobs
|
||||
if strings.HasPrefix(name, string(metaTmpMultipartPrefix)) {
|
||||
continue
|
||||
}
|
||||
|
||||
pageObjects = append(pageObjects, s3response.Object{
|
||||
ETag: backend.GetPtrFromString(convertAzureEtag(v.Properties.ETag)),
|
||||
Key: v.Name,
|
||||
LastModified: v.Properties.LastModified,
|
||||
Size: v.Properties.ContentLength,
|
||||
StorageClass: types.ObjectStorageClassStandard,
|
||||
Owner: &types.Owner{
|
||||
ID: &acl.Owner,
|
||||
},
|
||||
})
|
||||
|
||||
objectsFound++
|
||||
if objectsFound >= maxKeys {
|
||||
// Set next marker to the current object name for pagination
|
||||
nextMarker = v.Name
|
||||
isTruncated = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
objects = append(objects, pageObjects...)
|
||||
|
||||
// Process common prefixes from this page
|
||||
for _, v := range resp.Segment.BlobPrefixes {
|
||||
// Skip prefixes that come before or equal to marker
|
||||
if markerFilter != "" && *v.Name <= markerFilter {
|
||||
continue
|
||||
// Apply delimiter logic to determine if this blob contributes to
|
||||
// a common prefix or is a regular object
|
||||
isCP := false
|
||||
cpKey := ""
|
||||
if delimiter != "" {
|
||||
suffix := strings.TrimPrefix(name, prefix)
|
||||
before, _, found := strings.Cut(suffix, delimiter)
|
||||
if found {
|
||||
isCP = true
|
||||
cpKey = prefix + before + delimiter
|
||||
}
|
||||
}
|
||||
|
||||
cPrefixes = append(cPrefixes, types.CommonPrefix{
|
||||
Prefix: v.Name,
|
||||
})
|
||||
if isCP {
|
||||
// Skip common prefixes at or before the marker
|
||||
if cpKey <= effectiveMarker {
|
||||
continue
|
||||
}
|
||||
// Deduplicate: multiple blobs can map to the same common prefix
|
||||
if _, exists := cpSet[cpKey]; exists {
|
||||
continue
|
||||
}
|
||||
// If we already reached maxKeys, this new unique CP means truncation
|
||||
if pastMax {
|
||||
isTruncated = true
|
||||
break loop
|
||||
}
|
||||
cp := cpKey
|
||||
cPrefixes = append(cPrefixes, types.CommonPrefix{Prefix: &cp})
|
||||
cpSet[cpKey] = struct{}{}
|
||||
candidateMarker = cpKey
|
||||
totalFound++
|
||||
if totalFound == maxKeys {
|
||||
pastMax = true
|
||||
}
|
||||
} else {
|
||||
if pastMax {
|
||||
isTruncated = true
|
||||
break loop
|
||||
}
|
||||
objects = append(objects, s3response.Object{
|
||||
ETag: backend.GetPtrFromString(convertAzureEtag(v.Properties.ETag)),
|
||||
Key: v.Name,
|
||||
LastModified: v.Properties.LastModified,
|
||||
Size: v.Properties.ContentLength,
|
||||
StorageClass: types.ObjectStorageClassStandard,
|
||||
Owner: &types.Owner{
|
||||
ID: &acl.Owner,
|
||||
},
|
||||
})
|
||||
candidateMarker = name
|
||||
totalFound++
|
||||
if totalFound == maxKeys {
|
||||
pastMax = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we've reached maxKeys, break
|
||||
if objectsFound >= maxKeys {
|
||||
break
|
||||
}
|
||||
|
||||
// If Azure indicates more pages but we need to continue for more objects
|
||||
if resp.NextMarker != nil && *resp.NextMarker != "" && objectsFound < maxKeys {
|
||||
continue
|
||||
}
|
||||
if !isTruncated {
|
||||
candidateMarker = ""
|
||||
}
|
||||
|
||||
return s3response.ListObjectsResult{
|
||||
Contents: objects,
|
||||
Marker: backend.GetPtrFromString(*input.Marker),
|
||||
MaxKeys: input.MaxKeys,
|
||||
Marker: backend.GetPtrFromString(effectiveMarker),
|
||||
MaxKeys: &maxKeys,
|
||||
Name: input.Bucket,
|
||||
NextMarker: nextMarker,
|
||||
Prefix: backend.GetPtrFromString(*input.Prefix),
|
||||
NextMarker: backend.GetPtrFromString(candidateMarker),
|
||||
Prefix: backend.GetPtrFromString(prefix),
|
||||
IsTruncated: &isTruncated,
|
||||
Delimiter: backend.GetPtrFromString(*input.Delimiter),
|
||||
Delimiter: backend.GetPtrFromString(delimiter),
|
||||
CommonPrefixes: cPrefixes,
|
||||
}, nil
|
||||
}
|
||||
@@ -772,95 +810,139 @@ func (az *Azure) ListObjectsV2(ctx context.Context, input *s3.ListObjectsV2Input
|
||||
maxKeys = *input.MaxKeys
|
||||
}
|
||||
|
||||
pager := client.NewListBlobsHierarchyPager(*input.Delimiter, &container.ListBlobsHierarchyOptions{
|
||||
Marker: input.ContinuationToken,
|
||||
MaxResults: &maxKeys,
|
||||
Prefix: input.Prefix,
|
||||
delimiter := backend.GetStringFromPtr(input.Delimiter)
|
||||
prefix := backend.GetStringFromPtr(input.Prefix)
|
||||
startAfterVal := backend.GetStringFromPtr(input.StartAfter)
|
||||
continuationTokenVal := backend.GetStringFromPtr(input.ContinuationToken)
|
||||
|
||||
// Take the lexicographically larger of startAfter and continuationToken so
|
||||
// listing starts strictly after both constraints.
|
||||
effectiveMarker := startAfterVal
|
||||
if continuationTokenVal > effectiveMarker {
|
||||
effectiveMarker = continuationTokenVal
|
||||
}
|
||||
|
||||
if maxKeys == 0 {
|
||||
isFalse := false
|
||||
return s3response.ListObjectsV2Result{
|
||||
IsTruncated: &isFalse,
|
||||
MaxKeys: &maxKeys,
|
||||
Name: input.Bucket,
|
||||
Prefix: backend.GetPtrFromString(prefix),
|
||||
ContinuationToken: backend.GetPtrFromString(continuationTokenVal),
|
||||
Delimiter: backend.GetPtrFromString(delimiter),
|
||||
StartAfter: backend.GetPtrFromString(startAfterVal),
|
||||
CommonPrefixes: []types.CommonPrefix{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Use flat listing (empty delimiter) and handle delimiter logic client-side,
|
||||
// matching S3 semantics. Only pass Prefix and Marker to Azure.
|
||||
// effectiveMarker is passed as Marker so Azure skips blobs before it.
|
||||
pager := client.NewListBlobsHierarchyPager("", &container.ListBlobsHierarchyOptions{
|
||||
Prefix: input.Prefix,
|
||||
Marker: backend.GetPtrFromString(effectiveMarker),
|
||||
})
|
||||
|
||||
var objects []s3response.Object
|
||||
var resp container.ListBlobsHierarchyResponse
|
||||
var cPrefixes []types.CommonPrefix
|
||||
cpSet := make(map[string]struct{})
|
||||
var pastMax, isTruncated bool
|
||||
var candidateMarker string
|
||||
var totalFound int32
|
||||
|
||||
// Loop through pages until we find objects or no more pages
|
||||
for {
|
||||
resp, err = pager.NextPage(ctx)
|
||||
loop:
|
||||
for pager.More() {
|
||||
resp, err := pager.NextPage(ctx)
|
||||
if err != nil {
|
||||
return s3response.ListObjectsV2Result{}, azureErrToS3Err(err)
|
||||
}
|
||||
|
||||
// Convert Azure objects to S3 objects
|
||||
var pageObjects []s3response.Object
|
||||
for _, v := range resp.Segment.BlobItems {
|
||||
pageObjects = append(pageObjects, s3response.Object{
|
||||
ETag: backend.GetPtrFromString(convertAzureEtag(v.Properties.ETag)),
|
||||
Key: v.Name,
|
||||
LastModified: v.Properties.LastModified,
|
||||
Size: v.Properties.ContentLength,
|
||||
StorageClass: types.ObjectStorageClassStandard,
|
||||
Owner: &types.Owner{
|
||||
ID: &acl.Owner,
|
||||
},
|
||||
})
|
||||
}
|
||||
name := backend.GetStringFromPtr(v.Name)
|
||||
|
||||
// If StartAfter is specified, filter objects
|
||||
if input.StartAfter != nil && *input.StartAfter != "" {
|
||||
startAfter := *input.StartAfter
|
||||
startIndex := -1
|
||||
for i, obj := range pageObjects {
|
||||
if *obj.Key > startAfter {
|
||||
startIndex = i
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if startIndex != -1 {
|
||||
// Found objects after StartAfter in this page
|
||||
objects = append(objects, pageObjects[startIndex:]...)
|
||||
break
|
||||
} else {
|
||||
// No objects after StartAfter in this page
|
||||
// Check if there are more pages to examine
|
||||
if resp.NextMarker == nil || *resp.NextMarker == "" {
|
||||
// No more pages, so no objects after StartAfter
|
||||
break
|
||||
}
|
||||
// Continue to next page without adding any objects
|
||||
// Filter out multipart upload blobs
|
||||
if strings.HasPrefix(name, string(metaTmpMultipartPrefix)) {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
// No StartAfter specified, add all objects from this page
|
||||
objects = append(objects, pageObjects...)
|
||||
break
|
||||
|
||||
// Apply delimiter logic to determine if this blob contributes to
|
||||
// a common prefix or is a regular object
|
||||
isCP := false
|
||||
cpKey := ""
|
||||
if delimiter != "" {
|
||||
suffix := strings.TrimPrefix(name, prefix)
|
||||
before, _, found := strings.Cut(suffix, delimiter)
|
||||
if found {
|
||||
isCP = true
|
||||
cpKey = prefix + before + delimiter
|
||||
}
|
||||
}
|
||||
|
||||
if isCP {
|
||||
// Skip common prefixes at or before the effective marker
|
||||
if cpKey <= effectiveMarker {
|
||||
continue
|
||||
}
|
||||
// Deduplicate: multiple blobs can map to the same common prefix
|
||||
if _, exists := cpSet[cpKey]; exists {
|
||||
continue
|
||||
}
|
||||
// If we already reached maxKeys, this new unique CP means truncation
|
||||
if pastMax {
|
||||
isTruncated = true
|
||||
break loop
|
||||
}
|
||||
cp := cpKey
|
||||
cPrefixes = append(cPrefixes, types.CommonPrefix{Prefix: &cp})
|
||||
cpSet[cpKey] = struct{}{}
|
||||
candidateMarker = cpKey
|
||||
totalFound++
|
||||
if totalFound == maxKeys {
|
||||
pastMax = true
|
||||
}
|
||||
} else {
|
||||
if pastMax {
|
||||
isTruncated = true
|
||||
break loop
|
||||
}
|
||||
objects = append(objects, s3response.Object{
|
||||
ETag: backend.GetPtrFromString(convertAzureEtag(v.Properties.ETag)),
|
||||
Key: v.Name,
|
||||
LastModified: v.Properties.LastModified,
|
||||
Size: v.Properties.ContentLength,
|
||||
StorageClass: types.ObjectStorageClassStandard,
|
||||
Owner: &types.Owner{
|
||||
ID: &acl.Owner,
|
||||
},
|
||||
})
|
||||
candidateMarker = name
|
||||
totalFound++
|
||||
if totalFound == maxKeys {
|
||||
pastMax = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var cPrefixes []types.CommonPrefix
|
||||
for _, v := range resp.Segment.BlobPrefixes {
|
||||
cPrefixes = append(cPrefixes, types.CommonPrefix{
|
||||
Prefix: v.Name,
|
||||
})
|
||||
if !isTruncated {
|
||||
candidateMarker = ""
|
||||
}
|
||||
|
||||
var isTruncated bool
|
||||
var nextMarker *string
|
||||
// If Azure returned a NextMarker, set it for the next request
|
||||
if resp.NextMarker != nil && *resp.NextMarker != "" {
|
||||
nextMarker = resp.NextMarker
|
||||
isTruncated = true
|
||||
}
|
||||
keyCount := int32(len(objects) + len(cPrefixes))
|
||||
|
||||
return s3response.ListObjectsV2Result{
|
||||
Contents: objects,
|
||||
ContinuationToken: backend.GetPtrFromString(*input.ContinuationToken),
|
||||
MaxKeys: input.MaxKeys,
|
||||
ContinuationToken: backend.GetPtrFromString(continuationTokenVal),
|
||||
KeyCount: &keyCount,
|
||||
MaxKeys: &maxKeys,
|
||||
Name: input.Bucket,
|
||||
NextContinuationToken: nextMarker,
|
||||
Prefix: backend.GetPtrFromString(*input.Prefix),
|
||||
NextContinuationToken: backend.GetPtrFromString(candidateMarker),
|
||||
Prefix: backend.GetPtrFromString(prefix),
|
||||
IsTruncated: &isTruncated,
|
||||
Delimiter: backend.GetPtrFromString(*input.Delimiter),
|
||||
Delimiter: backend.GetPtrFromString(delimiter),
|
||||
CommonPrefixes: cPrefixes,
|
||||
StartAfter: backend.GetPtrFromString(*input.StartAfter),
|
||||
StartAfter: backend.GetPtrFromString(startAfterVal),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -585,3 +585,205 @@ func ListObjects_non_truncated_common_prefixes(s *S3Conf) error {
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// ListObjects should not list any pending multipart uploads
|
||||
// and no pending mp should block the bucket from deletion
|
||||
func ListObjects_should_not_list_pending_mps(s *S3Conf) error {
|
||||
testName := "ListObjects_should_not_list_pending_mps"
|
||||
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
|
||||
for i := range 5 {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
_, err := s3client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
|
||||
Bucket: &bucket,
|
||||
Key: getPtr(fmt.Sprintf("obj-%d", i)),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
res, err := s3client.ListObjects(ctx, &s3.ListObjectsInput{
|
||||
Bucket: &bucket,
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(res.Contents) != 0 {
|
||||
return fmt.Errorf("expected empty object list result, instead got %v", res.Contents)
|
||||
}
|
||||
if len(res.CommonPrefixes) != 0 {
|
||||
return fmt.Errorf("expected empty object common prefixes result, instead got %v", res.CommonPrefixes)
|
||||
}
|
||||
|
||||
ctx, cancel = context.WithTimeout(context.Background(), shortTimeout)
|
||||
_, err = s3client.DeleteBucket(ctx, &s3.DeleteBucketInput{
|
||||
Bucket: &bucket,
|
||||
})
|
||||
cancel()
|
||||
return err
|
||||
}, withSkipTearDown())
|
||||
}
|
||||
|
||||
// ListObjects with a marker should not surface pending multipart uploads
|
||||
// even when real objects are interleaved with the marker boundary.
|
||||
func ListObjects_mp_masking_with_marker(s *S3Conf) error {
|
||||
testName := "ListObjects_mp_masking_with_marker"
|
||||
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
|
||||
// Create pending multipart uploads with keys that sort after all real objects
|
||||
for i := range 3 {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
_, err := s3client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
|
||||
Bucket: &bucket,
|
||||
Key: getPtr(fmt.Sprintf("zzz-mp-%d", i+1)),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
contents, err := putObjects(s3client, []string{"aaa", "bbb", "ccc"}, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
out, err := s3client.ListObjects(ctx, &s3.ListObjectsInput{
|
||||
Bucket: &bucket,
|
||||
Marker: getPtr("aaa"),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Expect only bbb and ccc (after marker "aaa"), no multipart upload objects
|
||||
if !compareObjects(contents[1:], out.Contents) {
|
||||
return fmt.Errorf("expected objects %v, instead got %v",
|
||||
contents[1:], out.Contents)
|
||||
}
|
||||
if out.IsTruncated == nil || *out.IsTruncated {
|
||||
return fmt.Errorf("expected non-truncated result")
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// ListObjects truncation should count only real objects, not pending multipart uploads.
|
||||
func ListObjects_mp_masking_truncation(s *S3Conf) error {
|
||||
testName := "ListObjects_mp_masking_truncation"
|
||||
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
|
||||
// Create pending multipart uploads with keys that sort after real objects
|
||||
for i := range 2 {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
_, err := s3client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
|
||||
Bucket: &bucket,
|
||||
Key: getPtr(fmt.Sprintf("zzz-mp-%d", i+1)),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
contents, err := putObjects(s3client, []string{"obj-a", "obj-b", "obj-c", "obj-d"}, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
maxKeys := int32(2)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
out1, err := s3client.ListObjects(ctx, &s3.ListObjectsInput{
|
||||
Bucket: &bucket,
|
||||
MaxKeys: &maxKeys,
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if out1.IsTruncated == nil || !*out1.IsTruncated {
|
||||
return fmt.Errorf("expected first page to be truncated")
|
||||
}
|
||||
if !compareObjects(contents[:2], out1.Contents) {
|
||||
return fmt.Errorf("expected first page objects %v, instead got %v",
|
||||
contents[:2], out1.Contents)
|
||||
}
|
||||
if out1.NextMarker == nil || *out1.NextMarker == "" {
|
||||
return fmt.Errorf("expected non-empty NextMarker")
|
||||
}
|
||||
if *out1.NextMarker != "obj-b" {
|
||||
return fmt.Errorf("expected NextMarker to be obj-b, instead got %v", *out1.NextMarker)
|
||||
}
|
||||
|
||||
ctx, cancel = context.WithTimeout(context.Background(), shortTimeout)
|
||||
out2, err := s3client.ListObjects(ctx, &s3.ListObjectsInput{
|
||||
Bucket: &bucket,
|
||||
Marker: out1.NextMarker,
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if out2.IsTruncated == nil || *out2.IsTruncated {
|
||||
return fmt.Errorf("expected second page to not be truncated")
|
||||
}
|
||||
if !compareObjects(contents[2:], out2.Contents) {
|
||||
return fmt.Errorf("expected second page objects %v, instead got %v",
|
||||
contents[2:], out2.Contents)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// ListObjects with a delimiter should not include the .sgwtmp/ multipart prefix
|
||||
// in common prefixes, even when pending multipart uploads exist.
|
||||
func ListObjects_mp_masking_delimiter(s *S3Conf) error {
|
||||
testName := "ListObjects_mp_masking_delimiter"
|
||||
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
|
||||
// Create pending multipart uploads
|
||||
for i := range 2 {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
_, err := s3client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
|
||||
Bucket: &bucket,
|
||||
Key: getPtr(fmt.Sprintf("zzz-mp-%d", i+1)),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
_, err := putObjects(s3client, []string{"dir1/file1", "dir2/file2"}, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
out, err := s3client.ListObjects(ctx, &s3.ListObjectsInput{
|
||||
Bucket: &bucket,
|
||||
Delimiter: getPtr("/"),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(out.Contents) != 0 {
|
||||
return fmt.Errorf("expected empty Contents, instead got %v", out.Contents)
|
||||
}
|
||||
if !comparePrefixes([]string{"dir1/", "dir2/"}, out.CommonPrefixes) {
|
||||
return fmt.Errorf("expected common prefixes [dir1/ dir2/], instead got %v",
|
||||
sprintPrefixes(out.CommonPrefixes))
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -658,3 +658,203 @@ func ListObjectsV2_invalid_parent_prefix(s *S3Conf) error {
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// ListObjects should not list any pending multipart uploads
|
||||
// and no pending mp should block the bucket from deletion
|
||||
func ListObjectsV2_should_not_list_pending_mps(s *S3Conf) error {
|
||||
testName := "ListObjectsV2_should_not_list_pending_mps"
|
||||
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
|
||||
for i := range 5 {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
_, err := s3client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
|
||||
Bucket: &bucket,
|
||||
Key: getPtr(fmt.Sprintf("obj-%d", i)),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
res, err := s3client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: &bucket,
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(res.Contents) != 0 {
|
||||
return fmt.Errorf("expected empty object list result, instead got %v", res.Contents)
|
||||
}
|
||||
if len(res.CommonPrefixes) != 0 {
|
||||
return fmt.Errorf("expected empty object common prefixes result, instead got %v", res.CommonPrefixes)
|
||||
}
|
||||
|
||||
ctx, cancel = context.WithTimeout(context.Background(), shortTimeout)
|
||||
_, err = s3client.DeleteBucket(ctx, &s3.DeleteBucketInput{
|
||||
Bucket: &bucket,
|
||||
})
|
||||
cancel()
|
||||
return err
|
||||
}, withSkipTearDown())
|
||||
}
|
||||
|
||||
// ListObjectsV2 with startAfter should not surface pending multipart uploads
|
||||
// even when real objects are interleaved with the startAfter boundary.
|
||||
func ListObjectsV2_mp_masking_start_after(s *S3Conf) error {
|
||||
testName := "ListObjectsV2_mp_masking_start_after"
|
||||
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
|
||||
// Create pending multipart uploads with keys that sort after all real objects
|
||||
for i := range 2 {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
_, err := s3client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
|
||||
Bucket: &bucket,
|
||||
Key: getPtr(fmt.Sprintf("zzz-mp-%d", i+1)),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
contents, err := putObjects(s3client, []string{"alpha", "beta", "gamma"}, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
out, err := s3client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: &bucket,
|
||||
StartAfter: getPtr("alpha"),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Expect only beta and gamma (after startAfter "alpha"), no multipart upload objects
|
||||
if !compareObjects(contents[1:], out.Contents) {
|
||||
return fmt.Errorf("expected objects %v, instead got %v",
|
||||
contents[1:], out.Contents)
|
||||
}
|
||||
if out.IsTruncated == nil || *out.IsTruncated {
|
||||
return fmt.Errorf("expected non-truncated result")
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// ListObjectsV2 truncation should count only real objects, not pending multipart uploads,
|
||||
// and the continuation token should allow correct pagination.
|
||||
func ListObjectsV2_mp_masking_truncation(s *S3Conf) error {
|
||||
testName := "ListObjectsV2_mp_masking_truncation"
|
||||
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
|
||||
// Create pending multipart uploads with keys that sort after real objects
|
||||
for i := range 2 {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
_, err := s3client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
|
||||
Bucket: &bucket,
|
||||
Key: getPtr(fmt.Sprintf("zzz-mp-%d", i+1)),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
contents, err := putObjects(s3client, []string{"obj-a", "obj-b", "obj-c", "obj-d"}, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
maxKeys := int32(2)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
out1, err := s3client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: &bucket,
|
||||
MaxKeys: &maxKeys,
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if out1.IsTruncated == nil || !*out1.IsTruncated {
|
||||
return fmt.Errorf("expected first page to be truncated")
|
||||
}
|
||||
if !compareObjects(contents[:2], out1.Contents) {
|
||||
return fmt.Errorf("expected first page objects %v, instead got %v",
|
||||
contents[:2], out1.Contents)
|
||||
}
|
||||
if out1.NextContinuationToken == nil || *out1.NextContinuationToken == "" {
|
||||
return fmt.Errorf("expected non-empty NextContinuationToken")
|
||||
}
|
||||
|
||||
ctx, cancel = context.WithTimeout(context.Background(), shortTimeout)
|
||||
out2, err := s3client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: &bucket,
|
||||
ContinuationToken: out1.NextContinuationToken,
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if out2.IsTruncated == nil || *out2.IsTruncated {
|
||||
return fmt.Errorf("expected second page to not be truncated")
|
||||
}
|
||||
if !compareObjects(contents[2:], out2.Contents) {
|
||||
return fmt.Errorf("expected second page objects %v, instead got %v",
|
||||
contents[2:], out2.Contents)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// ListObjectsV2 with a delimiter should not include the .sgwtmp/ multipart prefix
|
||||
// in common prefixes, even when pending multipart uploads exist.
|
||||
func ListObjectsV2_mp_masking_delimiter(s *S3Conf) error {
|
||||
testName := "ListObjectsV2_mp_masking_delimiter"
|
||||
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
|
||||
// Create pending multipart uploads
|
||||
for i := range 2 {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
_, err := s3client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
|
||||
Bucket: &bucket,
|
||||
Key: getPtr(fmt.Sprintf("zzz-mp-%d", i+1)),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
_, err := putObjects(s3client, []string{"dir1/file1", "dir2/file2"}, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
|
||||
out, err := s3client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: &bucket,
|
||||
Delimiter: getPtr("/"),
|
||||
})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(out.Contents) != 0 {
|
||||
return fmt.Errorf("expected empty Contents, instead got %v", out.Contents)
|
||||
}
|
||||
if !comparePrefixes([]string{"dir1/", "dir2/"}, out.CommonPrefixes) {
|
||||
return fmt.Errorf("expected common prefixes [dir1/ dir2/], instead got %v",
|
||||
sprintPrefixes(out.CommonPrefixes))
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -274,6 +274,10 @@ func TestListObjects(ts *TestState) {
|
||||
ts.Run(ListObjects_nested_dir_file_objs)
|
||||
ts.Run(ListObjects_check_owner)
|
||||
ts.Run(ListObjects_non_truncated_common_prefixes)
|
||||
ts.Run(ListObjects_should_not_list_pending_mps)
|
||||
ts.Run(ListObjects_mp_masking_with_marker)
|
||||
ts.Run(ListObjects_mp_masking_truncation)
|
||||
ts.Run(ListObjects_mp_masking_delimiter)
|
||||
//TODO: remove the condition after implementing checksums in azure
|
||||
if !ts.conf.azureTests {
|
||||
ts.Run(ListObjects_with_checksum)
|
||||
@@ -282,10 +286,7 @@ func TestListObjects(ts *TestState) {
|
||||
|
||||
func TestListObjectsV2(ts *TestState) {
|
||||
ts.Run(ListObjectsV2_start_after)
|
||||
// posix continuation token not compatible with azure
|
||||
if !ts.conf.azureTests {
|
||||
ts.Run(ListObjectsV2_both_start_after_and_continuation_token)
|
||||
}
|
||||
ts.Run(ListObjectsV2_both_start_after_and_continuation_token)
|
||||
ts.Run(ListObjectsV2_start_after_not_in_list)
|
||||
ts.Run(ListObjectsV2_start_after_empty_result)
|
||||
ts.Run(ListObjectsV2_both_delimiter_and_prefix)
|
||||
@@ -301,6 +302,10 @@ func TestListObjectsV2(ts *TestState) {
|
||||
ts.Run(ListObjectsV2_with_checksum)
|
||||
}
|
||||
ts.Run(ListObjectsV2_invalid_parent_prefix)
|
||||
ts.Run(ListObjectsV2_should_not_list_pending_mps)
|
||||
ts.Run(ListObjectsV2_mp_masking_start_after)
|
||||
ts.Run(ListObjectsV2_mp_masking_truncation)
|
||||
ts.Run(ListObjectsV2_mp_masking_delimiter)
|
||||
}
|
||||
|
||||
// VD stands for Versioning Disabled
|
||||
@@ -1433,7 +1438,16 @@ func GetIntTests() IntTests {
|
||||
"ListObjects_nested_dir_file_objs": ListObjects_nested_dir_file_objs,
|
||||
"ListObjects_check_owner": ListObjects_check_owner,
|
||||
"ListObjects_non_truncated_common_prefixes": ListObjects_non_truncated_common_prefixes,
|
||||
"ListObjects_should_not_list_pending_mps": ListObjects_should_not_list_pending_mps,
|
||||
"ListObjects_mp_masking_with_marker": ListObjects_mp_masking_with_marker,
|
||||
"ListObjects_mp_masking_truncation": ListObjects_mp_masking_truncation,
|
||||
"ListObjects_mp_masking_delimiter": ListObjects_mp_masking_delimiter,
|
||||
"ListObjectsV2_non_truncated_common_prefixes": ListObjectsV2_non_truncated_common_prefixes,
|
||||
"ListObjectsV2_invalid_parent_prefix": ListObjectsV2_invalid_parent_prefix,
|
||||
"ListObjectsV2_should_not_list_pending_mps": ListObjectsV2_should_not_list_pending_mps,
|
||||
"ListObjectsV2_mp_masking_start_after": ListObjectsV2_mp_masking_start_after,
|
||||
"ListObjectsV2_mp_masking_truncation": ListObjectsV2_mp_masking_truncation,
|
||||
"ListObjectsV2_mp_masking_delimiter": ListObjectsV2_mp_masking_delimiter,
|
||||
"ListObjects_with_checksum": ListObjects_with_checksum,
|
||||
"ListObjectsV2_start_after": ListObjectsV2_start_after,
|
||||
"ListObjectsV2_both_start_after_and_continuation_token": ListObjectsV2_both_start_after_and_continuation_token,
|
||||
|
||||
Reference in New Issue
Block a user